In [None]:
import sys
from pyspark.sql import SparkSession

print("PySpark Script: ", sys.argv[0])

# Create a spark context and print some information about the context object
spark: SparkSession = SparkSession.builder.appName("Demo").getOrCreate()

print(spark.sparkContext.version)
print(spark.sparkContext.pythonVer)
print(spark.sparkContext.master)
# Stop Pyspark
spark.stop()
print("Spark Successfully Stopped!")

In [11]:
import time
import random
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


DEFAULT_DATA_SIZE = 10000000

data_size_labels = {
    50000000: 'large',
    10000000: 'small'
}

def generate_data(size):
    return [(random.randint(1, 10000), random.random()) for _ in range(size)]


def run_large_spark_job(spark: SparkSession, data_size: int, app_name: str, generate_rdd: bool = False):
    '''
    Runs the spark job with the passed in SparkSession that contains an optimized
    SparkConf
    '''
    print(f"Running spark job for {app_name}")
    # conf = spark.sparkContext.getConf()
    # for key, value in conf.getAll():
    #     print(f"{key} = {value}")
    # print()
    try:
        start_time = time.time()
        sc = spark.sparkContext
        sc.setLogLevel("ERROR")
        # Create a large DataFrame with random data
        data = [(i, i % 100, i % 1000) for i in range(data_size)]
        columns = ["id", "group", "subgroup"]
        large_df = spark.createDataFrame(data, columns)

        # Perform transformation, aggregation, and sorting: shuffling and sorting
        result_df = (large_df
                    .withColumn("group_id", F.col("group") * 10)
                    .groupBy("group_id")
                    .agg(F.avg("subgroup").alias("avg_subgroup"),
                        F.count("id").alias("count"))
                    .orderBy("avg_subgroup"))
        result_df.show()
        # # Create small dataframe & join operation which will trigger a shuffle
        # small_df = spark.createDataFrame([(i, i * 2) for i in range(100)], ["group_id", "value"])
        # # Perform the join operation
        # joined_df = result_df.join(small_df, "group_id")
        # joined_df.show()
        end_time = time.time()
        print("Execution Time (secs):", (end_time-start_time))
    except Exception as e:
        print('Stopping context with error', e)
        spark.stop()
    spark.stop()

## Baseline: Default Spark Config

- `spark.serializer` = org.apache.spark.serializer.JavaSerializer

- `spark.reducer.maxSizeInFlight` = 48m

- `spark.shuffle.compress` = true

- `spark.shuffle.spill.compress` = true

- `spark.rdd.compress` = true

- `spark.shuffle.file.buffer` = 32k

- `spark.shuffle.io.preferDirectBufs` = true

- `spark.io.compression.codec` = lz4



Source: https://spark.apache.org/docs/latest/tuning.html


In [12]:
# Run the job with default config
data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
app_name = f'Baseline: Default Config (data_size={data_size_label})'
spark: SparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName(app_name) \
    .config("spark.reducer.maxSizeInFlight", "48m") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "lz4") \
    .config("spark.shuffle.io.preferDirectBufs", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer") \
    .config("spark.shuffle.file.buffer", "32k") \
    .config("spark.shuffle.spill.compress", "true")\
    .config("spark.eventLog.compress", "true")\
    .config("spark.eventLog.enabled", "false")\
    .config("spark.eventLog.dir", "/tmp/spark-events")\
    .getOrCreate()
run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)

Running spark job for Baseline: Default Config (data_size=small)


                                                                                

+--------+------------+------+
|group_id|avg_subgroup| count|
+--------+------------+------+
|       0|       450.0|100000|
|      10|       451.0|100000|
|      20|       452.0|100000|
|      30|       453.0|100000|
|      40|       454.0|100000|
|      50|       455.0|100000|
|      60|       456.0|100000|
|      70|       457.0|100000|
|      80|       458.0|100000|
|      90|       459.0|100000|
|     100|       460.0|100000|
|     110|       461.0|100000|
|     120|       462.0|100000|
|     130|       463.0|100000|
|     140|       464.0|100000|
|     150|       465.0|100000|
|     160|       466.0|100000|
|     170|       467.0|100000|
|     180|       468.0|100000|
|     190|       469.0|100000|
+--------+------------+------+
only showing top 20 rows

Execution Time (secs): 99.55303621292114


## Experiment 1: Changing the serializer

spark.serializer = JavaSerializer -> KryoSerializer

In [13]:
# Run the job with default config
data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
app_name = f'Experiment 1: KryoSerializer (data_size={data_size_label})'
spark: SparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName(app_name) \
    .config("spark.reducer.maxSizeInFlight", "48m") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "lz4") \
    .config("spark.shuffle.io.preferDirectBufs", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.shuffle.file.buffer", "32k") \
    .config("spark.shuffle.spill.compress", "true")\
    .config("spark.eventLog.compress", "true")\
    .config("spark.eventLog.enabled", "false")\
    .config("spark.eventLog.dir", "/tmp/spark-events")\
    .getOrCreate()
run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)

Running spark job for Experiment 1: KryoSerializer (data_size=small)


                                                                                

+--------+------------+------+
|group_id|avg_subgroup| count|
+--------+------------+------+
|       0|       450.0|100000|
|      10|       451.0|100000|
|      20|       452.0|100000|
|      30|       453.0|100000|
|      40|       454.0|100000|
|      50|       455.0|100000|
|      60|       456.0|100000|
|      70|       457.0|100000|
|      80|       458.0|100000|
|      90|       459.0|100000|
|     100|       460.0|100000|
|     110|       461.0|100000|
|     120|       462.0|100000|
|     130|       463.0|100000|
|     140|       464.0|100000|
|     150|       465.0|100000|
|     160|       466.0|100000|
|     170|       467.0|100000|
|     180|       468.0|100000|
|     190|       469.0|100000|
+--------+------------+------+
only showing top 20 rows

Execution Time (secs): 99.18301320075989


## Experiment 2.1: Decreasing the Shuffle File Buffer
- spark.shuffle.file.buffer = 32k -> 16k

In [14]:
# Run the job with default config
data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
app_name = f'Experiment 2.1: Decrease file_buffer to 16k (data_size={data_size_label})'
spark: SparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName(app_name) \
    .config("spark.reducer.maxSizeInFlight", "48m") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "lz4") \
    .config("spark.shuffle.io.preferDirectBufs", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer") \
    .config("spark.shuffle.file.buffer", "16k") \
    .config("spark.shuffle.spill.compress", "true")\
    .config("spark.eventLog.compress", "true")\
    .config("spark.eventLog.enabled", "false")\
    .config("spark.eventLog.dir", "/tmp/spark-events")\
    .getOrCreate()
run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)

Running spark job for Experiment 2.1: Decrease file_buffer to 16k (data_size=small)


                                                                                

+--------+------------+------+
|group_id|avg_subgroup| count|
+--------+------------+------+
|       0|       450.0|100000|
|      10|       451.0|100000|
|      20|       452.0|100000|
|      30|       453.0|100000|
|      40|       454.0|100000|
|      50|       455.0|100000|
|      60|       456.0|100000|
|      70|       457.0|100000|
|      80|       458.0|100000|
|      90|       459.0|100000|
|     100|       460.0|100000|
|     110|       461.0|100000|
|     120|       462.0|100000|
|     130|       463.0|100000|
|     140|       464.0|100000|
|     150|       465.0|100000|
|     160|       466.0|100000|
|     170|       467.0|100000|
|     180|       468.0|100000|
|     190|       469.0|100000|
+--------+------------+------+
only showing top 20 rows

Execution Time (secs): 97.55631804466248


## Experiment 2.2: Increasing the Shuffle File Buffer
- spark.shuffle.file.buffer = 32k -> 64k

In [15]:
# Run the job with default config
data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
app_name = f'Experiment 2.2: Increase file buffer to 64k (data_size={data_size_label})'
spark: SparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName(app_name) \
    .config("spark.reducer.maxSizeInFlight", "48m") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "lz4") \
    .config("spark.shuffle.io.preferDirectBufs", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer") \
    .config("spark.shuffle.file.buffer", "64k") \
    .config("spark.shuffle.spill.compress", "true")\
    .config("spark.eventLog.compress", "true")\
    .config("spark.eventLog.enabled", "false")\
    .config("spark.eventLog.dir", "/tmp/spark-events")\
    .getOrCreate()
run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)

Running spark job for Experiment 2.2: Increase file buffer to 64k (data_size=small)


                                                                                

+--------+------------+------+
|group_id|avg_subgroup| count|
+--------+------------+------+
|       0|       450.0|100000|
|      10|       451.0|100000|
|      20|       452.0|100000|
|      30|       453.0|100000|
|      40|       454.0|100000|
|      50|       455.0|100000|
|      60|       456.0|100000|
|      70|       457.0|100000|
|      80|       458.0|100000|
|      90|       459.0|100000|
|     100|       460.0|100000|
|     110|       461.0|100000|
|     120|       462.0|100000|
|     130|       463.0|100000|
|     140|       464.0|100000|
|     150|       465.0|100000|
|     160|       466.0|100000|
|     170|       467.0|100000|
|     180|       468.0|100000|
|     190|       469.0|100000|
+--------+------------+------+
only showing top 20 rows

Execution Time (secs): 99.2040319442749


## Experiment 3.1: Turn off shuffle compress
- spark.shuffle.compress = true -> false


In [16]:
# Run the job with default config
data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
app_name = f'Experiment 3.1: No Shuffle Compression (data_size={data_size_label})'
spark: SparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName(app_name) \
    .config("spark.reducer.maxSizeInFlight", "48m") \
    .config("spark.shuffle.compress", "false") \
    .config("spark.io.compression.codec", "lz4") \
    .config("spark.shuffle.io.preferDirectBufs", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer") \
    .config("spark.shuffle.file.buffer", "32k") \
    .config("spark.shuffle.spill.compress", "true")\
    .config("spark.eventLog.compress", "true")\
    .config("spark.eventLog.enabled", "false")\
    .config("spark.eventLog.dir", "/tmp/spark-events")\
    .getOrCreate()
run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)

Running spark job for Experiment 3.1: No Shuffle Compression (data_size=small)


                                                                                

+--------+------------+------+
|group_id|avg_subgroup| count|
+--------+------------+------+
|       0|       450.0|100000|
|      10|       451.0|100000|
|      20|       452.0|100000|
|      30|       453.0|100000|
|      40|       454.0|100000|
|      50|       455.0|100000|
|      60|       456.0|100000|
|      70|       457.0|100000|
|      80|       458.0|100000|
|      90|       459.0|100000|
|     100|       460.0|100000|
|     110|       461.0|100000|
|     120|       462.0|100000|
|     130|       463.0|100000|
|     140|       464.0|100000|
|     150|       465.0|100000|
|     160|       466.0|100000|
|     170|       467.0|100000|
|     180|       468.0|100000|
|     190|       469.0|100000|
+--------+------------+------+
only showing top 20 rows

Execution Time (secs): 98.6578140258789


## Experiment 3.2: Turn off shuffle.spill compress
- spark.shuffle.spill.compress = true -> false


In [17]:
# Run the job with default config
data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
app_name = f'Experiment 3.2: No Shuffle Spill Compression (data_size={data_size_label})'
spark: SparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName(app_name) \
    .config("spark.reducer.maxSizeInFlight", "48m") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "lz4") \
    .config("spark.shuffle.io.preferDirectBufs", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer") \
    .config("spark.shuffle.file.buffer", "32k") \
    .config("spark.shuffle.spill.compress", "false")\
    .config("spark.eventLog.compress", "true")\
    .config("spark.eventLog.enabled", "false")\
    .config("spark.eventLog.dir", "/tmp/spark-events")\
    .getOrCreate()
run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)

Running spark job for Experiment 3.2: No Shuffle Spill Compression (data_size=small)


                                                                                

+--------+------------+------+
|group_id|avg_subgroup| count|
+--------+------------+------+
|       0|       450.0|100000|
|      10|       451.0|100000|
|      20|       452.0|100000|
|      30|       453.0|100000|
|      40|       454.0|100000|
|      50|       455.0|100000|
|      60|       456.0|100000|
|      70|       457.0|100000|
|      80|       458.0|100000|
|      90|       459.0|100000|
|     100|       460.0|100000|
|     110|       461.0|100000|
|     120|       462.0|100000|
|     130|       463.0|100000|
|     140|       464.0|100000|
|     150|       465.0|100000|
|     160|       466.0|100000|
|     170|       467.0|100000|
|     180|       468.0|100000|
|     190|       469.0|100000|
+--------+------------+------+
only showing top 20 rows

Execution Time (secs): 97.03834915161133


## Experiment 4.1: Change compression codec to: zstd
- spark.io.compression.codec = lz4 -> zstd

In [18]:
# Run the job with default config
data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
app_name = f'Experiment 4.1: compression=zstd (data_size={data_size_label})'
spark: SparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName(app_name) \
    .config("spark.reducer.maxSizeInFlight", "48m") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "zstd") \
    .config("spark.shuffle.io.preferDirectBufs", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer") \
    .config("spark.shuffle.file.buffer", "32k") \
    .config("spark.shuffle.spill.compress", "true")\
    .config("spark.eventLog.compress", "true")\
    .config("spark.eventLog.enabled", "false")\
    .config("spark.eventLog.dir", "/tmp/spark-events")\
    .getOrCreate()
run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)

Running spark job for Experiment 4.1: compression=zstd (data_size=small)


                                                                                

+--------+------------+------+
|group_id|avg_subgroup| count|
+--------+------------+------+
|       0|       450.0|100000|
|      10|       451.0|100000|
|      20|       452.0|100000|
|      30|       453.0|100000|
|      40|       454.0|100000|
|      50|       455.0|100000|
|      60|       456.0|100000|
|      70|       457.0|100000|
|      80|       458.0|100000|
|      90|       459.0|100000|
|     100|       460.0|100000|
|     110|       461.0|100000|
|     120|       462.0|100000|
|     130|       463.0|100000|
|     140|       464.0|100000|
|     150|       465.0|100000|
|     160|       466.0|100000|
|     170|       467.0|100000|
|     180|       468.0|100000|
|     190|       469.0|100000|
+--------+------------+------+
only showing top 20 rows

Execution Time (secs): 100.85207486152649


## Experiment 4.2: Change compression codec to: snappy
- spark.io.compression.codec = lz4 -> snappy

In [19]:
# Run the job with default config
data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
app_name = f'Experiment 4.2: compression=snappy (data_size={data_size_label})'
spark: SparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName(app_name) \
    .config("spark.reducer.maxSizeInFlight", "48m") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "snappy") \
    .config("spark.shuffle.io.preferDirectBufs", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer") \
    .config("spark.shuffle.file.buffer", "32k") \
    .config("spark.shuffle.spill.compress", "true")\
    .config("spark.eventLog.compress", "true")\
    .config("spark.eventLog.enabled", "false")\
    .config("spark.eventLog.dir", "/tmp/spark-events")\
    .getOrCreate()
run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)

Running spark job for Experiment 4.2: compression=snappy (data_size=small)


                                                                                

+--------+------------+------+
|group_id|avg_subgroup| count|
+--------+------------+------+
|       0|       450.0|100000|
|      10|       451.0|100000|
|      20|       452.0|100000|
|      30|       453.0|100000|
|      40|       454.0|100000|
|      50|       455.0|100000|
|      60|       456.0|100000|
|      70|       457.0|100000|
|      80|       458.0|100000|
|      90|       459.0|100000|
|     100|       460.0|100000|
|     110|       461.0|100000|
|     120|       462.0|100000|
|     130|       463.0|100000|
|     140|       464.0|100000|
|     150|       465.0|100000|
|     160|       466.0|100000|
|     170|       467.0|100000|
|     180|       468.0|100000|
|     190|       469.0|100000|
+--------+------------+------+
only showing top 20 rows

Execution Time (secs): 103.56076312065125


## Experiment 4.3: Change compression codec to: lzf
- spark.io.compression.codec = lz4 -> lzf

In [20]:
# Run the job with default config
data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
app_name = f'Experiment 4.3: compression=lzf (data_size={data_size_label})'
spark: SparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName(app_name) \
    .config("spark.reducer.maxSizeInFlight", "48m") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "lzf") \
    .config("spark.shuffle.io.preferDirectBufs", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer") \
    .config("spark.shuffle.file.buffer", "32k") \
    .config("spark.shuffle.spill.compress", "true")\
    .config("spark.eventLog.compress", "true")\
    .config("spark.eventLog.enabled", "false")\
    .config("spark.eventLog.dir", "/tmp/spark-events")\
    .getOrCreate()
run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)

Running spark job for Experiment 4.3: compression=lzf (data_size=small)


                                                                                

+--------+------------+------+
|group_id|avg_subgroup| count|
+--------+------------+------+
|       0|       450.0|100000|
|      10|       451.0|100000|
|      20|       452.0|100000|
|      30|       453.0|100000|
|      40|       454.0|100000|
|      50|       455.0|100000|
|      60|       456.0|100000|
|      70|       457.0|100000|
|      80|       458.0|100000|
|      90|       459.0|100000|
|     100|       460.0|100000|
|     110|       461.0|100000|
|     120|       462.0|100000|
|     130|       463.0|100000|
|     140|       464.0|100000|
|     150|       465.0|100000|
|     160|       466.0|100000|
|     170|       467.0|100000|
|     180|       468.0|100000|
|     190|       469.0|100000|
+--------+------------+------+
only showing top 20 rows

Execution Time (secs): 100.27239227294922


## Final Optimization

TODO: Figure out the final optimization

In [None]:
# # Run the job with default config
# data_size_label = data_size_labels.get(DEFAULT_DATA_SIZE, DEFAULT_DATA_SIZE)
# app_name = f'Final Optimization (data_size={data_size_label})'
# spark: SparkSession = SparkSession.builder \
#     .master("local[*]") \
#     .appName(app_name) \
#     .config("spark.reducer.maxSizeInFlight", "48m") \
#     .config("spark.shuffle.compress", "false") \
#     .config("spark.io.compression.codec", "zstd") \
#     .config("spark.shuffle.io.preferDirectBufs", "true") \
#     .config("spark.serializer", "org.apache.spark.serializer.KyroSerializer") \
#     .config("spark.shuffle.file.buffer", "64k") \
#     .config("spark.shuffle.spill.compress", "true")\
#     .config("spark.eventLog.compress", "true")\
#     .config("spark.eventLog.enabled", "true")\
#     .config("spark.eventLog.dir", "/tmp/spark-events")\
#     .getOrCreate()
# run_large_spark_job(spark=spark, data_size=DEFAULT_DATA_SIZE, app_name=app_name)