# Kafka Demo

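### Connect to Kafka Broker Server

Open an SSH tunnel that forwards local port 9092 to port 9092 on the broker host (`-N`: run no remote command, `-T`: no pseudo-terminal, `-f`: go to the background):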
```
ssh -o ServerAliveInterval=60 -L 9092:localhost:9092 tunnel@128.2.204.215 -NTf
```
Password (when prompted): `seaitunnel`


To close the tunnel, kill the process listening on port 9092:
```
lsof -ti:9092 | xargs kill -9
```
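Before creating a producer or consumer, it can help to confirm that the tunnel is actually listening locally. The sketch below is an illustrative helper (the name `port_is_open` is hypothetical, not part of kafka-python) using only the standard `socket` module. Note that a successful connect only shows that the local end of the SSH tunnel is up; it does not prove the broker behind it is healthy.

```python
import socket


def port_is_open(host: str = "localhost", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if something accepts TCP connections on host:port.

    Hypothetical helper: this only proves the local end of the SSH tunnel
    is listening, not that a Kafka broker is reachable behind it.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False


if __name__ == "__main__":
    print("Tunnel up" if port_is_open() else "Nothing listening on localhost:9092")
```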

### Setup
```
python -m pip install kafka-python
```

```python
from datetime import datetime
from json import dumps, loads
from random import randint
from time import sleep
import os

# Requires the SSH tunnel above, so the broker is reachable at localhost:9092
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = 'recitation-b'
```

### Producer Mode -> Writes Data to Broker

```python
# Create a producer to write data to Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Replace '...' with the address of your Kafka bootstrap server
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Each value is JSON-encoded, then UTF-8 encoded to bytes
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)

# [TODO]: Add cities of your choice
cities = ['Pittsburgh', 'New York', 'Cleveland']

# Write data via the producer
print("Writing to Kafka Broker")
for _ in range(10):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    city = cities[randint(0, len(cities) - 1)]
    data = f'{timestamp},{city},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    # send() is asynchronous: it queues the record and returns a future
    producer.send(topic=topic, value=data)
    sleep(1)

# Block until every queued record has been delivered to the broker
producer.flush()
```

```
Writing to Kafka Broker
Writing: 2024-01-26 09:46:58,Pittsburgh,21ºC
Writing: 2024-01-26 09:46:59,Pittsburgh,22ºC
Writing: 2024-01-26 09:47:00,Pittsburgh,21ºC
Writing: 2024-01-26 09:47:01,New York,30ºC
Writing: 2024-01-26 09:47:02,Pittsburgh,20ºC
Writing: 2024-01-26 09:47:03,Cleveland,28ºC
Writing: 2024-01-26 09:47:04,Cleveland,19ºC
Writing: 2024-01-26 09:47:05,Cleveland,22ºC
Writing: 2024-01-26 09:47:06,Cleveland,25ºC
Writing: 2024-01-26 09:47:07,Pittsburgh,25ºC
```
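The producer's `value_serializer` and the decoding step in the consumer loop are inverses of each other: a string goes to JSON text, then to UTF-8 bytes on the way in, and back again on the way out. A minimal sketch of that round trip, with the two transformations pulled out into illustrative functions (`serialize` and `deserialize` are hypothetical names, not part of kafka-python):

```python
from json import dumps, loads


def serialize(value) -> bytes:
    """Same transformation as the producer's value_serializer lambda."""
    return dumps(value).encode('utf-8')


def deserialize(raw: bytes):
    """Inverse transformation, as done by hand inside the consumer loop."""
    return loads(raw.decode('utf-8'))


reading = '2024-01-26 09:46:58,Pittsburgh,21ºC'
wire = serialize(reading)
print(type(wire).__name__)           # bytes
print(deserialize(wire) == reading)  # True
```

As a design alternative, `KafkaConsumer` accepts a `value_deserializer` argument, so a function like `deserialize` could be passed there and `message.value` would arrive already decoded instead of as raw bytes.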


### Consumer Mode -> Reads Data from Broker

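```python
# Create a consumer to read data from Kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [TODO]: Complete the missing ... parameters/arguments using the Kafka documentation
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['localhost:9092'],
    # Where to start when there is no committed offset: 'earliest' or 'latest'
    auto_offset_reset='earliest',  # Experiment with different values
    # Periodically commit the offsets that have been read.
    # Note: kafka-python only commits offsets when a group_id is set;
    # with the default group_id=None, auto-commit is disabled.
    enable_auto_commit=True,
    # How often (in ms) to commit read offsets to Kafka
    auto_commit_interval_ms=1000,
)

print('Reading Kafka Broker')
# Iterating blocks indefinitely waiting for new messages; interrupt the cell to stop
for message in consumer:
    # message.value is raw bytes: decode to text, then undo the producer's JSON encoding
    record = loads(message.value.decode('utf-8'))
    print(record)
    # Append the record to a local CSV log
    with open('kafka_log.csv', 'a', encoding='utf-8') as log_file:
        log_file.write(record + '\n')
```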

```
Reading Kafka Broker
2024-01-24 14:27:49,London,22ºC
2024-01-24 14:27:51,London,23ºC
2024-01-24 14:27:52,New York,23ºC
2024-01-24 14:27:53,London,21ºC
2024-01-24 14:27:54,Pittsburgh,24ºC
2024-01-24 14:27:55,Pittsburgh,30ºC
2024-01-24 14:27:56,London,27ºC
2024-01-24 14:27:57,London,32ºC
2024-01-24 14:27:58,London,31ºC
2024-01-24 14:27:59,London,25ºC
2024-01-24 14:38:17,New York,21ºC
2024-01-24 14:38:18,Bangalore,20ºC
2024-01-24 14:38:19,London,22ºC
2024-01-24 14:38:20,Bangalore,30ºC
2024-01-24 14:38:21,Bangalore,26ºC
2024-01-24 14:38:22,Pittsburgh,20ºC
2024-01-24 14:38:23,New York,19ºC
2024-01-24 14:38:24,London,26ºC
2024-01-24 14:38:25,London,25ºC
2024-01-24 14:38:26,Bangalore,26ºC
2024-01-24 16:26:20,Seattle,31ºC
2024-01-24 16:26:21,Beijing,18ºC
2024-01-24 16:26:22,Montreal,26ºC
2024-01-24 16:26:23,Montreal,20ºC
2024-01-24 16:26:24,Tokyo,24ºC
2024-01-24 16:26:25,Montreal,24ºC
2024-01-24 16:26:26,Beijing,21ºC
2024-01-24 16:26:27,Seattle,32ºC
2024-01-24 16:26:28,Montreal,24ºC
2024-01-24

KeyboardInterrupt: 
```
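The output above starts with messages produced in earlier sessions on 2024-01-24, which is what `auto_offset_reset='earliest'` does when the consumer has no committed offset: it starts at the beginning of the partition. The toy model below is a hypothetical, in-memory sketch of that rule (`ToyPartition` is invented for illustration and is NOT the kafka-python API): a committed offset for a named consumer group always wins, and `auto_offset_reset` is only the fallback. Real Kafka also supports other reset behaviours (e.g. raising an error), which the sketch rejects with `ValueError`.

```python
from typing import Dict, List, Optional


class ToyPartition:
    """Hypothetical in-memory model of one Kafka partition (NOT the kafka-python API)."""

    def __init__(self, messages: List[str]):
        self.log: List[str] = list(messages)
        self.committed: Dict[str, int] = {}  # group_id -> next offset to read

    def append(self, message: str) -> None:
        self.log.append(message)

    def start_offset(self, group_id: Optional[str], auto_offset_reset: str) -> int:
        # A committed offset for this group always takes precedence
        if group_id is not None and group_id in self.committed:
            return self.committed[group_id]
        # No committed offset: fall back to the reset policy
        if auto_offset_reset == 'earliest':
            return 0
        if auto_offset_reset == 'latest':
            return len(self.log)
        raise ValueError(f'unsupported auto_offset_reset: {auto_offset_reset!r}')

    def poll_all(self, group_id: Optional[str], auto_offset_reset: str) -> List[str]:
        """Read from the start offset to the end, then commit (named groups only)."""
        start = self.start_offset(group_id, auto_offset_reset)
        batch = self.log[start:]
        if group_id is not None:
            self.committed[group_id] = len(self.log)
        return batch


partition = ToyPartition(['msg-0', 'msg-1', 'msg-2'])
print(partition.poll_all(None, 'earliest'))    # ['msg-0', 'msg-1', 'msg-2']
print(partition.poll_all(None, 'earliest'))    # same again: nothing was committed
print(partition.poll_all('demo', 'earliest'))  # ['msg-0', 'msg-1', 'msg-2']
print(partition.poll_all('demo', 'earliest'))  # []: resumes from the committed offset
```

Re-running the consumer cell without a `group_id` behaves like the `None` calls: every run replays the whole topic.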

### Use kcat

kcat (formerly known as kafkacat) is a command-line client for producing and consuming Kafka messages.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

```
# kcat: connect to the local broker (-b), select a topic (-t), run in consumer mode (-C),
# and start from the earliest offset (-o earliest). The leading ! runs it as a shell command in the notebook.
!kcat -b localhost:9092 -t 'recitation-b' -C -o earliest
```

```
"2024-01-24 23:10:33,Los Angeles,28\u00baC"
"2024-01-24 23:10:35,New York,28\u00baC"
"2024-01-24 23:10:36,Chicago,25\u00baC"
"2024-01-24 23:10:37,Los Angeles,24\u00baC"
"2024-01-24 23:10:38,New York,25\u00baC"
"2024-01-24 23:10:39,New York,31\u00baC"
"2024-01-24 23:10:40,New York,24\u00baC"
"2024-01-24 23:10:41,Los Angeles,31\u00baC"
"2024-01-24 23:10:42,Los Angeles,23\u00baC"
"2024-01-24 23:10:43,New York,29\u00baC"
"2024-01-24 23:12:03,Los Angeles,30\u00baC"
"2024-01-24 23:12:04,Los Angeles,31\u00baC"
"2024-01-24 23:12:05,Los Angeles,27\u00baC"
"2024-01-24 23:12:06,Los Angeles,20\u00baC"
"2024-01-24 23:12:07,Chicago,29\u00baC"
"2024-01-24 23:12:08,Los Angeles,25\u00baC"
"2024-01-24 23:12:09,New York,27\u00baC"
"2024-01-24 23:12:10,Los Angeles,25\u00baC"
"2024-01-24 23:12:11,Los Angeles,31\u00baC"
"2024-01-24 23:12:12,Los Angeles,20\u00baC"
"2024-01-25 07:56:37,New York,32\u00baC"
"2024-01-25 07:56:38,Santiago,18\u00baC"
"2024-01-25 07:56:39,Santiago,18\u00baC"
"2024-01-25 07:56:40,Pi
```
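Unlike the Python consumer, kcat prints each value's raw bytes, so you see exactly what the producer's `value_serializer` wrote: a JSON string literal, with surrounding quotes and `º` escaped as `\u00ba` because `json.dumps` escapes non-ASCII characters by default. The sketch below shows that behaviour and a hypothetical helper, `parse_kcat_line`, that turns one such line back into typed fields. Passing `ensure_ascii=False` in the serializer is an optional change that would store `º` as plain UTF-8; `json.loads` on the consumer side handles either form.

```python
from datetime import datetime
from json import dumps, loads

# json.dumps escapes non-ASCII characters by default (ensure_ascii=True)
print(dumps('21ºC'))                      # "21\u00baC"
print(dumps('21ºC', ensure_ascii=False))  # "21ºC"


def parse_kcat_line(line: str):
    """Hypothetical helper: parse one kcat output line into (timestamp, city, temp_c)."""
    text = loads(line)  # removes the quotes and decodes \u00ba back to º
    stamp, city, temp = text.split(',')
    when = datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S')
    return when, city, int(temp.removesuffix('ºC'))  # str.removesuffix needs Python 3.9+


print(parse_kcat_line(r'"2024-01-24 23:10:33,Los Angeles,28\u00baC"'))
```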