In [1]:
import pandas as pd
from easyavro import EasyAvroProducer

import logging
# Surface easyavro's log records in the notebook at INFO level.
# The handler list is assigned outright (not appended to), so whatever
# handlers were attached before are replaced and re-running this cell
# does not stack duplicate handlers.
stream_handler = logging.StreamHandler()
L = logging.getLogger('easyavro')
L.setLevel(logging.INFO)
L.handlers = [stream_handler]

### Run from release config file

In [2]:
# Load the sample particle table. The CSV is read without a header row
# (header=None), so the column names are supplied explicitly here.
#
# `infer_datetime_format` is intentionally not passed: it is deprecated since
# pandas 2.0 (format inference is the default behaviour) and only triggers a
# warning there. The date column is referenced by name rather than position
# so the intent stays clear if the column order ever changes.
df = pd.read_csv(
    '../data/particle_sample.csv',
    header=None,
    names=['time', 'lat', 'lon', 'nparticles'],
    parse_dates=['time'],
)
df

Unnamed: 0,time,lat,lon,nparticles
0,2017-01-01 00:00:00,26.0,236.0,2
1,2017-01-01 12:00:00,26.5,236.5,4
2,2017-01-02 00:00:00,27.0,237.0,8
3,2017-01-02 12:00:00,27.5,237.5,16
4,2017-01-03 00:00:00,28.0,238.0,32
5,2017-01-03 12:00:00,28.5,238.5,64


In [3]:
# Turn the frame into a list of plain dicts (one per row) with the timestamp
# rendered as an ISO-8601 string.
#
# This is done column-wise instead of with `iterrows()`: iterrows builds one
# Series per row, which does not preserve per-column dtypes and required
# mutating the row object in place. `assign` returns a new frame, so `df`
# itself is left untouched and the cell is safe to re-run.
records_to_send = (
    df
    .assign(time=df['time'].map(lambda ts: ts.isoformat()))
    .to_dict(orient='records')
)
records_to_send

[{'time': '2017-01-01T00:00:00', 'lat': 26.0, 'lon': 236.0, 'nparticles': 2},
 {'time': '2017-01-01T12:00:00', 'lat': 26.5, 'lon': 236.5, 'nparticles': 4},
 {'time': '2017-01-02T00:00:00', 'lat': 27.0, 'lon': 237.0, 'nparticles': 8},
 {'time': '2017-01-02T12:00:00', 'lat': 27.5, 'lon': 237.5, 'nparticles': 16},
 {'time': '2017-01-03T00:00:00', 'lat': 28.0, 'lon': 238.0, 'nparticles': 32},
 {'time': '2017-01-03T12:00:00', 'lat': 28.5, 'lon': 238.5, 'nparticles': 64}]

In [7]:
# Host name shared by the schema registry (port 7002) and the broker
# (port 7001).
kafka_base = 'kafka-int'
registry_url = f'http://{kafka_base}:7002'
broker_list = [f'{kafka_base}:7001']

# NOTE(review): 'nokey' presumably tells easyavro that messages carry no key
# schema — confirm against the easyavro documentation.
p = EasyAvroProducer(
    schema_registry_url=registry_url,
    kafka_brokers=broker_list,
    kafka_topic='mil-darpa-oot-particle-releases',
    key_schema='nokey',
)

# A single message wrapping every record built above. Each entry is a
# 2-tuple whose first element is None (presumably the message key — verify).
payload = {
    'id': 'notebook-run',
    'records': records_to_send
}
to_send = [(None, payload)]

# Show exactly what is about to be sent, then publish it.
display(to_send)
p.produce(to_send)

[(None,
  {'id': 'notebook-run',
   'records': [{'time': '2017-01-01T00:00:00',
     'lat': 26.0,
     'lon': 236.0,
     'nparticles': 2},
    {'time': '2017-01-01T12:00:00',
     'lat': 26.5,
     'lon': 236.5,
     'nparticles': 4},
    {'time': '2017-01-02T00:00:00',
     'lat': 27.0,
     'lon': 237.0,
     'nparticles': 8},
    {'time': '2017-01-02T12:00:00',
     'lat': 27.5,
     'lon': 237.5,
     'nparticles': 16},
    {'time': '2017-01-03T00:00:00',
     'lat': 28.0,
     'lon': 238.0,
     'nparticles': 32},
    {'time': '2017-01-03T12:00:00',
     'lat': 28.5,
     'lon': 238.5,
     'nparticles': 64}]})]

1/1 messages
Done producing
