In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) a local Spark session that uses every available core.
# Broadcast joins and all adaptive-query-execution features are switched off.
spark = (
    SparkSession.builder
    .appName("PySpark_Hadoop")
    .master("local[*]")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.sql.adaptive.enabled", False)
    .config("spark.sql.adaptive.coalescePartitions.enabled", False)
    .config("spark.sql.adaptive.skewJoin.enabled", False)
    .getOrCreate()
)

In [2]:
# Shut down the active Spark session and confirm it on stdout.
# NOTE(review): this cell sits between the session-creation cell and the cells that
# read data through `spark`; a top-to-bottom run would stop the session before it
# is used. Its execution counter suggests it was run manually out of order —
# confirm, and consider moving it to the end of the notebook.
spark.stop()
stop_message = "Spark Session stopped successfully."
print(stop_message)

Spark Session stopped successfully.


In [4]:
# Load the three practice CSV files. With inferSchema=False no type inference is
# performed, so every column is loaded as a string. Each DataFrame is then spread
# across 4 partitions.
print("Customers DataFrame")
customer_df = spark.read.csv("file:////home/dominic/Desktop/pythonLearning/csvFiles/practice_Data/customers.csv", header=True, inferSchema=False)
customer_df = customer_df.repartition(4)

print("Sales DataFrame")
sales_df = spark.read.csv("file:////home/dominic/Desktop/pythonLearning/csvFiles/practice_Data/sales.csv", header=True, inferSchema=False)
sales_df = sales_df.repartition(4)

print("Product DataFrame")
products_df = spark.read.csv("file:////home/dominic/Desktop/pythonLearning/csvFiles/practice_Data/products.csv", header=True, inferSchema=False)
# DataFrames are immutable: repartition() returns a NEW DataFrame, so the result
# has to be assigned back — a bare call leaves products_df unchanged.
products_df = products_df.repartition(4)

# Final expression of the cell: shows the column summary of products_df.
products_df

Customers DataFrame


                                                                                

Sales DataFrame
Product DataFrame


DataFrame[product_id: string, product_name: string, category: string]

In [5]:
# Prepare the sales data for joining: drop "region" and rename both key columns
# (presumably so they do not clash with the same-named columns of the customer and
# product tables — confirm), then redistribute the rows over 4 partitions.
sales_df_clean = (
    sales_df
    .drop("region")
    .withColumnRenamed("customer_id", "customer_sales_id")
    .withColumnRenamed("product_id", "product_sales_id")
    .repartition(4)
)

In [6]:
from pyspark.sql.functions import col

# Partition both sides of the product join by their own join key (4 partitions each).
sales_df_clean = sales_df_clean.repartition(4, col("product_sales_id"))
products_df = products_df.repartition(4, col("product_id"))

print("Joined DataFrame")

# Join conditions, bound to the specific DataFrames they refer to.
product_match = sales_df_clean["product_sales_id"] == products_df["product_id"]
customer_match = sales_df_clean["customer_sales_id"] == customer_df["customer_id"]

# sales -> products -> customers (inner joins), then collapse the result to
# 4 partitions and derive the cost of each order line.
joined_df = (
    sales_df_clean
    .join(products_df, product_match, "inner")
    .join(customer_df, customer_match, "inner")
    .repartition(4)
    .withColumn("TotalCost", col("unit_price") * col("quantity"))
)

# Persist the joined result as Parquet, replacing any previous output.
joined_df.write.mode("overwrite").parquet("file:////home/dominic/Desktop/pythonLearning/csvFiles/practice_Data/joined_data.parquet")

Joined DataFrame


                                                                                

In [None]:
# Print the extended query plans for joined_df (logical plans as well as the
# physical plan). The output below is only a sample of the plan text.
joined_df.explain(extended=True)

== Parsed Logical Plan ==
'Project [order_id#40, order_date#41, customer_sales_id#84, product_sales_id#91, quantity#44, unit_price#45, product_id#71, product_name#72, category#73, customer_id#17, customer_name#18, region#19, ('unit_price * 'quantity) AS TotalCost#142]
+- Repartition 4, true
   +- Join Inner, (customer_sales_id#84 = customer_id#17)
      :- Join Inner, (product_sales_id#91 = product_id#71)
      :  :- RepartitionByExpression [product_sales_id#91], 4
      :  :  +- Repartition 4, true
      :  :     +- Project [order_id#40, order_date#41, customer_sales_id#84, product_id#43 AS product_sales_id#91, quantity#44, unit_price#45]
      :  :        +- Project [order_id#40, order_date#41, customer_id#42 AS customer_sales_id#84, product_id#43, quantity#44, unit_price#45]
      :  :           +- Project [order_id#40, order_date#41, customer_id#42, product_id#43, quantity#44, unit_price#45]
      :  :              +- Repartition 4, true
      :  :                 +- Relation [orde

In [164]:
# Inspect a single part file of the joined Parquet output.
# The dataset is written with mode("overwrite"), and the id embedded in each
# part-file name changes on every write, so a hard-coded file name goes stale as
# soon as the write cell is re-run. Match on the stable "part-00003-" prefix with
# a glob instead.
df = spark.read.parquet("file:////home/dominic/Desktop/pythonLearning/csvFiles/practice_Data/joined_data.parquet/part-00003-*.snappy.parquet")
print("DataFrame Count: ", df.count())
df.show()

DataFrame Count:  24994
+--------+----------+-----------------+----------------+--------+----------+----------+------------+-----------+-----------+-------------+------+---------+
|order_id|order_date|customer_sales_id|product_sales_id|quantity|unit_price|product_id|product_name|   category|customer_id|customer_name|region|TotalCost|
+--------+----------+-----------------+----------------+--------+----------+----------+------------+-----------+-----------+-------------+------+---------+
| O003329|2023-02-28|            C0668|           P0060|       2|       977|     P0060|  Product_60|      Books|      C0668| Customer_668|  West|   1954.0|
| O046886|2023-01-15|            C0798|           P0093|       3|       284|     P0093|  Product_93|      Books|      C0798| Customer_798|  West|    852.0|
| O099476|2023-03-15|            C0798|           P0042|       3|      1846|     P0042|  Product_42|   Clothing|      C0798| Customer_798|  West|   5538.0|
| O087157|2023-04-01|            C0798| 