### Intro

This notebook contains an example of using Parsl with a MapReduce-like pattern and several types of data staging.

It contains a bash_app that is run in parallel 16 times. Each instance stages in an input file and a script over https, then runs the script on the input file to create an output file; specifically, the script sharpens a JPEG image. That output file is then staged by Globus to a public Globus endpoint (used for tutorials).

Each task returns an AppFuture, and each AppFuture contains a DataFuture for one of the output files.

The notebook then runs a python_app task that is dependent on all the DataFutures. This task builds a mosaic assembled from parts of each of its input images, and that mosaic image is then staged back to the same Globus endpoint.

Notes:
1. The tasks are dependent on the Python library Pillow, which must be installed where the tasks are run.
2. If the tasks are run in the same place, or with access to a shared file system, the final task shouldn't have to stage in the files from Globus, as they already exist locally where they were created by the first 16 tasks.
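
The overall shape of the workflow (many independent tasks, then one task that consumes all of their results) can be sketched with Python's standard `concurrent.futures` module. This is only an analogy and does not use Parsl: `sharpen` and `mosaic` are hypothetical stand-ins that manipulate file names rather than images, and the sketch waits on the futures explicitly, whereas Parsl resolves the dependencies itself when futures are passed as inputs.

```python
from concurrent.futures import ThreadPoolExecutor


def sharpen(name):
    # Hypothetical stand-in for the per-image "map" task.
    return name.replace(".jpg", "_sharp.jpg")


def mosaic(names):
    # Hypothetical stand-in for the "reduce" task that needs every map result.
    return "mosaic of {} images".format(len(names))


names = ["{:04d}.jpg".format(i + 1) for i in range(16)]

with ThreadPoolExecutor(max_workers=4) as pool:
    # Submitting returns a future immediately; the work runs in the background.
    map_futures = [pool.submit(sharpen, name) for name in names]
    # Collect every map result before starting the reduce step.
    sharpened = [future.result() for future in map_futures]
    summary = pool.submit(mosaic, sharpened).result()

print(sharpened[0], "...", sharpened[-1])
print(summary)
```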

### Set up Parsl
Load Parsl and define a config that uses threads, runs tasks in a directory on my laptop's Desktop, and registers the Globus endpoint associated with my laptop.

In [None]:
import parsl
import os

print(parsl.__version__)

In [None]:
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
from parsl.data_provider.globus import GlobusStaging
from parsl.data_provider.data_manager import default_staging

config = Config(
    executors=[
        ThreadPoolExecutor(
            working_dir="/Users/dsk/Desktop/parsl_tmp",
            storage_access=default_staging + [GlobusStaging(
                endpoint_uuid="8ae0b9fe-9a00-11ea-8eca-02c81b96a709",
            )]
        )
    ]
)

parsl.load(config)

### Define a task as a bash app that runs a script
Define a Parsl bash app with three File parameters for the script, the input file, and the output file.

`script` and `infile` are both File objects with the filepath attribute set to
the execute-side filepath, and with the content staged in before execution. Each file is staged in because it is a File object, not some other Python object like `7` or a string.

`outputs` is a list of File objects (with one entry here, because only one is passed in when the app is called). The filepath attribute of each entry is set to the location on the execute-side filesystem where the app must write the file so that the staging system can find it and stage it out afterwards.
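
As a rough illustration of why the URL form of a File matters, the sketch below splits the two kinds of URL used in this notebook with Python's standard `urllib.parse`. It is not Parsl's own code, and `describe` is a hypothetical helper; the point is only that the scheme (`https` or `globus`) distinguishes the staging method, and that a Globus URL carries the endpoint UUID followed by a path on that endpoint.

```python
from urllib.parse import urlparse


def describe(url):
    # Hypothetical helper: split a file URL into the parts relevant to staging.
    parts = urlparse(url)
    return parts.scheme, parts.netloc, parts.path


https_parts = describe(
    "https://github.com/danielskatz/parsl-example/raw/master/data/0001.jpg"
)
globus_parts = describe(
    "globus://ddb59aef-6d04-11e5-ba46-22000b92c6ec/~/0001_sharp.jpg"
)

# Each tuple is (scheme, host or endpoint UUID, path).
print(https_parts)
print(globus_parts)
```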


In [None]:
from parsl.app.app import python_app, bash_app
from parsl.data_provider.files import File

@bash_app
def process_file(script, infile, outputs=[]):
    return 'python3 {s} {i} {o}'.format(s=script.filepath, i=infile.filepath, o=outputs[0].filepath)
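
To see what string the bash app hands to the shell, the same formatting can be tried outside Parsl. In this sketch, `SimpleNamespace` objects stand in for staged File objects, and the local file names are assumptions chosen for illustration; the real `filepath` values are set by Parsl at run time.

```python
from types import SimpleNamespace


def build_command(script, infile, outputs=()):
    # Same format string as process_file, without the @bash_app decorator.
    return 'python3 {s} {i} {o}'.format(
        s=script.filepath, i=infile.filepath, o=outputs[0].filepath
    )


# Hypothetical execute-side paths standing in for staged File objects.
script = SimpleNamespace(filepath="sharpen_image.py")
infile = SimpleNamespace(filepath="0001.jpg")
outfile = SimpleNamespace(filepath="0001_sharp.jpg")

command = build_command(script, infile, outputs=[outfile])
print(command)
```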

### Run a single task

This is how to run a single task, but you can skip this and go on to the following cell to run the set of 16 tasks in parallel; this cell is mostly for illustration.

Define Parsl Files for the script to be run and the input file, both of which will be staged into the run location via https.

Also define the Parsl File for the output as a Globus file that will be staged via Globus.

Then actually run the task, and block on the result. (Note that this will return once the file is computed, but the staging may take more time to complete.)

In [None]:
script = File("https://github.com/danielskatz/parsl-example/raw/master/sharpen_image.py")

inputFile = File("https://github.com/danielskatz/parsl-example/raw/master/data/0001.jpg")

# This is an endpoint on my laptop, which fails because I don't have a Globus pro account,
# which would be needed to transfer between two Globus Connect endpoints.
# If I were running on a cluster with a Globus endpoint, I could use this.
outputFile = File("globus://8ae0b9fe-9a00-11ea-8eca-02c81b96a709/Users/dsk/Globus_files/0001_sharp.jpg")

# This is a public endpoint used for globus tutorials, which is wiped every few weeks,
# but can be used for temporary purposes.
# You can view the contents via 
# https://app.globus.org/file-manager?origin_id=ddb59aef-6d04-11e5-ba46-22000b92c6ec&origin_path=%2F~%2F
    
outputFile = File("globus://ddb59aef-6d04-11e5-ba46-22000b92c6ec/~/0001_sharp.jpg")

output = process_file(script, inputFile, outputs=[outputFile])
output.result()

### Run a set of 16 tasks
Define Parsl Files for the script to be run and the input files, both of which will be staged into the run location via https.

Also define the Parsl Files for the outputs as Globus files that will be staged via Globus.

Then actually run the tasks. (Note that this will return immediately, as the elements of `sharpImageFutures[]` are AppFutures.)

In [None]:
script = File("https://github.com/danielskatz/parsl-example/raw/master/sharpen_image.py")

sharpImageFutures = []

for i in range(16):
    inputFile = File("https://github.com/danielskatz/parsl-example/raw/master/data/{:04d}.jpg".format(i+1))
    outputFile = File("globus://ddb59aef-6d04-11e5-ba46-22000b92c6ec/~/{:04d}_sharp.jpg".format(i+1))
    sharpImageFutures.append(process_file(script, inputFile, outputs=[outputFile]))


### Define a python task to build a mosaic of parts of the 16 images

In [None]:
@python_app
def mosaic_files(inputs=[], outputs=[]):
    # Imports live inside the app body so they are resolved where the task runs.
    import sys

    try:
        from PIL import Image
    except ImportError:
        print("error:", sys.argv[0], "requires Pillow - install it via 'pip install Pillow'")
        sys.exit(2)

    outputImage = Image.new('RGB', [400, 400])
    index = 0
    for i in range(4):
        for j in range(4):
            inputImage = Image.open(inputs[index].filepath)
            box = ((i)*100, (j)*100, (i+1)*100, (j+1)*100)
            region = inputImage.crop(box)
            outputImage.paste(region, box)
            index=index+1
            
    outputImage.save(outputs[0].filepath, 'JPEG')
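
The tile layout used by `mosaic_files` can be checked without Pillow. The sketch below recomputes the crop boxes from the nested loops; `TILE` and `GRID` are names introduced here for illustration. Pillow boxes are `(left, upper, right, lower)`, so `i` selects the column and `j` the row, which means consecutive inputs fill the mosaic down each column before moving right. Because the same box is used for both `crop` and `paste`, each input contributes the region at its own grid position.

```python
TILE = 100  # tile edge in pixels, matching the 100-pixel steps in mosaic_files
GRID = 4    # 4 x 4 tiles give the 400 x 400 output image

boxes = []
for i in range(GRID):
    for j in range(GRID):
        # Same arithmetic as the box computed inside mosaic_files.
        boxes.append((i * TILE, j * TILE, (i + 1) * TILE, (j + 1) * TILE))

# Show where a few of the 16 inputs end up.
for index in (0, 1, 4, 15):
    print(index, boxes[index])
```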


### Run the mosaic task
Note that this task will run remotely, and is dependent on `sharpImageFutures`. The cell will block until the mosaic_files task has completed, but again, this doesn't mean that its output will have completed staging to the Globus endpoint.

Also note that this is not tremendously efficient with this resource configuration: the sharpened images are staged back from Globus, even though they are already on my laptop.

In [None]:
mosaicFile = File("globus://ddb59aef-6d04-11e5-ba46-22000b92c6ec/~/mosaic.jpg")
# Create a list of DataFutures from the AppFutures
images = [out.outputs[0] for out in sharpImageFutures]

mosaicFuture = mosaic_files(inputs=images,outputs=[mosaicFile])
mosaicFuture.result()

Now block until the stage-out of the mosaic has completed.

In [None]:
mosaicFuture.outputs[0].result()