In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

def print_basic_stats( jobs_df, workflow_df, resumes_df ):
    """Print headline statistics about the workflows and their jobs.

    Parameters
    ----------
    jobs_df : DataFrame with 'dag_id', 'status' and 'num_attempts' columns.
    workflow_df : DataFrame with 'status_date', 'status' and 'number_of_jobs'
        columns.
    resumes_df : accepted but not read by any live code in this function (see
        the note at the end of the body).

    A status of 'D' is what the printed labels call "successful". The function
    only prints; it returns None.
    """
    distinct_workflows = jobs_df['dag_id'].unique()
    print( f"Total number of workflows: {len(distinct_workflows)}")

    job_succeeded = jobs_df['status'] == 'D'
    print( f"Total number of successful jobs: {len(jobs_df.loc[job_succeeded])}")

    print( f"Earliest workflow status date: {workflow_df['status_date'].min()}")

    print( f"Last workflow status date: {workflow_df['status_date'].max()}")

    print( f"Max number of jobs in workflow: {workflow_df['number_of_jobs'].max()}")

    # The median is taken over workflows with status 'D' only, unlike the max above.
    finished_workflows = workflow_df.loc[workflow_df['status'] == 'D']
    print( f"Median number of jobs in workflow: {finished_workflows['number_of_jobs'].median()}")

    # Split the successful jobs into those needing more than one attempt and
    # those that succeeded on the first attempt.
    attempts = jobs_df['num_attempts']
    needed_retry = jobs_df.loc[job_succeeded & (attempts > 1)]
    first_try = jobs_df.loc[job_succeeded & (attempts == 1)]
    print( f"Immediate shape {first_try.shape}, retry shape {needed_retry.shape}")

    # Resume statistics were dropped because of a bad data pull: workflow ids and
    # workflow_run_ids are not unique across databases. Not used in the paper.
    # The abandoned attempt is kept here for reference:
    #     good_df=resumes_df.loc[resumes_df['w_status']=='D']
    #     good_df['unique-key']=good_df['database'].astype(str) + "_" + good_df['w_id'].astype(str)

def plot_workflow_sizes( workflow_df ):
    """Histogram of workflow sizes, measured in jobs per workflow.

    Draws a 100-bin histogram of workflow_df['number_of_jobs'] through the
    pyplot state interface; plt.show() is not called here.
    Author's note: too inefficient for millions of jobs.
    """
    plt.hist(workflow_df['number_of_jobs'], bins=100)
    plt.ylabel('Counts')
    plt.title('Jobs per workflow')
    
def plot_retry_rates( jobs_df ):
    """Histogram of attempt counts for jobs that succeeded only after retrying.

    Keeps rows with status 'D' and num_attempts > 1, then draws a 30-bin
    histogram of their 'num_attempts' values through the pyplot state
    interface. plt.show() is not called here.
    """
    succeeded_after_retry = (jobs_df['status'] == 'D') & (jobs_df['num_attempts'] > 1)
    plt.hist(jobs_df.loc[succeeded_after_retry, 'num_attempts'], bins=30)
    plt.ylabel('Counts')
    plt.title('#Attempts for Jobs that suceeded after the 1st attempt')
    
def plot_retry_rates_per_month( jobs_df ):
    """Plot the monthly retry rate and print the overall retry rate.

    For each month, the rate is
        100 * (#successful jobs needing more than one attempt)
            / (#successful jobs that needed exactly one attempt)
    so it is a ratio against first-try successes, not against all successes.
    "Successful" means status == 'D'.

    Parameters
    ----------
    jobs_df : DataFrame with a datetime 'date' column plus 'status' and
        'num_attempts' columns. The frame is NOT modified.

    Behaviour notes
    ---------------
    * Months are identified by an offset from April 2018 (April 2018 -> 0,
      May 2018 -> 1, ...), matching the hard-coded tick labels below.
    * A month that has first-try successes but no retried jobs is reported as
      0%, not as a missing point.
    * Months with no first-try successes are omitted from the plot.
    * When there are no first-try successes at all, the overall rate is printed
      as nan instead of raising ZeroDivisionError.
    """
    succeeded = jobs_df['status'] == 'D'
    retried = succeeded & (jobs_df['num_attempts'] > 1)
    immediate = succeeded & (jobs_df['num_attempts'] == 1)

    # Keep the month offset in a local Series rather than writing helper columns
    # into jobs_df: the caller passes a shared notebook-level frame.
    dates = jobs_df['date']
    month_index = (dates.dt.year - 2018) * 12 + dates.dt.month - 4

    immediate_per_month = month_index[immediate].value_counts().sort_index()
    retried_per_month = month_index[retried].value_counts().sort_index()
    # Align on the months that have first-try successes; no retried jobs in such
    # a month means a count of zero.
    retried_per_month = retried_per_month.reindex(immediate_per_month.index, fill_value=0)
    rate = 100 * retried_per_month / immediate_per_month

    retried_total = int(retried.sum())
    immediate_total = int(immediate.sum())
    if immediate_total:
        percentage = (100 * retried_total) / immediate_total
    else:
        percentage = float('nan')
    print( f"Overall Percentage retry rate {percentage}")

    print('Calculations complete')

    _, ax = plt.subplots()
    ax.plot(rate, color="black")
    ax.set_ylabel('%-Rate')
    ax.set_xlabel('Month')
    # Tick 0 is April 2018 by construction of month_index; labels run April..March.
    ax.set_xticks(list(range(12)))
    x_ticks_labels = ['A','M', 'J', 'J','A','S','O','N','D','J','F','M']
    ax.set_xticklabels(x_ticks_labels, rotation='vertical', fontsize=14)
    ax.set_title('Retry %-rate per month from May 2018')
    plt.show()
    
def plot_retries_per_workflow( jobs_df ):
    """Plot, per workflow (dag_id), how many jobs succeeded only after a retry.

    Intended to show which workflows were the most unreliable. The author
    marked it as not useful and abandoned; it plots raw counts, not a rate.
    To get a rate, follow the approach of the by-month retry-rate plot
    (divide by the count of first-try successes).

    Parameters
    ----------
    jobs_df : DataFrame with 'dag_id', 'status' and 'num_attempts' columns.
        Rows with status 'D' and num_attempts > 1 are counted.
    """
    retried_jobs = jobs_df.loc[(jobs_df['status']=='D') & (jobs_df['num_attempts'] > 1)]
    # Count only the column that is plotted instead of every column in the frame.
    retried_by_workflow = retried_jobs.groupby(['dag_id'])[['num_attempts']].count()

    _, ax = plt.subplots()
    ax.plot(retried_by_workflow, color="black")
    # The y axis shows counts; the earlier '%-Rate' label was always overwritten.
    ax.set_ylabel('Counts')
    ax.set_title('Retry Counts by Workflow')
    plt.show()
    
def max_attempts_per_workflow( jobs_df ):
    """Histogram of the highest attempt count among successful jobs per workflow.

    For every workflow (dag_id), takes the maximum 'num_attempts' over its jobs
    with status 'D', then draws a 5-bin histogram of those maxima through the
    pyplot state interface. plt.show() is not called here.

    Parameters
    ----------
    jobs_df : DataFrame with 'dag_id', 'status' and 'num_attempts' columns.
    """
    done_jobs = jobs_df.loc[jobs_df['status'] == 'D']
    # Aggregate only the plotted column: taking max() over the whole frame does
    # needless work and depends on every other column supporting max().
    attempts_by_workflow = done_jobs.groupby('dag_id')['num_attempts'].max()
    plt.hist(attempts_by_workflow, bins=5)
    plt.ylabel('Counts')
    plt.title('Max Attempts Counts by Workflow')
    
def clean_read( name, filepath, status_date_column_name ):
    """Read one results table and attach a parsed 'date' column.

    Parameters
    ----------
    name : label used only in the progress messages.
    filepath : location passed to pd.read_hdf; the 'counts' key is opened
        read-only.
    status_date_column_name : column whose values are converted with
        pd.to_datetime and stored in a new 'date' column.

    Returns
    -------
    The loaded DataFrame including the added 'date' column. The column list and
    shape are printed as a quick sanity check.
    """
    print( f"Reading {name}")
    frame = pd.read_hdf( filepath, key='counts', mode='r')
    frame['date'] = pd.to_datetime(frame[status_date_column_name])
    print( f"  {name} columns {list(frame.columns)}")
    print( f"  {name} shape {frame.shape}")
    return frame
    
def load(root_path='/ihme/homes/gphipps/results'):
    """Read the workflow, job and resume statistics tables.

    Parameters
    ----------
    root_path : directory containing stats_workflows_all.h5, stats_jobs_all.h5
        and stats_resumes_all.h5. Defaults to the location originally
        hard-coded here, so load() with no argument behaves as before; pass a
        different directory to run the notebook elsewhere.

    Returns
    -------
    (workflow_df, jobs_df, resumes_df), each produced by clean_read and so
    carrying a parsed 'date' column. The date source column is 'status_date'
    for workflows and jobs, and 'wr_status_date' for resumes.
    """
    sources = (
        ('workflow', 'stats_workflows_all.h5', 'status_date'),
        ('jobs', 'stats_jobs_all.h5', 'status_date'),
        ('resumes', 'stats_resumes_all.h5', 'wr_status_date'),
    )
    workflow_df, jobs_df, resumes_df = (
        clean_read(name, f"{root_path}/{filename}", date_column)
        for name, filename, date_column in sources
    )
    return workflow_df, jobs_df, resumes_df
    

In [None]:
# Read the three result files once and keep the frames as notebook-level
# globals; the cells below pass them to the reporting and plotting functions.
workflow_df, jobs_df, resumes_df = load()

In [None]:
# Print headline counts: workflows, successful jobs, status-date range,
# workflow sizes, and the immediate-vs-retried split of successful jobs.
print_basic_stats( jobs_df, workflow_df, resumes_df  )

In [None]:
#Too inefficient and not needed
#plot_workflow_sizes( workflow_df )

In [None]:
#Too inefficient and not needed
#plot_retry_rates( jobs_df )

In [None]:
# Plot retried successes as a percentage of first-try successes for each month,
# and print the same ratio computed over all jobs.
plot_retry_rates_per_month( jobs_df )

In [None]:
# Not useful for the analysis (it plots raw retry counts per workflow, not a
# rate); left here as an example.
plot_retries_per_workflow( jobs_df )

In [None]:
# Histogram of the highest attempt count among successful (status 'D') jobs in
# each workflow.
max_attempts_per_workflow( jobs_df )