# In [70]:  (Jupyter notebook cell prompt left over from the export)
import os
from datetime import date
import graphviz
import pandas as pd
import json
from collections import defaultdict
from collections import OrderedDict
from copy import copy
import re
import configparser
import queue
from pathlib import Path
import pickle
import ast
import random

# Module-wide settings read from 'config.ini' (a relative path, so it is
# resolved against the current working directory).
# ConfigParser.read() silently skips files it cannot open, so a missing file
# only shows up later as a KeyError on lookups such as
# config['Caching_Anomalies'][...] in Analyzer.
config = configparser.ConfigParser()
config.read('config.ini')



class RddAsNode:
    """Plain record bundling an RDD's name, cache flag and two counters."""

    def __init__(self, name, is_cached, number_of_usage, number_of_computations):
        # Stored verbatim; no validation or conversion is applied.
        self.name, self.is_cached = name, is_cached
        self.number_of_usage, self.number_of_computations = (
            number_of_usage,
            number_of_computations,
        )

        
class Rdd:
    """One RDD record: id, name, parent ids, owning stage/job and cache flag."""

    def __init__(self, id, name, parents_lst, stage_id, job_id, is_cached):
        self.id, self.name, self.parents_lst = id, name, parents_lst
        self.stage_id, self.job_id, self.is_cached = stage_id, job_id, is_cached

    def to_json(self):
        """Return the six fields as a dict keyed by attribute name."""
        return dict(
            id=self.id,
            name=self.name,
            parents_lst=self.parents_lst,
            stage_id=self.stage_id,
            job_id=self.job_id,
            is_cached=self.is_cached,
        )

    @classmethod
    def from_json(cls, data):
        """Build an instance from a mapping shaped like the output of to_json()."""
        # Values are passed positionally, in constructor order.
        keys = ("id", "name", "parents_lst", "stage_id", "job_id", "is_cached")
        return cls(*[data[key] for key in keys])
        
class Transformation:
    """Edge between two RDD ids, flagged as narrow or not.

    Equality, hashing and ordering look only at (from_rdd, to_rdd);
    is_narrow does not take part in any comparison.
    """

    def __init__(self, from_rdd, to_rdd, is_narrow):
        self.from_rdd = from_rdd
        self.to_rdd = to_rdd
        self.is_narrow = is_narrow

    def __eq__(self, other):
        if isinstance(other, Transformation):
            return self.from_rdd == other.from_rdd and self.to_rdd == other.to_rdd
        return False

    def __hash__(self):
        # Hash the ordered pair so that (a, b) and (b, a) do not collide by
        # construction, while staying consistent with __eq__.
        return hash((self.from_rdd, self.to_rdd))

    def __lt__(self, other):
        # Order by from_rdd first, then by to_rdd to break ties.
        if self.from_rdd == other.from_rdd:
            return self.to_rdd < other.to_rdd
        return self.from_rdd < other.from_rdd
    
class TransformationWithoutI:
    """Plain (from_rdd, to_rdd, is_narrow) record with default object identity semantics."""

    def __init__(self, from_rdd, to_rdd, is_narrow):
        self.from_rdd, self.to_rdd, self.is_narrow = from_rdd, to_rdd, is_narrow
    
class CachingPlanItem:
    """One step of a caching plan: cache (or un-cache) an RDD at a given job/stage.

    Instances sort by job id, then stage id; within the same job and stage,
    items with a truthy is_cache_item come first, and remaining ties are
    broken by rdd_id.
    """

    def __init__(self, stage_id, job_id, rdd_id, is_cache_item):
        self.stage_id = stage_id
        self.job_id = job_id
        self.rdd_id = rdd_id
        self.is_cache_item = is_cache_item

    def __lt__(self, other):
        if self.job_id != other.job_id:
            return self.job_id < other.job_id
        if self.stage_id != other.stage_id:
            return self.stage_id < other.stage_id
        if self.is_cache_item != other.is_cache_item:
            # Exactly one of the two is a cache item; it sorts first.
            return bool(self.is_cache_item)
        # Same job, stage and kind: compare ids so the ordering is consistent
        # (a < b and b < a can never both hold).
        return self.rdd_id < other.rdd_id

class CachingPlanItemWithStorage:
    """Caching-plan record that additionally carries a persist_status value."""

    def __init__(self, stage_id, job_id, rdd_id, is_cache_item, persist_status):
        self.stage_id, self.job_id, self.rdd_id = stage_id, job_id, rdd_id
        self.is_cache_item, self.persist_status = is_cache_item, persist_status
        
class Utility():
    """Stateless helpers; callable on the class or on an instance."""

    @staticmethod
    def get_absolute_path(path):
        """Return path unchanged if absolute, else prefix it with the current working directory."""
        if os.path.isabs(path):
            return path
        return str(Path().absolute()) + '/' + path

    @staticmethod
    def intersection(lst1, lst2):
        """Return the elements of lst1 that also occur in lst2.

        The result is a list that keeps lst1's iteration order and any
        duplicates lst1 contains.
        """
        return [value for value in lst1 if value in lst2]

    
class FactHub():
    """Class-level registry of facts extracted from the event log.

    All state lives in class attributes (the class is never instantiated in
    this file). Parser fills them in; flush() restores the empty defaults.
    """

    app_name = ""
    job_info_dect = {}
    stage_info_dect = {}
    stage_job_dect = {}
    stage_name_dect = {}
    submitted_stage_last_rdd_dect = {}
    stage_no_of_tasks = {}
    stage_i_operator_dect = defaultdict(list)
    stage_i_operators_id = defaultdict(list)
    submitted_stages = set()
    rdds_lst = []
    operator_partition_size = {}
    rddID_in_stage = defaultdict(list)
    stage_operator_partition = {}
    #total = accumulables_update + bytes_written
    stage_total = {}
    job_last_rdd = {}
    job_last_rdd_dect = {}
    job_last_stage = {}
    rdd_id_stage_with_max_tasks = {}
    task_in_which_stage = {}
    stage_has_what_tasks = defaultdict(list)
    rdds_lst_index_dict = {}
    taskid_launchtime = {}
    taskid_finishtime = {}
    taskid_last_processing_time = defaultdict(int)
    taskid_operator_dect = defaultdict(list)
    taskid_bytes_read = {}
    # Written by Parser.prepare_leaf_from_task_end_events; declared here so it
    # exists even when flush() has not been called yet.
    taskid_bytes_written = {}

    root_rdd_size = {}
    rddID_size = defaultdict(int)
    last_rdd_size = {}
    operator_timestamp = defaultdict(int)
    operator_in_which_tasks = {}
    rdds_lst_renumbered = []
    transformations = []

    rdds_lst_refactored = []
    rdds_lst_InstrumentedRdds = []
    rdds_lst_InstrumentedRdds_id = []

    stage_shuffle_writetime_dict = {}
    stage_shuffle_readtime_dict = {}
    stage_shuffle_time_dict = {}
    shuffled_rdds_id = []

    @staticmethod
    def flush():
        """Reset every fact to its empty default.

        submitted_stages is emptied in place (existing references to the set
        stay valid); every other container is replaced by a fresh object.
        """
        FactHub.app_name = ""
        FactHub.job_info_dect = {}
        FactHub.stage_info_dect = {}
        FactHub.stage_job_dect = {}
        FactHub.stage_name_dect = {}
        FactHub.submitted_stage_last_rdd_dect = {}
        FactHub.stage_no_of_tasks = {}
        FactHub.stage_i_operator_dect = defaultdict(list)
        FactHub.stage_i_operators_id = defaultdict(list)
        FactHub.submitted_stages.clear()
        FactHub.rdds_lst = []
        FactHub.operator_partition_size = {}
        FactHub.rddID_in_stage = defaultdict(list)
        FactHub.stage_operator_partition = {}
        #total = accumulables_update + bytes_written
        FactHub.stage_total = {}
        FactHub.job_last_rdd = {}
        FactHub.job_last_rdd_dect = {}
        FactHub.job_last_stage = {}
        FactHub.rdd_id_stage_with_max_tasks = {}
        FactHub.task_in_which_stage = {}
        FactHub.stage_has_what_tasks = defaultdict(list)
        FactHub.rdds_lst_index_dict = {}
        FactHub.taskid_launchtime = {}
        FactHub.taskid_finishtime = {}
        FactHub.taskid_last_processing_time = defaultdict(int)
        FactHub.taskid_operator_dect = defaultdict(list)
        FactHub.taskid_bytes_read = {}
        FactHub.taskid_bytes_written = {}
        FactHub.root_rdd_size = {}
        FactHub.rddID_size = defaultdict(int)
        FactHub.last_rdd_size = {}
        FactHub.operator_timestamp = defaultdict(int)
        FactHub.operator_in_which_tasks = {}
        FactHub.rdds_lst_renumbered = []
        FactHub.transformations = []
        FactHub.rdds_lst_refactored = []
        FactHub.rdds_lst_InstrumentedRdds = []
        FactHub.rdds_lst_InstrumentedRdds_id = []
        FactHub.stage_shuffle_writetime_dict = {}
        FactHub.stage_shuffle_readtime_dict = {}
        FactHub.stage_shuffle_time_dict = {}
        FactHub.shuffled_rdds_id = []
        
class AnalysisHub():
    """Class-level store for analysis results and cache-model parameters.

    State is held in class attributes; flush() restores most of them to
    their empty defaults.
    """

    # --- sets ---
    transformations_set = set()
    computed_rdds = set()
    cached_rdds_set = set()
    non_cached_rdds_set = set()

    # --- counters / accumulators ---
    rdd_num_of_computations = defaultdict(int)
    rdd_num_of_usage = defaultdict(int)
    time_model_scenario1 = defaultdict(int)
    time_model_scenario2 = defaultdict(int)
    time_model_scenario3 = defaultdict(int)
    transformation_to_from = defaultdict(list)

    # --- plain dictionaries ---
    anomalies_dict = {}
    stage_computed_rdds = {}
    stage_used_rdds = {}
    rdd_usage_lifetime_dict = {}
    rdds_computation_time = {}
    recommended_schedule_cache_at = {}
    recommended_schedule_unpersist_after = {}
    transformation_from_to = {}
    persist_status_recorder = {}

    # --- lists ---
    caching_plan_lst = []
    memory_footprint_lst = []
    cached_rdds_lst = []
    transformation_without_i = []
    rdd_cache_order = []
    memory_management_list = []
    disk_management_list = []

    # --- numeric settings ---
    memory_size = 0
    memory_read_capacity = 0
    memory_write_capacity = 0
    disk_size = 0
    disk_read_capacity = 0
    disk_write_capacity = 0

    def flush():
        """Reset the analysis state.

        memory_footprint_lst, cached_rdds_set and non_cached_rdds_set are
        not touched by this method.
        """
        hub = AnalysisHub

        # These two sets are emptied in place rather than replaced.
        hub.transformations_set.clear()
        hub.computed_rdds.clear()

        for attr in (
            "rdd_num_of_computations",
            "rdd_num_of_usage",
            "time_model_scenario1",
            "time_model_scenario2",
            "time_model_scenario3",
        ):
            setattr(hub, attr, defaultdict(int))
        hub.transformation_to_from = defaultdict(list)

        # A new container is created on every iteration, so no two
        # attributes end up sharing one object.
        for attr in (
            "anomalies_dict",
            "stage_computed_rdds",
            "stage_used_rdds",
            "rdd_usage_lifetime_dict",
            "rdds_computation_time",
            "recommended_schedule_cache_at",
            "recommended_schedule_unpersist_after",
            "transformation_from_to",
            "persist_status_recorder",
        ):
            setattr(hub, attr, {})

        for attr in (
            "caching_plan_lst",
            "cached_rdds_lst",
            "transformation_without_i",
            "rdd_cache_order",
            "memory_management_list",
            "disk_management_list",
        ):
            setattr(hub, attr, [])

        for attr in (
            "memory_size",
            "memory_read_capacity",
            "memory_write_capacity",
            "disk_size",
            "disk_read_capacity",
            "disk_write_capacity",
        ):
            setattr(hub, attr, 0)
        
class Parser():    
    
    def prepare(raw_log_file):
        """Load a JSON-lines event log and populate FactHub from it.

        The file is read with pandas (one JSON object per line). The
        application name is taken from the first
        'SparkListenerApplicationStart' row, so an IndexError is raised when
        the log has none. The remaining steps run in a fixed order because
        later ones read facts stored by earlier ones.
        """
        events = pd.read_json(raw_log_file, lines=True)

        def of_type(event_name):
            # Rows whose 'Event' column equals event_name.
            return events[events['Event'] == event_name]

        FactHub.app_name = of_type('SparkListenerApplicationStart')['App Name'].tolist()[0]
        Parser.prepare_from_stage_submitted_events(of_type('SparkListenerStageSubmitted'))
        Parser.prepare_from_job_start_events(of_type('SparkListenerJobStart'))
        Parser.prepare_from_task_end_events(of_type('SparkListenerTaskEnd'))
        Parser.prepare_RDD_ID_from_stage_submitted_events(of_type('SparkListenerStageSubmitted'))
        Parser.prepare_root_from_stage_completed_events(of_type('SparkListenerStageCompleted'))
        Parser.prepare_leaf_from_task_end_events(of_type('SparkListenerTaskEnd'))
        Parser.prepare_from_task_end_events_for_timestamp(of_type('SparkListenerTaskEnd'))
    
    def prepare_from_task_end_events_for_timestamp(task_end_events):
        """Accumulate per-operator elapsed time from the per-task operator records.

        Works only on facts already stored in FactHub; the task_end_events
        argument is not read.

        For every task the operator records are walked in order. The first
        operator is charged |its timestamp - task launch time|, every later
        one |its timestamp - previous operator's timestamp|, and the sums are
        kept per 'Operator ID' in FactHub.operator_timestamp. The gap between
        the last operator and the task finish time is stored in
        FactHub.taskid_last_processing_time.

        Finally, every job whose last RDD has no operator timing yet receives
        the sum of those trailing gaps over all tasks of all stages that
        contain that RDD.

        NOTE(review): FactHub.taskid_operator_dect is keyed by a running
        index assigned in prepare_from_task_end_events, whereas
        taskid_launchtime / taskid_finishtime are keyed by the log's
        'Task ID' - confirm that the two numbering schemes agree.
        """
        # Invert task -> operators into operator -> tasks.
        for task_id, operators in FactHub.taskid_operator_dect.items():
            for operator in operators:
                FactHub.operator_in_which_tasks.setdefault(operator['Operator ID'], []).append(task_id)

        for task_id, operators in FactHub.taskid_operator_dect.items():
            if not operators:
                # Nothing to time for a task without operator records.
                continue
            previous_timestamp = FactHub.taskid_launchtime[task_id]
            for operator in operators:
                operator_id = operator['Operator ID']
                elapsed = abs(operator['Timestamp'] - previous_timestamp)
                FactHub.operator_timestamp[operator_id] = FactHub.operator_timestamp.get(operator_id, 0) + elapsed
                previous_timestamp = operator['Timestamp']
            # Time left between the last recorded operator and the task's end.
            FactHub.taskid_last_processing_time[task_id] = abs(
                FactHub.taskid_finishtime[task_id] - operators[-1]['Timestamp']
            )

        # Jobs whose last RDD was never seen as an operator get the trailing
        # task time instead.
        for job in FactHub.job_last_rdd:
            last_rdd_id = FactHub.job_last_rdd[job]
            if last_rdd_id in FactHub.operator_timestamp:
                continue
            for stage in FactHub.rddID_in_stage[last_rdd_id]:
                for task in FactHub.stage_has_what_tasks[stage]:
                    FactHub.operator_timestamp[last_rdd_id] += FactHub.taskid_last_processing_time[task]
        
    def prepare_leaf_from_task_end_events(task_end_events):
        """Derive per-task, per-stage and per-job size/shuffle facts from task-end events.

        Reads the 'Stage ID', 'Task Info' and 'Task Metrics' columns of
        task_end_events and fills, in FactHub:

        * task_in_which_stage / stage_has_what_tasks (tasks are identified
          here by their position among the task-end rows),
        * taskid_launchtime / taskid_finishtime (keyed by 'Task ID'),
        * stage_total, last_rdd_size and rddID_size for each job's last RDD,
        * stage_shuffle_writetime_dict, stage_shuffle_readtime_dict and
          stage_shuffle_time_dict,
        * taskid_bytes_written (keyed by 'Task ID').
        """
        stage_id_for_a_task = [int(stage) for stage in task_end_events['Stage ID'].tolist()]
        task_infos = task_end_events['Task Info'].tolist()
        task_metrics_list = task_end_events['Task Metrics'].tolist()

        # dictionary for task and the stage it belongs to
        for index, stage_id in enumerate(stage_id_for_a_task):
            FactHub.task_in_which_stage[index] = stage_id
        for task, stage_id in FactHub.task_in_which_stage.items():
            FactHub.stage_has_what_tasks[stage_id].append(task)

        # Per-task total = the task's 'internal.metrics.resultSize' update
        # plus that same task's output 'Bytes Written' (0 when absent).
        task_totals = []
        for task_info, task_metrics in zip(task_infos, task_metrics_list):
            FactHub.taskid_launchtime[task_info['Task ID']] = task_info['Launch Time']
            FactHub.taskid_finishtime[task_info['Task ID']] = task_info['Finish Time']
            result_size = 0
            for accumulable in task_info['Accumulables']:
                if accumulable['Name'] == "internal.metrics.resultSize":
                    result_size = accumulable['Update']
            bytes_written = task_metrics['Output Metrics'].get('Bytes Written', 0)
            task_totals.append(result_size + bytes_written)

        # NOTE(review): totals are handed out to stages positionally - the
        # next stage_no_of_tasks[stage] entries go to each stage in dict
        # order. This assumes task-end rows arrive grouped by stage in that
        # same order; confirm against real logs.
        consumed = 0
        for stage_id, number_of_tasks in FactHub.stage_no_of_tasks.items():
            chunk = task_totals[consumed:consumed + number_of_tasks]
            FactHub.stage_total[stage_id] = sum(chunk)
            consumed += len(chunk)

        # The total of a job's last stage is recorded as the size of the job's last RDD.
        for job_id, last_stage in FactHub.job_last_stage.items():
            if last_stage in FactHub.stage_total:
                total = FactHub.stage_total[last_stage]
                FactHub.last_rdd_size[FactHub.job_last_rdd[job_id]] = total
                FactHub.rddID_size[FactHub.job_last_rdd[job_id]] = total

        # Sum shuffle write / read times per stage. A task that lacks a
        # metric contributes 0, which keeps tasks and stages aligned.
        for stage_id, task_metrics in zip(stage_id_for_a_task, task_metrics_list):
            # The raw write time is divided by 1,000,000 (nanoseconds to
            # milliseconds, per the original author's note) and rounded to
            # one decimal.
            write_time = task_metrics['Shuffle Write Metrics'].get('Shuffle Write Time', 0)
            write_time = round(write_time / 1000000, 1)
            FactHub.stage_shuffle_writetime_dict[stage_id] = (
                FactHub.stage_shuffle_writetime_dict.get(stage_id, 0) + write_time
            )
            read_time = task_metrics['Shuffle Read Metrics'].get('Fetch Wait Time', 0)
            FactHub.stage_shuffle_readtime_dict[stage_id] = (
                FactHub.stage_shuffle_readtime_dict.get(stage_id, 0) + read_time
            )

        # Shuffle time of a stage = its own read time + the write time of the
        # preceding submitted stage. Stages are visited in ascending id order
        # so the result does not depend on set iteration order; the first
        # stage has no predecessor.
        previous_stage = None
        for stage in sorted(FactHub.submitted_stages):
            shuffle_time = FactHub.stage_shuffle_readtime_dict.get(stage, 0)
            if previous_stage is not None:
                shuffle_time = shuffle_time + FactHub.stage_shuffle_writetime_dict.get(previous_stage, 0)
            FactHub.stage_shuffle_time_dict[stage] = round(shuffle_time, 1)
            previous_stage = stage

        # prepare a dict with task id and its shuffle bytes written; when a
        # task wrote 0 shuffle bytes, its input 'Bytes Read' is stored
        # instead (if present).
        for task_info, task_metrics in zip(task_infos, task_metrics_list):
            shuffle_write_metrics = task_metrics['Shuffle Write Metrics']
            if "Shuffle Bytes Written" not in shuffle_write_metrics:
                continue
            bytes_written = shuffle_write_metrics['Shuffle Bytes Written']
            if bytes_written != 0:
                FactHub.taskid_bytes_written[task_info['Task ID']] = bytes_written
            elif "Bytes Read" in task_metrics['Input Metrics']:
                FactHub.taskid_bytes_written[task_info['Task ID']] = task_metrics['Input Metrics']['Bytes Read']
        
    def prepare_root_from_stage_completed_events(stage_submitted_events):
        """Record the largest 'internal.metrics.input.bytesRead' value as the size of one RDD.

        Despite the parameter name, Parser.prepare passes the
        'SparkListenerStageCompleted' rows here. Each row's 'Stage Info' is
        scanned for RDDs without parents and for the bytesRead accumulable;
        the maximum value seen is stored in FactHub.root_rdd_size and
        FactHub.rddID_size under the selected RDD id.

        When no RDD is selected at all (for example, no events), nothing is
        stored.
        """
        max_size_root_rdd = 0
        root_rdd = None
        for submitted_stage in stage_submitted_events['Stage Info'].tolist():
            for rdd_info in submitted_stage['RDD Info']:
                if not rdd_info['Parent IDs']:
                    root_rdd = rdd_info['RDD ID']
            for read_bytes in submitted_stage['Accumulables']:
                if read_bytes['Name'] == 'internal.metrics.input.bytesRead' and read_bytes['Value'] > max_size_root_rdd:
                    max_size_root_rdd = read_bytes['Value']
                    # NOTE(review): rdd_info is whatever the RDD loop above
                    # left behind, i.e. the last entry of the most recent
                    # non-empty 'RDD Info' list - not necessarily a
                    # parentless RDD. Kept as-is; confirm this is intended.
                    root_rdd = rdd_info['RDD ID']
        if root_rdd is None:
            return
        FactHub.root_rdd_size[root_rdd] = max_size_root_rdd
        FactHub.rddID_size[root_rdd] = max_size_root_rdd
        
    def prepare_RDD_ID_from_stage_submitted_events(stage_submitted_events):
        """Map RDDs to stages and derive an RDD size from operator partition sizes.

        Steps:
        1. Append every submitted stage id to FactHub.rddID_in_stage for each
           RDD listed in that stage's 'RDD Info'.
        2. For each RDD (ascending id), store in
           FactHub.rdd_id_stage_with_max_tasks the first of its stages that
           has the highest FactHub.stage_no_of_tasks value.
        3. For each operator in FactHub.operator_partition_size, store a size
           in FactHub.rddID_size: the accumulated partition size when the
           operator was recorded in its max-task stage, otherwise that size
           multiplied by the max-task stage's task count.

        An operator with no entry in rdd_id_stage_with_max_tasks cannot be
        scaled and gets its accumulated partition size unchanged.
        """
        for submitted_stage in stage_submitted_events['Stage Info'].tolist():
            stage_id = submitted_stage['Stage ID']
            for rdd_info in submitted_stage['RDD Info']:
                FactHub.rddID_in_stage[rdd_info['RDD ID']].append(stage_id)

        # Visit RDD ids in ascending order.
        for rdd_id in sorted(FactHub.rddID_in_stage):
            best_stage = None
            for stage_id in FactHub.rddID_in_stage[rdd_id]:
                # Strict '>' keeps the earliest stage on ties.
                if best_stage is None or FactHub.stage_no_of_tasks[stage_id] > FactHub.stage_no_of_tasks[best_stage]:
                    best_stage = stage_id
            if best_stage is not None:
                FactHub.rdd_id_stage_with_max_tasks[rdd_id] = best_stage

        for operator, partition_size in FactHub.operator_partition_size.items():
            # Both lookups start from None for every operator so a value
            # found for a previous operator is never reused.
            stage_with_max_tasks = FactHub.rdd_id_stage_with_max_tasks.get(operator)
            operator_s_stage = None
            for stage, operator_ids in FactHub.stage_i_operators_id.items():
                if operator in operator_ids:
                    operator_s_stage = stage
            if stage_with_max_tasks is None or operator_s_stage == stage_with_max_tasks:
                FactHub.rddID_size[operator] = partition_size
            else:
                FactHub.rddID_size[operator] = partition_size * FactHub.stage_no_of_tasks[stage_with_max_tasks]
            
    def prepare_from_job_start_events(job_start_events):
        """Populate job, stage and RDD facts from job-start events.

        Reads the 'Job ID' and 'Stage Infos' columns. For each job it stores
        the raw stage list, and for each stage its owning job, raw record,
        name and task count. Every entry of a stage's 'RDD Info' becomes an
        Rdd appended to FactHub.rdds_lst; an RDD counts as cached when its
        storage level uses memory or disk.

        It also records the highest RDD id per submitted stage and per job,
        and, for each job, the last stage (in insertion order of
        FactHub.stage_job_dect) that belongs to it.
        """
        job_ids_list = job_start_events['Job ID'].tolist()
        job_stage_info_list = job_start_events['Stage Infos'].tolist()
        for job_num, job_rec in enumerate(job_stage_info_list):
            job_id = int(job_ids_list[job_num])
            FactHub.job_info_dect[job_id] = job_rec
            id_of_last_rdd_in_job = -1
            # Name of the last stage listed for this job; stays None when the
            # job lists no stages, so nothing leaks in from a previous job.
            last_stage_name = None
            for stage_rec in job_rec:
                stage_id = int(stage_rec['Stage ID'])
                last_stage_name = stage_rec['Stage Name']
                FactHub.stage_job_dect[stage_id] = job_id
                FactHub.stage_info_dect[stage_id] = stage_rec
                FactHub.stage_name_dect[stage_id] = last_stage_name
                FactHub.stage_no_of_tasks[stage_id] = stage_rec['Number of Tasks']
                id_of_last_rdd_in_stage = -1
                for stage_rdd_rec in stage_rec['RDD Info']:
                    rdd_id = stage_rdd_rec['RDD ID']
                    storage_level = stage_rdd_rec['Storage Level']
                    is_cached = storage_level['Use Memory'] or storage_level['Use Disk']
                    FactHub.rdds_lst.append(Rdd(rdd_id, stage_rdd_rec['Name'] + '\n' + stage_rdd_rec['Callsite'], stage_rdd_rec['Parent IDs'], stage_id, job_id, is_cached))
                    if id_of_last_rdd_in_job < rdd_id:
                        id_of_last_rdd_in_job = rdd_id
                    if id_of_last_rdd_in_stage < rdd_id:
                        id_of_last_rdd_in_stage = rdd_id
                if stage_id in FactHub.submitted_stages:
                    FactHub.submitted_stage_last_rdd_dect[stage_id] = id_of_last_rdd_in_stage
            #Dictionary with job and last rdd along with stage name
            FactHub.job_last_rdd_dect[job_id] = (id_of_last_rdd_in_job, last_stage_name)
            #Dictionary with job and last rdd details without stage name
            FactHub.job_last_rdd[job_id] = id_of_last_rdd_in_job
        # The last matching entry of stage_job_dect wins for each job.
        for job_id in FactHub.job_last_rdd_dect:
            for stage_id, owning_job in FactHub.stage_job_dect.items():
                if owning_job == job_id:
                    FactHub.job_last_stage[job_id] = stage_id
        
    def prepare_from_task_end_events(task_end_events):
        """Group per-task operator records by stage and accumulate partition sizes.

        Reads the 'Stage ID' and 'SparkIOperatorsDetails' columns. Each
        task's operator list is appended to FactHub.stage_i_operator_dect
        under its stage. The lists are then walked stage by stage; each one
        is stored in FactHub.taskid_operator_dect under a running index, and
        every operator's 'Partition Size' is summed per 'Operator ID' in
        FactHub.operator_partition_size. An operator id is registered in
        FactHub.stage_i_operators_id under the stage where it is first seen.

        NOTE(review): every entry of FactHub.stage_operator_partition refers
        to the one shared operator_partition_size dict, not to a per-stage
        snapshot - confirm that is intended.
        """
        stage_ids = task_end_events['Stage ID'].tolist()
        operator_details = task_end_events['SparkIOperatorsDetails'].tolist()
        for position, task_operators in enumerate(operator_details):
            owning_stage = int(stage_ids[position])
            FactHub.stage_i_operator_dect[owning_stage].append(task_operators)

        sizes = FactHub.operator_partition_size
        next_task_index = 0
        for stage_id, per_task_operators in FactHub.stage_i_operator_dect.items():
            for task_operators in per_task_operators:
                FactHub.taskid_operator_dect[next_task_index] = task_operators
                next_task_index += 1
                for operator in task_operators:
                    operator_id = operator['Operator ID']
                    if operator_id in sizes:
                        sizes[operator_id] = operator['Partition Size'] + sizes[operator_id]
                    else:
                        sizes[operator_id] = operator['Partition Size']
                        FactHub.stage_i_operators_id[stage_id].append(operator_id)
                FactHub.stage_operator_partition[stage_id] = sizes
        
    def prepare_from_stage_submitted_events(stage_submitted_events):
        """Add the 'Stage ID' of every row's 'Stage Info' to FactHub.submitted_stages."""
        stage_infos = stage_submitted_events['Stage Info'].tolist()
        FactHub.submitted_stages.update(info['Stage ID'] for info in stage_infos)
    
class Analyzer():

    def is_narrow_transformation(rdd_id, parent_id):
        """Return True when the RDD and its parent appear together in at least one stage.

        FactHub.rdds_lst is scanned once. A record whose id equals rdd_id is
        counted for the child only, even when parent_id is the same value.
        """
        child_stages, parent_stages = set(), set()
        for record in FactHub.rdds_lst:
            if record.id == rdd_id:
                child_stages.add(record.stage_id)
            elif record.id == parent_id:
                parent_stages.add(record.stage_id)
        shared_stages = Utility.intersection(child_stages, parent_stages)
        return bool(shared_stages)

    def prepare_transformations_lst():
        """Add one Transformation per (RDD, parent) pair to AnalysisHub.transformations_set.

        The RDD's own id is passed as from_rdd and the parent id as to_rdd.
        """
        for record in FactHub.rdds_lst:
            child_id = record.id
            for parent_id in record.parents_lst:
                is_narrow = Analyzer.is_narrow_transformation(child_id, parent_id)
                edge = Transformation(child_id, parent_id, is_narrow)
                AnalysisHub.transformations_set.add(edge)

    def add_rdd_and_its_parents_if_it_is_computed_in_stage(rdd_id, stage_id):#recursive
        """Mark rdd_id as used/computed in stage_id and recurse into its narrow parents.

        Expects AnalysisHub.stage_used_rdds[stage_id] and
        AnalysisHub.stage_computed_rdds[stage_id] to exist already
        (calc_num_of_computations_of_rdds creates them before calling).
        Updates AnalysisHub.rdd_num_of_usage, stage_used_rdds,
        rdd_usage_lifetime_dict, computed_rdds and stage_computed_rdds.
        """
        # Count the usage once per (rdd, stage) pair.
        if rdd_id not in AnalysisHub.stage_used_rdds[stage_id]:
            AnalysisHub.rdd_num_of_usage[rdd_id] += 1
            AnalysisHub.stage_used_rdds[stage_id].add(rdd_id)            
        # FactHub.rdds_lst may hold several records with the same id (one per
        # stage that lists the RDD); every matching record is visited.
        for rdd in FactHub.rdds_lst:
            if rdd.id == rdd_id: 
                if rdd.is_cached:
                    # Lifetime tuple layout: (stage, job, stage, job) - the
                    # first pair is replaced in the '[0] > stage_id' branch,
                    # the second pair in the '[2] < stage_id' branch.
                    if rdd_id not in AnalysisHub.rdd_usage_lifetime_dict:
                        AnalysisHub.rdd_usage_lifetime_dict[rdd.id] = (rdd.stage_id, rdd.job_id, rdd.stage_id, rdd.job_id)
                    # NOTE(review): the comparisons use the stage_id argument,
                    # but the values stored come from this record
                    # (rdd.stage_id / rdd.job_id) - confirm this is intended.
                    if AnalysisHub.rdd_usage_lifetime_dict[rdd_id][0] > stage_id:
                        AnalysisHub.rdd_usage_lifetime_dict[rdd.id] = (rdd.stage_id, rdd.job_id, AnalysisHub.rdd_usage_lifetime_dict[rdd_id][2], AnalysisHub.rdd_usage_lifetime_dict[rdd_id][3])
                    if AnalysisHub.rdd_usage_lifetime_dict[rdd_id][2] < stage_id:
                        AnalysisHub.rdd_usage_lifetime_dict[rdd.id] = (AnalysisHub.rdd_usage_lifetime_dict[rdd_id][0], AnalysisHub.rdd_usage_lifetime_dict[rdd_id][1], rdd.stage_id, rdd.job_id)
            if rdd.id == rdd_id: 
                # Only the record belonging to the stage being analysed
                # drives the computed-set bookkeeping and the recursion.
                if rdd.stage_id == stage_id:
                    if rdd.is_cached:
                        # This return ends the whole call, not just the
                        # current iteration: parents are not visited.
                        if rdd_id in AnalysisHub.computed_rdds: #already cached
                            return
                        AnalysisHub.computed_rdds.add(rdd_id) #cached for the first time
                        AnalysisHub.stage_computed_rdds[stage_id].add(rdd_id)
                    else:
                        if rdd_id in AnalysisHub.computed_rdds: #handling unpersistence
                            AnalysisHub.computed_rdds.remove(rdd_id)
                        AnalysisHub.stage_computed_rdds[stage_id].add(rdd_id)
                    # Follow only parents that is_narrow_transformation
                    # reports as sharing a stage with this RDD.
                    for parent_id in rdd.parents_lst:
                        if Analyzer.is_narrow_transformation(rdd.id, parent_id):
                            Analyzer.add_rdd_and_its_parents_if_it_is_computed_in_stage(parent_id, stage_id)

    def calc_num_of_computations_of_rdds():
        """Walk every submitted stage from its last RDD and count computations per RDD.

        The lifetime dictionary is reset first. For each submitted stage, in
        ascending id order, fresh used/computed sets are created and the
        recursive walk is started at the stage's last RDD. Afterwards every
        RDD found in any stage's computed set has its computation counter
        incremented once per such stage.
        """
        AnalysisHub.rdd_usage_lifetime_dict = {}
        last_rdd_of_stage = FactHub.submitted_stage_last_rdd_dect
        for stage_id in sorted(last_rdd_of_stage):
            final_rdd_id = last_rdd_of_stage[stage_id]
            AnalysisHub.stage_computed_rdds[stage_id] = set()
            AnalysisHub.stage_used_rdds[stage_id] = set()
            Analyzer.add_rdd_and_its_parents_if_it_is_computed_in_stage(final_rdd_id, stage_id)
        for computed_in_stage in AnalysisHub.stage_computed_rdds.values():
            for rdd_id in computed_in_stage:
                AnalysisHub.rdd_num_of_computations[rdd_id] += 1

    def prepare_anomalies_dict():
        """Flag RDDs as "unneeded cache" or "recomputation" in AnalysisHub.anomalies_dict.

        The threshold is config['Caching_Anomalies']['rdds_computation_tolerance_threshold'].
        A cached RDD whose usage count is at or below it is an unneeded
        cache; a non-cached RDD whose computation count exceeds it is a
        recomputation.
        """
        for record in FactHub.rdds_lst:
            # Both counters are read for every record; because they are
            # defaultdicts this also creates a zero entry for unseen ids.
            record.name
            cached = record.is_cached
            usage_count = AnalysisHub.rdd_num_of_usage[record.id]
            computation_count = AnalysisHub.rdd_num_of_computations[record.id]
            threshold = int(config['Caching_Anomalies']['rdds_computation_tolerance_threshold'])
            if cached:
                if usage_count <= threshold:
                    AnalysisHub.anomalies_dict[record.id] = "unneeded cache"
            elif computation_count > threshold:
                AnalysisHub.anomalies_dict[record.id] = "recomputation"

    def prepare_caching_plan():
        """Rebuild AnalysisHub.caching_plan_lst and AnalysisHub.memory_footprint_lst.

        Each entry of rdd_usage_lifetime_dict yields a cache item (first
        stage/job pair) and an un-cache item (second pair). RDDs listed in
        anomalies_dict are skipped unless the config option
        'include_caching_anomalies_in_caching_plan' equals "true".

        The plan is then replayed in sorted order; after every item a
        (job_id, stage_id, snapshot-of-cached-rdd-ids) tuple is appended to
        the footprint list.
        """
        plan = []
        AnalysisHub.caching_plan_lst = plan
        for rdd_id, lifetime in AnalysisHub.rdd_usage_lifetime_dict.items():
            include_anomalies = config['Caching_Anomalies']['include_caching_anomalies_in_caching_plan'] == "true"
            if not include_anomalies and rdd_id in AnalysisHub.anomalies_dict:
                continue
            plan.append(CachingPlanItem(lifetime[0], lifetime[1], rdd_id, True))
            plan.append(CachingPlanItem(lifetime[2], lifetime[3], rdd_id, False))

        footprint = []
        AnalysisHub.memory_footprint_lst = footprint
        currently_cached = set()
        for item in sorted(plan):
            if item.is_cache_item:
                currently_cached.add(item.rdd_id)
            else:
                currently_cached.remove(item.rdd_id)
            # Store a copy so later steps do not alter earlier snapshots.
            snapshot = currently_cached.copy()
            footprint.append((item.job_id, item.stage_id, snapshot))
            
    def analyze_caching_anomalies():
        """Apply cache overrides to the RDD records, then run the three analysis steps.

        An RDD id in AnalysisHub.cached_rdds_set is marked cached; an id in
        AnalysisHub.non_cached_rdds_set is marked not cached. The second
        check runs after the first, so it wins when an id is in both sets.
        """
        to_cache = AnalysisHub.cached_rdds_set
        to_uncache = AnalysisHub.non_cached_rdds_set
        for record in FactHub.rdds_lst:
            if record.id in to_cache:
                record.is_cached = True
            if record.id in to_uncache:
                record.is_cached = False

        Analyzer.calc_num_of_computations_of_rdds()
        Analyzer.prepare_anomalies_dict()
        Analyzer.prepare_caching_plan()

class SparkDataflowVisualizer():
    def init():
        """Reset parsed state by calling ``FactHub.flush()``.

        AnalysisHub is not modified here: the statements that would clear it
        are commented out below.
        """
        #AnalysisHub.cached_rdds_set.clear()
        #AnalysisHub.non_cached_rdds_set.clear()
        FactHub.flush()
        #AnalysisHub.flush()
    
    def parse(raw_log_file):
        """Pass ``raw_log_file`` unchanged to ``Parser.prepare``.

        The expected type of ``raw_log_file`` is whatever ``Parser.prepare``
        accepts; that function is not visible from here.
        """
        Parser.prepare(raw_log_file)
        
    def analyze():
        """Run the analysis stage.

        Calls ``Analyzer.prepare_transformations_lst()`` and then
        ``Analyzer.analyze_caching_anomalies()``. AnalysisHub is not flushed
        beforehand (the flush call below is commented out).
        """
        #AnalysisHub.flush()
        Analyzer.prepare_transformations_lst()
        Analyzer.analyze_caching_anomalies()

    def rdds_lst_refactor():
        """Strip instrumentation RDDs from the parsed data and rebuild derived tables.

        Steps, in order:

        1. Split ``FactHub.rdds_lst``: RDDs whose name contains
           "InstrumentedRDD" go to ``FactHub.rdds_lst_InstrumentedRdds`` (their
           ids to ``FactHub.rdds_lst_InstrumentedRdds_id``); all others go to
           ``FactHub.rdds_lst_refactored``.
        2. Replace ``FactHub.rdds_lst_refactored`` with fresh ``Rdd`` copies
           whose ``is_cached`` flag is True exactly when the RDD with id
           ``rdd.id + 1`` is flagged cached in ``FactHub.rdds_lst``.
        3. Map every remaining RDD id to a dense 0-based index in
           ``FactHub.rdds_lst_index_dict`` and append renumbered copies to
           ``FactHub.rdds_lst_renumbered``.
        4. Rewrite ``AnalysisHub.transformations_set`` into
           ``AnalysisHub.transformation_without_i`` (an instrumented endpoint
           ``x`` is replaced by ``x - 1``), drop self-loops, and fill
           ``AnalysisHub.transformation_from_to`` /
           ``AnalysisHub.transformation_to_from``.
        5. Recount ``AnalysisHub.rdd_num_of_usage`` from those transformations
           plus the values of ``FactHub.job_last_rdd``.
        6. Collect shuffled RDD ids (if none are recorded yet) and store each
           one's summed stage shuffle time in ``FactHub.operator_timestamp``.

        All results are written to FactHub / AnalysisHub; nothing is returned.
        The target lists are appended to, not reset, so the caller is
        responsible for clearing them between runs.
        """
        # 1. Separate instrumentation RDDs from the regular ones.
        for rdd in FactHub.rdds_lst:
            if "InstrumentedRDD" not in rdd.name:
                FactHub.rdds_lst_refactored.append(rdd)
            else:
                FactHub.rdds_lst_InstrumentedRdds_id.append(rdd.id)
                FactHub.rdds_lst_InstrumentedRdds.append(rdd)

        # 2. Recompute cache flags. A set gives O(1) membership tests.
        # NOTE(review): id + 1 presumably is the InstrumentedRDD wrapping this RDD - confirm.
        cached_ids = {rdd.id for rdd in FactHub.rdds_lst if rdd.is_cached}
        FactHub.rdds_lst_refactored = [
            Rdd(rdd.id, rdd.name, rdd.parents_lst, rdd.stage_id, rdd.job_id, (rdd.id + 1) in cached_ids)
            for rdd in FactHub.rdds_lst_refactored
        ]

        # 3. Dense renumbering: position of each id in the sorted list of unique ids.
        unique_ids = sorted({rdd.id for rdd in FactHub.rdds_lst_refactored})
        for new_index, rdd_id in enumerate(unique_ids):
            FactHub.rdds_lst_index_dict[rdd_id] = new_index
        for rdd in FactHub.rdds_lst_refactored:
            FactHub.rdds_lst_renumbered.append(
                Rdd(FactHub.rdds_lst_index_dict[rdd.id], rdd.name, rdd.parents_lst, rdd.stage_id, rdd.job_id, rdd.is_cached)
            )

        # 4. Transformations without instrumentation ids. These tables are needed
        # before the graph is drawn (e.g. for the 'show_rdd_computation_time' option).
        instrumented_ids = set(FactHub.rdds_lst_InstrumentedRdds_id)
        for transformation in sorted(AnalysisHub.transformations_set):
            from_rdd = transformation.from_rdd
            to_rdd = transformation.to_rdd
            # Every transformation takes at least one branch below, so the
            # narrow/wide classification is computed once up front.
            is_narrow = Analyzer.is_narrow_transformation(from_rdd, to_rdd)
            from_is_instrumented = from_rdd in instrumented_ids
            to_is_instrumented = to_rdd in instrumented_ids
            if not to_is_instrumented and not from_is_instrumented:
                AnalysisHub.transformation_without_i.append(TransformationWithoutI(from_rdd, to_rdd, is_narrow))
            if from_is_instrumented:
                AnalysisHub.transformation_without_i.append(TransformationWithoutI(from_rdd - 1, to_rdd, is_narrow))
            if to_is_instrumented:
                AnalysisHub.transformation_without_i.append(TransformationWithoutI(from_rdd, to_rdd - 1, is_narrow))

        # Shifting ids can collapse an edge onto a single RDD; drop such self-loops.
        # The list object is refilled in place so existing references stay valid.
        kept_transformations = [
            transformation
            for transformation in AnalysisHub.transformation_without_i
            if transformation.to_rdd != transformation.from_rdd
        ]
        AnalysisHub.transformation_without_i.clear()
        AnalysisHub.transformation_without_i.extend(kept_transformations)

        for transformation in AnalysisHub.transformation_without_i:
            # from_to keeps only the last to_rdd seen for a given from_rdd.
            AnalysisHub.transformation_from_to[transformation.from_rdd] = transformation.to_rdd
            # Assumes transformation_to_from supplies an empty list for new keys
            # (e.g. defaultdict(list)) - its definition is not visible here.
            AnalysisHub.transformation_to_from[transformation.to_rdd].append(transformation.from_rdd)

        # 5. Recount RDD usages; defaultdict(int) starts every counter at 0.
        AnalysisHub.rdd_num_of_usage = defaultdict(int)
        for transformation in AnalysisHub.transformation_without_i:
            AnalysisHub.rdd_num_of_usage[transformation.to_rdd] += 1
        for last_rdd in FactHub.job_last_rdd.values():
            AnalysisHub.rdd_num_of_usage[last_rdd] += 1

        # 6. Shuffle time per shuffled RDD.
        if not FactHub.shuffled_rdds_id:
            for rdd in FactHub.rdds_lst:
                if "ShuffledRDD" in rdd.name or "ShuffledRowRDD" in rdd.name:
                    FactHub.shuffled_rdds_id.append(rdd.id)
        # Remove repeated ids.
        FactHub.shuffled_rdds_id = list(set(FactHub.shuffled_rdds_id))
        for rdd_id in FactHub.shuffled_rdds_id:
            FactHub.operator_timestamp[rdd_id] = sum(
                FactHub.stage_shuffle_time_dict[stage] for stage in FactHub.rddID_in_stage[rdd_id]
            )

        # TODO: correcting the unpersist stage/job of caching-plan entries was
        # sketched here but never finished; the disabled draft has been removed.
                                
    def cache_rdds_handling():
        """Record the renumbered ids of all cached RDDs in AnalysisHub.

        For every cached entry of ``FactHub.rdds_lst_refactored`` the index from
        ``FactHub.rdds_lst_index_dict`` is appended to
        ``AnalysisHub.cached_rdds_lst``; the list is then replaced by a
        de-duplicated copy that keeps first-seen order, and printed.
        """
        AnalysisHub.cached_rdds_lst.extend(
            FactHub.rdds_lst_index_dict[rdd.id]
            for rdd in FactHub.rdds_lst_refactored
            if rdd.is_cached
        )
        # dict keys are unique and keep insertion order.
        unique_cached = dict.fromkeys(AnalysisHub.cached_rdds_lst)
        AnalysisHub.cached_rdds_lst = list(unique_cached)
        print("AnalysisHub.cached_rdds_lst")
        print(AnalysisHub.cached_rdds_lst)
        
    def visualize_property_DAG():
        dot = graphviz.Digraph(strict=True, comment='Spark-Application-Graph', format = config['Output']['selected_format'])
        dot.attr('node', shape=config['Drawing']['rdd_shape'], label='this is graph')
        dot.node_attr={'shape': 'plaintext'}
        dot.edge_attr.update(arrowhead='normal', arrowsize='1')
        dag_rdds_set = set()
        prev_action_name = ""
        iterations_count = int(config['Drawing']['max_iterations_count']) 
        for job_id, job in sorted(FactHub.job_last_rdd_dect.items()):
            action_name = job[1]
            draw_iteration_indicator = False
            if action_name == prev_action_name:
                if iterations_count == 0:
                    continue
                iterations_count-=1
            else:
                iterations_count = int(config['Drawing']['max_iterations_count']) 
            for rdd in FactHub.rdds_lst_refactored:
                if rdd.job_id == job_id and rdd.id not in dag_rdds_set:
                    dag_rdds_set.add(rdd.id)
                    node_label = "\n"
                    if config['Drawing']['show_action_id'] == "true":
                        renumbered_rdd_id = FactHub.rdds_lst_index_dict[rdd.id]
                        node_label = "[" + str(renumbered_rdd_id) + "] "
                        #node_label = "[" + str(rdd.id) + "] "
                    if config['Drawing']['show_rdd_name'] == "true":
                        node_label = node_label + rdd.name[:int(config['Drawing']['rdd_name_max_number_of_chars'])]
                    if config['Drawing']['show_rdd_size'] == "true":
                        if rdd.id in FactHub.rddID_size:
                            #print("rdd id inside rddid_size", str(rdd.id))
                            size_in_mb = FactHub.rddID_size[rdd.id] / 1000000
                            rounded_size = round(size_in_mb,3)
                            if rounded_size >= 1024:
                                rounded_size = rounded_size / 1024
                                rounded_size = round(rounded_size,3)
                                node_label = node_label + "\nsize: " + str(rounded_size) + " gb"
                            else:
                                node_label = node_label + "\nsize: " + str(rounded_size) + " mb"
                        elif rdd.id in FactHub.root_rdd_size:
                            #print("rdd id inside root_rdd_size", str(rdd.id))
                            size_in_mb = FactHub.root_rdd_size[rdd.id] / 1000000
                            rounded_size = round(size_in_mb,3)
                            if rounded_size >= 1024:
                                rounded_size = rounded_size / 1024
                                rounded_size = round(rounded_size,3)
                                node_label = node_label + "\nsize: " + str(rounded_size) + " gb"
                            else:
                                node_label = node_label + "\nsize: " + str(rounded_size) + " mb"
                        elif rdd.id in FactHub.last_rdd_size:
                            #print("rdd id inside last_rdd_size", str(rdd.id))
                            size_in_mb = FactHub.last_rdd_size[rdd.id] / 1000000
                            rounded_size = round(size_in_mb,3)
                            if rounded_size >= 1024:
                                rounded_size = rounded_size / 1024
                                rounded_size = round(rounded_size,3)
                                node_label = node_label + "\nsize: " + str(rounded_size) + " gb"
                            else:
                                node_label = node_label + "\nsize: " + str(rounded_size) + " mb"

                    if config['Drawing']['show_rdd_computation_time'] == "true":
                        reach_time = 0
                        curr_rdd = rdd.id
                        while 1: #loop runs until it reaches root parent node or cached node while traversing a RDD's parents
                            if curr_rdd == 0 or curr_rdd not in AnalysisHub.transformation_from_to.keys():
                                break
                            prev_rdd = AnalysisHub.transformation_from_to[curr_rdd]
                            if curr_rdd in FactHub.operator_timestamp.keys():
                                reach_time = reach_time + FactHub.operator_timestamp[curr_rdd]
                            prev_rdd_index = FactHub.rdds_lst_index_dict[prev_rdd]
                            if prev_rdd_index in AnalysisHub.cached_rdds_lst or prev_rdd == 0:
                                break
                            curr_rdd = prev_rdd
                            
                        AnalysisHub.rdds_computation_time[rdd.id] = reach_time
                        if reach_time >= 1000: #1000 is milliseconds ~1sec
                            reach_time = reach_time / 1000
                            reach_time = round(reach_time,1)
                            if reach_time >= 60:
                                reach_time = reach_time / 60
                                reach_time = round(reach_time,1)
                                if reach_time >= 60:
                                    reach_time = reach_time / 60
                                    reach_time = round(reach_time,1)
                                    node_label = node_label + "\nComputation time ≈ " + str(reach_time) + " hr"
                                else:
                                    node_label = node_label + "\nComputation time ≈ " + str(reach_time) + " min"
                            else:
                                node_label = node_label + "\nComputation time ≈ " + str(reach_time) + " s"
                        else:
                            node_label = node_label + "\nComputation time ≈ " + str(reach_time) + " ms"
                    if config['Caching_Anomalies']['show_number_of_rdd_usage'] == "true":
                        node_label = node_label + "\nused: " + str(AnalysisHub.rdd_num_of_usage[rdd.id])
                    if config['Caching_Anomalies']['show_number_of_rdd_computations'] == "true":
                        node_label = node_label + "\ncomputed: " + str(AnalysisHub.rdd_num_of_computations[rdd.id])
                    if  config['Caching_Anomalies']['highlight_unneeded_cached_rdds'] == "true" and AnalysisHub.anomalies_dict.get(rdd.id, "") == "unneeded cache":
                        dot.node(str(rdd.id), penwidth = '3', fillcolor = config['Drawing']['cached_rdd_bg_color'], color = 'red', shape = config['Drawing']['anomaly_shape'], style = 'filled', label = node_label)
                    elif config['Caching_Anomalies']['highlight_recomputed_rdds'] == "true" and AnalysisHub.anomalies_dict.get(rdd.id, "") == "recomputation":
                        dot.node(str(rdd.id), penwidth = '3', fillcolor = 'white', color = 'red', shape = config['Drawing']['anomaly_shape'], style = 'filled', label = node_label)
                    else:
                        dot.node(str(rdd.id), fillcolor = config['Drawing']['cached_rdd_bg_color'] if FactHub.rdds_lst_index_dict[rdd.id] in AnalysisHub.cached_rdds_lst else 'white', style = 'filled', label = node_label)
                        
            action_lable = "" 
            if config['Drawing']['show_action_id'] == "true":
                action_lable = "[" + str(job_id) + "]"
            if config['Drawing']['show_action_name'] == "true":
                action_lable = action_lable + action_name[:int(config['Drawing']['action_name_max_number_of_chars'])]
            if draw_iteration_indicator == True:    
                draw_iteration_indicator = False
                continue
            dot.node("Action_" + str(job_id), shape=config['Drawing']['action_shape'] if iterations_count != 0 else config['Drawing']['iterative_action_shape'], fillcolor = config['Drawing']['action_bg_collor'] if iterations_count != 0 else config['Drawing']['iterative_action_collor'], style = 'filled', label = action_lable)
            dot.edge(str(job[0]), "Action_" + str(job_id), color = 'black', arrowhead = 'none', style = 'dashed')
            prev_action_name = action_name
        
        for transformation in AnalysisHub.transformation_without_i:
            if transformation.to_rdd in dag_rdds_set and transformation.from_rdd in dag_rdds_set:
                dot.edge(str(transformation.to_rdd), str(transformation.from_rdd), color = config['Drawing']['narrow_transformation_color'] if transformation.is_narrow else config['Drawing']['wide_transformation_color'])
        
        for transformation in AnalysisHub.transformation_without_i:
            #print(str(transformation.to_rdd) + " " + str(transformation.from_rdd))
            if transformation.to_rdd in dag_rdds_set and transformation.from_rdd in dag_rdds_set:
                if transformation.from_rdd in FactHub.operator_timestamp:
                    milliseconds = FactHub.operator_timestamp[transformation.from_rdd]
                    if milliseconds < 1000:
                        dot.edge(str(transformation.to_rdd), str(transformation.from_rdd), label = "  " + str(int(milliseconds)) + " ms")
                    else:
                        hours = milliseconds // (1000 * 60 * 60)
                        remaining_milliseconds = milliseconds % (1000 * 60 * 60)
                        minutes = remaining_milliseconds // (1000 * 60)
                        remaining_milliseconds %= (1000 * 60)
                        seconds = remaining_milliseconds // 1000
                        milliseconds %= 1000
                        if hours != 0 or minutes != 0:
                            if hours != 0:
                                dot.edge(str(transformation.to_rdd), str(transformation.from_rdd), label = "  " + str(int(hours)) + " h " + str(int(minutes)) + " m " + str(int(seconds)) + " s " + str(int(milliseconds)) + " ms")
                            else:
                                dot.edge(str(transformation.to_rdd), str(transformation.from_rdd), label = "  " + str(int(minutes)) + " m " + str(int(seconds)) + " s " + str(int(milliseconds)) + " ms")
                        else:
                            dot.edge(str(transformation.to_rdd), str(transformation.from_rdd), label = "  " + str(int(seconds)) + " s " + str(int(milliseconds)) + " ms")
            if transformation.from_rdd in FactHub.shuffled_rdds_id:
                #print("transformation.from_rdd:  " + str(transformation.from_rdd))
                rdd_shuffled_time = 0
                for stage in FactHub.rddID_in_stage[transformation.from_rdd]:
                    rdd_shuffled_time = rdd_shuffled_time + FactHub.stage_shuffle_time_dict[stage]
                if rdd_shuffled_time < 1000:
                    FactHub.operator_timestamp[transformation.from_rdd] = rdd_shuffled_time 
                    dot.edge(str(transformation.to_rdd), str(transformation.from_rdd), label = "  " + str(rdd_shuffled_time) + " ms")
                else:
                    FactHub.operator_timestamp[transformation.from_rdd] = rdd_shuffled_time
                    milliseconds = rdd_shuffled_time
                    hours = milliseconds // (1000 * 60 * 60)
                    remaining_milliseconds = milliseconds % (1000 * 60 * 60)
                    minutes = remaining_milliseconds // (1000 * 60)
                    remaining_milliseconds %= (1000 * 60)
                    seconds = remaining_milliseconds // 1000
                    milliseconds %= 1000
                    if hours != 0 or minutes != 0:
                        if hours != 0:
                            dot.edge(str(transformation.to_rdd), str(transformation.from_rdd), label = "  " + str(int(hours)) + " h " + str(int(minutes)) + " m " + str(int(seconds)) + " s " + str(int(milliseconds)) + " ms")
                        else:
                            dot.edge(str(transformation.to_rdd), str(transformation.from_rdd), label = "  " + str(int(minutes)) + " m " + str(int(seconds)) + " s " + str(int(milliseconds)) + " ms")
                    else:
                        dot.edge(str(transformation.to_rdd), str(transformation.from_rdd), label = "  " + str(int(seconds)) + " s " + str(int(milliseconds)) + " ms")
                                    
        caching_plan_label = "\nRecommended Schedule:\n"
        for caching_plan_item in sorted(AnalysisHub.caching_plan_lst):
            if caching_plan_item.is_cache_item:
                caching_plan_label += "\nCache "
            else:
                caching_plan_label += "\nUnpersist "
            #cache_rdd variable used below will store the correct rdd.id of the instrumented rdd's map partition's id after refactoring & renumbering the rdds list
            cache_rdd = 0
            if caching_plan_item.rdd_id-1 in FactHub.rdds_lst_index_dict.keys():
                cache_rdd = FactHub.rdds_lst_index_dict[caching_plan_item.rdd_id-1]#caching_plan_label += "RDD[" + str(caching_plan_item.rdd_id - 1) + "] " + ("at" if caching_plan_item.is_cache_item else "after") + " stage(" + str(caching_plan_item.stage_id) + ") in job(" + str(caching_plan_item.job_id) + ")\n"
            caching_plan_label += "RDD[" + str(cache_rdd) + "] " + ("at" if caching_plan_item.is_cache_item else "after") + " stage(" + str(caching_plan_item.stage_id) + ") in job(" + str(caching_plan_item.job_id) + ")\n"
        caching_plan_label += "\n"
        if len(AnalysisHub.caching_plan_lst) > 0 and config['Caching_Anomalies']['show_caching_plan'] == "true":
            dot.node("caching_plan", shape = 'note', fillcolor = 'lightgray', style = 'filled', label = caching_plan_label)
            
        memory_footprint_label = "\nMemory Footprint:\n"
        total_size = 0
        temp = set()
        for memory_footprint_item in AnalysisHub.memory_footprint_lst:
            for val in memory_footprint_item[2]:
                temp.add(val-1)
        for val in temp:
                if val in FactHub.rddID_size:
                    size_in_mb = FactHub.rddID_size[val] / 1000000
                    rounded_size = round(size_in_mb,3)
                elif val in FactHub.root_rdd_size:
                    size_in_mb = FactHub.root_rdd_size[val] / 1000000
                    rounded_size = round(size_in_mb,3)
                elif val in FactHub.last_rdd_size:
                    size_in_mb = FactHub.last_rdd_size[val] / 1000000
                    rounded_size = round(size_in_mb,3)
                total_size = total_size + rounded_size
        cached_rddsID = set()
        for memory_footprint_item in AnalysisHub.memory_footprint_lst:
            for val in memory_footprint_item[2]:
                cached_rddsID.add(FactHub.rdds_lst_index_dict[val-1])
        cached_rddsID = sorted(list(dict.fromkeys(cached_rddsID)))
        print("cached_rddsID")
        print(cached_rddsID)
        memory_footprint_label += "\n"
        if len(cached_rddsID) == 0:
            memory_footprint_label += "Free"
        else:
            memory_footprint_label += str(cached_rddsID)
        memory_footprint_label += "\n"
        memory_footprint_label += "\n"
        memory_footprint_label += "Total size of cached RDDs: " + str(total_size) + " mb"
        memory_footprint_label += "\n"
        memory_footprint_label += "\n"
        
        #FactHub.rddID_size[42] = 97000000
        #FactHub.operator_timestamp[56] = 67
        print("operator timestamp")
        print(FactHub.operator_timestamp)
        print("rdd size")
        print(FactHub.rddID_size)
        
        #Time model scenarios
        if len(cached_rddsID) > 0:
            for rdd_id in cached_rddsID:
                rdd_id = list(FactHub.rdds_lst_index_dict.keys())[list(FactHub.rdds_lst_index_dict.values()).index(rdd_id)]
                if rdd_id in FactHub.operator_timestamp:
                    #Time model scenario 1: If dont cache at all
                    AnalysisHub.time_model_scenario1[rdd_id] = AnalysisHub.rdd_num_of_usage[rdd_id] * FactHub.operator_timestamp[rdd_id]
                    #Time model scenario 2: If cache in memory
                    if AnalysisHub.rdd_num_of_usage[rdd_id] == 1:
                        #size and time information doesnt exist for some rdds due to missing instrumentation from log files
                        time_taken_to_write = FactHub.rddID_size[rdd_id] / AnalysisHub.memory_write_capacity
                        AnalysisHub.time_model_scenario2[rdd_id] = time_taken_to_write
                    else:
                        time_taken_to_write = FactHub.rddID_size[rdd_id] / AnalysisHub.memory_write_capacity
                        time_taken_to_read = FactHub.rddID_size[rdd_id] / AnalysisHub.memory_read_capacity
                        total_time_taken_to_read = (AnalysisHub.rdd_num_of_usage[rdd_id] - 1) * time_taken_to_read 
                        AnalysisHub.time_model_scenario2[rdd_id] = time_taken_to_write + total_time_taken_to_read
                    #Time model scenario 3: If cache in disk
                    if AnalysisHub.rdd_num_of_usage[rdd_id] == 1:
                        #size and time information doesnt exist for some rdds due to missing instrumentation from log files
                        time_taken_to_write = FactHub.rddID_size[rdd_id] / AnalysisHub.disk_write_capacity
                        AnalysisHub.time_model_scenario3[rdd_id] = time_taken_to_write
                    else:
                        time_taken_to_write = FactHub.rddID_size[rdd_id] / AnalysisHub.disk_write_capacity
                        time_taken_to_read = FactHub.rddID_size[rdd_id] / AnalysisHub.disk_read_capacity
                        total_time_taken_to_read = (AnalysisHub.rdd_num_of_usage[rdd_id] - 1) * time_taken_to_read 
                        AnalysisHub.time_model_scenario3[rdd_id] = time_taken_to_write + total_time_taken_to_read 
        
        print("AnalysisHub.time_model_scenario1")
        print(AnalysisHub.time_model_scenario1)
        print("AnalysisHub.time_model_scenario2")
        print(AnalysisHub.time_model_scenario2)
        print("AnalysisHub.time_model_scenario3")
        print(AnalysisHub.time_model_scenario3)
        '''
        #debugging for log file ends with 821
        print("FactHub.rddID_in_stage")
        print(FactHub.rddID_in_stage[54])
        print(FactHub.rddID_in_stage[56])
        print("stage 11 has what tasks")
        print(FactHub.stage_has_what_tasks[11])
        '''
        print("=============================")
        for caching_plan_item in sorted(AnalysisHub.caching_plan_lst):
            print("rdd_id: " + str(caching_plan_item.rdd_id) + " stage_id: " + str(caching_plan_item.stage_id) +  " job_id: " + str(caching_plan_item.job_id) + " cache_status: " + str(caching_plan_item.is_cache_item))
        print("=============================")
        # To change persist and unpersit order for an edge case-(unpersisting of RDD A happens immediatedly after persisting RDD B in same stage and job)
        modified_analysishub_caching_plan_lst = sorted(AnalysisHub.caching_plan_lst)
        for i, caching_plan_item in enumerate(modified_analysishub_caching_plan_lst):
            if i < len(AnalysisHub.caching_plan_lst) - 1:
                next_instance = modified_analysishub_caching_plan_lst[i+1]
                curr_item_rdd_id = caching_plan_item.rdd_id
                curr_item_stage = caching_plan_item.stage_id
                curr_item_job = caching_plan_item.job_id
                curr_item_cache = caching_plan_item.is_cache_item
                next_item_rdd_id = next_instance.rdd_id
                next_item_stage = next_instance.stage_id
                next_item_job = next_instance.job_id
                next_item_cache = next_instance.is_cache_item
                print("curr rdd id: " + str(caching_plan_item.rdd_id) + " curr_item_stage: " + str(curr_item_stage) + " curr_item_job: " + str(curr_item_job) + " curr_item_cache: " + str(curr_item_cache) + " | " + " next_instance: " + str(next_instance.rdd_id) + " next_item_stage: " + str(next_item_stage) + " next_item_job: " + str(next_item_job) + " next_item_cache: " + str(next_item_cache))
                if curr_item_cache == True and next_item_cache == False and curr_item_stage == next_item_stage and curr_item_job == next_item_job and curr_item_rdd_id != next_item_rdd_id:
                    instance_to_move = modified_analysishub_caching_plan_lst.pop(i+1)
                    modified_analysishub_caching_plan_lst.insert(i, instance_to_move)
        print("=============================")
        for caching_plan_item in modified_analysishub_caching_plan_lst:
            print("rdd_id: " + str(caching_plan_item.rdd_id) + " stage_id: " + str(caching_plan_item.stage_id) +  " job_id: " + str(caching_plan_item.job_id) + " cache_status: " + str(caching_plan_item.is_cache_item))
        
        
        #for caching_plan_item in modified_analysishub_caching_plan_lst:
        for caching_plan_item in modified_analysishub_caching_plan_lst:
            #print("rdd_id: " + str(caching_plan_item.rdd_id) + " stage_id: " + str(caching_plan_item.stage_id) +  " job_id: " + str(caching_plan_item.job_id) + " persist_status: " + str(caching_plan_item.persist_status))
            if caching_plan_item.is_cache_item:
                if AnalysisHub.time_model_scenario2[caching_plan_item.rdd_id-1] < AnalysisHub.time_model_scenario3[caching_plan_item.rdd_id-1]:
                    persist_status = "memory"
                elif AnalysisHub.time_model_scenario2[caching_plan_item.rdd_id-1] > AnalysisHub.time_model_scenario3[caching_plan_item.rdd_id-1]:
                    persist_status = "disk"
                AnalysisHub.rdd_cache_order.append(CachingPlanItemWithStorage(caching_plan_item.stage_id, caching_plan_item.job_id, caching_plan_item.rdd_id-1, caching_plan_item.is_cache_item, persist_status))
            else:
                persist_status = "unpersist"
                AnalysisHub.rdd_cache_order.append(CachingPlanItemWithStorage(caching_plan_item.stage_id, caching_plan_item.job_id, caching_plan_item.rdd_id-1, caching_plan_item.is_cache_item, persist_status))
        
        #for caching_plan_item in AnalysisHub.rdd_cache_order:
            #print("rdd_id: " + str(caching_plan_item.rdd_id) + " stage_id: " + str(caching_plan_item.stage_id) +  " job_id: " + str(caching_plan_item.job_id) + " persist_status: " + str(caching_plan_item.persist_status))
        
        '''
        print("AnalysisHub.rdd_cache_order")        
        for caching_plan_item in AnalysisHub.rdd_cache_order:
            print("caching_plan_item.rdd_id: " + str(caching_plan_item.rdd_id))
            print("caching_plan_item.is_cache_item: " + str(caching_plan_item.is_cache_item))
            print("caching_plan_item.stage_id: " + str(caching_plan_item.stage_id))
            print("caching_plan_item.job_id: " + str(caching_plan_item.job_id))
            print("caching_plan_item.persist_status: " + str(caching_plan_item.persist_status))
        '''
        
        def check_memory_management_list(rdd_id, stage_id, job_id):
            """Try to make room in memory for ``rdd_id`` by demoting a less-used RDD to disk.

            Called by the enclosing method when ``rdd_id`` does not fit into the
            remaining ``AnalysisHub.memory_size``. Scans
            ``AnalysisHub.memory_management_list`` for an RDD whose
            ``AnalysisHub.rdd_num_of_usage`` is strictly lower than that of
            ``rdd_id``. If one is found it is moved to the disk list and ``rdd_id``
            is recorded as persisted in memory; otherwise ``rdd_id`` itself is
            recorded as persisted on disk. Every step appends a line to the
            enclosing ``memory_footprint_label``.

            ``stage_id`` and ``job_id`` are only used in the label text.
            """
            nonlocal memory_footprint_label
            # 0 doubles as the "nothing found" sentinel, so an RDD whose id is 0
            # can never be selected as the eviction candidate.
            low_priority_rdd = 0
            lowest_priority_rdd = 0
            for rdd in AnalysisHub.memory_management_list:
                # low_priority_rdd ends up as the LAST in-memory RDD used less often
                # than rdd_id ...
                if AnalysisHub.rdd_num_of_usage[rdd_id] > AnalysisHub.rdd_num_of_usage[rdd]:
                    low_priority_rdd = rdd
                # ... while lowest_priority_rdd latches onto the FIRST such RDD.
                # Despite its name it is not the RDD with the minimum usage count.
                if lowest_priority_rdd == 0:
                    lowest_priority_rdd = low_priority_rdd
            if lowest_priority_rdd != 0:
                for caching_plan_item in AnalysisHub.rdd_cache_order:
                    if caching_plan_item.rdd_id == lowest_priority_rdd:
                            # NOTE(review): the freed size is looked up for low_priority_rdd
                            # (last match) although lowest_priority_rdd (first match) is the
                            # RDD being moved; they differ when several RDDs qualify.
                            # NOTE(review): this loop runs once per matching plan item; a
                            # second match for the same rdd_id would make remove() below
                            # raise ValueError - confirm rdd ids are unique here.
                            AnalysisHub.memory_size = AnalysisHub.memory_size + FactHub.rddID_size[low_priority_rdd]
                            memory_footprint_label += "Unpersist RDD[" + str(FactHub.rdds_lst_index_dict[caching_plan_item.rdd_id]) + "] from "+ "memory at stage " + str(stage_id) + " job " + str(job_id) + "\n"
                            memory_footprint_label += "persist RDD[" + str(FactHub.rdds_lst_index_dict[caching_plan_item.rdd_id]) + "] from "+ "memory at disk " + str(stage_id) + " job " + str(job_id) + "\n"
                            AnalysisHub.persist_status_recorder[caching_plan_item.rdd_id] = "disk"
                            AnalysisHub.memory_management_list.remove(caching_plan_item.rdd_id)
                            AnalysisHub.disk_management_list.append(caching_plan_item.rdd_id)
                # NOTE(review): neither memory_size (for the RDD placed in memory here)
                # nor disk_size (for the RDD demoted above) is reduced in this branch,
                # and there is no check that rdd_id now fits - confirm this is intended.
                memory_footprint_label += "Persist RDD[" + str(FactHub.rdds_lst_index_dict[rdd_id]) + "] in "+ "memory at stage " + str(stage_id) + " job " + str(job_id) + "\n"
                AnalysisHub.persist_status_recorder[rdd_id] = "memory"
                AnalysisHub.memory_management_list.append(rdd_id)
            #Persist directly to disk if no low priority rdd found in memory
            if lowest_priority_rdd == 0:
                AnalysisHub.disk_size = AnalysisHub.disk_size - FactHub.rddID_size[rdd_id]
                memory_footprint_label += "Persist RDD[" + str(FactHub.rdds_lst_index_dict[rdd_id]) + "] in "+ "disk at stage " + str(stage_id) + " job " + str(job_id) + "\n"    
                AnalysisHub.persist_status_recorder[rdd_id] = "disk"
                AnalysisHub.disk_management_list.append(rdd_id)
            
        def check_disk_management_list(stage_id, job_id):
            """Promote the most-used on-disk RDD back into memory if it fits.

            Called by the enclosing method after an RDD has been unpersisted from
            memory. Picks the entry of ``AnalysisHub.disk_management_list`` with the
            highest ``AnalysisHub.rdd_num_of_usage`` (the earliest entry wins ties,
            because only a strictly greater count replaces the current pick). If its
            size is strictly smaller than ``AnalysisHub.memory_size`` it is moved
            from the disk list to the memory list, both size counters are adjusted
            and two lines are appended to the enclosing ``memory_footprint_label``.
            Does nothing when the disk list is empty or the RDD does not fit.

            ``stage_id`` and ``job_id`` are only used in the label text.
            """
            nonlocal memory_footprint_label
            if len(AnalysisHub.disk_management_list) != 0:
                high_priority_rdd = AnalysisHub.disk_management_list[0]
                highest_priority_rdd = AnalysisHub.disk_management_list[0]
                if len(AnalysisHub.disk_management_list) > 1:
                    for rdd in AnalysisHub.disk_management_list[1:]:
                        if AnalysisHub.rdd_num_of_usage[rdd] > AnalysisHub.rdd_num_of_usage[high_priority_rdd]:
                            high_priority_rdd = rdd
                        # highest_priority_rdd simply follows high_priority_rdd; after
                        # each iteration both name the same RDD.
                        if AnalysisHub.rdd_num_of_usage[high_priority_rdd] > AnalysisHub.rdd_num_of_usage[highest_priority_rdd]:
                            highest_priority_rdd = high_priority_rdd
                # Strict comparison: an RDD exactly as large as the free memory stays on disk.
                if FactHub.rddID_size[highest_priority_rdd] < AnalysisHub.memory_size:
                        # Give the space back to the disk budget ...
                        AnalysisHub.disk_size = AnalysisHub.disk_size + FactHub.rddID_size[highest_priority_rdd]
                        memory_footprint_label += "Unpersist RDD[" + str(FactHub.rdds_lst_index_dict[highest_priority_rdd]) + "] from "+ "disk at stage " + str(stage_id) + " job " + str(job_id) + "\n"
                        AnalysisHub.disk_management_list.remove(highest_priority_rdd)
                        # ... and charge it to the memory budget.
                        AnalysisHub.memory_size = AnalysisHub.memory_size - FactHub.rddID_size[highest_priority_rdd]
                        memory_footprint_label += "Persist RDD[" + str(FactHub.rdds_lst_index_dict[highest_priority_rdd]) + "] in "+ "memory at stage " + str(stage_id) + " job " + str(job_id) + "\n"
                        AnalysisHub.persist_status_recorder[highest_priority_rdd] = "memory"
                        AnalysisHub.memory_management_list.append(highest_priority_rdd)
                        
        for caching_plan_item in AnalysisHub.rdd_cache_order:
            rdd_id = caching_plan_item.rdd_id
            stage_id = caching_plan_item.stage_id
            job_id = caching_plan_item.job_id
            persist_status = caching_plan_item.persist_status
            #print("rdd_id: " + str(rdd_id) + " stage_id: " + str(stage_id) +  " job_id: " + str(job_id) + " persist_status: " + str(persist_status))
            if caching_plan_item.is_cache_item:
                if persist_status == "memory":
                    if FactHub.rddID_size[rdd_id] < AnalysisHub.memory_size:
                        AnalysisHub.memory_size = AnalysisHub.memory_size - FactHub.rddID_size[rdd_id] 
                        memory_footprint_label += "Persist RDD[" + str(FactHub.rdds_lst_index_dict[rdd_id]) + "] in "+ "memory at stage " + str(stage_id) + " job " + str(job_id) + "\n"
                        AnalysisHub.persist_status_recorder[rdd_id] = "memory"
                        AnalysisHub.memory_management_list.append(rdd_id)
                    else:
                        check_memory_management_list(rdd_id, stage_id, job_id)
                #Test case todo - what if disk has limited capacity
                elif persist_status == "disk":
                    AnalysisHub.disk_size = AnalysisHub.disk_size - FactHub.rddID_size[rdd_id]
                    memory_footprint_label += "Persist RDD[" + str(FactHub.rdds_lst_index_dict[rdd_id]) + "] in "+ "disk at stage " + str(stage_id) + " job " + str(job_id) + "\n"
                    AnalysisHub.persist_status_recorder[rdd_id] = "disk"
                    AnalysisHub.disk_management_list.append(rdd_id)
            else:
                #print("AnalysisHub.persist_status_recorder[rdd_id]: " + str(rdd_id) + " " + str(AnalysisHub.persist_status_recorder[rdd_id]))
                if AnalysisHub.persist_status_recorder[rdd_id] == "memory":
                    AnalysisHub.memory_size = AnalysisHub.memory_size + FactHub.rddID_size[rdd_id] 
                    memory_footprint_label += "UnPersist RDD[" + str(FactHub.rdds_lst_index_dict[rdd_id]) + "] from "+ "memory at stage " + str(stage_id) + " job " + str(job_id) + "\n"
                    del AnalysisHub.persist_status_recorder[rdd_id]
                    AnalysisHub.memory_management_list.remove(rdd_id)
                    check_disk_management_list(stage_id, job_id)
                elif AnalysisHub.persist_status_recorder[rdd_id] == "disk":
                    AnalysisHub.memory_size = AnalysisHub.disk_size + FactHub.rddID_size[rdd_id] 
                    memory_footprint_label += "UnPersist RDD[" + str(FactHub.rdds_lst_index_dict[rdd_id]) + "] from "+ "disk at stage " + str(stage_id) + " job " + str(job_id) + "\n"
                    del AnalysisHub.persist_status_recorder[rdd_id]
                    AnalysisHub.disk_management_list.remove(rdd_id)
            '''
            print("AnalysisHub.memory_size")
            print(AnalysisHub.memory_size)
            print("AnalysisHub.disk_size")
            print(AnalysisHub.disk_size)
            print("AnalysisHub.memory_management_list")
            print(AnalysisHub.memory_management_list)
            print("AnalysisHub.disk_management_list")
            print(AnalysisHub.disk_management_list)
            print("AnalysisHub.persist_status_recorder")
            print(AnalysisHub.persist_status_recorder)
            '''
        '''
        print("without sorted")
        for i, caching_plan_item in enumerate(AnalysisHub.caching_plan_lst):
            print("rdd id: " + str(caching_plan_item.rdd_id) + " stage: " + str(caching_plan_item.stage_id) + " job: " + str(caching_plan_item.job_id) + " cache: " + str(caching_plan_item.is_cache_item))
        print("with sorted")
        for i, caching_plan_item in enumerate(sorted(AnalysisHub.caching_plan_lst)):
            print("rdd id: " + str(caching_plan_item.rdd_id) + " stage: " + str(caching_plan_item.stage_id) + " job: " + str(caching_plan_item.job_id) + " cache: " + str(caching_plan_item.is_cache_item))
        '''
             
        if len(AnalysisHub.caching_plan_lst) > 0 and config['Caching_Anomalies']['show_memory_footprint'] == "true":
            dot.node("memory_footprint", shape = 'note', fillcolor = 'lightgray', style = 'filled', label = memory_footprint_label)
        dot.attr(labelloc="t")
        dot.attr(label=FactHub.app_name)
        dot.attr(fontsize='40')
        spark_dataflow_visualizer_output_path = Utility.get_absolute_path(config['Paths']['output_path'])
        output_file_name = re.sub('[^a-zA-Z0-9]+', '', FactHub.app_name)
        dot.render(spark_dataflow_visualizer_output_path + '/' + output_file_name, view=config['Output']['view_after_render'] == 'true')
        
        AnalysisHub.flush()
        
def load_facthub_data(file_name, log_file_path):
    """Fill ``FactHub`` from a cached snapshot, or parse the log and create the snapshot.

    The snapshot lives at ``FactHubs/<file_name>.pickle``. Despite the suffix
    (kept so that existing snapshots are still found) the file content is JSON.

    * Snapshot present: every ``FactHub`` field listed below is restored from it
      and the application name is printed. JSON object keys are always strings,
      so the keys of the dict fields are converted back to ``int``.
    * Snapshot absent: the log at ``log_file_path`` is parsed through
      ``SparkDataflowVisualizer`` (``init``, ``parse``, ``rdds_lst_refactor``)
      and the resulting ``FactHub`` fields are written to a new snapshot.

    Args:
        file_name: Name of the log file; also the base name of the snapshot.
        log_file_path: Path handed to ``SparkDataflowVisualizer.parse`` when no
            snapshot exists.

    Raises:
        KeyError: If an existing snapshot lacks one of the expected fields.
    """
    facthubs_folder = "FactHubs"
    file_path = os.path.join(facthubs_folder, file_name+".pickle")

    # FactHub attributes stored as JSON objects and restored with int keys.
    # Driving both directions from one table keeps save and load in sync; the
    # hand-written version restored 'stage_no_of_tasks' from 'job_info_dect'
    # and 'taskid_last_processing_time' from 'taskid_finishtime'.
    int_keyed_dict_fields = (
        "job_info_dect",
        "stage_info_dect",
        "stage_job_dect",
        "stage_name_dect",
        "submitted_stage_last_rdd_dect",
        "stage_no_of_tasks",
        "stage_i_operator_dect",
        "stage_i_operators_id",
        "operator_partition_size",
        "rddID_in_stage",
        "stage_operator_partition",
        "stage_total",
        "job_last_rdd",
        "job_last_rdd_dect",
        "job_last_stage",
        "rdd_id_stage_with_max_tasks",
        "task_in_which_stage",
        "stage_has_what_tasks",
        "rdds_lst_index_dict",
        "taskid_launchtime",
        "taskid_finishtime",
        "taskid_last_processing_time",
        "taskid_operator_dect",
        "root_rdd_size",
        "rddID_size",
        "last_rdd_size",
        "operator_timestamp",
        "stage_shuffle_writetime_dict",
        "stage_shuffle_readtime_dict",
        "stage_shuffle_time_dict",
    )
    # FactHub attributes holding lists of Rdd objects (Rdd.to_json / Rdd.from_json).
    rdd_list_fields = (
        "rdds_lst",
        "rdds_lst_renumbered",
        "rdds_lst_refactored",
        "rdds_lst_InstrumentedRdds",
    )
    # FactHub attributes holding lists of ids: written as strings, read back as ints.
    id_list_fields = (
        "rdds_lst_InstrumentedRdds_id",
        "shuffled_rdds_id",
    )

    if os.path.exists(file_path):
        # Load the data from the file
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        FactHub.app_name = data['app_name']
        print(FactHub.app_name)
        for field in int_keyed_dict_fields:
            setattr(FactHub, field, {int(k): v for k, v in data[field].items()})
        FactHub.submitted_stages = set(data['submitted_stages'])
        for field in rdd_list_fields:
            setattr(FactHub, field, [Rdd.from_json(rdd) for rdd in data[field]])
        for field in id_list_fields:
            setattr(FactHub, field, [int(x) for x in data[field]])
        return

    # NOTE(review): this prints whatever FactHub.app_name holds *before* the log
    # is parsed; kept as in the original - confirm whether it should follow parse().
    print(FactHub.app_name)
    SparkDataflowVisualizer.init()
    SparkDataflowVisualizer.parse(log_file_path)
    SparkDataflowVisualizer.rdds_lst_refactor()

    data = {"app_name": FactHub.app_name}
    for field in int_keyed_dict_fields:
        # dict() turns defaultdict-style containers into plain dicts.
        data[field] = dict(getattr(FactHub, field))
    data["submitted_stages"] = list(FactHub.submitted_stages)
    for field in rdd_list_fields:
        data[field] = [rdd.to_json() for rdd in getattr(FactHub, field)]
    for field in id_list_fields:
        data[field] = [str(rdd) for rdd in getattr(FactHub, field)]

    # Serialize before touching the file system so that a serialization error
    # cannot leave a truncated snapshot behind (which would break the next load).
    payload = json.dumps(data)

    # Create the FactHubs folder in the current directory if it doesn't exist
    os.makedirs(facthubs_folder, exist_ok=True)
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(payload)

def load_file(file_name):
    """Load the facts for the log ``file_name`` found under the configured input path.

    The directory comes from ``config['Paths']['input_path']``, resolved through
    ``Utility.get_absolute_path``; loading itself is delegated to
    ``load_facthub_data``.
    """
    input_dir = Utility.get_absolute_path(config['Paths']['input_path'])
    load_facthub_data(file_name, input_dir + '/' + file_name)

def draw_DAG():
    """Run the SparkDataflowVisualizer pipeline on the currently loaded facts.

    The stages are invoked strictly in this order: ``analyze``,
    ``rdds_lst_refactor``, ``cache_rdds_handling``, ``visualize_property_DAG``.
    """
    pipeline = (
        "analyze",
        "rdds_lst_refactor",
        "cache_rdds_handling",
        "visualize_property_DAG",
    )
    for stage_name in pipeline:
        # Resolved one at a time, right before the call, like the plain call sequence.
        getattr(SparkDataflowVisualizer, stage_name)()
    
def cache(rdd_id):
    """Mark an RDD as cached in ``AnalysisHub``.

    ``rdd_id`` is searched among the *values* of ``FactHub.rdds_lst_index_dict``.
    The key of the first match, plus one, is added to
    ``AnalysisHub.cached_rdds_set`` and discarded from
    ``AnalysisHub.non_cached_rdds_set``. The mapping and both sets are printed.
    The DAG is not redrawn.

    Raises:
        ValueError: If no value of the mapping equals ``rdd_id``.
    """
    index_map = FactHub.rdds_lst_index_dict
    print("FactHub.rdds_lst_index_dict")
    print(index_map)

    # Reverse lookup: position of rdd_id among the values selects the key.
    map_keys = list(index_map.keys())
    map_values = list(index_map.values())
    key = map_keys[map_values.index(rdd_id)]

    shifted_id = key + 1
    AnalysisHub.cached_rdds_set.add(shifted_id)
    AnalysisHub.non_cached_rdds_set.discard(shifted_id)

    print("AnalysisHub.cached_rdds_set")
    print(AnalysisHub.cached_rdds_set)
    print("AnalysisHub.non_cached_rdds_set")
    print(AnalysisHub.non_cached_rdds_set)
    
def dont_cache(rdd_id):
    """Mark an RDD as not cached in ``AnalysisHub``.

    ``rdd_id`` is searched among the *values* of ``FactHub.rdds_lst_index_dict``.
    The key of the first match, plus one, is added to
    ``AnalysisHub.non_cached_rdds_set`` and discarded from
    ``AnalysisHub.cached_rdds_set``. Both sets are printed. The DAG is not
    redrawn.

    Raises:
        ValueError: If no value of the mapping equals ``rdd_id``.
    """
    index_map = FactHub.rdds_lst_index_dict

    # Reverse lookup: position of rdd_id among the values selects the key.
    map_keys = list(index_map.keys())
    map_values = list(index_map.values())
    key = map_keys[map_values.index(rdd_id)]

    shifted_id = key + 1
    AnalysisHub.non_cached_rdds_set.add(shifted_id)
    AnalysisHub.cached_rdds_set.discard(shifted_id)

    print("AnalysisHub.cached_rdds_set")
    print(AnalysisHub.cached_rdds_set)
    print("AnalysisHub.non_cached_rdds_set")
    print(AnalysisHub.non_cached_rdds_set)
    
def storage_levels(mem_size, mem_read_capacity, mem_write_capacity, disk_size, disk_read_capacity, disk_write_capacity):
    """Store the storage limits in ``AnalysisHub`` (in bytes) and re-run the pipeline.

    Args:
        mem_size: Memory size, multiplied by 10**6 (MB to bytes).
        mem_read_capacity: Memory read capacity, multiplied by 10**9 (GB to bytes).
        mem_write_capacity: Memory write capacity, multiplied by 10**9 (GB to bytes).
        disk_size: Disk size, multiplied by 10**6 (MB to bytes).
        disk_read_capacity: Disk read capacity, multiplied by 10**6 (MB to bytes).
        disk_write_capacity: Disk write capacity, multiplied by 10**6 (MB to bytes).

    After printing the six converted values, ``analyze``, ``rdds_lst_refactor``,
    ``cache_rdds_handling`` and ``visualize_property_DAG`` of
    ``SparkDataflowVisualizer`` are called in that order.
    """
    bytes_per_mb = 1000000
    bytes_per_gb = 1000000000

    AnalysisHub.memory_size = mem_size * bytes_per_mb
    AnalysisHub.memory_read_capacity = mem_read_capacity * bytes_per_gb
    AnalysisHub.memory_write_capacity = mem_write_capacity * bytes_per_gb
    AnalysisHub.disk_size = disk_size * bytes_per_mb
    AnalysisHub.disk_read_capacity = disk_read_capacity * bytes_per_mb
    AnalysisHub.disk_write_capacity = disk_write_capacity * bytes_per_mb

    print("storage levels: ")
    reported_attributes = (
        "memory_size",
        "memory_read_capacity",
        "memory_write_capacity",
        "disk_size",
        "disk_read_capacity",
        "disk_write_capacity",
    )
    for attribute in reported_attributes:
        print(getattr(AnalysisHub, attribute))

    # Recompute and redraw with the new limits.
    SparkDataflowVisualizer.analyze()
    SparkDataflowVisualizer.rdds_lst_refactor()
    SparkDataflowVisualizer.cache_rdds_handling()
    SparkDataflowVisualizer.visualize_property_DAG()

In [71]:
# Driver cell: load the facts of one application log, override a few display
# options, mark one RDD as cached and re-run the analysis with explicit
# storage limits. Only one load_file() call is active; the commented-out ones
# are alternative input logs.
#load_file('application_1676451203570_1317_1.bin') # ---> linear regression
#load_file('application_1676451203570_2310_1')
#load_file('local-1671802911751')
#load_file('local-1659083213412')
load_file('local-1671802703821')
#load_file('application_1674642543712_0001.json')
#load_file('application_1669826994084_2167_1')
#load_file('application_1669826994084_2164_1')
#load_file('application_1669826994084_2172_1')
#load_file('application_1669826994084_2160_1')
#load_file('application_1669826994084_2165_1')
#load_file('application_1669826994084_2169_1')
#load_file('application_1669826994084_2171_1')
#load_file('application_1669826994084_2168_1')
#load_file('application_1669826994084_2162_1')
#load_file('application_1669826994084_2161_1')
#load_file('application_1669826994084_2170_1')
#load_file('application_1669826994084_2166_1')
#load_file('local-1671802703821') 
#load_file('local-1671802911751')
#load_file('application_1676451203570_0461_1.bin')
# Re-read config.ini, then override selected options on the in-memory config
# object only (nothing is written back to the file here).
config.read('config.ini') 
config['Drawing']['show_rdd_size'] = 'true' 
config['Drawing']['show_rdd_name'] = 'true' 
config['Drawing']['show_rdd_computation_time'] = 'true' 
config['Caching_Anomalies']['highlight_unneeded_cached_rdds'] = 'true'
config['Caching_Anomalies']['highlight_recomputed_rdds'] = 'true'
config['Caching_Anomalies']['show_number_of_rdd_usage'] = 'true'
# cache() looks its argument up among the values of FactHub.rdds_lst_index_dict.
cache(17)
#dont_cache(11)
# Arguments, with the units storage_levels() converts from: memory size 100 MB,
# memory read/write capacity 2 GB each, disk size 500 MB, disk read 500 MB,
# disk write 400 MB. storage_levels() also runs the full pipeline, which is
# why draw_DAG() stays commented out below.
storage_levels(100,2,2,500,500,400)
#draw_DAG()
#no_rdds_cached_cost()
#flush_all()

SparseNaiveBayes with Params(hdfs://localhost:9000/HiBench/Bayes/Input,0,-1,1.0)
FactHub.rdds_lst_index_dict
{0: 0, 1: 1, 3: 2, 5: 3, 7: 4, 9: 5, 10: 6, 12: 7, 14: 8, 16: 9, 18: 10, 20: 11, 22: 12, 24: 13, 26: 14, 38: 15, 40: 16, 42: 17, 44: 18, 46: 19, 48: 20, 50: 21, 52: 22, 54: 23, 56: 24, 58: 25, 60: 26}
AnalysisHub.cached_rdds_set
{43}
AnalysisHub.non_cached_rdds_set
set()
storage levels: 
100000000
2000000000
2000000000
500000000
500000000
400000000
AnalysisHub.cached_rdds_lst
[9, 11, 17]
cached_rddsID
[9, 11, 17]
operator timestamp
{1: 3773, 3: 1057, 5: 33006, 7: 242735, 12: 162, 16: 8835, 20: 67, 22: 127, 26: 251, 38: 37, 40: 116, 42: 1264, 44: 26, 46: 36.9, 48: 104, 24: 38, 52: 4, 58: 159, 10: 0, 9: 1.1, 14: 13, 18: 402, 50: 7, 60: 4}
rdd size
{1: 252293472, 3: 371400000, 5: 591700000, 7: 996500000, 12: 67960, 16: 96800000, 20: 97000000, 22: 3964, 26: 78216832, 38: 79343552, 40: 160617600, 42: 41504, 44: 41568, 46: 20800, 48: 19744, 24: 8113, 52: 18827008, 58: 234240, 0: 19420

In [288]:
#toyexample
load_file('local-1688386299499')
#caching example with cache
#load_file('local-1690222360874')
#caching example without cache
#load_file('local-1690222655824')
#config['Drawing']['show_rdd_name'] = 'true'
draw_DAG()

toyexample
AnalysisHub.memory_footprint_lst
[(0, 0, {1}), (2, 2, {1, 4}), (2, 2, {4}), (4, 4, set())]


In [3]:
load_file('application_1641567765635_0023')
draw_DAG()

mllib.BisectingKMeansExample


In [6]:
load_file('application_1635092038229_0122')
config.read('config.ini')
config['Caching_Anomalies']['highlight_recomputed_rdds'] = 'true'
config['Caching_Anomalies']['highlight_unneeded_cached_rdds'] = 'true'
draw_DAG()

DenseKMeans with Params(hdfs://localhost:9000/HiBench/Kmeans/Input/samples,10,5,MEMORY_ONLY,Random)


In [5]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("SparkUIExample").getOrCreate()

# Enable Spark UI and retrieve the URL
spark_ui_url = spark.sparkContext.uiWebUrl

# Perform your Spark operations
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()

# Stop the SparkSession
spark.stop()

# Print the Spark UI URL
print("Spark UI URL:", spark_ui_url)


KeyboardInterrupt: 

In [9]:
dont_cache(29)

In [7]:
dont_cache(6)

In [10]:
config.read('config.ini')

['config.ini']

In [11]:
load_file('application_1641567765635_0161')
draw_DAG()

allshortestpaths.py


In [10]:
cache(35)
cache(53)

In [12]:
load_file('local-1641586266617')
draw_DAG()

cc.py


In [13]:
load_file('application_1635092038229_0130')
draw_DAG()

Gradient Boosted Tree with Params(2,30,32,20,0.1,hdfs://localhost:9000/HiBench/GBT/Input)


In [14]:
load_file('application_1635092038229_0126')
draw_DAG()

ALS with Params(hdfs://localhost:9000/HiBench/ALS/Input,10,0.1,10,2,2,true)


In [15]:
load_file('application_1635092038229_0144')
draw_DAG()

NWeightGraphX


In [16]:
load_file('application_1635092038229_0140')
draw_DAG()

SVM with Params(100,1.0,0.01,hdfs://localhost:9000/HiBench/SVM/Input,MEMORY_ONLY)


In [18]:
config['Drawing']['max_iterations_count'] = '5'
draw_DAG()

In [25]:
config['Caching_Anomalies']['rdds_computation_tolerance_threshold'] = '3'
draw_DAG()

In [19]:
dont_cache(217)

In [20]:
cache(6)

In [21]:
dont_cache(2)

In [29]:
cache(1)

In [22]:
config.read('config.ini')

['config.ini']

In [30]:
load_file('application_1635092038229_0124')
draw_DAG()

LogisticRegressionWithLBFGS


In [24]:
config['Drawing']['max_iterations_count'] = '5'
draw_DAG()

In [8]:
config['Caching_Anomalies']['rdds_computation_tolerance_threshold'] = '2'
draw_DAG()

In [28]:
config['Caching_Anomalies']['rdds_computation_tolerance_threshold'] = '3'
draw_DAG()

In [27]:
# Batch cell: render a DAG for every file found in the configured input folder.
config.read('config.ini')
# 'view_after_render' is compared against 'true' to decide whether dot.render()
# opens the result; 'false' keeps the batch run from opening every output.
config['Output']['view_after_render'] = 'false'
config['Drawing']['max_iterations_count'] = '5'

# NOTE(review): the folder is listed using the raw config value, whereas
# load_file() resolves the same setting through Utility.get_absolute_path();
# the two may point to different places if the configured path is relative - confirm.
file_list = os.listdir(config['Paths']['input_path'])
for os_file_name in file_list:
    load_file(os_file_name)
    draw_DAG()

ScalaSort
ScalaTeraSort
ScalaWordCount
ScalaAggregation
ScalaJoin
ScalaScan
ScalaPageRank
BayesDataGen with Params(hdfs://localhost:9000/HiBench/Bayes/Input,hdfs://localhost:9000/HiBench/Bayes/Input.parquet,-1,false,100,100,10)
NaiveBayesExample with Params(hdfs://localhost:9000/HiBench/Bayes/Input.parquet,1.0)
DenseKMeans with Params(hdfs://localhost:9000/HiBench/Kmeans/Input/samples,10,5,MEMORY_ONLY,Random)
LogisticRegressionWithLBFGS
ALS with Params(hdfs://localhost:9000/HiBench/ALS/Input,10,0.1,10,2,2,true)
PCAExample
Gradient Boosted Tree with Params(2,30,32,20,0.1,hdfs://localhost:9000/HiBench/GBT/Input)
RFC with Params(hdfs://localhost:9000/HiBench/RF/Input,100,2,auto,gini,4,32)
SVD with Params(1000,800,true,1g,hdfs://localhost:9000/HiBench/SVD/Input)
LinearRegressionWithElasticNet
LDA Example with Params(hdfs://localhost:9000/HiBench/LDA/Input,hdfs://localhost:9000/HiBench/LDA/Output,10,10,online,1g)
SVM with Params(100,1.0,0.01,hdfs://localhost:9000/HiBench/SVM/Input,MEMORY_ON