In [1]:
import csv
import time
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, count, max, avg, explode, split, length, lower, struct, row_number, desc, countDistinct

# Initialization
spark_master = "sparkmaster"
data_path = "hdfs://192.168.2.157:9000/user/ubuntu/input/corpus-webis-tldr-17.json"
results_path = "performance_test_3_workers_swarm.csv"

# Functions
def create_spark_session(worker_count):
    """Initialize Spark session."""
    return SparkSession.builder\
        .master(f"spark://{spark_master}:7077")\
        .appName(f"PerformanceTest{worker_count}Workers")\
        .config("spark.executor.instances", str(worker_count))\
        .config("spark.executor.cores", "2")\
        .config("spark.executor.memory", "2g")\
        .config("spark.driver.port", 9999)\
        .config("spark.blockManager.port", 10005)\
        .getOrCreate()

def log_performance(test_id, task, duration):
    """Log performance results to CSV."""
    with open(results_path, 'a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([test_id, task, f"{duration:.2f}"])

def measure_time(func):
    """Measure execution time of a function."""
    start_time = time.time()
    result = func()
    duration = time.time() - start_time
    return duration, result

def run_tasks(spark, worker_count):
    """Run tasks and measure performance."""
    df = spark.read.json(data_path).cache()
    df.count()  # Trigger cache

    # Define tasks within this function to ensure access to the DataFrame
    tasks = {
        'aggregate_subreddit_stats': lambda: df.groupBy('subreddit').agg(count("*").alias("count"), max("content_len"), avg("summary_len")).count(),
        'agg': lambda: df.groupBy('subreddit').agg(count("*").alias("count"), max("content_len"), avg("summary_len")).count(),
        'join': lambda: df.alias("df1").join(df.alias("df2"), col("df1.id") == col("df2.id")).count(),
        'complex_transformation': lambda: df.withColumn("new_content_len", col("content_len") * 2).groupBy('subreddit').agg(avg("new_content_len")).collect(),
        'window_function': lambda: df.withColumn("rank", row_number().over(Window.partitionBy("subreddit").orderBy(desc("content_len")))).filter(col("rank") <= 10).count(),
        'explode_split': lambda: df.withColumn("words", explode(split(col("body"), " "))).groupBy("words").count().orderBy(desc("count")).limit(10).collect(),
        'avg_summary_by_author': lambda: df.groupBy('author').agg(avg('summary_len').alias('avg_summary_length')).count(),
        'distinct_subreddit_count': lambda: df.select('subreddit').distinct().count(),
        'max_summary_length_per_subreddit': lambda: df.groupBy('subreddit').agg(max('summary_len').alias('max_summary_length')).count(),
        'top_title_per_subreddit': lambda: df.withColumn("title_length", length(col("title"))).groupBy('subreddit').agg(max(struct(col('title_length'), col('title'))).alias('top_title')).count(),
        'word_count_in_titles': lambda: df.withColumn("word", explode(split(lower(col("title")), "\\s+"))).groupBy('word').count().orderBy(desc('count')).limit(10).collect(),
        # 'self_join_shuffle': lambda: df.alias("df1").join(df.alias("df2"), col("df1.subreddit") == col("df2.subreddit")).agg(count("*")).collect(),
        'cross_join': lambda: df.crossJoin(df.limit(10)).agg(count("*")).collect(),  
        'complex_aggregation': lambda: df.groupBy('subreddit').agg(count("*"), avg("summary_len"), max("content_len"), countDistinct("author")).collect(),
        'sort_by_large_dataset': lambda: df.orderBy(desc("summary_len")).limit(1000).collect(),
    }

    for task_name, task_func in tasks.items():
        duration, _ = measure_time(task_func)
        print(f"Task {task_name} with {worker_count} workers completed in {duration:.2f} seconds.")
        log_performance(f"{worker_count}_workers", task_name, duration)


In [2]:
# Main
if __name__ == "__main__":
    # Setup CSV for results
    with open(results_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['TestID', 'Task', 'Duration'])
    
# Run tasks for 3 workers
worker_counts = 3

spark_session = create_spark_session(worker_counts)
run_tasks(spark_session, worker_counts)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/15 17:01:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Task aggregate_subreddit_stats with 3 workers completed in 125.73 seconds.


                                                                                

Task agg with 3 workers completed in 118.70 seconds.


                                                                                

Task join with 3 workers completed in 217.56 seconds.


                                                                                

Task complex_transformation with 3 workers completed in 123.86 seconds.


                                                                                

Task window_function with 3 workers completed in 126.02 seconds.


                                                                                

Task explode_split with 3 workers completed in 267.57 seconds.


                                                                                

Task avg_summary_by_author with 3 workers completed in 131.73 seconds.


                                                                                

Task distinct_subreddit_count with 3 workers completed in 128.73 seconds.


                                                                                

Task max_summary_length_per_subreddit with 3 workers completed in 173.71 seconds.


                                                                                

Task top_title_per_subreddit with 3 workers completed in 153.65 seconds.


                                                                                

Task word_count_in_titles with 3 workers completed in 112.40 seconds.


                                                                                

Task cross_join with 3 workers completed in 213.19 seconds.


                                                                                

Task complex_aggregation with 3 workers completed in 183.65 seconds.


24/03/15 17:44:18 ERROR TaskSetManager: Total size of serialized results of 102 tasks (1025.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
24/03/15 17:44:18 WARN TaskSetManager: Lost task 96.0 in stage 77.0 (TID 2845) (10.0.1.6 executor 2): TaskKilled (Tasks result size has exceeded maxResultSize)
24/03/15 17:44:19 WARN TaskSetManager: Lost task 104.0 in stage 77.0 (TID 2847) (10.0.1.16 executor 1): TaskKilled (Stage cancelled: Job aborted due to stage failure: Total size of serialized results of 102 tasks (1025.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB))
24/03/15 17:44:19 WARN TaskSetManager: Lost task 97.0 in stage 77.0 (TID 2846) (10.0.1.6 executor 2): TaskKilled (Stage cancelled: Job aborted due to stage failure: Total size of serialized results of 102 tasks (1025.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB))
24/03/15 17:44:20 WARN TaskSetManager: Lost task 112.0 in stage 77.0 (TID 2849) (10.0.1.14 executor 0): TaskKilled (Stage ca

Py4JJavaError: An error occurred while calling o208.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 102 tasks (1025.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2493)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1139)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1121)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$1(RDD.scala:1568)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1555)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$executeCollect$1(limit.scala:291)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:285)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4148)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4145)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)


24/03/15 17:44:20 WARN TaskSetManager: Lost task 108.0 in stage 77.0 (TID 2848) (10.0.1.14 executor 0): TaskKilled (Stage cancelled: Job aborted due to stage failure: Total size of serialized results of 102 tasks (1025.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB))
24/03/15 17:44:21 WARN TaskSetManager: Lost task 101.0 in stage 77.0 (TID 2851) (10.0.1.6 executor 2): TaskKilled (Stage cancelled: Job aborted due to stage failure: Total size of serialized results of 102 tasks (1025.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB))
24/03/15 17:44:22 WARN TaskSetManager: Lost task 109.0 in stage 77.0 (TID 2850) (10.0.1.16 executor 1): TaskKilled (Stage cancelled: Job aborted due to stage failure: Total size of serialized results of 102 tasks (1025.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB))


In [None]:
spark_session.stop()