# Moving Data from the Bronze Layer into the Silver Layer

In [1]:
from pyspark.sql import SparkSession, functions as F, Window
import requests
import json
from datetime import datetime

StatementMeta(, 0404c6a0-b808-44f8-b87e-3b363c34f39e, 3, Finished, Available, Finished)

In [2]:
# CONFIGURATION & TOGGLE
# Defaults below are used as-is unless the Fabric runtime supplies overrides.


def parse_bool_param(raw_value, default):
    """Interpret a runtime parameter as a boolean flag.

    ``bool("False")`` is ``True`` in Python, and ``value or True`` is always
    truthy, so a textual parameter has to be inspected explicitly for the
    toggle to be switchable in both directions.

    Args:
        raw_value: Value supplied by the runtime (bool, number, string or None).
        default: Flag to fall back to when ``raw_value`` is None or blank.

    Returns:
        bool: ``default`` for None/blank input; False for the case-insensitive
        strings "false", "0", "no", "n", "off"; True for any other non-empty
        string; ``bool(raw_value)`` for non-string values.
    """
    if raw_value is None:
        return bool(default)
    if isinstance(raw_value, str):
        text = raw_value.strip().lower()
        if not text:
            return bool(default)
        return text not in ("false", "0", "no", "n", "off")
    return bool(raw_value)


RUN_BRONZE_TO_SILVER = True
BRONZE_PATH = "abfss://fabric_dev@onelake.dfs.fabric.microsoft.com/fabric_LH_sales.Lakehouse/Tables/dbo/Bronze"
SILVER_PATH = "abfss://fabric_dev@onelake.dfs.fabric.microsoft.com/fabric_LH_sales.Lakehouse/Tables/dbo/Silver"
REF_PATH = "abfss://fabric_dev@onelake.dfs.fabric.microsoft.com/fabric_LH_sales.Lakehouse/Tables/Reference/state_latlon"

# Start of the whole Bronze -> Silver run; the final logging cell measures from here.
PROCESSING_START_TIME = datetime.now()

# Override for Fabric Pipeline
# NOTE(review): the recorded output of this cell shows the ImportError branch
# being taken, so the overrides were not applied in that run — confirm that
# `mssparkutils` is importable under this name in the target runtime.
try:
    import mssparkutils
    # An explicit "false"/"0" from the pipeline now disables the run; a missing
    # or blank value keeps the default above.
    RUN_BRONZE_TO_SILVER = parse_bool_param(
        mssparkutils.env.getJobInput("RUN_BRONZE_TO_SILVER"),
        RUN_BRONZE_TO_SILVER,
    )
    BRONZE_PATH = mssparkutils.env.getJobInput("BRONZE_PATH") or BRONZE_PATH
    SILVER_PATH = mssparkutils.env.getJobInput("SILVER_PATH") or SILVER_PATH
    REF_PATH = mssparkutils.env.getJobInput("REF_PATH") or REF_PATH
    print(" Parameters loaded from Fabric Pipeline runtime.")
except ImportError:
    print(" Running outside Fabric — using default parameter values.")


print(f"Parameters — RUN: {RUN_BRONZE_TO_SILVER}")
print(f"Bronze Path: {BRONZE_PATH}")
print(f"Silver Path: {SILVER_PATH}")
print(f"Ref Table: {REF_PATH}")


StatementMeta(, 0404c6a0-b808-44f8-b87e-3b363c34f39e, 4, Finished, Available, Finished)

 Running outside Fabric — using default parameter values.
Parameters — RUN: True
Bronze Path: abfss://fabric_dev@onelake.dfs.fabric.microsoft.com/fabric_LH_sales.Lakehouse/Tables/dbo/Bronze
Silver Path: abfss://fabric_dev@onelake.dfs.fabric.microsoft.com/fabric_LH_sales.Lakehouse/Tables/dbo/Silver
Ref Table: abfss://fabric_dev@onelake.dfs.fabric.microsoft.com/fabric_LH_sales.Lakehouse/Tables/Reference/state_latlon


In [3]:
# INITIALIZE SPARK
# getOrCreate() hands back the already-running session when one exists.
spark = SparkSession.builder.getOrCreate()

# NOTE(review): only this read is guarded by the toggle here; the cells that
# follow use `bronze_df`, so confirm what should happen when the toggle is off.
if RUN_BRONZE_TO_SILVER:
    print(" Reading data from Bronze...")
    bronze_reader = spark.read.option("mergeSchema", "true")
    bronze_df = bronze_reader.parquet(BRONZE_PATH)

    # count() is a Spark action, so this line actually scans the Bronze data.
    row_count = bronze_df.count()
    print(f" Loaded {row_count:,} rows from Bronze.")

StatementMeta(, 0404c6a0-b808-44f8-b87e-3b363c34f39e, 5, Finished, Available, Finished)

 Reading data from Bronze...
 Loaded 69,958 rows from Bronze.


In [4]:
    
    # CLEAN DATA + ADD PARTITION COLUMNS
    # A row is kept only when every key column is present, and only one row is
    # kept per distinct combination of the key columns.
    key_columns = ["Order_ID", "Customer_ID", "Product_ID", "Order_Date"]

    bronze_df = (
        bronze_df
        .dropna(subset=key_columns)
        .dropDuplicates(key_columns)
        # Order_Date is parsed with the dd-MM-yyyy pattern; Year and Month are
        # derived from the parsed date and later used as partition columns.
        .withColumn("Order_Date_parsed", F.to_date("Order_Date", "dd-MM-yyyy"))
        .withColumn("Year", F.year("Order_Date_parsed"))
        .withColumn("Month", F.month("Order_Date_parsed"))
        # Stamp each row with the time of this Silver processing run.
        .withColumn("SilverProcessingTime", F.current_timestamp())
    )

StatementMeta(, 0404c6a0-b808-44f8-b87e-3b363c34f39e, 6, Finished, Available, Finished)

#### === FETCH LAT/LONG FOR STATES (Wikidata) ===


In [5]:
    # Build the state -> latitude/longitude reference table from Wikidata,
    # write it to REF_PATH, and append one row to dbo.pipeline_log.
    PROCESSING_START_TIME_wiki = datetime.now()

    print(" Fetching latitude/longitude from Wikidata...")

    url = "https://query.wikidata.org/sparql"
    query = """
    SELECT ?stateLabel ?lat ?lon WHERE {
    ?state wdt:P31 wd:Q35657;
            wdt:P625 ?coord.
    SERVICE wikibase:label { bd:serviceParam wikibase:language "en". }
    BIND(geof:latitude(?coord) AS ?lat)
    BIND(geof:longitude(?coord) AS ?lon)
    }
    ORDER BY ?stateLabel
    """
    headers = {
        "Accept": "application/sparql-results+json",
        # Wikimedia asks automated clients to identify themselves; generic
        # library user agents may be throttled or blocked.
        # TODO: append a contact address/URL for the owning team.
        "User-Agent": "fabric-LH-sales-silver-notebook/1.0 (state_latlon reference load)",
    }
    # (connect, read) timeouts in seconds: without them a stalled endpoint
    # would block the notebook indefinitely.
    response = requests.get(url, params={"query": query}, headers=headers, timeout=(10, 60))
    response.raise_for_status()
    data = response.json()

    # An entity can carry more than one coordinate statement, which yields
    # several bindings with the same label. Keep the first one per label so the
    # reference has exactly one row (and one surrogate key) per state and the
    # later join on state name cannot multiply sales rows.
    coords_by_state = {}
    for b in data["results"]["bindings"]:
        coords_by_state.setdefault(
            b["stateLabel"]["value"],
            (float(b["lat"]["value"]), float(b["lon"]["value"])),
        )

    records = [
        {"state": state, "latitude": lat, "longitude": lon}
        for state, (lat, lon) in coords_by_state.items()
    ]

    # Fail with a clear message before touching the existing reference table.
    if not records:
        raise ValueError(
            "Wikidata returned no state coordinates; reference table was not overwritten."
        )

    df_states = spark.createDataFrame(records)

    # Surrogate key: 1..N in alphabetical order of the state label.
    window_spec = Window.orderBy("state")
    df_states = df_states.withColumn("state_key", F.row_number().over(window_spec))
    df_states = df_states.select("state_key", "state", "latitude", "longitude")


    (
        df_states.write
        .mode("overwrite")
        .option("mergeSchema", "true")
        .parquet(REF_PATH)
    )
    print(f" Reference table 'state_latlon' saved to: {REF_PATH}")

    # LOGGING
    PROCESSING_END_TIME_wiki = datetime.now()
    duration_seconds_wiki = (PROCESSING_END_TIME_wiki - PROCESSING_START_TIME_wiki).total_seconds()
    duration_minutes_wiki = round(duration_seconds_wiki / 60, 2)

    # df_states holds one row per entry of `records`, so len(records) is the
    # written row count without launching another Spark job.
    log_data = [("WIKIDATA_LONG_LAT", PROCESSING_START_TIME_wiki, PROCESSING_END_TIME_wiki, duration_minutes_wiki,"wikidata -> Ssales", REF_PATH, len(records))]
    log_df = spark.createDataFrame(log_data, ["Dataset", "Start_Timestamp", "End_Timestamp", "run_duration", "Stage", "Destination", "Row_Count"])

    (
        log_df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable("dbo.pipeline_log")
    )

    print(" Pipeline load logged successfully.")


StatementMeta(, 0404c6a0-b808-44f8-b87e-3b363c34f39e, 7, Finished, Available, Finished)

 Fetching latitude/longitude from Wikidata...
 Reference table 'state_latlon' saved to: abfss://fabric_dev@onelake.dfs.fabric.microsoft.com/fabric_LH_sales.Lakehouse/Tables/Reference/state_latlon
 Pipeline load logged successfully.


In [6]:
    # Enrich the cleaned sales rows with state coordinates and write Silver.

    # Read Wikidata reference
    # The left join below must match at most one reference row per sales row;
    # a repeated `state` value in the reference would silently multiply sales
    # rows, so enforce one row per state before joining.
    df_states = spark.read.parquet(REF_PATH).dropDuplicates(["state"])

    # Join with sales data
    # Left join: sales rows whose State has no match are kept, with null
    # state_key / latitude / longitude.
    silver_df = bronze_df.join(
        df_states,
        bronze_df.State == df_states.state,
        "left"
    ).drop(df_states.state)

    # Only the sales-side column remains after the drop; give it its Silver name.
    silver_df = silver_df.withColumnRenamed("State", "state_name")

    # Create Silver temp view
    silver_df.createOrReplaceTempView("silver_temp_view")

    # Write to Silver
    # Overwrites the existing data at SILVER_PATH, partitioned by Year/Month.
    (
        silver_df.write
        .mode("overwrite")
        .partitionBy("Year", "Month")
        .option("mergeSchema", "true")
        .parquet(SILVER_PATH)
    )
    print(f" Silver data written to: {SILVER_PATH}")

StatementMeta(, 0404c6a0-b808-44f8-b87e-3b363c34f39e, 8, Finished, Available, Finished)

 Silver data written to: abfss://fabric_dev@onelake.dfs.fabric.microsoft.com/fabric_LH_sales.Lakehouse/Tables/dbo/Silver


#### Logging Pipeline

In [7]:
    # LOGGING
    # Append one row describing this Bronze -> Silver run to dbo.pipeline_log.
    # `datetime` comes from the notebook's import cell, so it is not re-imported here.
    PROCESSING_END_TIME = datetime.now()
    duration_seconds = (PROCESSING_END_TIME - PROCESSING_START_TIME).total_seconds()
    # Stored in the `run_duration` column as minutes, rounded to two decimals.
    duration_minutes = round(duration_seconds / 60, 2)

    log_columns = ["Dataset", "Start_Timestamp", "End_Timestamp", "run_duration", "Stage", "Destination", "Row_Count"]
    # NOTE: silver_df.count() is a Spark action on a DataFrame that is not
    # cached in this notebook, so it re-evaluates the Bronze read/clean/join.
    log_data = [
        (
            "SILVER_LAYER_DATASET",
            PROCESSING_START_TIME,
            PROCESSING_END_TIME,
            duration_minutes,
            "Bronze -> Silver",
            SILVER_PATH,
            silver_df.count(),
        )
    ]
    log_df = spark.createDataFrame(log_data, log_columns)

    (
        log_df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable("dbo.pipeline_log")
    )

    print(" Pipeline load logged successfully.")

StatementMeta(, 0404c6a0-b808-44f8-b87e-3b363c34f39e, 9, Finished, Available, Finished)

 Pipeline load logged successfully.
