In [12]:
import gc
import math
import sys
import time
import timeit

import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql.functions import countDistinct

In [None]:
def adjust_values(init_time, finish_time, granularity):
    """Scale the three time arguments by a factor of 10**6.

    Presumably this converts seconds into the resolution of the trace
    timestamps (TODO confirm the units against the dataset documentation).

    Returns:
        tuple: ``(init_time, finish_time, granularity)``, each multiplied
        by 10**6, in that order.
    """
    scale = 10**6
    return tuple(value * scale for value in (init_time, finish_time, granularity))

def calculate_tuples(cluster_list):
    """Pair every element of ``cluster_list`` with its immediate successor.

    Returns:
        list: tuples ``(cluster_list[i], cluster_list[i + 1])`` for each
        consecutive pair; empty when there are fewer than two elements.
    """
    # Zipping the sequence with its own tail yields each (current, next) pair
    return list(zip(cluster_list, cluster_list[1:]))

def mean_time_evaluation(sc, cluster_list):
    """Return the mean gap between consecutive values of ``cluster_list``.

    Args:
        sc: SparkContext used to compute the per-pair differences.
        cluster_list: sequence of numeric values; the gaps are taken between
            each element and the one that follows it.

    Returns:
        The mean of ``next - current`` over all consecutive pairs, or NaN
        when ``cluster_list`` holds fewer than two values.
    """
    tuples = calculate_tuples(cluster_list)
    if not tuples:
        # np.mean([]) also yields NaN but emits a RuntimeWarning and needs a
        # pointless Spark round trip; return the same value directly.
        return np.nan

    gaps_RDD = sc.parallelize(tuples).map(lambda elem: elem[1] - elem[0])
    return np.mean(gaps_RDD.collect())

def get_interval_values(sc, cluster_list, init_time, finish_time):
    """Build one record per consecutive pair of values in ``cluster_list``.

    Args:
        sc: SparkContext used to map the pairs into records.
        cluster_list: sequence of numeric values.
        init_time: value copied into the first field of every record.
        finish_time: value copied into the second field of every record.

    Returns:
        list: tuples ``(init_time, finish_time, start, end, (end - start) / 10**6)``
        where ``(start, end)`` are consecutive elements of ``cluster_list``.
    """
    def to_record(pair):
        start, end = pair[0], pair[1]
        return (init_time, finish_time, start, end, (end - start) / 10**6)

    pairs_rdd = sc.parallelize(calculate_tuples(cluster_list))
    return pairs_rdd.map(to_record).collect()

def metrics(inter_list):
    """Summarize a collection of numeric values.

    Args:
        inter_list: sized collection of numbers (the visible caller passes
            the collected inter-arrival values of one cluster).

    Returns:
        list: ``[mean, variance, median, standard deviation]`` of the values,
        or ``[0, 0, 0, 0]`` when ``inter_list`` is empty.
    """
    # ``inter_list.count`` is a bound method, not a number: comparing it with
    # 0 raises TypeError on Python 3. The emptiness check must use the length.
    if len(inter_list) == 0:
        return [0, 0, 0, 0]
    return [np.mean(inter_list), np.var(inter_list), np.median(inter_list), np.std(inter_list)]

In [2]:
# Start Spark in local mode ("local[1]" master) and keep the log output down to errors
sc = SparkContext(master="local[1]")
sc.setLogLevel("ERROR")

In [4]:
# The trace files carry no header row (the first raw line is already a data
# record), so pass header=None; otherwise pandas consumes the first record as
# column names. Column names follow the field list documented below.
JOB_EVENTS_COLUMNS = [
    "timestamp",
    "missing_info",
    "job_id",
    "event_type",
    "user_name",
    "scheduling_class",
    "job_name",
    "logical_job_name",
]
job_events = pd.read_csv(
    "../../datasets/job_events/part-00000-of-00500.csv",
    header=None,
    names=JOB_EVENTS_COLUMNS,
)

Columns of the job_events table (0-indexed): 0. timestamp, 1. missing info, 2. job ID, 3. event type, 4. user name, 5. scheduling class, 6. job name, 7. logical job name

In [23]:
# Step 1: read every compressed job_events part file as an RDD of raw CSV lines
job_events = sc.textFile('../../datasets/job_events/*.gz')
print("Step 1: {}".format(job_events.take(1)))

# Step 2: split each line into its comma-separated fields
job_event_tokens = job_events.map(lambda line: line.split(","))
print("Step 2: {}".format(job_event_tokens.take(1)))


def parse_job_event(tokens):
    """Keep fields 0 and 2-7 of a split line, casting fields 0, 2, 3 and 5 to int."""
    return (
        int(tokens[0]),
        int(tokens[2]),
        int(tokens[3]),
        tokens[4],
        int(tokens[5]),
        tokens[6],
        tokens[7],
    )


# Step 3: typed tuples; field 1 (missing info) is dropped
job_events_RDD = job_event_tokens.map(parse_job_event)
print("Step 3: {}".format(job_events_RDD.take(1)))

Step 1: ['1639265380304,,6408251525,4,jVEIdGnEYLp+j9YJHh5dEBhUdpD2fs+PKTWwQo5ZrJk=,1,zXOstV9tbQGnXU6u6FfAjOzoe8gToH3sDCKZNm0RAwM=,1H3WBVcen2RS9lximFVb5A/HIAxc6rH8XBp0IStE/Co=']
Step 2: [['1639265380304', '', '6408251525', '4', 'jVEIdGnEYLp+j9YJHh5dEBhUdpD2fs+PKTWwQo5ZrJk=', '1', 'zXOstV9tbQGnXU6u6FfAjOzoe8gToH3sDCKZNm0RAwM=', '1H3WBVcen2RS9lximFVb5A/HIAxc6rH8XBp0IStE/Co=']]
Step 3: [(1639265380304, 6408251525, 4, 'jVEIdGnEYLp+j9YJHh5dEBhUdpD2fs+PKTWwQo5ZrJk=', 1, 'zXOstV9tbQGnXU6u6FfAjOzoe8gToH3sDCKZNm0RAwM=', '1H3WBVcen2RS9lximFVb5A/HIAxc6rH8XBp0IStE/Co=')]


In [28]:
# Drop the records that fall outside the trace window:
#   timestamp == 0         -> occurred before the beginning of the trace window
#   timestamp == 2**63 - 1 -> occurred after the end of the trace window
# then sort in ascending order with respect to the timestamps.
# NOTE: in Python ``^`` is bitwise XOR, so ``2^63-1`` evaluates to 60; the
# power operator is ``**``.
TRACE_END_TIMESTAMP = 2**63 - 1
job_events_RDD_filtered = (
    job_events_RDD
    .map(lambda elem: (elem[0], (elem[1], elem[2])))  # (timestamp, (job ID, event type))
    .filter(lambda elem: elem[0] != 0 and elem[0] != TRACE_END_TIMESTAMP)
    .sortByKey(ascending=True, numPartitions=1)
)
print("{} traces removed".format(job_events_RDD.count()-job_events_RDD_filtered.count()))
print("Step 4: {}".format(job_events_RDD_filtered.take(1)))

7970 traces removed
Step 4: [(604046276, (6251639640, 4))]


In [None]:
#Input: 
    ##event_type: type of the event in the traces
    ##init_time: time starting from which we want to evaluate the model
    ##finish_time: it is the time when we want to stop the evaluation
    ##granularity: the level of granularity for plotting the results of the model
    ##EX: over a window of 200 sec with granularity = 10 sec, the derived traces are clustered based on their timestamps:
    ##cluster 1: time interval 0-10, cluster 2: time interval 10-20 ... cluster 20: time interval 190-200

def job_eval_time_window(event_type, init_time, finish_time, granularity):
    init_time, finish_time, granularity = adjust_values(init_time, finish_time, granularity)
    if (event_type != None):
        job_eval_traces_RDD = job_events_RDD_filtered.filter(lambda elem: elem[1][1] == event_type)
    if (init_time!=None and finish_time!=None):
        job_eval_traces_RDD = job_eval_traces_RDD.filter(lambda elem: elem[0]>=init_time and elem[0]<=finish_time).map(lambda elem: elem[0])
    print("{} Traces satify query".format(job_eval_traces_RDD.count()))
    job_eval_traces_list = job_eval_traces_RDD.collect()
    job_evaluated_means_list = []
    lower_granularity = init_time #lowest bound for the clusterization
    n_clusters = (finish_time - init_time)/granularity
    job_interval_values_list = []
    
    #propagate the last element of each cluster to be the first element of the following cluster
    last_element = 0
    n_empty_clusters = 0
    clusters_metrics = []
    
    #iterate to create each cluster
    for i in range(0, int(n_clusters+1)):
        j_cluster_traces = [timestamp for timestamp in job_eval_traces_list if timestamp >= lower_granularity and timestamp < (lower_granularity+granularity)]
        
        if len(j_cluster_traces) == 0:
            
        if i!=0:
            j_cluster_traces.insert(0, last_element)
        
        #append to a list the cluster (cluster_lower_bound, cluster_upper_bound, mean_time_between_jobs)
        job_evaluated_means_list.append([lower_granularity, (lower_granularity+granularity), mean_time_evaluation(sc, j_cluster_traces)])
        
        #list of all interarrival times inside the cluster
        interval_values_list = get_interval_values(sc, j_cluster_traces, lower_granularity, lower_granularity+granularity)
        inter_arrivals = sc.parallelize(interval_values_list).map(lambda x: x[4])
        #Evaluate the metrics over the interarrivals of the cluster
        #[mean, variance, median, std]
        cluster_metrics = metrics(inter_arrivals.collect())
        
        if (math.isnan(cluster_metrics[0]) == False):
            clusters_metrics.append(cluster_metrics)
        job_interval_values_list.extend(interval_values_list)
        lower_granularity += granularity
        last_element = j_cluster_traces[-1]
        
    metrics_list = evaluate_statistics(sc, job_interval_values_list)
    job_mean_metrics_rdd = sc.parallelize(job_clusters_metrics)
    