# Kafka Demo

### Connect to Kafka Broker Server
```
ssh -o ServerAliveInterval=60 -L 9092:localhost:9092 tunnel@128.2.204.215 -NTf
```
When prompted, enter the password for tunnel@128.2.204.215: mlip-tunnel


To kill the process holding local port 9092 (this closes the tunnel):
```
lsof -ti:9092 | xargs kill -9
```
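
Before running the producer or consumer, it can help to confirm that something is listening on the local end of the tunnel. The following is a minimal sketch using only the standard library; the helper name `is_port_open` is hypothetical, and a successful TCP connection only shows that the port accepts connections, not that the broker behind it is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts
        return False


if __name__ == "__main__":
    # localhost:9092 is the local end of the SSH tunnel opened above
    status = "open" if is_port_open("localhost", 9092) else "closed"
    print(f"localhost:9092 is {status}")
```

If this reports `closed` right after the `ssh` command, the tunnel probably did not start; if it reports `open` after the `kill` command, another process may still be bound to the port.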

### Setup
```
python -m pip install kafka-python
```

### Producer Mode -> Writes Data to Broker

In [2]:
import os
from datetime import datetime
from json import dumps, loads
from time import sleep
from random import randint
from kafka import KafkaConsumer, KafkaProducer

# Update this for your own recitation section :)
topic = 'recitation-c'

In [6]:
# Create a producer to write data to kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# [TODO]: Set bootstrap_servers to the address of your Kafka bootstrap server
# (localhost:9092 is the local end of the SSH tunnel opened above)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                        value_serializer=lambda x: dumps(x).encode('utf-8'),
                        )

# [TODO]: Add cities of your choice
cities = ['Atlanta', 'Seattle', 'Boston']

# Write data via the producer
print("Writing to Kafka Broker")
for i in range(10):
    data = f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")},{cities[randint(0,len(cities)-1)]},{randint(18, 32)}ºC'
    print(f"Writing: {data}")
    producer.send(topic=topic, value=data)
    sleep(1)

# send() is asynchronous; flush() blocks until buffered messages are delivered
producer.flush()

Writing to Kafka Broker
Writing: 2024-09-03 09:25:17,Boston,26ºC
Writing: 2024-09-03 09:25:18,Boston,30ºC
Writing: 2024-09-03 09:25:19,Boston,28ºC
Writing: 2024-09-03 09:25:20,Atlanta,29ºC
Writing: 2024-09-03 09:25:21,Seattle,25ºC
Writing: 2024-09-03 09:25:22,Seattle,31ºC
Writing: 2024-09-03 09:25:23,Atlanta,27ºC
Writing: 2024-09-03 09:25:24,Atlanta,18ºC
Writing: 2024-09-03 09:25:25,Atlanta,19ºC
Writing: 2024-09-03 09:25:26,Boston,18ºC
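
The `value_serializer` passed to the producer turns each Python value into JSON text and then into UTF-8 bytes, which is why the consumer has to decode and parse it again. The sketch below reproduces that round trip without a broker; the function names `serialize` and `deserialize` are hypothetical and only for illustration.

```python
from json import dumps, loads


def serialize(value) -> bytes:
    """Same transformation as the producer's value_serializer."""
    return dumps(value).encode('utf-8')


def deserialize(raw: bytes):
    """Inverse step: what a consumer does to recover the original value."""
    return loads(raw.decode('utf-8'))


record = "2024-09-03 09:25:17,Boston,26ºC"
raw = serialize(record)

print(type(raw).__name__)          # bytes
print(deserialize(raw) == record)  # True
```

Because the value is a string, the JSON encoding wraps it in double quotes, so the bytes on the wire begin and end with `"`.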


### Consumer Mode -> Reads Data from Broker

In [5]:
# Create a consumer to read data from kafka
# Ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# [TODO]: Fill in the consumer parameters/arguments using the Kafka documentation
consumer = KafkaConsumer(
    topic,  # Same topic the producer writes to (defined in the first cell)
    bootstrap_servers=['localhost:9092'],
    # Where to start when no committed offset exists: 'earliest' or 'latest'
    auto_offset_reset='latest',
    # Periodically commit the offsets that have been read
    # (kafka-python only commits offsets when a group_id is set)
    enable_auto_commit=True,
    # How often, in milliseconds, to commit offsets to Kafka
    auto_commit_interval_ms=1000
)

print('Reading Kafka Broker')
for message in consumer:
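    # message.value is bytes by default: decode it, then parse the JSON text
    record = loads(message.value.decode('utf-8'))
    print(record)
    # Append to the log file directly instead of passing message text to a shell
    with open('kafka_log.csv', 'a', encoding='utf-8') as log_file:
        log_file.write(f"{record}\n")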

Reading Kafka Broker


KeyboardInterrupt: 
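
Each record the producer writes is a comma-separated string of timestamp, city, and temperature. A consumer that wants typed fields rather than raw text could parse it along these lines; this is a sketch that assumes the exact record format shown in the producer output, and `Reading` and `parse_reading` are hypothetical names.

```python
from datetime import datetime
from typing import NamedTuple


class Reading(NamedTuple):
    timestamp: datetime
    city: str
    temperature_c: int


def parse_reading(record: str) -> Reading:
    """Split 'YYYY-MM-DD HH:MM:SS,City,NNºC' into typed fields."""
    timestamp_text, city, temperature_text = record.split(',')
    # Drop the unit suffix (either 'ºC' or '°C') before converting to int
    temperature = int(temperature_text.rstrip('ºC°'))
    timestamp = datetime.strptime(timestamp_text, "%Y-%m-%d %H:%M:%S")
    return Reading(timestamp, city, temperature)


reading = parse_reading("2024-09-03 09:25:17,Boston,26ºC")
print(reading.city, reading.temperature_c)  # Boston 26
```

A record with a different number of fields raises `ValueError`, which is probably the right behavior for a demo where malformed input should be noticed early.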

### Use kcat!
kcat, previously known as kafkacat, is a command-line interface (CLI) for producing to and consuming from Kafka.


Ref: https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html

In [None]:
# kcat command: connect to the local Kafka broker, specify a topic, and consume messages from the earliest offset

# answer (kcat calls the earliest offset "beginning"): kcat -b localhost:9092 -t recitation-c -C -o beginning

# Kafka Topic: A topic is a named category of messages. Producers publish messages to a topic, and consumers subscribe to a topic to receive them. A topic is a logical concept; its messages are stored in one or more partitions.

# Kafka Offset: An offset is the sequence number that uniquely identifies a message within one partition of a topic. Consumers track the last offset they have read, so after a disconnection or failure they can resume where they left off instead of skipping or re-reading messages.
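
The way offsets let a consumer resume can be illustrated without a broker. The sketch below is a toy in-memory model, not Kafka's implementation: `PartitionLog` is a hypothetical class standing in for a single partition, and `committed` plays the role of the consumer's committed offset, which by Kafka convention is the offset of the next message to read.

```python
class PartitionLog:
    """Toy stand-in for one partition: an append-only list of messages."""

    def __init__(self):
        self._messages = []

    def append(self, message) -> int:
        """Store a message and return the offset it was assigned."""
        self._messages.append(message)
        return len(self._messages) - 1

    def read_from(self, offset: int):
        """Return (offset, message) pairs starting at the given offset."""
        return list(enumerate(self._messages[offset:], start=offset))


log = PartitionLog()
for city in ['Atlanta', 'Seattle', 'Boston', 'Atlanta']:
    log.append(city)

# First session: process two messages, committing the next offset to read
committed = 0
for offset, message in log.read_from(committed)[:2]:
    committed = offset + 1

# After a restart, the consumer resumes from the committed offset
remaining = log.read_from(committed)
print(remaining)  # [(2, 'Boston'), (3, 'Atlanta')]
```

Starting again from offset 0 instead of `committed` corresponds to reading from the earliest offset, which replays every message still in the log.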