![image](https://user-images.githubusercontent.com/57321948/196933065-4b16c235-f3b9-4391-9cfe-4affcec87c35.png)

# Submitted by: Mohammad Wasiq

## Email: `gl0427@myamu.ac.in`

# Pre-Placement Training Assignment - `Big Data` 

## Apache Kafka

**Q1. Setting up a Kafka Producer:**

**a) Write a Python program to create a Kafka producer.**

In [None]:
from confluent_kafka import Producer

def produce_messages(producer, topic):
    """Publish ten UTF-8 encoded messages ("Message 1" .. "Message 10").

    Args:
        producer: Object exposing ``produce(topic, value=...)`` and ``flush()``.
        topic: Name of the topic the messages are produced to.
    """
    for i in range(10):
        message = f"Message {i + 1}"
        producer.produce(topic, value=message.encode('utf-8'))

    # Flush once, after everything is queued: flushing inside the loop blocks
    # on every single message and prevents the client from batching them.
    producer.flush()

def create_kafka_producer():
    """Return a ``Producer`` built with ``localhost:9092`` as its bootstrap server."""
    return Producer({"bootstrap.servers": "localhost:9092"})

# Usage example
if __name__ == "__main__":
    # Build the client first, then name the destination topic and publish to it.
    producer = create_kafka_producer()
    topic = "my_topic"
    produce_messages(producer, topic)

**b) Configure the producer to connect to a Kafka cluster.**

In [None]:
from confluent_kafka import Producer

def produce_messages(producer, topic):
    """Queue ten numbered messages on ``topic`` and wait for their delivery.

    Args:
        producer: Object exposing ``produce(topic, value=...)`` and ``flush()``.
        topic: Name of the topic the messages are produced to.
    """
    # Produce messages to Kafka topic
    for i in range(10):
        message = f"Message {i + 1}"
        producer.produce(topic, value=message.encode('utf-8'))

    # A single flush after the loop is enough; a flush per message would make
    # every produce call synchronous.
    producer.flush()

def create_kafka_producer():
    """Create a ``Producer`` that bootstraps from the three-broker cluster.

    Returns:
        The ``Producer`` instance built from the configuration below.
    """
    # Configure Kafka producer.
    # NOTE: "group.id" is deliberately not set here; it is a consumer-only
    # property and has no meaning on a producer.
    conf = {
        "bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",  # Kafka broker addresses
        "client.id": "my_client_id",  # Client ID (optional)
    }

    # Create Kafka producer instance
    producer = Producer(conf)

    return producer

# Usage example
if __name__ == "__main__":
    # Connect to the cluster, then send the sample messages to the topic.
    producer = create_kafka_producer()
    topic = "my_topic"
    produce_messages(producer, topic)

**c) Implement logic to send messages to a Kafka topic.**

In [None]:
from confluent_kafka import Producer

def produce_messages(producer, topic):
    """Send "Message 1" through "Message 10" (UTF-8 bytes) to ``topic``.

    Args:
        producer: Object exposing ``produce(topic, value=...)`` and ``flush()``.
        topic: Name of the topic the messages are produced to.
    """
    # Produce messages to Kafka topic
    for i in range(10):
        message = f"Message {i + 1}"
        producer.produce(topic, value=message.encode('utf-8'))

    # Wait for the queued messages once, at the end, so the client can batch
    # them instead of blocking after every produce call.
    producer.flush()

def create_kafka_producer(bootstrap_servers):
    """Create a ``Producer`` for the given brokers.

    Args:
        bootstrap_servers: Value used for the ``bootstrap.servers`` setting
            (the caller in this cell passes a comma-separated host:port list).

    Returns:
        The ``Producer`` instance built from the configuration below.
    """
    # Configure Kafka producer.
    # NOTE: "group.id" is deliberately not set here; it is a consumer-only
    # property and has no meaning on a producer.
    conf = {
        "bootstrap.servers": bootstrap_servers,
        "client.id": "my_client_id",  # Client ID (optional)
    }

    # Create Kafka producer instance
    producer = Producer(conf)

    return producer

# Usage example
if __name__ == "__main__":
    # Broker addresses and destination topic for this example run.
    bootstrap_servers, topic = "kafka1:9092,kafka2:9092,kafka3:9092", "my_topic"

    # Build the producer against those brokers and publish the sample messages.
    producer = create_kafka_producer(bootstrap_servers)
    produce_messages(producer, topic)

**Q2. Setting up a Kafka Consumer:**

**a) Write a Python program to create a Kafka consumer.**

In [None]:
from confluent_kafka import Consumer

def consume_messages(consumer, topic):
    """Subscribe ``consumer`` to ``topic`` and print what it receives, forever.

    Each iteration calls ``consumer.poll(1.0)``. Empty polls are skipped,
    records carrying an error are reported, and every other record has its
    value decoded as UTF-8 and printed. The loop has no exit condition of
    its own.
    """
    consumer.subscribe([topic])

    while True:
        record = consumer.poll(1.0)

        if record is not None:
            failure = record.error()
            if failure:
                print(f"Consumer error: {failure}")
            else:
                print(f"Received message: {record.value().decode('utf-8')}")

def create_kafka_consumer(bootstrap_servers, group_id):
    """Return a ``Consumer`` configured from the given brokers and group id.

    The fixed settings are ``auto.offset.reset = "earliest"`` and
    ``client.id = "my_client_id"``; the two arguments fill in
    ``bootstrap.servers`` and ``group.id``.
    """
    settings = {
        "auto.offset.reset": "earliest",
        "client.id": "my_client_id",
    }
    settings["bootstrap.servers"] = bootstrap_servers
    settings["group.id"] = group_id
    return Consumer(settings)

# Usage example
if __name__ == "__main__":
    bootstrap_servers = "kafka1:9092,kafka2:9092,kafka3:9092"  # Kafka broker addresses
    group_id = "my_consumer_group"  # Consumer group ID
    topic = "my_topic"

    # Create Kafka consumer
    consumer = create_kafka_consumer(bootstrap_servers, group_id)

    # Consume messages from Kafka topic. consume_messages() loops forever, so
    # the only way out is an exception (typically Ctrl-C); always close the
    # consumer on the way out so it leaves its group cleanly.
    try:
        consume_messages(consumer, topic)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

**b) Configure the consumer to connect to a Kafka cluster.**  

In [None]:
from kafka import KafkaConsumer

# Configure Kafka consumer
bootstrap_servers = 'localhost:9092'  # Kafka broker addresses
group_id = 'my_consumer_group'  # Consumer group ID

# Create Kafka consumer instance
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers,
    group_id=group_id,
    auto_offset_reset='earliest',  # Start consuming from the beginning of the topic (optional)
    enable_auto_commit=True,  # Commit offsets automatically (optional)
    value_deserializer=lambda x: x.decode('utf-8')  # Deserializer for message values (optional)
)

# Example usage: Consume messages from a Kafka topic
topic = 'my_topic'
consumer.subscribe([topic])

# Iterating the consumer never finishes by itself, so stop on Ctrl-C and
# always close the consumer instead of leaving its connections open.
try:
    for message in consumer:
        print(f'Received message: {message.value}')
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

**c) Implement logic to consume messages from a Kafka topic.**

In [None]:
from kafka import KafkaConsumer

# Configure Kafka consumer
bootstrap_servers = 'localhost:9092'  # Kafka broker addresses
group_id = 'my_consumer_group'  # Consumer group ID

# Create Kafka consumer instance
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers,
    group_id=group_id,
    auto_offset_reset='earliest',  # Start consuming from the beginning of the topic (optional)
    enable_auto_commit=True,  # Commit offsets automatically (optional)
    value_deserializer=lambda x: x.decode('utf-8')  # Deserializer for message values (optional)
)

# Example usage: Consume messages from a Kafka topic
topic = 'my_topic'
consumer.subscribe([topic])

# The loop below has no natural end; treat Ctrl-C as the stop signal and
# close the consumer whichever way the loop is left.
try:
    for message in consumer:
        print(f'Received message: {message.value}')
        # Add your custom logic here to process the message
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

**Q3. Creating and Managing Kafka Topics:**

**a) Write a Python program to create a new Kafka topic.**

In [None]:
from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Configure Kafka admin client
bootstrap_servers = 'localhost:9092'  # Kafka broker addresses

# Create Kafka admin client instance
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# Example usage: Create a new Kafka topic
topic_name = 'my_topic'
partitions = 3  # Number of partitions for the topic
replication_factor = 1  # Replication factor for the topic

new_topic = NewTopic(topic_name, num_partitions=partitions, replication_factor=replication_factor)

# Close the admin client whether or not the request succeeds, so its broker
# connections are not left open (notebook kernels outlive the cell).
try:
    admin_client.create_topics([new_topic])
    print(f'Topic {topic_name} created successfully.')
finally:
    admin_client.close()

**b) Implement functionality to list existing topics.**

In [None]:
from kafka import KafkaAdminClient

# Configure Kafka admin client
bootstrap_servers = 'localhost:9092'  # Kafka broker addresses

# Create Kafka admin client instance
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# Example usage: List existing topics. The client is closed afterwards so
# its broker connections are not left open.
try:
    topics = admin_client.list_topics()
finally:
    admin_client.close()

# Retrieve the topic names as a plain list
topic_names = list(topics)

print(f'Existing topics: {topic_names}')

**c) Develop logic to delete an existing Kafka topic.**

In [None]:
from kafka import KafkaAdminClient
# NewTopic is exported by kafka.admin, not by the top-level kafka package;
# importing it from `kafka` raises ImportError.
from kafka.admin import NewTopic

# Configure Kafka admin client
bootstrap_servers = 'localhost:9092'  # Kafka broker addresses

# Create Kafka admin client instance
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# Example usage: Delete a topic
topic_name = 'my_topic'  # Name of the topic to delete

# Delete the topic, closing the admin client whether or not the request
# succeeds so its broker connections are not left open.
try:
    admin_client.delete_topics(topics=[topic_name])
    print(f'Topic "{topic_name}" deleted successfully')
finally:
    admin_client.close()

**Q4. Producing and Consuming Messages:**

**a) Write a Python program to produce messages to a Kafka topic.**

In [None]:
from kafka import KafkaProducer

# Configure Kafka producer
bootstrap_servers = 'localhost:9092'  # Kafka broker addresses
topic_name = 'my_topic'  # Name of the Kafka topic

# Create Kafka producer instance
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# Example usage: Produce messages
messages = [
    'Hello, Kafka!',
    'This is a test message.',
    'Another message to Kafka topic.'
]

try:
    # Produce messages to the Kafka topic
    for message in messages:
        producer.send(topic_name, message.encode('utf-8'))

    # send() only queues the record; flush() blocks until the queued records
    # have actually been sent, so the success message below is truthful.
    producer.flush()
    print('Messages produced successfully')
finally:
    # Release the producer's connections and background thread.
    producer.close()

**b) Implement logic to consume messages from the same Kafka topic.**

In [None]:
from kafka import KafkaConsumer

# Configure Kafka consumer
bootstrap_servers = 'localhost:9092'  # Kafka broker addresses
topic_name = 'my_topic'  # Name of the Kafka topic

# Create Kafka consumer instance
consumer = KafkaConsumer(topic_name, bootstrap_servers=bootstrap_servers)

# Example usage: Consume messages. Iterating the consumer does not end by
# itself, so a close() placed after the loop would never run; stop on Ctrl-C
# and close in `finally` instead.
try:
    for message in consumer:
        print(message.value.decode('utf-8'))
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer connection
    consumer.close()

**c) Test the end-to-end flow of message production and consumption.**

In [None]:
# kafka_producer.py

from kafka import KafkaProducer

# Producer connected to the local broker
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Destination topic and the ten payloads ("Message 0" .. "Message 9")
topic = 'test-topic'
payloads = [f'Message {i}' for i in range(10)]

# Send each payload as UTF-8 bytes and report it
for message in payloads:
    producer.send(topic, message.encode('utf-8'))
    print(f'Sent: {message}')

# Done sending: close the producer
producer.close()

In [None]:
# kafka_consumer.py

from kafka import KafkaConsumer

# Configure the Kafka consumer
consumer = KafkaConsumer('test-topic', bootstrap_servers='localhost:9092')

# Consume messages from the Kafka topic. The iteration never ends by itself,
# so the close() call must live in `finally` to be reachable; Ctrl-C is the
# intended way to stop.
try:
    for message in consumer:
        print(f'Received: {message.value.decode("utf-8")}')
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer
    consumer.close()

**Q5. Working with Kafka Consumer Groups:**

**a) Write a Python program to create a Kafka consumer within a consumer group.**

In [None]:
from kafka import KafkaConsumer

# Configure the Kafka consumer with a consumer group
consumer_group = 'my-consumer-group'
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers='localhost:9092',
    group_id=consumer_group
)

# Consume messages from the Kafka topic within the consumer group. The loop
# has no natural end, so stop on Ctrl-C and close in `finally`; a close()
# placed after the loop would never be reached.
try:
    for message in consumer:
        print(f'Received: {message.value.decode("utf-8")}')
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer so it leaves the group promptly
    consumer.close()

**b) Implement logic to handle messages consumed by different consumers within the same group.**


In [None]:
from kafka import KafkaConsumer
from concurrent.futures import ThreadPoolExecutor

def consume_messages(consumer_id=0):
    """Run one member of 'my-consumer-group' and print what it receives.

    Args:
        consumer_id: Label printed in front of every message so the output
            shows which group member handled it. It is passed in explicitly
            because reading the submitting loop's global variable from the
            worker thread would make every worker report the last id.
    """
    consumer = KafkaConsumer(
        'test-topic',
        bootstrap_servers='localhost:9092',
        group_id='my-consumer-group'
    )

    try:
        for message in consumer:
            print(f'Consumer {consumer_id}: Received: {message.value.decode("utf-8")}')
    finally:
        # Reached only when the iteration is interrupted by an exception.
        consumer.close()

# Number of consumers
num_consumers = 3

# Create a ThreadPoolExecutor with the desired number of threads
executor = ThreadPoolExecutor(max_workers=num_consumers)

# Submit consumer tasks to the executor, binding each task to its own id
futures = []
for consumer_id in range(num_consumers):
    futures.append(executor.submit(consume_messages, consumer_id))

# Shutdown the executor once all tasks are complete
executor.shutdown()

# Exceptions raised inside a worker are stored on its future; report them
# instead of letting them disappear silently.
for worker_id, future in enumerate(futures):
    error = future.exception()
    if error is not None:
        print(f'Consumer {worker_id} failed: {error!r}')

**c) Observe the behavior of consumer group rebalancing when adding or removing consumers.**

In [None]:
from confluent_kafka import Consumer, KafkaError, KafkaException

# Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',
}

# Create a Kafka consumer within a consumer group
consumer = Consumer(consumer_config)

# Subscribe to the Kafka topic
consumer.subscribe(['my_topic'])

# Consume messages from the Kafka topic
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            # The error-code constants live on KafkaError; KafkaException has
            # no _PARTITION_EOF attribute, so comparing against it would
            # raise AttributeError as soon as any error was polled.
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition, continue consuming
                continue
            else:
                # Handle error
                print(f"Error: {msg.error()}")
                break
        # Process the consumed message
        print(f"Consumed message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer
    consumer.close()