In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import random
import timeit
random.seed(42)

In [2]:
spark = SparkSession.builder.\
    appName("SparkExecutionPlanTests").\
    master("local[*]").\
    config("spark.driver.memory", "4g").\
    config("spark.executor.memory", "4g").\
    config("spark.driver.bindAddress","localhost").\
    config("spark.ui.port","4050").\
    getOrCreate()

# Spark UI: http://localhost:4050/jobs/

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/31 11:27:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/31 11:27:35 WARN Utils: Service 'SparkUI' could not bind on port 4050. Attempting port 4051.
24/10/31 11:27:35 WARN Utils: Service 'SparkUI' could not bind on port 4051. Attempting port 4052.


In [3]:
df_size = 1000000 # how many rows in the dataframe
num_join_cols = 100 # how many columns get joined with the dataframe to build up the execution plan
num_df_actions = 30 # how many iterations of actions to perform on the dataframe to test performance of each execution plan splitting strategy

In [4]:
df = spark.range(df_size)

In [5]:
for i in range(num_join_cols):
    temp_df = spark.range(df_size).withColumnRenamed("id", f"id_{i}")
    df = df.join(temp_df, df["id"] == temp_df[f"id_{i}"])
    df = df.withColumn(f"id_{i}", (f.rand() * 100).cast("int"))
df = df.drop("id")


In [6]:
df.show()

24/10/31 11:27:52 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 1:>                                                          (0 + 1) / 1]

+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|id_0|id_1|id_2|id_3|id_4|id_5|id_6|id_7|id_8|id_9|id_10|id_11|id_12|id_13|id_14|id_15|id_16|id_17|id_18|id_19|id_20|id_21|id_22|id_23|id_24|id_25|id_26|id_27|id_28|id_29|id_30|id_31|id_32|id_33|id_34|id_35|id_36|id_37|id_38|id_39|id_40|id_41|id_42|id_43|id_44|id_45|id_46|id_47|id_48|id_49|id_50|id_51|id_52|id_53|id_54|id_55|id_56|id_57|id_58|id_59|id_60|id_61|id_62|id_63|id_64|id_65|id_66|id_67|id_68|id_

                                                                                

In [7]:
def run_analysis_iterations(df, iterations):
    for i in range(iterations):
        random_col = random.choice(df.columns)
        random_num = random.randint(0, 100)
        random_sum = df.filter(f.col(random_col) < random_num).select(f.sum(f.col(random_col))).collect()[0][0]
        print(f"iteration {i} col {random_col} count: {random_sum}")

def cache_strategy(df):
    df.cache()
    df.count()
    return df

def checkpoint_strategy(df):
    spark.sparkContext.setCheckpointDir("./checkpoint")
    df = df.checkpoint(eager=True)
    return df

def local_checkpoint_strategy(df):
    df = df.localCheckpoint()
    return df

def temp_write_strategy(df):
    df.write.mode("overwrite").parquet("./temp_write")
    df = spark.read.parquet("./temp_write")
    return df

def rebuild_df_strategy(df):
    df =  spark.createDataFrame(df.rdd, df.schema)
    return df

In [8]:
print("No strategy:")
start_time = timeit.default_timer()
df_no_strategy = df
run_analysis_iterations(df_no_strategy, num_df_actions)
print(f"Execution time: {round(timeit.default_timer() - start_time, 2)} seconds")

No strategy:


                                                                                

iteration 0 col id_81 count: 909102


                                                                                

iteration 1 col id_3 count: 43693381


                                                                                

iteration 2 col id_35 count: 4646131
iteration 3 col id_28 count: 1359575
iteration 4 col id_94 count: 779495


                                                                                

iteration 5 col id_86 count: 43713383


                                                                                

iteration 6 col id_69 count: 549980


                                                                                

iteration 7 col id_75 count: 14311286
iteration 8 col id_4 count: 30069
iteration 9 col id_11 count: 3518979
iteration 10 col id_29 count: 20149617
iteration 11 col id_77 count: 29804
iteration 12 col id_71 count: 2995489
iteration 13 col id_91 count: 34066920
iteration 14 col id_89 count: 23464895


                                                                                

iteration 15 col id_53 count: 3782507
iteration 16 col id_57 count: 27728250
iteration 17 col id_35 count: None
iteration 18 col id_97 count: 1903762
iteration 19 col id_89 count: 14335561


                                                                                

iteration 20 col id_43 count: 5963337
iteration 21 col id_19 count: 3510675
iteration 22 col id_97 count: 9034057
iteration 23 col id_13 count: 549095
iteration 24 col id_48 count: 656490


                                                                                

iteration 25 col id_45 count: 9448232


                                                                                

iteration 26 col id_77 count: 5294377
iteration 27 col id_5 count: 42810819


                                                                                

iteration 28 col id_58 count: 22755625
iteration 29 col id_15 count: 11270328
Execution time: 54.2 seconds


In [9]:
print("Cache strategy:")
start_time = timeit.default_timer()
df_cache = df
df_cache = cache_strategy(df_cache)
run_analysis_iterations(df_cache, num_df_actions)
print(f"Execution time: {round(timeit.default_timer() - start_time, 2)} seconds")
df_cache.unpersist()

Cache strategy:


                                                                                

iteration 0 col id_10 count: 24147573
iteration 1 col id_37 count: 31602882
iteration 2 col id_79 count: 10357216
iteration 3 col id_73 count: 2752099
iteration 4 col id_90 count: 281335
iteration 5 col id_5 count: 34836176
iteration 6 col id_29 count: 47539965
iteration 7 col id_37 count: 450416
iteration 8 col id_29 count: 662653
iteration 9 col id_48 count: 5948980
iteration 10 col id_58 count: 32431021
iteration 11 col id_46 count: 1899830
iteration 12 col id_47 count: 9909002
iteration 13 col id_26 count: 35692079
iteration 14 col id_34 count: 39136908
iteration 15 col id_87 count: 33219939
iteration 16 col id_9 count: 29259892
iteration 17 col id_81 count: 2104125
iteration 18 col id_68 count: 42758037
iteration 19 col id_31 count: 1898573
iteration 20 col id_59 count: 11257900
iteration 21 col id_34 count: 32423610
iteration 22 col id_88 count: 24853108
iteration 23 col id_28 count: 37429808
iteration 24 col id_41 count: 47531896
iteration 25 col id_99 count: 209592
iteration 26

DataFrame[id_0: int, id_1: int, id_2: int, id_3: int, id_4: int, id_5: int, id_6: int, id_7: int, id_8: int, id_9: int, id_10: int, id_11: int, id_12: int, id_13: int, id_14: int, id_15: int, id_16: int, id_17: int, id_18: int, id_19: int, id_20: int, id_21: int, id_22: int, id_23: int, id_24: int, id_25: int, id_26: int, id_27: int, id_28: int, id_29: int, id_30: int, id_31: int, id_32: int, id_33: int, id_34: int, id_35: int, id_36: int, id_37: int, id_38: int, id_39: int, id_40: int, id_41: int, id_42: int, id_43: int, id_44: int, id_45: int, id_46: int, id_47: int, id_48: int, id_49: int, id_50: int, id_51: int, id_52: int, id_53: int, id_54: int, id_55: int, id_56: int, id_57: int, id_58: int, id_59: int, id_60: int, id_61: int, id_62: int, id_63: int, id_64: int, id_65: int, id_66: int, id_67: int, id_68: int, id_69: int, id_70: int, id_71: int, id_72: int, id_73: int, id_74: int, id_75: int, id_76: int, id_77: int, id_78: int, id_79: int, id_80: int, id_81: int, id_82: int, id_8

In [10]:
print("Checkpoint strategy:")
start_time = timeit.default_timer()
df_checkpoint = df
df_checkpoint = checkpoint_strategy(df_checkpoint)
run_analysis_iterations(df_checkpoint, num_df_actions)
print(f"Execution time: {round(timeit.default_timer() - start_time, 2)} seconds")

Checkpoint strategy:


                                                                                

iteration 0 col id_91 count: 7796434
iteration 1 col id_27 count: 33971210
iteration 2 col id_63 count: 12272798
iteration 3 col id_82 count: 16546112
iteration 4 col id_18 count: 5281492
iteration 5 col id_17 count: 4652780
iteration 6 col id_95 count: 24849328
iteration 7 col id_68 count: 5286886
iteration 8 col id_95 count: 27018502
iteration 9 col id_54 count: 27017752
iteration 10 col id_51 count: 10339990
iteration 11 col id_28 count: 1357130
iteration 12 col id_65 count: 19488753
iteration 13 col id_11 count: 45592254
iteration 14 col id_6 count: 913646
iteration 15 col id_19 count: 31550292
iteration 16 col id_20 count: 37432452
iteration 17 col id_54 count: 28489225
iteration 18 col id_8 count: 11764180
iteration 19 col id_48 count: 28520626
iteration 20 col id_59 count: 22100706
iteration 21 col id_32 count: 24169417
iteration 22 col id_1 count: 37456673
iteration 23 col id_92 count: 913851
iteration 24 col id_87 count: 22753582
iteration 25 col id_96 count: 5607014
iteration

In [11]:
print("Local checkpoint strategy:")
start_time = timeit.default_timer()
df_local_checkpoint = df
df_local_checkpoint = local_checkpoint_strategy(df_local_checkpoint)
run_analysis_iterations(df_local_checkpoint, num_df_actions)
print(f"Execution time: {round(timeit.default_timer() - start_time, 2)} seconds")

Local checkpoint strategy:


                                                                                

iteration 0 col id_0 count: 41850504
iteration 1 col id_92 count: 5287764
iteration 2 col id_64 count: 46571162
iteration 3 col id_22 count: 20175962
iteration 4 col id_13 count: 31609238
iteration 5 col id_38 count: 32384688
iteration 6 col id_64 count: 29253221
iteration 7 col id_25 count: 1711368
iteration 8 col id_47 count: 46577109
iteration 9 col id_20 count: 23453366
iteration 10 col id_99 count: 22102679
iteration 11 col id_0 count: 28458621
iteration 12 col id_41 count: 18912313
iteration 13 col id_2 count: 911530
iteration 14 col id_46 count: 7395166
iteration 15 col id_30 count: 209781
iteration 16 col id_30 count: 25621829
iteration 17 col id_10 count: 450040
iteration 18 col id_93 count: 18949626
iteration 19 col id_8 count: 46560272
iteration 20 col id_68 count: 47501480
iteration 21 col id_16 count: 1201694
iteration 22 col id_84 count: 17711726
iteration 23 col id_70 count: 2103612
iteration 24 col id_33 count: 22155084
iteration 25 col id_77 count: 14303919
iteration 2

In [12]:
print("Temp Write strategy:")
start_time = timeit.default_timer()
df_temp_write = df
df_temp_write = temp_write_strategy(df_temp_write)
run_analysis_iterations(df_temp_write, num_df_actions)
print(f"Execution time: {round(timeit.default_timer() - start_time, 2)} seconds")

Temp Write strategy:


                                                                                

iteration 0 col id_51 count: 35730774
iteration 1 col id_83 count: 10808317
iteration 2 col id_56 count: 21410598
iteration 3 col id_57 count: 1047915
iteration 4 col id_31 count: 3780085
iteration 5 col id_8 count: 9020345
iteration 6 col id_2 count: 27802210
iteration 7 col id_70 count: 4057644
iteration 8 col id_75 count: 3787506
iteration 9 col id_0 count: 362127
iteration 10 col id_90 count: 31599620
iteration 11 col id_7 count: 4062493
iteration 12 col id_8 count: 60265
iteration 13 col id_42 count: 361482
iteration 14 col id_65 count: 4366566
iteration 15 col id_35 count: 35673092
iteration 16 col id_62 count: 3507007
iteration 17 col id_69 count: 1199506
iteration 18 col id_92 count: 26257226
iteration 19 col id_73 count: 17703329
iteration 20 col id_31 count: 49453321
iteration 21 col id_60 count: 13253930
iteration 22 col id_24 count: 661940
iteration 23 col id_12 count: 34872797
iteration 24 col id_55 count: 9899146
iteration 25 col id_54 count: 13264749
iteration 26 col id_

In [13]:
print("Rebuild strategy:")
start_time = timeit.default_timer()
df_rebuild = df
df_rebuild = rebuild_df_strategy(df_rebuild)
run_analysis_iterations(df_rebuild, num_df_actions)
print(f"Execution time: {round(timeit.default_timer() - start_time, 2)} seconds")

Rebuild strategy:


                                                                                

iteration 0 col id_51 count: 42784993


                                                                                

iteration 1 col id_43 count: 784859


                                                                                

iteration 2 col id_31 count: 2761707


                                                                                

iteration 3 col id_24 count: 22770297


                                                                                

iteration 4 col id_57 count: 1354956


                                                                                

iteration 5 col id_54 count: 2525072


                                                                                

iteration 6 col id_35 count: 17108816


                                                                                

iteration 7 col id_31 count: 362303


                                                                                

iteration 8 col id_56 count: 24094038


                                                                                

iteration 9 col id_12 count: 149722


                                                                                

iteration 10 col id_83 count: 23453879


                                                                                

iteration 11 col id_1 count: 549584


                                                                                

iteration 12 col id_96 count: 4339846


                                                                                

iteration 13 col id_21 count: 13269207


                                                                                

iteration 14 col id_62 count: 18289671


                                                                                

iteration 15 col id_27 count: 12770425


                                                                                

iteration 16 col id_7 count: 2102143


                                                                                

iteration 17 col id_48 count: None


                                                                                

iteration 18 col id_49 count: 5289017


                                                                                

iteration 19 col id_58 count: 6277605


                                                                                

iteration 20 col id_54 count: 39158389


                                                                                

iteration 21 col id_93 count: 49504333


                                                                                

iteration 22 col id_71 count: 34843951


                                                                                

iteration 23 col id_91 count: 18904179


                                                                                

iteration 24 col id_19 count: 2767316


                                                                                

iteration 25 col id_37 count: 3502378


                                                                                

iteration 26 col id_7 count: 27027514


                                                                                

iteration 27 col id_94 count: 23465265


                                                                                

iteration 28 col id_7 count: 44674132


[Stage 580:>                                                      (0 + 12) / 12]

iteration 29 col id_40 count: 210814
Execution time: 209.61 seconds


                                                                                

In [10]:
spark.stop()