# BRONZE LAYER

In [0]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime, date

In [0]:
# Read the CRM customer file as CSV: first row is the header, column types are
# inferred, and multi-line records are not expected.
df_cust_info = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .option('multiline', False)
    .load('/Volumes/sales_crm_erp/source_data/crm/cust_info.csv')
)
display(df_cust_info)

In [0]:
# Read the CRM product file as CSV: first row is the header, column types are
# inferred, and multi-line records are not expected.
df_prd_info = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .option('multiline', False)
    .load('/Volumes/sales_crm_erp/source_data/crm/prd_info.csv')
)
display(df_prd_info)

In [0]:
# Read the CRM sales-details file as CSV: first row is the header, column types
# are inferred, and multi-line records are not expected.
df_sales_details = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .option('multiline', False)
    .load('/Volumes/sales_crm_erp/source_data/crm/sales_details.csv')
)
display(df_sales_details)

# SILVER LAYER

## crm_cust_info

#### 1. Check for Null or Duplicate in Primary Key

In [0]:
# Primary-key quality check on cst_id; run before and after the cleaning step.
# A group is reported when its key occurs more than once or when the key itself
# is NULL. count('*') can never be NULL, so the NULL test must look at cst_id
# rather than at PK_Count.
df = (
    df_cust_info.groupBy(F.col('cst_id'))
    .agg(F.count('*').alias('PK_Count'))
    .filter((F.col('PK_Count') > 1) | F.col('cst_id').isNull())
)
display(df)

In [0]:
# Discard rows that have no primary key ("cst_id").
df_cust_info = df_cust_info.where(F.col('cst_id').isNotNull())

# Within each cst_id, order rows from newest to oldest cst_create_date.
window_spec = Window.partitionBy("cst_id").orderBy(F.desc("cst_create_date"))

# Number the rows per key, keep only the first (most recent) one, and drop the
# helper column again.
df_cust_info = (
    df_cust_info
    .withColumn("flag_last", F.row_number().over(window_spec))
    .where(F.col("flag_last") == 1)
    .drop("flag_last")
)

display(df_cust_info)

#### 2. Check for Unwanted Space

In [0]:
# Number of rows whose cst_firstname changes when it is trimmed.
display(
    df_cust_info
    .where(F.col('cst_firstname') != F.trim(F.col('cst_firstname')))
    .count()
)

In [0]:
def has_unwanted_space(col):
    """Return a boolean Column that is true where `col` differs from `F.trim(col)`."""
    trimmed = F.trim(col)
    return col != trimmed

# Columns to inspect for leading/trailing whitespace.
cols_to_check = ['cst_id', 'cst_key', 'cst_firstname', 'cst_lastname', 'cst_gndr', 'cst_marital_status']

# One aggregate per column: how many values differ from their trimmed form.
col_exprs = [
    F.sum(
        F.when(has_unwanted_space(F.col(name)), 1).otherwise(0)
    ).alias(name)
    for name in cols_to_check
]

display(df_cust_info.select(col_exprs))

In [0]:
# Strip leading/trailing whitespace from both name columns.
df_cust_info = (
    df_cust_info
    .withColumn('cst_firstname', F.trim('cst_firstname'))
    .withColumn('cst_lastname', F.trim('cst_lastname'))
)
display(df_cust_info)

In [0]:
# Same trimming expressed as a single projection: columns listed in
# cols_to_trim are trimmed in place, all other columns pass through unchanged.
cols_to_trim = ['cst_firstname', 'cst_lastname']

df_cust_info = df_cust_info.select(
    [
        F.trim(F.col(name)).alias(name) if name in cols_to_trim else F.col(name)
        for name in df_cust_info.columns
    ]
)

In [0]:
# Show the current state of the customer frame.
display(df_cust_info)

#### 3. Data Standardization & Consistency
We aim to store clear and meaningful values rather than abbreviated codes.

In [0]:
# List the distinct values currently stored in the two coded columns.
display(df_cust_info.select("cst_gndr").distinct())
display(df_cust_info.select("cst_marital_status").distinct())

In [0]:
# One-row profile: the set of distinct values and the NULL count for each of
# the two coded columns.
display(
    df_cust_info.select(
        F.collect_set("cst_gndr").alias("cst_gndr"),
        F.sum(
            F.when(F.col("cst_gndr").isNull(), 1).otherwise(0)
        ).alias('cst_gndr_null'),
        F.collect_set("cst_marital_status").alias("cst_marital_status"),
        F.sum(
            F.when(F.col("cst_marital_status").isNull(), 1).otherwise(0)
        ).alias("cst_marital_status_nulls"),
    )
)

In [0]:
# Replace the abbreviated gender / marital-status codes with readable labels;
# anything unrecognised (including NULL) becomes 'n/a'.
# The expanded labels themselves are accepted as input too: this cell overwrites
# df_cust_info, so applying the mapping to already standardized data must leave
# it unchanged instead of collapsing every value to 'n/a'.
df_cust_info = df_cust_info.select(
    *[c for c in df_cust_info.columns if c not in ['cst_gndr', 'cst_marital_status']],
    F.when(F.upper(F.trim(F.col("cst_gndr"))).isin('M', 'MALE'), 'Male')
     .when(F.upper(F.trim(F.col("cst_gndr"))).isin('F', 'FEMALE'), 'Female')
     .otherwise('n/a').alias('cst_gndr'),
    F.when(F.upper(F.trim(F.col("cst_marital_status"))).isin('M', 'MARRIED'), 'Married')
     .when(F.upper(F.trim(F.col("cst_marital_status"))).isin('S', 'SINGLE'), 'Single')
     .otherwise('n/a').alias('cst_marital_status'),
)

In [0]:
display(df_cust_info)

### Final Code

In [0]:
# Final cleaning pipeline for crm_cust_info, as one chained expression:
#   1. drop rows whose primary key (cst_id) is NULL
#   2. keep only the most recent row per cst_id (ordered by cst_create_date desc)
#   3. trim cst_firstname / cst_lastname
#   4. expand the gender and marital-status codes to readable labels
#      (unrecognised or NULL values become 'n/a')
#
# The exploration cells above already overwrite df_cust_info with expanded
# labels ('Male', 'Married', ...). The mapping therefore accepts both the raw
# codes and the expanded labels; matching only 'M'/'F'/'S' would turn every
# already-standardized value into 'n/a' when the notebook is run top to bottom.
window_spec = Window.partitionBy("cst_id").orderBy(F.col("cst_create_date").desc())
df_cust_info = (
    df_cust_info
    .filter(F.col('cst_id').isNotNull())
    .withColumn("rn", F.row_number().over(window_spec))
    .filter(F.col("rn") == 1)
    .drop("rn")
    .select(
        # Untouched columns first, in their existing order.
        *[F.col(c) for c in df_cust_info.columns if c not in ["cst_firstname", "cst_lastname", "cst_gndr", "cst_marital_status"]],
        *[F.trim(F.col(c)).alias(c) for c in ["cst_firstname", "cst_lastname"]],
        F.when(F.upper(F.trim(F.col("cst_gndr"))).isin('M', 'MALE'), 'Male')
            .when(F.upper(F.trim(F.col("cst_gndr"))).isin('F', 'FEMALE'), 'Female')
            .otherwise('n/a').alias('cst_gndr'),
        F.when(F.upper(F.trim(F.col("cst_marital_status"))).isin('M', 'MARRIED'), 'Married')
            .when(F.upper(F.trim(F.col("cst_marital_status"))).isin('S', 'SINGLE'), 'Single')
            .otherwise('n/a').alias('cst_marital_status'),
    )
)

In [0]:
# df_cust_info = df
# Show the customer frame produced by the final cleaning cell.
display(df_cust_info)

## crm_prd_info

#### 1. Check for Null or Duplicate in Primary Key

In [0]:
# Primary-key quality check on prd_id; run before and after cleaning.
# A group is reported when its key occurs more than once or when the key itself
# is NULL. count('*') can never be NULL, so the NULL test must look at prd_id
# rather than at PK_Count.
df_check = (
    df_prd_info.groupBy(F.col('prd_id'))
    .agg(F.count('*').alias('PK_Count'))
    .filter((F.col('PK_Count') > 1) | F.col('prd_id').isNull())
)
display(df_check)

####2. Derive two columns: cat_id and prd_key by splitting the prd_key column

In [0]:
# Split the composite prd_key into two columns:
#   * cat_id  - characters 1-5 with '-' replaced by '_'
#   * prd_key - everything from character 7 onwards
# All other columns are carried over unchanged.
df_staging = df_prd_info.select(
    *[F.col(name) for name in df_prd_info.columns if name != 'prd_key'],
    F.regexp_replace(
        F.substring(F.col('prd_key'), 1, 5), '-', '_'
    ).alias('cat_id'),
    F.substring(
        F.col('prd_key'), 7, F.length(F.col('prd_key'))
    ).alias('prd_key'),
)

display(df_staging)

####3. Check for Unwanted Space in prd_nm column

In [0]:
# Helper for the whitespace checks (same definition as earlier in the notebook).
def has_unwanted_space(col):
    """Return a boolean Column that is true where `col` differs from `F.trim(col)`."""
    trimmed = F.trim(col)
    return col != trimmed

In [0]:
# Count, per column, the values that carry leading/trailing whitespace.
# Author's note from the original run: no hits, so no trimming is applied.
cols_to_check = ['prd_nm', 'prd_key']

col_exprs = [
    F.sum(
        F.when(has_unwanted_space(F.col(name)), 1).otherwise(0)
    ).alias(name)
    for name in cols_to_check
]

df_check = df_staging.select(col_exprs)

display(df_check)

####4. Check for Nulls or Negative Values in Cost

In [0]:
# Rows whose prd_cost is missing or negative; NULL costs should not be stored.
# Author's note from the original run: 2 rows had a NULL cost.
df_check = df_staging.filter(
    F.col("prd_cost").isNull() | (F.col("prd_cost") < 0)
)
display(df_check)

In [0]:
# Alternative NULL check: a single-row count of NULL prd_cost values.
df_check = df_staging.select(
    F.sum(
        F.when(F.isnull(F.col('prd_cost')), 1).otherwise(0)
    ).alias('prd_cost_null')
)
display(df_check)

In [0]:
# Replace a NULL prd_cost with 0; non-NULL costs are kept as they are.
df_staging = df_staging.withColumn(
    "prd_cost",
    F.coalesce(F.col("prd_cost"), F.lit(0)),
)
display(df_staging)

In [0]:
# Re-count NULL prd_cost values after the fill step.
df_check = df_staging.select(
    F.sum(
        F.when(F.isnull(F.col('prd_cost')), 1).otherwise(0)
    ).alias('prd_cost_null')
)

display(df_check)

####5. Data Standardization and Consistency

In [0]:
# List the distinct values currently stored in prd_line.
df_check = df_staging.select('prd_line').distinct()
display(df_check)

In [0]:
# Store clear, meaningful product-line names instead of one-letter codes;
# anything unrecognised (including NULL) becomes 'n/a'.
# The pass-through columns are taken from df_staging itself: listing
# df_prd_info.columns here would silently drop the derived cat_id column.
df_staging = df_staging.select(
    *[c for c in df_staging.columns if c not in ['prd_line']],
    F.when(F.upper(F.trim(F.col('prd_line'))) == 'M', 'Mountain')
     .when(F.upper(F.trim(F.col("prd_line"))) == 'R', 'Road')
     .when(F.upper(F.trim(F.col("prd_line"))) == 'S', 'Other Sales')
     .when(F.upper(F.trim(F.col("prd_line"))) == 'T', 'Touring')
     .otherwise(F.lit('n/a')).alias('prd_line'),
)
display(df_staging)

In [0]:
# Distinct prd_line values after standardization.
df = df_staging.select("prd_line").distinct()
display(df)

#### 6. Check for invalid product date ranges (start date after end date)

In [0]:
# Rows whose prd_start_dt lies after prd_end_dt.
df_check = df_staging.filter(F.col("prd_start_dt") > F.col("prd_end_dt"))
display(df_check)

In [0]:
# Rebuild prd_end_dt from the version history of each prd_key:
#   * a version ends the day before the next version's prd_start_dt
#   * the latest version (no following row) gets the open-ended sentinel 9999-12-31
# The sentinel is cast to DATE explicitly so the column type does not depend on
# how Spark coerces a string literal against a date expression.
window_spec = Window.partitionBy('prd_key').orderBy(F.col('prd_start_dt'))

df_staging = df_staging.withColumn(
    "prd_end_dt",
    F.coalesce(
        # date_sub yields NULL when there is no next version, so coalesce
        # falls through to the sentinel.
        F.date_sub(F.lead("prd_start_dt", 1).over(window_spec), 1),
        F.lit("9999-12-31").cast("date"),
    ),
)
display(df_staging)

In [0]:

# Preview, per prd_key, the start date of the following version in a separate
# column (fix_prd_end_dt).
# The source must be df_staging: at this point `df` only holds the distinct
# prd_line values, so it has neither prd_key nor prd_start_dt.
window_spec = Window.partitionBy("prd_key").orderBy("prd_start_dt")

df = df_staging.withColumn(
    "fix_prd_end_dt",
    F.lead("prd_start_dt", 1).over(window_spec)
)

display(df)

In [0]:
# df_prd_info = df_staging
# Show df_prd_info; the assignment from df_staging above is commented out, so
# this cell does not pick up the staging transformations.
display(df_prd_info)

####Final Code

In [0]:
# Final cleaning pipeline for crm_prd_info:
#   * cat_id     - first 5 characters of the raw prd_key, '-' replaced by '_'
#   * prd_key    - raw prd_key from character 7 onwards
#   * prd_cost   - NULL replaced by 0
#   * prd_line   - one-letter code expanded to a readable name, else 'n/a'
#   * prd_end_dt - day before the next version's prd_start_dt, or the sentinel
#                  9999-12-31 for the latest version of a key
# The window is partitioned on the raw prd_key, i.e. before it is split.
window_spec = Window.partitionBy('prd_key').orderBy(F.col('prd_start_dt'))

df_prd_info = (
    df_prd_info
    .withColumn("prd_end_dt", F.lead("prd_start_dt", 1).over(window_spec))
    .select(
        *[c for c in df_prd_info.columns if c not in ['prd_key', 'prd_cost', 'prd_line', 'prd_end_dt']],
        F.regexp_replace(F.substring(F.col('prd_key'), 1, 5), '-', '_').alias('cat_id'),
        F.substring(F.col('prd_key'), 7, F.length(F.col('prd_key'))).alias('prd_key'),
        F.coalesce(F.col('prd_cost'), F.lit(0)).alias('prd_cost'),
        F.when(F.upper(F.trim(F.col('prd_line'))) == 'M', 'Mountain')
         .when(F.upper(F.trim(F.col("prd_line"))) == 'R', 'Road')
         .when(F.upper(F.trim(F.col("prd_line"))) == 'S', 'Other Sales')
         .when(F.upper(F.trim(F.col("prd_line"))) == 'T', 'Touring')
         .otherwise(F.lit('n/a')).alias('prd_line'),
        # Explicit DATE cast keeps the column a date regardless of how Spark
        # would coerce a bare string literal against date_sub's result.
        F.coalesce(
            F.date_sub(F.col('prd_end_dt'), 1),
            F.lit("9999-12-31").cast("date"),
        ).alias('prd_end_dt'),
    )
)

display(df_prd_info)

## crm_sales_details

###1. Check for Unwanted Space in sls_ord_num column

In [0]:
# Helper for the whitespace checks (same definition as earlier in the notebook).
def has_unwanted_space(col):
    """Return a boolean Column that is true where `col` differs from `F.trim(col)`."""
    trimmed = F.trim(col)
    return col != trimmed

In [0]:
# Count values with leading/trailing whitespace in the two string columns.
# Author's note from the original run: no hits, so no trimming is applied.
cols_to_check = ['sls_ord_num', 'sls_prd_key']
col_exprs = [
    F.sum(
        F.when(has_unwanted_space(F.col(name)), 1).otherwise(0)
    ).alias(name)
    for name in cols_to_check
]

df_check = df_sales_details.select(col_exprs)
display(df_check)

###2. Check Relation with crm_cust_info

In [0]:
# Sales rows whose sls_cust_id has no matching cst_id in crm_cust_info.
# A left_anti join is used instead of passing a DataFrame to Column.isin, which
# expects literal values; rows with a NULL sls_cust_id are reported as
# unmatched as well.
df_cust_id = df_cust_info.select(F.col('cst_id'))
df_check = df_sales_details.join(
    df_cust_id,
    df_sales_details['sls_cust_id'] == df_cust_id['cst_id'],
    how="left_anti",
)
display(df_check)

###3. Check Relation with crm_prd_info

In [0]:
# Sales rows whose sls_prd_key has no matching prd_key in crm_prd_info.
# A left_anti join is used instead of passing a DataFrame to Column.isin, which
# expects literal values; rows with a NULL sls_prd_key are reported as
# unmatched as well.
df_prd_key = df_prd_info.select(F.col('prd_key'))
df_check = df_sales_details.join(
    df_prd_key,
    df_sales_details['sls_prd_key'] == df_prd_key['prd_key'],
    how="left_anti",
)
display(df_check)

###4. Check for invalid dates

In [0]:
# Rows whose raw sls_order_dt value is zero or negative.
df_check = df_sales_details.filter(F.col('sls_order_dt') <= 0)
display(df_check)

In [0]:
# Trial conversion of the three date columns: a value of 0 becomes NULL,
# anything else is cast to string and parsed with the yyyyMMdd pattern.
# The remaining columns are carried over unchanged and come first.
df_trial = df_sales_details.select(
    *[c for c in df_sales_details.columns if c not in ("sls_order_dt", "sls_ship_dt", "sls_due_dt")],
    *[
        F.when(F.col(dt_col) == 0, None)
        .otherwise(F.to_date(F.col(dt_col).cast("string"), "yyyyMMdd"))
        .alias(dt_col)
        for dt_col in ("sls_order_dt", "sls_ship_dt", "sls_due_dt")
    ],
)
display(df_trial)

In [0]:
# Convert the three date columns with try_to_date (yyyyMMdd pattern applied to
# the value cast to string). The remaining columns are carried over unchanged
# and come first.
df_staging = df_sales_details.select(
    *[c for c in df_sales_details.columns if c not in ("sls_order_dt", "sls_ship_dt", "sls_due_dt")],
    *[
        F.try_to_date(F.col(dt_col).cast("string"), "yyyyMMdd").alias(dt_col)
        for dt_col in ("sls_order_dt", "sls_ship_dt", "sls_due_dt")
    ],
)
display(df_staging)

In [0]:
# Rows where at least one of the converted date columns is NULL.
df_check = df_staging.filter(
    F.col('sls_order_dt').isNull()
    | F.col('sls_ship_dt').isNull()
    | F.col('sls_due_dt').isNull()
)

display(df_check)

###5. Check Data Consistency between Sales, Quantity, and Price

In [0]:
# Rows that violate sales = quantity * price, or where any of the three values
# is NULL, zero or negative.
# Every comparison is parenthesised: in Python `|` binds tighter than `!=` and
# `<=`, so without the parentheses the expression becomes a chained comparison
# and PySpark raises "Cannot convert column into bool".
df_check = df_staging.where(
    (F.col('sls_sales') != F.col('sls_quantity') * F.col('sls_price'))
    | F.col('sls_sales').isNull()
    | F.col('sls_quantity').isNull()
    | F.col('sls_price').isNull()
    | (F.col('sls_sales') <= 0)
    | (F.col('sls_quantity') <= 0)
    | (F.col('sls_price') <= 0)
)
display(df_check)

In [0]:
# Business rule: sales = quantity * price, and none of the three values may be
# NULL, zero or negative. Collect every row that breaks the rule.
df_check = df_staging.filter(
    F.col('sls_sales').isNull()
    | F.col('sls_quantity').isNull()
    | F.col('sls_price').isNull()
    | (F.col('sls_sales') <= 0)
    | (F.col('sls_quantity') <= 0)
    | (F.col('sls_price') <= 0)
    | (F.col('sls_sales') != F.col('sls_quantity') * F.col('sls_price'))
)

display(df_check)

In [0]:
# Repair rules, applied to the raw input columns:
#   1. sls_sales: when NULL, <= 0, or different from quantity * price, derive it
#      as quantity * abs(price); otherwise keep it.
#   2. sls_price: when NULL or <= 0, derive it as the raw sls_sales divided by
#      quantity; otherwise keep it (abs() is a no-op on this branch because the
#      value is already positive).
#   3. sls_quantity is left untouched.
# A zero quantity is turned into a NULL divisor so the derived price is NULL
# instead of raising a divide-by-zero error when ANSI mode is enabled.
df_check = df_sales_details.select(
    *[c for c in df_sales_details.columns if c not in ["sls_sales", "sls_price"]],
    F.when(
        F.col('sls_sales').isNull()
        | (F.col('sls_sales') <= 0)
        | (F.col('sls_sales') != F.col('sls_quantity') * F.col('sls_price')),
        F.col('sls_quantity') * F.abs(F.col('sls_price')),
    ).otherwise(F.col('sls_sales')).alias('sls_sales'),
    F.when(
        F.col('sls_price').isNull() | (F.col('sls_price') <= 0),
        F.col('sls_sales') / F.when(F.col('sls_quantity') != 0, F.col('sls_quantity')),
    ).otherwise(F.abs(F.col('sls_price'))).alias('sls_price'),
)
display(df_check)

In [0]:
# From the repaired frame, keep only rows where sales still differs from
# quantity * price.
df_check = df_check.filter(
    F.col('sls_sales') != F.col('sls_quantity') * F.col('sls_price')
)
display(df_check)

In [0]:
# Re-run the sales / quantity / price consistency check on df_staging.
# Every comparison is parenthesised: in Python `|` binds tighter than `!=` and
# `<=`, so without the parentheses the expression becomes a chained comparison
# and PySpark raises "Cannot convert column into bool".
df_check = df_staging.where(
    (F.col('sls_sales') != F.col('sls_quantity') * F.col('sls_price'))
    | F.col('sls_sales').isNull()
    | F.col('sls_quantity').isNull()
    | F.col('sls_price').isNull()
    | (F.col('sls_sales') <= 0)
    | (F.col('sls_quantity') <= 0)
    | (F.col('sls_price') <= 0)
)
display(df_check)

###Final Code

In [0]:
# Final cleaning pipeline for crm_sales_details:
#   * the three date columns are cast to string and parsed with try_to_date
#     using the yyyyMMdd pattern
#   * sls_sales: when NULL, <= 0, or different from quantity * price, derive it
#     as quantity * abs(price); otherwise keep it
#   * sls_price: when NULL or <= 0, derive it as the raw sls_sales divided by
#     quantity; otherwise keep it
# All expressions read the raw input columns. A zero quantity is turned into a
# NULL divisor so the derived price is NULL instead of raising a divide-by-zero
# error when ANSI mode is enabled.
df_sales_details = df_sales_details.select(
    *[c for c in df_sales_details.columns if c not in ["sls_order_dt", "sls_ship_dt", "sls_due_dt", "sls_sales", "sls_price"]],
    F.try_to_date(F.col('sls_order_dt').cast("string"), "yyyyMMdd").alias('sls_order_dt'),
    F.try_to_date(F.col('sls_ship_dt').cast("string"), "yyyyMMdd").alias('sls_ship_dt'),
    F.try_to_date(F.col('sls_due_dt').cast("string"), "yyyyMMdd").alias('sls_due_dt'),
    F.when(
        F.col('sls_sales').isNull()
        | (F.col('sls_sales') <= 0)
        | (F.col('sls_sales') != F.col('sls_quantity') * F.col('sls_price')),
        F.col('sls_quantity') * F.abs(F.col('sls_price')),
    ).otherwise(F.col('sls_sales')).alias('sls_sales'),
    F.when(
        F.col('sls_price').isNull() | (F.col('sls_price') <= 0),
        F.col('sls_sales') / F.when(F.col('sls_quantity') != 0, F.col('sls_quantity')),
    ).otherwise(F.abs(F.col('sls_price'))).alias('sls_price'),
)
display(df_sales_details)

##erp_cust_az12

In [0]:
# Read the ERP CUST_AZ12 file as CSV: first row is the header, column types are
# inferred, and multi-line records are not expected.
df_cust_az12 = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .option('multiline', False)
    .load('/Volumes/sales_crm_erp/source_data/erp/CUST_AZ12.csv')
)
display(df_cust_az12)

### 1. Check For Null or Duplicate Primary Key

In [0]:
# Primary-key quality check on CID: report keys that occur more than once or
# are NULL. count('*') can never be NULL, so the NULL test must look at CID
# rather than at PK_Count.
df_check = (
    df_cust_az12.groupBy(F.col('CID'))
    .agg(F.count('*').alias('PK_Count'))
    .filter((F.col('PK_Count') > 1) | F.col('CID').isNull())
)
display(df_check)

### 2. Remove the "NAS" prefix at the beginning of CID
so that it matches the customer key (cst_key) of the CRM cust_info source

In [0]:
# Remove a leading 'NAS' from CID and expose the result as lower-case `cid`;
# every other column is carried over unchanged after it.
df_staging = df_cust_az12.select(
    F.regexp_replace('CID', '^NAS', '').alias('cid'),
    *[name for name in df_cust_az12.columns if name != 'CID'],
)
display(df_staging)

In [0]:
# Sanity check after the prefix removal: rows whose cid still starts with 'NAS'.
df_check = df_staging.filter(
    F.col('cid').startswith('NAS').alias('startwith_without_NAS')
)
display(df_check)

### 3. Set BDATE to NULL if it is in the future or more than 100 years in the past

In [0]:
from datetime import datetime, date

In [0]:
# Rows whose BDATE lies more than 100 years (1200 months) before today.
# The cutoff is derived from the current date instead of a hard-coded calendar
# date, so the check stays correct whenever the notebook is re-run.
df_check = df_staging.where(
    F.col('BDATE') < F.add_months(F.current_date(), -1200)
)
display(df_check)

In [0]:
# Rows whose BDATE is in the future or more than 100 years (1200 months) in the
# past. Both bounds are derived from the current date instead of a hard-coded
# calendar date, so the check stays correct whenever the notebook is re-run.
df_check = df_staging.where(
    (F.col('BDATE') > F.current_date())
    | (F.col('BDATE') < F.add_months(F.current_date(), -1200))
)
display(df_check)

In [0]:
# Replace an implausible BDATE (in the future, or more than 100 years = 1200
# months in the past) with NULL and expose the column as lower-case `bdate`.
# The bounds are derived from the current date instead of a hard-coded calendar
# date, so the rule stays correct whenever the notebook is re-run.
df_staging = df_staging.select(
    *[c for c in df_staging.columns if c not in ['BDATE']],
    F.when(
        (F.col('BDATE') > F.current_date())
        | (F.col('BDATE') < F.add_months(F.current_date(), -1200)),
        F.lit(None).cast("date"),
    ).otherwise(F.col('BDATE')).alias('bdate'),
)
display(df_staging)

###4. Data Standardization & Consistency on the "GEN" column

In [0]:
# List the distinct values currently stored in GEN.
df_check = df_staging.select("GEN").distinct()
display(df_check)

In [0]:
# Standardize GEN to 'Female' / 'Male'; anything else (including NULL) becomes
# 'n/a'. The result is exposed as lower-case `gen`.
# The male codes must be two separate values: the single string 'M, MALE'
# never matches, which would map every male customer to 'n/a'.
df_staging = df_staging.select(
    *[c for c in df_staging.columns if c not in ['GEN']],
    F.when(F.upper(F.trim(F.col('GEN'))).isin('F', 'FEMALE'), "Female")
     .when(F.upper(F.trim(F.col('GEN'))).isin('M', 'MALE'), "Male")
     .otherwise('n/a').alias('gen'),
)

display(df_staging)

###Final Code

In [0]:
# Final cleaning pipeline for erp_cust_az12:
#   * cid   - CID with a leading 'NAS' removed
#   * bdate - BDATE, set to NULL when it is in the future or more than
#             100 years (1200 months) in the past
#   * gen   - GEN standardized to 'Female' / 'Male', otherwise 'n/a'
# Two fixes compared with a literal transcription of the exploration cells:
#   * the male codes are two separate values ('M', 'MALE'); the single string
#     'M, MALE' never matches and mapped every male customer to 'n/a'
#   * the age cutoff is derived from the current date instead of a hard-coded
#     calendar date
df_cust_az12 = df_cust_az12.select(
    F.regexp_replace(F.col('CID'), '^NAS', '').alias('cid'),
    F.when(
        (F.col('BDATE') > F.current_date())
        | (F.col('BDATE') < F.add_months(F.current_date(), -1200)),
        F.lit(None).cast("date"),
    ).otherwise(F.col('BDATE')).alias('bdate'),
    F.when(F.upper(F.trim(F.col('GEN'))).isin('F', 'FEMALE'), "Female")
     .when(F.upper(F.trim(F.col('GEN'))).isin('M', 'MALE'), "Male")
     .otherwise('n/a').alias('gen'),
)
display(df_cust_az12)

##erp_loc_a101

In [0]:
# Read the ERP LOC_A101 file as CSV: first row is the header, column types are
# inferred, and multi-line records are not expected.
df_loc_a101 = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .option('multiline', False)
    .load('/Volumes/sales_crm_erp/source_data/erp/LOC_A101.csv')
)
display(df_loc_a101)

###1. Check for Null or Duplicate Primary Key

In [0]:
# Primary-key quality check on CID: report keys that occur more than once or
# are NULL. count('*') can never be NULL, so the NULL test must look at CID
# rather than at PK_Count.
df_check = (
    df_loc_a101.groupBy(F.col('CID'))
    .agg(F.count('*').alias('PK_Count'))
    .filter((F.col('PK_Count') > 1) | F.col('CID').isNull())
)
display(df_check)

###2. Check for Unwanted Space in CID and CNTRY column

In [0]:
# Helper for the whitespace checks (same definition as earlier in the notebook).
def has_unwanted_space(col):
    """Return a boolean Column that is true where `col` differs from `F.trim(col)`."""
    trimmed = F.trim(col)
    return col != trimmed

In [0]:
# Count values with leading/trailing whitespace in CID and CNTRY.
# Author's note from the original run: no hits, so no trimming is applied.
cols_to_check = ['CID', 'CNTRY']
col_exprs = [
    F.sum(
        F.when(has_unwanted_space(F.col(name)), 1).otherwise(0)
    ).alias(name)
    for name in cols_to_check
]

df_check = df_loc_a101.select(col_exprs)
display(df_check)

In [0]:
# Rows whose CNTRY value changes when it is trimmed.
df_check = df_loc_a101.filter(F.col('CNTRY') != F.trim('CNTRY'))
display(df_check)

###3. Remove '-' in the CID Column to match with crm_cust_info

In [0]:
# Remove every '-' from CID (exposed as `cid`) and rename CNTRY to `cntry`.
df_staging = df_loc_a101.select(
    F.regexp_replace('CID', '-', '').alias('cid'),
    df_loc_a101['CNTRY'].alias('cntry'),
)
display(df_staging)

###4. Check All of CID in crm_cust_info

In [0]:
# Customer keys from crm_cust_info, renamed to `cid` so they can be joined by
# name against the ERP location data.
df_cst_key_cust_info = df_cust_info.select(df_cust_info["cst_key"].alias('cid'))
display(df_cst_key_cust_info)

In [0]:
# Location rows whose cid has no counterpart among the CRM customer keys.
# Author's note from the original run: no rows, i.e. every cid exists in
# crm_cust_info.
df_check = df_staging.join(
    df_cst_key_cust_info,
    on='cid',
    how="left_anti",
)
display(df_check)

### 5. Data Standardization & Consistency on the "CNTRY" column

In [0]:
# List the distinct raw country values.
df_check = df_loc_a101.select("cntry").distinct()
display(df_check)

In [0]:
# Standardize cntry:
#   * 'USA' / 'US'  -> 'United States'
#   * 'DE'          -> 'Germany'
#   * blank or NULL -> 'n/a'
#   * anything else is kept, trimmed
# The fall-through branch trims as well: every comparison above is made on the
# trimmed value, so returning the raw value would let whitespace-padded names
# survive as separate distinct countries.
df_staging = df_staging.select(
    F.col('cid'),
    F.when(F.trim(F.col('cntry')).isin('USA', 'US'), 'United States')
     .when(F.trim(F.col('cntry')) == 'DE', 'Germany')
     .when((F.trim(F.col('cntry')) == '') | F.col('cntry').isNull(), 'n/a')
     .otherwise(F.trim(F.col('cntry'))).alias('cntry'),
)
display(df_staging)

In [0]:
# Distinct country values after standardization.
df_check = df_staging.select("cntry").distinct()
display(df_check)

###Final Code

In [0]:
# Final cleaning pipeline for erp_loc_a101:
#   * cid   - CID with every '-' removed
#   * cntry - 'USA'/'US' -> 'United States', 'DE' -> 'Germany',
#             blank or NULL -> 'n/a', anything else kept (trimmed)
# The fall-through branch trims as well: every comparison is made on the
# trimmed value, so returning the raw value would let whitespace-padded names
# survive as separate distinct countries.
df_loc_a101 = df_loc_a101.select(
    F.regexp_replace(F.col('CID'), '-', '').alias('cid'),
    F.when(F.trim(F.col('cntry')).isin('USA', 'US'), 'United States')
     .when(F.trim(F.col('cntry')) == 'DE', 'Germany')
     .when((F.trim(F.col('cntry')) == '') | F.col('cntry').isNull(), 'n/a')
     .otherwise(F.trim(F.col('cntry'))).alias('cntry'),
)
display(df_loc_a101)

##erp_px_cat_g1v2

In [0]:
# Read the ERP PX_CAT_G1V2 file as CSV: first row is the header, column types
# are inferred, and multi-line records are not expected.
df_px_cat_g1v2 = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .option('multiline', False)
    .load('/Volumes/sales_crm_erp/source_data/erp/PX_CAT_G1V2.csv')
)
display(df_px_cat_g1v2)

###1. Check Duplicate ID

In [0]:
# Duplicate check on ID only. Author's note: the file has just 37 rows, so
# NULL values are checked by eye rather than in code.
df_check = (
    df_px_cat_g1v2
    .groupBy('ID')
    .agg(F.count('*').alias('PK_Count'))
    .where(F.col('PK_Count') > 1)
)
display(df_check)

###2. Check Unwanted Space in all column

In [0]:
# Helper for the whitespace checks (same definition as earlier in the notebook).
def has_unwanted_space(col):
    """Return a boolean Column that is true where `col` differs from `F.trim(col)`."""
    trimmed = F.trim(col)
    return col != trimmed

In [0]:
# Count values with leading/trailing whitespace in ID, CAT, SUBCAT and
# MAINTENANCE.
# Author's note from the original run: no hits, so no trimming is applied.
cols_to_check = ['ID', 'CAT', 'SUBCAT', 'MAINTENANCE']
col_exprs = [
    F.sum(
        F.when(has_unwanted_space(F.col(name)), 1).otherwise(0)
    ).alias(name)
    for name in cols_to_check
]

df_check = df_px_cat_g1v2.select(col_exprs)
display(df_check)

### 3. Data Standardization & Consistency on the "CAT" & "SUBCAT" columns

In [0]:
# Distinct values of the CAT column.
df_check = df_px_cat_g1v2.select("CAT").distinct()
display(df_check)

In [0]:
# Distinct values of the SUBCAT column.
df_check = df_px_cat_g1v2.select("SUBCAT").distinct()
display(df_check)

###4. Check All of ID in crm_prd_info

In [0]:
# Category ids referenced by the cleaned product data.
df_cat_id_prd_info = df_prd_info.select("cat_id")
display(df_cat_id_prd_info)

In [0]:
# Category rows whose ID is not referenced by any cat_id in crm_prd_info.
df_check = df_px_cat_g1v2.join(
    df_cat_id_prd_info,
    on=df_px_cat_g1v2['ID'] == df_cat_id_prd_info['cat_id'],
    how="left_anti",
)
display(df_check)

###Final Code

In [0]:
# Final step for erp_px_cat_g1v2: the values are kept as they are, only the
# four column names are lower-cased.
df_px_cat_g1v2 = df_px_cat_g1v2.select(
    *[
        F.col(src).alias(src.lower())
        for src in ("ID", "CAT", "SUBCAT", "MAINTENANCE")
    ]
)
display(df_px_cat_g1v2)

# GOLD LAYER

##Dimension Table: Customer

In [0]:
# Staging view for the customer dimension: CRM customers left-joined to the
# ERP birthdate/gender data and to the ERP country data, both on the customer
# key (cst_key = cid).
df_staging_cust = (
    df_cust_info
    .join(df_cust_az12, on=df_cust_info['cst_key'] == df_cust_az12['cid'], how="left")
    .join(df_loc_a101, on=df_cust_info['cst_key'] == df_loc_a101['cid'], how="left")
    .select(
        # CRM attributes
        df_cust_info['cst_id'],
        df_cust_info['cst_key'],
        df_cust_info['cst_firstname'],
        df_cust_info['cst_lastname'],
        df_cust_info['cst_marital_status'],
        df_cust_info['cst_gndr'],
        df_cust_info['cst_create_date'],
        # ERP attributes
        df_cust_az12['bdate'],
        df_cust_az12['gen'],
        df_loc_a101['cntry'],
    )
)
display(df_staging_cust)

###1. Check Duplicate Data

In [0]:
# cst_id values that occur more than once after the joins; an empty result
# means the joins introduced no duplicates.
df_check = (
    df_staging_cust
    .groupBy('cst_id')
    .agg(F.count('*').alias('PK_Count'))
    .where(F.col('PK_Count') > 1)
)
display(df_check)

### 2. Two columns carry the same information (cst_gndr and gen)
Treat the CRM value as the master and fall back to the ERP value only when the CRM value is 'n/a'.

In [0]:
# Try out the merged gender rule and list every distinct combination:
#   1. the CRM value (cst_gndr) when it is not 'n/a'
#   2. otherwise the ERP value (gen)
#   3. otherwise the literal 'n/a'
df_trial = (
    df_staging_cust
    .select(
        'cst_gndr',
        'gen',
        F.coalesce(
            # when() without otherwise() yields NULL for 'n/a', letting
            # coalesce move on to the ERP value.
            F.when(F.col('cst_gndr') != "n/a", F.col('cst_gndr')),
            F.col('gen'),
            F.lit("n/a"),
        ).alias('new_gen'),
    )
    .distinct()
)
display(df_trial)

###3. Rename Column to friendly, meaningful names

###4. Sort the column into logical groups to improve readability

### 5. Add a primary key (surrogate key)

###Final Code

In [0]:
# Customer dimension: CRM customers left-joined to the ERP birthdate/gender and
# country data on the customer key, with friendly column names and a
# surrogate key (customer_key) numbered in cst_id order.
df_cust = (
    df_cust_info
    .join(df_cust_az12, on=df_cust_info['cst_key'] == df_cust_az12['cid'], how="left")
    .join(df_loc_a101, on=df_cust_info['cst_key'] == df_loc_a101['cid'], how="left")
    .select(
        F.row_number()
         .over(Window.orderBy(df_cust_info['cst_id']))
         .alias('customer_key'),
        df_cust_info['cst_id'].alias('customer_id'),
        df_cust_info['cst_key'].alias('customer_number'),
        df_cust_info['cst_firstname'].alias('first_name'),
        df_cust_info['cst_lastname'].alias('last_name'),
        df_loc_a101['cntry'].alias('country'),
        # Gender: CRM value unless it is 'n/a', then the ERP value, then 'n/a'.
        F.coalesce(
            F.when(df_cust_info['cst_gndr'] != "n/a", df_cust_info['cst_gndr']),
            df_cust_az12['gen'],
            F.lit("n/a"),
        ).alias('gender'),
        df_cust_info['cst_marital_status'].alias('marital_status'),
        df_cust_az12['bdate'].alias('birthdate'),
        df_cust_info['cst_create_date'].alias('create_date'),
    )
)
display(df_cust)

##Dimension Table: Product

###Final Code
Put it all together
1. Remove the historical data, use the latest data
2. Rename Column to friendly, meaningful names(in the final form as we add alias (AS))
3. Sort the column into logical groups to improve readability
4. add primary key(surrogate key)

In [0]:
# Product dimension: current product versions only (prd_end_dt equal to the
# open-ended sentinel 9999-12-31), left-joined to the ERP category data, with
# friendly column names and a surrogate key (product_key) numbered by
# start date and product key.
# The history filter runs BEFORE the projection: filtering afterwards relies on
# Spark re-resolving a column the projection dropped, and numbers the surrogate
# key over historical rows too, leaving gaps once those rows are removed.
df_product = (
    df_prd_info
    .where(df_prd_info['prd_end_dt'] == '9999-12-31')
    .join(df_px_cat_g1v2, df_prd_info['cat_id'] == df_px_cat_g1v2['id'], how="left")
    .select(
        F.row_number().over(Window.orderBy(df_prd_info['prd_start_dt'], df_prd_info['prd_key'])).alias('product_key'),
        df_prd_info['prd_id'].alias('product_id'),
        df_prd_info['prd_key'].alias('product_number'),
        df_prd_info['prd_nm'].alias('product_name'),
        df_prd_info['cat_id'].alias('category_id'),
        df_px_cat_g1v2['cat'].alias('category'),
        df_px_cat_g1v2['subcat'].alias('subcategory'),
        df_px_cat_g1v2['maintenance'].alias('maintenance'),
        df_prd_info['prd_cost'].alias('cost'),
        df_prd_info['prd_line'].alias('product_line'),
        df_prd_info['prd_start_dt'].alias('start_date'),
        # prd_end_dt is left out: it is the same sentinel value on every
        # remaining row.
    )
)

display(df_product)

### Check whether joining the tables introduced duplicate rows

In [0]:
# product_number values that occur more than once in the product dimension.
df_check = (
    df_product
    .groupBy('product_number')
    .agg(F.count('*').alias('PK_Count'))
    .where(F.col('PK_Count') > 1)
)
display(df_check)

##Fact Table: Sales

###Final Code
Put it all together
1. Use The dimension's surrogate key instead of IDs to easily connect facts with dimension(Surrogate key is in gold)
2. Rename Column Name
3. Sort the columns into the logical group to improve readability

In [0]:
# Sales fact table: each sales row is left-joined to the product dimension (by
# product number) and to the customer dimension (by customer id) so the source
# ids can be replaced with the dimensions' surrogate keys.
df_sales = (
    df_sales_details
    .join(df_product, on=df_sales_details['sls_prd_key'] == df_product['product_number'], how="left")
    .join(df_cust, on=df_sales_details['sls_cust_id'] == df_cust['customer_id'], how="left")
    .select(
        # Keys
        df_sales_details['sls_ord_num'].alias('order_number'),
        df_product['product_key'].alias('product_key'),
        df_cust['customer_key'].alias('customer_key'),
        # Dates
        df_sales_details['sls_order_dt'].alias('order_date'),
        df_sales_details['sls_ship_dt'].alias('shipping_date'),
        df_sales_details['sls_due_dt'].alias('due_date'),
        # Measures
        df_sales_details['sls_sales'].alias('sales_amount'),
        df_sales_details['sls_quantity'].alias('quantity'),
        df_sales_details['sls_price'].alias('price'),
    )
)
display(df_sales)