## EXERCISE 4 - rewrite exercise 1

In [2]:

import time
import builtins  # <-- IMPORTANT
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    round as spark_round,   # Spark round ONLY for Columns
    count,
    col,
    sum as _sum
)

spark = (
    SparkSession.builder
    .appName("PostgresVsSparkBenchmark")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.2")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "/tmp/spark-events")
    .config("spark.history.fs.logDirectory", "/tmp/spark-events")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

In [3]:
jdbc_url = "jdbc:postgresql://postgres:5432/postgres"
jdbc_props = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

In [4]:

print("\n=== Loading orders from PostgreSQL ===")
start_time = time.time()

orders_df = spark.read.jdbc(
    url=jdbc_url,
    table="orders_big",
    properties=jdbc_props
)

row_count = orders_df.count()
print(f"Rows loaded: {row_count}")
print("Load time:", round(time.time() - start_time, 2), "seconds")

# Register temp view for Spark SQL
orders_df.createOrReplaceTempView("orders")


=== Loading orders from PostgreSQL ===
Rows loaded: 1000000
Load time: 2.53 seconds


In [9]:
print("\n=== Query (A): Highest price_per_unit item ===")
start = time.time()

q_a = (
    orders_df
    .select("product_category", "price_per_unit")
    .orderBy(col("price_per_unit").desc())
    .limit(1)
)

q_a.collect()
q_a.show(truncate=False)
print("Query (A) time:", builtins.round(time.time() - start, 2), "seconds")



=== Query (A): Highest price_per_unit item ===
+----------------+-----------------------+
|product_category|price_per_unit         |
+----------------+-----------------------+
|Automotive      |2000.000000000000000000|
+----------------+-----------------------+

Query (A) time: 1.54 seconds


In [10]:
print("\n=== Query (B): Top 3 product categories by total quantity sold ===")
start = time.time()

q_b = (
    orders_df
    .groupBy("product_category")
    .agg(_sum("quantity").alias("total_quantity_sold"))
    .orderBy(col("total_quantity_sold").desc())
    .limit(3)
)

q_b.collect()
q_b.show(truncate=False)
print("Query (B) time:", builtins.round(time.time() - start, 2), "seconds")


=== Query (B): Top 3 product categories by total quantity sold ===
+----------------+-------------------+
|product_category|total_quantity_sold|
+----------------+-------------------+
|Health & Beauty |300842             |
|Electronics     |300804             |
|Toys            |300598             |
+----------------+-------------------+

Query (B) time: 1.19 seconds


In [11]:
print("\n=== Query (C): Total revenue per product category ===")
start = time.time()

q_c = (
    orders_df
    .groupBy("product_category")
    .agg(_sum(col("price_per_unit") * col("quantity")).alias("total_revenue"))
    .orderBy(col("total_revenue").desc())
)

q_c.collect()
q_c.show(truncate=False)
print("Query (C) time:", builtins.round(time.time() - start, 2), "seconds")


=== Query (C): Total revenue per product category ===
+----------------+-----------------+
|product_category|total_revenue    |
+----------------+-----------------+
|Automotive      |306589798.8600000|
|Electronics     |241525009.4500000|
|Home & Garden   |78023780.0900000 |
|Sports          |61848990.8300000 |
|Health & Beauty |46599817.8900000 |
|Office Supplies |38276061.6400000 |
|Fashion         |31566368.2200000 |
|Toys            |23271039.0200000 |
|Grocery         |15268355.6600000 |
|Books           |12731976.0400000 |
+----------------+-----------------+

Query (C) time: 2.76 seconds


In [12]:
print("\n=== Query (D): Top 10 customers by total spending ===")
start = time.time()

q_d = (
    orders_df
    .groupBy("customer_name")
    .agg(
        spark_round(_sum(col("price_per_unit") * col("quantity")), 2).alias("total_spent")
    )
    .orderBy(col("total_spent").desc())
    .limit(10)
)

q_d.collect()
q_d.show(truncate=False)
print("Query (D) time:", builtins.round(time.time() - start, 2), "seconds")


=== Query (D): Top 10 customers by total spending ===
+--------------+-----------+
|customer_name |total_spent|
+--------------+-----------+
|Carol Taylor  |991179.18  |
|Nina Lopez    |975444.95  |
|Daniel Jackson|959344.48  |
|Carol Lewis   |947708.57  |
|Daniel Young  |946030.14  |
|Alice Martinez|935100.02  |
|Ethan Perez   |934841.24  |
|Leo Lee       |934796.48  |
|Eve Young     |933176.86  |
|Ivy Rodriguez |925742.64  |
+--------------+-----------+

Query (D) time: 2.63 seconds


In [13]:
spark.stop()