In [1]:
#%load_ext autoreload
#%autoreload 2

# <center> Ghub Exercise 1 </center>

## Overview

- Demonstrates running a Ghub Pegasus Workflow Management System (WMS) workflow with a Python script on University at Buffalo (UB)'s Center For Computational Research (CCR)'s generally accessible high performance compute cluster, UB-HPC.

## Background

- The Python script is encapsulated as a workflow; the Pegasus Workflow Management System (WMS) automates and manages the execution of the workflow jobs, including staging the jobs, distributing the work, submitting the jobs to run on CCR's UB-HPC compute cluster, as well as handling data flow dependencies and overcoming job failures. See https://pegasus.isi.edu/documentation/index.html for more information on the Pegasus Workflow Management System (WMS). See https://theghub.org/tools/pegtut for an introductory Pegasus tutorial.
- The submit command enables Ghub users to execute code on CCR's UB-HPC compute cluster. See https://theghub.org/kb/development/using-submit for more information on the submit command. See https://help.hubzero.org/documentation/current/tooldevs/grid/pegasuswf and ./bin/Wrapper.py for more information on submitting a pegasus-plan for the workflow.
- This Jupyter-based tool uses Python 3. See https://theghub.org/resources?alias=jupyterexamples for more information on developing Jupyter-based tools on Ghub.
- This tool is deployed on Debian 10 to run in Tool or App mode style. See https://theghub.org/kb/development/deploy-styles-for-jupyter-tools for more information on deploying Jupyter-based tools on Ghub.


In [2]:
# Setup and preoprocessing:

import sys
import os
import getpass
import platform
import shutil
import atexit
import math
import numpy as np
import pandas as pd
import time

import ipywidgets as widgets
from IPython.display import display, HTML, Markdown, clear_output, Image, Javascript
#import xml.etree.ElementTree as et

import hublib
#print (help(hublib))
import hublib.ui as ui
#print (help(ui))
import hublib.use
#print (help(hublib.use))

#print(sys.path)

# Set up the environment for this notebook

# Setup paths to executables
scriptpath = os.path.realpath(" ")
        
# Get the parent dirs
self_tooldir = os.path.dirname(scriptpath)

# Setup path to python and bash scripts
self_bindir = os.path.join(self_tooldir, "bin")

# Add to PYTHONPATH
sys.path.insert (1, self_bindir)

# Set up path to the current data directory
self_datadir = os.path.join(self_tooldir, "data")

# Set up path to the current session directory
self_workingdir = os.getcwd()

# Set up path to the user's home directory
self_homedir = os.path.expanduser("~")

# Initialize the dated run directory.
# Workflow results are not available until after a workflow is executed via Pegasus and completes
self_rundir = ""

self_user = getpass.getuser()

# Configuration parameters

import Configuration as cfg
if cfg.VERBOSE == True:
    print ('cfg.DISPERSION_MODEL: ', cfg.DISPERSION_MODEL, '\n')

import GeoLocation

self_geo_location = GeoLocation.GeoLocation(0.0, 0.0, 0.0, 0.0)

# Version of Pegasus
if (1):
    # For initial testing only
    # pegasus-4.8.1 is the default
    %use pegasus-4.8.1
    from Wrapper_4_8_1 import Wrapper
else:
    %use pegasus-5.0.1
    from Wrapper_5_0_1 import Wrapper

np.set_printoptions(threshold=np.inf) 

self_log_filepath = os.path.join(self_workingdir, 'ghub_exercise1_log_file.txt')
self_log_snapshot_filepath = os.path.join(self_workingdir, 'ghub_exercise1_log_snapshot_file.txt')
self_log_backup_filepath = os.path.join(self_workingdir, 'ghub_exercise1_log_backup_file.txt')

widget_border_style = '1px solid black'
widget_output_border_style = '1px solid black'

BOLD = '\033[1m'
SUCCESS = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
END = '\033[0m'

dropdown_str_width = 16

dropdown_width = '965px'
dropdown_height = '30px'
button_width = '250px'
button_height = '40px'
ui_string_width = '96.5%'
ui_dropdown_width = '96.2%'

# Clean up: remove files from the data/results folder and the bin/__pycache__ folder
def exit_handler():
    
    for file in os.listdir(self_workingdir):
        
        if os.path.isfile(file):
            if file.endswith(".txt"):
                if file != "README.txt":
                    print ("Deleting: %s\n" %file)
                    os.remove(file)
            #elif file.endswith(".dax"):
                #print ("Deleting: %s\n" %file)
                #os.remove(file)
            #elif file.endswith(".stdout"):
                #print ("Deleting: %s\n" %file)
                #os.remove(file)
            #elif file.endswith(".stderr"):
                #print ("Deleting: %s\n" %file)
                #os.remove(file)

    #dirpath = os.path.join(self_bindir, "__pycache__")
    #if (os.path.exists(dirpath)):
        #print ("Deleting: %s\n" %dirpath)
        #shutil.rmtree(dirpath)

atexit.register(exit_handler);   

<IPython.core.display.Javascript object>

In [3]:
# prevent In[] and Out[] from displaying on left
#HTML('''
#<style>.prompt{width: 0px; min-width: 0px; visibility: collapse}</style>
#''')

In [4]:
#https://api.jquery.com/ready/
HTML('''
<script>
    function scroll_to_top() {
        Jupyter.notebook.scroll_to_top();
    } 
    $( window ).on( "load", scroll_to_top() );
</script>
''')

In [5]:
# Button styles
HTML('''
<style>.buttontextclass { color:black ; font-size:130%}</style>
''')

In [6]:
if os.path.exists(self_log_filepath):
    shutil.copy (self_log_filepath, self_log_backup_filepath)
    
FH1 = open(self_log_filepath, 'w')

show_log_output_button = widgets.Button(description="Show Log Output", disabled=False,\
    layout=widgets.Layout(width=button_width, height=button_height),\
    style= {'button_color':'lightgreen','font_weight':'bold'})

# Utility Functions

def log_info (message):
    
    if show_log_output_button.description == 'Hide Log Output': 
        with log_output:
            print (message)    
    FH1.write('%s\n' %message)
    FH1.flush()

def log_status (output_widget, message):
    
    with output_widget:
        print (message)
    log_info (message)
    
def log_success (output_widget, message):
    
    with output_widget:
        print ('%s%s%s' %(SUCCESS,message,END))
    log_info (message)
    
def log_warning (output_widget, message):
    
    with output_widget:
        print ('%s%s%s' %(WARNING,message,END))
    log_info (message)
    
def log_error (output_widget, message):
    
    with output_widget:
        print ('%s%s%s' %(FAIL,message,END))
    log_info (message)
    
if (1): #cfg.VERBOSE == True:
    
    log_info ('Operating System Platform: ' + platform.system() + ' ' + platform.release())
    log_info ('\n')

    log_info ('Environment:\n')
    log_info ('scriptpath: ' + scriptpath)
    log_info ('tooldir: ' + self_tooldir)
    log_info ('bindir: ' + self_bindir)
    log_info ('datadir: ' + self_datadir)
    log_info ('workingdir: ' + self_workingdir)
    log_info ('homedir: ' + self_homedir)
    log_info ('user: ' + self_user)
    log_info ('\n')
    
    #print (type(sys.path)) # <class 'list'>
    #print (sys.path)
    log_info ('sys.path: ' + ' '.join(str(path)+'\n' for path in sys.path))
    log_info ('\n')
    
    #print (type(os.environ["PATH"])) # <class 'str'>
    #print (os.environ["PATH"])
    log_info ('os.environ["PATH"]: ' + os.environ["PATH"])
    log_info ('\n')


### Note: ghub group membership is required to use this tool.<br/>



In [7]:
# Verify ghub group membership
in_ghub_group = False

groups_info_filename = 'groups_info.txt'
groups_cmd = 'groups > %s' %groups_info_filename
os.system(groups_cmd)

if os.path.exists(groups_info_filename):
    f = open(groups_info_filename,'r')
    for line in f:
        groups = line.split(' ')
        #print ('groups: ', groups)
        for group in groups:
            if group == 'ghub':
                in_ghub_group = True
                break
if in_ghub_group == False:
    message = 'ghub group membership is required to use this tool. %s is not a member of the ghub group. Please contact us.' %self_user
    print ('%s%s%s' %(FAIL,message,END), flush=True)
    log_info (message)
else:
    message = '%s is a member of the ghub group.' %self_user
    print ('%s%s%s' %(SUCCESS,message,SUCCESS), flush=True)
    log_info (message)


[92mrenettej is a member of the ghub group.[92m


<a name="top"></a>
#### [**Processing Steps**](#top)<br />

1. [Select Full Path to the Modeling Group](#step_1) <br />
2. [Run the Workflow](#step_2)<br />
    1. [Run get_netcdf_info.py](#step_2)<br />
3. [View Workflow Output](#step_3)<br />
4. [View Log Output](#step_4)<br />


In [8]:
# Get the UB CCR's map collection's information 
mapped_collections_filename = os.path.join('/data/groups/ghub/tools/ghubex1', 'ub-ccr-ghub-ISMIP6-CMIP6_Archive_Final-mapped_collections.xlsx')

mapped_collections_df = pd.read_excel (mapped_collections_filename)
print (type(mapped_collections_df))
print (mapped_collections_df)

num_mapped_collections = len(mapped_collections_df)
print (num_mapped_collections)

folder_list = []
ice_sheet_list = []
modeling_groups_list = []
description_list = []

for i in range(num_mapped_collections):
    folder = str(mapped_collections_df['Folder'][i].strip(' \t\n\r'))
    folder_list.append(folder)
    ice_sheet = str(mapped_collections_df['Ice Sheet'][i].strip(' \t\n\r'))
    ice_sheet_list.append(ice_sheet)
    modeling_groups = mapped_collections_df['Modeling Groups'][i].strip(' \t\n\r').split(',')
    modeling_groups_list.append(modeling_groups)
    description = str(mapped_collections_df['Description'][i].strip(' \t\n\r'))
    description_list.append(description)
print ('folder_list: ', folder_list)
print ('ice_sheet_list: ', ice_sheet_list)
print ('modeling_groups_list: ', modeling_groups_list)
print ('description_list: ', description_list)

ice_sheet_index = 0
modeling_group_index = 0


<class 'pandas.core.frame.DataFrame'>
                                              Folder Ice Sheet  \
0  /projects/grid/ghub/ISMIP6/Projections/Reproce...       AIS   
1  /projects/grid/ghub/ISMIP6/Projections/Reproce...       GIS   

                                     Modeling Groups Description  
0  AWI,DOE,ILTS_PIK,IMAU,JPL1, LSCE,NCAR,PIK,UCIJ...  Antarctica  
1  AWI,BGC,GSFC,ILTS_PIK,IMAU,JPL,LSCE,MUN,NCAR,U...   Greenland  
2
folder_list:  ['/projects/grid/ghub/ISMIP6/Projections/Reprocessed/CMIP6_Archive_Final', '/projects/grid/ghub/ISMIP6/Projections/Reprocessed/CMIP6_Archive_Final']
ice_sheet_list:  ['AIS', 'GIS']
modeling_groups_list:  [['AWI', 'DOE', 'ILTS_PIK', 'IMAU', 'JPL1', ' LSCE', 'NCAR', 'PIK', 'UCIJPL', 'ULB', 'UTAS', 'VUB', 'VUW'], ['AWI', 'BGC', 'GSFC', 'ILTS_PIK', 'IMAU', 'JPL', 'LSCE', 'MUN', 'NCAR', 'UAF', 'UCIJPL', 'VUB', 'VUW']]
description_list:  ['Antarctica', 'Greenland']


In [9]:
def ice_sheet_dropdown_callback(change):
    
    global ice_sheet_index
    global modeling_group_index
   
    if change['type'] == 'change' and change['name'] == 'value' and change['new'] != ' ' \
        and ice_sheet_dropdown.value != None:
        
        selected_ice_sheet = ice_sheet_dropdown.value
        #print ('selected ice sheet: ', selected_ice_sheet)
        ice_sheet_index = ice_sheet_list.index(selected_ice_sheet)
        #print ('selected ice sheet index: ', ice_sheet_index)
        # Reset modeling_group_index
        modeling_group_dropdown.options = modeling_groups_list[ice_sheet_index]
        modeling_group_index = 0
        modeling_group_path_text.value = os.path.join(folder_list[ice_sheet_index], ice_sheet_list[ice_sheet_index], modeling_groups_list[ice_sheet_index][modeling_group_index])
        #print ('ice_sheet_dropdown_callback modeling_group_path_text.value: ', modeling_group_path_text.value)

        
def modeling_group_dropdown_callback(change):
    
    global modeling_group_index
   
    if change['type'] == 'change' and change['name'] == 'value' and change['new'] != ' ' \
        and modeling_group_dropdown.value != None:
        
        selected_modeling_group = modeling_group_dropdown.value
        #print ('selected_modeling_group: ', selected_modeling_group)
        modeling_group_index = modeling_groups_list[ice_sheet_index][:].index(selected_modeling_group)
        #print ('modeling group index: ', modeling_group_index)
        modeling_group_path_text.value = os.path.join(folder_list[ice_sheet_index], ice_sheet_list[ice_sheet_index], modeling_groups_list[ice_sheet_index][modeling_group_index])
        #print ('modeling_group_dropdown_callback modeling_group_path_text.value: ', modeling_group_path_text.value)

        
ice_sheet_dropdown = widgets.Dropdown(
    description = 'Ice Sheet:',
    disabled = False,
    options = ice_sheet_list,
    value = ice_sheet_list[0],
    style = {'description_width': '150px'},
    layout = widgets.Layout(width=dropdown_width, height=dropdown_height)
)
ice_sheet_dropdown.observe(ice_sheet_dropdown_callback)

modeling_group_dropdown = widgets.Dropdown(
    description = 'Modeling Group:',
    disabled = False,
    options = modeling_groups_list[ice_sheet_index],
    value = modeling_groups_list[ice_sheet_index][modeling_group_index],
    style = {'description_width': '150px'},
    layout = widgets.Layout(width=dropdown_width, height=dropdown_height)
)
modeling_group_dropdown.observe(modeling_group_dropdown_callback)

modeling_group_path_text = widgets.Text(
    placeholder = '',
    description = "Modeling Group Path",
    disabled = True,
    value = os.path.join(folder_list[ice_sheet_index], ice_sheet_list[ice_sheet_index], modeling_groups_list[ice_sheet_index][modeling_group_index]),
    style = {'description_width': '150px'},\
    layout = widgets.Layout(width='65%', visibility = 'visible')
)



<a name="step_1"></a>
## Step 1: Select Full Path to the Modeling Group [&#8607;](#top)

This tool allows users to select an ice sheet and modeling group within a predetermined UB CCR's mapped collection.<br />
See the Ghub netCDF File Regrid Tool to explore a diferernt method for accessing an UB CCR's mapped collection.

In [10]:
modeling_groups_form = ui.Form([ice_sheet_dropdown, modeling_group_dropdown, modeling_group_path_text, ], name = 'Modeling Group Path')
display (modeling_groups_form)

Group(children=(HTML(value="<p   style='background-color: #DCDCDC; font-size: 150%; padding: 5px'>Modeling Gro…

In [11]:
 # Run Workflow

self_numsamples = 0

maxwalltime = ui.Number(
    name = 'Maximum Walltime',
    description = 'Maximum Walltime [min]',
    units = 'min',
    value = '10.0',
    min = '5.0',
    max = '60.0'
)

workflow_run_options_form = ui.Form([maxwalltime], name = 'Workflow Run Options')

def run_workflow(p):
    
    # print (p) #Button    
    
    global self_workflow_succeeded
    
    workflow_output.clear_output()
        
    with workflow_output:
        
        runWorkflowButton.disabled = True
        
        start_time = time.time()

        try:
            
            log_status (workflow_output, "Pegasus workflow in progress. This should take approximately 5 minutes...")
            
            
            Wrapper (" ", \
                self_tooldir, self_bindir, self_datadir, self_workingdir, self_rundir, \
                modeling_group_path_text.value, int(maxwalltime.value))
            
            # Check if elevation1.png and elevation2.png exist were created and tranferred from CCR 
            # to determine if workflow completed successfully
            
            self_workflow_results_filepath = os.path.join(self_workingdir, "get_netcdf_info.txt")

            if os.path.exists(self_workflow_results_filepath):

                print ("Workflow completed successfully\n")
                self_workflow_succeeded = True
               
                with workflow_output:
                    
                    print("%s: \n\n" %self_workflow_results_filepath)
                    f = open(self_workflow_results_filepath,'r')
                    for line in f:
                        print(line.rstrip())
                    f.close()
                    
            else:

                print ("Workflow did not complete successfully")
                print ("%s not generated by the workflow\n" %self_workflow_results_filepath)
                self_workflow_succeeded = False

                filepath = os.path.join(self_workingdir, 'pegasus.analysis')
                if (os.path.exists(filepath)):
                    print("pegasus.analysis:\n")
                    FH1 = open(filepath, 'r')
                    output = FH1.read()
                    FH1.close()
                    print (output)

                print ("\nPlease see the log output\n")
        
        except Exception as e:
        
            print ("Pegasus workflow Exception: %s\n" %str(e))
            print ("Please see the log output\n")
       
        runWorkflowButton.disabled = False
            
        #print ("Workflow elapsed time: " + str(time.time() - start_time) + " seconds\n")
        print ("\nWorkflow elapsed time: " + str((time.time() - start_time)/60.0) + " minutes\n")
        #print ("Workflow elapsed time: " + str((time.time() - start_time)/3600.0) + " hours\n")

        finish_workflow_processing()

# Abort
# Select Kernel Interrupt
#if self_tW.is_alive() == True:
   #self_tW.terminate()

runWorkflowButton = widgets.Button(description="Run Workflow", disabled=False,\
    layout=widgets.Layout(width=button_width, height=button_height),\
    style= {'button_color':'lightgreen','font_weight':'bold'})
runWorkflowButton.add_class("buttontextclass")
runWorkflowButton.on_click (run_workflow)
#help (runWorkflowButton)

# Note: See /apps/share64/debian7/anaconda/anaconda-6/lib/python3.7/site-packages/hublib/ui/pathselect.py,
# file property initialized to None, when a file is selected gets set to the selected file.


<a name="step_2"></a>
## Step 2: Run the Workflow [&#8607;](#top)


In [12]:
display(workflow_run_options_form)
display(runWorkflowButton)

Group(children=(HTML(value="<p   style='background-color: #DCDCDC; font-size: 150%; padding: 5px'>Workflow Run…

Button(description='Run Workflow', layout=Layout(height='40px', width='250px'), style=ButtonStyle(button_color…

In [13]:
def send_user_email(workflow_succeeded):

    environ = dict(os.environ)
    #print (type(environ))
    #print (environ)
    key = 'SESSION'
    if key in environ:
        job_num = str(environ[key])
    else:
        job_num = 'job_num unknown'
    #print (job_num)

    email_subject = 'ghubex1 session # ' + job_num
    
    if workflow_succeeded:
        email_text = 'Your ghubex1 job is complete!\r'
        email_text = email_text+'\rOutput files can be accessed on theghub.org in the following directory:'
        email_text = email_text+'\r' + str(self_workingdir)
    else:
        email_text = 'ghubex1 job #' + str(job_num) + ' Failed.'
        email_text = email_text+'\rPlease check theghub.org for further information, in the directory:'
        email_text = email_text+'\r' + str(self_workingdir)        
        
    email_cmd = 'submit --progress silent mail2self -t "'+email_text+'" -s "'+email_subject+'"'
    
    # email debugging
    #start_time = time.time()
    os.system(email_cmd)
    #elapsed_time = time.time() - start_time
    #print ('email elapsed time: ', elapsed_time)
    
def finish_workflow_processing():
    
    try:

        log_info ('\nfinish_workflow_processing...')
        
        # ghub_exercise1-workflow.dax is created by Wrapper.py
        #filepath = os.path.join(self_workingdir, 'ghub_exercise1-workflow.dax')
        #if os.path.exists(filepath):
            #print ("Deleting: %s\n" %filepath)
            #os.remove(filepath)

        for file in os.listdir(self_workingdir):
            if os.path.isfile(file):
                if file.startswith('python-') and file.endswith('.stdout'):
                    #f = open(file,'r')
                    #for line in f:
                        #log_info (line)
                    #f.close()
                    os.remove(file)
                    
        for file in os.listdir(self_workingdir):
            if os.path.isfile(file):
                if file.startswith('python-') and file.endswith('.stderr'):
                    f = open(file,'r')
                    for line in f:
                        log_info (line)
                    f.close()
                    #os.remove(file)
         
        filepath = os.path.join(self_workingdir, 'pegasus.analysis')
        if (os.path.exists(filepath)):
            filesize = os.path.getsize(filepath)
            log_info ('pegasus.analysis filesize: ' + str(filesize))
            log_info ("pegasus.analysis:\n")
            f = open(filepath, 'r')
            output = f.read()
            f.close()
            log_info (output)
            #os.remove(filepath)
        
        filepath = os.path.join(self_workingdir, "pegasusstatus.txt")
        if os.path.exists(filepath):
            #print ("Deleting: %s\n" %filepath)
            os.remove(filepath)

        filepath = os.path.join(self_workingdir, "pegasusjobstats.csv")
        if os.path.exists(filepath):
            #print ("Deleting: %s\n" %filepath)
            os.remove(filepath)

        filepath = os.path.join(self_workingdir, "pegasussummary-time.csv")
        if os.path.exists(filepath):
            #print ("Deleting: %s\n" %filepath)
            os.remove(filepath)

        filepath = os.path.join(self_workingdir, "pegasussummary.csv")
        if os.path.exists(filepath):
            #print ("Deleting: %s\n" %filepath)
            os.remove(filepath)

        # send email to user
        send_user_email(self_workflow_succeeded)
                
        log_info ('finish_workflow_processing done.')
        
    except Exception as e:
        log_error (workflow_output, "EXCEPTION: %s\n" % str(e))


<a name="step_3"></a>
## Step 3: View Workflow Output [&#8607;](#top)


In [14]:
workflow_output = widgets.Output(layout={'border': '1px solid black'})
display(workflow_output)

Output(layout=Layout(border='1px solid black'))

<a name="step_4"></a>
## Step 4: View Log Output [&#8607;](#top)

- If an error is encountered while running this tool,
the cause of the error will be written to the log output file, ghub_exercise1_log_file.txt.

- Click the `Show Log Output` button to open the `Log Output` window and view the log output file.


In [15]:
def show_log_output(change):
    
    if os.path.exists(self_log_filepath):
            
        if show_log_output_button.description == 'Show Log Output':
        
            show_log_output_button.description = 'Hide Log Output'
        
            with log_output:
            
                if os.path.exists(self_log_filepath):
                    print("%s: \n\n" %self_log_filepath)
                    f = open(self_log_filepath,'r')
                    for line in f:
                        print(line.rstrip())
                    f.close()
                else:
                    job_error (log_output, '%s does not exist ' %filepath + '. Please contact us.')
        else:
        
            show_log_output_button.description = 'Show Log Output'
            log_output.clear_output()
    else:
        job_error (log_output, '%s does not exist ' %filepath + '. Please contact us.')

show_log_output_button.add_class("buttontextclass")
show_log_output_button.on_click(show_log_output)
display (show_log_output_button)

Button(description='Show Log Output', layout=Layout(height='40px', width='250px'), style=ButtonStyle(button_co…

In [16]:
log_output = widgets.Output(layout={'border': widget_output_border_style})
display (log_output)

Output(layout=Layout(border='1px solid black'))

In [17]:
# Download from Ghub
#def flush_log_file():
    #FH1.flush()
#display(HTML('<h4>Download File: %s</h4>' %os.path.basename(self_log_filepath)))
#downloadTXTButton = hublib.ui.Download(os.path.relpath(self_log_filepath, os.getcwd()),
    #label = 'Download Log', style='success', icon='fa-arrow-circle-down', cb=flush_log_file)
#display(downloadTXTButton)

In [18]:
# Initialize widgets with default values

# Process default location
#get_bounding_box()

