In [1]:
import os
# Submit arguments for the PySpark JVM: pull in the Kafka connector for
# Structured Streaming, then the mandatory trailing "pyspark-shell" token.
# The tokens are joined with single spaces into one command-line string.
submit_tokens = [
    '--packages',
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
    'pyspark-shell',
]
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(submit_tokens)

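# Disabled alternative: pass the S3/Hadoop jars through PYSPARK_SUBMIT_ARGS
# (as extra string fragments in the tuple above) instead of a session config.
#'--jars /home/jovyan/spark_jars/hadoop-aws-3.3.4.jar,'
#'/home/jovyan/spark_jars/aws-java-sdk-bundle-1.12.262.jar,'
#'/home/jovyan/spark_jars/hadoop-common-3.3.4.jar '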

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import pyspark

# Local jars handed to Spark via "spark.jars": S3A filesystem support
# (hadoop-aws, AWS SDK bundle, hadoop-common) and Delta Lake.
JAR_DIR = "/home/jovyan/spark_jars"
JAR_FILES = (
    "hadoop-aws-3.3.4.jar",
    "aws-java-sdk-bundle-1.12.262.jar",
    "hadoop-common-3.3.4.jar",
    "delta-spark_2.12-3.2.0.jar",
    "delta-storage-3.2.0.jar",
)
jars = [f"{JAR_DIR}/{jar_file}" for jar_file in JAR_FILES]


# S3 credentials come from the kernel's environment rather than from literals
# in the notebook, so they are never saved or shared with the file. A KeyError
# here means AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are not exported to the
# Jupyter process. (`os` is imported in the first cell.)
# SECURITY: an access key pair used to be hardcoded in this cell. It must be
# considered leaked and should be rotated in AWS IAM.
aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]

# Session with the local jars on the classpath, the S3A filesystem configured
# for the credentials above, and the Delta Lake SQL extension and catalog.
spark = (
    SparkSession.builder.appName('Stream Demo')
    .config("spark.jars", ",".join(jars))
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id)
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key)
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# The traffic feed's date strings are parsed further down with an
# "EEE, dd MMM yyyy HH:mm:ss z" pattern; the legacy parser policy is enabled
# so that pattern is handled.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Sanity check: show the jar list the JVM-side SparkConf actually received.
jvm_spark_conf = spark.sparkContext._jsc.sc().getConf()
print(jvm_spark_conf.get("spark.jars"))

# Last expression of the cell: render the session summary.
spark

/home/jovyan/spark_jars/hadoop-aws-3.3.4.jar,/home/jovyan/spark_jars/aws-java-sdk-bundle-1.12.262.jar,/home/jovyan/spark_jars/hadoop-common-3.3.4.jar,/home/jovyan/spark_jars/delta-spark_2.12-3.2.0.jar,/home/jovyan/spark_jars/delta-storage-3.2.0.jar


In [3]:
from pyspark.sql.functions import *

def _kafka_topic_stream(topic):
    """Return a streaming DataFrame subscribed to one Kafka topic.

    The reader connects to the broker at kafka:9092, subscribes to ``topic``
    and sets ``startingOffsets`` to ``latest``.
    """
    reader = spark.readStream.format("kafka")
    reader = reader.option("kafka.bootstrap.servers", "kafka:9092")
    reader = reader.option("subscribe", topic)
    reader = reader.option("startingOffsets", "latest")
    return reader.load()


# One raw Kafka stream per feed.
weather_stream = _kafka_topic_stream("weather-data")
traffic_stream = _kafka_topic_stream("traffic-data")


#.option("startingOffsets", "latest") \


# Keep only the Kafka message payload, cast to a string column named "value".
weather_json_df = weather_stream.select(col("value").cast("string").alias("value"))
traffic_json_df = traffic_stream.select(col("value").cast("string").alias("value"))

#traffic_json_df = traffic_stream.withColumn('value', expr('cast(value as string)')).withColumn('key', expr('cast(key as string)'))

# Example traffic message payload (JSON):
# {"latitude": 49.2838889, "longitude": -122.7933334, "current_speed": 28, "free_flow_speed": 28,
#  "current_travel_time": 131, "free_flow_travel_time": 131, "confidence": 1, "road_closure": false}

# Schema of the JSON payload on the weather topic. Every field is nullable
# (the StructType.add default).
# NOTE(review): 'date' is later passed to from_unixtime, so it looks like epoch
# seconds; IntegerType is 32-bit and overflows in 2038 - consider LongType.
weather_schema = (
    StructType()
    .add('name', StringType())
    .add('latitude', DoubleType())
    .add('longitude', DoubleType())
    .add('date', IntegerType())
    .add('weather', StringType())
    .add('weather_description', StringType())
    .add('temp', DoubleType())
    .add('visibility', IntegerType())
    .add('clouds', IntegerType())
    .add('rain', DoubleType())
    .add('snow', DoubleType())
)

# Schema of the JSON payload on the traffic topic, as (field name, type)
# pairs; every field is declared nullable. 'date' stays a string here and is
# converted to a timestamp when the payload is flattened.
traffic_columns = (
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("current_speed", IntegerType()),
    ("free_flow_speed", IntegerType()),
    ("current_travel_time", IntegerType()),
    ("free_flow_travel_time", IntegerType()),
    ("confidence", IntegerType()),
    ("road_closure", BooleanType()),
    ("date", StringType()),
)
traffic_schema = StructType(
    [StructField(column, dtype, True) for column, dtype in traffic_columns]
)

# Decode each JSON string into a single struct column named "data",
# using the per-feed schema.
weather_parsed_df = weather_json_df.select(
    from_json(col("value"), weather_schema).alias("data")
)
traffic_parsed_df = traffic_json_df.select(
    from_json(col("value"), traffic_schema).alias("data")
)
#traffic_parsed_df = traffic_json_df.withColumn("values_json", from_json(col("value"), traffic_schema))


# Flatten the parsed weather struct into top-level columns.
#
# 'date' is built in one nested expression: from_unixtime turns the numeric
# 'data.date' into a datetime string, and from_utc_timestamp shifts it to
# America/Los_Angeles. Previously the intermediate value was aliased as
# 'date_unix' and referenced by name inside the same select(), which only
# resolves on Spark versions that support lateral column aliases; nesting the
# calls removes that dependency and the need to drop a temporary column.
# The resulting columns and their order are unchanged.
weather_flatten_df = weather_parsed_df.select(
    col('data.name').alias('name'),
    col('data.latitude').alias('latitude'),
    col('data.longitude').alias('longitude'),
    from_utc_timestamp(
        from_unixtime(col('data.date')),
        "America/Los_Angeles",
    ).alias('date'),
    col('data.weather').alias('weather'),
    col('data.weather_description').alias('weather_description'),
    col('data.temp').alias('temp'),
    col('data.visibility').alias('visibility'),
    col('data.clouds').alias('clouds'),
    col('data.rain').alias('rain'),
    col('data.snow').alias('snow'),
)

# Flatten the parsed traffic struct into top-level columns.
#
# 'date' is built in one nested expression: to_timestamp parses the
# "EEE, dd MMM yyyy HH:mm:ss z" string and from_utc_timestamp shifts it to
# America/Los_Angeles. Previously the parsed value was aliased as 'date_utc'
# and referenced by name inside the same select(), which only resolves on
# Spark versions that support lateral column aliases; nesting the calls
# removes that dependency and the need to drop a temporary column.
# The resulting columns and their order are unchanged.
traffic_flatten_df = traffic_parsed_df.select(
    col("data.latitude").alias("latitude"),
    col("data.longitude").alias("longitude"),
    col("data.current_speed").alias("current_speed"),
    col("data.free_flow_speed").alias("free_flow_speed"),
    col("data.current_travel_time").alias("current_travel_time"),
    col("data.free_flow_travel_time").alias("free_flow_travel_time"),
    col("data.confidence").alias("confidence"),
    col("data.road_closure").alias("road_closure"),
    from_utc_timestamp(
        to_timestamp(col("data.date"), "EEE, dd MMM yyyy HH:mm:ss z"),
        "America/Los_Angeles",
    ).alias("date"),
)

#traffic_flatten_df = traffic_flatten_df.withColumn("speed_diff",
#                                                   col("current_speed") - col("free_flow_speed"))

# Tag every row with the time it was processed, then declare 'date' as the
# event-time column with a 10-minute watermark for late data.
weather_flatten_df = (
    weather_flatten_df
    .withColumn("processing_time", current_timestamp())
    .withWatermark("date", "10 minutes")
)
traffic_flatten_df = (
    traffic_flatten_df
    .withColumn("processing_time", current_timestamp())
    .withWatermark("date", "10 minutes")
)

# Hourly weather summary: one group per (1-hour window on 'date', name,
# latitude, longitude).
# NOTE(review): F.last picks whichever row happens to come last in the group;
# row order is presumably not guaranteed here - confirm this is acceptable.
weather_group_keys = [
    F.window(F.col("date"), "1 hour"),
    F.col("name"),
    F.col("latitude"),
    F.col("longitude"),
]
weather_metrics = [
    F.avg("temp").alias("avg_temp"),
    F.avg("visibility").alias("avg_visibility"),
    F.avg("clouds").alias("avg_clouds"),
    F.max("rain").alias("max_rain"),
    F.max("snow").alias("max_snow"),
    F.last("weather").alias("last_weather"),
    F.last("weather_description").alias("last_weather_description"),
]
weather_agg_df = weather_flatten_df.groupBy(*weather_group_keys).agg(*weather_metrics)

# Hourly traffic summary: one group per (1-hour window on 'date', latitude,
# longitude). 'had_closure' is the max of road_closure cast to int, i.e. 1 if
# any row in the group reported a closure.
traffic_group_keys = [
    F.window(F.col("date"), "1 hour"),
    F.col("latitude"),
    F.col("longitude"),
]
traffic_metrics = [
    F.avg("current_speed").alias("avg_speed"),
    F.avg("free_flow_speed").alias("avg_flow_speed"),
    F.avg("current_travel_time").alias("avg_travel_time"),
    F.avg("free_flow_travel_time").alias("avg_flow_travel_time"),
    F.max(F.col("road_closure").cast("int")).alias("had_closure"),
]
traffic_agg_df = traffic_flatten_df.groupBy(*traffic_group_keys).agg(*traffic_metrics)


In [None]:
from pyspark.sql.functions import col

# Processing-time trigger interval shared by both sinks.
trigger_interval = "1 hour"


def _start_delta_sink(agg_df, table_path, checkpoint_path):
    """Start an append-mode Delta streaming write and return the query.

    ``agg_df`` is written to ``table_path`` with schema merging enabled,
    checkpointing to ``checkpoint_path`` and firing every ``trigger_interval``.
    """
    writer = agg_df.writeStream.option("mergeSchema", "true")
    writer = writer.outputMode('append').format('delta')
    writer = writer.option("path", table_path)
    writer = writer.option("checkpointLocation", checkpoint_path)
    return writer.trigger(processingTime=trigger_interval).start()


# Earlier (pre-delta) locations, kept for reference:
#   s3a://van-crash-data/weather-data, s3a://van-crash-data/checkpoints/weather
weather_query = _start_delta_sink(
    weather_agg_df,
    "s3a://van-crash-data/weather-data-delta",
    "s3a://van-crash-data/checkpoints/weather-delta",
)

traffic_query = _start_delta_sink(
    traffic_agg_df,
    "s3a://van-crash-data/traffic-data-delta",
    "s3a://van-crash-data/checkpoints/traffic-delta",
)
    
#.option("path", "s3a://van-crash-data/traffic-data") \
#.option("checkpointLocation", "s3a://van-crash-data/checkpoints/traffic") \

print("Streaming started... waiting for data...")

# Block until ANY active query on this session stops or fails. Waiting on
# weather_query first and traffic_query second would leave the cell blocked on
# the weather query while a failed traffic query went unnoticed. After this
# returns (or raises), the remaining query may still be running; the stop
# cell below shuts both down.
spark.streams.awaitAnyTermination()

Streaming started... waiting for data...


In [None]:
# Shut down both streaming queries, weather first, then traffic.
for streaming_query in (weather_query, traffic_query):
    streaming_query.stop()