In [1]:
import os

import findspark

findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Local copy of the Clusterdata 2019 trace. Set the CLUSTERDATA_ROOT environment variable to
# point elsewhere instead of editing this machine-specific absolute path.
dataset_root_path = os.environ.get(
    "CLUSTERDATA_ROOT", "D:\\Documents\\Programming\\Python-Projects\\Clusterdata_2019_e\\")

# Memory settings must be supplied *before* getOrCreate(): SparkContext.setSystemProperty only
# takes effect if called before the SparkContext is instantiated, so calling it afterwards (as
# this cell used to) silently did nothing. Like any launch setting, these only apply on a fresh
# kernel; getOrCreate() returns an already-running session unchanged.
spark = (
    SparkSession.builder
    .appName('Failure Prediction on Google Borg Cluster Traces')
    .master('local[*]')
    .config('spark.executor.memory', '2g')
    .config('spark.driver.memory', '2g')
    .getOrCreate()
)

jobs_with_type_df = spark.read.parquet("./intermediary_data/jobs_with_type.parquet")
# os.path.join tolerates a CLUSTERDATA_ROOT value with or without a trailing separator.
instance_usage_df = spark.read.parquet(os.path.join(dataset_root_path, "instance_usage-*.parquet.gz"))

In [2]:
# Keep only failed jobs (event_success == 0) with their parent link. A null
# parent_collection_id (presumably a top-level job) is mapped to 0 so that all such jobs
# share a single root node when the failure graph is built.
failed_jobs_df = (
    jobs_with_type_df
    .filter(F.col('event_success') == 0)
    .select('collection_id', 'parent_collection_id')
    .fillna(0, subset=['parent_collection_id'])
)
failed_jobs_df.count()

727512

In [3]:
import networkx as nx

# Failure graph: one directed edge parent_collection_id -> collection_id per failed job.
# Spark Row objects are tuples, so the collected rows serve directly as an edge list.
edges = failed_jobs_df.select("parent_collection_id", "collection_id").collect()
graph = nx.DiGraph(edges)

print(nx.is_directed_acyclic_graph(graph))
# Every path starts either at 0 or at a parent that did not fail. Neither of these is a failed
# job, so one edge is subtracted to turn the longest path into the maximum failure depth
# (depth 0 = no failed ancestor).
print(nx.dag_longest_path_length(graph) - 1)

True
2


In [4]:
# Each failed job is one (parent_collection_id, collection_id) edge of `graph`. The nodes of the
# line graph are exactly those edges; topologically sorting them also fails loudly on a cycle.
topological_sort = list(nx.topological_sort(nx.line_graph(graph)))
print('Topological sort length: ', len(topological_sort))


def _split_depth(edges, is_at_depth):
    """Partition (parent, child) edges by a predicate.

    Returns a tuple (edges at this depth, set of their child collection_ids, remaining edges).
    Both edge lists keep the input order.
    """
    at_depth = [edge for edge in edges if is_at_depth(edge)]
    at_depth_set = set(at_depth)
    remaining = [edge for edge in edges if edge not in at_depth_set]
    return at_depth, {child for _, child in at_depth}, remaining


# Depth 0: top-level jobs (parent 0) and jobs whose parent did not fail. Only failed jobs are
# edge targets in `graph`, so "the parent has no ancestors" is equivalent to in_degree == 0,
# which avoids a full ancestor traversal per edge.
failed_jobs_depth_0, failed_job_ids_depth_0, failed_jobs_with_depth_bigger_than_0 = _split_depth(
    topological_sort, lambda pair: pair[0] == 0 or graph.in_degree(pair[0]) == 0)
print('Failed job ids with depth of 0: ', len(failed_job_ids_depth_0))

# Depth d + 1: the parent is itself a failed job of depth d.
failed_jobs_depth_1, failed_job_ids_depth_1, failed_jobs_with_depth_bigger_than_1 = _split_depth(
    failed_jobs_with_depth_bigger_than_0, lambda pair: pair[0] in failed_job_ids_depth_0)
print('Failed job ids with depth of 1: ', len(failed_job_ids_depth_1))

failed_jobs_depth_2, failed_job_ids_depth_2, failed_jobs_with_depth_bigger_than_2 = _split_depth(
    failed_jobs_with_depth_bigger_than_1, lambda pair: pair[0] in failed_job_ids_depth_1)
print('Failed job ids with depth of 2: ', len(failed_job_ids_depth_2))

# Only depths 0-2 are handled by the later parent-filtering cells. Fail loudly instead of
# silently dropping deeper failures if the data ever contains them.
assert not failed_jobs_with_depth_bigger_than_2, (
    f'{len(failed_jobs_with_depth_bigger_than_2)} failed jobs are deeper than 2 and would be ignored')

failed_jobs_ids = failed_job_ids_depth_0 | failed_job_ids_depth_1 | failed_job_ids_depth_2
print('All failed job ids length: ', len(failed_jobs_ids))

Topological sort length:  726923
Failed job ids with depth of 0:  720074
Failed job ids with depth of 1:  4983
Failed job ids with depth of 2:  1866
All failed job ids length:  726923


In [5]:
from pyspark.sql.types import LongType

# Ids of the failed jobs whose depth was resolved in the previous step. createDataFrame with a
# LongType schema yields a single `value` column, renamed here so it can be joined by name.
failed_job_ids_df = spark.createDataFrame(failed_jobs_ids, LongType()).withColumnRenamed('value', 'collection_id')

# End time of each job = latest end_time over its Job instance-usage entries
# (collection_type == 0). A new name is used so the raw instance_usage_df is not replaced by a
# projection lacking collection_type; that made re-running this cell fail.
job_end_times_df = (
    instance_usage_df
    .filter(instance_usage_df.collection_type == 0)
    .groupBy('collection_id')
    .agg(F.max('end_time').alias('collection_end_time'))
)

# Selecting the two source columns first keeps this cell idempotent: re-running it does not
# stack a second collection_end_time column onto an already-joined frame.
# NOTE: the inner join drops failed jobs that have no Job instance-usage entry.
failed_jobs_df = (
    failed_jobs_df
    .select('collection_id', 'parent_collection_id')
    .join(failed_job_ids_df, on='collection_id', how='leftsemi')
    .join(job_end_times_df, on='collection_id')
)
failed_jobs_df.printSchema()

root
 |-- collection_id: long (nullable = true)
 |-- parent_collection_id: long (nullable = true)
 |-- collection_end_time: long (nullable = true)



In [6]:
# Depth-1 failed jobs (their parent is a failed depth-0 job). The `_1` suffix keeps their
# columns distinct from the parent's columns in the self-join that follows.
df = spark.createDataFrame(failed_job_ids_depth_1, LongType())
failed_jobs_depth_1_df = (
    failed_jobs_df
    .join(df, failed_jobs_df.collection_id == df.value, how='leftsemi')
    .withColumnRenamed('collection_id', 'collection_id_1')
    .withColumnRenamed('parent_collection_id', 'parent_collection_id_1')
    .withColumnRenamed('collection_end_time', 'collection_end_time_1')
)

# Keep each depth-1 job that ended at or after its failed parent ended. The analysis treats
# such jobs as killed by the parent, so these are the ones to remove later. The parent's
# columns are then discarded and the child's columns get their original names back.
failed_jobs_from_parent_depth_1_df = (
    failed_jobs_depth_1_df.alias("c")
    .join(
        failed_jobs_df.alias("p"),
        F.col("c.parent_collection_id_1") == F.col("p.collection_id"),
        "inner",
    )
    .filter(F.col("c.collection_end_time_1") >= F.col("p.collection_end_time"))
    .drop('collection_id', 'parent_collection_id', 'collection_end_time')
    .withColumnRenamed('collection_id_1', 'collection_id')
    .withColumnRenamed('parent_collection_id_1', 'parent_collection_id')
    .withColumnRenamed('collection_end_time_1', 'collection_end_time')
)

In [7]:
# Depth-2 failed jobs (their parent is a failed depth-1 job). The `_2` suffix keeps their
# columns distinct from the depth-1 parent's `_1` columns in the join that follows.
df = spark.createDataFrame(failed_job_ids_depth_2, LongType())
failed_jobs_depth_2_df = (
    failed_jobs_df
    .join(df, failed_jobs_df.collection_id == df.value, how='leftsemi')
    .withColumnRenamed('collection_id', 'collection_id_2')
    .withColumnRenamed('parent_collection_id', 'parent_collection_id_2')
    .withColumnRenamed('collection_end_time', 'collection_end_time_2')
)

# Keep each depth-2 job that ended at or after its failed depth-1 parent ended. The analysis
# treats such jobs as killed by the parent, so these are the ones to remove later. The parent's
# columns are then discarded and the child's columns get their original names back.
failed_jobs_from_parent_depth_2_df = (
    failed_jobs_depth_2_df.alias("c")
    .join(
        failed_jobs_depth_1_df.alias("p"),
        F.col("c.parent_collection_id_2") == F.col("p.collection_id_1"),
        "inner",
    )
    .filter(F.col("c.collection_end_time_2") >= F.col("p.collection_end_time_1"))
    .drop('collection_id_1', 'parent_collection_id_1', 'collection_end_time_1')
    .withColumnRenamed('collection_id_2', 'collection_id')
    .withColumnRenamed('parent_collection_id_2', 'parent_collection_id')
    .withColumnRenamed('collection_end_time_2', 'collection_end_time')
)

In [8]:
# Both frames should expose the same (collection_id, parent_collection_id, collection_end_time)
# schema before they are combined.
for parent_filtered_df in (failed_jobs_from_parent_depth_1_df, failed_jobs_from_parent_depth_2_df):
    parent_filtered_df.printSchema()

root
 |-- collection_id: long (nullable = true)
 |-- parent_collection_id: long (nullable = true)
 |-- collection_end_time: long (nullable = true)

root
 |-- collection_id: long (nullable = true)
 |-- parent_collection_id: long (nullable = true)
 |-- collection_end_time: long (nullable = true)



In [9]:
# Ids of all failed jobs (depth 1 and 2) attributed to a failed parent.
# unionByName matches columns by name rather than by position, so a column-order change in
# either input cannot silently misalign the union. De-duplicating *after* projecting to
# collection_id guarantees each id appears exactly once in the output.
failed_jobs_from_parent = (
    failed_jobs_from_parent_depth_1_df
    .unionByName(failed_jobs_from_parent_depth_2_df)
    .select("collection_id")
    .drop_duplicates()
)

In [10]:
# Sanity check: the frame to be written should contain only the collection_id column.
failed_jobs_from_parent.printSchema()

root
 |-- collection_id: long (nullable = true)



In [11]:
# Persist the ids of jobs to exclude. mode="overwrite" is needed because the default save mode
# (errorifexists) makes every re-run of the notebook fail once this output already exists.
failed_jobs_from_parent.write.parquet("./intermediary_data/job_ids_to_remove.parquet", mode="overwrite")