# Gradient Boosted Trees (GBT) & Analytics Workload Experiment setup

- Date: 25.09.2021
- Execution: DOS ARM Spark K8s cluster
- Checkpoint Interval: every 2 iterations
- Only the Analytics workloads wrote files on HDFS


| **Workload App**         | **Checkpoint**     | **Input size** | **Drivers**         | **Driver Cores**     |**Driver Memory** |**Executors** |**Executors Cores** |**Executors Memory** |**Iterations** | **Checkpoint size** |
|--------------|-----------|------------|--------------|-----------|------------|------------|------------|------------|------------|------------|
| gbt-9000000-10 | NO      | 35 GB        | 1 | 4      | 4 GB        | 10        | 4        | 8 GB    |5        | / |
| gbt-9000000-10-checkpoint      | YES  | 35 GB       | 1      | 4  | 4 GB       | 10        | 4        | 8 GB | 5 | 0  |
| analytics-1gb      | NO  | 30 GB       | 1      | 4  | 4 GB       | 10       | 4        | 8 GB | 5  | /  |
| analytics-1gb-checkpoint | YES  | 30 GB       | 1      | 4  | 4 GB       | 10       | 4        | 8 GB | 5 | 40GB


# 1. Analysis

## 1.1 Data preparation

In [81]:
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

In [82]:
def cast(gbt_df: DataFrame) -> DataFrame:
    # cast numeric and datetime columns
    datetime_pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSz"

    gbt_df = gbt_df.select(
        "*",
        gbt_df.taskIndex.cast(T.IntegerType()).alias("taskIndex_"),
        gbt_df.rddId.cast(T.IntegerType()).alias("rddId_"),
        gbt_df.taskAttempt.cast(T.IntegerType()).alias("taskAttempt_"),
        gbt_df.taskDuration.cast(T.FloatType()).alias("taskDuration_"),
        gbt_df.taskExecutorId.cast(T.IntegerType()).alias("taskExecutorId_"),
        gbt_df.tcMs.cast(T.FloatType()).alias("tcMs_"),
        F.to_timestamp(gbt_df.submissionTime, datetime_pattern).alias("submissionTime_"),
        F.to_timestamp(gbt_df.firstTaskLaunchedTime, datetime_pattern).alias("firstTaskLaunchedTime_"),
        F.to_timestamp(gbt_df.completionTime, datetime_pattern).alias("completionTime_"),
        F.to_timestamp(gbt_df.taskLaunchTime, datetime_pattern).alias("taskLaunchTime_")
    ).drop("taskIndex", "rddId", "taskId", "tcMs",
           "taskAttempt", "taskDuration", "taskExecutorId", "submissionTime",
           "firstTaskLaunchedTime", "completionTime", "taskLaunchTime")
    
    # rename columns
    new_column_names = [c.rstrip("_") for c in gbt_df.columns]
    gbt_df = gbt_df.toDF(*new_column_names)

    gbt_df = gbt_df.withColumnRenamed("_c0", "id")
    
    return gbt_df

In [83]:
gbt_df = spark.read.csv(
    path="../output/cluster/GradientBoostedTrees/spark-62411c71565b4cf8b0e316c07c8a4d5f_normal.csv",
    sep=",",
    header=True,
    inferSchema=True
)

In [84]:
gbt_check_df = spark.read.csv(
    path="../output/cluster/GradientBoostedTrees/spark-6f87c902a05c4a52b460bc8555ca42a0_checkpoint.csv",
    sep=",",
    header=True,
    inferSchema=True
)

## 1.2 join each 5 execution of the `Analytics` workload DataFrames together 

In [85]:
first_analytics_run_path = "../output/cluster/Analytics/spark-9078bc6ddb3a4c0b913072d827c939bc_normal.csv"
# key: the sequence of execution, value: the path of the output of run_experiments.py
analytics_df_runs = {
    "2": "../output/cluster/Analytics/spark-827da77e3d5946b395e7359bb0534f22_normal.csv",
    "3": "../output/cluster/Analytics/spark-ab3aa599abf141568bed1c53aee2f842_normal.csv",
    "4": "../output/cluster/Analytics/spark-f9330f93633948d195315dbeba6313f2_normal.csv",
    "5": "../output/cluster/Analytics/spark-af529ab0a80b48168d3c95c60bf7bca7_normal.csv"
}
analytics_df = spark.read.csv(
    path=first_analytics_run_path,
    sep=",",
    header=True,
    inferSchema=True
)

analytics_df = cast(analytics_df)

rows_before_join = analytics_df.count()
print(f"rows_before_join: {rows_before_join}")

for key, value in analytics_df_runs.items():
    df_inc = spark.read.csv(
        path=value,
        sep=",",
        header=True,
        inferSchema=True
    )
    
    df_inc = cast(df_inc)
    
    # add suffix to the incrementing dataframe
    df_inc = df_inc.select([F.col(c).alias(f"{c}_{key}") for c in df_inc.columns])
    
    # add join column
    df_inc = df_inc.withColumn("id", F.monotonically_increasing_id())
    
    # of these columns, we use the mean of the 5 runs
    mean_cols = [
        "id",
        df_inc.columns[6],  # numCompleteTasks
        df_inc.columns[7],  # numFailedTasks
        df_inc.columns[8],  # numKilledTasks
        df_inc.columns[13],  # taskAttempt
        df_inc.columns[14],  # taskDuration
        df_inc.columns[16]  # tcMs
    ]
    
    analytics_df = analytics_df.join(
        df_inc.select(mean_cols),
        on="id",
        how="inner"
    )

rows_after_join = analytics_df.count()
print(f"rows_after_join: {rows_after_join}")

assert rows_after_join == rows_before_join

rows_before_join: 1392
rows_after_join: 1392


In [87]:
first_run_path = "../output/cluster/Analytics/spark-d493e730d6be481896910ff2a003db4e_checkpoint.csv"
# key: the sequence of execution, value: the path of the output of run_experiments.py
analytics_check_df_runs = {
    "2": "../output/cluster/Analytics/spark-86ce2033320d452ebfb4c69e0d5aaaad_checkpoint.csv",
    "3": "../output/cluster/Analytics/spark-581a0b09967648cca77d0084ed25af2f_checkpoint.csv",
    "4": "../output/cluster/Analytics/spark-83e529840d2c4c0eb550105373fed434_checkpoint.csv",
    "5": "../output/cluster/Analytics/spark-4c736126ec9a4b72a96a76c1155cd03e_checkpoint.csv"
}
analytics_check_df = spark.read.csv(
    path=first_run_path,
    sep=",",
    header=True,
    inferSchema=True
)

analytics_check_df = cast(analytics_check_df)

rows_before_join = analytics_check_df.count()
print(f"rows_before_join: {rows_before_join}")

for key, value in analytics_check_df_runs.items():
    df = spark.read.csv(
        path=value,
        sep=",",
        header=True,
        inferSchema=True
    )
    
    df = cast(df)
    
    
    # add suffix to the incrementing dataframe
    df = df.select([F.col(c).alias(f"{c}_{key}") for c in df.columns])
    
    # add join column
    df = df.withColumn("id", F.monotonically_increasing_id())
    
    # of these columns, we use the mean of the 5 runs
    mean_cols = [
        "id",
        df.columns[6],  # numCompleteTasks
        df.columns[7],  # numFailedTasks
        df.columns[8],  # numKilledTasks
        df.columns[13],  # taskAttempt
        df.columns[14],  # taskDuration
        df.columns[16]  # tcMs
    ]
    
    analytics_check_df = analytics_check_df.join(
        df.select(mean_cols),
        on="id",
        how="inner"
    )


rows_after_join = analytics_check_df.count()
print(f"rows_after_join: {rows_after_join}")

assert rows_after_join == rows_before_join

rows_before_join: 10152
rows_after_join: 10152


In [88]:
analytics_check_df

DataFrame[id: int, status: string, stageId: int, attemptId: int, numTasks: int, numActiveTasks: int, numCompleteTasks: int, numFailedTasks: int, numKilledTasks: int, name: string, rddIds: string, taskIndex: int, rddId: int, taskAttempt: int, taskDuration: float, taskExecutorId: int, tcMs: float, submissionTime: timestamp, firstTaskLaunchedTime: timestamp, completionTime: timestamp, taskLaunchTime: timestamp, numCompleteTasks_2: int, numFailedTasks_2: int, numKilledTasks_2: int, taskAttempt_2: int, taskDuration_2: float, tcMs_2: float, numCompleteTasks_3: int, numFailedTasks_3: int, numKilledTasks_3: int, taskAttempt_3: int, taskDuration_3: float, tcMs_3: float, numCompleteTasks_4: int, numFailedTasks_4: int, numKilledTasks_4: int, taskAttempt_4: int, taskDuration_4: float, tcMs_4: float, numCompleteTasks_5: int, numFailedTasks_5: int, numKilledTasks_5: int, taskAttempt_5: int, taskDuration_5: float, tcMs_5: float]

## 1.3 calculate the mean of the values that were collected over all 5 runs in the `Analytics` workload

In [89]:
analytics_df = analytics_df.withColumn(
    "meanNumCompleteTasks",
    (F.col("numCompleteTasks") + F.col("numCompleteTasks_2") + F.col("numCompleteTasks_3") + F.col("numCompleteTasks_4") + F.col("numCompleteTasks_5") / 5)
)

analytics_df = analytics_df.withColumn(
    "meanNumFailedTasks",
    (F.col("numFailedTasks") + F.col("numFailedTasks_2") + F.col("numFailedTasks_3") + F.col("numFailedTasks_4") + F.col("numFailedTasks_5") / 5)
)

analytics_df = analytics_df.withColumn(
    "meanNumKilledTasks",
    (F.col("numKilledTasks") + F.col("numKilledTasks_2") + F.col("numKilledTasks_3") + F.col("numKilledTasks_4") + F.col("numKilledTasks_5") / 5)
)

analytics_df = analytics_df.withColumn(
    "meanTaskAttempt",
    (F.col("taskAttempt") + F.col("taskAttempt_2") + F.col("taskAttempt_3") + F.col("taskAttempt_4") + F.col("taskAttempt_5") / 5)
)

analytics_df = analytics_df.withColumn(
    "meanTaskAttempt",
    (F.col("taskAttempt") + F.col("taskAttempt_2") + F.col("taskAttempt_3") + F.col("taskAttempt_4") + F.col("taskAttempt_5") / 5)
)

analytics_df = analytics_df.withColumn(
    "meanTaskDuration",
    (F.col("taskDuration") + F.col("taskDuration_2") + F.col("taskDuration_3") + F.col("taskDuration_4") + F.col("taskDuration_5") / 5)
)

analytics_df = analytics_df.withColumn(
    "meanTcMs",
    (F.col("tcMs") + F.col("tcMs_2") + F.col("tcMs_3") + F.col("tcMs_4") + F.col("tcMs_5") / 5)
)

In [90]:
analytics_check_df = analytics_check_df.withColumn(
    "meanNumCompleteTasks",
    (F.col("numCompleteTasks") + F.col("numCompleteTasks_2") + F.col("numCompleteTasks_3") + F.col("numCompleteTasks_4") + F.col("numCompleteTasks_5") / 5)
)

analytics_check_df = analytics_check_df.withColumn(
    "meanNumFailedTasks",
    (F.col("numFailedTasks") + F.col("numFailedTasks_2") + F.col("numFailedTasks_3") + F.col("numFailedTasks_4") + F.col("numFailedTasks_5") / 5)
)

analytics_check_df = analytics_check_df.withColumn(
    "meanNumKilledTasks",
    (F.col("numKilledTasks") + F.col("numKilledTasks_2") + F.col("numKilledTasks_3") + F.col("numKilledTasks_4") + F.col("numKilledTasks_5") / 5)
)

analytics_check_df = analytics_check_df.withColumn(
    "meanTaskAttempt",
    (F.col("taskAttempt") + F.col("taskAttempt_2") + F.col("taskAttempt_3") + F.col("taskAttempt_4") + F.col("taskAttempt_5") / 5)
)

analytics_check_df = analytics_check_df.withColumn(
    "meanTaskAttempt",
    (F.col("taskAttempt") + F.col("taskAttempt_2") + F.col("taskAttempt_3") + F.col("taskAttempt_4") + F.col("taskAttempt_5") / 5)
)

analytics_check_df = analytics_check_df.withColumn(
    "meanTaskDuration",
    (F.col("taskDuration") + F.col("taskDuration_2") + F.col("taskDuration_3") + F.col("taskDuration_4") + F.col("taskDuration_5") / 5)
)

analytics_check_df = analytics_check_df.withColumn(
    "meanTcMs",
    (F.col("tcMs") + F.col("tcMs_2") + F.col("tcMs_3") + F.col("tcMs_4") + F.col("tcMs_5") / 5)
)

In [None]:
gbt_df = cast(gbt_df)

In [None]:
gbt_df

In [None]:
gbt_check_df = cast(gbt_check_df)

## 1.4 Data summary

### `GBT`
- 16 as maximum for `numCompleteTasks` make sense since 4 executors with 4 cores each can compute maximum 16 tasks in parallel
- 4 as maximum for `numFailedTasks` means that no cores were available anymore, thus the executor failed

In [None]:
gbt_df.select("numCompleteTasks", "numFailedTasks", "numKilledTasks", "taskAttempt", "taskDuration").summary().show()

In [None]:
gbt_check_df.select("numCompleteTasks", "numFailedTasks", "numKilledTasks", "taskAttempt", "taskDuration", "tcMs").summary().show()

### `Analytics`
- higher number of tasks per stage
- no failed or killed tasks
- higher task and checkpoint duration than `GBT`

In [91]:
analytics_df.select("meanNumCompleteTasks", "meanNumFailedTasks", "meanNumKilledTasks", "meanTaskAttempt", "meanTaskDuration", "meanTcMs").summary().show()

+-------+--------------------+------------------+------------------+---------------+----------------+--------+
|summary|meanNumCompleteTasks|meanNumFailedTasks|meanNumKilledTasks|meanTaskAttempt|meanTaskDuration|meanTcMs|
+-------+--------------------+------------------+------------------+---------------+----------------+--------+
|  count|                1392|              1392|              1392|            392|            1392|    1392|
|   mean|   649.5999999999941|               0.0|               0.0|            0.0|             NaN|     NaN|
| stddev|  134.80985058948727|               0.0|               0.0|            0.0|             NaN|     NaN|
|    min|               547.2|               0.0|               0.0|            0.0|         23063.6|     NaN|
|    25%|               547.2|               0.0|               0.0|            0.0|        533834.6|     NaN|
|    50%|               561.6|               0.0|               0.0|            0.0|             NaN|     NaN|
|

In [92]:
analytics_check_df.select("meanNumCompleteTasks", "meanNumFailedTasks", "meanNumKilledTasks", "meanTaskAttempt", "meanTaskDuration", "meanTcMs").summary().show()

+-------+--------------------+------------------+------------------+--------------------+----------------+--------+
|summary|meanNumCompleteTasks|meanNumFailedTasks|meanNumKilledTasks|     meanTaskAttempt|meanTaskDuration|meanTcMs|
+-------+--------------------+------------------+------------------+--------------------+----------------+--------+
|  count|               10152|             10152|             10152|                 696|           10152|   10152|
|   mean|   526.5040189125624|1.3475177304964538|               0.0|0.002873563218390...|             NaN|     NaN|
| stddev|   308.1893605657361|3.7889078664038935|               0.0|0.053567047660431966|             NaN|     NaN|
|    min|                 0.0|               0.0|               0.0|                 0.0|         30546.0|914530.8|
|    25%|               475.2|               0.0|               0.0|                 0.0|             NaN|     NaN|
|    50%|               569.6|               0.0|               0.0|    

In [None]:
gbt_check_df.select("tcMs", "rddId").distinct().sort("rddId").show(10)

In [93]:
analytics_check_df.select("meanTcMs", "rddId").distinct().sort("rddId").show(10)

+--------+-----+
|meanTcMs|rddId|
+--------+-----+
|     NaN| null|
|     NaN|   14|
|914530.8|   14|
+--------+-----+



### *Plot 1*: total App runtime compared

In [None]:
gbt_runtime = gbt_df.select(
    F.min("submissionTime").alias("start"),
    F.max("completionTime").alias("end"),
    (F.max("completionTime").cast(T.LongType()) - F.min("submissionTime").cast(T.LongType())).alias("duration")  # in seconds
).select("duration").collect()[0][0] / 60

analytics_runtime = analytics_df.select(
    F.min("submissionTime").alias("start"),
    F.max("completionTime").alias("end"),
    (F.max("completionTime").cast(T.LongType()) - F.min("submissionTime").cast(T.LongType())).alias("duration")  # in seconds
).select("duration").collect()[0][0] / 60

gbt_check_runtime = gbt_check_df.select(
    F.min("submissionTime").alias("start"),
    F.max("completionTime").alias("end"),
    (F.max("completionTime").cast(T.LongType()) - F.min("submissionTime").cast(T.LongType())).alias("duration")  # in seconds
).select("duration").collect()[0][0] / 60

analytics_check_runtime = analytics_check_df.select(
    F.min("submissionTime").alias("start"),
    F.max("completionTime").alias("end"),
    (F.max("completionTime").cast(T.LongType()) - F.min("submissionTime").cast(T.LongType())).alias("duration")  # in seconds
).select("duration").collect()[0][0] / 60

gbt_distinct_tcms = gbt_check_df.filter(~F.isnan(F.col("tcMs"))).select("tcMs").distinct()
gbt_tcMs_sum_seconds = distinct_tcms.select((F.sum(F.col("tcMs")) / 1000).alias("tcMsSumSeconds")).collect()[0][0]

analytics_distinct_tcms = analytics_check_df.filter(~F.isnan(F.col("tcMs"))).select("tcMs").distinct()
analytics_tcMs_sum_seconds = analytics_distinct_tcms.select((F.sum(F.col("tcMs")) / 1000).alias("tcMsSumSeconds")).collect()[0][0]

In [None]:
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

font = {'family' : 'arial',
        'weight' : 'bold',
        'size'   : 16}

plt.rc('font', **font)


labels = ['gbt-9000000-10', 'gbt-9000000-10-checkpoint', 'analytics-1gb', 'analytics-1gb-checkpoint']


runtimes = [gbt_runtime, gbt_check_runtime, analytics_runtime, analytics_check_runtime,]

yerrs = [0, (gbt_tcMs_sum_seconds / 60), 0, (analytics_tcMs_sum_seconds / 60) ]


fig, ax = plt.subplots()
fig.set_size_inches(14, 8)

#
ax.bar(labels, runtimes, width, label='App runtime (in minutes)', yerr=yerrs, color=["lightgreen", "green", "lightblue", "blue"])

# annotate the values to the bars
for i,j in zip(labels,runtimes):
    ax.annotate(str(format(j, '.2f')),xy=(i,j))

ax.set_ylabel('Total Spark App runtime (in minutes)')
ax.set_title('Total Spark App runtime compared')

plt.show()

In [None]:
gbt_tcMs_sum_seconds

In [None]:
analytics_tcMs_sum_seconds

### *Plot 2*: GBT `tcMs` over time

In [None]:
gbt_check_submission_time = gbt_check_df.select("submissionTime").toPandas()["submissionTime"]
gbt_tc_ms = gbt_check_df.select("tcMs").toPandas()["tcMs"]

fig = plt.figure(figsize=(14, 8))

plt.plot(gbt_check_submission_time, tc_ms, '-o', color="green")

plt.xlabel('time')
plt.ylabel('checkpoint time (ms)')
plt.title('Evolution of the checkpoint time (ms) over time')

plt.show()

### *Plot 3*: Analytics `tcMs` over time

In [None]:
analytics_check_submission_time = analytics_check_df.select("submissionTime").toPandas()["submissionTime"]
analytics_tc_ms = analytics_check_df.select("tcMs").toPandas()["tcMs"]

fig = plt.figure(figsize=(14, 8))

plt.plot(analytics_check_submission_time, analytics_tc_ms, '-o', color="blue")

plt.xlabel('time')
plt.ylabel('checkpoint time (ms)')
plt.title('Evolution of the checkpoint time (ms) over time')

plt.show()

### *Plot 3*: GBT: Failed and completed tasks per stage over time

In [None]:
gbt_submission_time = gbt_df.select("submissionTime").toPandas()["submissionTime"]

gbt_complete_tasks = gbt_df.select("numCompleteTasks").toPandas()["numCompleteTasks"]
gbt_failed_tasks = gbt_df.select("numFailedTasks").toPandas()["numFailedTasks"]

gbt_check_complete_tasks = gbt_check_df.select("numCompleteTasks").toPandas()["numCompleteTasks"]
gbt_check_failed_tasks = gbt_check_df.select("numFailedTasks").toPandas()["numFailedTasks"]


fig = plt.figure(figsize=(14, 8))

plt.plot(gbt_submission_time, gbt_complete_tasks, color="lightgreen", label="completed tasks")
plt.plot(gbt_submission_time, gbt_failed_tasks, color="red", label="failed tasks")

plt.xlabel('time')
plt.ylabel('#tasks')
plt.title('GBT: Failed and completed tasks per stage over time')
plt.legend()

plt.show()

### *Plot 4*: GBT: Failed and completed tasks per stage over time (with checkpoint)

In [None]:
gbt_check_submission_time = gbt_check_df.select("submissionTime").toPandas()["submissionTime"]

gbt_check_complete_tasks = gbt_check_df.select("numCompleteTasks").toPandas()["numCompleteTasks"]
gbt_check_failed_tasks = gbt_check_df.select("numFailedTasks").toPandas()["numFailedTasks"]


fig = plt.figure(figsize=(14, 8))

plt.plot(gbt_check_submission_time, gbt_check_complete_tasks, color="green", linestyle="dashed", label="completed tasks checkpoint")
plt.plot(gbt_check_submission_time, gbt_check_failed_tasks, color="red", linestyle="dashed", label="failed tasks checkpoint")

plt.xlabel('time')
plt.ylabel('#tasks')
plt.title(' GBT: Failed and completed tasks per stage over time (with checkpoint)')
plt.legend()

plt.show()

### *Plot 6*: Analytics: Failed and completed tasks per stage over time (with checkpoint)
- number of tasks completed grows linearly without any failures

In [None]:
analytics_submission_time = analytics_df.select("submissionTime").toPandas()["submissionTime"]

analytics_complete_tasks = analytics_df.select("numCompleteTasks").toPandas()["numCompleteTasks"]
analytics_failed_tasks = analytics_df.select("numFailedTasks").toPandas()["numFailedTasks"]


fig = plt.figure(figsize=(14, 8))

plt.plot(analytics_submission_time, analytics_complete_tasks, color="blue", linestyle="dashed", label="completed tasks checkpoint")
plt.plot(analytics_submission_time, analytics_failed_tasks, color="red", linestyle="dashed", label="failed tasks checkpoint")

plt.xlabel('time')
plt.ylabel('#tasks')
plt.title(' Analytics: Failed and completed tasks per stage over time')
plt.legend()

plt.show()

### *Plot 7*: Analytics: Failed and completed tasks per stage over time (with checkpoint)

In [None]:
analytics_check_submission_time = analytics_check_df.select("submissionTime").toPandas()["submissionTime"]

analytics_check_complete_tasks = analytics_check_df.select("numCompleteTasks").toPandas()["numCompleteTasks"]
analytics_check_failed_tasks = analytics_check_df.select("numFailedTasks").toPandas()["numFailedTasks"]


fig = plt.figure(figsize=(14, 8))

plt.plot(analytics_check_submission_time, analytics_check_complete_tasks, color="lightblue", linestyle="dashed", label="completed tasks checkpoint")
plt.plot(analytics_check_submission_time, analytics_check_failed_tasks, color="red", linestyle="dashed", label="failed tasks checkpoint")

plt.xlabel('time')
plt.ylabel('#tasks')
plt.title(' Analytics: Failed and completed tasks per stage over time (with checkpoint)')
plt.legend()

plt.show()