In [1]:
%pip install pyspark pandas sqlalchemy psycopg2-binary

Collecting pyspark
  Downloading pyspark-4.0.1.tar.gz (434.2 MB)
     ---------------------------------------- 0.0/434.2 MB ? eta -:--:--
     --------------------------------------- 1.0/434.2 MB 13.1 MB/s eta 0:00:34
     --------------------------------------- 3.7/434.2 MB 12.7 MB/s eta 0:00:34
      -------------------------------------- 6.3/434.2 MB 13.0 MB/s eta 0:00:33
      ------------------------------------- 10.0/434.2 MB 13.9 MB/s eta 0:00:31
     - ------------------------------------ 12.3/434.2 MB 13.6 MB/s eta 0:00:32
     - ------------------------------------ 15.2/434.2 MB 13.9 MB/s eta 0:00:31
     - ------------------------------------ 16.5/434.2 MB 12.5 MB/s eta 0:00:34
     - ------------------------------------ 19.9/434.2 MB 12.9 MB/s eta 0:00:33
     -- ----------------------------------- 24.1/434.2 MB 13.6 MB/s eta 0:00:31
     -- ----------------------------------- 28.0/434.2 MB 14.1 MB/s eta 0:00:29
     -- ----------------------------------- 31.7/434.2 MB 14.5

In [1]:
# import Necessary Libraries

from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd
from sqlalchemy import create_engine
import os
import psycopg2

In [4]:
from pyspark.sql import SparkSession


spark = (
    SparkSession.builder
    .appName("Flour4Four_ETL_PySpark")
    .getOrCreate()
)



In [5]:
spark

In [47]:
# Data Extraction to Spark DataFrame
f4f_df = spark.read.csv(r'raw_data\flour4four_orders.csv', header=True, inferSchema=True, nullValue="None") 

f4f_df.show(10)


+----------+----------+-------------+-----------+--------------------+-------------+--------------------+---------------+-------------+------------+-------------+-------------+------------+--------------+------------+-------------+-----------+
|  order_id|order_date|delivery_date|business_id|       business_name|business_type|    business_address|   contact_name|contact_phone|  flour_type|quantity_bags|price_per_bag|total_amount|payment_method|order_status|   rider_name|rider_phone|
+----------+----------+-------------+-----------+--------------------+-------------+--------------------+---------------+-------------+------------+-------------+-------------+------------+--------------+------------+-------------+-----------+
|ORD-214576|2025-10-25|   2025-10-25|   BIZ-1018|Adams, Zuniga and...|   Restaurant|Herbert Macaulay ...|   Elimu Agbaje|   8017507864|        NULL|           26|         9500|      247000|           POS|   Delivered|  Aisha Bello| 8089864260|
|ORD-299448|2025-10-08| 

In [38]:
# no of rows
num_rows = f4f_df.count()

num_rows

10000

In [39]:
# no of columns 
num_columns = len(f4f_df.columns)

num_columns

17

In [48]:
# Checking for null values
for column in f4f_df.columns:
    print(column, 'Nulls', f4f_df.filter(f4f_df[column].isNull()).count())


order_id Nulls 0
order_date Nulls 628
delivery_date Nulls 0
business_id Nulls 0
business_name Nulls 602
business_type Nulls 987
business_address Nulls 978
contact_name Nulls 0
contact_phone Nulls 0
flour_type Nulls 642
quantity_bags Nulls 0
price_per_bag Nulls 636
total_amount Nulls 0
payment_method Nulls 0
order_status Nulls 0
rider_name Nulls 0
rider_phone Nulls 0


In [49]:
f4f_df.show(5)

+----------+----------+-------------+-----------+--------------------+-------------+--------------------+---------------+-------------+------------+-------------+-------------+------------+--------------+------------+-------------+-----------+
|  order_id|order_date|delivery_date|business_id|       business_name|business_type|    business_address|   contact_name|contact_phone|  flour_type|quantity_bags|price_per_bag|total_amount|payment_method|order_status|   rider_name|rider_phone|
+----------+----------+-------------+-----------+--------------------+-------------+--------------------+---------------+-------------+------------+-------------+-------------+------------+--------------+------------+-------------+-----------+
|ORD-214576|2025-10-25|   2025-10-25|   BIZ-1018|Adams, Zuniga and...|   Restaurant|Herbert Macaulay ...|   Elimu Agbaje|   8017507864|        NULL|           26|         9500|      247000|           POS|   Delivered|  Aisha Bello| 8089864260|
|ORD-299448|2025-10-08| 

In [51]:
# fill up the missing values
from pyspark.sql import functions as F

# Replace null order_date with delivery_date
f4f_df = f4f_df.withColumn(
    "order_date",
    F.coalesce("order_date", "delivery_date")
)


In [52]:
f4f_df.show(5)

+----------+----------+-------------+-----------+--------------------+-------------+--------------------+---------------+-------------+------------+-------------+-------------+------------+--------------+------------+-------------+-----------+
|  order_id|order_date|delivery_date|business_id|       business_name|business_type|    business_address|   contact_name|contact_phone|  flour_type|quantity_bags|price_per_bag|total_amount|payment_method|order_status|   rider_name|rider_phone|
+----------+----------+-------------+-----------+--------------------+-------------+--------------------+---------------+-------------+------------+-------------+-------------+------------+--------------+------------+-------------+-----------+
|ORD-214576|2025-10-25|   2025-10-25|   BIZ-1018|Adams, Zuniga and...|   Restaurant|Herbert Macaulay ...|   Elimu Agbaje|   8017507864|        NULL|           26|         9500|      247000|           POS|   Delivered|  Aisha Bello| 8089864260|
|ORD-299448|2025-10-08| 

In [56]:
# fill up the missing values
f4f_df_clean = f4f_df.fillna({
    'business_name': 'Unknown',
    'business_type': 'Unknown',
    'business_address': 'Unknown',
    'flour_type': 'Unknown',
          
})

for column in f4f_df_clean.columns:
    print(column, 'Nulls', f4f_df_clean.filter(f4f_df_clean[column].isNull()).count())

order_id Nulls 0
order_date Nulls 0
delivery_date Nulls 0
business_id Nulls 0
business_name Nulls 0
business_type Nulls 0
business_address Nulls 0
contact_name Nulls 0
contact_phone Nulls 0
flour_type Nulls 0
quantity_bags Nulls 0
price_per_bag Nulls 636
total_amount Nulls 0
payment_method Nulls 0
order_status Nulls 0
rider_name Nulls 0
rider_phone Nulls 0


In [58]:
# Fill price_per_bag with median
median_price = f4f_df_clean.approxQuantile("price_per_bag", [0.5], 0.01)[0]
f4f_df_clean = f4f_df_clean.na.fill({"price_per_bag": median_price})


f4f_df_clean.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)
 |-- business_id: string (nullable = true)
 |-- business_name: string (nullable = false)
 |-- business_type: string (nullable = false)
 |-- business_address: string (nullable = false)
 |-- contact_name: string (nullable = true)
 |-- contact_phone: long (nullable = true)
 |-- flour_type: string (nullable = false)
 |-- quantity_bags: integer (nullable = true)
 |-- price_per_bag: integer (nullable = true)
 |-- total_amount: integer (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- rider_name: string (nullable = true)
 |-- rider_phone: long (nullable = true)



In [61]:
f4f_df_clean.columns

['order_id',
 'order_date',
 'delivery_date',
 'business_id',
 'business_name',
 'business_type',
 'business_address',
 'contact_name',
 'contact_phone',
 'flour_type',
 'quantity_bags',
 'price_per_bag',
 'total_amount',
 'payment_method',
 'order_status',
 'rider_name',
 'rider_phone']

### Data Transformation


In [None]:
# create business dimension model 
dim_business = f4f_df_clean.select('business_id', 'business_name', 'business_type', 'business_address', \
                                   'contact_name', 'contact_phone') \
                                   .dropDuplicates(["business_id"])

dim_business.show()


+-----------+--------------------+-------------+--------------------+----------------+-------------+
|business_id|       business_name|business_type|    business_address|    contact_name|contact_phone|
+-----------+--------------------+-------------+--------------------+----------------+-------------+
|   BIZ-1000|             Unknown|       Bakery|Ahmadu Bello Way,...|   Ijeoma Jalloh|   8042868828|
|   BIZ-1001|           Doyle Ltd|   Restaurant|Ahmadu Bello Way,...|   Adaeze Fofana|   8083197857|
|   BIZ-1002|Mcclain, Miller a...|       Bakery|Herbert Macaulay ...|      Asha Okeke|   8013999315|
|   BIZ-1003|      Davis and Sons|       Bakery|Ahmadu Bello Way,...|       Zane Adom|   8090801586|
|   BIZ-1004|Guzman, Hoffman a...|       Bakery|Herbert Macaulay ...|     Osman Touré|   8097226012|
|   BIZ-1005|Gardner, Robinson...|         Cafe|Ahmadu Bello Way,...|  Abena Obasanjo|   8047338124|
|   BIZ-1006|      Blake and Sons|       Bakery|Ahmadu Bello Way,...| Bolanle Kalumba|   80

In [83]:
# create dimension table for rider
dim_rider = f4f_df_clean.select('rider_name', 'rider_phone').dropDuplicates()  \
                    .withColumn('rider_id', monotonically_increasing_id()) \
                    .select('rider_id','rider_name', 'rider_phone')

dim_rider.show()

+--------+-------------+-----------+
|rider_id|   rider_name|rider_phone|
+--------+-------------+-----------+
|       0|   Emeka John| 8019196777|
|       1|  Aisha Bello| 8089864260|
|       2|Tunde Oladipo| 8019121552|
|       3| Grace Onyema| 8041568532|
+--------+-------------+-----------+



In [82]:
# create flour dimension table

dim_flour = f4f_df_clean.select('flour_type').dropDuplicates() \
                    .withColumn('flour_type_id', monotonically_increasing_id()) \
                    .select('flour_type_id', 'flour_type')

dim_flour.show()

+-------------+------------+
|flour_type_id|  flour_type|
+-------------+------------+
|            0| All-purpose|
|            1|     Unknown|
|            2| Whole Wheat|
|            3|Pastry Flour|
|            4| Bread Flour|
+-------------+------------+



In [84]:
# create fact table

fact_orders = f4f_df_clean.join(dim_flour, ['flour_type'], 'left') \
                          .join(dim_rider, ['rider_name', 'rider_phone'], 'left') \
                          .select('order_id', 'order_date', 'delivery_date', 'business_id', 'rider_id', 'flour_type_id', \
                                  'quantity_bags', 'price_per_bag', 'total_amount', 'payment_method', 'order_status')

fact_orders.show(5)




+----------+----------+-------------+-----------+--------+-------------+-------------+-------------+------------+--------------+------------+
|  order_id|order_date|delivery_date|business_id|rider_id|flour_type_id|quantity_bags|price_per_bag|total_amount|payment_method|order_status|
+----------+----------+-------------+-----------+--------+-------------+-------------+-------------+------------+--------------+------------+
|ORD-214576|2025-10-25|   2025-10-25|   BIZ-1018|       1|            1|           26|         9500|      247000|           POS|   Delivered|
|ORD-299448|2025-10-08|   2025-10-08|   BIZ-1006|       2|            4|           27|        10000|      270000|           POS|   Cancelled|
|ORD-246991|2025-10-17|   2025-10-17|   BIZ-1052|       2|            3|           21|         9800|      205800| Bank Transfer|   Cancelled|
|ORD-392075|2025-10-13|   2025-10-13|   BIZ-1035|       2|            0|           20|        10500|      210000| Bank Transfer|     Pending|
|ORD-1

#### Data Loading

In [85]:
# data loading 

def get_db_connection():
    connection = psycopg2.connect(
        host='localhost',
        database='F4F_DB',
        user='postgres',
        password='London123'
    )
    return connection

# connect to sql database
conn = get_db_connection()

In [90]:
fact_orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)
 |-- business_id: string (nullable = true)
 |-- rider_id: long (nullable = true)
 |-- flour_type_id: long (nullable = true)
 |-- quantity_bags: integer (nullable = true)
 |-- price_per_bag: integer (nullable = true)
 |-- total_amount: integer (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- order_status: string (nullable = true)



In [93]:
# Create a function to create tables

def create_table():
    conn = get_db_connection()
    cursor = conn.cursor()

    create_table_query = '''
   
                        CREATE SCHEMA IF NOT EXISTS f4f;

                        DROP TABLE IF EXISTS f4f.dim_business CASCADE;
                        DROP TABLE IF EXISTS f4f.dim_rider CASCADE;
                        DROP TABLE IF EXISTS f4f.dim_flour CASCADE;
                        DROP TABLE IF EXISTS f4f.fact_orders CASCADE;

                        CREATE TABLE f4f.dim_business (
                            business_id VARCHAR PRIMARY KEY,
                            business_name VARCHAR NOT NULL,
                            business_type VARCHAR,
                            business_address VARCHAR,
                            contact_name VARCHAR NOT NULL,
                            contact_phone VARCHAR NOT NULL
                            
                        );

                        CREATE TABLE f4f.dim_rider (
                            rider_id SERIAL PRIMARY KEY,
                            rider_name VARCHAR NOT NULL,
                            rider_phone VARCHAR NOT NULL
                        );

                        CREATE TABLE f4f.dim_flour (
                            flour_type_id SERIAL PRIMARY KEY,
                            flour_type VARCHAR(10000)
                            
                        );

                        CREATE TABLE f4f.fact_orders (
                            order_id       VARCHAR PRIMARY KEY,
                            order_date     DATE NOT NULL,
                            delivery_date  DATE NOT NULL,
                            business_id    VARCHAR NOT NULL REFERENCES f4f.dim_business(business_id),
                            rider_id       INT NOT NULL REFERENCES f4f.dim_rider(rider_id),
                            flour_type_id  INT NOT NULL REFERENCES f4f.dim_flour(flour_type_id),
                            quantity_bags  INT NOT NULL,
                            price_per_bag   NUMERIC NOT NULL,
                            total_amount   NUMERIC NOT NULL,
                            payment_method VARCHAR NOT NULL,
                            order_status   VARCHAR NOT NULL
                        );
    '''

    cursor.execute(create_table_query)
    conn.commit()
    cursor.close()
    conn.close()
    print("Tables created successfully.")

In [94]:
create_table()

Tables created successfully.


In [95]:
# loading data into table

url = "jdbc:postgresql://localhost:5432/F4F_DB"
properties = {
    "user" : "postgres",
    "password" : "London123",
    "driver" : "org.postgresql.Driver"
}

In [97]:
dim_business.write.jdbc(url=url, table="f4f.dim_business", mode="append", properties=properties)
dim_rider.write.jdbc(url=url, table="f4f.dim_rider", mode="append", properties=properties)
dim_flour.write.jdbc(url=url, table="f4f.dim_flour", mode="append", properties=properties)
fact_orders.write.jdbc(url=url, table="f4f.fact_orders", mode="append", properties=properties)