1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.
   b) Configure the producer to connect to a Kafka cluster.
   c) Implement logic to send messages to a Kafka topic.


In [None]:
from confluent_kafka import Producer

def kafka_producer(bootstrap_servers):
    """Build a Kafka producer connected to the given brokers.

    Args:
        bootstrap_servers: Value used for the ``bootstrap.servers``
            client setting.

    Returns:
        A ``Producer`` constructed with that single setting.
    """
    # Only the broker list is configured; every other setting keeps the
    # client's default.
    return Producer({'bootstrap.servers': bootstrap_servers})

def send_message(producer, topic, message):
    """Send one message to a Kafka topic and report the delivery outcome.

    Args:
        producer: Producer object exposing ``produce`` and ``flush``.
        topic: Name of the topic to send to.
        message: Payload handed to ``produce`` unchanged.

    The success line is printed only if the delivery report carries no
    error; otherwise a failure line with the error is printed instead.
    """
    delivery_errors = []

    def _on_delivery(err, _msg):
        # Called by the client with the final delivery result; err is None
        # on success.
        if err is not None:
            delivery_errors.append(err)

    # Send message to Kafka topic
    producer.produce(topic, message, on_delivery=_on_delivery)

    # Flush blocks until outstanding messages are delivered or have failed,
    # which is also when the delivery callback above is invoked.
    producer.flush()

    if delivery_errors:
        print(f"Failed to deliver message to topic '{topic}': {delivery_errors[0]}")
        return

    print(f"Message sent to topic '{topic}': {message}")

# Kafka connection details (placeholders -- replace before running)
bootstrap_servers = "your_bootstrap_servers"
kafka_topic = "your_kafka_topic"

# Create Kafka producer
producer = kafka_producer(bootstrap_servers)

# Send a message to Kafka topic
message = "Hello, Kafka!"
send_message(producer, kafka_topic, message)

# The documented confluent_kafka Producer API does not list a close()
# method (calling it raises AttributeError where it is absent); flush() is
# the documented way to drain any queued messages before the program ends.
producer.flush()


2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   b) Configure the consumer to connect to a Kafka cluster.
   c) Implement logic to consume messages from a Kafka topic.


In [None]:
from confluent_kafka import Consumer

def kafka_consumer(bootstrap_servers, group_id):
    """Build a Kafka consumer for the given brokers and consumer group.

    Args:
        bootstrap_servers: Value used for the ``bootstrap.servers``
            client setting.
        group_id: Value used for the ``group.id`` client setting.

    Returns:
        A ``Consumer`` constructed with those two settings.
    """
    settings = dict([
        ('bootstrap.servers', bootstrap_servers),
        ('group.id', group_id),
    ])
    return Consumer(settings)

def consume_messages(consumer, topic):
    """Subscribe to a topic and print every message until interrupted.

    Args:
        consumer: Consumer object exposing ``subscribe``, ``poll`` and
            ``close``.
        topic: Name of the topic to subscribe to.

    The loop runs until a ``KeyboardInterrupt`` (Ctrl+C / kernel interrupt)
    arrives. The consumer is always closed on the way out, including when
    an unexpected exception escapes the loop.
    """
    # Subscribe to Kafka topic
    consumer.subscribe([topic])

    try:
        # Consume messages from Kafka topic
        while True:
            message = consumer.poll(1.0)  # Wait for message for 1 second

            if message is None:
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue

            # A message may carry no payload; guard so the loop does not
            # crash on .decode of None.
            payload = message.value()
            value = payload.decode('utf-8') if payload is not None else None
            print(f"Received message from topic '{topic}': {value}")
    except KeyboardInterrupt:
        # Interrupting is the intended way to stop this otherwise endless
        # loop, so it is not treated as an error.
        pass
    finally:
        # Previously unreachable after the infinite loop; now always runs.
        consumer.close()

# Placeholder connection settings -- replace with real values before running
kafka_topic = "your_kafka_topic"
consumer_group_id = "your_group_id"
bootstrap_servers = "your_bootstrap_servers"

# Build the consumer for the configured group
consumer = kafka_consumer(
    bootstrap_servers=bootstrap_servers,
    group_id=consumer_group_id,
)

# Subscribe and start polling the configured topic
consume_messages(consumer=consumer, topic=kafka_topic)


3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   b) Implement functionality to list existing topics.
   c) Develop logic to delete an existing Kafka topic.


In [None]:
from confluent_kafka.admin import AdminClient, NewTopic

def create_topic(bootstrap_servers, topic_name, partitions, replication_factor):
    """Create a Kafka topic and report whether the broker accepted it.

    Args:
        bootstrap_servers: Value used for the ``bootstrap.servers``
            client setting.
        topic_name: Name of the topic to create.
        partitions: Number of partitions for the new topic.
        replication_factor: Replication factor for the new topic.
    """
    from confluent_kafka import KafkaException

    # Configure AdminClient
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Create NewTopic object
    new_topic = NewTopic(topic_name, num_partitions=partitions, replication_factor=replication_factor)

    # create_topics() is asynchronous: it returns a dict mapping each topic
    # name to a future. The outcome is only known once the future resolves,
    # so wait on it before claiming success.
    futures = admin_client.create_topics([new_topic])
    for name, future in futures.items():
        try:
            future.result()
        except KafkaException as exc:
            print(f"Failed to create topic '{name}': {exc}")
        else:
            print(f"Topic '{name}' created successfully.")

    # No close() call: the documented AdminClient API does not list one.

def list_topics(bootstrap_servers, timeout=10.0):
    """Print the names of the topics known to the cluster.

    Args:
        bootstrap_servers: Value used for the ``bootstrap.servers``
            client setting.
        timeout: Maximum seconds to wait for the metadata response. Without
            a bound the request can wait indefinitely on an unreachable
            cluster.
    """
    # Configure AdminClient
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Get list of existing topics; .topics is keyed by topic name
    topics = admin_client.list_topics(timeout=timeout).topics

    print("Existing topics:")
    for topic_name in topics:
        print(topic_name)

    # No close() call: the documented AdminClient API does not list one.

def delete_topic(bootstrap_servers, topic_name):
    """Delete a Kafka topic and report whether the broker accepted it.

    Args:
        bootstrap_servers: Value used for the ``bootstrap.servers``
            client setting.
        topic_name: Name of the topic to delete.
    """
    from confluent_kafka import KafkaException

    # Configure AdminClient
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # delete_topics() is asynchronous: it returns a dict mapping each topic
    # name to a future, so wait on the future before reporting the outcome.
    futures = admin_client.delete_topics([topic_name])
    for name, future in futures.items():
        try:
            future.result()
        except KafkaException as exc:
            print(f"Failed to delete topic '{name}': {exc}")
        else:
            print(f"Topic '{name}' deleted successfully.")

    # No close() call: the documented AdminClient API does not list one.

# Placeholder cluster and topic settings -- replace before running
bootstrap_servers, topic_name = "your_bootstrap_servers", "your_topic_name"
partitions, replication_factor = 3, 1

# Step 1: create the topic
create_topic(
    bootstrap_servers,
    topic_name,
    partitions=partitions,
    replication_factor=replication_factor,
)

# Step 2: show which topics exist
list_topics(bootstrap_servers)

# Step 3: remove the topic again
delete_topic(bootstrap_servers, topic_name=topic_name)


4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.
   b) Implement logic to consume messages from the same Kafka topic.
   c) Test the end-to-end flow of message production and consumption.


In [None]:
from confluent_kafka import Producer

def kafka_producer(bootstrap_servers):
    """Build a Kafka producer connected to the given brokers.

    Args:
        bootstrap_servers: Value used for the ``bootstrap.servers``
            client setting.

    Returns:
        A ``Producer`` constructed with that single setting.
    """
    # Only the broker list is configured; every other setting keeps the
    # client's default.
    return Producer({'bootstrap.servers': bootstrap_servers})

def produce_messages(producer, topic, messages):
    """Send a batch of text messages to a Kafka topic.

    Args:
        producer: Producer object exposing ``produce``, ``poll`` and
            ``flush``.
        topic: Name of the topic to send to.
        messages: Iterable of ``str``; each is UTF-8 encoded before sending.

    The success line is printed only when no delivery report carried an
    error; otherwise a summary of the failures is printed instead.
    """
    delivery_errors = []
    attempted = 0

    def _on_delivery(err, _msg):
        # Called by the client with each message's final delivery result;
        # err is None on success.
        if err is not None:
            delivery_errors.append(err)

    # Produce messages to Kafka topic
    for message in messages:
        producer.produce(topic, message.encode('utf-8'), on_delivery=_on_delivery)
        attempted += 1
        # Serve delivery callbacks as we go so completed messages are
        # handled during the loop rather than all at flush time.
        producer.poll(0)

    # Flush producer to ensure all messages are sent
    producer.flush()

    if delivery_errors:
        print(f"{len(delivery_errors)} of {attempted} messages failed delivery to Kafka topic.")
        return

    print("Messages sent to Kafka topic.")

# Kafka connection details (placeholders -- replace before running)
bootstrap_servers = "your_bootstrap_servers"
kafka_topic = "your_kafka_topic"

# Create Kafka producer
producer = kafka_producer(bootstrap_servers)

# Produce messages to Kafka topic
messages = [
    "Message 1",
    "Message 2",
    "Message 3"
]
produce_messages(producer, kafka_topic, messages)

# The documented confluent_kafka Producer API does not list a close()
# method (calling it raises AttributeError where it is absent); flush() is
# the documented way to drain any queued messages before the program ends.
producer.flush()

from confluent_kafka import Consumer

def kafka_consumer(bootstrap_servers, group_id):
    """Build a Kafka consumer for the end-to-end produce/consume flow.

    Args:
        bootstrap_servers: Value used for the ``bootstrap.servers``
            client setting.
        group_id: Value used for the ``group.id`` client setting.

    Returns:
        A ``Consumer`` constructed with the settings below.
    """
    # Configure Kafka consumer
    consumer_config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        # In this section the messages are produced before the consumer
        # subscribes. A group with no committed offsets starts at the end
        # of the log by default and would never see them, so start from
        # the earliest available offset instead.
        'auto.offset.reset': 'earliest',
    }

    # Create Kafka consumer instance
    consumer = Consumer(consumer_config)

    return consumer

def consume_messages(consumer, topic):
    """Subscribe to a topic and print every message until interrupted.

    Args:
        consumer: Consumer object exposing ``subscribe``, ``poll`` and
            ``close``.
        topic: Name of the topic to subscribe to.

    The loop runs until a ``KeyboardInterrupt`` (Ctrl+C / kernel interrupt)
    arrives. The consumer is always closed on the way out, including when
    an unexpected exception escapes the loop.
    """
    # Subscribe to Kafka topic
    consumer.subscribe([topic])

    try:
        # Consume messages from Kafka topic
        while True:
            message = consumer.poll(1.0)  # Wait for message for 1 second

            if message is None:
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue

            # A message may carry no payload; guard so the loop does not
            # crash on .decode of None.
            payload = message.value()
            value = payload.decode('utf-8') if payload is not None else None
            print(f"Received message from topic '{topic}': {value}")
    except KeyboardInterrupt:
        # Interrupting is the intended way to stop this otherwise endless
        # loop, so it is not treated as an error.
        pass
    finally:
        # Previously unreachable after the infinite loop; now always runs.
        consumer.close()

# Kafka connection details (placeholders -- replace before running)
bootstrap_servers = "your_bootstrap_servers"
consumer_group_id = "your_group_id"
kafka_topic = "your_kafka_topic"

# End-to-end test: produce a batch first, then consume it back.
# (A stray bare `consume` statement used to sit here and raised NameError;
# a consumer was also created twice, leaving the first one unclosed.)

# Create Kafka producer
producer = kafka_producer(bootstrap_servers)

# Produce messages to Kafka topic
messages = [
    "Message 1",
    "Message 2",
    "Message 3"
]
produce_messages(producer, kafka_topic, messages)

# The documented confluent_kafka Producer API does not list a close()
# method (calling it raises AttributeError where it is absent); flush() is
# the documented way to drain any queued messages before moving on.
producer.flush()

# Create Kafka consumer
consumer = kafka_consumer(bootstrap_servers, consumer_group_id)

# Consume messages from Kafka topic
consume_messages(consumer, kafka_topic)
