### Assignment 8

The first part of the assignment involves creating a Jupyter notebook that mimics a real-time streaming data feed. The basic loop is simple: the notebook loads the processed data and publishes each record at the appropriate time. You can use either the time given in the Parquet partition name or the `offset` column within the Parquet data. For example, once your notebook has passed the 52.5-second mark, it should load the data from the `t=052.5` directory and publish it to the appropriate Kafka topic. Alternatively, you could examine the `offset` column and publish each record once that many seconds have elapsed.  
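A minimal sketch of the partition-based approach, assuming partition directories are named like `t=052.5` as in the example above. The helpers turn a directory name into a publish time in seconds and list the partitions that are due at a given elapsed time. `partition_time` and `due_partitions` are hypothetical names, not part of the course code.

```python
def partition_time(dirname):
    """Parse a partition directory name such as 't=052.5' into seconds (52.5)."""
    key, sep, value = dirname.partition('=')
    if key != 't' or not sep:
        raise ValueError('not a t=<seconds> partition: {!r}'.format(dirname))
    return float(value)


def due_partitions(dirnames, elapsed_seconds):
    """Return the partitions whose time has passed, in publish order."""
    due = [d for d in dirnames if partition_time(d) <= elapsed_seconds]
    return sorted(due, key=partition_time)


partitions = ['t=000.0', 't=060.0', 't=052.5', 't=004.5']
print(due_partitions(partitions, 55.0))  # ['t=000.0', 't=004.5', 't=052.5']
```

In a real loop you would also remember which partitions have already been published so that each one is sent only once.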

> **Hint**: You may want to use the Python [heapq](https://docs.python.org/3/library/heapq.html) library as an event queue.  
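A minimal sketch of that idea, assuming each record is a dict with an `offset` key (the sample values below are made up for illustration). Records from both topics go onto one heap keyed by offset; a running counter breaks ties so two records with the same offset are never compared directly. `build_event_queue` and `drain` are hypothetical helper names.

```python
import heapq
import itertools


def build_event_queue(locations, accelerations):
    """Push (offset, seq, topic, record) tuples onto a heap ordered by offset."""
    queue = []
    seq = itertools.count()  # tie-breaker so records with equal offsets are never compared
    for topic, records in (('locations', locations), ('accelerations', accelerations)):
        for record in records:
            heapq.heappush(queue, (record['offset'], next(seq), topic, record))
    return queue


def drain(queue):
    """Pop events in offset order; a real producer would wait until each offset before sending."""
    while queue:
        offset, _, topic, record = heapq.heappop(queue)
        yield offset, topic, record


locations = [{'offset': 1.08}, {'offset': 4.53}]
accelerations = [{'offset': 0.82}, {'offset': 1.08}, {'offset': 5.0}]
for offset, topic, record in drain(build_event_queue(locations, accelerations)):
    print(offset, topic)
# 0.82 accelerations
# 1.08 locations
# 1.08 accelerations
# 4.53 locations
# 5.0 accelerations
```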

The [DSC 650 GitHub repository contains example notebooks](https://github.com/bellevue-university/dsc650/tree/master/dsc650/assignments/assignment08) that show how to create topics, publish data to a Kafka broker, and consume the data.  

Use the following parameters when publishing simulated data to the Bellevue University Data Science Cluster Kafka broker.  

| Parameter          | Value                              |
| :----------------- | :--------------------------------- |
| Bootstrap Server   | kafka.kafka.svc.cluster.local:9092 |
| Location Topic     | LastnameFirstname-locations        |
| Acceleration Topic | LastnameFirstname-accelerations    |

The following example uses the `kafka-python` library to publish a message to a Kafka topic using a JSON serializer:  
~~~python
import json
from kafka import KafkaProducer

bootstrap_server = 'kafka.kafka.svc.cluster.local:9092'

producer = KafkaProducer(
  bootstrap_servers=[bootstrap_server],
  value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

producer.send(
  'DoeJohn-locations', 
  {"dataObjectID": "test1"}
)
# send() is asynchronous; flush() blocks until buffered messages are delivered
producer.flush()
~~~

> **Hint**: When creating the notebook producer, you may want to automatically restart sending the data from the beginning when you reach the end of the dataset. This enables you to continue testing without having to manually restart the notebook.  
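A minimal sketch of such a loop, assuming the events have already been flattened into `(offset, topic, payload)` tuples and that `publish` is whatever function sends one message (for example, a wrapper around `producer.send`). `replay` is a hypothetical helper name. Each pass restarts the clock, so offsets are measured from the start of the current pass, and the loop sleeps until the next offset instead of busy-waiting.

```python
import itertools
import time


def replay(events, publish, loops=None, speed=1.0):
    """Publish (offset, topic, payload) events at their offsets.

    loops=None repeats forever; an integer stops after that many passes.
    speed > 1 plays the data back faster than real time.
    """
    events = sorted(events, key=lambda event: event[0])
    passes = itertools.count() if loops is None else range(loops)
    for _ in passes:
        start = time.monotonic()  # each pass restarts the clock at offset 0
        for offset, topic, payload in events:
            delay = offset / speed - (time.monotonic() - start)
            if delay > 0:
                time.sleep(delay)  # sleep instead of spinning in a tight loop
            publish(topic, payload)


sent = []
replay([(0.02, 'locations', 'b'), (0.0, 'accelerations', 'a')],
       publish=lambda topic, payload: sent.append((topic, payload)),
       loops=2)
print(sent)  # [('accelerations', 'a'), ('locations', 'b'), ('accelerations', 'a'), ('locations', 'b')]
```

With `loops=None` the feed keeps running until the notebook cell is interrupted.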

### example code:

In [1]:
import json
from kafka import KafkaProducer

In [2]:
bootstrap_server = 'kafka.kafka.svc.cluster.local:9092'

producer = KafkaProducer(
  bootstrap_servers=[bootstrap_server],
  value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

producer.send(
  'DoeJohn-locations', 
  {"dataObjectID": "test1"}
)

<kafka.producer.future.FutureRecordMetadata at 0x7f3b1812e880>

### ok?

In [6]:
import json
import uuid

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError

### Configuration Parameters 

> **TODO:** Change the configuration parameters to the appropriate values for your setup.

In [None]:
config = dict(
    bootstrap_servers=['kafka.kafka.svc.cluster.local:9092'],
    first_name='Scott',
    last_name='Breitbach'
)

config['client_id'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)
config['topic_prefix'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)

config

### Create Topic Utility Function

The `create_kafka_topic` function creates a Kafka topic based on your configuration settings. For instance, if your first name is *John* and your last name is *Doe*, `create_kafka_topic('locations')` will create a topic named `DoeJohn-locations`. The function does not create the topic if it already exists. 

In [None]:
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    bootstrap_servers = config['bootstrap_servers']
    client_id = config['client_id']
    topic_prefix = config['topic_prefix']
    name = '{}-{}'.format(topic_prefix, topic_name)
    
    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers, 
        client_id=client_id
    )
    
    topic = NewTopic(
        name=name,
        num_partitions=num_partitions,
        replication_factor=replication_factor
    )

    topic_list = [topic]
    try:
        admin_client.create_topics(new_topics=topic_list)
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError as e:
        print('Topic "{}" already exists'.format(name))
    
create_kafka_topic('locations')

In [1]:
# # Set directories
# current_dir = Path(os.getcwd()).absolute()
# results_dir = current_dir.joinpath('results')
# if results_dir.exists():
#     shutil.rmtree(results_dir)
# results_dir.mkdir(parents=True, exist_ok=True)
src_data_path = '/home/jovyan/dsc650/data/processed/bdd/'

The following example uses the `kafka-python` library to consume messages from a Kafka topic. You should create a separate Jupyter notebook that consumes the messages your producer publishes, so you can verify that you are publishing to the correct topics:  

~~~python
from kafka import KafkaConsumer

bootstrap_server = 'kafka.kafka.svc.cluster.local:9092'

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(
    'DoeJohn-locations',
    bootstrap_servers=[bootstrap_server]
)
~~~ 

> **Note**: While creating a separate notebook that acts as a Kafka consumer is not strictly necessary for the assignment, it is recommended that you create one to aid in debugging and testing.
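A minimal consumer sketch for that debugging notebook, assuming the producer uses the JSON `value_serializer` shown earlier. `decode_value` and `print_messages` are hypothetical helper names; `auto_offset_reset`, `consumer_timeout_ms`, and `value_deserializer` are standard `KafkaConsumer` options in `kafka-python`.

```python
import json


def decode_value(raw):
    """Inverse of the producer's value_serializer: UTF-8 JSON bytes -> Python object."""
    return json.loads(raw.decode('utf-8'))


def print_messages(topic, bootstrap_servers, max_messages=10):
    # Imported here so decode_value can be reused without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',   # start from the oldest retained message
        consumer_timeout_ms=10000,      # stop iterating after 10 s without messages
        value_deserializer=decode_value,
    )
    try:
        for i, message in enumerate(consumer):
            print(message.topic, message.partition, message.offset, message.value)
            if i + 1 >= max_messages:
                break
    finally:
        consumer.close()
```

For example: `print_messages('DoeJohn-locations', ['kafka.kafka.svc.cluster.local:9092'])`.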

In [3]:
from kafka import KafkaConsumer

In [5]:
bootstrap_server = 'kafka.kafka.svc.cluster.local:9092'

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(
    'DoeJohn-locations',
    bootstrap_servers=[bootstrap_server]
)

In [None]:
# if you'd like to read the data from S3 instead, you can use this script:
import os
import json
import time
from collections import namedtuple
import heapq
import uuid
import pandas as pd
import s3fs
import pyarrow.parquet as pq

endpoint_url='https://storage.budsc.midwest-datascience.com'
s3 = s3fs.S3FileSystem(
    anon=True,
    client_kwargs={
        'endpoint_url': endpoint_url
    }
)

acceleration_columns = [
    'offset',
    'id',
    'ride_id',
    'uuid',
    'x',
    'y',
    'z',
#     't'
]
Acceleration = namedtuple('Acceleration', acceleration_columns)
def read_accelerations():
    df = pq.ParquetDataset(
        's3://data/processed/bdd/accelerations',
        filesystem=s3
    ).read_pandas().to_pandas()
    
    df = df[acceleration_columns].sort_values(by=['offset'])
    
    records = [Acceleration(*record) for record in df.to_records(index=False)]
    
    return records
accelerations = read_accelerations()

location_columns = [
    'offset',
    'id',
    'ride_id',
    'uuid',
    'course',
    'latitude',
    'longitude',
    'geohash',
    'speed',
    'accuracy',
#     't'
]
Location = namedtuple('Location', location_columns)
def read_locations():
    df = pq.ParquetDataset(
        's3://data/processed/bdd/locations',
        filesystem=s3
    ).read_pandas().to_pandas()
    
    df = df[location_columns].sort_values(by=['offset'])
    
    records = [Location(*record) for record in df.to_records(index=False)]
    
    return records
    
locations = read_locations()

### organize

In [2]:
# Load libraries
import pyarrow.parquet as pq
from collections import namedtuple
# Set path to data
src_data_path = '/home/jovyan/dsc650/data/processed/bdd/'

#### accelerations

In [15]:
# Load acceleration data into pandas dataframe
accel_df = pq.ParquetDataset(
    src_data_path + 'accelerations/').read_pandas().to_pandas()

In [17]:
accel_df.columns

Index(['id', 'ride_id', 'uuid', 'timestamp', 'offset', 'x', 'y', 'z',
       'timelapse', 'filename', 't'],
      dtype='object')

In [19]:
# Reorder columns
accel_cols = [
    'offset',
    'ride_id',
    'uuid', 
    'timestamp', 
    'x', 'y', 'z',
    'timelapse', 
    'filename', 
    't'
]

# Order df by specified columns & sort by offset value
accel_df = accel_df[accel_cols].sort_values(by=['offset'])

In [26]:
accel_df.head(1)

| | offset | ride_id | uuid | timestamp | x | y | z | timelapse | filename | t |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 0.822061 | c9a2b46c9aa515b632eddc45c4868482 | 19b9aa10588646b3bf22c9b4865a7995 | 1970-01-01 00:25:03.882586 | -0.994 | 0.045 | -0.036 | False | e2f795a7-6a7d-4500-b5d7-4569de996811.mov | 0.0 |


In [27]:
accel_df.tail(1)

| | offset | ride_id | uuid | timestamp | x | y | z | timelapse | filename | t |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 23351 | 122.469896 | 7758b047316155c7991ceddfcb964f96 | 6cfaa4a137fc419199221719e2b34aae | 1970-01-01 00:25:03.500103 | -0.983 | -0.02 | -0.091 | False | ef07c198-5de5-430f-b648-632c762c7b3a.mov | 121.4 |


In [20]:
# # accel_df.columns
# # Select columns
# accel_cols = [
#     'offset',
#     'id',
#     'ride_id',
#     'uuid',
#     'x',
#     'y',
#     'z',
# #     't'
# ]

# # Filter df for selected columns & sort by offset value
# accel_df = accel_df[accel_cols].sort_values(by=['offset'])

In [5]:
# # Define named tuple
# Accelerations = namedtuple('Accelerations', accel_cols)

# # Assign records to named tuple
# records = [Accelerations(*record) for record in accel_df.to_records(index=False)]

In [29]:
records[0].offset

1.0779125295566454

#### locations

In [23]:
# Load location data into pandas dataframe
locat_df = pq.ParquetDataset(
    src_data_path + 'locations/').read_pandas().to_pandas()

In [24]:
locat_df.columns

Index(['id', 'ride_id', 'uuid', 'timestamp', 'offset', 'course', 'latitude',
       'longitude', 'geohash', 'speed', 'accuracy', 'timelapse', 'filename',
       't'],
      dtype='object')

In [25]:
# Reorder columns
locat_cols = [
    'offset',
    'id', 
    'ride_id', 
    'uuid', 
    'timestamp', 
    'course', 
    'latitude',
    'longitude', 
    'geohash', 
    'speed', 
    'accuracy', 
    'timelapse', 
    'filename',
    't'
]

# Order df by specified columns & sort by offset value
locat_df = locat_df[locat_cols].sort_values(by=['offset'])

In [10]:
# # locat_df.columns
# # Select columns
# locat_cols = [
#     'offset',
#     'id',
#     'ride_id',
#     'uuid',
#     'course',
#     'latitude',
#     'longitude',
#     'geohash',
#     'speed',
#     'accuracy',
# #     't'
# ]

# # Filter df for selected columns & sort by offset value
# locat_df = locat_df[locat_cols].sort_values(by=['offset'])

In [12]:
# # Define named tuple
# Locations = namedtuple('Locations', locat_cols)

# # Assign records to named tuple
# records = [Locations(*record) for record in locat_df.to_records(index=False)]

In [30]:
len(accel_df)

23512

In [31]:
len(locat_df)

478

In [32]:
locat_df

| | offset | id | ride_id | uuid | timestamp | course | latitude | longitude | geohash | speed | accuracy | timelapse | filename | t |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | 1.077913 | 85c61911b7fe2ced1000c33c9e932706 | 6760ffa3f41908695d1405b776c3e8d5 | dad7eae44e784b549c8c5a3aa051a8c7 | 1970-01-01 00:25:07.320453 | 158.203125 | 40.677641 | -73.817930 | dr5x2jpkmtcy | 2.120000 | 10.0 | False | d745b92f-aefd-467d-9121-7a71308e8d6d.mov | 000.0 |
| 0 | 1.525061 | 58682c5d48cad9d9e103431d773615bf | c9a2b46c9aa515b632eddc45c4868482 | 19b9aa10588646b3bf22c9b4865a7995 | 1970-01-01 00:25:03.882586 | 299.619141 | 40.762870 | -73.961949 | dr5ruuwscttz | 0.000000 | 10.0 | False | e2f795a7-6a7d-4500-b5d7-4569de996811.mov | 000.0 |
| 2 | 4.525061 | 58682c5d48cad9d9e103431d773615bf | c9a2b46c9aa515b632eddc45c4868482 | 19b9aa10588646b3bf22c9b4865a7995 | 1970-01-01 00:25:03.882583 | 299.619141 | 40.762870 | -73.961949 | dr5ruuwsctv3 | 0.000000 | 10.0 | False | e2f795a7-6a7d-4500-b5d7-4569de996811.mov | 004.5 |
| 3 | 5.077913 | 85c61911b7fe2ced1000c33c9e932706 | 6760ffa3f41908695d1405b776c3e8d5 | dad7eae44e784b549c8c5a3aa051a8c7 | 1970-01-01 00:25:07.320449 | 159.960938 | 40.677883 | -73.818047 | dr5x2jpmfffw | 11.750000 | 10.0 | False | d745b92f-aefd-467d-9121-7a71308e8d6d.mov | 004.5 |
| 5 | 8.077913 | 85c61911b7fe2ced1000c33c9e932706 | 6760ffa3f41908695d1405b776c3e8d5 | dad7eae44e784b549c8c5a3aa051a8c7 | 1970-01-01 00:25:07.320446 | 159.609375 | 40.678191 | -73.818193 | dr5x2jppxkqj | 13.150000 | 10.0 | False | d745b92f-aefd-467d-9121-7a71308e8d6d.mov | 007.8 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 476 | 121.440927 | 064184d44429da1967d8b2873d6c5053 | 6974a887428b102c9564dbcc41b52df0 | f57cdc42893d4cbc8b139c1e1650482f | 1970-01-01 00:25:04.256850 | 173.629547 | 32.172733 | 34.893325 | sv8z40bec2ht | 5.900000 | 5.0 | False | e85b702f-9912-4899-9e10-ccd89606f089.mov | 121.4 |
| 473 | 121.927121 | | b710d8312757efeb9d4d1dbf0e545653 | 95ffd0af905a4549bfe975526605e41f | 1970-01-01 00:24:50.489901 | 296.872864 | 37.804754 | -122.272297 | 9q9p1dhh8uzj | 0.000000 | 10.0 | False | faa2aa67-b813-4d82-a1e1-4e308be8086d.mov | 121.4 |
| 472 | 122.166896 | 193ec33b0d4330452d8e6214388efbe6 | 7758b047316155c7991ceddfcb964f96 | 6cfaa4a137fc419199221719e2b34aae | 1970-01-01 00:25:03.500104 | 200.039062 | 40.683265 | -74.001895 | dr5rkn1mny7g | 19.049999 | 5.0 | False | ef07c198-5de5-430f-b648-632c762c7b3a.mov | 121.4 |
| 475 | 122.420328 | 4a7bf56df03b1db624c2813b6b94dfda | 3b210670ccfce98e494c10d1c5a70ca7 | d1f37820e0e842069d59649cae7cd0ef | 1970-01-01 00:25:07.065369 | 159.372467 | 40.923765 | -73.857733 | dr72z3fqnndx | 4.930000 | 30.0 | False | 92d1b741-3093-4991-9b35-ba406aa55d78.mov | 121.4 |


#### okay, so...

In [6]:
# Load libraries
import pyarrow.parquet as pq
from collections import namedtuple
import pandas as pd
import time
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError
import uuid
import json

# Set path to data
src_data_path = '/home/jovyan/dsc650/data/processed/bdd/'

In [35]:
# Load acceleration data into pandas dataframe
accel_df = pq.ParquetDataset(
    src_data_path + 'accelerations/').read_pandas().to_pandas()

# Reorder columns
accel_cols = [
    'offset',
    'ride_id',
    'uuid', 
    'timestamp', 
    'x', 'y', 'z',
    'timelapse', 
    'filename', 
    't'
]

# Order df by specified columns & sort by offset value
accel_df = accel_df[accel_cols].sort_values(by=['offset'])

In [36]:
# Load location data into pandas dataframe
locat_df = pq.ParquetDataset(
    src_data_path + 'locations/').read_pandas().to_pandas()

# Reorder columns
locat_cols = [
    'offset',
    'id', 
    'ride_id', 
    'uuid', 
    'timestamp', 
    'course', 
    'latitude',
    'longitude', 
    'geohash', 
    'speed', 
    'accuracy', 
    'timelapse', 
    'filename',
    't'
]

# Order df by specified columns & sort by offset value
locat_df = locat_df[locat_cols].sort_values(by=['offset'])

In [None]:
# infos = []
# offsets = []
# topic = []

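##### Converting the timestamp to a string is unnecessary
Each row is serialized with `to_json()` (which writes timestamps as epoch milliseconds) rather than `to_dict()`, so the commented-out conversion below is not needed.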

In [None]:
# accel_df['timestamp'] = accel_df['timestamp'].astype(str)

In [33]:
accel_df.iloc[0].to_json()

'{"offset":0.8220608865,"ride_id":"c9a2b46c9aa515b632eddc45c4868482","uuid":"19b9aa10588646b3bf22c9b4865a7995","timestamp":1503882,"x":-0.994,"y":0.045,"z":-0.036,"timelapse":false,"filename":"e2f795a7-6a7d-4500-b5d7-4569de996811.mov","t":"000.0"}'

In [36]:
test_loc = locat_df.head()
test_acc = accel_df.head()

In [45]:
cols = ['offset', 'topic', 'data']

In [46]:
import pandas as pd

In [40]:
# test_loc.iloc[0]

offset                                        1.07791
id                   85c61911b7fe2ced1000c33c9e932706
ride_id              6760ffa3f41908695d1405b776c3e8d5
uuid                 dad7eae44e784b549c8c5a3aa051a8c7
timestamp                  1970-01-01 00:25:07.320453
course                                        158.203
latitude                                      40.6776
longitude                                    -73.8179
geohash                                  dr5x2jpkmtcy
speed                                            2.12
accuracy                                           10
timelapse                                       False
filename     d745b92f-aefd-467d-9121-7a71308e8d6d.mov
t                                               000.0
Name: 1, dtype: object

In [44]:
# test_loc.columns

Index(['offset', 'id', 'ride_id', 'uuid', 'timestamp', 'course', 'latitude',
       'longitude', 'geohash', 'speed', 'accuracy', 'timelapse', 'filename',
       't'],
      dtype='object')

In [88]:
df = pd.DataFrame(columns=cols)

In [89]:
topic = 'locations'
for i in range(len(test_loc)):
    ofst = test_loc.offset.iloc[i]
    data = test_loc.iloc[i].to_dict()
    df2 = pd.DataFrame([[ofst, topic, data]], columns=cols)
#     print(df2)
#     df.append(df2)
    df = pd.concat([df, df2])
    
df

| | offset | topic | data |
| --- | --- | --- | --- |
| 0 | 1.077913 | locations | {'offset': 1.0779125295566454, 'id': '85c61911... |
| 0 | 1.525061 | locations | {'offset': 1.525060886522843, 'id': '58682c5d4... |
| 0 | 4.525061 | locations | {'offset': 4.5250608865228426, 'id': '58682c5d... |
| 0 | 5.077913 | locations | {'offset': 5.077912529556645, 'id': '85c61911b... |
| 0 | 8.077913 | locations | {'offset': 8.077912529556645, 'id': '85c61911b... |


#### and then...

In [37]:
def reformat_data(df, topic):
    cols = ['offset', 'topic', 'data']
    temp_df = pd.DataFrame(columns=cols)
    for row in range(len(df)):
        record_offset = df.offset.iloc[row]
        record_data = df.iloc[row].to_json() # changed from .to_dict()
        record = pd.DataFrame([[record_offset, topic, record_data]], 
                              columns=cols)
        temp_df = pd.concat([temp_df, record])
    return temp_df
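`reformat_data` calls `pd.concat` once per row, so each call copies the growing frame and the total work grows roughly quadratically with the number of rows (about 24,000 here). A sketch of a vectorized alternative, assuming the input has an `offset` column: `DataFrame.to_json(orient='records', lines=True)` writes one JSON object per row, which should be equivalent to the per-row `to_json()` strings (epoch-millisecond timestamps by default). `reformat_data_fast` is a hypothetical name, and the result uses a regular 0..n-1 index instead of repeated zeros.

```python
import pandas as pd


def reformat_data_fast(df, topic):
    """Build the same (offset, topic, data) columns as reformat_data without a per-row concat."""
    # One JSON object per line; drop the empty string left by a trailing newline, if any
    json_lines = [line for line in df.to_json(orient='records', lines=True).split('\n') if line]
    return pd.DataFrame({
        'offset': df['offset'].to_numpy(),
        'topic': topic,          # scalar is broadcast to every row
        'data': json_lines,
    })


sample = pd.DataFrame({'offset': [0.5, 1.25], 'x': [1, 2], 'name': ['a', 'b']})
print(reformat_data_fast(sample, 'accelerations'))
```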

In [115]:
# acc_ref = reformat_data(test_acc, 'accelerations')
# # acc_ref

In [116]:
# loc_ref = reformat_data(test_loc, 'locations')
# # loc_ref

In [117]:
# df = pd.concat([acc_ref, loc_ref]).sort_values('offset', ignore_index=True)
# # df

In [38]:
acc_ref = reformat_data(accel_df, 'accelerations')
loc_ref = reformat_data(locat_df, 'locations')
df = pd.concat([acc_ref, loc_ref]).sort_values('offset', ignore_index=True)

In [39]:
df

| | offset | topic | data |
| --- | --- | --- | --- |
| 0 | 0.822061 | accelerations | {"offset":0.8220608865,"ride_id":"c9a2b46c9aa5... |
| 1 | 0.842061 | accelerations | {"offset":0.8420608865,"ride_id":"c9a2b46c9aa5... |
| 2 | 0.862061 | accelerations | {"offset":0.8620608865,"ride_id":"c9a2b46c9aa5... |
| 3 | 0.882061 | accelerations | {"offset":0.8820608865,"ride_id":"c9a2b46c9aa5... |
| 4 | 0.902061 | accelerations | {"offset":0.9020608865,"ride_id":"c9a2b46c9aa5... |
| ... | ... | ... | ... |
| 23985 | 122.449896 | accelerations | {"offset":122.4498959608,"ride_id":"7758b04731... |
| 23986 | 122.453927 | accelerations | {"offset":122.4539268052,"ride_id":"6974a88742... |
| 23987 | 122.455121 | accelerations | {"offset":122.4551214097,"ride_id":"b710d83127... |
| 23988 | 122.465328 | accelerations | {"offset":122.4653281476,"ride_id":"3b210670cc... |


In [10]:
# import time

# test = [1, 1, 4, 5, 8]
# time_start = time.time()
# for i in test:
#     while (time.time() - time_start < i):
#         pass
#     print(i)

#### producer time

In [12]:
## Set configuration parameters
config = dict(
    bootstrap_servers=['kafka.kafka.svc.cluster.local:9092'],
    first_name='Scott',
    last_name='Breitbach'
)

config['client_id'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)
config['topic_prefix'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)

config

{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
 'first_name': 'Scott',
 'last_name': 'Breitbach',
 'client_id': 'BreitbachScott',
 'topic_prefix': 'BreitbachScott'}

In [13]:
## Create a kafka topic based on config settings
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    bootstrap_servers = config['bootstrap_servers']
    client_id = config['client_id']
    topic_prefix = config['topic_prefix']
    name = '{}-{}'.format(topic_prefix, topic_name)
    
    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers, 
        client_id=client_id
    )
    
    topic = NewTopic(
        name=name,
        num_partitions=num_partitions,
        replication_factor=replication_factor
    )

    topic_list = [topic]
    try:
        admin_client.create_topics(new_topics=topic_list)
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError as e:
        print('Topic "{}" already exists'.format(name))
    
# create_kafka_topic('locations')

In [18]:
from kafka.admin.new_topic import NewTopic

In [19]:
## Create Kafka topics
create_kafka_topic('accelerations')
create_kafka_topic('locations')

Created topic "BreitbachScott-accelerations"
Created topic "BreitbachScott-locations"


In [15]:
from kafka import KafkaProducer, KafkaAdminClient

In [16]:
## Serialize Python objects as JSON
producer = KafkaProducer(
  bootstrap_servers=config['bootstrap_servers'],
  value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

In [20]:
## Send Python objects to a Kafka topic
def on_send_success(record_metadata):
    print('Message sent:\n    Topic: "{}"\n    Partition: {}\n    Offset: {}'.format(
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset
    ))
    
def on_send_error(excp):
    # print() has no exc_info argument; report the exception text instead
    print('Error sending message: {}'.format(excp))

def send_data(topic, data, config=config, producer=producer, msg_key=None):
    topic_prefix = config['topic_prefix']
    topic_name = '{}-{}'.format(topic_prefix, topic)
    
    if msg_key is not None:
        key = msg_key
    else:
        key = uuid.uuid4().hex
    
    producer.send(
        topic_name, 
        value=data,
        key=key.encode('utf-8')
    ).add_callback(on_send_success).add_errback(on_send_error)
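One thing to watch here: the values in `df.data` are already JSON strings (from `to_json()` in `reformat_data`), and the producer's `value_serializer` calls `json.dumps` on them again. A standard-library-only check, using a shortened sample row, shows that a consumer would then receive a JSON string literal that needs a second `json.loads`. Parsing the string once before sending (or sending dicts) avoids the double encoding.

```python
import json


def serialize(value):
    # Same logic as the producer's value_serializer lambda
    return json.dumps(value).encode('utf-8')


# A shortened row, in the format produced by Series.to_json()
payload = '{"offset":0.8420608865,"x":-0.998,"y":0.046,"z":-0.04}'

double_encoded = serialize(payload)              # str in -> quoted JSON string literal
single_encoded = serialize(json.loads(payload))  # parse first -> plain JSON object

decoded_once = json.loads(double_encoded.decode('utf-8'))
print(type(decoded_once))                          # <class 'str'>: still needs another json.loads
print(json.loads(single_encoded.decode('utf-8')))  # {'offset': 0.8420608865, 'x': -0.998, 'y': 0.046, 'z': -0.04}
```

In this notebook that could mean calling `send_data(topic, json.loads(data))`; which form fits best depends on what your consumer expects.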

In [43]:
df.head(1)

| | offset | topic | data |
| --- | --- | --- | --- |
| 0 | 0.822061 | accelerations | {"offset":0.8220608865,"ride_id":"c9a2b46c9aa5... |


In [46]:
loc = 1

In [47]:
test_offset = df.offset.iloc[loc]
print(test_offset)
test_topic = df.topic.iloc[loc]
print(test_topic)
test_data = df.data.iloc[loc]
print(test_data)

0.8420608865228429
accelerations
{"offset":0.8420608865,"ride_id":"c9a2b46c9aa515b632eddc45c4868482","uuid":"19b9aa10588646b3bf22c9b4865a7995","timestamp":1503882,"x":-0.998,"y":0.046,"z":-0.04,"timelapse":false,"filename":"e2f795a7-6a7d-4500-b5d7-4569de996811.mov","t":"000.0"}


In [29]:
import uuid
import json

In [48]:
send_data(test_topic, test_data)

Message sent:
    Topic: "BreitbachScott-accelerations"
    Partition: 0
    Offset: 1


#### send

In [None]:
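time_start = time.time()

for i in range(len(df)):
    record_offset = df.offset.iloc[i]
    # Sleep until this record's offset instead of busy-waiting in a tight loop
    delay = record_offset - (time.time() - time_start)
    if delay > 0:
        time.sleep(delay)
    send_data(df.topic.iloc[i], df.data.iloc[i])

# send() is asynchronous; make sure everything buffered is delivered
producer.flush()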

In [None]:
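# Dry run: print the first few offsets on schedule without sending anything
test = df.head()
time_start = time.time()
for i in range(len(test)):
    j = test['offset'].iloc[i]
    delay = j - (time.time() - time_start)
    if delay > 0:
        time.sleep(delay)
    print(j)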