```

1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.
   b) Configure the producer to connect to a Kafka cluster.
   c) Implement logic to send messages to a Kafka topic.

2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   b) Configure the consumer to connect to a Kafka cluster.
   c) Implement logic to consume messages from a Kafka topic.

3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   b) Implement functionality to list existing topics.
   c) Develop logic to delete an existing Kafka topic.

4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.
   b) Implement logic to consume messages from the same Kafka topic.
   c) Test the end-to-end flow of message production and consumption.

5. Working with Kafka Consumer Groups:
   a) Write a Python program to create a Kafka consumer within a consumer group.
   b) Implement logic to handle messages consumed by different consumers within the same group.
   c) Observe the behavior of consumer group rebalancing when adding or removing consumers.

```

```
1. Setting up a Kafka Producer:

a) Python program to create a Kafka producer:
```python
from kafka import KafkaProducer

def create_kafka_producer(bootstrap_servers):
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    return producer

# Example usage
bootstrap_servers = 'localhost:9092'
kafka_producer = create_kafka_producer(bootstrap_servers)
```

b) Configuring the producer to connect to a Kafka cluster:
Ensure that you have the `kafka-python` library installed (`pip install kafka-python`). Set the `bootstrap_servers` parameter to the address of your Kafka cluster.

c) Logic to send messages to a Kafka topic:
```python
def send_message(producer, topic, message):
    producer.send(topic, message.encode('utf-8'))
    producer.flush()

# Example usage
topic = 'my_topic'
message = 'Hello, Kafka!'
send_message(kafka_producer, topic, message)
```

2. Setting up a Kafka Consumer:

a) Python program to create a Kafka consumer:
```python
from kafka import KafkaConsumer

def create_kafka_consumer(bootstrap_servers, group_id):
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id=group_id)
    return consumer

# Example usage
bootstrap_servers = 'localhost:9092'
group_id = 'my_consumer_group'
kafka_consumer = create_kafka_consumer(bootstrap_servers, group_id)
```

b) Configuring the consumer to connect to a Kafka cluster:
Ensure that you have the `kafka-python` library installed (`pip install kafka-python`). Set the `bootstrap_servers` parameter to the address of your Kafka cluster. Choose a unique `group_id` for your consumer.

c) Logic to consume messages from a Kafka topic:
```python
def consume_messages(consumer, topic):
    consumer.subscribe([topic])
    for message in consumer:
        print(message.value.decode('utf-8'))

# Example usage
topic = 'my_topic'
consume_messages(kafka_consumer, topic)
```

3. Creating and Managing Kafka Topics:

a) Python program to create a new Kafka topic:
```python
from kafka.admin import KafkaAdminClient, NewTopic

def create_kafka_topic(bootstrap_servers, topic, partitions, replication_factor):
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    topic_list = [NewTopic(name=topic, num_partitions=partitions, replication_factor=replication_factor)]
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

# Example usage
bootstrap_servers = 'localhost:9092'
topic = 'my_topic'
partitions = 3
replication_factor = 1
create_kafka_topic(bootstrap_servers, topic, partitions, replication_factor)
```

b) Functionality to list existing topics:
```python
def list_kafka_topics(bootstrap_servers):
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    topics = admin_client.list_topics()
    return topics

# Example usage
bootstrap_servers = 'localhost:9092'
topics = list_kafka_topics(bootstrap_servers)
for topic in topics:
    print(topic)

```

c) Logic to delete an existing Kafka topic:
```python
def delete_kafka_topic(bootstrap_servers, topic):
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    admin_client.delete_topics(topics=[topic])

# Example usage
bootstrap_servers = 'localhost:9092'
topic = 'my_topic'
delete_kafka_topic(bootstrap_servers, topic)
```

4. Producing and Consuming Messages:

a) Python program to produce messages to a Kafka topic:
```python
from kafka import KafkaProducer

def create_kafka_producer(bootstrap_servers):
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    return producer

def send_message(producer, topic, message):
    producer.send(topic, message.encode('utf-8'))
    producer.flush()

# Example usage
bootstrap_servers = 'localhost:9092'
kafka_producer = create_kafka_producer(bootstrap_servers)
topic = 'my_topic'
message = 'Hello, Kafka!'
send_message(kafka_producer, topic, message)
```

b) Python program to consume messages from the same Kafka topic:
```python
from kafka import KafkaConsumer

def create_kafka_consumer(bootstrap_servers, group_id):
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id=group_id)
    return consumer

def consume_messages(consumer, topic):
    consumer.subscribe([topic])
    for message in consumer:
        print(message.value.decode('utf-8'))

# Example usage
bootstrap_servers = 'localhost:9092'
group_id = 'my_consumer_group'
kafka_consumer = create_kafka_consumer(bootstrap_servers, group_id)
topic = 'my_topic'
consume_messages(kafka_consumer, topic)
```

c) Test the end-to-end flow of message production and consumption.

5. Working with Kafka Consumer Groups:

a) Python program to create a Kafka consumer within a consumer group:
```python
from kafka import KafkaConsumer

def create_kafka_consumer(bootstrap_servers, group_id):
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id=group_id)
    return consumer

# Example usage
bootstrap_servers = 'localhost:9092'
group_id = 'my_consumer_group'
kafka_consumer = create_kafka_consumer(bootstrap_servers, group_id)
```

b) Logic to handle messages consumed by different consumers within the same group:
```python
def consume_messages(consumer, topic):
    consumer.subscribe([topic])
    for message in consumer:
        print("Consumer ID:", consumer._consumer_id, "- Message:", message.value.decode('utf-8'))

# Example usage
topic = 'my_topic'
consume_messages(kafka_consumer, topic)
```

c) Observe the behavior of consumer group rebalancing when adding or removing consumers.

Note: Make sure you have the `kafka-python` library installed (`pip install kafka-python`). Adjust the bootstrap server address, topic names, and group IDs according to your Kafka cluster configuration.
```