In [0]:
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import col, from_json, schema_of_json

# --------------------------------------------------------------------
# Step 1: Load ALL JSON files recursively (schema inferred across files)
# --------------------------------------------------------------------
# Every JSON file under the bronze volume, at any sub-folder depth, is read
# in one pass. The JSON source already infers a single schema across all
# input files. "mergeSchema" is a Parquet/ORC option, not a JSON one; it was
# silently ignored here, so it has been removed.
# NOTE(review): the default JSON reader expects one JSON object per line. If
# the bronze files are pretty-printed single documents, add
# .option("multiLine", "true"). Otherwise Spark yields only a _corrupt_record
# column and the "data" column will be missing. Confirm the file layout.
bronze_base = "/Volumes/aqi_etl_pipeline/bronze_aqi/bronze_vol_aqi/aqi/"

df_bronze_all = (
    spark.read
        .format("json")
        .option("recursiveFileLookup", "true")
        .load(bronze_base)
)

print("Bronze total rows:", df_bronze_all.count())
df_bronze_all.printSchema()

# --------------------------------------------------------------------
# Step 2: Force correct schema for "data" if it's a string
# --------------------------------------------------------------------
# The JSON reader normally infers "data" as a struct already. Only a
# JSON-encoded string needs parsing, so check the inferred type first.
# Comparing a struct column with "" fails analysis, and schema_of_json
# rejects a Row value, so the unguarded path crashed whenever "data" was
# already a struct.
data_is_string = dict(df_bronze_all.dtypes).get("data") == "string"

raw_json = None
if data_is_string:
    # Fetch one non-empty payload in a single job (instead of count() + first()).
    sample_row = (
        df_bronze_all
        .filter(col("data").isNotNull() & (col("data") != ""))
        .first()
    )
    if sample_row is not None:
        raw_json = sample_row["data"]

if raw_json is not None:
    # NOTE(review): the schema is inferred from ONE record. Fields absent from
    # that sample are dropped from every row; confirm payloads are uniform.
    expected_schema = schema_of_json(raw_json)

    # Convert data column from string → struct
    df_bronze_fixed = df_bronze_all.withColumn(
        "data", from_json(col("data"), expected_schema)
    )
else:
    # "data" is already a struct (or absent / all empty): use as-is.
    df_bronze_fixed = df_bronze_all

df_bronze_fixed.printSchema()

# --------------------------------------------------------------------
# Step 3: Filter only valid AQI rows
# --------------------------------------------------------------------
# Keep a row only when its payload carries a non-null AQI value.
# NOTE(review): if the upstream feed ever reports a placeholder string instead
# of a number when no reading is available, such rows are not null and will
# pass this filter. Confirm whether that can occur in the bronze data.
aqi_present = col("data.aqi").isNotNull()
df_bronze_valid = df_bronze_fixed.filter(aqi_present)

valid_rows = df_bronze_valid.count()
print("Valid AQI rows:", valid_rows)

# --------------------------------------------------------------------
# Step 4: Flatten the structure
# --------------------------------------------------------------------
# The payload shape (data.aqi / data.city.geo / data.time.s) is the WAQI feed
# format, where city.geo is [latitude, longitude]. Index 0 is therefore
# Latitude and index 1 is Longitude; these were previously swapped.
# NOTE(review): rows already written to silver with the old mapping have
# swapped coordinates. Align any other job that flattens this payload too.
df_flat_all = df_bronze_valid.select(
    col("data.aqi").alias("AQI"),
    col("data.city.name").alias("City"),
    col("data.city.geo").getItem(0).alias("Latitude"),
    col("data.city.geo").getItem(1).alias("Longitude"),
    col("data.time.s").alias("timestamp"),
    col("ingested_at"),
)

# Drop rows without a reading timestamp; they cannot be placed in time.
df_flat_all = df_flat_all.filter(col("timestamp").isNotNull())
print("Flat rows:", df_flat_all.count())


# --------------------------------------------------------------------
# Step 5: Convert timestamps (IST ↔ UTC)
# --------------------------------------------------------------------
# Each (column name, expression) pair is applied in order with withColumn,
# so later expressions can reference columns added by earlier ones.
# - "timestamp" is parsed and treated as IST; shifting it back 5h30m gives UTC.
# - "ingested_at" is parsed and treated as UTC; shifting it forward 5h30m gives IST.
# - Date and hour buckets are taken from the IST reading time.
# NOTE(review): the fixed +05:30 offset assumes every station reports IST;
# confirm if stations outside India are ever ingested.
_silver_derivations = [
    ("aqi_timestamp_ist", F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss")),
    ("aqi_timestamp_utc", F.expr("aqi_timestamp_ist - INTERVAL 5 HOURS 30 MINUTES")),
    ("ingested_at_utc", F.to_timestamp("ingested_at")),
    ("ingested_at_ist", F.expr("ingested_at_utc + INTERVAL 5 HOURS 30 MINUTES")),
    ("aqi_date", F.to_date("aqi_timestamp_ist")),
    ("aqi_hour", F.hour("aqi_timestamp_ist")),
]

df_silver_all = df_flat_all
for _col_name, _col_expr in _silver_derivations:
    df_silver_all = df_silver_all.withColumn(_col_name, _col_expr)

display(df_silver_all)


# --------------------------------------------------------------------
# Step 6: Write to Silver Delta Table (Append Only)
# --------------------------------------------------------------------
silver_table = "aqi_etl_pipeline.silver.silver_aqi"

# Probe the table only to choose the log message. The probe treats ANY
# AnalysisException as "missing", which may cover failures other than
# "table not found". The write below is therefore never destructive:
# append mode creates the Delta table when it is absent and adds rows when
# it exists. The former overwrite + overwriteSchema branch could replace an
# existing table if the probe failed for another reason, or if the table
# was created between the probe and the write.
try:
    spark.table(silver_table)
    table_exists = True
except AnalysisException:
    table_exists = False

# NOTE(review): this re-reads ALL bronze files and appends them, so running
# the backfill more than once duplicates rows in silver. If re-runs are
# expected, consider a MERGE keyed on (City, aqi_timestamp_utc).
(
    df_silver_all.write
        .format("delta")
        .mode("append")
        .saveAsTable(silver_table)
)

if table_exists:
    print(f"✔ Historical AQI appended to: {silver_table}")
else:
    print(f"✔ Silver table created: {silver_table}")

print("🎯 Backfill completed successfully!")
