# Gold Layer
This notebook creates refined Dimension tables in the Gold layer by joining Silver-level datasets and adding derived columns like business regions and standardized date IDs for analytical reporting.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, DateType, TimestampType, FloatType
from pyspark.sql import Row

In [0]:
catalog_name = "ecommerce"

In [0]:
df_products = spark.table(f"{catalog_name}.silver.slv_products")
df_brands = spark.table(f"{catalog_name}.silver.slv_brands")
df_category = spark.table(f"{catalog_name}.silver.slv_category")

In [0]:
df_products.createOrReplaceTempView("view_products")
df_brands.createOrReplaceTempView("view_brands")
df_category.createOrReplaceTempView("view_category")

In [0]:
display(spark.sql("select * from view_products limit 5"))

product_id,sku,category_code,brand_code,color,size,material,weight_grams,length_cm,width_cm,height_cm,rating_count,file_name,ingest_timestamp
2000000000015,STCR-HNK-00001,HNK,STCR,White,One-Size,cotton,305,22.2,17.1,6.3,0,dbfs:/Volumes/ecommerce/source_data/raw/products/products.csv,2026-01-19T12:34:38.090Z
2000000000022,HMNS-HNK-00002,HNK,HMNS,Silver,One-Size,Steel,682,18.2,12.3,3.7,1,dbfs:/Volumes/ecommerce/source_data/raw/products/products.csv,2026-01-19T12:34:38.090Z
2000000000039,NOVW-CE-00003,CE,NOVW,Purple,One-Size,Wood,243,18.2,13.9,4.2,0,dbfs:/Volumes/ecommerce/source_data/raw/products/products.csv,2026-01-19T12:34:38.090Z
2000000000046,URTL-APP-00004,APP,URTL,Silver,S,Rubber,225,17.6,4.6,5.8,50,dbfs:/Volumes/ecommerce/source_data/raw/products/products.csv,2026-01-19T12:34:38.090Z
2000000000053,GGRN-GRC-00005,GRCY,GGRN,Silver,One-Size,Rubber,455,27.2,15.8,7.4,4,dbfs:/Volumes/ecommerce/source_data/raw/products/products.csv,2026-01-19T12:34:38.090Z


In [0]:
display(spark.sql("select * from view_category limit 5"))

category_code,category_name,_ingested_at,_source_file
BKS,Books,2026-01-19T12:29:11.906Z,dbfs:/Volumes/ecommerce/source_data/raw/category/category.csv
GRCY,Grocery,2026-01-19T12:29:11.906Z,dbfs:/Volumes/ecommerce/source_data/raw/category/category.csv
HNK,Home & Kitchen,2026-01-19T12:29:11.906Z,dbfs:/Volumes/ecommerce/source_data/raw/category/category.csv
TOY,Toys & Games,2026-01-19T12:29:11.906Z,dbfs:/Volumes/ecommerce/source_data/raw/category/category.csv
APP,Apparel,2026-01-19T12:29:11.906Z,dbfs:/Volumes/ecommerce/source_data/raw/category/category.csv


In [0]:
display(spark.sql("select * from view_brands limit 5"))

brand_code,brand_name,category_code,_source_file,ingested_at
ACME,AcmeTech,CE,dbfs:/Volumes/ecommerce/source_data/raw/brands/brands.csv,2026-01-19T12:26:14.387Z
NOVW,NovaWave,CE,dbfs:/Volumes/ecommerce/source_data/raw/brands/brands.csv,2026-01-19T12:26:14.387Z
ZNTH,Zenith,CE,dbfs:/Volumes/ecommerce/source_data/raw/brands/brands.csv,2026-01-19T12:26:14.387Z
BYTM,ByteMax,CE,dbfs:/Volumes/ecommerce/source_data/raw/brands/brands.csv,2026-01-19T12:26:14.387Z
ECOT,EcoTone,CE,dbfs:/Volumes/ecommerce/source_data/raw/brands/brands.csv,2026-01-19T12:26:14.387Z


In [0]:
spark.sql(f"USE CATALOG {catalog_name}")

DataFrame[]

## Products

In [0]:
%sql
    -- Build brands and category mapping and write Gold table
CREATE OR REPLACE TABLE gold.gld_dim_products AS

WITH brand_categories AS (
  SELECT
  b.brand_name,
  b.brand_code,
  c.category_name,
  c.category_code
  FROM view_brands b
  INNER JOIN view_category c
  ON b.category_code = c.category_code
)
SELECT
  p.product_id,
  p.sku,
  p.category_code,
  COALESCE(bc.category_name, 'Not Available') AS category_name,
  p.brand_code,
  COALESCE(bc.brand_name, 'Not Available') AS brand_name,
  p.color,
  p.size,
  p.material,
  p.weight_grams,
  p.length_cm,
  p.width_cm,
  p.height_cm,
  p.rating_count,
  p.file_name,
  p.ingest_timestamp
FROM view_products p
LEFT JOIN brand_categories bc
ON p.brand_code = bc.brand_code


num_affected_rows,num_inserted_rows


## Customers

In [0]:
# From business i got these regions
india_reigon = {
    "MH": "West", "GJ": "West", "RJ": "West",
    "KA": "South", "TN": "South", "KL": "South", "AP": "South", "KL": "South",
    "UP": "North", "WB": "North", "DL": "North"
}

uk_region = {
    "ENG": "England", "WLS": "Wales", "NIR": "Northern Ireland", "SCT": "Scotland"
}

australia_region = {
    "VIC": "SouthEast", "WA": "West", "NSW": "East", "QLD": "NorthEast"
}

us_region = {
    "MA": "NorthEast", "FL": "South", "NJ": "NorthEast", "CA": "West",
    "NY": "NorthEast", "TX": "South"
}

uae_region = {
    "AUH": "Abu Dhabi", "DU": "DUbai", "SHJ": "Shajah"
}

singapore_region = {
    "SG": "Singapore"
}

canada_region = {
    "BC": "West", "AB": "West", "ON": "East", "QC": "East", "NS": "East", "IL": "Other"
}

country_state_map = {
    "India": india_reigon,
    "Australia": australia_region,
    "United Kingdom": uk_region,
    "United States": us_region,
    "United Arab Emirates": uae_region,
    "Singapore": singapore_region,
    "Canada": canada_region
}

In [0]:
country_state_map

{'India': {'MH': 'West',
  'GJ': 'West',
  'RJ': 'West',
  'KA': 'South',
  'TN': 'South',
  'KL': 'South',
  'AP': 'South',
  'UP': 'North',
  'WB': 'North',
  'DL': 'North'},
 'Australia': {'VIC': 'SouthEast',
  'WA': 'West',
  'NSW': 'East',
  'QLD': 'NorthEast'},
 'United Kingdom': {'ENG': 'England',
  'WLS': 'Wales',
  'NIR': 'Northern Ireland',
  'SCT': 'Scotland'},
 'United States': {'MA': 'NorthEast',
  'FL': 'South',
  'NJ': 'NorthEast',
  'CA': 'West',
  'NY': 'NorthEast',
  'TX': 'South'},
 'United Arab Emirates': {'AUH': 'Abu Dhabi', 'DU': 'DUbai', 'SHJ': 'Shajah'},
 'Singapore': {'SG': 'Singapore'},
 'Canada': {'BC': 'West',
  'AB': 'West',
  'ON': 'East',
  'QC': 'East',
  'NS': 'East',
  'IL': 'Other'}}

In [0]:
rows = []
for country, states in country_state_map.items():
    for state_code, region in states.items():
        rows.append(Row(country=country, state=state_code, region=region))
rows[:10]

[Row(country='India', state='MH', region='West'),
 Row(country='India', state='GJ', region='West'),
 Row(country='India', state='RJ', region='West'),
 Row(country='India', state='KA', region='South'),
 Row(country='India', state='TN', region='South'),
 Row(country='India', state='KL', region='South'),
 Row(country='India', state='AP', region='South'),
 Row(country='India', state='UP', region='North'),
 Row(country='India', state='WB', region='North'),
 Row(country='India', state='DL', region='North')]

In [0]:
df_region_mapping = spark.createDataFrame(rows)

df_region_mapping.show(truncate=False)

+--------------+-----+----------------+
|country       |state|region          |
+--------------+-----+----------------+
|India         |MH   |West            |
|India         |GJ   |West            |
|India         |RJ   |West            |
|India         |KA   |South           |
|India         |TN   |South           |
|India         |KL   |South           |
|India         |AP   |South           |
|India         |UP   |North           |
|India         |WB   |North           |
|India         |DL   |North           |
|Australia     |VIC  |SouthEast       |
|Australia     |WA   |West            |
|Australia     |NSW  |East            |
|Australia     |QLD  |NorthEast       |
|United Kingdom|ENG  |England         |
|United Kingdom|WLS  |Wales           |
|United Kingdom|NIR  |Northern Ireland|
|United Kingdom|SCT  |Scotland        |
|United States |MA   |NorthEast       |
|United States |FL   |South           |
+--------------+-----+----------------+
only showing top 20 rows


In [0]:
df_silver = spark.table(f"{catalog_name}.silver.slv_customers")
display(df_silver.limit(5))

customer_id,phone,country_code,country,state,file_name,ingest_timestamp
CUST000000000001,917280033536.0,IN,India,MH,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z
CUST000000000002,619489725433.0,AU,Australia,VIC,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z
CUST000000000003,919390066524.0,IN,India,TN,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z
CUST000000000004,917073741793.0,IN,India,TN,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z
CUST000000000005,618478772532.0,AU,Australia,WA,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z


In [0]:
df_gold = df_silver.join(df_region_mapping, on=['country', 'state'], how='left')

df_gold = df_gold.fillna({'region': 'Other'})

display(df_gold.limit(5))

country,state,customer_id,phone,country_code,file_name,ingest_timestamp,region
India,MH,CUST000000000001,917280033536.0,IN,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z,West
Australia,VIC,CUST000000000002,619489725433.0,AU,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z,SouthEast
India,TN,CUST000000000003,919390066524.0,IN,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z,South
India,TN,CUST000000000004,917073741793.0,IN,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z,South
Australia,WA,CUST000000000005,618478772532.0,AU,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv,2026-01-19T12:36:56.777Z,West


In [0]:
columns_order = ["customer_id", "country", "country_code", "state", "region", "phone", "ingest_timestamp", "file_name"]
df_gold = df_gold.select(columns_order)

display(df_gold.limit(5))

customer_id,country,country_code,state,region,phone,ingest_timestamp,file_name
CUST000000000001,India,IN,MH,West,917280033536.0,2026-01-19T12:36:56.777Z,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv
CUST000000000002,Australia,AU,VIC,SouthEast,619489725433.0,2026-01-19T12:36:56.777Z,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv
CUST000000000003,India,IN,TN,South,919390066524.0,2026-01-19T12:36:56.777Z,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv
CUST000000000004,India,IN,TN,South,917073741793.0,2026-01-19T12:36:56.777Z,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv
CUST000000000005,Australia,AU,WA,West,618478772532.0,2026-01-19T12:36:56.777Z,dbfs:/Volumes/ecommerce/source_data/raw/customers/customers.csv


In [0]:
df_gold.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{catalog_name}.gold.gld_dim_customers")

## Date/Calendar

In [0]:
df_silver = spark.table(f"{catalog_name}.silver.slv_calendar")
display(df_silver.limit(5))

date,year,day_name,quarter,week_of_year,_ingested_at,_source_file
2025-09-07,2025,Sunday,Q3-2025,Week-36-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv
2025-08-06,2025,Wednesday,Q3-2025,Week-32-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv
2025-09-23,2025,Tuesday,Q3-2025,Week-39-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv
2025-08-25,2025,Monday,Q3-2025,Week-35-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv
2025-10-16,2025,Thursday,Q4-2025,Week-42-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv


In [0]:
df_gold = df_silver.withColumn("date_id", F.date_format(F.col("date"), "yyyMMdd").cast("int"))

df_gold = df_gold.withColumn("month_name", F.date_format(F.col("date"), "MMMM"))

df_gold = df_gold.withColumn("is_weekend", F.when(F.col("day_name").isin("Saturday", "Sunday"), 1).otherwise(0))

display(df_gold.limit(5))

date,year,day_name,quarter,week_of_year,_ingested_at,_source_file,date_id,month_name,is_weekend
2025-09-07,2025,Sunday,Q3-2025,Week-36-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv,20250907,September,1
2025-08-06,2025,Wednesday,Q3-2025,Week-32-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv,20250806,August,0
2025-09-23,2025,Tuesday,Q3-2025,Week-39-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv,20250923,September,0
2025-08-25,2025,Monday,Q3-2025,Week-35-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv,20250825,August,0
2025-10-16,2025,Thursday,Q4-2025,Week-42-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv,20251016,October,0


In [0]:
columns_order = ["date_id", "date", "year", "month_name", "day_name", "week_of_year", "is_weekend", "quarter", "_ingested_at", "_source_file"]

df_gold = df_gold.select(columns_order)

display(df_gold.limit(5))

date_id,date,year,month_name,day_name,week_of_year,is_weekend,quarter,_ingested_at,_source_file
20250907,2025-09-07,2025,September,Sunday,Week-36-2025,1,Q3-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv
20250806,2025-08-06,2025,August,Wednesday,Week-32-2025,0,Q3-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv
20250923,2025-09-23,2025,September,Tuesday,Week-39-2025,0,Q3-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv
20250825,2025-08-25,2025,August,Monday,Week-35-2025,0,Q3-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv
20251016,2025-10-16,2025,October,Thursday,Week-42-2025,0,Q4-2025,2026-01-19T12:40:29.359Z,dbfs:/Volumes/ecommerce/source_data/raw/date/date.csv


In [0]:
df_gold.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{catalog_name}.gold.gld_dim_date")