# Python Client Tutorial

This tutorial introduces the Producer, Consumer, and AdminClient classes from the confluent-kafka package.

Requirements:
- Python 3
- docker-compose (for running Kafka locally)

## Docker Compose

The following cell creates the `docker-compose.yml` file that we will use to start an Apache Kafka broker and a ZooKeeper node.

In [1]:
%%bash
echo "---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - \"2181:2181\"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - \"29092:29092\"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0" > docker-compose.yml


Now we can run the following command to spin up our infrastructure.

In [2]:
%%bash
docker-compose up -d

Creating network "kafka-python-tutorials_default" with the default driver
Creating zookeeper ... 
Creating zookeeper ... done
Creating broker    ... 
Creating broker    ... done


## Confluent Kafka

The classes that we will be working with in this tutorial, Producer, Consumer, and AdminClient, all reside in the `confluent-kafka` package, which we will install in the next cell.

In [3]:
%pip install confluent-kafka

Note: you may need to restart the kernel to use updated packages.


## Configuration

The `AdminClient`, `Producer`, and `Consumer` all take a configuration dictionary in their constructors. For the first two, the only value we *have to* provide is `bootstrap.servers`. So, we will start by creating a dictionary whose `bootstrap.servers` entry points to the Kafka broker we started with docker-compose.


In [4]:
config = {
          "bootstrap.servers":"127.0.0.1:29092"
         }

## AdminClient

Next let's use the `AdminClient` to create a new topic.

In [5]:
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient(config)
result_dict = admin.create_topics([NewTopic("hello_topic", num_partitions=1, replication_factor=1)])
for topic, future in result_dict.items():
    try:
        future.result()  # The result itself is None
        print(f"Topic {topic} created")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")

Topic hello_topic created


The previous cell should have shown a message confirming that our new topic was created, but we can also use the `list_topics()` method of the `AdminClient` to confirm that our new topic exists.

In [6]:
md = admin.list_topics()
print(md.topics.keys())

dict_keys(['hello_topic'])
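The same metadata can be wrapped in small helpers when a script needs a yes/no answer instead of a printed list. The sketch below assumes `admin` is the `AdminClient` created earlier; `topic_exists` and `partition_count` are hypothetical helper names, not part of the `confluent-kafka` API, and the 10-second timeout is an arbitrary choice.

```python
def topic_exists(admin, name, timeout=10.0):
    """Return True if the cluster metadata lists a topic called `name`."""
    # list_topics() returns cluster metadata; .topics maps topic name -> topic metadata.
    metadata = admin.list_topics(timeout=timeout)
    return name in metadata.topics


def partition_count(admin, name, timeout=10.0):
    """Return the number of partitions of `name`, or 0 if the topic is unknown."""
    metadata = admin.list_topics(timeout=timeout)
    topic = metadata.topics.get(name)
    # The topic metadata's .partitions maps partition id -> partition metadata.
    return len(topic.partitions) if topic is not None else 0
```

With the topic created above, `topic_exists(admin, "hello_topic")` should be true and `partition_count(admin, "hello_topic")` should match the `num_partitions` value passed to `NewTopic`.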


## Producer

Now we will use the `Producer` class to produce events to Kafka. First we need to import the `Producer` from the `confluent_kafka` package, then we can use the same config dictionary we used for our `AdminClient` to create our `Producer` instance.

In [7]:
from confluent_kafka import Producer

producer = Producer(config)

The `produce()` method of the `Producer` class is straightforward to use. So, let's use it to send a handful of `Hello world` events in different languages.

In [8]:
producer.produce("hello_topic", key="a", value="Hello world!")
producer.produce("hello_topic", key="a", value="¡Hola Mundo!")
producer.produce("hello_topic", key="a", value="Hallo Wereld!")
producer.produce("hello_topic", key="a", value="Bonjour monde!")
producer.produce("hello_topic", key="a", value="Hej världen!")
# produce() only queues events locally; flush() blocks until they have been sent
producer.flush()
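Because `produce()` is asynchronous, it returns before the broker has acknowledged anything. One way to see what happened to each event is a delivery callback, sketched below. It assumes `producer` is the instance created above; `delivery_report` and `produce_all` are hypothetical names chosen for illustration, and the key `"a"` simply mirrors the cell above.

```python
def delivery_report(err, msg):
    """Delivery callback: invoked once per event while poll() or flush() runs."""
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")


def produce_all(producer, topic, values, key="a"):
    """Queue every value, then block until each one is delivered or has failed."""
    for value in values:
        producer.produce(topic, key=key, value=value, on_delivery=delivery_report)
        producer.poll(0)  # serve callbacks for earlier events without blocking
    # flush() waits for outstanding events and returns how many are still queued.
    return producer.flush()
```

A call such as `produce_all(producer, "hello_topic", ["Hello world!", "¡Hola Mundo!"])` should print one line per event and return `0` once nothing is left in the queue.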


## Consumer

Our Hello world events have been produced, but it would be great to verify that. For this we'll use the `Consumer` class.

In this next cell, we'll import the `Consumer` class and then add a couple of consumer-specific properties to our config dictionary. The default value for `auto.offset.reset` is `latest`, but if we leave that, a `Consumer` in a group with no committed offsets will start at the end of the topic and will not pick up the events we just produced. So, we'll set it to `earliest`. The other property we're adding is `group.id`, which can be set to any string value.

In [9]:
from confluent_kafka import Consumer
config["auto.offset.reset"] = "earliest"
config["group.id"] = "kafka-tutorial"
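Updating `config` in place means the consumer-only keys are also present if the same dictionary is later passed to a `Producer` or `AdminClient`, which can produce warnings about properties that do not apply to them. As an alternative sketch, the helper below derives a separate dictionary and leaves the original untouched; `consumer_config`, `base_config`, and `consumer_conf` are hypothetical names used only for illustration.

```python
def consumer_config(base, group_id, offset_reset="earliest"):
    """Return a new dict with consumer-only settings added; `base` is left unchanged."""
    return {**base, "group.id": group_id, "auto.offset.reset": offset_reset}


# Same broker address as the shared config defined earlier in the tutorial.
base_config = {"bootstrap.servers": "127.0.0.1:29092"}
consumer_conf = consumer_config(base_config, "kafka-tutorial")
print(consumer_conf)
```

The rest of this tutorial keeps using the shared `config` dictionary, so this variant is optional.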

Next we'll use the updated config dictionary to construct our `Consumer` instance. Before we can begin consuming events from a Kafka topic, we need to subscribe to that topic. The `subscribe()` method takes a list of topic names.

In [10]:
consumer = Consumer(config)
consumer.subscribe(["hello_topic"])

Now we can use the `poll()` method of the `Consumer` class to retrieve events from our Kafka topic. Often the `poll()` method will be used in a continuous loop, but for our purposes here, we will use a finite loop based on the number of events we produced earlier. Feel free to adjust this number if you added more events in your favorite languages.

In [11]:
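for i in range(5):
    evt = consumer.poll(1.0)  # wait up to one second for an event
    if evt is None:
        continue  # nothing arrived within the timeout
    if evt.error():
        print(f"Consumer error: {evt.error()}")
        continue
    print(evt.value())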

b'Hello world!'
b'\xc2\xa1Hola Mundo!'
b'Hallo Wereld!'
b'Bonjour monde!'
b'Hej v\xc3\xa4rlden!'
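The values come back as raw `bytes`, which is why the non-ASCII characters show up as escape sequences. The sketch below decodes them as UTF-8 and keeps polling until enough events are read or the topic stays quiet. It assumes `consumer` is the subscribed instance from above and that the values are UTF-8 text; `consume_text` and `max_idle_polls` are hypothetical names.

```python
def consume_text(consumer, max_events, timeout=1.0, max_idle_polls=5):
    """Poll until `max_events` values are read or the topic stays idle; return decoded strings."""
    texts = []
    idle = 0
    while len(texts) < max_events and idle < max_idle_polls:
        evt = consumer.poll(timeout)
        if evt is None:
            idle += 1  # nothing arrived within the timeout
            continue
        if evt.error():
            print(f"Consumer error: {evt.error()}")
            idle += 1  # count errors as idle polls so the loop always ends
            continue
        idle = 0
        texts.append(evt.value().decode("utf-8"))  # bytes -> str
    return texts
```

For example, `consume_text(consumer, 5)` would return readable strings such as `¡Hola Mundo!` for events that have not been consumed yet. When you are finished with a consumer, calling `consumer.close()` lets it leave the group cleanly.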


## Cleanup

Congratulations! You've completed this tutorial and have used the `AdminClient`, `Producer`, and `Consumer` classes of the `confluent-kafka` package. Now we will make one more call to `docker-compose` to shut down the infrastructure we started up at the beginning of this tutorial.

In [None]:
%%bash
docker-compose down