In [None]:
# 1. Setting up a Kafka Producer:
#    a) Write a Python program to create a Kafka producer.
#    b) Configure the producer to connect to a Kafka cluster.
#    c) Implement logic to send messages to a Kafka topic.
#Sol:
from kafka import KafkaProducer

# Producer settings -- both values are placeholders to be substituted.
topic = 'your_topic'  # Replace with your desired Kafka topic
bootstrap_servers = 'your_kafka_server:9092'  # Replace with your Kafka server address

# Build the module-level producer that send_message() publishes through.
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# Function to send messages to Kafka topic
def send_message(message):
    """Publish one text message to the module-level ``topic``.

    Args:
        message: The string to publish; it is encoded as UTF-8 before sending.

    Uses the module-level ``producer`` and flushes it after the send so the
    record is pushed out before the function returns.
    """
    payload = message.encode('utf-8')
    producer.send(topic, payload)
    # Optional: flushing here trades throughput for immediate delivery.
    producer.flush()

# Demo: publish a single greeting via send_message().
message = 'Hello, Kafka!'
send_message(message=message)



In [None]:
# 2. Setting up a Kafka Consumer:
#    a) Write a Python program to create a Kafka consumer.
#    b) Configure the consumer to connect to a Kafka cluster.
#    c) Implement logic to consume messages from a Kafka topic.
#Sol:
from confluent_kafka import Consumer, KafkaError

# Kafka cluster configuration
bootstrap_servers = 'localhost:9092'
topic = 'your_topic_name'
group_id = 'your_consumer_group_id'

# Settings handed straight to confluent_kafka.Consumer
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': group_id,
    'auto.offset.reset': 'earliest'  # Set to 'latest' if you want to consume only new messages
}

# Create Kafka consumer instance
consumer = Consumer(consumer_config)

# Subscribe and poll until interrupted. Both steps live inside the try block so
# that the finally clause closes the consumer no matter how the loop ends
# (kernel interrupt, decode error, broker exception, ...).
try:
    consumer.subscribe([topic])

    while True:
        msg = consumer.poll(1.0)  # Wait for 1 second for new messages

        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue

        if msg.error():
            # End-of-partition is informational; anything else is reported.
            # Either way, keep polling.
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print(f'Error: {msg.error().str()}')
            continue

        # Process the received message
        print(f'Received message: {msg.value().decode("utf-8")}')

except KeyboardInterrupt:
    # Interrupting the kernel / Ctrl-C is the intended way to stop the loop.
    pass
finally:
    # Always close the consumer so it does not stay registered in the group.
    consumer.close()


In [None]:

# 3. Creating and Managing Kafka Topics:
# a) Write a Python program to create a new Kafka topic.
#Sol:
from kafka.admin import KafkaAdminClient, NewTopic

def create_topic(bootstrap_servers, topic_name, partitions=1, replication_factor=1):
    """Create a Kafka topic and print a confirmation.

    Args:
        bootstrap_servers: Broker address(es) passed to ``KafkaAdminClient``.
        topic_name: Name of the topic to create.
        partitions: Number of partitions for the new topic (default 1).
        replication_factor: Replication factor for the new topic (default 1).

    Any exception raised by the admin client (for example when the request is
    rejected) propagates to the caller; the confirmation is printed only when
    ``create_topics`` returns normally.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        new_topic = NewTopic(
            name=topic_name,
            num_partitions=partitions,
            replication_factor=replication_factor,
        )
        admin_client.create_topics([new_topic])
    finally:
        # Release the broker connection even if topic creation fails.
        admin_client.close()
    print(f"Topic '{topic_name}' created successfully!")

# Demo: create 'my_topic' on the local broker.
bootstrap_servers, topic_name = 'localhost:9092', 'my_topic'  # Replace with your Kafka broker addresses / topic
create_topic(bootstrap_servers=bootstrap_servers, topic_name=topic_name)

# b) Implement functionality to list existing topics.
#Sol:
from kafka import KafkaAdminClient

def list_topics(bootstrap_servers):
    """Print the name of every topic reported by the cluster.

    Args:
        bootstrap_servers: Broker address(es) passed to ``KafkaAdminClient``.

    Exceptions raised by the admin client propagate to the caller.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topics = admin_client.list_topics()
    finally:
        # Release the broker connection even if the metadata request fails.
        admin_client.close()

    print("Existing topics:")
    for topic in topics:
        print(topic)

# Demo: list the topics known to the local broker.
bootstrap_servers = 'localhost:9092'  # Replace with your Kafka broker addresses
list_topics(bootstrap_servers=bootstrap_servers)
# c) Develop logic to delete an existing Kafka topic.
#Sol:
from kafka.admin import KafkaAdminClient

def delete_topic(bootstrap_servers, topic_name):
    """Delete a Kafka topic and print a confirmation.

    Args:
        bootstrap_servers: Broker address(es) passed to ``KafkaAdminClient``.
        topic_name: Name of the topic to delete.

    Any exception raised by the admin client propagates to the caller; the
    confirmation is printed only when ``delete_topics`` returns normally.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin_client.delete_topics([topic_name])
    finally:
        # Release the broker connection even if the deletion fails.
        admin_client.close()
    print(f"Topic '{topic_name}' deleted successfully!")

# Demo: remove 'my_topic' from the local broker.
bootstrap_servers, topic_name = 'localhost:9092', 'my_topic'  # Replace with your Kafka broker addresses / topic
delete_topic(bootstrap_servers=bootstrap_servers, topic_name=topic_name)



In [None]:
# 4. Producing and Consuming Messages:
#    a) Write a Python program to produce messages to a Kafka topic.
# Sol:
from kafka import KafkaProducer

def produce_messages(bootstrap_servers, topic_name, messages):
    """Send every string in ``messages`` to ``topic_name`` and report success.

    Args:
        bootstrap_servers: Broker address(es) passed to ``KafkaProducer``.
        topic_name: Topic the messages are published to.
        messages: Iterable of strings; each is encoded as UTF-8 before sending.

    Raises:
        The delivery error carried by a record's future, if any send failed.
        The success line is printed only when every record was acknowledged.
    """
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        # send() is asynchronous: keep the returned futures so failures can be
        # detected instead of being silently dropped.
        pending = [
            producer.send(topic_name, message.encode('utf-8'))
            for message in messages
        ]
        producer.flush()
        for future in pending:
            # After flush() the futures are resolved; get() re-raises a
            # delivery failure rather than letting it pass as a success.
            future.get(timeout=10)
    finally:
        # Release sockets and background threads even if a send fails.
        producer.close()
    print("Messages produced successfully!")

# Demo: publish two short messages to 'my_topic' on the local broker.
bootstrap_servers, topic_name = 'localhost:9092', 'my_topic'  # Replace with your Kafka broker addresses / topic
messages = ['Hello', 'World']
produce_messages(
    bootstrap_servers=bootstrap_servers,
    topic_name=topic_name,
    messages=messages,
)

# b) Implement logic to consume messages from the same Kafka topic.
# c) Test the end-to-end flow of message production and consumption.
# Sol:
from kafka import KafkaConsumer

def consume_messages(bootstrap_servers, topic_name, group_id='my_consumer_group',
                     auto_offset_reset='earliest', consumer_timeout_ms=float('inf')):
    """Print every message received from ``topic_name``.

    Args:
        bootstrap_servers: Broker address(es) passed to ``KafkaConsumer``.
        topic_name: Topic to subscribe to.
        group_id: Consumer group to join (default ``'my_consumer_group'``).
        auto_offset_reset: Where to start when the group has no committed
            offset. Defaults to ``'earliest'`` so that messages produced
            *before* this consumer started (as in the end-to-end test) are
            still delivered.
        consumer_timeout_ms: Stop iterating after this many milliseconds
            without a message. The default (infinity) blocks forever; pass a
            finite value to let the call return on its own.
    """
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        auto_offset_reset=auto_offset_reset,
        consumer_timeout_ms=consumer_timeout_ms,
    )
    try:
        consumer.subscribe([topic_name])

        for message in consumer:
            print(f"Received message: {message.value.decode('utf-8')}")
    finally:
        # Close the consumer however the loop ends (timeout, interrupt, error).
        consumer.close()

# Demo: read back the messages from 'my_topic' on the local broker.
bootstrap_servers, topic_name = 'localhost:9092', 'my_topic'  # Replace with your Kafka broker addresses / topic
consume_messages(bootstrap_servers=bootstrap_servers, topic_name=topic_name)



In [None]:
# 5. Working with Kafka Consumer Groups:
#    a) Write a Python program to create a Kafka consumer within a consumer group.
#Sol:
from confluent_kafka import Consumer, KafkaError

def consume_messages():
    """Consume 'my-topic' as a member of the 'my-consumer-group' group.

    Polls the broker at localhost:9092 in a loop and prints each message value
    decoded as UTF-8. The loop ends on the first error other than
    end-of-partition. The consumer is closed on every exit path, including a
    kernel interrupt or an exception raised while handling a message; such
    exceptions still propagate to the caller.
    """
    consumer_config = {
        'bootstrap.servers': 'localhost:9092',  # Kafka broker(s) address
        'group.id': 'my-consumer-group',  # Consumer group ID
        'auto.offset.reset': 'earliest'  # Start consuming from the beginning of the topic
    }

    consumer = Consumer(consumer_config)
    try:
        consumer.subscribe(['my-topic'])  # Subscribe to the desired topic(s)

        while True:
            message = consumer.poll(1.0)  # Poll for new messages, with a timeout of 1 second

            if message is None:
                # Nothing arrived within the timeout; poll again.
                continue

            if message.error():
                if message.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition is informational, not a failure.
                    continue
                print(f"Error: {message.error()}")
                break

            # Process the consumed message
            print(f"Received message: {message.value().decode('utf-8')}")
    finally:
        # Always close so the consumer does not stay registered in the group.
        consumer.close()


# Entry point: start the consumer loop only when run as the main program
# (this is also the case when the cell is executed in a notebook kernel).
if __name__ == '__main__':
    consume_messages()

#    b) Implement logic to handle messages consumed by different consumers within the same group.
#Sol:
from confluent_kafka import Consumer, KafkaError

def consume_messages():
    """Consume 'my-topic' within 'my-consumer-group', delegating each message.

    Polls the broker at localhost:9092 in a loop and hands every successfully
    received message to ``process_message``. The loop ends on the first error
    other than end-of-partition. The consumer is closed on every exit path,
    including a kernel interrupt or an exception raised by
    ``process_message``; such exceptions still propagate to the caller.
    """
    consumer_config = {
        'bootstrap.servers': 'localhost:9092',  # Kafka broker(s) address
        'group.id': 'my-consumer-group',  # Consumer group ID
        'auto.offset.reset': 'earliest'  # Start consuming from the beginning of the topic
    }

    consumer = Consumer(consumer_config)
    try:
        consumer.subscribe(['my-topic'])  # Subscribe to the desired topic(s)

        while True:
            message = consumer.poll(1.0)  # Poll for new messages, with a timeout of 1 second

            if message is None:
                # Nothing arrived within the timeout; poll again.
                continue

            if message.error():
                if message.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition is informational, not a failure.
                    continue
                print(f"Error: {message.error()}")
                break

            # Process the consumed message
            process_message(message)
    finally:
        # Always close so the consumer does not stay registered in the group.
        consumer.close()


def process_message(message):
    """Print the key and value of one consumed message.

    Args:
        message: An object exposing ``key()`` and ``value()`` that return
            ``bytes`` or ``None`` (records may be produced without a key).

    A missing key or value is printed as ``None`` instead of raising
    ``AttributeError``, and undecodable bytes are replaced rather than
    aborting the consumer loop.
    """
    # Implement your logic to handle the message here
    raw_key = message.key()
    raw_value = message.value()

    key_text = raw_key.decode('utf-8', errors='replace') if raw_key is not None else None
    value_text = raw_value.decode('utf-8', errors='replace') if raw_value is not None else None

    # NOTE: the printed "Consumer ID" is the record key chosen by the producer.
    print(f"Consumer ID: {key_text}, Message: {value_text}")


# Entry point: start the consumer loop only when run as the main program
# (this is also the case when the cell is executed in a notebook kernel).
if __name__ == '__main__':
    consume_messages()

#    c) Observe the behavior of consumer group rebalancing when adding or removing consumers.
#Sol:
# When you add or remove consumers from a Kafka consumer group, the group undergoes rebalancing. Rebalancing involves redistributing the partitions among the active consumers to ensure that each consumer is assigned a fair share of partitions.

# To observe the behavior of consumer group rebalancing when adding or removing consumers, you can follow these steps:

# 1. Start the initial set of consumers by running the consumer program from part (a) in separate processes.
# 2. Observe the partition assignments for each consumer in the group. You can print the assigned partitions using the assignment() method of the consumer object.
# 3. Add or remove consumers from the consumer group by starting or stopping additional instances of the consumer program.
# 4. Observe the console output of the consumers. You will see that the partition assignments may change as rebalancing occurs.
# 5. Monitor the partition assignments periodically to see how they adjust during rebalancing.

# By following these steps, you can observe how Kafka handles consumer group rebalancing dynamically based on the number of consumers and the topic's partition configuration.
