#Install Required Library

In [None]:
pip install kafka-python

#Import Required Libraries

In [None]:
import pandas as pd
from kafka import KafkaProducer
from time import sleep
from json import dumps
import json

#Configure Kafka Producer
Here, a KafkaProducer instance is created with the following configurations:
* Bootstrap servers: [':9092'] (Note: the host part is left blank; put your Kafka broker's IP address or hostname before `:9092`, e.g. ['<broker-ip>:9092'])
* Value serializer: A lambda function to encode messages as JSON and then to UTF-8

In [None]:
# Producer whose message values are dumped to JSON text and then encoded as
# UTF-8 bytes before being handed to Kafka.
producer = KafkaProducer(
    bootstrap_servers=[':9092'],  # change ip here (host goes before the port)
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)

#Send a Test Message

In [None]:
# Publish a single placeholder message to the 'demo_test' topic.
test_payload = {'surnasdasdame': 'parasdasdmar'}
producer.send('demo_test', value=test_payload)

#Load and Preview Data

In [None]:
# Read the CSV file into a DataFrame.
csv_path = "data/indexProcessed.csv"
df = pd.read_csv(csv_path)

In [None]:
# Preview the first five rows of the loaded data.
df.head(n=5)

#Continuously Send Data to Kafka
This loop continuously sends data to Kafka:
* It randomly samples one row from the DataFrame
* Converts the sampled row to a dictionary
* Sends the dictionary as a message to the 'demo_test' topic
* Waits for 1 second before the next iteration

In [None]:
# Runs until the cell is interrupted: one random row per second.
while True:
    # Draw one random row and turn it into a plain {column: value} dict.
    sampled_row = df.sample(n=1)
    dict_stock = sampled_row.to_dict(orient="records")[0]
    producer.send(topic='demo_test', value=dict_stock)
    # Throttle to roughly one message per second.
    sleep(1)

#Flush the Producer

In [None]:
producer.flush()  # block until all buffered messages have been sent to the broker; nothing is deleted from Kafka

#Message Batching:
Kafka producers can batch messages for better throughput. You can configure this using the batch_size and linger_ms parameters:

In [None]:
# Batching-related settings collected in one place, then passed to the producer.
batching_config = {
    'bootstrap_servers': [':9092'],
    'value_serializer': lambda x: dumps(x).encode('utf-8'),
    'batch_size': 16384,  # batch size setting, in bytes per the kafka-python docs
    'linger_ms': 100,     # how long to wait for more records before sending
}
producer = KafkaProducer(**batching_config)

#Message Compression:
You can enable compression to reduce network bandwidth usage:

In [None]:
# Same JSON/UTF-8 producer, with gzip compression applied to what it sends.
producer = KafkaProducer(
    compression_type='gzip',
    value_serializer=lambda x: dumps(x).encode('utf-8'),
    bootstrap_servers=[':9092'],
)

#Asynchronous Sending:
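`send()` is already asynchronous and returns a future. Instead of ignoring that future, you can attach callbacks to it to be notified when a message is delivered or fails. Note that the loop below has no sleep(), so it sends messages as fast as it can: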

In [None]:
def on_send_success(record_metadata):
    """Print where a successfully sent message landed.

    Args:
        record_metadata: Object exposing ``topic``, ``partition`` and
            ``offset`` attributes for the delivered message.
    """
    topic = record_metadata.topic
    partition = record_metadata.partition
    offset = record_metadata.offset
    print(f"Topic: {topic}, Partition: {partition}, Offset: {offset}")

def on_send_error(excp):
    """Report a failed send by printing the exception.

    Args:
        excp: The exception describing why the message could not be sent.
    """
    # print() has no ``exc_info`` keyword (that belongs to logging calls) and
    # would raise TypeError, so the exception is formatted into the message.
    print(f'I am an errback: {excp!r}')

# Runs until interrupted; there is no sleep, so rows are sent back-to-back.
while True:
    dict_stock = df.sample(1).to_dict(orient="records")[0]
    # send() returns a future; register the success and failure handlers on it.
    send_future = producer.send('demo_test', value=dict_stock)
    send_future.add_callback(on_send_success)
    send_future.add_errback(on_send_error)

#Message Keys:
You can add keys to your messages for partitioning; with the default partitioner, messages that share a key are sent to the same partition:

In [None]:
# The key is given as raw bytes: the producers configured in this file set a
# value_serializer only, so no serializer is applied to keys.
producer.send('demo_test', value=dict_stock, key=b'stock_data')

#Producer Configurations:
You can add more configurations to the producer for better control, for example over delivery acknowledgements (acks), retries, and the number of in-flight requests:

In [None]:
# Delivery-related options grouped separately from the connection settings.
reliability_options = dict(
    acks='all',
    retries=3,
    max_in_flight_requests_per_connection=1,
)
producer = KafkaProducer(
    bootstrap_servers=[':9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8'),
    **reliability_options,
)