In [5]:
!pip install confluent_kafka



In [6]:
from confluent_kafka import Producer, Consumer, KafkaError
import json

In [4]:
# Broker address handed to every producer created below
bootstrap_servers = 'localhost:9092'

# Destination topic for each order category
inventory_topic, delivery_topic = 'inventory_orders', 'delivery_orders'

# Settings shared by both producers; built up key by key
producer_config = {}
producer_config['bootstrap.servers'] = bootstrap_servers

# One producer per order category, both built from the same settings
inventory_producer = Producer(producer_config)
delivery_producer = Producer(producer_config)

def send_inventory_order(order):
    """Publish *order* to the inventory topic if it is an inventory order.

    The order's 'type' entry is looked up with ``.get``; anything other
    than 'inventory' (including a missing entry) is ignored and nothing is
    sent. Otherwise the order is serialized as JSON, produced to
    ``inventory_topic`` and the producer is flushed before returning.
    """
    if order.get('type') != 'inventory':
        return
    payload = json.dumps(order)
    inventory_producer.produce(inventory_topic, payload)
    inventory_producer.flush()

def send_delivery_order(order):
    """Publish *order* to the delivery topic if it is a delivery order.

    The order's 'type' entry is looked up with ``.get``; anything other
    than 'delivery' (including a missing entry) is ignored and nothing is
    sent. Otherwise the order is serialized as JSON, produced to
    ``delivery_topic`` and the producer is flushed before returning.
    """
    if order.get('type') != 'delivery':
        return
    payload = json.dumps(order)
    delivery_producer.produce(delivery_topic, payload)
    delivery_producer.flush()

# Demo: build one sample order of each category and publish both
if __name__ == "__main__":
    # Keyword order below fixes the key order of the serialized JSON
    inventory_order = dict(
        type='inventory',
        order_id='INV123',
        product_id='PROD001',
        quantity=10,
    )
    delivery_order = dict(
        type='delivery',
        order_id='DEL456',
        customer_id='CUST001',
        delivery_address='123 Main St',
    )

    send_inventory_order(inventory_order)
    send_delivery_order(delivery_order)


In [7]:
# Broker address handed to every consumer created below
bootstrap_servers = 'localhost:9092'

# Topic each consumer will later subscribe to
inventory_topic, delivery_topic = 'inventory_orders', 'delivery_orders'

# Both consumers join this same consumer group
consumer_group = 'ecommerce_consumers'

# Settings shared by both consumers
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': consumer_group,
    'auto.offset.reset': 'earliest',
}

# One consumer per order category, both built from the same settings
inventory_consumer = Consumer(consumer_config)
delivery_consumer = Consumer(consumer_config)

def process_inventory_message(message):
    """Decode one inventory message and process the order it carries.

    Args:
        message: Object whose ``value()`` returns the raw payload
            (presumably a ``confluent_kafka.Message`` -- confirm against
            the caller). The payload may be ``str``, ``bytes`` or ``None``.

    A payload that is missing or cannot be decoded is reported on stdout
    and skipped, so a single bad record cannot raise out of this function
    and stop the consuming loop.
    """
    payload = message.value()
    if payload is None:
        # A message without a value would make json.loads raise TypeError.
        print("Skipping inventory message with empty payload")
        return

    try:
        order = json.loads(payload)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # UnicodeDecodeError: bytes payload that is not valid UTF-8/16/32.
        print("Error decoding JSON:", e)
        return

    # Process inventory message, update inventory database/system accordingly
    print("Processing inventory order:", order)

def process_delivery_message(message):
    """Decode one delivery message and process the order it carries.

    Args:
        message: Object whose ``value()`` returns the raw payload
            (presumably a ``confluent_kafka.Message`` -- confirm against
            the caller). The payload may be ``str``, ``bytes`` or ``None``.

    A payload that is missing or cannot be decoded is reported on stdout
    and skipped, so a single bad record cannot raise out of this function
    and stop the consuming loop.
    """
    payload = message.value()
    if payload is None:
        # A message without a value would make json.loads raise TypeError.
        print("Skipping delivery message with empty payload")
        return

    try:
        order = json.loads(payload)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # UnicodeDecodeError: bytes payload that is not valid UTF-8/16/32.
        print("Error decoding JSON:", e)
        return

    # Perform delivery action: schedule deliveries, update status, notify customers
    print("Processing delivery order:", order)

# Subscribe to Kafka topics
inventory_consumer.subscribe([inventory_topic])
delivery_consumer.subscribe([delivery_topic])

# Pair each consumer with its handler so that BOTH are polled on every pass.
# Previously an empty inventory poll did `continue` on the outer loop, which
# skipped the delivery poll: delivery messages were only read right after an
# inventory message had arrived.
consumer_handlers = (
    (inventory_consumer, process_inventory_message),
    (delivery_consumer, process_delivery_message),
)

try:
    running = True
    while running:
        for consumer, handler in consumer_handlers:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the timeout; move on to the next consumer.
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition is informational, not a failure.
                    continue
                # Any other error is printed and stops the whole loop.
                print(msg.error())
                running = False
                break
            handler(msg)

except KeyboardInterrupt:
    # Ctrl-C / kernel interrupt is the normal way to stop consuming.
    pass

finally:
    # Close the second consumer even if closing the first one raises.
    try:
        inventory_consumer.close()
    finally:
        delivery_consumer.close()

In [None]:
from confluent_kafka import Producer
import json

# Broker address used by the producers in this cell
bootstrap_servers = 'localhost:9092'

# Topic names: one for inventory orders, one for delivery orders
inventory_topic, delivery_topic = 'inventory_orders', 'delivery_orders'

# Producer settings; the same dict is passed to both constructors
producer_config = {'bootstrap.servers': bootstrap_servers}

# Separate producer instance for each topic
inventory_producer = Producer(producer_config)
delivery_producer = Producer(producer_config)

def send_inventory_order(order):
    """Serialize *order* as JSON and publish it to ``inventory_topic``.

    No check of the order's type is made here; the caller decides which
    orders are routed to this function. The producer is flushed before
    returning.
    """
    serialized = json.dumps(order)
    inventory_producer.produce(inventory_topic, serialized)
    inventory_producer.flush()

def send_delivery_order(order):
    """Serialize *order* as JSON and publish it to ``delivery_topic``.

    No check of the order's type is made here; the caller decides which
    orders are routed to this function. The producer is flushed before
    returning.
    """
    serialized = json.dumps(order)
    delivery_producer.produce(delivery_topic, serialized)
    delivery_producer.flush()

def read_json_file(filename):
    """Load and return the JSON document stored in *filename*.

    The file is opened in binary mode so that ``json`` detects the encoding
    itself (UTF-8 with or without BOM, UTF-16, UTF-32). Opening in text
    mode without an explicit encoding would decode with the platform's
    locale encoding and reject files that start with a UTF-8 BOM.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The decoded JSON value (whatever ``json.load`` produces).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
        UnicodeDecodeError: If the bytes are not valid in the detected encoding.
    """
    with open(filename, 'rb') as file:
        return json.load(file)

# Example usage: publish every order found in data.json to its topic
if __name__ == "__main__":
    # Read data from JSON file
    data = read_json_file('data.json')

    # Iterate through data and send orders to Kafka topics
    for order in data:
        # .get() rather than ['type']: a record without a 'type' key is
        # skipped instead of raising KeyError and aborting the whole batch.
        order_type = order.get('type')
        if order_type == 'inventory':
            send_inventory_order(order)
        elif order_type == 'delivery':
            send_delivery_order(order)
        else:
            # Report dropped records so they do not vanish silently.
            print("Skipping order with unsupported type:", order_type)
