
1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.





```
from confluent_kafka import Producer

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'
topic = 'my_topic'

def delivery_report(err, msg):
    """Callback function called once the message is delivered or delivery failed."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def create_kafka_producer():
    """Create a Kafka producer with the specified bootstrap servers."""
    config = {
        'bootstrap.servers': bootstrap_servers
    }
    producer = Producer(config)
    return producer

def produce_messages(producer, messages):
    """Produce messages to the specified Kafka topic."""
    for message in messages:
        producer.produce(topic, message, callback=delivery_report)
    # Wait for any outstanding messages to be delivered
    producer.flush()

if __name__ == '__main__':
    # Create a Kafka producer
    kafka_producer = create_kafka_producer()

    # Define the messages to be produced
    messages = [
        'Hello Kafka!',
        'This is a test message.',
        'Another message.',
    ]

    # Produce the messages
    produce_messages(kafka_producer, messages)

    # Close the Kafka producer
    kafka_producer.close()
```



   b) Configure the producer to connect to a Kafka cluster.





```
from confluent_kafka import Producer

# Kafka cluster configuration
bootstrap_servers = 'kafka1:9092,kafka2:9092,kafka3:9092'
topic = 'my_topic'

def delivery_report(err, msg):
    """Callback function called once the message is delivered or delivery failed."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def create_kafka_producer():
    """Create a Kafka producer with the specified bootstrap servers."""
    config = {
        'bootstrap.servers': bootstrap_servers
    }
    producer = Producer(config)
    return producer

def produce_messages(producer, messages):
    """Produce messages to the specified Kafka topic."""
    for message in messages:
        producer.produce(topic, message, callback=delivery_report)
    # Wait for any outstanding messages to be delivered
    producer.flush()

if __name__ == '__main__':
    # Create a Kafka producer
    kafka_producer = create_kafka_producer()

    # Define the messages to be produced
    messages = [
        'Hello Kafka!',
        'This is a test message.',
        'Another message.',
    ]

    # Produce the messages
    produce_messages(kafka_producer, messages)

    # Close the Kafka producer
    kafka_producer.close()
```



   c) Implement logic to send messages to a Kafka topic.



```
from confluent_kafka import Producer

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'
topic = 'my_topic'

def delivery_report(err, msg):
    """Callback function called once the message is delivered or delivery failed."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def create_kafka_producer():
    """Create a Kafka producer with the specified bootstrap servers."""
    config = {
        'bootstrap.servers': bootstrap_servers
    }
    producer = Producer(config)
    return producer

def produce_messages(producer, messages):
    """Produce messages to the specified Kafka topic."""
    for message in messages:
        producer.produce(topic, message.encode('utf-8'), callback=delivery_report)
    # Wait for any outstanding messages to be delivered
    producer.flush()

if __name__ == '__main__':
    # Create a Kafka producer
    kafka_producer = create_kafka_producer()

    # Define the messages to be produced
    messages = [
        'Hello Kafka!',
        'This is a test message.',
        'Another message.',
    ]

    # Produce the messages
    produce_messages(kafka_producer, messages)

    # Close the Kafka producer
    kafka_producer.close()
```



2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.




```
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'
group_id = 'my_consumer_group'
topic = 'my_topic'

def create_kafka_consumer():
    """Create a Kafka consumer with the specified bootstrap servers and group ID."""
    config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id
    }
    consumer = Consumer(config)
    return consumer

def subscribe_to_topic(consumer):
    """Subscribe to the specified Kafka topic."""
    consumer.subscribe([topic])

def consume_messages(consumer):
    """Consume messages from the subscribed Kafka topic."""
    try:
        while True:
            msg = consumer.poll(1.0)  # Poll for new messages
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaException._PARTITION_EOF:
                    # End of partition reached, continue polling
                    continue
                else:
                    print(f'Error occurred: {msg.error().str()}')
                    break
            else:
                # Process the received message
                print(f'Received message: {msg.value().decode("utf-8")}')

    except KeyboardInterrupt:
        pass

    finally:
        # Close the Kafka consumer
        consumer.close()

if __name__ == '__main__':
    # Create a Kafka consumer
    kafka_consumer = create_kafka_consumer()

    # Subscribe to the Kafka topic
    subscribe_to_topic(kafka_consumer)

    # Consume messages from the topic
    consume_messages(kafka_consumer)
```



   b) Configure the consumer to connect to a Kafka cluster.



```
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic

# Kafka cluster configuration
bootstrap_servers = 'kafka1:9092,kafka2:9092,kafka3:9092'
group_id = 'my_consumer_group'
topic = 'my_topic'

def create_kafka_consumer():
    """Create a Kafka consumer with the specified bootstrap servers and group ID."""
    config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id
    }
    consumer = Consumer(config)
    return consumer

def subscribe_to_topic(consumer):
    """Subscribe to the specified Kafka topic."""
    consumer.subscribe([topic])

def consume_messages(consumer):
    """Consume messages from the subscribed Kafka topic."""
    try:
        while True:
            msg = consumer.poll(1.0)  # Poll for new messages
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaException._PARTITION_EOF:
                    # End of partition reached, continue polling
                    continue
                else:
                    print(f'Error occurred: {msg.error().str()}')
                    break
            else:
                # Process the received message
                print(f'Received message: {msg.value().decode("utf-8")}')

    except KeyboardInterrupt:
        pass

    finally:
        # Close the Kafka consumer
        consumer.close()

if __name__ == '__main__':
    # Create a Kafka consumer
    kafka_consumer = create_kafka_consumer()

    # Subscribe to the Kafka topic
    subscribe_to_topic(kafka_consumer)

    # Consume messages from the topic
    consume_messages(kafka_consumer)
```



   c) Implement logic to consume messages from a Kafka topic.



```
from confluent_kafka import Consumer, KafkaException

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'
group_id = 'my_consumer_group'
topic = 'my_topic'

def create_kafka_consumer():
    """Create a Kafka consumer with the specified bootstrap servers and group ID."""
    config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id
    }
    consumer = Consumer(config)
    return consumer

def subscribe_to_topic(consumer):
    """Subscribe to the specified Kafka topic."""
    consumer.subscribe([topic])

def consume_messages(consumer):
    """Consume messages from the subscribed Kafka topic."""
    try:
        while True:
            msg = consumer.poll(1.0)  # Poll for new messages
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaException._PARTITION_EOF:
                    # End of partition reached, continue polling
                    continue
                else:
                    print(f'Error occurred: {msg.error().str()}')
                    break
            else:
                # Process the received message
                print(f'Received message: {msg.value().decode("utf-8")}')

    except KeyboardInterrupt:
        pass

    finally:
        # Close the Kafka consumer
        consumer.close()

if __name__ == '__main__':
    # Create a Kafka consumer
    kafka_consumer = create_kafka_consumer()

    # Subscribe to the Kafka topic
    subscribe_to_topic(kafka_consumer)

    # Consume messages from the topic
    consume_messages(kafka_consumer)

```



3. Creating and Managing Kafka Topics

  a) Write a Python program to create a new Kafka topic.




```
from confluent_kafka.admin import AdminClient, NewTopic

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'

def create_topic(topic_name, num_partitions, replication_factor):
    """Create a new Kafka topic."""
    # Configure the admin client
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Create a NewTopic object with the topic configuration
    new_topic = NewTopic(
        topic=topic_name,
        num_partitions=num_partitions,
        replication_factor=replication_factor
    )

    # Create the topic
    admin_client.create_topics([new_topic])

    # Close the admin client
    admin_client.close()

if __name__ == '__main__':
    # Define the topic configuration
    topic_name = 'my_topic'
    num_partitions = 3
    replication_factor = 1

    # Create the Kafka topic
    create_topic(topic_name, num_partitions, replication_factor)
```



   b) Implement functionality to list existing topics.





```
from confluent_kafka.admin import AdminClient

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'

def list_topics():
    """List the existing Kafka topics."""
    # Configure the admin client
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Get the metadata for all topics
    metadata = admin_client.list_topics(timeout=5)

    # Extract the topic names from the metadata
    topic_names = list(metadata.topics.keys())

    # Close the admin client
    admin_client.close()

    # Return the list of topic names
    return topic_names

if __name__ == '__main__':
    # List the Kafka topics
    topics = list_topics()

    # Print the topic names
    for topic in topics:
        print(topic)
```



   c) Develop logic to delete an existing Kafka topic.



```
from confluent_kafka.admin import AdminClient

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'

def delete_topic(topic_name):
    """Delete an existing Kafka topic."""
    # Configure the admin client
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Delete the topic
    admin_client.delete_topics([topic_name], operation_timeout=30)

    # Close the admin client
    admin_client.close()

if __name__ == '__main__':
    # Specify the topic to delete
    topic_name = 'my_topic'

    # Delete the Kafka topic
    delete_topic(topic_name)
```



4. Producing and Consuming Messages:

   a) Write a Python program to produce messages to a Kafka topic.




```
from confluent_kafka import Producer

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'
topic = 'my_topic'

def delivery_report(err, msg):
    """Callback function called once the message is delivered or delivery failed."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def create_kafka_producer():
    """Create a Kafka producer with the specified bootstrap servers."""
    config = {
        'bootstrap.servers': bootstrap_servers
    }
    producer = Producer(config)
    return producer

def produce_messages(producer, messages):
    """Produce messages to the specified Kafka topic."""
    for message in messages:
        producer.produce(topic, message, callback=delivery_report)
    # Wait for any outstanding messages to be delivered
    producer.flush()

if __name__ == '__main__':
    # Create a Kafka producer
    kafka_producer = create_kafka_producer()

    # Define the messages to be produced
    messages = [
        'Hello Kafka!',
        'This is a test message.',
        'Another message.',
    ]

    # Produce the messages
    produce_messages(kafka_producer, messages)

    # Close the Kafka producer
    kafka_producer.close()
```





```
from confluent_kafka import Consumer, KafkaException

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'
group_id = 'my_consumer_group'
topic = 'my_topic'

def create_kafka_consumer():
    """Create a Kafka consumer with the specified bootstrap servers and group ID."""
    config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id
    }
    consumer = Consumer(config)
    return consumer

def subscribe_to_topic(consumer):
    """Subscribe to the specified Kafka topic."""
    consumer.subscribe([topic])

def consume_messages(consumer):
    """Consume messages from the subscribed Kafka topic."""
    try:
        while True:
            msg = consumer.poll(1.0)  # Poll for new messages
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaException._PARTITION_EOF:
                    # End of partition reached, continue polling
                    continue
                else:
                    print(f'Error occurred: {msg.error().str()}')
                    break
            else:
                # Process the received message
                print(f'Received message: {msg.value().decode("utf-8")}')

    except KeyboardInterrupt:
        pass

    finally:
        # Close the Kafka consumer
        consumer.close()

if __name__ == '__main__':
    # Create a Kafka consumer
    kafka_consumer = create_kafka_consumer()

    # Subscribe to the Kafka topic
    subscribe_to_topic(kafka_consumer)

    # Consume messages from the topic
    consume_messages(kafka_consumer)
```



   c) Test the end-to-end flow of message production and consumption.

To test the end-to-end flow of message production and consumption, you need to run both the producer and consumer programs simultaneously. Here's an example of how you can test the flow:

1. Run the Kafka producer program in one terminal or command prompt window:


```
python kafka_producer.py
```

2.In another terminal or command prompt window, run the Kafka consumer program:

```
python kafka_consumer.py
```





5. Working with Kafka Consumer Groups:

   a) Write a Python program to create a Kafka consumer within a consumer group.




```
from confluent_kafka import Consumer, KafkaException

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'
group_id = 'my_consumer_group'
topic = 'my_topic'

def create_kafka_consumer():
    """Create a Kafka consumer with the specified bootstrap servers and group ID."""
    config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'  # Start consuming from the beginning of the topic
    }
    consumer = Consumer(config)
    return consumer

def subscribe_to_topic(consumer):
    """Subscribe to the specified Kafka topic."""
    consumer.subscribe([topic])

def consume_messages(consumer):
    """Consume messages from the subscribed Kafka topic."""
    try:
        while True:
            msg = consumer.poll(1.0)  # Poll for new messages
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaException._PARTITION_EOF:
                    # End of partition reached, continue polling
                    continue
                else:
                    print(f'Error occurred: {msg.error().str()}')
                    break
            else:
                # Process the received message
                print(f'Received message: {msg.value().decode("utf-8")}')

    except KeyboardInterrupt:
        pass

    finally:
        # Close the Kafka consumer
        consumer.close()

if __name__ == '__main__':
    # Create a Kafka consumer
    kafka_consumer = create_kafka_consumer()

    # Subscribe to the Kafka topic
    subscribe_to_topic(kafka_consumer)

    # Consume messages from the topic
    consume_messages(kafka_consumer)
```



 b) Implement logic to handle messages consumed by different consumers within the same group.




```
from confluent_kafka import Consumer, KafkaException

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'
group_id = 'my_consumer_group'
topic = 'my_topic'

def create_kafka_consumer():
    """Create a Kafka consumer with the specified bootstrap servers and group ID."""
    config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'  # Start consuming from the beginning of the topic
    }
    consumer = Consumer(config)
    return consumer

def subscribe_to_topic(consumer):
    """Subscribe to the specified Kafka topic."""
    consumer.subscribe([topic])

def consume_messages(consumer):
    """Consume messages from the subscribed Kafka topic."""
    try:
        while True:
            msg = consumer.poll(1.0)  # Poll for new messages
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaException._PARTITION_EOF:
                    # End of partition reached, continue polling
                    continue
                else:
                    print(f'Error occurred: {msg.error().str()}')
                    break
            else:
                # Process the received message
                process_message(msg)

    except KeyboardInterrupt:
        pass

    finally:
        # Close the Kafka consumer
        consumer.close()

def process_message(msg):
    """Process the received message."""
    partition = msg.partition()
    offset = msg.offset()
    value = msg.value().decode('utf-8')
    print(f'Received message: Partition={partition}, Offset={offset}, Value={value}')

if __name__ == '__main__':
    # Create a Kafka consumer
    kafka_consumer = create_kafka_consumer()

    # Subscribe to the Kafka topic
    subscribe_to_topic(kafka_consumer)

    # Consume messages from the topic
    consume_messages(kafka_consumer)
```



c) Observe the behavior of consumer group rebalancing when adding or removing consumers.

When you add or remove consumers within a consumer group, Kafka performs a process called "consumer group rebalancing" to redistribute the partitions among the consumers. This ensures an even distribution of partitions and load across the consumers within the group. Let's observe the behavior of consumer group rebalancing by performing the following steps:

1.Start the consumer program with a single consumer within the consumer group:

```
python kafka_consumer.py
```
2.While the consumer is running, start another instance of the consumer program in a new terminal or command prompt window:


```
python kafka_consumer.py
```


