In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext

conf = SparkConf()\
    .setMaster("local[*]")\
    .setAppName("Dealing with skewd data")
    # .setExecutorEnv("spark.driver.memory","2g")\
    # .setExecutorEnv("spark.executor.memory","4g")

spark = SparkSession\
    .builder\
    .config(conf=conf)\
    .getOrCreate()

sc = spark.sparkContext
sc

# (1) Loading Data Skew

To understand skew, we create a random data where keys are uniformly distributed.

In [9]:
import numpy as np
import pandas as pd
import random


# sale dataset:
# table 1: OrderID, Qty, Sales, Discount (yes=1, no=0)
# table 2: ProductID, OrderID, Product, Price

########### Table 1 ###########

key_1 = [101] * 100 #1 #100
key_2 = [201] * 7000000 #8 #7000000
key_3 = [301] * 500 #4 #500
key_4 = [401] * 10000 #2 #10000
OrderID = key_1 + key_2 + key_3 + key_4
random.shuffle(OrderID)

Qty_1 = list(np.random.randint(low = 1, high = 100, size = len(key_1)))
Qty_2 = list(np.random.randint(low = 1, high = 200, size = len(key_2)))
Qty_3 = list(np.random.randint(low = 1, high = 1000, size = len(key_3)))
Qty_4 = list(np.random.randint(low = 1, high = 50, size = len(key_4)))
Qty = Qty_1 + Qty_2 + Qty_3 + Qty_4

Sales_1 = list(np.random.randint(low = 10, high = 100, size = len(key_1)))
Sales_2 = list(np.random.randint(low = 50, high = 3400, size = len(key_2)))
Sales_3 = list(np.random.randint(low = 12, high = 2000, size = len(key_3)))
Sales_4 = list(np.random.randint(low = 40, high = 1000, size = len(key_4)))
Sales = Sales_1 + Sales_2 + Sales_3 + Sales_4

Discount = list(np.random.randint(low = 0, high = 2, size = len(OrderID)))
data1 = list(zip(OrderID,Qty,Sales,Discount))

# Create the Pandas DF
data_skew = pd.DataFrame(data1, columns=['OrderID','Qty','Sales','Discount'])


########### Table 2 ###########
data2 = [[1, 101, 'pencil', 4.99],
         [2, 101, 'book', 9.5],
         [3, 101, 'scissors', 14],
         [4, 301, 'glue', 7],
         [5, 201, 'marker', 8.49],
         [6, 301, 'label', 2],
         [7, 201, 'calculator', 3.99],
         [8, 501, 'eraser', 1.55],
        ]

data_small = pd.DataFrame(data2, columns=['ProductID', 'OrderID', 'Product', 'Price'])

In [10]:
# Create PySpark DF from Pandas

# Optimize conversion between PySpark and Pandas DF: Enable arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df_skew = spark.createDataFrame(data_skew)
df_skew.printSchema()
df_skew.show(30)
df_skew.rdd.getNumPartitions()

root
 |-- OrderID: long (nullable = true)
 |-- Qty: long (nullable = true)
 |-- Sales: long (nullable = true)
 |-- Discount: long (nullable = true)

+-------+---+-----+--------+
|OrderID|Qty|Sales|Discount|
+-------+---+-----+--------+
|    201|  4|   17|       1|
|    201| 93|   95|       0|
|    201| 36|   23|       0|
|    201| 28|   68|       0|
|    201| 75|   37|       1|
|    201| 36|   13|       0|
|    201| 89|   28|       1|
|    201| 86|   28|       1|
|    201| 99|   64|       1|
|    201| 92|   67|       1|
|    201|  1|   82|       0|
|    201|  7|   94|       0|
|    201| 52|   62|       0|
|    201| 95|   85|       1|
|    201| 95|   62|       0|
|    201| 61|   63|       1|
|    201| 43|   21|       1|
|    201| 14|   86|       1|
|    201| 56|   73|       1|
|    201| 20|   48|       0|
|    201| 24|   10|       0|
|    201| 77|   34|       0|
|    201|  8|   36|       0|
|    201| 18|   62|       0|
|    201| 66|   93|       0|
|    201| 59|   65|       0|
|    201| 

16

In [11]:
df_small = spark.createDataFrame(data_small)
df_small.printSchema()
df_small.show(30)
df_small.rdd.getNumPartitions()

root
 |-- ProductID: long (nullable = true)
 |-- OrderID: long (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price: double (nullable = true)

+---------+-------+----------+-----+
|ProductID|OrderID|   Product|Price|
+---------+-------+----------+-----+
|        1|    101|    pencil| 4.99|
|        2|    101|      book|  9.5|
|        3|    101|  scissors| 14.0|
|        4|    301|      glue|  7.0|
|        5|    201|    marker| 8.49|
|        6|    301|     label|  2.0|
|        7|    201|calculator| 3.99|
|        8|    501|    eraser| 1.55|
+---------+-------+----------+-----+



16

# (2) Run a shuffle `Join()` with small sized data

In [5]:
joined_df = df_skew.join(df_small, df_skew.OrderID == df_small.OrderID, how = 'inner')

joined_df.show(30)

+-------+---+-----+--------+---------+-------+----------+-----+
|OrderID|Qty|Sales|Discount|ProductID|OrderID|   Product|Price|
+-------+---+-----+--------+---------+-------+----------+-----+
|    201| 95|  698|       1|        5|    201|    marker| 8.49|
|    201| 95|  698|       1|        7|    201|calculator| 3.99|
|    201| 56|  330|       0|        5|    201|    marker| 8.49|
|    201| 56|  330|       0|        7|    201|calculator| 3.99|
|    201|170|  351|       1|        5|    201|    marker| 8.49|
|    201|170|  351|       1|        7|    201|calculator| 3.99|
|    201| 45| 3223|       1|        5|    201|    marker| 8.49|
|    201| 45| 3223|       1|        7|    201|calculator| 3.99|
|    201|443|  229|       1|        5|    201|    marker| 8.49|
|    201|443|  229|       1|        7|    201|calculator| 3.99|
|    201|415|  176|       1|        5|    201|    marker| 8.49|
|    201|415|  176|       1|        7|    201|calculator| 3.99|
|    201| 13|  808|       0|        5|  

In [6]:
print(joined_df.rdd.getNumPartitions())
joined_df.rdd.glom().collect()

200


[[],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [Row(OrderID=201, Qty=95, Sales=698, Discount=1, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=201, Qty=95, Sales=698, Discount=1, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=56, Sales=330, Discount=0, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=201, Qty=56, Sales=330, Discount=0, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=170, Sales=351, Discount=1, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=201, Qty=170, Sales=351, Discount=1, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=45, Sales=3223, Discount=1, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=201, Qty=45, Sales=3223, Discount=1, ProductID=7, OrderID=201,

This configuration is used to specify the default number partitions when shuffles data for aggregations or joins. The default value is 200.

In [7]:
spark.conf.set('spark.sql.shuffle.partitions', 8)

In [8]:
joined_df = df_skew.join(df_small, df_skew.OrderID == df_small.OrderID, how = 'inner')

joined_df.show(30)

+-------+---+-----+--------+---------+-------+----------+-----+
|OrderID|Qty|Sales|Discount|ProductID|OrderID|   Product|Price|
+-------+---+-----+--------+---------+-------+----------+-----+
|    201| 95|  698|       1|        5|    201|    marker| 8.49|
|    201| 95|  698|       1|        7|    201|calculator| 3.99|
|    201| 56|  330|       0|        5|    201|    marker| 8.49|
|    201| 56|  330|       0|        7|    201|calculator| 3.99|
|    201|170|  351|       1|        5|    201|    marker| 8.49|
|    201|170|  351|       1|        7|    201|calculator| 3.99|
|    201| 45| 3223|       1|        5|    201|    marker| 8.49|
|    201| 45| 3223|       1|        7|    201|calculator| 3.99|
|    201|443|  229|       1|        5|    201|    marker| 8.49|
|    201|443|  229|       1|        7|    201|calculator| 3.99|
|    201|415|  176|       1|        5|    201|    marker| 8.49|
|    201|415|  176|       1|        7|    201|calculator| 3.99|
|    201| 13|  808|       0|        5|  

In [9]:
print(joined_df.rdd.getNumPartitions())
joined_df.rdd.glom().collect()

8


[[Row(OrderID=201, Qty=95, Sales=698, Discount=1, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=201, Qty=95, Sales=698, Discount=1, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=56, Sales=330, Discount=0, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=201, Qty=56, Sales=330, Discount=0, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=170, Sales=351, Discount=1, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=201, Qty=170, Sales=351, Discount=1, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=45, Sales=3223, Discount=1, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=201, Qty=45, Sales=3223, Discount=1, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=443, Sales=229, Discount=1, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=20

In [10]:
# descriptive stats
from pyspark.sql import functions as f

groups = df_skew.join(df_small, df_skew.OrderID == df_small.OrderID, how = 'inner')

summary = groups.select(
    f.round(f.mean(groups.Sales)).alias('avg'),
    f.min(groups.Sales).alias('min'),
    f.max(groups.Sales).alias('max'),
    f.stddev(groups.Sales).alias('stdev')
)

summary.show()


+-----+---+----+----------------+
|  avg|min| max|           stdev|
+-----+---+----+----------------+
|813.0| 60|3223|822.824746894844|
+-----+---+----+----------------+



## Mitigate data skewness: SALTING

In [16]:
from pyspark.sql import functions as f 

# add random value between [0,2]
df_skew_salting = df_skew.withColumn("_salt_", f.round(f.rand() * 2))
df_small_salting = df_small.withColumn("_salt_", f.round(f.rand() * 2))

df_skew_salting.show()
df_small_salting.show()

+-------+---+-----+--------+------+
|OrderID|Qty|Sales|Discount|_salt_|
+-------+---+-----+--------+------+
|    301| 72|   60|       0|   1.0|
|    401|126| 2598|       0|   2.0|
|    201| 95|  698|       1|   1.0|
|    201| 56|  330|       0|   1.0|
|    101| 34|  579|       0|   1.0|
|    201|170|  351|       1|   0.0|
|    301|135| 1755|       0|   1.0|
|    301| 21| 1217|       0|   0.0|
|    201| 45| 3223|       1|   2.0|
|    401| 60| 1864|       0|   2.0|
|    201|443|  229|       1|   2.0|
|    301|755|  529|       0|   1.0|
|    201|415|  176|       1|   2.0|
|    201| 13|  808|       0|   1.0|
|    201| 45|  734|       0|   1.0|
+-------+---+-----+--------+------+

+---------+-------+----------+-----+------+
|ProductID|OrderID|   Product|Price|_salt_|
+---------+-------+----------+-----+------+
|        1|    101|    pencil| 4.99|   2.0|
|        2|    101|      book|  9.5|   1.0|
|        3|    101|  scissors| 14.0|   1.0|
|        4|    301|      glue|  7.0|   1.0|
|      

In [18]:
# # repartition using _salt_
df_skew_salting = df_skew_salting.repartition(3, "_salt_")
df_small_salting = df_small_salting.repartition(3, "_salt_")

In [19]:
df_skew_salting.rdd.glom().collect()

[[Row(OrderID=401, Qty=126, Sales=2598, Discount=0, _salt_=2.0),
  Row(OrderID=201, Qty=45, Sales=3223, Discount=1, _salt_=2.0),
  Row(OrderID=401, Qty=60, Sales=1864, Discount=0, _salt_=2.0),
  Row(OrderID=201, Qty=443, Sales=229, Discount=1, _salt_=2.0),
  Row(OrderID=201, Qty=415, Sales=176, Discount=1, _salt_=2.0)],
 [Row(OrderID=201, Qty=170, Sales=351, Discount=1, _salt_=0.0),
  Row(OrderID=301, Qty=21, Sales=1217, Discount=0, _salt_=0.0)],
 [Row(OrderID=301, Qty=72, Sales=60, Discount=0, _salt_=1.0),
  Row(OrderID=201, Qty=95, Sales=698, Discount=1, _salt_=1.0),
  Row(OrderID=201, Qty=56, Sales=330, Discount=0, _salt_=1.0),
  Row(OrderID=101, Qty=34, Sales=579, Discount=0, _salt_=1.0),
  Row(OrderID=301, Qty=135, Sales=1755, Discount=0, _salt_=1.0),
  Row(OrderID=301, Qty=755, Sales=529, Discount=0, _salt_=1.0),
  Row(OrderID=201, Qty=13, Sales=808, Discount=0, _salt_=1.0),
  Row(OrderID=201, Qty=45, Sales=734, Discount=0, _salt_=1.0)]]

In [20]:
df_small_salting.rdd.glom().collect()

[[Row(ProductID=1, OrderID=101, Product='pencil', Price=4.99, _salt_=2.0)],
 [Row(ProductID=5, OrderID=201, Product='marker', Price=8.49, _salt_=0.0),
  Row(ProductID=7, OrderID=201, Product='calculator', Price=3.99, _salt_=0.0)],
 [Row(ProductID=2, OrderID=101, Product='book', Price=9.5, _salt_=1.0),
  Row(ProductID=3, OrderID=101, Product='scissors', Price=14.0, _salt_=1.0),
  Row(ProductID=4, OrderID=301, Product='glue', Price=7.0, _salt_=1.0),
  Row(ProductID=6, OrderID=301, Product='label', Price=2.0, _salt_=1.0),
  Row(ProductID=8, OrderID=501, Product='eraser', Price=1.55, _salt_=1.0)]]

In [21]:
df_skew_salting.drop('_salt_')
df_small_salting.drop('_salt_')

groups = df_skew_salting.join(df_small_salting, df_skew_salting.OrderID == df_small_salting.OrderID, how = 'inner')

summary = groups.select(
    f.round(f.mean(groups.Sales)).alias('avg'),
    f.min(groups.Sales).alias('min'),
    f.max(groups.Sales).alias('max'),
    f.stddev(groups.Sales).alias('stdev')
)

summary.show()

+-----+---+----+----------------+
|  avg|min| max|           stdev|
+-----+---+----+----------------+
|813.0| 60|3223|822.824746894844|
+-----+---+----+----------------+



# (3) Run a shuffle `Join()` to see how the skew effects computation resources.

In [12]:
from pyspark.sql import functions as f

spark.conf.set("spark.sql.shuffle.partitions", 8)
groups = df_skew.join(df_small, df_skew.OrderID == df_small.OrderID, how = 'inner')

summary = groups.select(
    f.round(f.mean(groups.Sales)).alias('avg'),
    f.min(groups.Sales).alias('min'),
    f.max(groups.Sales).alias('max'),
    f.stddev(groups.Sales).alias('stdev')
)

summary.show()

+------+---+----+-----------------+
|   avg|min| max|            stdev|
+------+---+----+-----------------+
|1723.0| 10|3399|967.5906776265002|
+------+---+----+-----------------+



## Mitigate data skewness: SALTING

In [8]:
from pyspark.sql import functions as f

spark.conf.set("spark.sql.shuffle.partitions", 8)


# add random value between [0,2]
df_skew_salting = df_skew.withColumn("_salt_", f.round(f.rand() * 7))
df_small_salting = df_small.withColumn("_salt_", f.round(f.rand() * 7))

# # repartition using _salt_
df_skew_salting = df_skew_salting.repartition(8, "_salt_")
df_small_salting = df_small_salting.repartition(8, "_salt_")

df_skew_salting.drop('_salt_')
df_small_salting.drop('_salt_')

groups = df_skew_salting.join(df_small_salting, df_skew_salting.OrderID == df_small_salting.OrderID, how = 'inner')

summary = groups.select(
    f.round(f.mean(groups.Sales)).alias('avg'),
    f.min(groups.Sales).alias('min'),
    f.max(groups.Sales).alias('max'),
    f.stddev(groups.Sales).alias('stdev')
)

summary.show()

+------+---+----+-----------------+
|   avg|min| max|            stdev|
+------+---+----+-----------------+
|1723.0| 10|3399|967.7618105077845|
+------+---+----+-----------------+



## Mitigate data skewness: Broadcast Hash Join

In [13]:
from pyspark.sql import functions as f

spark.conf.set("spark.sql.shuffle.partitions", 8)

groups_brd = df_skew.join(f.broadcast(df_small), df_skew.OrderID == df_small.OrderID, how = 'inner')

summary = groups_brd.select(
    f.round(f.mean(groups_brd.Sales)).alias('avg'),
    f.min(groups_brd.Sales).alias('min'),
    f.max(groups_brd.Sales).alias('max'),
    f.stddev(groups_brd.Sales).alias('stdev')
)

summary.show()

+------+---+----+----------------+
|   avg|min| max|           stdev|
+------+---+----+----------------+
|1723.0| 10|3399|967.590677626551|
+------+---+----+----------------+

