Initialize PySpark Session
------------------

In [1]:
import pyspark
import os
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window
from deep_translator import GoogleTranslator
from pyspark.sql.functions import *
from pyspark.sql.functions import  max as spark_max, row_number
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

## Extract DB from PostgreSQL

In [2]:
spark = SparkSession.builder \
    .appName("ETL_Process") \
    .config("spark.jars", r"P:\Career\Data Engineering\ITI-DE\Graduation Project\Steps\ETL\postgresql-42.7.5.jar") \
    .getOrCreate()

In [3]:
postgres_url = "jdbc:postgresql://localhost:5432/FashionRetailDB"

postgres_properties={
    "user": "postgres",
    "password": "1699",
    "driver": "org.postgresql.Driver"
}

df_categories = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.categories",
    properties = postgres_properties
)

df_currency = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.currency",
    properties = postgres_properties
)

df_customers = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.customers",
    properties = postgres_properties
)

df_discounts = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.discounts",
    properties = postgres_properties
)

df_employees = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.employees",
    properties = postgres_properties
)

df_location = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.location",
    properties = postgres_properties
)

df_productattribute = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.productattribute",
    properties = postgres_properties
)

df_products = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.products",
    properties = postgres_properties
)

df_stores = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.stores",
    properties = postgres_properties
)

df_transactionlines = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.transactionlines",
    properties = postgres_properties
)

df_transactions1 = spark.read.jdbc(
    url = postgres_url,
    table="normalized_retail.transactions",
    properties = postgres_properties
)

## Extract Data from CSV

In [244]:
spark = SparkSession.builder.appName("ETL Process").getOrCreate()
sc = spark.sparkContext

In [245]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY") #This allows Spark to parse timestamps as it did in previous versions.

Define Schema for each file

In [246]:
transactions_schema = StructType([
    StructField("Invoice ID", StringType(), False),
    StructField("Line", IntegerType(), False),
    StructField("Customer ID", IntegerType(), True),
    StructField("Product ID", IntegerType(), True),
    StructField("Size", StringType(), True),
    StructField("Color", StringType(), True),
    StructField("Unit Price", FloatType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Date", StringType(), True),
    StructField("Discount", FloatType(), True),
    StructField("Line Total", FloatType(), True),
    StructField("Store ID", IntegerType(), True),
    StructField("Employee ID", IntegerType(), True),
    StructField("Currency", StringType(), True),
    StructField("Currency Symbol", StringType(), True),
    StructField("SKU", StringType(), True),
    StructField("Transaction Type", StringType(), True),
    StructField("Payment Method", StringType(), True),
    StructField("Invoice Total", FloatType(), True)
])

Read csv file


Create DFs with schemas and file paths

In [None]:
file2 = r"P:\Career\Data Engineering\ITI-DE\Graduation Project\Data\CSVs\Transactions2.csv"
df_transactions2 = spark.read.option("header", "true").schema(transactions_schema).csv(file2)

We have successfully:

1-Initialized PySpark for ETL processing.

2-Extracted DB from PostgreSQL.

3-Defined schema for the CSV file to avoid incorrect type inferences.

4-Loaded all dataframes while ensuring structure is maintained.

5-Integrated all dataframes in one dataset.

6-Verified data integrity using .show().

## Data Transformation

Customers Table

1-Casting Data Types.

2-Trim spaces in columns names.

3-Convert emails to lowercase & remove non-numeric characters from phone numbers.

4-Standardize gender values (assign "Unknown" if missing) & Convert date format properly.

5-Filling Nulls.

6-Handling Duplicates

7-Fixing Emails Address.

In [9]:
df_customers = df_customers \
        .withColumnRenamed("customer_id", "Customer ID") \
        .withColumnRenamed("locid", "Location ID") \
        .withColumnRenamed("job_title", "Job Title") \
        .withColumnRenamed("gender", "Gender") \
        .withColumnRenamed("date_of_birth", "Date of Birth") \
        .withColumn("Name", col("name")) \
        .withColumn("Email", lower(col("email"))) \
        .withColumn("Location ID", trim(col("Location ID")).cast("int")) \
        .withColumn("Telephone", regexp_replace(col("telephone"), "[^0-9]", "")) \
        .withColumn("Gender", when(upper(col("Gender")) == "M", lit("M")).when(upper(col("Gender")) == "F", lit("F")).otherwise(lit("U")))        .withColumn("Date of Birth", to_date(col("Date of Birth"), "yyyy-MM-dd"))

In [10]:
df_customers.select([sum(col(c).isNull().cast("int")).alias(c) for c in ["Customer ID", "Name", "Email", "Telephone", "Gender", "Job Title", "Location ID"]]).show()

+-----------+----+-----+---------+------+---------+-----------+
|Customer ID|Name|Email|Telephone|Gender|Job Title|Location ID|
+-----------+----+-----+---------+------+---------+-----------+
|          0|   0|    0|        0|     0|   584185|          0|
+-----------+----+-----+---------+------+---------+-----------+



In [11]:
df_customers = df_customers.fillna({"Job Title": "Not Specified"})

In [12]:
df_customers.select([sum(col(c).isNull().cast("int")).alias(c) for c in ["Customer ID", "Name", "Email", "Telephone", "Gender", "Job Title"]]).show()

+-----------+----+-----+---------+------+---------+
|Customer ID|Name|Email|Telephone|Gender|Job Title|
+-----------+----+-----+---------+------+---------+
|          0|   0|    0|        0|     0|        0|
+-----------+----+-----+---------+------+---------+



In [13]:
duplicate_count_customers = df_customers.count() - df_customers.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_customers}")

Number of duplicate rows: 0


In [14]:
df_customers = df_customers.withColumn(
    "Email", 
    regexp_replace("Email", "@fake.*", "@gmail.com"))

In [15]:
df_customers.columns

['Customer ID',
 'Location ID',
 'Name',
 'Email',
 'Telephone',
 'Gender',
 'Date of Birth',
 'Job Title']

In [16]:
df_customers.printSchema()

root
 |-- Customer ID: integer (nullable = true)
 |-- Location ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- Gender: string (nullable = false)
 |-- Date of Birth: date (nullable = true)
 |-- Job Title: string (nullable = false)



In [17]:
df_customers.show(5)

+-----------+-----------+------+----------------+-----------+------+-------------+-------------------+
|Customer ID|Location ID|  Name|           Email|  Telephone|Gender|Date of Birth|          Job Title|
+-----------+-----------+------+----------------+-----------+------+-------------+-------------------+
|     585309|         96|吴凤英|吴凤英@gmail.com|18117575105|     M|   1990-09-15|Editorial assistant|
|     585310|         96|傅建华|傅建华@gmail.com|15660623475|     M|   2005-02-21|      Not Specified|
|     585311|         96|  严斌|  严斌@gmail.com|15604342387|     M|   2005-10-25|      Not Specified|
|     585312|         96|  蒋强|  蒋强@gmail.com|13143478943|     M|   2006-07-24|      Not Specified|
|     585313|         96|李凤兰|李凤兰@gmail.com|15085275823|     F|   2005-08-11|      Not Specified|
+-----------+-----------+------+----------------+-----------+------+-------------+-------------------+
only showing top 5 rows



Products Tables

1-Casting Data Types.

2-Trim spaces in columns names.

3-Filling Nulls

4-Handling Duplicates.

5-Drop Description Columns




In [294]:
df_products = df_products \
    .withColumnRenamed("productid", "Product ID") \
    .withColumnRenamed("categoryid", "Category ID") \
    .withColumn("Product ID", trim(col("Product ID")).cast("int")) \
    .withColumn("Category ID", trim(col("Category ID")).cast("int")) \
    .withColumn("Description EN", trim(col("Description EN"))) 

In [295]:
columns_to_drop = ["Description ZH", "Description PT", "Description DE", "Description FR", "Description ES"]
df_products = df_products.drop(*columns_to_drop)

In [296]:
df_products.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_products.columns]).show()

+----------+-----------+--------------+
|Product ID|Category ID|Description EN|
+----------+-----------+--------------+
|         0|          0|             0|
+----------+-----------+--------------+



In [297]:
duplicate_count_products = df_products.count() - df_products.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_products}")

Number of duplicate rows: 0


In [298]:
df_products.columns

['Product ID', 'Category ID', 'Description EN']

In [299]:
df_products.printSchema()

root
 |-- Product ID: integer (nullable = true)
 |-- Category ID: integer (nullable = true)
 |-- Description EN: string (nullable = true)



In [300]:
df_products.show(5)

+----------+-----------+--------------------+
|Product ID|Category ID|      Description EN|
+----------+-----------+--------------------+
|      5006|          7|Modern Smooth Whi...|
|      7181|         18|Women'S T-Shirt W...|
|     13537|         10|Male Factory And ...|
|     13149|          6|Men'S Sleep T -Sh...|
|      3474|         14|Men'S Pants Jogge...|
+----------+-----------+--------------------+
only showing top 5 rows



Products Attribute Table

1-Trim Spaces & Cast Data Types.

2-Filling Nulls

3-Handling Duplicates.

In [301]:
df_productattribute = df_productattribute \
        .withColumnRenamed("prod_attributeid", "Product Attribute ID") \
        .withColumnRenamed("productid", "Product ID") \
        .withColumnRenamed("color", "Color") \
        .withColumnRenamed("sizes", "Sizes") \
        .withColumnRenamed("productioncost", "Production Cost") \
        .withColumnRenamed("sku", "SKU") \
        .withColumn("Product ID", trim(col("Product ID")).cast("int")) \
        .withColumn("Product Attribute ID", trim(col("Product Attribute ID")).cast("int")) \
        .withColumn("Color", trim(col("Color"))) \
        .withColumn("Sizes", trim(col("Sizes"))) \
        .withColumn("Production Cost", trim(col("Production Cost")).cast(DecimalType(10,2))) \
        .withColumn("SKU", trim(col("SKU")))

In [302]:
df_productattribute.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_productattribute.columns]).show()

+--------------------+----------+-----+-----+---------------+---+
|Product Attribute ID|Product ID|Color|Sizes|Production Cost|SKU|
+--------------------+----------+-----+-----+---------------+---+
|                   0|         0|    0|    0|              0|  0|
+--------------------+----------+-----+-----+---------------+---+



In [303]:
duplicate_count_attribute = df_productattribute.count() - df_productattribute.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_attribute}")

Number of duplicate rows: 0


In [304]:
df_productattribute.columns

['Product Attribute ID',
 'Product ID',
 'Color',
 'Sizes',
 'Production Cost',
 'SKU']

In [305]:
df_productattribute.printSchema()

root
 |-- Product Attribute ID: integer (nullable = true)
 |-- Product ID: integer (nullable = true)
 |-- Color: string (nullable = true)
 |-- Sizes: string (nullable = true)
 |-- Production Cost: decimal(10,2) (nullable = true)
 |-- SKU: string (nullable = true)



In [306]:
df_productattribute.show(5)

+--------------------+----------+-------+----------+---------------+--------------------+
|Product Attribute ID|Product ID|  Color|     Sizes|Production Cost|                 SKU|
+--------------------+----------+-------+----------+---------------+--------------------+
|                   1|      8151|NEUTRAL|M|L|XL|XXL|          22.28|MASW8151-XXL-NEUTRAL|
|                   2|     14054|   BLUE|M|L|XL|XXL|          11.69|  MAT-14054-XXL-BLUE|
|                   3|     15052|   BLUE|     P|M|G|          11.00|    CHSW15052-P-BLUE|
|                   4|     13131|    RED|  S|M|L|XL|          30.16|     FECO13131-S-RED|
|                   5|     11722|  BEIGE|  P|M|G|GG|           7.64|   CHGI11722-G-BEIGE|
+--------------------+----------+-------+----------+---------------+--------------------+
only showing top 5 rows



Employees Table

1-Trim Spaces & Cast Data Types.

2-Filling Nulls

3-Handling Duplicates.

In [201]:
df_employees = df_employees \
            .withColumnRenamed("employeeid", "employeeid_pk_bk") \
            .withColumn("name", trim(col("name"))) \
            .withColumn("position", trim(col("position"))) \
            .withColumn("employeeid_pk_bk", col("employeeid_pk_bk").cast("int")) \
            .withColumn("storeid", col("storeid").cast("int"))

In [202]:
df_employees.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_employees.columns]).show()

+----------------+----+--------+-------+
|employeeid_pk_bk|name|position|storeid|
+----------------+----+--------+-------+
|               0|   0|       0|      0|
+----------------+----+--------+-------+



In [203]:
duplicate_count_employees = df_employees.count() - df_employees.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_employees}")

Number of duplicate rows: 0


In [204]:
df_employees.columns

['employeeid_pk_bk', 'name', 'position', 'storeid']

In [205]:
df_employees.printSchema()

root
 |-- employeeid_pk_bk: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- position: string (nullable = true)
 |-- storeid: integer (nullable = true)



In [206]:
df_employees.show(5)

+----------------+---------------+-----------------+-------+
|employeeid_pk_bk|           name|         position|storeid|
+----------------+---------------+-----------------+-------+
|               1|   DRAGO HORNIG|ASSISTANT MANAGER|     11|
|               2|   NOAH AZEVEDO|      STOCK CLERK|     31|
|               3|    JOHN BAILEY|          CASHIER|     20|
|               4|           唐旭|  SALES ASSOCIATE|      7|
|               5|LOURDES CABAÑAS|  SALES ASSOCIATE|     26|
+----------------+---------------+-----------------+-------+
only showing top 5 rows



Currency Table

1-Trim Spaces & Cast Data Types.

2-Drop Currency Symbol Column.

3-Filling Nulls

4-Handling Duplicates.

In [37]:
df_currency = df_currency \
            .withColumnRenamed("currencyid", "Currency ID") \
            .withColumnRenamed("currency", "Currency") \
            .withColumn("Currency", trim(col("Currency"))) \
            .withColumn("Currency ID", col("Currency ID").cast("int"))

In [38]:
df_currency = df_currency.drop("currencysymbol")

In [39]:
df_currency.select([count(when(col(c).isNull(), c)).alias(c) for c in df_currency.columns]).show()

+-----------+--------+
|Currency ID|Currency|
+-----------+--------+
|          0|       0|
+-----------+--------+



In [40]:
duplicate_count_currency = df_currency.count() - df_currency.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_currency}")

Number of duplicate rows: 0


In [41]:
df_currency.columns

['Currency ID', 'Currency']

In [42]:
df_currency.printSchema()

root
 |-- Currency ID: integer (nullable = true)
 |-- Currency: string (nullable = true)



In [43]:
df_currency.show(5)

+-----------+--------+
|Currency ID|Currency|
+-----------+--------+
|          1|     USD|
+-----------+--------+



Location Table

1-Trim Spaces & Cast Data Types.

2-Filling Nulls

3-Handling Duplicates.

In [44]:
df_location = df_location \
        .withColumnRenamed("locid", "Location ID") \
        .withColumnRenamed("city", "City") \
        .withColumnRenamed("country", "Country") \
        .withColumn("City", trim(col("City"))) \
        .withColumn("Country", trim(col("Country"))) \
        .withColumn("Location ID", col("Location ID").cast("int"))

In [45]:
df_location.select([count(when(col(c).isNull(), c)).alias(c) for c in df_location.columns]).show()

+-----------+----+-------+
|Location ID|City|Country|
+-----------+----+-------+
|          0|   0|      0|
+-----------+----+-------+



In [46]:
duplicate_count_location = df_location.count() - df_location.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_location}")

Number of duplicate rows: 0


In [47]:
df_location.columns

['Location ID', 'City', 'Country']

In [48]:
df_location.printSchema()

root
 |-- Location ID: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)



In [49]:
df_location.show(5)

+-----------+---------+-------------+
|Location ID|     City|      Country|
+-----------+---------+-------------+
|          1| Oak Park|United States|
|          2|     Haan|  Deutschland|
|          3| Juvignac|       France|
|          4|Colomiers|       France|
|          5|   Quincy|United States|
+-----------+---------+-------------+
only showing top 5 rows



Categories Table

1-Trim Spaces & Cast Data Types.

2-Filling Nulls

3-Handling Duplicates.

In [307]:
df_categories = df_categories \
        .withColumn("category", trim(col("category"))) \
        .withColumn("subcategory", trim(col("subcategory"))) \
        .withColumnRenamed("categoryid", "Category ID") \
        .withColumnRenamed("category", "Category") \
        .withColumnRenamed("subcategory", "SubCategory") \
        .withColumn("Category ID", col("Category ID").cast("int"))

In [308]:
df_categories.select([count(when(col(c).isNull(), c)).alias(c) for c in df_categories.columns]).show()

+-----------+--------+-----------+
|Category ID|Category|SubCategory|
+-----------+--------+-----------+
|          0|       0|          0|
+-----------+--------+-----------+



In [309]:
duplicate_count_category = df_categories.count() - df_categories.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_category}")

Number of duplicate rows: 0


In [310]:
df_categories.columns

['Category ID', 'Category', 'SubCategory']

In [311]:
df_categories.printSchema()

root
 |-- Category ID: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- SubCategory: string (nullable = true)



In [312]:
df_categories.show(5)

+-----------+--------+---------------+
|Category ID|Category|    SubCategory|
+-----------+--------+---------------+
|          1|FEMININE|     SPORTSWEAR|
|          2|CHILDREN|        PAJAMAS|
|          3|FEMININE|PANTS AND JEANS|
|          4|CHILDREN|          COATS|
|          5|FEMININE| SUITS AND SETS|
+-----------+--------+---------------+
only showing top 5 rows



Stores Table

1-Trim Spaces & Cast Data Types.

2-Filling Nulls

3-Handling Duplicates.

4-Translate Store Names into English.

In [56]:
df_stores = df_stores \
        .withColumnRenamed("storename", "Store Name") \
        .withColumnRenamed("zipcode", "Zip Code") \
        .withColumnRenamed("storeid", "Store ID") \
        .withColumnRenamed("latitude", "Latitude") \
        .withColumnRenamed("longitude", "Longitude") \
        .withColumnRenamed("locid", "Location ID") \
        .withColumnRenamed("numberofemployees", "Number of Employees") \
        .withColumnRenamed("zipcode", "Zip Code") \
        .withColumn("Store Name", trim(col("Store Name"))) \
        .withColumn("Zip Code", trim(col("Zip Code"))) \
        .withColumn("Store ID", col("Store ID").cast("int")) \
        .withColumn("Location ID", col("Location ID").cast("int")) \
        .withColumn("Number of Employees", col("Number of Employees").cast("int")) \
        .withColumn("Latitude", col("Latitude").cast(DecimalType(10,6))) \
        .withColumn("Longitude", col("Longitude").cast(DecimalType(10,6)))

In [57]:
df_stores.select([count(when(col(c).isNull(), c)).alias(c) for c in df_stores.columns]).show()

+--------+-----------+----------+-------------------+--------+--------+---------+
|Store ID|Location ID|Store Name|Number of Employees|Zip Code|Latitude|Longitude|
+--------+-----------+----------+-------------------+--------+--------+---------+
|       0|          0|         0|                  0|       0|       0|        0|
+--------+-----------+----------+-------------------+--------+--------+---------+



In [58]:
duplicate_count_stores = df_stores.count() - df_stores.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_stores}")

Number of duplicate rows: 0


In [59]:
df_stores.columns

['Store ID',
 'Location ID',
 'Store Name',
 'Number of Employees',
 'Zip Code',
 'Latitude',
 'Longitude']

In [60]:
df_stores.printSchema()

root
 |-- Store ID: integer (nullable = true)
 |-- Location ID: integer (nullable = true)
 |-- Store Name: string (nullable = true)
 |-- Number of Employees: integer (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- Latitude: decimal(10,6) (nullable = true)
 |-- Longitude: decimal(10,6) (nullable = true)



In [61]:
df_stores.show(5)

+--------+-----------+----------------+-------------------+--------+---------+-----------+
|Store ID|Location ID|      Store Name|Number of Employees|Zip Code| Latitude|  Longitude|
+--------+-----------+----------------+-------------------+--------+---------+-----------+
|       1|        636|STORE BIRMINGHAM|                  9|  B1 1AA|52.486200|  -1.890400|
|       2|        483| STORE GUIMARÃES|                  9|4800-001|41.444400|  -8.296200|
|       3|         79|  STORE ZARAGOZA|                  9|   50001|41.641900|  -0.904600|
|       4|         29|   STORE BRISTOL|                  7| BS1 1AA|51.454500|  -2.587900|
|       5|        668|   STORE PHOENIX|                  9|   85001|33.448400|-112.074000|
+--------+-----------+----------------+-------------------+--------+---------+-----------+
only showing top 5 rows



Discounts Table

1-Trim Spaces & Cast Data Types.

2-Filling Nulls

3-Handling Duplicates.

4-Drop Description Column.

In [62]:
df_discounts = df_discounts \
        .withColumn("discountid", trim(col("discountid")).cast("int")) \
        .withColumn("categoryid", trim(col("categoryid")).cast("int")) \
        .withColumn("Start", to_date(trim(col("Start")), "yyyy-MM-dd")) \
        .withColumn("End", to_date(trim(col("End")), "yyyy-MM-dd")) \
        .withColumn("discount", trim(col("discount")).cast(DecimalType(5,2))) \
        .withColumn("description", trim(col("description"))) \
        .withColumnRenamed("discountid", "Discount ID") \
        .withColumnRenamed("categoryid", "Category ID") \
        .withColumnRenamed("Start", "Start Date") \
        .withColumnRenamed("End", "End Date") \
        .withColumnRenamed("Discount", "Discount") \
        .withColumnRenamed("Description", "Description")                
                

In [63]:
df_discounts.select([count(when(col(c).isNull(), c)).alias(c) for c in df_discounts.columns]).show()

+-----------+----------+--------+--------+-----------+-----------+
|Discount ID|Start Date|End Date|Discount|Description|Category ID|
+-----------+----------+--------+--------+-----------+-----------+
|          0|         0|       0|       0|          0|          0|
+-----------+----------+--------+--------+-----------+-----------+



In [64]:
duplicate_count_discounts = df_discounts.count() - df_discounts.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_discounts}")

Number of duplicate rows: 0


In [65]:
columns_to_drop = ["Description"]
df_discounts = df_discounts.drop(*columns_to_drop)

In [66]:
df_discounts.columns

['Discount ID', 'Start Date', 'End Date', 'Discount', 'Category ID']

In [67]:
df_discounts.printSchema()

root
 |-- Discount ID: integer (nullable = true)
 |-- Start Date: date (nullable = true)
 |-- End Date: date (nullable = true)
 |-- Discount: decimal(5,2) (nullable = true)
 |-- Category ID: integer (nullable = true)



In [68]:
df_discounts.show(5)

+-----------+----------+----------+--------+-----------+
|Discount ID|Start Date|  End Date|Discount|Category ID|
+-----------+----------+----------+--------+-----------+
|          1|2024-10-01|2024-10-10|    0.20|         22|
|          2|2020-09-01|2020-09-15|    0.45|         21|
|          3|2024-05-01|2024-05-15|    0.25|         14|
|          4|2022-10-01|2022-10-10|    0.20|         15|
|          5|2020-01-01|2020-01-10|    0.40|         17|
+-----------+----------+----------+--------+-----------+
only showing top 5 rows



Transactions Table (CSV)

1-Trim Spaces & Cast Data Types.

2-Drop unnecessary columns that won't be used in dwh.

3-Drop Currency Symbol Column.

4-Filling Nulls

5-Handling Duplicates.

In [248]:
df_transactionscsv = df_transactions2 \
    .withColumn("Invoice ID", F.col("Invoice ID").cast(StringType())) \
    .withColumn("Line", F.col("Line").cast(IntegerType())) \
    .withColumn("Customer ID", F.col("Customer ID").cast(StringType())) \
    .withColumn("Product ID", F.col("Product ID").cast(StringType())) \
    .withColumn("Size", F.col("Size").cast(StringType())) \
    .withColumn("Color", F.col("Color").cast(StringType())) \
    .withColumn("Unit Price", F.col("Unit Price").cast(DoubleType())) \
    .withColumn("Quantity", F.col("Quantity").cast(IntegerType())) \
    .withColumn("Date", F.to_timestamp(F.col("Date"), "dd/MM/yyyy HH:mm").cast(TimestampType())) \
    .withColumn("Discount", F.col("Discount").cast(DoubleType())) \
    .withColumn("Line Total", F.col("Line Total").cast(DecimalType(10,2))) \
    .withColumn("Store ID", F.col("Store ID").cast(StringType())) \
    .withColumn("Employee ID", F.col("Employee ID").cast(StringType())) \
    .withColumn("Currency", F.col("Currency").cast(StringType())) \
    .withColumn("Currency Symbol", F.col("Currency Symbol").cast(StringType())) \
    .withColumn("SKU", F.col("SKU").cast(StringType())) \
    .withColumn("Transaction Type", F.col("Transaction Type").cast(StringType())) \
    .withColumn("Payment Method", F.col("Payment Method").cast(StringType())) \
    .withColumn("Invoice Total", F.col("Invoice Total").cast(DecimalType(10,2)))\
    .drop("Date")

In [249]:
df_transactionscsv = df_transactionscsv.drop("Currency Symbol", 'Customer ID',
        'Product ID','SKU','Size','Color',
        'Store ID','Employee ID','Currency',
        'Unit Price','Discount',)

In [250]:
df_transactionscsv.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_transactionscsv.columns]).show()

+----------+----+--------+----------+----------------+--------------+-------------+
|Invoice ID|Line|Quantity|Line Total|Transaction Type|Payment Method|Invoice Total|
+----------+----+--------+----------+----------------+--------------+-------------+
|         0|   0|       0|         0|               0|             0|            0|
+----------+----+--------+----------+----------------+--------------+-------------+



In [251]:
duplicates_count = df_transactionscsv.count() - df_transactionscsv.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicates_count}")

Number of duplicate rows: 1002


In [252]:
df_transactionscsv = df_transactionscsv.dropDuplicates()

In [253]:
duplicates_count = df_transactionscsv.count() - df_transactionscsv.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicates_count}")

Number of duplicate rows: 0


In [254]:
df_transactionscsv.columns

['Invoice ID',
 'Line',
 'Quantity',
 'Line Total',
 'Transaction Type',
 'Payment Method',
 'Invoice Total']

In [255]:
df_transactionscsv.printSchema()

root
 |-- Invoice ID: string (nullable = true)
 |-- Line: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Line Total: decimal(10,2) (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Invoice Total: decimal(10,2) (nullable = true)



In [256]:
df_transactionscsv.show(5)

+-------------------+----+--------+----------+----------------+--------------+-------------+
|         Invoice ID|Line|Quantity|Line Total|Transaction Type|Payment Method|Invoice Total|
+-------------------+----+--------+----------+----------------+--------------+-------------+
|INV-US-004-01360435|   1|       1|    138.50|            Sale|   Credit Card|       138.50|
|INV-US-004-01361037|   1|       1|     62.50|            Sale|   Credit Card|        62.50|
|INV-US-004-01361071|   1|       1|     76.00|            Sale|   Credit Card|        76.00|
|INV-US-004-01361149|   4|       1|     33.00|            Sale|   Credit Card|       326.00|
|INV-US-004-01361556|   2|       1|     32.50|            Sale|          Cash|       241.50|
+-------------------+----+--------+----------+----------------+--------------+-------------+
only showing top 5 rows



Transactions Table (DB)

1-Trim Spaces & Cast Data Types.

2-Drop unnecessary columns that won't be used in dwh.

3-Drop Currency Symbol Column.

4-Filling Nulls

5-Handling Duplicates.


In [257]:
df_transactionsdb = df_transactions1 \
    .withColumnRenamed("transactionid", "Transaction ID") \
    .withColumnRenamed("invoiceid", "Invoice ID") \
    .withColumnRenamed("transactiondate", "Transaction Date") \
    .withColumnRenamed("transactiontype", "Transaction Type") \
    .withColumnRenamed("discountid", "Discount ID") \
    .withColumnRenamed("paymentmethod", "Payment Method") \
    .withColumnRenamed("invoicetotal", "Invoice Total") \
    .withColumnRenamed("customerid", "Customer ID") \
    .withColumnRenamed("storeid", "Store ID") \
    .withColumnRenamed("employeeid", "Employee ID") \
    .withColumnRenamed("currencyid", "Currency ID") \
    .withColumn("Transaction ID", trim(col("Transaction ID")).cast("int")) \
    .withColumn("Invoice ID", trim(col("Invoice ID")).cast("string")) \
    .withColumn("Transaction Date", trim(col("Transaction Date")).cast("timestamp")) \
    .withColumn("Transaction Type", trim(col("Transaction Type")).cast("string")) \
    .withColumn("Discount ID", trim(col("Discount ID")).cast("double")) \
    .withColumn("Payment Method", trim(col("Payment Method")).cast("string")) \
    .withColumn("Invoice Total", trim(col("Invoice Total")).cast(DecimalType(10,2))) \
    .withColumn("Customer ID", trim(col("Customer ID")).cast("int")) \
    .withColumn("Store ID", trim(col("Store ID")).cast("int")) \
    .withColumn("Employee ID", trim(col("Employee ID")).cast("int")) \
    .withColumn("Currency ID", trim(col("Currency ID")).cast("int"))

In [258]:
df_transactionsdb.columns

['Transaction ID',
 'Invoice ID',
 'Transaction Date',
 'Transaction Type',
 'Discount ID',
 'Payment Method',
 'Invoice Total',
 'Customer ID',
 'Store ID',
 'Employee ID',
 'Currency ID']

In [259]:
df_transactionsdb = df_transactionsdb.drop('Customer ID', 'Store ID','Employee ID','Currency ID','Discount ID')

In [260]:
df_transactionsdb.columns

['Transaction ID',
 'Invoice ID',
 'Transaction Date',
 'Transaction Type',
 'Payment Method',
 'Invoice Total']

In [261]:
df_transactionsdb.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_transactionsdb.columns]).show()

+--------------+----------+----------------+----------------+--------------+-------------+
|Transaction ID|Invoice ID|Transaction Date|Transaction Type|Payment Method|Invoice Total|
+--------------+----------+----------------+----------------+--------------+-------------+
|             0|         0|               0|               0|             0|            0|
+--------------+----------+----------------+----------------+--------------+-------------+



In [262]:
duplicates_count = df_transactionsdb.count() - df_transactionsdb.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicates_count}")

Number of duplicate rows: 0


In [263]:
df_transactionsdb.columns

['Transaction ID',
 'Invoice ID',
 'Transaction Date',
 'Transaction Type',
 'Payment Method',
 'Invoice Total']

In [264]:
df_transactionsdb.printSchema()

root
 |-- Transaction ID: integer (nullable = true)
 |-- Invoice ID: string (nullable = true)
 |-- Transaction Date: timestamp (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Invoice Total: decimal(10,2) (nullable = true)



In [265]:
df_transactionsdb.show(5)

+--------------+-------------------+-------------------+----------------+--------------+-------------+
|Transaction ID|         Invoice ID|   Transaction Date|Transaction Type|Payment Method|Invoice Total|
+--------------+-------------------+-------------------+----------------+--------------+-------------+
|        435821|INV-US-001-03650258|2023-10-04 19:28:00|            Sale|   Credit Card|        50.00|
|        435822|INV-US-001-03650259|2023-10-04 08:59:00|            Sale|   Credit Card|        27.50|
|        435823|INV-US-001-03650260|2023-10-04 11:50:00|            Sale|   Credit Card|        95.40|
|        435824|INV-US-001-03650260|2023-10-04 11:50:00|            Sale|   Credit Card|        95.40|
|        435825|INV-US-001-03650261|2023-10-04 16:26:00|            Sale|   Credit Card|        64.40|
+--------------+-------------------+-------------------+----------------+--------------+-------------+
only showing top 5 rows



Transaction Lines Table

1-Trim Spaces & Cast Data Types.

2-Filling Nulls

3-Handling Duplicates.

In [266]:
df_transactionlines = df_transactionlines \
        .withColumnRenamed("transactionlineid", "Transaction Line ID") \
        .withColumnRenamed("transactionid", "Transaction ID") \
        .withColumnRenamed("productid", "Product ID") \
        .withColumnRenamed("unitprice", "Unit Price") \
        .withColumnRenamed("quantity", "Quantity") \
        .withColumnRenamed("discount", "Discount") \
        .withColumnRenamed("linetotal", "Line Total") \
        .withColumnRenamed("line", "Line") \
        .withColumn("Transaction ID", trim(col("Transaction ID")).cast("int")) \
        .withColumn("Transaction Line ID", trim(col("Transaction Line ID")).cast("int")) \
        .withColumn("Product ID", trim(col("Product ID"))) \
        .withColumn("Unit Price", trim(col("Unit Price")).cast(DecimalType(10,2))) \
        .withColumn("Quantity", trim(col("Quantity")).cast("int")) \
        .withColumn("line", trim(col("line")).cast("int")) \
        .withColumn("Discount", trim(col("Discount")).cast("double")) \
        .withColumn("Line Total", trim(col("Line Total")).cast(DecimalType(10,2)))

In [267]:
df_transactionlines.select([count(when(col(c).isNull(), c)).alias(c) for c in df_transactionlines.columns]).show()

+-------------------+--------------+----------+----------+--------+--------+----------+----+
|Transaction Line ID|Transaction ID|Product ID|Unit Price|Quantity|Discount|Line Total|line|
+-------------------+--------------+----------+----------+--------+--------+----------+----+
|                  0|             0|         0|         0|       0|       0|         0|   0|
+-------------------+--------------+----------+----------+--------+--------+----------+----+



In [268]:
duplicate_count_lines = df_transactionlines.count() - df_transactionlines.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count_lines}")

Number of duplicate rows: 0


In [269]:
df_transactionlines.columns

['Transaction Line ID',
 'Transaction ID',
 'Product ID',
 'Unit Price',
 'Quantity',
 'Discount',
 'Line Total',
 'line']

In [270]:
df_transactionlines.printSchema()

root
 |-- Transaction Line ID: integer (nullable = true)
 |-- Transaction ID: integer (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Unit Price: decimal(10,2) (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Line Total: decimal(10,2) (nullable = true)
 |-- line: integer (nullable = true)



In [271]:
df_transactionlines.show(5)

+-------------------+--------------+----------+----------+--------+--------+----------+----+
|Transaction Line ID|Transaction ID|Product ID|Unit Price|Quantity|Discount|Line Total|line|
+-------------------+--------------+----------+----------+--------+--------+----------+----+
|             487884|       1054491|     11960|      8.00|       1|     0.0|      8.00|   4|
|             487885|       1054491|      9950|     49.00|       1|     0.0|     49.00|   3|
|             487886|       1054491|     11312|     35.50|       1|     0.0|     35.50|   2|
|             487887|       1054491|      9231|     53.50|       1|     0.0|     53.50|   1|
|             487888|       1054493|     10094|     78.00|       1|     0.0|     78.00|   1|
+-------------------+--------------+----------+----------+--------+--------+----------+----+
only showing top 5 rows



## Defining DWH Schema

Customers Table 

Source: Customers & Location

In [151]:
customers_dwh = df_customers.join(df_location, "Location ID", "left")

windowSpec = Window.orderBy("Customer ID")

customers_dwh = customers_dwh.select(
    row_number().over(windowSpec).alias("customerid_pk_sk"),
    df_customers["Customer ID"].alias("customerid_pk_bk"),
    df_customers["Name"].alias("name"),
    df_customers["Email"].alias("email"),
    df_customers["Telephone"].alias("telephone"),
    df_location["City"].alias("city"),
    df_location["Country"].alias("country"),
    df_customers["Gender"].alias("gender"),
    df_customers["Date of Birth"].alias("date_of_birth"),
    df_customers["Job Title"].alias("job_title"))

In [152]:
customers_dwh.show(5)

+----------------+----------------+-------------------+--------------------+---------------+--------+-------------+------+-------------+--------------------+
|customerid_pk_sk|customerid_pk_bk|               name|               email|      telephone|    city|      country|gender|date_of_birth|           job_title|
+----------------+----------------+-------------------+--------------------+---------------+--------+-------------+------+-------------+--------------------+
|               1|               1|       Tyler Garcia|tyler.garcia@gmai...|922970226547563|New York|United States|     M|   2003-07-15|       Not Specified|
|               2|               2|      Joshua Miller|joshua.miller@gma...|    19587296169|New York|United States|     M|   2000-06-16|     Records manager|
|               3|               3|Alison Marshall DDS|alison.marshall.d...|164556708765409|New York|United States|     F|   2003-07-22|       Not Specified|
|               4|               4|     Jeffery Acos

In [153]:
customers_dwh.printSchema()

root
 |-- customerid_pk_sk: integer (nullable = false)
 |-- customerid_pk_bk: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- telephone: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- gender: string (nullable = false)
 |-- date_of_birth: date (nullable = true)
 |-- job_title: string (nullable = false)



Discounts Table

Source: Discounts & Categories

Note: Description column is dropped before. (unnecessary)

In [154]:
discounts_dwh = df_discounts.join(df_categories, "Category ID", "left")

windowSpec = Window.orderBy("Discount ID")

discounts_dwh = discounts_dwh.select(
    row_number().over(windowSpec).alias("discountid_pk_sk"),
    df_discounts["Discount ID"].alias("discountid_pk_bk"),
    df_discounts["Start Date"].alias("startdate"),
    df_discounts["End Date"].alias("enddate"),
    df_discounts["Discount"].alias("discount"),
    df_categories["Category"].alias("category"),
    df_categories["SubCategory"].alias("subcategory")
)

In [155]:
discounts_dwh.show(5)

+----------------+----------------+----------+----------+--------+---------+--------------------+
|discountid_pk_sk|discountid_pk_bk| startdate|   enddate|discount| category|         subcategory|
+----------------+----------------+----------+----------+--------+---------+--------------------+
|               1|               1|2024-10-01|2024-10-10|    0.20| FEMININE|SWEATERS AND KNIT...|
|               2|               2|2020-09-01|2020-09-15|    0.45|MASCULINE|              SHIRTS|
|               3|               3|2024-05-01|2024-05-15|    0.25|MASCULINE|     PANTS AND JEANS|
|               4|               4|2022-10-01|2022-10-10|    0.20|MASCULINE|          SPORTSWEAR|
|               5|               5|2020-01-01|2020-01-10|    0.40| CHILDREN|            SWEATERS|
+----------------+----------------+----------+----------+--------+---------+--------------------+
only showing top 5 rows



In [156]:
discounts_dwh.printSchema()

root
 |-- discountid_pk_sk: integer (nullable = false)
 |-- discountid_pk_bk: integer (nullable = true)
 |-- startdate: date (nullable = true)
 |-- enddate: date (nullable = true)
 |-- discount: decimal(5,2) (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)



Products Table

Source: Categories, Products & Products Attribute


Note: Description columns in other languages except English & Currency Symbol are drooped. (Unnecessary)

In [349]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, lit, col

# Step 1: Join products with categories to get category information
prod_with_cat = df_products.join(
    df_categories,
    "Category ID",
    "left"
)

# Step 2: Join with transaction lines to get unit price (take the first available price)
prod_with_price = prod_with_cat.join(
    df_transactionlines,
    "Product ID",
    "left"
).groupBy(
    "Product ID", "Category", "SubCategory", "Description EN"
).agg(
    {"Unit Price": "first"}  # Takes the first available price
).withColumnRenamed("first(Unit Price)", "Unit Price")

# Step 3: Join with product attributes (ensure proper join)
products_dwh_temp = prod_with_price.join(
    df_productattribute.alias("attr"),
    "Product ID",
    "left"
).select(
    col("Product ID"),
    col("Unit Price"),
    col("Category"),
    col("SubCategory"),
    col("Description EN"),
    col("attr.Color"),
    col("attr.Sizes"),
    col("attr.Production Cost"),
    col("attr.SKU")
)

# Step 4: Add surrogate key and currency
windowSpec = Window.orderBy("Product ID")
products_dwh = products_dwh_temp.select(
    row_number().over(windowSpec).alias("productid_pk_sk"),
    col("Product ID").alias("productid_pk_bk"),
    col("Category").alias("category"),
    col("SubCategory").alias("subcategory"),
    col("Description EN").alias("descriptionen"),
    coalesce(col("Color"), lit("N/A")).alias("color"),
    coalesce(col("Sizes"), lit("UNKNOWN")).alias("sizes"),
    coalesce(col("Production Cost"), lit(0.0)).alias("productioncost"),
    coalesce(col("SKU"), lit("NO_SKU")).alias("sku"),
    lit("USD").alias("Currency"),    
    coalesce(col("Unit Price"), lit(0.0)).alias("unitprice")
)


In [350]:
products_dwh.show(5)

+---------------+---------------+--------+--------------------+--------------------+-----+--------+--------------+-------------+--------+---------+
|productid_pk_sk|productid_pk_bk|category|         subcategory|       descriptionen|color|   sizes|productioncost|          sku|Currency|unitprice|
+---------------+---------------+--------+--------------------+--------------------+-----+--------+--------------+-------------+--------+---------+
|              1|              1|FEMININE|   COATS AND BLAZERS|Sports Velvet Spo...|  N/A| UNKNOWN|           0.0|       NO_SKU|     USD|     72.0|
|              2|              2|FEMININE|SWEATERS AND KNIT...|Luxurious Pink De...| PINK|S|M|L|XL|         19.55| FESW2-S-PINK|     USD|     40.0|
|              3|              2|FEMININE|SWEATERS AND KNIT...|Luxurious Pink De...| PINK|S|M|L|XL|         19.55|FESW2-XL-PINK|     USD|     40.0|
|              4|              2|FEMININE|SWEATERS AND KNIT...|Luxurious Pink De...| PINK|S|M|L|XL|         19.5

In [351]:
products_dwh.printSchema()

root
 |-- productid_pk_sk: integer (nullable = false)
 |-- productid_pk_bk: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- descriptionen: string (nullable = true)
 |-- color: string (nullable = false)
 |-- sizes: string (nullable = false)
 |-- productioncost: double (nullable = false)
 |-- sku: string (nullable = false)
 |-- Currency: string (nullable = false)
 |-- unitprice: double (nullable = false)



Stores Table:

Source: Stores & Location

In [174]:
stores_dwh = df_stores.join(df_location, "Location ID", "left")

windowSpec = Window.orderBy("Store ID")

stores_dwh = stores_dwh.select(
    row_number().over(windowSpec).alias("storeid_pk_sk"),
    df_stores["Store ID"].alias("storeid_pk_bk"),   
    df_location["Country"].alias("country"), 
    df_location["City"].alias("city"),
    df_stores["Store Name"].alias("storename"),
    df_stores["Number of Employees"].alias("numberofemployees"),
    df_stores["Zip Code"].alias("zipcode"),
    df_stores["Latitude"].alias("latitude"),
    df_stores["Longitude"].alias("longitude")
)

In [175]:
stores_dwh.show(5)

+-------------+-------------+--------------+----------+----------------+-----------------+--------+---------+-----------+
|storeid_pk_sk|storeid_pk_bk|       country|      city|       storename|numberofemployees| zipcode| latitude|  longitude|
+-------------+-------------+--------------+----------+----------------+-----------------+--------+---------+-----------+
|            1|            1|United Kingdom|Birmingham|STORE BIRMINGHAM|                9|  B1 1AA|52.486200|  -1.890400|
|            2|            2|      Portugal| Guimarães| STORE GUIMARÃES|                9|4800-001|41.444400|  -8.296200|
|            3|            3|        España|  Zaragoza|  STORE ZARAGOZA|                9|   50001|41.641900|  -0.904600|
|            4|            4|United Kingdom|   Bristol|   STORE BRISTOL|                7| BS1 1AA|51.454500|  -2.587900|
|            5|            5| United States|   Phoenix|   STORE PHOENIX|                9|   85001|33.448400|-112.074000|
+-------------+---------

In [176]:
stores_dwh.printSchema()

root
 |-- storeid_pk_sk: integer (nullable = false)
 |-- storeid_pk_bk: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- storename: string (nullable = true)
 |-- numberofemployees: integer (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- latitude: decimal(10,6) (nullable = true)
 |-- longitude: decimal(10,6) (nullable = true)



Employees Table

Source: Employees table

Note: No changes will occur to the source table.

In [212]:
print("Columns in df_employees:", df_employees.columns)


Columns in df_employees: ['employeeid_pk_bk', 'name', 'position', 'storeid']


In [217]:
employees_dwh = df_employees

windowSpec = Window.orderBy("employeeid_pk_bk") 
employees_dwh = df_employees.withColumn("employeeid_pk_sk", row_number().over(windowSpec))

employees_dwh = employees_dwh.select(
    "employeeid_pk_sk",  
    "employeeid_pk_bk",  
    "storeid",
    "name",
    "position"
)

In [218]:
employees_dwh.show(5)

+----------------+----------------+-------+---------------+-----------------+
|employeeid_pk_sk|employeeid_pk_bk|storeid|           name|         position|
+----------------+----------------+-------+---------------+-----------------+
|               1|               1|     11|   DRAGO HORNIG|ASSISTANT MANAGER|
|               2|               2|     31|   NOAH AZEVEDO|      STOCK CLERK|
|               3|               3|     20|    JOHN BAILEY|          CASHIER|
|               4|               4|      7|           唐旭|  SALES ASSOCIATE|
|               5|               5|     26|LOURDES CABAÑAS|  SALES ASSOCIATE|
+----------------+----------------+-------+---------------+-----------------+
only showing top 5 rows



In [219]:
employees_dwh.printSchema()

root
 |-- employeeid_pk_sk: integer (nullable = false)
 |-- employeeid_pk_bk: integer (nullable = true)
 |-- storeid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- position: string (nullable = true)



Date Table

Source: Transactiondb Table

Note: Date will be extracted from Transaction Date column

In [225]:
date_dwh = df_transactionsdb.select(to_date("Transaction Date").alias("date")).distinct()

date_dwh = date_dwh.select(
    col("date"),
    dayofmonth("date").alias("day"),
    month("date").alias("month"),
    date_format("date", "MMMM").alias("month_name"),
    year("date").alias("year"),
    dayofweek("date").alias("day_of_week"),            # 1=Sunday, 7=Saturday
    date_format("date", "EEEE").alias("day_name"),    
    weekofyear("date").alias("week_of_year"),
    when(dayofweek("date").isin(1, 7), lit("Yes")).otherwise(lit("No")).alias("is_weekend"),
    quarter("date").alias("quarter"),
    date_format("date", "yyyy-MM").alias("year_month")
)

windowSpec = Window.orderBy("date")
date_dwh = date_dwh.withColumn("date_pk_sk", row_number().over(windowSpec))

date_dwh = date_dwh.select("date_pk_sk", *[col for col in date_dwh.columns if col != "date_pk_sk"])

date_dwh = date_dwh.select(
    "date_pk_sk",  
    "date",  
    "day",
    "month",
    "month_name",
    "year",
    "day_of_week",
    "day_name",
    "week_of_year",
    "is_weekend",
    "quarter",
    "year_month"
)

In [226]:
date_dwh.show(5)

+----------+----------+---+-----+----------+----+-----------+---------+------------+----------+-------+----------+
|date_pk_sk|      date|day|month|month_name|year|day_of_week| day_name|week_of_year|is_weekend|quarter|year_month|
+----------+----------+---+-----+----------+----+-----------+---------+------------+----------+-------+----------+
|         1|2023-01-01|  1|    1|   January|2023|          1|   Sunday|          52|       Yes|      1|   2023-01|
|         2|2023-01-02|  2|    1|   January|2023|          2|   Monday|           1|        No|      1|   2023-01|
|         3|2023-01-03|  3|    1|   January|2023|          3|  Tuesday|           1|        No|      1|   2023-01|
|         4|2023-01-04|  4|    1|   January|2023|          4|Wednesday|           1|        No|      1|   2023-01|
|         5|2023-01-05|  5|    1|   January|2023|          5| Thursday|           1|        No|      1|   2023-01|
+----------+----------+---+-----+----------+----+-----------+---------+---------

In [227]:
date_dwh.printSchema()

root
 |-- date_pk_sk: integer (nullable = false)
 |-- date: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- month_name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- day_name: string (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- is_weekend: string (nullable = false)
 |-- quarter: integer (nullable = true)
 |-- year_month: string (nullable = true)



Transactions Table

Source: Transactions & Transcation Lines

Note: Currency Symbol column is dropped. (Unnecessary)

1-Add columns (Line, Total Line, Quantity) from Transactionlines Table to Transactionsdb Table

In [273]:
df_transactionsdb = df_transactionsdb.join(
    df_transactionlines, 
    "Transaction ID", 
    "left"
).select(
    df_transactionsdb["Transaction ID"],
    df_transactionsdb["Invoice ID"],
    df_transactionsdb["Transaction Date"],
    df_transactionsdb["Transaction Type"],
    df_transactionsdb["Payment Method"],
    df_transactionsdb["Invoice Total"],
    df_transactionlines["Line"],
    df_transactionlines["Line Total"],  
    df_transactionlines["Quantity"]  
)

In [274]:
df_transactionsdb.show(5)

+--------------+-------------------+-------------------+----------------+--------------+-------------+----+----------+--------+
|Transaction ID|         Invoice ID|   Transaction Date|Transaction Type|Payment Method|Invoice Total|Line|Line Total|Quantity|
+--------------+-------------------+-------------------+----------------+--------------+-------------+----+----------+--------+
|        435824|INV-US-001-03650260|2023-10-04 11:50:00|            Sale|   Credit Card|        95.40|NULL|      NULL|    NULL|
|        435825|INV-US-001-03650261|2023-10-04 16:26:00|            Sale|   Credit Card|        64.40|   2|     30.40|       1|
|        435825|INV-US-001-03650261|2023-10-04 16:26:00|            Sale|   Credit Card|        64.40|   1|     34.00|       1|
|        435821|INV-US-001-03650258|2023-10-04 19:28:00|            Sale|   Credit Card|        50.00|   1|     50.00|       1|
|        435822|INV-US-001-03650259|2023-10-04 08:59:00|            Sale|   Credit Card|        27.50|NU

2-Join Transactionsdb & Transactionscsv Tables in one Table

In [284]:
# First ensure consistent formatting of join keys
df_transactionsdb = df_transactionsdb.withColumn("Invoice ID", trim(col("Invoice ID")))
df_transactionscsv = df_transactionscsv.withColumn("Invoice ID", trim(col("Invoice ID")))

# Optionally make case consistent (if needed)
df_transactionsdb = df_transactionsdb.withColumn("Invoice ID", lower(col("Invoice ID")))
df_transactionscsv = df_transactionscsv.withColumn("Invoice ID", lower(col("Invoice ID")))

# Verify data types match (both should be string)
df_transactionsdb = df_transactionsdb.withColumn("Invoice ID", col("Invoice ID").cast("string"))
df_transactionscsv = df_transactionscsv.withColumn("Invoice ID", col("Invoice ID").cast("string"))

# Perform the join - using left join to preserve all CSV records (which contain Line/Quantity/Line Total)
transactions_dwh = df_transactionscsv.join(
    df_transactionsdb,
    ["Invoice ID"],
    how="left"
).select(
    coalesce(df_transactionsdb["Transaction ID"], lit(0)).alias("Transaction ID").alias("transactionid"),
    col("Invoice ID").alias("invoiceid"),
    coalesce(df_transactionsdb["Transaction Type"], df_transactionscsv["Transaction Type"]).alias("transactiontype"),
    df_transactionscsv["Quantity"].alias("quantity"),  # Always take from CSV 
    df_transactionscsv["Line Total"].alias("line"),  # Always take from CSV
    coalesce(df_transactionsdb["Payment Method"], df_transactionscsv["Payment Method"]).alias("paymentmethod"),
    coalesce(df_transactionsdb["Invoice Total"], df_transactionscsv["Invoice Total"]).alias("invoicetotal")
)

3-Clean TransactionsDWH Table

In [286]:
transactions_dwh.columns

['transactionid',
 'invoiceid',
 'transactiontype',
 'quantity',
 'line',
 'paymentmethod',
 'invoicetotal']

In [288]:
transactions_dwh.show(5)

+-------------+-------------------+---------------+--------+------+-------------+------------+
|transactionid|          invoiceid|transactiontype|quantity|  line|paymentmethod|invoicetotal|
+-------------+-------------------+---------------+--------+------+-------------+------------+
|            0|inv-us-004-01360995|           Sale|       1| 54.50|  Credit Card|       54.50|
|            0|inv-us-004-01361417|           Sale|       1| 45.50|  Credit Card|       45.50|
|            0|inv-us-004-01361567|           Sale|       3|130.50|         Cash|      257.50|
|       544406|ret-us-004-01358472|         Return|       1|-42.50|  Credit Card|      -98.50|
|       544405|ret-us-004-01358472|         Return|       1|-42.50|  Credit Card|      -98.50|
+-------------+-------------------+---------------+--------+------+-------------+------------+
only showing top 5 rows



In [289]:
transactions_dwh.printSchema()

root
 |-- transactionid: integer (nullable = false)
 |-- invoiceid: string (nullable = true)
 |-- transactiontype: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- line: decimal(10,2) (nullable = true)
 |-- paymentmethod: string (nullable = true)
 |-- invoicetotal: decimal(10,2) (nullable = true)



## Loading Data into CSVs Files

In [None]:
pandas_df = date_dwh.toPandas()
pandas_df.to_csv(r"P:/Career/Data Engineering/ITI-DE/Graduation Project/Data/Data Destinations/Date.csv", index=False)

In [None]:
pandas_df = customers_dwh.toPandas()
pandas_df.to_csv(r"P:/Career/Data Engineering/ITI-DE/Graduation Project/Data/Data Destinations/Customers.csv", index=False)

In [None]:
pandas_df = discounts_dwh.toPandas()
pandas_df.to_csv(r"P:/Career/Data Engineering/ITI-DE/Graduation Project/Data/Data Destinations/Discounts.csv", index=False)

In [None]:
pandas_df = products_dwh.toPandas()
pandas_df.to_csv(r"P:/Career/Data Engineering/ITI-DE/Graduation Project/Data/Data Destinations/Products.csv", index=False)

In [None]:
pandas_df = stores_dwh.toPandas()
pandas_df.to_csv(r"P:/Career/Data Engineering/ITI-DE/Graduation Project/Data/Data Destinations/Stores.csv", index=False)

In [None]:
pandas_df = employees_dwh.toPandas()
pandas_df.to_csv(r"P:/Career/Data Engineering/ITI-DE/Graduation Project/Data/Data Destinations/Employees.csv", index=False)

In [None]:
pandas_df = transactions_dwh.toPandas()
pandas_df.to_csv(r"P:/Career/Data Engineering/ITI-DE/Graduation Project/Data/Data Destinations/Transactions.csv", index=False)