In [1]:
!pip install kafka-python



In [2]:
import json
import uuid
import random
import time

from kafka import KafkaConsumer, KafkaProducer

KAFKA_SERVERS = ['kafka1:9092', 'kafka2:9092', 'kafka3:9092']

class HPCKafkaProducer(KafkaProducer):
    """KafkaProducer with a helper for publishing numbered dummy messages.

    See API docs for further information:

    https://kafka-python.readthedocs.io/en/master
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Number of messages this instance has sent so far. It doubles as the
        # message ID, so IDs keep increasing across calls (and across re-runs
        # of a cell that reuses the same producer object).
        self.sent_msg_counter = 0

    def send_dummy_messages(self, number_of_msgs, topic="hpcDummyTopic"):
        """Send number_of_msgs messages containing an increasing ID.

        Each message value is the UTF-8 encoding of
        ``"Message <id> at <time.time()>"``, where ``<id>`` is taken from
        ``self.sent_msg_counter``.

        Args:
            number_of_msgs: How many messages to publish.
            topic: Destination topic.

        Raises:
            Whatever error ``FutureRecordMetadata.get()`` raises for the first
            message that failed to be delivered. The success message is only
            printed when every message was delivered.
        """
        futures = []
        for _ in range(number_of_msgs):
            value = f"Message {self.sent_msg_counter} at {time.time()}".encode('utf-8')
            futures.append(self.send(topic, value))
            self.sent_msg_counter += 1
        # send() is asynchronous: flush() waits until every buffered record has
        # completed, but delivery errors are only reported through the futures.
        self.flush()
        for future in futures:
            future.get()  # re-raises the delivery error of a failed send
        print(f"{number_of_msgs} messages published successfully.")

producer = HPCKafkaProducer(bootstrap_servers=KAFKA_SERVERS)

In [3]:
class HPCKafkaConsumer(KafkaConsumer):
    """KafkaConsumer with a helper that prints a fixed number of messages.

    See API docs for further information:

    https://kafka-python.readthedocs.io/en/master
    """
    def receive_n_messages(self, number_of_msgs, topic="hpcDummyTopic"):
        """Receive and print number_of_msgs messages.

        The first record is printed in full (the whole ConsumerRecord); every
        following record prints only its raw ``value``.

        NOTE: iterating a KafkaConsumer waits for new records (indefinitely
        with kafka-python's default ``consumer_timeout_ms``), so this call can
        block if fewer than number_of_msgs messages are available.

        Args:
            number_of_msgs: How many messages to consume. Values <= 0 consume
                nothing and return immediately.
            topic: Not used. The consumer reads the topic(s) it was subscribed
                to at construction time; the parameter is kept only for
                backward compatibility and symmetry with
                ``HPCKafkaProducer.send_dummy_messages``.
        """
        # Without this guard the loop below would still pull (and print) one
        # record before the break condition is ever checked.
        if number_of_msgs <= 0:
            return
        for i, msg in enumerate(self):
            if i == 0:
                print(f"First message printed full:\n{msg}\n")
            else:
                print(msg.value)
            # Break right after the last requested record so the iterator is
            # not asked for (and does not wait on) an extra one.
            if i >= number_of_msgs - 1:
                break

consumer = HPCKafkaConsumer("hpcDummyTopic", bootstrap_servers=KAFKA_SERVERS, auto_offset_reset='earliest')

In [4]:
# Demo: publish a batch of dummy messages, then read the same number back.
# In practice producer and consumer should run in different notebooks/docker
# containers; both live in this kernel here only for demonstration.
N_DEMO_MSGS = 10  # shared count: if the consumer asks for more than were sent, it may block
producer.send_dummy_messages(N_DEMO_MSGS)
time.sleep(1)  # short pause before consuming; send_dummy_messages already flushed
consumer.receive_n_messages(N_DEMO_MSGS)

10 messages published successfully.
First message printed full:
ConsumerRecord(topic='hpcDummyTopic', partition=0, offset=0, timestamp=1695192984884, timestamp_type=0, key=None, value=b'Message 10 at 1695192984.8845954', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=32, serialized_header_size=-1)

b'Message 11 at 1695192984.8849473'
b'Message 12 at 1695192984.8849945'
b'Message 13 at 1695192984.8850214'
b'Message 14 at 1695192984.8850446'
b'Message 15 at 1695192984.885067'
b'Message 16 at 1695192984.885089'
b'Message 17 at 1695192984.8851097'
b'Message 18 at 1695192984.8851311'
b'Message 19 at 1695192984.8851533'


### Creation of topics
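Sending the first message to a topic that does not exist yet creates that topic automatically with the broker's default settings (provided the brokers allow automatic topic creation). To create a topic with different settings yourself, you can use, for example, the following command: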

```bash
docker exec -ti kafka1 /usr/bin/kafka-topics --create --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --replication-factor 2 --partitions 4 --topic topic1
```

This creates `topic1` with 4 partitions, each replicated on 2 of the 3 brokers.