In [1]:
import os

# Point PySpark at a local Spark installation. setdefault keeps a SPARK_HOME
# that is already exported in the environment, so the Homebrew path below is
# only a fallback for machines where nothing has been configured.
os.environ.setdefault("SPARK_HOME", "/opt/homebrew/Cellar/apache-spark/3.5.2/libexec")

In [2]:
import pyspark

# Session-wide Spark settings for this notebook.
spark_config = (
    pyspark.SparkConf()
    .setAppName("6 exercises")
    .setMaster("local")
    # -1 turns off automatic broadcast joins (explicit F.broadcast hints still apply).
    .set("spark.sql.autoBroadcastJoinThreshold", -1)
    .set("spark.executor.memory", "3000mb")
    # With a "local" master the tasks run inside the driver JVM, so the driver
    # heap is what actually bounds memory. This only takes effect when the JVM
    # has not been started yet (i.e. on a fresh kernel).
    .set("spark.driver.memory", "3000mb")
)

# Reuses an existing session if one is already active in this kernel.
spark = pyspark.sql.SparkSession.builder.config(conf=spark_config).getOrCreate()

24/09/25 18:11:47 WARN Utils: Your hostname, Barts-Mac.local resolves to a loopback address: 127.0.0.1; using 192.168.0.199 instead (on interface en0)
24/09/25 18:11:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/25 18:11:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql import functions as F

In [4]:
# Directory holding the three parquet datasets. Spaces need no escaping inside
# a Python string; the former "\ " was an invalid escape sequence that left a
# literal backslash in the path.
DATA_DIR = "Exercise Files/six_exercises"

df_products = spark.read.parquet(f"{DATA_DIR}/products_parquet")
df_sales = spark.read.parquet(f"{DATA_DIR}/sales_parquet")
df_sellers = spark.read.parquet(f"{DATA_DIR}/sellers_parquet")

                                                                                

1. How many orders, products and sellers are there?
How many products have been sold at least once? Which product appears in the most orders?

In [5]:
# Number of rows in the products table.
df_products.count()

75000000

In [9]:
# Number of rows in the sellers table.
df_sellers.count()

10

In [7]:
# Number of rows in the sales table.
df_sales.count()

20000040

In [8]:
# Number of distinct seller_id values that appear in the sales table.
# NOTE(review): the section heading also asks how many products were sold at
# least once; that would need "product_id" here — confirm whether a cell is missing.
df_sales.select("seller_id").distinct().count()

[Stage 12:=====>                                                   (8 + 1) / 83]

CodeCache: size=131072Kb used=25857Kb max_used=25994Kb free=105214Kb
 bounds [0x00000001071d8000, 0x0000000108b68000, 0x000000010f1d8000]
 total_blobs=10428 nmethods=9445 adapters=897
 compilation: disabled (not enough contiguous free space left)


                                                                                

10

In [14]:
# Print the schema of each table, in the order products, sales, sellers.
for table in (df_products, df_sales, df_sellers):
    table.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)



In [29]:
# Count sales rows per product and show the ten most frequent product ids.
(
    df_sales
    .groupBy("product_id")
    .agg(F.count("*").alias("cnt"))
    .orderBy(F.col("cnt").desc())
    .show(10)
)

[Stage 38:>                                                         (0 + 1) / 1]

+----------+--------+
|product_id|     cnt|
+----------+--------+
|         0|19000000|
|  35669461|       3|
|  52606213|       3|
|  28592106|       3|
|  31136332|       3|
|   2316238|       3|
|  67723231|       3|
|  36269838|       3|
|  72017876|       3|
|  69790381|       3|
+----------+--------+
only showing top 10 rows



                                                                                

2. How many distinct products have been sold on each day?

In [32]:
# For every date, count the distinct product ids sold; busiest days first.
(
    df_sales
    .groupBy("date")
    .agg(F.countDistinct("product_id").alias("distinct_prods"))
    .orderBy(F.col("distinct_prods").desc())
    .show(10)
)

[Stage 53:>                                                         (0 + 1) / 1]

+----------+--------------+
|      date|distinct_prods|
+----------+--------------+
|2020-07-06|        100765|
|2020-07-09|        100501|
|2020-07-01|        100337|
|2020-07-03|        100017|
|2020-07-02|         99807|
|2020-07-05|         99796|
|2020-07-04|         99791|
|2020-07-07|         99756|
|2020-07-08|         99662|
|2020-07-10|         98973|
+----------+--------------+



                                                                                

3. What is the average revenue of the orders?

In [37]:
# Switch for the cell guarded by `if run_slow:`; leave False to skip it.
run_slow = False

In [36]:
# Average order revenue via a plain join on product_id.
# Only executed when run_slow is truthy.
if run_slow:
    same_product = df_sales["product_id"] == df_products["product_id"]
    order_revenue = df_products["price"] * df_sales["num_pieces_sold"]
    (
        df_sales
        .join(df_products, same_product, "inner")
        .agg(F.avg(order_revenue))
        .show()
    )



+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|            1246.1338560822878|
+------------------------------+



                                                                                

In [45]:
# Print the logical plans as well as the physical plan for the sales scan.
df_sales.explain(extended=True)

== Parsed Logical Plan ==
Relation [order_id#6,product_id#7,seller_id#8,date#9,num_pieces_sold#10,bill_raw_text#11] parquet

== Analyzed Logical Plan ==
order_id: string, product_id: string, seller_id: string, date: string, num_pieces_sold: string, bill_raw_text: string
Relation [order_id#6,product_id#7,seller_id#8,date#9,num_pieces_sold#10,bill_raw_text#11] parquet

== Optimized Logical Plan ==
Relation [order_id#6,product_id#7,seller_id#8,date#9,num_pieces_sold#10,bill_raw_text#11] parquet

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [order_id#6,product_id#7,seller_id#8,date#9,num_pieces_sold#10,bill_raw_text#11] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/Bartek/Desktop/Bartek/Programowanie/Portfolio/data_enginee..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...



Using key salting to unskew the data:

1. Duplicate the entries that are in the dimension table for the most common products. (e.g. product_0 -> product_0-1, product_0-2, product_0-3, etc.)
2. On the sales table replace "product_0" with a random replica

Only salt the products that drive the skewness!

In [46]:
# Bare expression: displays the active SparkSession object as the cell output.
spark

In [47]:
from pyspark.sql import Row
from pyspark.sql.types import IntegerType

In [48]:
# STEP 1: find the most frequent (potentially skewed) product keys in the sales table
results = (
    df_sales.groupby(df_sales["product_id"])
    .count()
    .sort(F.col("count").desc())
    .limit(100)
    .collect()
)

# STEP 2: for every selected key, list one (product_id, replica index) pair per
# replica; these pairs are what gets joined onto the products table later on
REPLICATION_FACTOR = 101
replicated_products = [row["product_id"] for row in results]
list_ = [
    (product_id, replica)
    for product_id in replicated_products
    for replica in range(REPLICATION_FACTOR)
]

# STEP 3: turn the pairs into a small DataFrame. It is built straight from the
# Python list, so `replicated_df` is a DataFrame from the start and no RDD
# round-trip is needed.
replicated_df = spark.createDataFrame(list_, schema=["product_id", "replication"])

                                                                                

In [50]:
# STEP 4: generate the salted key
# Products side: the left join fans each replicated product out into one row
# per replica ("<product_id>-<replica>"); products without a replica row keep
# their plain product_id as the join key.
products_table = df_products.join(
    F.broadcast(replicated_df),
    df_products["product_id"] == replicated_df["product_id"],
    "left",
).withColumn(
    "salted_join_key",
    F.when(replicated_df["replication"].isNull(), df_products["product_id"]).otherwise(
        F.concat(replicated_df["product_id"], F.lit("-"), replicated_df["replication"])
    ),
)

# Sales side: replicated products get a random replica suffix. floor(rand * R)
# picks each replica 0..R-1 with equal probability; the previous
# round(rand * (R - 1)) gave the first and last replica only half the weight.
random_replica = F.floor(F.rand() * REPLICATION_FACTOR).cast(IntegerType())
sales_table = df_sales.withColumn(
    "salted_join_key",
    F.when(
        df_sales["product_id"].isin(replicated_products),
        F.concat(df_sales["product_id"], F.lit("-"), random_replica),
    ).otherwise(df_sales["product_id"]),
)

In [52]:
# STEP 5: finally let's do the join
# Join on the salted key and average price * num_pieces_sold over all orders.
# show() prints the table itself and returns None, so it is not wrapped in print().
sales_table.join(
    products_table,
    sales_table["salted_join_key"] == products_table["salted_join_key"],
    "inner",
).agg(
    F.avg(products_table["price"] * sales_table["num_pieces_sold"])
).show()



+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|            1246.1338560822878|
+------------------------------+

None


                                                                                

24/09/25 23:40:38 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1042429 ms exceeds timeout 120000 ms
24/09/25 23:40:38 WARN SparkContext: Killing executors is not supported by current scheduler.
24/09/25 23:57:15 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$