# KAFKA Assignment

Name: Ayush Patel

ID : 202318036

In [None]:
!pip install confluent_kafka

Collecting confluent_kafka
  Downloading confluent_kafka-2.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 4.0/4.0 MB 30.9 MB/s eta 0:00:00
Installing collected packages: confluent_kafka
Successfully installed confluent_kafka-2.4.0


In [None]:
from confluent_kafka import Consumer, KafkaError
import json

def consume_messages(consumer, topic, data_type):
    """
    Consume JSON messages from a Kafka topic and print each one.

    The loop runs until the consumer reports an error other than
    end-of-partition. Messages that carry no payload, or whose payload is
    not valid UTF-8 encoded JSON, are reported and skipped instead of
    terminating the loop.

    Args:
    - consumer: Kafka Consumer object. It is closed before this function
      returns or raises, so it must not be reused by the caller afterwards.
    - topic: Kafka topic to subscribe to.
    - data_type: Type of data being consumed (e.g., 'inventory' or 'delivery');
      used only to label the printed output.
    """
    try:
        # Subscribe to the given topic
        consumer.subscribe([topic])

        # Consume messages
        while True:
            msg = consumer.poll(timeout=1.0)  # Poll for messages
            if msg is None:
                # Nothing arrived within the timeout; poll again.
                continue

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # End of partition is informational, not a failure.
                    continue
                print(f"Consumer error: {error}")
                break

            payload = msg.value()
            if payload is None:
                # A message without a value has nothing to decode.
                print(f"Skipping {data_type} message with empty payload")
                continue

            # Process message; one bad record must not stop the consumer.
            try:
                message_data = json.loads(payload.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                print(f"Skipping malformed {data_type} message: {exc}")
                continue

            print(f"Received {data_type} data:")
            print(json.dumps(message_data, indent=2))  # Print data in formatted JSON
            # Perform actions based on the data (e.g., update databases, schedule deliveries)
    finally:
        # Always release the consumer, including on unexpected exceptions
        # or a keyboard interrupt.
        consumer.close()

def main():
    """
    Start one Kafka consumer per order stream and run them concurrently.

    consume_messages() loops until its consumer reports a fatal error, so
    calling it for each stream in sequence would leave the second stream
    unread while the first is healthy. Each stream therefore gets its own
    consumer running in its own thread. This function blocks until every
    consumer loop has ended.
    """
    import threading

    # Kafka consumer configuration
    kafka_config = {'bootstrap.servers': 'localhost:9092'}

    # (consumer group id, topic, label used in printed output)
    streams = [
        ('inventory_group', 'inventory_orders', 'inventory'),
        ('delivery_group', 'delivery_orders', 'delivery'),
    ]

    workers = []
    for group_id, topic, data_type in streams:
        # Each thread gets its own Consumer instance; none is shared.
        consumer = Consumer({**kafka_config, 'group.id': group_id})
        worker = threading.Thread(
            target=consume_messages,
            args=(consumer, topic, data_type),
            name=f"{data_type}-consumer",
            # Daemon threads do not keep the process alive if the main
            # thread is interrupted while waiting in join() below.
            daemon=True,
        )
        worker.start()
        workers.append(worker)

    # Wait for every consumer loop to finish.
    for worker in workers:
        worker.join()

# Run main() only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()
