In [0]:
# Incrementally ingest Avro files from the bronze mount with Auto Loader
# (the "cloudFiles" streaming source).
autoloader_options = {
    "cloudFiles.format": "avro",
    "cloudFiles.schemaLocation": "/bronze/schema",
}

df = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load("/mnt/bronze/")
)

In [0]:
from pyspark.sql.types import (
    StructType,
    StructField,
    FloatType,
    IntegerType,
    StringType,
    ArrayType,
    LongType
)

# Schema of the incoming JSON weather payload, built bottom-up: each nested
# object is declared on its own and then assembled into the top-level `schema`.
# Every field keeps the StructField default of nullable=True.

coord_schema = StructType([
    StructField("lon", FloatType()),
    StructField("lat", FloatType()),
])

weather_item_schema = StructType([
    StructField("id", IntegerType()),
    StructField("main", StringType()),
    StructField("description", StringType()),
    StructField("icon", StringType()),
])

# Every measurement under "main" is a float, so the fields are generated
# from their names (order matters and is preserved).
main_schema = StructType([
    StructField(field_name, FloatType())
    for field_name in (
        "temp",
        "feels_like",
        "temp_min",
        "temp_max",
        "pressure",
        "humidity",
        "sea_level",
        "grnd_level",
    )
])

wind_schema = StructType([
    StructField(field_name, FloatType())
    for field_name in ("speed", "deg", "gust")
])

clouds_schema = StructType([
    StructField("all", IntegerType()),
])

sys_schema = StructType([
    StructField("type", IntegerType()),
    StructField("id", IntegerType()),
    StructField("country", StringType()),
    StructField("sunrise", LongType()),
    StructField("sunset", LongType()),
])

schema = StructType([
    StructField("coord", coord_schema),
    # "weather" is an array of condition objects.
    StructField("weather", ArrayType(weather_item_schema)),
    StructField("base", StringType()),
    StructField("main", main_schema),
    StructField("visibility", IntegerType()),
    StructField("wind", wind_schema),
    StructField("clouds", clouds_schema),
    StructField("dt", LongType()),
    StructField("sys", sys_schema),
    StructField("timezone", IntegerType()),
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("cod", IntegerType()),
])

In [0]:
from pyspark.sql.functions import from_json, col

# Decode the `Body` column to text and parse it as JSON against `schema`;
# the parsed struct is attached as a new `data` column.
body_as_text = col("Body").cast("string")
parsed_df = df.withColumn("data", from_json(body_as_text, schema))

In [0]:
# Flatten the nested `data` struct into top-level columns.
# The projection is driven by (source, alias) tables; the three groups are
# selected in sequence so the resulting column order is:
# coordinates -> first weather entry -> everything else.

# Only the first element of the `weather` array is used.
first_weather = col("data.weather")[0]

coord_fields = [
    ("data.coord.lon", "longitude"),
    ("data.coord.lat", "latitude"),
]

weather_fields = [
    ("main", "weather_condition"),
    ("description", "weather_description"),
    ("icon", "weather_icon"),
]

remaining_fields = [
    ("data.base", "base"),
    ("data.main.temp", "temperature"),
    ("data.main.feels_like", "feels_like"),
    ("data.main.temp_min", "temp_min"),
    ("data.main.temp_max", "temp_max"),
    ("data.main.pressure", "pressure"),
    ("data.main.humidity", "humidity"),
    ("data.main.sea_level", "sea_level"),
    ("data.main.grnd_level", "grnd_level"),
    ("data.visibility", "visibility"),
    ("data.wind.speed", "wind_speed"),
    ("data.wind.deg", "wind_degree"),
    ("data.wind.gust", "wind_gust"),
    ("data.clouds.all", "cloudiness"),
    ("data.dt", "timestamp_utc"),
    ("data.sys.type", "sys_type"),
    ("data.sys.id", "sys_id"),
    ("data.sys.country", "country"),
    ("data.sys.sunrise", "sunrise"),
    ("data.sys.sunset", "sunset"),
    ("data.timezone", "timezone"),
    ("data.id", "city_id"),
    ("data.name", "city_name"),
    ("data.cod", "cod"),
]

flat_df = parsed_df.select(
    *[col(source).alias(alias) for source, alias in coord_fields],
    *[first_weather[field].alias(alias) for field, alias in weather_fields],
    *[col(source).alias(alias) for source, alias in remaining_fields],
)

In [0]:
from pyspark.sql.functions import from_unixtime, current_timestamp, col

# Convert Unix epoch seconds to timestamps and add processing metadata.
#
# All three epoch columns are converted the same way (cast to timestamp) so
# they share TimestampType. from_unixtime() is deliberately not used here: it
# returns a *string* rendered in the session time zone, which would leave
# sunrise_time / sunset_time typed differently from `timestamp` and make their
# values depend on the cluster's time-zone setting.
#
# NOTE(review): if the sink table was already created with string-typed
# sunrise_time / sunset_time columns, migrate its schema before appending.
transformed_df = (
    flat_df
    .withColumn("timestamp", col("timestamp_utc").cast("timestamp"))
    .withColumn("sunrise_time", col("sunrise").cast("timestamp"))
    .withColumn("sunset_time", col("sunset").cast("timestamp"))
    # Records when this row was processed by the pipeline.
    .withColumn("processing_time", current_timestamp())
    # The raw epoch columns are superseded by the typed columns above.
    .drop("timestamp_utc", "sunrise", "sunset")
)

In [0]:
# Declare a 5-minute watermark on the event-time column `timestamp`, then
# drop duplicate rows that share the same (city_id, timestamp) pair.
dedup_keys = ["city_id", "timestamp"]

watermarked_df = transformed_df.withWatermark("timestamp", "5 minutes")
clean_df = watermarked_df.dropDuplicates(dedup_keys)


In [0]:
# Configure the Delta sink for the cleaned stream (append-only, with its own
# checkpoint directory), then start the query.
silver_writer = (
    clean_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/silver/checkpoints/bronze_to_silver/")
    .option("path", "/mnt/silver/data/")
    .trigger(availableNow=True)
)

w_stream_raw = silver_writer.start()

# Block the notebook until the streaming query terminates.
w_stream_raw.awaitTermination()