**Import des packages**

In [6]:
import json
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, Consumer

In [7]:
conf = {'bootstrap.servers': 'xxxx:9092'}
kadmin = AdminClient(conf)

**Créer un topic**

In [8]:
new_topic = [NewTopic('topicname', num_partitions=3, replication_factor=1),
             NewTopic('test_kakfa', num_partitions=3, replication_factor=1)]
fs = kadmin.create_topics(new_topic)

In [9]:
for topic, f in fs.items():
    try:
        f.result()
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

Topic topicname created
Topic test_kakfa created


**Lister les topics**

In [10]:
topics = kadmin.list_topics().topics
for item in topics.values():
    print(item.topic)

_confluent-controlcenter-6-2-0-1-cluster-rekey
_confluent-controlcenter-6-2-0-1-KSTREAM-OUTEROTHER-0000000106-store-repartition
_confluent-controlcenter-6-2-0-1-TriggerEventsStore-changelog
_confluent-controlcenter-6-2-0-1-group-aggregate-store-ONE_MINUTE-changelog
_confluent-metrics
_confluent-controlcenter-6-2-0-1-group-aggregate-store-ONE_MINUTE-repartition
_confluent-controlcenter-6-2-0-1-MonitoringStream-THREE_HOURS-repartition
_confluent-controlcenter-6-2-0-1-monitoring-message-rekey-store
_confluent-controlcenter-6-2-0-1-MonitoringStream-ONE_MINUTE-changelog
_confluent-controlcenter-6-2-0-1-group-aggregate-store-THREE_HOURS-repartition
_confluent-controlcenter-6-2-0-1-aggregatedTopicPartitionTableWindows-ONE_MINUTE-repartition
_confluent-controlcenter-6-2-0-1-KSTREAM-OUTEROTHER-0000000106-store-changelog
_confluent-controlcenter-6-2-0-1-Group-ONE_MINUTE-changelog
__transaction_state
_confluent-controlcenter-6-2-0-1-Group-THREE_HOURS-changelog
default_ksql_processing_log
_conflue

**Supprimer un topic**

In [19]:
kadmin.delete_topics(["test_kakfa"])

{'test_kakfa': <Future at 0x7f9189c2a810 state=running>}

**Producer**

In [20]:
topic = 'topicname'
producer = Producer(conf)

In [21]:
delivered_records = 0

def acked(err, msg):
    global delivered_records
    """Delivery report handler called on
    successful or failed delivery of message
    """
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        delivered_records += 1
        print("Produced record to topic {} partition [{}] @ offset {}"
              .format(msg.topic(), msg.partition(), msg.offset()))

for n in range(10):
    record_key = "alice"
    record_value = json.dumps({'count': n})
    print("Producing record: {}\t{}".format(record_key, record_value))
    producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
    producer.poll(0)

producer.flush()

print("{} messages were produced to topic {}!".format(delivered_records, topic))


Producing record: alice	{"count": 0}
Producing record: alice	{"count": 1}
Producing record: alice	{"count": 2}
Producing record: alice	{"count": 3}
Producing record: alice	{"count": 4}
Producing record: alice	{"count": 5}
Producing record: alice	{"count": 6}
Producing record: alice	{"count": 7}
Producing record: alice	{"count": 8}
Producing record: alice	{"count": 9}
Produced record to topic topicname partition [2] @ offset 0
Produced record to topic topicname partition [2] @ offset 1
Produced record to topic topicname partition [2] @ offset 2
Produced record to topic topicname partition [2] @ offset 3
Produced record to topic topicname partition [2] @ offset 4
Produced record to topic topicname partition [2] @ offset 5
Produced record to topic topicname partition [2] @ offset 6
Produced record to topic topicname partition [2] @ offset 7
Produced record to topic topicname partition [2] @ offset 8
Produced record to topic topicname partition [2] @ offset 9
10 messages were produced to t

**Consumer**

In [22]:
consumer_conf = conf
consumer_conf['group.id'] = 'python_example_group_1'
consumer_conf['auto.offset.reset'] = 'earliest'
consumer = Consumer(consumer_conf)

In [23]:
# Subscribe to topic
consumer.subscribe([topic])

In [24]:
# Process messages
total_count = 0
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            print("Waiting for message or event/error in poll()")
            continue
        elif msg.error():
            print('error: {}'.format(msg.error()))
        else:
            # Check for Kafka message
            record_key = msg.key()
            record_value = msg.value()
            data = json.loads(record_value)
            count = data['count']
            total_count += count
            print("Consumed record with key {} and value {}, \
                  and updated total count to {}"
                  .format(record_key, record_value, total_count))
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Consumed record with key b'alice' and value b'{"count": 0}',                   and updated total count to 0
Consumed record with key b'alice' and value b'{"count": 1}',                   and updated total count to 1
Consumed record with key b'alice' and value b'{"count": 2}',                   and updated total count to 3
Consumed record with key b'alice' and value b'{"count": 3}',                   and updated total count to 6
Consumed record with key b'alice' and value b'{"count": 4}',                   and updated total count to 10
Consumed record with key b'alice' and value b'{"count": 5}',                   and updated total count to 15
Consumed record with key b'alice' and value b'{"count": 6}',                   and updated total count to 21
Consumed record with key b'alice' and value b'{"count": 7}',                   and updated total count to 28
Consumed record with key b'alice' and value b'{"count": 8}',                   and updated total count to 36
Consumed record with ke

In [25]:
kadmin.delete_topics(["topicname"])

{'topicname': <Future at 0x7f9189c17250 state=running>}

**Sources:**  
[Github Confluent Kafka](https://github.com/confluentinc/confluent-kafka-python)