In [None]:
# Notebook input parameters. Optional values left unset (None) fall back to defaults
# resolved in later cells (environment variables, widgets, or the selected workflow).
# Required
input_bq_goolge_project: str = None  # Google project that holds the BQ monitoring dataset.

# Optional
input_workflow_id: str = None  # Parent workflow ID; if unset, picked with a widget from the submission's workflows.
input_submission_id: str = None  # Submission ID; if unset, picked with a widget from the workspace's submissions.
input_workspace_namespace: str = None  # Defaults to the WORKSPACE_NAMESPACE environment variable.
input_workspace_name: str = None  # Defaults to the WORKSPACE_NAME environment variable.
input_task_name: list = None  # Task name(s) to plot; if unset, picked with the task selector widget.
input_target_shard: int = None  # Passed to plot_resource_usage as target_shard.
input_days_back_upper_bound: int = None  # The number of days back from today that the workflow started (default: from the selected workflow).
input_days_back_lower_bound: int = None  # The number of days back from today that the workflow ended (default: from the selected workflow).
output_bucket: str = None  # Destination for the PDF; defaults to $WORKSPACE_BUCKET/workflow_monitoring.

# Plot Resource Monitoring Data

This notebook visualizes data from a BQ dataset that holds workflow resource-usage monitoring data. Please refer to the README for details on what data are collected and how they are collected. The notebook will produce three plots: one PDF and two interactive HTML reports.

The following must be provided to the notebook:
- BQ Google Project (`input_bq_goolge_project`): the Google project that holds the BQ dataset.

## Import packages

In [None]:
!pip install --upgrade pip
!pip uninstall -y cromwellMonitor
!pip install --no-cache-dir git+https://github.com/broadinstitute/cromwell-task-monitor-bq-vis.git

In [None]:
# Standard-library / third-party imports and pandas display options.
import os
import pandas as pd
pd.set_option('display.max_rows', 200) # display up to 200 rows before pandas truncates the output

# cromonitor helpers (presumably provided by the package installed in the pip cell above).
from cromonitor.query.queryBQ import QueryBQToMonitor
from cromonitor.plotting import plotting as plotUtils
from cromonitor.table import utils as tableUtils
from cromonitor.jupyter import utils as jupyterUtils
from cromonitor.fiss import utils as fissUtils 


In [None]:
# Resolve the workspace: an explicitly supplied parameter wins; otherwise read the
# WORKSPACE_* environment variables.
workspace_namespace = input_workspace_namespace or os.environ["WORKSPACE_NAMESPACE"]
workspace_name = input_workspace_name or os.environ["WORKSPACE_NAME"]

## Select Submission and Workflow 

In [None]:
# Use the same "falsy means not provided" rule as the cell that builds the Workflow
# object. Otherwise an empty-string ID would skip the selector here, and the later
# cells would then fail on the missing `selected_submission_id`.
if not input_submission_id:
    # List the workspace's submissions and let the user pick one interactively.
    submissions_df_sorted = fissUtils.get_list_of_submissions(
        workspace_namespace=workspace_namespace,
        workspace_name=workspace_name,
    )
    display(submissions_df_sorted)
    print("Select Submission ID Below")

    # Later cells read the choice via `selected_submission_id.value`.
    # NOTE: under "Run All" those cells read the widget's initial value before the
    # user can interact; pass input_submission_id for non-interactive runs.
    submission_ids = submissions_df_sorted['submissionId'].tolist()
    selected_submission_id = jupyterUtils.create_submission_selector(
        options=submission_ids
    )

In [None]:
# Only offer a workflow selector when no workflow ID was supplied ("falsy means not
# provided", matching the cell that builds the Workflow object).
if not input_workflow_id:
    # List the workflow IDs that belong to the chosen submission.
    # `selected_submission_id` only exists when the submission selector was shown, so
    # prefer the explicit input_submission_id; reading the widget unconditionally
    # raised a NameError whenever a submission ID was passed in.
    workflow_id_df_sorted = fissUtils.get_submission_workflow_ids(
        workspace_namespace=workspace_namespace,
        workspace_name=workspace_name,
        submission_id=input_submission_id if input_submission_id else selected_submission_id.value,
    )
    display(workflow_id_df_sorted)
    print("Select Workflow ID Below")

    # Later cells read the choice via `selected_workflow_id.value`.
    # A distinct name is used so the later `workflow_ids` (the IDs to query) is not confused with these options.
    workflow_id_options = workflow_id_df_sorted['workflowId'].tolist()
    selected_workflow_id = jupyterUtils.create_workflow_selector(
        options=workflow_id_options
    )

In [None]:
# Gather everything later cells need about the chosen workflow into one Workflow object.
# Later cells use its subworkflow IDs, workflow name, and start/end day offsets.
# Explicit parameters take precedence over the selector widgets.
selected_workflow_info = fissUtils.Workflow(
    workspace_namespace=workspace_namespace,
    workspace_name=workspace_name,
    submission_id=input_submission_id or selected_submission_id.value,
    parent_workflow_id=input_workflow_id or selected_workflow_id.value,
)

## Query BQ database

In [None]:
# IDs to query: every subworkflow plus the parent workflow itself.
workflow_ids = selected_workflow_info.subworkflow_ids + [selected_workflow_info.parent_workflow_id]
PARENT_WORKFLOW_ID = selected_workflow_info.parent_workflow_id

# Query window, in days back from today. Explicit parameters win. `is not None` (rather
# than truthiness) keeps an explicit 0 ("today") instead of silently replacing it with
# the workflow-derived default.
days_back_upper_bound = (
    input_days_back_upper_bound
    if input_days_back_upper_bound is not None
    else selected_workflow_info.days_from_workflow_start
)
days_back_lower_bound = (
    input_days_back_lower_bound
    if input_days_back_lower_bound is not None
    else selected_workflow_info.days_from_workflow_end
)
bq_goolge_project = input_bq_goolge_project

# Query object; presumably no BQ query runs until df_monitoring.query() is called below.
df_monitoring = QueryBQToMonitor(
    workflow_ids=workflow_ids,
    days_back_upper_bound=days_back_upper_bound,
    days_back_lower_bound=days_back_lower_bound,
    bq_goolge_project=bq_goolge_project,
)

# Local cache files for the query results, keyed by the parent workflow ID.
metrics_filename = PARENT_WORKFLOW_ID + '_metrics_resource_monitoring.pkl'
metadata_filename = PARENT_WORKFLOW_ID + '_metadata_runtime_resource_monitoring.pkl'

The next cell handles two scenarios automatically:

Scenario 1: Perform a fresh query and save the query results locally for use in another session.
If no saved results are found, the cell uses the `QueryBQToMonitor` object created above to query the BQ database for the selected workflow IDs and date range.
After querying, the cell saves the pandas dataframes as pickle files, so future sessions can avoid the cost of querying BQ again.

Scenario 2: Import local query results that were saved in an earlier session.
If both pickle files from a previous run of this notebook exist, the cell loads them instead of rerunning the BQ query. Delete the pickle files to force a fresh query.

In [None]:
# Reuse cached results when both cache files exist; otherwise query BQ and cache them.
# NOTE: the cache is keyed only on the parent workflow ID, so changing the days-back
# bounds does not invalidate it. Delete the .pkl files to force a fresh query.
# NOTE(review): load_dataframe presumably unpickles; only load cache files this
# notebook wrote itself (unpickling untrusted files can execute arbitrary code).
if os.path.exists(metrics_filename) and os.path.exists(metadata_filename):
    print("Loading data from local files")
    df_monitoring.metrics = tableUtils.load_dataframe(metrics_filename)
    df_monitoring.metadata_runtime = tableUtils.load_dataframe(metadata_filename)
else:
    print("Loading data from querying BQ database")
    df_monitoring.query()

    # Cache both tables (pickle format) only when neither came back empty.
    # Previously `metrics.empty` was tested twice and `metadata_runtime` never checked.
    if not df_monitoring.metrics.empty and not df_monitoring.metadata_runtime.empty:
        df_monitoring.metrics.to_pickle(metrics_filename)
        df_monitoring.metadata_runtime.to_pickle(metadata_filename)
    else:
        print("Empty Database: No Files Saved")

Using the tables obtained from scenario 1 or 2, the next cell creates an additional monitoring dataframe table that will be used later for plotting.

In [None]:
# Build the combined metrics_runtime table (from the metrics and runtime-metadata
# tables) that the plotting cells below consume.
df_monitoring.metrics_runtime = tableUtils.create_metrics_runtime_table(
    metrics=df_monitoring.metrics,
    metadata_runtime=df_monitoring.metadata_runtime,
)

## Plot Data


### Workflow duration Summary
The next cell obtains and plots the workflow duration summary, consisting of a table and a plot of the workflow duration per task.

In [None]:
# Workflow duration summary for the parent workflow: a table and a per-task duration plot.
plotUtils.generate_workflow_summary(parent_workflow_id=PARENT_WORKFLOW_ID, df_monitoring=df_monitoring)

### Task sharded summary and task detailed summary
The next cells create either a sharded summary or a detailed task summary:
- Task shard summary: used for scattered tasks, it provides a high-level view of resource usage for each shard of a task. For example, the average CPU usage of every shard of a task is plotted together in one bar plot. To look at resource usage over time for a particular shard, set the `input_target_shard` parameter (passed as `target_shard` to the `plot_resource_usage` function).
- Task detailed summary: when a task isn't scattered, its resource usage over time is displayed.

In [None]:
# Offer every distinct task call name found in the monitoring data in a
# SelectMultiple widget; the plotting cell reads `task_selector.value`.
all_task_names = df_monitoring.metrics_runtime.runtime_task_call_name.unique()
task_selector = jupyterUtils.create_task_selector(all_task_names)

In [None]:
# Tasks to plot: the explicit parameter first, otherwise the selector widget's current value.
selected_task = input_task_name if input_task_name else task_selector.value
# A single task name passed as a plain string (e.g. from the command line) would be
# treated as a sequence of characters downstream, e.g. by "_".join when naming the
# PDF. Wrap it in a list.
if isinstance(selected_task, str):
    selected_task = [selected_task]

# When input_target_shard is set, resource usage over time is plotted for that shard.
resource_plot = plotUtils.plot_resource_usage(
    df_monitoring=df_monitoring,
    parent_workflow_id=PARENT_WORKFLOW_ID,
    task_names=selected_task,
    plt_height=4000,
    plt_width=1000,
    target_shard=input_target_shard,
)
resource_plot.show()

In [None]:
# Save the returned plot as a PDF named after the workflow and the plotted task(s).
# A single task name given as a plain string is used as-is; "_".join would otherwise
# split it into individual characters.
task_header = selected_task if isinstance(selected_task, str) else "_".join(selected_task)

plot_file_name = f"{PARENT_WORKFLOW_ID}_{selected_workflow_info.workflow_name}_{task_header}_resource_monitoring.pdf"
plotUtils.save_plot_as_pdf(plot=resource_plot, filename=plot_file_name)

## Save files to a Google Bucket
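Here we save the PDF produced by the notebook to a Google bucket. Set `output_bucket` to choose the destination; if it is not set, the file is copied to `workflow_monitoring` in the workspace bucket (`$WORKSPACE_BUCKET`).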

In [None]:
#Requires that user (or Terra user proxy) has edit access to destination bucket
OUTPUT_BUCKET = output_bucket if output_bucket else os.environ["WORKSPACE_BUCKET"]+"/"+"workflow_monitoring"

!gsutil cp ./{plot_file_name} {OUTPUT_BUCKET}