In [1]:
!pip install kafka-python



In [2]:
from kafka import KafkaConsumer, KafkaProducer
import json
import uuid
import random
import time

In [3]:
def connect_kafka_producer(servers):
    """Create a KafkaProducer for the given bootstrap server(s).

    Args:
        servers: Bootstrap server address(es), passed straight through as the
            producer's ``bootstrap_servers`` argument.

    Returns:
        The new KafkaProducer, or None if constructing it raised an
        ``Exception`` (the error is printed instead of re-raised).
    """
    try:
        return KafkaProducer(bootstrap_servers=servers, api_version=(0, 10))
    except Exception as ex:
        # Best-effort: report the failure and hand back None. Only Exception is
        # handled, so KeyboardInterrupt / SystemExit still propagate and the
        # cell remains interruptible (no `return` inside `finally`).
        print('Exception while connecting Kafka')
        print(str(ex))
        return None

In [4]:
def publish_message(producer_instance, topic_name, key, value):
    """Send one key/value record to a Kafka topic and confirm its delivery.

    ``key`` and ``value`` are str and are UTF-8 encoded before sending;
    ``key`` may also be None to publish a record without a key. Any
    ``Exception`` raised while encoding, sending, flushing or confirming
    delivery is printed rather than re-raised.

    Args:
        producer_instance: A KafkaProducer (e.g. from connect_kafka_producer).
        topic_name: Destination topic.
        key: Record key as str, or None for a keyless record.
        value: Record value as str (e.g. a JSON document).
    """
    try:
        key_bytes = None if key is None else bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        # send() is asynchronous and returns a future; broker-side delivery
        # failures are reported through that future, not by flush().
        future = producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        # After flush() the future is resolved; get() raises if delivery
        # failed, so the success message is only printed for delivered records.
        future.get()
        print('Message published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))

In [5]:
def produce_xy(producer, topic_name, max_messages=None, interval=1.0):
    """Publish random ``{"name": "Mark <n>"}`` JSON messages to a topic.

    Each message has ``n`` drawn from ``random.randint(0, 22)`` (inclusive)
    and a fresh UUID4 string as its key, and is sent via publish_message.

    Args:
        producer: Producer passed through to publish_message.
        topic_name: Destination topic.
        max_messages: Stop after this many messages; None (the default)
            loops forever, as the original cell did.
        interval: Seconds to sleep after each message (default 1.0).
    """
    sent = 0
    # With the defaults this is the original unbounded loop; a finite
    # max_messages lets the notebook cell actually complete.
    while max_messages is None or sent < max_messages:
        name = f'Mark {random.randint(0, 22)}'
        message = json.dumps({"name": name})
        print(name)
        publish_message(producer, topic_name, str(uuid.uuid4()), message)
        sent += 1
        time.sleep(interval)

In [6]:
def consume_xy(consumer, topic_name):
    """Print the key and value of every record yielded by ``consumer``.

    Keys are decoded as UTF-8; a keyless record (``key is None``) is printed
    with ``None`` as its key instead of raising AttributeError. Values are
    printed as-is (whatever the consumer's deserializer produced).

    Args:
        consumer: An iterable of records exposing ``.key`` and ``.value``.
        topic_name: Unused; kept so existing callers keep working.
    """
    for msg in consumer:
        key = msg.key.decode("utf-8") if msg.key is not None else None
        print(key, msg.value)

In [7]:
# Broker address and topic used by the consumer cells below.
server1 = 'broker1:9093'
topic1 = "names"

# Record values are decoded with json.loads. consumer_timeout_ms=1000 makes
# iteration over `consumer` end after one second without new records rather
# than blocking forever, so loops over it can finish.
consumer = KafkaConsumer(
    topic1,
    bootstrap_servers=[server1],
    api_version=(0, 10),
    auto_offset_reset='earliest',
    consumer_timeout_ms=1000,
    value_deserializer=json.loads,
)

In [8]:
def get_changed_data(consumer, topic):
    """Collect every record value from ``consumer``, tagging each with a random score.

    Each record's ``value`` is modified in place: key ``topic`` is set to
    ``random.randint(400, 500)`` (both ends inclusive). The same value objects
    are returned, in the order they were consumed.

    NOTE(review): assumes every ``msg.value`` supports item assignment
    (e.g. a dict produced by a json.loads deserializer) - confirm.
    """
    def _tag(payload):
        # Mutates and returns the very object stored on the record.
        payload[topic] = random.randint(400, 500)
        return payload

    return [_tag(record.value) for record in consumer]

In [9]:
# Read every available record from `consumer` and tag it with a random value under `topic1`.
changed_data = get_changed_data(consumer=consumer, topic=topic1)

In [10]:
def write_json_to_file(filepath, content):
    """Serialize ``content`` as JSON into ``filepath``, overwriting any existing file.

    Uses json's default settings (compact separators, ``ensure_ascii=True``).
    """
    with open(filepath, mode="w") as out_file:
        json.dump(content, fp=out_file)

In [11]:
# Persist the tagged records to changed_data.json (relative to the working directory).
write_json_to_file(filepath="changed_data.json", content=changed_data)