In [None]:
# Q1. Setting up a Kafka Producer:


# a) Create a Kafka producer:

from kafka import KafkaProducer

def create_kafka_producer(bootstrap_servers):
    """Build a KafkaProducer connected to the given bootstrap server(s).

    Args:
        bootstrap_servers: Broker address or list of addresses, forwarded
            unchanged as the ``bootstrap_servers`` keyword of ``KafkaProducer``.

    Returns:
        The newly constructed ``KafkaProducer``.
    """
    producer_settings = {'bootstrap_servers': bootstrap_servers}
    return KafkaProducer(**producer_settings)

# Single-broker producer for the local development broker.
producer = create_kafka_producer(bootstrap_servers='localhost:9092')


# b) Configure the producer to connect to a Kafka cluster:

# NOTE(review): this rebinds create_kafka_producer, which is already defined
# earlier in this cell with the same behavior; the redefinition is redundant.
def create_kafka_producer(bootstrap_servers):
    """Return a KafkaProducer for the supplied bootstrap server(s).

    ``bootstrap_servers`` is passed straight through to ``KafkaProducer``.
    """
    kafka_producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    return kafka_producer

# Broker addresses used to bootstrap the connection to the cluster.
bootstrap_servers = ['localhost:9092', 'localhost:9093', 'localhost:9094']
# Close the single-broker producer created earlier in this cell before the
# name is rebound; otherwise its network resources are never released.
producer.close()
producer = create_kafka_producer(bootstrap_servers)


# c) Send messages to a Kafka topic:

def send_message(producer, topic, message):
    """Publish one text message and wait for the send to complete.

    Args:
        producer: Object exposing ``send(topic, payload)`` whose result has a
            ``get()`` method (a ``KafkaProducer`` in this notebook).
        topic: Name of the destination topic.
        message: Text to send; it is UTF-8 encoded before sending.
    """
    payload = message.encode('utf-8')
    pending = producer.send(topic, payload)
    # Block until the pending send resolves; its result is discarded.
    pending.get()

topic = 'my_topic'
message = 'Hello, Kafka!'
try:
    send_message(producer, topic, message)
finally:
    # This is the last use of the producer in this cell: close it so its
    # network resources are released even when the send raises.
    producer.close()

In [None]:
# Q2. Setting up a Kafka Consumer:

# a) Create a Kafka consumer:

from kafka import KafkaConsumer

def create_kafka_consumer(bootstrap_servers, group_id):
    """Build a KafkaConsumer that belongs to the given consumer group.

    Args:
        bootstrap_servers: Broker address or list of addresses, forwarded
            unchanged to ``KafkaConsumer``.
        group_id: Consumer group identifier, forwarded unchanged.

    Returns:
        The newly constructed ``KafkaConsumer`` (not yet subscribed to a topic).
    """
    consumer_settings = {
        'bootstrap_servers': bootstrap_servers,
        'group_id': group_id,
    }
    return KafkaConsumer(**consumer_settings)

# Single-broker consumer in the 'my_group' consumer group.
consumer = create_kafka_consumer(
    bootstrap_servers='localhost:9092',
    group_id='my_group',
)

# b) Configure the consumer to connect to a Kafka cluster:

# NOTE(review): this rebinds create_kafka_consumer, which is already defined
# earlier in this cell with the same behavior; the redefinition is redundant.
def create_kafka_consumer(bootstrap_servers, group_id):
    """Return a KafkaConsumer for the given bootstrap server(s) and group id.

    Both arguments are passed straight through to ``KafkaConsumer``.
    """
    kafka_consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
    )
    return kafka_consumer

# Broker addresses used to bootstrap the connection to the cluster.
bootstrap_servers = ['localhost:9092', 'localhost:9093', 'localhost:9094']
group_id = 'my_group'
# Close the single-broker consumer created earlier in this cell before the
# name is rebound; otherwise its network resources are never released.
consumer.close()
consumer = create_kafka_consumer(bootstrap_servers, group_id)

# c) Consume messages from a Kafka topic:

def consume_messages(consumer, topic, max_messages=None, idle_timeout_ms=10000):
    """Subscribe to ``topic`` and print each received message as UTF-8 text.

    A bare ``for message in consumer`` loop never finishes on a consumer built
    without ``consumer_timeout_ms``, which stalls every later notebook cell.
    This version polls instead and returns once no record has arrived for
    ``idle_timeout_ms``.

    Args:
        consumer: Consumer exposing ``subscribe(topics=...)`` and
            ``poll(timeout_ms=..., max_records=...)`` (a ``KafkaConsumer``).
        topic: Name of the topic to read from.
        max_messages: Stop after printing this many messages; ``None`` means
            no limit.
        idle_timeout_ms: How long, in milliseconds, to wait for new records
            before returning. Must be a non-negative number.
    """
    consumer.subscribe(topics=[topic])
    printed = 0
    while max_messages is None or printed < max_messages:
        # Cap the batch size so that no fetched record is left unprinted
        # when the max_messages limit is reached.
        remaining = None if max_messages is None else max_messages - printed
        batches = consumer.poll(timeout_ms=idle_timeout_ms, max_records=remaining)
        if not batches:
            break  # nothing arrived within the idle window
        for records in batches.values():
            for record in records:
                payload = record.value
                # A record may carry no value; print None instead of
                # crashing on .decode().
                print(payload.decode('utf-8') if payload is not None else None)
                printed += 1

topic = 'my_topic'
try:
    consume_messages(consumer, topic)
finally:
    # Also runs when the cell is interrupted: closing the consumer releases
    # its connections so it does not linger as a member of the group.
    consumer.close()

In [None]:
# Q3. Creating and Managing Kafka Topics:

# a) Create a new Kafka topic:

from kafka.admin import KafkaAdminClient, NewTopic

def create_kafka_topic(bootstrap_servers, topic, partitions, replication_factor):
    """Create a single Kafka topic through a short-lived admin client.

    Args:
        bootstrap_servers: Broker address(es) for the ``KafkaAdminClient``.
        topic: Name of the topic to create.
        partitions: Number of partitions for the new topic.
        replication_factor: Replication factor for the new topic.

    Raises:
        Whatever ``KafkaAdminClient.create_topics`` raises; the admin client is
        closed before the exception propagates.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        new_topic = NewTopic(
            name=topic,
            num_partitions=partitions,
            replication_factor=replication_factor,
        )
        admin_client.create_topics(new_topics=[new_topic], validate_only=False)
    finally:
        # The client is created per call, so release its connections here.
        admin_client.close()

# Single-partition, unreplicated topic on the local broker.
topic, partitions, replication_factor = 'my_topic', 1, 1
create_kafka_topic(
    bootstrap_servers='localhost:9092',
    topic=topic,
    partitions=partitions,
    replication_factor=replication_factor,
)

# b) List existing topics:

def list_kafka_topics(bootstrap_servers):
    """Print the name of every topic reported by the admin client, one per line.

    Args:
        bootstrap_servers: Broker address(es) for the ``KafkaAdminClient``.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topic_names = admin_client.list_topics()
    finally:
        # The client is created per call, so release its connections here.
        admin_client.close()
    for topic_name in topic_names:
        print(topic_name)

# Show the topics known to the local broker.
list_kafka_topics(bootstrap_servers='localhost:9092')


# c) Delete an existing Kafka topic:

def delete_kafka_topic(bootstrap_servers, topic):
    """Delete a single Kafka topic through a short-lived admin client.

    Args:
        bootstrap_servers: Broker address(es) for the ``KafkaAdminClient``.
        topic: Name of the topic to delete.

    Raises:
        Whatever ``KafkaAdminClient.delete_topics`` raises; the admin client is
        closed before the exception propagates.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        # The deletion request is given a 10 second timeout.
        admin_client.delete_topics(topics=[topic], timeout_ms=10000)
    finally:
        # The client is created per call, so release its connections here.
        admin_client.close()

# Remove the demo topic from the local broker.
topic = 'my_topic'
delete_kafka_topic(bootstrap_servers='localhost:9092', topic=topic)

In [None]:
# Q4. Producing and Consuming Messages:

# a) Produce messages to a Kafka topic:

from kafka import KafkaProducer

def create_kafka_producer(bootstrap_servers):
    """Construct a KafkaProducer for the given bootstrap server(s).

    Args:
        bootstrap_servers: Broker address or list of addresses, handed to
            ``KafkaProducer`` as its ``bootstrap_servers`` setting.

    Returns:
        The new ``KafkaProducer`` instance.
    """
    settings = dict(bootstrap_servers=bootstrap_servers)
    return KafkaProducer(**settings)

def send_message(producer, topic, message):
    """Send ``message`` to ``topic`` as UTF-8 bytes and wait for completion.

    Args:
        producer: Object with a ``send(topic, payload)`` method whose return
            value provides ``get()``.
        topic: Destination topic name.
        message: Text to publish.
    """
    encoded = message.encode('utf-8')
    send_result = producer.send(topic, encoded)
    # Wait for the send to resolve; nothing is returned to the caller.
    send_result.get()

bootstrap_servers = ['localhost:9092', 'localhost:9093', 'localhost:9094']
producer = create_kafka_producer(bootstrap_servers)
topic = 'my_topic'
message = 'Hello, Kafka!'
try:
    send_message(producer, topic, message)
finally:
    # This producer is not used again before the name is rebound later in
    # the cell; close it so its network resources are released.
    producer.close()

# b) Consume messages from the same Kafka topic:

from kafka import KafkaConsumer

def create_kafka_consumer(bootstrap_servers, group_id):
    """Construct a KafkaConsumer for the given server(s) and consumer group.

    Args:
        bootstrap_servers: Broker address or list of addresses.
        group_id: Consumer group identifier.

    Returns:
        The new ``KafkaConsumer`` instance (no topic subscription yet).
    """
    settings = dict(bootstrap_servers=bootstrap_servers, group_id=group_id)
    return KafkaConsumer(**settings)

def consume_messages(consumer, topic, max_messages=None, idle_timeout_ms=10000):
    """Subscribe to ``topic`` and print each received message as UTF-8 text.

    A bare ``for message in consumer`` loop never finishes on a consumer built
    without ``consumer_timeout_ms``, so the code after the call would never
    run. This version polls instead and returns once no record has arrived
    for ``idle_timeout_ms``.

    Args:
        consumer: Consumer exposing ``subscribe(topics=...)`` and
            ``poll(timeout_ms=..., max_records=...)`` (a ``KafkaConsumer``).
        topic: Name of the topic to read from.
        max_messages: Stop after printing this many messages; ``None`` means
            no limit.
        idle_timeout_ms: How long, in milliseconds, to wait for new records
            before returning. Must be a non-negative number.
    """
    consumer.subscribe(topics=[topic])
    printed = 0
    while max_messages is None or printed < max_messages:
        # Cap the batch size so that no fetched record is left unprinted
        # when the max_messages limit is reached.
        remaining = None if max_messages is None else max_messages - printed
        batches = consumer.poll(timeout_ms=idle_timeout_ms, max_records=remaining)
        if not batches:
            break  # nothing arrived within the idle window
        for records in batches.values():
            for record in records:
                payload = record.value
                # A record may carry no value; print None instead of
                # crashing on .decode().
                print(payload.decode('utf-8') if payload is not None else None)
                printed += 1

bootstrap_servers = ['localhost:9092', 'localhost:9093', 'localhost:9094']
group_id = 'my_group'
consumer = create_kafka_consumer(bootstrap_servers, group_id)
topic = 'my_topic'
try:
    consume_messages(consumer, topic)
finally:
    # Also runs when the cell is interrupted: closing the consumer releases
    # its connections so it does not linger as a member of the group while
    # the next consumer with the same group id starts up.
    consumer.close()

# c) Test the end-to-end flow of message production and consumption:

from kafka import KafkaProducer, KafkaConsumer

# NOTE(review): create_kafka_producer is already defined earlier in this cell
# with the same behavior; this redefinition is redundant.
def create_kafka_producer(bootstrap_servers):
    """Return a new KafkaProducer bound to ``bootstrap_servers``."""
    new_producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    return new_producer

# NOTE(review): create_kafka_consumer is already defined earlier in this cell
# with the same behavior; this redefinition is redundant.
def create_kafka_consumer(bootstrap_servers, group_id):
    """Return a new KafkaConsumer for ``bootstrap_servers`` in group ``group_id``."""
    new_consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
    )
    return new_consumer

# NOTE(review): send_message is already defined earlier in this cell with the
# same behavior; this redefinition is redundant.
def send_message(producer, topic, message):
    """UTF-8 encode ``message``, send it to ``topic`` and wait for the result."""
    message_bytes = message.encode('utf-8')
    in_flight = producer.send(topic, message_bytes)
    in_flight.get()  # block until the send has resolved

def consume_messages(consumer, topic, max_messages=None, idle_timeout_ms=10000):
    """Subscribe to ``topic`` and print each received message as UTF-8 text.

    A bare ``for message in consumer`` loop never finishes on a consumer built
    without ``consumer_timeout_ms``, so an end-to-end check built on it would
    never complete. This version polls instead and returns once no record has
    arrived for ``idle_timeout_ms``.

    Args:
        consumer: Consumer exposing ``subscribe(topics=...)`` and
            ``poll(timeout_ms=..., max_records=...)`` (a ``KafkaConsumer``).
        topic: Name of the topic to read from.
        max_messages: Stop after printing this many messages; ``None`` means
            no limit.
        idle_timeout_ms: How long, in milliseconds, to wait for new records
            before returning. Must be a non-negative number.
    """
    consumer.subscribe(topics=[topic])
    printed = 0
    while max_messages is None or printed < max_messages:
        # Cap the batch size so that no fetched record is left unprinted
        # when the max_messages limit is reached.
        remaining = None if max_messages is None else max_messages - printed
        batches = consumer.poll(timeout_ms=idle_timeout_ms, max_records=remaining)
        if not batches:
            break  # nothing arrived within the idle window
        for records in batches.values():
            for record in records:
                payload = record.value
                # A record may carry no value; print None instead of
                # crashing on .decode().
                print(payload.decode('utf-8') if payload is not None else None)
                printed += 1

bootstrap_servers = ['localhost:9092', 'localhost:9093', 'localhost:9094']
topic = 'my_topic'
group_id = 'my_group'

# Create a Kafka producer
producer = create_kafka_producer(bootstrap_servers)
try:
    # Create a Kafka consumer
    # NOTE(review): the consumer is built with library-default settings. If
    # the default offset reset starts a group without committed offsets at
    # the end of the log, the two messages sent below (before the consumer
    # subscribes) may not be delivered - TODO confirm, and consider
    # auto_offset_reset='earliest' for this check.
    consumer = create_kafka_consumer(bootstrap_servers, group_id)
    try:
        # Send messages to Kafka topic
        send_message(producer, topic, 'Hello, Kafka!')
        send_message(producer, topic, 'Testing message production and consumption.')

        # Consume messages from Kafka topic
        consume_messages(consumer, topic)
    finally:
        # Runs on success, failure or interruption, so the consumer does not
        # stay behind as a member of the group.
        consumer.close()
finally:
    # Release the producer's network resources once the flow is over.
    producer.close()

In [None]:
# Q5. Working with Kafka Consumer Groups:

# a) Create a Kafka consumer within a consumer group:

from kafka import KafkaConsumer

def create_kafka_consumer(bootstrap_servers, group_id):
    """Create a KafkaConsumer that is a member of consumer group ``group_id``.

    Args:
        bootstrap_servers: Broker address or list of addresses.
        group_id: Consumer group identifier passed to ``KafkaConsumer``.

    Returns:
        The new ``KafkaConsumer`` instance (no topic subscription yet).
    """
    group_consumer_settings = {
        'bootstrap_servers': bootstrap_servers,
        'group_id': group_id,
    }
    return KafkaConsumer(**group_consumer_settings)

# Cluster bootstrap addresses and the consumer group this consumer joins.
bootstrap_servers = ['localhost:9092', 'localhost:9093', 'localhost:9094']
group_id = 'my_group'
consumer = create_kafka_consumer(
    bootstrap_servers=bootstrap_servers,
    group_id=group_id,
)

# b) Handle messages consumed by different consumers within the same group:

def consume_messages(consumer, topic, max_messages=None, idle_timeout_ms=10000):
    """Subscribe to ``topic`` and print each message with the consumer's client id.

    A bare ``for message in consumer`` loop never finishes on a consumer built
    without ``consumer_timeout_ms``, so the code after the call would never
    run. This version polls instead and returns once no record has arrived
    for ``idle_timeout_ms``.

    Args:
        consumer: Consumer exposing ``subscribe(topics=...)``,
            ``poll(timeout_ms=..., max_records=...)`` and a ``config`` mapping
            with a ``'client_id'`` entry (a ``KafkaConsumer``).
        topic: Name of the topic to read from.
        max_messages: Stop after printing this many messages; ``None`` means
            no limit.
        idle_timeout_ms: How long, in milliseconds, to wait for new records
            before returning. Must be a non-negative number.
    """
    consumer.subscribe(topics=[topic])
    # KafkaConsumer keeps its settings in ``config``; the ``_client_id``
    # attribute read previously is not part of the documented consumer API.
    client_id = consumer.config['client_id']
    printed = 0
    while max_messages is None or printed < max_messages:
        # Cap the batch size so that no fetched record is left unprinted
        # when the max_messages limit is reached.
        remaining = None if max_messages is None else max_messages - printed
        batches = consumer.poll(timeout_ms=idle_timeout_ms, max_records=remaining)
        if not batches:
            break  # nothing arrived within the idle window
        for records in batches.values():
            for record in records:
                payload = record.value
                # A record may carry no value; show None instead of
                # crashing on .decode().
                text = payload.decode('utf-8') if payload is not None else None
                print(f"Consumer ID: {client_id}, Message: {text}")
                printed += 1

topic = 'my_topic'
try:
    consume_messages(consumer, topic)
finally:
    # Also runs when the cell is interrupted: closing the consumer releases
    # its connections so it does not linger as a member of the group while
    # the next consumer with the same group id starts up.
    consumer.close()

# c) Observe the behavior of consumer group rebalancing when adding or removing consumers:
from kafka import KafkaConsumer
from time import sleep

# NOTE(review): create_kafka_consumer is already defined earlier in this cell
# with the same behavior; this redefinition is redundant.
def create_kafka_consumer(bootstrap_servers, group_id):
    """Return a new KafkaConsumer for ``bootstrap_servers`` in group ``group_id``."""
    group_consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
    )
    return group_consumer

def consume_messages(consumer, topic, max_messages=None, idle_timeout_ms=10000):
    """Subscribe to ``topic`` and print each message with the consumer's client id.

    A bare ``for message in consumer`` loop never finishes on a consumer built
    without ``consumer_timeout_ms``. This version polls instead and returns
    once no record has arrived for ``idle_timeout_ms``; pass a larger value to
    keep the consumer running longer while observing the group.

    Args:
        consumer: Consumer exposing ``subscribe(topics=...)``,
            ``poll(timeout_ms=..., max_records=...)`` and a ``config`` mapping
            with a ``'client_id'`` entry (a ``KafkaConsumer``).
        topic: Name of the topic to read from.
        max_messages: Stop after printing this many messages; ``None`` means
            no limit.
        idle_timeout_ms: How long, in milliseconds, to wait for new records
            before returning. Must be a non-negative number.
    """
    consumer.subscribe(topics=[topic])
    # KafkaConsumer keeps its settings in ``config``; the ``_client_id``
    # attribute read previously is not part of the documented consumer API.
    client_id = consumer.config['client_id']
    printed = 0
    while max_messages is None or printed < max_messages:
        # Cap the batch size so that no fetched record is left unprinted
        # when the max_messages limit is reached.
        remaining = None if max_messages is None else max_messages - printed
        batches = consumer.poll(timeout_ms=idle_timeout_ms, max_records=remaining)
        if not batches:
            break  # nothing arrived within the idle window
        for records in batches.values():
            for record in records:
                payload = record.value
                # A record may carry no value; show None instead of
                # crashing on .decode().
                text = payload.decode('utf-8') if payload is not None else None
                print(f"Consumer ID: {client_id}, Message: {text}")
                printed += 1

bootstrap_servers = ['localhost:9092', 'localhost:9093', 'localhost:9094']
topic = 'my_topic'
group_id = 'my_group'

# Create the initial Kafka consumer
consumer = create_kafka_consumer(bootstrap_servers, group_id)

# Start consuming messages
try:
    consume_messages(consumer, topic)
finally:
    # Also runs when the cell is interrupted: closing the consumer releases
    # its connections so it does not linger as a member of the group.
    consumer.close()
