# **Kafka Producer**

Read data from a CSV file into a pandas DataFrame and send randomly selected rows to a Kafka topic as JSON messages.

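## **1. Installing and Importing Required Libraries**

First, install the `kafka-python` library, which provides a Kafka client for Python. The `%pip` magic installs the package into the environment of the running notebook kernel.

```python
%pip install kafka-python
```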

```python
import pandas as pd
from time import sleep
from json import dumps
from kafka import KafkaProducer
```

- `sleep`: To add delays between message sends.
- `dumps`: To serialize Python objects to JSON format.
- `pandas`: For data manipulation and analysis.
- `KafkaProducer`: To send messages to the Kafka topic.

## **2. Setting Up the Kafka Producer**

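Create a `KafkaProducer` that connects to your Kafka broker and serializes each message value as JSON.

Replace `<your_ip>` with the IP address or hostname of your Kafka broker. Port `9092` is Kafka's default listener port.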

```python
producer = KafkaProducer(
    bootstrap_servers=['<your_ip>:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)
```

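- `bootstrap_servers`: List of `host:port` pairs used to establish the initial connection to the Kafka cluster. The client discovers the remaining brokers from the cluster metadata, so the list does not need to include every broker.
- `value_serializer`: Callable applied to each message value before it is sent; it must return bytes. Here it converts the value to a JSON string with `dumps` and encodes that string as UTF-8.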
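Instead of editing `<your_ip>` in place, the producer settings can be read from the environment. The sketch below assumes a hypothetical `KAFKA_BOOTSTRAP_SERVERS` variable holding a comma-separated `host:port` list and falls back to an assumed `localhost:9092`. `serialize_value` performs the same JSON-then-UTF-8 transformation as the lambda above, and `deserialize_value` shows the inverse that a consumer would apply.

```python
import json
import os

# Hypothetical variable name chosen for this sketch; kafka-python does not read it.
BOOTSTRAP_ENV_VAR = "KAFKA_BOOTSTRAP_SERVERS"
# Assumed fallback for a broker running on the same machine.
DEFAULT_BOOTSTRAP = "localhost:9092"


def serialize_value(value):
    """Python object -> JSON text -> UTF-8 bytes (same as the notebook's lambda)."""
    return json.dumps(value).encode("utf-8")


def deserialize_value(raw):
    """UTF-8 bytes -> JSON text -> Python object (what a consumer would apply)."""
    return json.loads(raw.decode("utf-8"))


def producer_config():
    """Build KafkaProducer keyword arguments from the environment."""
    raw = os.environ.get(BOOTSTRAP_ENV_VAR, DEFAULT_BOOTSTRAP)
    # Accept "host1:9092,host2:9092" and ignore stray spaces or empty entries.
    servers = [s.strip() for s in raw.split(",") if s.strip()]
    return {"bootstrap_servers": servers, "value_serializer": serialize_value}


print(serialize_value({"sample_key": "sample_value"}))
# b'{"sample_key": "sample_value"}'
```

With kafka-python installed, `KafkaProducer(**producer_config())` would build an equivalent producer. Using a named function instead of a lambda also makes the serializer easy to test without a running broker.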

## **3. Sending a Test Message**

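Send a test message to the `kafka_test` topic and wait for the broker to acknowledge it. A successful acknowledgement confirms that the producer can reach the cluster.

```python
future = producer.send('kafka_test', value={'sample_key': 'sample_value'})  # asynchronous: returns a future
future.get(timeout=10)  # blocks until the broker acknowledges; returns RecordMetadata
```

- `send`: Queues the message for the specified topic and returns a future immediately; the network send happens in a background thread.
- `future.get(timeout=10)`: Waits up to 10 seconds for the broker's acknowledgement and returns the record's metadata (topic, partition, offset). It raises an exception if delivery fails or times out.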

## **4. Reading Data from the CSV File**

Read data from a CSV file into a pandas DataFrame.

The path `../data/indexProcessed.csv` is relative to the notebook's working directory, so the file `indexProcessed.csv` must be in a `data` directory one level above the folder containing this notebook.

```python
# Read the CSV file into a DataFrame
df = pd.read_csv("../data/indexProcessed.csv")

# Display the first few rows of the DataFrame
df.head()
```
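Because `../data/indexProcessed.csv` is resolved against the current working directory (in Jupyter, normally the notebook's own folder), a missing file typically surfaces as a `FileNotFoundError` that shows only the path as written. A small guard such as the hypothetical `require_file` below reports the absolute location it checked, which makes a wrong working directory easy to spot.

```python
from pathlib import Path

# The notebook's path; relative paths resolve against the current working directory.
CSV_PATH = Path("..") / "data" / "indexProcessed.csv"


def require_file(path):
    """Return path as a Path if it names an existing file; otherwise raise a
    FileNotFoundError that shows the absolute location that was checked."""
    path = Path(path)
    if not path.is_file():
        raise FileNotFoundError(
            f"Expected a CSV file at {path.resolve()} "
            f"(current working directory: {Path.cwd()})"
        )
    return path
```

In the notebook this would be used as `df = pd.read_csv(require_file(CSV_PATH))`.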

## **5. Sending Data to the Kafka Topic**

Repeatedly pick a random row from the DataFrame and send it to the Kafka topic as a message. Each row is drawn independently, so the same row can be sent more than once.

In a production environment, you might use an infinite loop to send data continuously. For demonstration purposes, this cell sends a fixed number of messages so that it terminates.

```python
# Set the number of messages you want to send
num_messages = 10

for _ in range(num_messages):
    # Randomly select one row from the DataFrame
    dict_stock = df.sample(1).to_dict(orient="records")[0]
    # Send the data to the Kafka topic
    producer.send('kafka_test', value=dict_stock)
    # Wait for 1 second before sending the next message
    sleep(1)
```

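- `df.sample(1)`: Randomly selects one row from the DataFrame and returns it as a one-row DataFrame.
- `to_dict(orient="records")[0]`: Converts that one-row DataFrame into a list containing a single dictionary (mapping column names to values) and takes its only element.
- `sleep(1)`: Pauses for 1 second after each send, so 10 messages take about 10 seconds.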
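The fixed-count and run-forever variants can share one loop. The sketch below (the names `sample_records` and `stream` are hypothetical) expects the rows already converted to dictionaries once, rather than calling `df.sample(1).to_dict(...)` on every iteration, and draws each record independently with `random.Random.choice`, which, like `df.sample(1)`, can repeat rows. Passing `num_messages=None` gives the unbounded loop described above; `send` is any callable, so the loop can be exercised without a broker.

```python
import random
from time import sleep


def sample_records(records, num_messages=None, rng=None):
    """Yield records chosen uniformly at random, with replacement.

    num_messages=None never stops (the production-style infinite loop);
    an integer stops after that many records.
    """
    if not records:
        raise ValueError("records must be non-empty")
    if rng is None:
        rng = random.Random()
    count = 0
    while num_messages is None or count < num_messages:
        yield rng.choice(records)
        count += 1


def stream(records, send, num_messages=None, delay_s=1.0, rng=None):
    """Pass each sampled record to send(), pausing delay_s seconds after each one."""
    for record in sample_records(records, num_messages, rng):
        send(record)
        if delay_s > 0:
            sleep(delay_s)
```

In the notebook this might be called as `stream(df.to_dict(orient="records"), lambda r: producer.send('kafka_test', value=r), num_messages=10)`.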

## **6. Flushing the Producer**

Ensure all buffered messages are sent to the Kafka topic.

```python
# Flush the producer to make sure all messages have been sent
producer.flush()
```

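- `flush`: Makes all buffered messages immediately available to send and blocks until the requests for them have completed. Because `send` is asynchronous, messages still in the buffer can be lost if the process exits without flushing (or closing) the producer.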
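When cells are run in sequence, an exception in the sending loop (for example, a keyboard interrupt) stops execution before the `flush` cell runs. One way to guarantee cleanup is a context manager that always calls `flush()` and then `close()`; kafka-python's `KafkaProducer` provides both methods with an optional `timeout` argument. The `flushing` helper below is a hypothetical sketch, not part of kafka-python.

```python
from contextlib import contextmanager


@contextmanager
def flushing(producer, timeout=None):
    """Yield producer, then flush() and close() it even if the body raised.

    Works with any object exposing flush(timeout=...) and close(timeout=...).
    """
    try:
        yield producer
    finally:
        try:
            producer.flush(timeout=timeout)  # deliver anything still buffered
        finally:
            producer.close(timeout=timeout)  # release connections and the sender thread
```

Wrapping the loop from step 5 in `with flushing(producer, timeout=10):` would flush and close the producer whether the loop finishes or is interrupted.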