#Read dataset

In [0]:
df = spark.table("workspace.bronze.crm_sales")
df.display()

#Transformation

##remove unwanted spaces

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import trim
from pyspark.sql.functions import col,StringType,length

In [0]:
for field in df.schema.fields:
  if isinstance(field.dataType, StringType):
    df.withColumn(field.name, trim(col(field.name)))


## cleaning dates

In [0]:
df = (
    df
    .withColumn(
        "sls_order_dt",
        F.when(
            (col("sls_order_dt") == 0) | (length(col("sls_order_dt")) != 8),
            None
        ).otherwise(F.to_date(col("sls_order_dt").cast("string"), "yyyyMMdd"))
    )
    .withColumn(
        "sls_ship_dt",
        F.when(
            (col("sls_ship_dt") == 0) | (length(col("sls_ship_dt")) != 8),
            None
        ).otherwise(F.to_date(col("sls_ship_dt").cast("string"), "yyyyMMdd"))
    )
    .withColumn(
        "sls_due_dt",
        F.when(
            (col("sls_due_dt") == 0) | (length(col("sls_due_dt")) != 8),
            None
        ).otherwise(F.to_date(col("sls_due_dt").cast("string"), "yyyyMMdd"))
    )
)

## sales and price corrections


In [0]:

df = (
    df
    .withColumn(
        "sls_price",
        F.when(
            (col("sls_price").isNull()) | (col("sls_price") <= 0),
            F.when(
                col("sls_quantity") != 0,
                col("sls_sales") / col("sls_quantity")
            ).otherwise(None)
        ).otherwise(col("sls_price"))
    )
)
df.display()

##renaming columns

In [0]:
df = df.withColumnRenamed("sls_ord_num", "order_number")\
        .withColumnRenamed("sls_prd_key", "product_number")\
        .withColumnRenamed("sls_cust_id", "customer_id")\
        .withColumnRenamed("sls_order_dt", "order_date")\
        .withColumnRenamed("sls_ship_dt", "ship_date")\
        .withColumnRenamed("sls_due_dt", "due_date")\
        .withColumnRenamed("sls_sales", "sales_amount")\
        .withColumnRenamed("sls_quantity", "quantity")\
        .withColumnRenamed("sls_price", "price")
df.display()


#Writing silver Table

In [0]:
df.write.mode("overwrite").saveAsTable("workspace.silver.crm_sales")

##checks of silver table

In [0]:
%sql
select* from workspace.silver.crm_sales