## Kafka Producer

In [None]:
import time
import json
import kafka

In [None]:
import io
import avro.schema
from avro.io import DatumWriter

In [None]:
# Without Schema
# topic = 'hello'
# bootstrap_server = "localhost:9092"
# producer = kafka.KafkaProducer(
#     bootstrap_servers=[bootstrap_server],
#     key_serializer=lambda key: key.encode('utf-8'),
#     value_serializer=lambda x: json.dumps(x).encode('utf-8')
# )

In [None]:
# With Schema
"""https://github.com/thanhson1085/python-kafka-avro"""
SCHEMA_PATH = "stock_schema.avsc"
SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())

In [None]:
topic = 'hello'
bootstrap_server = "localhost:9092"
producer = kafka.KafkaProducer(
    bootstrap_servers=[bootstrap_server]
)

In [None]:
'''
Sample loop to send Data to kafka
for i in range(1000):
    data = {
        "num": i
    }
    producer.send(topic, key="num", value=data)
    time.sleep(3)
'''

## Get Stock Live Data

In [None]:
# !pip install yfinance

In [None]:
import yfinance as yf
import datetime

In [None]:
# Getting wierd graphs since there a gap between days
# df = yf.download(tickers='UBER', period='5d', interval='5m')

df = yf.download(tickers='UBER', period='1d', interval='1m')

In [None]:
stock_dict = df["High"].to_dict().items()

In [None]:
# for d in stock_dict:
#     data = (d[0].strftime('%Y-%m-%d %H:%M:%S'), d[1])
#     print(data)
#     break

In [None]:
# Without Schema
# try:
#     for d in stock_dict:
#         data = (d[0].strftime('%Y-%m-%d %H:%M:%S'), d[1])
        
#         # We are applying Synchronous send since it is financial data.
#         # We will wait till we received the acknowledgement
#         recorded_metadata = producer.send(topic, key="UBER", value=data).get(timeout=10)
#         print("Below are the data sent: ")
#         print(key, data)
#         print(recorded_metadata.topic)
#         print(recorded_metadata.partition)
#         print(recorded_metadata.offset)
#     #     time.sleep(3)
# except Exception as e:
#     print("We recevied a error:::")
#     print(e)
# finally:
#     print("<---------------->")

In [None]:
# With Schema
try:
    for d in stock_dict:
        
        bytes_writer = io.BytesIO()
        encoder = avro.io.BinaryEncoder(bytes_writer)
        
        writer = DatumWriter(SCHEMA)
        data = {
            "name": "UBER",
            "time": d[0].strftime('%Y-%m-%d %H:%M:%S'),
            "price": d[1]
        }
        writer.write(data, encoder)
        
        raw_bytes = bytes_writer.getvalue()
        print(data, raw_bytes)
        producer.send(topic, raw_bytes)
        time.sleep(5)
except Exception as e:
    print("We recevied a error:::")
    print(e)
finally:
    print("<---------------->")

In [None]:
producer.flush()
producer.close()