# Plot Resource Monitoring Data

This notebook visualizes data from a BQ dataset that holds workflow resource usage monitoring data. Please refer to the Readme for what data are collected and how they are collected. The notebook will produce three kinds of output: one PDF and two interactive HTML reports.

The following need to be provided to the notebook: 
- workflow and sub-workflow IDs
- workflow execution dates
- where to store the output

## Import packages

In [None]:
import datetime
from datetime import timedelta
import concurrent.futures
import json
import re
import logging
import os
from os import path
import sys
import warnings

import numpy as np
import pandas as pd
import statistics
from math import isnan, pi

import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages

import bokeh.io 
from bokeh import *
from bokeh.io import output_notebook, output_file, show 
from bokeh.models import Span, HoverTool, Title, Label, Legend, LegendItem, ColumnDataSource, Div
from bokeh.models.widgets import DataTable, DateFormatter, TableColumn
from bokeh.plotting import figure
from bokeh.layouts import column, layout
from bokeh.palettes import Category20c
from bokeh.transform import cumsum
import seaborn as sns

import google.auth
from google.cloud import bigquery

## Query BQ database

The QueryBQToMonitor class contains the query scripts for the three different BQ tables (Metrics, Runtime, Cromwell Metadata). It takes in a workflow ID and the IDs of its subworkflows (if any), along with the estimated dates when the job was submitted and when it successfully finished. It uses these parameters to query the BQ tables and produces pandas dataframes.

By default the query searches in the `broad-dsde-methods` BQ project. If needed, change this to the project the workflow was run on or the project you expect the monitoring tables to be saved in.

In [None]:
class QueryBQToMonitor:
    """Fetch the Cromwell monitoring tables from BigQuery as pandas DataFrames.

    After ``query()`` has run, the instance exposes:

    - ``runtime``: rows of the ``cromwell_monitoring.runtime`` table
    - ``metadata``: rows of the ``cromwell_monitoring.metadata`` table
    - ``metadata_runtime``: the two tables above merged on instance name
    - ``metrics``: rows of the ``cromwell_monitoring.metrics`` table for the
      VM instances found in ``metadata_runtime``
    """

    # Columns returned by the metrics query; used to build an empty frame when
    # there is nothing to fetch.
    _METRICS_COLUMNS = [
        'metrics_cpu_used_percent',
        'metrics_disk_read_iops',
        'metrics_disk_used_gb',
        'metrics_disk_write_iops',
        'metrics_instance_id',
        'metrics_mem_used_gb',
        'metrics_timestamp',
    ]

    def __init__(self, workflowids, days_back_upper_bound, days_back_lower_bound):
        """Store the query parameters and create the BigQuery client.

        Args:
            workflowids: iterable of workflow ID strings. Each ID is pasted
                verbatim into an SQL ``IN (...)`` list, so it must already be
                wrapped in double quotes.
            days_back_upper_bound: how many days before today the query window
                starts.
            days_back_lower_bound: how many days before today the query window
                ends.
        """
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)

        # A notebook cell may construct this class many times; attach the
        # stderr handler only once so log lines are not duplicated.
        already_attached = any(
            getattr(handler, '_resource_monitoring_handler', False)
            for handler in self.logger.handlers
        )
        if not already_attached:
            h = logging.StreamHandler(sys.stderr)
            h.flush = sys.stderr.flush
            h._resource_monitoring_handler = True
            self.logger.addHandler(h)

        # Use the constructor argument (the original read a notebook global).
        # NOTE: the IDs are interpolated into SQL text as-is; only pass
        # trusted values.
        self.formated_workflow_ids = ','.join(workflowids)

        self.days_back_upper_bound = days_back_upper_bound
        self.days_back_lower_bound = days_back_lower_bound

        # Explicitly create a credentials object. This allows you to use the same
        # credentials for both the BigQuery and BigQuery Storage clients, avoiding
        # unnecessary API calls to fetch duplicate authentication tokens.
        credentials, project_id = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )

        # Make clients.
        self.bq_client = bigquery.Client(credentials=credentials, project=project_id)

    def query(self):
        """Run all queries: runtime + metadata first, then the per-VM metrics."""
        self._get_runtime_and_metadata()
        self._get_metrics()

    def _get_runtime_and_metadata(self):
        """Fetch the runtime and metadata tables and merge them on instance name."""
        self._fetch_runtime()
        self._fetch_metadata()

        runtime_nrow, runtime_ncol = self.runtime.shape
        meta_nrow, meta_ncol = self.metadata.shape

        # basic QC
        if meta_nrow != runtime_nrow:
            self.logger.warning('Metadata and runtime number of rows are different. You might want to check.')
        summary_msg = f"Nrows of runtime: {runtime_nrow}, Ncols of runtime: {runtime_ncol}, \nNrows of meta: {meta_nrow}, Ncols of meta: {meta_ncol}"
        self.logger.info(summary_msg)

        # merge the two
        self.metadata_runtime = pd.merge(self.metadata, self.runtime, left_on='meta_instance_name', right_on='runtime_instance_name')
        # The original computed this summary and discarded it; log it instead.
        self.logger.info(self.metadata_runtime.runtime_task_call_name.describe())

    def _fetch_runtime(self):
        """Query the runtime table for the configured workflows and date window."""
        # query runtime data
        runtime_sql = f"""

        SELECT

          runtime.attempt AS runtime_attempt,
          runtime.cpu_count AS runtime_cpu_count,
          runtime.cpu_platform AS runtime_cpu_platform,
          runtime.disk_mounts AS runtime_disk_mounts,
          runtime.disk_total_gb AS runtime_disk_total_gb,
          runtime.instance_id AS runtime_instance_id,
          runtime.instance_name AS runtime_instance_name,
          runtime.mem_total_gb AS runtime_mem_total_gb,
          runtime.preemptible AS runtime_preemptible,
          runtime.project_id AS runtime_project_id,
          runtime.shard AS runtime_shard,
          runtime.start_time AS runtime_start_time,
          runtime.task_call_name AS runtime_task_call_name,
          runtime.workflow_id AS runtime_workflow_id,
          runtime.zone AS runtime_zone

        FROM
          `broad-dsde-methods.cromwell_monitoring.runtime`  runtime 

        WHERE
              DATE(runtime.start_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL {self.days_back_upper_bound} DAY)
          AND DATE(runtime.start_time) <= DATE_SUB(CURRENT_DATE(), INTERVAL {self.days_back_lower_bound} DAY)

          AND runtime.workflow_id IN ({self.formated_workflow_ids})    
        """
        self.runtime = self.bq_client.query(query=runtime_sql).to_dataframe()
        self.logger.info("Fetched runtime table.")

    def _fetch_metadata(self):
        """Query the metadata table for the configured workflows and date window."""
        # query metadata table
        metadata_sql = f"""

        SELECT
          metadata.attempt AS meta_attempt,
          metadata.cpu_count AS meta_cpu,
          metadata.disk_mounts AS meta_disk_mounts,
          metadata.disk_total_gb AS meta_disk_total_gb,
          metadata.disk_types AS meta_disk_types,
          metadata.docker_image AS meta_docker_image,
          metadata.end_time AS meta_end_time,
          metadata.execution_status AS meta_execution_status,
          metadata.inputs AS meta_inputs,
          metadata.instance_name AS meta_instance_name,
          metadata.mem_total_gb AS meta_mem_total_gb,
          metadata.preemptible AS meta_preemptible,
          metadata.project_id AS meta_project_id,
          metadata.shard AS meta_shard,
          metadata.start_time AS meta_start_time,
          metadata.task_call_name AS meta_task_call_name,
          metadata.workflow_id AS meta_workflow_id,
          metadata.workflow_name AS meta_workflow_name,
          metadata.zone AS meta_zone,
          TIMESTAMP_DIFF(metadata.end_time, metadata.start_time, SECOND) meta_duration_sec

        FROM
          `broad-dsde-methods.cromwell_monitoring.metadata` metadata

        WHERE
              DATE(metadata.start_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL {self.days_back_upper_bound} DAY)
          AND DATE(metadata.start_time) <= DATE_SUB(CURRENT_DATE(), INTERVAL {self.days_back_lower_bound} DAY)

          AND metadata.workflow_id IN ({self.formated_workflow_ids})    
        """
        self.metadata = self.bq_client.query(query=metadata_sql).to_dataframe()
        self.logger.info("Fetched metadata table")

    def _get_metrics(self):
        """Fetch metrics for every VM in ``metadata_runtime`` using parallel batch queries.

        The instance IDs are split into at most ``n`` batches, each queried in
        its own thread. Afterwards, any instance IDs that returned no rows are
        re-queried up to 10 times before a warning is logged.
        """
        begin = datetime.datetime.now()
        self.logger.info(f"Started querying metrics on {begin.strftime('%H:%M:%S')}.")

        # provision jobs
        instance_ids = list(self.metadata_runtime.runtime_instance_id.unique())
        n = 8  # magic number based on experience
        # Ceiling division so that every ID lands in exactly one batch and no
        # batch is empty (an empty batch would produce an invalid `IN ()`).
        batch_size = max(1, (len(instance_ids) + n - 1) // n)
        batches = [
            instance_ids[start:start + batch_size]
            for start in range(0, len(instance_ids), batch_size)
        ]

        with concurrent.futures.ThreadPoolExecutor() as executor:
            jobs = [
                executor.submit(self._fetch_metrics_on_vms_batch, batch)
                for batch in batches
            ]
        # Leaving the `with` block waits for all jobs; result() re-raises any
        # exception that occurred in a worker thread.
        results = [job.result() for job in jobs]

        finish = datetime.datetime.now()
        # total_seconds() also accounts for whole days, unlike `.seconds`.
        total_seconds = int((finish - begin).total_seconds())
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        elapse = '{:02}:{:02}:{:02}'.format(hours, minutes, seconds)
        self.logger.info(f"Finished on {finish.strftime('%H:%M:%S')}.")
        self.logger.info(f"Totalling {elapse}.")

        if results:
            self.metrics = pd.concat(results)
        else:
            self.metrics = pd.DataFrame(columns=self._METRICS_COLUMNS)

        # QC: retry instances that returned no metrics rows.
        expected_ids = set(instance_ids)
        missing = expected_ids - set(self.metrics.metrics_instance_id.unique())
        retries = 0
        while missing and retries < 10:
            self.logger.info(f"Retrieving metrics info on leftovers: {missing}")
            left_over = self._fetch_metrics_on_vms_batch(missing)
            if not left_over.empty:
                self.metrics = pd.concat([self.metrics, left_over], axis=0)
            missing = expected_ids - set(self.metrics.metrics_instance_id.unique())
            retries += 1
        if missing:
            self.logger.warning(f"Not all VMs provisioned have their metrics sent over ({missing} didn't).")

    def _fetch_metrics_on_vms_batch(self, vm_instance_ids):
        """Query the metrics table for one batch of VM instance IDs.

        Args:
            vm_instance_ids: iterable of instance IDs; each is converted with
                ``str()`` and placed in the SQL ``IN (...)`` list.

        Returns:
            DataFrame with the ``metrics_*`` columns. An empty batch returns
            an empty frame without contacting BigQuery.
        """
        vm_instance_ids = list(vm_instance_ids)
        if not vm_instance_ids:
            return pd.DataFrame(columns=self._METRICS_COLUMNS)

        ids_string = ', '.join(map(str, vm_instance_ids))
        metrics_sql = f"""

        SELECT

          metrics.cpu_used_percent AS metrics_cpu_used_percent,
          metrics.disk_read_iops AS metrics_disk_read_iops,
          metrics.disk_used_gb AS metrics_disk_used_gb,
          metrics.disk_write_iops AS metrics_disk_write_iops,
          metrics.instance_id AS metrics_instance_id,
          metrics.mem_used_gb AS metrics_mem_used_gb,
          metrics.timestamp AS metrics_timestamp

        FROM
          `broad-dsde-methods.cromwell_monitoring.metrics`  metrics

        WHERE

              DATE(metrics.timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL {self.days_back_upper_bound} DAY)
          AND DATE(metrics.timestamp) <= DATE_SUB(CURRENT_DATE(), INTERVAL {self.days_back_lower_bound} DAY)
          AND metrics.instance_id IN ({ids_string})

        """
        return self.bq_client.query(metrics_sql).to_dataframe()

Set the following variables:
1. workflow_ids = Workflow ID of the executed job, followed by any subworkflow IDs that may be available.
2. days_back_upper_bound = The number of days back from today that the workflow started.
3. days_back_lower_bound = The number of days back from today that the workflow ended.

Example:
```
workflow_ids= ["\"a0a92771-c925-49b4-887e-877deeacc742\"",      # main workflow id
               "\"9837e566-0f37-444b-be3e-a135616f4c6e\""]      # sub workflow id
days_back_upper_bound = 85
days_back_lower_bound = 80
```

In [None]:
# Control 05/09/2020
# Template cell: fill in the three placeholders below before running.
workflow_ids= # ADD WORKFLOW ID (list of double-quoted ID strings, parent workflow first)
# The first entry, with its surrounding double quotes removed, names the output files.
PARENT_WORKFLOW_ID = workflow_ids[0].strip('"')
days_back_upper_bound = # ADD INTEGER (days back from today that the workflow started)
days_back_lower_bound = # ADD INTEGER (days back from today that the workflow ended)
# Creates the BigQuery client; no query is run until df_monitoring.query() is called.
df_monitoring = QueryBQToMonitor(workflow_ids, days_back_upper_bound, days_back_lower_bound)

Next we have two scenarios
1) Perform a fresh query and save query results locally to be used in another session.
2) Import local query results that were saved from an earlier session. 

### Scenario 1
The next cell uses the QueryBQToMonitor class to query the BQ database using the variables that were provided for workflow_id and dates. (If data is saved locally you may skip this cell.)

In [None]:
## Query the BQ database
# Populates df_monitoring.runtime, .metadata, .metadata_runtime and .metrics.
df_monitoring.query()

After querying the BQ database, you may want to save the data locally to avoid the cost of querying BQ again in the future. The next cell will save the pandas dataframes into pickle files. (If data is saved locally you may skip this cell.) 

In [None]:
## Write both dataframes to pickle files named after the parent workflow ID
df_monitoring.metrics.to_pickle(
    f'{PARENT_WORKFLOW_ID}_metrics_resource_monitoring.pkl'
)
df_monitoring.metadata_runtime.to_pickle(
    f'{PARENT_WORKFLOW_ID}_metadata_runtime_resource_monitoring.pkl'
)

### Scenario 2
If resource data is saved locally from a previous run of this job, then you will want to import it instead of rerunning the BQ query above. Run the next cell to import the pickle files saved from a previous session. 

In [None]:
## Load the dataframes back from local pickle files, when they exist
# NOTE: unpickling can execute arbitrary code; only load files you created yourself.
_metrics_pkl = PARENT_WORKFLOW_ID + '_metrics_resource_monitoring.pkl'
_metadata_runtime_pkl = PARENT_WORKFLOW_ID + '_metadata_runtime_resource_monitoring.pkl'

if not path.exists(_metrics_pkl):
    print('No metrics from existing pickel file ' + _metrics_pkl)
else:
    print('Loading metrics from existing pickel file')
    df_monitoring.metrics = pd.read_pickle(_metrics_pkl)

if not path.exists(_metadata_runtime_pkl):
    print('No metadata and runtime dataframe from existing pikel file ' + _metadata_runtime_pkl)
else:
    print('Loading metadata and runtime from existing pkl')
    df_monitoring.metadata_runtime = pd.read_pickle(_metadata_runtime_pkl)

Using the tables obtained from scenario 1 or 2, the next cell will create an additional monitoring dataframe that will be used later during plotting. 

In [None]:
# Join each metrics row with the runtime/metadata columns the plots rely on
_runtime_columns_for_plots = [
    'runtime_workflow_id',
    'runtime_task_call_name',
    'runtime_shard',
    'runtime_instance_id',
    'meta_duration_sec',
]
df_monitoring.metrics_runtime = df_monitoring.metrics.merge(
    df_monitoring.metadata_runtime[_runtime_columns_for_plots],
    left_on='metrics_instance_id',
    right_on='runtime_instance_id',
)

## Extracting and Plot Data
We will now begin extracting and plotting data from the resource monitoring dataframe

### Workflow duration Summary
The next few cells will obtain and plot the workflow duration summary, which consists of a table and a plot of the workflow duration per task.

In [None]:
# Total workflow duration in whole seconds: from the earliest task start
# to the latest task end recorded in the metadata.
earliest_start_datetime = min(df_monitoring.metadata_runtime['meta_start_time'])
latest_end_datetime = max(df_monitoring.metadata_runtime['meta_end_time'])
workflow_duration = round(
    datetime.timedelta.total_seconds(
        latest_end_datetime - earliest_start_datetime
    )
)

In [None]:
#Get an array of task names in workflow
# Distinct task call names, taken from the merged metrics/runtime table.
TaskNames = df_monitoring.metrics_runtime.runtime_task_call_name.unique()

In [None]:
def get_info_per_task(task_name,df):
    """Return ``(duration_seconds, shard_count)`` for one task.

    The duration is the rounded number of seconds between the first and last
    ``metrics_timestamp`` of the task's rows; the shard count is the number of
    distinct ``runtime_shard`` values among those rows.
    """
    is_task = df['runtime_task_call_name'] == task_name
    task_rows = df.loc[is_task]

    timestamps = task_rows['metrics_timestamp']
    first_seen = min(timestamps)
    last_seen = max(timestamps)
    span = last_seen - first_seen
    duration_seconds = round(datetime.timedelta.total_seconds(span))

    distinct_shards = task_rows.runtime_shard.unique()
    return duration_seconds, len(distinct_shards)

In [None]:
# Map every task name to its (duration, shard count) tuple from get_info_per_task
task_summary_dict = {
    name: get_info_per_task(name, df_monitoring.metrics_runtime)
    for name in TaskNames
}

# Keep only the duration (first tuple element) per task, for the pie chart
task_summary_duration = {
    name: summary[0] for name, summary in task_summary_dict.items()
}

In [None]:
# Turn task_summary_dict into a dataframe so it can easily be shown as a bokeh table
# from_dict yields one column per task, with rows 0 (duration) and 1 (shard count).
df_task_summary = pd.DataFrame.from_dict(task_summary_dict)
# Transpose so each task is a row, and expose the task name as a 'Tasks' column.
df_task_summary_T = df_task_summary.T.rename_axis('Tasks').reset_index()
# errors="raise" makes the rename fail loudly if columns 0 or 1 are missing.
df_task_summary_named = df_task_summary_T.rename(columns={0: "Duration", 1: "Shards"},errors="raise")

In [None]:
def plot_pie(data_dict, title):
    """Build a bokeh pie chart from a ``{label: value}`` mapping.

    Args:
        data_dict: mapping of slice label (task name) to numeric value.
        title: chart title.

    Returns:
        The bokeh figure; it is not shown here so the caller can place it in
        a layout.
    """
    output_notebook()

    data = pd.Series(data_dict).reset_index(name='value').rename(columns={'index':'task'})
    data['angle'] = data['value']/data['value'].sum() * 2*pi

    # Category20c only provides palettes for 3 to 20 colours, so indexing it
    # with the raw slice count fails for fewer than 3 or more than 20 slices.
    # Truncate the smallest palette for small inputs and cycle the largest
    # one for big inputs.
    slice_count = len(data)
    if slice_count <= 20:
        colors = list(Category20c[max(slice_count, 3)][:slice_count])
    else:
        largest = Category20c[20]
        colors = [largest[i % len(largest)] for i in range(slice_count)]
    data['color'] = colors

    p = figure(plot_height=350,
               title=title,
               toolbar_location=None,
               tools="hover",
               tooltips=("@task: @value"),
               x_range=(-0.5, 1.0))

    p.wedge(x=0, y=1, radius=0.4,
            start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'),
            line_color="white", fill_color='color', legend='task', source=data)

    p.axis.axis_label=None
    p.axis.visible=False
    p.grid.grid_line_color = None

    return p

In [None]:
# Place any outputs from bokeh into the following file
output_file("{}_workflow_summary.html".format(PARENT_WORKFLOW_ID))

# One bokeh table column per dataframe column
Columns = [TableColumn(field=Ci, title=Ci) for Ci in df_task_summary_named.columns] # bokeh columns

# Create the bokeh data table from the pandas dataframe and the column definitions
task_summary_table = DataTable(columns=Columns, source=ColumnDataSource(df_task_summary_named)) # bokeh table

# Create pie plot for workflow duration
workflow_duration_pie = plot_pie(task_summary_duration, "Workflow Duration in Seconds")

# Divs (the title previously opened <h1> but closed with </h2>)
workflow_title_div = Div(text="<h1>{} <br> Workflow Summary</h1>".format(PARENT_WORKFLOW_ID),width=600, height=50)
workflow_duration_div = Div(text="The workflow took {} seconds to complete.".format(workflow_duration),width=300, height=25)

# Place the outputs from bokeh in this layout, which will be the layout in html file _workflow_summary.html
show(layout([
            [workflow_title_div],
            [workflow_duration_div],
            [Div(text="<h3>Workflow Summary Table</h3>",width=300, height=25)],
            [task_summary_table],
            [workflow_duration_pie]
            ]))

### Task sharded summary and task detailed summary
The next few cells will extract and plot 
- A task shard summary plot HTML file. This is used for any scattered tasks, providing a high-level view of resource usage for each shard in a task (*_shard_summary.html).
- A detailed resource usage plot per task, in which resource usage over the course of time is displayed for each task (_resource_monitoring.pdf).

In [None]:
def mean_of_string(x):
    """Return the mean of ``x`` after converting its elements to float, ignoring NaNs.

    ``x`` may hold numbers or numeric strings.
    """
    # The `np.float` alias was removed from NumPy; the builtin float is the
    # same dtype.
    xfloat = np.array(x).astype(float)
    return np.nanmean(xfloat)

In [None]:
def get_1st_disk_usage(x):
    """Return the first element of ``x`` converted to float.

    ``x`` must be a non-empty sequence of numbers or numeric strings.
    """
    # The `np.float` alias was removed from NumPy; the builtin float is the
    # same dtype.
    xfloat = np.array(x).astype(float)
    return xfloat[0]

In [None]:
def plot_line(task_name, 
              shard, 
              task_shard_duration, 
              df_timestamp, 
              x_value,
              resource_name,
              obtained_resource,
              requested_resource,
              y_label,
              x_label,t):
    """Draw one resource-usage line plot on the current matplotlib axes.

    Args:
        task_name, shard: used only in the optional plot title.
        task_shard_duration: upper limit of the secondary "Duration Time" x-axis.
        df_timestamp: x values (timestamps) of the usage line.
        x_value: y values (resource usage) of the usage line.
        resource_name: resource label used in the legend entries.
        obtained_resource: value shown in the "Obtained" legend entry.
        requested_resource: value shown in the "Requested" legend entry;
            pass None to omit that entry.
        y_label, x_label: axis labels.
        t: when truthy, a title with task name, shard and duration is added.

    Returns:
        The ``matplotlib.pyplot`` module, as in the original.
    """
    # The original compared against the undefined name `true` and wrapped the
    # call in a set literal, so it raised NameError on every call.
    if t:
        plt.title("Task Name: " + task_name + " Shard: " + str(shard) + " Duration: " +  str(task_shard_duration), fontsize=20)
    plt.plot(df_timestamp, x_value, label='{} Used'.format(resource_name))
    # Empty plots add text-only entries to the legend.
    plt.plot([], [], ' ', label='Obtained {}: {}' .format(resource_name, obtained_resource))
    if requested_resource is not None:
        plt.plot([], [], ' ', label='Requested {}: {}' .format(resource_name, requested_resource))
    plt.legend(loc='upper center', bbox_to_anchor=(1.20, 0.8), shadow=True, ncol=1)
    plt.ylabel(y_label)
    plt.xlabel(x_label)
    # Secondary x-axis showing elapsed seconds instead of wall-clock time.
    duration_axis = plt.twiny()
    duration_axis.set_xlim(0, task_shard_duration)
    duration_axis.set_xlabel("Duration Time")
    plt.grid(True)

    return plt

In [None]:
def plot_bar(x_value, y_value, chart_title, y_label,x_label):
    """Return a bokeh vertical bar chart of ``y_value`` per ``x_value`` with a red mean line.

    ``x_value`` supplies the categorical x range and ``y_value`` the bar
    heights. The figure is returned, not shown.
    """
    # Needed in every cell to show plots in notebook
    output_notebook()

    # Hover shows the category and the bar height
    hover_fields = [(x_label,' @x'),(y_label,' @top')]

    bar_fig = figure(
        x_range = x_value,
        plot_height = 350,
        sizing_mode = "scale_width",
        title = chart_title,
        toolbar_location = "left",
        tooltips = hover_fields,
    )
    bar_fig.vbar(x=x_value, top=y_value, width=0.9)

    # Axis styling: no vertical grid lines, y starts at zero, vertical x labels
    bar_fig.xgrid.grid_line_color = None
    bar_fig.y_range.start = 0
    bar_fig.xaxis.major_label_orientation = "vertical"
    bar_fig.xaxis.axis_label = x_label
    bar_fig.yaxis.axis_label = y_label

    # Horizontal ray at the mean of the bar heights, labelled in the legend
    rounded_mean = round(statistics.mean(y_value),1)
    bar_fig.ray(
        x=[0],
        y=[rounded_mean],
        length=len(x_value),
        color='red',
        angle=0,
        legend="Mean: "+str(rounded_mean),
    )

    return bar_fig

In [None]:
def plot_violin(x_value, y_value, chart_title, y_label, x_label):
    """Draw a seaborn violin plot of ``y_value`` and return its axes.

    ``x_value`` is stored alongside the values in the working dataframe and
    ``x_label`` is accepted for signature parity with ``plot_bar``; neither
    affects the drawn plot (the x label is fixed to 'Shards').
    """
    output_notebook()

    # Working frame pairing each usage value with its shard index
    usage_frame = pd.DataFrame(dict(Resource_Usage=y_value, Shard_Index=x_value))

    sns.set(style="whitegrid")

    # NOTE(review): figsize is forwarded as an extra keyword to violinplot;
    # confirm it has the intended effect with the installed seaborn version.
    violin_axes = sns.violinplot(y=usage_frame["Resource_Usage"], figsize=(7,7))
    violin_axes.set(xlabel='Shards', ylabel=y_label, title=chart_title)

    return violin_axes

In [None]:
def _label_outliers(outliers, resource_label):
    """Rename the usage column to ``resource_label`` and add a 'None' row if there are no outliers."""
    labelled = outliers.rename(columns={"Resource_Usage": resource_label})
    if labelled.empty:
        # DataFrame.append was removed in pandas 2.0; build the placeholder
        # row directly so the table is never blank.
        labelled = pd.DataFrame(
            [{resource_label: "None", "Shard_Index": "None"}],
            columns=labelled.columns,
        )
    return labelled


def get_outliers(shards, resource_value, resource_label):
    """Build bokeh tables of the upper and lower outliers of a per-shard metric.

    Outliers are values more than 1.5 * IQR above the third quartile or below
    the first quartile.

    Args:
        shards: shard identifiers, aligned with ``resource_value``.
        resource_value: metric value per shard.
        resource_label: column title to use for the metric in the tables.

    Returns:
        ``(upper_div, upper_table, lower_div, lower_table)``: a heading Div
        and a DataTable for each side. A side without outliers shows a single
        row of "None".
    """
    df = pd.DataFrame(dict(Resource_Usage=resource_value, Shard_Index=shards))

    # find the quartiles and IQR
    q1 = df.Resource_Usage.quantile(q=0.25)
    q3 = df.Resource_Usage.quantile(q=0.75)
    iqr = q3 - q1
    upper = q3 + 1.5*iqr
    lower = q1 - 1.5*iqr

    upper_outliers = _label_outliers(df[df.Resource_Usage > upper], resource_label)
    lower_outliers = _label_outliers(df[df.Resource_Usage < lower], resource_label)

    # Both frames share the same columns, so one column definition serves both tables.
    Columns = [TableColumn(field=Ci, title=Ci) for Ci in upper_outliers.columns] # bokeh columns

    # Create bokeh datatables using pandas dataframe and column names
    upper_table = DataTable(columns=Columns, source=ColumnDataSource(upper_outliers)) # bokeh table
    lower_table = DataTable(columns=Columns, source=ColumnDataSource(lower_outliers)) # bokeh table

    # table titles
    upper_div = Div(text="<h3>Upper Outliers</h3>",width=200, height=20)
    lower_div = Div(text="<h3>Lower Outliers</h3>",width=200, height=20)

    return upper_div, upper_table, lower_div, lower_table
    #return upper_table, lower_table

In [None]:
def remove_nan(input_dict):
    """Return a copy of ``input_dict`` without the entries whose value is NaN.

    Values must be numeric (anything accepted by ``math.isnan``). Key order is
    preserved and the input is not modified.
    """
    # The original also built an unused `filter` object that was immediately
    # overwritten; only the dict comprehension is needed.
    return {key: value for key, value in input_dict.items() if not isnan(value)}

In [None]:
def plot_1_metric(input_dict, title, y_label, x_label):
    """Build the outlier tables, violin plot and bar chart for one per-shard metric.

    ``input_dict`` maps shard label to metric value. Returns a 6-tuple:
    ``(upper_div, upper_table, lower_div, lower_table, violin_axes, bar_figure)``.
    """
    shard_labels = np.array((list(input_dict.keys())))
    metric_values = list(input_dict.values())

    upper_div, upper_table, lower_div, lower_table = get_outliers(shard_labels, metric_values, y_label)
    violin_axes = plot_violin(shard_labels, metric_values, title, y_label, x_label)
    bar_figure = plot_bar(shard_labels, metric_values, title, y_label, x_label)

    return upper_div, upper_table, lower_div, lower_table, violin_axes, bar_figure

#### Task Shard summary

The next cell will plot summary on a particular task, assuming that the task is shard-ed.

In [None]:
def _sorted_by_value_desc(values_by_shard):
    """Return a new dict with the same items ordered by value, largest first."""
    return dict(sorted(values_by_shard.items(), key=lambda item: item[1], reverse=True))


def shard_summary(df_input, task_name_input, workflow_id):
    """Write an HTML report of per-shard resource usage for one task.

    For every shard of ``task_name_input`` the average CPU, max CPU, max
    memory, max disk usage and duration are computed; each metric gets outlier
    tables and a bar chart in ``<workflow_id>_<task_name_input>_shard_summary.html``.

    Args:
        df_input: object exposing a ``metrics_runtime`` dataframe (metrics
            rows joined with runtime/metadata columns).
        task_name_input: task call name to summarize.
        workflow_id: workflow ID used in the report title and file name.
    """
    # Drop rows with nulls in the measured columns, and use the filtered frame
    # for everything below so null rows never reach the per-shard statistics.
    df_droped_na = df_input.metrics_runtime.dropna(
        subset=['meta_duration_sec', 'metrics_disk_used_gb', 'metrics_mem_used_gb', 'metrics_cpu_used_percent']
    )

    # Use the function's own argument, not the notebook-level loop variable.
    df_task = df_droped_na.loc[df_droped_na['runtime_task_call_name'] == task_name_input]
    summary_shards = df_task.runtime_shard.unique()

    average_cpu_per_shard_dict = {}
    max_cpu_per_shard_dict = {}
    max_memory_per_shard_dict = {}
    max_disk_per_shard_dict = {}
    duration_per_shard_dict = {}
    for shard in summary_shards:
        # rows for this task and shard
        df_summary_shard = df_task.loc[df_task['runtime_shard'] == shard]
        shard_key = str(shard)

        # one mean CPU value per metrics row
        cpu_time_mean = df_summary_shard.metrics_cpu_used_percent.apply(mean_of_string)

        average_cpu_per_shard_dict[shard_key] = cpu_time_mean.mean()
        max_cpu_per_shard_dict[shard_key] = cpu_time_mean.max()
        max_memory_per_shard_dict[shard_key] = df_summary_shard.metrics_mem_used_gb.max()
        max_disk_per_shard_dict[shard_key] = df_summary_shard.metrics_disk_used_gb.apply(get_1st_disk_usage).max()
        duration_per_shard_dict[shard_key] = df_summary_shard['meta_duration_sec'].iloc[0]

    # Drop NaN values, then order each metric from largest to smallest
    average_cpu_per_shard_sorted_dict = _sorted_by_value_desc(remove_nan(average_cpu_per_shard_dict))
    max_cpu_per_shard_sorted_dict = _sorted_by_value_desc(remove_nan(max_cpu_per_shard_dict))
    max_memory_per_shard_sorted_dict = _sorted_by_value_desc(remove_nan(max_memory_per_shard_dict))
    max_disk_per_shard_sorted_dict = _sorted_by_value_desc(remove_nan(max_disk_per_shard_dict))
    duration_per_shard_sorted_dict = _sorted_by_value_desc(remove_nan(duration_per_shard_dict))

    # Name the report after the workflow_id argument rather than a global.
    output_file("{}_{}_shard_summary.html".format(workflow_id, task_name_input))

    p_cpu_a = plot_1_metric(average_cpu_per_shard_sorted_dict, "Average CPU Usage Per Shard", "CPU Usage","shards")
    p_cpu_m = plot_1_metric(max_cpu_per_shard_sorted_dict, "Max CPU Usage Per Shard", "CPU Usage","shards")
    p_mem_m = plot_1_metric(max_memory_per_shard_sorted_dict, "Max Memory Usage Per Shard", "Memory Usage GB","shards")
    p_dis_m = plot_1_metric(max_disk_per_shard_sorted_dict, "Max Disk Usage Per Shard", "Disk Usage GB","shards")
    p_dur = plot_1_metric(duration_per_shard_sorted_dict, "Time Duration Per Shard", "Seconds","shards")
    # The title previously opened <h1> but closed with </h2>.
    Title_div = Div(text="<h1>{} {} Task Shard Summary</h1>".format(workflow_id, task_name_input),width=1200, height=25)

    sections = [
        ("Average CPU Usage Per Shard", p_cpu_a),
        ("Max CPU Usage Per Shard", p_cpu_m),
        ("Max Memory Usage Per Shard", p_mem_m),
        ("Max Disk Usage Per Shard", p_dis_m),
        ("Time Duration Usage Per Shard", p_dur),
    ]

    # Each section: heading, outlier headings, outlier tables, bar chart.
    # The violin axes (index 4) are matplotlib objects and are not part of
    # the bokeh layout.
    rows = [[Title_div]]
    last_index = len(sections) - 1
    for index, (heading, parts) in enumerate(sections):
        upper_div, upper_table, lower_div, lower_table, _violin_axes, bar_figure = parts
        rows.append([Div(text="<h2>{} {}</h2>".format(task_name_input, heading),width=600, height=25)])
        rows.append([upper_div, lower_div])
        rows.append([upper_table, lower_table])
        if index == last_index:
            rows.append([bar_figure])
        else:
            # spacer Div after the chart, as in the original layout
            rows.append([bar_figure, Div(text="",width=50, height=25)])

    # Writes to html file all the generated plots and tables for summary shard
    show(layout(rows, sizing_mode='scale_width'))

#### Task Detailed Summary
This for loop will iterate over each task in the workflow. If the task has more than one shard, then it will execute the shard summary function above, creating a task summary HTML file, and move on to the next task. If the task has only 1 shard, then it will create the detailed resource usage PDF file. 

In [None]:
# Saves plots into the following pdf
with PdfPages(PARENT_WORKFLOW_ID + '_resource_monitoring.pdf') as pdf:

    for task_name in TaskNames:
        # Gets the all shards for a given task name
        shards = df_monitoring.metadata_runtime.meta_shard.loc[(df_monitoring.metadata_runtime['meta_task_call_name'] == task_name)]
        shards = shards.sort_values().unique()
        
        # If shard counts is greater than 10 then gets 10 longest running shards for a given task name
        max_shards=2
        if len(shards) >= max_shards:
            #create and sort meta table by duration
            df_monitoring_task = df_monitoring.metadata_runtime.loc[(df_monitoring.metadata_runtime['meta_task_call_name'] == task_name)]
            
            #removes duplicate shards
            df_monitoring_task_uniqueShard = df_monitoring_task.drop_duplicates(subset ="meta_shard")
            df_monitoring_task_sorted_duration = df_monitoring_task_uniqueShard.sort_values(by='meta_duration_sec', ascending=False)
            
            #replace all shards in varaible shards with the first 50 of the sorted duration table
            shards = df_monitoring_task_sorted_duration.meta_shard.head(max_shards)
            
            #Get shard summary
            shard_summary(df_monitoring, task_name, PARENT_WORKFLOW_ID)

        # Create detailed resource usage plots: one five-panel figure (CPU,
        # memory, disk usage, disk read IOps, disk write IOps) per shard,
        # appended to the PDF report and also displayed inline.
        # The detailed figure is only drawn when there are fewer than two shards.
        if len(shards) < 2:
            for shard in shards:
                
                # For size and style of plots
                # (15 x 20 figure so that the five stacked panels stay readable)
                plt.rcParams["figure.figsize"] = (15, 20)
                sns.set(style="whitegrid")

                # Restrict both the metrics and the metadata tables to the rows of
                # the current task/shard, then order the metrics samples in time.
                df_monitoring_task_shard = df_monitoring.metrics_runtime.loc[(df_monitoring.metrics_runtime['runtime_task_call_name'] == task_name) & (df_monitoring.metrics_runtime['runtime_shard'] == shard)]
                df_monitoring_metadata_runtime_task_shard = df_monitoring.metadata_runtime.loc[(df_monitoring.metadata_runtime['runtime_task_call_name'] == task_name) & (df_monitoring.metadata_runtime['runtime_shard'] == shard)]
                df_monitoring_task_shard = df_monitoring_task_shard.sort_values(by='metrics_timestamp')

                # NOTE(review): task_shard_meta_duration is not referenced again
                # inside this loop; the duration shown in the plots is the one
                # derived from the metrics timestamps below.
                task_shard_meta_duration = df_monitoring_metadata_runtime_task_shard.meta_duration_sec.iloc[0]
                # Duration = time between the first and the last metrics sample,
                # rounded to whole seconds.
                max_datetime = max(df_monitoring_task_shard['metrics_timestamp'])
                min_datetime = min(df_monitoring_task_shard['metrics_timestamp'])
                task_shard_duration = round(datetime.timedelta.total_seconds(max_datetime - min_datetime))

                # Collapse the list-valued columns to one number per sample:
                # the mean for CPU usage, the max for disk usage and disk IOps.
                # (presumably each entry holds one value per core / per disk --
                # TODO confirm against the BQ schema)
                cpu_used_percent_array = [np.asarray(x).mean() for x in df_monitoring_task_shard.metrics_cpu_used_percent]
                disk_used_gb_array = [np.asarray(x).max() for x in df_monitoring_task_shard.metrics_disk_used_gb]
                disk_read_iops_array = [np.asarray(x).max() for x in df_monitoring_task_shard.metrics_disk_read_iops]
                disk_write_iops_array = [np.asarray(x).max() for x in df_monitoring_task_shard.metrics_disk_write_iops]

                # Build a {name: value} dict of the requested runtime attributes
                # from the 'meta_inputs' key/value records of the first metadata row.
                runtime_list= df_monitoring_metadata_runtime_task_shard.iloc[0].at['meta_inputs']
                runtime_dic={}
                for i, element in enumerate(runtime_list):
                    # Entries whose key mentions "default_attr" or
                    # "runtime_attr_override" are skipped.
                    if re.search("default_attr", element["key"]) or re.search("runtime_attr_override", element["key"]):
                        continue
                    else:
                        # Normalise the key: drop square brackets, the
                        # "runtime_attr" text and (at most) the first two double
                        # quotes, so the plots can look up plain names such as
                        # "cpu_cores", "mem_gb" and "disk_gb".
                        k = element["key"].replace("[", "").replace("]", "").replace("runtime_attr", "").replace("\"", "", 2)
                        v = element["value"]
                        runtime_dic[k]=v

                # --- Panel 1: CPU usage -------------------------------------
                plt.subplot(5, 1, 1)
                plt.title("Task Name: " + task_name + " Shard: " + str(shard) + " Duration: " +  str(task_shard_duration), fontsize=20)
                plt.plot(df_monitoring_task_shard.metrics_timestamp.astype('O'), cpu_used_percent_array, label='CPU Used')
                # The empty plots below draw nothing; they only add text entries
                # (obtained vs. requested cores) to the legend.
                plt.plot([], [], ' ', label='Obtained CPU Cores: {}' .format(df_monitoring_metadata_runtime_task_shard.iloc[0].at['meta_cpu']))
                plt.plot([], [], ' ', label='Requested CPU Cores: {}' .format(round(float(runtime_dic["cpu_cores"]))))
                # Legend is anchored outside the right edge of the axes.
                plt.legend(loc='upper center', bbox_to_anchor=(1.20, 0.8), shadow=True, ncol=1)
                plt.ylabel('CPU Percentage Used')
                plt.xlabel("Date Time")
                # Second x-axis ranging from 0 to the task duration, labelled as
                # elapsed time (same pattern in all five panels).
                plt2 = plt.twiny()
                plt2.set_xlim(0, task_shard_duration)
                plt2.set_xlabel("Duration Time")
                plt.grid(True)

                # --- Panel 2: memory usage ----------------------------------
                plt.subplot(5, 1, 2)
                plt.plot(df_monitoring_task_shard.metrics_timestamp.astype('O'), df_monitoring_task_shard.metrics_mem_used_gb, label='Memory Used')
                # Red horizontal line at the 'meta_mem_total_gb' value of the first metadata row.
                plt.axhline(y=df_monitoring_metadata_runtime_task_shard.iloc[0].at['meta_mem_total_gb'], color='r', label='Max Memory GB: %.2f' %(df_monitoring_metadata_runtime_task_shard.iloc[0].at['meta_mem_total_gb']))
                plt.plot([], [], ' ', label='Requested Memory GB: {}' .format(round(float(runtime_dic["mem_gb"]), 2)))
                plt.legend(loc='upper center', bbox_to_anchor=(1.20, 0.8), shadow=True, ncol=1)
                plt.ylabel('Memory Used in GB')
                plt.xlabel("Date Time")
                plt2 = plt.twiny()
                plt2.set_xlim(0, task_shard_duration)
                plt2.set_xlabel("Duration Time")
                plt.grid(True)

                # --- Panel 3: disk usage ------------------------------------
                plt.subplot(5, 1, 3)
                plt.plot(df_monitoring_task_shard.metrics_timestamp.astype('O'), disk_used_gb_array, label='Disk Used')
                # max() is applied to 'meta_disk_total_gb', so that cell is
                # expected to hold a sequence (presumably one size per disk --
                # TODO confirm); the red line marks its largest value.
                plt.axhline(y=max(df_monitoring_metadata_runtime_task_shard.iloc[0].at['meta_disk_total_gb']), color='r', label='Max Disksize GB: %.2f' %(max(df_monitoring_metadata_runtime_task_shard.iloc[0].at['meta_disk_total_gb'])))
                plt.plot([], [], ' ', label='Requested Disksize GB: {}' .format(round(float(runtime_dic["disk_gb"]), 2)))
                plt.legend(loc='upper center', bbox_to_anchor=(1.20, 0.8), shadow=True, ncol=1)
                plt.ylabel('Diskspace Used in GB')
                plt.xlabel("Date Time")
                plt2 = plt.twiny()
                plt2.set_xlim(0, task_shard_duration)
                plt2.set_xlabel("Duration Time")
                plt.grid(True)

                # --- Panel 4: disk read IOps (no legend) --------------------
                plt.subplot(5, 1, 4)
                plt.plot(df_monitoring_task_shard.metrics_timestamp.astype('O'), disk_read_iops_array)
                plt.ylabel('Disk Read IOps')
                plt.xlabel("Date Time")
                plt2 = plt.twiny()
                plt2.set_xlim(0, task_shard_duration)
                plt2.set_xlabel("Duration Time")
                plt.grid(True)

                # --- Panel 5: disk write IOps (no legend) -------------------
                plt.subplot(5, 1, 5)
                plt.plot(df_monitoring_task_shard.metrics_timestamp.astype('O'), disk_write_iops_array)
                plt.ylabel('Disk Write_IOps')
                plt.xlabel("Date Time")
                plt2 = plt.twiny()
                plt2.set_xlim(0, task_shard_duration)
                plt2.set_xlabel("Duration Time")
                plt.grid(True)

                # Add vertical space between the panels, write the figure to
                # `pdf` (opened outside this section -- presumably the PdfPages
                # report), show it inline, then close it so figures do not
                # accumulate across iterations.
                plt.subplots_adjust(hspace = 0.5)
                pdf.savefig(bbox_inches='tight', pad_inches=0.5)
                plt.show()
                plt.close()

## Save files to a Google Bucket
Here we save the files produced by the notebook (the PDF and the HTML reports) to a Google Cloud Storage bucket. Set `OUTPUT_BUCKET` in the cell below to the bucket path where the files should be copied.

In [None]:
# Copy the generated reports from the notebook's working directory to a bucket.
# Requires that user (or Terra user proxy) has edit access to destination bucket
# Destination path for the copies; change this to your own bucket.
OUTPUT_BUCKET = "gs://broad-dsde-methods-bshifaw/monitoring-plots/test/"
# Workflow name taken from the first row of the metadata/runtime table.
# NOTE(review): not referenced by the copy commands below.
WORKFLOW_NAME = df_monitoring.metadata_runtime.iloc[0].at['meta_workflow_name'] 

# The lines starting with `!` are IPython shell escapes; {NAME} is replaced by
# the value of the Python variable NAME before the command runs.
# PARENT_WORKFLOW_ID is defined in an earlier cell.
# 1) the PDF report, 2) every matching shard-summary HTML file (note the `*`
# wildcard), 3) the workflow-summary HTML file.
!gsutil cp ./{PARENT_WORKFLOW_ID}_resource_monitoring.pdf {OUTPUT_BUCKET}
!gsutil cp ./{PARENT_WORKFLOW_ID}*_shard_summary.html {OUTPUT_BUCKET}
!gsutil cp ./{PARENT_WORKFLOW_ID}_workflow_summary.html {OUTPUT_BUCKET}