In [17]:
pip install kafka-python

Note: you may need to restart the kernel to use updated packages.


In [7]:
import pandas as pd 
from kafka import KafkaProducer  # Importing the KafkaProducer class from the kafka library for producing messages to Kafka
from time import sleep  # Importing the sleep function from the time module for introducing delays in code execution
from json import dumps  # Importing the dumps function from the json module for serializing Python objects to JSON format
import json  # Importing the json module for general JSON handling


In [8]:
def _encode_as_json(payload):
    """Turn a message value into the bytes that are sent to Kafka.

    The Python object is first converted to a JSON string with dumps(),
    and that string is then encoded as UTF-8 bytes.
    """
    json_text = dumps(payload)
    return json_text.encode('utf-8')


# Producer used to publish messages to Kafka. Every message value is passed
# through the serializer above before it is sent.
producer = KafkaProducer(
    value_serializer=_encode_as_json,
    bootstrap_servers=['localhost:9092'],  # Kafka broker(s) to connect to
)


In [9]:
# Load the CSV file (path relative to the working directory) into the DataFrame 'df'
csv_path = "indexProcessed.csv"
df = pd.read_csv(csv_path)


In [10]:
# Preview the first five rows of 'df' (five is the default of head())
df.head(n=5)


Unnamed: 0,Index,Date,Open,High,Low,Close,Adj Close,Volume,CloseUSD
0,HSI,1986-12-31,2568.300049,2568.300049,2568.300049,2568.300049,2568.300049,0.0,333.879006
1,HSI,1987-01-02,2540.100098,2540.100098,2540.100098,2540.100098,2540.100098,0.0,330.213013
2,HSI,1987-01-05,2552.399902,2552.399902,2552.399902,2552.399902,2552.399902,0.0,331.811987
3,HSI,1987-01-06,2583.899902,2583.899902,2583.899902,2583.899902,2583.899902,0.0,335.906987
4,HSI,1987-01-07,2607.100098,2607.100098,2607.100098,2607.100098,2607.100098,0.0,338.923013


In [11]:
# Draw one random row from 'df' ...
random_row = df.sample(n=1)
# ... and show it as a list containing a single {column: value} dictionary
random_row.to_dict(orient="records")


[{'Index': '399001.SZ',
  'Date': '2008-07-17',
  'Open': 9460.30957,
  'High': 9555.25,
  'Low': 9211.129883,
  'Close': 9288.370117,
  'Adj Close': 9288.266602,
  'Volume': 51700.0,
  'CloseUSD': 1486.13921872}]

In [12]:
# Maximum number of messages to publish
max_iterations = 5

# Publish one randomly sampled row per iteration. A bounded for-loop replaces
# the manual flag/counter bookkeeping; 'iteration_count' still ends up equal to
# 'max_iterations' once the loop has finished.
for iteration_count in range(1, max_iterations + 1):
    # Sample a random row from the DataFrame 'df' and convert it to a dictionary
    dict_stock = df.sample(1).to_dict(orient="records")[0]

    # Queue the value for the Kafka topic; send() is asynchronous and returns
    # before the record has actually been transmitted
    producer.send('METCS777', value=dict_stock)

    # Pause execution for 1 second before sending the next message
    sleep(1)

# Block until the requests for all buffered records have completed, so that no
# message is still sitting in the producer's buffer when this cell ends
producer.flush()
