In [0]:
import pyspark.sql.functions as F 
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import trim, col 

### Data Quality Check Funcs

In [0]:
def check_nulls(df, cols):
    for c in cols:
        null_count = df.filter(F.col(c).isNull()).count()
        if null_count > 0:
            print(f"{c} has {null_count} NULLs")

def check_duplicates(df, pk):
    dup_count = df.groupBy(pk).count().filter("count > 1").count()
    if dup_count > 0:
        print(f"{dup_count} duplicate {pk} found")

def check_row_count(df, table_name):
    count = df.count()
    print(f"{table_name} row count: {count}")

# Read Bronze Table

In [0]:
df = spark.table("workspace.bronze.erp_loc_a101_raw")

In [0]:
df.display()

# Silver Transformations

## Trimming

In [0]:
for field in df.schema.fields: 
    if isinstance(field.dataType, StringType): 
        df = df.withColumn(field.name, trim(col(field.name)))

## Customer ID Cleanup

In [0]:
df = df.withColumn("cid", F.regexp_replace(col("cid"), "-", ""))

## Country Normalization

In [0]:
df = df.withColumn(
    "cntry", 
    F.when(col("cntry") == "DE", "Germany") 
    .when(col("cntry").isin("US", "USA"), "United States")
    .when((col("cntry") == "") | col("cntry").isNull(), "n/a")
    .otherwise(col("cntry"))
)

## Renaming Columns

In [0]:
RENAME_MAP = {
    "cid": "customer_number",
    "cntry": "country"
}

for old_name, new_name in RENAME_MAP.items(): 
    df = df.withColumnRenamed(old_name, new_name)

### Sanity Check

In [0]:
df.limit(10).display()

### DQ Check

In [0]:
check_row_count(df, "workspace.bronze.erp_loc_a101_raw")

check_nulls(df, ["customer_number"])

check_duplicates(df, "customer_number")

invalid_country = df.filter(F.col("country") == "n/a").count()
if invalid_country > 0:
    print(f"{invalid_country} missing country values")

# Writing Silver Table

In [0]:
df.write.mode("overwrite").format("delta").saveAsTable("workspace.silver.erp_customer_location")

## Sanity Checks of Silver Table

In [0]:
%sql 
SELECT * FROM workspace.silver.erp_customer_location LIMIT 10