# Introduction

![](https://camo.githubusercontent.com/56166d361c3975dee750ecce16d605bbbf66516b/68747470733a2f2f75706c6f61642e77696b696d656469612e6f72672f77696b6970656469612f636f6d6d6f6e732f352f35332f4170616368655f6b61666b615f776f7264747970652e737667)


[Apache Kafka](https://kafka.apache.org/) is a horizontally scalable, fault-tolerant, distributed message queue. Designed at LinkedIn and open-sourced in 2011, it is now an Apache Software Foundation project, with Confluent among its main contributors. An ecosystem of solutions tailored for (streaming) data ingestion has evolved around Kafka. Today we will focus on Kafka's basics.

Kafka is implemented in Java and Scala. However, a number of libraries exist to interact with it from your favourite language. Today, we will use Python 3 and a notebook, as they are a convenient environment for teaching.

## Kafka Architecture

![](https://github.com/DataSystemsGroupUT/dataeng/blob/dataeng/attachments/KafkaArchitecture.png?raw=true)

## Interacting with the Cluster (ZooKeeper) using the Admin API

During the first part of the class, we will see how to interact with the cluster and handle the topic life cycle. If not explicitly created, topics are created the first time a producer tries to write or a consumer tries to read, using the default configuration (1 partition, no replicas, 1 week retention).
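
The one-week default retention corresponds to 604,800,000 milliseconds. The sketch below only does that arithmetic and builds the kind of per-topic override dictionary that could be handed to `NewTopic` through its `config` argument. It assumes the topic-level setting is named `retention.ms` and that values are passed as strings; the one-day override is a hypothetical example, and nothing here contacts the cluster.

```python
from datetime import timedelta


def retention_ms(days):
    """Convert a retention period in days to milliseconds."""
    return int(timedelta(days=days).total_seconds() * 1000)


# Default retention mentioned above: one week.
default_retention = retention_ms(7)
print(default_retention)  # 604800000

# Hypothetical override for a topic that should keep data for one day only.
# Assumption: the setting is called "retention.ms" and takes a string value.
topic_config = {"retention.ms": str(retention_ms(1))}
print(topic_config)
```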

In [1]:
from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions
from confluent_kafka import KafkaException
import sys
from uuid import uuid4

In [2]:
brokers = "kafka1:9092,kafka2:9093"  # Brokers act as cluster entry points

In [3]:
conf = {'bootstrap.servers': brokers}

In [4]:
a = AdminClient(conf)

In [5]:
md = a.list_topics(timeout=10)

In [6]:
print(" {} topics:".format(len(md.topics)))
for t in iter(md.topics.values()):
    if t.error is not None:
        errstr = ": {}".format(t.error)
    else:
        errstr = ""
    print("  \"{}\" with {} partition(s){}".format(t, len(t.partitions), errstr))

 4 topics:
  "_confluent-metrics" with 12 partition(s)
  "__confluent.support.metrics" with 1 partition(s)
  "_schemas" with 1 partition(s)
  "__consumer_offsets" with 50 partition(s)


You probably have noticed that there are some topics we did not create. They are prefixed with underscores, and are in practice "private" topics used internally by Kafka to manage the cluster.
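
To keep listings readable, the internal topics can be filtered out by their prefix. This is a minimal sketch that treats the leading underscore purely as a naming convention (Kafka does not forbid user topics that start with one); `my-topic` is a hypothetical user topic added for illustration.

```python
def split_topics(names):
    """Separate underscore-prefixed (internal by convention) topics from the rest."""
    internal = sorted(n for n in names if n.startswith("_"))
    user = sorted(n for n in names if not n.startswith("_"))
    return internal, user


# Topic names copied from the listing above, plus one hypothetical user topic.
names = [
    "_confluent-metrics",
    "__confluent.support.metrics",
    "_schemas",
    "__consumer_offsets",
    "my-topic",
]
internal, user = split_topics(names)
print("internal:", internal)
print("user:", user)
```

With the notebook's admin client, the names could be taken from `md.topics.keys()` instead of the hard-coded list.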

### Creating Topics

In [7]:
topic_names = ["test1p", "test2p", "test2die"]

In [8]:
new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topic_names]

#### Number of Partitions

![](https://miro.medium.com/max/915/1*GoRlq7O8qMNui6tvnq30cg.png)

#### Replication Factor

![](https://content.linkedin.com/content/dam/engineering/en-us/blog/migrated/kafka_replication_diagram.png)

### Remember: Producers/Consumers do not access followers
![](https://github.com/DataSystemsGroupUT/dataeng/raw/dataeng/attachments/replicas2.png)

In [9]:
fs = a.create_topics(new_topics)

In [10]:
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

Topic test1p created
Topic test2p created
Topic test2die created


### Let's see how each topic is configured

In [11]:
md = a.list_topics(timeout=10)
print(" {} topics:".format(len(md.topics)))
for t in iter(md.topics.values()):
    if t.error is not None:
        errstr = ": {}".format(t.error)
    else:
        errstr = ""
    print("  \"{}\" with {} partition(s){}".format(t, len(t.partitions), errstr))

 7 topics:
  "test2p" with 1 partition(s)
  "_schemas" with 1 partition(s)
  "test1p" with 1 partition(s)
  "__consumer_offsets" with 50 partition(s)
  "_confluent-metrics" with 12 partition(s)
  "test2die" with 1 partition(s)
  "__confluent.support.metrics" with 1 partition(s)


### "Deleting" Topics

Deletion is actually only scheduled: the Kafka cluster deletes the topics **eventually**.

In [12]:
ds = a.delete_topics([topic_names[2]], operation_timeout=30)
for topic, f in ds.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} deleted".format(topic))
    except Exception as e:
        print("Failed to delete topic {}: {}".format(topic, e))

Topic test2die deleted
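
Because deletion is eventual, a topic may still show up in a listing shortly after the call returns. One way to wait for it to disappear is to poll the listing. The helper below is a sketch, not part of `confluent_kafka`: `wait_until_deleted` and its parameters are hypothetical names, and it takes any zero-argument callable that returns the current topic names so it can be tried without a cluster.

```python
import time


def wait_until_deleted(list_topic_names, topic, timeout=30.0, interval=0.5):
    """Poll until `topic` is absent from the listing or `timeout` seconds pass.

    Returns True if the topic disappeared, False if the timeout expired.
    """
    deadline = time.monotonic() + timeout
    while True:
        if topic not in list_topic_names():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Stand-in for the cluster: the topic disappears on the third listing.
listings = iter([{"test1p", "test2die"}, {"test1p", "test2die"}, {"test1p"}])
print(wait_until_deleted(lambda: next(listings), "test2die", timeout=5, interval=0.01))
```

Against the notebook's cluster, the callable could be `lambda: a.list_topics(timeout=10).topics.keys()`.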


### Creating Partitions

Topics are created with a default number of partitions. The number of partitions can be increased later, but never decreased. However, this practice requires rebalancing the brokers (leaders and replicas) and takes time.
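
Changing the partition count has another consequence when messages carry keys: a keyed message is typically routed by hashing its key modulo the number of partitions, so the same key can land on a different partition afterwards. The toy model below illustrates this with `zlib.crc32` as a stand-in hash. Real Kafka clients use their own partitioners, so the concrete numbers will differ; `toy_partition` and the `user-N` keys are hypothetical.

```python
import zlib


def toy_partition(key, num_partitions):
    """Toy partitioner: hash the key and take it modulo the partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = ["user-{}".format(i) for i in range(100)]

# Mapping with 2 partitions, then after growing the topic to 3 partitions.
before = {k: toy_partition(k, 2) for k in keys}
after = {k: toy_partition(k, 3) for k in keys}

moved = [k for k in keys if before[k] != after[k]]
print("{} of {} keys map to a different partition".format(len(moved), len(keys)))
```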

In [13]:
topic = topic_names[1]

In [14]:
new_parts = [NewPartitions(topic, 2)]  # 2 is the new total number of partitions

In [15]:
fs = a.create_partitions(new_parts, validate_only=False)
# Wait for operation to finish.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Additional partitions created for topic {}".format(topic))
    except Exception as e:
        print("Failed to add partitions to topic {}: {}".format(topic, e))

Additional partitions created for topic test2p


In [16]:
md = a.list_topics(timeout=10)
for t in iter(md.topics.values()):
    if str(t)==topic:
        l = t.partitions.values()
        print("Topic {} has {} partitions".format(t, len(l)))
        for p in iter(l):
            if p.error is not None:
                errstr = ": {}".format(p.error)
            else:
                errstr = ""
            print("partition {} leader: {}, replicas: {}," 
                  " isrs: {} errstr: {}".format(p.id, p.leader, p.replicas, p.isrs, errstr))

Topic test2p has 2 partitions
partition 0 leader: 1, replicas: [1], isrs: [1] errstr: 
partition 1 leader: 2, replicas: [2], isrs: [2] errstr: 
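
The `replicas` and `isrs` (in-sync replicas) columns can be compared to spot partitions whose replicas have fallen behind. A minimal sketch over plain dictionaries follows; the first data set copies the output above, while the `degraded` example and the `under_replicated` helper are hypothetical.

```python
def under_replicated(partitions):
    """Return ids of partitions with fewer in-sync replicas than replicas."""
    return [
        pid
        for pid, info in sorted(partitions.items())
        if len(info["isrs"]) < len(info["replicas"])
    ]


# Values copied from the output above: every replica is in sync.
test2p = {
    0: {"leader": 1, "replicas": [1], "isrs": [1]},
    1: {"leader": 2, "replicas": [2], "isrs": [2]},
}
print(under_replicated(test2p))  # []

# Hypothetical topic with replication factor 2 where broker 2 has fallen behind.
degraded = {0: {"leader": 1, "replicas": [1, 2], "isrs": [1]}}
print(under_replicated(degraded))  # [0]
```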


# Producer and Consumer API

In this part of the class, we start producing to and consuming from a topic.

## Write to Topic "test1p"

In [17]:
from confluent_kafka import Producer
import sys
conf = {'bootstrap.servers': brokers}

In [18]:
p = Producer(**conf)

#### The Producer API accepts a callback function to report on message delivery (Python)

In [20]:
def delivery_callback(err, msg):
    # Called once per message, with either an error or the delivered message
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                         (msg.topic(), msg.partition(), msg.offset()))
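
Instead of writing to stderr, a callback can also collect the reports for later inspection. The sketch below keeps the same `(err, msg)` signature as the callback above. `DeliveryLog` and `FakeMessage` are hypothetical names, and `FakeMessage` only mimics the three accessors used here so the example runs without a broker.

```python
class DeliveryLog:
    """Callable that records delivery reports instead of printing them."""

    def __init__(self):
        self.delivered = []  # (topic, partition, offset) tuples
        self.failed = []     # error objects

    def __call__(self, err, msg):
        if err is not None:
            self.failed.append(err)
        else:
            self.delivered.append((msg.topic(), msg.partition(), msg.offset()))


class FakeMessage:
    """Hypothetical stand-in exposing the accessors the callback uses."""

    def __init__(self, topic, partition, offset):
        self._topic, self._partition, self._offset = topic, partition, offset

    def topic(self):
        return self._topic

    def partition(self):
        return self._partition

    def offset(self):
        return self._offset


log = DeliveryLog()
log(None, FakeMessage("test1p", 0, 0))  # simulated successful delivery
log("broker unavailable", None)         # simulated failure
print(log.delivered, log.failed)
```

Assuming the producer accepts any callable as `callback`, an instance could be passed as `p.produce(..., callback=log)`.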

### Let's send some messages

In [21]:
for n in range(1,10):
    try:
        # Produce line (without newline)
        print(n)
        p.produce(topic_names[0], str(n), callback=delivery_callback)
        p.poll(0)
    except BufferError:
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))

1
2
3
4
5
6
7
8
9




**Ensure nothing is left in the producer's buffer.**

In [22]:
p.flush()

% Message delivered to test1p [0] @ 0
% Message delivered to test1p [0] @ 1
% Message delivered to test1p [0] @ 2
% Message delivered to test1p [0] @ 3
% Message delivered to test1p [0] @ 4
% Message delivered to test1p [0] @ 5
% Message delivered to test1p [0] @ 6
% Message delivered to test1p [0] @ 7
% Message delivered to test1p [0] @ 8


0

## Reading from Topic "test1p"

In [24]:
from confluent_kafka import Consumer, KafkaException

In [25]:
conf = {
    'bootstrap.servers': brokers, 
    'group.id': str(uuid4()), 
    'session.timeout.ms': 6000,
    'auto.offset.reset': 'earliest'
}

In [26]:
c = Consumer(conf)

In [27]:
c.subscribe([topic_names[0]])

In [28]:
# Read messages from Kafka, print to stdout
try:
    while True:  # The consumer runs until interrupted
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:
            # Proper message
            sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
             (msg.topic(), msg.partition(), msg.offset(), str(msg.key())))
            print(msg.value())
except KeyboardInterrupt:
    sys.stderr.write('%% Aborted by user\n')
finally:
    # Close down consumer to commit final offsets.
    c.close()

% test1p [0] at offset 0 with key None:
% test1p [0] at offset 1 with key None:
% test1p [0] at offset 2 with key None:
% test1p [0] at offset 3 with key None:
% test1p [0] at offset 4 with key None:
% test1p [0] at offset 5 with key None:
% test1p [0] at offset 6 with key None:
% test1p [0] at offset 7 with key None:
% test1p [0] at offset 8 with key None:


b'1'
b'2'
b'3'
b'4'
b'5'
b'6'
b'7'
b'8'
b'9'


%% Aborted by user
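
The values come back as `bytes` (`b'1'`, `b'2'`, ...), not as the strings that were produced. A short sketch of turning them back into numbers, assuming the payloads are UTF-8 encoded text as in the producer loop above:

```python
# Payloads as printed by the consumer above.
raw_values = [b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9']

# The producer sent str(n); decoding as UTF-8 and parsing recovers the integers.
numbers = [int(v.decode("utf-8")) for v in raw_values]
print(numbers)
```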


## Write to Topic "test2p"

#### What is important about "test2p"?

In [29]:
conf = {'bootstrap.servers': brokers}
p = Producer(**conf)

In [30]:
for n in range(1,10):
    try:
        # Produce line (without newline)
        print(n)
        p.produce(topic_names[1], str(n), callback=delivery_callback)
        p.poll(0)
        p.flush()
    except BufferError:
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))



1
2
3
4
5
6
7
8
9


% Message delivered to test2p [1] @ 0
% Message delivered to test2p [0] @ 0
% Message delivered to test2p [1] @ 1
% Message delivered to test2p [1] @ 2
% Message delivered to test2p [1] @ 3
% Message delivered to test2p [0] @ 1
% Message delivered to test2p [1] @ 4
% Message delivered to test2p [1] @ 5
% Message delivered to test2p [0] @ 2


## Reading from Topic "test2p"

In [31]:
conf = {
    'bootstrap.servers': brokers, 
    'group.id': str(uuid4()), 
    'session.timeout.ms': 6000,
    'auto.offset.reset': 'earliest'
}

In [32]:
c = Consumer(conf)
c.subscribe([topic_names[1]])

In [33]:
# Read messages from Kafka, print to stdout
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:
            # Proper message
            print("{} [{}] at offset {} with key  {}:  {}".format(msg.topic(), msg.partition(), msg.offset(), str(msg.key()), str(msg.value())))
except KeyboardInterrupt:
    sys.stderr.write('%% Aborted by user\n')
finally:
    # Close down consumer to commit final offsets.
    c.close()

test2p [1] at offset  0 with key  None:  b'1'
test2p [1] at offset  1 with key  None:  b'3'
test2p [1] at offset  2 with key  None:  b'4'
test2p [1] at offset  3 with key  None:  b'5'
test2p [1] at offset  4 with key  None:  b'7'
test2p [1] at offset  5 with key  None:  b'8'
test2p [0] at offset  0 with key  None:  b'2'
test2p [0] at offset  1 with key  None:  b'6'
test2p [0] at offset  2 with key  None:  b'9'


%% Aborted by user


### What do you notice?

![](https://github.com/DataSystemsGroupUT/dataeng/raw/dataeng/attachments/order.png)
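
One way to check the observation is to replay the consumer output in plain Python. The sketch below copies the (partition, offset, value) triples from the run above and tests whether the values arrive in increasing order overall and within each partition; `is_sorted` is a hypothetical helper.

```python
# (partition, offset, value) triples copied from the consumer output above.
records = [
    (1, 0, 1), (1, 1, 3), (1, 2, 4), (1, 3, 5), (1, 4, 7), (1, 5, 8),
    (0, 0, 2), (0, 1, 6), (0, 2, 9),
]


def is_sorted(values):
    """True if each value is less than or equal to the next one."""
    return all(a <= b for a, b in zip(values, values[1:]))


# Order in which the consumer saw the values, across both partitions.
read_order = [value for _, _, value in records]
print("global order preserved:", is_sorted(read_order))

# Order within each partition taken separately.
for partition in sorted({p for p, _, _ in records}):
    values = [v for p, _, v in records if p == partition]
    print("partition", partition, values, "ordered:", is_sorted(values))
```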