# Kafka and Python - Examples
4OH4  
January 2021

This notebook contains examples of interfacing with Kafka using the `kafka-python` package. The threaded wrapper classes defined below allow demonstration of multi-producer, multi-consumer systems on a single host, and show how allocating consumers to groups and partitioning topics can be used to scale workloads horizontally.

As the wrapper classes are threaded, it is not possible to 'Run all cells' in this notebook - instead, execute them one-by-one using `Shift-Enter`.

## Installation and setup

Install the Python package requirements:  
`pip install -r requirements.txt`

Bring up the Kafka and Zookeeper containers using Docker Compose:   
`docker-compose up`

## Simple publish-subscribe

In [1]:
from kafka import KafkaConsumer, KafkaProducer

kafka_host = 'localhost:9092'

def connect_kafka_producer():
    # Return a connected producer, or None if the connection fails
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[kafka_host], api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    return _producer
    

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        print('Message published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))


In [2]:
producer = connect_kafka_producer()

topic_name = 'my_topic'
key = 'a'
value = '123'

publish_message(producer, topic_name, key, value)

Message published successfully.


In [3]:
consumer = KafkaConsumer(topic_name, 
                         auto_offset_reset='earliest',
                         bootstrap_servers=[kafka_host], 
                         api_version=(0, 10), 
                         consumer_timeout_ms=1000)

for msg in consumer:
    print('Message received:')
    print(f'\t Topic: {msg.topic}')
    print(f'\t Partition: {msg.partition}')
    print(f'\t Offset: {msg.offset}')
    print(f'\t Timestamp: {msg.timestamp}')
    print(f'\t Key/value: {msg.key.decode()}:{msg.value.decode()}')
    
consumer.close()

Message received:
	 Topic: my_topic
	 Partition: 0
	 Offset: 0
	 Timestamp: 1610924763057
	 Key/value: a:123


## Multiple consumers and producers

The threaded wrapper classes send and receive messages, with extra logging so that you can see what's going on. We send a key-value pair consisting of a randomly generated string and an incremental integer message ID. In production use the value would typically be more complex, such as JSON data.
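As a hedged illustration of the JSON case mentioned above, the sketch below defines a pair of helper functions that convert a dictionary to and from UTF-8 encoded bytes. The names `serialize_value` and `deserialize_value`, and the example payload, are illustrative assumptions and not part of this notebook. The round trip runs without a broker, to show what would travel over the wire.

```python
import json


def serialize_value(payload):
    """Encode a JSON-compatible object as UTF-8 bytes (hypothetical helper)."""
    return json.dumps(payload).encode("utf-8")


def deserialize_value(raw):
    """Decode UTF-8 JSON bytes back into a Python object (hypothetical helper)."""
    return json.loads(raw.decode("utf-8"))


# Round trip without a broker; the payload fields are made up for illustration
message = {"message_id": 0, "sensor": "example", "reading": 21.5}
encoded = serialize_value(message)
decoded = deserialize_value(encoded)
print(encoded)
print(decoded == message)
```

Callables of this shape can be passed to `kafka-python` as `KafkaProducer(..., value_serializer=serialize_value)` and `KafkaConsumer(..., value_deserializer=deserialize_value)`, in which case `send()` takes the dictionary directly and `msg.value` arrives already decoded.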

In [4]:
import uuid
import threading
import time
import random
import logging
import sys

from kafka import KafkaConsumer, KafkaProducer

logging.getLogger("python-kafka").addHandler(logging.StreamHandler())

In [5]:

class Base:
    # Base class to store metadata and set up logging
    
    def __init__(self, id, topic, verbose):
        
        if id is not None:
            self._id = id
        else:
            self._id = str(uuid.uuid1())[:8]
            
        self.topic = topic
        self._verbose = verbose
        
        self._logger = logging.getLogger("python-kafka")
        if self._verbose:
            self._logger.setLevel("DEBUG")
        else:
            self._logger.setLevel("INFO")
            

class Producer(Base):
    # Wrapper class around KafkaProducer with logging and random, asynchronous message sending
    
    max_run_time = 10  # seconds
    time_delta = 1.01 # seconds, between sending messages
    
    message_count = 0
    
    def __init__(self, id=None, topic="main_topic", send_prob=0.5, verbose=True):
        super().__init__(id, topic, verbose)
        
        self.send_prob = send_prob  # probability to send per time_delta [0-1]
        
        try:
            self._producer = KafkaProducer(bootstrap_servers=[kafka_host], 
                                           client_id=self._id, 
                                           api_version=(0, 10))
            self._logger.info(f"Producer {self._id}: created")
        except Exception as ex:
            self._logger.error("Exception occurred while connecting to Kafka")
            self._logger.error(str(ex))

        self._running = True
        # Thread.start() returns None, so keep the Thread object before starting it
        self._thread = threading.Thread(target=self._random_send)
        self._thread.start()
        
    def _random_send(self):
        # Send a bunch of messages at random over a period of time
        run_time = time.time()
        time.sleep(self.time_delta)
        while self._running and ((time.time()-run_time) < self.max_run_time):
            if random.uniform(0,1) > (1-self.send_prob):
                self.publish_message(topic_name=self.topic,
                                    key=str(uuid.uuid1())[:8],
                                    value=str(self.message_count))
                self.message_count += 1
            if not self._verbose:
                print(".", end="")
                
            time.sleep(self.time_delta)
            
        # block until all async messages are sent
        self._producer.flush()
        time.sleep(1)
        self._logger.info(f"Producer {self._id}: finished - {self.message_count} messages sent\r")
        
    def stop(self):
        self._running = False
    
    def publish_message(self, topic_name, key, value):
        try:
            if key is not None:
                key = bytes(key, encoding='utf-8')
            if value is not None:
                value = bytes(value, encoding='utf-8')
            self._producer \
                .send(topic_name, key=key, value=value) \
                .add_callback(self.on_send_success) \
                .add_errback(self.on_send_error)
            self._logger.debug(f"Producer {self._id}: message queued for send\r")
            
            # block until all async messages are sent
            # self._producer.flush()
            
        except Exception as ex:
            self._logger.error(f"Producer {self._id}: exception occurred whilst publishing message\r")
            self._logger.error(str(ex))
    
    def on_send_success(self, record_metadata):
        # Callback function, called when message is sent
        self._logger.debug(f"Producer {self._id}: message sent successfully - topic: {record_metadata.topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}   \r")

    def on_send_error(self, excp):
        # Callback function, if there are errors during sending
        self._logger.error(f"Producer {self._id}: exception occurred while sending message\r", exc_info=excp)
        # handle exception


In [6]:
class Consumer(Base):
    # Wrapper class around KafkaConsumer with logging and threading
    
    max_run_time = 12  # seconds
    time_delta = 1 # seconds, between message polling
    
    message_count = 0
    
    def __init__(self, id=None, topic="main_topic", group_id=None, verbose=True):
        super().__init__(id, topic, verbose)
        
        try:
            self._consumer = KafkaConsumer(self.topic,
                                           client_id=self._id,
                                           #auto_offset_reset='earliest', # if set, the offset begins at zero (or lowest)
                                           group_id=group_id,
                                           bootstrap_servers=[kafka_host], 
                                           api_version=(0, 10))
            self._logger.info(f"Consumer {self._id}: created\r")
        except Exception as ex:
            self._logger.error("Exception occurred while connecting to Kafka\r", exc_info=ex)

        self._running = True
        # Thread.start() returns None, so keep the Thread object before starting it
        self._thread = threading.Thread(target=self._receive)
        self._thread.start()
        
    def _receive(self):
        # Receive messages from Kafka over a time period
        run_time = time.time()
        while self._running and ((time.time()-run_time) < self.max_run_time):
            self._logger.debug(f"Consumer {self._id}: polling for new messages\r")
            response = self._consumer.poll(timeout_ms=self.time_delta*1000)
            for topic_partition, messages in response.items():
                for msg in messages:
                    self._handle_message(msg)
                    self.message_count += 1
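        # Close from the polling thread, since KafkaConsumer is not thread-safe
        self._consumer.close()
        self._logger.info(f"Consumer {self._id}: finished - {self.message_count} messages received\r")

    def stop(self):
        # Signal the polling loop to exit; the polling thread then closes the consumer
        self._running = False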
        
    def _handle_message(self, msg):
        self._logger.debug(f"Consumer {self._id}: message received - topic: {msg.topic}, partition: {msg.partition}, offset: {msg.offset}, key/value: {msg.key.decode()}:{msg.value.decode()}\r")


In [7]:
# One consumer, one producer
c1 = Consumer()
p1 = Producer()

Consumer 96866eb8: created
Consumer 96866eb8: polling for new messages
Producer 96886f93: created
Consumer 96866eb8: polling for new messages
Producer 96886f93: message queued for send
Consumer 96866eb8: message received - topic: main_topic, partition: 0, offset: 0, key/value: 97243cc9:0
Producer 96886f93: message sent successfully - topic: main_topic, partition: 0, offset: 0   
Consumer 96866eb8: polling for new messages
Producer 96886f93: message queued for send
Consumer 96866eb8: polling for new messages
Producer 96886f93: message sent successfully - topic: main_topic, partition: 0, offset: 1   
Consumer 96866eb8: message received - topic: main_topic, partition: 0, offset: 1, key/value: 97c13a56:1
Consumer 96866eb8: polling for new messages
Consumer 96866eb8: polling for new messages
Producer 96886f93: message queued for send
Producer 96886f93: message sent successfully - topic: main_topic, partition: 0, offset: 2   
Consumer 96866eb8: message received - topic: main_topic, partition

In [8]:
# One consumer, two producers
c1 = Consumer()
p1 = Producer()
p2 = Producer()

Consumer a0d1d06e: created
Consumer a0d1d06e: polling for new messages
Producer a0d37e30: created
Producer a0d52bdb: created
Consumer a0d1d06e: polling for new messages
Producer a0d37e30: message queued for send
Consumer a0d1d06e: message received - topic: main_topic, partition: 0, offset: 7, key/value: a16f6a35:0
Producer a0d37e30: message sent successfully - topic: main_topic, partition: 0, offset: 7   
Consumer a0d1d06e: polling for new messages
Producer a0d37e30: message queued for send
Consumer a0d1d06e: message received - topic: main_topic, partition: 0, offset: 8, key/value: a20ab167:1
Producer a0d52bdb: message queued for send
Producer a0d37e30: message sent successfully - topic: main_topic, partition: 0, offset: 8   
Consumer a0d1d06e: polling for new messages
Producer a0d52bdb: message sent successfully - topic: main_topic, partition: 0, offset: 9   
Consumer a0d1d06e: message received - topic: main_topic, partition: 0, offset: 9, key/value: a20b268c:0
Consumer a0d1d06e: poll

Here we see that the single consumer picks up all messages from both producers.

In [11]:
# Two consumers, one producer 
c1 = Consumer(id=1)
c2 = Consumer(id=2)
p1 = Producer()

Consumer 1: created
Consumer 1: polling for new messages
Consumer 2: created
Consumer 2: polling for new messages
Producer e573e501: created
Consumer 1: polling for new messages
Consumer 2: polling for new messages
Producer e573e501: message queued for send
Consumer 2: message received - topic: main_topic, partition: 0, offset: 29, key/value: e60f134e:0
Consumer 1: message received - topic: main_topic, partition: 0, offset: 29, key/value: e60f134e:0
Producer e573e501: message sent successfully - topic: main_topic, partition: 0, offset: 29   
Consumer 2: polling for new messages
Consumer 1: polling for new messages
Producer e573e501: message queued for send
Producer e573e501: message sent successfully - topic: main_topic, partition: 0, offset: 30   
Consumer 1: message received - topic: main_topic, partition: 0, offset: 30, key/value: e6a9f1d9:1
Consumer 2: message received - topic: main_topic, partition: 0, offset: 30, key/value: e6a9f1d9:1
Consumer 1: polling for new messages
Consumer

When running two consumers, they both pick up all the messages. Whilst this could sometimes be useful, in most cases it is undesirable, as it means that effort is being duplicated. To scale our application workload effectively across multiple nodes, we want the consumers to coordinate and split the work between them.

To make this happen, we assign the consumers to the same consumer group.

In [13]:
# Two consumers, one producer 
c1 = Consumer(id=1, group_id='consumers1')
c2 = Consumer(id=2, group_id='consumers1')
p1 = Producer()

Consumer 1: created
Consumer 1: polling for new messages
Consumer 2: created
Consumer 2: polling for new messages
Producer 059c8141: created
Producer 059c8141: message queued for send
Producer 059c8141: message sent successfully - topic: main_topic, partition: 0, offset: 44   
Producer 059c8141: message queued for send
Producer 059c8141: message sent successfully - topic: main_topic, partition: 0, offset: 45   
Consumer 2: polling for new messages
Consumer 1: message received - topic: main_topic, partition: 0, offset: 44, key/value: 0637e2ad:0
Consumer 1: message received - topic: main_topic, partition: 0, offset: 45, key/value: 06d30007:1
Consumer 1: polling for new messages
Producer 059c8141: message queued for send
Producer 059c8141: message sent successfully - topic: main_topic, partition: 0, offset: 46   
Consumer 1: message received - topic: main_topic, partition: 0, offset: 46, key/value: 0807a000:2
Consumer 1: polling for new messages
Consumer 2: polling for new messages
Consum

Although the consumers are no longer duplicating their effort, there is a complication here - now all the messages are being processed by a single consumer. This is because our topic only has a single *partition*.

Kafka divides a topic into partitions, and each partition is consumed by only one consumer from a given consumer group. As a general rule, the number of partitions in a topic should be at least as large as the number of consumers in the group, otherwise the surplus consumers sit idle.

Further reading:
https://blog.cloudera.com/scalability-of-kafka-messaging-using-consumer-groups/
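
The effect of partition count on a consumer group can be sketched without a broker. The function below is a simplified round-robin assignment written for illustration only: `assign_partitions` is a hypothetical helper, and Kafka's real assignors run as part of the group rebalance protocol and differ in detail. It shows why a single-partition topic leaves one of two consumers idle.

```python
def assign_partitions(partition_ids, consumer_ids):
    """Deal partitions out to consumers in turn (illustrative only).

    Each partition goes to exactly one consumer in the group, so any
    consumers beyond the number of partitions receive nothing.
    """
    assignment = {consumer: [] for consumer in consumer_ids}
    for index, partition in enumerate(sorted(partition_ids)):
        consumer = consumer_ids[index % len(consumer_ids)]
        assignment[consumer].append(partition)
    return assignment


# One partition, two consumers: the second consumer is left with nothing
single = assign_partitions({0}, ["consumer-1", "consumer-2"])
print(single)

# Nine partitions, three consumers: the partitions are shared out evenly
multi = assign_partitions(set(range(9)), ["consumer-1", "consumer-2", "consumer-3"])
print(multi)
```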

In [14]:
# Print the partition IDs
topic = "main_topic"
consumer = KafkaConsumer(
    topic, bootstrap_servers=kafka_host)

partition_ids = consumer.partitions_for_topic(topic)
consumer.close()

print(f"{len(partition_ids)} partition(s) found, identifiers: {partition_ids}")

1 partition(s) found, identifiers: {0}


### Create a new topic with multiple partitions

We can use the admin client interface to create a new topic, and set the number of partitions.

In [15]:
from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(bootstrap_servers=kafka_host)

topic_list = []
topic_list.append(NewTopic(name="new_topic", num_partitions=9, replication_factor=1))
admin_client.create_topics(new_topics=topic_list)

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='new_topic', error_code=0, error_message=None)])

In [16]:
# Three consumers, five producers
c1 = Consumer(id="1", topic="new_topic", group_id='consumers2', verbose=False)
c2 = Consumer(id="2", topic="new_topic", group_id='consumers2', verbose=False)
c3 = Consumer(id="3", topic="new_topic", group_id='consumers2', verbose=False)
producer_array = []
for _ in range(5):
    this_producer = Producer(topic="new_topic", send_prob=1, verbose=False)
    this_producer.time_delta = 0.1  # secs, for high-rate sending
    producer_array.append(this_producer)

Consumer 1: created
Consumer 2: created
Consumer 3: created
Producer 1881cfcb: created
Producer 1883083b: created
Producer 1883cf35: created
Producer 18852b3b: created
Producer 18863d1f: created


.............................................................................................................................................................................................................................................................................................................................................................................................................................................................

Producer 1881cfcb: finished - 89 messages sent
Producer 1883083b: finished - 89 messages sent
Producer 1883cf35: finished - 89 messages sent
Producer 18852b3b: finished - 89 messages sent
Producer 18863d1f: finished - 89 messages sent
Consumer 3: finished - 54 messages received
Consumer 1: finished - 75 messages received
Consumer 2: finished - 54 messages received


Now, with the consumers in a new group and a topic with more partitions, the messages are divided between the three consumers. The split is effectively random, because the message keys are random.

Note: in the producer, we generate a random string as the `key` for each message. The producer client hashes this key to choose a partition for the message, so if every message had the same key, they would all be assigned to the same partition.
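
To illustrate key-based partitioning, the sketch below maps a key to a partition by hashing it and taking the result modulo the partition count. This is a simplified stand-in: `pick_partition` is a hypothetical helper that uses CRC32 from the standard library, whereas the default partitioner in `kafka-python` uses a murmur2 hash, so the actual partition numbers will differ. The property it demonstrates is the same: identical keys always land on the same partition, while random keys spread across partitions.

```python
import uuid
import zlib
from collections import Counter


def pick_partition(key, num_partitions):
    """Map a key to a partition index via hash modulo (illustrative only)."""
    key_bytes = key.encode("utf-8")
    return zlib.crc32(key_bytes) % num_partitions


num_partitions = 9

# Random 8-character keys, similar to those generated by the Producer class
random_keys = [str(uuid.uuid4())[:8] for _ in range(1000)]
spread = Counter(pick_partition(key, num_partitions) for key in random_keys)
print(f"Random keys used {len(spread)} of {num_partitions} partitions")

# A constant key: every message maps to a single partition
constant = Counter(pick_partition("same-key", num_partitions) for _ in range(1000))
print(f"Constant key used {len(constant)} of {num_partitions} partitions")
```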

### Administration

In [None]:
# Delete 'new_topic'
admin_client.delete_topics(topics=['new_topic'])