In [1]:
!pip install pyspark pandas sqlalchemy psycopg2-binary

Collecting pyspark
  Using cached pyspark-4.0.1-py2.py3-none-any.whl
Collecting pandas
  Using cached pandas-2.3.3-cp313-cp313-win_amd64.whl.metadata (19 kB)
Collecting sqlalchemy
  Using cached sqlalchemy-2.0.44-cp313-cp313-win_amd64.whl.metadata (9.8 kB)
Collecting psycopg2-binary
  Using cached psycopg2_binary-2.9.11-cp313-cp313-win_amd64.whl.metadata (5.1 kB)
Collecting py4j==0.10.9.9 (from pyspark)
  Using cached py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting numpy>=1.26.0 (from pandas)
  Using cached numpy-2.3.4-cp313-cp313-win_amd64.whl.metadata (60 kB)
Collecting pytz>=2020.1 (from pandas)
  Using cached pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Using cached tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting greenlet>=1 (from sqlalchemy)
  Using cached greenlet-3.2.4-cp313-cp313-win_amd64.whl.metadata (4.2 kB)
Collecting typing-extensions>=4.6.0 (from sqlalchemy)
  Using cached typing_extensions-4.1

In [2]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.1
      /_/
                        
Using Scala version 2.13.16, Java HotSpot(TM) 64-Bit Server VM, 21.0.9
Branch HEAD
Compiled by user runner on 2025-09-02T03:10:51Z
Revision 29434ea766b0fc3c3bf6eaadb43a8f931133649e
Url https://github.com/apache/spark
Type --help for more information.


In [3]:
# Import necessary dependencies
from pyspark.sql import SparkSession
import pandas as pd
from sqlalchemy import create_engine

In [4]:
# Initialise our Spark Session
spark = SparkSession.builder \
    .appName('DealWithUsApp') \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
        .getOrCreate()

In [5]:
spark

In [6]:
# Loading CSV to Spark DataFrame
dwu_df = spark.read.csv(r'raw_data\dealwithus_raw_data copy.csv', header=True, inferSchema=True)

In [7]:
dwu_df.show(10)
dwu_df.printSchema()

+-------+---------+--------+----------+----------+-------------+---------+-----------+-----------+--------------------+--------------------+-----------------+--------------------+------------------+---------+---------+---------+
|OrderID|ProductID|Quantity|CustomerID| OrderDate|PaymentMethod|UnitPrice|OrderStatus|TotalAmount|       CustomerEmail|       CustomerPhone|             City|             Country|       ProductName| Category|FirstName|Last Name|
+-------+---------+--------+----------+----------+-------------+---------+-----------+-----------+--------------------+--------------------+-----------------+--------------------+------------------+---------+---------+---------+
|  O0094|    P0001|       1|     C1954|2024-12-02|       PayPal|   1491.0|    Pending|    8476.45|griffinmichelle@e...|       (808)782-4405|       Emilymouth|Northern Mariana ...|Interesting Tablet|Computers|   Kelsey|   Burton|
|  O0312|    P0001|       2|     C0827|2025-01-20|  Credit Card|   1491.0|    Shippe

#### Data Cleaning and Transformation

In [None]:
# import pyspark functions
from pyspark.sql.functions import col, length, trim, lower, upper, when, regexp_replace, concat_ws, year, month, to_date, try_to_timestamp,count, monotonically_increasing_id

In [10]:
# Checking and resolving for null values
for column in dwu_df.columns:
    print(f"{column}: {dwu_df.filter(dwu_df[column].isNull()).count()} null value exist")

OrderID: 0 null value exist
ProductID: 0 null value exist
Quantity: 0 null value exist
CustomerID: 0 null value exist
OrderDate: 0 null value exist
PaymentMethod: 172 null value exist
UnitPrice: 0 null value exist
OrderStatus: 0 null value exist
TotalAmount: 0 null value exist
CustomerEmail: 0 null value exist
CustomerPhone: 0 null value exist
City: 130 null value exist
Country: 0 null value exist
ProductName: 0 null value exist
Category: 0 null value exist
FirstName: 0 null value exist
Last Name: 0 null value exist


In [11]:
# How to fill up missing values
dwu_df_clean = dwu_df.fillna({
    'PaymentMethod' : 'Unknown',
    'City' : 'Unknown'
})

for column in dwu_df_clean.columns:
    print(f"{column}: {dwu_df_clean.filter(dwu_df_clean[column].isNull()).count()} null value exist")

OrderID: 0 null value exist
ProductID: 0 null value exist
Quantity: 0 null value exist
CustomerID: 0 null value exist
OrderDate: 0 null value exist
PaymentMethod: 0 null value exist
UnitPrice: 0 null value exist
OrderStatus: 0 null value exist
TotalAmount: 0 null value exist
CustomerEmail: 0 null value exist
CustomerPhone: 0 null value exist
City: 0 null value exist
Country: 0 null value exist
ProductName: 0 null value exist
Category: 0 null value exist
FirstName: 0 null value exist
Last Name: 0 null value exist


In [None]:
# ^anything that is not betiween 0 -9 , remove and replace
dwu_df_clean = dwu_df_clean.withColumn("CustomerPhone", regexp_replace(col("CustomerPhone"),"[^0-9]","")) 

dwu_df_clean.show(50)

+-------+---------+--------+----------+----------+-------------+---------+-----------+-----------+--------------------+-----------------+--------------------+--------------------+------------------+---------+---------+----------+
|OrderID|ProductID|Quantity|CustomerID| OrderDate|PaymentMethod|UnitPrice|OrderStatus|TotalAmount|       CustomerEmail|    CustomerPhone|                City|             Country|       ProductName| Category|FirstName| Last Name|
+-------+---------+--------+----------+----------+-------------+---------+-----------+-----------+--------------------+-----------------+--------------------+--------------------+------------------+---------+---------+----------+
|  O0094|    P0001|       1|     C1954|2024-12-02|       PayPal|   1491.0|    Pending|    8476.45|griffinmichelle@e...|       8087824405|          Emilymouth|Northern Mariana ...|Interesting Tablet|Computers|   Kelsey|    Burton|
|  O0312|    P0001|       2|     C0827|2025-01-20|  Credit Card|   1491.0|    Sh

In [15]:
# trim whitespaces from columns
dwu_df_clean = dwu_df_clean.withColumn("CustomerEmail", trim(col("CustomerEmail"))) \
                            .withColumn("Category", trim(col("Category")))

In [16]:
dwu_df_clean = dwu_df_clean.withColumn("CustomerEmail", lower(col("CustomerEmail"))) \
                            .withColumn("CustomerEmail", regexp_replace(col("CustomerEmail"), " ",""))

dwu_df_clean.show(50)

+-------+---------+--------+----------+----------+-------------+---------+-----------+-----------+--------------------+-----------------+--------------------+--------------------+------------------+---------+---------+----------+
|OrderID|ProductID|Quantity|CustomerID| OrderDate|PaymentMethod|UnitPrice|OrderStatus|TotalAmount|       CustomerEmail|    CustomerPhone|                City|             Country|       ProductName| Category|FirstName| Last Name|
+-------+---------+--------+----------+----------+-------------+---------+-----------+-----------+--------------------+-----------------+--------------------+--------------------+------------------+---------+---------+----------+
|  O0094|    P0001|       1|     C1954|2024-12-02|       PayPal|   1491.0|    Pending|    8476.45|griffinmichelle@e...|       8087824405|          Emilymouth|Northern Mariana ...|Interesting Tablet|Computers|   Kelsey|    Burton|
|  O0312|    P0001|       2|     C0827|2025-01-20|  Credit Card|   1491.0|    Sh

In [18]:
# Combine first and last names into a new column 'CustomerName'
dwu_df_clean = dwu_df_clean.withColumn(
    "CustomerName",
    concat_ws(" ", trim(col("FirstName")), trim(col("Last Name")))
)

dwu_df_clean.show(5)


+-------+---------+--------+----------+----------+-------------+---------+-----------+-----------+--------------------+---------------+-----------------+--------------------+------------------+---------+---------+---------+----------------+
|OrderID|ProductID|Quantity|CustomerID| OrderDate|PaymentMethod|UnitPrice|OrderStatus|TotalAmount|       CustomerEmail|  CustomerPhone|             City|             Country|       ProductName| Category|FirstName|Last Name|    CustomerName|
+-------+---------+--------+----------+----------+-------------+---------+-----------+-----------+--------------------+---------------+-----------------+--------------------+------------------+---------+---------+---------+----------------+
|  O0094|    P0001|       1|     C1954|2024-12-02|       PayPal|   1491.0|    Pending|    8476.45|griffinmichelle@e...|     8087824405|       Emilymouth|Northern Mariana ...|Interesting Tablet|Computers|   Kelsey|   Burton|   Kelsey Burton|
|  O0312|    P0001|       2|     C08

In [19]:
#drop duplicates 
dwu_df_clean = dwu_df_clean.dropDuplicates()

#### Data Transformation

In [22]:
dim_customer =  dwu_df_clean.select(
    "CustomerEmail",
    "CustomerName",   # was 'CutomerName'
    "CustomerPhone",  # was 'CutomerPhone'
    "City",
    "Country"
).dropDuplicates()

dim_customer = dim_customer.withColumn("CustomerID", monotonically_increasing_id())

dim_customer.show(5)

+--------------------+--------------+----------------+----------------+--------------------+----------+
|       CustomerEmail|  CustomerName|   CustomerPhone|            City|             Country|CustomerID|
+--------------------+--------------+----------------+----------------+--------------------+----------+
|petersonamber@exa...|    Jacob Carr|  69696689319648|       Lake John|               Korea|         0|
|ayalagregory@exam...|Christy Willis|1838826089618331|    Bautistabury|    Saint Barthelemy|         1|
|danielcline@examp...| Kenneth Gomez|   0015863776786|       Edgarland|Holy See (Vatican...|         2|
|amberhall@example...|  Bryan Hanson| 795823090646520|       Laurabury|              Brazil|         3|
|yesenia10@example...|    Samuel Lee|  30294534400663|New Williammouth|            Anguilla|         4|
+--------------------+--------------+----------------+----------------+--------------------+----------+
only showing top 5 rows


In [None]:
# create dim_product table
dim_product = dwu_df_clean.select(
    "ProductID",
    "ProductName",
    "Category",
    "UnitPrice"
).dropDuplicates()

dim_product.show(5)

+---------+----------------+-----------+---------+
|ProductID|     ProductName|   Category|UnitPrice|
+---------+----------------+-----------+---------+
|    P0055|     None Laptop|      Audio|   456.14|
|    P0192|Force Headphones|  Computers|  1917.86|
|    P0245|   Public Tablet|  Computers|   163.38|
|    P0260|      Only Phone|Accessories|  1061.21|
|    P0145|      Keep Phone|  Computers|   318.17|
+---------+----------------+-----------+---------+
only showing top 5 rows


In [24]:
# create fact_order table
fact_order = dwu_df_clean.select(
    "OrderID",
    "OrderDate",
    "PaymentMethod",
    "OrderStatus",
    "TotalAmount",
    "CustomerEmail"
)

fact_order = fact_order.join(
    dim_customer, on= "CustomerEmail", how="left"
).select(
    "orderID",
    "OrderDate",
    "CustomerID",
    "CustomerEmail",
    "PaymentMethod",
    "OrderStatus",
    "TotalAmount"

)

fact_order.show(10)

+-------+----------+----------+--------------------+-------------+-----------+-----------+
|orderID| OrderDate|CustomerID|       CustomerEmail|PaymentMethod|OrderStatus|TotalAmount|
+-------+----------+----------+--------------------+-------------+-----------+-----------+
|  O4922|2025-02-17|      1223|christophermcinty...|  Credit Card|  Cancelled|    6004.79|
| O94561|2025-03-07|       979|garciaricardo@exa...|      Unknown|    Shipped|   18154.02|
| O49132|2025-09-20|       940|ballardjohn@examp...|Bank Transfer|  Delivered|    11949.6|
| O49132|2025-09-20|       640|ballardjohn@examp...|Bank Transfer|  Delivered|    11949.6|
| O47996|2025-03-27|       472|velasquezmary@exa...|Bank Transfer|  Delivered|   13813.31|
| O88351|2025-06-03|      1533|owenstroy@example...|   Debit Card|    Pending|   13775.52|
|  O3868|2025-02-09|       488|bradleyhill@examp...|   Debit Card|  Cancelled|    4598.96|
|  O9238|2025-05-16|      1874|mccormickdanielle...|Bank Transfer|  Cancelled|   13809.67|

In [None]:
# Create the fact order_items table
fact_order_item = dwu_df_clean.select(
    "OrderID", "ProductID", "Quantity", "TotalAmount"
)

fact_order_item = fact_order_item.join(
    dim_product, on= "ProductID", how="left"
).select(
    "OrderID","ProductID", "ProductName", "Category", "Quantity", "unitprice", "TotalAmount"
)

fact_order_item.show(10)

+-------+---------+------------------+-----------+--------+---------+-----------+
|OrderID|ProductID|       ProductName|   Category|Quantity|unitprice|TotalAmount|
+-------+---------+------------------+-----------+--------+---------+-----------+
| O47996|    P0003|      Effort Phone|      Audio|       3|   1522.1|   13813.31|
| O88351|    P0003|      Effort Phone|      Audio|       2|   1522.1|   13775.52|
|  O9238|    P0006|  Space Headphones|      Audio|       3|  1797.27|   13809.67|
| O49132|    P0002|Culture Headphones|Accessories|       3|  1538.48|    11949.6|
| O74881|    P0007|    Forward Laptop|  Computers|       3|   280.87|   11983.25|
| O79770|    P0007|    Forward Laptop|  Computers|       2|   280.87|   10870.79|
| O85392|    P0007|    Forward Laptop|  Computers|       2|   280.87|    5873.45|
|  O3868|    P0005|Morning Headphones|Electronics|       2|  1483.93|    4598.96|
| O47381|    P0008|    Foreign Tablet|Electronics|       1|   431.41|    8564.94|
|  O4922|    P00

In [28]:
# Loading the dataset into a Postgresql DB using jdbc on pyspark

# define the database connection url with db parameters
db_url = "jdbc:postgresql://localhost:5432/dealwithus"
db_user ="postgres"
db_password = "London123"

def write_to_postgres(df, tablename):
    df.write.format('jdbc') \
        .option("url", db_url) \
        .option("dbtable", tablename) \
        .option("user", db_user) \
        .option("password", db_password) \
        .option("driver", "org.postgresql.Driver") \
        .mode('overwrite') \
        .save()
    
write_to_postgres(dim_customer, "staging.dim_customer")
write_to_postgres(dim_product,"staging.dim_product")
write_to_postgres(fact_order, "staging.fact_order")
write_to_postgres(fact_order_item, "staging.fact_order_item")


['OrderID',
 'ProductID',
 'Quantity',
 'CustomerID',
 'OrderDate',
 'PaymentMethod',
 'UnitPrice',
 'OrderStatus',
 'TotalAmount',
 'CustomerEmail',
 'CustomerPhone',
 'City',
 'Country',
 'ProductName',
 'Category',
 'FirstName',
 'Last Name']