#### Bronze_to_Silver (Cleaning)
This notebook cleans the raw Bronze GCC climate data (daily per-station observations) and reshapes it into an analysis-ready Silver dataset with one row per station and day.


In [0]:
import os

# Authenticate Spark against the ADLS Gen2 storage account using the account key.
# The key is read from the environment instead of being pasted into the notebook,
# so it never ends up in source control or in exported notebook files.
# On Databricks, a secret scope is the usual alternative, e.g.
#   dbutils.secrets.get(scope="<scope>", key="<key-name>")
# NOTE(review): the environment variable name below is a suggestion — align it with
# however the cluster/job is actually configured.
_storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not _storage_account_key:
    raise RuntimeError(
        "Set the AZURE_STORAGE_ACCOUNT_KEY environment variable "
        "(or load the key from a Databricks secret scope) before running this notebook."
    )

spark.conf.set(
    "fs.azure.account.key.qatarclimateanalysis.dfs.core.windows.net",
    _storage_account_key,
)

In [0]:
# Lakehouse location of the raw (Bronze) GCC climate extract.
bronze_path = "abfss://lakehouse@qatarclimateanalysis.dfs.core.windows.net/bronze/gcc_bronze.parquet"

# Load the Bronze Parquet data, then preview the first few rows.
df_bronze = (
    spark.read
    .format("parquet")
    .load(bronze_path)
)
display(df_bronze.limit(5))

DATA_VALUE,DATE,ELEMENT,ID,MFLAG,OBS_TIME,QFLAG,SFLAG
125,20150101,TMIN,AE000041196,,,,S
0,20150101,PRCP,AE000041196,,,,S
206,20150101,TAVG,AE000041196,H,,,S
286,20150101,TMAX,AEM00041194,,,,S
180,20150101,TMIN,AEM00041194,,,,S


Data Scaling and Column Pivoting

In [0]:
from pyspark.sql.functions import to_date
from pyspark.sql.functions import col, to_date, first

# 1. Convert DATE from YYYYMMDD to a real date
df = df_bronze.withColumn("date", to_date(col("DATE").cast("string"), "yyyyMMdd"))

# 2. Fix scaling by dividing DATA_VALUE by 10
df = df.withColumn("value", col("DATA_VALUE") / 10)

# 3. Remove rows where date is missing
df = df.dropna(subset=["date"])

# 4. Pivot ELEMENT values into columns (TMAX, TMIN, TAVG, PRCP…)
df_pivot = (
    df.groupBy("ID", "date")
      .pivot("ELEMENT")
      .agg(first("value"))
)

# 5. show the cleaned daily dataset
display(df_pivot.limit(5))


ID,date,PRCP,SNWD,TAVG,TMAX,TMIN
SAM00041084,2015-05-09,,,26.8,,20.4
SAM00041114,2019-01-02,,,16.8,24.0,10.0
SAM00040405,2019-08-05,,,34.9,42.0,25.2
AEM00041218,2022-06-13,,,37.1,,
SAM00041136,2023-02-10,,,19.4,,11.5


Removing Irrelevant Columns and Fixing Missing Values

In [0]:
# Explicit imports instead of `import *`: the wildcard shadows Python built-ins
# such as sum, min, max, abs and round for the rest of the notebook session.
from pyspark.sql.functions import coalesce, col, lit

# 1. Replace PRCP nulls with 0.0.
#    NOTE(review): a null here means no PRCP value was reported for that station-day,
#    which is not necessarily the same as "no rain" — confirm this is intended.
df_silver_clean = df_pivot.withColumn(
    "PRCP", coalesce(col("PRCP"), lit(0.0))
)

# 2. Drop snow depth (SNWD) — irrelevant in the GCC region.
#    drop() ignores names that are not in the schema, so this is safe when no SNWD
#    readings were pivoted.
df_silver_clean = df_silver_clean.drop("SNWD")

# The flag columns (MFLAG, QFLAG, SFLAG, OBS_TIME) need no explicit drop: the pivot
# (groupBy("ID", "date")) already keeps only ID, date and the ELEMENT columns.

# 3. Display a sample of the SILVER table.
display(df_silver_clean.limit(5))


ID,date,PRCP,TAVG,TMAX,TMIN
SAM00040435,2015-03-25,0.0,26.7,31.0,15.0
SAM00040357,2018-09-13,0.0,34.6,43.0,26.9
SAM00040405,2019-11-15,0.0,19.5,25.0,15.0
SAM00040373,2020-07-27,0.0,41.2,48.0,34.0
SAW00032502,2022-12-13,0.0,18.9,,


Filling Missing Temperature Values (Forward/Backward Fill in the Silver Layer)

In [0]:
from pyspark.sql.functions import coalesce, col, datediff, first, last, lit, to_date, when
from pyspark.sql.window import Window

# Maximum gap (in days) over which a temperature reading may be carried forward/backward.
#   None -> unbounded: one reading can fill an arbitrarily long gap (original behaviour).
#   N    -> only readings at most N days away are used (N must be a non-negative int).
# A small bound (e.g. 3) avoids fabricating long runs of identical values for stations
# that rarely report TMAX/TMIN.
MAX_FILL_DAYS = None

# Temperature columns filled by forward/backward fill; original values always win.
FILL_COLUMNS = ["TMAX", "TMIN"]

# Both windows are per station (ID) and ordered by ascending date, so Spark sorts each
# station's rows only once for both fill directions. (ID, date) is unique after the pivot,
# so the ordering has no ties.
if MAX_FILL_DAYS is None:
    _station_by_date = Window.partitionBy("ID").orderBy("date")
    # Frame = all earlier days up to today -> last non-null is the nearest PREVIOUS reading.
    w_ffill = _station_by_date.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    # Frame = today through all later days -> first non-null is the nearest NEXT reading.
    w_bfill = _station_by_date.rowsBetween(Window.currentRow, Window.unboundedFollowing)
else:
    # rangeBetween needs a numeric ordering key, so order by a day number
    # (days since 1970-01-01); frames then span calendar days, not rows.
    _day_number = datediff(col("date"), to_date(lit("1970-01-01")))
    _station_by_day = Window.partitionBy("ID").orderBy(_day_number)
    w_ffill = _station_by_day.rangeBetween(-MAX_FILL_DAYS, Window.currentRow)
    w_bfill = _station_by_day.rangeBetween(Window.currentRow, MAX_FILL_DAYS)

# 1. For each temperature column choose: original value -> forward fill -> backward fill.
#    The window expressions are evaluated on the input frame, i.e. on the unfilled column.
df_silver_filled = df_silver_clean
for temp_col in FILL_COLUMNS:
    df_silver_filled = df_silver_filled.withColumn(
        temp_col,
        coalesce(
            col(temp_col),                                   # keep original readings
            last(temp_col, ignorenulls=True).over(w_ffill),  # else previous valid day
            first(temp_col, ignorenulls=True).over(w_bfill), # else next valid day
        ),
    )

# 2. Fill TAVG only where it is missing, as the midpoint of the (filled) TMAX and TMIN.
#    The result stays null if TMAX or TMIN is still null.
df_silver_filled = df_silver_filled.withColumn(
    "TAVG",
    when(col("TAVG").isNull(), (col("TMAX") + col("TMIN")) / 2)
    .otherwise(col("TAVG"))
)

# 3. Sort for readability and show a sample of the final Silver table.
df_silver_filled = df_silver_filled.orderBy("ID", "date")
display(df_silver_filled.limit(5))


ID,date,PRCP,TAVG,TMAX,TMIN
AE000041196,2015-01-01,0.0,20.6,27.4,12.5
AE000041196,2015-01-02,0.0,19.9,27.4,12.7
AE000041196,2015-01-03,0.0,20.6,27.4,14.0
AE000041196,2015-01-04,0.0,19.7,27.4,14.0
AE000041196,2015-01-05,0.0,19.6,27.0,14.0


Save Silver

In [0]:
# Lakehouse destination of the cleaned Silver table.
silver_path = "abfss://lakehouse@qatarclimateanalysis.dfs.core.windows.net/silver/gcc_silver.parquet"

# Persist the Silver table as Parquet, replacing the output of any previous run.
(
    df_silver_filled.write
    .format("parquet")
    .mode("overwrite")
    .save(silver_path)
)
