In [None]:
from ayx import Alteryx
    
try:
    from multiprocess import process, pool
except ImportError:
    # multiprocess may not be present in the Alteryx Python environment yet:
    # install it on first run, then retry the import. Only ImportError is
    # caught so interrupts and unrelated errors are not masked.
    Alteryx.installPackages('multiprocess')
    from multiprocess import process, pool

In [22]:
#Libraries required to process the data and send out alerts
import pandas as pd

# Read the workflow paths supplied on input anchor #1, then count the non-null
# FullPath entries (reported as the total in the over-time alert e-mail).
paths = Alteryx.read("#1")
count_of_flows = paths['FullPath'].count()

In [None]:
"""
This is the actual runner: it uses AlteryxEngineCmd.exe to execute each workflow and enforces
a timeout - if any workflow takes more than 40 minutes (the default) it is
automatically killed and the runner carries on to the next flow.

The worker then parses the engine output into a one-row DataFrame of metrics, which it returns
to the caller.

Essential libraries are imported inside the function itself because, when the function is run
in parallel, each call executes in its own process.

"""

def worker(path, timeout=2400):
    """Run one Alteryx workflow and summarise the engine's output.

    Parameters
    ----------
    path : str
        Full (backslash-separated) path of the workflow to execute.
    timeout : int or float, optional
        Seconds the engine may run before it is killed; default 2400 (40 minutes).

    Returns
    -------
    pandas.DataFrame
        A single row with columns Output (engine summary line), Warnings,
        Conversion Errors, Errors, Log (full engine output), File (file name
        without its 5-character extension), ModuleFullPath, MasterRunTime
        (local time of parsing) and Time (elapsed seconds as a float).

    Non-zero exits, timeouts and failures to launch the engine are recorded as
    a result row (timeouts and launch failures as 1 error) instead of being
    raised, so a single bad workflow does not abort the whole parallel run.
    """
    # Imports are local because this function executes in separate worker
    # processes when mapped over the pool.
    import pandas as pd
    import subprocess
    import time
    import re
    from socket import gethostname

    # Summary-line counters. The trailing "s?" also accepts singular forms such
    # as "1 error", which the plural-only patterns silently counted as 0.
    warn = re.compile(r"(\d+)\swarnings?")
    error = re.compile(r"(\d+)\serrors?")
    conversion_err = re.compile(r"(\d+)\sfield conversion errors?")

    def workflow_runner(location):
        """Execute `location` with AlteryxEngineCmd; return (output bytes, elapsed seconds)."""
        # Raw f-string: \\HOST\Alteryx\bin\AlteryxEngineCmd.exe without relying
        # on invalid escape sequences.
        engine = rf'\\{gethostname()}\Alteryx\bin\AlteryxEngineCmd.exe'
        start = time.time()
        try:
            output = subprocess.check_output([engine, location], timeout=timeout)
        except subprocess.CalledProcessError as e:
            # Non-zero exit: keep whatever the engine printed.
            output = e.output
        except subprocess.TimeoutExpired:
            output = '\r\n Timeout Error, workflow finished with 1 errors \r\n'.encode("cp437")
        except OSError as e:
            # The engine could not be started (e.g. missing executable or
            # unreachable share); record it instead of crashing the pool.
            output = (f'\r\n Launch Error ({e}), workflow finished with 1 errors \r\n'
                      .encode("cp437", errors="replace"))
        return output, time.time() - start

    def count_of(pattern, line):
        """Integer captured by the first match of `pattern` in `line`, or 0 if none."""
        found = pattern.findall(line)
        return int(found[0]) if found else 0

    def result_parser(output, end):
        """Build the one-row metrics DataFrame from raw engine output."""
        current_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime())
        log = output.decode("cp437")
        lines = log.split('\r\n')
        # The summary is the last line before the trailing CRLF; fall back to the
        # only line for empty / single-line output rather than raising IndexError.
        summary = lines[-2] if len(lines) >= 2 else lines[0]
        return pd.DataFrame({"Output": [summary],
                             "Warnings": [count_of(warn, summary)],
                             "Conversion Errors": [count_of(conversion_err, summary)],
                             "Errors": [count_of(error, summary)],
                             "Log": [log],
                             "File": path.split('\\')[-1][:-5],
                             "ModuleFullPath": path,
                             "MasterRunTime": current_time,
                             "Time": [end]})

    results, finish = workflow_runner(path)
    return result_parser(results, finish)

In [None]:
# Emailer function used in the workflow when list runner takes too long
def emailer(email):
    """Send the "list runner is taking too long" alert e-mail.

    Parameters
    ----------
    email : pandas.DataFrame
        Frame whose first row (label 0) holds 'Addresses', 'Total Workflows',
        'Completed Flows' and 'Total Time (seconds)', as built by duration_alert.

    Relies on the notebook-level names ``from_email`` (sender address) and
    ``smtp_relay`` (SMTP host), which must be defined before this is called.
    """
    import smtplib
    from email.message import EmailMessage

    # Format elapsed seconds as HH:MM:SS without needing the time module and
    # without wrapping back to 00 after 24 hours (as time.gmtime would).
    total_seconds = int(email['Total Time (seconds)'][0])
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    warning_time = f'{hours:02d}:{minutes:02d}:{seconds:02d}'

    msg = EmailMessage()
    msg['Subject'] = f'**ALERT: List Runner has been running for {warning_time}**'
    msg['From'] = from_email # set up an email address for the email to come from
    msg['To'] = email['Addresses'][0]
    
    msg.set_content(f"""
    Hello!

    The list runner has currently finished {email['Completed Flows'][0]} of {email['Total Workflows'][0]} workflows and has been running for {warning_time}!

    Can you please take a few minutes to investigate or kill the list runner?

    Respectfully,
    List Runner
    """)

    # The context manager sends QUIT and closes the connection even if
    # send_message raises.
    with smtplib.SMTP(smtp_relay) as server: # alter smtp_relay to the applicable SMTP relay
        server.send_message(msg)

In [None]:
# This is the alert function: it checks whether the total list runner time has reached the alert threshold.
# Once the list runner has been running for 2.5 hours (the default, 9000 seconds) or more, it emails out the current progress / run time.

def duration_alert(duration, completed_flows, threshold=9000):
    """E-mail a progress alert once the list runner has run for `threshold` seconds.

    Parameters
    ----------
    duration : float
        Seconds the list runner has been running so far.
    completed_flows : int
        Number of workflows finished so far.
    threshold : int or float, optional
        Alert threshold in seconds; default 9000 (2.5 hours).

    Uses the notebook-level names ``to_email`` (recipient) and
    ``count_of_flows`` (total workflows), and sends the message via ``emailer``.
    Returns None.
    """
    if duration >= threshold:
        emails = pd.DataFrame({'Addresses': [to_email],
                               'Total Workflows': [count_of_flows],
                               'Completed Flows': [completed_flows],
                               'Total Time (seconds)': [duration]})
        emailer(emails)

"""
This is the actual list_runner, which uses the worker and alert functions to run workflows and alert
users - it then outputs a DataFrame of metrics similar to the original list runner.

The "processes" setting is passed to the pool function to tell the system how many
concurrent workers to run at once. We default to 2, but this number can be increased;
please consider the specifications of your system and the volume of data each workflow
brings into memory when increasing the number of concurrent processes.
""" 

def list_runner(list_of_paths, processes=2):
    """Run every workflow in parallel and collect their metrics.

    Parameters
    ----------
    list_of_paths : iterable of str
        Workflow paths; each one is passed to ``worker``.
    processes : int, optional
        Number of concurrent worker processes (default 2).

    Returns
    -------
    pandas.DataFrame
        One row per workflow, in input order, with the columns produced by
        ``worker``; an empty frame with those columns when no paths are given.

    After each workflow finishes, ``duration_alert`` is called with the
    wall-clock seconds elapsed since the run started and the number of
    completed flows.
    """
    import time

    columns = ["Output", "Warnings", "Conversion Errors", "Errors", "Log",
               "File", "ModuleFullPath", "MasterRunTime", "Time"]
    results = []
    # Measure the whole run (not just bookkeeping) so the alert reflects the
    # real elapsed time.
    start = time.monotonic()
    # The context manager shuts the worker processes down when done.
    with pool.Pool(processes) as workers_pool:
        # imap yields each result, in input order, as soon as it is ready, so
        # the alert can fire while later workflows are still running; map
        # would block until every workflow had finished.
        for completed, item in enumerate(workers_pool.imap(worker, list_of_paths), start=1):
            results.append(item)
            duration_alert(time.monotonic() - start, completed)
    if not results:
        return pd.DataFrame({column: [] for column in columns})
    # DataFrame.append was removed in pandas 2.0; concatenate once at the end.
    return pd.concat(results, ignore_index=True)

In [None]:
"""
This cell is where the list runner is actually executed against the paths supplied to it.
The results then have some additional logic applied (a Succeeded/Failed "Result" column and
human-readable durations) before being output to Alteryx.
"""
# timedelta is otherwise only imported inside worker(), so bring it into
# notebook scope here; without this the conversion below raises NameError.
from datetime import timedelta

workflows = list_runner(paths['FullPath'])


def result_test(errors):
    """Return 'Succeeded' when a workflow reported zero errors, otherwise 'Failed'."""
    return "Succeeded" if errors == 0 else "Failed"


def seconds_conversion(seconds):
    """Convert elapsed seconds into a datetime.timedelta for readable durations."""
    return timedelta(seconds=seconds)


# Assign plain lists so the new values line up positionally with the rows.
workflows['Result'] = list(map(result_test, workflows['Errors']))
workflows['Time'] = list(map(seconds_conversion, workflows['Time']))

In [None]:
# Send the list runner's results to the first Python output anchor.
OUTPUT_ANCHOR = 1
Alteryx.write(workflows, OUTPUT_ANCHOR)