In [5]:
from kafka import KafkaProducer
from datetime import datetime
import time
from json import dumps
import random
from configparser import ConfigParser
from manage_kafka_topics import KafkaTopicManager

In [6]:
conf_file_name = "streaming_app.conf"
config_obj = ConfigParser()
config_read_obj = config_obj.read(conf_file_name)
print(config_obj.sections())

['kafka']


In [7]:
# Kafka Cluster/Server Details
kafka_host_name = config_obj.get('kafka', 'host')
kafka_port_no = config_obj.get('kafka', 'port_no')
kafka_topic_name = config_obj.get('kafka', 'topic_name')

k_t_m = KafkaTopicManager(kafka_host_name, kafka_port_no)
k_t_m.create_topic(kafka_topic_name)
KAFKA_TOPIC_NAME_CONST = kafka_topic_name
KAFKA_BOOTSTRAP_SERVERS_CONST = kafka_host_name + ':' + kafka_port_no

Kafka Topic previously created is available


In [8]:
# Sample data
# {"order_id": 101, "order_date": "2021-02-06 07:55:00", "order_amount": 100}

kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONST,
                                   value_serializer=lambda x: dumps(x).encode('utf-8'))


product_name_list = ["Laptop", "Desktop Computer", "Mobile Phone", "Wrist Band", "Wrist Watch", "LAN Cable",
                     "HDMI Cable", "TV", "TV Stand", "Text Books", "External Hard Drive", "Pen Drive"]

order_card_type_list = ['Visa', 'Master', 'Maestro', 'American Express', 'PhonePe', 'PayPal']

country_name_city_name_list = ["Sydney,Australia", "Florida,United States", "New York City,United States",
                               "Paris,France", "Colombo,Sri Lanka", "Dhaka,Bangladesh", "Islamabad,Pakistan",
                               "Beijing,China", "Rome,Italy", "Berlin,Germany", "Ottawa,Canada",
                               "London,United Kingdom", "Jerusalem,Israel", "Bangkok,Thailand",
                               "Chennai,India", "Bangalore,India", "Mumbai,India", "Pune,India",
                               "New Delhi,Inida", "Hyderabad,India", "Kolkata,India", "Singapore,Singapore"]

ecommerce_website_name_list = ["www.amazon.com", "www.flipkart.com", "www.snapdeal.com", "www.ebay.com"]

message_list = []
message = None
for ind, it_ in enumerate(range(100)):
    message = {}
    print("Preparing message: " + str(ind))
    event_datetime = datetime.now()

    message["order_id"] = str(ind)
    message["order_product_name"] = random.choice(product_name_list)
    message["order_card_type"] = random.choice(order_card_type_list)
    message["order_amount"] = round(random.uniform(5.5, 1555.5), 2)
    message["order_datetime"] = event_datetime.strftime("%Y-%m-%d %H:%M:%S")
    location = random.choice(country_name_city_name_list)
    message["order_location"] = location
    message["order_ecommerce_source"] = random.choice(ecommerce_website_name_list)

    print("Message: ", message)
    kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONST, message)
    time.sleep(1)

print("Kafka Producer Application Completed. ")

Preparing message: 0
Message:  {'order_id': '0', 'order_product_name': 'Wrist Watch', 'order_card_type': 'American Express', 'order_amount': 296.1, 'order_datetime': '2024-05-10 17:52:09', 'order_location': 'Bangkok,Thailand', 'order_ecommerce_source': 'www.snapdeal.com'}
Preparing message: 1
Message:  {'order_id': '1', 'order_product_name': 'Desktop Computer', 'order_card_type': 'American Express', 'order_amount': 267.29, 'order_datetime': '2024-05-10 17:52:10', 'order_location': 'Jerusalem,Israel', 'order_ecommerce_source': 'www.snapdeal.com'}
Preparing message: 2
Message:  {'order_id': '2', 'order_product_name': 'Pen Drive', 'order_card_type': 'Visa', 'order_amount': 145.72, 'order_datetime': '2024-05-10 17:52:11', 'order_location': 'Jerusalem,Israel', 'order_ecommerce_source': 'www.snapdeal.com'}
Preparing message: 3
Message:  {'order_id': '3', 'order_product_name': 'HDMI Cable', 'order_card_type': 'American Express', 'order_amount': 1082.02, 'order_datetime': '2024-05-10 17:52:12'

Preparing message: 31
Message:  {'order_id': '31', 'order_product_name': 'TV', 'order_card_type': 'PhonePe', 'order_amount': 578.91, 'order_datetime': '2024-05-10 17:52:40', 'order_location': 'London,United Kingdom', 'order_ecommerce_source': 'www.flipkart.com'}
Preparing message: 32
Message:  {'order_id': '32', 'order_product_name': 'Mobile Phone', 'order_card_type': 'Maestro', 'order_amount': 177.27, 'order_datetime': '2024-05-10 17:52:41', 'order_location': 'Florida,United States', 'order_ecommerce_source': 'www.ebay.com'}
Preparing message: 33
Message:  {'order_id': '33', 'order_product_name': 'HDMI Cable', 'order_card_type': 'Master', 'order_amount': 586.1, 'order_datetime': '2024-05-10 17:52:42', 'order_location': 'Beijing,China', 'order_ecommerce_source': 'www.snapdeal.com'}
Preparing message: 34
Message:  {'order_id': '34', 'order_product_name': 'Pen Drive', 'order_card_type': 'Maestro', 'order_amount': 297.37, 'order_datetime': '2024-05-10 17:52:43', 'order_location': 'Hyderab

Preparing message: 63
Message:  {'order_id': '63', 'order_product_name': 'LAN Cable', 'order_card_type': 'American Express', 'order_amount': 1154.11, 'order_datetime': '2024-05-10 17:53:12', 'order_location': 'Ottawa,Canada', 'order_ecommerce_source': 'www.snapdeal.com'}
Preparing message: 64
Message:  {'order_id': '64', 'order_product_name': 'External Hard Drive', 'order_card_type': 'Maestro', 'order_amount': 613.6, 'order_datetime': '2024-05-10 17:53:13', 'order_location': 'Sydney,Australia', 'order_ecommerce_source': 'www.flipkart.com'}
Preparing message: 65
Message:  {'order_id': '65', 'order_product_name': 'TV Stand', 'order_card_type': 'Master', 'order_amount': 548.58, 'order_datetime': '2024-05-10 17:53:14', 'order_location': 'Paris,France', 'order_ecommerce_source': 'www.flipkart.com'}
Preparing message: 66
Message:  {'order_id': '66', 'order_product_name': 'Laptop', 'order_card_type': 'Maestro', 'order_amount': 849.63, 'order_datetime': '2024-05-10 17:53:15', 'order_location':

Preparing message: 94
Message:  {'order_id': '94', 'order_product_name': 'HDMI Cable', 'order_card_type': 'PayPal', 'order_amount': 1297.32, 'order_datetime': '2024-05-10 17:53:43', 'order_location': 'Bangkok,Thailand', 'order_ecommerce_source': 'www.amazon.com'}
Preparing message: 95
Message:  {'order_id': '95', 'order_product_name': 'Laptop', 'order_card_type': 'American Express', 'order_amount': 361.51, 'order_datetime': '2024-05-10 17:53:44', 'order_location': 'Islamabad,Pakistan', 'order_ecommerce_source': 'www.amazon.com'}
Preparing message: 96
Message:  {'order_id': '96', 'order_product_name': 'External Hard Drive', 'order_card_type': 'Master', 'order_amount': 609.6, 'order_datetime': '2024-05-10 17:53:45', 'order_location': 'Singapore,Singapore', 'order_ecommerce_source': 'www.flipkart.com'}
Preparing message: 97
Message:  {'order_id': '97', 'order_product_name': 'TV', 'order_card_type': 'American Express', 'order_amount': 561.25, 'order_datetime': '2024-05-10 17:53:46', 'order