### Import Datasets

In [1]:
from pathlib import Path

source_crm = Path.home() / 'data-engineering' / 'medallion' / 'datasets' / 'source_crm'
source_erp = Path.home() / 'data-engineering' / 'medallion' / 'datasets' / 'source_erp'

crm_cust_path = Path(source_crm / 'cust_info.csv')
crm_prd_path = Path(source_crm / 'prd_info.csv')
crm_sales_path = Path(source_crm / 'sales_details.csv')

erp_cust_path = Path(source_erp / 'CUST_AZ12.csv')
erp_loc_path = Path(source_erp / 'LOC_A101.csv')
erp_prd_path = Path(source_erp / 'PX_CAT_G1V2.csv')

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Medallion').getOrCreate()

### Explore Spark

In [3]:
spark

In [4]:
df_crm_cust = spark.read.csv(str(crm_cust_path), header=True, inferSchema=True)

In [5]:
df_crm_cust.head(3)

[Row(cst_id=11000, cst_key='AW00011000', cst_firstname=' Jon', cst_lastname='Yang ', cst_marital_status='M', cst_gndr='M', cst_create_date=datetime.date(2025, 10, 6)),
 Row(cst_id=11001, cst_key='AW00011001', cst_firstname='Eugene', cst_lastname='Huang  ', cst_marital_status='S', cst_gndr='M', cst_create_date=datetime.date(2025, 10, 6)),
 Row(cst_id=11002, cst_key='AW00011002', cst_firstname='Ruben', cst_lastname=' Torres', cst_marital_status='M', cst_gndr='M', cst_create_date=datetime.date(2025, 10, 6))]

In [6]:
type(df_crm_cust)

pyspark.sql.dataframe.DataFrame

In [7]:
df_crm_cust.printSchema()

root
 |-- cst_id: integer (nullable = true)
 |-- cst_key: string (nullable = true)
 |-- cst_firstname: string (nullable = true)
 |-- cst_lastname: string (nullable = true)
 |-- cst_marital_status: string (nullable = true)
 |-- cst_gndr: string (nullable = true)
 |-- cst_create_date: date (nullable = true)



In [8]:
df_crm_cust.columns

['cst_id',
 'cst_key',
 'cst_firstname',
 'cst_lastname',
 'cst_marital_status',
 'cst_gndr',
 'cst_create_date']

In [9]:
df_crm_cust.select(['cst_firstname', 'cst_lastname']).show()

+-------------+------------+
|cst_firstname|cst_lastname|
+-------------+------------+
|          Jon|       Yang |
|       Eugene|     Huang  |
|        Ruben|      Torres|
|      Christy|         Zhu|
|    Elizabeth|     Johnson|
|        Julio|        Ruiz|
|        Janet|     Alvarez|
|        Marco|       Mehta|
|          Rob|     Verhoff|
|      Shannon|     Carlson|
|    Jacquelyn|      Suarez|
|       Curtis|          Lu|
|       Lauren|      Walker|
|         Ian |    Jenkins |
|       Sydney|     Bennett|
|        Chloe|       Young|
|        Wyatt|        Hill|
|      Shannon|        Wang|
|     Clarence|         Rai|
|         Luke|         Lal|
+-------------+------------+
only showing top 20 rows



In [10]:
df_crm_cust.dtypes

[('cst_id', 'int'),
 ('cst_key', 'string'),
 ('cst_firstname', 'string'),
 ('cst_lastname', 'string'),
 ('cst_marital_status', 'string'),
 ('cst_gndr', 'string'),
 ('cst_create_date', 'date')]

In [11]:
df_crm_cust.describe().show()

+-------+------------------+-----------+-------------+------------+------------------+--------+
|summary|            cst_id|    cst_key|cst_firstname|cst_lastname|cst_marital_status|cst_gndr|
+-------+------------------+-----------+-------------+------------+------------------+--------+
|  count|             18490|      18494|        18486|       18487|             18487|   13916|
|   mean|20244.491941590048|1.3451235E7|         NULL|        NULL|              NULL|    NULL|
| stddev| 5337.733647606977|       NULL|         NULL|        NULL|              NULL|    NULL|
|    min|             11000|   13451235|        Chloe|       Brown|                 M|       F|
|    max|             29483|      SF566|          Zoe|    Zukowski|                 S|       M|
+-------+------------------+-----------+-------------+------------+------------------+--------+



In [12]:
### Adding Columns
from pyspark.sql.functions import concat_ws

df_crm_cust.withColumn(
    'cst_fullname', 
    concat_ws(' ', df_crm_cust['cst_firstname'], df_crm_cust['cst_lastname'])
).show(5)

+------+----------+-------------+------------+------------------+--------+---------------+------------------+
|cst_id|   cst_key|cst_firstname|cst_lastname|cst_marital_status|cst_gndr|cst_create_date|      cst_fullname|
+------+----------+-------------+------------+------------------+--------+---------------+------------------+
| 11000|AW00011000|          Jon|       Yang |                 M|       M|     2025-10-06|         Jon Yang |
| 11001|AW00011001|       Eugene|     Huang  |                 S|       M|     2025-10-06|    Eugene Huang  |
| 11002|AW00011002|        Ruben|      Torres|                 M|       M|     2025-10-06|     Ruben  Torres|
| 11003|AW00011003|      Christy|         Zhu|                 S|       F|     2025-10-06|     Christy   Zhu|
| 11004|AW00011004|    Elizabeth|     Johnson|                 S|       F|     2025-10-06| Elizabeth Johnson|
+------+----------+-------------+------------+------------------+--------+---------------+------------------+
only showi

In [13]:
### Rename columns
df_crm_cust.withColumnRenamed('cst_gndr', 'cst_gender').show(5)

+------+----------+-------------+------------+------------------+----------+---------------+
|cst_id|   cst_key|cst_firstname|cst_lastname|cst_marital_status|cst_gender|cst_create_date|
+------+----------+-------------+------------+------------------+----------+---------------+
| 11000|AW00011000|          Jon|       Yang |                 M|         M|     2025-10-06|
| 11001|AW00011001|       Eugene|     Huang  |                 S|         M|     2025-10-06|
| 11002|AW00011002|        Ruben|      Torres|                 M|         M|     2025-10-06|
| 11003|AW00011003|      Christy|         Zhu|                 S|         F|     2025-10-06|
| 11004|AW00011004|    Elizabeth|     Johnson|                 S|         F|     2025-10-06|
+------+----------+-------------+------------+------------------+----------+---------------+
only showing top 5 rows



In [14]:
### Drop columns
df_crm_cust.drop('cst_key').show(5)

+------+-------------+------------+------------------+--------+---------------+
|cst_id|cst_firstname|cst_lastname|cst_marital_status|cst_gndr|cst_create_date|
+------+-------------+------------+------------------+--------+---------------+
| 11000|          Jon|       Yang |                 M|       M|     2025-10-06|
| 11001|       Eugene|     Huang  |                 S|       M|     2025-10-06|
| 11002|        Ruben|      Torres|                 M|       M|     2025-10-06|
| 11003|      Christy|         Zhu|                 S|       F|     2025-10-06|
| 11004|    Elizabeth|     Johnson|                 S|       F|     2025-10-06|
+------+-------------+------------+------------------+--------+---------------+
only showing top 5 rows



In [15]:
from pyspark.sql.functions import col, isnan, isnull

In [16]:
(
    df_crm_cust
    .na
    .drop(how='any', subset=['cst_id'])
    .count()
    #.show(10)
)

18490

In [17]:
# Count nulls in a specific column
df_crm_cust \
.filter(col('cst_id').isNull()) \
.show() \
#.count()

+------+--------+-------------+------------+------------------+--------+---------------+
|cst_id| cst_key|cst_firstname|cst_lastname|cst_marital_status|cst_gndr|cst_create_date|
+------+--------+-------------+------------+------------------+--------+---------------+
|  NULL|   SF566|         NULL|        NULL|              NULL|    NULL|           NULL|
|  NULL|    PO25|         NULL|        NULL|              NULL|    NULL|           NULL|
|  NULL|13451235|         NULL|        NULL|              NULL|    NULL|           NULL|
|  NULL|  A01Ass|         NULL|        NULL|              NULL|    NULL|           NULL|
+------+--------+-------------+------------+------------------+--------+---------------+



In [18]:
# Check for NaN or Null values
df_crm_cust \
.filter(isnull('cst_id') | isnan('cst_id')) \
.show() \
#.count()

+------+--------+-------------+------------+------------------+--------+---------------+
|cst_id| cst_key|cst_firstname|cst_lastname|cst_marital_status|cst_gndr|cst_create_date|
+------+--------+-------------+------------+------------------+--------+---------------+
|  NULL|   SF566|         NULL|        NULL|              NULL|    NULL|           NULL|
|  NULL|    PO25|         NULL|        NULL|              NULL|    NULL|           NULL|
|  NULL|13451235|         NULL|        NULL|              NULL|    NULL|           NULL|
|  NULL|  A01Ass|         NULL|        NULL|              NULL|    NULL|           NULL|
+------+--------+-------------+------------+------------------+--------+---------------+



### Stage 1: Data Loading and Cleaning

In [19]:
df_crm_cust = spark.read.csv(str(crm_cust_path), header=True, inferSchema=True)
df_crm_prd = spark.read.csv(str(crm_prd_path), header=True, inferSchema=True)
df_crm_sales = spark.read.csv(str(crm_sales_path), header=True, inferSchema=True)

df_erp_cust = spark.read.csv(str(erp_cust_path), header=True, inferSchema=True)
df_erp_loc = spark.read.csv(str(erp_loc_path), header=True, inferSchema=True)
df_erp_prd = spark.read.csv(str(erp_prd_path), header=True, inferSchema=True)

#### CRM Customer

In [20]:
df_crm_cust.groupBy(df_crm_cust['cst_id']) \
    .count() \
    .filter("count > 1") \
    .show()

+------+-----+
|cst_id|count|
+------+-----+
| 29483|    2|
|  NULL|    4|
| 29473|    2|
| 29449|    2|
| 29466|    3|
| 29433|    2|
+------+-----+



In [21]:
df_crm_cust \
.filter("cst_id = 29473") \
.show(5)

+------+----------+-------------+------------+------------------+--------+---------------+
|cst_id|   cst_key|cst_firstname|cst_lastname|cst_marital_status|cst_gndr|cst_create_date|
+------+----------+-------------+------------+------------------+--------+---------------+
| 29473|AW00029473|       Carmen|        NULL|              NULL|    NULL|     2026-01-25|
| 29473|AW00029473|       Carmen|      Subram|                 S|    NULL|     2026-01-26|
+------+----------+-------------+------------+------------------+--------+---------------+



In [22]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, desc, trim, when, upper, rand

DF_crm_cust = (
    df_crm_cust
    .dropna(subset=['cst_id'])
    .withColumn(
        "row_num", 
        row_number().over(
            Window.partitionBy("cst_id").orderBy(desc("cst_create_date"))
        )
    )
    .filter(col("row_num") == 1)
    .drop("row_num")
    .withColumn("cst_firstname", trim("cst_firstname"))
    .withColumn("cst_lastname", trim("cst_lastname"))
    .withColumn(
        "cst_gndr",
        when(upper(trim(col("cst_gndr"))) == "MALE", "Male")
        .when(upper(trim(col("cst_gndr"))) == "FEMALE", "Female")
        .when(upper(trim(col("cst_gndr"))) == "M", "Male")
        .when(upper(trim(col("cst_gndr"))) == "F", "Female")
        .otherwise("Unknown")
    )
    .withColumn(
        "cst_marital_status",
        when(upper(trim(col("cst_marital_status"))) == "MARRIED", "Married")
        .when(upper(trim(col("cst_marital_status"))) == "SINGLE", "Single")
        .when(upper(trim(col("cst_marital_status"))) == "M", "Married")
        .when(upper(trim(col("cst_marital_status"))) == "S", "Single")
        .otherwise("Unknown")
    )
    .withColumn(
        "customer_key", 
        row_number().over(
            Window.partitionBy().orderBy("cst_id")
        )
    )
    .sort("cst_id")
    .select("customer_key", "cst_id", "cst_key", "cst_firstname", "cst_lastname", "cst_marital_status", "cst_gndr", "cst_create_date")
    .toDF("customer_key", "customer_id", "customer_number", "first_name", "last_name", "marital_status", "gender", "created_at")
    #.count()
)

(
    DF_crm_cust 
    .orderBy(rand()) 
    .limit(5) 
    .show()
)

+------------+-----------+---------------+----------+---------+--------------+-------+----------+
|customer_key|customer_id|customer_number|first_name|last_name|marital_status| gender|created_at|
+------------+-----------+---------------+----------+---------+--------------+-------+----------+
|        9002|      20001|     AW00020001|    Jordan|     Hill|        Single| Female|2026-01-05|
|         559|      11558|     AW00011558|      Ivan| Malhotra|        Single|Unknown|2025-10-08|
|       15439|      26438|     AW00026438|    Alisha|     Shen|       Married| Female|2026-01-14|
|        2661|      13660|     AW00013660|     Ethan|    Davis|       Married|Unknown|2025-10-12|
|        4073|      15072|     AW00015072|   Whitney|   Subram|        Single|Unknown|2025-10-12|
+------------+-----------+---------------+----------+---------+--------------+-------+----------+



#### CRM Product

In [23]:
df_crm_prd.filter(col("prd_cost") < 0).show()

+------+-------+------+--------+--------+------------+----------+
|prd_id|prd_key|prd_nm|prd_cost|prd_line|prd_start_dt|prd_end_dt|
+------+-------+------+--------+--------+------------+----------+
+------+-------+------+--------+--------+------------+----------+



In [24]:
from pyspark.sql.functions import regexp_replace, substring_index, substring, lead

DF_crm_prd = (
    df_crm_prd
    .withColumn("prd_key", regexp_replace("prd_key", "-", "_"))
    .withColumn("cat_id", substring_index("prd_key", "_", 2))
    .withColumn("prd_key", substring("prd_key", 7, 100))
    .withColumn("prd_nm", regexp_replace("prd_nm", r"\s*-\s*", "-"))
    .withColumn(
        "prd_line",
        when(upper(trim(col("prd_line"))) == "M", "Mountain")
        .when(upper(trim(col("prd_line"))) == "R", "Road")
        .when(upper(trim(col("prd_line"))) == "T", "Touring")
        .when(upper(trim(col("prd_line"))) == "S", "Other Sales")
        .otherwise("Unknown")
    )    
    .withColumn(
        "prd_end_dt", 
        lead("prd_start_dt").over(
            Window.partitionBy("prd_nm").orderBy("prd_start_dt")
        ) - 1
    )
    .withColumn(
        "product_key", 
        row_number().over(
            Window.partitionBy().orderBy("prd_id")
        )
    )
    .fillna({"prd_cost": 0})
    .sort("prd_id")
    .select(['product_key', 'prd_id', 'cat_id', 'prd_key', 'prd_nm', 'prd_line', 'prd_cost', 'prd_start_dt', 'prd_end_dt'])
    .toDF("product_key", "product_id", "category_id", "product_number", "product_name", "product_line", "cost", "started_at", "ended_at")
    #.orderBy(rand()).limit(5)
    #.show(truncate=False)
)

(
    DF_crm_prd
    .orderBy(rand())
    .limit(5)
    .show(truncate=False)
)

+-----------+----------+-----------+--------------+--------------------------+------------+----+----------+----------+
|product_key|product_id|category_id|product_number|product_name              |product_line|cost|started_at|ended_at  |
+-----------+----------+-----------+--------------+--------------------------+------------+----+----------+----------+
|143        |352       |BI_MB      |BK_M68S_38    |Mountain-200 Silver-38    |Mountain    |1118|2012-07-01|2013-06-30|
|134        |343       |BI_RB      |BK_R50B_52    |Road-650 Black-52         |Road        |487 |2012-07-01|NULL      |
|322        |531       |CO_MF      |FR_M21B_42    |LL Mountain Frame-Black-42|Mountain    |137 |2013-07-01|NULL      |
|157        |366       |BI_MB      |BK_M47B_44    |Mountain-300 Black-44     |Mountain    |598 |2012-07-01|NULL      |
|89         |298       |CO_MF      |FR_M94B_42    |HL Mountain Frame-Black-42|Mountain    |739 |2013-07-01|NULL      |
+-----------+----------+-----------+------------

#### CRM Sales

In [25]:
df_crm_sales.printSchema()

root
 |-- sls_ord_num: string (nullable = true)
 |-- sls_prd_key: string (nullable = true)
 |-- sls_cust_id: integer (nullable = true)
 |-- sls_order_dt: integer (nullable = true)
 |-- sls_ship_dt: integer (nullable = true)
 |-- sls_due_dt: integer (nullable = true)
 |-- sls_sales: integer (nullable = true)
 |-- sls_quantity: integer (nullable = true)
 |-- sls_price: integer (nullable = true)



In [26]:
df_crm_sales \
.filter("sls_order_dt = 0") \
.count() \
#.show()

17

In [27]:
df_crm_sales \
.filter((df_crm_sales["sls_price"].isNull()) | (df_crm_sales["sls_price"] < 0)) \
.show()

+-----------+-----------+-----------+------------+-----------+----------+---------+------------+---------+
|sls_ord_num|sls_prd_key|sls_cust_id|sls_order_dt|sls_ship_dt|sls_due_dt|sls_sales|sls_quantity|sls_price|
+-----------+-----------+-----------+------------+-----------+----------+---------+------------+---------+
|    SO51259|    WB-H098|      11433|    20130101|   20130108|  20130113|       10|           2|     NULL|
|    SO51298|    WB-H098|      27949|    20130104|   20130111|  20130116|       25|           5|     NULL|
|    SO51387|  HL-U509-B|      11942|    20130109|   20130116|  20130121|       70|           2|     NULL|
|    SO51479|    BC-R205|      16687|    20130116|   20130123|  20130128|        9|           1|     NULL|
|    SO51479|    HL-U509|      16687|    20130116|   20130123|  20130128|       35|           1|     NULL|
|    SO51942|    BC-M005|      11223|    20130129|   20130205|  20130210|      100|          10|     NULL|
|    SO52187|    CL-9009|      18110|

In [28]:
df_crm_sales \
.filter(df_crm_sales["sls_quantity"] == 0) \
.show()

+-----------+-----------+-----------+------------+-----------+----------+---------+------------+---------+
|sls_ord_num|sls_prd_key|sls_cust_id|sls_order_dt|sls_ship_dt|sls_due_dt|sls_sales|sls_quantity|sls_price|
+-----------+-----------+-----------+------------+-----------+----------+---------+------------+---------+
+-----------+-----------+-----------+------------+-----------+----------+---------+------------+---------+



In [29]:
from pyspark.sql.functions import abs

df_crm_sales \
.where(
    df_crm_sales["sls_sales"] != (df_crm_sales["sls_quantity"] * abs(df_crm_sales["sls_price"]))
) \
.show()

+-----------+-----------+-----------+------------+-----------+----------+---------+------------+---------+
|sls_ord_num|sls_prd_key|sls_cust_id|sls_order_dt|sls_ship_dt|sls_due_dt|sls_sales|sls_quantity|sls_price|
+-----------+-----------+-----------+------------+-----------+----------+---------+------------+---------+
|    SO58335|    TI-M823|      13326|    20130520|   20130527|  20130601|       35|           2|       35|
|    SO61548|  LJ-0192-L|      12386|    20130705|   20130712|  20130717|       50|           2|       50|
|    SO61570|    CA-1098|      17809|    20130705|   20130712|  20130717|      -18|           1|        9|
|    SO61636|    BC-M005|      12914|    20130706|   20130713|  20130718|        0|           1|       10|
|    SO61879|  LJ-0192-X|      14455|    20130710|   20130717|  20130722|       50|           2|       50|
|    SO67707|    TT-M928|      26046|    20131004|   20131011|  20131016|        5|           2|        5|
|    SO67707|  VE-C304-S|      26046|

In [30]:
from pyspark.sql.functions import col, to_date, abs

DF_crm_sales = (
    df_crm_sales
    .withColumn("sls_prd_key", regexp_replace("sls_prd_key", "-", "_"))
    .withColumn(
        "sls_order_dt", 
        to_date(col("sls_order_dt").cast("string"), "yyyyMMdd")
    )
    .withColumn(
        "sls_ship_dt", 
        to_date(col("sls_ship_dt").cast("string"), "yyyyMMdd")
    )
    .withColumn(
        "sls_due_dt", 
        to_date(col("sls_due_dt").cast("string"), "yyyyMMdd")
    )
    .withColumn(
        "sls_price",
        when(
            (col("sls_price").isNull()) | (col("sls_price") < 0),
            col("sls_sales") / col("sls_quantity")
        ).otherwise(col("sls_price"))
    )
    .withColumn(
        "sls_sales",
        when(
            (col("sls_price").isNull()) |
            (col("sls_price") < 0) |
            (col("sls_sales") != (col("sls_quantity") * abs(col("sls_price")))),
            col("sls_quantity") * abs(col("sls_price"))
        ).otherwise(col("sls_sales"))
    )
    .sort("sls_ord_num")
    .toDF("order_number", "product_number", "customer_id", "order_date", "shipping_date", "due_date", "sales_amount", "quantity", "price")
    #.orderBy(rand()).limit(5)
    #.show(truncate=False)
)

DF_crm_sales.orderBy(rand()).limit(5).show()

+------------+--------------+-----------+----------+-------------+----------+------------+--------+------+
|order_number|product_number|customer_id|order_date|shipping_date|  due_date|sales_amount|quantity| price|
+------------+--------------+-----------+----------+-------------+----------+------------+--------+------+
|     SO57483|       PK_7098|      26144|2013-05-05|   2013-05-12|2013-05-17|         2.0|       1|   2.0|
|     SO55179|    BK_R89B_52|      18491|2013-03-29|   2013-04-05|2013-04-10|      2443.0|       1|2443.0|
|     SO73432|       PK_7098|      29038|2013-12-19|   2013-12-26|2013-12-31|         2.0|       1|   2.0|
|     SO58557|       TT_R982|      11330|2013-05-24|   2013-05-31|2013-06-05|         4.0|       1|   4.0|
|     SO56517|     GL_H102_S|      24876|2013-04-20|   2013-04-27|2013-05-02|        24.0|       1|  24.0|
+------------+--------------+-----------+----------+-------------+----------+------------+--------+------+



#### ERP Customer

In [31]:
from pyspark.sql.functions import current_date

df_erp_cust.where(col("BDATE") > current_date()).show()

+-------------+----------+------+
|          CID|     BDATE|   GEN|
+-------------+----------+------+
|NASAW00011257|2050-07-06|Female|
|NASAW00011410|2042-02-22|  Male|
|NASAW00011551|2050-05-21|  Male|
|NASAW00011562|2038-10-17|  Male|
|NASAW00011581|2045-03-03|Female|
|NASAW00011775|2050-11-22|Female|
|NASAW00011912|2066-06-16|Female|
|NASAW00011915|9999-09-13|  Male|
|NASAW00012123|2065-12-12|  Male|
|NASAW00012188|9999-11-20|Female|
|NASAW00012248|9999-09-11|  Male|
|NASAW00013052|9999-05-10|  Male|
|NASAW00013417|2050-09-07|  Male|
|NASAW00020062|2080-03-15|  Male|
|   AW00025441|2055-01-23|Female|
|   AW00028543|2980-03-09|  Male|
+-------------+----------+------+



In [32]:
from pyspark.sql.functions import expr, lit, current_date

DF_erp_cust = (
    df_erp_cust
    .withColumn(
        "CID", 
        when(upper(trim(col("CID"))).like("NAS%"), expr("substring(CID, 4, length(CID) - 3)"))
        .otherwise(col("CID"))
    )
    .withColumn(
        "BDATE",
        when(col("BDATE") > current_date(), lit(None))
        .otherwise(col("BDATE"))
    )
    .withColumn(
        "GEN",
        when(upper(trim(col("GEN"))) == "MALE", "Male")
        .when(upper(trim(col("GEN"))) == "FEMALE", "Female")
        .when(upper(trim(col("GEN"))) == "M", "Male")
        .when(upper(trim(col("GEN"))) == "F", "Female")
        .otherwise("Unknown")
    )
    .toDF("customer_number", "birth_date", "gender")
    #.show()
)

DF_erp_cust.orderBy(rand()).limit(5).show()

+---------------+----------+------+
|customer_number|birth_date|gender|
+---------------+----------+------+
|     AW00014955|1968-08-30|  Male|
|     AW00011202|1958-08-22|Female|
|     AW00028068|1975-04-12|Female|
|     AW00026788|1983-11-08|Female|
|     AW00022936|1967-01-29|Female|
+---------------+----------+------+



#### ERP Location

In [33]:
df_erp_loc.select('CNTRY').distinct().show()

+--------------+
|         CNTRY|
+--------------+
|       Germany|
|        France|
|              |
| United States|
|            DE|
|              |
|            US|
|           USA|
|        Canada|
|              |
|     Australia|
|United Kingdom|
|          NULL|
+--------------+



In [34]:
DF_erp_loc = (
    df_erp_loc
    .withColumn("CID", regexp_replace("CID", "-", ""))
    .withColumn(
        "CNTRY",
        when(upper(trim(col("CNTRY"))) == "DE", "Germany")
        .when(upper(trim(col("CNTRY"))) == "US", "United States")
        .when(upper(trim(col("CNTRY"))) == "USA", "United States")
        .when(
            (col("CNTRY").isNull()) | (trim(col("CNTRY")) == ""), 
            "Unknown"
        )
        .otherwise(col("CNTRY"))
    )
    .toDF("customer_number", "country")
    #.select('CNTRY').distinct()
    #.show()
)

DF_erp_loc.orderBy(rand()).limit(5).show()

+---------------+-------------+
|customer_number|      country|
+---------------+-------------+
|     AW00017095|       France|
|     AW00028192|United States|
|     AW00025340|       Canada|
|     AW00022087|United States|
|     AW00022322|       France|
+---------------+-------------+



#### ERP Product

In [35]:
DF_erp_prd = (
    df_erp_prd
    .toDF("category_id", "category", "subcategory", "is_maintenance")
)

DF_erp_prd.orderBy(rand()).limit(5).show()

+-----------+-----------+--------------+--------------+
|category_id|   category|   subcategory|is_maintenance|
+-----------+-----------+--------------+--------------+
|      BI_MB|      Bikes|Mountain Bikes|           Yes|
|      AC_LO|Accessories|         Locks|           Yes|
|      CO_CH| Components|        Chains|           Yes|
|      BI_RB|      Bikes|    Road Bikes|           Yes|
|      CO_PD| Components|        Pedals|            No|
+-----------+-----------+--------------+--------------+



### Stage 2: Defining Dimension and Fact Tables

In [36]:
dim_products = (
    DF_crm_prd
    .join(DF_erp_prd, how="left", on="category_id")
    .select("product_key", "product_id",  "product_number", "product_name", "product_line", "category_id", "category", "is_maintenance", "subcategory", "cost", "started_at", "ended_at")
)

(
    dim_products
    .orderBy(rand())
    .limit(5)
    .show(truncate=False)
)

+-----------+----------+--------------+-----------------------+------------+-----------+----------+--------------+-----------+----+----------+----------+
|product_key|product_id|product_number|product_name           |product_line|category_id|category  |is_maintenance|subcategory|cost|started_at|ended_at  |
+-----------+----------+--------------+-----------------------+------------+-----------+----------+--------------+-----------+----+----------+----------+
|195        |404       |HB_R504       |LL Road Handlebars     |Road        |CO_HB      |Components|No            |Handlebars |20  |2013-07-01|NULL      |
|174        |383       |BK_R64Y_40    |Road-550-W Yellow-40   |Road        |BI_RB      |Bikes     |Yes           |Road Bikes |606 |2012-07-01|2013-06-30|
|231        |440       |FR_R92B_44    |HL Road Frame-Black-44 |Road        |CO_RF      |Components|Yes           |Road Frames|869 |2013-07-01|NULL      |
|234        |443       |FR_R92B_52    |HL Road Frame-Black-52 |Road        |

In [37]:
dim_customers = (
    DF_crm_cust
    .join(DF_erp_cust, how="left", on="customer_number")
    .join(DF_erp_loc, how="left", on="customer_number")
    .select("customer_key", "customer_id", "customer_number", "first_name", "last_name", "country", "marital_status", DF_crm_cust["gender"], "birth_date", "created_at")
)

(
    dim_customers
    .orderBy(rand())
    .limit(5)
    .show()
)

+------------+-----------+---------------+----------+---------+-------------+--------------+------+----------+----------+
|customer_key|customer_id|customer_number|first_name|last_name|      country|marital_status|gender|birth_date|created_at|
+------------+-----------+---------------+----------+---------+-------------+--------------+------+----------+----------+
|        7034|      18033|     AW00018033|    Evelyn|    Patel|      Germany|        Single|Female|1981-11-17|2026-01-04|
|       10006|      21005|     AW00021005|   Barbara|      Zhu|       France|       Married|Female|1965-08-26|2026-01-05|
|       15663|      26662|     AW00026662|      Lori|  Alvarez|    Australia|        Single|Female|1974-03-13|2026-01-14|
|        6469|      17468|     AW00017468|  Jeremiah| Gonzalez|       Canada|       Married|  Male|1942-07-09|2026-01-04|
|        7381|      18380|     AW00018380|   Kaitlyn|    Kelly|United States|       Married|Female|1961-12-25|2026-01-04|
+------------+----------

In [38]:
fact_sales = (
    DF_crm_sales
    .join(dim_customers, how="left", on="customer_id")
    .join(dim_products, how="left", on="product_number")
    .select("order_number", "product_key", "customer_key", "order_date", "shipping_date", "due_date", "sales_amount", "quantity", "price")
)

(
    fact_sales
    .orderBy(rand())
    .limit(5)
    .show()
)

+------------+-----------+------------+----------+-------------+----------+------------+--------+-----+
|order_number|product_key|customer_key|order_date|shipping_date|  due_date|sales_amount|quantity|price|
+------------+-----------+------------+----------+-------------+----------+------------+--------+-----+
|     SO70229|         15|         192|2013-11-06|   2013-11-13|2013-11-18|         9.0|       1|  9.0|
|     SO66642|         12|         139|2013-09-19|   2013-09-26|2013-10-01|        35.0|       1| 35.0|
|     SO64188|        271|         499|2013-08-14|   2013-08-21|2013-08-26|         2.0|       1|  2.0|
|     SO51584|          8|       11275|2013-01-21|   2013-01-28|2013-02-02|        35.0|       1| 35.0|
|     SO72711|        274|        5821|2013-12-09|   2013-12-16|2013-12-21|       120.0|       1|120.0|
+------------+-----------+------------+----------+-------------+----------+------------+--------+-----+



In [39]:
(
    fact_sales
    .filter(col('product_key').isNull())
    .show()
    #.count()
)

+------------+-----------+------------+----------+-------------+--------+------------+--------+-----+
|order_number|product_key|customer_key|order_date|shipping_date|due_date|sales_amount|quantity|price|
+------------+-----------+------------+----------+-------------+--------+------------+--------+-----+
+------------+-----------+------------+----------+-------------+--------+------------+--------+-----+



In [40]:
(
    fact_sales
    .filter(col('customer_key').isNull())
    .show()
    #.count()
)

+------------+-----------+------------+----------+-------------+--------+------------+--------+-----+
|order_number|product_key|customer_key|order_date|shipping_date|due_date|sales_amount|quantity|price|
+------------+-----------+------------+----------+-------------+--------+------------+--------+-----+
+------------+-----------+------------+----------+-------------+--------+------------+--------+-----+



### Future Exploration
- Loading to Database (DataSink API and Connectors)
- Conduct Some EDA