In [7]:
import json
import uuid

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError

### Configuration Parameters 

> **TODO:** Change the configuration parameters to the appropriate values for your setup.

In [2]:
# Connection settings plus the identity used to namespace Kafka resources.
config = {
    'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
    'first_name': 'Mithil',
    'last_name': 'Patel',
}

# The client id and the topic prefix share the same "<LastName><FirstName>" form.
config.update(
    client_id=config['last_name'] + config['first_name'],
    topic_prefix=config['last_name'] + config['first_name'],
)

config

{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
 'first_name': 'Mithil',
 'last_name': 'Patel',
 'client_id': 'PatelMithil',
 'topic_prefix': 'PatelMithil'}

### Create Topic Utility Function

The `create_kafka_topic` function creates a Kafka topic based on your configuration settings.  For instance, if your first name is *John* and your last name is *Doe*, `create_kafka_topic('locations')` will create a topic with the name `DoeJohn-locations`.  The function will not create the topic if it already exists. 

In [3]:
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    """Create the Kafka topic ``<topic_prefix>-<topic_name>`` unless it already exists.

    Args:
        topic_name: Suffix of the topic; ``config['topic_prefix']`` and a
            hyphen are prepended to form the full topic name.
        config: Mapping that provides ``bootstrap_servers``, ``client_id``
            and ``topic_prefix``.
        num_partitions: Partition count passed to ``NewTopic``.
        replication_factor: Replication factor passed to ``NewTopic``.

    Prints whether the topic was created or was already present. Any error
    other than ``TopicAlreadyExistsError`` propagates to the caller.
    """
    name = '{}-{}'.format(config['topic_prefix'], topic_name)

    admin_client = KafkaAdminClient(
        bootstrap_servers=config['bootstrap_servers'],
        client_id=config['client_id']
    )

    try:
        topic = NewTopic(
            name=name,
            num_partitions=num_partitions,
            replication_factor=replication_factor
        )
        admin_client.create_topics(new_topics=[topic])
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError:
        print('Topic "{}" already exists'.format(name))
    finally:
        # Always release the admin connection; otherwise every call (and
        # every re-run of this cell) leaves a client connection open.
        admin_client.close()

create_kafka_topic('locations')

Topic "PatelMithil-locations" already exists


### Kafka Producer

The following code creates a `KafkaProducer` object which you can use to send Python objects that are serialized as JSON.

**Note:** This producer serializes Python objects as JSON. This means that every object you send must be JSON serializable.  As an example, Python `datetime` values are not JSON serializable and must be converted to a string (e.g. ISO 8601) or a numeric value (e.g. a Unix timestamp) before being sent.

In [4]:
# Producer whose message values are JSON-encoded and sent as UTF-8 bytes.
producer = KafkaProducer(
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
    bootstrap_servers=config['bootstrap_servers'],
)

### Send Data Function

The `send_data` function sends a Python object to a Kafka topic. This function adds the `topic_prefix` to the topic, so `send_data('locations', data)` sends a JSON serialized message to `DoeJohn-locations`. The function also registers callbacks to let you know if the message has been sent or if an error has occurred. 

In [5]:
def on_send_success(record_metadata):
    """Success callback: print the topic, partition and offset of a sent message.

    Args:
        record_metadata: Object exposing ``topic``, ``partition`` and
            ``offset`` attributes for the delivered record.
    """
    report = 'Message sent:\n    Topic: "{}"\n    Partition: {}\n    Offset: {}'
    details = (
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset,
    )
    print(report.format(*details))
    
def on_send_error(excp):
    """Error callback: print the exception that caused a send to fail.

    Args:
        excp: The exception passed to the errback for the failed send.
    """
    # print() has no ``exc_info`` keyword (that belongs to the logging API);
    # passing it raised TypeError inside the errback and hid the real error.
    # Format the exception into the message instead.
    print('Message failed to send:\n    Error: {!r}'.format(excp))

def send_data(topic, data, config=config, producer=producer, msg_key=None):
    """Send ``data`` to the prefixed Kafka topic and register result callbacks.

    Args:
        topic: Topic suffix; ``config['topic_prefix']`` and a hyphen are
            prepended to form the full topic name.
        data: Message value handed to ``producer.send``.
        config: Mapping that provides ``topic_prefix``.
        producer: Producer used to send the message.
        msg_key: Optional string key; when ``None`` a random UUID hex
            string is used. The key is UTF-8 encoded before sending.
    """
    topic_name = '{}-{}'.format(config['topic_prefix'], topic)

    # Fall back to a random key when the caller does not supply one.
    key = uuid.uuid4().hex if msg_key is None else msg_key

    future = producer.send(topic_name, value=data, key=key.encode('utf-8'))
    future.add_callback(on_send_success).add_errback(on_send_error)

In [8]:
# example_data = dict(
#     key1='value1',
#     key2='value2'
# )

# send_data('locations', example_data)

Message sent:
    Topic: "PatelMithil-locations"
    Partition: 0
    Offset: 3


In [11]:
from pyspark.sql.functions import to_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [12]:
import os 
# Absolute path of the processed BDD data directory, resolved relative to
# the notebook's current working directory (three levels up).
data_path = os.path.abspath(
    os.path.join(os.pardir, os.pardir, os.pardir, "data", "processed", "bdd")
)

'/home/jovyan/dsc650/data/processed/bdd/locations/'

In [18]:
from pyspark.sql import SparkSession

# Obtain (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("ParquetDataset")
    .getOrCreate()
)

# Load the locations parquet dataset and preview the first rows.
df_location = spark.read.parquet(data_path + '/locations/')
df_location.show(5)

+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+-------------------+------------+------------------+--------+---------+--------------------+-----+
|                  id|             ride_id|                uuid|           timestamp|            offset|            course|          latitude|          longitude|     geohash|             speed|accuracy|timelapse|            filename|    t|
+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+-------------------+------------+------------------+--------+---------+--------------------+-----+
|d602b69361b975ad3...|6ef732ea5d79eeb95...|7901ba7a61704c1bb...|1970-01-01 00:25:...|102.57438001660061|174.02879333496094| 40.75754753567096| -73.79306668914728|dr5xbg9tbvt3| 7.010000228881836|    10.0|    false|2330d18d-4f74-45d...|102.5|
|d602b69361b975ad3...|6ef732ea5d79ee

In [19]:
# Order the rows by the 't' column (ascending) and preview the result.
df_location = df_location.sort('t')
df_location.show(5)

+--------------------+--------------------+--------------------+--------------------+------------------+-------------+-----------------+------------------+------------+------------------+--------+---------+--------------------+---+
|                  id|             ride_id|                uuid|           timestamp|            offset|       course|         latitude|         longitude|     geohash|             speed|accuracy|timelapse|            filename|  t|
+--------------------+--------------------+--------------------+--------------------+------------------+-------------+-----------------+------------------+------------+------------------+--------+---------+--------------------+---+
|85c61911b7fe2ced1...|6760ffa3f41908695...|dad7eae44e784b549...|1970-01-01 00:25:...|1.0779125295566454|   158.203125|  40.677641336844|-73.81793000742218|dr5x2jpkmtcy| 2.119999885559082|    10.0|    false|d745b92f-aefd-467...|0.0|
|58682c5d48cad9d9e...|c9a2b46c9aa515b63...|19b9aa10588646b3b...|1970-01-

In [27]:
# Pair every row's 't' value with the full row as a dict; these pairs are
# the payloads for the messages to push. collect() brings all rows to the driver.
t_stamps = list(
    map(lambda record: (record.t, record.asDict()), df_location.collect())
)
t_stamps[:5]

[(0.0,
  {'id': '85c61911b7fe2ced1000c33c9e932706',
   'ride_id': '6760ffa3f41908695d1405b776c3e8d5',
   'uuid': 'dad7eae44e784b549c8c5a3aa051a8c7',
   'timestamp': datetime.datetime(1970, 1, 1, 0, 25, 7, 320453),
   'offset': 1.0779125295566454,
   'course': 158.203125,
   'latitude': 40.677641336844,
   'longitude': -73.81793000742218,
   'geohash': 'dr5x2jpkmtcy',
   'speed': 2.119999885559082,
   'accuracy': 10.0,
   'timelapse': False,
   'filename': 'd745b92f-aefd-467d-9121-7a71308e8d6d.mov',
   't': 0.0}),
 (0.0,
  {'id': '58682c5d48cad9d9e103431d773615bf',
   'ride_id': 'c9a2b46c9aa515b632eddc45c4868482',
   'uuid': '19b9aa10588646b3bf22c9b4865a7995',
   'timestamp': datetime.datetime(1970, 1, 1, 0, 25, 3, 882586),
   'offset': 1.525060886522843,
   'course': 299.619140625,
   'latitude': 40.76287002542555,
   'longitude': -73.96194855681718,
   'geohash': 'dr5ruuwscttz',
   'speed': 0.0,
   'accuracy': 10.0,
   'timelapse': False,
   'filename': 'e2f795a7-6a7d-4500-b5d7-4569de