# Redpanda Tutorial: Producing and Consuming Events

## In this tutorial, we'll cover:
* Installing the Confluent Kafka SDK module
* Connecting to a Redpanda Server
* Producing Events with UUID Keys and JSON Values
* Consuming Events from the Same Topic

## Install Packages

In [3]:
%pip install --upgrade --quiet confluent-kafka

Note: you may need to restart the kernel to use updated packages.


## Set Environment Variables

In [4]:
import env
env.load()

## Module Import And Client Setup

In [5]:
import os
import uuid
import json
from confluent_kafka import Producer, Consumer, KafkaError

REDPANDA_BOOTSTRAP_SERVERS = os.environ['REDPANDA_BOOTSTRAP_SERVERS']

topic_name_products = 'cillers_play_products'

## Produce Events

In [6]:
producer_conf = {
    'bootstrap.servers': REDPANDA_BOOTSTRAP_SERVERS
}

producer = Producer(producer_conf)

def delivery_callback(err, msg):
    if err:
        print(f'Error: Message failed delivery: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

# Produce 3 sample events
for i in range(3):
    # Generate a UUID as key
    key = str(uuid.uuid4())
    
    # Create a JSON object with the structure {"name": name}
    name = f'Product {i}'
    value = json.dumps({"name": name})
    
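    # Queue the message; delivery_callback runs during flush()/poll() once delivery succeeds or fails
    producer.produce(topic_name_products, key=key.encode('utf-8'), value=value.encode('utf-8'), callback=delivery_callback)
    producer.flush()  # Block until this message is delivered (simple for a demo, slow for large batches)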

Message delivered to cillers_play_products [0] at offset 3
Message delivered to cillers_play_products [0] at offset 4
Message delivered to cillers_play_products [0] at offset 5
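
Flushing after every message keeps the demo output easy to follow, but it waits for a broker round trip each time. A minimal sketch of a batched variant follows, assuming the same kind of `Producer` object and delivery callback as in the cell above. The helper names `encode_product_event` and `produce_products` are hypothetical and not part of the original notebook. The sketch queues every message, calls `poll(0)` to serve delivery callbacks as it goes, and flushes once at the end.

```python
import json
import uuid


def encode_product_event(name):
    """Return (key, value) as UTF-8 bytes: a random UUID key and a JSON value."""
    key = str(uuid.uuid4()).encode('utf-8')
    value = json.dumps({"name": name}).encode('utf-8')
    return key, value


def produce_products(producer, topic, names, on_delivery=None):
    """Queue one event per name, then flush once (hypothetical helper)."""
    for name in names:
        key, value = encode_product_event(name)
        producer.produce(topic, key=key, value=value, callback=on_delivery)
        producer.poll(0)  # Serve any pending delivery callbacks without blocking
    # Block until every queued message has been delivered or has failed
    return producer.flush()
```

With the objects defined earlier, this could be called as `produce_products(producer, topic_name_products, ['Product 0', 'Product 1', 'Product 2'], delivery_callback)`.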


## Consume Events

We'll consume messages in a loop that runs until you interrupt the kernel. In a real-world application, you'd likely run this loop in a separate thread or process so it doesn't block the rest of your program.
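
As a rough illustration of the separate-thread option, the sketch below runs a poll loop in a background thread and stops it through a `threading.Event`. The function name `consume_in_background` and its parameters are hypothetical. It assumes the object passed in behaves like a `confluent_kafka.Consumer` (with `poll()` and `close()`) and is only used from the background thread once the loop starts.

```python
import threading


def consume_in_background(consumer, handle_message, stop_event, poll_timeout=1.0):
    """Start a poll loop in a daemon thread and return the thread (hypothetical helper)."""

    def _loop():
        try:
            while not stop_event.is_set():
                msg = consumer.poll(poll_timeout)
                if msg is None:
                    continue  # No message arrived within the timeout
                if msg.error():
                    print(f'Error: {msg.error()}')
                    continue
                handle_message(msg)
        finally:
            # Always close the consumer, even if the handler raises
            consumer.close()

    thread = threading.Thread(target=_loop, daemon=True)
    thread.start()
    return thread
```

To stop the loop, call `stop_event.set()` and then `thread.join()`. The cell that follows keeps the simpler blocking loop, which is easier to follow in a notebook.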

In [7]:
consumer_conf = {
    'bootstrap.servers': REDPANDA_BOOTSTRAP_SERVERS,
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(consumer_conf)

consumer.subscribe([topic_name_products])

try:
    while True:
        msg = consumer.poll(1.0)  # Poll for messages with a timeout of 1 second

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event (only emitted when 'enable.partition.eof' is True in the consumer config)
                print(f'Reached end of partition at offset {msg.offset()}')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received message: {msg.value().decode("utf-8")} at offset {msg.offset()}')
except KeyboardInterrupt:
    print('Aborted by user')
finally:
    # Close the consumer so it commits final offsets and leaves the group cleanly.
    consumer.close()


Received message: {"name": "Product 0"} at offset 3
Received message: {"name": "Product 1"} at offset 4
Received message: {"name": "Product 2"} at offset 5
Aborted by user
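
The loop above prints each value as a raw JSON string. To work with the events as data, the key and value can be parsed back into a `uuid.UUID` and a `dict`. This is a minimal sketch that assumes every message was written with a UUID string key and a JSON value encoded as UTF-8, as in the producer cell. The helper name `decode_product_event` is hypothetical.

```python
import json
import uuid


def decode_product_event(key_bytes, value_bytes):
    """Parse a UUID key and a JSON value from UTF-8 bytes (hypothetical helper)."""
    key = uuid.UUID(key_bytes.decode('utf-8'))
    value = json.loads(value_bytes.decode('utf-8'))
    return key, value
```

Inside the consume loop this could be used as `key, product = decode_product_event(msg.key(), msg.value())`. Messages that do not follow the assumed format will raise an exception, so a real consumer would likely wrap the call in error handling.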
