### DSC 650
### Week 8
### Alexis Zimmer

In [1]:
## import libraries
import json
import uuid
from json import dumps
from kafka import KafkaProducer, KafkaAdminClient, KafkaConsumer, TopicPartition
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka.errors import KafkaError
from time import sleep
import decimal
import threading
import shutil
import os
import glob
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as parq
import time
import dask.dataframe as dd
from pathlib import Path
from datetime import datetime, timedelta
from pyspark.sql.types import *
from pyspark.sql import SparkSession
# Create (or reuse) the notebook's SparkSession, running Spark in local mode.
spark = (
    SparkSession.builder
    .master('local')
    .appName('parquetFile')
    .config('spark.executor.memory', '5gb')
    .config("spark.cores.max", "6")
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/29 20:30:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
## Speeds up spark
# Use Arrow for Spark <-> pandas conversion (e.g. DataFrame.toPandas()).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# NOTE(review): the spark.rapids.* options are read by the RAPIDS Accelerator
# plugin; presumably they have no effect unless that plugin is loaded -- confirm.
spark.conf.set("spark.rapids.sql.format.parquet.read.enabled", "true")
spark.conf.set("spark.rapids.sql.format.parquet.write.enabled", "true")
# The option name and its value are separate arguments. The original passed
# "…reader.type=MULTITHREADED" as the key with value "true", which set an
# unrelated key and left the real reader.type option untouched.
spark.conf.set("spark.rapids.sql.format.parquet.reader.type", "MULTITHREADED")


In [6]:
# Kafka connection settings plus the name used to build per-student identifiers.
config = {
    'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
    'first_name': 'Alexis',
    'last_name': 'Zimmer',
}

# The client id and the topic prefix are both "<last_name><first_name>".
config.update(
    client_id=config['last_name'] + config['first_name'],
    topic_prefix=config['last_name'] + config['first_name'],
)

config

{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
 'first_name': 'Alexis',
 'last_name': 'Zimmer',
 'client_id': 'ZimmerAlexis',
 'topic_prefix': 'ZimmerAlexis'}

In [7]:
# Producer whose message values are JSON-encoded and sent as UTF-8 bytes.
producer = KafkaProducer(
    bootstrap_servers=config['bootstrap_servers'],
    value_serializer=lambda value: json.dumps(value).encode('utf-8'),
)
# Consumer created without any topic subscription, with a 1000 ms consumer timeout.
general_consumer = KafkaConsumer(
    bootstrap_servers=config['bootstrap_servers'],
    consumer_timeout_ms=1000,
)

In [8]:
def loadParquet(parq_path):
    """Read the parquet data at ``parq_path`` with Spark and return it as a pandas DataFrame."""
    return spark.read.parquet(parq_path).toPandas()

In [9]:
def splitstr(std):
    """Split the string form of ``std`` at its first decimal point.

    Args:
        std: Any value; it is converted with ``str()`` before splitting.

    Returns:
        A ``(before, after)`` tuple of strings. ``after`` is the empty string
        when the string form contains no ``.`` (for example an ``int`` or a
        float rendered in scientific notation such as ``1e-07``); the original
        implementation raised ``ValueError`` on unpacking in that case.
    """
    before, _, after = str(std).partition('.')
    return before, after

In [11]:
def startTimer(results_dir):
    """Run one stream-update pass and schedule the next one on a timer.

    Calls ``startTimedParquetStreamUpdateLoop(results_dir)`` and, while fewer
    than 70 seconds have elapsed since ``start_time`` and that call returned
    0, schedules another ``startTimer(results_dir)`` call after ``interval``
    seconds on a ``threading.Timer``.

    NOTE(review): ``start_time``, ``interval`` and
    ``startTimedParquetStreamUpdateLoop`` are not defined in the visible
    cells; they must exist in the kernel before this is called.

    Args:
        results_dir: Passed through unchanged to
            ``startTimedParquetStreamUpdateLoop`` and to the next scheduled call.
    """
    print("call function")
    retval = startTimedParquetStreamUpdateLoop(results_dir)

    if ((time.time() - start_time) < 70 and retval == 0):
        # Hand Timer the callable and its arguments separately. The original
        # wrote startTimer(results_dir), which ran the next pass immediately
        # (recursing without any delay) and passed its None result as the
        # timer callback; the timer was also never started.
        t = threading.Timer(interval, startTimer, args=(results_dir,))
        t.start()

In [13]:
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    """Create the Kafka topic ``<topic_prefix>-<topic_name>``.

    Prints a confirmation when the topic is created, or a notice when the
    broker reports that it already exists.

    Args:
        topic_name: Topic name without the prefix.
        config: Mapping providing ``bootstrap_servers``, ``client_id`` and
            ``topic_prefix``.
        num_partitions: Number of partitions for the new topic.
        replication_factor: Replication factor for the new topic.
    """
    bootstrap_servers = config['bootstrap_servers']
    client_id = config['client_id']
    topic_prefix = config['topic_prefix']
    name = '{}-{}'.format(topic_prefix, topic_name)

    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers,
        client_id=client_id
    )

    topic = NewTopic(
        name=name,
        num_partitions=num_partitions,
        replication_factor=replication_factor
    )

    try:
        admin_client.create_topics(new_topics=[topic])
        print('Topic Created "{}"'.format(name))
    except TopicAlreadyExistsError:
        print('Topic "{}" already exists'.format(name))
    finally:
        # Release the broker connection; each call opens a new admin client,
        # and the original never closed it.
        admin_client.close()

In [14]:
def create_kafka_consumer(topics, config=config):
    """Build a KafkaConsumer subscribed to the prefixed form of each topic.

    Args:
        topics: Iterable of topic names without the prefix.
        config: Mapping providing ``bootstrap_servers``, ``client_id`` and
            ``topic_prefix``.

    Returns:
        A ``KafkaConsumer`` that starts from the earliest offset, does not
        auto-commit, and decodes each message value with ``json.loads``.
    """
    servers = config['bootstrap_servers']
    client = config['client_id']
    prefix = config['topic_prefix']

    # Every topic is addressed as "<topic_prefix>-<topic>".
    prefixed_topics = []
    for topic in topics:
        prefixed_topics.append('{}-{}'.format(prefix, topic))

    consumer_options = dict(
        client_id=client,
        bootstrap_servers=servers,
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        value_deserializer=lambda raw: json.loads(raw),
    )
    return KafkaConsumer(*prefixed_topics, **consumer_options)

# Module-level consumer for both topics; print_messages uses it as its default.
consumer = create_kafka_consumer(topics=['locations', 'accelerations'])

In [15]:
def print_messages(consumer=consumer):
    """Print messages from ``consumer`` until the end of a partition is reached.

    For each message the topic, partition, offset, key and pretty-printed
    JSON value are printed. Iteration stops after printing the message that
    was the last one in its partition when that partition was first seen, or
    when the user interrupts the kernel.

    Args:
        consumer: A KafkaConsumer whose message values can be serialised by
            ``json.dumps``.
    """
    # End offset (offset of the next message to be written) per partition,
    # looked up once the first time a partition is seen.
    end_offsets = {}
    try:

        for message in consumer:
            # Use the partition the message actually came from rather than
            # assuming partition 0.
            tp = TopicPartition(message.topic, message.partition)
            if tp not in end_offsets:
                # end_offsets() does not move the consumer's read position.
                # The original used seek_to_end() here, which jumped past all
                # unread messages so that the rest were never delivered.
                end_offsets[tp] = consumer.end_offsets([tp])[tp]
            lastOffset = end_offsets[tp]

            if message.key is not None:
                msg_key = message.key.decode('utf-8')
            else:
                msg_key = ''
            msg_value = json.dumps(message.value, indent=2)
            # Indent every line of the JSON so it nests under "Message Value:".
            msg_value = '\n'.join(['  {}'.format(value) for value in msg_value.split('\n')])

            print('Message Metadata:')
            print('  Topic: {}'.format(message.topic))
            print('  Partition: {}'.format(message.partition))
            print('  Offset: {}'.format(message.offset))
            print('Message Key: {}'.format(msg_key))
            print('Message Value:')
            print(msg_value)
            print()
            # lastOffset is one past the final message of the partition.
            if message.offset >= lastOffset - 1:
                break

    except KeyboardInterrupt:
        print("Interrupted")


In [16]:
def on_send_success(record_metadata):
    """Producer success callback: print the topic, partition and offset of the sent record."""
    template = 'Message sent:\n    Topic: "{}"\n    Partition: {}\n    Offset: {}'
    fields = (
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset,
    )
    print(template.format(*fields))


In [17]:
def on_send_error(excp):
    """Producer error callback: print the exception that made the send fail.

    Args:
        excp: The exception passed to the errback.
    """
    # print() has no exc_info keyword (that belongs to the logging API), so
    # the original call raised TypeError instead of reporting the failure.
    print('Error: {!r}'.format(excp))

In [19]:
def send_data(topic, data, config=config, producer=producer, msg_key=None):
    """Send ``data`` to the prefixed Kafka topic and print the resulting metadata.

    The target topic is ``<topic_prefix>-<topic>``. The call waits up to 10
    seconds for the send to complete. On success the record's topic,
    partition and offset are printed; on a ``KafkaError`` the failure is
    printed and the function returns without raising.

    Args:
        topic: Topic name without the prefix.
        data: Message value, passed to ``producer.send`` as ``value``.
        config: Mapping providing ``topic_prefix``.
        producer: Producer used to send the message.
        msg_key: Optional string key; a random UUID hex string is used when
            omitted.
    """
    topic_prefix = config['topic_prefix']
    topic_name = '{}-{}'.format(topic_prefix, topic)
    print(topic)
    print(topic_prefix)
    print(topic_name)

    key = msg_key if msg_key is not None else uuid.uuid4().hex

    print(data)
    sendout = producer.send(topic_name, key=key.encode('utf-8'), value=data).add_callback(on_send_success).add_errback(on_send_error)

    try:
        record_metadata = sendout.get(timeout=10)
    except KafkaError as err:
        # The original called an undefined `log` here (NameError) and then
        # fell through to print an unbound `record_metadata`.
        print('Failed to send to "{}": {!r}'.format(topic_name, err))
        return

    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

In [20]:
# Make sure both topics used by this notebook exist before producing to them.
for _topic in ('locations', 'accelerations'):
    create_kafka_topic(_topic)

Topic Created "ZimmerAlexis-locations"
Topic Created "ZimmerAlexis-accelerations"


In [21]:
# Assignment folder and the results folder inside it.
base_dir = Path('C:/Users/21223198/Documents/GitHub/dsc650/assignments/assignment08')
results_dir = base_dir / 'results'

In [25]:
# Replay the parquet partitions in time order. Each partition directory is
# named "t=<seconds>"; the loop waits until that many seconds have elapsed
# since start_time before handling the partition, and publishes the
# accelerations and locations data for the 52.5 second partition.
#
# NOTE(review): these absolute Windows paths do not exist on the machine that
# produced the recorded FileNotFoundError; point them at the real data
# directory before running.
fpath = 'C:/Users/21223198/Documents/GitHub/dsc650/data/processed/bdd/accelerations/'
locpath = 'C:/Users/21223198/Documents/GitHub/dsc650/data/processed/bdd/locations/'

# os.listdir() returns names in arbitrary order, so sort numerically by the
# time encoded in each name (a plain string sort would put "t=105.4" before
# "t=52.5"). Entries that are not "t=..." partitions are skipped because they
# cannot be parsed as a time.
targetparqfilenames = sorted(
    (name for name in os.listdir(fpath) if name.startswith("t=")),
    key=lambda name: decimal.Decimal(name.replace("t=", ""))
)

# Start the clock here so the cell does not rely on a start_time left behind
# by another cell.
start_time = time.time()

for fname in targetparqfilenames:
    fname_secs = decimal.Decimal(fname.replace("t=",""))
    std = (time.time() - start_time)
    if std < fname_secs:
        # Report the wait once instead of on every 50 ms poll.
        print('Looping til after... ', fname_secs)
    while std < fname_secs:
        sleep(0.05)
        std = (time.time() - start_time)

    print(fname_secs)
    # Assumes the locations directory has the same partition names as
    # accelerations -- TODO confirm.
    parqaccl = fpath + str(fname)
    parqloc = locpath + str(fname)

    if fname_secs == decimal.Decimal('52.5'):
        # Producer
        print("At the 52.5 mark")
        # NOTE(review): to_json() returns a JSON string and the producer's
        # value_serializer JSON-encodes it again, so consumers receive a JSON
        # string rather than an object -- confirm this is intended.
        par_accelerations = loadParquet(parqaccl)
        par_accelerations = par_accelerations.to_json()
        par_locations = loadParquet(parqloc)
        par_locations = par_locations.to_json()
        send_data('accelerations', par_accelerations)
        send_data('locations', par_locations)
        break

# Printed once when the replay ends; the original printed it inside the loop
# after every partition that was not the 52.5 second one.
print("Finished!")

FileNotFoundError: [Errno 2] No such file or directory: 'C:/Users/21223198/Documents/GitHub/dsc650/data/processed/bdd/accelerations/'