In [None]:
from confluent_kafka import Producer

# Configure the Kafka cluster settings
bootstrap_servers = 'localhost:9092'  # Replace with your Kafka broker addresses
topic = 'your_topic'  # Replace with the desired topic name

def delivery_report(err, msg):
    """Callback function called once the message is delivered or delivery fails."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def send_message(producer, message):
    """Send a message to the Kafka topic."""
    producer.produce(topic, value=message, callback=delivery_report)
    # Flush the producer to ensure the message is sent
    producer.flush()

def create_producer():
    """Create and configure a Kafka producer."""
    config = {
        'bootstrap.servers': bootstrap_servers
    }
    producer = Producer(config)
    return producer

# Create a Kafka producer
producer = create_producer()

# Send messages to the Kafka topic
while True:
    message = input("Enter a message (or 'exit' to quit): ")
    if message.lower() == 'exit':
        break
    send_message(producer, message.encode('utf-8'))

# Close the Kafka producer
producer.flush()
producer.close()


In [None]:
from confluent_kafka import Consumer, KafkaException, KafkaError

# Configure the Kafka cluster settings
bootstrap_servers = 'localhost:9092'  # Replace with your Kafka broker addresses
topic = 'your_topic'  # Replace with the desired topic name
group_id = 'your_consumer_group'  # Replace with your consumer group ID

def consume_messages(consumer):
    """Consume messages from the Kafka topic."""
    consumer.subscribe([topic])

    while True:
        try:
            message = consumer.poll(1.0)

            if message is None:
                continue
            if message.error():
                if message.error().code() == KafkaError._PARTITION_EOF:
                    print(f'Reached end of partition {message.partition()}')
                else:
                    print(f'Error occurred: {message.error().str()}')
                continue

            # Process the received message
            print(f'Received message: {message.value().decode("utf-8")}')

        except KeyboardInterrupt:
            break

    # Close the consumer
    consumer.close()

def create_consumer():
    """Create and configure a Kafka consumer."""
    config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    }
    consumer = Consumer(config)
    return consumer

# Create a Kafka consumer
consumer = create_consumer()

# Consume messages from the Kafka topic
consume_messages(consumer)


In [None]:
from confluent_kafka.admin import AdminClient, NewTopic

# Configure the Kafka cluster settings
bootstrap_servers = 'localhost:9092'  # Replace with your Kafka broker addresses

def create_topic(admin_client, topic_name, num_partitions, replication_factor):
    """Create a new Kafka topic."""
    topic = NewTopic(topic_name, num_partitions, replication_factor)
    admin_client.create_topics([topic])
    print(f"Topic '{topic_name}' created successfully.")

def list_topics(admin_client):
    """List existing Kafka topics."""
    topics = admin_client.list_topics().topics
    print("Existing topics:")
    for topic in topics:
        print(topic)

def delete_topic(admin_client, topic_name):
    """Delete an existing Kafka topic."""
    admin_client.delete_topics([topic_name])
    print(f"Topic '{topic_name}' deleted successfully.")

def create_admin_client():
    """Create an AdminClient to manage Kafka topics."""
    config = {
        'bootstrap.servers': bootstrap_servers
    }
    admin_client = AdminClient(config)
    return admin_client

# Create an AdminClient
admin_client = create_admin_client()

# Create a new topic
create_topic(admin_client, 'my_topic', num_partitions=3, replication_factor=1)

# List existing topics
list_topics(admin_client)

# Delete an existing topic
delete_topic(admin_client, 'my_topic')

# Close the AdminClient
admin_client.close()


In [None]:
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError

# Configure the Kafka cluster settings
bootstrap_servers = 'localhost:9092'  # Replace with your Kafka broker addresses
topic = 'your_topic'  # Replace with the desired topic name

def delivery_report(err, msg):
    """Callback function called once the message is delivered or delivery fails."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def produce_messages(producer):
    """Produce messages to the Kafka topic."""
    while True:
        message = input("Enter a message (or 'exit' to quit): ")
        if message.lower() == 'exit':
            break
        producer.produce(topic, value=message, callback=delivery_report)
        # Flush the producer to ensure the message is sent
        producer.flush()

def consume_messages(consumer):
    """Consume messages from the Kafka topic."""
    consumer.subscribe([topic])

    while True:
        try:
            message = consumer.poll(1.0)

            if message is None:
                continue
            if message.error():
                if message.error().code() == KafkaError._PARTITION_EOF:
                    print(f'Reached end of partition {message.partition()}')
                else:
                    print(f'Error occurred: {message.error().str()}')
                continue

            # Process the received message
            print(f'Received message: {message.value().decode("utf-8")}')

        except KeyboardInterrupt:
            break

    # Close the consumer
    consumer.close()

def create_producer():
    """Create and configure a Kafka producer."""
    config = {
        'bootstrap.servers': bootstrap_servers
    }
    producer = Producer(config)
    return producer

def create_consumer():
    """Create and configure a Kafka consumer."""
    config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': 'your_consumer_group',  # Replace with your consumer group ID
        'auto.offset.reset': 'earliest'
    }
    consumer = Consumer(config)
    return consumer

# Create a Kafka producer
producer = create_producer()

# Produce messages to the Kafka topic
produce_messages(producer)

# Create a Kafka consumer
consumer = create_consumer()

# Consume messages from the Kafka topic
consume_messages(consumer)


In [None]:
from confluent_kafka import Consumer, KafkaException, KafkaError

# Configure the Kafka cluster settings
bootstrap_servers = 'localhost:9092'  # Replace with your Kafka broker addresses
topic = 'your_topic'  # Replace with the desired topic name
group_id = 'your_consumer_group'  # Replace with your consumer group ID

def consume_messages(consumer):
    """Consume messages from the Kafka topic within the consumer group."""
    consumer.subscribe([topic])

    while True:
        try:
            message = consumer.poll(1.0)

            if message is None:
                continue
            if message.error():
                if message.error().code() == KafkaError._PARTITION_EOF:
                    print(f'Reached end of partition {message.partition()}')
                else:
                    print(f'Error occurred: {message.error().str()}')
                continue

            # Process the received message
            print(f'Consumer: {consumer.consumer_id()}\tPartition: {message.partition()}\tOffset: {message.offset()}\tValue: {message.value().decode("utf-8")}')

        except KeyboardInterrupt:
            break

    # Close the consumer
    consumer.close()

def create_consumer():
    """Create and configure a Kafka consumer within the consumer group."""
    config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    }
    consumer = Consumer(config)
    return consumer

# Create a Kafka consumer within the consumer group
consumer = create_consumer()

# Consume messages from the Kafka topic within the consumer group
consume_messages(consumer)
