In [1]:
import os
import boto3
import json
import pandas as pd
from io import BytesIO
from kafka import KafkaProducer
from pyspark.sql import SparkSession
import datetime
import json

In [None]:
# 1️⃣ Set AWS credentials
# The notebook's own AWS_ACCESS_KEY variable is honoured first; the standard
# AWS_ACCESS_KEY_ID name is used as a fallback so that an environment set up
# for the AWS CLI/SDK works too.
aws_key = os.getenv("AWS_ACCESS_KEY") or os.getenv("AWS_ACCESS_KEY_ID")
aws_secret = os.getenv("AWS_SECRET_ACCESS_KEY")

# 2️⃣ Boto3 S3 Client
# Explicit credentials are passed only when BOTH halves are available.
# Passing just one of them makes boto3 reject the call as partial
# credentials, so otherwise boto3's default credential chain (environment,
# shared config/credentials files, instance role, ...) is left to resolve them.
if aws_key and aws_secret:
    s3 = boto3.client(
        's3',
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret
    )
else:
    s3 = boto3.client('s3')

# Source location of the parquet files that are replayed into Kafka.
bucket = "nyc-taxi-streaming"
prefix = "stream-data/"

# 3️⃣ Kafka Producer Setup
# Custom serializer
def json_serializer(obj):
    """Fallback encoder for values that ``json.dumps`` cannot handle natively.

    Intended to be passed as the ``default=`` argument of ``json.dumps``.

    Args:
        obj: The value the JSON encoder could not serialize on its own.

    Returns:
        The ISO-8601 string produced by ``obj.isoformat()`` when ``obj`` is a
        ``datetime.datetime`` or ``datetime.date`` instance.

    Raises:
        TypeError: If ``obj`` is of any other type.
    """
    temporal_types = (datetime.datetime, datetime.date)
    # Guard clause: anything that is not a date/datetime is rejected up front.
    if not isinstance(obj, temporal_types):
        raise TypeError(f"Type {type(obj)} not serializable")
    return obj.isoformat()

def _encode_value(record):
    """Turn one message value into the UTF-8 encoded JSON bytes sent to Kafka.

    Values the JSON encoder cannot handle on its own are delegated to
    ``json_serializer``.
    """
    as_text = json.dumps(record, default=json_serializer)
    return as_text.encode('utf-8')


# Producer connected to the local broker; every value handed to ``send`` is
# passed through ``_encode_value`` before it is transmitted.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=_encode_value,
)

# Kafka topic that receives the taxi trip records.
topic = "taxi-trips-topic"

In [3]:
# 4️⃣ SparkSession (for downstream transformations)
spark = SparkSession.builder.appName("TaxiProducer").getOrCreate()

# 5️⃣ List all parquet objects in S3 under the prefix
# A single list_objects_v2 call returns at most 1000 keys, so the listing is
# paginated to make sure no object under the prefix is silently skipped.
s3_paginator = s3.get_paginator("list_objects_v2")
listed_objects = [
    entry
    for page in s3_paginator.paginate(Bucket=bucket, Prefix=prefix)
    # Pages for an empty prefix carry no "Contents" key at all.
    for entry in page.get("Contents", [])
]

# Keep the dict shape of a list_objects_v2 response so that code reading
# response.get("Contents", []) continues to work unchanged.
response = {"Contents": listed_objects, "KeyCount": len(listed_objects)}

25/04/25 19:34:31 WARN Utils: Your hostname, Ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/04/25 19:34:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/25 19:34:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/25 19:34:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [8]:
import time

# Number of leading rows taken from each parquet file (keeps the replay small).
ROWS_PER_FILE = 500
# Pause between consecutive messages, in seconds, to throttle the stream.
SEND_DELAY_SECONDS = 0.5

# Iterate over each listed S3 object and stream its sampled rows to Kafka
for obj in response.get("Contents", []):
    key = obj["Key"]
    if not key.endswith(".parquet"):
        continue
    print(f"Processing: {key}")

    # Download the whole object into memory and parse it with pandas.
    obj_body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    pdf = pd.read_parquet(BytesIO(obj_body))

    # Sample the first ROWS_PER_FILE rows (avoid huge loads) and drop columns
    # that are entirely null in that sample.
    pdf_sample = pdf.head(ROWS_PER_FILE).dropna(axis=1, how='all')
    if pdf_sample.empty:
        # Nothing to send for this file; also avoids handing Spark an empty
        # frame for schema inference.
        continue

    # The original notes this is safe after dropping null-only columns —
    # presumably because Spark cannot infer a type for an all-null column.
    sdf = spark.createDataFrame(pdf_sample)

    # Stream each row to Kafka
    for row in sdf.toLocalIterator():
        producer.send(topic, value=row.asDict())
        time.sleep(SEND_DELAY_SECONDS)

# send() only enqueues the record; flush() blocks until everything still
# buffered has actually been handed to the broker, so the message below is
# only printed once the data is really out.
producer.flush()

print("✅ Data streaming complete.")

Processing: stream-data/green_tripdata_2022-01.parquet
Processing: stream-data/green_tripdata_2022-02.parquet
Processing: stream-data/green_tripdata_2022-03.parquet
Processing: stream-data/green_tripdata_2022-04.parquet
Processing: stream-data/green_tripdata_2022-05.parquet
Processing: stream-data/green_tripdata_2022-06.parquet
Processing: stream-data/green_tripdata_2022-07.parquet
Processing: stream-data/green_tripdata_2022-08.parquet
Processing: stream-data/green_tripdata_2022-09.parquet
Processing: stream-data/green_tripdata_2022-10.parquet


[Stage 926:>                                                        (0 + 1) / 1]

Processing: stream-data/green_tripdata_2022-11.parquet
Processing: stream-data/green_tripdata_2022-12.parquet
Processing: stream-data/green_tripdata_2023-01.parquet


[Stage 937:>                                                        (0 + 1) / 1]

Processing: stream-data/green_tripdata_2023-02.parquet
Processing: stream-data/green_tripdata_2023-03.parquet


[Stage 947:>                                                        (0 + 1) / 1]

Processing: stream-data/green_tripdata_2023-04.parquet
Processing: stream-data/green_tripdata_2023-05.parquet


[Stage 954:>                                                        (0 + 1) / 1]

Processing: stream-data/green_tripdata_2023-06.parquet
Processing: stream-data/green_tripdata_2023-07.parquet
Processing: stream-data/green_tripdata_2023-08.parquet
Processing: stream-data/green_tripdata_2023-09.parquet
Processing: stream-data/green_tripdata_2023-10.parquet


[Stage 972:>                                                        (0 + 1) / 1]

Processing: stream-data/green_tripdata_2023-11.parquet
Processing: stream-data/green_tripdata_2023-12.parquet
Processing: stream-data/green_tripdata_2024-01.parquet
Processing: stream-data/green_tripdata_2024-02.parquet
Processing: stream-data/green_tripdata_2024-03.parquet
Processing: stream-data/green_tripdata_2024-04.parquet
Processing: stream-data/green_tripdata_2024-05.parquet
Processing: stream-data/green_tripdata_2024-06.parquet
Processing: stream-data/green_tripdata_2024-07.parquet
Processing: stream-data/green_tripdata_2024-08.parquet
Processing: stream-data/green_tripdata_2024-09.parquet
Processing: stream-data/green_tripdata_2024-10.parquet
Processing: stream-data/green_tripdata_2024-11.parquet
Processing: stream-data/green_tripdata_2024-12.parquet
Processing: stream-data/green_tripdata_2025-01.parquet
Processing: stream-data/yellow_tripdata_2022-01.parquet
Processing: stream-data/yellow_tripdata_2022-02.parquet
Processing: stream-data/yellow_tripdata_2022-03.parquet
Process

[Stage 1174:>                                                       (0 + 1) / 1]

Processing: stream-data/yellow_tripdata_2024-12.parquet


[Stage 1176:>                                                       (0 + 1) / 1]

Processing: stream-data/yellow_tripdata_2025-01.parquet
✅ Data streaming complete.
