In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
# Unity Catalog that holds the bronze and silver schemas read and written below.
catalog_name = "ecommerce"

In [0]:
# Load the raw order items from the bronze layer and preview a few rows.
bronze_table_name = f"{catalog_name}.bronze.brz_order_items"
df = spark.table(bronze_table_name)
display(df.limit(10))

In [0]:
# Check for problem quantities: NULL, non-numeric (e.g. the literal 'Two'),
# zero, or negative. try_cast yields NULL for both missing and non-numeric
# values, so an explicit NULL test is needed: a bare `<= 0` comparison
# evaluates to NULL for those rows and the filter would silently drop them.
quantity_as_bigint = F.expr("try_cast(quantity AS BIGINT)")
df_filtered = df.filter(quantity_as_bigint.isNull() | (quantity_as_bigint <= 0))
display(df_filtered)

In [0]:
# Inspect the distinct raw quantity values; the spelled-out value 'Two'
# still has to be mapped to 2 (done in the cleaning cell).
distinct_quantities = df.select(F.col("quantity")).distinct()
display(distinct_quantities)


In [0]:
# Count NULLs per column: one aggregate per column, named "<column>_null".
null_count_exprs = [
    F.sum(F.when(F.col(column_name).isNull(), 1).otherwise(0)).alias(f"{column_name}_null")
    for column_name in df.columns
]
nulls_df = df.select(*null_count_exprs)
display(nulls_df)

# Observation from this check: coupon_code is the only column containing NULL values.


In [0]:
# --- Cleaning ---------------------------------------------------------------
# Drop duplicate line items, keyed on (order_id, item_seq).
# NOTE(review): item_seq is still the raw string value here (it is cast to int
# in a later cell), so differently formatted duplicates such as "1" vs "01"
# are not collapsed — confirm this is acceptable.
df = df.dropDuplicates(["order_id", "item_seq"])

# Map the spelled-out quantity 'Two' → 2 and cast to int. The comparison
# ignores case and surrounding whitespace so "two" / " Two " are caught too.
# Both branches stay strings; the explicit cast does the conversion.
df = df.withColumn(
    "quantity",
    F.when(F.lower(F.trim(F.col("quantity"))) == "two", F.lit("2"))
    .otherwise(F.col("quantity"))
    .cast("int"),
)

# Keep only digits, '.' and '-' in unit_price (drops currency symbols,
# thousands separators, spaces, ...) and cast to double — same rule as tax_amount.
df = df.withColumn(
    "unit_price",
    F.regexp_replace("unit_price", r"[^0-9.\-]", "").cast("double"),
)

# Remove '%' from discount_pct and cast to double.
# NOTE(review): "10%" becomes 10.0 (percentage points, not 0.10) — confirm
# downstream consumers expect that unit.
df = df.withColumn(
    "discount_pct",
    F.regexp_replace("discount_pct", "%", "").cast("double"),
)

# Normalise coupon codes: trim surrounding whitespace and lowercase.
df = df.withColumn("coupon_code", F.lower(F.trim(F.col("coupon_code"))))

# Map channel codes to display names, ignoring case and surrounding
# whitespace; any other value is kept unchanged.
channel_key = F.lower(F.trim(F.col("channel")))
df = df.withColumn(
    "channel",
    F.when(channel_key == "web", "Website")
    .when(channel_key == "app", "Mobile")
    .otherwise(F.col("channel")),
)

In [0]:
# Preview the frame after the cleaning step.
cleaned_preview = df.limit(10)
display(cleaned_preview)

In [0]:

# --- Transformation: datatype conversions -----------------------------------
# 1) dt: string → date (pattern yyyy-MM-dd), then renamed to `date`.
df = df.withColumn("dt", F.to_date("dt", "yyyy-MM-dd")).withColumnRenamed("dt", "date")

# 2) order_ts: string → timestamp. Two source layouts exist:
#      2025-08-01 22:53:52  → yyyy-MM-dd HH:mm:ss
#      01-08-2025 22:53     → dd-MM-yyyy HH:mm
#    Each pattern is applied only to values whose shape matches it. A plain
#    coalesce(to_timestamp(p1), to_timestamp(p2)) relies on to_timestamp
#    returning NULL on a mismatch; with spark.sql.ansi.enabled=true it raises
#    instead, failing every row in the second layout. Values matching neither
#    layout become NULL.
order_ts_str = F.trim(F.col("order_ts"))
df = df.withColumn(
    "order_ts",
    F.when(
        order_ts_str.rlike(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$"),
        F.to_timestamp(order_ts_str, "yyyy-MM-dd HH:mm:ss"),
    ).when(
        order_ts_str.rlike(r"^\d{2}-\d{2}-\d{4} \d{2}:\d{2}$"),
        F.to_timestamp(order_ts_str, "dd-MM-yyyy HH:mm"),
    ),
)

# 3) item_seq: string → integer.
df = df.withColumn("item_seq", F.col("item_seq").cast("int"))

# 4) tax_amount: strip everything except digits, '.' and '-', then cast to double.
df = df.withColumn(
    "tax_amount",
    F.regexp_replace("tax_amount", r"[^0-9.\-]", "").cast("double"),
)

# Audit column: when this row was processed into silver.
df = df.withColumn("processed_time", F.current_timestamp())

In [0]:
# List the distinct currencies that appear in unit_price_currency.
currency_values = df.select("unit_price_currency").distinct()
display(currency_values)

In [0]:
# Preview the frame after the datatype conversions.
converted_preview = df.limit(10)
display(converted_preview)

In [0]:
# Print the current schema (column names, types, nullability) after the conversions.
df.printSchema()

In [0]:
# --- Schema enforcement -----------------------------------------------------
# Not every column should accept NULL in the Silver layer:
#   * business-critical columns (date, order_ts, customer_id, order_id,
#     product_id, quantity, unit_price, processed_time) are declared NOT NULL;
#   * optional fields (item_seq, unit_price_currency, discount_pct, tax_amount,
#     channel, coupon_code, file_name, ingest_timestamp) stay nullable.
#
# Steps:
#   1) Project the cleaned frame onto new_schema (column order + data types).
#   2) Quarantine rows with a NULL in any NOT NULL column into df_rejected.
#      The earlier null check found NULLs only in coupon_code, but the
#      conversions can still produce NULLs (e.g. an unparseable order_ts,
#      quantity or unit_price).
#   3) Re-label the remaining rows with new_schema to produce df_final.
#
# Step 2 is required because declaring a field non-nullable does not by
# itself remove or reject NULLs: enforce_schema returns every batch unchanged.

new_schema = StructType([
    StructField("date", DateType(), False),
    StructField("order_ts", TimestampType(), False),
    StructField("customer_id", StringType(), False),
    StructField("order_id", StringType(), False),
    StructField("item_seq", IntegerType(), True),
    StructField("product_id", StringType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("unit_price_currency", StringType(), True),
    StructField("unit_price", DoubleType(), False),
    StructField("discount_pct", DoubleType(), True),
    StructField("tax_amount", DoubleType(), True),
    StructField("channel", StringType(), True),
    StructField("coupon_code", StringType(), True),
    StructField("file_name", StringType(), True),
    StructField("ingest_timestamp", TimestampType(), True),
    StructField("processed_time", TimestampType(), False),
])

def enforce_schema(batch_iter):
    """mapInPandas function that yields every pandas batch unchanged.

    It performs no validation; it only lets Spark re-label the (already
    validated) rows with the schema passed to ``mapInPandas``.
    """
    for pdf in batch_iter:
        yield pdf

# 1) Project onto the target column order and data types.
df_projected = df.select(
    [F.col(field.name).cast(field.dataType).alias(field.name) for field in new_schema.fields]
)

# 2) Split off rows that violate a NOT NULL column.
required_columns = [field.name for field in new_schema.fields if not field.nullable]
has_required_null = F.lit(False)
for required_column in required_columns:
    has_required_null = has_required_null | F.col(required_column).isNull()

df_rejected = df_projected.filter(has_required_null)  # quarantined rows, kept for inspection
df_valid = df_projected.filter(~has_required_null)

# 3) Apply the stricter schema to the validated rows only.
df_final = df_valid.mapInPandas(enforce_schema, new_schema)
df_final.printSchema()

In [0]:
# Preview the current silver table, if it already exists. It is read into its
# own variable: re-binding `df` here would discard the cleaned data built above,
# and the write cell would then just write the table back onto itself.
silver_table_name = f"{catalog_name}.silver.slv_order_items"
if spark.catalog.tableExists(silver_table_name):
    display(spark.table(silver_table_name).limit(10))

In [0]:
# Write the cleaned, schema-enforced order items (df_final) to the silver layer
# (catalog: ecommerce, schema: silver, table: slv_order_items).
# NOTE(review): with mode("overwrite"), mergeSchema only adds new columns to an
# existing table; dropping or retyping columns (e.g. the dt → date rename)
# would need overwriteSchema instead — confirm which is intended.
(
    df_final.write.format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.slv_order_items")
)