In [3]:
from parsl import *

import logging
import sys
import concurrent.futures as cf
import kale.workflow_objects as wo
from kale.workflow_widgets import WorkflowWidget, WorkerPoolWidget

In [4]:
def gen_empty_future():
    """Return a new ``concurrent.futures.Future`` that is already finished.

    The future is completed through the public ``set_result`` API (with a
    ``None`` result) instead of writing the private ``_state`` attribute, so
    the object goes through the normal completion path.

    Returns:
        cf.Future: a done, non-cancelled future whose ``result()`` is ``None``.
    """
    finished = cf.Future()
    finished.set_result(None)
    return finished

In [5]:
# Create the worker-pool widget once; the WorkflowWidget cells further down
# are all constructed with this same instance.
wpw = WorkerPoolWidget()
wpw

In [19]:
class KaleDFK(DataFlowKernel):
    """DataFlowKernel that records submitted apps as a Kale workflow.

    Nothing is executed when an app is submitted: ``launch_task`` is
    overridden to hand back an already-finished empty future. Each submission
    is instead mirrored as a ``wo.PythonFunctionTask`` that is added to
    ``self.kale_workflow`` together with its dependencies.

    Attributes:
        fu_to_func: future returned by ``inner_submit`` -> submitted function.
        fu_to_task: future returned by ``inner_submit`` -> Kale task built for it.
        workflow_created: True once ``new_workflow`` has been called.
        dep_funcs: task_id -> functions the task depends on.
        dep_names: task_id -> ``__name__`` of every function in ``dep_funcs``.
        kale_workflow: current ``wo.Workflow``; only exists after
            ``new_workflow`` has been called.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the parent kernel and the Kale bookkeeping tables."""
        super().__init__(*args, **kwargs)
        self.fu_to_func = dict()
        self.fu_to_task = dict()

        # A workflow must be created manually
        # by calling self.new_workflow(name)
        self.workflow_created = False

        # Store names and funcs of deps,
        # indexed by task_id.
        self.dep_funcs = dict()
        self.dep_names = dict()

    def parse_args(self, args, kwargs):
        """Swap every future in ``args`` / ``kwargs`` for its Kale task.

        This happens at definition time; Kale replaces the task by the
        future's result at execution time.

        Args:
            args: positional arguments of the submitted call.
            kwargs: keyword arguments of the submitted call.

        Returns:
            tuple: ``(new_args, new_kwargs)`` - a new list and a new dict; the
            inputs are not modified.

        Raises:
            KeyError: if a future is not a key of ``self.fu_to_task``.
        """
        new_args = [
            self.fu_to_task[arg] if isinstance(arg, cf.Future) else arg
            for arg in args
        ]
        new_kwargs = {
            key: self.fu_to_task[val] if isinstance(val, cf.Future) else val
            for key, val in kwargs.items()
        }
        return new_args, new_kwargs

    def new_workflow(self, name):
        """Start a new, empty Kale workflow and make it the current one.

        Args:
            name: name given to the ``wo.Workflow``.
        """
        self.kale_workflow = wo.Workflow(name=name)
        self.workflow_created = True

    def submit(self, func, *args, **kwargs):
        """Submit ``func`` as a task; requires a prior ``new_workflow`` call.

        Returns:
            The future produced by ``inner_submit``.

        Raises:
            Exception: if no workflow has been created yet.
        """
        if not self.workflow_created:
            raise Exception("Call KaleDFK.new_workflow(name) before adding tasks.")
        return self.inner_submit(func, *args, **kwargs)

    def launch_task(self, task_id, executable, *args, **kwargs):
        """Override launch_task to do nothing at definition time.

        Returns:
            An already-finished empty future from ``gen_empty_future``.
        """
        return gen_empty_future()

    def inner_submit(self, func, *args, **kwargs):
        """Add a task to the dataflow system and mirror it in the Kale workflow.

        The first part follows the Parsl submit logic:

        If all deps are met :
              sanitize the arguments and "launch" the task
              (``launch_task`` is a no-op in this class)
        Else:
              leave the task pending with a parent-less AppFuture

        Afterwards a ``wo.PythonFunctionTask`` is created for ``func`` and
        added to ``self.kale_workflow`` with the tasks of its dependencies.

        Args:
            func: function to run; its ``__name__`` becomes the task name.
            *args: positional arguments; futures are treated as dependencies.
            **kwargs: keyword arguments; ``stdout`` / ``stderr`` entries are
                also forwarded to the AppFuture.

        Returns:
            AppFuture: the future stored under ``'app_fu'`` for this task.

        Raises:
            KeyError: if a dependency future is unknown to ``fu_to_func`` /
                ``fu_to_task``.
            DuplicateTaskError: if the task id is already in ``self.tasks``.
        """
        from parsl.dataflow.states import States
        from parsl.dataflow.futures import AppFuture
        # NOTE(review): DuplicateTaskError and DependencyError are never
        # imported by name in this notebook; they are expected to arrive via
        # `from parsl import *` - confirm that they are actually exported.

        task_id = self.task_count
        self.task_count += 1

        # Extract task & dependency info
        task_name = func.__name__
        dep_cnt, depends = super()._count_all_deps(task_id, args, kwargs)

        self.dep_funcs[task_id] = [self.fu_to_func[fu] for fu in depends]
        self.dep_names[task_id] = [dep.__name__ for dep in self.dep_funcs[task_id]]

        # Build the Kale-side arguments now, from the caller's own values.
        # Further down `sanitize_and_wrap` hands back a processed kwargs;
        # deriving the Kale kwargs from that would treat keyword arguments
        # differently from positional ones, which always use the originals.
        task_args, task_kwargs = self.parse_args(args, kwargs)

        task_def = {'depends': depends,
                    'func': func,
                    'args': args,
                    'kwargs': kwargs,
                    'callback': None,
                    'dep_cnt': dep_cnt,
                    'exec_fu': None,
                    'status': States.unsched,
                    'app_fu': None}

        if task_id in self.tasks:
            raise DuplicateTaskError("Task {0} in pending list".format(task_id))
        self.tasks[task_id] = task_def

        # Extract stdout and stderr to pass to AppFuture:
        task_stdout = kwargs.get('stdout', None)
        task_stderr = kwargs.get('stderr', None)

        if dep_cnt == 0:
            # Set to running
            launch_args, launch_kwargs, exceptions = self.sanitize_and_wrap(
                task_id, args, kwargs)
            if not exceptions:
                task_def['exec_fu'] = self.launch_task(
                    task_id, func, *launch_args, **launch_kwargs)
                task_def['app_fu'] = AppFuture(task_def['exec_fu'],
                                               tid=task_id,
                                               stdout=task_stdout,
                                               stderr=task_stderr)
                task_def['status'] = States.running
            else:
                # A dependency failed: expose the failure through the AppFuture.
                task_def['exec_fu'] = None
                app_fu = AppFuture(task_def['exec_fu'],
                                   tid=task_id,
                                   stdout=task_stdout,
                                   stderr=task_stderr)
                app_fu.set_exception(
                    DependencyError(exceptions, "Failures in input dependencies", None))
                task_def['app_fu'] = app_fu
                task_def['status'] = States.dep_fail
        else:
            # Send to pending, create the AppFuture with no parent and have it set
            # when an executor future is available.
            task_def['app_fu'] = AppFuture(None, tid=task_id,
                                           stdout=task_stdout,
                                           stderr=task_stderr)
            task_def['status'] = States.pending

        fu = task_def['app_fu']  # This was the return value

        ## End of Parsl code

        self.fu_to_func[fu] = func

        # Create the Kale task with futures already replaced by Tasks.
        task = wo.PythonFunctionTask(
            name=task_name,
            func=func,
            args=task_args,
            kwargs=task_kwargs
        )
        self.fu_to_task[fu] = task

        # Dependencies are resolved by future, not by name: a lookup by name
        # doesn't work if multiple tasks have the same name (common w/ Parsl
        # definition).
        dep_tasks = [self.fu_to_task[depend] for depend in depends]

        # Add to workflow
        self.kale_workflow.add_task(
            task,
            dependencies=dep_tasks
        )

        return fu
        


In [20]:
# One thread pool shared by both kernels.
workers = cf.ThreadPoolExecutor()
# Plain Parsl kernel on the same executor; no other cell shown here uses it.
parsl_dfk = DataFlowKernel(executors=[workers])
# Kale-recording kernel; submit() raises until new_workflow() has been called,
# so the first workflow is created right away.
kale_dfk = KaleDFK(executors=[workers])
kale_dfk.new_workflow('MyWorkflow')

In [21]:
@App('python', kale_dfk)
def rand_add(prev, lim=(0, 10)):
    """Add a random integer to the previous value.

    Args:
        prev: number the random integer is added to.
        lim: two-item ``(low, high)`` sequence unpacked into
            ``random.randint``; both ends are inclusive. The default is a
            tuple so that no mutable object is shared between calls.

    Returns:
        The sum of ``prev`` and the random integer.
    """
    # Local import keeps the app body self-contained.
    import random
    myrand = random.randint(*lim)
    mysum = myrand + prev
    print("My number is {}. Theirs was {}. The sum is {}.".format(myrand, prev, mysum))
    return mysum

In [22]:
# Chain two rand_add calls. With KaleDFK this only records the tasks:
# launch_task returns an already-finished empty future, which is why the
# AppFuture displayed below reports a NoneType result.
rand_add(rand_add(0))

<AppFuture at 0x7fbc0f8a5cf8 state=finished returned NoneType>

In [23]:
# Build and display the widget for the workflow recorded by kale_dfk, passing
# the worker-pool widget created earlier.
WorkflowWidget(kale_dfk.kale_workflow, wpw)

Workflow submitted.Attempting to start job.

parsl_run

My number is 7. Theirs was 0. The sum is 7.

My number is 9. Theirs was 7. The sum is 16.
Workflow submitted.Attempting to start job.

parsl_run

My number is 2. Theirs was 0. The sum is 2.

My number is 5. Theirs was 2. The sum is 7.


# Parsl example
http://parsl.readthedocs.io/en/latest/userguide/workflow.html


In [41]:
@App('python', kale_dfk)
def wait_sleep_double(x, fu_1, fu_2):
    """Pause for two seconds, report and return twice ``x``.

    Args:
        x: value to double.
        fu_1: not read by the body; callers pass a future here to make this
            app depend on it (or None).
        fu_2: same role as ``fu_1``.

    Returns:
        ``x * 2``.
    """
    import time

    time.sleep(2)  # two-second pause before doubling
    doubled = x * 2
    print("Given {}, I give you {}.".format(x, doubled))
    return doubled

# Replace the kernel's current workflow with a new, empty one for this example.
kale_dfk.new_workflow('Parsl Example')
# Launch two apps, which will execute in parallel, since they don't have to
# wait on any futures
doubled_x = wait_sleep_double(10, None, None)
doubled_y = wait_sleep_double(10, None, None)

# The third depends on the first two :
#    doubled_x   doubled_y     (2 s)
#           \     /
#           doubled_z          (2 s)
doubled_z = wait_sleep_double(doubled_x, doubled_x, doubled_y)

# When the workflow is actually run, doubled_z is done in ~4s. Here, at
# definition time, KaleDFK.launch_task does nothing, so result() is None
# (see the output below).
print(doubled_z.result())

None


In [42]:
# Build and display the widget for the 'Parsl Example' workflow, passing the
# worker-pool widget created earlier.
WorkflowWidget(kale_dfk.kale_workflow, wpw)

Workflow submitted.Attempting to start job.

parsl_run


Given 10, I give you 20.
Given 10, I give you 20.

Given 20, I give you 40.


In [46]:
# Read the result of the future stored on the workflow entry at index 1
# (shown as 20 below).
# NOTE(review): presumably index_dict maps task indices to tasks - confirm.
kale_dfk.kale_workflow.index_dict[1].future.result()

20

In [178]:
@App('python', kale_dfk)
def step1():
    """First step: print a marker, pause for one second and return 1."""
    # Local import keeps the app body self-contained.
    import time
    print("Step 1")
    time.sleep(1)
    return 1


@App('python', kale_dfk)
def step2(prev, fav_num):
    """Second step: report the previous step's value and a favorite number.

    Args:
        prev: value produced by the preceding step.
        fav_num: number echoed in the printed message.

    Returns:
        Always 2.
    """
    print("Step 2 after step {}. My favorite number is {}.".format(prev, fav_num))
    return 2

# Outstanding Questions
- How to prevent execution at definition time?
- When to create workflow, how to pass it around to call add_task?

In [179]:
# Feed step1's future into step2 (making step2 depend on it) and read the
# outcome of the chain.
step2(step1(), 2).result()

Inner submit
        self: <__main__.KaleDFK object at 0x7f815c4b7550>
        func: <function step1 at 0x7f815e8c2c80>
        args: ()
        kwargs: {}
        
task_id = 0
args = ()
kwargs = {}
dep_cnt = 0
depends = []
dep funcs = []
dep names = []
Inner submit
        self: <__main__.KaleDFK object at 0x7f815c4b7550>
        func: <function step2 at 0x7f815e8c2a60>
        args: (<AppFuture at 0x7f815c423ac8 state=running>, 2)
        kwargs: {}
        
Step 1task_id = 1

args = (<AppFuture at 0x7f815c423ac8 state=running>, 2)
kwargs = {}
dep_cnt = 1
depends = [<AppFuture at 0x7f815c423ac8 state=running>]
dep funcs = [<function step1 at 0x7f815e8c2c80>]
dep names = ['step1']
Step 2 after step 1. My favorite number is 2.


2

In [181]:
# Build and display the widget for the kernel's current workflow, passing the
# worker-pool widget created earlier.
WorkflowWidget(kale_dfk.kale_workflow, wpw)

Attempting to start job.Workflow submitted.
parsl_run


Step 1

Step 2 after step 1. My favorite number is 2.
Workflow submitted.Attempting to start job.

parsl_run

Step 1

Step 2 after step 1. My favorite number is 2.


In [4]:
class LogFuture(cf.Future):
    """Future that is finished on creation and logs callback registration.

    The instance is completed in ``__init__`` through the public
    ``set_result`` API (result ``None``) rather than by assigning the private
    ``_state`` attribute. Every ``add_done_callback`` call prints the
    arguments it received before delegating to ``concurrent.futures.Future``.
    """

    def __init__(self, *args, **kwargs):
        """Create the future and immediately mark it as finished."""
        super().__init__(*args, **kwargs)
        self.set_result(None)

    def add_done_callback(self, *args, **kwargs):
        """Print the received arguments, then register the callback.

        Because the future is already finished, the parent implementation
        invokes the callback right away.
        """
        print("Done callback:")
        print("args = {}".format(args))
        print("kwargs = {}".format(kwargs))
        super().add_done_callback(*args, **kwargs)