In [None]:
import os
import shutil

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, from_unixtime, timestamp_seconds, window
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType, TimestampType

# --- CONFIG ---
# Bind Spark to loopback; must be set before the Spark session/JVM is started below.
os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'
# Folder watched by the streaming query for newly arriving JSON files.
input_path = "/tmp/car_speeds"
# Streaming checkpoint (source offsets / processed-file log and aggregation state).
# It is reused across runs: delete this directory to reprocess all files from scratch,
# or if the query's logic changes and the old checkpoint is no longer compatible.
checkpoint_path = "/tmp/checkpoints/cars"

# A streaming file source can fail with a "path does not exist" error when the watched
# folder is missing, so create it up front (no-op if it already exists).
os.makedirs(input_path, exist_ok=True)


In [None]:
# Display the PySpark version backing this kernel. The Delta package requested in the
# session cell (delta-spark_2.13:4.0.1) is built against a specific Spark release line
# (presumably Spark 4.0.x — check the Delta Lake compatibility table), so confirm the
# version shown here is compatible before starting the session.
pyspark_version = str(pyspark.__version__)
pyspark_version

In [None]:
# --- INIT SPARK ---
# Local Spark session pinned to the loopback interface, with Delta Lake available.
# NOTE: if a session already exists in this kernel, getOrCreate() returns it, and static
# settings such as spark.jars.packages are not re-applied — restart the kernel to change them.
spark = (
    SparkSession.builder
    .appName("FileFolderStream")
    .master("local[*]")  # run locally using all available cores
    # Keep the driver on loopback (pairs with SPARK_LOCAL_IP set in the config cell).
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    # Disable executor process-tree metrics collection
    # (presumably to avoid procfs-related noise on non-Linux hosts — TODO confirm).
    .config("spark.executor.processTreeMetrics.enabled", "false")
    # Delta Lake: resolved from Maven at session start, then registered as a SQL
    # extension and as the default session catalog.
    .config("spark.jars.packages", "io.delta:delta-spark_2.13:4.0.1")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Only surface ERROR-level JVM log output in the notebook.
spark.sparkContext.setLogLevel("ERROR")

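The Spark UI (dashboard) is available at http://localhost:4040/ while the session is running (if port 4040 is already taken, Spark uses the next free port, e.g. 4041).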

In [None]:
# --- DEFINE SCHEMA ---
# Streaming file sources need an explicit schema: Structured Streaming does not infer
# schemas for file sources unless spark.sql.streaming.schemaInference is enabled.
json_schema = StructType([
    StructField("car_id", StringType(), True),
    StructField("car_speed", IntegerType(), True),
    StructField("timestamp", DoubleType(), True)  # Unix epoch seconds, possibly fractional
])

# --- READ STREAM ---
# Each micro-batch picks up only files that are NEW in the folder, parsed with json_schema.
df = spark.readStream \
    .format("json") \
    .schema(json_schema) \
    .load(input_path)  # <--- The folder to watch

# --- TRANSFORM DATA ---
# Convert epoch seconds (DOUBLE) straight to TimestampType with timestamp_seconds.
# Avoid from_unixtime(...).cast(TimestampType()): it truncates fractional seconds,
# round-trips through a session-time-zone string (ambiguous during a DST fall-back
# hour), and needs an implicit DOUBLE -> BIGINT cast that ANSI mode may reject.
df_with_timestamp = df.withColumn("timestamp", timestamp_seconds(col("timestamp")))

# Apply a watermark of 5 minutes: rows more than 5 minutes behind the latest event time
# seen may be dropped as late data, and Spark can discard aggregation state for windows
# that fall entirely behind the watermark. In update mode the watermark is not strictly
# required, but without it the per-window state would grow without bound.
df_with_watermark = df_with_timestamp.withWatermark("timestamp", "5 minutes")

# Group by car_id and a 1-minute tumbling window, then compute the average speed
# per car per window.
df_aggregated = df_with_watermark \
    .groupBy(
        window(col("timestamp"), "1 minute"),
        col("car_id")
    ) \
    .agg(avg("car_speed").alias("avg_speed")) \
    .select("car_id", "avg_speed", "window.start", "window.end")

# --- WRITE STREAM ---
# "update" mode emits, on each trigger, only the (window, car_id) rows whose average changed.
# The checkpoint is reused across runs; if this query's logic changes, delete checkpoint_path
# first, since an existing checkpoint may be incompatible with the new query.
query = df_aggregated.writeStream \
    .format("console") \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path) \
    .queryName("CarSpeedAverage") \
    .start()

print(f"Monitoring {input_path} for new JSON files...")
print("Streaming query started. Average speed by car (per 1-minute window) will be displayed in console.")

In [None]:
# Keep the stream running until manually stopped (interrupt the kernel to stop it).
# Interrupting raises KeyboardInterrupt in this cell, but that alone does not stop the
# JVM-side streaming query, which would keep printing batches into later cells — so
# stop it explicitly. Query failures still propagate from awaitTermination().
try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    print("Streaming query stopped.")