## Silver Source CRM 

In [0]:
# Imports

from pyspark.sql.functions import col, count, when, row_number, desc, trim
from pyspark.sql.functions import substring, regexp_replace, when, length, expr, lead, lit
from pyspark.sql.functions import abs as Fabs, current_date, date_sub, to_date, upper
from pyspark.sql.window import Window

In [0]:
def write_to_one_csv(df, destination_folder_path, destination_file_name):
    """Write ``df`` as one CSV file named ``destination_file_name``.

    Spark writes a directory of ``part-*`` files plus marker files. To get a
    single, plainly named file, the DataFrame is coalesced to one partition
    and written to a private staging directory. The lone part file is then
    moved into ``destination_folder_path``, and the staging directory is
    deleted.

    An existing file with the same name is replaced, so re-running the
    notebook does not depend on leftovers from a previous run. Other files in
    the destination folder (e.g. other tables sharing it) are not touched.

    Args:
        df: Spark DataFrame to write (a header row is included).
        destination_folder_path: Folder that receives the CSV file. A trailing
            "/" is tolerated.
        destination_file_name: Final file name, e.g. "cust_info.csv".

    Raises:
        RuntimeError: if the staging write did not produce exactly one part file.
    """
    # Normalise so callers passing ".../source_erp/" don't produce "//" paths.
    folder = destination_folder_path.rstrip("/")
    destination_path = f"{folder}/{destination_file_name}"
    staging_path = f"{folder}/_staging_{destination_file_name}"

    # Single partition -> single part file. Overwrite only affects the staging
    # directory, never sibling tables that share the destination folder.
    df.coalesce(1).write.mode("overwrite").csv(staging_path, header=True)

    part_files = [f for f in dbutils.fs.ls(staging_path) if f.name.startswith("part")]
    if len(part_files) != 1:
        raise RuntimeError(
            f"Expected exactly one part file in {staging_path}, found {len(part_files)}"
        )
    part_name = part_files[0].name

    # Remove a copy left by a previous run so the move below always succeeds.
    if any(f.name == destination_file_name for f in dbutils.fs.ls(folder)):
        dbutils.fs.rm(destination_path)
    dbutils.fs.mv(f"{staging_path}/{part_name}", destination_path)
    print(f"{part_name} renamed to {destination_file_name}")

    # The staging dir now only holds Spark's marker files; drop it entirely.
    dbutils.fs.rm(staging_path, recurse=True)
    print(f"{staging_path} removed")

In [0]:
# cust_info
silver_cust_info = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/workspace/default/bronze/source_crm/cust_info.csv")

# Data Cleaning
# Rows without a customer id cannot be keyed; drop them before deduplicating
# so the window below does not build a partition for null ids.
silver_cust_info = silver_cust_info.dropna(how="any", subset=["cst_id"])

# Keep only the latest record per customer (highest cst_create_date wins).
latest_first = Window.partitionBy("cst_id").orderBy(desc("cst_create_date"))
silver_cust_info = (
    silver_cust_info
    .withColumn("rn", row_number().over(latest_first))
    .filter(col("rn") == 1)
    .drop("rn")
)
# No dropDuplicates() needed: each cst_id now appears exactly once.

# Trim string values and alias them back to original names
silver_cust_info = silver_cust_info.select(
    *[trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in silver_cust_info.dtypes]
)

# Decode single-letter codes into readable values. Codes are upper-cased first
# so lowercase codes are decoded too; anything else (including null) -> 'na'.
marital_code = upper(col("cst_marital_status"))
gender_code = upper(col("cst_gndr"))
silver_cust_info = silver_cust_info.withColumn(
    "cst_marital_status",
    when(marital_code == "M", "Married")
    .when(marital_code == "S", "Single")
    .otherwise('na')
).withColumn(
    "cst_gndr",
    when(gender_code == "M", "Male")
    .when(gender_code == "F", "Female")
    .otherwise('na')
)

write_to_one_csv(silver_cust_info, "/Volumes/workspace/default/silver/source_crm", "cust_info.csv")
display(silver_cust_info)

In [0]:
# prd_info
silver_prd_info = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/workspace/default/bronze/source_crm/prd_info.csv")

# Data Cleaning
# Trim string values first: cat_id / prd_key below are cut by character
# position, so leading whitespace would shift every offset.
silver_prd_info = silver_prd_info.select(
    *[trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in silver_prd_info.dtypes]
)

# Split the composite key. Characters 1-5 become cat_id ("-" -> "_"), and
# characters 7.. become the new prd_key. cat_id must be derived before prd_key
# is overwritten. The two-argument SQL substring runs to the end of the string,
# so no Column-valued length is needed (older PySpark releases only accept ints
# for pyspark.sql.functions.substring's length).
silver_prd_info = silver_prd_info.withColumn(
    "cat_id",
    regexp_replace(substring("prd_key", 1, 5), "-", "_")
).withColumn(
    "prd_key",
    expr("substring(prd_key, 7)")
)

# Missing cost is treated as zero.
silver_prd_info = silver_prd_info.withColumn(
    "prd_cost",
    when(col("prd_cost").isNull(), 0).otherwise(col("prd_cost"))
)

# Decode product-line codes (case-insensitive); unknown or null -> 'na'.
prd_line_code = upper(col("prd_line"))
silver_prd_info = silver_prd_info.withColumn(
    "prd_line",
    when(prd_line_code == "M", "Mountain")
    .when(prd_line_code == "R", "Road")
    .when(prd_line_code == "S", "Other Sales")
    .when(prd_line_code == "T", "Touring")
    .otherwise('na')
)

# Rebuild prd_end_dt from the data itself. Each version of a product ends the
# day before the next version (by prd_start_dt) starts. The latest version has
# no successor, so lead() yields null and the end date stays null. The bronze
# prd_end_dt values are discarded.
by_start_date = Window.partitionBy("prd_key").orderBy("prd_start_dt")
silver_prd_info = silver_prd_info.withColumn(
    "prd_end_dt",
    date_sub(lead("prd_start_dt").over(by_start_date), 1)
)

write_to_one_csv(silver_prd_info, "/Volumes/workspace/default/silver/source_crm", "prd_info.csv")
display(silver_prd_info)

In [0]:
# sales_details
silver_sales_details = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/workspace/default/bronze/source_crm/sales_details.csv")

# Data Cleaning
# Dates are encoded as yyyyMMdd (presumably inferred as integers — TODO confirm).
# A value of 0, or anything that is not exactly 8 characters, is treated as
# missing. Casting to string makes the length check and the parse explicit
# rather than relying on implicit numeric -> string coercion.
for date_column in ["sls_order_dt", "sls_ship_dt", "sls_due_dt"]:
    raw_date = col(date_column).cast("string")
    silver_sales_details = silver_sales_details.withColumn(
        date_column,
        to_date(
            when((raw_date == "0") | (length(raw_date) != 8), None).otherwise(raw_date),
            "yyyyMMdd",
        ),
    )

# Trim string values and alias them back to original names
silver_sales_details = silver_sales_details.select(
    *[trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in silver_sales_details.dtypes]
)

# Rebuild missing or non-positive sales as |price| * quantity. The absolute
# value keeps a negative source price from producing a negative sales amount;
# prices themselves are made positive in the next step.
silver_sales_details = silver_sales_details.withColumn(
    "sls_sales",
    when((col("sls_sales") <= 0) | col("sls_sales").isNull(),
         Fabs(col("sls_price").cast("double")) * col("sls_quantity").cast("double"))
    .otherwise(col("sls_sales"))
)

# Rebuild missing or zero prices as sales / quantity. A zero quantity yields
# price 0 instead of dividing by zero; a null quantity yields null. Negative
# prices are flipped to positive.
silver_sales_details = silver_sales_details.withColumn(
    "sls_price",
    when((col("sls_price") == 0) | col("sls_price").isNull(),
         when((col("sls_quantity") != 0) | col("sls_quantity").isNull(),
              col("sls_sales") / col("sls_quantity"))
         .otherwise(0))
    .when(col("sls_price") < 0, Fabs(col("sls_price")))
    .otherwise(col("sls_price"))
)

write_to_one_csv(silver_sales_details, "/Volumes/workspace/default/silver/source_crm", "sales_details.csv")
display(silver_sales_details)

## Silver Source ERP

In [0]:
# CUST_AZ12
silver_CUST_AZ12 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/workspace/default/bronze/source_erp/CUST_AZ12.csv")

# Data Cleaning
# Strip the "NAS" prefix so CID can be matched against cust_info.cst_key.
# The pattern is anchored, so "NAS" appearing later in an id is left alone.
silver_CUST_AZ12 = silver_CUST_AZ12.withColumn(
    "CID",
    regexp_replace(col("CID"), "^NAS", "")
)

# Normalize GEN. Values are trimmed and upper-cased before matching, so codes
# like " m" or "female " are recognised. Missing or blank values become "NA";
# any other value is kept (trimmed).
gen_clean = trim(col("GEN"))
gen_code = upper(gen_clean)
silver_CUST_AZ12 = silver_CUST_AZ12.withColumn(
    "GEN",
    when(gen_code.isin("M", "MALE"), "Male")
    .when(gen_code.isin("F", "FEMALE"), "Female")
    .when(gen_clean.isNull() | (gen_clean == ""), "NA")
    .otherwise(gen_clean)
)

# Keep customers born between 1924-01-01 and today (both inclusive). Rows
# with a null BDATE fail this test and are dropped too.
# NOTE(review): dropping the whole customer for a bad/missing birthdate may be
# too aggressive — consider nulling BDATE instead.
silver_CUST_AZ12 = silver_CUST_AZ12.filter(
    col("BDATE").between(lit("1924-01-01"), current_date())
)

write_to_one_csv(silver_CUST_AZ12, "/Volumes/workspace/default/silver/source_erp", "CUST_AZ12.csv")
display(silver_CUST_AZ12)

In [0]:
# LOC_A101
silver_LOC_A101 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/workspace/default/bronze/source_erp/LOC_A101.csv")

# Data cleaning
# CID normalization: remove every "-" from the id.
silver_LOC_A101 = silver_LOC_A101.withColumn(
    "CID",
    regexp_replace(col("CID"), "-", "")
)

# CNTRY normalization. Values are trimmed before matching so " US" or "DE "
# are recognised. Missing or blank values become "NA"; any other value is
# kept (trimmed).
cntry_clean = trim(col("CNTRY"))
silver_LOC_A101 = silver_LOC_A101.withColumn(
    "CNTRY",
    when(cntry_clean.isin("USA", "US"), "United States")
    .when(cntry_clean == "DE", "Germany")
    .when(cntry_clean.isNull() | (cntry_clean == ""), "NA")
    .otherwise(cntry_clean)
)

write_to_one_csv(silver_LOC_A101, "/Volumes/workspace/default/silver/source_erp", "LOC_A101.csv")
# limit() keeps the preview a DataFrame; head() would return a list of Rows.
display(silver_LOC_A101.limit(5))

In [0]:
# PX_CAT_G1V2
# No cleaning is applied: the bronze file is copied to silver as-is.
silver_PX_CAT_G1V2 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/workspace/default/bronze/source_erp/PX_CAT_G1V2.csv")

# The output name mirrors the source name. NOTE(review): any downstream reader
# still expecting "PX_CAT_G1V.csv" must be updated.
write_to_one_csv(silver_PX_CAT_G1V2, "/Volumes/workspace/default/silver/source_erp", "PX_CAT_G1V2.csv")
display(silver_PX_CAT_G1V2)