In [1]:
import pandas as pd
import numpy as np
import regex as re
import requests
import igraph as ig
import sys
import pickle
from pytictoc import TicToc
import matplotlib.pyplot as plt
import seaborn as sns
from functools import reduce
import itertools
from itertools import chain
import cairocffi as cairo
import networkx as nx
import csv
from torch_geometric.utils import to_networkx, from_networkx
import urllib.request

In [None]:
def nodeCounter(dags):
    """Count the distinct node tokens inside a job's task string.

    ``dags`` is split on ``,`` and ``§``, every ``-`` is dropped, and the
    number of unique whitespace-separated tokens that remain is returned.
    """
    flattened = " ".join(re.split(",|§", dags))
    node_ids = set(flattened.replace('-', '').split())
    return len(node_ids)


def edgeExtractor (df):
    """Build a flat list of zero-based edge tuples from ``df["task_name"]``.

    Only names containing ``-`` are used. Each such name is split on ``-§-``
    into integers (all shifted down by one); every id after the first is
    paired with the first id as ``(other_id, first_id)``.
    Presumably the first id is the task itself and the others are the tasks
    it depends on -- confirm against the trace format.
    """
    has_separator = df["task_name"].str.contains("-")
    edge_list = []
    for name in df[has_separator]["task_name"].to_list():
        first_id, *other_ids = (int(part) - 1 for part in name.split("-§-"))
        edge_list.extend((other, first_id) for other in other_ids)
    return edge_list


def isolatedDeletor (g):
    """Remove every degree-0 vertex from ``g`` and return ``g``.

    The graph is modified through ``g.delete_vertices`` and the very same
    object is handed back, so the caller's graph is changed as well.
    """
    isolated_ids = []
    for vertex in g.vs:
        if vertex.degree() == 0:
            isolated_ids.append(vertex.index)
    g.delete_vertices(isolated_ids)
    return g

def sorter (df):
    """Return a copy of ``df`` ordered by each row's zero-based leading task id.

    The first run of digits in ``task_name`` is read as an integer, reduced by
    one and stored in a ``node_root`` column; the rows are then sorted by that
    column so row order lines up with vertex ids.

    The input frame is left untouched (the column is added to the returned
    frame only), which also avoids SettingWithCopy warnings when a slice of a
    larger frame is passed in. A ``task_name`` without any digit cannot be
    cast to int and makes the ``astype(int)`` call raise.
    """
    # Raw string: '\d' inside a normal string literal is an invalid escape.
    leading_id = df['task_name'].str.extract(r'(\d+)', expand=False).astype(int)
    return df.assign(node_root=leading_id - 1).sort_values(by=['node_root'])


def attExtractor (df):
    """Return the ``plan_cpu``, ``plan_mem`` and ``instance_num`` columns as three lists."""
    cpu, mem, instances = (
        df[column].to_list() for column in ("plan_cpu", "plan_mem", "instance_num")
    )
    return cpu, mem, instances


def simplifier (g):
    """Simplify the graph (drop duplicate edges) and return what ``g.simplify()`` returns.

    NOTE(review): python-igraph's ``Graph.simplify`` appears to work in place
    and return the same graph -- confirm before relying on ``g`` staying intact.
    """
    return g.simplify()

# Second-of-trace windows consulted by lablingfunc. The matching loop overwrites
# these notebook-level names with the lists returned by secondExtractor.
secondsMin, secondsHour, secondsDay = [],[],[]
def lablingfunc(x):
    """Tag job name ``x`` with the time windows its start time falls into.

    Looks up ``totalStart`` of the row in the global ``DAG`` frame whose
    ``job_name`` equals ``x`` and appends ``" M"``, ``" H"`` and/or ``" D"``
    when that value is contained in the global ``secondsMin``,
    ``secondsHour`` or ``secondsDay`` lists respectively.

    Raises ValueError unless exactly one ``DAG`` row matches ``x``.
    """
    starttime = DAG.loc[DAG['job_name'] == x ,"totalStart"]
    # Series.bool() is deprecated in recent pandas; .item() keeps the same
    # "exactly one element" contract and returns the plain boolean.
    for window, tag in ((secondsMin, " M"), (secondsHour, " H"), (secondsDay, " D")):
        if starttime.isin(window).item():
            x = x + tag
    return x


def filterfunc(x):
    """Tell whether the graph of job ``x`` is isomorphic to the graph of job ``check``.

    Both graphs are looked up in the global ``DAGDict``; ``check`` is a global
    holding the reference job name.
    NOTE(review): no ``DAGDict`` is defined in the visible notebook (only
    ``DagsDict``, ``DagsDictPruned`` and ``DAGs_Dict``) -- confirm which
    dictionary is meant.
    """
    candidate_graph = DAGDict[x]
    reference_graph = DAGDict[check]
    return candidate_graph.isomorphic(reference_graph)


#return all time intervals of interest i.e 900 sec(15 min), 3600(hourly), 86400(daily)
#considers also a 6% margins
def secondExtractor(second):
    
    mins = list(range(second,maxTime,900))
   # mins.pop(0)
    minsMax = list(map(lambda x : x + (9*3), mins)) 
    minsMin = list(map(lambda x : x - (9*3), mins)) 
    
    hours = list(range(second,maxTime,3600))
  #  hours.pop(0)
    hoursMax = list(map(lambda x : x + (36*3), hours))
    hoursMin = list(map(lambda x : x - (36*3), hours)) 
    
    days = list(range(second,maxTime,86400))
  #  days.pop(0)
    daysMax = list(map(lambda x : x + (864*3), days)) 
    daysMin = list(map(lambda x : x - (864*3), days)) 
    
    MaxInt = minsMax + hoursMax + daysMax
    MinInt = minsMin + hoursMin + daysMin
    
        
    secondsMin =  list()   
    for i,j in zip(minsMin,minsMax) : #creating only minute by minute intervals (every 15 minitues)
        tempInt = []
        tempInt = list(range(i,j))
        secondsMin.extend(tempInt)

        
    secondsHour =  list()
    for i,j in zip(hoursMin,hoursMax) : #creating only hourly intervals 
        tempInt = []
        tempInt = list(range(i,j))
        secondsHour.extend(tempInt)

        
    secondsDay =  list()
    for i,j in zip(daysMin,daysMax) : #creating only daily intervals
        tempInt = []
        tempInt = list(range(i,j))
        secondsDay.extend(tempInt)
        
    secondsTotal = list()
    secondsTotal =  secondsMin + secondsHour + secondsDay
    
    return secondsTotal, secondsMin, secondsHour, secondsDay

In [2]:
# Stopwatch used through time.tic() / time.toc() around the long-running cells.
# NOTE(review): the name is the same as the stdlib module `time`.
time = TicToc()

In [33]:
# Load the task table (one row per task) from the comma-separated trace file.
df = pd.read_csv("data/batch_task.csv", sep=",")

In [34]:
# Size of the task table and number of distinct job names in it.
n_jobs = len(set(df["job_name"]))
print("shape of the data:", df.shape)
print("number of jobs:", n_jobs)

shape of the data: (14295731, 9)
number of jobs: 4201014


In [35]:
# Show the column labels of the task table.
df.columns

Index(['task_name', 'instance_num', 'job_name', 'task_type', 'status',
       'start_time', 'end_time', 'plan_cpu', 'plan_mem'],
      dtype='object')

In [36]:
# Run time of every task (end minus start).
df["Duration"] = df["end_time"] - df["start_time"]
# Strip a single trailing underscore from task names. endswith() compares by
# value (the former `is '_'` compared object identity) and copes with "".
df["task_name"] = df["task_name"].apply(lambda x: x[:-1] if x.endswith('_') else x)
# Tasks whose name starts with "task" get the integer ids 501, 502, ... instead.
# NOTE(review): these ids are ints, not strings; `.str` methods return NaN for
# non-string values, so the `.str.replace` + dropna in the next cell appear to
# remove these rows -- confirm that this is intended.
is_independent = df["task_name"].str.startswith("task")
df.loc[is_independent, "task_name"] = (np.arange(is_independent.sum()) + 1) + 500

In [37]:
#preparing data for generating DAGs
# Keep only digits, "." and "_" in the task names. The pattern is a regular
# expression, so say so explicitly: the default of `regex` differs between
# pandas versions.
df["task_name"] = df.task_name.str.replace("[^0-9._]", "", regex=True)
# Turn every "_" into the "-§-" separator used by edgeExtractor / nodeCounter
# (plain substring replacement, no regex needed).
df["task_name"] = df.task_name.str.replace("_", "-§-", regex=False)
# Drop rows with any missing value (axis named explicitly; the positional
# form is not accepted by newer pandas).
df = df.dropna(axis=0)

In [None]:
# Job-level columns broadcast back onto every task row of the job.
tasks_by_job = df.groupby('job_name')
# All task names of the job, comma separated.
df['DAGs'] = tasks_by_job['task_name'].transform(lambda names: ','.join(names))
# Earliest start / latest end over the job's tasks. The built-in 'min' / 'max'
# aggregations avoid calling a Python function once per job.
df['totalStart'] = tasks_by_job['start_time'].transform('min')
df['totalEnd'] = tasks_by_job['end_time'].transform('max')

In [50]:
# Drop rows with missing values (axis passed by keyword; the positional form
# is rejected by newer pandas), order rows by task id, and show the row count.
df = df.dropna(axis=0)
df = sorter (df)
len(df)

11892803

In [None]:
#count node with two descriptions
# The node id is the leading number of task_name. Taking only the first
# character (str[0]) would merge node 1 with nodes 10-19, and so on.
df['node'] = df['task_name'].str.extract(r'(\d+)', expand=False)
# (job, node) pairs that occur exactly once
without_two_description = df[['job_name', 'node']].drop_duplicates(keep = False)
# NOTE(review): this keeps only jobs in which *no* (job, node) pair is unique;
# a job with one duplicated node and other unique nodes is not counted.
# Confirm this is the intended definition.
with_two_description = df[~df.job_name.isin(without_two_description.job_name)]
print('# jobs with two descriptions:',len(set(with_two_description.job_name)))

#counting the number of jobs which have fewer than ten nodes
jobs = df[['job_name','DAGs','totalStart','totalEnd']].drop_duplicates()
jobs["nNodes"] = jobs.DAGs.apply(nodeCounter)
jobs = jobs[jobs.nNodes<=9]
print('# lesser than ten nodes',len(jobs))

#counting the number of jobs without dependencies (no "§" in the task string)
DAG_without_dependencies = df[~df["DAGs"].str.contains("§")]
print('#without dependencies', len(set(DAG_without_dependencies.job_name)))

In [51]:
# One row per job, restricted to jobs whose task string contains a dependency
# marker ("§"), plus the job's run time (latest end minus earliest start).
DAG = (
    df[['job_name','DAGs','totalStart','totalEnd']]
    .drop_duplicates()
    .loc[lambda jobs: jobs["DAGs"].str.contains("§")]
    .assign(run_time=lambda jobs: jobs['totalEnd'] - jobs['totalStart'])
)

In [52]:
# Node count per job; keep jobs with more than nine nodes, ordered by start time.
DAG["nNodes"] = DAG["DAGs"].apply(nodeCounter)
DAG = DAG.loc[DAG["nNodes"] > 9].sort_values("totalStart")

In [24]:
# Distinct job names that survived the filters above, and how many there are.
targetJobs = list(set(DAG["job_name"]))
len(targetJobs)

250076

In [58]:

# Checkpoint: reload the filtered job table instead of recomputing it.
# It was written with: DAG.to_csv("data/preprocessed_tasks_v2.csv",sep =',')
# NOTE(review): that call also writes the index, so the reloaded frame
# presumably starts with an extra "Unnamed: 0" column, which shifts positional
# (iloc) column access -- confirm.
DAG = pd.read_csv("data/preprocessed_tasks_v2.csv")

In [None]:
#generating dictionary of igraph objects for jobs (job name -> directed graph)
targetJobs = list(set(DAG.job_name))
time.tic()
# Row positions of every job, found in a single pass over df. The previous
# `df[df.job_name == x]` scanned the whole task table once per job, which is
# what made this cell take days.
jobRows = df.groupby("job_name").indices
noRows = np.array([], dtype=int)  # used for a job without any row in df
DagsDict = {}
for x in targetJobs:
    tempdf = sorter(df.iloc[jobRows.get(x, noRows)])
    tempg = ig.Graph(edgeExtractor(tempdf), directed=True)
    # One attribute value per row of tempdf, in node_root order.
    # NOTE(review): assumes the number of rows equals the number of vertices
    # igraph created from the edge list -- confirm.
    tempg.vs["plan_cpu"], tempg.vs["plan_mem"], tempg.vs["instance_num"] = attExtractor(tempdf)
    DagsDict[x] = tempg
time.toc()

In [None]:
# isolatedDeletor edits each graph through g.delete_vertices and returns the
# same object, so the values here are the very graphs stored in DagsDict.
igraphObjects = {job: isolatedDeletor(graph) for job, graph in DagsDict.items()}
# Duplicate edges removed.
DagsDictPruned = {job: simplifier(graph) for job, graph in igraphObjects.items()}
# networkx versions of the graphs, built after the two steps above have run.
networkXObjects = {job: graph.to_networkx() for job, graph in igraphObjects.items()}

In [None]:
#creating a dictionary of recurrent jobs: key is a job name, value is the list
#of (labelled) job names that start inside its time windows and are isomorphic to it
#time consuming loop
time.tic()
maxTime = max(DAG["totalEnd"])
isomorphicDict = {}

# Only the first 10 jobs are processed here; use range(len(DAG)) for all of them.
for y in range(10):
    # Columns are addressed by name, so an extra leading column in DAG
    # (e.g. a saved index) cannot shift them.
    check = DAG["job_name"].iloc[y]  # reference job read by filterfunc
    secondsTotal, secondsMin, secondsHour, secondsDay = secondExtractor(DAG["totalStart"].iloc[y])

    # jobs that start inside one of the windows of the reference job
    target = list(DAG.loc[DAG['totalStart'].isin(secondsTotal), 'job_name'])

    # keep the isomorphic ones (was `filter(f, ...)` with an undefined `f`)
    recurrentJobs = list(filter(filterfunc, target))

    # lablingfunc reads the secondsMin / secondsHour / secondsDay set above
    isomorphicDict[check] = list(map(lablingfunc, recurrentJobs))

time.toc()

In [None]:
### transformation of DAG df.
#adding the assigned group to jobs based on time interval and isomorphism.
#detecting to which time interval they belong.
#!!time consuming loop, the results are saved in ruleBased_DAGs.csv

DAG['IfMins'] = False
DAG['IfHours'] = False
DAG['IfDays'] = False

DAG['mutable'] = True   # True while the job has not been put into a group
# -1 means "no group yet". With 0 as the default, every unassigned job looked
# like a member of the group of row 0 and received that group's window flags.
DAG['group'] = -1

time.tic()
maxTime = max(DAG["totalEnd"])

for y in range(len(DAG)):
    # Columns are addressed by name, so an extra leading column in DAG
    # (e.g. a saved index) cannot shift them.
    check = DAG['job_name'].iloc[y]  # reference job read by filterfunc
    secondsTotal, secondsMin, secondsHour, secondsDay = secondExtractor(DAG['totalStart'].iloc[y])

    # still-unassigned jobs that start inside one of the windows of job y
    inWindow = DAG['totalStart'].isin(secondsTotal) & DAG['mutable']
    target = list(DAG.loc[inWindow, 'job_name'])

    recurrentJobs = list(filter(filterfunc, target))

    # the isomorphic, still-unassigned jobs form group y
    newMembers = DAG['job_name'].isin(recurrentJobs) & DAG['mutable']
    DAG.loc[newMembers, 'group'] = y

    # flag in which kind of window each member of group y starts
    inGroup = DAG['group'] == y
    DAG.loc[inGroup & DAG['totalStart'].isin(secondsMin), 'IfMins'] = True
    DAG.loc[inGroup & DAG['totalStart'].isin(secondsHour), 'IfHours'] = True
    DAG.loc[inGroup & DAG['totalStart'].isin(secondsDay), 'IfDays'] = True

    # members are final: later iterations must not regroup them
    DAG.loc[newMembers, 'mutable'] = False

time.toc()


In [None]:
# NOTE(review): same function as the isolatedDeletor defined in the helper
# cell near the top of the notebook; this later definition replaces it.
def isolatedDeletor (g):
    """Remove every degree-0 vertex from ``g`` (in place) and return ``g``."""
    isolated_ids = []
    for vertex in g.vs:
        if vertex.degree() == 0:
            isolated_ids.append(vertex.index)
    g.delete_vertices(isolated_ids)
    return g
# Job name -> graph without isolated vertices. isolatedDeletor returns the graph
# it was given, so these are the same objects that DagsDict holds.
d2 = {job: isolatedDeletor(graph) for job, graph in DagsDict.items()}

In [None]:
#file_to_write = open("data/DAGs_igraph_dict.pickle", "wb")
#pickle.dump(d2, file_to_write)

In [None]:
#DAG.to_csv("data/ruleBased_DAGs.csv",sep =',') 

In [None]:
#np.save('data/ruleBased_DAGs_dict.npy', isomorphicDict)

In [None]:
#batch.to_csv("data/preprocessed_jobs.csv",sep =',')

In [None]:
#####check point########

In [72]:
# Reload the checkpointed graphs and the grouped job table.
# pickle.load can execute arbitrary code: only load files you created yourself.
# The with-block closes the file once the dictionary is loaded.
with open("data/DAGs_igraph_dict.pickle", "rb") as file_to_read:
    DAGs_Dict = pickle.load(file_to_read)
DAG = pd.read_csv("data/ruleBased_DAGs.csv",sep =',')

In [75]:
#exploring the groups
#size of groups of isomorphic DAGs
# allow_pickle expects a boolean (the string 'TRUE' only worked because any
# non-empty string is truthy). .item() unwraps the saved dictionary.
read_dictionary = np.load('data/ruleBased_DAGs_dict.npy', allow_pickle=True).item()
# number of recurrent jobs recorded for every job
length_dict = {key: len(value) for key, value in read_dictionary.items()}
lengths = list(length_dict.values())
# Pass the data by keyword so the plot does not depend on which parameter
# comes first in the installed seaborn version.
sns.boxplot(x=lengths)

In [78]:
# Variance of run_time inside each group, averaged over the groups, next to
# the variance over all jobs. Previously the within-group value was computed
# but never shown, and the overall variance was printed under its label.
withinGroupVar = DAG.groupby('group')['run_time'].var().mean()
print('mean run_time variance within groups:', withinGroupVar)
print('run_time variance over all jobs:', DAG.run_time.var())

1094936.3586614884

In [79]:
# For thresholds 1..29: how many jobs have more than that many recurrent jobs.
number = sum(1 for size in lengths if size >= 0)
for j in range(1, 30):
    exceeding = sum(1 for size in lengths if size > j)
    print ( exceeding,"from " ,number, " jobs more than" , j, "recurrent jobs")

206517 from  250076  jobs more than 1 recurrent jobs
192527 from  250076  jobs more than 2 recurrent jobs
181479 from  250076  jobs more than 3 recurrent jobs
170335 from  250076  jobs more than 4 recurrent jobs
161959 from  250076  jobs more than 5 recurrent jobs
153458 from  250076  jobs more than 6 recurrent jobs
149010 from  250076  jobs more than 7 recurrent jobs
143972 from  250076  jobs more than 8 recurrent jobs
140089 from  250076  jobs more than 9 recurrent jobs
135985 from  250076  jobs more than 10 recurrent jobs
132794 from  250076  jobs more than 11 recurrent jobs
128672 from  250076  jobs more than 12 recurrent jobs
125806 from  250076  jobs more than 13 recurrent jobs
123021 from  250076  jobs more than 14 recurrent jobs
120198 from  250076  jobs more than 15 recurrent jobs
117313 from  250076  jobs more than 16 recurrent jobs
114852 from  250076  jobs more than 17 recurrent jobs
112331 from  250076  jobs more than 18 recurrent jobs
110148 from  250076  jobs more than 1