In [18]:
import json
import random
import time

from faker import Faker
from confluent_kafka import SerializingProducer
from datetime import datetime

In [19]:
fake = Faker()

def generate_sales_transactions():
    products = {
        '1': {'name': 'MacBook Pro', 'brand': 'Apple', 'category': 'electronics', 'price': 1500.00},
        '2': {'name': 'Galaxy S21', 'brand': 'Samsung', 'category': 'electronics', 'price': 999.99},
        '3': {'name': 'iPad Air', 'brand': 'Apple', 'category': 'electronics', 'price': 599.00},
        '4': {'name': 'PlayStation 5', 'brand': 'Sony', 'category': 'gaming', 'price': 499.99},
        '5': {'name': 'Xbox Series X', 'brand': 'Microsoft', 'category': 'gaming', 'price': 499.99},
        '6': {'name': 'Echo Dot', 'brand': 'Amazon', 'category': 'home', 'price': 49.99},
        '7': {'name': 'Yoga Laptop', 'brand': 'Lenovo', 'category': 'electronics', 'price': 1200.00},
        '8': {'name': 'Noise Cancelling Headphones', 'brand': 'Bose', 'category': 'electronics', 'price': 299.99},
        '9': {'name': 'OLED TV', 'brand': 'LG', 'category': 'electronics', 'price': 1800.00},
        '10': {'name': 'Fitbit Charge 4', 'brand': 'Fitbit', 'category': 'health', 'price': 129.95}
    }
    
    productId, productDetails = random.choice(list(products.items()))
    
    customer_ids = ['customer_1', 'customer_2', 'customer_3', 'customer_4', 'customer_5', 'customer_6', 'customer_7', 'customer_8', 'customer_9', 'customer_10']

    return {
        "transactionId": fake.uuid4(),
        "productId": productId,
        "productName": productDetails['name'],
        'productCategory': productDetails['category'],
        'productPrice': productDetails['price'],
        'productQuantity': random.randint(1, 10),
        'productBrand': productDetails['brand'],
        'customerId': random.choice(customer_ids),
        'transactionDate': datetime.utcnow().isoformat() + 'Z',
        "paymentMethod": random.choice(['credit_card', 'debit_card', 'online_transfer']),
        'storeId': random.choice(['store_1', 'store_2', 'store_3', 'store_4', 'store_5'])
    }

In [20]:
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f"Message delivered to {msg.topic} [{msg.partition()}]")

In [21]:
def main():
    topic = 'financial_transactions'
    producer= SerializingProducer({
        'bootstrap.servers': 'localhost:9092'
    })

    curr_time = datetime.now()

    while (datetime.now() - curr_time).seconds < 120:
        try:
            transaction = generate_sales_transactions()
            transaction['totalAmount'] = transaction['productPrice'] * transaction['productQuantity']

            print(transaction)

            producer.produce(topic,
                             key=transaction['transactionId'],
                             value=json.dumps(transaction),
                             on_delivery=delivery_report
                             )
            producer.poll(0)
            
            # Wait for 5 seconds before sending the next transaction
            time.sleep(5)
        except BufferError:
            print("Buffer full! Waiting...")
            time.sleep(1)
        except Exception as e:
            print(e)
    
    producer.flush()

In [22]:
if __name__ == "__main__":
    main()

  'transactionDate': datetime.utcnow().isoformat() + 'Z',


{'transactionId': '1e340683-314d-4962-910e-239ec352690d', 'productId': '3', 'productName': 'iPad Air', 'productCategory': 'electronics', 'productPrice': 599.0, 'productQuantity': 4, 'productBrand': 'Apple', 'customerId': 'customer_6', 'transactionDate': '2024-10-30T17:59:56.556674Z', 'paymentMethod': 'debit_card', 'storeId': 'store_2', 'totalAmount': 2396.0}
{'transactionId': '904f293d-c91d-475d-a942-bdca6f48598f', 'productId': '6', 'productName': 'Echo Dot', 'productCategory': 'home', 'productPrice': 49.99, 'productQuantity': 2, 'productBrand': 'Amazon', 'customerId': 'customer_6', 'transactionDate': '2024-10-30T18:00:01.567048Z', 'paymentMethod': 'credit_card', 'storeId': 'store_2', 'totalAmount': 99.98}
Message delivered to <built-in method topic of cimpl.Message object at 0x74881f574840> [0]
{'transactionId': '38a3559b-fbd5-4ed2-8d0a-e06236226796', 'productId': '9', 'productName': 'OLED TV', 'productCategory': 'electronics', 'productPrice': 1800.0, 'productQuantity': 4, 'productBra

{'transactionId': '272a66f0-adde-41f7-98fa-99c3767f8b54', 'productId': '9', 'productName': 'OLED TV', 'productCategory': 'electronics', 'productPrice': 1800.0, 'productQuantity': 7, 'productBrand': 'LG', 'customerId': 'customer_9', 'transactionDate': '2024-10-30T18:01:31.592152Z', 'paymentMethod': 'debit_card', 'storeId': 'store_2', 'totalAmount': 12600.0}
Message delivered to <built-in method topic of cimpl.Message object at 0x7488240e1440> [0]
{'transactionId': '6a80d53d-d54e-48e0-a20a-fb1bf157fa11', 'productId': '3', 'productName': 'iPad Air', 'productCategory': 'electronics', 'productPrice': 599.0, 'productQuantity': 5, 'productBrand': 'Apple', 'customerId': 'customer_10', 'transactionDate': '2024-10-30T18:01:36.593593Z', 'paymentMethod': 'debit_card', 'storeId': 'store_5', 'totalAmount': 2995.0}
Message delivered to <built-in method topic of cimpl.Message object at 0x7488240e1440> [0]
{'transactionId': '3c1c1e00-5b68-4a98-926e-3734ba64daa8', 'productId': '8', 'productName': 'Noise

%4|1730311316.600|TERMINATE|rdkafka#producer-4| [thrd:app]: Producer terminating with 1 message (366 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
