In [0]:
%sql
-- Make ecommerce_catalog.ecommerce_bronze the session default, so that
-- unqualified table names in later cells resolve against it.
USE CATALOG ecommerce_catalog;
USE SCHEMA ecommerce_bronze;


Customers

In [0]:
# Load the raw customers table from the Bronze schema and preview it.
Silver_customers = spark.read.table("ecommerce_catalog.ecommerce_bronze.customers")
display(Silver_customers)


In [0]:
# Row count of the customers table as read from Bronze (before any de-duplication).
Silver_customers.count()


In [0]:
# Keep a single row per customer_id and preview the result.
dedup_keys = ["customer_id"]
silver_customers_dedup = Silver_customers.drop_duplicates(dedup_keys)
display(silver_customers_dedup)

In [0]:
# Row count after de-duplicating on customer_id; the difference from
# Silver_customers.count() is the number of duplicate rows removed.
silver_customers_dedup.count()

In [0]:
from pyspark.sql.functions import (
    col,
    row_number,
    current_timestamp,
    lit,
    trim,
    when,
    lower,
    regexp_replace,
    length,
    concat
)
from pyspark.sql.window import Window

# ---------------------------------------------------------------------------
# Customers: Bronze -> Silver
#
# Reads the Bronze customers table, cleans it (empty strings -> NULL, one row
# per customer_id, standardized gender / city / country / telephone /
# job_title), stamps audit columns and overwrites the Silver customers table.
# ---------------------------------------------------------------------------

# Configuration
CATALOG = "ecommerce_catalog"
BRONZE_SCHEMA = "ecommerce_bronze"
SILVER_SCHEMA = "ecommerce_silver"
TABLE_NAME = "customers"
BATCH_ID = "silver_customers_batch_001"

BRONZE_TABLE = f"{CATALOG}.{BRONZE_SCHEMA}.{TABLE_NAME}"
# Fully qualified so the write does not depend on the session's current catalog.
SILVER_TABLE = f"{CATALOG}.{SILVER_SCHEMA}.{TABLE_NAME}"

DEFAULT_COUNTRY_CODE = "91"
MIN_PHONE_LENGTH = 10   # minimum number of digits accepted for a phone number
MAX_PHONE_LENGTH = 15   # maximum number of digits accepted for a phone number
INGEST_TS_COLUMN = "tech_ingest_ts"

# Read Bronze
Silver_customers = spark.read.table(BRONZE_TABLE)

# Normalize empty / whitespace-only strings to NULL.
# Only string columns are touched: trim() on other types relies on an implicit
# cast and fails outright for complex types (arrays, structs, maps).
string_columns = {name for name, dtype in Silver_customers.dtypes if dtype == "string"}
df = Silver_customers.select(
    *[
        (
            when(trim(col(c)) == "", None).otherwise(col(c)).alias(c)
            if c in string_columns
            else col(c)
        )
        for c in Silver_customers.columns
    ]
)

# Remove rows without a primary key
df = df.filter(col("customer_id").isNotNull())

# Deduplicate by customer_id.
# Ordering by a freshly generated current_timestamp() is meaningless (every row
# gets the same value) and would overwrite a real ingestion timestamp. So:
#   * if Bronze provides INGEST_TS_COLUMN, keep the most recently ingested row;
#   * otherwise no ordering information exists and an arbitrary row is kept.
# In both cases INGEST_TS_COLUMN is absent from the result.
if INGEST_TS_COLUMN in df.columns:
    window_spec = Window.partitionBy("customer_id").orderBy(col(INGEST_TS_COLUMN).desc())
    df = (
        df.withColumn("rn", row_number().over(window_spec))
          .filter(col("rn") == 1)
          .drop("rn", INGEST_TS_COLUMN)
    )
else:
    df = df.dropDuplicates(["customer_id"])

# Standardize gender (case- and whitespace-insensitive); anything else -> "Unknown"
gender_normalized = lower(trim(col("gender")))
df = df.withColumn(
    "gender",
    when(gender_normalized.isin("m", "male"), "Male")
    .when(gender_normalized.isin("f", "female"), "Female")
    .otherwise("Unknown")
)

# Handle missing city and country
df = df.withColumn("city", when(col("city").isNull(), "Unknown").otherwise(col("city"))) \
       .withColumn("country", when(col("country").isNull(), "Unknown").otherwise(col("country")))

# Normalize telephone to "+<country code><number>".
# Numbers whose digit count is outside [MIN_PHONE_LENGTH, MAX_PHONE_LENGTH]
# become NULL (the outer when() has no otherwise()).
phone_digits = regexp_replace(col("telephone"), "[^0-9]", "")
phone_digit_count = length(phone_digits)
# A leading DEFAULT_COUNTRY_CODE only counts as a country code when enough
# digits remain after it; otherwise a 10-digit national number that merely
# starts with "91" would lose its prefix.
has_country_code = (
    phone_digits.startswith(DEFAULT_COUNTRY_CODE)
    & (phone_digit_count >= MIN_PHONE_LENGTH + len(DEFAULT_COUNTRY_CODE))
)
df = df.withColumn(
    "telephone",
    when(
        (phone_digit_count >= MIN_PHONE_LENGTH) &
        (phone_digit_count <= MAX_PHONE_LENGTH),
        when(
            has_country_code,
            concat(lit("+"), phone_digits)
        ).otherwise(
            concat(lit("+"), lit(DEFAULT_COUNTRY_CODE), phone_digits)
        )
    )
)

# Default job title
df = df.withColumn(
    "job_title",
    when(col("job_title").isNull(), "Unknown").otherwise(col("job_title"))
)

# Add audit columns
df = df.withColumn("batch_id", lit(BATCH_ID)) \
       .withColumn("record_created_at", current_timestamp()) \
       .withColumn("record_updated_at", current_timestamp())

# Persist to Silver (full overwrite) and preview the result
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(SILVER_TABLE)
display(df)


Discounts

In [0]:
# Load the raw discounts table from the Bronze schema and preview it.
Silver_discounts = spark.read.table("ecommerce_catalog.ecommerce_bronze.discounts")
display(Silver_discounts)


In [0]:
from pyspark.sql.functions import (
    col,
    trim,
    when,
    current_timestamp,
    lit,
    regexp_extract
)

# ---------------------------------------------------------------------------
# Discounts: Bronze -> cleaned DataFrame
#
# Reads the Bronze discounts table, normalizes empty strings, renames the
# misspelled `discont` column, casts the validity window to timestamps,
# back-fills a missing discount from the description text and adds audit
# columns.
# NOTE(review): this cell only previews the result; nothing is written to a
# Silver table here. Confirm whether a write step is expected.
# ---------------------------------------------------------------------------

# Configuration
CATALOG = "ecommerce_catalog"
BRONZE_SCHEMA = "ecommerce_bronze"
TABLE_NAME = "discounts"
BATCH_ID = "silver_discounts_batch_001"

BRONZE_TABLE = f"{CATALOG}.{BRONZE_SCHEMA}.{TABLE_NAME}"

# Read Bronze
Silver_discounts = spark.read.table(BRONZE_TABLE)

# Normalize empty / whitespace-only strings to NULL.
# Only string columns are touched: trim() on other types relies on an implicit
# cast and fails outright for complex types (arrays, structs, maps).
string_columns = {name for name, dtype in Silver_discounts.dtypes if dtype == "string"}
df = Silver_discounts.select(
    *[
        (
            when(trim(col(c)) == "", None).otherwise(col(c)).alias(c)
            if c in string_columns
            else col(c)
        )
        for c in Silver_discounts.columns
    ]
)

# Rename discont -> discount_value
df = df.withColumnRenamed("discont", "discount_value")

# Ensure start and end are timestamps
df = df.withColumn("start", col("start").cast("timestamp")) \
       .withColumn("end", col("end").cast("timestamp"))

# Parse discount_value from the description when it is missing.
# The pattern accepts decimal percentages ("12.5%") and optional whitespace
# before the percent sign; the percentage is stored as a fraction (20% -> 0.2).
percent_text = regexp_extract(col("description"), r"(\d+(?:\.\d+)?)\s*%", 1)
# regexp_extract returns "" when nothing matches; guard the cast so a
# non-matching description yields NULL instead of an invalid-cast error when
# ANSI mode is enabled.
parsed_discount = when(percent_text != "", percent_text.cast("double") / 100)
df = df.withColumn(
    "discount_value",
    when(col("discount_value").isNull(), parsed_discount)
    .otherwise(col("discount_value"))
)

# Handle null description (done after parsing so the placeholder is never parsed)
df = df.withColumn(
    "description",
    when(col("description").isNull(), "No description provided").otherwise(col("description"))
)

# Add audit columns
df = df.withColumn("batch_id", lit(BATCH_ID)) \
       .withColumn("record_created_at", current_timestamp()) \
       .withColumn("record_updated_at", current_timestamp())

display(df)
