In [None]:
from pathlib import Path
from IPython.display import SVG

# Diamond Workflow

This notebook generates the **diamond workflow** illustrated below, then plans and executes it on the local condorpool. Rectangles represent input/output files, and ovals represent compute jobs. The arrows represent file dependencies between compute jobs.
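
The shape of the diamond can also be written down without Pegasus. The following is a minimal sketch using only the standard library (`graphlib`, Python 3.9+); the job names are illustrative labels chosen to mirror the figure, and the grouping shows which jobs could run in parallel once their dependencies have finished.

```python
from graphlib import TopologicalSorter

# Map each job to the set of jobs it depends on (illustrative names).
dependencies = {
    "preprocess": set(),
    "findrange_1": {"preprocess"},
    "findrange_2": {"preprocess"},
    "analyze": {"findrange_1", "findrange_2"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

# Collect jobs level by level: every job in a level has all of its
# dependencies satisfied by the levels before it.
levels = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())
    levels.append(ready)
    sorter.done(*ready)

print(levels)
# [['preprocess'], ['findrange_1', 'findrange_2'], ['analyze']]
```

The middle level contains both `findrange` jobs, which is what gives the workflow its diamond shape.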

In [None]:
SVG(filename='diamond.svg')

## 1. Import Python API

Import the required packages. The wildcard import in the next cell makes the following names available. <br>
```
from Pegasus.api.mixins import EventType, Namespace
from Pegasus.api.properties import Properties
from Pegasus.api.replica_catalog import File, ReplicaCatalog
from Pegasus.api.site_catalog import (
    OS,
    Arch,
    Directory,
    FileServer,
    Grid,
    Operation,
    Scheduler,
    Site,
    SiteCatalog,
)
from Pegasus.api.transformation_catalog import (
    Container,
    Transformation,
    TransformationCatalog,
    TransformationSite,
)
from Pegasus.api.workflow import Job, SubWorkflow, Workflow
from Pegasus.client._client import PegasusClientError
```

In [None]:
from Pegasus.api import *

## 2. Configure Logging

Configure logging. While this is **not required**, it is useful for seeing output from tools such as `pegasus-plan`, `pegasus-analyzer`, etc. when using these Python wrappers.
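
If `DEBUG` output turns out to be too verbose, a quieter configuration is possible. The sketch below uses only the standard `logging` module; the logger name `diamond` and the format string are assumptions made for illustration, not something Pegasus requires.

```python
import logging

# force=True (Python 3.8+) replaces any handlers configured earlier,
# which matters when the cell is re-run in the same kernel.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    force=True,
)

log = logging.getLogger("diamond")  # hypothetical logger name
log.debug("hidden at INFO level")
log.info("shown at INFO level")
```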

In [None]:
import logging

logging.basicConfig(level=logging.DEBUG)

## 3. Configure Pegasus Properties

The `pegasus.properties` file can be generated using the `Properties()` object as shown below. To see a list of the most commonly used properties, you can use `Properties.ls(prefix)`. By default, `pegasus-plan` will look in `cwd` for a `pegasus.properties` file.
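
To check what ends up in a properties file, it can be read back into a dictionary. This is a minimal sketch that assumes a simple `key = value` layout with `#` comments; `parse_properties` and the sample text are hypothetical helpers for illustration and are not part of the Pegasus API.

```python
def parse_properties(text):
    """Parse simple `key = value` lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=" themselves.
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


# Hypothetical file contents, assumed for this example.
sample = """
# example properties
pegasus.monitord.encoding = json
"""

parsed = parse_properties(sample)
print(parsed)
```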

In [None]:
Properties.ls("condor.request")

In [None]:
# --- Properties ---------------------------------------------------------------
props = Properties()
props["pegasus.monitord.encoding"] = "json"                                                                    
props["pegasus.catalog.workflow.amqp.url"] = "amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows"
props.write() # written to ./pegasus.properties 

## 4. Create a Replica Catalog (Specify Initial Input Files)

Any initial input files given to the workflow should be specified in the `ReplicaCatalog`. This object tells Pegasus where each input file is physically located. First, we create a file that will be used as input to this workflow. 

In [None]:
with open("f.a", "w") as f:
    f.write("This is the contents of the input file for the diamond workflow!")

The file `./f.a` will be used in this workflow, and so we create a corresponding `File` object. Metadata may also be added to the file as shown below.

Next, a `ReplicaCatalog` object is created so that the physical location of each input file can be cataloged. This is done using the `ReplicaCatalog.add_replica(site, file, path)` function. First, because the file `f.a` resides here on the submit machine, we use the reserved keyword `local` for the `site` parameter. Second, the `File` object is passed in for the `file` parameter. Finally, the absolute path to the file is given. A `pathlib.Path` may be used as long as it is an absolute path.

By default, `pegasus-plan` will look in `cwd` for a `replicas.yml` file.
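
Because the path must be absolute, it can help to see how `pathlib` distinguishes the two forms. The following is a small standard-library sketch; it does not touch the file system beyond resolving the current directory.

```python
from pathlib import Path

relative = Path("f.a")                  # relative to the current directory
absolute = Path(".").resolve() / "f.a"  # anchored at the resolved cwd

print(relative.is_absolute())  # False
print(absolute.is_absolute())  # True
```

Resolving `Path(".")` first and then appending the file name is the same pattern used in the replica catalog cell below.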

In [None]:
# --- Replicas -----------------------------------------------------------------
fa = File("f.a").add_metadata(creator="ryan")
rc = ReplicaCatalog()\
    .add_replica("local", fa, Path(".").resolve() / "f.a")\
    .write() # written to ./replicas.yml 

## 5. Create a Transformation Catalog (Specify Executables Used)

Every executable (referred to as a ***transformation***) used by the workflow must be specified in the `TransformationCatalog`. This is done by creating `Transformation` objects, which represent executables. Once created, these must be added to the `TransformationCatalog` object.

By default, `pegasus-plan` will look in `cwd` for a `transformations.yml` file.
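
The three transformations in this notebook differ only in their names; the site, executable path, and staging flag are shared. As a plain-Python sketch of that observation (no Pegasus calls are made, and the variable names are illustrative), the common settings can be collected once:

```python
# Settings shared by all three transformations in this notebook.
common = {
    "site": "condorpool",
    "pfn": "/home/scitech/pegasus/dist/pegasus/bin/pegasus-keg",
    "is_stageable": False,
}

# One entry per transformation name, each reusing the common settings.
specs = {
    name: {"name": name, **common}
    for name in ("preprocess", "findrange", "analyze")
}

for name, spec in specs.items():
    print(name, "->", spec["pfn"])
```

Whether to build the `Transformation` objects from such a table or to write them out explicitly, as the next cell does, is a matter of taste; the explicit form is easier to read in a tutorial.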

In [None]:
# --- Transformations ----------------------------------------------------------
preprocess = Transformation(
                "preprocess",
                site="condorpool",
                pfn="/home/scitech/pegasus/dist/pegasus/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

findrange = Transformation(
                "findrange",
                site="condorpool",
                pfn="/home/scitech/pegasus/dist/pegasus/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

analyze = Transformation(
                "analyze",
                site="condorpool",
                pfn="/home/scitech/pegasus/dist/pegasus/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

tc = TransformationCatalog()\
    .add_transformations(preprocess, findrange, analyze)\
    .write() # written to ./transformations.yml

## 6. Create the Workflow

The `Workflow` object is used to store jobs and dependencies between each job. Typical job creation is as follows:

```
# Define job Input/Output files
input_file = File("input.txt")
output_file1 = File("output1.txt")
output_file2 = File("output2.txt")

# Define job, passing in the transformation (executable) it will use
j = Job(transformation_obj)

# Specify command line arguments (if any) which will be passed to the transformation when run
j.add_args("arg1", "arg2", input_file, "arg3", output_file1)

# Specify input files (if any)
j.add_inputs(input_file)

# Specify output files (if any)
j.add_outputs(output_file1, output_file2)

# Add profiles to the job
j.add_env(FOO="bar")
j.add_profiles(Namespace.PEGASUS, key="checkpoint.time", value=1)

# Add the job to the workflow object
wf.add_jobs(j)
```

By default, dependencies between jobs are inferred based on input and output files.
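
The idea behind this inference can be illustrated in a few lines of plain Python. This is a sketch of the general technique, not Pegasus's actual implementation: a job depends on whichever job produces one of its input files. `JobSpec` and `infer_edges` are hypothetical names introduced for this example.

```python
from collections import namedtuple

# Hypothetical stand-in for a job: a name plus its input and output files.
JobSpec = namedtuple("JobSpec", ["name", "inputs", "outputs"])

jobs = [
    JobSpec("preprocess", {"f.a"}, {"f.b1", "f.b2"}),
    JobSpec("findrange_1", {"f.b1"}, {"f.c1"}),
    JobSpec("findrange_2", {"f.b2"}, {"f.c2"}),
    JobSpec("analyze", {"f.c1", "f.c2"}, {"f.d"}),
]


def infer_edges(jobs):
    """Return sorted (parent, child) pairs derived from shared files."""
    # Record which job produces each file.
    producer = {f: job.name for job in jobs for f in job.outputs}
    edges = set()
    for job in jobs:
        for f in job.inputs:
            # Files with no producer (such as f.a) are initial inputs.
            if f in producer:
                edges.add((producer[f], job.name))
    return sorted(edges)


edges = infer_edges(jobs)
for parent, child in edges:
    print(parent, "->", child)
```

Note that `f.a` has no producer, which is why it has to be registered in the replica catalog instead.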

In [None]:
# --- Workflow -----------------------------------------------------------------
wf = Workflow("blackdiamond")

fb1 = File("f.b1")
fb2 = File("f.b2")
job_preprocess = Job(preprocess)\
                    .add_args("-a", "preprocess", "-T", "3", "-i", fa, "-o", fb1, fb2)\
                    .add_inputs(fa)\
                    .add_outputs(fb1, fb2)

fc1 = File("f.c1")
job_findrange_1 = Job(findrange)\
                    .add_args("-a", "findrange", "-T", "3", "-i", fb1, "-o", fc1)\
                    .add_inputs(fb1)\
                    .add_outputs(fc1)

fc2 = File("f.c2")
job_findrange_2 = Job(findrange)\
                    .add_args("-a", "findrange", "-T", "3", "-i", fb2, "-o", fc2)\
                    .add_inputs(fb2)\
                    .add_outputs(fc2)

fd = File("f.d")
job_analyze = Job(analyze)\
                .add_args("-a", "analyze", "-T", "3", "-i", fc1, fc2, "-o", fd)\
                .add_inputs(fc1, fc2)\
                .add_outputs(fd)

wf.add_jobs(job_preprocess, job_findrange_1, job_findrange_2, job_analyze)

## 7. Run the Workflow

Using the reference to the `Workflow` object, you can plan, run, and monitor the workflow directly. These are wrappers around Pegasus CLI tools, and as such, the same arguments may be passed to them.

**Note that the Pegasus binaries must be added to your PATH for this to work.**
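
One way to confirm this before running the workflow is to look the tools up on the `PATH`. The sketch below uses `shutil.which` from the standard library; the list of tool names is an assumption about which Pegasus commands the wrappers rely on and may not be exhaustive.

```python
import shutil

# Pegasus command-line tools assumed to be needed by the wrappers.
tools = [
    "pegasus-plan",
    "pegasus-run",
    "pegasus-status",
    "pegasus-analyzer",
    "pegasus-statistics",
]

# shutil.which returns None when a command is not found on PATH.
missing = [tool for tool in tools if shutil.which(tool) is None]

if missing:
    print("Not found on PATH:", ", ".join(missing))
else:
    print("All listed Pegasus tools were found on PATH.")
```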

In [None]:
try:
    wf.plan(submit=True)\
        .wait()\
        .analyze()\
        .statistics()
except PegasusClientError as e:
    print(e.output)
