# In [2]:
import threading
from time import sleep
import yaml
import inspect
import pandas as pd
from yaml.loader import SafeLoader
import logging
from threading import Thread

# Every workflow event is written to WorkFlowLog-B.txt as
# "<timestamp>;<message>". filemode='w' truncates the log on each run, and
# the datefmt ends in a literal ".000000" (no real sub-second precision).
logging.basicConfig(
    filename='WorkFlowLog-B.txt',
    filemode='w',
    format='%(asctime)s;%(message)s',
    datefmt='%Y-%m-%d %H:%M:%S.000000',
    level=logging.INFO,
)

# Shared store of published task outputs, keyed "$(<task path>.<output>)".
workFlowVariables = {}


def checkCondition(task, variables=None, poll_interval=1):
    """Evaluate a task's ``Condition`` once the variable it tests exists.

    The first whitespace-separated token of the condition is taken as the
    name of a workflow variable. This call blocks, polling every
    ``poll_interval`` seconds, until that variable has been published. It
    then substitutes ``str(value)`` for the name and evaluates the result.

    Args:
        task: Task definition dict holding a ``'Condition'`` string.
        variables: Mapping of published variables to read from; defaults to
            the module-level ``workFlowVariables``.
        poll_interval: Seconds to sleep between availability checks.

    Returns:
        Whatever the substituted expression evaluates to.
    """
    if variables is None:
        variables = workFlowVariables
    condition = task['Condition']
    # split() (no separator) skips leading/repeated whitespace. split(" ")
    # yielded '' for a leading space, a key that is never published, so the
    # wait below never ended.
    variable = condition.split()[0]
    # A concurrently running task may not have produced the value yet.
    while variable not in variables:
        sleep(poll_interval)
    value = variables[variable]
    # SECURITY: eval() executes arbitrary Python taken from the workflow
    # file; only run trusted workflow definitions.
    return eval(condition.replace(variable, str(value)))


def checkInputs(Inputs, variables=None):
    """Resolve workflow-variable references in a task's input mapping.

    A string value starting with ``$(`` is replaced by the value published
    under that exact key. Every other value, including non-string YAML
    scalars such as numbers, is passed through unchanged.

    Args:
        Inputs: Mapping of input name to a literal value or ``$(...)``
            reference.
        variables: Mapping of published variables; defaults to the
            module-level ``workFlowVariables``.

    Returns:
        A new dict with references resolved. ``Inputs`` itself is left
        unmodified.

    Raises:
        KeyError: if a referenced variable has not been published.
    """
    if variables is None:
        variables = workFlowVariables
    resolved = {}
    for key, value in Inputs.items():
        # YAML can yield ints/floats here; only strings can be references.
        if isinstance(value, str) and value.startswith('$('):
            value = variables[value]
        resolved[key] = value
    return resolved


def TimeFunction(name, params):
    """Log the call, then block for ``params['ExecutionTime']`` seconds.

    ``params['FunctionInput']`` is only echoed in the log line. The duration
    is converted with ``int()`` after logging, right before sleeping.
    """
    # Name of the running function, taken from its own code object.
    func_name = inspect.currentframe().f_code.co_name
    label = '{}({},{})'.format(
        func_name, params['FunctionInput'], params['ExecutionTime'])
    logging.info('%s Executing %s', name, label)
    sleep(int(params['ExecutionTime']))


def DataLoad(name, params):
    """Read ``params['Filename']`` as CSV and report its row count.

    The log entry is written only after the file has been read, so a failed
    read produces no "Executing" line.

    Returns:
        A two-element list ``[DataFrame, number_of_rows]``.
    """
    csv_path = params['Filename']
    table = pd.read_csv(csv_path)
    func_name = inspect.currentframe().f_code.co_name
    logging.info('%s Executing %s', name, '{}({})'.format(func_name, csv_path))
    return [table, table.shape[0]]


# Dispatch table: a task's 'Function' value selects which callable runs it.
Tasks = dict(
    TimeFunction=TimeFunction,
    DataLoad=DataLoad,
)


def executeFlow(name, flow, lock=None):
    """Run every activity of a Flow, either one after another or in threads.

    Logs ``<name> Entry`` before and ``<name> Exit`` after the activities.
    Each activity runs under the dotted name ``<name>.<activity key>``.
    ``Flow`` activities recurse into this function and ``Task`` activities
    go to ``executeTask``. Activities of any other ``Type`` are ignored, as
    is an ``Execution`` value other than ``'Sequential'``/``'Concurrent'``.

    Args:
        name: Dotted path of this flow, used as the log prefix.
        flow: Flow definition dict with ``'Execution'`` and ``'Activities'``
            (activity key -> activity dict).
        lock: Lock passed to child activities for guarding output writes.
            A concurrent flow creates one if none was inherited, and every
            descendant receives that same lock.
    """
    logging.info('%s Entry', name)
    mode = flow['Execution']
    if mode == 'Sequential':
        for key, act in flow['Activities'].items():
            runner = _activity_runner(act)
            if runner is not None:
                # Forward the lock: this sequential flow may itself be a
                # branch of a concurrent parent.
                runner(name + '.' + key, act, lock)
    elif mode == 'Concurrent':
        # Reuse an inherited lock. A fresh lock per nested concurrent flow
        # would not exclude writers running under a different flow.
        if lock is None:
            lock = threading.Lock()
        threads = []
        for key, act in flow['Activities'].items():
            runner = _activity_runner(act)
            if runner is not None:
                t = Thread(target=runner, args=(name + '.' + key, act, lock))
                t.start()
                threads.append(t)
        # NOTE: an exception inside a thread goes to threading.excepthook;
        # join() does not re-raise it here.
        for t in threads:
            t.join()
    logging.info('%s Exit', name)


def _activity_runner(act):
    """Return the executor for an activity's ``Type``, or None if unknown."""
    act_type = act['Type']
    if act_type == 'Flow':
        return executeFlow
    if act_type == 'Task':
        return executeTask
    return None


def executeTask(name, task, lock=None):
    """Run one Task activity, honouring its optional ``Condition``.

    Always logs ``<name> Entry`` and ``<name> Exit``.

    If the task has a ``'Condition'`` that evaluates falsy, ``<name>
    Skipped`` is logged between them and nothing runs. Otherwise:

    1. References in ``task['Inputs']`` are resolved.
    2. ``Tasks[task['Function']]`` is called with ``(name, inputs)``.
    3. If the task lists ``'Outputs'``, the i-th element of the result is
       published to ``workFlowVariables`` under ``$(<name>.<output>)``.

    Args:
        name: Dotted path of this task; the log prefix and variable prefix.
        task: Task definition dict.
        lock: Optional lock held while outputs are published.
    """
    if 'Condition' in task and not checkCondition(task):
        logging.info('%s Entry', name)
        logging.info('%s Skipped', name)
        logging.info('%s Exit', name)
        return

    # Keep the resolved inputs in a local rather than writing them back
    # into task['Inputs'].
    inputs = checkInputs(task['Inputs'])
    logging.info('%s Entry', name)
    result = Tasks[task['Function']](name, inputs)
    outputs = task.get('Outputs')
    if outputs:
        # Index into the result (rather than zip) so a result shorter than
        # Outputs still raises instead of silently dropping names.
        updates = {
            '$(' + name + '.' + out + ')': result[i]
            for i, out in enumerate(outputs)
        }
        # Publish all outputs in one step; `with` releases the lock even
        # if the update raises.
        if lock is None:
            workFlowVariables.update(updates)
        else:
            with lock:
                workFlowVariables.update(updates)
    logging.info('%s Exit', name)


# Load every YAML document first, so the file is closed before the workflow
# (which may sleep for a long time) runs. SafeLoader builds only plain
# Python data (dicts, lists, scalars) from the YAML.
with open('Milestone2B.yaml', 'r', encoding='utf-8') as InputFile:
    workflow = list(yaml.load_all(InputFile, Loader=SafeLoader))

for wf in workflow:
    # An empty YAML document (e.g. a trailing '---') loads as None.
    if wf is None:
        continue
    for key, steps in wf.items():
        if steps['Type'] == 'Flow':
            executeFlow(key, steps)
        elif steps['Type'] == 'Task':
            executeTask(key, steps)