In [1]:
import pandas as pd
import warnings
# NOTE(review): this installs a blanket 'ignore' filter, so every warning
# raised later in the notebook (including pandas' own) is silenced.
warnings.filterwarnings('ignore')

In [2]:
# Load the event log; `lines=True` parses the file as one JSON document per line.
event_log_path = "../cluster_logs/eventlogs/2222592368926707400/eventlog.json"
log_df = pd.read_json(event_log_path, lines=True)

In [3]:
# Distinct values of the 'Event' column, in order of first appearance.
pd.unique(log_df['Event'])

array(['DBCEventLoggingListenerMetadata', 'SparkListenerExecutorAdded',
       'SparkListenerBlockManagerAdded', 'SparkListenerEnvironmentUpdate',
       'SparkListenerApplicationStart', 'SparkListenerJobStart',
       'SparkListenerStageSubmitted', 'SparkListenerTaskStart',
       'SparkListenerTaskEnd', 'SparkListenerStageCompleted',
       'SparkListenerTaskGettingResult', 'SparkListenerJobEnd'],
      dtype=object)

In [4]:
def filter_df(event_types, log_df):
    """Split an event log into one DataFrame per requested event type.

    Parameters
    ----------
    event_types : iterable of str
        Values of the ``'Event'`` column to select. The result list follows
        this order.
    log_df : pandas.DataFrame
        Event log containing an ``'Event'`` column. It is not modified.

    Returns
    -------
    list of pandas.DataFrame
        One frame per entry of ``event_types`` holding the matching rows,
        with every column that is entirely missing for those rows removed.
    """
    # dropna() without inplace=True returns a new frame, so nothing is ever
    # written back into a slice of `log_df`.
    return [
        log_df[log_df['Event'] == event_type].dropna(axis=1, how='all')
        for event_type in event_types
    ]

In [5]:
# Pair each job-start row with the job-end row that shares its 'Job ID'.
event_types = ['SparkListenerJobStart', 'SparkListenerJobEnd']
job_list_df = filter_df(event_types, log_df)
job_df = pd.merge(job_list_df[0], job_list_df[1], on=['Job ID'])

In [6]:
def process_job_df(job_df):
    """Derive per-job durations and a stage -> job lookup table.

    Parameters
    ----------
    job_df : pandas.DataFrame
        Merged job start/end rows. Must provide 'Job ID' (as a column, or as
        the index name when a frame returned by this function is passed back
        in), 'Submission Time', 'Completion Time' and 'Stage Infos' (a list
        of dicts, each with a 'Stage ID' key, per row).

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        * The job frame indexed by integer 'Job ID', with an added
          'Duration' column equal to (completion - submission) / 1000.
        * A frame indexed by 'Stage ID' with a single 'Job ID' column.
    """
    # Work on a copy: the caller's frame stays untouched, so the calling
    # cell can be re-run without hitting a half-processed input.
    jobs = job_df.copy()
    if 'Job ID' not in jobs.columns and jobs.index.name == 'Job ID':
        # The frame already went through this function once; restore the
        # column so the steps below apply unchanged.
        jobs = jobs.reset_index()

    jobs['Duration'] = (jobs['Completion Time'] - jobs['Submission Time']) / 1000
    jobs['Job ID'] = jobs['Job ID'].astype(int)
    jobs = jobs.set_index(['Job ID'])

    # Collect plain records and build the lookup once instead of creating
    # and concatenating one DataFrame per job.
    stage_records = [
        {'Stage ID': stage_info['Stage ID'], 'Job ID': job_id}
        for job_id, stage_infos in jobs['Stage Infos'].items()
        for stage_info in stage_infos
    ]
    # Explicit columns keep the result well-formed when there are no stages.
    stage_df = pd.DataFrame(stage_records, columns=['Stage ID', 'Job ID'])
    stage_df = stage_df.set_index(['Stage ID'])
    return jobs, stage_df

In [7]:
# Replace the merged job frame with its processed form and keep the
# stage -> job lookup returned alongside it.
job_df, stage_df = process_job_df(job_df=job_df)

In [8]:
# Preview the first five processed jobs.
job_df.head(n=5)

Unnamed: 0_level_0,Event_x,Submission Time,Stage Infos,Stage IDs,Properties,Event_y,Completion Time,Job Result,Duration
Job ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
0,SparkListenerJobStart,1614261000000.0,"[{'Stage ID': 0, 'Stage Attempt ID': 0, 'Stage...","[0, 1, 2]",{'spark.scheduler.pool': '3563561430952536780'...,SparkListenerJobEnd,1614261000000.0,{'Result': 'JobSucceeded'},42.414
1,SparkListenerJobStart,1614261000000.0,"[{'Stage ID': 5, 'Stage Attempt ID': 0, 'Stage...","[5, 6, 3, 7, 4]",{'spark.scheduler.pool': '3563561430952536780'...,SparkListenerJobEnd,1614261000000.0,{'Result': 'JobSucceeded'},58.101
2,SparkListenerJobStart,1614261000000.0,"[{'Stage ID': 12, 'Stage Attempt ID': 0, 'Stag...","[12, 9, 13, 10, 14, 11, 8]",{'spark.scheduler.pool': '3563561430952536780'...,SparkListenerJobEnd,1614261000000.0,{'Result': 'JobSucceeded'},161.005
3,SparkListenerJobStart,1614261000000.0,"[{'Stage ID': 15, 'Stage Attempt ID': 0, 'Stag...","[15, 19, 16, 20, 17, 21, 18, 22, 23]",{'spark.scheduler.pool': '3563561430952536780'...,SparkListenerJobEnd,1614261000000.0,{'Result': 'JobSucceeded'},54.654
4,SparkListenerJobStart,1614261000000.0,"[{'Stage ID': 33, 'Stage Attempt ID': 0, 'Stag...","[33, 30, 27, 34, 31, 32, 24, 28, 25, 29, 26]",{'spark.scheduler.pool': '3563561430952536780'...,SparkListenerJobEnd,1614261000000.0,{'Result': 'JobSucceeded'},9.891


In [11]:
def process_stage_df(stage_list_df):
    """Flatten the 'Stage Info' dicts of each stage-event frame.

    Parameters
    ----------
    stage_list_df : iterable of pandas.DataFrame
        Frames whose 'Stage Info' column holds one dict per row. Each dict
        must provide 'Stage ID' and 'RDD Info' (a list of dicts).

    Returns
    -------
    (list of pandas.DataFrame, list of pandas.DataFrame)
        Two lists aligned with ``stage_list_df``:

        * one frame per input, indexed by 'Stage ID', with one column per
          key of the 'Stage Info' dicts;
        * one frame per input with one row per RDD entry plus a 'Stage ID'
          column naming the stage the entry came from.

        An input frame without rows yields an empty DataFrame in both lists.
    """
    ret_list_info = []
    ret_list_rdd = []
    for stage_df in stage_list_df:
        stage_infos = [] if stage_df.empty else stage_df['Stage Info'].tolist()
        if not stage_infos:
            ret_list_info.append(pd.DataFrame())
            ret_list_rdd.append(pd.DataFrame())
            continue

        # The RDD accumulator is created per input frame. Sharing one list
        # across frames would leak the RDD rows of earlier frames into the
        # results of later ones.
        rdd_frames = []
        for stage_info in stage_infos:
            rdd_frame = pd.DataFrame(stage_info['RDD Info'])
            rdd_frame['Stage ID'] = stage_info['Stage ID']
            rdd_frames.append(rdd_frame)

        # One row per stage, built in a single pass from the raw dicts.
        info_df = pd.DataFrame(stage_infos).set_index(['Stage ID'])

        ret_list_info.append(info_df)
        ret_list_rdd.append(pd.concat(rdd_frames))

    return ret_list_info, ret_list_rdd

In [13]:
stages = ['SparkListenerStageSubmitted', 'SparkListenerStageCompleted']
stage_list_df = filter_df(stages, log_df)
info_df_list, rdd_info_list = process_stage_df(stage_list_df)

# Merge onto the bare 'Job ID' column only. `stage_df` is reassigned here, so
# on a re-run it already carries the stage-info columns; merging those again
# would give the overlapping columns _x/_y suffixes and the 'Completion Time'
# / 'Submission Time' lookups below would no longer resolve.
stage_df = stage_df[['Job ID']].merge(info_df_list[1], left_index=True, right_index=True)
stage_df['Duration'] = (stage_df['Completion Time'] - stage_df['Submission Time']) / 1000

rdd_info_list[0].head()

KeyError: 'Completion Time'

In [None]:
# Split the log into one frame per task-level event type, then inspect the
# schema of the second one (the 'SparkListenerTaskEnd' rows).
task_types = [
    'SparkListenerTaskStart',
    'SparkListenerTaskEnd',
    'SparkListenerTaskGettingResult',
]
tasks_df_list = filter_df(task_types, log_df)
tasks_df_list[1].info()

In [None]:
def process_tasks(task_df_list):
    """Flatten the 'Task Info' dicts of each task-event frame.

    Parameters
    ----------
    task_df_list : iterable of pandas.DataFrame
        Frames with a 'Task Info' column (one dict per row, each providing
        'Task ID') and a 'Stage ID' column convertible with ``int()``.

    Returns
    -------
    list of pandas.DataFrame
        One frame per input, indexed by 'Task ID', with one column per
        'Task Info' key plus an integer 'Stage ID' column. An input frame
        without rows yields an empty frame whose index is named 'Task ID'.
    """
    ret_list = []
    for task_df in task_df_list:
        if task_df.empty:
            ret_list.append(pd.DataFrame(index=pd.Index([], name='Task ID')))
            continue

        # Build plain records and create the frame once, rather than one
        # single-row DataFrame per task followed by a concat.
        records = [
            {**task_info, 'Stage ID': int(stage_id)}
            for task_info, stage_id in zip(task_df['Task Info'], task_df['Stage ID'])
        ]
        ret_list.append(pd.DataFrame(records).set_index(['Task ID']))

    return ret_list

In [None]:
# Only the first two frames (task start and task end rows) are processed.
task_list = process_tasks(tasks_df_list[:2])

In [None]:
# Preview the first five rows of the second processed task frame.
task_list[1].head(n=5)

In [None]:
# NOTE(review): the original read from `stage_df_0`, which no cell in this
# notebook defines, so it only worked with leftover kernel state. The
# processed stage-submitted frame (`info_df_list[0]`, indexed by 'Stage ID')
# is presumably what was meant; confirm.
rdd_infos_0_0 = pd.DataFrame(info_df_list[0].loc[0, 'RDD Info'])
rdd_infos_0_0

In [None]:
stages = ['SparkListenerStageSubmitted', 'SparkListenerStageCompleted']

# Only two stage event types are listed, so there is no third ("result")
# frame to build; the original `stages[2]` lookup raised IndexError.
# filter_df returns one frame per listed event type, already stripped of
# columns that are entirely missing for that type.
stage_start_df, stage_end_df = filter_df(stages, log_df)
stage_end_df

In [None]:
# Hand-copied sample of a stage's 'RDD Info' list, kept for reference.
# Bound to a name so it can be inspected; the original literal was missing
# the comma after the second entry and did not parse.
sample_rdd_info = [
    {'RDD ID': 0, 'Name': 'dbfs:/FileStore/graph_10M.txt', 'Scope': '{"id":"0","name":"textFile"}', 'Callsite': 'textFile at NativeMethodAccessorImpl.java:0', 'Parent IDs': [],
     'Storage Level':
         {'Use Disk': False, 'Use Memory': False, 'Deserialized': False, 'Replication': 1},
     'Barrier': False, 'Number of Partitions': 2, 'Number of Cached Partitions': 0, 'Memory Size': 0, 'Disk Size': 0},

    {'RDD ID': 1, 'Name': 'dbfs:/FileStore/graph_10M.txt', 'Scope': '{"id":"0","name":"textFile"}', 'Callsite': 'textFile at NativeMethodAccessorImpl.java:0', 'Parent IDs': [0],
     'Storage Level':
         {'Use Disk': False, 'Use Memory': False, 'Deserialized': False, 'Replication': 1},
     'Barrier': False, 'Number of Partitions': 2, 'Number of Cached Partitions': 0, 'Memory Size': 0, 'Disk Size': 0},

    {'RDD ID': 2, 'Name': 'PythonRDD', 'Callsite': 'groupByKey at <command-193306450925007>:32', 'Parent IDs': [1],
     'Storage Level':
         {'Use Disk': False, 'Use Memory': False, 'Deserialized': False, 'Replication': 1},
     'Barrier': False, 'Number of Partitions': 2, 'Number of Cached Partitions': 0, 'Memory Size': 0, 'Disk Size': 0},

    {'RDD ID': 3, 'Name': 'PairwiseRDD', 'Callsite': 'groupByKey at <command-193306450925007>:32', 'Parent IDs': [2],
     'Storage Level':
         {'Use Disk': False, 'Use Memory': False, 'Deserialized': False, 'Replication': 1},
     'Barrier': False, 'Number of Partitions': 2, 'Number of Cached Partitions': 0, 'Memory Size': 0, 'Disk Size': 0
    },
]
sample_rdd_info



