## Parsl Test/Demo with the @App decorators.

We outline a simple set of workflow patterns and show how they can be implemented with the @App decorators.
Here are the workflows:

1. Single apps.
2. N apps launched in parallel
3. N apps launched in sequence, such that the i-th app depends solely on the (i-1)-th.
4. N sequences launched in parallel.


In [12]:
import time
import random
from parsl import *
import parsl
from functools import partial
# Turn on Parsl's stream logging (presumably the source of the DEBUG lines
# shown under the cells of this notebook).
parsl.set_stream_logger()

# Here we specify the executor the apps run on: a pool of 4 workers (threads).
workers = ThreadPoolExecutor(max_workers=4)
# Alternative kept for reference: a pool of 4 worker processes.
#workers = ProcessPoolExecutor(max_workers=4)
# The DataFlowKernel is built on the executor and handed to every @App below.
dfk = DataFlowKernel(workers)


@App('bash', dfk)
def echo(inputs=[], stderr='std.err', stdout='std.out'):
    """Bash app that echoes the first two entries of ``inputs``.

    The body only defines the ``cmd_line`` template. The debug log of the
    pipeline cell shows such a template being filled in from the call's
    keyword arguments before the command is executed.

    Args:
        inputs: sequence with at least two items; items 0 and 1 are echoed.
        stderr: NOTE(review): presumably the file receiving the command's
            stderr -- confirm against the App implementation.
        stdout: NOTE(review): presumably the file receiving the command's
            stdout -- confirm against the App implementation.
    """
    cmd_line = 'echo {inputs[0]} {inputs[1]}'

@App('bash', dfk)
def echo_to_file(x=1, outputs=[], stderr='std.err', stdout='std.out'):
    """Bash app that writes the value ``x`` into the first output file.

    The body only defines the ``cmd_line`` template; the placeholders are
    filled in from the call's keyword arguments.

    Args:
        x: value to echo.
        outputs: sequence whose first item is the path of the file to write.
        stderr: NOTE(review): presumably the file receiving the command's
            stderr -- confirm against the App implementation.
        stdout: NOTE(review): presumably the file receiving the command's
            stdout -- confirm against the App implementation.
    """
    # Braces must be balanced: a stray '}' after {x} makes str.format raise
    # "ValueError: Single '}' encountered in format string".
    cmd_line = 'echo {x} > {outputs[0]}'
    

@App('bash', dfk)
def sleep_n(t):
    """Bash app that runs ``sleep`` with the argument ``t``.

    The body only defines the ``cmd_line`` template; ``{t}`` is the
    placeholder for the value passed to ``sleep``.
    """
    cmd_line = 'sleep {t}'    

@App('bash', dfk)
def cats_n_sleep (x, inputs, outputs):
    """Bash app that sleeps a random time, then copies one file to another.

    Args:
        x: modulus applied to bash's $RANDOM, so the sleep lasts between
            0 and x-1 seconds. Must be non-zero, since bash rejects a
            modulo by zero.
        inputs: sequence whose first item is the file to read with ``cat``.
        outputs: sequence whose first item is the file the content is
            redirected into.
    """
    cmd_line = 'sleep $(($RANDOM % {x})); cat {inputs[0]} > {outputs[0]}'
    
@App('bash', dfk)
def incr (inputs, outputs):
    """Bash app that reads a number from a file and writes number + 1.

    Args:
        inputs: sequence whose first item is the file holding the number.
        outputs: sequence whose first item is the file that receives the
            incremented value.
    """
    # $(cat ...) loads the current value into y; $(($y+1)) is bash arithmetic.
    cmd_line = 'y=$(cat {inputs[0]}); echo $(($y+1)) > {outputs[0]}'
    

2017-02-16 20:00:51,837 parsl.app.app [DEBUG] Apptype : bash
2017-02-16 20:00:51,837 parsl.app.app [DEBUG] Apptype : bash
2017-02-16 20:00:51,922 parsl.app.app [DEBUG] Executor : <class 'parsl.dataflow.dflow.DataFlowKernel'>
2017-02-16 20:00:51,922 parsl.app.app [DEBUG] Executor : <class 'parsl.dataflow.dflow.DataFlowKernel'>
2017-02-16 20:00:51,923 parsl.app.app [DEBUG] Decorator Exec : <function echo at 0x7f13980dfd08>
2017-02-16 20:00:51,923 parsl.app.app [DEBUG] Decorator Exec : <function echo at 0x7f13980dfd08>
2017-02-16 20:00:51,925 parsl.app.app [DEBUG] __init__ 
2017-02-16 20:00:51,925 parsl.app.app [DEBUG] __init__ 
2017-02-16 20:00:51,926 parsl.app.app [DEBUG] Apptype : bash
2017-02-16 20:00:51,926 parsl.app.app [DEBUG] Apptype : bash
2017-02-16 20:00:51,927 parsl.app.app [DEBUG] Executor : <class 'parsl.dataflow.dflow.DataFlowKernel'>
2017-02-16 20:00:51,927 parsl.app.app [DEBUG] Executor : <class 'parsl.dataflow.dflow.DataFlowKernel'>
2017-02-16 20:00:51,929 parsl.app.app 

In [None]:
# Launch a single echo app. The call returns a pair: the app's future and
# the futures for its outputs.
fu, outs = echo(inputs=["Hello", "World!"], stdout='std.out', stderr='std.err')

In [None]:
# Disabled variant of the launch that relies on the default stdout/stderr names:
#fu, outs = echo(inputs=["Hello", "World"])
# Show the futures bound by the echo launch in the previous cell.
print(fu, outs)

In [None]:
# Create the small input file that cats_n_sleep copies.
with open('test.txt', 'w') as tmp:
    tmp.write("E=mc^2")
    
# Launch one app: sleep a random 0-4 seconds, then cat test.txt into out.txt.
f, outs = cats_n_sleep (x=5,inputs=['test.txt'],outputs=['out.txt'])

In [None]:
# Inspect the futures returned by the cats_n_sleep launch.
print(f,outs)
# NOTE(review): result() presumably blocks until the app has finished; the
# label suggests the app future resolves to the run duration -- confirm.
print("Duration      : ", f.result())
# Result and completion state of the first output's data future.
print("Output        : ", outs[0].result())
print("Output done ? : ", outs[0].done())

In [None]:
%%time
import os
import shutil
# Start from an empty ./outputs directory. On a fresh run the directory does
# not exist yet, so only remove it when it is actually there (an unconditional
# rmtree raises FileNotFoundError).
if os.path.isdir('./outputs'):
    shutil.rmtree('./outputs')
os.mkdir('./outputs')
''' This tests the first pattern, a parallel for loop.

Pool          |     Width      |     Duration
Process       |     10K        |       2.96s
Process       |      1K        |       311ms
Process       |    100K        |       29.6s
'''

def launch_n (n):
    """Launch ``n`` independent cats_n_sleep apps.

    Each app copies test.txt into its own file outputs/out.<i>.txt after a
    random sleep of 0-3 seconds.

    Returns:
        dict mapping the index i to whatever the i-th cats_n_sleep call
        returned.
    """
    return {
        idx: cats_n_sleep(x=4,
                          inputs=['test.txt'],
                          outputs=['outputs/out.{0}.txt'.format(idx)])
        for idx in range(n)
    }

x = launch_n(4)
# Wait on each app future rather than sleeping for a fixed 8 seconds: the
# cell then takes exactly as long as the apps do and cannot list the
# directory before the apps have finished.
for launched in x.values():
    launched[0].result()
os.listdir('.')

In [None]:
# Path of the first output file of every launched app, in launch order.
print([launched[1][0].filepath for launched in x.values()])
# Files actually present in the outputs directory.
os.listdir('outputs/')

In [10]:
%%time
''' Testing Pipeline, a sequence of dependencies.

A -> B ... -> N

'''
# Seed the pipeline with 0 in start.txt. The with-block guarantees the file
# is flushed and closed before any app tries to read it.
with open('start.txt', 'w') as start_file:
    start_file.write('0')

def pipeline_n (n):
    """Chain ``n`` incr apps so that each one consumes its predecessor's output.

    The first app reads start.txt; app i writes incr.<i>.txt, and the data
    futures it returns become the inputs of app i+1.

    Returns:
        dict mapping each stage index to
        {'App_fu': <app future>, 'Data_fu': <output data futures>}.
    """
    stages = {}
    upstream = ['start.txt']
    for step in range(n):
        # Show what this stage depends on before launching it.
        print(step, upstream)
        app_future, upstream = incr(inputs=upstream,
                                    outputs=['incr.{0}.txt'.format(step)])
        stages[step] = dict(App_fu=app_future, Data_fu=upstream)
    return stages


# Build a 5-stage chain: start.txt -> incr.0.txt -> ... -> incr.4.txt.
foo = pipeline_n (5)
# Printed right after launching, so futures may still be pending or running.
print(foo)

2017-02-16 19:51:56,245 parsl.app.app [DEBUG] In __Call__
2017-02-16 19:51:56,245 parsl.app.app [DEBUG] Received : ['start.txt'] 
2017-02-16 19:51:56,322 parsl.app.app [DEBUG] Submitting via : <parsl.dataflow.dflow.DataFlowKernel object at 0x7f13980c1a58>
2017-02-16 19:51:56,323 parsl.app.app [DEBUG] cmd    : y=$(cat {inputs[0]}); echo $(($y+1)) > {outputs[0]}
2017-02-16 19:51:56,323 parsl.app.app [DEBUG] Exec   : y=$(cat start.txt); echo $(($y+1)) > incr.0.txt
2017-02-16 19:51:56,324 parsl.dataflow.dflow [DEBUG] Task:88266f9f-1795-4f30-bdf3-bc82b2d3e96a   dep_cnt:0  deps:[]
2017-02-16 19:51:56,325 parsl.dataflow.dflow [DEBUG] Submitting to executor : 88266f9f-1795-4f30-bdf3-bc82b2d3e96a
2017-02-16 19:51:56,325 parsl.dataflow.dflow [DEBUG] Launched : 88266f9f-1795-4f30-bdf3-bc82b2d3e96a with <AppFuture at 0x7f1398056978 state=pending>
2017-02-16 19:51:56,326 parsl.app.app [DEBUG] Running app : bash
2017-02-16 19:51:56,326 parsl.app.futures [DEBUG] Filepath : /home/yadu/swython/incr.0.t

0 ['start.txt']
1 [<DataFuture at 0x7f1398056a90 state=pending>]
2 [<DataFuture at 0x7f13980b5518 state=pending>]
3 [<DataFuture at 0x7f13980c1240 state=pending>]
4 [<DataFuture at 0x7f13980c1748 state=pending>]
{0: {'App_fu': <AppFuture at 0x7f1398056978 state=finished returned float>, 'Data_fu': [<DataFuture at 0x7f1398056a90 state=pending>]}, 1: {'App_fu': <AppFuture at 0x7f13980b5748 state=finished returned float>, 'Data_fu': [<DataFuture at 0x7f13980b5518 state=pending>]}, 2: {'App_fu': <AppFuture at 0x7f1398937198 state=finished returned float>, 'Data_fu': [<DataFuture at 0x7f13980c1240 state=pending>]}, 3: {'App_fu': <AppFuture at 0x7f13980c11d0 state=finished returned float>, 'Data_fu': [<DataFuture at 0x7f13980c1748 state=pending>]}, 4: {'App_fu': <AppFuture at 0x7f13980c1080 state=running>, 'Data_fu': [<DataFuture at 0x7f13980c16a0 state=pending>]}}
CPU times: user 52 ms, sys: 24 ms, total: 76 ms
Wall time: 137 ms


2017-02-16 19:51:56,381 parsl.app.app [DEBUG] Launching app : y=$(cat /home/yadu/swython/incr.3.txt); echo $(($y+1)) > incr.4.txt
2017-02-16 19:51:56,387 parsl.app.app [DEBUG] RunCommand Completed y=$(cat /home/yadu/swython/incr.3.txt); echo $(($y+1)) > incr.4.txt
2017-02-16 19:51:56,388 parsl.dataflow.dflow [DEBUG] Completed : 8557a994-b518-4f77-b5ce-6be0a1f61a99 with <Future at 0x7f13980c10b8 state=finished returned float>
2017-02-16 19:51:56,388 parsl.dataflow.dflow [DEBUG] Pending:0   Runnable:0   Done:12


In [11]:
for key in sorted(foo.keys()):
    data_fu = foo[key]['Data_fu'][0]
    # Wait for the stage to finish: the data futures can still be pending
    # when this cell runs, in which case the file may not exist yet.
    data_fu.result()
    # The with-block closes each file instead of leaking the handle.
    with open(data_fu.filepath, 'r') as result_file:
        print (key, data_fu.filepath, result_file.read())

0 /home/yadu/swython/incr.0.txt 1

1 /home/yadu/swython/incr.1.txt 2

2 /home/yadu/swython/incr.2.txt 3

3 /home/yadu/swython/incr.3.txt 4

4 /home/yadu/swython/incr.4.txt 5



In [15]:
'''  MapReduce
        foo   foo ... foo
          \    |      /
           \   |     /
             merge
''' 


def map_n_reduce(n, dfk):
    """Launch the map stage of a map/reduce pattern.

    Starts ``n`` echo_to_file apps, app i writing the value i into
    map.<i>.txt. The reduce stage is not implemented yet.

    NOTE(review): the recorded run of this cell ended with
    ``KeyError: 'inputs'``, and echo_to_file is called here without an
    ``inputs`` argument -- confirm whether the App wrapper requires one.

    Args:
        n: number of map tasks to launch.
        dfk: currently unused by this function.

    Returns:
        (map_stage, red): the list of output futures of every map task, and
        None as the placeholder for the reduce stage.
    """
    map_stage = []
    for idx in range(n):
        target = 'map.{0}.txt'.format(idx)
        _, outs = echo_to_file(x=idx, outputs=[target])
        map_stage.append(outs)

    # TODO: reduce stage, sketched as (partial(sleep_n, 1), map_stage, None).
    red = None
    return map_stage, red

# Run the map stage with a single task; r is None because map_n_reduce does
# not build a reduce stage yet.
m,r = map_n_reduce(1, dfk)

2017-02-16 20:38:16,033 parsl.app.app [DEBUG] In __Call__
2017-02-16 20:38:16,033 parsl.app.app [DEBUG] In __Call__


KeyError: 'inputs'

In [None]:
print(m, r)

# map_n_reduce returns None for the reduce stage, so calling r.result()
# unconditionally raises AttributeError. Only wait when a reduce future exists.
r.result() if r is not None else None