In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim
from pyspark.sql.window import Window

In [0]:
# Read the bronze table into the working DataFrame used by every later cell.
BRONZE_PRODUCTS_TABLE = "workspace.bronze.crm_prd_info"
df = spark.table(BRONZE_PRODUCTS_TABLE)

In [0]:
# Preview the data exactly as loaded, before any cleanup.
display(df)

## Trimming

In [0]:
# Strip leading/trailing whitespace from every string-typed column.
# Non-string columns pass through untouched; column order and names are kept.
trimmed_projection = [
    trim(col(f.name)).alias(f.name) if isinstance(f.dataType, StringType) else col(f.name)
    for f in df.schema.fields
]
df = df.select(*trimmed_projection)

## Product Key Parsing

In [0]:
# Split the raw product key into two parts. Order matters: cat_id must be
# derived before prd_key is overwritten.
#   * characters 1-5 -> cat_id, with "-" replaced by "_"
#   * characters 7+  -> the new prd_key
raw_key = col("prd_key")
df = df.withColumn("cat_id", F.regexp_replace(raw_key.substr(1, 5), "-", "_"))
# Column.substr accepts Column arguments for both start and length, whereas
# F.substring only accepts a Column length on newer PySpark releases.
df = df.withColumn("prd_key", raw_key.substr(F.lit(7), F.length(raw_key)))

In [0]:
# Inspect the schema after adding cat_id.
schema_fields = df.schema.fields
print(schema_fields)

In [0]:
# Show the length of each parsed product key.
prd_key_length = F.length(col("prd_key")).alias("prd_key_length")
display(df.select(prd_key_length))

In [0]:
# Preview the frame after the product key has been split.
display(df)

## Cost Cleanup

In [0]:
# Replace missing product costs with 0, then preview the result.
cost_or_zero = F.coalesce(col("prd_cost"), F.lit(0))
df = df.withColumn("prd_cost", cost_or_zero)
display(df)

## Product Line Normalization

In [0]:
# Expand the single-letter product line code into a readable label.
# The comparison is case-insensitive; any other code (or null) becomes "n/a".
line_code = F.upper(col("prd_line"))
df = df.withColumn(
    # Written with the column's exact existing name so the column is replaced
    # in place even when spark.sql.caseSensitive is enabled.
    "prd_line",
    F.when(line_code == "M", "Mountain")
    .when(line_code == "R", "Road")
    .when(line_code == "T", "Touring")
    .when(line_code == "S", "Other sales")
    .otherwise("n/a"),
)

## Renaming Columns

In [0]:
# Source column name -> silver-layer column name.
RENAME_MAP = dict(
    prd_id="product_id",
    cat_id="category_id",
    prd_key="product_number",
    prd_nm="product_name",
    prd_cost="product_cost",
    prd_line="product_line",
    prd_start_dt="start_date",
    prd_end_dt="end_date",
)

# Apply the renames one at a time; a source name that is not present in the
# frame is a no-op for withColumnRenamed.
for source_name, target_name in RENAME_MAP.items():
    df = df.withColumnRenamed(source_name, target_name)

display(df)

In [0]:
# Count rows whose end_date is null (1 per null row, summed).
missing_end_date_flag = col("end_date").isNull().cast("int")
null_end_date_count = F.sum(missing_end_date_flag).alias("null_count_prd_end_dt")
display(df.select(null_end_date_count))

## Treat Null Values for End Date

In [0]:
import pyspark.pandas as ps

# Forward-fill null end dates inside Spark. This keeps the data distributed
# (nothing is collected to the driver) and preserves the existing column types.
#
# A fill only makes sense with a defined row order, so one is set explicitly.
# NOTE(review): ordering by product_id (then start_date) is assumed to match the
# order in which the source rows were meant to be read -- confirm.
# The window has no partitionBy, so Spark evaluates it in a single partition.
fill_order = Window.orderBy("product_id", "start_date")
rows_up_to_current = fill_order.rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Last non-null end_date seen at or before the current row.
df = df.withColumn(
    "end_date",
    F.last("end_date", ignorenulls=True).over(rows_up_to_current),
)
display(df)

In [0]:
import pyspark.pandas as ps

# Forward-fill, then backward-fill, null end dates inside Spark. This keeps the
# data distributed (nothing is collected to the driver) and preserves the
# existing column types.
#
# A fill only makes sense with a defined row order, so one is set explicitly.
# NOTE(review): ordering by product_id (then start_date) is assumed to match the
# order in which the source rows were meant to be read -- confirm.
# The windows have no partitionBy, so Spark evaluates them in a single partition.
fill_order = Window.orderBy("product_id", "start_date")
rows_up_to_current = fill_order.rowsBetween(Window.unboundedPreceding, Window.currentRow)
rows_from_current = fill_order.rowsBetween(Window.currentRow, Window.unboundedFollowing)

# Last non-null value at or before the row (forward fill) ...
forward_filled = F.last("end_date", ignorenulls=True).over(rows_up_to_current)
# ... and, only for leading rows that have nothing before them, the first
# non-null value at or after the row (backward fill).
backward_filled = F.first("end_date", ignorenulls=True).over(rows_from_current)

df = df.withColumn("end_date", F.coalesce(forward_filled, backward_filled))
display(df)

In [0]:
# Sanity check: earliest and latest end_date after the null treatment.
end_date_range = df.agg(
    F.min(col("end_date")).alias("min_end_date"),
    F.max(col("end_date")).alias("max_end_date"),
)
display(end_date_range)

## Writing To Silver Table

In [0]:
# Persist the cleaned products as a Delta table, replacing any existing data.
SILVER_PRODUCTS_TABLE = "workspace.silver.crm_products"
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(SILVER_PRODUCTS_TABLE)
)

In [0]:
%sql
-- Read back up to 10 rows from the silver table written by the previous cell.
SELECT * FROM workspace.silver.crm_products LIMIT 10