In [1]:
from confluent_kafka import Consumer, KafkaError, TopicPartition, KafkaException
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
from pathlib import Path
from dotenv import load_dotenv
import time
import os
import sys
import random

In [2]:
# Load environment variables from .env
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [3]:
# Access environment variables
kafka_host = os.getenv('KAFKA_HOST_1')
topic_name = os.getenv('KAFKA_TOPIC_NAME')
replication_factor = int(os.getenv('KAFKA_REPLICATION'))
num_partitions = int(os.getenv('KAFKA_PARTITION'))
schema_registry_host = os.getenv('SCHEMA_REG_HOST')

In [4]:
# Protobuf schema import (use the same schema as producer)
sys.path.append('./protobuf')
import protobuf_schema_pb2

In [5]:
# Kafka Configuration
# bootstrap_servers = '172.22.0.2:9092'
# schema_registry_url = 'http://172.22.0.5:8081'

In [5]:
# # Kafka Configuration
bootstrap_servers = f'{kafka_host}:9092'
schema_registry_url = f'http://{schema_registry_host}:8081'

In [6]:
# Schema Registry
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url})

In [7]:
# Protobuf Serializer
protobuf_deserializer = ProtobufDeserializer(protobuf_schema_pb2.ProductSale, 
                                             {'schema.registry.url': schema_registry_url,
                                              'use.deprecated.format': False})

In [8]:
# Consumer Configuration

consumer_conf = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'product-sale-consumer-group', # Consumer group for parallel processing
    'auto.offset.reset': 'earliest' # Start reading from the earliest offset
}

# Kafka Consumer
consumer = Consumer(consumer_conf)

# Subscribe with on_assign callback
consumer.subscribe([topic_name])

In [None]:
# Dictionaries for real-time analytics 
sales_by_category = {}
sales_by_product = {}

# Variables for simple analytics
total = 0
num_transactions = 0

while True:
    msg = consumer.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    # Create a valid SerializationContext
    ctx = SerializationContext(topic=msg.topic(), field=MessageField.VALUE)

    try:
        # Deserialize the message with context
        product_sale = protobuf_deserializer(msg.value(), ctx)

        # Update total sales and number of transactions
        total += product_sale.total_sales
        num_transactions += 1

        # Update sales by category and product
        if product_sale.category not in sales_by_category:
            sales_by_category[product_sale.category] = 0
        sales_by_category[product_sale.category] += product_sale.total_sales

        if product_sale.product not in sales_by_product:
            sales_by_product[product_sale.product] = 0
        sales_by_product[product_sale.product] += product_sale.total_sales

        print(f"Transaction ID: {product_sale.transaction_id}")
        print(f"Product: {product_sale.product}")
        print(f"Category: {product_sale.category}")
        print(f"Total Sales: ${product_sale.total_sales}")
        print("----------------------")

        # Simple Analytics - Calculate average sale
        if num_transactions > 0:
            average_sales = total / num_transactions
            print(f"Average Sales: ${average_sales:.2f}")

    except Exception as e:
        print(f"Deserialization error: {e}")


Transaction ID: 9902fd4a-de94-4868-8fae-cdf72cc108e8
Product: Foundation
Category: Skincare
Total Sales: $14997278.6
----------------------
Average Sales: $14997278.60
Transaction ID: 7f367ab8-e534-439e-b540-822cbd12f9b3
Product: Vitamin C Serum
Category: Skincare
Total Sales: $7800642.78
----------------------
Average Sales: $11398960.69
Transaction ID: 3bd7e14c-5bbb-4f8f-856a-ee9c823878d9
Product: Matte Lipstick
Category: Skincare
Total Sales: $1813651.4
----------------------
Average Sales: $8203857.59
Transaction ID: 3216308c-89ac-40ce-9d15-7bd5cd36bf76
Product: Hydrating Serum
Category: Makeup
Total Sales: $6749017.52
----------------------
Average Sales: $7840147.57
Transaction ID: 516b51d5-41aa-45eb-b520-e86512db4be9
Product: Sunscreen
Category: Makeup
Total Sales: $1447531.44
----------------------
Average Sales: $6561624.35
Transaction ID: a11ce868-81a5-4f70-8ee6-851d195dfe6a
Product: Hydrating Serum
Category: Skincare
Total Sales: $212535.02
----------------------
Average Sal