<a href="https://colab.research.google.com/github/ArkS0001/KAFKA-sample/blob/main/Kafka.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install confluent_kafka


Collecting confluent_kafka
  Downloading confluent_kafka-2.8.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (21 kB)
Downloading confluent_kafka-2.8.0-cp311-cp311-manylinux_2_28_x86_64.whl (3.8 MB)
Installing collected packages: confluent_kafka
Successfully installed confluent_kafka-2.8.0


Apache Kafka: Overview

Apache Kafka is a distributed event streaming platform built for high-throughput, low-latency data pipelines. It handles real-time data feeds reliably by persisting messages in a replicated, append-only log.

Kafka is widely used for:

    Messaging: Acts as a message broker between producers and consumers.
    Event Streaming: Collecting and processing real-time event data.
    Log Aggregation: Storing and distributing logs across distributed systems.
    Data Processing Pipelines: Connecting data sources and analytics tools.

Key Components of Kafka

    Producer: Sends data (messages) to a Kafka topic.
    Consumer: Reads data from a Kafka topic.
    Broker: A Kafka server that stores and manages messages.
    Topic: A logical channel to which messages are sent.
    Partition: A division within a topic to enable parallel processing.
    ZooKeeper: Stores cluster metadata and coordinates brokers in older Kafka deployments; newer releases can run without it in KRaft mode.
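
To make the relationship between topics, partitions, and keys more concrete, here is a minimal in-memory sketch. It is not Kafka code: `ToyTopic` and `choose_partition` are hypothetical names, and CRC32 only stands in for whatever hash a real client's partitioner uses. The point it illustrates is that records with the same key land in the same partition, and each partition assigns its own increasing offsets.

```python
import zlib

NUM_PARTITIONS = 3  # hypothetical partition count, chosen for illustration


def choose_partition(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition index (CRC32 is a stand-in hash)."""
    return zlib.crc32(key) % num_partitions


class ToyTopic:
    """In-memory stand-in for a topic: one append-only list per partition."""

    def __init__(self, num_partitions: int = NUM_PARTITIONS):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key: bytes, value: bytes):
        """Append a record and return (partition, offset)."""
        p = choose_partition(key, len(self.partitions))
        self.partitions[p].append((key, value))
        return p, len(self.partitions[p]) - 1


topic = ToyTopic()
for value in (b"login", b"click", b"logout"):
    partition, offset = topic.append(b"user-1", value)
    print(f"user-1 {value!r} -> partition {partition}, offset {offset}")
```

Because every record above shares the key `user-1`, all three go to one partition and keep their relative order, which is the ordering guarantee Kafka offers per partition rather than per topic.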

Kafka Example in Google Colab

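Google Colab does not come with a running Kafka broker. The confluent_kafka library is only a client: it does not simulate Kafka, so the client examples below need a broker you can reach from the notebook (a local install, a cloud cluster, or a managed service). Without one, calls such as flush() and poll() simply wait and time out.

Steps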

    Install Kafka Client (confluent_kafka).
    Create a producer to send messages.
    Create a consumer to read messages.
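
Before running the client code, it can help to confirm that the broker address is reachable at all. The sketch below uses only the standard library; `broker_reachable` is a hypothetical helper name, and `localhost:9092` is the address assumed by the first example. A successful TCP connection does not prove the service is Kafka or that its advertised listeners are correct, so treat it as a quick sanity check only.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures
        return False


if __name__ == "__main__":
    if broker_reachable("localhost", 9092):
        print("Broker port is open.")
    else:
        print("No broker reachable at localhost:9092.")
```

In a fresh Colab runtime this is expected to report that no broker is reachable, which is why the producer and consumer cells need an external cluster.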

In [None]:
from confluent_kafka import Producer, Consumer, KafkaException
import time

# Kafka configuration (assumes a broker is reachable at localhost:9092)
conf_producer = {'bootstrap.servers': 'localhost:9092'}
conf_consumer = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
}

# Kafka producer: sends five messages to test_topic
def kafka_producer():
    producer = Producer(conf_producer)
    for i in range(5):
        message = f"Message {i}"
        producer.produce("test_topic", message.encode('utf-8'))
        producer.flush()  # Ensure message is sent
        print(f"Produced: {message}")
        time.sleep(1)

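# Kafka consumer: reads up to five messages from test_topic
def kafka_consumer(expected=5, timeout_s=30):
    consumer = Consumer(conf_consumer)
    consumer.subscribe(["test_topic"])

    print("Consumer started. Waiting for messages...")
    received = 0
    deadline = time.time() + timeout_s
    try:
        # Empty polls are common while the consumer group is joining,
        # so count received messages rather than poll attempts.
        while received < expected and time.time() < deadline:
            msg = consumer.poll(1.0)  # Poll for new messages
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            print(f"Consumed: {msg.value().decode('utf-8')}")
            received += 1
    finally:
        consumer.close()  # Leave the group cleanly, even after an error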

# Running Producer and Consumer
kafka_producer()
time.sleep(2)  # Allow time for messages to be produced
kafka_consumer()


In [None]:
!pip install confluent-kafka

from confluent_kafka import Producer, Consumer

# Kafka broker details (change this to your Kafka broker's address)
KAFKA_BROKER = "your-kafka-broker:9092"
TOPIC = "test_topic"

# Producer Example
producer = Producer({'bootstrap.servers': KAFKA_BROKER})
producer.produce(TOPIC, key="key1", value="Hello, Kafka from Colab!")
producer.flush()

# Consumer Example
consumer = Consumer({
    'bootstrap.servers': KAFKA_BROKER,
    'group.id': 'colab-consumer',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe([TOPIC])

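msg = consumer.poll(5.0)  # Wait up to 5 s for a message
if msg is None:
    print("No message received within 5 seconds.")
elif msg.error():
    print(f"Consumer error: {msg.error()}")
else:
    print(f"Received: {msg.value().decode('utf-8')}")
consumer.close()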




In [1]:
from confluent_kafka import Producer, Consumer

# Replace this with your Kafka broker (e.g., AWS, GCP, or local machine)
KAFKA_BROKER = "your-kafka-broker:9092"
TOPIC = "test_topic"

# ✅ Kafka Producer
producer = Producer({'bootstrap.servers': KAFKA_BROKER})

def delivery_report(err, msg):
    """ Callback for delivery reports. """
    if err:
        print(f'❌ Message delivery failed: {err}')
    else:
        print(f'✅ Message delivered to {msg.topic()} [{msg.partition()}]')

# Send message
producer.produce(TOPIC, key="key1", value="Hello, Kafka from Colab!", callback=delivery_report)
# produce() only enqueues the message; flush() sends it and runs delivery_report
producer.flush(10)  # Wait up to 10 s for delivery
print("📤 Message sent!")

# ✅ Kafka Consumer
consumer = Consumer({
    'bootstrap.servers': KAFKA_BROKER,
    'group.id': 'colab-consumer',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe([TOPIC])

print("🔄 Waiting for messages...")
msg = consumer.poll(5.0)  # Wait for a message (timeout 5s)

if msg is None:
    print("⚠️ No new messages.")
elif msg.error():
    print(f"❌ Consumer error: {msg.error()}")
else:
    print(f"✅ Received: {msg.value().decode('utf-8')}")

consumer.close()


📤 Message sent!
🔄 Waiting for messages...
⚠️ No new messages.
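
In confluent_kafka, produce() only places a message in a local queue, and delivery callbacks run when poll() or flush() is called. The toy class below is a sketch of that behaviour using the standard library only. `ToyAsyncProducer` is a hypothetical name and is not how the real client is implemented; in particular, the real flush() waits for the broker, while this one reports a result immediately.

```python
from collections import deque


class ToyAsyncProducer:
    """Toy model of an asynchronous producer with delivery callbacks."""

    def __init__(self, broker_up: bool = True):
        self.broker_up = broker_up
        self._queue = deque()  # messages accepted by produce() but not yet sent

    def produce(self, value, callback=None):
        # Only enqueues; nothing is transmitted and no callback runs yet.
        self._queue.append((value, callback))

    def flush(self) -> int:
        """Process everything queued and fire the delivery callbacks."""
        while self._queue:
            value, callback = self._queue.popleft()
            err = None if self.broker_up else "broker unreachable"
            if callback is not None:
                callback(err, value)
        return len(self._queue)  # messages still waiting (always 0 here)


def report(err, value):
    print(f"failed: {err}" if err else f"delivered: {value}")


producer = ToyAsyncProducer()
producer.produce("Hello", callback=report)
print("after produce(): no callback has run yet")
producer.flush()
```

The callback only prints once flush() is called, which mirrors why a script that produces and then exits (or moves on) without flushing may never report a delivery.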


In [1]:
# Install required libraries
!pip install confluent-kafka

# Kafka broker configuration (a Confluent Cloud bootstrap server; it requires an API key and secret)
KAFKA_BROKER = 'pkc-4r087.us-west2.gcp.confluent.cloud:9092'  # Replace with your cluster's bootstrap server
KAFKA_TOPIC = 'test-topic'

# Kafka producer code
from confluent_kafka import Producer

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

producer_conf = {
    'bootstrap.servers': KAFKA_BROKER,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'YOUR_CONFLUENT_API_KEY',  # Replace with your Confluent API key
    'sasl.password': 'YOUR_CONFLUENT_API_SECRET'  # Replace with your Confluent API secret
}

producer = Producer(producer_conf)

# Produce a message
producer.produce(KAFKA_TOPIC, key='key', value='Hello, Kafka!', callback=delivery_report)
producer.flush(10)  # Wait up to 10 s for delivery; this also runs delivery_report

# Kafka consumer code
from confluent_kafka import Consumer, KafkaError

consumer_conf = {
    'bootstrap.servers': KAFKA_BROKER,
    'group.id': 'colab-consumer-group',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'YOUR_CONFLUENT_API_KEY',  # Replace with your Confluent API key
    'sasl.password': 'YOUR_CONFLUENT_API_SECRET'  # Replace with your Confluent API secret
}

consumer = Consumer(consumer_conf)
consumer.subscribe([KAFKA_TOPIC])

# Poll for messages
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'End of partition reached {msg.partition()}')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received message: {msg.value().decode("utf-8")}')
            break  # Exit after receiving the first message
except KeyboardInterrupt:
    pass
finally:
    consumer.close()



In [2]:
import queue
import threading
import time

# Simulated Kafka broker (queue)
message_queue = queue.Queue()

# ✅ Kafka Producer Simulation
def producer():
    for i in range(5):
        message = f"Message {i+1}"
        print(f"📤 Producer: Sending -> {message}")
        message_queue.put(message)  # Put message in queue
        time.sleep(1)  # Simulate real-time streaming

    # Send a STOP signal to notify the consumer to exit
    message_queue.put("STOP")

# ✅ Kafka Consumer Simulation
def consumer():
    while True:
        message = message_queue.get()  # Wait for a message
        if message == "STOP":
            print("🔴 Consumer: Stopping.")
            break
        print(f"✅ Consumer: Received -> {message}")
        time.sleep(0.5)  # Simulate processing delay

# ✅ Run Producer & Consumer in Parallel
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
time.sleep(1)  # Allow producer to start sending messages
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

print("🚀 Kafka-like pipeline simulation complete!")


📤 Producer: Sending -> Message 1
✅ Consumer: Received -> Message 1
📤 Producer: Sending -> Message 2
✅ Consumer: Received -> Message 2
📤 Producer: Sending -> Message 3
✅ Consumer: Received -> Message 3
📤 Producer: Sending -> Message 4
✅ Consumer: Received -> Message 4
📤 Producer: Sending -> Message 5
✅ Consumer: Received -> Message 5
🔴 Consumer: Stopping.
🚀 Kafka-like pipeline simulation complete!
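
One difference between this queue-based simulation and Kafka is worth seeing in code: `queue.Queue` removes a message once it is read, whereas Kafka keeps records in a log and tracks a read position (offset) per consumer group. The sketch below models a single partition under that assumption. `ToyLog` and the group names are hypothetical and only illustrate the idea.

```python
class ToyLog:
    """Append-only log with a separate read position for each consumer group."""

    def __init__(self):
        self.records = []
        self.committed = {}  # group id -> next offset to read

    def append(self, value) -> int:
        """Store a record and return its offset."""
        self.records.append(value)
        return len(self.records) - 1

    def poll(self, group: str, max_records: int = 10):
        """Return unread records for this group and advance its offset."""
        start = self.committed.get(group, 0)  # new groups start at the earliest record
        batch = self.records[start:start + max_records]
        self.committed[group] = start + len(batch)
        return batch


log = ToyLog()
for i in range(3):
    log.append(f"Message {i + 1}")

print(log.poll("analytics"))  # analytics reads all three records
print(log.poll("billing"))    # billing independently reads the same three
print(log.poll("analytics"))  # nothing new for analytics
```

Reading does not delete anything here, so a second group can replay the same records, which a plain queue cannot do.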


In [3]:
import queue
import threading
import time

# Simulated Kafka broker (message queue)
message_queue = queue.Queue()

# Number of consumers
NUM_CONSUMERS = 3

# Consumer Queues (Round-Robin Distribution)
consumer_queues = [queue.Queue() for _ in range(NUM_CONSUMERS)]

# ✅ Kafka Producer Simulation
def producer():
    for i in range(10):  # Sending 10 messages
        message = f"Message {i+1}"
        print(f"📤 Producer: Sending -> {message}")
        message_queue.put(message)  # Put message in broker queue
        time.sleep(0.5)  # Simulate streaming delay

    # Send a single STOP signal; the broker fans it out to every consumer
    message_queue.put("STOP")

# ✅ Kafka Broker (Round-Robin Dispatcher)
def broker():
    consumer_index = 0  # Track which consumer gets the next message

    while True:
        message = message_queue.get()  # Get message from producer
        if message == "STOP":
            # Send STOP signal to each consumer
            for cq in consumer_queues:
                cq.put("STOP")
            break

        # Send message to a consumer in Round-Robin order
        consumer_queues[consumer_index].put(message)
        print(f"🔀 Broker: Routed -> {message} to Consumer-{consumer_index+1}")

        # Move to the next consumer (Round-Robin)
        consumer_index = (consumer_index + 1) % NUM_CONSUMERS

# ✅ Kafka Consumer Simulation
def consumer(consumer_id):
    while True:
        message = consumer_queues[consumer_id].get()  # Get assigned message
        if message == "STOP":
            print(f"🔴 Consumer-{consumer_id+1}: Stopping.")
            break
        print(f"✅ Consumer-{consumer_id+1}: Received -> {message}")
        time.sleep(1)  # Simulate processing delay

# ✅ Run Producer, Broker & Consumers in Parallel
producer_thread = threading.Thread(target=producer)
broker_thread = threading.Thread(target=broker)
consumer_threads = [threading.Thread(target=consumer, args=(i,)) for i in range(NUM_CONSUMERS)]

producer_thread.start()
time.sleep(1)  # Allow producer to start sending messages
broker_thread.start()

for ct in consumer_threads:
    ct.start()

# Wait for all threads to finish
producer_thread.join()
broker_thread.join()
for ct in consumer_threads:
    ct.join()

print("🚀 Kafka-like pipeline with Round-Robin broker complete!")


📤 Producer: Sending -> Message 1
📤 Producer: Sending -> Message 2
📤 Producer: Sending -> Message 3
🔀 Broker: Routed -> Message 1 to Consumer-1
🔀 Broker: Routed -> Message 2 to Consumer-2
✅ Consumer-1: Received -> Message 1
🔀 Broker: Routed -> Message 3 to Consumer-3
✅ Consumer-2: Received -> Message 2
✅ Consumer-3: Received -> Message 3
📤 Producer: Sending -> Message 4
🔀 Broker: Routed -> Message 4 to Consumer-1
📤 Producer: Sending -> Message 5
🔀 Broker: Routed -> Message 5 to Consumer-2
✅ Consumer-1: Received -> Message 4
✅ Consumer-2: Received -> Message 5
📤 Producer: Sending -> Message 6
🔀 Broker: Routed -> Message 6 to Consumer-3
✅ Consumer-3: Received -> Message 6
📤 Producer: Sending -> Message 7
🔀 Broker: Routed -> Message 7 to Consumer-1
✅ Consumer-1: Received -> Message 7
📤 Producer: Sending -> Message 8
🔀 Broker: Routed -> Message 8 to Consumer-2
✅ Consumer-2: Received -> Message 8
📤 Producer: Sending -> Message 9
🔀 Broker: Routed -> Message 9 to Consumer-3
✅ Consumer-3: Recei