# **Distributed Processing Challenges: Handling Data Skew in DF PySpark**

**`Udemy Course: Hands-on Big Data Practices with PySpark & Spark Tuning`**

**`Author: Amin Karami (PhD, FHEA)`**



In [1]:
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()

# import SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("SkewinDF").getOrCreate()

spark

24/04/14 17:08:02 WARN Utils: Your hostname, arphaxad-HP-Notebook resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlp3s0)
24/04/14 17:08:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/14 17:08:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/14 17:08:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# (1) Loading Data Skew

To understand skew, we create a random data where keys are uniformly distributed.

In [2]:
import numpy as np
import pandas as pd
import random

# sale dataset:
#Table 1: OrderID, Qty, Sales, Discount (yes=1,no=0)
#Table 2: ProductID, OrderID, Product, Price

########## Table 1 ##########
key_1 = [101] * 100 #100 #1
key_2 = [201] * 7000000 #7000000 #10
key_3 = [301] * 500 #500 #4
key_4 = [401] * 10000 #10000 #2
OrderID = key_1 + key_2 + key_3 + key_4
random.shuffle(OrderID)

Qty_1 = list(np.random.randint(low = 1, high = 100, size = len(key_1)))
Qty_2 = list(np.random.randint(low = 1, high = 200, size = len(key_2)))
Qty_3 = list(np.random.randint(low = 1, high = 1000, size = len(key_3)))
Qty_4 = list(np.random.randint(low = 1, high = 50, size = len(key_4)))
Qty = Qty_1 + Qty_2 + Qty_3 + Qty_4

Sales_1 = list(np.random.randint(low = 10, high = 100, size = len(key_1)))
Sales_2 = list(np.random.randint(low = 50, high = 3400, size = len(key_2)))
Sales_3 = list(np.random.randint(low = 12, high = 2000, size = len(key_3)))
Sales_4 = list(np.random.randint(low = 40, high = 1000, size = len(key_4)))
Sales = Sales_1 + Sales_2 + Sales_3 + Sales_4

Discount = list(np.random.randint(low = 0, high = 2, size = len(OrderID)))

data1 = list(zip(OrderID,Qty,Sales,Discount))

# Create the pandas DataFrame 
data_skew = pd.DataFrame(data1, columns = ['OrderID', 'Qty', 'Sales', 'Discount']) 


########## Table 2 ##########
data2 = [[1, 101, 'pencil', 4.99], [2, 101, 'book', 9.5], [3, 101,'scissors', 14],[4, 301, 'glue', 7], [5, 201, 'marker', 8.49],
         [6, 301, 'label', 2], [7, 201, 'calculator', 3.99], [8, 501, 'eraser', 1.55]]
data_small = pd.DataFrame(data2, columns = ['ProductID', 'OrderID', 'Product', 'Price']) 


In [3]:
!pip3 install pyarrow

Defaulting to user installation because normal site-packages is not writeable
Collecting pyarrow
  Downloading pyarrow-15.0.2-cp310-cp310-manylinux_2_28_x86_64.whl (38.3 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m38.3/38.3 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m[36m0:00:01[0m
Installing collected packages: pyarrow
Successfully installed pyarrow-15.0.2


In [4]:
# Create PySpark DF from Pandas

# Optimize conversion between PySpark and pandas DF: Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


df_skew = spark.createDataFrame(data_skew)
df_skew.printSchema()
df_skew.show()
df_skew.rdd.getNumPartitions()

24/04/14 17:10:46 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.


root
 |-- OrderID: long (nullable = true)
 |-- Qty: long (nullable = true)
 |-- Sales: long (nullable = true)
 |-- Discount: long (nullable = true)



                                                                                

+-------+---+-----+--------+
|OrderID|Qty|Sales|Discount|
+-------+---+-----+--------+
|    201| 75|   11|       0|
|    201| 95|   43|       1|
|    201| 58|   76|       1|
|    201| 19|   94|       1|
|    201| 31|   59|       1|
|    201| 45|   49|       1|
|    201| 41|   73|       0|
|    201|  2|   33|       1|
|    201| 31|   31|       0|
|    201| 35|   57|       0|
|    201| 74|   39|       1|
|    201| 79|   29|       1|
|    201| 24|   57|       0|
|    201| 34|   42|       1|
|    201| 40|   51|       0|
|    201| 42|   44|       0|
|    201| 64|   80|       1|
|    201| 79|   56|       1|
|    201| 22|   30|       1|
|    201| 52|   82|       1|
+-------+---+-----+--------+
only showing top 20 rows



702

In [5]:
# Create PySpark DataFrame from Pandas
df_small = spark.createDataFrame(data_small)
df_small.printSchema()
df_small.show()
df_small.rdd.getNumPartitions()

root
 |-- ProductID: long (nullable = true)
 |-- OrderID: long (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price: double (nullable = true)

+---------+-------+----------+-----+
|ProductID|OrderID|   Product|Price|
+---------+-------+----------+-----+
|        1|    101|    pencil| 4.99|
|        2|    101|      book|  9.5|
|        3|    101|  scissors| 14.0|
|        4|    301|      glue|  7.0|
|        5|    201|    marker| 8.49|
|        6|    301|     label|  2.0|
|        7|    201|calculator| 3.99|
|        8|    501|    eraser| 1.55|
+---------+-------+----------+-----+



4

# (2) Run a shuffle `Join()` with small sized data

In [6]:
joined_rdd = df_skew.join(df_small,df_skew.OrderID == df_small.OrderID, how = "inner")
joined_rdd.show()
print(joined_rdd.count())

+-------+---+-----+--------+---------+-------+----------+-----+
|OrderID|Qty|Sales|Discount|ProductID|OrderID|   Product|Price|
+-------+---+-----+--------+---------+-------+----------+-----+
|    201| 75|   11|       0|        7|    201|calculator| 3.99|
|    201| 75|   11|       0|        5|    201|    marker| 8.49|
|    201| 95|   43|       1|        7|    201|calculator| 3.99|
|    201| 95|   43|       1|        5|    201|    marker| 8.49|
|    201| 58|   76|       1|        7|    201|calculator| 3.99|
|    201| 58|   76|       1|        5|    201|    marker| 8.49|
|    201| 19|   94|       1|        7|    201|calculator| 3.99|
|    201| 19|   94|       1|        5|    201|    marker| 8.49|
|    201| 31|   59|       1|        7|    201|calculator| 3.99|
|    201| 31|   59|       1|        5|    201|    marker| 8.49|
|    201| 45|   49|       1|        7|    201|calculator| 3.99|
|    201| 45|   49|       1|        5|    201|    marker| 8.49|
|    201| 41|   73|       0|        7|  



14001300


                                                                                

In [None]:
# DF increases the partition number to 200 automatically when Spark operation performs data shuffling (join(), aggregation)
joined_rdd.rdd.getNumPartitions()
joined_rdd.rdd.glom().collect()

24/04/14 17:11:29 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:11:29 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:11:29 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:11:29 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:11:37 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:11:52 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:11:52 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:11:53 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:11:53 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:11:54 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:12:05 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:05 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:12:17 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:17 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:18 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:18 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:18 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:12:29 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:29 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:30 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:30 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:30 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:12:40 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:40 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:41 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:41 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:41 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:12:51 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:52 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:52 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:52 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:12:52 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:13:01 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:02 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:02 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:02 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:02 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:13:11 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:12 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:12 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:12 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:12 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:13:22 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:22 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:23 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:23 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:23 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:13:33 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:33 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:33 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:34 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:34 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:13:43 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:43 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:43 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:44 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:44 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:13:54 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:54 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:54 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:55 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:13:55 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:14:04 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:04 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:05 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:05 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:14:14 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:14 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:15 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:15 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:15 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:14:24 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:24 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:24 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:25 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:25 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:14:34 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:34 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:35 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:35 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:36 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:14:45 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:46 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:46 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:46 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:14:47 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

24/04/14 17:14:59 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:15:00 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:15:00 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:15:00 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
24/04/14 17:15:00 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the f

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", 8)

# again, run the above Query
joined_rdd = df_skew.join(df_small,df_skew.OrderID == df_small.OrderID, how = "inner")
joined_rdd.show()
joined_rdd.rdd.getNumPartitions()
joined_rdd.rdd.glom().collect()

In [None]:
# perform a join() and descritive statistics on a skewed data
from pyspark.sql.functions import *

groups = df_skew.join(df_small,df_skew.OrderID == df_small.OrderID, how = "inner")
summary = groups.select(mean(groups.Sales).alias('AVG(Sales)'),
                    stddev(groups.Sales).alias('STD(Sales)'),
                    min(groups.Sales).alias('MIN(Sales)'),
                    max(groups.Sales).alias('MAX(Sales)'),
                    )
summary.show()

## Mitigate data skewness: SALTING

In [None]:
from pyspark.sql.functions import *

# add a random value [0 2]
df_skew_salting = df_skew.withColumn("_salt_", round(rand() * 2))
df_small_salting = df_small.withColumn("_salt_", round(rand() * 2))

df_skew_salting.show()
df_small_salting.show()

In [None]:
# repartition using _salt_:
df_skew_salting = df_skew_salting.repartition(3,"_salt_")
df_small_salting = df_small_salting.repartition(3,"_salt_")

In [None]:
df_skew_salting.rdd.glom().collect()

In [None]:
df_small_salting.rdd.glom().collect()

In [None]:
# apply join() after salting()

df_skew_salting.drop("_salt_")
df_small_salting.drop("_salt_")

joined_rdd_salting = df_skew_salting.join(df_small_salting, df_skew_salting.OrderID == df_small_salting.OrderID, how = "inner")
joined_rdd_salting.show()
joined_rdd_salting.rdd.getNumPartitions()
joined_rdd_salting.rdd.glom().collect()

In [None]:
# apply
from pyspark.sql.functions import *

groups_salting = df_skew_salting.join(df_small_salting,df_skew_salting.OrderID == df_small_salting.OrderID, how = "inner")
groups_salting = groups_salting.select(mean(groups_salting.Sales).alias('AVG(Sales)'),
                    stddev(groups_salting.Sales).alias('STD(Sales)'),
                    min(groups_salting.Sales).alias('MIN(Sales)'),
                    max(groups_salting.Sales).alias('MAX(Sales)'),
                    )
groups_salting.show()

# (3) Run a shuffle `Join()` to see how the skew effects computation resources.

In [None]:
from pyspark.sql.functions import *

# set the number of partitions after shuffling (avoid 200 partitions)
#spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.set('spark.sql.adaptive.enabled', True)
spark.conf.set('spark.sql.adaptive.coalescePartitions.enabled', True)


groups = df_skew.join(df_small, df_skew.OrderID == df_small.OrderID, how = "inner")

summary = groups.select(mean(groups.Sales).alias("AVG(Sales)"),
                        stddev(groups.Sales).alias("STD(Sales)"),
                        min(groups.Sales).alias("MIN(Sales)"),
                        max(groups.Sales).alias("MAX(Sales)"),
                       )
summary.show()

In [None]:
groups.rdd.getNumPartitions()

## Mitigate data skewness: SALTING

In [None]:
from pyspark.sql.functions import *

# set the number of partitions after shuffling (avoid 200 partitions)
spark.conf.set("spark.sql.shuffle.partitions", 8)


# add  random value [0 7]
df_skew_salting = df_skew.withColumn("_salt_", round(rand() * 7))
df_small_salting = df_small.withColumn("_salt_", round(rand() * 7))

# repartition using _salt_:
df_skew_salting = df_skew_salting.repartition(8, "_salt_")
df_small_salting = df_small_salting.repartition(8, "_salt_")


# drop salting
df_skew_salting.drop("_salt_")
df_small_salting.drop("_salt_")


# apply join()
groups = df_skew_salting.join(df_small_salting, df_skew_salting.OrderID == df_small_salting.OrderID, how = "inner")

summary = groups.select(mean(groups.Sales).alias("AVG(Sales)"),
                        stddev(groups.Sales).alias("STD(Sales)"),
                        min(groups.Sales).alias("MIN(Sales)"),
                        max(groups.Sales).alias("MAX(Sales)"),
                       )
summary.show()

## Mitigate data skewness: Broadcast Hash Join

In [None]:
from pyspark.sql.functions import *

groups_brd = df_skew.join(broadcast(df_small), df_skew.OrderID == df_small.OrderID, how = "inner")

summary = groups_brd.select(mean(groups_brd.Sales).alias("AVG(Sales)"),
                        stddev(groups_brd.Sales).alias("STD(Sales)"),
                        min(groups_brd.Sales).alias("MIN(Sales)"),
                        max(groups_brd.Sales).alias("MAX(Sales)"),
                       )
summary.show()

In [None]:
spark.conf.set("spark.sql.adaptive.enabled",True)
spark.conf.set("spark.sql.join.preferSortMergeJoin", False)
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", '100m')
# default: 10485760 bytes (10m)

In [None]:
from pyspark.sql.functions import *

groups_brd = df_skew.join(df_small, df_skew.OrderID == df_small.OrderID, how = "inner")

summary = groups_brd.select(mean(groups_brd.Sales).alias("AVG(Sales)"),
                        stddev(groups_brd.Sales).alias("STD(Sales)"),
                        min(groups_brd.Sales).alias("MIN(Sales)"),
                        max(groups_brd.Sales).alias("MAX(Sales)"),
                       )
summary.show()

## Mitigate data skewness: AQE (Spark V3.x)

In [None]:
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256mb")

In [None]:
groups = df_skew.join(df_small, df_skew.OrderID == df_small.OrderID, how = "inner")

summary = groups.select(mean(groups.Sales).alias("AVG(Sales)"),
                        stddev(groups.Sales).alias("STD(Sales)"),
                        min(groups.Sales).alias("MIN(Sales)"),
                        max(groups.Sales).alias("MAX(Sales)"),
                       )
summary.show()