## Import PySpark

In [1]:
# findspark.init() is called before `import pyspark` on purpose: it is what
# makes the pyspark package importable from this kernel, so keep this order.
import findspark
findspark.init()

import pyspark

## Initiate Spark Session

In [2]:
from pyspark.conf import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.session import SparkSession

# Resource settings for the demo: 2g for the driver, 2g per executor,
# five executor instances.
conf = SparkConf().setAll([
    ("spark.driver.memory", "2g"),
    ("spark.executor.memory", "2g"),
    ("spark.executor.instances", "5"),
])

# Get the active session or create one on YARN with Hive support enabled.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Spark Salting Example")
    .enableHiveSupport()
    .config(conf=conf)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Echo the session object to confirm it was created.
spark

### Table 1: Balanced Partitions

In [4]:
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

# Number of rows in the demo tables; reused when building the skewed table.
dataSize = 1_000_000

# One distinct integer per row (column "value"), tagged with the id of the
# physical partition each row lives in.
df_balanced_part = (
    spark.createDataFrame(list(range(dataSize)), IntegerType())
    .withColumn("partitionId", F.spark_partition_id())
)

In [5]:
# Action: materializes the DataFrame and returns its row count (expected: dataSize).
df_balanced_part.count()

23/03/26 11:00:36 WARN scheduler.TaskSetManager: Stage 0 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

1000000

In [6]:
# Row count per physical partition, ordered by partition id; similar counts
# mean the data is evenly distributed.
(
    df_balanced_part
    .groupBy("partitionId")
    .count()
    .orderBy("partitionId")
    .show()
)

23/03/26 11:00:42 WARN scheduler.TaskSetManager: Stage 2 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-----------+------+
|partitionId| count|
+-----------+------+
|          0|199680|
|          1|199680|
|          2|200704|
|          3|199680|
|          4|200256|
+-----------+------+



In [7]:
# Peek at ten rows. limit() gives no ordering guarantee, so the sample may come
# from any partition (the recorded output shows rows from partition 1).
df_balanced_part.limit(10).show()

23/03/26 11:00:48 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.


+------+-----------+
| value|partitionId|
+------+-----------+
|199680|          1|
|199681|          1|
|199682|          1|
|199683|          1|
|199684|          1|
|199685|          1|
|199686|          1|
|199687|          1|
|199688|          1|
|199689|          1|
+------+-----------+



23/03/26 11:00:49 WARN scheduler.TaskSetManager: Lost task 4.1 in stage 4.0 (TID 216) (datanode2 executor 2): TaskKilled (Finish but did not commit due to another attempt succeeded)


In [8]:
# Number of partitions backing the DataFrame's underlying RDD.
df_balanced_part.rdd.getNumPartitions()

5

### Table 2: Unbalanced data (Skew)

In [9]:
# Three single-partition frames: one holding (dataSize - 2) copies of the key 0,
# and one row each for the keys 1 and 2.
df0, df1, df2 = (
    spark.createDataFrame(values, IntegerType()).repartition(1)
    for values in ([0] * (dataSize - 2), [1], [2])
)

In [10]:
# Stack the three frames into one skewed table and tag every row with the id
# of the physical partition it lives in.
df_skew = (
    df0.union(df1)
    .union(df2)
    .withColumn("partitionId", F.spark_partition_id())
)

In [11]:
# Row count per physical partition of the skewed table, ordered by partition id.
(
    df_skew
    .groupBy("partitionId")
    .count()
    .orderBy("partitionId")
    .show()
)



+-----------+------+
|partitionId| count|
+-----------+------+
|          0|999998|
|          1|     1|
|          2|     1|
+-----------+------+



                                                                                

### Join Tables without Optimization

In [12]:
# Schema of the skewed table; `value` is the column the joins below use as key.
df_skew.printSchema()

root
 |-- value: integer (nullable = true)
 |-- partitionId: integer (nullable = false)



In [13]:
# Schema of the balanced table, for comparison with the skewed table's schema.
df_balanced_part.printSchema()

root
 |-- value: integer (nullable = true)
 |-- partitionId: integer (nullable = false)



In [14]:
# Baseline: plain inner join on the skewed key, with no skew mitigation.
balanced_keys = df_balanced_part.select("value")
df_skew.join(balanced_keys, on="value", how="inner").count()

23/03/26 11:00:54 WARN scheduler.TaskSetManager: Stage 14 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

1000000

### How to resolve the Spark data skew problem

#### 1. Leveraging the Number of Partitions

In [15]:
# Current value of the Spark SQL shuffle-partition setting.
spark.conf.get("spark.sql.shuffle.partitions")

'200'

In [16]:
# Set the shuffle partition count to 10 for the join that follows.
spark.conf.set("spark.sql.shuffle.partitions", 10)

In [17]:
# Same inner join as the baseline, re-run after changing the shuffle partition count.
df_skew.join(
    df_balanced_part.select("value"),
    on="value",
    how="inner",
).count()

23/03/26 11:00:58 WARN scheduler.TaskSetManager: Stage 21 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

1000000

#### 2. Broadcast join

In [18]:
# Mark the balanced side for broadcast, then run the same inner join on `value`.
broadcast_keys = F.broadcast(df_balanced_part.select("value"))
df_skew.join(broadcast_keys, on="value", how="inner").count()

23/03/26 11:01:00 WARN scheduler.TaskSetManager: Stage 25 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.
23/03/26 11:01:02 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 29.0 (TID 717) (datanode1 executor 5): TaskKilled (another attempt succeeded)


1000000

#### 3. Salting

In [19]:
# Use three shuffle partitions for the salting demo; the cells below also reuse
# this setting as the number of salt values.
spark.conf.set("spark.sql.shuffle.partitions", 3)

In [20]:
# Read the setting back. conf.get() returns it as a string (note the quotes in
# the output), so convert with int() wherever an integer is needed, e.g. range().
numPart = spark.conf.get("spark.sql.shuffle.partitions")
numPart

'3'

In [21]:
# numPart comes from spark.conf.get() and is a string. Convert it explicitly so
# the salt range does not depend on Spark implicitly casting a string literal
# to a number (the balanced side already uses int(numPart)).
num_salt_buckets = int(numPart)

# Random bucket id in [0, num_salt_buckets) for every row. Unseeded, so the
# assignment differs between runs.
salt_bucket = F.floor(F.rand() * num_salt_buckets)

# Salted key "<value>_<bucket>": rows sharing one hot `value` are spread over
# num_salt_buckets distinct join keys.
df_skew_salt = df_skew.withColumn(
    "salt", F.concat(df_skew.value, F.lit("_"), salt_bucket)
)

df_skew_salt

DataFrame[value: int, partitionId: int, salt: string]

In [22]:
# Adding the salt column does not move any rows: the per-partition counts are
# the same as for the unsalted skewed table.
(
    df_skew_salt
    .groupBy("partitionId")
    .count()
    .orderBy("partitionId")
    .show()
)

+-----------+------+
|partitionId| count|
+-----------+------+
|          0|999998|
|          1|     1|
|          2|     1|
+-----------+------+



In [23]:
# Sanity check: adding the salt column must not change the row count.
df_skew_salt.count()

1000000

In [24]:
# Sample of the salted keys, formatted as "<value>_<bucket>".
df_skew_salt.show(5)

+-----+-----------+----+
|value|partitionId|salt|
+-----+-----------+----+
|    0|          0| 0_2|
|    0|          0| 0_1|
|    0|          0| 0_2|
|    0|          0| 0_2|
|    0|          0| 0_0|
+-----+-----------+----+
only showing top 5 rows



In [25]:
# Attach the full list of salt ids [0, numPart) to every row of the balanced
# table; numPart is a string, hence the int() conversion.
salt_ids = [F.lit(salt_id) for salt_id in range(int(numPart))]
df_balanced_salt = df_balanced_part.withColumn("salt_array", F.array(*salt_ids))

In [26]:
# Show five rows without truncating cell contents so the whole array is visible.
df_balanced_salt.show(n=5, truncate=False)

+-----+-----------+----------+
|value|partitionId|salt_array|
+-----+-----------+----------+
|0    |0          |[0, 1, 2] |
|1    |0          |[0, 1, 2] |
|2    |0          |[0, 1, 2] |
|3    |0          |[0, 1, 2] |
|4    |0          |[0, 1, 2] |
+-----+-----------+----------+
only showing top 5 rows



23/03/26 11:01:04 WARN scheduler.TaskSetManager: Stage 45 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.


In [27]:
# Row count is unchanged here: the array column has not been exploded yet.
df_balanced_salt.count()

23/03/26 11:01:04 WARN scheduler.TaskSetManager: Stage 46 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.


1000000

In [28]:
# Emit one row per salt id for every original row, then drop the array column.
df_balanced_salt_1 = (
    df_balanced_salt
    .withColumn("explodCol", F.explode(F.col("salt_array")))
    .drop("salt_array")
)

In [29]:
# After the explode every original row appears once per salt id, so the row
# count is the original count multiplied by the number of salt ids.
df_balanced_salt_1.count()

23/03/26 11:01:04 WARN scheduler.TaskSetManager: Stage 48 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

3000000

In [30]:
# Peek at the exploded rows; limit() gives no guarantee about which rows are returned.
df_balanced_salt_1.limit(5).show()

23/03/26 11:01:05 WARN scheduler.TaskSetManager: Stage 50 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.


+------+-----------+---------+
| value|partitionId|explodCol|
+------+-----------+---------+
|600064|          3|        0|
|600064|          3|        1|
|600064|          3|        2|
|600065|          3|        0|
|600065|          3|        1|
+------+-----------+---------+



In [31]:
# Build the join key "<value>_<salt id>" in the same format as the salted key
# on the skewed side.
salted_key = F.concat(F.col("value"), F.lit("_"), F.col("explodCol"))
df_balanced_salt_2 = df_balanced_salt_1.withColumn("salt", salted_key)

In [32]:
# Peek at the salted keys on the balanced side (one row per value/salt id pair).
df_balanced_salt_2.limit(5).show()

23/03/26 11:01:06 WARN scheduler.TaskSetManager: Stage 52 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.


+------+-----------+---------+--------+
| value|partitionId|explodCol|    salt|
+------+-----------+---------+--------+
|799744|          4|        0|799744_0|
|799744|          4|        1|799744_1|
|799744|          4|        2|799744_2|
|799745|          4|        0|799745_0|
|799745|          4|        1|799745_1|
+------+-----------+---------+--------+



In [33]:
# Join on the salted key instead of the raw `value`.
# NOTE(review): both inputs carry `value` and `partitionId`, so df_salt ends up
# with each of those names twice; referring to them by name on df_salt would
# presumably be ambiguous. Consider selecting or renaming columns before joining.
df_salt = df_skew_salt.join(df_balanced_salt_2, on="salt", how="inner")

In [34]:
# The salted join should return as many rows as the plain join on `value` did.
df_salt.count()

23/03/26 11:01:06 WARN scheduler.TaskSetManager: Stage 57 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.
23/03/26 11:01:12 WARN scheduler.TaskSetManager: Lost task 1.1 in stage 59.0 (TID 826) (datanode1 executor 5): TaskKilled (another attempt succeeded)
                                                                                

1000000

In [35]:
# Inspect a few joined rows; `value` and `partitionId` appear once per input table.
df_salt.limit(5).show()

+----+-----+-----------+-----+-----------+---------+
|salt|value|partitionId|value|partitionId|explodCol|
+----+-----+-----------+-----+-----------+---------+
| 2_2|    2|          2|    2|          0|        2|
| 0_0|    0|          0|    0|          0|        0|
| 0_0|    0|          0|    0|          0|        0|
| 0_0|    0|          0|    0|          0|        0|
| 0_0|    0|          0|    0|          0|        0|
+----+-----+-----------+-----+-----------+---------+



23/03/26 11:16:11 WARN storage.BlockManagerMasterEndpoint: No more replicas available for rdd_213_0 !
23/03/26 11:16:11 WARN storage.BlockManagerMasterEndpoint: No more replicas available for rdd_213_1 !
23/03/26 11:16:12 ERROR cluster.YarnScheduler: Lost executor 1 on datanode4: Container from a bad node: container_1679827331130_0002_01_000002 on host: datanode4. Exit status: 137. Diagnostics: [2023-03-26 11:16:11.815]Container killed on request. Exit code is 137[2023-03-26 11:16:11.819]
[2023-03-26 11:16:11.822]Container exited with a non-zero exit code 137. [2023-03-26 11:16:11.822]
[2023-03-26 11:16:11.823]Killed by external signal
.
23/03/26 11:16:12 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 1 for reason Container from a bad node: container_1679827331130_0002_01_000002 on host: datanode4. Exit status: 137. Diagnostics: [2023-03-26 11:16:11.815]Container killed on request. Exit code is 137[2023-03-26 11:16:11.819]
[2023-03-26 11:1

#### 4. Adaptive Query Execution (AQE, Spark 3.x)