# Notebook demonstrating how to start a Kafka consumer using the kafka-python-ng package

### Necessary imports

In [1]:
import pandas as pd
from kafka import KafkaConsumer
import json
from s3fs import S3FileSystem
from dotenv import load_dotenv
import os

### Write basic Kafka consumer

In [2]:
KAFKA_PRODUCER_IP = "18.206.253.178"
KAFKA_PRODUCER_PORT = "9092"
KAFKA_PRODUCER_TOPIC = "topic"

In [3]:
consumer = KafkaConsumer(
    KAFKA_PRODUCER_TOPIC, # Topic to subscribe to
    bootstrap_servers=[
        f"{KAFKA_PRODUCER_IP}:{KAFKA_PRODUCER_PORT}", # EC2 instance public IP address and port 9092
    ],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

In [4]:
# Works
# for data in consumer:
#     print(f"Receiving data for {data.value[0]["Index"]} for {data.value[0]["Date"]}")

### Set up AWS S3 access

In [5]:
load_dotenv()

S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY_ID")
S3_ACCESS_KEY_SECRET = os.getenv("S3_ACCESS_KEY_SECRET")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")

In [6]:
print(f"Bucket: {S3_BUCKET_NAME}")

Bucket: real-time-stock-data


In [7]:
s3 = S3FileSystem(
    anon=False,
    key=S3_ACCESS_KEY,
    secret=S3_ACCESS_KEY_SECRET
)

In [8]:
# Works
# with s3.open("s3://real-time-stock-data-analysis/temp.txt", "w") as file:
#     file.write("Test")

### Write file to S3 for each record received via Kafka stream

In [9]:
for data in consumer:
    value = data.value
    index_name = value["Index"]
    date = value["Date"]
    filename = f"s3://{S3_BUCKET_NAME}/{index_name}-{date}.json"
    with s3.open(filename, "w") as file:
        json.dump(value, file)

KeyboardInterrupt: 