**1. Setting up a Kafka Producer:**

   a) Write a Python program to create a Kafka producer.

   b) Configure the producer to connect to a Kafka cluster.

   c) Implement logic to send messages to a Kafka topic.


In [None]:
a)
from kafka import KafkaProducer

def create_kafka_producer(bootstrap_servers):
    """Build a KafkaProducer for the given broker address(es).

    Args:
        bootstrap_servers: Value forwarded unchanged as the
            ``bootstrap_servers`` keyword of ``KafkaProducer``.

    Returns:
        The newly constructed ``KafkaProducer`` instance.
    """
    return KafkaProducer(bootstrap_servers=bootstrap_servers)

def publish_message(producer, topic, message):
    """Send one text message to ``topic`` and flush the producer.

    Args:
        producer: Object exposing ``send(topic, value)`` and ``flush()``.
        topic: Name of the topic passed to ``producer.send``.
        message: Text to publish; it is encoded as UTF-8 before sending.
    """
    payload = message.encode('utf-8')
    producer.send(topic, payload)
    producer.flush()

def close_kafka_producer(producer):
    """Shut down ``producer`` by calling its ``close()`` method.

    Args:
        producer: Object exposing a no-argument ``close()`` method.
    """
    producer.close()

if __name__ == '__main__':
    # Broker address used when this cell is run as a script
    bootstrap_servers = 'localhost:9092'

    # Topic and payload for the demo message
    topic = 'my_topic'
    message = 'Hello, Kafka!'

    # Create a Kafka producer
    kafka_producer = create_kafka_producer(bootstrap_servers)
    try:
        # Publish a message to a topic
        publish_message(kafka_producer, topic, message)
    finally:
        # Close the producer even when publishing raises
        close_kafka_producer(kafka_producer)

In [None]:
b)
from kafka import KafkaProducer

def create_kafka_producer(bootstrap_servers):
    """Construct and return a KafkaProducer.

    Args:
        bootstrap_servers: Broker address value handed to
            ``KafkaProducer`` as its ``bootstrap_servers`` keyword.

    Returns:
        The ``KafkaProducer`` that was created.
    """
    return KafkaProducer(bootstrap_servers=bootstrap_servers)

# Set the Kafka cluster's bootstrap servers
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Create a Kafka producer
kafka_producer = create_kafka_producer(bootstrap_servers)

try:
    # Rest of the code for producing and publishing messages
    # ...
    pass
finally:
    # Close the Kafka producer, even if the code above raised
    kafka_producer.close()

In [None]:
c)
from kafka import KafkaProducer

def send_messages(bootstrap_servers, topic, messages):
    """Send every text message in ``messages`` to ``topic``.

    A producer is created for this call and is always closed before the
    function returns, including when encoding or sending raises.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaProducer``.
        topic: Name of the topic to send to.
        messages: Iterable of ``str``; each item is encoded as UTF-8.
    """
    # Create a Kafka producer instance
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        # Send messages to the specified topic
        for message in messages:
            producer.send(topic, message.encode('utf-8'))

        # Flush once after all messages have been handed to the producer
        producer.flush()
    finally:
        # Close the Kafka producer
        producer.close()

# Brokers used to bootstrap the connection to the cluster
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Destination topic and the payloads to publish to it
topic = 'my_topic'
messages = ['Message 1', 'Message 2', 'Message 3']

# Publish every payload to the topic
send_messages(bootstrap_servers=bootstrap_servers, topic=topic, messages=messages)

**2. Setting up a Kafka Consumer:**

   a) Write a Python program to create a Kafka consumer.

   b) Configure the consumer to connect to a Kafka cluster.

   c) Implement logic to consume messages from a Kafka topic.

In [None]:
a)
from kafka import KafkaProducer

def send_messages(bootstrap_servers, topic, messages):
    """Send every text message in ``messages`` to ``topic``.

    A producer is created for this call and is always closed before the
    function returns, including when encoding or sending raises.

    NOTE(review): this cell appears under the "create a Kafka consumer"
    exercise but contains producer code only; confirm whether a consumer
    example was intended here.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaProducer``.
        topic: Name of the topic to send to.
        messages: Iterable of ``str``; each item is encoded as UTF-8.
    """
    # Create a Kafka producer instance
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        # Send messages to the specified topic
        for message in messages:
            producer.send(topic, message.encode('utf-8'))

        # Flush once after all messages have been handed to the producer
        producer.flush()
    finally:
        # Close the Kafka producer
        producer.close()

# Brokers used to bootstrap the connection to the cluster
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Topic that receives the messages, and the messages themselves
topic = 'my_topic'
messages = ['Message 1', 'Message 2', 'Message 3']

# Hand the messages to the sender
send_messages(bootstrap_servers=bootstrap_servers, topic=topic, messages=messages)

In [None]:
b)
from kafka import KafkaConsumer

def create_kafka_consumer(bootstrap_servers, topic, group_id='my-consumer-group'):
    """Create a KafkaConsumer subscribed to ``topic``.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaConsumer``.
        topic: Name of the topic passed to ``KafkaConsumer``.
        group_id: Consumer group the consumer joins. Defaults to
            ``'my-consumer-group'``, the value previously hard-coded here.

    Returns:
        The newly constructed ``KafkaConsumer``.
    """
    return KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
    )

# Set the Kafka cluster's bootstrap servers
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Set the Kafka topic to consume messages from
topic = 'my_topic'

# Create a Kafka consumer
kafka_consumer = create_kafka_consumer(bootstrap_servers, topic)

try:
    # Rest of the code for consuming and processing messages
    # ...
    pass
finally:
    # Close the Kafka consumer, even if the code above raised
    kafka_consumer.close()

In [None]:
c)
from kafka import KafkaConsumer

def consume_messages(bootstrap_servers, topic, group_id='my-consumer-group'):
    """Print every message read from ``topic``, decoded as UTF-8.

    The consumer is created for this call and is always closed on the way
    out, including when the loop is interrupted or raises.

    NOTE(review): no consumer timeout is configured, so the loop presumably
    ends only through an exception or interrupt; confirm against the
    kafka-python documentation.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaConsumer``.
        topic: Name of the topic to read from.
        group_id: Consumer group to join. Defaults to
            ``'my-consumer-group'``, the value previously hard-coded here.
    """
    # Create a Kafka consumer instance
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
    )

    try:
        # Continuously consume messages from the specified topic
        for message in consumer:
            print(f"Received message: {message.value.decode('utf-8')}")
    finally:
        # Close the Kafka consumer
        consumer.close()

# Topic to read from
topic = 'my_topic'

# Brokers used to bootstrap the connection to the cluster
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Start reading and printing messages from the topic
consume_messages(bootstrap_servers=bootstrap_servers, topic=topic)

**3. Creating and Managing Kafka Topics:**

   a) Write a Python program to create a new Kafka topic.

   b) Implement functionality to list existing topics.

   c) Develop logic to delete an existing Kafka topic.

In [None]:
a)
from kafka.admin import KafkaAdminClient, NewTopic

def create_kafka_topic(bootstrap_servers, topic, num_partitions, replication_factor):
    """Create a topic through a short-lived KafkaAdminClient.

    The admin client is always closed before returning, including when the
    create request raises.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaAdminClient``.
        topic: Name given to the new topic.
        num_partitions: Partition count passed to ``NewTopic``.
        replication_factor: Replication factor passed to ``NewTopic``.
    """
    # Create a KafkaAdminClient instance
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        # Describe the topic to create
        new_topic = NewTopic(
            name=topic,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )

        # Create the topic
        admin_client.create_topics(new_topics=[new_topic])
    finally:
        # Close the KafkaAdminClient
        admin_client.close()

# Brokers used to bootstrap the connection to the cluster
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Name and layout of the topic to create
topic = 'my_new_topic'
num_partitions = 3
replication_factor = 2

# Ask the cluster to create the topic
create_kafka_topic(
    bootstrap_servers=bootstrap_servers,
    topic=topic,
    num_partitions=num_partitions,
    replication_factor=replication_factor,
)

In [None]:
b)
from kafka.admin import KafkaAdminClient

def list_topics(bootstrap_servers):
    """Return the cluster's topics as a list.

    The admin client is always closed before returning, including when the
    listing call raises.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaAdminClient``.

    Returns:
        A new ``list`` holding every item yielded by
        ``KafkaAdminClient.list_topics()``.
    """
    # Create a KafkaAdminClient instance
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        # Copy the result into a plain list for the caller
        return list(admin_client.list_topics())
    finally:
        # Close the KafkaAdminClient
        admin_client.close()

# Brokers used to bootstrap the connection to the cluster
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Fetch the topics that currently exist
existing_topics = list_topics(bootstrap_servers=bootstrap_servers)

# Print a header followed by one topic per line
print("Existing Topics:")
for topic in existing_topics:
    print(topic)

In [None]:
c)
from kafka.admin import KafkaAdminClient

def delete_kafka_topic(bootstrap_servers, topic):
    """Delete ``topic`` through a short-lived KafkaAdminClient.

    The admin client is always closed before returning, including when the
    delete request raises.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaAdminClient``.
        topic: Name of the topic to delete.
    """
    # Create a KafkaAdminClient instance
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        # delete_topics takes the topic names directly; no request object
        # has to be built by the caller
        admin_client.delete_topics(topics=[topic])
    finally:
        # Close the KafkaAdminClient
        admin_client.close()

# Topic that should be removed
topic = 'my_topic'

# Brokers used to bootstrap the connection to the cluster
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Ask the cluster to delete the topic
delete_kafka_topic(bootstrap_servers=bootstrap_servers, topic=topic)

**4. Producing and Consuming Messages:**

   a) Write a Python program to produce messages to a Kafka topic.

   b) Implement logic to consume messages from the same Kafka topic.

   c) Test the end-to-end flow of message production and consumption.

In [None]:
a)
from kafka import KafkaProducer

def produce_messages(bootstrap_servers, topic, messages):
    """Produce every text message in ``messages`` to ``topic``.

    A producer is created for this call and is always closed before the
    function returns, including when encoding or sending raises.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaProducer``.
        topic: Name of the topic to produce to.
        messages: Iterable of ``str``; each item is encoded as UTF-8.
    """
    # Create a Kafka producer instance
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        # Produce messages to the specified topic
        for message in messages:
            producer.send(topic, message.encode('utf-8'))

        # Flush once after all messages have been handed to the producer
        producer.flush()
    finally:
        # Close the Kafka producer
        producer.close()

# Brokers used to bootstrap the connection to the cluster
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Destination topic and the payloads to produce
topic = 'my_topic'
messages = ['Message 1', 'Message 2', 'Message 3']

# Produce every payload to the topic
produce_messages(bootstrap_servers=bootstrap_servers, topic=topic, messages=messages)

In [None]:
b)
from kafka import KafkaConsumer

def consume_messages(bootstrap_servers, topic, group_id='my-consumer-group'):
    """Print every message read from ``topic``, decoded as UTF-8.

    The consumer is created for this call and is always closed on the way
    out, including when the loop is interrupted or raises.

    NOTE(review): no consumer timeout is configured, so the loop presumably
    ends only through an exception or interrupt; confirm against the
    kafka-python documentation.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaConsumer``.
        topic: Name of the topic to read from.
        group_id: Consumer group to join. Defaults to
            ``'my-consumer-group'``, the value previously hard-coded here.
    """
    # Create a Kafka consumer instance
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
    )

    try:
        # Continuously consume messages from the specified topic
        for message in consumer:
            print(f"Received message: {message.value.decode('utf-8')}")
    finally:
        # Close the Kafka consumer
        consumer.close()

# Topic to read from (the same one the producer cell writes to)
topic = 'my_topic'

# Brokers used to bootstrap the connection to the cluster
bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

# Start reading and printing messages from the topic
consume_messages(bootstrap_servers=bootstrap_servers, topic=topic)

In [None]:
c)
from kafka import KafkaProducer
import time

def produce_messages(bootstrap_servers, topic, messages):
    """Produce ``messages`` to ``topic`` one per second, logging each one.

    Each message is sent and flushed before its "Produced message" line is
    printed, then the function sleeps for one second. The producer is always
    closed before returning, including when a step raises.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaProducer``.
        topic: Name of the topic to produce to.
        messages: Iterable of ``str``; each item is encoded as UTF-8.
    """
    # Create a Kafka producer instance
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        # Produce messages to the specified topic
        for message in messages:
            producer.send(topic, message.encode('utf-8'))
            # Flush per message so the log line follows the actual send
            producer.flush()
            print(f"Produced message: {message}")
            time.sleep(1)  # Wait for 1 second between messages
    finally:
        # Close the Kafka producer
        producer.close()

# Local broker used for the end-to-end test
bootstrap_servers = 'localhost:9092'

# Destination topic and the payloads to produce
topic = 'my_topic'
messages = ['Message 1', 'Message 2', 'Message 3']

# Produce every payload to the topic
produce_messages(bootstrap_servers=bootstrap_servers, topic=topic, messages=messages)


In [None]:
from kafka import KafkaConsumer

def consume_messages(bootstrap_servers, topic, group_id='my-consumer-group'):
    """Print every message read from ``topic``, decoded as UTF-8.

    The consumer is created for this call and is always closed on the way
    out, including when the loop is interrupted or raises.

    NOTE(review): no consumer timeout is configured, so the loop presumably
    ends only through an exception or interrupt; confirm against the
    kafka-python documentation.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaConsumer``.
        topic: Name of the topic to read from.
        group_id: Consumer group to join. Defaults to
            ``'my-consumer-group'``, the value previously hard-coded here.
    """
    # Create a Kafka consumer instance
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
    )

    try:
        # Continuously consume messages from the specified topic
        for message in consumer:
            print(f"Received message: {message.value.decode('utf-8')}")
    finally:
        # Close the Kafka consumer
        consumer.close()

# Topic to read from (the same one the producer script writes to)
topic = 'my_topic'

# Local broker used for the end-to-end test
bootstrap_servers = 'localhost:9092'

# Start reading and printing messages from the topic
consume_messages(bootstrap_servers=bootstrap_servers, topic=topic)

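Terminal 1: python producer.py

Terminal 2: python consumer.py

Note: start the consumer (Terminal 2) before running the producer; a consumer that joins afterwards may not receive messages that were already produced.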


5. Working with Kafka Consumer Groups:

   a) Write a Python program to create a Kafka consumer within a consumer group.

   b) Implement logic to handle messages consumed by different consumers within the same group.
   
   c) Observe the behavior of consumer group rebalancing when adding or removing consumers.


In [None]:
a)
from kafka import KafkaConsumer

def consume_messages(bootstrap_servers, topic, group_id):
    """Print every message read from ``topic`` as a member of ``group_id``.

    Each output line includes the group id and the message value decoded as
    UTF-8. The consumer is always closed on the way out, including when the
    loop is interrupted or raises.

    NOTE(review): no consumer timeout is configured, so the loop presumably
    ends only through an exception or interrupt; confirm against the
    kafka-python documentation.

    Args:
        bootstrap_servers: Broker address value passed to ``KafkaConsumer``.
        topic: Name of the topic to read from.
        group_id: Consumer group passed to ``KafkaConsumer``.
    """
    # Create a Kafka consumer instance within a consumer group
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
    )

    try:
        # Continuously consume messages from the specified topic
        for message in consumer:
            print(f"Consumer Group: {group_id}, Received message: {message.value.decode('utf-8')}")
    finally:
        # Close the Kafka consumer
        consumer.close()

from threading import Thread

# Set the Kafka cluster's bootstrap servers
bootstrap_servers = 'localhost:9092'

# Set the Kafka topic to consume messages from
topic = 'my_topic'

# Set the consumer group IDs
group_id1 = 'consumer-group-1'
group_id2 = 'consumer-group-2'

# consume_messages does not return while its consumer loop is running, so
# calling it twice in sequence would never start the second consumer. Run
# one consumer per group, each in its own thread.
consumer_threads = [
    Thread(target=consume_messages, args=(bootstrap_servers, topic, group_id))
    for group_id in (group_id1, group_id2)
]
for consumer_thread in consumer_threads:
    consumer_thread.start()

# Wait for both consumers to finish
for consumer_thread in consumer_threads:
    consumer_thread.join()

In [None]:
b)
from kafka import KafkaConsumer
from threading import Thread

def handle_messages(consumer):
    """Print one line per message yielded by ``consumer``.

    Args:
        consumer: Iterable of messages that also carries ``group_id`` and
            ``consumer_id`` attributes; each message's ``value`` is decoded
            as UTF-8 for display.
    """
    for message in consumer:
        line = f"Consumer Group: {consumer.group_id}, Consumer ID: {consumer.consumer_id}, Message: {message.value.decode('utf-8')}"
        print(line)

# Set the Kafka cluster's bootstrap servers
bootstrap_servers = 'localhost:9092'

# Set the Kafka topic to consume messages from
topic = 'my_topic'

# Set the consumer group ID
group_id = 'my_consumer_group'

# Set the number of consumers within the group
num_consumers = 3

# Create and start multiple consumers within the same group.
# Consumers and their worker threads are tracked in separate lists so the
# threads (not the consumers) can be joined below.
consumers = []
consumer_threads = []
for i in range(num_consumers):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id
    )
    # Labels read back by handle_messages when it prints each message
    consumer.consumer_id = f"Consumer-{i+1}"
    consumer.group_id = group_id
    consumer_thread = Thread(target=handle_messages, args=(consumer,))
    consumer_thread.start()
    consumers.append(consumer)
    consumer_threads.append(consumer_thread)

# Wait for all consumer threads to finish
for consumer_thread in consumer_threads:
    consumer_thread.join()

In [None]:
c)
from kafka import KafkaConsumer
from threading import Thread
import time

def handle_messages(consumer):
    """Print one line per message yielded by ``consumer``.

    Args:
        consumer: Iterable of messages that also carries ``group_id`` and
            ``consumer_id`` attributes; each message's ``value`` is decoded
            as UTF-8 for display.
    """
    for message in consumer:
        line = f"Consumer Group: {consumer.group_id}, Consumer ID: {consumer.consumer_id}, Message: {message.value.decode('utf-8')}"
        print(line)

# Rebalancing demo: start 2 consumers in one group, add 4 more after 10 s,
# close the 2 oldest after a further 30 s, then close the rest 30 s later.

# Set the Kafka cluster's bootstrap servers
bootstrap_servers = 'localhost:9092'

# Set the Kafka topic to consume messages from
topic = 'my_topic'

# Set the consumer group ID
group_id = 'my_consumer_group'

# Set the initial number of consumers within the group
num_consumers = 2

# Create and start the initial consumers within the group.
# Only the consumer objects are kept in the list; the Thread objects are
# not stored, so the script never joins them.
consumers = []
for i in range(num_consumers):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id
    )
    # Labels read back by handle_messages when it prints each message
    consumer.consumer_id = f"Consumer-{i+1}"
    consumer.group_id = group_id
    consumer_thread = Thread(target=handle_messages, args=(consumer,))
    consumer_thread.start()
    consumers.append(consumer)

# Wait for a while to observe the initial consumer behavior
time.sleep(10)

# Add more consumers to the group (numbered after the initial ones)
new_consumer_count = 4
for i in range(new_consumer_count):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id
    )
    consumer.consumer_id = f"Consumer-{num_consumers + i + 1}"
    consumer.group_id = group_id
    consumer_thread = Thread(target=handle_messages, args=(consumer,))
    consumer_thread.start()
    consumers.append(consumer)

# Wait for a while to observe the rebalancing behavior
time.sleep(30)

# Stop and remove some consumers from the group; pop(0) takes the
# earliest-created consumers first.
# NOTE(review): close() is called here from the main thread while a worker
# thread is still iterating the same consumer — confirm this is safe with
# kafka-python.
consumers_to_remove = 2
for i in range(consumers_to_remove):
    consumer = consumers.pop(0)
    consumer.close()

# Wait for a while to observe the rebalancing behavior after consumer removal
time.sleep(30)

# Stop the remaining consumers
for consumer in consumers:
    consumer.close()