In [0]:
%sql
-- Set the session's default catalog and schema (Bronze) for unqualified table names.
-- The Python cells below build fully qualified table names, so they do not rely on this.
USE CATALOG ecommerce_catalog;
USE SCHEMA ecommerce_bronze;


CUSTOMERS

In [0]:
CATALOG = "ecommerce_catalog"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "customers"

# Fully qualified name of the Silver table this cell reports on
full_table_name = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# The Silver table is (re)built by the next cell, so on a fresh workspace it does not exist yet.
# Report that instead of failing the whole Run All.
if spark.catalog.tableExists(full_table_name):
    customers_df = spark.read.table(full_table_name)
    customers_count = customers_df.count()
    print(f"Total records in {full_table_name}: {customers_count}")
else:
    customers_df = None
    customers_count = None
    print(f"{full_table_name} does not exist yet; it is created by the next cell.")


In [0]:
from pyspark.sql.functions import (
    col, row_number, current_timestamp, lit, trim,
    when, lower, regexp_replace, length, concat
)
from pyspark.sql.window import Window

# -----------------------------
# Configuration
# -----------------------------
CATALOG = "ecommerce_catalog"
BRONZE_SCHEMA = "ecommerce_bronze"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "customers"
BATCH_ID = "silver_customers_batch_001"

BRONZE_TABLE = f"{CATALOG}.{BRONZE_SCHEMA}.{TABLE_NAME}"
SILVER_TABLE = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# Telephone normalisation:
# - reduce numbers to digits;
# - keep only digit counts in [MIN_PHONE_LENGTH, MAX_PHONE_LENGTH];
# - prefix DEFAULT_COUNTRY_CODE when the number does not already carry it.
DEFAULT_COUNTRY_CODE = "91"
MIN_PHONE_LENGTH = 10
MAX_PHONE_LENGTH = 15

# -----------------------------
# Ensure catalog & Silver database exist
# -----------------------------
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {SILVER_SCHEMA}")

# -----------------------------
# Read Bronze table
# -----------------------------
df_bronze = spark.read.table(BRONZE_TABLE)

# -----------------------------
# Normalize empty strings to NULL
# -----------------------------
# Only string-typed columns can hold "", so other columns pass through untouched.
# This also avoids applying trim() to complex types (array/struct/map), which fails analysis.
string_cols = {
    name for name, dtype in df_bronze.dtypes
    if dtype.startswith(("string", "varchar", "char"))
}
df = df_bronze.select(
    *[
        when(trim(col(c)) == "", None).otherwise(col(c)).alias(c) if c in string_cols else col(c)
        for c in df_bronze.columns
    ]
)

# -----------------------------
# Remove invalid primary keys
# -----------------------------
df = df.filter(col("customer_id").isNotNull())

# -----------------------------
# Deduplicate: keep the most recently ingested row per customer_id
# -----------------------------
latest_first = Window.partitionBy("customer_id").orderBy(col("ingestion_ts").desc())

df = (
    df.withColumn("rn", row_number().over(latest_first))
      .filter(col("rn") == 1)
      .drop("rn")
)

# -----------------------------
# Standardize gender (anything other than m/male/f/female, including NULL, becomes "Unknown")
# -----------------------------
df = df.withColumn(
    "gender",
    when(lower(col("gender")).isin("m", "male"), "Male")
    .when(lower(col("gender")).isin("f", "female"), "Female")
    .otherwise("Unknown")
)

# -----------------------------
# Handle city and country
# -----------------------------
df = (
    df.withColumn("city", when(col("city").isNull(), "Unknown").otherwise(col("city")))
      .withColumn("country", when(col("country").isNull(), "Unknown").otherwise(col("country")))
)

# -----------------------------
# Normalize telephone to "+<digits>"
# -----------------------------
digits = col("telephone_clean")
# A number already carries the country code only when it starts with the code AND is longer
# than a bare national number. Without the length check, a 10-digit local number that happens
# to begin with "91" would have its leading 91 mistaken for the country code.
has_country_code = digits.startswith(DEFAULT_COUNTRY_CODE) & (length(digits) > MIN_PHONE_LENGTH)

df = (
    df.withColumn("telephone_clean", regexp_replace(col("telephone"), "[^0-9]", ""))
      .withColumn(
          "telephone",
          # Out-of-range digit counts (and NULL input) have no otherwise() branch, so they become NULL.
          when(
              length(digits).between(MIN_PHONE_LENGTH, MAX_PHONE_LENGTH),
              when(has_country_code, concat(lit("+"), digits))
              .otherwise(concat(lit("+"), lit(DEFAULT_COUNTRY_CODE), digits))
          )
      )
      .drop("telephone_clean")
)

# -----------------------------
# Default job title
# -----------------------------
df = df.withColumn(
    "job_title",
    when(col("job_title").isNull(), "Unknown").otherwise(col("job_title"))
)

# -----------------------------
# Add audit columns
# -----------------------------
# NOTE(review): mode("overwrite") below rewrites every row on each run. record_created_at
# therefore reflects the last rebuild, not when the customer first arrived.
df = (
    df.withColumn("batch_id", lit(BATCH_ID))
      .withColumn("record_created_at", current_timestamp())
      .withColumn("record_updated_at", current_timestamp())
)

# -----------------------------
# Write to Silver table
# -----------------------------
df.write \
  .format("delta") \
  .mode("overwrite") \
  .saveAsTable(SILVER_TABLE)

# -----------------------------
# Verify no duplicates (an empty result means customer_id is unique)
# -----------------------------
display(
    spark.table(SILVER_TABLE)
         .groupBy("customer_id")
         .count()
         .filter(col("count") > 1)
)

display(df)

In [0]:
# Count the rows actually persisted to Silver. Counting `df` would re-run the whole lazy
# Bronze -> Silver pipeline (including the dedup window) just to produce a number.
spark.table(SILVER_TABLE).count()

DISCOUNTS

In [0]:
CATALOG = "ecommerce_catalog"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "discounts"

# Fully qualified name of the Silver table this cell reports on
full_table_name = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# The Silver table is (re)built by the next cell, so on a fresh workspace it does not exist yet.
# Report that instead of failing the whole Run All.
if spark.catalog.tableExists(full_table_name):
    discounts_df = spark.read.table(full_table_name)
    discounts_count = discounts_df.count()
    print(f"Total records in {full_table_name}: {discounts_count}")
else:
    discounts_df = None
    discounts_count = None
    print(f"{full_table_name} does not exist yet; it is created by the next cell.")


In [0]:
from pyspark.sql.functions import (
    col, trim, when, current_timestamp, lit, regexp_extract
)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# -----------------------------
# Configuration
# -----------------------------
CATALOG = "ecommerce_catalog"
BRONZE_SCHEMA = "ecommerce_bronze"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "discounts"
BATCH_ID = "silver_discounts_batch_001"

BRONZE_TABLE = f"{CATALOG}.{BRONZE_SCHEMA}.{TABLE_NAME}"
SILVER_TABLE = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# -----------------------------
# Ensure catalog & Silver database exist
# -----------------------------
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {SILVER_SCHEMA}")

# -----------------------------
# Read Bronze table
# -----------------------------
df_bronze = spark.read.table(BRONZE_TABLE)

# -----------------------------
# Data Cleaning & Transformation
# -----------------------------
# Normalize blank strings to NULL. Only string-typed columns can hold "", so other columns
# pass through untouched. This also avoids trim() on complex types, which fails analysis.
string_cols = {
    name for name, dtype in df_bronze.dtypes
    if dtype.startswith(("string", "varchar", "char"))
}
df = df_bronze.select(
    *[
        when(trim(col(c)) == "", None).otherwise(col(c)).alias(c) if c in string_cols else col(c)
        for c in df_bronze.columns
    ]
)

# Rename the misspelled Bronze column discont -> discount_value if it exists
if "discont" in df.columns:
    df = df.withColumnRenamed("discont", "discount_value")

# Ensure start and end are timestamp
df = df.withColumn("start", col("start").cast("timestamp")) \
       .withColumn("end", col("end").cast("timestamp"))

# Fill a missing discount_value from a percentage in the description, e.g. "20% off" -> 0.2.
# - The pattern accepts decimals. With a plain (\d+)% pattern, "12.5%" would match only "5%".
# - regexp_extract returns "" (not NULL) when nothing matches. Guard that so the cast never
#   sees "": under ANSI mode, casting "" to double raises an error instead of returning NULL.
pct_text = regexp_extract(col("description"), r"(\d+(?:\.\d+)?)%", 1)
df = df.withColumn(
    "discount_value",
    when(
        col("discount_value").isNull(),
        when(pct_text != "", pct_text.cast("double") / 100)
    ).otherwise(col("discount_value"))
)

# Handle null description (after parsing, so the placeholder text is never parsed)
df = df.withColumn(
    "description",
    when(col("description").isNull(), "No description provided").otherwise(col("description"))
)

# Deduplicate by discount_id (only if the Bronze table has one).
# current_timestamp() returns the same value for every row in a query, so it cannot rank rows.
# Rank by the Bronze ingestion timestamp when it exists.
if "discount_id" in df.columns:
    if "ingestion_ts" in df.columns:
        latest_first = Window.partitionBy("discount_id").orderBy(col("ingestion_ts").desc())
        df = (
            df.withColumn("rn", row_number().over(latest_first))
              .filter(col("rn") == 1)
              .drop("rn")
        )
    else:
        # NOTE(review): no ingestion timestamp to rank by, so which duplicate survives is arbitrary.
        df = df.dropDuplicates(["discount_id"])

# Add audit columns
df = df.withColumn("batch_id", lit(BATCH_ID)) \
       .withColumn("record_created_at", current_timestamp()) \
       .withColumn("record_updated_at", current_timestamp())

# -----------------------------
# Write to Silver table
# -----------------------------
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(SILVER_TABLE)

# -----------------------------
# Verify Silver table
# -----------------------------
display(spark.table(SILVER_TABLE))


In [0]:
# Count the rows actually persisted to Silver, instead of re-running the lazy pipeline behind `df`.
spark.table(SILVER_TABLE).count()

EMPLOYEES

In [0]:
CATALOG = "ecommerce_catalog"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "employees"

# Fully qualified name of the Silver table this cell reports on
full_table_name = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# The Silver table is (re)built by the next cell, so on a fresh workspace it does not exist yet.
# Report that instead of failing the whole Run All.
if spark.catalog.tableExists(full_table_name):
    employees_df = spark.read.table(full_table_name)
    employees_count = employees_df.count()
    print(f"Total records in {full_table_name}: {employees_count}")
else:
    employees_df = None
    employees_count = None
    print(f"{full_table_name} does not exist yet; it is created by the next cell.")


In [0]:
from pyspark.sql.functions import (
    col, trim, when, current_timestamp, lit
)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# -----------------------------
# Configuration
# -----------------------------
CATALOG = "ecommerce_catalog"
BRONZE_SCHEMA = "ecommerce_bronze"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "employees"
BATCH_ID = "silver_employees_batch_001"

BRONZE_TABLE = f"{CATALOG}.{BRONZE_SCHEMA}.{TABLE_NAME}"
SILVER_TABLE = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# -----------------------------
# Ensure catalog & Silver database exist
# -----------------------------
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {SILVER_SCHEMA}")

# -----------------------------
# Read Bronze table
# -----------------------------
df_bronze = spark.read.table(BRONZE_TABLE)

# -----------------------------
# Data Cleaning & Transformation
# -----------------------------
# Normalize blank strings to NULL. Only string-typed columns can hold "", so other columns
# pass through untouched. This also avoids trim() on complex types, which fails analysis.
string_cols = {
    name for name, dtype in df_bronze.dtypes
    if dtype.startswith(("string", "varchar", "char"))
}
df = df_bronze.select(
    *[
        when(trim(col(c)) == "", None).otherwise(col(c)).alias(c) if c in string_cols else col(c)
        for c in df_bronze.columns
    ]
)

# Remove rows with null primary key
df = df.filter(col("employee_id").isNotNull())

# Deduplicate by employee_id, keeping the most recently ingested row.
# current_timestamp() returns the same value for every row in a query, so it cannot rank rows.
# Rank by the Bronze ingestion timestamp when it exists.
if "ingestion_ts" in df.columns:
    latest_first = Window.partitionBy("employee_id").orderBy(col("ingestion_ts").desc())
    df = (
        df.withColumn("rn", row_number().over(latest_first))
          .filter(col("rn") == 1)
          .drop("rn")
    )
else:
    # NOTE(review): no ingestion timestamp to rank by, so which duplicate survives is arbitrary.
    df = df.dropDuplicates(["employee_id"])

# Handle nulls for string columns
df = df.withColumn("name", when(col("name").isNull(), "Unknown").otherwise(col("name"))) \
       .withColumn("position", when(col("position").isNull(), "Unknown").otherwise(col("position"))) \
       .withColumn("source_file", when(col("source_file").isNull(), "Unknown").otherwise(col("source_file")))

# -----------------------------
# Add audit columns
# -----------------------------
df = df.withColumn("batch_id", lit(BATCH_ID)) \
       .withColumn("record_created_at", current_timestamp()) \
       .withColumn("record_updated_at", current_timestamp())

# -----------------------------
# Write to Silver table
# -----------------------------
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(SILVER_TABLE)

# -----------------------------
# Verify Silver table
# -----------------------------
display(spark.table(SILVER_TABLE))


In [0]:
# Count the rows actually persisted to Silver, instead of re-running the lazy pipeline behind `df`.
spark.table(SILVER_TABLE).count()

PRODUCTS

In [0]:
CATALOG = "ecommerce_catalog"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "products"

# Fully qualified name of the Silver table this cell reports on
full_table_name = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# The Silver table is (re)built by the next cell, so on a fresh workspace it does not exist yet.
# Report that instead of failing the whole Run All.
if spark.catalog.tableExists(full_table_name):
    products_df = spark.read.table(full_table_name)
    products_count = products_df.count()
    print(f"Total records in {full_table_name}: {products_count}")
else:
    products_df = None
    products_count = None
    print(f"{full_table_name} does not exist yet; it is created by the next cell.")


In [0]:
from pyspark.sql.functions import (
    col, trim, when, current_timestamp, lit
)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# -----------------------------
# Configuration
# -----------------------------
CATALOG = "ecommerce_catalog"
BRONZE_SCHEMA = "ecommerce_bronze"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "products"
BATCH_ID = "silver_products_batch_001"

BRONZE_TABLE = f"{CATALOG}.{BRONZE_SCHEMA}.{TABLE_NAME}"
SILVER_TABLE = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# -----------------------------
# Ensure catalog & Silver database exist
# -----------------------------
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {SILVER_SCHEMA}")

# -----------------------------
# Read Bronze table
# -----------------------------
df_bronze = spark.read.table(BRONZE_TABLE)

# -----------------------------
# Data Cleaning & Transformation
# -----------------------------
# Normalize blank strings to NULL. Only string-typed columns can hold "", so other columns
# pass through untouched. This also avoids trim() on complex types, which fails analysis.
string_cols = {
    name for name, dtype in df_bronze.dtypes
    if dtype.startswith(("string", "varchar", "char"))
}
df = df_bronze.select(
    *[
        when(trim(col(c)) == "", None).otherwise(col(c)).alias(c) if c in string_cols else col(c)
        for c in df_bronze.columns
    ]
)

# Remove rows with null primary key
df = df.filter(col("product_id").isNotNull())

# Deduplicate by product_id, keeping the most recently ingested row.
# current_timestamp() returns the same value for every row in a query, so it cannot rank rows.
# Rank by the Bronze ingestion timestamp when it exists.
if "ingestion_ts" in df.columns:
    latest_first = Window.partitionBy("product_id").orderBy(col("ingestion_ts").desc())
    df = (
        df.withColumn("rn", row_number().over(latest_first))
          .filter(col("rn") == 1)
          .drop("rn")
    )
else:
    # NOTE(review): no ingestion timestamp to rank by, so which duplicate survives is arbitrary.
    df = df.dropDuplicates(["product_id"])

# Handle nulls / default values for string columns
string_columns = [
    "category", "sub_category", "description_pt", "description_de",
    "description_fr", "description_es", "description_en", "description_zh",
    "color", "sizes", "source_file"
]

for c in string_columns:
    df = df.withColumn(c, when(col(c).isNull(), "Unknown").otherwise(col(c)))

# Handle null production_cost
df = df.withColumn("production_cost", when(col("production_cost").isNull(), 0.0).otherwise(col("production_cost")))

# -----------------------------
# Add audit columns
# -----------------------------
df = df.withColumn("batch_id", lit(BATCH_ID)) \
       .withColumn("record_created_at", current_timestamp()) \
       .withColumn("record_updated_at", current_timestamp())

# -----------------------------
# Write to Silver table
# -----------------------------
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(SILVER_TABLE)

# -----------------------------
# Verify Silver table
# -----------------------------
display(spark.table(SILVER_TABLE))


In [0]:
# Count the rows actually persisted to Silver, instead of re-running the lazy pipeline behind `df`.
spark.table(SILVER_TABLE).count()

STORES

In [0]:
CATALOG = "ecommerce_catalog"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "stores"

# Fully qualified name of the Silver table this cell reports on
full_table_name = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# The Silver table is (re)built by the next cell, so on a fresh workspace it does not exist yet.
# Report that instead of failing the whole Run All.
if spark.catalog.tableExists(full_table_name):
    stores_df = spark.read.table(full_table_name)
    stores_count = stores_df.count()
    print(f"Total records in {full_table_name}: {stores_count}")
else:
    stores_df = None
    stores_count = None
    print(f"{full_table_name} does not exist yet; it is created by the next cell.")


In [0]:
from pyspark.sql.functions import (
    col, trim, when, current_timestamp, lit
)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# -----------------------------
# Configuration
# -----------------------------
CATALOG = "ecommerce_catalog"
BRONZE_SCHEMA = "ecommerce_bronze"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "stores"
BATCH_ID = "silver_stores_batch_001"

BRONZE_TABLE = f"{CATALOG}.{BRONZE_SCHEMA}.{TABLE_NAME}"
SILVER_TABLE = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# -----------------------------
# Ensure catalog & Silver database exist
# -----------------------------
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {SILVER_SCHEMA}")

# -----------------------------
# Read Bronze table
# -----------------------------
df_bronze = spark.read.table(BRONZE_TABLE)

# -----------------------------
# Data Cleaning & Transformation
# -----------------------------
# Normalize blank strings to NULL. Only string-typed columns can hold "", so other columns
# pass through untouched. This also avoids trim() on complex types, which fails analysis.
string_cols = {
    name for name, dtype in df_bronze.dtypes
    if dtype.startswith(("string", "varchar", "char"))
}
df = df_bronze.select(
    *[
        when(trim(col(c)) == "", None).otherwise(col(c)).alias(c) if c in string_cols else col(c)
        for c in df_bronze.columns
    ]
)

# Remove rows with null primary key
df = df.filter(col("store_id").isNotNull())

# Deduplicate by store_id, keeping the most recently ingested row.
# current_timestamp() returns the same value for every row in a query, so it cannot rank rows.
# Rank by the Bronze ingestion timestamp when it exists.
if "ingestion_ts" in df.columns:
    latest_first = Window.partitionBy("store_id").orderBy(col("ingestion_ts").desc())
    df = (
        df.withColumn("rn", row_number().over(latest_first))
          .filter(col("rn") == 1)
          .drop("rn")
    )
else:
    # NOTE(review): no ingestion timestamp to rank by, so which duplicate survives is arbitrary.
    df = df.dropDuplicates(["store_id"])

# Handle nulls / default values for string columns
string_columns = ["country", "city", "store_name", "zip_code", "source_file"]
for c in string_columns:
    df = df.withColumn(c, when(col(c).isNull(), "Unknown").otherwise(col(c)))

# Handle nulls / default values for numeric columns
# NOTE(review): 0.0/0.0 is a real coordinate (off the coast of West Africa), so geo consumers
# cannot tell it apart from a genuine location. Confirm whether NULL would be a safer "unknown".
df = df.withColumn("number_of_employees", when(col("number_of_employees").isNull(), 0).otherwise(col("number_of_employees")))
df = df.withColumn("latitude", when(col("latitude").isNull(), 0.0).otherwise(col("latitude")))
df = df.withColumn("longitude", when(col("longitude").isNull(), 0.0).otherwise(col("longitude")))

# -----------------------------
# Add audit columns
# -----------------------------
df = df.withColumn("batch_id", lit(BATCH_ID)) \
       .withColumn("record_created_at", current_timestamp()) \
       .withColumn("record_updated_at", current_timestamp())

# -----------------------------
# Write to Silver table
# -----------------------------
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(SILVER_TABLE)

# -----------------------------
# Verify Silver table
# -----------------------------
display(spark.table(SILVER_TABLE))


In [0]:
# Count the rows actually persisted to Silver, instead of re-running the lazy pipeline behind `df`.
spark.table(SILVER_TABLE).count()

TRANSACTIONS

In [0]:
CATALOG = "ecommerce_catalog"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "transactions"

# Fully qualified name of the Silver table this cell reports on
full_table_name = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# The Silver table is (re)built by the next cell, so on a fresh workspace it does not exist yet.
# Report that instead of failing the whole Run All.
if spark.catalog.tableExists(full_table_name):
    transactions_df = spark.read.table(full_table_name)
    transactions_count = transactions_df.count()
    print(f"Total records in {full_table_name}: {transactions_count}")
else:
    transactions_df = None
    transactions_count = None
    print(f"{full_table_name} does not exist yet; it is created by the next cell.")


In [0]:
from pyspark.sql.functions import (
    col, trim, when, current_timestamp, lit
)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# -----------------------------
# Configuration
# -----------------------------
CATALOG = "ecommerce_catalog"
BRONZE_SCHEMA = "ecommerce_bronze"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "transactions"
BATCH_ID = "silver_transactions_batch_001"

BRONZE_TABLE = f"{CATALOG}.{BRONZE_SCHEMA}.{TABLE_NAME}"
SILVER_TABLE = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

# -----------------------------
# Ensure catalog & Silver database exist
# -----------------------------
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {SILVER_SCHEMA}")

# -----------------------------
# Read Bronze table
# -----------------------------
df_bronze = spark.read.table(BRONZE_TABLE)

# -----------------------------
# Data Cleaning & Transformation
# -----------------------------
# Normalize blank strings to NULL. Only string-typed columns can hold "", so other columns
# pass through untouched. This also avoids trim() on complex types, which fails analysis.
string_cols = {
    name for name, dtype in df_bronze.dtypes
    if dtype.startswith(("string", "varchar", "char"))
}
df = df_bronze.select(
    *[
        when(trim(col(c)) == "", None).otherwise(col(c)).alias(c) if c in string_cols else col(c)
        for c in df_bronze.columns
    ]
)

# Remove rows with null primary keys (invoice_id & line)
df = df.filter(col("invoice_id").isNotNull() & col("line").isNotNull())

# Deduplicate on the composite key (invoice_id, line), keeping the most recently ingested row.
# current_timestamp() returns the same value for every row in a query, so it cannot rank rows.
# Rank by the Bronze ingestion timestamp when it exists.
if "ingestion_ts" in df.columns:
    latest_first = Window.partitionBy("invoice_id", "line").orderBy(col("ingestion_ts").desc())
    df = (
        df.withColumn("rn", row_number().over(latest_first))
          .filter(col("rn") == 1)
          .drop("rn")
    )
else:
    # NOTE(review): no ingestion timestamp to rank by, so which duplicate survives is arbitrary.
    df = df.dropDuplicates(["invoice_id", "line"])

# Handle nulls / default values for string columns
string_columns = [
    "size", "color", "currency", "currency_symbol", "sku",
    "transaction_type", "payment_method"
]
for c in string_columns:
    df = df.withColumn(c, when(col(c).isNull(), "Unknown").otherwise(col(c)))

# Handle nulls / default values for numeric columns
# NOTE(review): zero-filling customer_id/product_id/store_id/employee_id produces references that
# presumably match no dimension row. Confirm whether NULL foreign keys would be preferable.
numeric_columns = [
    "unit_price", "quantity", "discount", "line_total",
    "customer_id", "product_id", "store_id", "employee_id", "invoice_total"
]
for c in numeric_columns:
    df = df.withColumn(c, when(col(c).isNull(), 0).otherwise(col(c)))

# Missing dates stay NULL. Substituting the processing time would invent a sale date, and that
# date would change on every re-run. The NULL literal is typed as timestamp, so the column gets
# the same result type as a timestamp fallback; this avoids a Delta schema mismatch on overwrite.
df = df.withColumn(
    "date",
    when(col("date").isNull(), lit(None).cast("timestamp")).otherwise(col("date"))
)

# -----------------------------
# Add audit columns
# -----------------------------
df = df.withColumn("batch_id", lit(BATCH_ID)) \
       .withColumn("record_created_at", current_timestamp()) \
       .withColumn("record_updated_at", current_timestamp())

# -----------------------------
# Write to Silver table
# -----------------------------
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(SILVER_TABLE)

# -----------------------------
# Verify Silver table
# -----------------------------
display(spark.table(SILVER_TABLE))


In [0]:
# Count the rows actually persisted to Silver, instead of re-running the lazy pipeline behind `df`.
spark.table(SILVER_TABLE).count()

In [0]:
%sql
-- Inspect the column names and types of the Silver transactions table written by the cell above.
DESCRIBE TABLE ecommerce_catalog.ecommerce_silver.transactions;
