In [0]:
# --------------------------------
# Silver Layer - Medallion Architecture (Service Principal Auth)
# --------------------------------

# Retrieve Service Principal credentials securely from Azure Key Vault via Databricks secrets
secret_scope = "secretscope_datacapus6"
client_id = dbutils.secrets.get(scope=secret_scope, key="client-id")
client_secret = dbutils.secrets.get(scope=secret_scope, key="client-secret")
tenant_id = dbutils.secrets.get(scope=secret_scope, key="tenant-id")

storage_account = "storageaccus6"
bronze_container = "bronze"
silver_container = "silver"

# Fully qualified ADLS Gen2 endpoint of the storage account; reused for the
# account-scoped Spark settings and for the abfss:// base paths below.
account_host = f"{storage_account}.dfs.core.windows.net"

# Configure Spark to use Service Principal OAuth for this storage account only.
# The keys carry the account host as a suffix so the credentials are not applied
# to any other storage account accessed from the same Spark session.
spark.conf.set(f"fs.azure.account.auth.type.{account_host}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{account_host}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(f"fs.azure.account.oauth2.client.id.{account_host}", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{account_host}", client_secret)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{account_host}",
    f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
)

# Base paths for Bronze and Silver containers
bronze_base = f"abfss://{bronze_container}@{account_host}/"
silver_base = f"abfss://{silver_container}@{account_host}/"

# Paths to Bronze Delta tables
bronze_house_path = f"{bronze_base}house_price"
bronze_orders_path = f"{bronze_base}sales_products"
bronze_world_path = f"{bronze_base}population"

# Paths to Silver Delta tables
silver_house_path = f"{silver_base}house_price"
silver_orders_path = f"{silver_base}sales_products"
silver_world_path = f"{silver_base}population"

**Read Bronze Delta tables**

In [0]:
# Load the three Bronze Delta tables in one pass; the unpacking order matches
# the order of the paths (house price, sales orders, world population).
df_house_bronze, df_orders_bronze, df_world_bronze = (
    spark.read.format("delta").load(bronze_path)
    for bronze_path in (bronze_house_path, bronze_orders_path, bronze_world_path)
)

**Silver Layer Transformations**

In [0]:
# --------------------------------
# Silver transformations (light cleaning, structuring, metadata)
# --------------------------------
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

def add_metadata(df, source_name):
    """Append lineage/audit columns to a DataFrame.

    Adds ``ingestion_timestamp`` (current timestamp), ``ingestion_date`` (date
    part of the current timestamp), ``source_file`` (file the row was read
    from) and ``record_source`` (the literal ``source_name``).

    ``source_file`` relies on ``input_file_name()``, which only resolves while
    rows are still tied to the file scan. Call this function BEFORE any
    shuffle (e.g. ``dropDuplicates``, joins, aggregations); after a shuffle it
    yields an empty string.

    Args:
        df: Input Spark DataFrame.
        source_name: Label stored in the ``record_source`` column.

    Returns:
        A new DataFrame with the four metadata columns appended.
    """
    return (
        df
        .withColumn("ingestion_timestamp", F.current_timestamp())
        .withColumn("ingestion_date", F.to_date(F.current_timestamp()))
        .withColumn("source_file", F.input_file_name())
        .withColumn("record_source", F.lit(source_name))
    )

def _with_clean_value(df):
    """Add ``value_clean`` (trimmed ``value``) and drop rows where it is null or empty."""
    return (
        df
        .withColumn("value_clean", F.trim(F.col("value")))
        .filter(F.col("value_clean").isNotNull() & (F.length(F.col("value_clean")) > 0))
    )

def _to_silver(df, source_name):
    """Stamp metadata on an already-cleaned DataFrame, then de-duplicate on ``value_clean``.

    Metadata is added first so ``source_file`` is captured before the shuffle
    introduced by ``dropDuplicates``. When duplicates come from different
    files, the surviving row (and therefore its ``source_file``) is whichever
    one Spark keeps.
    """
    return add_metadata(df, source_name).dropDuplicates(["value_clean"])

# House Price transformation: additionally parse the payload as a JSON map and
# flag rows whose payload parsed successfully.
df_house_silver = (
    _to_silver(
        _with_clean_value(df_house_bronze)
        .withColumn("json_map", F.from_json(F.col("value_clean"), MapType(StringType(), StringType()))),
        "bronze.house_price",
    )
    .withColumn("is_json", F.col("json_map").isNotNull())
)

# Sales Orders transformation
df_orders_silver = _to_silver(_with_clean_value(df_orders_bronze), "bronze.sales_orders")

# World Population transformation
df_world_silver = _to_silver(_with_clean_value(df_world_bronze), "bronze.world_population")


In [0]:
# --------------------------------
# Write Silver Delta tables
# --------------------------------
# Every Silver DataFrame is paired with its destination path and written with
# the same options: Delta format, overwrite mode, mergeSchema enabled.
silver_outputs = [
    (df_house_silver, silver_house_path),
    (df_orders_silver, silver_orders_path),
    (df_world_silver, silver_world_path),
]

for silver_df, target_path in silver_outputs:
    delta_writer = (
        silver_df.write.format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
    )
    delta_writer.save(target_path)


In [0]:

# --------------------------------
# Step 4: Delta Lake Time Travel Examples (Silver Layer)
# --------------------------------
# Load version 0 of every Silver table, then preview five rows of each.
df_house_silver_v0, df_orders_silver_v0, df_world_silver_v0 = (
    spark.read.format("delta").option("versionAsOf", 0).load(silver_path)
    for silver_path in (silver_house_path, silver_orders_path, silver_world_path)
)

v0_previews = (
    ("Silver House Price", df_house_silver_v0),
    ("Silver Sales Orders", df_orders_silver_v0),
    ("Silver World Population", df_world_silver_v0),
)
for table_label, snapshot_df in v0_previews:
    print(f"{table_label} (version 0):")
    display(snapshot_df.limit(5))



In [0]:
# --------------------------------
# Delta Lake time travel examples (Silver)
# --------------------------------
# The versionAsOf=0 snapshots are read and displayed by the preceding cell, so
# they are not repeated here. This cell demonstrates timestamp-based time
# travel and prints the commit history of each Silver table.
from delta.tables import DeltaTable

house_delta = DeltaTable.forPath(spark, silver_house_path)
orders_delta = DeltaTable.forPath(spark, silver_orders_path)
world_delta = DeltaTable.forPath(spark, silver_world_path)

# One history DataFrame per table, reused for the first-commit lookup and the
# history listing at the end of the cell.
house_history = house_delta.history()
orders_history = orders_delta.history()
world_history = world_delta.history()

# Timestamp of the earliest commit still present in the history. Ordering by
# `version` identifies the first commit directly instead of relying on the
# ordering of commit timestamps.
house_first_ts = house_history.orderBy(F.col("version").asc()).first()["timestamp"]
orders_first_ts = orders_history.orderBy(F.col("version").asc()).first()["timestamp"]
world_first_ts = world_history.orderBy(F.col("version").asc()).first()["timestamp"]

# NOTE(review): str(timestamp) is rendered in the driver's Python time zone while
# Spark parses it in the session time zone — confirm both match on this cluster.
df_house_silver_t0 = spark.read.format("delta").option("timestampAsOf", str(house_first_ts)).load(silver_house_path)
df_orders_silver_t0 = spark.read.format("delta").option("timestampAsOf", str(orders_first_ts)).load(silver_orders_path)
df_world_silver_t0 = spark.read.format("delta").option("timestampAsOf", str(world_first_ts)).load(silver_world_path)

print(f"Silver House Price (timestampAsOf={house_first_ts}):")
display(df_house_silver_t0.limit(5))

print(f"Silver Sales Orders (timestampAsOf={orders_first_ts}):")
display(df_orders_silver_t0.limit(5))

print(f"Silver World Population (timestampAsOf={world_first_ts}):")
display(df_world_silver_t0.limit(5))

print("Silver House Price history:")
house_history.show(truncate=False)

print("Silver Sales Orders history:")
orders_history.show(truncate=False)

print("Silver World Population history:")
world_history.show(truncate=False)


In [0]:
# --------------------------------
# Register Silver tables in SQL database
# --------------------------------
# Map each table name in silver_db to the Delta location that backs it.
silver_table_locations = {
    "house_price": silver_house_path,
    "sales_orders": silver_orders_path,
    "world_population": silver_world_path,
}

spark.sql("CREATE DATABASE IF NOT EXISTS silver_db")
for table_name, delta_location in silver_table_locations.items():
    spark.sql(f"CREATE TABLE IF NOT EXISTS silver_db.{table_name} USING DELTA LOCATION '{delta_location}'")

# Sample query
house_preview = spark.sql("SELECT * FROM silver_db.house_price LIMIT 5")
display(house_preview)