For this tutorial we will use the `kafka-python` package, a Python client for Apache Kafka.

In [1]:
!pip install kafka-python



### Download and set up Kafka and Zookeeper instances
For demo purposes, the following instances are set up locally:

Kafka (Brokers: 127.0.0.1:9092)<br>
Zookeeper (Node: 127.0.0.1:2181)

In [6]:
!curl -sSOL https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
!tar -xzf kafka_2.13-3.7.0.tgz

In [7]:
!kafka_2.13-3.7.0/bin/zookeeper-server-start.sh config/zookeeper.properties

[2024-04-28 04:49:18,187] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-04-28 04:49:18,195] WARN config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-04-28 04:49:18,197] ERROR Invalid config, exiting abnormally (org.apache.zookeeper.server.quorum.QuorumPeerMain)
org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Error processing config/zookeeper.properties
	at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:198)
	at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:125)
	at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:91)
Caused by: java.lang.IllegalArgumentException: config/zookeeper.properties file is missing
	at org.apache.zookeeper.server.util.VerifyingFileFactory.doFailForNonExistingPath(VerifyingFileFact

In [8]:
!kafka_2.13-3.7.0/bin/kafka-server-start.sh config/server.properties

[2024-04-28 04:49:20,072] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2024-04-28 04:49:20,376] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.nio.file.NoSuchFileException: config/server.properties
	at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
	at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
	at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
	at java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
	at java.base/java.nio.file.Files.newByteChannel(Files.java:371)
	at java.base/java.nio.file.Files.newByteChannel(Files.java:422)
	at java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420)
	at java.base/java.nio.file.Files.newInputStream(Files.java:156)
	at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:686)
	at kafka.Kafka$.getPropsFr

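The two cells above fail because `config/zookeeper.properties` and `config/server.properties` are resolved relative to the current working directory, not the extracted Kafka folder. Below, we pass the full paths to the default configurations (provided by Apache Kafka) and start both instances as daemons.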

In [9]:
!kafka_2.13-3.7.0/bin/zookeeper-server-start.sh -daemon kafka_2.13-3.7.0/config/zookeeper.properties
!kafka_2.13-3.7.0/bin/kafka-server-start.sh -daemon kafka_2.13-3.7.0/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

Waiting for 10 secs until kafka and zookeeper services are up and running
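A fixed 10 second sleep can be too short on a slow machine and needlessly long on a fast one. As a hedged alternative, the sketch below polls a TCP port until it accepts connections. The helper name `wait_for_port` and its parameters are assumptions made for illustration, and an open port only shows that the process is listening, not that the broker has finished starting up.

```python
import socket
import time

def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once host:port accepts a TCP connection, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False

# Example usage against the local setup of this tutorial:
# wait_for_port('127.0.0.1', 2181)   # Zookeeper
# wait_for_port('127.0.0.1', 9092)   # Kafka broker
```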


Once the instances have started as daemon processes, grep for `kafka` in the process list.<br>When both services are up, the list should contain two java processes, one for Zookeeper and one for the Kafka broker.

In [10]:
!ps -ef | grep kafka

root        3125     783  0 04:49 ?        00:00:00 /bin/bash -c ps -ef | grep kafka
root        3127    3125  0 04:49 ?        00:00:00 grep kafka


For this tutorial, we will simulate a bunch of tasks that start and stop randomly for 10 seconds. After 10 seconds, we stop the simulation.

Each time a task starts, it writes a message to the `hello` topic, and when it ends it writes a message to the `goodbye` topic.

The `hello` topic will have 1 partition, whereas the `goodbye` topic will have 2 partitions.<br>
We will create 2 consumers, one per topic, each consuming from all partitions of its topic.

Create the kafka topics with the following specs:

hello: partitions=1, replication-factor=1<br>
goodbye: partitions=2, replication-factor=1

In [11]:
!kafka_2.13-3.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic hello
!kafka_2.13-3.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic goodbye

Created topic hello.
Created topic goodbye.


Describe the topics to see the details of their configuration.

In [12]:
!kafka_2.13-3.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic hello
!kafka_2.13-3.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic goodbye

Topic: hello	TopicId: DMvv9z18TmWmGWK0LWN5jg	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: hello	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic: goodbye	TopicId: qnx_ZBRlTUSa78kPXjTIhA	PartitionCount: 2	ReplicationFactor: 1	Configs: 
	Topic: goodbye	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: goodbye	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
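
The `goodbye` topic has two partitions, so each message sent to it ends up in partition 0 or partition 1. The messages in this tutorial carry no key, so the client picks the partition itself. When a key is set, Kafka clients typically hash it (with murmur2) and take the result modulo the partition count. The sketch below illustrates that idea only: `choose_partition` is a hypothetical helper, and CRC32 is swapped in for murmur2 to keep the example in the standard library.

```python
import zlib

def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition index.

    Illustrative only: uses CRC32 rather than the murmur2 hash of real clients,
    so the indices will not match what Kafka would choose.
    """
    if num_partitions < 1:
        raise ValueError("a topic needs at least one partition")
    return zlib.crc32(key) % num_partitions

# The same key always lands in the same partition,
# which is what preserves per-key ordering.
for key in (b"task-1", b"task-2", b"task-3"):
    print(key, "->", choose_partition(key, num_partitions=2))
```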


A replication factor of 1 means that the data is not replicated. This is the only option here, because our Kafka setup has a single broker.
In production systems, a cluster can run to hundreds of brokers, and replicating each partition across several of them is what provides fault tolerance.

Please refer to the [docs](https://kafka.apache.org/documentation/#replication) for more details.
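
As a small worked example of the relationship between brokers and replication: a topic with replication factor N keeps N copies of each partition on different brokers, so it can lose up to N - 1 brokers without losing committed records, and N cannot exceed the number of brokers. The helper below is a hypothetical illustration of that arithmetic, not part of any Kafka API.

```python
def tolerated_broker_failures(replication_factor: int, num_brokers: int) -> int:
    """Number of broker failures a topic can survive without losing committed records."""
    if replication_factor < 1:
        raise ValueError("replication factor must be at least 1")
    if replication_factor > num_brokers:
        # each replica of a partition must live on a different broker
        raise ValueError("replication factor cannot exceed the number of brokers")
    return replication_factor - 1

# Our single-broker setup: no redundancy at all
print(tolerated_broker_failures(replication_factor=1, num_brokers=1))

# A three-broker cluster with replication factor 3
print(tolerated_broker_failures(replication_factor=3, num_brokers=3))
```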


First we create a producer instance to write data to Kafka.

In [13]:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

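Now we create the consumers. For this simulation, we set `consumer_timeout_ms=100`, so iterating over a consumer stops once no new message has arrived for 100 ms instead of blocking forever.

First we create a consumer instance to consume data from the `hello` topic.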

In [14]:
from kafka import KafkaConsumer
consumer_hello = KafkaConsumer('hello', bootstrap_servers=['127.0.0.1:9092'], group_id='hello_logger', consumer_timeout_ms=100)

We create another consumer instance to consume data from the `goodbye` topic

In [15]:
consumer_goodbye = KafkaConsumer('goodbye', bootstrap_servers=['127.0.0.1:9092'], group_id='goodbye_logger', consumer_timeout_ms=100)
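
To show what the timeout does to a `for msg in consumer` loop without needing a broker, here is a stand-in built on `queue.Queue`. It is only an analogy: `iterate_with_timeout` is a hypothetical helper, not how `KafkaConsumer` is implemented, but the loop ends in the same way once no item arrives within the timeout.

```python
import queue

def iterate_with_timeout(q, timeout_ms):
    """Yield items from `q` until none arrives within `timeout_ms` milliseconds."""
    while True:
        try:
            yield q.get(timeout=timeout_ms / 1000)
        except queue.Empty:
            # No item within the timeout: end the iteration instead of blocking forever
            return

pending = queue.Queue()
pending.put(b"1234")
pending.put(b"5678")

for item in iterate_with_timeout(pending, timeout_ms=100):
    print(item.decode("utf-8"))

# A second loop over the now empty queue returns after about 100 ms without printing
for item in iterate_with_timeout(pending, timeout_ms=100):
    print(item.decode("utf-8"))
```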

Before the simulation, we run a small test to check that the producer and the consumers work correctly.

Note that the messages are sent as bytes and decoded as `utf-8` on the consumer side.<br>
The `b` prefix creates a bytes literal; for ASCII-only text such as `1234`, this is identical to its `utf-8` encoding.
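
The `b` prefix only works for literals containing ASCII characters. For arbitrary text, an explicit `.encode('utf-8')` is needed. The short sketch below shows both, using a hypothetical helper `encode_message` and a made-up non-ASCII payload.

```python
def encode_message(text: str) -> bytes:
    """Encode a text message as UTF-8 bytes, the form expected for message values."""
    return text.encode("utf-8")

# For ASCII text, the bytes literal and the explicit encoding are the same
print(encode_message("1234") == b"1234")

# Non-ASCII text cannot be written with the b prefix and must be encoded explicitly
payload = encode_message("tâche 1")
print(payload)                  # the accented character takes two bytes
print(payload.decode("utf-8"))  # decoding restores the original string
```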

In [16]:
#send test message to hello topic
producer.send('hello', b'1234')

#send test message to goodbye topic
producer.send('goodbye', b'5678')

producer.flush()

#Get message from hello topic
for msg in consumer_hello:
  print(f"Hello {msg.value.decode('utf-8')}")

#Get message from goodbye topic
for msg in consumer_goodbye:
  print(f"Goodbye {msg.value.decode('utf-8')}")

The outputs look good! Let's move on to the simulation now.

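Note that we run `producer.flush()` at the end of the producer test.<br>

The producer batches messages and sends them in the background, so `send` returns before the message has reached the broker. `flush` blocks until every buffered message has been sent and its request has completed, either acknowledged by the broker or failed. It does not by itself report whether a send succeeded; for that, check the future returned by `send`.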
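
To make the batching idea concrete, here is a toy in-memory model of a producer that collects messages and sends them in groups. It is a sketch under stated assumptions: `ToyBatchingProducer` and its `batch_size` parameter are hypothetical, and this is not how `kafka-python` is implemented. It only shows why messages can sit in a buffer until `flush` is called.

```python
class ToyBatchingProducer:
    """Toy model of client-side batching; not kafka-python's implementation."""

    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.buffer = []     # messages accepted by send() but not yet sent
        self.delivered = []  # batches that have been "sent"

    def send(self, message):
        self.buffer.append(message)
        # A full batch is sent right away
        if len(self.buffer) >= self.batch_size:
            self._send_batch()

    def flush(self):
        # Push out whatever is still waiting, even if the batch is not full
        if self.buffer:
            self._send_batch()

    def _send_batch(self):
        self.delivered.append(list(self.buffer))
        self.buffer.clear()

toy = ToyBatchingProducer(batch_size=3)
toy.send(b"1234")
toy.send(b"5678")
print(toy.delivered)  # nothing sent yet: the batch is not full
toy.flush()
print(toy.delivered)  # both messages leave together in one batch
```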

We now create helper functions to randomly create tasks, randomly stop tasks, send messages to Kafka, and consume messages.

The consumer processes are defined as simple message loggers.

In [17]:
import random

def randomly_stop_task(task_list):
  """
  Randomly decide whether to stop a task or not
  if a task is to be stopped return its id
  else return None
  """

  #if some tasks are running, then randomly decide whether to stop a task
  if task_list:

    #decide if a task is to be stopped
    if random.random() > 0.5:

      #choose the task to stop randomly
      stop_task_pos = random.randint(0, len(task_list)-1)

      #stop the task by removing it from the list of running tasks
      stopped_task = task_list.pop(stop_task_pos)

      return stopped_task

  return None

In [18]:
def randomly_create_task(task_counter):
  """
  Randomly decide whether to create a task or not
  If a task is to be created return its id
  else return None
  """
  #decide if a task is to be created
  if random.random() > 0.5:

    #New task id is the total number of tasks created + 1
    return task_counter + 1

  return None

In [19]:
def send_message(task_id, producer, type='HELLO'):
  #create message: the task id as ASCII digits in a bytes object
  message = b'%d'%task_id

  #send message to kafka
  if type == 'HELLO':
    producer.send('hello', message)
  elif type == 'GOODBYE':
    producer.send('goodbye', message)
  producer.flush()

def consume_message(msg, type='HELLO'):
  if type=='HELLO':
    print(f"task {msg.value.decode('utf-8')} started")
  elif type=='GOODBYE':
    print(f"task {msg.value.decode('utf-8')} stopped")
  print()

Simulation code

In [20]:
from time import sleep

#Set a seed value so that the results are reproducible
random.seed(42)

# list of running tasks
task_list = []

timer = 0 #seconds

#number of tasks started
task_counter = 0

#run simulation for 10 seconds
while timer < 10:

    #Producer

    #if a task is to be stopped, get the id of the stopped task
    stopped_task_id = randomly_stop_task(task_list)

    if stopped_task_id is not None:
      #send message to kafka about the stopped task
      send_message(stopped_task_id, producer, type='GOODBYE')

    #if a task is to be created, get the id of new task
    new_task_id = randomly_create_task(task_counter)

    if new_task_id is not None:
      #increase number of tasks created
      task_counter += 1

      #add new task id to list of running tasks
      task_list.append(new_task_id)

      #send message to kafka about the started task
      send_message(new_task_id, producer, type='HELLO')


    #Consumer
    for msg in consumer_hello:
      #log message about the started task (if any)
      consume_message(msg, 'HELLO')
    for msg in consumer_goodbye:
      #log message about the stopped task (if any)
      consume_message(msg, 'GOODBYE')

    timer += 1
    sleep(1)

#After 10 seconds stop all tasks
for task in task_list:
  send_message(task, producer, type='GOODBYE')

for msg in consumer_goodbye:
  consume_message(msg, 'GOODBYE')

task 1 started

task 1 stopped

task 2 started

task 2 stopped

task 3 started

task 4 started

task 3 stopped

task 4 stopped



The output above is what we get for the seed value `42`.

Commenting out the `random.seed(42)` line gives different results on each run.
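
The start/stop logic can also be exercised without a running broker. The sketch below is a hedged dry run: `RecordingProducer` is a hypothetical stand-in that stores messages in a list instead of sending them to Kafka, and `run_dry_simulation` is a made-up name that repeats the loop of the simulation without the one-second sleep and without consumers. It uses its own `random.Random` instance, so its event sequence is not guaranteed to match the output of the real simulation.

```python
import random

class RecordingProducer:
    """Hypothetical stand-in for KafkaProducer that keeps messages in memory."""

    def __init__(self):
        self.sent = []  # list of (topic, value) tuples in send order

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        pass  # nothing is buffered in this stub

def run_dry_simulation(seed=42, steps=10):
    """Run the start/stop loop for `steps` iterations and return the recorded messages."""
    rng = random.Random(seed)
    producer = RecordingProducer()
    running = []       # ids of running tasks
    task_counter = 0   # number of tasks started so far

    for _ in range(steps):
        # Randomly stop one of the running tasks
        if running and rng.random() > 0.5:
            stopped = running.pop(rng.randrange(len(running)))
            producer.send('goodbye', b'%d' % stopped)

        # Randomly start a new task
        if rng.random() > 0.5:
            task_counter += 1
            running.append(task_counter)
            producer.send('hello', b'%d' % task_counter)

    # At the end, stop every task that is still running
    for task in running:
        producer.send('goodbye', b'%d' % task)
    producer.flush()
    return producer.sent

for topic, value in run_dry_simulation():
    action = 'started' if topic == 'hello' else 'stopped'
    print(f"task {value.decode('utf-8')} {action}")
```

Because the stub records every message, it is easy to check properties such as "every started task is eventually stopped" before pointing the same logic at a real producer.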
