In [1]:
# Print the installed protoc version, then compile proto/transaction_product.proto
# to Python. With --python_out=. the generated module is written relative to the
# current directory, which is what the later `from proto import transaction_product_pb2` relies on.

!protoc --version
!protoc --python_out=. 'proto/transaction_product.proto'

libprotoc 3.21.12


In [2]:
# Install library

!pip install faker-commerce

Collecting faker-commerce
  Downloading faker_commerce-1.0.4-py3-none-any.whl (3.3 kB)
Installing collected packages: faker-commerce
Successfully installed faker-commerce-1.0.4


In [3]:
# Import library

import os
import uuid
import faker_commerce
from datetime import datetime, timedelta
from dotenv import load_dotenv
from faker import Faker
from kafka import KafkaProducer
from pathlib import Path
from proto import transaction_product_pb2
from time import sleep

In [4]:
# Load environment variables from /resources/.env into the process environment.
# load_dotenv(...) is the cell's last expression, so its return value is shown as the cell output.

dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [5]:
# Set kafka variable configuration

def _require_env(name):
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing .env entry
            is reported here instead of surfacing later as a 'None:9092' broker
            address or a ``None + str`` TypeError.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Required environment variable {name!r} is not set")
    return value


kafka_host = _require_env('KAFKA_HOST')
kafka_host_broker_2 = _require_env('KAFKA_BROKER_2_HOST')
kafka_topic = _require_env('topic')
# Derived from the already-validated topic name rather than a second env lookup.
kafka_topic_partition = f"{kafka_topic}-1"

In [6]:
# Create the Faker instance used to generate dummy data and register the
# faker_commerce provider on it, which supplies faker.ecommerce_name() used by the data generator.

faker = Faker()
faker.add_provider(faker_commerce.Provider)

In [7]:
# Build fake transaction records as protobuf messages

class Datagenerator:
    """Factory for ``TransactionProduct`` protobuf messages filled with fake data."""

    @staticmethod
    def get_data():
        """Return a new ``TransactionProduct`` populated with random values.

        Fields set:
            id: a fresh UUID4 rendered as a string.
            product_name / customer_name: values produced by the Faker instance.
            status: either "checkout" or "cancel".
            timestamp: ``datetime.now()`` moved back by a random number of
                seconds between 0 and 5, formatted as ``dd-mm-yy HH:MM:SS``.
            total_payment: random integer between 5000 and 5000000.
        """
        # Backdate the current time by a few random seconds so that
        # consecutive records do not all carry the exact send time.
        now = datetime.now()
        event_time = now - timedelta(seconds=faker.random_int(min=0, max=5))

        # Keyword arguments are evaluated top to bottom, so the random draws
        # happen in the same order as the field list below.
        return transaction_product_pb2.TransactionProduct(
            id=str(uuid.uuid4()),
            product_name=faker.ecommerce_name(),
            customer_name=faker.name(),
            status=faker.random_element(elements=("checkout", "cancel")),
            timestamp=event_time.strftime("%d-%m-%y %H:%M:%S"),
            total_payment=faker.random_int(min=5000, max=5000000),
        )

In [8]:
# Create the Kafka producer, bootstrapping from both configured brokers

def _serialize_message(message):
    """Serialize a protobuf message to bytes for use as the Kafka record value."""
    return message.SerializeToString()


producer = KafkaProducer(
    bootstrap_servers=[f'{kafka_host}:9092', f'{kafka_host_broker_2}:9095'],
    value_serializer=_serialize_message,
)

In [9]:
# Send fake transactions to the Kafka topic at a fixed interval

MESSAGE_COUNT = 200          # number of records to produce in one run of this cell
SEND_INTERVAL_SECONDS = 5    # pause between consecutive records

try:
    for i in range(MESSAGE_COUNT):
        transaction_data = Datagenerator.get_data()
        # send() is asynchronous: it queues the record and returns a future.
        response = producer.send(topic=kafka_topic, value=transaction_data)
        print(transaction_data)
        sleep(SEND_INTERVAL_SECONDS)
finally:
    # Deliver anything still buffered even when the cell is stopped early
    # (a kernel interrupt raises KeyboardInterrupt inside the loop); the
    # exception itself still propagates after the flush.
    producer.flush()

id: "151a9737-3fdf-4562-8492-4cda36730919"
product_name: "Rustic Chips"
customer_name: "Jacob Martin"
status: "cancel"
timestamp: "04-11-24 05:34:20"
total_payment: 3868427

id: "7520fc2e-dbf2-4701-9b45-bb3bcf5cf763"
product_name: "Chicken"
customer_name: "Michael Fernandez"
status: "cancel"
timestamp: "04-11-24 05:34:25"
total_payment: 3440949

id: "6a776c34-1bab-4af7-8087-2c2f91f81a74"
product_name: "Practical Cotton Cheese"
customer_name: "Daniel Bishop"
status: "checkout"
timestamp: "04-11-24 05:34:32"
total_payment: 245308

id: "8f504aa8-ed08-4d1f-b282-ca1b10217cf9"
product_name: "Fantastic Plastic Pants"
customer_name: "Stacey Willis"
status: "checkout"
timestamp: "04-11-24 05:34:34"
total_payment: 567413

id: "c1bbb8ba-3418-4e77-8322-ba4d1cc26c06"
product_name: "Mouse"
customer_name: "Laura Harris"
status: "cancel"
timestamp: "04-11-24 05:34:41"
total_payment: 2796100

id: "cae2fcb7-a581-492b-a69a-9418a7906063"
product_name: "Gorgeous Keyboard"
customer_name: "Geoffrey Lee"
stat

KeyboardInterrupt: 