### Connection Setup
I ran Kafka locally in Docker for this project, which worked much better for me than a remote connection.
- Connect to Kafka via Docker
- Install Docker Desktop on my system
- Start the containers: ***docker compose up -d***

### Install kcat on my system (macOS)
- brew install kcat (in a terminal)
- kcat -h (to verify the installation)

### Test Reachability/connectivity to Kafka
- ***kcat -b localhost:9092 -t cities -C -o beginning -q***
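
Before running kcat, a quick TCP check can confirm that something is listening on the broker port. The following is a minimal sketch; the helper name `broker_reachable` and the 2-second timeout are illustrative assumptions, and a successful connection only shows that the port is open, not that Kafka itself is healthy.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        # create_connection resolves the host name and opens a TCP socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures
        return False
```

Calling `broker_reachable()` with the defaults targets the same `localhost:9092` address used by the kcat command above.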

### Implementing Producer and Consumer mode
- I followed the provided starter scripts, and it was straightforward.

### Lab Deliverable
- Produce messages: ***kcat -b localhost:9092 -t cities -P***
- Consume messages: ***kcat -b localhost:9092 -t cities -C -o beginning -f 'Offset: %o\nMessage: %s\n'***
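
The `-f` format string contains spaces and backslashes, so it has to reach kcat as a single quoted argument. As a small sketch of how the quoting works, the hypothetical helper `kcat_consume_command` below builds the consumer command from its parts and lets `shlex.join` add the quotes. The exact format string is an assumption about the intended output layout.

```python
import shlex


def kcat_consume_command(broker: str, topic: str, fmt: str) -> str:
    """Build a shell-safe kcat consumer command line."""
    args = ["kcat", "-b", broker, "-t", topic, "-C", "-o", "beginning", "-f", fmt]
    # shlex.join quotes any argument that contains spaces or backslashes
    return shlex.join(args)


# A raw string keeps \n as two characters, which kcat expands to a newline
print(kcat_consume_command("localhost:9092", "cities", r"Offset: %o\nMessage: %s\n"))
```

The printed line can be pasted into a terminal as is.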


# Lab 2: Kafka for Data Streaming

### Relevant libraries


# Standard library
import json
from json import loads
from time import sleep
from random import randint
from datetime import datetime
from typing import Dict, Any  # used in the make_city_data type hints

# Third-party
from kafka import KafkaConsumer, KafkaProducer


### Producer Mode -> Writes Data to Broker

# Function to create city data
def make_city_data(city: str, temperature_f: int) -> Dict[str, Any]:
    return {
        "city": city,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "temperature_f": temperature_f,
    }

# Kafka topic name
topic = "test-topic"

# Create Kafka producer
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],        # your local Docker Kafka
    value_serializer=lambda v: json.dumps(v).encode("utf-8")  # dict → JSON → bytes
)

# Example cities
cities = [
    make_city_data("Miami", 85),
    make_city_data("Boca Raton", 82),
    make_city_data("Tallahassee", 78)
]

print("Writing to Kafka Broker...")

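# Send 10 messages, each a randomly chosen entry from the cities list
for _ in range(10):
    data = cities[randint(0, len(cities) - 1)]
    producer.send(topic=topic, value=data)
    sleep(1)  # pause one second between sends

# Block until all buffered messages have been sent to the broker
producer.flush()
print(f"Data written to topic: {topic}")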

Writing to Kafka Broker...
Data written to topic: test-topic
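
In the producer above, `make_city_data` runs once per city when the `cities` list is built, so every message in a run carries the timestamp from that moment rather than from the time it was sent. The variant below is a sketch, not the starter script's method: it stores only `(city, temperature)` pairs and builds each payload at send time. The function name `send_readings` and its parameters are illustrative assumptions. `producer` is expected to be a kafka-python `KafkaProducer` configured with the same `value_serializer` as above.

```python
import random
from datetime import datetime
from time import sleep
from typing import Any, Dict, List, Tuple


def make_city_data(city: str, temperature_f: int) -> Dict[str, Any]:
    """Build one reading, stamped with the current local time."""
    return {
        "city": city,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "temperature_f": temperature_f,
    }


def send_readings(producer, topic: str, readings: List[Tuple[str, int]],
                  count: int = 10, delay_s: float = 1.0) -> None:
    """Send `count` randomly chosen readings, building each payload at send time.

    `producer` is assumed to be a kafka-python KafkaProducer (or any object
    with compatible send() and flush() methods).
    """
    for _ in range(count):
        city, temperature_f = random.choice(readings)
        producer.send(topic, value=make_city_data(city, temperature_f))
        sleep(delay_s)
    # Block until all buffered messages have been sent
    producer.flush()
```

With the producer from this section, a call such as `send_readings(producer, "test-topic", [("Miami", 85), ("Boca Raton", 82), ("Tallahassee", 78)])` would send ten messages whose timestamps advance by about one second each.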


### Consumer Mode -> Reads Data from Broker

import os
from json import loads
# Kafka topic name
topic = "test-topic"

# Create Kafka consumer
consumer = KafkaConsumer(
    topic,                                   # topic to subscribe
    bootstrap_servers=["localhost:9092"],    # local Docker Kafka
    auto_offset_reset="earliest",            # read messages from beginning
    enable_auto_commit=True,                 # commit offsets automatically
    auto_commit_interval_ms=1000             # commit interval
)

print("Reading Kafka Broker...")

# Make sure Kafka_log.csv exists; if not, create it with a header row
if not os.path.exists("Kafka_log.csv"):
    with open("Kafka_log.csv", "w") as f:
        f.write("city,timestamp,temperature_f\n")

# Consume messages
for message in consumer:
    # Kafka message is bytes → decode to string → parse JSON
    message_dict = loads(message.value.decode())
    print(message_dict)

    # Append message to CSV
    with open("Kafka_log.csv", "a") as f:
        f.write(f"{message_dict['city']},{message_dict['timestamp']},{message_dict['temperature_f']}\n")

Reading Kafka Broker...
{'city': 'Tallahassee', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 78}
{'city': 'Boca Raton', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 82}
{'city': 'Boca Raton', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 82}
{'city': 'Tallahassee', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 78}
{'city': 'Miami', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 85}
{'city': 'Miami', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 85}
{'city': 'Tallahassee', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 78}
{'city': 'Tallahassee', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 78}
{'city': 'Miami', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 85}
{'city': 'Boca Raton', 'timestamp': '2025-09-22 19:48:12', 'temperature_f': 82}
{'city': 'Tallahassee', 'timestamp': '2025-09-22 22:54:18', 'temperature_f': 78}
{'city': 'Boca Raton', 'timestamp': '2025-09-22 22:54:18', 'temperature_f': 82}
{'city': 'Tallahassee', 't

JSONDecodeError: Expecting value: line 1 column 1 (char 0)
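
`JSONDecodeError: Expecting value: line 1 column 1 (char 0)` is what `json.loads` raises when the payload is empty or does not begin with a JSON value. One plausible cause here, stated as an assumption, is a message on the topic that was not produced as JSON, for example plain text typed into a kcat producer. The sketch below skips such messages instead of stopping the loop, and writes rows with the `csv` module so that a comma inside a field cannot break the file. The helper names `decode_json` and `append_row` are hypothetical.

```python
import csv
import json
import os
from typing import Any, Dict, Optional

FIELDS = ["city", "timestamp", "temperature_f"]


def decode_json(raw: Optional[bytes]) -> Optional[Dict[str, Any]]:
    """Return the decoded JSON object, or None if the payload is not a JSON object."""
    if not raw:
        return None  # empty or missing payload
    try:
        parsed = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # not valid UTF-8 or not valid JSON
    return parsed if isinstance(parsed, dict) else None


def append_row(path: str, record: Dict[str, Any]) -> None:
    """Append one record to a CSV file, writing the header if the file is new."""
    is_new = not os.path.exists(path)
    with open(path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS, extrasaction="ignore")
        if is_new:
            writer.writeheader()
        writer.writerow(record)


# Inside the consumer loop, these helpers would replace the direct loads() call:
# for message in consumer:
#     record = decode_json(message.value)
#     if record is None:
#         continue  # skip messages that are not JSON objects
#     append_row("Kafka_log.csv", record)
```

Returning `None` rather than raising keeps the consumer running when a single bad message appears in the topic.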

### Explore kcat (or CLI)!

### List metadata and topics

***List topics:***

kafka-topics --list --bootstrap-server localhost:9092

***Describe a topic:***

kafka-topics --describe --topic cities --bootstrap-server localhost:9092
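
The same topic information can also be read from Python. The sketch below assumes a kafka-python `KafkaConsumer`, which provides `topics()` and `partitions_for_topic()`; the helper name `summarize_topics` is hypothetical.

```python
from typing import Dict, List


def summarize_topics(client) -> Dict[str, List[int]]:
    """Map each topic name to its sorted partition ids.

    `client` is assumed to be a kafka-python KafkaConsumer, which provides
    topics() and partitions_for_topic().
    """
    summary = {}
    for name in sorted(client.topics()):
        # partitions_for_topic may return None if metadata is unavailable
        partitions = client.partitions_for_topic(name) or set()
        summary[name] = sorted(partitions)
    return summary


# Usage against the local broker (requires kafka-python and a running Kafka):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"])
# print(summarize_topics(consumer))
# consumer.close()
```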