# Init

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col

# Reading From Bronze Table

In [0]:
# Source: the raw CRM customer table in the bronze layer.
BRONZE_TABLE = "workspace.bronze.crm_cust_info"
df = spark.table(BRONZE_TABLE)

# Data Transformations

## Trimming leading and trailing whitespace from all string columns

In [0]:
# Trim surrounding whitespace from every string column, leaving other columns
# untouched and in their original order. This is done as ONE select() rather than
# withColumn() in a loop: each withColumn call adds a projection, and Spark's docs
# warn that doing so repeatedly can produce very large plans.
df = df.select(
    [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in df.schema.fields
    ]
)

## Normalization - expand abbreviated codes into full labels

In [0]:
def _expand_code(column_name, code_to_label, default="n/a"):
    """Build a column expression that maps short codes to readable labels.

    The column value is upper-cased before comparison, so matching is
    case-insensitive. Any value not found in ``code_to_label`` -- including
    NULL, since a NULL comparison never matches -- becomes ``default``.
    """
    upper_value = F.upper(F.col(column_name))
    pairs = iter(code_to_label.items())
    first_code, first_label = next(pairs)
    expr = F.when(upper_value == first_code, first_label)
    for code, label in pairs:
        expr = expr.when(upper_value == code, label)
    return expr.otherwise(default)


df = (
    df
    .withColumn(
        "cst_marital_status",
        _expand_code("cst_marital_status", {"S": "Single", "M": "Married"}),
    )
    .withColumn(
        "cst_gndr",
        _expand_code("cst_gndr", {"M": "Male", "F": "Female"}),
    )
)

In [0]:
# Visual spot-check of the trimmed and normalized DataFrame (marital status / gender labels).
# NOTE(review): the output includes first/last names; clear it before sharing the notebook.
df.display()

## Renaming columns to more readable names

In [0]:
# Bronze column name -> readable silver column name (insertion order preserved).
RENAME_MAP = dict(
    cst_id="customer_id",
    cst_key="customer_key",
    cst_firstname="first_name",
    cst_lastname="last_name",
    cst_marital_status="marital_status",
    cst_gndr="gender",
    cst_create_date="created_date",
)

In [0]:
# Rename each bronze column to its silver name, one mapping at a time.
for source_column in RENAME_MAP:
    df = df.withColumnRenamed(source_column, RENAME_MAP[source_column])

## Finding and handling duplicate rows

In [0]:
# Count exact duplicate rows, then remove them. Identical rows carry no extra
# information and would otherwise be written to the silver table unchanged.
total_count = df.count()
df = df.dropDuplicates()
distinct_count = df.count()

print(f"duplicate rows: {total_count - distinct_count}")

# dropDuplicates() only removes rows that are identical in EVERY column. Rows that
# share a customer_id but differ elsewhere survive, and would multiply rows in any
# join on that key, so report them for follow-up. NULL ids are excluded here
# because they are dropped in a later cell.
duplicate_key_count = (
    df.filter(F.col("customer_id").isNotNull())
    .groupBy("customer_id")
    .count()
    .filter(F.col("count") > 1)
    .count()
)
print(f"customer_id values appearing more than once: {duplicate_key_count}")

## Checking data types, formats and missing values

In [0]:
# Show the schema as a tree, then as a list of (column, type) pairs.
column_types = df.dtypes
df.printSchema()
print(column_types)

In [0]:
# Inspect rows that lack a customer_id before they are removed in the next cell.
missing_id = df.where(F.col("customer_id").isNull())
missing_id.show()

In [0]:
# Keep only rows that have a customer_id.
df = df.where(~col("customer_id").isNull())

In [0]:
# Invariant check: after the filter above, no row may have a NULL customer_id.
# Assert, not just print, so a Run-All fails loudly if the invariant is broken.
remaining_null_ids = df.filter(col("customer_id").isNull()).count()
print(remaining_null_ids)
assert remaining_null_ids == 0, f"{remaining_null_ids} rows still have a NULL customer_id"

## Validate numeric values

In [0]:
# Largest plausible customer_id; anything above it is reported as an outlier.
MAX_CUSTOMER_ID = 999_999_999

id_column = F.col("customer_id")
invalid_id = df.where(id_column <= 0)
outliers = df.where(id_column > MAX_CUSTOMER_ID)

invalid_id.show()
outliers.show()

## Standardize business keys (customer_key) to ensure tables can be joined correctly

### Trimming is already handled; check that all keys are upper case and have the same length

In [0]:
# Check if ANY row in the entire dataframe has lowercase letters
has_lowercase = df.filter(F.col("customer_key") != F.upper(F.col("customer_key"))).count()

# Check if ANY row has a length other than 10
has_wrong_len = df.filter(F.length(F.col("customer_key")) != 10).count()

print(f"Total rows with lowercase: {has_lowercase}")
print(f"Total rows with wrong length: {has_wrong_len}")

# Sanity check before writing to silver

In [0]:
# Final visual check of the cleaned, renamed DataFrame before it is written to silver.
# NOTE(review): the output includes first/last names; clear it before sharing the notebook.
df.display()

# Writing to silver delta table

In [0]:
# Replace the silver customers table with the cleaned DataFrame, stored as Delta.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_customers")
)

# Sanity check of the silver table after writing

In [0]:
%sql
-- Read back the first 10 rows of the silver table just written,
-- to confirm the write succeeded and the renamed columns are present.
SELECT * FROM workspace.silver.crm_customers LIMIT 10