## 1. Small-large joins: broadcast joins send the small reference table to every worker, which avoids shuffling (repartitioning both tables by the join key) during the join.
## 2. Skewed (unevenly divided) data: bucketing groups rows by a hash of the join/groupBy key, which helps balance the work across workers and reduces shuffling during joins. Use it for high-cardinality keys (many distinct values, HIGH selectivity), where a HASH function must map the many values onto a few "buckets". Partitioning is the opposite: use it for low-cardinality keys (few distinct values, LOW selectivity), where the partition key is the column value itself.
## 3. Too small or too large partitions: repartitioning (`repartition()`, a full shuffle that can split large partitions) or coalescing (`coalesce()`, which merges small partitions) redistributes the data evenly, so that one oversized partition does not delay the overall result. Ideally a partition holds about 128 MB.
## 4. AQE (Adaptive Query Execution): Spark re-optimizes the query plan based on runtime statistics (broadcast, skew handling, partition coalescing, dynamic join strategy selection)
### Skewed data handling to keep the workload balanced (partition sizes are adjusted dynamically: large partitions are split, small partitions are merged)
### Dynamic join strategy selection
### Dynamic partition coalescing (merging small shuffle partitions so that fewer, larger tasks run after a shuffle)
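
The ~128 MB guideline from point 3 can be turned into a quick estimate of how many partitions to ask for. The following is a minimal sketch in plain Python, assuming the total data size is known; `target_partition_count` and `suggest_action` are hypothetical helpers for illustration, not Spark APIs.

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 * 1024  # ~128 MB guideline from point 3


def target_partition_count(total_bytes: int, target_bytes: int = TARGET_PARTITION_BYTES) -> int:
    """Return how many partitions are needed so each holds about target_bytes."""
    return max(1, math.ceil(total_bytes / target_bytes))


def suggest_action(current_partitions: int, total_bytes: int) -> str:
    """Suggest repartition() (more partitions) or coalesce() (fewer partitions)."""
    wanted = target_partition_count(total_bytes)
    if wanted > current_partitions:
        return f"repartition({wanted})"
    if wanted < current_partitions:
        return f"coalesce({wanted})"
    return "keep"


# 10 GiB spread over only 4 partitions: partitions are too large
print(suggest_action(current_partitions=4, total_bytes=10 * 1024**3))
# 1 GiB spread over 200 partitions: partitions are too small
print(suggest_action(current_partitions=200, total_bytes=1 * 1024**3))
```

The returned strings mirror the DataFrame calls one would then make, for example `df.repartition(80)` or `df.coalesce(8)`.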


In [1]:
# set up Spark on Linux
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         #.master("local") # local - no parallelism at all, local[2] - 2 cores, local[*] - as many worker threads as logical cores
         .appName("Broadcast joins - controlled")
         .enableHiveSupport() # persists table metadata in the Hive metastore, so saveAsTable() tables survive the session
         .config("spark.sql.autoBroadcastJoinThreshold", -1) # max. byte size of a table to be broadcast automatically; -1 = disable
         .config("spark.sql.adaptive.enabled", False) # disable AQE
         #.config("spark.log.level", "ERROR")
         .getOrCreate())

print('spark version:', spark.version)
print('Done.')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/13 18:12:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/13 18:12:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


spark version: 3.5.5
Done.


In [2]:
spark.sql('show databases').show()

25/05/13 18:13:12 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/05/13 18:13:12 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/05/13 18:13:14 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/05/13 18:13:14 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore jovyan@172.17.0.2


+---------+
|namespace|
+---------+
|  default|
|     mydb|
| mytestdb|
+---------+



In [4]:
# generate test data
import random

transaction_list = [
    (f'Product_{random.randint(1, 100)}',  # product_id
     random.randint(1, 1000)               # sales_amount
     )
    for c in range(1000000)]
transaction_df = spark.createDataFrame(transaction_list, ['product_id', 'sales_amount'])
transaction_df.show(5, truncate = False)

ref_list = [
    (f'Product_{c}',                     # product_id
     f'Category_{random.randint(1, 10)}' # category_id
     )
    for c in range(1, 101)]
ref_df = spark.createDataFrame(ref_list, ['product_id', 'category_id'])
ref_df.show(5, truncate = False)

print("Done.")

25/05/13 18:17:37 WARN TaskSetManager: Stage 2 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.


+----------+------------+
|product_id|sales_amount|
+----------+------------+
|Product_70|574         |
|Product_99|683         |
|Product_32|304         |
|Product_27|498         |
|Product_59|989         |
+----------+------------+
only showing top 5 rows

+----------+-----------+
|product_id|category_id|
+----------+-----------+
|Product_1 |Category_9 |
|Product_2 |Category_6 |
|Product_3 |Category_4 |
|Product_4 |Category_9 |
|Product_5 |Category_9 |
+----------+-----------+
only showing top 5 rows

Done.


In [5]:
# 1. standard vs broadcast join (for fact + ref data)
from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql.functions import broadcast

if 'standard_join_df' in globals():
    del standard_join_df

ct = datetime.now()
standard_join_df = transaction_df.alias('tr').join(ref_df.alias('ref'), col('tr.product_id') == col('ref.product_id'), 'inner')
standard_join_df.count()  # action
print(f"standard_join_df done in {(datetime.now() - ct)} secs.")

ct = datetime.now()
broadcast_join_df = transaction_df.alias('tr').join(broadcast(ref_df).alias('ref'), col('tr.product_id') == col('ref.product_id'), 'inner')
broadcast_join_df.count()  # action
print(f"broadcast_join_df done in {(datetime.now() - ct)} secs.")

broadcast_join_df.show(5, truncate= False)
print("Done.")

25/05/13 18:20:16 WARN TaskSetManager: Stage 5 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

standard_join_df done in 0:00:05.296089 secs.


25/05/13 18:20:21 WARN TaskSetManager: Stage 9 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

broadcast_join_df done in 0:00:01.263778 secs.
+----------+------------+----------+-----------+
|product_id|sales_amount|product_id|category_id|
+----------+------------+----------+-----------+
|Product_70|574         |Product_70|Category_8 |
|Product_99|683         |Product_99|Category_5 |
|Product_32|304         |Product_32|Category_3 |
|Product_27|498         |Product_27|Category_10|
|Product_59|989         |Product_59|Category_3 |
+----------+------------+----------+-----------+
only showing top 5 rows

Done.


25/05/13 18:20:22 WARN TaskSetManager: Stage 12 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.
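
The speed-up above comes from the fact that a broadcast join never moves the large table: every worker receives a full copy of the small table and probes it locally. The following is a simplified plain-Python sketch of that idea, not Spark's implementation; `broadcast_hash_join` and the sample rows are made up for illustration.

```python
def broadcast_hash_join(fact_partitions, ref_rows):
    """Inner-join every fact partition against an in-memory copy of the small table."""
    lookup = dict(ref_rows)  # the "broadcast" copy: product_id -> category_id
    joined = []
    for partition in fact_partitions:  # each partition is processed where it already lives
        for product_id, sales_amount in partition:
            if product_id in lookup:  # inner join: rows without a match are dropped
                joined.append((product_id, sales_amount, lookup[product_id]))
    return joined


# two partitions of the large (fact) table; they are never reshuffled
fact_partitions = [
    [("Product_1", 10), ("Product_2", 20)],
    [("Product_1", 30), ("Product_9", 40)],
]
ref_rows = [("Product_1", "Category_A"), ("Product_2", "Category_B")]

result = broadcast_hash_join(fact_partitions, ref_rows)
print(result)
```

A standard (sort-merge) join would instead have to move rows of both tables so that equal keys end up in the same partition, which is the shuffle the broadcast avoids.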


In [6]:
# 2. bucketed join (the data must be saved after bucketing, which pays off when the tables are read many times; bucketBy only works with saveAsTable for now)

# 0. delete the tables if exist
spark.sql('DROP TABLE IF EXISTS bucketed_transaction')
spark.sql('DROP TABLE IF EXISTS bucketed_ref')

# 1. save the data in buckets
buckets = 10
transaction_df.write.format('parquet').bucketBy(buckets, 'product_id').saveAsTable('bucketed_transaction')
ref_df.write.format('parquet').bucketBy(buckets, 'product_id').saveAsTable('bucketed_ref')

# 2. read back the data
bucketed_transaction_df = spark.table('bucketed_transaction')
bucketed_ref_df = spark.table('bucketed_ref')

# 3. execute bucketed join
ct = datetime.now()
bucketed_join_df = bucketed_transaction_df.alias('tr').join(bucketed_ref_df.alias('ref'), col('tr.product_id') == col('ref.product_id'), 'inner')
bucketed_join_df.count()  # action
print(f"bucketed_join done in {(datetime.now() - ct)} secs.")
bucketed_join_df.show(5, truncate= False)

print("Done.")

25/05/13 18:22:20 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
25/05/13 18:22:20 WARN FileUtils: File file:/home/jovyan/work/various_tests/spark/Performance_tuning/spark-warehouse/bucketed_transaction does not exist; Force to delete it.
25/05/13 18:22:20 ERROR FileUtils: Failed to delete file:/home/jovyan/work/various_tests/spark/Performance_tuning/spark-warehouse/bucketed_transaction
25/05/13 18:22:20 WARN FileUtils: File file:/home/jovyan/work/various_tests/spark/Performance_tuning/spark-warehouse/bucketed_ref does not exist; Force to delete it.
25/05/13 18:22:20 ERROR FileUtils: Failed to delete file:/home/jovyan/work/various_tests/spark/Performance_tuning/spark-warehouse/bucketed_ref
25/05/13 18:22:21 WARN TaskSetManager: Stage 13 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.
25/05/13 18:22:24 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is

bucketed_join done in 0:00:01.627052 secs.
+----------+------------+----------+-----------+
|product_id|sales_amount|product_id|category_id|
+----------+------------+----------+-----------+
|Product_10|851         |Product_10|Category_3 |
|Product_10|699         |Product_10|Category_3 |
|Product_10|404         |Product_10|Category_3 |
|Product_10|236         |Product_10|Category_3 |
|Product_10|759         |Product_10|Category_3 |
+----------+------------+----------+-----------+
only showing top 5 rows

Done.
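
Why the bucketed join can skip the shuffle: both tables were written with the same bucket count on the same key, so a given `product_id` lands in the bucket with the same number on both sides. The sketch below illustrates this in plain Python. It uses `zlib.crc32` as a stand-in hash (Spark uses its own hash function), and `bucket_id` and `bucketize` are hypothetical helpers.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 10  # same value as `buckets` in the cell above


def bucket_id(key: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a key to a bucket number with a deterministic stand-in hash."""
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def bucketize(rows, num_buckets: int = NUM_BUCKETS):
    """Group rows by the bucket of their first column (the join key)."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_id(row[0], num_buckets)].append(row)
    return buckets


transactions = [(f"Product_{i % 100 + 1}", i) for i in range(1000)]
reference = [(f"Product_{i}", f"Category_{i % 10 + 1}") for i in range(1, 101)]

tr_buckets = bucketize(transactions)
ref_buckets = bucketize(reference)

# every transaction key finds its reference row in the bucket with the same number,
# so bucket N of one table only ever needs to be joined with bucket N of the other
for b, rows in tr_buckets.items():
    ref_keys = {key for key, _ in ref_buckets[b]}
    assert all(key in ref_keys for key, _ in rows)

print({b: len(rows) for b, rows in sorted(tr_buckets.items())})
```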


In [9]:
# 3. AQE with new config setup (AQE, skewJoins, coalescePartitions enabled, broadcast disabled)
# set up Spark on Linux
from pyspark.sql import SparkSession

# stop the previous session (if any) so that the new configuration takes effect
try:
    spark.stop()
except Exception:
    pass

spark = (SparkSession.builder
         #.master("local") # local - no parallelism at all, local[2] - 2 cores, local[*] - as many worker threads as logical cores
         .appName("AQE")
         .enableHiveSupport() # persists table metadata in the Hive metastore, so saveAsTable() tables survive the session
         .config("spark.sql.adaptive.enabled", True) # enable AQE
         .config("spark.sql.adaptive.skewJoin.enabled", True) # enable skewJoin
         .config("spark.sql.adaptive.coalescePartitions.enabled", True) # enable coalescePartitions
         .config("spark.sql.autoBroadcastJoinThreshold", -1) # max. byte size of table to be broadcast. -1 = disable
         .getOrCreate())

print('spark version:', spark.version)
print('Done.')

spark version: 3.5.5
Done.


In [10]:
# change the transaction test data to be skewed (Product_1 occurs far more often than any other product)
import random

transaction_list = [
    (f'Product_{random.randint(1, 100)  if c < 1000 else 1}',  # product_id
     random.randint(1, 1000)                                   # sales_amount
     )
    for c in range(1000000)]
transaction_df = spark.createDataFrame(transaction_list, ['product_id', 'sales_amount'])
transaction_df.groupBy('product_id').count().orderBy('count', ascending = False).show(5, truncate = False)

ref_list = [
    (f'Product_{c}',                     # product_id
     f'Category_{random.randint(1, 10)}' # category_id
     )
    for c in range(1, 101)]
ref_df = spark.createDataFrame(ref_list, ['product_id', 'category_id'])
ref_df.show(5, truncate = False)

print('Done.')

25/05/13 18:25:18 WARN TaskSetManager: Stage 0 contains a task of very large size (4102 KiB). The maximum recommended task size is 1000 KiB.

+----------+------+
|product_id|count |
+----------+------+
|Product_1 |999003|
|Product_50|18    |
|Product_92|17    |
|Product_10|17    |
|Product_14|16    |
+----------+------+
only showing top 5 rows

+----------+-----------+
|product_id|category_id|
+----------+-----------+
|Product_1 |Category_2 |
|Product_2 |Category_5 |
|Product_3 |Category_2 |
|Product_4 |Category_8 |
|Product_5 |Category_4 |
+----------+-----------+
only showing top 5 rows

Done.


                                                                                

In [11]:
# join with / without AQE
from datetime import datetime
from pyspark.sql.functions import col

# change config, disable AQE
spark.conf.set("spark.sql.adaptive.enabled", False)  # disable AQE
ct = datetime.now()
non_aqe_join_df = transaction_df.alias('tr').join(ref_df.alias('ref'), col('tr.product_id') == col('ref.product_id'), 'inner')
non_aqe_join_df.count()  # action
print(f"non_AQE_join done in {(datetime.now() - ct)} secs.")

# change config, enable AQE
spark.conf.set("spark.sql.adaptive.enabled", True)  # enable AQE
ct = datetime.now()
aqe_join_df = transaction_df.alias('tr').join(ref_df.alias('ref'), col('tr.product_id') == col('ref.product_id'), 'inner')
aqe_join_df.count()  # action
print(f"AQE_join done in {(datetime.now() - ct)} secs.")
print('Done.')

25/05/13 18:25:25 WARN TaskSetManager: Stage 5 contains a task of very large size (4102 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

non_AQE_join done in 0:00:02.921626 secs.


25/05/13 18:25:28 WARN TaskSetManager: Stage 8 contains a task of very large size (4102 KiB). The maximum recommended task size is 1000 KiB.

AQE_join done in 0:00:01.322358 secs.
Done.
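
How AQE decides that a shuffle partition is skewed can be sketched as follows. This is a simplified plain-Python model, assuming the documented defaults of `spark.sql.adaptive.skewJoin.skewedPartitionFactor` (5), `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` (256MB) and `spark.sql.adaptive.advisoryPartitionSizeInBytes` (64MB). `find_skewed` and `split_sizes` are hypothetical helpers, and the partition sizes are invented.

```python
from statistics import median

MB = 1024 * 1024


def find_skewed(partition_sizes, factor=5, threshold_bytes=256 * MB):
    """Return indexes of partitions larger than factor * median AND above the threshold."""
    med = median(partition_sizes)
    return [i for i, size in enumerate(partition_sizes)
            if size > factor * med and size > threshold_bytes]


def split_sizes(size, advisory_bytes=64 * MB):
    """Split one oversized partition into chunks of at most advisory_bytes."""
    full, rest = divmod(size, advisory_bytes)
    return [advisory_bytes] * full + ([rest] if rest else [])


sizes = [40 * MB, 50 * MB, 45 * MB, 900 * MB]  # one partition holds the hot key
skewed = find_skewed(sizes)
print("skewed partition indexes:", skewed)
print("chunk sizes in MB:", [s // MB for s in split_sizes(sizes[skewed[0]])])
```

Both conditions must hold, so with a test dataset of only a few MB, as in this notebook, the default 256MB threshold would most likely not be reached; the measured difference may then come from other AQE features such as partition coalescing.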


                                                                                

In [12]:
# 3. AQE with new config setup (AQE disabled - sort merge join, enabled - broadcasting)
# set up Spark on Linux
from pyspark.sql import SparkSession

# stop the previous session (if any) so that the new configuration takes effect
try:
    spark.stop()
except Exception:
    pass

spark = (SparkSession.builder
         #.master("local") # local - no parallelism at all, local[2] - 2 cores, local[*] - as many worker threads as logical cores
         .appName("AQE")
         .enableHiveSupport() # persists table metadata in the Hive metastore, so saveAsTable() tables survive the session
         .config("spark.sql.adaptive.enabled", True) # enable AQE
         .getOrCreate())

print('spark version:', spark.version)
print('Done.')

spark version: 3.5.5
Done.


In [13]:
# generate test data
import random

transaction_list = [
    (f'Product_{random.randint(1, 100)}',  # product_id
     random.randint(1, 1000)               # sales_amount
     )
    for c in range(1000000)]
transaction_df = spark.createDataFrame(transaction_list, ['product_id', 'sales_amount'])
transaction_df.groupBy('product_id').count().orderBy('count', ascending = False).show(5, truncate = False)

ref_list = [
    (f'Product_{c}',                     # product_id
     f'Category_{random.randint(1, 10)}' # category_id
     )
    for c in range(1, 101)]
ref_df = spark.createDataFrame(ref_list, ['product_id', 'category_id'])
ref_df.show(5, truncate = False)

print('Done.')

25/05/13 18:28:19 WARN TaskSetManager: Stage 0 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.

+----------+-----+
|product_id|count|
+----------+-----+
|Product_78|10282|
|Product_48|10254|
|Product_3 |10201|
|Product_43|10171|
|Product_79|10161|
+----------+-----+
only showing top 5 rows

+----------+-----------+
|product_id|category_id|
+----------+-----------+
|Product_1 |Category_7 |
|Product_2 |Category_10|
|Product_3 |Category_1 |
|Product_4 |Category_6 |
|Product_5 |Category_10|
+----------+-----------+
only showing top 5 rows

Done.


                                                                                

In [14]:
# join with / without AQE
from datetime import datetime
from pyspark.sql.functions import col

# change config, disable AQE (not using broadcast, but sort-merge join strategy)
spark.conf.set("spark.sql.adaptive.enabled", False)  # disable AQE
ct = datetime.now()
non_aqe_join_df = transaction_df.alias('tr').join(ref_df.alias('ref'), col('tr.product_id') == col('ref.product_id'), 'inner')
non_aqe_join_df.count()  # action
print(f"non_AQE_join done in {(datetime.now() - ct)} secs.")

# change config, enable AQE (using broadcast)
spark.conf.set("spark.sql.adaptive.enabled", True)  # enable AQE
ct = datetime.now()
aqe_join_df = transaction_df.alias('tr').join(ref_df.alias('ref'), col('tr.product_id') == col('ref.product_id'), 'inner')
aqe_join_df.count()  # action
print(f"AQE_join done in {(datetime.now() - ct)} secs.")
print('Done.')

25/05/13 18:28:23 WARN TaskSetManager: Stage 4 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.

non_AQE_join done in 0:00:02.634615 secs.


25/05/13 18:28:25 WARN TaskSetManager: Stage 8 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.

AQE_join done in 0:00:01.508587 secs.
Done.
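
The dynamic join strategy selection measured above boils down to one comparison made with runtime sizes instead of compile-time estimates. The sketch below is a deliberately simplified plain-Python model, assuming the default `spark.sql.autoBroadcastJoinThreshold` of 10MB; `choose_join_strategy` is a hypothetical helper and the byte sizes are invented.

```python
MB = 1024 * 1024


def choose_join_strategy(left_bytes, right_bytes, broadcast_threshold=10 * MB):
    """Pick a join strategy from the (runtime) sizes of both join sides."""
    if broadcast_threshold < 0:  # -1 disables automatic broadcasting
        return "sort_merge"
    smaller = min(left_bytes, right_bytes)
    return "broadcast_hash" if smaller <= broadcast_threshold else "sort_merge"


# small reference table next to a larger transaction table
print(choose_join_strategy(left_bytes=400 * MB, right_bytes=1 * MB))
# same sizes, but automatic broadcasting disabled as in the first session of this notebook
print(choose_join_strategy(left_bytes=400 * MB, right_bytes=1 * MB, broadcast_threshold=-1))
```

Without AQE the planner has to rely on size estimates made before execution. With AQE the actual shuffle sizes are known after the first stage, so a sort-merge join can still be converted to a broadcast join.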


                                                                                

In [1]:
# 4. Dynamic partition coalescing
# set up Spark on Linux
from pyspark.sql import SparkSession

# stop the previous session (if any) so that the new configuration takes effect
try:
    spark.stop()
except Exception:
    pass

spark = (SparkSession.builder
         #.master("local") # local - no parallelism at all, local[2] - 2 cores, local[*] - as many worker threads as logical cores
         .appName("Dynamic partition coalescing")
         .enableHiveSupport() # persists table metadata in the Hive metastore, so saveAsTable() tables survive the session
         .config("spark.sql.adaptive.enabled", True) # enable AQE
         .config("spark.sql.adaptive.coalescePartitions.enabled", True) # coalesce shuffle partitions towards advisoryPartitionSizeInBytes
         .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") # target size of a shuffle partition after coalescing
         .config("spark.sql.shuffle.partitions", "20") # number of partitions to use when shuffling (default is 200); the key is case-sensitive
         .getOrCreate())

print('spark version:', spark.version)
print('Done.')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/25 00:26:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


spark version: 3.5.5
Done.


In [15]:
# generate test data
import random

transaction_list = [
    (f'Product_{random.randint(1, 100)}',  # product_id
     random.randint(1, 1000)               # sales_amount
     )
    for c in range(1000000)]
transaction_df = spark.createDataFrame(transaction_list, ['product_id', 'sales_amount'])
print('Done.')

Done.


In [16]:
# aggregate

from datetime import datetime
from pyspark.sql.functions import col

# change config, disable coalescePartitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)  # disable coalescePartitions
ct = datetime.now()
none_coalesced_result_df = transaction_df.groupBy('product_id').sum('sales_amount')
none_coalesced_result_df.count() # action
print(f"non_coalesced_aggregate done in {(datetime.now() - ct)} secs.")

# change config, enable coalescePartitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)  # Spark will dynamically merge (coalesce) small shuffle partitions
ct = datetime.now()
coalesced_result_df = transaction_df.groupBy('product_id').sum('sales_amount')
coalesced_result_df.count() # action
print(f"coalesced_aggregate done in {(datetime.now() - ct)} secs.")
print('Done.')

25/05/13 18:35:29 WARN TaskSetManager: Stage 17 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.
25/05/13 18:35:31 WARN TaskSetManager: Stage 23 contains a task of very large size (4325 KiB). The maximum recommended task size is 1000 KiB.


non_coalesced_aggregate done in 0:00:01.833081 secs.



coalesced_aggregate done in 0:00:00.837081 secs.
Done.
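
What partition coalescing does with the shuffle output can be sketched as a greedy merge of neighbouring partitions up to the advisory size. This is a simplified plain-Python model of the idea rather than Spark's exact algorithm; `coalesce_partitions` is a hypothetical helper and the sizes are invented.

```python
MB = 1024 * 1024


def coalesce_partitions(sizes, advisory_bytes=64 * MB):
    """Greedily merge adjacent partitions while a group stays within advisory_bytes."""
    groups, current, current_size = [], [], 0
    for idx, size in enumerate(sizes):
        # close the current group if adding this partition would exceed the target
        if current and current_size + size > advisory_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


# 20 tiny shuffle partitions of about 0.2 MB each end up in a single task
tiny = [MB // 5] * 20
print(len(coalesce_partitions(tiny)))

# mixed sizes: only neighbours that fit together under 64 MB are merged
print(coalesce_partitions([30 * MB, 30 * MB, 30 * MB, 70 * MB, 10 * MB]))
```

For a small aggregate like the one above, running one task instead of many nearly empty ones removes most of the scheduling overhead, which is consistent with the shorter runtime measured with coalescing enabled.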


                                                                                

In [4]:
# stop the Spark session (the Spark UI on localhost:4040 becomes inaccessible until a new session starts)
spark.stop()
print('Done.')

Done.
