In [8]:
import pandas as pd
import numpy as np
import json
import uuid

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError

### Configuration Parameters 

> **TODO:** Change the configuration prameters to the appropriate values for your setup.

In [2]:
config = dict(
    bootstrap_servers=['kafka.kafka.svc.cluster.local:9092'],
    first_name='Abed',
    last_name='Tabbalat'
)

config['client_id'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)
config['topic_prefix'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)

config

{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
 'first_name': 'Abed',
 'last_name': 'Tabbalat',
 'client_id': 'TabbalatAbed',
 'topic_prefix': 'TabbalatAbed'}

### Create Topic Utility Function

The `create_kafka_topic` helps create a Kafka topic based on your configuration settings.  For instance, if your first name is *John* and your last name is *Doe*, `create_kafka_topic('locations')` will create a topic with the name `DoeJohn-locations`.  The function will not create the topic if it already exists. 

In [3]:
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    bootstrap_servers = config['bootstrap_servers']
    client_id = config['client_id']
    topic_prefix = config['topic_prefix']
    name = '{}-{}'.format(topic_prefix, topic_name)
    
    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers, 
        client_id=client_id
    )
    
    topic = NewTopic(
        name=name,
        num_partitions=num_partitions,
        replication_factor=replication_factor
    )

    topic_list = [topic]
    try:
        admin_client.create_topics(new_topics=topic_list)
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError as e:
        print('Topic "{}" already exists'.format(name))
    
create_kafka_topic('locations')

Topic "TabbalatAbed-locations" already exists


### Kafka Producer

The following code creates a `KafkaProducer` object which you can use to send Python objects that are serialized as JSON.

**Note:** This producer serializes Python objects as JSON. This means that object must be JSON serializable.  As an example, Python `DateTime` values are not JSON serializable and must be converted to a string (e.g. ISO 8601) or a numeric value (e.g. a Unix timestamp) before being sent.

In [4]:
producer = KafkaProducer(
  bootstrap_servers=config['bootstrap_servers'],
  value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

### Send Data Function

The `send_data` function sends a Python object to a Kafka topic. This function adds the `topic_prefix` to the topic so `send_data('locations', data)` sends a JSON serialized message to `DoeJohn-locations`. The function also registers callbacks to let you know if the message has been sent or if an error has occured. 

In [5]:
def on_send_success(record_metadata):
    print('Message sent:\n    Topic: "{}"\n    Partition: {}\n    Offset: {}'.format(
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset
    ))
    
def on_send_error(excp):
    print('I am an errback', exc_info=excp)
    # handle exception

def send_data(topic, data, config=config, producer=producer, msg_key=None):
    topic_prefix = config['topic_prefix']
    topic_name = '{}-{}'.format(topic_prefix, topic)
    
    if msg_key is not None:
        key = msg_key
    else:
        key = uuid.uuid4().hex
    
    producer.send(
        topic_name, 
        value=data,
        key=key.encode('utf-8')
    ).add_callback(on_send_success).add_errback(on_send_error)

In [6]:
# Define the Kafka topic to which locations data will be sent
topics = create_kafka_topic('locations')

# Specify the directory where the processed locations data is stored
data_dir = '/home/jovyan/dsc650/data/processed/bdd/locations'

Topic "TabbalatAbed-locations" already exists


In [9]:
# Read data from the specified directory and convert it to a DataFrame
df = pd.read_parquet(data_dir)

# Convert timestamp column to string type
df['timestamp'] = df['timestamp'].astype('str')

# Convert DataFrame to a dictionary
data_dict = df.set_index('t').transpose().to_dict()  # orient='list'

# Iterate over the dictionary and send the data to the Kafka topic
for k,v in data_dict.items():
    send_data(topic='locations', data=v, config=config, producer=producer, msg_key=k)

  data_dict = df.set_index('t').transpose().to_dict()  # orient='list'


Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1024
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1025
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1026
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1027
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1028
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1029
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1030
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1031
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1032
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1033
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offset: 1034
Message sent:
    Topic: "TabbalatAbed-locations"
    Partition: 0
    Offse

In [10]:
# Define the Kafka topic to which acceleration data will be sent
topics_act = create_kafka_topic('accelerations')

# Specify the directory where the processed acceleration data is stored
data_dir_act = '/home/jovyan/dsc650/data/processed/bdd/accelerations'

Topic "TabbalatAbed-accelerations" already exists


In [11]:
# Read acceleration data from the specified directory and convert it to a DataFrame
df_act = pd.read_parquet(data_dir_act)

# Convert timestamp column to string type
df_act['timestamp'] = df_act['timestamp'].astype('str')

# Convert DataFrame to a dictionary
data_dict_act = df_act.set_index('t').transpose().to_dict()  # orient='list'

# Iterate over the dictionary and send the acceleration data to the Kafka topic
for k,v in data_dict_act.items():
    send_data(topic='accelerations', data=v, config=config, producer=producer, msg_key=k)

  data_dict_act = df_act.set_index('t').transpose().to_dict()  # orient='list'


Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 68

In [12]:
# Read location data from the specified directory and convert it to a DataFrame
df = pd.read_parquet(data_dir)

# Convert timestamp and offset columns to string type
df['timestamp'] = df['timestamp'].astype('str')
df['offset'] = df['offset'].astype('str')

# Convert DataFrame to a dictionary indexed by 'offset'
data_dict_offset = df.set_index('offset').transpose().to_dict()  # orient='list'

# Iterate over the dictionary and send the location data to the Kafka topic
for k,v in data_dict_offset.items(): 
    send_data(topic='locations', data=v, config=config, producer=producer, msg_key=k)


Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 69
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 70
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 71
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 72
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 73
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 74
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 75
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 76
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 77
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 78
Message sent:
    Topic: "TabbalatAbed-accelerations"
    Partition: 0
    Offset: 79
Message sent:
    Topic: "TabbalatAbed-accelerations"