# 1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.
   b) Configure the producer to connect to a Kafka cluster.
   c) Implement logic to send messages to a Kafka topic.


In [None]:
from confluent_kafka import Producer

def delivery_report(err, msg):
    """Report the outcome of a single produce request.

    Intended to be passed as the ``callback`` argument of ``produce()``.

    Args:
        err: ``None`` when the message was delivered, otherwise the error
            describing why delivery failed.
        msg: The message object; its ``topic()`` and ``partition()`` are
            printed on success.
    """
    if err is None:
        # Success path: show where the message ended up.
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
        return

    print(f"Message delivery failed: {err}")

def main():
    """Produce ten demo messages to a Kafka topic and wait for delivery.

    Connects to the broker at ``localhost:9092``, sends ``"Message 0"``
    through ``"Message 9"`` to ``my_topic`` and blocks until every message
    has been acknowledged or has failed. Each outcome is reported through
    ``delivery_report``.
    """
    # Kafka broker(s) configuration
    kafka_brokers = "localhost:9092"  # Replace with your Kafka broker(s) address

    # Kafka topic to produce messages to
    kafka_topic = "my_topic"  # Replace with your Kafka topic

    # Create producer configuration
    producer_config = {
        "bootstrap.servers": kafka_brokers
    }

    # Create Kafka producer
    producer = Producer(producer_config)

    # Produce some messages
    for i in range(10):
        message_value = f"Message {i}"
        producer.produce(kafka_topic, value=message_value, callback=delivery_report)
        # produce() only enqueues the message; poll(0) serves delivery
        # callbacks for earlier messages without blocking.
        producer.poll(0)

    # flush() blocks until all queued messages are delivered (or have failed)
    # and triggers the remaining delivery callbacks.
    producer.flush()

    # NOTE: there is deliberately no producer.close() call here. close() is
    # not part of the documented Producer API and raises AttributeError on
    # releases that lack it; flush() is the documented way to drain the queue
    # before exiting.


if __name__ == "__main__":
    main()


# 2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   b) Configure the consumer to connect to a Kafka cluster.
   c) Implement logic to consume messages from a Kafka topic.


In [None]:
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic


def create_topic_if_not_exists(broker, topic, num_partitions=1, replication_factor=1):
    """Create a Kafka topic unless the cluster already reports it.

    Args:
        broker: Bootstrap server address(es), e.g. ``"localhost:9092"``.
        topic: Name of the topic to create.
        num_partitions: Number of partitions for a newly created topic.
        replication_factor: Replication factor for a newly created topic.

    The outcome (already exists / created / failed) is printed. A failed
    creation is reported rather than raised.
    """
    admin_client = AdminClient({"bootstrap.servers": broker})
    topic_metadata = admin_client.list_topics(timeout=5)

    if topic_metadata.topics.get(topic) is not None:
        print(f"Topic '{topic}' already exists")
        return

    new_topic = NewTopic(topic, num_partitions, replication_factor)

    # create_topics() is asynchronous: it returns a dict mapping each topic
    # name to a future. Success may only be reported once the future resolves.
    futures = admin_client.create_topics([new_topic])
    for name, future in futures.items():
        try:
            future.result()
        except KafkaException as exc:
            print(f"Failed to create topic '{name}': {exc}")
        else:
            print(f"Topic '{name}' created successfully")


def main():
    """Consume messages from a Kafka topic and print them until interrupted.

    Subscribes to ``my_topic`` on ``localhost:9092`` as a member of the
    ``my_consumer_group`` group and polls in an endless loop. Stop it with
    Ctrl+C; the consumer is closed on the way out, whatever the exit reason.
    """
    # The error-code constants (such as _PARTITION_EOF) are defined on
    # KafkaError, not on KafkaException. Imported locally because the
    # surrounding import line does not bring KafkaError into scope.
    from confluent_kafka import KafkaError

    # Kafka broker(s) configuration
    kafka_brokers = "localhost:9092"  # Replace with your Kafka broker(s) address

    # Kafka topic to consume messages from
    kafka_topic = "my_topic"  # Replace with your Kafka topic

    # Create consumer configuration
    consumer_config = {
        "bootstrap.servers": kafka_brokers,
        "group.id": "my_consumer_group",  # Replace with your consumer group ID
        "auto.offset.reset": "earliest"
    }

    # Create Kafka consumer
    consumer = Consumer(consumer_config)

    try:
        # Subscribe to the Kafka topic
        consumer.subscribe([kafka_topic])

        # Start consuming messages
        while True:
            message = consumer.poll(timeout=1.0)

            # poll() returns None when nothing arrived within the timeout.
            if message is None:
                continue

            error = message.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(f"Reached end of partition {message.partition()}")
                else:
                    # Error occurred
                    print(f"Error occurred: {error.str()}")
            else:
                # Process the received message
                print(f"Received message: {message.value().decode('utf-8')}")

    except KeyboardInterrupt:
        # User interrupted the program
        pass
    finally:
        # Always close the consumer, even when an unexpected exception
        # escapes the loop.
        consumer.close()


if __name__ == "__main__":
    main()


# 3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   b) Implement functionality to list existing topics.
   c) Develop logic to delete an existing Kafka topic.


In [None]:
from confluent_kafka.admin import AdminClient, NewTopic


def create_topic(broker, topic, num_partitions=1, replication_factor=1):
    """Create a new Kafka topic and report whether the request succeeded.

    Args:
        broker: Bootstrap server address(es), e.g. ``"localhost:9092"``.
        topic: Name of the topic to create.
        num_partitions: Number of partitions for the topic.
        replication_factor: Replication factor for the topic.

    The outcome is printed. A failed creation (for example because the topic
    already exists) is reported rather than raised.
    """
    # Imported locally: the surrounding import line only provides
    # AdminClient and NewTopic.
    from confluent_kafka import KafkaException

    admin_client = AdminClient({"bootstrap.servers": broker})
    new_topic = NewTopic(
        topic,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
    )

    # create_topics() is asynchronous: it returns a dict mapping each topic
    # name to a future. Wait for the result before claiming success.
    futures = admin_client.create_topics([new_topic])
    for name, future in futures.items():
        try:
            future.result()
        except KafkaException as exc:
            print(f"Failed to create topic '{name}': {exc}")
        else:
            print(f"Topic '{name}' created successfully")


def list_topics(broker):
    """Print the name of every topic reported by the cluster.

    Args:
        broker: Bootstrap server address(es), e.g. ``"localhost:9092"``.

    Prints ``"No topics found"`` when the metadata contains no topics.
    """
    client = AdminClient({"bootstrap.servers": broker})
    names = client.list_topics(timeout=5).topics.keys()

    # Handle the empty case first so the listing below stays unindented.
    if not names:
        print("No topics found")
        return

    print("Existing topics:")
    for name in names:
        print(name)


def delete_topic(broker, topic):
    """Delete a Kafka topic and report whether the request succeeded.

    Args:
        broker: Bootstrap server address(es), e.g. ``"localhost:9092"``.
        topic: Name of the topic to delete.

    The outcome is printed. A failed deletion (for example because the topic
    does not exist) is reported rather than raised.
    """
    # Imported locally: the surrounding import line only provides
    # AdminClient and NewTopic.
    from confluent_kafka import KafkaException

    admin_client = AdminClient({"bootstrap.servers": broker})

    # delete_topics() is asynchronous: it returns a dict mapping each topic
    # name to a future. Wait for the result before claiming success.
    futures = admin_client.delete_topics([topic])
    for name, future in futures.items():
        try:
            future.result()
        except KafkaException as exc:
            print(f"Failed to delete topic '{name}': {exc}")
        else:
            print(f"Topic '{name}' deleted successfully")


def main():
    """Run the topic-management demo: create a topic, list topics, delete a topic."""
    broker_address = "localhost:9092"  # Replace with your Kafka broker(s) address

    # Step 1: create a three-partition topic.
    create_topic(broker_address, "my_new_topic", num_partitions=3, replication_factor=1)

    # Step 2: show what the cluster currently knows about.
    list_topics(broker_address)

    # Step 3: remove a topic by name.
    delete_topic(broker_address, "topic_to_delete")


if __name__ == "__main__":
    main()


# 4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.
   b) Implement logic to consume messages from the same Kafka topic.
   c) Test the end-to-end flow of message production and consumption.


In [None]:
from confluent_kafka import Producer, Consumer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic


def create_topic_if_not_exists(broker, topic, num_partitions=1, replication_factor=1):
    """Create a Kafka topic unless the cluster already reports it.

    Args:
        broker: Bootstrap server address(es), e.g. ``"localhost:9092"``.
        topic: Name of the topic to create.
        num_partitions: Number of partitions for a newly created topic.
        replication_factor: Replication factor for a newly created topic.

    The outcome (already exists / created / failed) is printed. A failed
    creation is reported rather than raised.
    """
    admin_client = AdminClient({"bootstrap.servers": broker})
    topic_metadata = admin_client.list_topics(timeout=5)

    if topic_metadata.topics.get(topic) is not None:
        print(f"Topic '{topic}' already exists")
        return

    new_topic = NewTopic(topic, num_partitions, replication_factor)

    # create_topics() is asynchronous: it returns a dict mapping each topic
    # name to a future. Success may only be reported once the future resolves.
    futures = admin_client.create_topics([new_topic])
    for name, future in futures.items():
        try:
            future.result()
        except KafkaException as exc:
            print(f"Failed to create topic '{name}': {exc}")
        else:
            print(f"Topic '{name}' created successfully")


def delivery_report(err, msg):
    """Report the outcome of a single produce request.

    Intended to be passed as the ``callback`` argument of ``produce()``.

    Args:
        err: ``None`` when the message was delivered, otherwise the error
            describing why delivery failed.
        msg: The message object; its ``topic()`` and ``partition()`` are
            printed on success.
    """
    if err is None:
        # Success path: show where the message ended up.
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
        return

    print(f"Message delivery failed: {err}")


def produce_messages(broker, topic, num_messages):
    """Produce ``num_messages`` demo messages to a Kafka topic.

    Args:
        broker: Bootstrap server address(es), e.g. ``"localhost:9092"``.
        topic: Topic to send the messages to.
        num_messages: How many messages (``"Message 0"``, ``"Message 1"``,
            ...) to send.

    Blocks until every message has been delivered or has failed. Each outcome
    is reported through ``delivery_report``.
    """
    producer = Producer({"bootstrap.servers": broker})

    for i in range(num_messages):
        message_value = f"Message {i}"
        producer.produce(topic, value=message_value, callback=delivery_report)
        # produce() only enqueues the message; poll(0) serves delivery
        # callbacks for earlier messages without blocking.
        producer.poll(0)

    # flush() blocks until the queue is drained and triggers the remaining
    # delivery callbacks.
    producer.flush()

    # NOTE: there is deliberately no producer.close() call here. close() is
    # not part of the documented Producer API and raises AttributeError on
    # releases that lack it; flush() is the documented way to drain the queue.


def consume_messages(broker, topic):
    """Consume messages from a Kafka topic and print them until interrupted.

    Args:
        broker: Bootstrap server address(es), e.g. ``"localhost:9092"``.
        topic: Topic to subscribe to.

    Runs an endless poll loop as a member of the ``my_consumer_group`` group.
    Stop it with Ctrl+C; the consumer is closed on the way out, whatever the
    exit reason.
    """
    # The error-code constants (such as _PARTITION_EOF) are defined on
    # KafkaError, not on KafkaException. Imported locally because the
    # surrounding import line does not bring KafkaError into scope.
    from confluent_kafka import KafkaError

    consumer = Consumer({
        "bootstrap.servers": broker,
        "group.id": "my_consumer_group",
        "auto.offset.reset": "earliest"
    })

    try:
        consumer.subscribe([topic])

        while True:
            message = consumer.poll(timeout=1.0)

            # poll() returns None when nothing arrived within the timeout.
            if message is None:
                continue

            error = message.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    print(f"Reached end of partition {message.partition()}")
                else:
                    print(f"Error occurred: {error.str()}")
            else:
                print(f"Received message: {message.value().decode('utf-8')}")

    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop the loop.
        pass
    finally:
        # Always close the consumer, even when an unexpected exception
        # escapes the loop.
        consumer.close()


def main():
    """Run the end-to-end demo: ensure the topic exists, produce, then consume."""
    broker_address = "localhost:9092"  # Replace with your Kafka broker(s) address
    topic_name = "my_topic"  # Replace with your Kafka topic
    message_count = 10

    # Make sure there is a topic to talk to before producing anything.
    create_topic_if_not_exists(broker_address, topic_name)

    # Send the batch first, then read it back from the same topic.
    produce_messages(broker_address, topic_name, message_count)
    consume_messages(broker_address, topic_name)


if __name__ == "__main__":
    main()


# 5. Working with Kafka Consumer Groups:
   a) Write a Python program to create a Kafka consumer within a consumer group.
   b) Implement logic to handle messages consumed by different consumers within the same group.
   c) Observe the behavior of consumer group rebalancing when adding or removing consumers.


In [None]:
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic


def create_topic_if_not_exists(broker, topic, num_partitions=1, replication_factor=1):
    """Create a Kafka topic unless the cluster already reports it.

    Args:
        broker: Bootstrap server address(es), e.g. ``"localhost:9092"``.
        topic: Name of the topic to create.
        num_partitions: Number of partitions for a newly created topic.
        replication_factor: Replication factor for a newly created topic.

    The outcome (already exists / created / failed) is printed. A failed
    creation is reported rather than raised.
    """
    admin_client = AdminClient({"bootstrap.servers": broker})
    topic_metadata = admin_client.list_topics(timeout=5)

    if topic_metadata.topics.get(topic) is not None:
        print(f"Topic '{topic}' already exists")
        return

    new_topic = NewTopic(topic, num_partitions, replication_factor)

    # create_topics() is asynchronous: it returns a dict mapping each topic
    # name to a future. Success may only be reported once the future resolves.
    futures = admin_client.create_topics([new_topic])
    for name, future in futures.items():
        try:
            future.result()
        except KafkaException as exc:
            print(f"Failed to create topic '{name}': {exc}")
        else:
            print(f"Topic '{name}' created successfully")


def consume_messages(broker, topic, group_id):
    """Consume messages from a Kafka topic as a member of a consumer group.

    Args:
        broker: Bootstrap server address(es), e.g. ``"localhost:9092"``.
        topic: Topic to subscribe to.
        group_id: Consumer group to join; also included in the printed
            output for each received message.

    Runs an endless poll loop. Stop it with Ctrl+C; the consumer is closed on
    the way out, whatever the exit reason.
    """
    # The error-code constants (such as _PARTITION_EOF) are defined on
    # KafkaError, not on KafkaException. Imported locally because the
    # surrounding import line does not bring KafkaError into scope.
    from confluent_kafka import KafkaError

    consumer = Consumer({
        "bootstrap.servers": broker,
        "group.id": group_id,
        "auto.offset.reset": "earliest"
    })

    try:
        consumer.subscribe([topic])

        while True:
            message = consumer.poll(timeout=1.0)

            # poll() returns None when nothing arrived within the timeout.
            if message is None:
                continue

            error = message.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    print(f"Reached end of partition {message.partition()}")
                else:
                    print(f"Error occurred: {error.str()}")
            else:
                print(f"Consumer {group_id} received message: {message.value().decode('utf-8')}")

    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop the loop.
        pass
    finally:
        # Always close the consumer, even when an unexpected exception
        # escapes the loop.
        consumer.close()


def main():
    """Run the consumer-group demo: ensure the topic exists, then consume from it."""
    broker_address = "localhost:9092"  # Replace with your Kafka broker(s) address
    topic_name = "my_topic"  # Replace with your Kafka topic
    group_name = "my_consumer_group"  # Replace with your consumer group ID

    # Make sure there is a topic to subscribe to.
    create_topic_if_not_exists(broker_address, topic_name)

    # Join the group and start reading.
    consume_messages(broker_address, topic_name, group_name)


if __name__ == "__main__":
    main()
