# Loading raw data from the bronze layer (CRM)

## Init

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

## Importing Customer Data

In [0]:
# Read the raw CRM customer table from the bronze layer and preview it.
df = spark.read.table("workspace.bronze.crm_cust_info")
df.display()

### Cleaning up Customer Data

In [0]:
# Whitespace clean-up: trim every string-typed column; columns of any other
# type are left untouched.
text_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
for column_name in text_columns:
    df = df.withColumn(column_name, trim(col(column_name)))
# df.display()

In [0]:
# Standardise the CRM customer attributes.
#   * cst_name           - first and last name joined with a single space.
#                          CONCAT_WS skips NULL parts, so a missing last name no
#                          longer repeats the first name ("John John") and a
#                          missing first name no longer leaves a leading space.
#                          TRIM + NULLIF keep the result NULL when both parts are
#                          NULL/empty, matching the previous both-NULL outcome.
#   * cst_marital_status - 'S' / 'M' (any case) -> 'Single' / 'Married', else 'n/a'.
#   * cst_gender         - 'M' / 'F' (any case) -> 'Male' / 'Female', else 'n/a'.
# The name is written into cst_firstname first and renamed afterwards so the
# column keeps its original position; cst_lastname is dropped once merged.
df=(
    df.withColumn("cst_firstname",expr("NULLIF(TRIM(CONCAT_WS(' ',cst_firstname,cst_lastname)),'')"))
    .withColumn("cst_marital_status",expr("""
        CASE
            WHEN UPPER(cst_marital_status)='S' THEN 'Single'
            WHEN UPPER(cst_marital_status)='M' THEN 'Married'
            ELSE 'n/a'
        END
    """))
    .withColumn("cst_gndr",expr("""
        CASE
            WHEN UPPER(cst_gndr)='M' THEN 'Male'
            WHEN UPPER(cst_gndr)='F' THEN 'Female'
            ELSE 'n/a'
        END
    """))
    .withColumnRenamed("cst_firstname","cst_name")
    .withColumnRenamed("cst_gndr","cst_gender")
    .drop("cst_lastname")
)
# df.display()

### Removing rows with missing customer IDs

In [0]:
# Keep only the rows that carry a customer id.
df = df.where("cst_id IS NOT NULL")
# df.display()

In [0]:
# Current customer column names mapped to the descriptive names written out.
RENAME_COLUMNS = dict(
    cst_id="customer_id",
    cst_key="customer_key",
    cst_name="customer_name",
    cst_marital_status="customer_marital_status",
    cst_gender="customer_gender",
    cst_create_date="customer_create_date",
)
# Apply the renames one by one, in mapping order, then preview the result.
for source_name in RENAME_COLUMNS:
    df = df.withColumnRenamed(source_name, RENAME_COLUMNS[source_name])
df.display()

### Loading Customer Data

In [0]:
# Persist the cleaned customer data as a Delta table, replacing any previous contents.
df.write.saveAsTable(
    "workspace.silver.crm_cust_info",
    format="delta",
    mode="overwrite",
)

## Importing Product Data

In [0]:
# Preview the raw CRM product table from the bronze layer. The cleaning cell
# below queries the same table again through SQL, so this frame is only
# used for inspection.
df = spark.read.table("workspace.bronze.crm_prd_info")
df.display()

### Data Cleaning

In [0]:
# Rebuild the product frame straight from workspace.bronze.crm_prd_info
# (this replaces whatever `df` held before).
#
# Inner query `t`:
#   * casts prd_start_dt / prd_end_dt to DATE;
#   * new_dt = the start date of the next row for the same prd_key (ordered by
#     start date) minus one day. LEAD has no default here, so new_dt is NULL
#     for the last row of each prd_key.
# Outer query:
#   * product_end_date = new_dt when the recorded end date is earlier than the
#     start date, otherwise the recorded end date.
#
# NOTE(review): the two CAST expressions in the inner query have no alias, yet
# the outer query refers to them as t.prd_start_dt / t.prd_end_dt. This relies
# on the engine naming an un-aliased CAST(col AS ...) after `col` - confirm, or
# add explicit aliases.
df=spark.sql("""
             SELECT
                t.prd_id,
                t.prd_key,
                t.prd_nm,
                t.prd_cost,
                t.prd_line,
                t.prd_start_dt,
                CASE 
                WHEN t.prd_end_dt < t.prd_start_dt then t.new_dt
                ELSE t.prd_end_dt
                END AS product_end_date
                FROM
                (SELECT 
                prd_id,
                prd_key,
                prd_nm,
                prd_cost,
                prd_line,
                CAST(prd_start_dt AS DATE),
                CAST(prd_end_dt AS DATE),
                LEAD(CAST(prd_start_dt AS DATE)) OVER ( PARTITION BY prd_key ORDER BY CAST(prd_start_dt AS DATE))-1 as new_dt
                FROM
                workspace.bronze.crm_prd_info)t
             """)
# df.show()


### Trimming leading and trailing whitespace

In [0]:

# Trim surrounding whitespace on each string-typed product column.
string_fields = filter(lambda f: isinstance(f.dataType, StringType), df.schema.fields)
for name in [f.name for f in string_fields]:
    df = df.withColumn(name, trim(col(name)))
# df.display()

In [0]:
# Expand the one-letter product line code (any case) into its label;
# anything unrecognised becomes 'n/a'.
df = df.withColumn("prd_line", expr("""
                                CASE 
                                WHEN UPPER(prd_line)='M' THEN 'Mountain'
                                WHEN UPPER(prd_line)='R' THEN 'Road'
                                WHEN UPPER(prd_line)='S' THEN 'Other Sales'
                                WHEN UPPER(prd_line)='T' THEN 'Trekking'
                                ELSE 'n/a'
                                END 
                                  """))
# prd_cat_id: first five characters of the original key, dashes turned into
# underscores. This must run before prd_key itself is shortened.
df = df.withColumn("prd_cat_id", expr("REPLACE(SUBSTRING(prd_key,1,5),'-','_')"))
# prd_key: the remainder of the original key, from character 7 onwards.
df = df.withColumn("prd_key", expr("SUBSTRING(prd_key,7,LEN(prd_key))"))
# prd_cost: NULL becomes 0, then the value is cast to INT.
df = df.withColumn("prd_cost", expr("CAST(COALESCE(prd_cost,0) AS INT)"))
# df.display()

### Renaming Columns

In [0]:
# Product column names mapped to their descriptive output names.
# product_end_date already has its final name, so its entry changes nothing.
RENAME_COLUMNS = dict(
    prd_id="product_id",
    prd_key="product_key",
    prd_nm="product_name",
    prd_cost="product_cost",
    prd_line="product_line",
    prd_start_dt="product_start_date",
    product_end_date="product_end_date",
    prd_cat_id="product_category_id",
)
# Apply each rename in mapping order and preview the result.
for current_name in RENAME_COLUMNS:
    df = df.withColumnRenamed(current_name, RENAME_COLUMNS[current_name])
df.display()

### Loading Product Table

In [0]:
# Persist the cleaned product data as a Delta table, replacing any previous contents.
df.write.saveAsTable(
    "workspace.silver.crm_prd_info",
    format="delta",
    mode="overwrite",
)

## Importing Sales Data

In [0]:
# Preview the raw CRM sales table from the bronze layer. The cleaning cell
# below queries the same table again through SQL.
df = spark.read.table("workspace.bronze.crm_sales_details")
df.display()

### Cleaning Sales data

In [0]:
# Rebuild the sales frame straight from workspace.bronze.crm_sales_details
# (this replaces whatever `df` held before).
#
# The three date columns (order, ship, due) get the same treatment: a value
# equal to 0, or whose length is not 8, becomes NULL; everything else is parsed
# with the pattern 'yyyyMMdd'.
#
# The output aliases are sales_order_date, sales_ship_date and sls_due_date.
# The last one keeps the `sls_` prefix, and the rename cell further down maps
# it to sales_due_date.
#
# NOTE(review): presumably the raw date columns hold yyyymmdd integers (they
# are compared with 0) - confirm that to_date accepts the column type without
# an explicit cast to STRING in the target runtime.
df=spark.sql("""
                SELECT
                sls_ord_num,
                sls_prd_key,
                sls_cust_id,
                CASE
                WHEN sls_order_dt=0 OR LEN(sls_order_dt)!=8 THEN NULL
                ELSE to_date(sls_order_dt,'yyyyMMdd')
                END AS sales_order_date,
                CASE
                WHEN sls_ship_dt=0 OR LEN(sls_ship_dt)!=8 THEN NULL
                ELSE to_date(sls_ship_dt,'yyyyMMdd')
                END AS sales_ship_date,
                CASE
                WHEN sls_due_dt=0 OR LEN(sls_due_dt)!=8 THEN NULL
                ELSE to_date(sls_due_dt,'yyyyMMdd')
                END AS sls_due_date,
                sls_sales,
                sls_quantity,
                sls_price
                FROM
                workspace.bronze.crm_sales_details
             """)

In [0]:
# Cast the numeric sales columns to INT and repair the unit price.
#
# The three figures are related by  sls_sales = sls_quantity * sls_price.
# The previous rule replaced an inconsistent price with sls_sales * sls_quantity,
# which inflates the unit price instead of recovering it, and it never
# repaired a NULL price. The price is now re-derived as
# ABS(sls_sales) / sls_quantity when it is NULL, zero, or inconsistent with
# the other two columns.
#   * NULLIF guards against division by zero.
#   * If the derived value is NULL (sales missing or quantity 0), the
#     recorded absolute price is kept as a fallback.
#   * When sls_sales is NULL the consistency check evaluates to NULL, so a
#     recorded non-zero price is kept as is.
#   * The result stays INT, as before, so a fractional unit price is truncated.
# sls_sales and sls_quantity are already INT here because the casts above run
# earlier in the same chain.
df=(
    df.withColumn("sls_cust_id",expr("CAST(sls_cust_id AS INT)"))
    .withColumn("sls_sales",expr("CAST(sls_sales AS INT)"))
    .withColumn("sls_quantity",expr("CAST(sls_quantity AS INT)"))
    .withColumn("sls_price",expr("""
        CASE
            WHEN sls_price IS NULL
                 OR sls_price = 0
                 OR sls_quantity * ABS(sls_price) != sls_sales
            THEN COALESCE(
                     CAST(ABS(sls_sales) / NULLIF(sls_quantity, 0) AS INT),
                     CAST(ABS(sls_price) AS INT)
                 )
            ELSE CAST(ABS(sls_price) AS INT)
        END
    """))
)
# df.display()

In [0]:
# (current name, output name) pairs for the sales columns. The two entries
# whose names are identical change nothing.
_sales_renames = [
    ("sls_ord_num", "sales_order_number"),
    ("sls_prd_key", "sales_product_key"),
    ("sls_cust_id", "sales_customer_id"),
    ("sales_order_date", "sales_order_date"),
    ("sales_ship_date", "sales_ship_date"),
    ("sls_due_date", "sales_due_date"),
    ("sls_sales", "total_sales"),
    ("sls_quantity", "sales_quantity"),
    ("sls_price", "sales_price"),
]
RENAME_COLUMNS = dict(_sales_renames)

# Apply each rename in order and preview the result.
for current_name in RENAME_COLUMNS:
    df = df.withColumnRenamed(current_name, RENAME_COLUMNS[current_name])
df.display()

### Loading Sales Data

In [0]:
# Persist the cleaned sales data as a Delta table, replacing any previous contents.
df.write.saveAsTable(
    "workspace.silver.crm_sales_details",
    format="delta",
    mode="overwrite",
)

# Loading raw data from the bronze layer (ERP)

## Importing erp_cust_az12 Data

In [0]:
# Read the raw ERP customer table from the bronze layer and print a sample.
df = spark.read.table("workspace.bronze.erp_cust_az12")
df.show()

### Data Cleaning

In [0]:
# Trim surrounding whitespace on each string-typed ERP customer column.
string_column_names = [
    schema_field.name
    for schema_field in df.schema.fields
    if isinstance(schema_field.dataType, StringType)
]
for string_column in string_column_names:
    df = df.withColumn(string_column, trim(col(string_column)))
# df.display()


In [0]:
# Standardise the ERP customer attributes.
#   * CID   - a leading 'NAS' prefix is stripped; other ids pass through.
#   * BDATE - cast to DATE; dates after today become NULL. The comparison uses
#             the full date: the previous year-only check kept birth dates
#             that were still in the future but fell within the current year.
#   * GEN   - 'F' / 'M' (any case) -> 'Female' / 'Male'; NULL or empty -> 'n/a'
#             (NULL was previously left as NULL, unlike the NULL handling for
#             CNTRY and the CRM gender mapping); any other value is kept.
# The columns are then renamed to customer_id / birth_date / gender.
df=(
    df.withColumn("CID",expr("""
        CASE WHEN CID LIKE 'NAS%' THEN SUBSTRING(CID,4,LEN(CID))
        ELSE CID END
    """))
    .withColumn("BDATE",expr("""
        CASE WHEN CAST(BDATE AS DATE) > CURRENT_DATE() THEN NULL
        ELSE CAST(BDATE AS DATE)
        END
    """))
    .withColumn("GEN",expr("""
        CASE WHEN UPPER(GEN)='F' THEN 'Female'
        WHEN UPPER(GEN)='M' THEN 'Male'
        WHEN GEN IS NULL OR GEN='' THEN 'n/a'
        ELSE GEN
        END
    """))
    .withColumnRenamed("CID","customer_id")
    .withColumnRenamed("BDATE","birth_date")
    .withColumnRenamed("GEN","gender")
)

df.display()

### Loading erp_cust_az12 Data

In [0]:
# Persist the cleaned ERP customer data as a Delta table, replacing any
# previous contents. The target is fully qualified with the `workspace`
# catalog, like every other read and write in this notebook, so the table
# does not depend on the session's current catalog.
(
  df.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("workspace.silver.erp_cust_az12")
)

## Importing erp_loc_a101 Data

In [0]:
# Read the raw ERP location table from the bronze layer and preview it.
df = spark.read.table("workspace.bronze.erp_loc_a101")
df.display()

### Data Cleaning

In [0]:
# Trim surrounding whitespace on each string-typed ERP location column.
trimmable = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
for col_name in trimmable:
    df = df.withColumn(col_name, trim(col(col_name)))
# df.display()

In [0]:
# Normalise the country value: empty/NULL -> 'n/a'; the codes USA/US, CAN, DE
# and FR (any case) -> full country names; any other value is kept as is.
df = df.withColumn("CNTRY", expr("""
                                CASE 
                                WHEN CNTRY = '' OR CNTRY IS NULL THEN 'n/a'
                                WHEN UPPER(CNTRY) = 'USA' OR UPPER(CNTRY)='US' THEN 'United States'
                                WHEN UPPER(CNTRY) = 'CAN' THEN 'Canada'
                                WHEN UPPER(CNTRY) = 'DE' THEN 'Germany'
                                WHEN UPPER(CNTRY) = 'FR' THEN 'France'
                                ELSE CNTRY
                                END
                               """))
# Remove the dashes from the customer id.
df = df.withColumn("CID", expr("REPLACE(CID,'-','')"))
# Switch to the descriptive output column names and preview the result.
df = df.withColumnRenamed("CNTRY", "country")
df = df.withColumnRenamed("CID", "customer_id")
df.display()

### Loading erp_loc_a101 Data

In [0]:
# Persist the cleaned ERP location data as a Delta table, replacing any
# previous contents. The target is fully qualified with the `workspace`
# catalog, like the other reads and writes in this notebook, so the table
# does not depend on the session's current catalog.
#
# NOTE(review): the source table is erp_loc_a101 but the target is named
# erp_loc_az12, which looks like a copy/paste slip. The name is kept because
# downstream readers are not visible from here - confirm before renaming.
(
  df.write
  .mode("overwrite")
  .format("delta")
  .saveAsTable("workspace.silver.erp_loc_az12")
)

## Importing erp_px_cat_g1v2 Data

In [0]:
# Read the raw ERP product-category table from the bronze layer and preview it.
df = spark.read.table("workspace.bronze.erp_px_cat_g1v2")
df.display()

### Data Cleaning

In [0]:
# Trim surrounding whitespace on each string-typed category column.
names_to_trim = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
for target in names_to_trim:
    df = df.withColumn(target, trim(col(target)))
# df.display()

In [0]:
# Upper-case source column names mapped to their snake_case output names.
RENAME_COLUMNS = dict(
    ID="id",
    CAT="category",
    SUBCAT="sub_category",
    MAINTENANCE="maintenance",
)
# Apply each rename in mapping order and preview the result.
for current_name in RENAME_COLUMNS:
    df = df.withColumnRenamed(current_name, RENAME_COLUMNS[current_name])
df.display()

### Loading erp_px_cat_g1v2 Data

In [0]:
# Persist the cleaned ERP product-category data as a Delta table, replacing
# any previous contents. The target is fully qualified with the `workspace`
# catalog, like the other reads and writes in this notebook, so the table
# does not depend on the session's current catalog.
#
# NOTE(review): the source table is erp_px_cat_g1v2 but the target is named
# erp_px_cat_az12, which looks like a copy/paste slip. The name is kept because
# downstream readers are not visible from here - confirm before renaming.
(
  df.write
  .mode("overwrite")
  .format("delta")
  .saveAsTable("workspace.silver.erp_px_cat_az12")
)