In [None]:
# Create (or reuse) the Spark session through Databricks Connect.
from databricks.connect import DatabricksSession  # running in Databricks
spark = DatabricksSession.builder.getOrCreate()

In [None]:
from pyspark.sql import functions as F
import pyspark.sql.types as T

from local_time.local_utils import add_local_time
from day_time.day_utils import add_daytime_flag

#from src.local_time.local_utils import add_local_time -> build a wheel and import from there
#from src.day_time.day_utils import add_daytime_flag

In [48]:
# Run parameters: each variable is read from the Databricks widget of the
# same name, in the order listed.
(
    catalog,
    schema_landing,
    schema_bronze,
    schema_silver,
    volume,
) = (
    dbutils.widgets.get(widget_name)
    for widget_name in (
        "catalog",
        "schema_landing",
        "schema_bronze",
        "schema_silver",
        "volume",
    )
)

In [49]:
# ------------------------------------------------------------
# 1. INCREMENTAL READ of the Bronze Delta table
#    (append-only raw ingestion layer)
# ------------------------------------------------------------
# WHY:
# Silver always reads from Bronze as a stream (readStream) rather than
# re-reading the full table as a batch.
# Bronze is expected to contain the raw nested JSON plus ingestion
# metadata columns.
# Source table: <catalog>.<schema_bronze>.iot_bronze_weather
# ------------------------------------------------------------
bronze_df = spark.readStream.table(f"{catalog}.{schema_bronze}.iot_bronze_weather")

In [None]:
# Flatten the nested Bronze payload into typed Silver columns.
# Columns are declared in thematic groups and selected in exactly that order.

# Target types shared by the casts below.
string_type = T.StringType()
integer_type = T.IntegerType()
decimal_type = T.DecimalType(10, 2)
timestamp_type = T.TimestampType()

# ----------------------------------------------------------------
# 1. Identity & Location
# ----------------------------------------------------------------
identity_columns = [
    # SHA-256 of "<id>_<dt>", exposed as the event identifier.
    F.sha2(F.concat_ws("_", F.col("id"), F.col("dt")), 256).alias("event_uuid"),
    F.col("id").cast(string_type).alias("city_id"),
    F.col("name").cast(string_type).alias("name"),
    F.col("sys.country").cast(string_type).alias("country"),
    F.col("coord.lat").cast(decimal_type).alias("latitude"),
    F.col("coord.lon").cast(decimal_type).alias("longitude"),
    F.col("timezone").cast(integer_type).alias("timezone"),
]

# ----------------------------------------------------------------
# 2. Time Dimensions (the "when")
# ----------------------------------------------------------------
# Source values are Unix times; they are cast to timestamps here.
time_columns = [
    F.col("dt").cast(timestamp_type).alias("datetime"),
    F.col("sys.sunrise").cast(timestamp_type).alias("sunrise"),
    F.col("sys.sunset").cast(timestamp_type).alias("sunset"),
]

# ----------------------------------------------------------------
# 3. Weather Metrics - Main
# ----------------------------------------------------------------
main_metric_columns = [
    F.col("main.temp").cast(decimal_type).alias("temperature"),
    F.col("main.feels_like").cast(decimal_type).alias("feels_like"),
    F.col("main.temp_min").cast(decimal_type).alias("temperature_min"),
    F.col("main.temp_max").cast(decimal_type).alias("temperature_max"),
    F.col("main.humidity").cast(integer_type).alias("humidity"),
    F.col("main.pressure").cast(integer_type).alias("pressure"),
    F.col("main.sea_level").cast(integer_type).alias("main_sea_level"),
    F.col("main.grnd_level").cast(integer_type).alias("main_grnd_level"),
]

# ----------------------------------------------------------------
# 4. Weather Metrics - Atmosphere
# ----------------------------------------------------------------
# `weather` is an array of structs; only the first element (index 0),
# which carries the main weather info, is used.
first_weather = F.col("weather")[0]
atmosphere_columns = [
    first_weather["main"].cast(string_type).alias("weather_main"),
    first_weather["description"].cast(string_type).alias("weather_description"),
    first_weather["icon"].cast(string_type).alias("weather_icon"),
    first_weather["id"].cast(integer_type).alias("weather_id"),
    F.col("visibility").cast(integer_type).alias("visibility"),
    F.col("clouds.all").cast(integer_type).alias("clouds_all"),
    F.col("rain.1h").cast(decimal_type).alias("rain_1h"),
]

# ----------------------------------------------------------------
# 5. Weather Metrics - Wind
# ----------------------------------------------------------------
wind_columns = [
    F.col("wind.speed").cast(decimal_type).alias("windspeed"),
    F.col("wind.gust").cast(decimal_type).alias("wind_gust"),
    F.col("wind.deg").cast(integer_type).alias("wind_deg"),
]

# ----------------------------------------------------------------
# 6. Technical Metadata
# ----------------------------------------------------------------
technical_columns = [
    F.col("base").cast(string_type).alias("base"),
    F.col("cod").cast(integer_type).alias("cod"),
    F.col("sys.id").cast(integer_type).alias("sys_id"),
    F.col("sys.type").cast(integer_type).alias("sys_type"),
    F.col("ingestion_date").cast(T.DateType()).alias("ingestion_date"),
]

# ----------------------------------------------------------------
# 7. Ingestion Metadata (kept together in one nested struct)
# ----------------------------------------------------------------
ingestion_metadata = F.struct(
    F.col("ingest_timestamp").cast(timestamp_type).alias("timestamp"),
    F.col("_ingest_file_name").cast(string_type).alias("file_name"),
    F.col("_ingest_file_path").cast(string_type).alias("file_path"),
    F.col("_rescued_data").alias("rescued_data"),
).alias("metadata")

df_new = bronze_df.select(
    *identity_columns,
    *time_columns,
    *main_metric_columns,
    *atmosphere_columns,
    *wind_columns,
    *technical_columns,
    ingestion_metadata,
)


In [None]:
# Enrich every observation with a local timestamp, derived by the
# `add_local_time` helper from the timezone offset in the API response.
# Useful for time-based analyses and visualizations in the local context
# of the weather event.
local_time_options = {
    "datetime_col": "datetime",
    "timezone_col": "timezone",
    "output_col": "local_time",
}
df_new = add_local_time(df=df_new, **local_time_options)

In [None]:
# Enrich every observation with a day/night indicator, derived by the
# `add_daytime_flag` helper from the sunrise and sunset times in the API
# response. Useful for analyses that separate daytime from nighttime
# conditions.
daytime_flag_options = {
    "datetime_col": "datetime",
    "sunrise_col": "sunrise",
    "sunset_col": "sunset",
    "output_col": "is_daytime",
}
df_normalized = add_daytime_flag(df=df_new, **daytime_flag_options)

In [53]:
# =====================================================================================
# 3. DATA QUALITY (DQ) ASSESSMENT
# =====================================================================================
# Rows are flagged instead of being filtered immediately, which keeps the
# number (and kind) of bad rows observable.
#
# Rules:
#   - geo:    latitude within [-90, 90] AND longitude within [-180, 180]
#   - time:   observation timestamp is not null
#   - sensor: temperature is not null (taken as the sign of a working sensor)
#
# NULL handling: `between` evaluates to NULL when the coordinate is NULL, and
# `~NULL` is still NULL. Without the coalesce, a row with a missing coordinate
# would end up with dq_status = "FAIL" but is_valid_geo = NULL and an empty
# failure_reason. Coalescing to False makes a missing coordinate an explicit
# geo failure.
rule_geo = F.coalesce(
    F.col("latitude").between(-90, 90) & F.col("longitude").between(-180, 180),
    F.lit(False),
)
rule_time = F.col("datetime").isNotNull()
rule_temp = F.col("temperature").isNotNull()

dq_df = (
    df_normalized
    # --- A. Individual flags (always True/False, never NULL) ---
    .withColumn("is_valid_geo", rule_geo)
    .withColumn("is_valid_timestamp", rule_time)
    .withColumn("is_valid_sensor", rule_temp)

    # --- B. Master flag: PASS only when every rule holds ---
    .withColumn("dq_status",
        F.when(rule_geo & rule_time & rule_temp, F.lit("PASS"))
         .otherwise(F.lit("FAIL"))
    )

    # --- C. "Why did it fail?" column ---
    # Concatenates one error code per violated rule for later debugging;
    # concat_ws skips the NULLs produced by rules that passed.
    # Example output: "INVALID_GEO; NULL_TEMP"
    .withColumn("failure_reason",
        F.concat_ws("; ",
            F.when(~rule_geo, F.lit("INVALID_GEO")),
            F.when(~rule_time, F.lit("NULL_TIME")),
            F.when(~rule_temp, F.lit("NULL_TEMP"))
        )
    )
)

In [54]:

# =====================================================================================
# 4. FILTERING
# =====================================================================================

# 4a. Silver keeps only rows that passed the DQ assessment; the DQ helper
# columns are removed afterwards.
# Note: in a larger system, the "FAIL" rows would be written to a quarantine table.
dq_helper_columns = (
    "dq_status",
    "is_valid_geo",
    "is_valid_timestamp",
    "is_valid_sensor",
    "failure_reason",
)
passed_dq = F.col("dq_status") == "PASS"
clean_df = dq_df.filter(passed_dq).drop(*dq_helper_columns)

In [55]:
# "Select everything EXCEPT metadata, then add metadata at the end"
# Keep the existing column order, with the nested `metadata` struct moved last.
leading_columns = [name for name in clean_df.columns if name != "metadata"]
df_final = clean_df.select(*leading_columns, "metadata")

In [56]:
# Unity Catalog volume location used for the stream checkpoint (stream state).
# Path layout: /Volumes/<catalog>/<schema>/<volume>/checkpoints/iot_silver/
checkpoint_path = f"/Volumes/{catalog}/{schema_landing}/{volume}/checkpoints/iot_silver/"

In [None]:

# =====================================================================================
# 5. WRITE TO SILVER
# =====================================================================================
silver_query = (
    df_final.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        # Trigger AvailableNow processes all pending data then stops (cost efficient).
        .trigger(availableNow=True)
        .toTable(f"{catalog}.{schema_silver}.weather_clean")
)

# toTable() starts the query and returns right away. Block until the
# available-now run has finished so that a failed stream raises here instead
# of going unnoticed, and so that anything running after this cell sees the
# written data.
silver_query.awaitTermination()

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x1e1f52bed80>