# In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, TimestampType
import time, os, shutil

# Obtain the Spark session for this job (an already-running session is reused
# by getOrCreate) and limit driver log output to warnings and above.
spark = (
    SparkSession.builder
    .appName("RealTimeTrafficRawSaver")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# Kafka connection settings for the traffic feed.
KAFKA_BROKER = "kafka:9092"
TOPIC_NAME = "traffic_data"

# Streaming source: subscribe to the topic and start from the earliest
# available offsets when the query has no checkpointed position yet.
raw_df = (
    spark.readStream.format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": KAFKA_BROKER,
            "subscribe": TOPIC_NAME,
            "startingOffsets": "earliest",
        }
    )
    .load()
)

# Keep only the Kafka record value, converted to a string so it can be parsed
# as JSON.
string_df = raw_df.select(col("value").cast(StringType()))

# Expected JSON layout of each message. The timestamp is read as text first
# and converted to a real timestamp after parsing.
schema = StructType()
schema.add("timestamp", StringType())
schema.add("location", StringType())
schema.add("vehicle_count", IntegerType())
schema.add("avg_speed", DoubleType())

# Parse the JSON text into a struct, flatten the struct into top-level
# columns, then cast the textual timestamp column to TimestampType.
parsed_df = (
    string_df
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    .withColumn("timestamp", col("timestamp").cast(TimestampType()))
)

# Locations used by the sink: a directory of CSV part files and the
# checkpoint directory in which the query tracks its progress.
OUTPUT_PATH = "./traffic_raw_data/"
CHECKPOINT_PATH = "./traffic_checkpoints/"
# NOTE(review): FINAL_FILE is not used in this section; presumably a later
# step merges the part files into it -- confirm.
FINAL_FILE = "./traffic_data_all.csv"

# Append each micro-batch of parsed rows to OUTPUT_PATH as CSV, triggering a
# new batch every 5 seconds.
query = (
    parsed_df.writeStream.outputMode("append")
    .format("csv")
    .option("path", OUTPUT_PATH)
    .option("checkpointLocation", CHECKPOINT_PATH)
    .trigger(processingTime="5 seconds")
    .start()
)

# Let the stream run for at most 100 seconds, then shut it down. The stop is
# placed in `finally` so the background query is also stopped when the wait
# is interrupted (e.g. the notebook cell is cancelled) or the query fails;
# otherwise it would keep running and keep using the checkpoint directory,
# which breaks a subsequent re-run of this cell.
try:
    query.awaitTermination(100)
finally:
    query.stop()

