In [1]:
import json
import uuid
import pandas as pd
import time

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError

### Configuration Parameters 

> **TODO:** Change the configuration parameters to the appropriate values for your setup.

In [2]:
# Connection settings plus the name used to namespace client ids and topics.
config = {
    'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
    'first_name': 'Michael',
    'last_name': 'Hotaling',
}

# Both derived values are "<last_name><first_name>", e.g. "DoeJohn".
config.update(
    client_id=config['last_name'] + config['first_name'],
    topic_prefix=config['last_name'] + config['first_name'],
)

config

{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
 'first_name': 'Michael',
 'last_name': 'Hotaling',
 'client_id': 'HotalingMichael',
 'topic_prefix': 'HotalingMichael'}

### Create Topic Utility Function

The `create_kafka_topic` function helps create a Kafka topic based on your configuration settings.  For instance, if your first name is *John* and your last name is *Doe*, `create_kafka_topic('locations')` will create a topic with the name `DoeJohn-locations`.  The function will not create the topic if it already exists. 

In [3]:
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    """Create the Kafka topic ``<topic_prefix>-<topic_name>`` unless it already exists.

    Args:
        topic_name: Unprefixed topic name, e.g. ``'locations'``.
        config: Mapping that provides ``bootstrap_servers``, ``client_id``
            and ``topic_prefix``.
        num_partitions: Number of partitions requested for the new topic.
        replication_factor: Replication factor requested for the new topic.

    Prints whether the topic was created or was already present. Errors other
    than ``TopicAlreadyExistsError`` propagate to the caller.
    """
    bootstrap_servers = config['bootstrap_servers']
    client_id = config['client_id']
    topic_prefix = config['topic_prefix']
    name = '{}-{}'.format(topic_prefix, topic_name)

    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers,
        client_id=client_id
    )

    topic = NewTopic(
        name=name,
        num_partitions=num_partitions,
        replication_factor=replication_factor
    )

    try:
        admin_client.create_topics(new_topics=[topic])
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError:
        print('Topic "{}" already exists'.format(name))
    finally:
        # A new admin client is built on every call; close it so repeated
        # calls do not leave clients (and their connections) open.
        admin_client.close()
    
# Make sure the two topics this notebook publishes to exist
# (each name is prefixed with config['topic_prefix']).
create_kafka_topic('locations')
create_kafka_topic('accelerations')

Topic "HotalingMichael-locations" already exists
Topic "HotalingMichael-accelerations" already exists


### Kafka Producer

The following code creates a `KafkaProducer` object which you can use to send Python objects that are serialized as JSON.

**Note:** This producer serializes Python objects as JSON. This means that the object must be JSON serializable.  As an example, Python `datetime` values are not JSON serializable and must be converted to a string (e.g. ISO 8601) or a numeric value (e.g. a Unix timestamp) before being sent.

In [4]:
# Shared producer: every message value is dumped to JSON and encoded as UTF-8 bytes.
producer = KafkaProducer(
    value_serializer=lambda value: json.dumps(value).encode('utf-8'),
    bootstrap_servers=config['bootstrap_servers'],
)

### Send Data Function

The `send_data` function sends a Python object to a Kafka topic. This function adds the `topic_prefix` to the topic so `send_data('locations', data)` sends a JSON serialized message to `DoeJohn-locations`. The function also registers callbacks to let you know if the message has been sent or if an error has occurred. 

In [5]:
def on_send_success(record_metadata):
    """Success callback: print where the delivered message landed.

    Reads ``topic``, ``partition`` and ``offset`` from ``record_metadata``.
    """
    template = 'Message sent:\n    Topic: "{}"\n    Partition: {}\n    Offset: {}'
    details = (
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset,
    )
    print(template.format(*details))
    
def on_send_error(excp):
    """Error callback: print the exception that caused a send to fail.

    Args:
        excp: The exception passed to the errback.
    """
    # print() has no ``exc_info`` keyword (that belongs to the logging API),
    # so the exception is formatted into the message instead.
    print('Message failed to send: {!r}'.format(excp))

def send_data(topic, data, config=config, producer=producer, msg_key=None):
    """Send ``data`` to the prefixed topic and register the result callbacks.

    Args:
        topic: Unprefixed topic name; ``config['topic_prefix']`` is prepended.
        data: Message value handed to the producer.
        config: Mapping that provides ``topic_prefix``.
        producer: Producer used to send the message.
        msg_key: Optional string key; a random UUID hex string is used when
            it is ``None``.
    """
    full_topic = '{}-{}'.format(config['topic_prefix'], topic)

    # Only generate a random key when the caller did not supply one.
    key = uuid.uuid4().hex if msg_key is None else msg_key

    pending = producer.send(full_topic, value=data, key=key.encode('utf-8'))
    pending.add_callback(on_send_success).add_errback(on_send_error)

In [6]:
# Small JSON-serializable payload used to smoke-test both topics.
example_data = {'key1': 'value1', 'key2': 'value2'}

send_data('locations', example_data)
send_data('accelerations', example_data)

In [7]:
# Publish one example message to each topic used by the later examples.
send_data('joined', example_data)
send_data('simple', example_data)
send_data('windowed', example_data)

Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 224
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 14
Message sent:
    Topic: "HotalingMichael-joined"
    Partition: 0
    Offset: 2
Message sent:
    Topic: "HotalingMichael-simple"
    Partition: 0
    Offset: 2
Message sent:
    Topic: "HotalingMichael-windowed"
    Partition: 0
    Offset: 2

# Location Data

In [8]:
import os
from pathlib import Path




In [9]:
# Go three levels up from the working directory (parents[2]), then down
# into data/processed/bdd for the two input datasets.
current_dir = Path.cwd().parents[2]
data_dir = current_dir / 'data'
processed_dir = data_dir / 'processed'
bdd_dir = processed_dir / 'bdd'
locations_dir = bdd_dir / 'locations'
accelerations_dir = bdd_dir / 'accelerations'
locations_dir

PosixPath('/home/jovyan/dsc650-master/data/processed/bdd/locations')

In [10]:
# Load the location records ordered by offset and render timestamps as
# strings so each record can be JSON-serialized.
df_locations = (
    pd.read_parquet(locations_dir)
    .sort_values('offset')
    .reset_index()
    .assign(timestamp=lambda frame: frame['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S.%f'))
)
df_dict_locations = df_locations.to_dict(orient='records')

In [11]:
df_locations.head(5)

Unnamed: 0,index,id,ride_id,uuid,timestamp,offset,course,latitude,longitude,geohash,speed,accuracy,timelapse,filename,t
0,1,85c61911b7fe2ced1000c33c9e932706,6760ffa3f41908695d1405b776c3e8d5,dad7eae44e784b549c8c5a3aa051a8c7,1970-01-01 00:25:07.320453,1.077913,158.203125,40.677641,-73.81793,dr5x2jpkmtcy,2.12,10.0,False,d745b92f-aefd-467d-9121-7a71308e8d6d.mov,0.0
1,0,58682c5d48cad9d9e103431d773615bf,c9a2b46c9aa515b632eddc45c4868482,19b9aa10588646b3bf22c9b4865a7995,1970-01-01 00:25:03.882586,1.525061,299.619141,40.76287,-73.961949,dr5ruuwscttz,0.0,10.0,False,e2f795a7-6a7d-4500-b5d7-4569de996811.mov,0.0
2,2,58682c5d48cad9d9e103431d773615bf,c9a2b46c9aa515b632eddc45c4868482,19b9aa10588646b3bf22c9b4865a7995,1970-01-01 00:25:03.882583,4.525061,299.619141,40.76287,-73.961949,dr5ruuwsctv3,0.0,10.0,False,e2f795a7-6a7d-4500-b5d7-4569de996811.mov,4.5
3,3,85c61911b7fe2ced1000c33c9e932706,6760ffa3f41908695d1405b776c3e8d5,dad7eae44e784b549c8c5a3aa051a8c7,1970-01-01 00:25:07.320449,5.077913,159.960938,40.677883,-73.818047,dr5x2jpmfffw,11.75,10.0,False,d745b92f-aefd-467d-9121-7a71308e8d6d.mov,4.5
4,5,85c61911b7fe2ced1000c33c9e932706,6760ffa3f41908695d1405b776c3e8d5,dad7eae44e784b549c8c5a3aa051a8c7,1970-01-01 00:25:07.320446,8.077913,159.609375,40.678191,-73.818193,dr5x2jppxkqj,13.15,10.0,False,d745b92f-aefd-467d-9121-7a71308e8d6d.mov,7.8


In [12]:
df_dict_locations[0]

{'index': 1,
 'id': '85c61911b7fe2ced1000c33c9e932706',
 'ride_id': '6760ffa3f41908695d1405b776c3e8d5',
 'uuid': 'dad7eae44e784b549c8c5a3aa051a8c7',
 'timestamp': '1970-01-01 00:25:07.320453',
 'offset': 1.0779125295566454,
 'course': 158.203125,
 'latitude': 40.677641336844,
 'longitude': -73.81793000742218,
 'geohash': 'dr5x2jpkmtcy',
 'speed': 2.119999885559082,
 'accuracy': 10.0,
 'timelapse': False,
 'filename': 'd745b92f-aefd-467d-9121-7a71308e8d6d.mov',
 't': '000.0'}

In [13]:
def send_location_data(index):
    """Send the location record at position ``index`` to the ``locations`` topic."""
    send_data('locations', df_dict_locations[index])

In [14]:
# Replay the location records in offset order, sleeping for the gap between
# consecutive offsets (passed to time.sleep, i.e. treated as seconds).
last_offset = 0
for index, current_offset in df_locations['offset'].items():
    time.sleep(current_offset - last_offset)
    send_location_data(index)
    last_offset = current_offset

Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 15
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 16
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 17
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 18
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 19
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 20
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 21
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 22
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 23
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 24
Message sent:
    Topic: "HotalingMichael-locations"
    Partition: 0
    Offset: 25
Message sent:
    Topic: "HotalingMichael-locations"
    Partitio

KeyboardInterrupt: 

# Accelerations Data

In [15]:
create_kafka_topic('accelerations')

Topic "HotalingMichael-accelerations" already exists


In [16]:
# Load the acceleration records ordered by offset and render timestamps as
# strings so each record can be JSON-serialized.
df_accelerations = (
    pd.read_parquet(accelerations_dir)
    .sort_values('offset')
    .reset_index()
    .assign(timestamp=lambda frame: frame['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S.%f'))
)
df_dict_accelerations = df_accelerations.to_dict(orient='records')

In [17]:
df_accelerations.head(5)

Unnamed: 0,index,id,ride_id,uuid,timestamp,offset,x,y,z,timelapse,filename,t
0,0,58682c5d48cad9d9e103431d773615bf,c9a2b46c9aa515b632eddc45c4868482,19b9aa10588646b3bf22c9b4865a7995,1970-01-01 00:25:03.882586,0.822061,-0.994,0.045,-0.036,False,e2f795a7-6a7d-4500-b5d7-4569de996811.mov,0.0
1,1,58682c5d48cad9d9e103431d773615bf,c9a2b46c9aa515b632eddc45c4868482,19b9aa10588646b3bf22c9b4865a7995,1970-01-01 00:25:03.882586,0.842061,-0.998,0.046,-0.04,False,e2f795a7-6a7d-4500-b5d7-4569de996811.mov,0.0
2,2,58682c5d48cad9d9e103431d773615bf,c9a2b46c9aa515b632eddc45c4868482,19b9aa10588646b3bf22c9b4865a7995,1970-01-01 00:25:03.882586,0.862061,-0.999,0.047,-0.036,False,e2f795a7-6a7d-4500-b5d7-4569de996811.mov,0.0
3,3,58682c5d48cad9d9e103431d773615bf,c9a2b46c9aa515b632eddc45c4868482,19b9aa10588646b3bf22c9b4865a7995,1970-01-01 00:25:03.882586,0.882061,-0.999,0.045,-0.034,False,e2f795a7-6a7d-4500-b5d7-4569de996811.mov,0.0
4,4,58682c5d48cad9d9e103431d773615bf,c9a2b46c9aa515b632eddc45c4868482,19b9aa10588646b3bf22c9b4865a7995,1970-01-01 00:25:03.882586,0.902061,-0.999,0.048,-0.033,False,e2f795a7-6a7d-4500-b5d7-4569de996811.mov,0.0


In [18]:
df_dict_accelerations[0]

{'index': 0,
 'id': '58682c5d48cad9d9e103431d773615bf',
 'ride_id': 'c9a2b46c9aa515b632eddc45c4868482',
 'uuid': '19b9aa10588646b3bf22c9b4865a7995',
 'timestamp': '1970-01-01 00:25:03.882586',
 'offset': 0.8220608865228429,
 'x': -0.994,
 'y': 0.045,
 'z': -0.036000000000000004,
 'timelapse': False,
 'filename': 'e2f795a7-6a7d-4500-b5d7-4569de996811.mov',
 't': '000.0'}

In [19]:
def send_acceleration_data(index):
    """Send the acceleration record at position ``index`` to the ``accelerations`` topic."""
    send_data('accelerations', df_dict_accelerations[index])

In [20]:
# Replay the acceleration records in offset order, sleeping for the gap between
# consecutive offsets (passed to time.sleep, i.e. treated as seconds).
last_offset = 0
for index, current_offset in df_accelerations['offset'].items():
    time.sleep(current_offset - last_offset)
    send_acceleration_data(index)
    last_offset = current_offset

Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 225
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 226
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 227
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 228
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 229
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 230
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 231
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 232
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 233
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 234
Message sent:
    Topic: "HotalingMichael-accelerations"
    Partition: 0
    Offset: 235
Message se

KeyboardInterrupt: 