In [None]:
# Imports and Spark bootstrap.
# findspark.init() must run BEFORE anything from pyspark is imported: it is what makes the
# local Spark installation's pyspark importable. Importing pyspark first fails on a fresh
# kernel when pyspark is not pip-installed, or mixes a pip pyspark with a different Spark.
import os

import findspark

# Honour an existing SPARK_HOME; otherwise fall back to the original local install path.
findspark.init(os.environ.get("SPARK_HOME", "F:/programmes/spark-3.5.5-bin-hadoop3"))

from IPython.display import HTML, display
from pyspark.sql import SparkSession
# NOTE: this star import also shadows Python builtins such as count/round/sum/min/max with
# their pyspark Column versions; later cells rely on the pyspark `count`.
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Stop Jupyter from wrapping wide DataFrame.show() tables.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [None]:
# Classpath glob for the driver jars (PostgreSQL, Hadoop, ClickHouse) kept in the Spark install's
# jars/ folder. Derived from SPARK_HOME (findspark.init exports it) instead of repeating the path.
jdbc_driver_path = os.environ.get("SPARK_HOME", "F:/programmes/spark-3.5.5-bin-hadoop3").rstrip("/\\") + "/jars/*"

In [None]:
# Build (or reuse) the Spark session, putting the driver jars on both the driver and
# executor classpaths so the JDBC driver used below can be loaded.
session_builder = (
    SparkSession.builder
    .appName("productDim")
    .config("spark.driver.extraClassPath", jdbc_driver_path)
    .config("spark.executor.extraClassPath", jdbc_driver_path)
)
spark = session_builder.getOrCreate()

print("✅ SparkSession created successfully!")

✅ SparkSession created successfully!


In [None]:
# PostgreSQL JDBC connection properties (user / password / driver).
# Credentials come from the environment (PGUSER / PGPASSWORD) instead of being hard-coded in
# the notebook; when PGPASSWORD is unset the password is prompted for interactively.
import getpass

properties = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD") or getpass.getpass("PostgreSQL password: "),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Read the products table from PostgreSQL, reusing the `properties` connection settings
# (user / password / driver) instead of repeating credentials inline.
products_df = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/olist?ssl=false",
    table="public.products",
    properties=properties,
)

In [None]:
# Read the product category translation table from PostgreSQL, reusing the `properties`
# connection settings (user / password / driver) instead of repeating credentials inline.
translations_df = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/olist?ssl=false",
    table="public.product_category_translation",
    properties=properties,
)

In [None]:
# Attach the English category name to every product. The left join keeps products whose
# category has no translation row (their English name is null).
products_english_df = (
    products_df
    .join(translations_df, on="product_category_name", how="left")
    .select(
        products_df["*"],                                  # every original product column
        translations_df["product_category_name_english"],  # English category name
    )
)

In [None]:
products_english_df.show(n=20, truncate=True)  # preview the joined product table

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+--------------------+-----------------------------+
|          product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|        product_name|product_category_name_english|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+--------------------+-----------------------------+
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|Essential Canvas ...|                          art|
|cef67bcfe19066a93...|                bebes|                 27|        

In [None]:
list(products_english_df.columns)  # column names of the joined product table

['product_id',
 'product_category_name',
 'product_name_length',
 'product_description_length',
 'product_photos_qty',
 'product_weight_g',
 'product_length_cm',
 'product_height_cm',
 'product_width_cm',
 'product_name',
 'product_category_name_english']

In [None]:
# Show each column's Spark data type and nullability after the join.
products_english_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_length: integer (nullable = true)
 |-- product_description_length: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)



In [None]:
# Normalise the categorical text columns: strip surrounding whitespace and lowercase.
for category_column in ("product_category_name_english", "product_category_name"):
    products_english_df = products_english_df.withColumn(
        category_column, lower(trim(col(category_column)))
    )

In [None]:
# Null count per column: when() yields a value only for null rows, and count() counts non-nulls.
product_null_counts = [
    count(when(col(name).isNull(), name)).alias(name) for name in products_english_df.columns
]
products_english_df.select(product_null_counts).show()

+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------+-----------------------------+
|product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_name|product_category_name_english|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------+-----------------------------+
|         0|                  610|                610|                       610|               610|               2|                2|                2|               2|           0|                          623|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+---------

In [None]:
# Mean of each physical dimension (avg ignores nulls); used in the next cell to fill gaps.
dimension_avg_aliases = {
    "product_weight_g": "avg_weight",
    "product_length_cm": "avg_length",
    "product_height_cm": "avg_height",
    "product_width_cm": "avg_width",
}
avg_values = products_english_df.select(
    *[avg(source).alias(alias) for source, alias in dimension_avg_aliases.items()]
).first()

In [None]:
# Fill missing dimensions with the column means computed above.
# NOTE(review): the target columns are integers, so the fractional means are cast on fill —
# confirm truncation is acceptable.
dimension_fill_values = {
    "product_weight_g": avg_values["avg_weight"],
    "product_length_cm": avg_values["avg_length"],
    "product_height_cm": avg_values["avg_height"],
    "product_width_cm": avg_values["avg_width"],
}
product_dim_nulls_filled_df = products_english_df.fillna(dimension_fill_values)

In [None]:
# Re-check the four filled dimension columns; every null count should now be 0.
filled_dimension_cols = ["product_weight_g", "product_length_cm", "product_height_cm", "product_width_cm"]
dimension_null_counts = [count(when(col(name).isNull(), name)).alias(name) for name in filled_dimension_cols]
product_dim_nulls_filled_df.select(dimension_null_counts).show()

+----------------+-----------------+-----------------+----------------+
|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------------+-----------------+-----------------+----------------+
|               0|                0|                0|               0|
+----------------+-----------------+-----------------+----------------+



In [None]:
# Null count for every column of the filled dataset.
filled_null_counts = [
    count(when(col(name).isNull(), name)).alias(name) for name in product_dim_nulls_filled_df.columns
]
product_dim_nulls_filled_df.select(filled_null_counts).show()

+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------+-----------------------------+
|product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_name|product_category_name_english|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------+-----------------------------+
|         0|                  610|                610|                       610|               610|               0|                0|                0|               0|           0|                          623|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+---------

In [None]:
# product_id should be unique: collect every id that appears more than once.
duplicates_df = (
    product_dim_nulls_filled_df
    .groupBy("product_id")
    .count()
    .where(col("count") > 1)
)

# Print the number of duplicated products ("Number of duplicated products:") and list them.
print("عدد المنتجات المكررة:")
print(duplicates_df.count())
duplicates_df.show()

عدد المنتجات المكررة:
0
+----------+-----+
|product_id|count|
+----------+-----+
+----------+-----+



In [None]:
# Keep a single row per product_id (a no-op when the duplicate check above finds none).
product_dim_nulls_filled_df = product_dim_nulls_filled_df.drop_duplicates(subset=["product_id"])

In [None]:
# Flag physically impossible products: any of weight, length, height or width <= 0.
non_positive_dimension = (
    (col("product_weight_g") <= 0)
    | (col("product_length_cm") <= 0)
    | (col("product_height_cm") <= 0)
    | (col("product_width_cm") <= 0)
)
outliers_df = product_dim_nulls_filled_df.where(non_positive_dimension)

# Report the outlier count ("Number of rows containing outliers:") and the rows.
print("عدد الصفوف التي تحتوي على outliers:")
print(outliers_df.count())
outliers_df.show()

عدد الصفوف التي تحتوي على outliers:
4
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+--------------------+-----------------------------+
|          product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|        product_name|product_category_name_english|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+--------------------+-----------------------------+
|36ba42dd187055e1f...|      cama_mesa_banho|                 53|                       528|                 1|               0|               30|               25|              30|Deluxe Bed Table ...|               bed_bath_table|
|8038040ee2a71048d...|      cama_m

In [None]:
# Expose the de-duplicated, null-filled products to Spark SQL as the "products" view.
product_dim_nulls_filled_df.createOrReplaceTempView(name="products")

In [None]:
# Per-category mean weight over valid (> 0) weights only, registered as the
# "category_avg_weight" view for the zero-weight replacement in the next cell.
category_avg_weight_sql = """
    CREATE OR REPLACE TEMP VIEW category_avg_weight AS
    SELECT product_category_name, AVG(product_weight_g) AS avg_weight
    FROM products
    WHERE product_weight_g > 0
    GROUP BY product_category_name
"""
spark.sql(category_avg_weight_sql)

DataFrame[]

In [None]:
# Replace non-positive weights with the average valid weight of the product's own category.
# - `<= 0` matches the outlier filter used in the checks, so negative weights are fixed too.
# - `<=>` (null-safe equality) lets products with a NULL category pick up the NULL group's
#   average from category_avg_weight; a plain `=` never matches NULL keys.
# - COALESCE falls back to the overall average of valid weights when the category has no
#   valid weight at all, so a bad weight never silently becomes NULL.
products_trimed_outliers = spark.sql("""
    SELECT
        p.*,
        CASE
            WHEN p.product_weight_g <= 0 THEN COALESCE(c.avg_weight, g.global_avg_weight)
            ELSE p.product_weight_g
        END AS product_weight_g_cleaned
    FROM products p
    LEFT JOIN category_avg_weight c
        ON p.product_category_name <=> c.product_category_name
    CROSS JOIN (
        SELECT AVG(product_weight_g) AS global_avg_weight
        FROM products
        WHERE product_weight_g > 0
    ) g
""")

In [None]:
# Swap the cleaned weight in for the original column under the name product_weight_g
# (the column ends up last in the schema).
products_trimed_outliers = (
    products_trimed_outliers
    .drop("product_weight_g")
    .withColumnRenamed("product_weight_g_cleaned", "product_weight_g")
)

In [None]:
# The cleaned weight was already renamed in the previous cell, so renaming it again does
# nothing. Instead, verify the intended end state: exactly one weight column, no helper column.
weight_schema_columns = products_trimed_outliers.columns
assert weight_schema_columns.count("product_weight_g") == 1, "expected exactly one product_weight_g column"
assert "product_weight_g_cleaned" not in weight_schema_columns, "helper column product_weight_g_cleaned still present"

In [None]:
# Verify the cleaning: no row may keep a non-positive dimension, and no weight may have been
# turned into NULL by the category-average replacement (a NULL slips past a plain `<= 0` test).
remaining_bad_rows = (
    (col("product_weight_g") <= 0)
    | col("product_weight_g").isNull()
    | (col("product_length_cm") <= 0)
    | (col("product_height_cm") <= 0)
    | (col("product_width_cm") <= 0)
)
outliers_df = products_trimed_outliers.filter(remaining_bad_rows)

# Report the count ("Number of rows containing outliers:") and the offending rows.
print("عدد الصفوف التي تحتوي على outliers:")
print(outliers_df.count())
outliers_df.show()

عدد الصفوف التي تحتوي على outliers:
0
+----------+---------------------+-------------------+--------------------------+------------------+-----------------+-----------------+----------------+------------+-----------------------------+----------------+
|product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_length_cm|product_height_cm|product_width_cm|product_name|product_category_name_english|product_weight_g|
+----------+---------------------+-------------------+--------------------------+------------------+-----------------+-----------------+----------------+------------+-----------------------------+----------------+
+----------+---------------------+-------------------+--------------------------+------------------+-----------------+-----------------+----------------+------------+-----------------------------+----------------+



In [None]:
# Derived column: product volume in cm^3 = length x height x width.
# NOTE(review): all three inputs are integer columns, so the result is integer too —
# confirm the largest dimensions cannot overflow a 32-bit int.
volume_expr = col("product_length_cm") * col("product_height_cm") * col("product_width_cm")
product_drived_col = products_trimed_outliers.withColumn("product_volume_cm3", volume_expr)

In [None]:
product_drived_col.show(n=20, truncate=True)  # preview products with the derived volume

+--------------------+---------------------+-------------------+--------------------------+------------------+-----------------+-----------------+----------------+--------------------+-----------------------------+----------------+------------------+
|          product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_length_cm|product_height_cm|product_width_cm|        product_name|product_category_name_english|product_weight_g|product_volume_cm3|
+--------------------+---------------------+-------------------+--------------------------+------------------+-----------------+-----------------+----------------+--------------------+-----------------------------+----------------+------------------+
|00066f42aeeb9f300...|           perfumaria|                 53|                       596|                 6|               20|               16|              16|   Pure Cologne W708|                    perfumery|           300.0|              51

In [None]:
product_drived_col.printSchema()  # data types before the weight cast

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_length: integer (nullable = true)
 |-- product_description_length: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)
 |-- product_weight_g: double (nullable = true)
 |-- product_volume_cm3: integer (nullable = true)



In [None]:
# Store the cleaned weight as a whole number of grams.
# Round before casting: a plain cast to integer truncates, so a category average such as
# 1234.9 g would become 1234 instead of 1235.
from pyspark.sql import functions as F

product_drived_col = product_drived_col.withColumn(
    "product_weight_g", F.round(F.col("product_weight_g")).cast("integer")
)

In [None]:
product_drived_col.printSchema()  # product_weight_g should now be integer

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_length: integer (nullable = true)
 |-- product_description_length: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_volume_cm3: integer (nullable = true)



In [None]:
# Surrogate key: consecutive integers starting at 1, assigned in product_id order.
# NOTE(review): a window without partitionBy processes all rows in a single partition —
# fine for a dimension of this size, worth revisiting for large tables.
window_spec = Window.orderBy(col("product_id"))
product_row_number = row_number().over(window_spec)
products_final_df = product_drived_col.withColumn("surrogate_key", product_row_number)

In [None]:
list(products_final_df.columns)  # surrogate_key is appended last

['product_id',
 'product_category_name',
 'product_name_length',
 'product_description_length',
 'product_photos_qty',
 'product_length_cm',
 'product_height_cm',
 'product_width_cm',
 'product_name',
 'product_category_name_english',
 'product_weight_g',
 'product_volume_cm3',
 'surrogate_key']

In [None]:
# Final column order for the product dimension: keys, names/categories, descriptive
# counts, then physical measurements.
product_dim_column_order = [
    "surrogate_key",
    "product_id",
    "product_name",
    "product_category_name_english",
    "product_category_name",
    "product_name_length",
    "product_description_length",
    "product_photos_qty",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
    "product_weight_g",
    "product_volume_cm3",
]
products_dim = products_final_df.select(*product_dim_column_order)

In [None]:
products_dim.show(n=20, truncate=True)  # preview the finished product dimension

+-------------+--------------------+--------------------+-----------------------------+---------------------+-------------------+--------------------------+------------------+-----------------+-----------------+----------------+----------------+------------------+
|surrogate_key|          product_id|        product_name|product_category_name_english|product_category_name|product_name_length|product_description_length|product_photos_qty|product_length_cm|product_height_cm|product_width_cm|product_weight_g|product_volume_cm3|
+-------------+--------------------+--------------------+-----------------------------+---------------------+-------------------+--------------------------+------------------+-----------------+-----------------+----------------+----------------+------------------+
|            1|00066f42aeeb9f300...|   Pure Cologne W708|                    perfumery|           perfumaria|                 53|                       596|                 6|               20|            

## Payment Fact

In [None]:
# Read the order payments table from PostgreSQL, reusing the `properties` connection
# settings (user / password / driver) instead of repeating credentials inline.
order_payment_df = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/olist?ssl=false",
    table="public.order_payments",
    properties=properties,
)

In [None]:
order_payment_df.show(n=20, truncate=True)  # preview the raw payments

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|
|771ee386b001f0620...|                 1| credit_card|                   1|        81.16|
|3d7239c394a212faa...|                 1| credit_card|                   3|        51.84|
|1f78449c8

In [None]:
list(order_payment_df.columns)  # raw payment column names

['order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value']

In [None]:
# Show each payment column's Spark data type and nullability.
order_payment_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [None]:
# Normalise payment type labels: strip surrounding whitespace and lowercase.
normalized_payment_type = lower(trim(col("payment_type")))
order_payment_df = order_payment_df.withColumn("payment_type", normalized_payment_type)

In [None]:
# Null count per payment column (count() only counts the non-null results of when()).
payment_null_counts = [
    count(when(col(name).isNull(), name)).alias(name) for name in order_payment_df.columns
]
order_payment_df.select(payment_null_counts).show()

+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



In [None]:
# A payment is identified by (order_id, payment_sequential): group on that pair and
# keep every pair that occurs more than once.
payment_key_counts = order_payment_df.groupBy("order_id", "payment_sequential").count()
duplicates_df = payment_key_counts.filter(payment_key_counts["count"] > 1)

# Print the number of duplicated payments ("Number of duplicated payments:") and list them.
print("عدد المدفوعات المكررة:")
print(duplicates_df.count())
duplicates_df.show()

عدد المدفوعات المكررة:
0
+--------+------------------+-----+
|order_id|payment_sequential|count|
+--------+------------------+-----+
+--------+------------------+-----+



In [None]:
# Payments whose value is zero or negative.
outliers_df = order_payment_df.where(col("payment_value") <= 0)
outliers_df.count()

9

In [None]:
outliers_df.show(n=20, truncate=True)  # list the non-positive payments

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|8bcbe01d44d147f90...|                 4|     voucher|                   1|          0.0|
|fa65dad1b0e818e3c...|                14|     voucher|                   1|          0.0|
|6ccb433e00daae128...|                 4|     voucher|                   1|          0.0|
|4637ca194b6387e2d...|                 1| not_defined|                   1|          0.0|
|00b1cb0320190ca0d...|                 1| not_defined|                   1|          0.0|
|45ed6e85398a87c25...|                 3|     voucher|                   1|          0.0|
|fa65dad1b0e818e3c...|                13|     voucher|                   1|          0.0|
|c8c528189310eaa44...|                 1| not_defined|                   1|          0.0|
|b23878b3e

In [None]:
# Expose the payments to Spark SQL as the "order_payments" view.
order_payment_df.createOrReplaceTempView(name="order_payments")

In [None]:
# Row count per payment type, to spot unexpected categories.
payment_type_counts_sql = '''
SELECT payment_type,count(payment_type)
FROM order_payments
group by payment_type
'''
v1 = spark.sql(payment_type_counts_sql)
v1.show()

+------------+-------------------+
|payment_type|count(payment_type)|
+------------+-------------------+
|      boleto|              19784|
| not_defined|                  3|
| credit_card|              76795|
|     voucher|               5775|
|  debit_card|               1529|
+------------+-------------------+



In [None]:
# Row count per number of installments, in ascending order.
installment_counts_sql = '''
SELECT payment_installments,count(payment_installments)
FROM order_payments
group by payment_installments
order by payment_installments
'''
v1 = spark.sql(installment_counts_sql)
v1.show()

+--------------------+---------------------------+
|payment_installments|count(payment_installments)|
+--------------------+---------------------------+
|                   0|                          2|
|                   1|                      52546|
|                   2|                      12413|
|                   3|                      10461|
|                   4|                       7098|
|                   5|                       5239|
|                   6|                       3920|
|                   7|                       1626|
|                   8|                       4268|
|                   9|                        644|
|                  10|                       5328|
|                  11|                         23|
|                  12|                        133|
|                  13|                         16|
|                  14|                         15|
|                  15|                         74|
|                  16|         

In [None]:
# Drop payments whose type is "not_defined" (a NULL payment_type compares to NULL and is
# dropped by the filter as well).
order_payment_df = order_payment_df.filter(col("payment_type") != "not_defined")


In [None]:
# Re-check payment types after removing not_defined.
# The "order_payments" view was registered from the DataFrame *before* the filter, and a temp
# view does not follow later reassignments of the Python variable — refresh it first so this
# check sees the filtered data on a clean Restart & Run All.
order_payment_df.createOrReplaceTempView("order_payments")
v1 = spark.sql('''
SELECT payment_type,count(payment_type)
FROM order_payments
group by payment_type
''')
v1.show()

+------------+-------------------+
|payment_type|count(payment_type)|
+------------+-------------------+
|      boleto|              19784|
| credit_card|              76795|
|     voucher|               5775|
|  debit_card|               1529|
+------------+-------------------+



In [None]:
# Rewrite a zero installment count as 1; every other count is kept as-is.
fixed_installments = when(col("payment_installments") == 0, 1).otherwise(col("payment_installments"))
order_payment_df = order_payment_df.withColumn("payment_installments", fixed_installments)


In [None]:
# Re-check installment counts after the zero -> 1 correction.
# Refresh the "order_payments" view first: it was registered before the filter/correction
# cells and would otherwise still show the uncorrected data on a clean Restart & Run All.
order_payment_df.createOrReplaceTempView("order_payments")
v1 = spark.sql('''
SELECT payment_installments,count(payment_installments)
FROM order_payments
group by payment_installments
order by payment_installments
''')
v1.show()

+--------------------+---------------------------+
|payment_installments|count(payment_installments)|
+--------------------+---------------------------+
|                   1|                      52545|
|                   2|                      12413|
|                   3|                      10461|
|                   4|                       7098|
|                   5|                       5239|
|                   6|                       3920|
|                   7|                       1626|
|                   8|                       4268|
|                   9|                        644|
|                  10|                       5328|
|                  11|                         23|
|                  12|                        133|
|                  13|                         16|
|                  14|                         15|
|                  15|                         74|
|                  16|                          5|
|                  17|         

In [None]:
# TODO: add foreign keys

In [None]:
# Surrogate key for the payment fact: consecutive integers from 1 over the whole table
# (no partitionBy). (order_id, payment_sequential) is unique — see the duplicate check —
# so the ordering, and therefore the keys, are deterministic.
windowSpec = Window.orderBy(col("order_id"), col("payment_sequential"))
payment_row_number = row_number().over(windowSpec)
order_payment_df = order_payment_df.withColumn("order_payment_sk", payment_row_number)

In [None]:
list(order_payment_df.columns)  # order_payment_sk is appended last

['order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value',
 'order_payment_sk']

In [None]:
# Put the surrogate key first, then the natural key and the payment attributes.
payment_fact_column_order = [
    'order_payment_sk',
    'order_id',
    'payment_sequential',
    'payment_type',
    'payment_installments',
    'payment_value',
]
order_payment_df = order_payment_df.select(*payment_fact_column_order)

In [None]:
list(order_payment_df.columns)  # final payment fact column order

['order_payment_sk',
 'order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value']