# Using Schema Registry and Protobuf

In [None]:
# import os
# import sys
# from pathlib import Path
# from confluent_kafka import DeserializingConsumer
# from confluent_kafka.serialization import StringDeserializer
# from google.protobuf.message import DecodeError
# from dotenv import load_dotenv

# # Import the generated Protobuf schema
# sys.path.append('/home/jovyan/protobuf')
# import schema_pb2

# # Load environment variables from .env file
# dotenv_path = Path('/opt/app/.env')
# load_dotenv(dotenv_path=dotenv_path)

# kafka_host = os.getenv('KAFKA_HOST')
# topic_name = os.getenv('KAFKA_TOPIC_NAME')
# group_id = 'petshop-consumer-group'

# # Kafka Configuration
# bootstrap_servers = f'{kafka_host}:9092'

# # Consumer Configuration
# consumer_conf = {
#     'bootstrap.servers': bootstrap_servers,
#     'key.deserializer': StringDeserializer('utf_8'),  # Deserializer for key
#     'value.deserializer': lambda value, ctx: value,  # Return raw bytes, deserialization will happen later
#     'group.id': group_id,
#     'auto.offset.reset': 'earliest'  # Start consuming from the earliest available message
# }

# # Initialize Kafka consumer
# consumer = DeserializingConsumer(consumer_conf)

# # Subscribe to the topic
# consumer.subscribe([topic_name])

# # Variables to perform calculations
# total_price = 0
# total_weight = 0
# event_count = 0

# def consume_and_calculate():
#     """Consume Protobuf-encoded messages from Kafka and perform calculations."""
#     global total_price, total_weight, event_count

#     print("Starting consumer and subscribing to topic...")

#     while True:
#         try:
#             # Poll for a message
#             print("Polling for a message...")
#             msg = consumer.poll(10)  # Timeout of 10 second

#             if msg is None:
#                 print("No message received.")
#                 continue  # No message received, continue polling

#             if msg.error():
#                 print(f"Consumer error: {msg.error()}")
#                 continue

#             print(f"Raw message received: {msg.value()}")

#             # Deserialize the received message
#             pet_event = schema_pb2.PetShopEvent()

#             # Debugging: Check if the message value is bytes
#             if isinstance(msg.value(), bytes):
#                 try:
#                     pet_event.ParseFromString(msg.value())
#                     print(f"Deserialized PetShopEvent: {pet_event}")
#                 except DecodeError as e:
#                     print(f"Error decoding Protobuf message: {e}")
#                     continue
#             else:
#                 print(f"Unexpected value type: {type(msg.value())}, expected bytes.")
#                 continue

#             # Perform calculations on the consumed data
#             total_price += pet_event.price
#             total_weight += pet_event.weight
#             event_count += 1

#             # Calculate average weight
#             average_weight = total_weight / event_count if event_count > 0 else 0

#             # Print event and current calculations
#             print(f"Current Total Price: {total_price}")
#             print(f"Current Average Weight: {average_weight}")
#             print(f"Total Events Processed: {event_count}\n")

#         except KeyboardInterrupt:
#             print("Consumer stopped.")
#             break

#     # Close the consumer after use
#     consumer.close()
#     print("Consumer closed.")



# if __name__ == "__main__":
#     consume_and_calculate()

# Without Schema Registry and Protobuf (plain JSON messages)

In [None]:
import os
import sys
import json
from pathlib import Path
from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer
from dotenv import load_dotenv

# Load environment variables from the .env file into the process environment.
dotenv_path = Path('/opt/app/.env')
load_dotenv(dotenv_path=dotenv_path)

kafka_host = os.getenv('KAFKA_HOST')
topic_name = os.getenv('KAFKA_TOPIC_NAME')
group_id = 'petshop-consumer-group'

# Fail fast when required settings are missing or empty. Otherwise the broker
# address silently becomes "None:9092" (or ":9092") and subscribe() is handed
# a None topic, which surfaces much later as a confusing error or endless
# "No message received." output.
_missing_env = [
    name
    for name, value in (('KAFKA_HOST', kafka_host), ('KAFKA_TOPIC_NAME', topic_name))
    if not value
]
if _missing_env:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(_missing_env)} "
        f"(looked in the process environment and {dotenv_path})"
    )

# Kafka Configuration
bootstrap_servers = f'{kafka_host}:9092'

# Consumer Configuration
consumer_conf = {
    'bootstrap.servers': bootstrap_servers,
    'key.deserializer': StringDeserializer('utf_8'),  # Decode keys as UTF-8 text
    'value.deserializer': StringDeserializer('utf_8'),  # Decode values as UTF-8 text; JSON is parsed by the consume loop
    'group.id': group_id,
    # Only used when this consumer group has no committed offset yet: in that
    # case start from the earliest available message rather than only new ones.
    'auto.offset.reset': 'earliest',
}

# Initialize Kafka consumer
consumer = DeserializingConsumer(consumer_conf)

# Subscribe to the topic
consumer.subscribe([topic_name])

# Running totals updated by consume_and_calculate()
total_price = 0
total_weight = 0
event_count = 0

def consume_and_calculate():
    """Consume JSON-encoded pet-shop events from Kafka and keep running totals.

    Polls the module-level ``consumer`` in an endless loop. For every message
    whose value parses as JSON containing ``price`` and ``weight`` fields, the
    module-level ``total_price``, ``total_weight`` and ``event_count`` are
    updated, and the running total price and average weight are printed.

    Messages with the following problems are reported and skipped instead of
    aborting the loop:

    * the value is not valid JSON, or is None;
    * the ``price`` or ``weight`` field is missing;
    * a field value cannot be added to the running totals.

    The loop runs until a KeyboardInterrupt (e.g. Ctrl+C or interrupting the
    notebook kernel). The consumer is closed on every exit path. Any other
    unexpected exception still propagates to the caller after the close.
    """
    global total_price, total_weight, event_count

    print("Starting consumer and subscribing to topic...")

    try:
        while True:
            # Poll for a message
            print("Polling for a message...")
            msg = consumer.poll(10)  # Timeout of 10 seconds

            if msg is None:
                print("No message received.")
                continue  # No message received, continue polling

            # NOTE(review): DeserializingConsumer.poll() is documented to raise
            # ConsumeError rather than return an errored message, so this branch
            # is likely only defensive. Such exceptions propagate, and the
            # finally clause below still closes the consumer.
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue

            raw_value = msg.value()
            try:
                # value.deserializer is a StringDeserializer, so raw_value is
                # expected to be a str. json.loads(None) raises TypeError.
                pet_event = json.loads(raw_value)
                price = pet_event['price']
                weight = pet_event['weight']
                # Compute both new totals before assigning either one, so a
                # bad value cannot leave the totals partially updated.
                new_total_price = total_price + price
                new_total_weight = total_weight + weight
            except (json.JSONDecodeError, TypeError, KeyError) as e:
                # One malformed or incomplete event must not stop the consumer.
                print(f"Skipping malformed message {raw_value!r}: {e}")
                continue

            print(f"Deserialized PetShopEvent: {pet_event}")

            # Commit the calculations for this event
            total_price = new_total_price
            total_weight = new_total_weight
            event_count += 1

            # event_count was just incremented, so it is at least 1 here.
            average_weight = total_weight / event_count

            # Print current calculations
            print(f"Current Total Price: {total_price}")
            print(f"Current Average Weight: {average_weight}")
            print(f"Total Events Processed: {event_count}\n")
    except KeyboardInterrupt:
        print("Consumer stopped.")
    finally:
        # Close the consumer on every exit path, not only on KeyboardInterrupt.
        consumer.close()
        print("Consumer closed.")

# Start the consume loop when this code runs as the main module. This
# presumably also triggers when the cell is executed in a notebook kernel,
# whose __name__ is "__main__".
if __name__ == "__main__":
    consume_and_calculate()
