# Introduction: Why Parallel Scripting?

Parsl is a parallel scripting library that supports the specification and execution of dataflow-based scripts composed of external applications and/or Python functions.

Parsl runs applications and Python functions concurrently as soon as their inputs are available, reducing the need for complex parallel programming. Parsl expresses workflows in a portable fashion: the same script can run on multicore computers, clusters, clouds, grids, and supercomputers.
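To make the dataflow idea concrete, here is a plain-Python analogy built only on the standard library's `concurrent.futures`; it is not Parsl's implementation or API. The hypothetical helper `submit_when_ready` launches a function only after all of its input futures have completed, so independent tasks run concurrently while a dependent task starts as soon as its inputs exist, without tying up a worker thread while it waits.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from random import randint


def _copy_outcome(src, dst):
    """Copy the result (or exception) of a finished future into another future."""
    exc = src.exception()
    if exc is not None:
        dst.set_exception(exc)
    else:
        dst.set_result(src.result())


def submit_when_ready(pool, fn, *inputs):
    """Hypothetical helper: run fn(*values) once every input future is done.

    Returns a Future immediately; nothing blocks while the inputs are pending.
    """
    out = Future()
    pending = [len(inputs)]
    lock = threading.Lock()

    def launch():
        try:
            values = [f.result() for f in inputs]
        except Exception as exc:  # an upstream task failed: propagate it
            out.set_exception(exc)
            return
        pool.submit(fn, *values).add_done_callback(lambda f: _copy_outcome(f, out))

    def on_input_done(_):
        with lock:
            pending[0] -= 1
            ready = pending[0] == 0
        if ready:
            launch()

    if not inputs:
        launch()
    for f in inputs:
        f.add_done_callback(on_input_done)
    return out


def simulate():
    return randint(1, 100)


def average(*values):
    return sum(values) / len(values)


with ThreadPoolExecutor(max_workers=4) as pool:
    sims = [pool.submit(simulate) for _ in range(5)]  # these run concurrently
    mean = submit_when_ready(pool, average, *sims)     # starts after all 5 finish
    print(mean.result())
```

The dependency is expressed only by passing futures as arguments, which is the same shape that Examples 3 and 4 use with Parsl apps.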

In this tutorial, you will first try a few Parsl examples (Examples 1-4) on your local machine to get a sense of the library. In the remaining examples, you will run similar workflows on resources you may have access to, such as clouds (Amazon Web Services), Cray HPC systems, and clusters, and see how more complex workflows can be expressed in Parsl scripts.

Several of the remaining examples can also be run on a local multicore machine if desired.

To run the tutorial, ensure that Python (3.5+) and Parsl 0.3 are installed on the machine you will use. The tutorial can be run from within this Jupyter notebook or as independent Python scripts.

**To install Parsl:**

Install Parsl with pip:

    $ pip3 install parsl
    
**Or to install from source:**

Download Parsl:

    $ git clone https://github.com/Parsl/parsl.git parsl

Install:

    $ cd parsl
    $ python3 setup.py install
    

Set up the Parsl tutorial:

    $ git clone https://github.com/Parsl/parsl-tutorial.git parsl_tutorial

    $ cd parsl_tutorial

    $ bash setup.sh
    
Doing this adds the sample applications `simulate` and `stats` (mock "science" applications) and some other utilities to your local `$PATH` so that you can run the tutorial.
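After running `setup.sh`, you can check from Python whether the two mock applications are visible on your `$PATH`. This is a small standard-library sketch; the helper `check_tutorial_apps` is hypothetical and only looks up the names `simulate` and `stats` mentioned above.

```python
import shutil


def check_tutorial_apps(names=("simulate", "stats")):
    """Map each application name to its resolved path on $PATH, or None if missing."""
    return {name: shutil.which(name) for name in names}


status = check_tutorial_apps()
for name, path in status.items():
    print("{:10s} {}".format(name, path or "NOT FOUND on $PATH"))
```

Note that the notebook cells below invoke the apps through the relative paths `app/simulate` and `app/stats`, so they should be run from the tutorial directory regardless of what this check reports.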

# Tutorial Section One

This section walks through getting a simple "mock" science application running with Parsl on your local machine (localhost).

## Example 1: Run a single application using Parsl

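The first Parsl script runs the `simulate` application (`app/simulate`) to generate a single random number. It writes the number to standard output, which Parsl redirects to the file given by the app's `stdout` argument (`output/p1.out`).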
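Conceptually, a bash app returns a command line, and that command's standard output and error end up in the files named by the `stdout` and `stderr` keyword arguments. The standard-library sketch below imitates that behaviour with `subprocess`; it is an analogy rather than Parsl's implementation, `run_bash_app` is a hypothetical name, and `echo 42` merely stands in for `app/simulate`.

```python
import os
import subprocess


def run_bash_app(command, stdout, stderr):
    """Run `command` with bash, sending its stdout/stderr to the given files.

    Returns the command's exit code.
    """
    for path in (stdout, stderr):
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)  # make sure output/ exists
    with open(stdout, "w") as out, open(stderr, "w") as err:
        return subprocess.run(["bash", "-c", command], stdout=out, stderr=err).returncode


# 'echo 42' stands in for app/simulate, which prints one random number.
code = run_bash_app("echo 42", stdout="output/demo.out", stderr="output/demo.err")
with open("output/demo.out") as f:
    print(code, f.read().strip())
```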

<img src="p1/pattern.png" align="left">

In [None]:
from parsl import App, DataFlowKernel, ThreadPoolExecutor
from parsl.dataflow.futures import Future

# Define our workers and dfk.
# In this case, we are running locally and specifying a max of 4
# concurrent threads
workers = ThreadPoolExecutor(max_workers=4)
dfk = DataFlowKernel(executors=[workers])

@App('bash', dfk)
def mysim(stdout="output/p1.out", stderr="output/p1.err"):
    """Set this example up as a bash app by returning the 
    command line app to be called, in this case simulate"""
    return "app/simulate"


mysim().result()

with open('output/p1.out', 'r') as f:
    print(f.read())

dfk.cleanup()

## Debugging

The easiest way to debug a Parsl application is to enable Parsl's stream logger with `set_stream_logger()`, which prints Parsl's log messages to the console. The following example shows how to enable it. Information about app failures can be found in the app's stderr file (here, `output/p1.err`).
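If you want your own format or level, roughly the same effect can be had with the standard `logging` module directly. This sketch assumes that Parsl emits its messages through loggers named under `parsl` (the usual `logging.getLogger(__name__)` convention); `enable_debug_logging` and `show_stderr` are hypothetical helpers, not part of Parsl.

```python
import logging
import os
import sys


def enable_debug_logging(name="parsl", level=logging.DEBUG):
    """Send log records from logger `name` (and its child loggers) to stderr."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(name)s:%(lineno)d [%(levelname)s] %(message)s"))
    logger.addHandler(handler)
    return logger


def show_stderr(path):
    """Print an app's captured stderr file, if it exists."""
    if os.path.exists(path):
        with open(path) as f:
            print(f.read())
    else:
        print("No stderr file at", path)


logger = enable_debug_logging()
logging.getLogger("parsl.dataflow").debug("child loggers propagate to 'parsl'")
show_stderr("output/p1.err")
```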

In [None]:
from parsl import App, DataFlowKernel, ThreadPoolExecutor
from parsl.dataflow.futures import Future
from parsl import set_stream_logger

set_stream_logger()

# Define our workers and dfk.
# In this case, we are running locally and specifying a max of 4
# concurrent threads
workers = ThreadPoolExecutor(max_workers=4)
dfk = DataFlowKernel(executors=[workers])

@App('bash', dfk)
def mysim(stdout="output/p1.out", stderr="output/p1.err"):
    """Set this example up as a bash app by returning the 
    command line app to be called, in this case simulate"""
    return "app/simulate"


mysim().result()

with open('output/p1.out', 'r') as f:
    print(f.read())

dfk.cleanup()

## Example 2: Run a single Python function using Parsl

The second example mirrors the first, but instead of an external application it runs a Python function.
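A Python app behaves much like submitting a function to an executor: the call returns a future immediately, `.result()` blocks until the value is available, and an exception raised inside the function is re-raised by `.result()`. The sketch below shows that pattern with the standard library's `concurrent.futures` as an analogy; it does not use Parsl, and `failing_sim` is a hypothetical function. Importing `randint` inside the function mirrors Example 2; keeping imports inside app bodies is a common habit for code that may later run in a separate process or on another machine.

```python
from concurrent.futures import ThreadPoolExecutor


def mysim():
    """Return a random integer between 1 and 100 (inclusive)."""
    from random import randint  # imported inside the function, as in Example 2
    return randint(1, 100)


def failing_sim():
    """Hypothetical simulation that fails, to show error propagation."""
    raise ValueError("simulation diverged")


with ThreadPoolExecutor(max_workers=4) as pool:
    value = pool.submit(mysim).result()  # blocks until the function returns
    print(value)

    bad = pool.submit(failing_sim)
    try:
        bad.result()  # the worker's exception is re-raised here
    except ValueError as exc:
        print("App failed:", exc)
```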

In [None]:
from parsl import App, DataFlowKernel, ThreadPoolExecutor
from parsl.dataflow.futures import Future

workers = ThreadPoolExecutor(max_workers=4)
dfk = DataFlowKernel(executors=[workers])

@App('python', dfk)
def mysim():
    """Return a random integer between 1 and 100 (inclusive)."""
    from random import randint
    return randint(1,100)


print(mysim().result())
dfk.cleanup()

## Example 3: Running an ensemble of many apps in parallel with a loop

This script uses a Python `for` loop to run many concurrent simulations. Note: rather than relying on stdout, the simulation app redirects its output to the file passed in `outputs`.
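The same fan-out pattern can be sketched with the standard library as an analogy (not Parsl). Here `as_completed` hands back each future as soon as it finishes, rather than in submission order; the file names `output/demo_sim_*` are illustrative only.

```python
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from random import randint


def mysim(out_file):
    """Write one random number to out_file and return the path."""
    with open(out_file, "w") as f:
        f.write("{}\n".format(randint(1, 100)))
    return out_file


os.makedirs("output", exist_ok=True)
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(mysim, "output/demo_sim_{}".format(i)) for i in range(5)]
    for fut in as_completed(futures):  # yields each future as it finishes
        with open(fut.result()) as f:
            print(fut.result(), f.read().strip())

print("Job Status:", [f.done() for f in futures])
```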

<img src="p2/pattern.png" align="left">

In [None]:
from parsl import App, DataFlowKernel, ThreadPoolExecutor
from parsl.dataflow.futures import Future

workers = ThreadPoolExecutor(max_workers=4)
dfk = DataFlowKernel(executors=[workers])

# Simulate application that redirects output to the
# specified file
@App('bash', dfk)
def mysim(outputs=[], stdout="output/p3.out", stderr="output/p3.err"):
    return 'app/simulate > {0}'.format(outputs[0])


results = []
for i in range(5):
    out_file = "output/p3_sim_{0}".format(i)
    results.append(mysim(outputs=[out_file]))

    
print ("Job Status: {}".format([r.done() for r in results]))

# wait for all apps to complete
[r.result() for r in results]

print ("Job Status: {}".format([r.done() for r in results]))

outputs = [r.outputs[0] for r in results]

for o in outputs:
    with open(o.filename, 'r') as f:
        print(f.read().strip())
        
dfk.cleanup()

## Example 4: Analyzing results of a parallel ensemble

After all the parallel simulations in an ensemble run have completed, it is typically necessary to gather and analyze their results with some kind of post-processing analysis program or script. p3.py introduces such a post-processing step. In this case, the files created by all of the parallel runs of `simulate` are averaged by the trivial "analysis application" `stats`. Because `stats` receives the simulations' output futures as its `inputs`, Parsl starts it only after all of the simulations have finished:
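For intuition about what the analysis step computes, here is a hypothetical pure-Python stand-in for `stats`. It assumes each simulation file holds whitespace-separated numbers (one per run, as `simulate` produces); the real `stats` script may differ, and `average_files` is not part of the tutorial.

```python
import os
import tempfile


def average_files(paths):
    """Hypothetical stand-in for `stats`: average all numbers found in the files."""
    values = []
    for path in paths:
        with open(path) as f:
            values.extend(float(tok) for tok in f.read().split())
    if not values:
        raise ValueError("no numbers found in inputs")
    return sum(values) / len(values)


with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for i, v in enumerate([10, 20, 30, 40, 50]):
        p = os.path.join(tmp, "sim_{}".format(i))
        with open(p, "w") as f:
            f.write("{}\n".format(v))
        paths.append(p)
    mean = average_files(paths)

print(mean)  # 30.0
```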


<img src="p3/pattern.png" align="left">



In [None]:
from parsl import App, DataFlowKernel, ThreadPoolExecutor
from parsl.dataflow.futures import Future

workers = ThreadPoolExecutor(max_workers=4)
dfk = DataFlowKernel(executors=[workers])

@App('bash', dfk)
def mysim(outputs=[],
          stdout="output/p4_sim.out",
          stderr="output/p4_sim.err"):
    return 'app/simulate > {0}'.format(outputs[0])
    

@App('bash', dfk)
def stats(inputs=[],
          outputs=[],
          stderr='output/p4_stats.err',
          stdout='output/p4_stats.out'):
    """Call the stats CLI utility with all simulation outputs as inputs."""
    return "app/stats {0} > {1}".format(" ".join(inputs), outputs[0])


# call the simulation app 5 times
results = []
for i in range(5):
    out_file = "output/p4_sim_{0}".format(i)
    results.append(mysim(outputs=[out_file]))

# collect the output data futures
sim_outputs = [r.outputs[0] for r in results]

# run the stats app
s = stats(inputs=sim_outputs, outputs=["output/p4_stats.txt"])

s.result()

with open('output/p4_stats.txt', 'r') as f:
    print(f.read())

dfk.cleanup()

# Tutorial Section Two

This section introduces running on remote computational resources, including the configuration that allows Parsl to run your applications on them.

Parsl supports a variety of resource providers as well as methods for submitting workloads to those resources. Parsl can be run from login nodes (or other nodes with access to submission queues) as well as from remote hosts via SSH.

## Example 5: Running a simple app using pilot jobs

This example uses the `sort` application to sort a file of random numbers (`input/unsorted.txt`; in Example 5.d the file is instead generated by a Python app).

First we run the sort script using the local ThreadPoolExecutor. We will subsequently extend this example to submit the job via the IPyParallel pilot job model and then to a remote resource. 
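The examples below call `sort -g`, which orders lines by their general numeric value (so `1e2` sorts as 100) instead of alphabetically. A rough pure-Python equivalent is sketched here for reference; `sort_numeric_file` is a hypothetical helper, and unlike `sort` it drops blank lines.

```python
import os
import tempfile


def sort_numeric_file(src, dst):
    """Roughly `sort -g src > dst`: sort non-empty lines by numeric value."""
    with open(src) as f:
        lines = [line.strip() for line in f if line.strip()]
    lines.sort(key=float)
    with open(dst, "w") as f:
        f.write("".join(line + "\n" for line in lines))
    return dst


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "unsorted.txt")
    dst = os.path.join(tmp, "sorted.txt")
    with open(src, "w") as f:
        f.write("42\n7\n1e2\n-3.5\n")
    sort_numeric_file(src, dst)
    with open(dst) as f:
        result = f.read().split()

print(result)  # ['-3.5', '7', '42', '1e2']
```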

### Example 5.a: Running the sort app locally using threads

First we use the same approach as the prior examples to run the `sort` command with local threads.

In [None]:
#!/usr/bin/env python3
from parsl import App, DataFlowKernel, ThreadPoolExecutor, IPyParallelExecutor
from parsl.dataflow.futures import Future

workers = ThreadPoolExecutor(max_workers=4)
dfk = DataFlowKernel(executors=[workers])

@App('bash', dfk)
def sort(unsorted, 
         outputs=[],
         stderr='output/p5_a_sort.err',
         stdout='output/p5_a_sort.out'):
    """Call sort executable on file `unsorted`"""
    return "sort -g {} > {}".format(unsorted, outputs[0])

s = sort("input/unsorted.txt", outputs=["output/a_sorted.txt"])

output_file = s.outputs[0].result()

print("Contents of the unsorted.txt file:")
with open('input/unsorted.txt', 'r') as f:
    print(f.read().replace("\n",","))
    
print("\nContents of the sorted output file:")
with open(output_file, 'r') as f:
    print(f.read().replace("\n",","))
    
dfk.cleanup()

### Example 5.b: Running the sort app locally using pilot jobs

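We now use IPyParallel to run the `sort` command using a pilot job model. Instead of being defined inline, the executor configuration is loaded from the `config.local` module.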
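In the pilot-job model, resources are acquired once (for example, a batch job that starts a set of workers), and many short tasks are then dispatched to those already-running workers instead of each task waiting in the batch queue; in this tutorial IPyParallel plays that role. The toy sketch below models only the idea, with threads pulling from a shared queue. `run_pilot` is hypothetical, nothing here talks to a scheduler, and error handling is omitted.

```python
import queue
import threading


def run_pilot(tasks, n_workers=4):
    """Start n_workers once, then let them pull tasks from a shared queue.

    Each task is a zero-argument callable; results are returned in task order.
    """
    todo = queue.Queue()
    for index, task in enumerate(tasks):
        todo.put((index, task))
    results = [None] * len(tasks)

    def worker():
        while True:
            try:
                index, task = todo.get_nowait()
            except queue.Empty:
                return  # no work left, so this worker exits
            results[index] = task()

    workers = [threading.Thread(target=worker) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return results


squares = run_pilot([lambda i=i: i * i for i in range(10)], n_workers=3)
print(squares)
```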


In [None]:
from parsl import App, DataFlowKernel, ThreadPoolExecutor, IPyParallelExecutor
from parsl.dataflow.futures import Future
import json
from config.local import config

dfk = DataFlowKernel(config=config)

@App('bash', dfk)
def sort(unsorted: str, 
         outputs: list = [],
         stderr: str='output/p5_b_sort.err',
         stdout: str='output/p5_b_sort.out')-> Future:
    """Call sort executable on file `unsorted`"""
    return "sort -g {} > {}".format(unsorted, outputs[0])

s = sort("input/unsorted.txt", outputs=["output/b_sorted.txt"])

output_file = s.outputs[0].result()

print("Contents of the unsorted.txt file:")
with open('input/unsorted.txt', 'r') as f:
    print(f.read().replace("\n",","))
    
print("\nContents of the sorted output file:")
with open(output_file, 'r') as f:
    print(f.read().replace("\n",","))
dfk.cleanup()

### Example 5.c: Running the sort app on a cluster using pilot jobs

We now take the previous example and run it on a cluster. This example must be run on the cluster's login node from a copy of the full tutorial repository. It also requires that the Parsl tutorial directory be available on the worker nodes via a shared file system; this is why the script passes absolute paths (`os.path.abspath`) to the app.

You will need the Python module and Parsl library. Instructions for setting up the environment are available online:  

For Midway, you can use our shared Python environment:

    $ module load python/3.5.2+gcc-4.8
    $ source /scratch/midway/yadunand/parsl_env_3.5.2_gcc/bin/activate


In [None]:
from parsl import App, DataFlowKernel, ThreadPoolExecutor, IPyParallelExecutor
from parsl.dataflow.futures import Future
import json
import os

from config.midway import config

dfk = DataFlowKernel(config=config)

@App('bash', dfk)
def sort(unsorted, outputs=[]):
    """Call sort executable on file `unsorted`"""
    return "sort -g {} > {}".format(unsorted, outputs[0])

s = sort(os.path.abspath("input/unsorted.txt"), 
         outputs=[os.path.abspath("output/sorted_c.txt")])

output_file = s.outputs[0].result()

print("Contents of the unsorted.txt file:")
with open('input/unsorted.txt', 'r') as f:
    print(f.read().replace("\n",","))
    
print("\nContents of the sorted output file:")
with open(output_file, 'r') as f:
    print(f.read().replace("\n",","))

dfk.cleanup()


### Example 5.d: Running the sort app on remote resources using pilot jobs

In the previous example we needed to be on a login node to run the application. Parsl can also submit work to a remote login node over SSH. However, this requires that the machine running this script also run the IPyParallel controller and be reachable from the remote machine. You will also need to configure your SSH agent so that Parsl can open the SSH connection to the login node.

Note: Parsl does not yet support file staging. Instead, this example uses a Python app to create the unsorted file directly in the shared directory on the remote side, and then uses Parsl's SSH channel to pull the sorted output file back to the local machine.

To run this script, you must update the `shared_dir` variable and the `username` (and, if needed, `hostname`) in the `auth` settings.


In [None]:
from parsl import App, DataFlowKernel, ThreadPoolExecutor, IPyParallelExecutor
from parsl.dataflow.futures import Future
from parsl import set_stream_logger
import json
import os

#set_stream_logger()
from config.midway import config

shared_dir = "/scratch/midway/yadunand/tutorial" # directory shared with worker nodes

# Set remote connection details
config["sites"][0]["auth"]= {"channel": "ssh",
                             "hostname":"swift.rcc.uchicago.edu",
                             "username": "yadunand",
                             "scriptDir": shared_dir}
#print (config)
dfk = DataFlowKernel(config=config)

@App('python', dfk)
def create_unsorted_file(outputs=[]):
    """Write 50 random integers (1-100), one per line, to outputs[0]."""
    from random import randint
    with open(outputs[0], 'w') as f:
        for i in range(50):
            f.write("{0}\n".format(randint(1, 100)))

@App('bash', dfk)
def sort(unsorted, outputs=[]):
    """Call sort executable on file `unsorted`"""
    return "sort -g {0} > {1}".format(unsorted, outputs[0])

unsorted = create_unsorted_file(outputs=[os.path.join(shared_dir, "unsorted-generated.txt")])
print (unsorted.outputs[0].result())

s = sort(unsorted.outputs[0], 
         outputs=[os.path.join(shared_dir, "sorted_d.txt")])

output_file = s.outputs[0].result()

dfk.executor.execution_provider.channel.pull_file(output_file, '.')
with open(os.path.basename(output_file), 'r') as f:
    print(f.read().replace("\n",","))

dfk.cleanup()

## Example 6: MPI Hello

The final example is a basic "Hello World!" example that shows you how to run MPI applications. Here we have a simple MPI code mpi_hello.c that has each MPI rank sleep for a user-specified duration and then print the processor name on which the rank is executing followed by "Hello World!". 
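The `mpi_hello` app in the script builds a shell command that changes into the application directory, launches the executable with the chosen MPI launcher, and redirects all output to a file. The same string-building logic can be written as an ordinary function and checked without a cluster. `build_mpi_command` is a hypothetical helper; `-np` and `-n` are the process-count options of `mpirun` and Slurm's `srun` respectively, and `shlex.quote` is added here to guard paths containing spaces.

```python
import shlex


def build_mpi_command(dirpath, outfile, launcher="mpirun", app="mpi_hello", nproc=20):
    """Return a bash command that runs ./app under an MPI launcher."""
    flags = {"mpirun": "-np", "srun": "-n"}
    if launcher not in flags:
        raise ValueError("unsupported launcher: {}".format(launcher))
    return "cd {} && {} {} {} ./{} &> {}".format(
        shlex.quote(dirpath), launcher, flags[launcher], nproc,
        shlex.quote(app), shlex.quote(outfile))


print(build_mpi_command("/scratch/me/mpi apps", "hello.txt", launcher="srun", nproc=4))
```

Raising an error for an unknown launcher makes a typo fail loudly instead of silently producing no command.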

The following script is designed to be run from the login node. If running locally, you must first update the `shared_dir` variable. Example configurations for other resources (Midway and Cori over SSH) are included as commented-out strings at the top of the script.

In [None]:
#!/usr/bin/env python3

from parsl import App, IPyParallelExecutor, DataFlowKernel
import os, json
from parsl.dataflow.futures import Future

# Verbose logging for debugging; remove these lines to reduce output
from parsl import set_stream_logger
set_stream_logger()

"""From now on, the tutorial applications are written to run on Midway,
a cluster located at the University of Chicago Research Computing Center.
They have also been tested locally on both macOS and Ubuntu Linux.
In order to run them locally,
either start an IPyParallel cluster controller on your machine
or change the workers to something like this:
workers = ThreadPoolExecutor(max_workers=NUMBER OF CORES)
"""

"""
from config.midway import config
# Set remote connection details
shared_dir = "/scratch/midway/yadunand/tutorial" # directory shared with worker nodes
config["sites"][0]["auth"]= {"channel": "ssh",
                             "hostname":"swift.rcc.uchicago.edu",
                             "username": "yadunand",
                             "scriptDir": shared_dir}
config["sites"][0]["execution"]["block"]["walltime"] = '00:10:00'
config["sites"][0]["execution"]["block"]["nodes"] = 2
"""

"""
from config.cori import config
shared_dir = "/global/homes/y/yadunand/tutorial" # directory shared with worker nodes
config["sites"][0]["auth"]= {"channel": "ssh",
                             "hostname":"cori.nersc.gov",
                             "username": "yadunand",
                             "scriptDir": shared_dir}
config["sites"][0]["execution"]["block"]["walltime"] = '00:10:00'
config["sites"][0]["execution"]["block"]["nodes"] = 2
"""

from config.midway import config
shared_dir = "/scratch/midway/chard/parsl-tutorial" # Update with your local path

dfk = DataFlowKernel(config=config)

@App('bash', dfk)
def compile_app(dirpath, stdout=None, stderr=None, compiler="mpicc"):
    """Compile mpi app with mpicc
    On midway use compiler = mpicc
    On Cori use default compiler= cc
    """
    return '''cd {0}; make clean; make CC={1} '''.format(dirpath, compiler)

@App('bash', dfk)
def mpi_hello(dirpath, launcher="mpirun", app="mpi_hello", nproc=20, outputs=[]):
    """Run the compiled MPI executable with an MPI launcher.
    Supported launchers: "mpirun" (default, e.g. Open MPI) and "srun" (Slurm).
    The run's stdout and stderr are written to outputs[0]."""
    if launcher == "mpirun":
        return "cd {}; {} -np {} ./{} &> {};".format(dirpath, launcher, nproc, app, outputs[0])
    elif launcher == "srun":
        return "cd {}; {} -n {} ./{} &> {};".format(dirpath, launcher, nproc, app, outputs[0])
    else:
        raise ValueError("Unsupported launcher: {}".format(launcher))

# use .result() to make the execution wait until the app has compiled
compile_app(dirpath=os.path.join(shared_dir, "mpi_apps"),
            stdout=os.path.join(shared_dir, "mpi_apps.compile.out"),
            stderr=os.path.join(shared_dir, "mpi_apps.compile.err"),
            compiler='mpicc'
           ).result()


hello = mpi_hello(os.path.join(shared_dir, "mpi_apps"),
                  launcher="mpirun",
                  outputs=[os.path.join(shared_dir, "mpi_apps", "hello.txt")])

output_file = hello.outputs[0].result()

dfk.executor.execution_provider.channel.pull_file(output_file, '.')
with open(os.path.basename(output_file), 'r') as f:
    print(f.read())

os.remove(os.path.basename(output_file))
dfk.cleanup()