In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime

In [2]:
spark = SparkSession.builder \
    .appName("E-commerce analysis") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()

25/04/10 22:14:15 WARN Utils: Your hostname, ettore1012-IdeaPad-Gaming-3-15ARH05 resolves to a loopback address: 127.0.1.1; using 192.168.15.113 instead (on interface wlp4s0)
25/04/10 22:14:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/ettore1012/Project/ecommerce_data_pipeline-main/venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ettore1012/.ivy2/cache
The jars for the packages stored in: /home/ettore1012/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1bb96e10-ff94-4528-909a-64e282b189fa;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 191ms :: artifacts dl 11ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwn

In [3]:
# Ignoring some Spark's log
spark.sparkContext.setLogLevel("ERROR")

In [4]:
# Retrive current date, timestamps, week 
today = datetime.now()
timestamp = today.strftime("%Y-%m-%d")
year, week_num, _ = today.isocalendar() # ISO standard week number
partition_folder = f"week_{year}_{week_num}"

In [5]:
# load data on spark
try:
    users_df = spark.read.json(f"s3a://raw-data/users/{partition_folder}/users.json", multiLine = True)
except Exception as e:
    print(f"⚠️ Could not load users: {e}")

try:
    products_df = spark.read.json(f"s3a://raw-data/products/{partition_folder}/products.json", multiLine = True)
except Exception as e:
    print(f"⚠️ Could not load products: {e}")

try:
    carts_df = spark.read.json(f"s3a://raw-data/carts_{timestamp}.json", multiLine = True)
except Exception as e:
    print(f"⚠️ Could not load carts: {e}")

                                                                                

In [6]:
# check schema of data
print("\n\nUsers schema\n")
users_df.printSchema()
users_df.show(5)



Users schema

root
 |-- __v: long (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- geolocation: struct (nullable = true)
 |    |    |-- lat: string (nullable = true)
 |    |    |-- long: string (nullable = true)
 |    |-- number: long (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- password: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- username: string (nullable = true)

+---+--------------------+------------------+---+-----------------+---------+--------------+---------+
|__v|             address|             email| id|             name| password|         phone| username|
+---+--------------------+------------------+---+-----------------+---------+--

In [7]:
# Explode users_df
users_df_flat = users_df.select(
    col("__V"),
    col("address.city"),
    col("address.geolocation.lat"),
    col("address.geolocation.long"),
    col("address.number"),
    col("address.street"),
    col("address.zipcode"),
    col("email"),
    col("id"),
    col("name.firstname"),
    col("name.lastname"),
    col("password"),
    col("phone"),
    col("username")
)

users_df_flat.show(5)

+---+-----------+--------+--------+------+----------------+----------+------------------+---+---------+--------+---------+--------------+---------+
|__V|       city|     lat|    long|number|          street|   zipcode|             email| id|firstname|lastname| password|         phone| username|
+---+-----------+--------+--------+------+----------------+----------+------------------+---+---------+--------+---------+--------------+---------+
|  0|   kilcoole|-37.3159| 81.1496|  7682|        new road|12926-3874|    john@gmail.com|  1|     john|     doe|  m38rmF$|1-570-236-7033|    johnd|
|  0|   kilcoole|-37.3159| 81.1496|  7267|       Lovers Ln|12926-3874|morrison@gmail.com|  2|    david|morrison|   83r5^_|1-570-236-7033| mor_2314|
|  0|    Cullman| 40.3467|-30.1310|    86|      Frances Ct|29567-1452|   kevin@gmail.com|  3|    kevin|    ryan|kev02937@|1-567-094-1345|kevinryan|
|  0|San Antonio| 50.3467|-20.1310|  6454|Hunters Creek Dr|98234-1734|     don@gmail.com|  4|      don|   romer|

In [8]:
# check schema of data
print("\n\nProducts schema\n")
products_df.printSchema()
products_df.show(5)



Products schema

root
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- id: long (nullable = true)
 |-- image: string (nullable = true)
 |-- price: double (nullable = true)
 |-- rating: struct (nullable = true)
 |    |-- count: long (nullable = true)
 |    |-- rate: double (nullable = true)
 |-- title: string (nullable = true)

+--------------+--------------------+---+--------------------+------+----------+--------------------+
|      category|         description| id|               image| price|    rating|               title|
+--------------+--------------------+---+--------------------+------+----------+--------------------+
|men's clothing|Your perfect pack...|  1|https://fakestore...|109.95|{120, 3.9}|Fjallraven - Fold...|
|men's clothing|Slim-fitting styl...|  2|https://fakestore...|  22.3|{259, 4.1}|Mens Casual Premi...|
|men's clothing|great outerwear j...|  3|https://fakestore...| 55.99|{500, 4.7}|  Mens Cotton Jacket|
|men's clothing|Th

In [9]:
products_df_flat = products_df.select(
    col("category"),
    col("description"),
    col("id"),
    col("image"),
    col("price"),
    col("rating.count"),
    col("rating.rate"),
    col("title")
)

products_df_flat.show(5)

+--------------+--------------------+---+--------------------+------+-----+----+--------------------+
|      category|         description| id|               image| price|count|rate|               title|
+--------------+--------------------+---+--------------------+------+-----+----+--------------------+
|men's clothing|Your perfect pack...|  1|https://fakestore...|109.95|  120| 3.9|Fjallraven - Fold...|
|men's clothing|Slim-fitting styl...|  2|https://fakestore...|  22.3|  259| 4.1|Mens Casual Premi...|
|men's clothing|great outerwear j...|  3|https://fakestore...| 55.99|  500| 4.7|  Mens Cotton Jacket|
|men's clothing|The color could b...|  4|https://fakestore...| 15.99|  430| 2.1|Mens Casual Slim Fit|
|      jewelery|From our Legends ...|  5|https://fakestore...| 695.0|  400| 4.6|John Hardy Women'...|
+--------------+--------------------+---+--------------------+------+-----+----+--------------------+
only showing top 5 rows



In [10]:
# check schema of data
print("\n\nCarts schema\n")
carts_df.printSchema()
carts_df.show(5)



Carts schema

root
 |-- __v: long (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productId: long (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- userId: long (nullable = true)

+---+----------+---+--------------------+------+
|__v|      date| id|            products|userId|
+---+----------+---+--------------------+------+
|  0|2025-04-10|  1|[{1, 4}, {2, 1}, ...|     1|
|  0|2025-04-10|  2|[{2, 4}, {1, 10},...|     1|
|  0|2025-04-10|  3|    [{1, 2}, {9, 1}]|     2|
|  0|2025-04-10|  4|            [{1, 4}]|     3|
|  0|2025-04-10|  5|    [{7, 1}, {8, 1}]|     3|
+---+----------+---+--------------------+------+
only showing top 5 rows



In [11]:
carts_df_exploded = carts_df.withColumn("product", explode(col("products")))
carts_df_flat = carts_df_exploded.select(
    "__V",
    "date",
    "id",
    col("product.productID").alias("product_ID"),
    col("product.quantity").alias("quantity")
)
carts_df_flat.show()

+---+----------+---+----------+--------+
|__V|      date| id|product_ID|quantity|
+---+----------+---+----------+--------+
|  0|2025-04-10|  1|         1|       4|
|  0|2025-04-10|  1|         2|       1|
|  0|2025-04-10|  1|         3|       6|
|  0|2025-04-10|  2|         2|       4|
|  0|2025-04-10|  2|         1|      10|
|  0|2025-04-10|  2|         5|       2|
|  0|2025-04-10|  3|         1|       2|
|  0|2025-04-10|  3|         9|       1|
|  0|2025-04-10|  4|         1|       4|
|  0|2025-04-10|  5|         7|       1|
|  0|2025-04-10|  5|         8|       1|
|  0|2025-04-10|  6|        10|       2|
|  0|2025-04-10|  6|        12|       3|
|  0|2025-04-10|  7|        18|       1|
+---+----------+---+----------+--------+



In [12]:
# Clarify some column names
products_df_flat = products_df_flat.withColumnRenamed("id", "Product_ID")
users_df_flat = users_df_flat.withColumnRenamed("id", "User_ID")
carts_df_flat = carts_df_flat.withColumnRenamed("id", "User_ID")

In [13]:
# Build enriched carts facts data sets with static data from users and products
enriched_df = carts_df_flat.join(
    products_df_flat,
    products_df_flat.Product_ID == carts_df_flat.product_ID,
    "left_outer"
).select(
    products_df_flat["category"], 
    products_df_flat["price"],
    products_df_flat["count"], 
    products_df_flat["rate"],
    carts_df_flat["User_ID"],
    carts_df_flat["product_ID"],
    carts_df_flat["quantity"]
)

enriched_df = enriched_df.join(
    users_df_flat,
    users_df_flat.User_ID == enriched_df.User_ID,
    "left_outer" 
).select(
    enriched_df["*"],
    users_df_flat["city"],
    users_df_flat["email"],
    users_df_flat["firstname"],
    users_df_flat["lastname"]
)

enriched_df.show()

+----------------+------+-----+----+-------+----------+--------+-----------+------------------+---------+--------+
|        category| price|count|rate|User_ID|product_ID|quantity|       city|             email|firstname|lastname|
+----------------+------+-----+----+-------+----------+--------+-----------+------------------+---------+--------+
|  men's clothing|109.95|  120| 3.9|      1|         1|       4|   kilcoole|    john@gmail.com|     john|     doe|
|  men's clothing|  22.3|  259| 4.1|      1|         2|       1|   kilcoole|    john@gmail.com|     john|     doe|
|  men's clothing| 55.99|  500| 4.7|      1|         3|       6|   kilcoole|    john@gmail.com|     john|     doe|
|  men's clothing|  22.3|  259| 4.1|      2|         2|       4|   kilcoole|morrison@gmail.com|    david|morrison|
|  men's clothing|109.95|  120| 3.9|      2|         1|      10|   kilcoole|morrison@gmail.com|    david|morrison|
|        jewelery| 695.0|  400| 4.6|      2|         5|       2|   kilcoole|morr

In [14]:
# We cache the enriched DataFrame because we will reuse it multiple times
# for different aggregations (e.g., by category, by user, by product).
# Caching avoids recomputing the same transformations and keeps the data in memory
# if possible, greatly improving performance in a batch analytics context.
enriched_df.cache()
enriched_df.count()  # triggers the actual caching (required)

14

In [15]:
revenue_by_category = enriched_df.groupBy("category").agg(
    sum(col("price") * col("quantity")).alias("total_revenue_byCat")
)

revenue_by_category.show()

+----------------+-------------------+
|        category|total_revenue_byCat|
+----------------+-------------------+
|women's clothing|               9.85|
|        jewelery|            1410.98|
|  men's clothing|            2646.44|
|     electronics|              624.0|
+----------------+-------------------+



In [16]:
revenue_by_users_ID = enriched_df.groupBy("User_ID", "firstname","lastname", "email").agg(
    sum(col("price") * col("quantity")).alias("total_revenue_byUser")
).orderBy(col("total_revenue_byUser").desc())
revenue_by_users_ID.show()

+-------+---------+--------+------------------+--------------------+
|User_ID|firstname|lastname|             email|total_revenue_byUser|
+-------+---------+--------+------------------+--------------------+
|      2|    david|morrison|morrison@gmail.com|              2578.7|
|      1|     john|     doe|    john@gmail.com|              798.04|
|      6|    david| russell| david_r@gmail.com|               560.0|
|      4|      don|   romer|     don@gmail.com|               439.8|
|      3|    kevin|    ryan|   kevin@gmail.com|               283.9|
|      5|    derek|  powell|   derek@gmail.com|               20.98|
|      7|   miriam|  snyder|  miriam@gmail.com|                9.85|
+-------+---------+--------+------------------+--------------------+



In [17]:
total_day_revenue = enriched_df.agg(
    round(sum(col("price") * col("quantity")),2).alias("total_day_revenue")
)

print(total_day_revenue.collect())

[Row(total_day_revenue=4691.27)]


In [20]:
total_quantity_per_prod = enriched_df.groupBy("Product_ID", "category").agg(
    sum(col("quantity")).alias("quantity_per_productID")
).orderBy(col("quantity_per_productID").desc())
total_quantity_per_prod.show()

+----------+----------------+------------+
|Product_ID|        category|tot_quantity|
+----------+----------------+------------+
|         1|  men's clothing|          20|
|         3|  men's clothing|           6|
|         2|  men's clothing|           5|
|        12|     electronics|           3|
|        10|     electronics|           2|
|         5|        jewelery|           2|
|         8|        jewelery|           1|
|         7|        jewelery|           1|
|         9|     electronics|           1|
|        18|women's clothing|           1|
+----------+----------------+------------+



In [25]:
avg_ordval_per_user = enriched_df.groupBy("User_ID", "firstname", "lastname", "email").agg(
    round(avg(col("price")), 2).alias("avg_ordval_per_UserID")
).orderBy(col("avg_ordval_per_UserID").desc())
avg_ordval_per_user.show()

+-------+---------+--------+------------------+---------------------+
|User_ID|firstname|lastname|             email|avg_ordval_per_UserID|
+-------+---------+--------+------------------+---------------------+
|      2|    david|morrison|morrison@gmail.com|               275.75|
|      6|    david| russell| david_r@gmail.com|                111.5|
|      4|      don|   romer|     don@gmail.com|               109.95|
|      3|    kevin|    ryan|   kevin@gmail.com|                86.98|
|      1|     john|     doe|    john@gmail.com|                62.75|
|      5|    derek|  powell|   derek@gmail.com|                10.49|
|      7|   miriam|  snyder|  miriam@gmail.com|                 9.85|
+-------+---------+--------+------------------+---------------------+



In [38]:
avg_rat_per_cat = enriched_df.groupBy("category").agg(
    round((sum(col("rate") * col("count")) / sum(col("count"))),1).alias("Weighted_avg_rating_per_category")
).orderBy(col("Weighted_avg_rating_per_category").desc())

avg_rat_per_cat.show()

+----------------+--------------------------------+
|        category|Weighted_avg_rating_per_category|
+----------------+--------------------------------+
|women's clothing|                             4.7|
|  men's clothing|                             4.2|
|     electronics|                             3.7|
|        jewelery|                             3.6|
+----------------+--------------------------------+



In [40]:
total_revenue_per_city = enriched_df.groupBy("city").agg(
    round(sum(col("quantity") * col("price")),2).alias("Total_revenue_city")
).orderBy(col("Total_revenue_city").desc())
total_revenue_per_city.show()

+-----------+------------------+
|       city|Total_revenue_city|
+-----------+------------------+
|   kilcoole|           3376.74|
|    el paso|             560.0|
|San Antonio|             439.8|
|    Cullman|             283.9|
|san Antonio|             20.98|
|     fresno|              9.85|
+-----------+------------------+



In [42]:
num_of_prod_sold = enriched_df.groupBy("product_ID").agg(
    count("product_ID").alias("Num_prod_sold")
).orderBy(col("Num_prod_sold").desc())

num_of_prod_sold.show()

+----------+-------------+
|product_ID|Num_prod_sold|
+----------+-------------+
|         1|            4|
|         2|            2|
|         7|            1|
|         9|            1|
|         5|            1|
|        10|            1|
|         3|            1|
|        12|            1|
|         8|            1|
|        18|            1|
+----------+-------------+



In [50]:
rev_prod_cat = enriched_df.groupBy("category", "product_ID").agg(
    sum(col("price") * col("quantity")).alias("revenue_prod_byCat")
).orderBy(col("revenue_prod_byCat").desc())
rev_prod_cat.show()

+----------------+----------+------------------+
|        category|product_ID|revenue_prod_byCat|
+----------------+----------+------------------+
|  men's clothing|         1|            2199.0|
|        jewelery|         5|            1390.0|
|     electronics|        12|             342.0|
|  men's clothing|         3|            335.94|
|     electronics|        10|             218.0|
|  men's clothing|         2|             111.5|
|     electronics|         9|              64.0|
|        jewelery|         8|             10.99|
|        jewelery|         7|              9.99|
|women's clothing|        18|              9.85|
+----------------+----------+------------------+



In [None]:
top_user_rev = enriched_df.groupBy("user_ID", "firstname", "lastname", "email").agg(
    sum(col("price") * col("quantity")).alias("Top_user_revenue")
).orderBy(col("Top_user_revenue").desc()).limit(1)

+-------+---------+--------+------------------+----------------+
|user_ID|firstname|lastname|             email|Top_user_revenue|
+-------+---------+--------+------------------+----------------+
|      2|    david|morrison|morrison@gmail.com|          2578.7|
+-------+---------+--------+------------------+----------------+



In [57]:
top_prod_rev = enriched_df.groupBy("product_ID", "category").agg(
    sum(col("price") * col("quantity")).alias("Top_product_revenue")
).orderBy(col("Top_product_revenue").desc()).limit(1)
top_prod_rev.show()

+----------+--------------+-------------------+
|product_ID|      category|Top_product_revenue|
+----------+--------------+-------------------+
|         1|men's clothing|             2199.0|
+----------+--------------+-------------------+



In [58]:
ord_bycity = enriched_df.groupBy("city").agg(
    sum(col("quantity")).alias("Orders_by_city")
).orderBy(col("Orders_by_city").desc())
ord_bycity.show()

+-----------+--------------+
|       city|Orders_by_city|
+-----------+--------------+
|   kilcoole|            27|
|    el paso|             5|
|San Antonio|             4|
|    Cullman|             3|
|san Antonio|             2|
|     fresno|             1|
+-----------+--------------+

