## Parsl Test/Demo

We outline a simple set of workflow patterns and show how each can be implemented directly on the raw DataFlowKernel.
The workflow patterns are:

1. A single app.
2. N apps launched in parallel.
3. N apps launched in sequence, such that the i-th app depends solely on the (i-1)-th.
4. N sequences launched in parallel.


In [2]:
import time
import random
from parsl import *
import parsl
from functools import partial
#parsl.set_stream_logger()

In [3]:
# Executor capped at 4 workers (max_workers=4).
tworkers = ThreadPoolExecutor(max_workers=4)
# DataFlowKernel built on that executor; the cells in this notebook submit
# their tasks through `dfk`.
dfk = DataFlowKernel(tworkers)

def cback(*args):
    '''Return the first positional argument multiplied by 5.

    Any further positional arguments are accepted but ignored. Calling it
    with no arguments raises IndexError.
    '''
    first = args[0]
    return first * 5

def sleep_n(x):
    '''Pause for x seconds, then return x unchanged.'''
    seconds = x
    time.sleep(seconds)
    return seconds

def sleep_n_rand(x, y):
    ''' Sleep for x seconds plus a random integer offset drawn from [-y, y].

    The drawn duration is clamped at zero: time.sleep() raises ValueError for
    a negative argument, and x + offset goes negative whenever the offset
    outweighs x (possible as soon as y > x).

    Args:
        x: base duration in seconds.
        y: largest offset in either direction; it is handed to
           random.randint(-y, y), so it must be a non-negative integer.

    Returns:
        The number of seconds actually slept (never negative).
    '''
    # Function-local imports are kept exactly as the original had them.
    import time
    import random
    dur = x + random.randint(-y, y)
    if dur < 0:
        # Clamp instead of letting time.sleep() reject a negative duration.
        dur = 0
    time.sleep(dur)
    return dur

In [None]:
#x = tworkers.submit(partial(cback, 1))

In [4]:
# Submit cback(1) through the DataFlowKernel. The second argument is the list
# of futures the task depends on (empty here, so nothing gates it).
# NOTE(review): the third argument is None at every call site in this
# notebook; its meaning is not visible here -- confirm against the DFK API.
x = dfk.submit(partial(cback,1), [], None)

In [6]:
# Read the task's return value from its future; cback(1) yields 1*5.
print(x.result())

5


In [10]:
%%time
''' Tests pattern 2 from the list above: N independent apps launched in parallel (a parallel for loop).
'''

def launch_n(n):
    '''Submit n independent cback tasks and return their futures keyed by index.

    Task i is cback(i), submitted with an empty dependency list, so no task
    waits on any other.
    '''
    return {idx: dfk.submit(partial(cback, idx), [], None) for idx in range(n)}

# Fan out 10000 independent cback tasks, then gather every result.
x = launch_n(10000)
#time.sleep(10)
# cback(i) returns 5*i, so the total is 5 * sum(range(10000)) = 249975000.
print(sum([x[i].result() for i in x]))

249975000
CPU times: user 1.07 s, sys: 168 ms, total: 1.24 s
Wall time: 1.1 s


In [11]:
%%time
''' Testing Pipeline, a sequence of dependencies.

A -> B ... -> N

'''

def pipeline_n(n):
    '''Submit a chain of n random-length sleeps, each depending on the one before.

    Stage 0 has no dependencies; stage k (k >= 1) lists only the future of
    stage k-1. Every stage runs sleep_n_rand(5, 5). The stage index and its
    dependency list are printed as each stage is submitted.

    Returns a dict mapping stage index to the future for that stage.
    '''
    futures = {}
    for step in range(n):
        # Only the very first stage lacks a predecessor.
        deps = [futures[step - 1]] if step > 0 else []
        print(step, deps)
        futures[step] = dfk.submit(partial(sleep_n_rand, 5, 5), deps, None)
    return futures


# Build a 4-stage chain and print the futures straight away; no .result()
# call is made here, so this cell does not wait for the stages to finish.
foo = pipeline_n (4)
print(foo)

0 []
1 [<AppFuture at 0x7f5d134ff668 state=pending>]
2 [<AppFuture at 0x7f5d134f9668 state=pending>]
3 [<AppFuture at 0x7f5d134f22e8 state=pending>]
{0: <AppFuture at 0x7f5d134ff668 state=running>, 1: <AppFuture at 0x7f5d134f9668 state=pending>, 2: <AppFuture at 0x7f5d134f22e8 state=running>, 3: <AppFuture at 0x7f5d134ed4a8 state=pending>}
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 838 µs


In [None]:
'''  Pipeline
     A -> B ... -> N
'''
import time

def sleep_n(x):
    '''Block for x seconds and hand x back to the caller.'''
    delay = x
    time.sleep(delay)
    return delay

def pipeline_n(n):
    '''Submit a chain of n ten-second sleeps, each depending on the one before.

    Stage 0 has no dependencies; stage k (k >= 1) lists only the future of
    stage k-1. Every stage runs sleep_n(10). The stage index and its
    dependency list are printed as each stage is submitted.

    Returns a dict mapping stage index to the future for that stage.
    '''
    chain = {}
    for stage in range(n):
        # Only the very first stage lacks a predecessor.
        if stage > 0:
            deps = [chain[stage - 1]]
        else:
            deps = []
        print(stage, deps)
        chain[stage] = dfk.submit(partial(sleep_n, 10), deps, None)
    return chain


# Build a 4-stage chain of 10-second sleeps.
foo = pipeline_n (4)
#dfk.current_state()
# Gather every stage's result; sleep_n returns its argument, so each entry
# should be 10.
print ([foo[i].result() for i in foo])

In [None]:
'''  MapReduce
        foo   foo ... foo
          \    |      /
           \   |     /
             merge
''' 


def map_n_reduce(n, dfk):
    '''Submit n independent map tasks and one reduce task that depends on all of them.

    Each map task is sleep_n(2) with no dependencies. The reduce task is
    sleep_n(1) and is submitted with the full list of map futures as its
    dependency list.

    Args:
        n: number of map tasks to submit.
        dfk: object whose submit(callable, deps, None) is used for every task.

    Returns:
        A (map_futures, reduce_future) tuple.
    '''
    mappers = [dfk.submit(partial(sleep_n, 2), [], None) for _ in range(n)]
    reducer = dfk.submit(partial(sleep_n, 1), mappers, None)
    return mappers, reducer

# One map task feeding one reduce task: m is the list of map futures,
# r is the reduce future.
m,r = map_n_reduce(1, dfk)

In [None]:
# Show the map futures and the reduce future as they currently stand.
print(m, r)

# Fetch the reduce task's return value; it runs sleep_n(1), which returns 1.
r.result()