# Silver Layer Ingestion
Goal: Transform the Bronze tables into a normalized Silver data model.
Source Tables:
- bronze.countries_raw
- bronze.products_raw
- bronze.customers_raw
- bronze.sales_raw
- bronze.orders_raw

Silver Tables:
- silver.countries 
- silver.products   
- silver.customers         
- silver.sales
- silver.orders  

Design Notes:
- Column names are standardized using snake_case.
- Semi-structured fields from the Bronze layer are parsed and converted into Databricks data types.
- A surrogate primary key (country_id) is generated for silver.countries.
- A country foreign key is added to silver.customers (country_id) and to silver.products (manufactured_country_id).
- All Silver tables except silver.countries are fully rebuilt with overwrite writes, so the notebook can be rerun without creating duplicates.
- country_id is generated as an identity column, so silver.countries is maintained with MERGE instead of overwrite: existing countries keep their country_id across reruns, which avoids downstream key conflicts.

## Import and Context Setting

### Imports

In [0]:
from pyspark.sql import Window
from pyspark.sql import functions as F


### Context Setting

In [0]:


# Make the `sales` catalog and its `silver` schema the session defaults so that
# unqualified table names in later cells resolve to sales.silver.<table>.
for _context_stmt in ("USE CATALOG sales", "USE SCHEMA silver"):
    spark.sql(_context_stmt)

## Load Silver Delta Tables

### Countries


#### Build stage table for bronze.countries_raw

In [0]:

bronze_countries = spark.table("bronze.countries_raw")

# Project bronze.countries_raw onto the silver.countries schema: snake_case
# names, trimmed text columns, numeric measures cast to BIGINT/DOUBLE, and the
# Bronze lineage columns passed through unchanged. Rows without a usable
# country_name are dropped because country_name is the MERGE key below.
countries_projected = (
    bronze_countries
    .select(
        F.trim(F.col("Country")).cast("string").alias("country"),
        F.trim(F.col("Name")).cast("string").alias("country_name"),
        F.trim(F.col("Currency")).cast("string").alias("currency"),
        F.trim(F.col("Region")).cast("string").alias("region"),
        F.col("Population").cast("bigint").alias("population"),
        F.col("Area_sq_mi").cast("bigint").alias("area_sq_mi"),
        F.col("Pop_Density_per_sq_mi").cast("double").alias("pop_density_per_sq_mi"),
        F.col("Coastline_coast_per_area_ratio").cast("double").alias("coastline_coast_per_area_ratio"),
        F.col("Net_migration").cast("double").alias("net_migration"),
        F.col("Infant_mortality_per_1000_births").cast("double").alias("infant_mortality_per_1000_births"),
        F.col("GDP_per_capita").cast("double").alias("gdp_per_capita"),
        F.col("Literacy").cast("double").alias("literacy_pct"),
        F.col("Phones_per_1000").cast("double").alias("phones_per_1000"),
        F.col("Arable").cast("double").alias("arable_pct"),
        F.col("Crops").cast("double").alias("crops_pct"),
        F.col("Other").cast("double").alias("other_pct"),
        F.col("Climate").cast("double").alias("climate"),
        F.col("Birthrate").cast("double").alias("birthrate"),
        F.col("Deathrate").cast("double").alias("deathrate"),
        F.col("Agriculture").cast("double").alias("agriculture"),
        F.col("Industry").cast("double").alias("industry"),
        F.col("Service").cast("double").alias("service"),
        F.col("_ingest_timestamp"),
        F.col("_ingest_file"),
        F.col("_source_system"),
    )
    .where(F.col("country_name").isNotNull() & (F.col("country_name") != ""))
)

# Keep exactly one row per country_name, because MERGE fails when several
# source rows match the same target row. dropDuplicates() would keep an
# arbitrary row, so rank the duplicates and keep the most recently ingested
# one instead. _ingest_file breaks timestamp ties, so reruns pick the same row.
latest_ingest_first = Window.partitionBy("country_name").orderBy(
    F.col("_ingest_timestamp").desc_nulls_last(),
    F.col("_ingest_file").desc_nulls_last(),
)

stg_countries = (
    countries_projected
    .withColumn("_dedup_rank", F.row_number().over(latest_ingest_first))
    .where(F.col("_dedup_rank") == 1)
    .drop("_dedup_rank")
)


#### Create silver.countries with IDENTITY for surrogate key

In [0]:
# Create the Silver countries table on first run only; IF NOT EXISTS makes
# reruns a no-op and preserves the rows (and country_id values) already there.
# country_id is GENERATED ALWAYS AS IDENTITY: Delta assigns it on insert and it
# cannot be written explicitly, so the MERGE in the next cell never sets it.
# The remaining columns mirror the names/types cast in the stg_countries select;
# NOTE(review): the _ingest_* types are assumed to match Bronze — confirm.
spark.sql("""
          CREATE TABLE IF NOT EXISTS silver.countries (
                country_id BIGINT GENERATED ALWAYS AS IDENTITY,
                country STRING,
                country_name STRING,
                currency STRING,
                region STRING,
                population BIGINT,
                area_sq_mi BIGINT, 
                pop_density_per_sq_mi DOUBLE,      
                coastline_coast_per_area_ratio DOUBLE,
                net_migration DOUBLE,
                infant_mortality_per_1000_births DOUBLE,
                gdp_per_capita DOUBLE,
                literacy_pct DOUBLE,
                phones_per_1000 DOUBLE,
                arable_pct DOUBLE,
                crops_pct DOUBLE,
                other_pct DOUBLE,
                climate DOUBLE,
                birthrate DOUBLE,
                deathrate DOUBLE,
                agriculture DOUBLE,
                industry DOUBLE,
                service DOUBLE, 
                _ingest_timestamp TIMESTAMP, 
                _ingest_file STRING, 
                _source_system STRING
          )
          USING DELTA
""")

#### Merge silver.countries

In [0]:
stg_countries.createOrReplaceTempView("stg_countries")

# Upsert the staged countries into silver.countries, keyed on country_name.
# MERGE (rather than overwrite) keeps existing country_id values stable,
# because country_id is an IDENTITY column that Delta assigns only on insert.
#
# The column lists are derived from stg_countries instead of being typed out
# by hand, so they cannot drift out of sync with the staging select:
#   - on match, every staged column except the key is updated;
#   - otherwise, every staged column is inserted.
# country_id is not a staged column, so it is never written here.
countries_merge_key = "country_name"
countries_merge_cols = stg_countries.columns

countries_update_set = ",\n        ".join(
    f"tgt.`{name}` = src.`{name}`"
    for name in countries_merge_cols
    if name != countries_merge_key
)
countries_insert_cols = ",\n        ".join(f"`{name}`" for name in countries_merge_cols)
countries_insert_vals = ",\n        ".join(f"src.`{name}`" for name in countries_merge_cols)

spark.sql(f"""
        MERGE INTO silver.countries AS tgt
        USING stg_countries AS src
        ON tgt.`{countries_merge_key}` = src.`{countries_merge_key}`

        WHEN MATCHED THEN UPDATE SET
        {countries_update_set}

        WHEN NOT MATCHED THEN INSERT (
        {countries_insert_cols}
        ) VALUES (
        {countries_insert_vals}
        )
""")


#### Check silver.countries load

In [0]:
# Preview a few loaded rows. The name is fully qualified, like every other table
# reference here, so the cell does not depend on the session's USE SCHEMA state.
display(spark.table("silver.countries").limit(5))

### Customers

#### Build stage table for bronze.customers_raw

In [0]:
bronze_customers = spark.table("bronze.customers_raw")


def _trimmed_string(bronze_name, silver_name):
    """Return a trimmed STRING projection of a Bronze column under its snake_case name."""
    return F.trim(F.col(bronze_name)).cast("string").alias(silver_name)


# Bronze -> Silver projection for customers: typed id/flag, trimmed text
# attributes, then the Bronze lineage columns unchanged.
# NOTE(review): unlike countries, no de-duplication is applied here; if
# bronze.customers_raw can hold repeated ingests of a customer, so will silver.
customer_projection = [
    F.col("CustomerId").cast("bigint").alias("customer_id"),
    F.col("Active").cast("boolean").alias("is_active"),
    _trimmed_string("Name", "full_name"),
    _trimmed_string("Address", "address"),
    _trimmed_string("City", "city"),
    _trimmed_string("Country", "country"),
    _trimmed_string("Email", "email"),
    *[F.col(lineage) for lineage in ("_ingest_timestamp", "_ingest_file", "_source_system")],
]

stg_customers = bronze_customers.select(*customer_projection)


#### Add country_id foreign key column to silver.customers

In [0]:
silver_countries = spark.table("silver.countries")

# Lookup from the country value on a customer to the countries surrogate key.
# A left join keeps customers with no matching country; their country_id is
# NULL, which the sanity check below counts.
# NOTE(review): silver.countries is de-duplicated on country_name, not on
# country; if two rows share a country value, matching customers are duplicated.
country_key_lookup = silver_countries.select("country_id", "country")

customers_with_country_id = stg_customers.join(country_key_lookup, "country", "left")


#### Sanity check new country_id in silver.customers
*Expected output: 0 (every customer resolves to a country_id).*

In [0]:
# Number of customers whose country did not resolve to a country_id (expected: 0).
customers_with_country_id.where(F.col("country_id").isNull()).count()

#### Load silver.customers

In [0]:
# Final silver.customers shape: the raw country value is replaced by the
# country_id foreign key; the Bronze lineage columns are kept.
silver_customer_columns = [
    "customer_id",
    "is_active",
    "full_name",
    "address",
    "city",
    "country_id",
    "email",
    "_ingest_timestamp",
    "_ingest_file",
    "_source_system",
]
silver_customers = customers_with_country_id.select(*silver_customer_columns)

# Full rebuild: overwrite replaces the table contents on every run.
silver_customers.write.saveAsTable("silver.customers", format="delta", mode="overwrite")


#### Check silver.customers load

In [0]:
# Preview a few loaded rows. The name is fully qualified, like every other table
# reference here, so the cell does not depend on the session's USE SCHEMA state.
display(spark.table("silver.customers").limit(5))

### Products

#### Build stage table for bronze.products_raw

In [0]:
bronze_products = spark.table("bronze.products_raw")

# Bronze -> Silver projection for products, built piece by piece in the
# final column order: id, trimmed text attributes, weight, Bronze lineage.
product_id_col = F.col("ProductId").cast("bigint").alias("product_id")
product_text_cols = [
    F.trim(F.col(bronze_name)).cast("string").alias(silver_name)
    for bronze_name, silver_name in (
        ("Name", "product_name"),
        ("ManufacturedCountry", "manufactured_country"),
    )
]
product_weight_col = F.col("WeightGrams").cast("bigint").alias("weight_in_grams")
product_lineage_cols = [
    F.col(lineage) for lineage in ("_ingest_timestamp", "_ingest_file", "_source_system")
]

stg_products = bronze_products.select(
    product_id_col,
    *product_text_cols,
    product_weight_col,
    *product_lineage_cols,
)

#### Add manufactured_country_id foreign key column to silver.products

In [0]:
silver_countries = spark.table("silver.countries")

# Rename the countries lookup so that its join key matches stg_products and its
# surrogate key arrives as manufactured_country_id. A left join keeps products
# whose country has no match (manufactured_country_id is then NULL).
manufactured_country_lookup = silver_countries.select(
    F.col("country_id").alias("manufactured_country_id"),
    F.col("country").alias("manufactured_country"),
)

products_with_country_id = stg_products.join(
    manufactured_country_lookup, "manufactured_country", "left"
)



#### Sanity check new manufactured_country_id in silver.products
*Expected output: 0 (every product resolves to a manufactured_country_id).*

In [0]:
# Number of products whose manufactured country did not resolve (expected: 0).
products_with_country_id.where(F.col("manufactured_country_id").isNull()).count()

#### Load silver.products

In [0]:
# Final silver.products shape: the raw manufactured_country value is replaced
# by the manufactured_country_id foreign key; the Bronze lineage columns are kept.
silver_products = products_with_country_id.select(
    "product_id", "product_name", "manufactured_country_id", "weight_in_grams",
    "_ingest_timestamp", "_ingest_file", "_source_system",
)

# Full rebuild: overwrite replaces the table contents on every run.
silver_products.write.mode("overwrite").format("delta").saveAsTable("silver.products")


#### Check silver.products load

In [0]:
# Preview a few loaded rows. The name is fully qualified, like every other table
# reference here, so the cell does not depend on the session's USE SCHEMA state.
display(spark.table("silver.products").limit(5))

### Sales

#### Load silver.sales

In [0]:


bronze_sales = spark.table("bronze.sales_raw")

# Every business column on a sale is an integer id or count, so all are cast to
# BIGINT under snake_case names (dict order is the output column order). The
# Bronze lineage columns follow unchanged.
sales_bigint_columns = {
    "SaleId": "sale_id",
    "OrderId": "order_id",
    "ProductId": "product_id",
    "Quantity": "quantity",
}

silver_sales = bronze_sales.select(
    *[F.col(src).cast("bigint").alias(dst) for src, dst in sales_bigint_columns.items()],
    "_ingest_timestamp",
    "_ingest_file",
    "_source_system",
)

# Full rebuild: overwrite replaces the table contents on every run.
silver_sales.write.saveAsTable("silver.sales", format="delta", mode="overwrite")

In [0]:
# Preview a few loaded rows. The name is fully qualified, like every other table
# reference here, so the cell does not depend on the session's USE SCHEMA state.
display(spark.table("silver.sales").limit(5))

#### Check silver.sales load

### Orders


#### Load silver.orders

In [0]:
bronze_orders = spark.table("bronze.orders_raw")

# Orders keep their ids as BIGINT and the order date as DATE.
# NOTE(review): unlike the other Silver tables, no _ingest_* lineage columns
# are carried through here — confirm whether bronze.orders_raw has them and
# whether silver.orders should keep them.
order_id_col = F.col("OrderId").cast("bigint").alias("order_id")
order_customer_id_col = F.col("CustomerId").cast("bigint").alias("customer_id")
order_date_col = F.col("Date").cast("date").alias("date")

silver_orders = bronze_orders.select(order_id_col, order_customer_id_col, order_date_col)

# Full rebuild: overwrite replaces the table contents on every run.
silver_orders.write.saveAsTable("silver.orders", format="delta", mode="overwrite")

#### Check silver.orders load

In [0]:
# Preview a few loaded rows. The name is fully qualified, like every other table
# reference here, so the cell does not depend on the session's USE SCHEMA state.
display(spark.table("silver.orders").limit(5))

## Check rowcounts

In [0]:
from pyspark.sql import functions as F


# Compare Bronze vs Silver row counts for each table, side by side.
# Countries is not expected to match exactly: staging drops blank names and
# keeps one row per country_name, and the MERGE never deletes existing rows.
checks = [
    ("Customers", "bronze.customers_raw", "silver.customers"),
    ("Countries", "bronze.countries_raw", "silver.countries"),
    ("Products",  "bronze.products_raw",  "silver.products"),
    ("Orders",    "bronze.orders_raw",    "silver.orders"),
    ("Sales",     "bronze.sales_raw",     "silver.sales"),
]

# Collect all counts on the driver first, then build a single DataFrame.
# This replaces building one DataFrame per table and union-chaining them, which
# also failed with IndexError when `checks` was empty. The explicit schema
# matches what was previously inferred (string label, 64-bit counts) and also
# works for an empty list.
row_counts = [
    (label, spark.table(bronze_tbl).count(), spark.table(silver_tbl).count())
    for label, bronze_tbl, silver_tbl in checks
]

row_counts_df = spark.createDataFrame(
    row_counts,
    "table_name STRING, bronze BIGINT, silver BIGINT",
)

display(row_counts_df.orderBy("table_name"))

