1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.
   b) Configure the producer to connect to a Kafka cluster.
   c) Implement logic to send messages to a Kafka topic.


In [None]:
pip install confluent-kafka
from confluent_kafka import Producer

# Configure the Kafka broker(s)
bootstrap_servers = 'localhost:9092'  # Update with your Kafka broker addresses

# Create Kafka producer configuration
producer_config = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': 'my-kafka-producer'  # Provide a unique client ID
}

# Create the Kafka producer instance
producer = Producer(producer_config)

# Define the Kafka topic to which messages will be sent
topic = 'my-topic'  # Update with your Kafka topic name

# Define the message to be sent
message = 'Hello, Kafka!'

# Produce and send the message to the Kafka topic
producer.produce(topic, value=message)

# Flush the producer to ensure the message is sent
producer.flush()

# Close the Kafka producer
producer.close()

2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   b) Configure the consumer to connect to a Kafka cluster.
   c) Implement logic to consume messages from a Kafka topic.


In [None]:
from confluent_kafka import Consumer

# Configure the Kafka broker(s)
bootstrap_servers = 'localhost:9092'  # Update with your Kafka broker addresses

# Create Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'my-consumer-group',  # Provide a unique consumer group ID
    'auto.offset.reset': 'earliest'  # Start consuming from the beginning of the topic
}

# Create the Kafka consumer instance
consumer = Consumer(consumer_config)

# Subscribe to the Kafka topic
topic = 'my-topic'  # Update with your Kafka topic name
consumer.subscribe([topic])

# Start consuming messages from the Kafka topic
while True:
    msg = consumer.poll(1.0)  # Wait for 1 second for new messages

    if msg is None:
        continue

    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue

    # Process the consumed message
    message = msg.value().decode('utf-8')
    print(f"Received message: {message}")

# Close the Kafka consumer
consumer.close()


3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   b) Implement functionality to list existing topics.
   c) Develop logic to delete an existing Kafka topic.


In [None]:
from confluent_kafka.admin import AdminClient, NewTopic

# Configure the Kafka broker(s)
bootstrap_servers = 'localhost:9092'  # Update with your Kafka broker addresses

# Create Kafka admin client configuration
admin_config = {
    'bootstrap.servers': bootstrap_servers
}

# Create the Kafka admin client instance
admin_client = AdminClient(admin_config)

def create_topic(topic_name, num_partitions, replication_factor):
    # Create a NewTopic object with the desired configuration
    new_topic = NewTopic(topic_name, num_partitions, replication_factor)

    # Create the topic using the admin client
    admin_client.create_topics([new_topic])

    print(f"Topic '{topic_name}' created successfully.")

def list_topics():
    # Retrieve the list of existing topics using the admin client
    topic_metadata = admin_client.list_topics().topics

    print("Existing topics:")
    for topic_name, topic_info in topic_metadata.items():
        print(f"- {topic_name} (partitions: {len(topic_info.partitions)})")

def delete_topic(topic_name):
    # Delete the specified topic using the admin client
    admin_client.delete_topics([topic_name])

    print(f"Topic '{topic_name}' deleted successfully.")

# Usage examples
create_topic("my-topic", num_partitions=3, replication_factor=1)
list_topics()
delete_topic("my-topic")


4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.
   b) Implement logic to consume messages from the same Kafka topic.
   c) Test the end-to-end flow of message production and consumption.


In [None]:
from confluent_kafka import Producer, Consumer

# Configure the Kafka broker(s)
bootstrap_servers = 'localhost:9092'  # Update with your Kafka broker addresses

# Create Kafka producer configuration
producer_config = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': 'my-kafka-producer'  # Provide a unique client ID
}

# Create Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'my-consumer-group',  # Provide a unique consumer group ID
    'auto.offset.reset': 'earliest'  # Start consuming from the beginning of the topic
}

# Create the Kafka producer and consumer instances
producer = Producer(producer_config)
consumer = Consumer(consumer_config)

# Define the Kafka topic to produce and consume messages
topic = 'my-topic'  # Update with your Kafka topic name

# Define the messages to be sent
messages = [
    'Message 1',
    'Message 2',
    'Message 3'
]

# Produce messages to the Kafka topic
for message in messages:
    producer.produce(topic, value=message)

# Flush the producer to ensure all messages are sent
producer.flush()

# Subscribe to the Kafka topic to consume messages
consumer.subscribe([topic])

# Start consuming messages from the Kafka topic
while True:
    msg = consumer.poll(1.0)  # Wait for 1 second for new messages

    if msg is None:
        continue

    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue

    # Process the consumed message
    message = msg.value().decode('utf-8')
    print(f"Received message: {message}")

    # Commit the offset to mark the message as consumed
    consumer.commit()

    # Break the loop if all messages have been consumed
    if len(messages) == consumer.committed_messages():
        break

# Close the Kafka producer and consumer
producer.close()
consumer.close()


5. Working with Kafka Consumer Groups:
   a) Write a Python program to create a Kafka consumer within a consumer group.
   b) Implement logic to handle messages consumed by different consumers within the same group.
   c) Observe the behavior of consumer group rebalancing when adding or removing consumers.


In [None]:
from confluent_kafka import Consumer, KafkaException

# Configure the Kafka broker(s)
bootstrap_servers = 'localhost:9092'  # Update with your Kafka broker addresses

# Create Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'my-consumer-group',  # Provide a unique consumer group ID
    'auto.offset.reset': 'earliest'  # Start consuming from the beginning of the topic
}

# Create the Kafka consumer instance
consumer = Consumer(consumer_config)

# Define the Kafka topic to consume messages from
topic = 'my-topic'  # Update with your Kafka topic name

# Subscribe to the Kafka topic
consumer.subscribe([topic])

# Start consuming messages from the Kafka topic
try:
    while True:
        msg = consumer.poll(1.0)  # Wait for 1 second for new messages

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaException._PARTITION_EOF:
                # Reached the end of a partition, ignore
                continue

            print(f"Consumer error: {msg.error()}")
            break

        # Process the consumed message
        message = msg.value().decode('utf-8')
        print(f"Received message: {message}")

except KeyboardInterrupt:
    pass

finally:
    # Close the Kafka consumer
    consumer.close()