In [None]:
import sys
from pyspark import SparkContext, SparkConf
import time
from pyspark.sql import functions as F
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, BooleanType


In [None]:
#### Driver program

# start spark with 1 worker thread
sc = SparkContext("local[8]")
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

print("OK")

In [5]:
### combine task_event and task_usage and compare


# Task Events Schema definieren
task_events_schema = StructType([
    StructField("time", IntegerType(), True),
    StructField("missing_info", IntegerType(), True),
    StructField("job_id", IntegerType(), False),
    StructField("task_index", IntegerType(), False),
    StructField("machine_id", IntegerType(), True),
    StructField("event_type", IntegerType(), False),
    StructField("user", StringType(), True),
    StructField("scheduling_class", IntegerType(), True),
    StructField("priority", IntegerType(), False),
    StructField("cpu_request", FloatType(), True),
    StructField("memory_request", FloatType(), True),
    StructField("disk_space_request", FloatType(), True),
    StructField("different_machines_restriction", BooleanType(), True)
])

# Task Usage Schema definieren
task_usage_schema = StructType([
    StructField("start_time", IntegerType(), False),
    StructField("end_time", IntegerType(), False),
    StructField("job_id", IntegerType(), False),
    StructField("task_index", IntegerType(), False),
    StructField("machine_id", IntegerType(), False),
    StructField("cpu_rate", FloatType(), True),
    StructField("canonical_memory_usage", FloatType(), True),
    StructField("assigned_memory_usage", FloatType(), True),
    StructField("unmapped_page_cache", FloatType(), True),
    StructField("total_page_cache", FloatType(), True),
    StructField("maximum_memory_usage", FloatType(), True),
    StructField("disk_io_time", FloatType(), True),
    StructField("local_disk_space_usage", FloatType(), True),
    # ... weitere Felder
])




task_events_paths = [
    "google-dataset/task_events/part-00265-of-00500.csv.gz",
    "google-dataset/task_events/part-00266-of-00500.csv.gz",
    "google-dataset/task_events/part-00267-of-00500.csv.gz",
    "google-dataset/task_events/part-00268-of-00500.csv.gz",
    "google-dataset/task_events/part-00269-of-00500.csv.gz",
]

task_usage_paths = [
    "google-dataset/task_usage/part-00265-of-00500.csv.gz",
    "google-dataset/task_usage/part-00266-of-00500.csv.gz",
    "google-dataset/task_usage/part-00267-of-00500.csv.gz",
    "google-dataset/task_usage/part-00268-of-00500.csv.gz",
    "google-dataset/task_usage/part-00269-of-00500.csv.gz",
]



# load data
df_events = sqlContext.read.csv("google-dataset/task_events/part-*-of-*.csv.gz", 
                           schema=task_events_schema, 
                           header=False)

df_usage = sqlContext.read.csv("google-dataset/task_usage/part-*-of-*.csv.gz",
                          schema=task_usage_schema,
                          header=False)


# only keep valid data remove NULL
task_events = df_events.filter(
    (df_events.cpu_request.isNotNull()) |
    (df_events.memory_request.isNotNull()) |
    (df_events.disk_space_request.isNotNull())
)

task_usage = df_usage.filter(
    (df_usage.cpu_rate.isNotNull()) |
    (df_usage.canonical_memory_usage.isNotNull()) |
    (df_usage.local_disk_space_usage.isNotNull())
)


print("task_events")
task_events.show()

print("task_usage")
task_usage.show()



# Optional: Cachen, weil wir mehrfach darauf zugreifen
task_events.cache()
task_usage.cache()


task_events
+----+------------+---------+----------+----------+----------+--------------------+----------------+--------+-----------+--------------+------------------+------------------------------+
|time|missing_info|   job_id|task_index|machine_id|event_type|                user|scheduling_class|priority|cpu_request|memory_request|disk_space_request|different_machines_restriction|
+----+------------+---------+----------+----------+----------+--------------------+----------------+--------+-----------+--------------+------------------+------------------------------+
|NULL|        NULL|     NULL|        70|  56896039|         1|XXdY557FQk791swgp...|               2|       4|     0.0625|        0.0636|          5.817E-5|                          NULL|
|NULL|        NULL|     NULL|       256|2107285354|         1|XXdY557FQk791swgp...|               2|       4|     0.0625|        0.0636|          5.817E-5|                          NULL|
|NULL|        NULL|515042969|         5| 351621284|  

DataFrame[start_time: int, end_time: int, job_id: int, task_index: int, machine_id: int, cpu_rate: float, canonical_memory_usage: float, assigned_memory_usage: float, unmapped_page_cache: float, total_page_cache: float, maximum_memory_usage: float, disk_io_time: float, local_disk_space_usage: float]

In [6]:

print("events per event-type")
# Lass dir die h채ufigsten Eventtypen anzeigen, um den Eviction-Code zu finden:
task_events.groupBy("event_type").count().orderBy(F.desc("event_type")).show(20)

# Nachdem du im Output den Eviction-Code identifiziert hast, trage ihn hier ein.
# Beispiel: EVICTED_CODE = 2  (BITTE mit echtem Wert ersetzen!)
EVICTED_CODE = 2

# filter evicted tasks
task_eviction_flag = (
    task_events
    .withColumn("is_eviction_event", (F.col("event_type") == EVICTED_CODE).cast("integer"))
    .groupBy("job_id", "task_index")
    .agg(
        F.max("is_eviction_event").alias("was_evicted")  # 1, wenn mindestens ein Eviction-Event
    )
)

print("evicted tasks")
task_eviction_flag.show()

# -------------------------------------------------------------------
# 6. Ressourcennutzung pro Task aggregieren
# -------------------------------------------------------------------
# task_usage enth채lt typischerweise mehrere Zeilen pro Task 체ber die Zeit.
# Wir machen eine zusammengefasste Sicht pro Task:
# - durchschnittliche CPU-Rate
# - maximale CPU-Rate
# - durchschnittliche und maximale Memory-Nutzung
# - optional weitere Kennzahlen

task_usage_agg = (
    task_usage
    .groupBy("job_id", "task_index")
    .agg(
        F.avg("cpu_rate").alias("avg_cpu_rate"),
        F.max("cpu_rate").alias("max_cpu_rate"),
        F.avg("canonical_memory_usage").alias("avg_canon_mem"),
        F.max("canonical_memory_usage").alias("max_canon_mem"),
        F.avg("local_disk_space_usage").alias("avg_local_disk"),
        F.max("local_disk_space_usage").alias("max_local_disk"),
        F.count("*").alias("usage_samples")  # wie viele Zeit-Samples pro Task
    )
)

print("task_usage_agg")
task_usage_agg.show()

# -------------------------------------------------------------------
# 7. Task-Ressourcennutzung mit Eviction-Flag joinen
# -------------------------------------------------------------------
task_level = (
    task_usage_agg
    .join(task_eviction_flag, on=["job_id", "task_index"], how="left")
    .fillna({"was_evicted": 0})
)

print("task_level resource usage joined with evicted tasks")
task_level.show()


# Optional: in Cache legen
task_level.cache()


events per event-type
+----------+------+
|event_type| count|
+----------+------+
|         8|  3885|
|         5| 91183|
|         4|137146|
|         3|236496|
|         2| 79807|
|         1|525583|
|         0|529048|
+----------+------+

evicted tasks
+------+----------+-----------+
|job_id|task_index|was_evicted|
+------+----------+-----------+
|  NULL|     15619|          1|
|  NULL|      3794|          1|
|  NULL|      7993|          1|
|  NULL|     10206|          1|
|  NULL|     18498|          1|
|  NULL|     16386|          1|
|  NULL|     16574|          1|
|  NULL|      1591|          1|
|  NULL|      6654|          1|
|  NULL|       148|          1|
|  NULL|       463|          1|
|  NULL|       471|          1|
|  NULL|       496|          1|
|  NULL|       833|          1|
|  NULL|      1088|          1|
|  NULL|      1238|          1|
|  NULL|      1342|          1|
|  NULL|      1580|          1|
|  NULL|      1645|          1|
|  NULL|      1829|          1|
+------

DataFrame[job_id: int, task_index: int, avg_cpu_rate: double, max_cpu_rate: float, avg_canon_mem: double, max_canon_mem: float, avg_local_disk: double, max_local_disk: float, usage_samples: bigint, was_evicted: int]

In [7]:

# -------------------------------------------------------------------
# 8. Deskriptive Auswertung: Evicted vs. Nicht-Evicted
# -------------------------------------------------------------------
# Durchschnittliche Nutzung f체r evicted vs. nicht-evicted Tasks vergleichen

desc_stats = (
    task_level
    .groupBy("was_evicted")
    .agg(
        F.avg("avg_cpu_rate").alias("mean_avg_cpu_rate"),
        F.avg("max_cpu_rate").alias("mean_max_cpu_rate"),
        F.avg("avg_canon_mem").alias("mean_avg_canon_mem"),
        F.avg("max_canon_mem").alias("mean_max_canon_mem"),
        F.avg("avg_local_disk").alias("mean_avg_local_disk"),
        F.avg("max_local_disk").alias("mean_max_local_disk"),
        F.count("*").alias("num_tasks")
    )
)

desc_stats.show()

# -------------------------------------------------------------------
# 9. Einfache Korrelationen berechnen
# -------------------------------------------------------------------
# Korrelation zwischen Nutzung und Eviction-Flag (treated as numeric 0/1)

corr_avg_cpu = task_level.stat.corr("avg_cpu_rate", "was_evicted")
corr_max_cpu = task_level.stat.corr("max_cpu_rate", "was_evicted")
corr_avg_mem = task_level.stat.corr("avg_canon_mem", "was_evicted")
corr_max_mem = task_level.stat.corr("max_canon_mem", "was_evicted")

print("Correlation avg_cpu_rate vs was_evicted:", corr_avg_cpu)
print("Correlation max_cpu_rate vs was_evicted:", corr_max_cpu)
print("Correlation avg_canon_mem vs was_evicted:", corr_avg_mem)
print("Correlation max_canon_mem vs was_evicted:", corr_max_mem)

# -------------------------------------------------------------------
# 10. Optional: Logistische Regression zur Quantifizierung
# -------------------------------------------------------------------
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

assembler = VectorAssembler(
    inputCols=["avg_cpu_rate", "max_cpu_rate", "avg_canon_mem", "max_canon_mem"],
    outputCol="features"
)

data_ml = (
    task_level
    .select("avg_cpu_rate", "max_cpu_rate", "avg_canon_mem", "max_canon_mem", "was_evicted")
    .dropna()
)
data_ml = assembler.transform(data_ml).select("features", F.col("was_evicted").alias("label"))

train, test = data_ml.randomSplit([0.8, 0.2], seed=42)

lr = LogisticRegression(labelCol="label", featuresCol="features")
model = lr.fit(train)

print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)

# Optional: einfache Modellbewertung
predictions = model.transform(test)
predictions.select("label", "probability", "prediction").show(20, truncate=False)





+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|was_evicted|   mean_avg_cpu_rate|   mean_max_cpu_rate|  mean_avg_canon_mem|  mean_max_canon_mem| mean_avg_local_disk| mean_max_local_disk|num_tasks|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|          1|0.001352120105942...|0.008651086071040481|0.002140481629648868|  0.0029882720077876|1.495592038767064...|2.250575173093238...|       25|
|          0|0.006526975098758615|0.052900681443475844|0.006804221501569972|0.025395527801841365|4.635580097586943E-5|1.778847737615823...|    22163|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+

Correlation avg_cpu_rate vs was_evicted: -0.01903622392340978
Correlation max_cpu_rate vs was_evict