In [None]:
#%load_ext autoreload
#%autoreload 2

# <center> Pegasus WMS Workflow MATLAB Example </center>

## Overview

- This tool demonstrates running a Ghub Pegasus Workflow Management System (WMS) workflow with Matlab executables on CCR's generally accessible compute cluster, UB-HPC. This tool's repository is located at https://github.com/GhubGateway/Ghub_Pegasus_WMS_MATLAB_Example. Please see this repository for information on hosting a GitHub tool on Ghub.

- Enter the latitude and longitude coordinates in decimal degrees. Click the `Run Workflow` button to run the workflow which converts the coordinates to UTM.

## Background

- The Matlab executables are encapsulated as a workflow. The Pegasus Workflow Management System (WMS) automates and manages the execution of the workflow jobs, <br /> including staging the jobs, distributing the work, submitting the jobs to run in parallel on CCR, as well as handling data flow dependencies and overcoming job failures.  See https://pegasus.isi.edu/documentation/index.html for more information on the Pegasus Workflow Management System (WMS). See https://theghub.org/tools/pegtut for an introductory Pegasus tutorial.
- The submit command enables Ghub users to execute code on CCR's UB-HPC compute cluster. See https://theghub.org/kb/development/using-submit for more information on the submit command. See https://help.hubzero.org/documentation/current/tooldevs/grid/pegasuswf and ./bin/Wrapper.py for more information on submitting a pegasus-plan for the workflow.
- This Jupyter-based tool uses Python 3. See https://theghub.org/resources?alias=jupyterexamples for more information on developing Jupyter-based tools on Ghub.
- This tool is deployed on Debian 10 to run in Tool or App mode style. See https://theghub.org/kb/development/deploy-styles-for-jupyter-tools for more information on deploying Jupyter-based tools on Ghub.


In [None]:
# Setup and preprocessing:

import sys
import os
import getpass
import platform
import shutil
import atexit
import math
import numpy as np
import pandas as pd
import time

import ipywidgets as widgets
from IPython.display import display, HTML, Markdown, clear_output, Image, Javascript
#import xml.etree.ElementTree as et

import hublib
#print (help(hublib))
import hublib.ui as ui
#print (help(ui))
import hublib.use
#print (help(hublib.use))

#print(sys.path)

# Set up the environment for this notebook.
# Every self_* name below is a module-level global read by the later cells.

# Setup paths to executables
# NOTE(review): os.path.realpath(" ") resolves a single-space name against the
# current working directory, so the dirname() taken below is the current
# working directory itself - confirm this is the intended tool directory.
scriptpath = os.path.realpath(" ")
        
# Get the parent dirs
self_tooldir = os.path.dirname(scriptpath)

# Setup path to python and bash scripts
self_bindir = os.path.join(self_tooldir, "bin")

# Add to PYTHONPATH
# (presumably so the Configuration and Wrapper modules imported further down
# are found in bin - verify against the repository layout)
sys.path.insert (1, self_bindir)

# Set up path to the current data directory
self_datadir = os.path.join(self_tooldir, "data")

# Set up path to the current session directory
self_workingdir = os.getcwd()

# Set up path to the user's home directory
self_homedir = os.path.expanduser("~")

# Initialize the dated run directory.
# Workflow results are not available until after a workflow is executed via Pegasus and completes
# It is passed to Wrapper as an empty string by run_workflow.
self_rundir = ""

# Login name of the user running the notebook; used in log and status messages.
self_user = getpass.getuser()

# Configuration parameters

import Configuration as cfg

# Version of Pegasus
# Note: when switching the version of Pegasus, delete ~/.pegasus/workflow.db
%use pegasus-5.0.1
from Wrapper_5_0_1 import Wrapper

# Print numpy arrays in full instead of summarizing long ones with "...".
np.set_printoptions(threshold=np.inf) 

# Log files live in the session working directory.
# self_log_filepath is the file written by log_info; the backup path receives a
# copy of the previous log when the logging cell runs. The snapshot path is not
# referenced in the visible cells.
self_log_filepath = os.path.join(self_workingdir, 'ghub_exercise3_log_file.txt')
self_log_snapshot_filepath = os.path.join(self_workingdir, 'ghub_exercise3_log_snapshot_file.txt')
self_log_backup_filepath = os.path.join(self_workingdir, 'ghub_exercise3_log_backup_file.txt')

# CSS border specifications for the output widgets.
widget_border_style = '1px solid black'
widget_output_border_style = '1px solid black'

# ANSI escape sequences used to color messages printed to output widgets;
# END resets the formatting.
BOLD = '\033[1m'
SUCCESS = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
END = '\033[0m'

# Widget sizing constants. Only button_width and button_height are referenced
# in the visible cells.
dropdown_str_width = 16

dropdown_width = '965px'
dropdown_height = '30px'
button_width = '250px'
button_height = '40px'
ui_string_width = '96.5%'
ui_dropdown_width = '96.2%'
# Clean up: remove temporary workflow files from the session working directory
def exit_handler():
    """Delete temporary files from ``self_workingdir`` when the kernel exits.

    Removed:
        * ``*.yml``, ``*.stdout`` and ``*.stderr`` files
        * ``*.txt`` files, except ``README.txt``, the workflow results
          (names ending in ``utm.txt`` or ``deg.txt``) and the log file
          ``self_log_filepath``

    Sub-directories and all other files are left untouched.
    """

    log_filepath = os.path.abspath(self_log_filepath)

    for file in os.listdir(self_workingdir):

        # os.listdir() returns bare names; build the full path so the checks
        # below do not depend on the process' current working directory.
        filepath = os.path.join(self_workingdir, file)
        if not os.path.isfile(filepath):
            continue

        if file.endswith(".txt"):
            # Compare full paths: comparing the bare name with
            # self_log_filepath never matches, which deleted the log file.
            keep = (file == "README.txt"
                    or file.endswith('utm.txt')
                    or file.endswith('deg.txt')
                    or os.path.abspath(filepath) == log_filepath)
            if not keep:
                os.remove(filepath)
        elif file.endswith((".yml", ".stdout", ".stderr")):
            os.remove(filepath)

# Run exit_handler when the Python interpreter (the notebook kernel) exits.
atexit.register(exit_handler);   

In [None]:
# prevent In[] and Out[] from displaying on left
#HTML('''
#<style>.prompt{width: 0px; min-width: 0px; visibility: collapse}</style>
#''')

In [None]:
# Scroll the notebook back to the top once the page has finished loading.
# https://api.jquery.com/ready/
HTML('''
<script>
    function scroll_to_top() {
        Jupyter.notebook.scroll_to_top();
    }
    // Register the function itself as the handler: writing scroll_to_top()
    // in the .on() call would invoke it immediately and register its return
    // value (undefined). If the page has already finished loading, the load
    // event will not fire again, so scroll right away in that case.
    if (document.readyState === "complete") {
        scroll_to_top();
    } else {
        $( window ).on( "load", scroll_to_top );
    }
</script>
''')

In [None]:
# Button styles
# Defines the CSS class "buttontextclass" (black text, 130% font size) that is
# attached to the notebook's buttons with add_class("buttontextclass").
HTML('''
<style>.buttontextclass { color:black ; font-size:130%}</style>
''')

In [None]:
# Keep a copy of the log from the previous run before it is overwritten.
if os.path.exists(self_log_filepath):
    shutil.copy (self_log_filepath, self_log_backup_filepath)
    
# Log file handle used by log_info. Opening with 'w' truncates any existing
# log; the handle is not closed in the visible cells.
FH1 = open(self_log_filepath, 'w')

# Toggle button for the log window. It is created here, before it is displayed,
# because log_info reads its description to decide whether to echo messages.
show_log_output_button = widgets.Button(description="Show Log Output", disabled=False,\
    layout=widgets.Layout(width=button_width, height=button_height),\
    style= {'button_color':'lightgreen','font_weight':'bold'})

# Utility Functions

def log_info (message):
    """Record `message` in the log file, echoing it to the log window when open.

    The log window counts as open while the toggle button reads
    'Hide Log Output'; in that state the message is also printed into the
    `log_output` widget. The message is always appended to the log file as one
    line and flushed immediately.
    """
    log_window_is_open = (show_log_output_button.description == 'Hide Log Output')
    if log_window_is_open:
        with log_output:
            print (message)
    log_line = '%s\n' %message
    FH1.write(log_line)
    FH1.flush()
        
def _show_and_record (output_widget, message, color=None):
    """Print `message` inside `output_widget` (wrapped in `color`...END when a
    color is given), then pass the plain message to log_info."""
    with output_widget:
        if color is None:
            print (message)
        else:
            print ('%s%s%s' %(color,message,END))
    log_info (message)


def log_status (output_widget, message):
    """Show `message` uncolored in `output_widget` and record it in the log."""
    _show_and_record (output_widget, message)


def log_success (output_widget, message):
    """Show `message` in the SUCCESS color in `output_widget` and record it in the log."""
    _show_and_record (output_widget, message, SUCCESS)


def log_warning (output_widget, message):
    """Show `message` in the WARNING color in `output_widget` and record it in the log."""
    _show_and_record (output_widget, message, WARNING)


def log_error (output_widget, message):
    """Show `message` in the FAIL color in `output_widget` and record it in the log."""
    _show_and_record (output_widget, message, FAIL)


# Write a description of the runtime environment to the log: platform, the
# paths set up in the first code cell, sys.path and the PATH variable.
# The block always runs; the cfg.VERBOSE condition is commented out.
if (1): #cfg.VERBOSE == True:
    
    log_info ('Operating System Platform: ' + platform.system() + ' ' + platform.release())
    log_info ('\n')

    log_info ('Environment:\n')
    log_info ('scriptpath: ' + scriptpath)
    log_info ('tooldir: ' + self_tooldir)
    log_info ('bindir: ' + self_bindir)
    log_info ('datadir: ' + self_datadir)
    log_info ('workingdir: ' + self_workingdir)
    log_info ('homedir: ' + self_homedir)
    log_info ('user: ' + self_user)
    log_info ('\n')
    
    #print (type(sys.path)) # <class 'list'>
    #print (sys.path)
    # One sys.path entry per line; each entry after the first is preceded by
    # the single-space join separator.
    log_info ('sys.path: ' + ' '.join(str(path)+'\n' for path in sys.path))
    log_info ('\n')
    
    #print (type(os.environ["PATH"])) # <class 'str'>
    #print (os.environ["PATH"])
    log_info ('os.environ["PATH"]: ' + os.environ["PATH"])
    log_info ('\n')



### Note: ghub group membership is required to use this tool.<br/>



In [None]:
# Verify ghub group membership
in_ghub_group = False

# The `groups` command lists the current user's group names; its output is
# written to a file in the current directory and parsed below.
groups_info_filename = 'groups_info.txt'
groups_cmd = 'groups > %s' %groups_info_filename
os.system(groups_cmd)

if os.path.exists(groups_info_filename):
    with open(groups_info_filename,'r') as f:
        for line in f:
            # split() without an argument splits on any whitespace and drops
            # the trailing newline. With split(' ') the last name on the line
            # kept its '\n', so 'ghub' was missed when it was the last group.
            if 'ghub' in line.split():
                in_ghub_group = True
                break

if in_ghub_group == False:
    message = 'ghub group membership is required to use this tool. %s is not a member of the ghub group. Please contact us.' %self_user
    print ('%s%s%s' %(FAIL,message,END), flush=True)
    log_info (message)
else:
    message = '%s is a member of the ghub group.' %self_user
    # Terminate with END (not SUCCESS) so the color is reset after the message.
    print ('%s%s%s' %(SUCCESS,message,END), flush=True)
    log_info (message)


<a name="top"></a>
#### [**Processing Steps**](#top)<br />

1. [Enter the Latitude and Longitude Coordinates in Decimal Degrees](#step_1) <br />
2. [Run the Workflow](#step_2)<br />
    1. [Executes deg2utm to convert the coordinates to UTM. Creates utm.txt](#step_2)<br />
    2. [Executes utm2deg to convert the coordinates back to decimal degrees. Reads utm.txt and creates deg.txt](#step_2)<br />
3. [View Workflow Progress](#step_3)<br />
4. [View Workflow Results](#step_4)<br />
5. [View Log Output](#step_5)<br />


In [None]:
# #219F hex = #8607 decimal
# This also works for an up arrow: [$\tiny\uparrow$](#top)

<a name="step_1"></a>
## Step 1: Enter the Latitude and Longitude Coordinates [&#8607;](#top)

Enter the latitude and longitude coordinates in decimal degrees.

Enter Latitude.   [Degrees North -90 to 90]<br />
Enter Longitude.  [Degrees East -180 to 180]


In [None]:
# Input fields for the coordinates to convert. run_workflow passes
# latitude.value and longitude.value to Wrapper.
# Default latitude and longitude to Buffalo, NY latitude and longitude coordinates.
#https://www.gps-latitude-longitude.com/gps-coordinates-of-buffalo-ny
# Also see: https://www.latlong.net/lat-long-utm.html
latitude = ui.Number(
    name = 'Latitude',
    description = 'Latitude [degrees north -90 to 90]',
    units = '',
    value = '42.886447',
    min = '-90.0',
    max = '90.0'
)
longitude = ui.Number(
    name = 'Longitude',
    description = 'Longitude [degrees east -180 to 180]',
    units = '',
    value = '-78.878369',
    min = '-180.0',
    max = '180.0'
)
# Group both fields into one form, displayed in the next cell.
coordinates_form = ui.Form([latitude,
             longitude], name = 'Coordinates')


In [None]:
# Show the latitude / longitude input form.
display(coordinates_form)

In [None]:
 # Run Workflow

# NOTE(review): self_numsamples is not referenced in the visible cells - confirm it is still needed.
self_numsamples = 0

# Maximum walltime for the workflow, in minutes. run_workflow converts the
# value with int() before passing it to Wrapper.
maxwalltime = ui.Number(
    name = 'Maximum Walltime',
    description = 'Maximum Walltime [min]',
    units = 'min',
    value = '10.0',
    min = '5.0',
    max = '60.0'
)

# Form holding the run options; displayed above the Run Workflow button.
workflow_run_options_form = ui.Form([maxwalltime], name = 'Workflow Run Options')

def _read_result_lines(filepath, min_lines):
    """Return the text of `filepath` split on newlines, requiring at least `min_lines` entries.

    Raises:
        ValueError: if the file yields fewer than `min_lines` entries.
    """
    with open(filepath, 'r') as f:
        lines = f.read().split('\n')
    if len(lines) < min_lines:
        raise ValueError('%s has %d line(s), expected at least %d' %(filepath, len(lines), min_lines))
    return lines


def run_workflow(p):
    """Run the Pegasus workflow and show its results (`Run Workflow` button callback).

    Progress messages go to the `workflow_progress` widget, the converted
    coordinates to the `workflow_results` widget. The workflow counts as
    successful when both utm.txt and deg.txt are present in the working
    directory after Wrapper returns; the outcome is stored in the global
    `self_workflow_succeeded`, which finish_workflow_processing reads.

    Args:
        p: the clicked button, supplied by ipywidgets; unused.
    """

    # print (p) #Button
    global self_workflow_succeeded
    self_workflow_succeeded = False

    workflow_progress.clear_output()
    workflow_results.clear_output()

    with workflow_progress:

        # Prevent a second run, and toggling of the log window, while running.
        runWorkflowButton.disabled = True
        show_log_output_button.disabled = True

        start_time = time.time()

        self_workflow_results1_filepath = os.path.join(self_workingdir, "utm.txt")
        self_workflow_results2_filepath = os.path.join(self_workingdir, "deg.txt")

        try:

            # Success is detected from the existence of the result files, so
            # results left over from an earlier run must not be mistaken for
            # the output of this one.
            for stale_filepath in (self_workflow_results1_filepath, self_workflow_results2_filepath):
                if os.path.exists(stale_filepath):
                    os.remove(stale_filepath)

            matlab_launch_exec_path = os.path.join(self_tooldir, "remotebin", "matlabLaunch.sh")
            log_info("matlabLaunch.sh path: " + matlab_launch_exec_path)

            log_status (workflow_progress, "Pegasus workflow in progress. This should take approximately 5 minutes...")

            Wrapper (" ", \
                self_tooldir, self_bindir, self_datadir, self_workingdir, self_rundir, \
                latitude.value, longitude.value, int(maxwalltime.value))

            # Check if utm.txt and deg.txt were created and transferred from CCR
            # to determine if workflow completed successfully

            if os.path.exists(self_workflow_results1_filepath) and os.path.exists(self_workflow_results2_filepath):

                # Read and validate both files before reporting success, so a
                # truncated result file is handled as a failure by the except
                # clause below instead of failing half-way through the display.
                utm_lines = _read_result_lines(self_workflow_results1_filepath, 3)
                deg_lines = _read_result_lines(self_workflow_results2_filepath, 2)

                log_status (workflow_progress, 'Workflow completed successfully\n')
                self_workflow_succeeded = True

                with workflow_results:

                    print ('Workflow Results:\n')

                    print('UTM Coordinates:\n')
                    print ('x [UTM Easting]:  ' + utm_lines[0])
                    print ('y [UTM Northing]: ' + utm_lines[1])
                    print ('UTMZONE:          ' + utm_lines[2])

                    print('\nLatitude and Longitude Coordinates:\n')
                    print ('Latitude  [Degrees North -90 to 90]:  ' + deg_lines[0])
                    print ('Longitude [Degrees East -180 to 180]: ' + deg_lines[1])

            else:

                log_error (workflow_progress, 'Workflow did not complete successfully')
                log_error (workflow_progress, '%s and/or %s not generated by the workflow\n' \
                       %(self_workflow_results1_filepath, self_workflow_results2_filepath))
                self_workflow_succeeded = False

                filepath = os.path.join(self_workingdir, 'pegasus.analysis')
                if (os.path.exists(filepath)):
                    log_info("pegasus.analysis:\n")
                    # Use a separate local name; FH1 is the name of the
                    # module-level log file handle.
                    with open(filepath, 'r') as analysis_file:
                        log_info (analysis_file.read())

        except Exception as e:

            self_workflow_succeeded = False
            log_error (workflow_progress, 'Workflow Exception: %s\n' %str(e))

        finally:

            # Re-enable the buttons on every exit path, including a kernel
            # interrupt, which `except Exception` does not catch.
            runWorkflowButton.disabled = False
            show_log_output_button.disabled = False

        log_info ('\nWorkflow elapsed time: ' + str((time.time() - start_time)/60.0) + ' [min]\n')

        finish_workflow_processing()

# Abort
# Select Kernel Interrupt
#if self_tW.is_alive() == True:
   #self_tW.terminate()

# Button that starts the workflow; run_workflow is called on every click.
runWorkflowButton = widgets.Button(description="Run Workflow", disabled=False,\
    layout=widgets.Layout(width=button_width, height=button_height),\
    style= {'button_color':'lightgreen','font_weight':'bold'})
# "buttontextclass" is the CSS class defined in the "Button styles" cell.
runWorkflowButton.add_class("buttontextclass")
runWorkflowButton.on_click (run_workflow)
#help (runWorkflowButton)

# Note: See /apps/share64/debian7/anaconda/anaconda-6/lib/python3.7/site-packages/hublib/ui/pathselect.py,
# file property initialized to None, when a file is selected gets set to the selected file.


<a name="step_2"></a>
## Step 2: Run the Workflow [&#8607;](#top)

 Click the `Run Workflow` button to run the workflow which converts the decimal degrees coordinates to UTM.
 

In [None]:
# Show the run options form followed by the Run Workflow button.
display(workflow_run_options_form)
display(runWorkflowButton)

In [None]:
def send_user_email(workflow_succeeded):
    """E-mail the user the outcome of the workflow via `submit ... mail2self`.

    Args:
        workflow_succeeded: True to send the completion message, False to send
            the failure message. Both messages name the session working
            directory.

    Sending is best effort: if the `submit` command cannot be started, the
    problem is written to the log and no exception is raised.
    """
    # Local import so this cell does not depend on an edit to the import cell.
    import subprocess

    # Reference: JMS crevasseoib tool:
    # Fall back to a placeholder when SESSION is not set (for example when the
    # notebook runs outside a hub session) rather than raising KeyError.
    job_num = str(os.environ.get('SESSION', 'unknown'))

    email_subject = 'ghubex3 session #' + job_num

    if workflow_succeeded:
        email_lines = ['Your ghubex3 job is complete!',
                       '',
                       'Output files can be accessed on theGHub.org in the following directory:',
                       str(self_workingdir)]
    else:
        email_lines = ['ghubex3 job #' + job_num + ' Failed.',
                       'Please check theghub.org for further information, in the directory:',
                       str(self_workingdir)]
    email_text = '\r'.join(email_lines)

    # Pass the arguments as a list, without a shell, so quotes, '$' or
    # backticks in the text or the directory name cannot alter the command.
    email_cmd = ['submit', '--progress', 'silent', 'mail2self',
                 '-t', email_text, '-s', email_subject]

    try:
        subprocess.run(email_cmd)
    except OSError as e:
        log_info ('Unable to send the notification email: %s' %str(e))
    
def finish_workflow_processing():
    """Log and remove the temporary files left by a workflow run, then notify the user.

    In ``self_workingdir``:
        * ``*.stdout`` files are deleted
        * ``*.stderr`` files are deleted; those whose name starts with
          ``matlab-`` are copied to the log first
        * ``pegasus.analysis`` is copied to the log and deleted
        * the Pegasus status / statistics files are deleted

    Finally an e-mail reporting ``self_workflow_succeeded`` is sent. Any
    exception is reported in the ``workflow_progress`` widget and the log.
    """

    try:

        log_info ('\nfinish_workflow_processing...')

        for file in os.listdir(self_workingdir):
            # os.listdir() returns bare names; use the full path so the checks
            # do not depend on the process' current working directory.
            filepath = os.path.join(self_workingdir, file)
            if not os.path.isfile(filepath):
                continue
            if file.endswith('.stdout'):
                os.remove(filepath)
            elif file.endswith('.stderr'):
                if file.startswith('matlab-'):
                    log_info ('file ' + file + ':\n')
                    with open(filepath, 'r') as f:
                        for line in f:
                            log_info (line)
                os.remove(filepath)

        filepath = os.path.join(self_workingdir, 'pegasus.analysis')
        if (os.path.exists(filepath)):
            filesize = os.path.getsize(filepath)
            log_info ('pegasus.analysis filesize: ' + str(filesize))
            log_info ('pegasus.analysis:\n')
            with open(filepath, 'r') as f:
                log_info (f.read())
            os.remove(filepath)

        for filename in ("pegasusstatus.txt", "pegasusjobstats.csv",
                         "pegasussummary-time.csv", "pegasussummary.csv"):
            filepath = os.path.join(self_workingdir, filename)
            if os.path.exists(filepath):
                os.remove(filepath)

        # send email to user
        send_user_email(self_workflow_succeeded)

        log_info ('finish_workflow_processing done.')

    except Exception as e:
        # Report in the progress widget. The widget name used here before,
        # create_figures_button_callback_output, is not defined in this
        # notebook, so the handler itself raised NameError.
        log_error (workflow_progress, "EXCEPTION: %s\n" % str(e))


<a name="step_3"></a>
## Step 3: View Workflow Progress [&#8607;](#top)


In [None]:
# Output area that receives the progress and status messages of run_workflow.
workflow_progress = widgets.Output(layout={'border': '1px solid black'})
display(workflow_progress)

<a name="step_4"></a>
## Step 4: View Workflow Results [&#8607;](#top)


In [None]:
# Output area in which run_workflow prints the converted coordinates.
workflow_results = widgets.Output(layout={'border': widget_output_border_style})
display(workflow_results)

<a name="step_5"></a>
## Step 5: View Log Output [&#8607;](#top)

- If an error is encountered while running this tool,
the cause of the error will be written to the log output file, ghub_exercise3_log_file.txt.

- Click the `Show Log Output` button to open the `Log Output` window and view the log output file.


In [None]:
def show_log_output(change):
    """Toggle the log window (`Show Log Output` / `Hide Log Output` button callback).

    When the button reads 'Show Log Output', the content of the log file is
    printed into the `log_output` widget and the button is relabelled
    'Hide Log Output'. Otherwise the widget is cleared and the button is
    relabelled 'Show Log Output'. An error is reported when the log file does
    not exist.

    Args:
        change: the clicked button, supplied by ipywidgets; unused.
    """

    if not os.path.exists(self_log_filepath):
        # Report the log file path. The name used here before, `filepath`, is
        # not defined in this function, so this branch raised NameError.
        log_error (log_output, '%s does not exist ' %self_log_filepath + '. Please contact us.')
        return

    if show_log_output_button.description == 'Show Log Output':

        # Relabel first: log_info echoes new messages into the log window
        # while the button reads 'Hide Log Output'.
        show_log_output_button.description = 'Hide Log Output'

        with log_output:
            print("%s: \n\n" %self_log_filepath)
            with open(self_log_filepath,'r') as f:
                for line in f:
                    print(line.rstrip())
    else:

        show_log_output_button.description = 'Show Log Output'
        log_output.clear_output()

# Style the log toggle button (created in the logging cell), connect it to
# show_log_output and display it.
show_log_output_button.add_class("buttontextclass")
show_log_output_button.on_click(show_log_output)
display (show_log_output_button)

In [None]:
# Log window: show_log_output prints the log file here, and log_info echoes new
# messages here while the toggle button reads 'Hide Log Output'.
log_output = widgets.Output(layout={'border': widget_output_border_style})
display (log_output)

In [None]:
# Download from Ghub
#def flush_log_file():
    #FH1.flush()
#display(HTML('<h4>Download File: %s</h4>' %os.path.basename(self_log_filepath)))
#downloadTXTButton = hublib.ui.Download(os.path.relpath(self_log_filepath, os.getcwd()),
    #label = 'Download Log', style='success', icon='fa-arrow-circle-down', cb=flush_log_file)
#display(downloadTXTButton)