
1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.
   


In [None]:
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Delivery callback: print the outcome of one produced message.

    Args:
        err: ``None`` when the message was delivered, otherwise the error.
        msg: The message; its topic and partition are printed on success.
    """
    if err is None:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
        return
    print(f"Message delivery failed: {err}")


def produce_kafka_message(producer, topic, message):
    """Queue *message* (UTF-8 encoded) on *topic* and serve pending callbacks.

    ``delivery_report`` is registered as the delivery callback for the message.
    """
    payload = message.encode('utf-8')
    producer.produce(topic, payload, callback=delivery_report)
    # poll() serves queued delivery callbacks, waiting up to 0.5 s for events.
    # It does not flush the buffer; that is what producer.flush() is for.
    producer.poll(0.5)


def main():
    """Send three sample messages to ``my_topic`` on a local broker."""
    topic = 'my_topic'

    # A single local broker address is the only setting this example needs.
    producer = Producer({'bootstrap.servers': 'localhost:9092'})

    sample_messages = (
        'Hello, Kafka!',
        'This is a test message.',
        'Another message for Kafka.',
    )
    for text in sample_messages:
        produce_kafka_message(producer, topic, text)

    # Wait until everything still queued has been delivered or has failed.
    producer.flush()


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


1b) Configure the producer to connect to a Kafka cluster.
  

In [None]:
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Print whether a produced message was delivered or failed.

    Args:
        err: ``None`` on success, otherwise the delivery error.
        msg: The produced message (topic and partition are read on success).
    """
    if err is None:
        report = f"Message delivered to {msg.topic()} [{msg.partition()}]"
    else:
        report = f"Message delivery failed: {err}"
    print(report)


def produce_kafka_message(producer, topic, message):
    """Encode *message* as UTF-8, queue it on *topic*, then serve callbacks.

    ``delivery_report`` is attached as the per-message delivery callback.
    """
    encoded = message.encode('utf-8')
    producer.produce(topic, encoded, callback=delivery_report)
    # poll() triggers pending delivery callbacks (blocking at most 0.5 s);
    # the actual flush of the buffer is done by producer.flush().
    producer.poll(0.5)


def main():
    """Send three sample messages to ``my_topic`` via a three-broker cluster."""
    topic = 'my_topic'
    brokers = ['kafka1:9092', 'kafka2:9092', 'kafka3:9092']

    producer = Producer({
        # Comma-separated list of brokers used for the initial connection.
        'bootstrap.servers': ','.join(brokers),
        # Optional: client name, useful for tracking this producer.
        'client.id': 'my_producer',
        # Optional: acknowledgments the producer requires per message.
        'acks': 'all',
        # Optional: compress messages with gzip.
        'compression.type': 'gzip',
        # Add any other desired configuration options here
    })

    sample_messages = (
        'Hello, Kafka!',
        'This is a test message.',
        'Another message for Kafka.',
    )
    for text in sample_messages:
        produce_kafka_message(producer, topic, text)

    # Wait until everything still queued has been delivered or has failed.
    producer.flush()


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


 1c) Implement logic to send messages to a Kafka topic.


*********************************************************************************

In [None]:
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Report the delivery result of a produced message on stdout.

    Args:
        err: The delivery error, or ``None`` if the message was delivered.
        msg: The message whose topic and partition are shown on success.
    """
    failed = err is not None
    if failed:
        print(f"Message delivery failed: {err}")
        return
    print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def send_message(producer, topic, message):
    """Queue *message* (UTF-8 encoded) on *topic* and serve pending callbacks.

    ``delivery_report`` is registered as the delivery callback for the message.
    """
    body = message.encode('utf-8')
    producer.produce(topic, body, callback=delivery_report)
    # poll() serves queued delivery callbacks, waiting up to 0.5 s for events.
    # It does not flush the buffer; that is what producer.flush() is for.
    producer.poll(0.5)


def main():
    """Read lines from standard input and publish each one to ``my_topic``.

    The loop ends when the user types ``exit`` (any letter case), when input
    reaches end-of-file, or on Ctrl-C. In every case, messages still queued
    in the producer are flushed before the function returns.
    """
    # Kafka broker configuration
    bootstrap_servers = 'kafka1:9092,kafka2:9092,kafka3:9092'
    topic = 'my_topic'

    # Create Kafka producer configuration
    conf = {
        'bootstrap.servers': bootstrap_servers,
        'client.id': 'my_producer',
        'acks': 'all',
        'compression.type': 'gzip'
    }

    # Create Kafka producer instance
    producer = Producer(conf)

    try:
        # Send messages to Kafka topic
        while True:
            message = input("Enter a message (or 'exit' to quit): ")
            if message.lower() == 'exit':
                break
            send_message(producer, topic, message)
    except (EOFError, KeyboardInterrupt):
        # End of input or Ctrl-C is treated like 'exit' instead of a crash.
        pass
    finally:
        # Flush on every exit path so queued messages are not dropped.
        producer.flush()


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


##########################################################
:

2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   
  



In [None]:
from confluent_kafka import Consumer


def consume_kafka_messages(consumer, topic):
    """Subscribe to *topic* and print every message until interrupted.

    Polls one message at a time with a 1-second timeout. Error events are
    printed and skipped. Ctrl-C (``KeyboardInterrupt``) stops the loop, and
    the consumer is always closed on the way out so it leaves its group
    cleanly instead of waiting for a session timeout.

    Args:
        consumer: A consumer exposing ``subscribe``, ``poll`` and ``close``.
        topic: Name of the topic to read from.
    """
    consumer.subscribe([topic])

    try:
        while True:
            msg = consumer.poll(1.0)  # Poll for new messages
            if msg is None:
                # Timed out without receiving anything.
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue

            # Process the received message
            print(f"Received message: {msg.value().decode('utf-8')}")
    except KeyboardInterrupt:
        # Normal way to stop this otherwise endless loop.
        pass
    finally:
        # Previously unreachable after the infinite loop; now runs on any exit.
        consumer.close()


def main():
    """Create a consumer for a local broker and print messages from ``my_topic``."""
    topic = 'my_topic'

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        # Consumer group ID for managing offsets.
        'group.id': 'my_consumer_group',
        # Start consuming from the beginning of the topic.
        'auto.offset.reset': 'earliest',
        # Add any other desired configuration options here
    })

    consume_kafka_messages(consumer, topic)


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


 2b) Configure the consumer to connect to a Kafka cluster.
   


In [None]:
from confluent_kafka import Consumer


def consume_kafka_messages(consumer, topic):
    """Subscribe to *topic* and print messages in batches until interrupted.

    Fetches up to five messages per call with a 1-second timeout. Error
    events are printed and skipped. Ctrl-C (``KeyboardInterrupt``) stops the
    loop, and the consumer is always closed on the way out so it leaves its
    group cleanly.

    Args:
        consumer: A consumer exposing ``subscribe``, ``consume`` and ``close``.
        topic: Name of the topic to read from.
    """
    consumer.subscribe([topic])

    try:
        while True:
            # An empty list is returned when the timeout expires.
            messages = consumer.consume(num_messages=5, timeout=1.0)
            for message in messages:
                if message is None:
                    continue
                if message.error():
                    print(f"Consumer error: {message.error()}")
                    continue

                # Process the received message
                print(f"Received message: {message.value().decode('utf-8')}")
    except KeyboardInterrupt:
        # Normal way to stop this otherwise endless loop.
        pass
    finally:
        # Previously unreachable after the infinite loop; now runs on any exit.
        consumer.close()


def main():
    """Create a consumer for the three-broker cluster and read ``my_topic``."""
    topic = 'my_topic'
    brokers = ('kafka1:9092', 'kafka2:9092', 'kafka3:9092')

    consumer = Consumer({
        # Comma-separated list of brokers used for the initial connection.
        'bootstrap.servers': ','.join(brokers),
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest',
        # Add any other desired configuration options here
    })

    consume_kafka_messages(consumer, topic)


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


 c) Implement logic to consume messages from a Kafka topic.

In [None]:
from confluent_kafka import Consumer


def consume_kafka_messages(consumer, topic):
    """Subscribe to *topic*, print messages in batches and commit their offsets.

    Fetches up to five messages per call with a 1-second timeout. Error
    events are printed and skipped. Offsets are committed only after a batch
    in which at least one message was processed, so idle iterations do not
    issue commit requests. Ctrl-C (``KeyboardInterrupt``) stops the loop, and
    the consumer is always closed on the way out so it leaves its group
    cleanly.

    Args:
        consumer: A consumer exposing ``subscribe``, ``consume``, ``commit``
            and ``close``.
        topic: Name of the topic to read from.
    """
    consumer.subscribe([topic])

    try:
        while True:
            # An empty list is returned when the timeout expires.
            messages = consumer.consume(num_messages=5, timeout=1.0)
            processed_any = False
            for message in messages:
                if message is None:
                    continue
                if message.error():
                    print(f"Consumer error: {message.error()}")
                    continue

                # Process the received message
                print(f"Received message: {message.value().decode('utf-8')}")
                processed_any = True

            # Commit the consumed messages' offsets
            if processed_any:
                consumer.commit()
    except KeyboardInterrupt:
        # Normal way to stop this otherwise endless loop.
        pass
    finally:
        consumer.close()


def main():
    """Build a cluster consumer in ``my_consumer_group`` and read ``my_topic``."""
    settings = dict([
        ('bootstrap.servers', 'kafka1:9092,kafka2:9092,kafka3:9092'),
        ('group.id', 'my_consumer_group'),
        ('auto.offset.reset', 'earliest'),
        # Add any other desired configuration options here
    ])
    consumer = Consumer(settings)

    # Hand the consumer over to the consume loop for the topic.
    consume_kafka_messages(consumer, 'my_topic')


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()



3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   

In [None]:
from confluent_kafka.admin import AdminClient, NewTopic


def create_kafka_topic(bootstrap_servers, topic, partitions=1, replication_factor=1):
    """Create *topic* and wait for the cluster to confirm the request.

    ``AdminClient.create_topics`` is asynchronous and returns one future per
    topic. This function waits on each future so that the request has
    actually completed before returning, and prints the outcome instead of
    silently discarding errors (for example, "topic already exists").

    Args:
        bootstrap_servers: Comma-separated ``host:port`` broker list.
        topic: Name of the topic to create.
        partitions: Number of partitions for the new topic.
        replication_factor: Number of replicas for each partition.
    """
    # Local import: confluent_kafka is already a dependency of this cell.
    from confluent_kafka import KafkaException

    # Create AdminClient instance
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Create NewTopic object
    new_topic = NewTopic(
        topic,
        num_partitions=partitions,
        replication_factor=replication_factor
    )

    # Create the topic and block until each request has been answered.
    futures = admin_client.create_topics([new_topic])
    for name, future in futures.items():
        try:
            future.result()
        except KafkaException as exc:
            print(f"Failed to create topic {name}: {exc}")
        else:
            print(f"Topic {name} created")


def main():
    """Create ``my_topic`` with 3 partitions and replication factor 1 locally."""
    topic_settings = {
        'bootstrap_servers': 'localhost:9092',
        'topic': 'my_topic',
        'partitions': 3,
        'replication_factor': 1,
    }

    # Same four arguments as before, passed by name instead of by position.
    create_kafka_topic(**topic_settings)


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


b) Implement functionality to list existing topics.
   
   

In [None]:
from confluent_kafka.admin import AdminClient


def list_kafka_topics(bootstrap_servers, timeout=10.0):
    """Print the names of all topics known to the cluster.

    Args:
        bootstrap_servers: Comma-separated ``host:port`` broker list.
        timeout: Maximum number of seconds to wait for the metadata
            response. The library default is to wait forever, which hangs
            when no broker is reachable; pass ``-1`` to restore that.
    """
    # Create AdminClient instance
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Get the cluster metadata; its ``topics`` attribute is keyed by name.
    cluster_metadata = admin_client.list_topics(timeout=timeout)

    # Print the list of topics
    print("Existing Topics:")
    for topic in cluster_metadata.topics:
        print(f"- {topic}")


def main():
    """List the topics of the broker running on ``localhost:9092``."""
    list_kafka_topics('localhost:9092')


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


3 c) Develop logic to delete an existing Kafka topic.


In [None]:
from confluent_kafka.admin import AdminClient


def delete_kafka_topic(bootstrap_servers, topic):
    """Delete *topic* and wait for the cluster to confirm the request.

    ``AdminClient.delete_topics`` is asynchronous and returns one future per
    topic. This function waits on each future so that the request has
    actually completed before returning, and prints the outcome instead of
    silently discarding errors (for example, "unknown topic").

    Args:
        bootstrap_servers: Comma-separated ``host:port`` broker list.
        topic: Name of the topic to delete.
    """
    # Local import: confluent_kafka is already a dependency of this cell.
    from confluent_kafka import KafkaException

    # Create AdminClient instance
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Delete the topic and block until each request has been answered.
    futures = admin_client.delete_topics([topic])
    for name, future in futures.items():
        try:
            future.result()
        except KafkaException as exc:
            print(f"Failed to delete topic {name}: {exc}")
        else:
            print(f"Topic {name} deleted")


def main():
    """Delete ``my_topic`` from the broker running on ``localhost:9092``."""
    delete_kafka_topic(bootstrap_servers='localhost:9092', topic='my_topic')


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


#############################


4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.


In [None]:
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Delivery callback: print the outcome of one produced message.

    Args:
        err: ``None`` when the message was delivered, otherwise the error.
        msg: The message; its topic and partition are printed on success.
    """
    if err is None:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
        return
    print(f"Message delivery failed: {err}")


def produce_kafka_message(producer, topic, message):
    """Queue *message* (UTF-8 encoded) on *topic* and serve pending callbacks.

    ``delivery_report`` is registered as the delivery callback for the message.
    """
    payload = message.encode('utf-8')
    producer.produce(topic, payload, callback=delivery_report)
    # poll() serves queued delivery callbacks, waiting up to 0.5 s for events.
    # It does not flush the buffer; that is what producer.flush() is for.
    producer.poll(0.5)


def main():
    """Publish the three sample messages to ``my_topic`` on a local broker."""
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    topic = 'my_topic'

    for text in ('Hello, Kafka!',
                 'This is a test message.',
                 'Another message for Kafka.'):
        produce_kafka_message(producer, topic, text)

    # Wait until everything still queued has been delivered or has failed.
    producer.flush()


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


   b) Implement logic to consume messages from the same Kafka topic.
  

In [None]:
from confluent_kafka import Consumer


def consume_kafka_messages(consumer, topic):
    """Subscribe to *topic* and print every message until interrupted.

    Polls one message at a time with a 1-second timeout. Error events are
    printed and skipped. Ctrl-C (``KeyboardInterrupt``) stops the loop, and
    the consumer is always closed on the way out so it leaves its group
    cleanly instead of waiting for a session timeout.

    Args:
        consumer: A consumer exposing ``subscribe``, ``poll`` and ``close``.
        topic: Name of the topic to read from.
    """
    consumer.subscribe([topic])

    try:
        while True:
            msg = consumer.poll(1.0)  # Poll for new messages
            if msg is None:
                # Timed out without receiving anything.
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue

            # Process the received message
            print(f"Received message: {msg.value().decode('utf-8')}")
    except KeyboardInterrupt:
        # Normal way to stop this otherwise endless loop.
        pass
    finally:
        consumer.close()


def main():
    """Consume ``my_topic`` from a local broker as part of ``my_consumer_group``."""
    topic = 'my_topic'

    consumer_settings = {}
    consumer_settings['bootstrap.servers'] = 'localhost:9092'
    consumer_settings['group.id'] = 'my_consumer_group'
    consumer_settings['auto.offset.reset'] = 'earliest'
    # Add any other desired configuration options here

    consume_kafka_messages(Consumer(consumer_settings), topic)


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


 c) Test the end-to-end flow of message production and consumption.


In [1]:
Enter a message (or 'exit' to quit): Hello, Kafka!
Enter a message (or 'exit' to quit): This is a test message.
Enter a message (or 'exit' to quit): Another message for Kafka.


SyntaxError: ignored


5. Working with Kafka Consumer Groups:
   a) Write a Python program to create a Kafka consumer within a consumer group.



In [None]:
  from confluent_kafka import Consumer


def consume_kafka_messages(consumer, topic):
    """Subscribe to *topic*, print messages in batches and commit their offsets.

    Fetches up to five messages per call with a 1-second timeout. Error
    events are printed and skipped. Offsets are committed only after a batch
    in which at least one message was processed, so idle iterations do not
    issue commit requests. Ctrl-C (``KeyboardInterrupt``) stops the loop, and
    the consumer is always closed on the way out so the group can rebalance
    promptly.

    Args:
        consumer: A consumer exposing ``subscribe``, ``consume``, ``commit``
            and ``close``.
        topic: Name of the topic to read from.
    """
    consumer.subscribe([topic])

    try:
        while True:
            # An empty list is returned when the timeout expires.
            messages = consumer.consume(num_messages=5, timeout=1.0)
            processed_any = False
            for message in messages:
                if message is None:
                    continue
                if message.error():
                    print(f"Consumer error: {message.error()}")
                    continue

                # Process the received message
                print(f"Received message: {message.value().decode('utf-8')}")
                processed_any = True

            # Commit the consumed messages' offsets
            if processed_any:
                consumer.commit()
    except KeyboardInterrupt:
        # Normal way to stop this otherwise endless loop.
        pass
    finally:
        consumer.close()


def main():
    """Join ``my_consumer_group`` on a local broker and consume ``my_topic``."""
    group_id = 'my_consumer_group'
    topic = 'my_topic'

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        # Instances started with the same group.id belong to one group.
        'group.id': group_id,
        'auto.offset.reset': 'earliest',
        # Add any other desired configuration options here
    })

    # Consume messages from Kafka topic within the consumer group
    consume_kafka_messages(consumer, topic)


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


 b) Implement logic to handle messages consumed by different consumers within the same group.
    

In [None]:
from confluent_kafka import Consumer


def consume_kafka_messages(consumer, topic):
    """Subscribe to *topic* and print each message tagged with this member's id.

    Fetches up to five messages per call with a 1-second timeout. Error
    events are printed and skipped. Each message is printed together with
    ``consumer.memberid()`` (the group member id of this consumer), which
    makes it visible which group member handled which message. Offsets are
    committed only after a batch in which at least one message was
    processed. Ctrl-C (``KeyboardInterrupt``) stops the loop, and the
    consumer is always closed on the way out so the group can rebalance
    promptly.

    Args:
        consumer: A consumer exposing ``subscribe``, ``consume``,
            ``memberid``, ``commit`` and ``close``.
        topic: Name of the topic to read from.
    """
    consumer.subscribe([topic])

    try:
        while True:
            # An empty list is returned when the timeout expires.
            messages = consumer.consume(num_messages=5, timeout=1.0)
            processed_any = False
            for message in messages:
                if message is None:
                    continue
                if message.error():
                    print(f"Consumer error: {message.error()}")
                    continue

                # Process the received message. Consumer has no consumer_id()
                # method; memberid() is the group member identifier.
                print(f"Consumer {consumer.memberid()} - Received message: {message.value().decode('utf-8')}")
                processed_any = True

            # Commit the consumed messages' offsets
            if processed_any:
                consumer.commit()
    except KeyboardInterrupt:
        # Normal way to stop this otherwise endless loop.
        pass
    finally:
        consumer.close()


def main():
    """Start one member of ``my_consumer_group`` reading ``my_topic`` locally."""
    topic, group_id = 'my_topic', 'my_consumer_group'

    settings = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': group_id,
        'auto.offset.reset': 'earliest',
        # Add any other desired configuration options here
    }
    group_member = Consumer(settings)

    # Consume messages from Kafka topic within the consumer group
    consume_kafka_messages(group_member, topic)


# Entry point: run the example only when this code is executed directly.
if __name__ == '__main__':
    main()


c) Observe the behavior of consumer group rebalancing when adding or removing consumers.

When we add or remove consumers from a Kafka consumer group, it triggers a process called rebalancing. Rebalancing is the mechanism by which Kafka redistributes the partitions among the active consumers in the group to maintain balanced workload and ensure that each partition is consumed by only one consumer at a time. Here's how you can observe the behavior of consumer group rebalancing when adding or removing consumers:

Start the Kafka cluster: Make sure the Kafka cluster is running and accessible.

Start the initial set of consumers: Run the consumer program (as described earlier) with a specified group.id and observe the messages being consumed by the consumers.

Add a new consumer: Start another instance of the consumer program with the same group.id as the existing consumers. Observe that the new consumer joins the group and starts consuming messages from some partitions. The consumer group rebalances to distribute the partitions among the active consumers.

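Observe rebalancing behavior: During the rebalancing process, you will see log messages indicating the partition assignments and revocations (to print them from the consumer itself, pass on_assign and on_revoke callbacks to consumer.subscribe()). The rebalancing may take a few moments to complete, and consumption pauses briefly while it runs. Note that delivery_report is a producer callback and is not involved here, and that KafkaError._PARTITION_EOF only signals that a consumer reached the end of a partition (when enable.partition.eof is set); it does not indicate that a partition was revoked.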

Verify consumption behavior: After the rebalancing process completes, observe that the consumers have been assigned different partitions, and each partition is consumed by only one consumer. You will see the respective consumer IDs and the consumed messages in the output.

Remove a consumer: Stop one of the consumers that were previously running. Observe that the consumer group rebalances again to redistribute the partitions among the remaining consumers.

Observe rebalancing behavior: During the rebalancing process triggered by the consumer's removal, you will again see log messages indicating the partition assignments and revocations. The remaining consumers will be assigned the partitions that were previously consumed by the stopped consumer.

Verify consumption behavior: Once the rebalancing process completes, observe that the remaining consumers have new partition assignments, and each partition is consumed by only one consumer. The respective consumer IDs and consumed messages will be displayed in the output.

By adding or removing consumers while the consumer group is active, you can observe how Kafka rebalances the partitions among the consumers to ensure a balanced workload distribution.

Making sure have the necessary dependencies (confluent-kafka library) installed and