# Notebook to analyse the cleaned cluster trace

## Imports

In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Create DataFrame from the cleaned Alibaba 2018 cluster trace
- The data was cleaned on the cluster on 12.01.2022.
- It is a join of batch task (100%) and batch instance (30%) with a stratified sample of the instances: a fraction of 0.0015 (0.15%) of the successful and 100% of the failed ones (cf. `00015S_1F` in the path below).

In [2]:
# Location of the cleaned trace; the glob matches every gzip-compressed CSV part file.
batch_jobs_clean_path = "../../out/clean/batch_jobs_clean_03inst_1task_00015S_1F/*.csv.gz"

# Local Spark master with two threads and 3g of driver memory.
master = "local[2]"
app_name = "analyse clean Alibaba 2018 cluster trace"
config = SparkConf().set('spark.driver.memory', '3g')
spark_session = (
    SparkSession.builder
    .master(master)
    .appName(app_name)
    .config(conf=config)
    .getOrCreate()
)

# First row of each file is the header; column types are inferred by Spark.
batch_jobs_clean = spark_session.read.csv(
    batch_jobs_clean_path,
    header=True,
    inferSchema=True,
)

22/01/18 15:47:40 WARN Utils: Your hostname, felix-Surface-Book resolves to a loopback address: 127.0.1.1; using 192.168.0.4 instead (on interface wlp3s0)
22/01/18 15:47:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/01/18 15:47:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

## Simple analysis

In [3]:
# Number of rows in the loaded DataFrame (a Spark action, so the files are actually read here).
batch_jobs_clean.count()

                                                                                

1246728

In [4]:
# (column name, type) pairs as inferred by Spark when the CSV files were read.
batch_jobs_clean.dtypes

[('task_name', 'string'),
 ('job_name', 'string'),
 ('task_id_num', 'int'),
 ('instance_num', 'int'),
 ('task_type', 'int'),
 ('status', 'string'),
 ('start_time', 'int'),
 ('end_time', 'int'),
 ('plan_cpu', 'double'),
 ('plan_mem', 'double'),
 ('map_reduce', 'string'),
 ('earliest', 'int'),
 ('sched_intv', 'int'),
 ('job_exec', 'int'),
 ('logical_job_name', 'string'),
 ('latest', 'int'),
 ('task_duration', 'int'),
 ('tts_task', 'int'),
 ('mtts_task', 'double'),
 ('ttf_task', 'int'),
 ('ttr_task', 'int'),
 ('reduce_checkpoint', 'int'),
 ('second_quant_checkpoint', 'int'),
 ('third_quant_checkpoint', 'int'),
 ('instance_name', 'string'),
 ('instance_task_type', 'int'),
 ('instance_status', 'string'),
 ('instance_start_time', 'int'),
 ('instance_end_time', 'int'),
 ('machine_id', 'string'),
 ('seq_no', 'int'),
 ('total_seq_no', 'int'),
 ('cpu_avg', 'double'),
 ('cpu_max', 'double'),
 ('mem_avg', 'double'),
 ('mem_max', 'double'),
 ('labels', 'int')]

In [5]:
# Total number of columns.
len(batch_jobs_clean.dtypes)

37

In [6]:
# Names of all columns whose inferred type is numeric (int or double).
numeric_types = ('int', 'double')
num_cols = [name for name, dtype in batch_jobs_clean.dtypes if dtype in numeric_types]
num_cols

['task_id_num',
 'instance_num',
 'task_type',
 'start_time',
 'end_time',
 'plan_cpu',
 'plan_mem',
 'earliest',
 'sched_intv',
 'job_exec',
 'latest',
 'task_duration',
 'tts_task',
 'mtts_task',
 'ttf_task',
 'ttr_task',
 'reduce_checkpoint',
 'second_quant_checkpoint',
 'third_quant_checkpoint',
 'instance_task_type',
 'instance_start_time',
 'instance_end_time',
 'seq_no',
 'total_seq_no',
 'cpu_avg',
 'cpu_max',
 'mem_avg',
 'mem_max',
 'labels']

In [7]:
# Number of numeric columns.
len(num_cols)

29

In [8]:
# Time-related columns; they are left out of the numeric summary columns.
time_cols = [
    "start_time",
    "instance_start_time",
    "instance_end_time",
    "end_time",
    "earliest",
    "latest",
]
summary_cols = [col_name for col_name in num_cols if col_name not in time_cols]

In [17]:
# Feature selection taken from GBTCVTaskFailBinPred.scala
task_features = [
    "instance_num",  # number of instances for the task
    "task_type",     # values 1-12, meaning unknown (ordinal categorical feature)
    "plan_cpu",      # number of CPUs needed by the task; 100 is 1 core
    "plan_mem",      # normalized memory size, [0, 100]
]
# from batch instances
instance_features = [
    "seq_no",
    "labels",
]
# custom fields added in trace.py
# (disabled: "map_reduce" -- whether this task is a map "m" or reduce "r" operation,
#  a nominal categorical feature that would need one-hot encoding;
#  disabled: "logical_job_name")
custom_features = [
    "sched_intv",  # the schedule interval (unit unknown); if this value is set, it's a recurring job (batch job)
    "job_exec",    # the number of executions of this task
]
columns = task_features + instance_features + custom_features

In [19]:
# Per-column statistics; Spark returns them with a leading "summary" column naming each statistic.
# To summarise every non-time numeric column instead, use `summary_cols` in place of `columns` below.
summary_stats = batch_jobs_clean.select(columns).summary("count", "mean", "50%", "stddev", "min", "max")
# Round all statistics to two decimals in a single projection; the "summary" label column passes through unchanged.
summary_rounded = summary_stats.select(
    "summary",
    *[F.round(col_name, 2).alias(col_name) for col_name in columns]
)
# Transpose on the pandas side so that every feature becomes one row.
summary = summary_rounded.toPandas().T
summary

                                                                                

Unnamed: 0,0,1,2,3,4,5
summary,count,mean,50%,stddev,min,max
instance_num,1246728.0,18734.01,6579.0,23910.69,1.0,99583.0
task_type,1246728.0,1.61,1.0,2.15,1.0,12.0
plan_cpu,1246173.0,94.41,100.0,43.58,5.0,1000.0
plan_mem,1246173.0,0.37,0.3,0.26,0.02,17.17
seq_no,1246728.0,1.15,1.0,0.55,1.0,11.0
labels,1246728.0,0.51,1.0,0.5,0.0,1.0
sched_intv,1246268.0,845.7,1.0,8048.98,0.0,570959.0
job_exec,1245468.0,114417.86,54134.0,132744.1,1.0,566559.0


In [20]:
# Print the transposed summary table as LaTeX source (print shows the raw markup rather than a string repr).
print(summary.to_latex())

\begin{tabular}{lllllll}
\toprule
{} &          0 &          1 &        2 &         3 &     4 &         5 \\
\midrule
summary      &      count &       mean &      50\% &    stddev &   min &       max \\
instance\_num &  1246728.0 &   18734.01 &   6579.0 &  23910.69 &   1.0 &   99583.0 \\
task\_type    &  1246728.0 &       1.61 &      1.0 &      2.15 &   1.0 &      12.0 \\
plan\_cpu     &  1246173.0 &      94.41 &    100.0 &     43.58 &   5.0 &    1000.0 \\
plan\_mem     &  1246173.0 &       0.37 &      0.3 &      0.26 &  0.02 &     17.17 \\
seq\_no       &  1246728.0 &       1.15 &      1.0 &      0.55 &   1.0 &      11.0 \\
labels       &  1246728.0 &       0.51 &      1.0 &       0.5 &   0.0 &       1.0 \\
sched\_intv   &  1246268.0 &      845.7 &      1.0 &   8048.98 &   0.0 &  570959.0 \\
job\_exec     &  1245468.0 &  114417.86 &  54134.0 &  132744.1 &   1.0 &  566559.0 \\
\bottomrule
\end{tabular}



## Investigate negative `task_duration`

In [21]:
# Rows with a negative task_duration that also carry a task id.
has_negative_duration = F.col("task_duration") < 0
has_task_id = F.col("task_id_num").isNotNull()  # the majority is non-batch tasks, should they be included?
neg_dur = batch_jobs_clean.where(has_negative_duration & has_task_id)

In [22]:
# How many rows have a negative task_duration?
neg_dur.count()

                                                                                

74

In [23]:
# Which task statuses do the negative-duration rows have?
neg_dur.groupBy("status").count().show()

                                                                                

+----------+-----+
|    status|count|
+----------+-----+
|Terminated|   74|
+----------+-----+



In [10]:
# Sanity check: do any of the negative-duration rows have a negative start_time?
neg_dur.where(F.col("start_time") < 0).count()

                                                                                

0

In [25]:
# Look at a few offending rows next to their start and end timestamps.
neg_dur.select(
    "job_name", "status", "start_time", "end_time", "task_duration"
).show(n=10)

+---------+----------+----------+--------+-------------+
| job_name|    status|start_time|end_time|task_duration|
+---------+----------+----------+--------+-------------+
| j_154465|Terminated|    425538|  425530|           -8|
| j_154465|Terminated|    425538|  425530|           -8|
| j_154465|Terminated|    425538|  425530|           -8|
| j_154465|Terminated|    425538|  425530|           -8|
| j_154465|Terminated|    425538|  425530|           -8|
| j_154465|Terminated|    425538|  425530|           -8|
| j_154465|Terminated|    425538|  425530|           -8|
| j_731222|Terminated|    645086|  645085|           -1|
|j_2482384|Terminated|    296578|  296576|           -2|
|j_2482384|Terminated|    296578|  296576|           -2|
+---------+----------+----------+--------+-------------+
only showing top 10 rows



## Map and Reduce Distribution

In [26]:
# Number of rows per map_reduce value ("m" / "r").
batch_jobs_clean.groupBy("map_reduce").count().show()

                                                                                

+----------+-------+
|map_reduce|  count|
+----------+-------+
|         m|1131166|
|         r| 115562|
+----------+-------+



In [27]:
# Reduce-to-map row ratio, derived from the data rather than from numbers copied
# out of the previous cell's output, so it stays correct if the input trace changes.
mr_counts = {
    row["map_reduce"]: row["count"]  # index by name: the attribute Row.count is the tuple method, not the column
    for row in batch_jobs_clean.groupBy("map_reduce").count().collect()
}
# A missing "r" group counts as zero; a missing "m" group raises KeyError instead of dividing by zero.
mr_ratio = mr_counts.get("r", 0) / mr_counts["m"]
print(mr_ratio)

0.10216184008359516


## Labels distribution

In [28]:
# Class balance of the label column: ratio of almost 1.
batch_jobs_clean.groupBy("labels").count().show()

                                                                                

+------+------+
|labels| count|
+------+------+
|     1|639013|
|     0|607715|
+------+------+



In [29]:
# Label ratio noted for the batch_jobs_clean_01inst_1task_05S_1F sample
# (counts come from a different run; presumably failed vs. non-failed instances -- confirm).
ratio_numerator, ratio_denominator = 213090, 67456240
labels_ratio = ratio_numerator / ratio_denominator
print(labels_ratio)

0.0031589368159268883


In [30]:
# Label ratio noted for the batch_jobs_clean_01inst_1task_0006S_1F sample
# (counts come from a different run; presumably failed vs. non-failed instances -- confirm).
ratio_numerator, ratio_denominator = 213023, 808168
labels_ratio = ratio_numerator / ratio_denominator
print(labels_ratio)

0.26358752140644026


In [31]:
# Label ratio for the batch_jobs_clean_03inst_1task_00015S_1F sample (the one loaded in this notebook).
# NOTE(review): the numerator here is the labels=0 count, whereas the two earlier ratios
# appear to put the (smaller) failed count on top -- confirm which orientation is intended.
n_label_0 = 607715
n_label_1 = 639013
labels_ratio = n_label_0 / n_label_1
print(labels_ratio)

0.9510213407238977


In [32]:
# Number of rows per task-level status.
batch_jobs_clean.groupBy("status").count().show()


                                                                                

+----------+------+
|    status| count|
+----------+------+
|Terminated|686431|
|   Running| 58512|
|    Failed|501785|
+----------+------+



In [33]:
# Number of rows per instance-level status.
batch_jobs_clean.groupBy("instance_status").count().show()

                                                                                

+---------------+------+
|instance_status| count|
+---------------+------+
|     Terminated|602546|
|          Ready|     1|
|        Running|  5117|
|         Failed|639013|
|    Interrupted|    51|
+---------------+------+



In [34]:
# Cross-tabulate instance status against the label to see which statuses fall under which label value.
batch_jobs_clean.groupBy("instance_status", "labels").count().show()

                                                                                

+---------------+------+------+
|instance_status|labels| count|
+---------------+------+------+
|          Ready|     0|     1|
|    Interrupted|     0|    51|
|     Terminated|     0|602546|
|         Failed|     1|639013|
|        Running|     0|  5117|
+---------------+------+------+



## Maximum number of categories per categorical feature

In [36]:
# Approximate number of distinct values per categorical feature.
# Needs a live Spark session: pyspark.sql.functions calls fail with
# "'NoneType' object has no attribute '_jvm'" once the SparkContext has been stopped.
categ_feat = ["map_reduce", "logical_job_name"]
# Output column name for each feature's distinct count.
distinct_aliases = {"map_reduce": "dist_mr", "logical_job_name": "dist_ljn"}
batch_jobs_clean.select(
    [F.approx_count_distinct(feat).alias(distinct_aliases[feat]) for feat in categ_feat]
).show()

AttributeError: 'NoneType' object has no attribute '_jvm'

In [35]:
# Shut Spark down through the session so that the underlying SparkContext is stopped
# and the session itself is released as well.
# Run this cell last: any DataFrame or pyspark.sql.functions call made afterwards fails
# with "'NoneType' object has no attribute '_jvm'".
spark_session.stop()