# PRODUCER

### STEP 1: Install kafka-python, an open-source Python client library for Apache Kafka. It is lightweight, easy to use, and supports the core Kafka features.

In [None]:
pip install kafka-python 

### STEP 2: Import the required libraries. `json.dumps` converts a Python object (such as a dictionary or list) into a JSON-formatted string.

In [None]:
import pandas as pd
from kafka import KafkaProducer
from time import sleep
from json import dumps
import json

### STEP 3: Define the producer.
### bootstrap_servers: the Kafka broker's address and port (9092). Here it is the public IPv4 address of my EC2 instance.
### Kafka only handles messages as bytes, so we need a value serializer:
### lambda x: dumps(x).encode('utf-8')
### x is the message you want to send.
### dumps(x) converts the Python object x into a JSON-formatted string.
### .encode('utf-8') converts the JSON string into bytes.

In [None]:
import os

# Broker address(es) as "host:port", comma-separated if there are several.
# Set the KAFKA_BOOTSTRAP_SERVERS environment variable (e.g. to your EC2
# instance's public IPv4, "203.0.113.10:9092") or edit the default below.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

producer = KafkaProducer(
    bootstrap_servers=[server.strip() for server in KAFKA_BOOTSTRAP_SERVERS.split(",")],
    # Kafka transports raw bytes: serialize each value to a JSON string,
    # then encode that string as UTF-8.
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)


### STEP 4: Send messages to the Kafka broker, which stores them under the given topic.

In [None]:
#producer.send('demo_test2', value={'name':'abc'}) # demo or sample test 

In [None]:
# Load the first 500 rows of the taxi trip data -- enough for a demo stream.
# Replace the placeholder directory with the location of your CSV file.
df = pd.read_csv(
    filepath_or_buffer="/****/taxi_tripdata.csv",  # add path
    nrows=500,
)

In [None]:
# Preview the first five rows to sanity-check the loaded columns.
df.head(n=5)


In [None]:
# Stream one randomly sampled row per second to the Kafka topic until the
# cell is interrupted (Kernel > Interrupt).
TOPIC = 'demo_test2'
SEND_INTERVAL_SECONDS = 1

try:
    while True:
        # df.sample(1) picks one random row; to_dict(orient="records")[0]
        # turns it into a {column: value} dict.
        record = df.sample(1).to_dict(orient="records")[0]
        # json.dumps writes a float NaN as the bare token NaN, which is not
        # valid JSON; send missing CSV values as null instead.
        record = {key: (None if pd.isna(value) else value) for key, value in record.items()}
        producer.send(TOPIC, value=record)
        sleep(SEND_INTERVAL_SECONDS)
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop the stream.
    print("Streaming stopped.")
finally:
    # Block until every message still buffered in the producer is delivered.
    producer.flush()

In [None]:
# Block until all records buffered in the producer have been sent to the
# broker. This does not delete anything stored on the Kafka server.
producer.flush()