# Kafka Demo

### Connect to Kafka Broker Server 
```
ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf
```
The values for the placeholders above (user, server, and ports) are on the Canvas Lab 2 assignment page.
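
Before running the cells below, it can help to confirm that the tunnel is actually listening. This is a minimal sketch using only the standard library. The helper name `port_is_open` is hypothetical, and port 9092 is an assumption based on the local port used later in this notebook. It only checks that something accepts TCP connections on that port, not that it is a Kafka broker.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # 9092 is an assumption: use whatever <local_port> you forwarded.
    print("tunnel up" if port_is_open("localhost", 9092) else "tunnel down")
```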

### To kill connection
```
lsof -ti:<local_port> | xargs kill -9
```

### Setup
```
python -m pip install kafka-python
```

In [1]:
# !ssh -v -L 9092:localhost:9092 kafkastudent@48.217.83.110 -NTf
# pw: soqzuV-bunmu0

In [1]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = 'recitation-f'  # replace the final letter with your section: b, c, d, e, or f

### Producer Mode -> Writes Data to Broker

In [8]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Replace '...' with the address of your Kafka bootstrap server
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                        value_serializer=lambda x: dumps(x).encode('utf-8'))

# [TODO]: Add cities of your choice
cities = ["Singapore", "Tokyo", "New York"]

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

# Block until any buffered messages have actually been sent to the broker
producer.flush()

Writing to Kafka Broker
Writing: 2025-01-23 12:30:07,New York,20ºC
Writing: 2025-01-23 12:30:08,Tokyo,18ºC
Writing: 2025-01-23 12:30:09,New York,21ºC
Writing: 2025-01-23 12:30:10,Tokyo,24ºC
Writing: 2025-01-23 12:30:11,Tokyo,29ºC
Writing: 2025-01-23 12:30:12,Tokyo,32ºC
Writing: 2025-01-23 12:30:13,Tokyo,22ºC
Writing: 2025-01-23 12:30:14,Tokyo,25ºC
Writing: 2025-01-23 12:30:15,Singapore,20ºC
Writing: 2025-01-23 12:30:16,Tokyo,28ºC
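
The `value_serializer` passed to the producer turns each record into JSON text and then into UTF-8 bytes, which is why the consumer later has to decode each value and call `loads` on it. The sketch below shows that round trip without a broker. `serialize` and `deserialize` are illustrative names, not part of kafka-python.

```python
from json import dumps, loads


def serialize(value) -> bytes:
    # Same transformation as the producer's value_serializer.
    return dumps(value).encode("utf-8")


def deserialize(raw: bytes):
    # Inverse step: bytes -> JSON text -> Python object.
    return loads(raw.decode("utf-8"))


record = "2025-01-23 12:30:07,New York,20ºC"
raw = serialize(record)
print(type(raw).__name__)          # bytes
print(deserialize(raw) == record)  # True
```

`KafkaConsumer` accepts a `value_deserializer` argument, so a function like `deserialize` could be passed there instead of decoding inside the read loop.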


### Consumer Mode -> Reads Data from Broker

In [11]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [TODO]: Complete the missing ... parameters/arguments using the Kafka documentation
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # or 'latest'; experiment with both
    # Periodically commit the offsets that have been read
    enable_auto_commit=True,
    # How often (in ms) to commit offsets to Kafka
    auto_commit_interval_ms=1000,
    # Stop iterating after 5 s without new messages (added to save time)
    consumer_timeout_ms=5000
)

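print('Reading Kafka Broker')
# Append with a file handle instead of shelling out to `echo`,
# which mangles quotes and is unsafe for arbitrary message content.
with open('kafka_log.csv', 'a', encoding='utf-8') as log_file:
    for message in consumer:
        # message.value is bytes by default: decode it, then parse the JSON string
        record = loads(message.value.decode('utf-8'))
        print(record)
        log_file.write(f"{record}\n")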

Reading Kafka Broker
2025-01-22 22:44:37,Ankara,20ºC
2025-01-22 22:44:38,Ankara,22ºC
2025-01-22 22:44:39,Barcelona,19ºC
2025-01-22 22:44:40,Ankara,30ºC
2025-01-22 22:44:41,Ankara,27ºC
2025-01-22 22:44:42,Kiev,25ºC
2025-01-22 22:44:43,Barcelona,32ºC
2025-01-22 22:44:44,Kiev,30ºC
2025-01-22 22:44:45,Ankara,24ºC
2025-01-22 22:44:47,Barcelona,21ºC
2025-01-22 22:57:38,Cairo,27ºC
2025-01-22 22:57:39,Vienna,28ºC
2025-01-22 22:57:40,Cairo,32ºC
2025-01-22 22:57:41,Adelaide,19ºC
2025-01-22 22:57:42,Adelaide,20ºC
2025-01-22 22:57:43,Cairo,26ºC
2025-01-22 22:57:44,Adelaide,27ºC
2025-01-22 22:57:45,Vienna,20ºC
2025-01-22 22:57:46,Adelaide,21ºC
2025-01-22 22:57:47,Adelaide,29ºC
2025-01-23 10:07:13,New York,32ºC
2025-01-23 10:07:14,Ann Arbor,18ºC
2025-01-23 10:07:15,Ann Arbor,23ºC
2025-01-23 10:07:16,New York,31ºC
2025-01-23 10:07:17,Ann Arbor,30ºC
2025-01-23 10:07:18,New York,32ºC
2025-01-23 10:07:19,Ann Arbor,22ºC
2025-01-23 10:07:20,New York,28ºC
2025-01-23 10:07:21,New York,19ºC
2025-01-23 10:07:
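
Each record is a comma-separated string of timestamp, city, and temperature, so downstream code usually wants typed fields. This is a minimal sketch, assuming the exact format written by the producer (no commas in city names, a `ºC` suffix on the temperature). `Reading` and `parse_record` are hypothetical names.

```python
from datetime import datetime
from typing import NamedTuple


class Reading(NamedTuple):
    timestamp: datetime
    city: str
    temp_c: int


def parse_record(line: str) -> Reading:
    """Split 'YYYY-MM-DD HH:MM:SS,City,NNºC' into typed fields."""
    # Unpacking raises ValueError if the line does not have exactly three fields.
    stamp, city, temp = line.strip().split(",")
    if not temp.endswith("ºC"):
        raise ValueError(f"unexpected temperature field: {temp!r}")
    return Reading(
        timestamp=datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S"),
        city=city,
        temp_c=int(temp[: -len("ºC")]),
    )


reading = parse_record("2025-01-22 22:44:37,Ankara,20ºC")
print(reading.city, reading.temp_c)  # Ankara 20
```

A malformed or truncated line raises `ValueError`, so a caller may want to catch that and skip the record.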

### Use kcat
kcat, previously known as kafkacat, is a command-line interface (CLI) for Kafka.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [3]:
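# kcat command: connect to the local Kafka broker (-b), select a topic (-t),
# and consume (-C) a single message, then exit (-c1).
# Add `-o beginning` to start explicitly from the earliest offset.
# Can also be run in a terminal without the leading "!".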
!kcat -b localhost:9092 -t recitation-f -C -c1


In [None]:
# ps aux | grep ssh
# kill [PID]