# Designing complex, reusable and scalable scientific workflows with Pydra

> Ghislain VAILLANT, Inria

In [None]:
import nest_asyncio

# Allow nested event loops.
nest_asyncio.apply()

# Reduce error backtrace size.
%xmode Minimal

## Motivation

> Scientific workflows often require sophisticated analyses that encompass a large **collection of algorithms**. These algorithms are not necessarily designed to **work together** and are written by **different authors**.
>
> Some may be written in Python, while others might require calling **external programs**. It is common practice to create semi-manual workflows that require the scientists to **handle the files** and **interact with partial results** from algorithms and external tools.
>
> This approach is conceptually simple and easy to implement, but the resulting workflow is often **time consuming**, **error-prone** and **difficult to share** with others.
> 
> -- <cite>[Pydra's Documentation](https://nipype.github.io/pydra/)</cite>

## Challenges

- Heterogeneity

    _How to sequence computation from multiple tools using different data formats._

- Reproducibility

    _How easily research code can be shared and re-used._

- Scalability

    _How much code needs to change to scale to more computing power and storage._

## Roadmap

- Prerequisites
- Core components
- Advanced features
- Support channels

## Prerequisites

- Python 3.8+
- Type annotations
- Data classes

### Installation

To install the core package:

```shell
$ pip install pydra==0.22
```

To install Pydra task packages, for instance ANTs:

```shell
$ pip install pydra-ants
```

### Type annotations

- Proposed in [PEP 484](https://peps.python.org/pep-0484/)
- Available since Python 3.5
- Implemented in syntax and [typing](https://docs.python.org/3/library/typing.html) module
- Enhanced by subsequent Python releases

Standard function definition.

In [None]:
def scale(factor, vector):
    return [factor * x for x in vector]

Definition with type annotations.

In [None]:
from typing import List

# Type alias for convenience.
Vector = List[float]

def scale(factor: float, vector: Vector) -> Vector:
    return [factor * x for x in vector]
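
The interpreter does not enforce annotations, but it keeps them so that they can be read back at runtime. That is what makes them useful to libraries: this is the kind of information a decorator such as Pydra's `@task` can draw on. Here is a stdlib-only sketch using `typing.get_type_hints` and `inspect.signature`. `scale` is repeated so the snippet runs on its own.

```python
import inspect
from typing import List, get_type_hints

# Same alias and function as above, repeated so the snippet runs on its own.
Vector = List[float]

def scale(factor: float, vector: Vector) -> Vector:
    return [factor * x for x in vector]

# Annotations are not enforced: integers are accepted despite the float hints.
print(scale(2, [1, 2]))  # [2, 4]

# They can, however, be read back at runtime.
hints = get_type_hints(scale)
print(hints["factor"], hints["return"])

# Parameter names, in order, come from the signature.
print(list(inspect.signature(scale).parameters))  # ['factor', 'vector']
```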

### Data classes

- Proposed in [PEP 557](https://peps.python.org/pep-0557/)
- Available since Python 3.7
- Implemented in [dataclasses](https://docs.python.org/3/library/dataclasses.html) module
- Complemented by third-party libraries such as [attrs](https://www.attrs.org/), which Pydra builds on

Simple record definition.

In [None]:
import attrs

@attrs.define
class GeoPoint:
    lat: float
    lon: float

In [None]:
swansea = GeoPoint(51.62, -3.94)

print(swansea)
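
For comparison, here is the same record written with the standard-library `dataclasses` module mentioned above. It shows that `attrs.define` and `@dataclass` generate the same kind of `__init__`, `__repr__` and `__eq__` for a plain record.

```python
from dataclasses import dataclass

@dataclass
class GeoPoint:
    lat: float
    lon: float

swansea = GeoPoint(51.62, -3.94)

print(swansea)  # GeoPoint(lat=51.62, lon=-3.94)
```

`attrs.define` goes further, for instance by generating slotted classes by default and by supporting per-field validators, as in the next example.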

Record with custom fields.

In [None]:
from attrs import define, field, validators

def validate_lat(instance, attribute, value):
    if abs(value) > 90:
        raise ValueError(
            f"Latitude must be in range [-90, 90], got {value}.")

def validate_lon(instance, attribute, value):
    if abs(value) > 180:
        raise ValueError(
            f"Longitude must be in range [-180, 180], got {value}.")

@define(kw_only=True)   # Reject positional arguments in __init__.
class CustomGeoPoint:
    lat: float = field(
        validator=[validators.instance_of(float), validate_lat])

    lon: float = field(
        validator=[validators.instance_of(float), validate_lon])

    alt: float = field(
        default=0.0, metadata={"recorded_by": "$DEVICE"})

In [None]:
swansea = CustomGeoPoint(lat=51.62, lon=-3.94)  # Okay!

print(swansea)

In [None]:
swansea = CustomGeoPoint(151.62, -3.94)             # Oops! TypeError: positional arguments are rejected.

In [None]:
swansea = CustomGeoPoint(lat=151.62, lon=-3.94)     # Oops! ValueError: latitude out of range.
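
Field metadata, like the `recorded_by` entry above, is an opaque mapping. The class machinery ignores it, but other code can read it back. The Pydra specifications further down use this same mechanism to attach `help_string`, `argstr` and similar keys to their fields. The sketch below uses the stdlib `dataclasses` module. With attrs, the equivalent lookup would be `attrs.fields(CustomGeoPoint).alt.metadata`. `Point` is a hypothetical stand-in for `CustomGeoPoint`.

```python
from dataclasses import dataclass, field, fields

@dataclass
class Point:
    lat: float
    lon: float
    # Arbitrary metadata: stored on the field, ignored by the dataclass machinery.
    alt: float = field(default=0.0, metadata={"recorded_by": "$DEVICE"})

# Index the fields by name, then read the metadata mapping of "alt".
by_name = {f.name: f for f in fields(Point)}
print(by_name["alt"].metadata["recorded_by"])  # $DEVICE
print(dict(by_name["lat"].metadata))           # {}
```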

## Core components

Tasks, workflows and shell specifications.

### Python tasks

Defining a function task.

In [None]:
from pathlib import Path
from pydra.mark import task

# Define a Python task.
@task
def cwd() -> Path:
    return Path.cwd()

Running a task.

In [None]:
# Instantiate a task.
cwd_task = cwd()  # Avoid shadowing the imported `task` decorator.

# Run and get the results.
result = cwd_task()

print(result.output.out)

### Shell tasks

Defining a shell command task.

In [None]:
from pydra.engine.task import ShellCommandTask

# Define a shell task.
class Pwd(ShellCommandTask):
    executable = "pwd"

# Instantiate a task.
task = Pwd()

# Run and get the results.
result = task()

print(result.output.stdout)

Defining input specifications.

In [None]:
from attrs import define, field
from pydra.engine.specs import ShellSpec, SpecInfo

# Define the input specification.
@define(kw_only=True)
class InputSpec(ShellSpec):
    level: int = field(
        metadata={"help_string": "max level", "argstr": "-L"}
    )
    path: str = field(
        metadata={
            "help_string": "input path",
            "mandatory": True,
            "argstr": "",
        }
    )

# Define the shell task.
class Tree(ShellCommandTask):
    executable = "tree"

    # Associate the specifications with the task definition.
    input_spec = SpecInfo(name="Inputs", bases=(InputSpec,))


Testing input specifications.

In [None]:
from pathlib import Path

# Instantiate a task.
task = Tree(path=Path.cwd(), level=1)

# Check the shell command.
print(task.cmdline)
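
Conceptually, `cmdline` is assembled from the field metadata. Each input that is set contributes its `argstr` followed by its value, and `position` controls the ordering. The following stdlib-only model illustrates that idea. It is not Pydra's implementation. `build_cmdline` and the spec dictionaries are hypothetical, and the ordering rule is an assumption: non-negative positions first, unpositioned fields next, negative positions last.

```python
import shlex
from typing import Any, Dict, List, Tuple

def build_cmdline(executable: str, spec: Dict[str, Dict[str, Any]],
                  values: Dict[str, Any]) -> str:
    """Assemble a command line from argstr/position metadata (simplified)."""
    front: List[Tuple[int, List[str]]] = []
    middle: List[List[str]] = []
    back: List[Tuple[int, List[str]]] = []
    for name, meta in spec.items():
        value = values.get(name)
        if value is None:
            continue  # unset inputs contribute nothing
        argstr = meta.get("argstr", "")
        if isinstance(value, bool):
            parts = [argstr] if value else []  # flags: emit argstr only when True
        elif argstr:
            parts = [argstr, str(value)]
        else:
            parts = [str(value)]  # empty argstr: bare positional value
        position = meta.get("position")
        if position is None:
            middle.append(parts)
        elif position >= 0:
            front.append((position, parts))
        else:
            back.append((position, parts))  # negative: counted from the end
    ordered = ([p for _, p in sorted(front, key=lambda t: t[0])] + middle
               + [p for _, p in sorted(back, key=lambda t: t[0])])
    tokens = [executable] + [tok for parts in ordered for tok in parts]
    # Quote each token so paths with spaces survive a shell.
    return " ".join(shlex.quote(tok) for tok in tokens)

# Metadata mirroring the Tree input specification above.
tree_spec = {"level": {"argstr": "-L"}, "path": {"argstr": ""}}
print(build_cmdline("tree", tree_spec, {"path": "/tmp/data", "level": 1}))
# tree -L 1 /tmp/data
```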

Output specifications.

In [None]:
@define(kw_only=True)
class InputSpec(ShellSpec):
    input_file: Path = field(
        metadata={
            "help_string": "input file",
            "mandatory": True,
            "argstr": "",
        }
    )   
    backup_file: str = field(
        metadata={
            "help_string": "backup file",
            "argstr": "",
            # Define the default file template.
            "output_file_template": "{input_file}.bak",
            # Whether to keep or strip file extensions.
            "keep_extension": True,
        }
    )

class Backup(ShellCommandTask):
    executable = "cp"
    input_spec = SpecInfo(name="Inputs", bases=(InputSpec,))

Testing output specifications.

In [None]:
# Instantiate a task.
task = Backup(input_file=Path("./README.md"))

# Check the shell command.
print(task.cmdline)

### Workflows

Composing tasks in a workflow.

In [None]:
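from pydra.engine import Workflow
from pydra.mark import task

# Hypothetical tasks standing in for real processing steps.
@task
def FooTask(foo: int) -> int:
    return foo + 1

@task
def BarTask(bar: int) -> int:
    return bar * 2

@task
def BazTask(baz: int, fred: int) -> int:
    return baz + fred

# Declare the workflow inputs and give them values.
workflow = Workflow(
    name="my_workflow", input_spec=["some_input", "other_input"],
    some_input=1, other_input=2)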

# Add task and connect its inputs to the workflow inputs.
workflow.add(
    FooTask(name="foo_task", foo=workflow.lzin.some_input))

# Add task and connect its inputs to the workflow inputs.
workflow.add(
    BarTask(name="bar_task", bar=workflow.lzin.other_input))

# Add task and connect its inputs to upstream task outputs.
workflow.add(
    BazTask(name="baz_task", baz=workflow.foo_task.lzout.out,
            fred=workflow.bar_task.lzout.out))

# Expose the workflow outputs.
workflow.set_output({"out": workflow.baz_task.lzout.out})


Submitting a workflow for execution.

In [None]:
from pydra.engine import Submitter

# Set up an execution environment explicitly.
with Submitter() as submitter:
    # Submit the workflow for execution.
    submitter(workflow)

# Collect the results.
result = workflow.result()
print(result.output.out)
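
`lzin` and `lzout` are lazy references. They record where a value will come from, and the value is resolved only when the workflow runs. The toy model below illustrates this with the standard library only. It is not Pydra's engine. `LazyIn`, `LazyOut` and `run_workflow` are hypothetical names. Tasks run sequentially in dependency order, and there is no cycle detection.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple

@dataclass(frozen=True)
class LazyIn:
    name: str  # refers to a workflow input

@dataclass(frozen=True)
class LazyOut:
    task: str  # refers to the output of an upstream task

TaskDef = Tuple[Callable[..., Any], Dict[str, Any]]

def run_workflow(tasks: Dict[str, TaskDef], inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve lazy references and run each task once, upstream tasks first."""
    results: Dict[str, Any] = {}

    def resolve(ref: Any) -> Any:
        if isinstance(ref, LazyIn):
            return inputs[ref.name]
        if isinstance(ref, LazyOut):
            return evaluate(ref.task)
        return ref  # plain values are passed through

    def evaluate(name: str) -> Any:
        # Memoized: each task runs at most once, upstream tasks on demand.
        if name not in results:
            func, kwargs = tasks[name]
            results[name] = func(**{k: resolve(v) for k, v in kwargs.items()})
        return results[name]

    for name in tasks:
        evaluate(name)
    return results

tasks = {
    "foo_task": (lambda foo: foo + 1, {"foo": LazyIn("some_input")}),
    "bar_task": (lambda bar: bar * 2, {"bar": LazyIn("other_input")}),
    "baz_task": (lambda baz, fred: baz + fred,
                 {"baz": LazyOut("foo_task"), "fred": LazyOut("bar_task")}),
}
results = run_workflow(tasks, {"some_input": 1, "other_input": 2})
print(results["baz_task"])  # 6
```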


### Summary

- Define Python tasks with the `@task` and `@annotate` decorators
- Define shell tasks by subclassing `ShellCommandTask`
- Parameterize shell tasks with `ShellSpec` and `ShellOutSpec`
- Compose tasks into a workflow with `Workflow`

## Advanced features

### Complex shell tasks

Example with the `shasum` command.

```shell
$ shasum -h
Usage: shasum [OPTION]... [FILE]...
Print or check SHA checksums.
With no FILE, or when FILE is -, read standard input.

  -a, --algorithm   1 (default), 224, 256, 384, 512, 512224, 512256
  -b, --binary      read in binary mode
  -c, --check       read SHA sums from the FILEs and check them
      --tag         create a BSD-style checksum
  -t, --text        read in text mode (default)
  -U, --UNIVERSAL   read in Universal Newlines mode
                        produces same digest on Windows/Unix/Mac
  -0, --01          read in BITS mode
                        ASCII '0' interpreted as 0-bit,
                        ASCII '1' interpreted as 1-bit,
                        all other characters ignored
```

Mutually exclusive parameters with `xor`.

In [None]:
@define(kw_only=True)
class SHASumSpec(ShellSpec):
    input_file: Path = field(
        metadata={
            "help_string": "input file",
            "mandatory": True,
            "argstr": "",
            "position": -1
        }
    )
    ascii: bool = field(
        metadata={
            "help_string": "read as ASCII bits",
            "argstr": "-0",
            "xor": {"binary"},
        }
    ) 
    binary: bool = field(
        metadata={
            "help_string": "read as binary",
            "argstr": "-b",
            "xor": {"ascii"},
        }
    )
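
The rule expressed by `xor` can be modelled in a few lines. The following sketch is a simplified stand-in for the check Pydra performs, not its actual code. `check_xor` is hypothetical and treats any falsy value as unset.

```python
from typing import Any, Dict

def check_xor(spec: Dict[str, Dict[str, Any]], values: Dict[str, Any]) -> None:
    """Raise if two mutually exclusive inputs are both set (falsy means unset)."""
    for name, meta in spec.items():
        if not values.get(name):
            continue
        clashes = sorted(o for o in meta.get("xor", set()) if values.get(o))
        if clashes:
            raise ValueError(f"{name!r} is mutually exclusive with {clashes}")

# Metadata mirroring the ascii/binary fields of SHASumSpec above.
sha_spec = {"ascii": {"xor": {"binary"}}, "binary": {"xor": {"ascii"}}}

check_xor(sha_spec, {"ascii": True, "binary": False})  # accepted
try:
    check_xor(sha_spec, {"ascii": True, "binary": True})
except ValueError as err:
    print(err)  # 'ascii' is mutually exclusive with ['binary']
```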


Choice parameters with `allowed_values`.

In [None]:
@define(kw_only=True)
class SHASumSpec(ShellSpec):
    input_file: Path = field(
        metadata={
            "help_string": "input file",
            "mandatory": True,
            "argstr": "",
            "position": -1
        }
    )
    algorithm: int = field(
        default=1,
        metadata={
            "help_string": "algorithm (default is SHA-1)",
            "argstr": "-a",
            # Restrict the domain of valid values.
            "allowed_values": {1, 224, 256, 384, 512, 512224, 512256},
        },
    )

Custom formatting with `formatter`.

In [None]:
# Custom formatting logic for mode parameter.
def format_mode(mode: str) -> str:
    return {"ascii": "-0", "binary": "-b", "text": "-t"}.get(mode, "")

@define(kw_only=True)
class SHASumSpec(ShellSpec):
    input_file: Path = field(
        metadata={
            "help_string": "input file",
            "mandatory": True,
            "argstr": "",
            "position": -1
        }
    )
    mode: str = field(
        default="text",
        metadata={
            "help_string": "read mode",
            "allowed_values": ["ascii", "binary", "text"],
            # argstr replaced by custom formatter.
            "formatter": format_mode,
        },
    )

Dependent parameters with `requires`.

```sh
$ shasum -h
Usage: shasum [OPTION]... [FILE]...
Print or check SHA checksums.
With no FILE, or when FILE is -, read standard input.
  ...
  -c, --check       read SHA sums from the FILEs and check them
  ...
The following five options are useful only when verifying checksums:
  ...
  -w, --warn        warn about improperly formatted checksum lines
  ...
```

In [None]:
@define(kw_only=True)
class SHASumSpec(ShellSpec):
    input_file: Path = field(
        metadata={
            "help_string": "input file",
            "mandatory": True,
            "argstr": "",
            "position": -1
        }
    )
    check: bool = field(
        metadata={
            "help_string": "check sums from input file",
            "argstr": "--check",
        }
    )
    warn: bool = field(
        metadata={
            "help_string": "warn about faulty checksums",
            "argstr": "--warn",
            "requires": {"check"},
        }
    )
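
`requires` expresses a dependency rather than an exclusion: an input that is set must be accompanied by the inputs it lists. The following is a hypothetical stdlib sketch of that rule, not Pydra's code. `check_requires` treats any falsy value as unset.

```python
from typing import Any, Dict

def check_requires(spec: Dict[str, Dict[str, Any]], values: Dict[str, Any]) -> None:
    """Raise if a set input is missing one of the inputs it requires."""
    for name, meta in spec.items():
        if not values.get(name):
            continue
        missing = sorted(r for r in meta.get("requires", set()) if not values.get(r))
        if missing:
            raise ValueError(f"{name!r} requires {missing} to be set")

# Metadata mirroring the check/warn fields of SHASumSpec above.
sha_spec = {"check": {}, "warn": {"requires": {"check"}}}

check_requires(sha_spec, {"check": True, "warn": True})  # accepted
try:
    check_requires(sha_spec, {"warn": True})
except ValueError as err:
    print(err)  # 'warn' requires ['check'] to be set
```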

Custom output values with `callable`.

In [None]:
from attrs import define, field
from pydra.engine.specs import ShellOutSpec

def parse_checksum(stdout: str) -> str:
    return stdout.split(' ')[0]

@define(kw_only=True)
class SHASumOutSpec(ShellOutSpec):
    checksum: str = field(
        metadata={
            "help_string": "checksum of input file",
            "callable": parse_checksum,
        }
    )
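
Since `parse_checksum` only sees the captured standard output, it can be exercised without running `shasum`. The following sketch fakes a text-mode output line with `hashlib`. It assumes the usual `<hex digest>  <file name>` layout of `shasum -a 256`.

```python
import hashlib

def parse_checksum(stdout: str) -> str:
    return stdout.split(' ')[0]

# Mimic a `shasum -a 256` line: "<hex digest>  <file name>\n".
digest = hashlib.sha256(b"hello\n").hexdigest()
fake_stdout = f"{digest}  README.md\n"

print(parse_checksum(fake_stdout) == digest)  # True
```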

In [None]:
from attrs import field, define
from pathlib import Path
from pydra.engine.specs import ShellSpec, SpecInfo
from pydra.engine.task import ShellCommandTask

def format_mode(mode: str) -> str:
    return {"ascii": "-0", "binary": "-b", "text": "-t"}.get(mode, "")

@define(kw_only=True)
class SHASumSpec(ShellSpec):
    input_file: Path = field(
        metadata={
            "help_string": "input file",
            "mandatory": True,
            "argstr": "",
            "position": -1
        }
    )
    algorithm: int = field(
        default=1,
        metadata={
            "help_string": "algorithm (default is SHA-1)",
            "argstr": "-a",
            # Restrict the domain of valid values.
            "allowed_values": {1, 224, 256, 384, 512, 512224, 512256},
        },
    )
    mode: str = field(
        default="text",
        metadata={
            "help_string": "read mode",
            "allowed_values": ["ascii", "binary", "text"],
            # argstr replaced by custom formatter.
            "formatter": format_mode,
        },
    )

class SHASum(ShellCommandTask):
    executable = "shasum"
    input_spec = SpecInfo(name="Inputs", bases=(SHASumSpec,))
    output_spec = SpecInfo(name="Outputs", bases=(SHASumOutSpec,))

In [None]:
# Compute the SHA-256 sum of the readme file.
task = SHASum(
    input_file=Path.cwd() / "README.md",
    algorithm=256,
    mode="text",
)

# Run the task and collect results.
result = task()

print(f"stdout: \t{result.output.stdout}")
print(f"checksum: \t{result.output.checksum}")

### Container tasks

- Using the `container_info` parameter
- Restricted to shell tasks
- Docker or Singularity 

Running a shell task in a Docker container:

```python
task = MyTask(..., container_info=("docker", "busybox"))
```

Running a shell task in a Singularity container:

```python
task = MyTask(..., container_info=("singularity", "alpine"))
```

<div class="alert alert-info">
<b>Experimental</b>: Proposal for refactoring to a generic `Environment` class.

https://github.com/nipype/pydra/pull/516
</div>

### Accelerate execution

Multicore task parallelism

In [None]:
from pydra.engine import Submitter

# Scale computation to 2 cores.
with Submitter(plugin="cf", n_procs=2) as submitter:
    submitter(workflow)

Other plugins (Dask, SLURM, ...)

In [None]:
# Scale computation using Dask workers.
with Submitter(plugin="dask") as submitter:
    submitter(workflow)

# Scale computation using SLURM workers.
with Submitter(plugin="slurm") as submitter:
    submitter(workflow)

Other plugins are in the works, such as for [PSI/J](https://psi-j-python.readthedocs.io/).

Caching expensive computation

```python
task = MyTask(..., cache_dir="/path/to/cache/dir")
```
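
Roughly speaking, the cache directory lets a task whose inputs have not changed reuse its stored result instead of running again. The toy analogue below illustrates that idea with the standard library only. It keys a JSON file on a SHA-256 hash of the function name and inputs. `cached_call` and `slow_square` are hypothetical, inputs and results must be JSON-serializable, and this is not how Pydra lays out its cache.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Any, Callable

def cached_call(func: Callable[..., Any], cache_dir: Path, **inputs: Any) -> Any:
    """Run func(**inputs), reusing a stored result when the inputs were seen before."""
    # Key the cache on the function name and a stable serialization of the inputs.
    payload = json.dumps({"func": func.__name__, "inputs": inputs}, sort_keys=True)
    entry = cache_dir / f"{hashlib.sha256(payload.encode()).hexdigest()}.json"
    if entry.exists():
        return json.loads(entry.read_text())
    result = func(**inputs)
    entry.write_text(json.dumps(result))
    return result

calls = []

def slow_square(x: int) -> int:
    calls.append(x)  # record real executions
    return x * x

with tempfile.TemporaryDirectory() as tmp:
    first = cached_call(slow_square, Path(tmp), x=4)
    second = cached_call(slow_square, Path(tmp), x=4)  # served from the cache

print(first, second, calls)  # 16 16 [4]
```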

### Arbitrary workflows

- Nest workflows as subgraphs of other workflows with `add`
- Map over a list of parameter values with `split`
- Merge results from mapped tasks with `combine`
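
Pydra's splitter notation distinguishes two kinds of splitter. A scalar splitter, written with round brackets such as `("a", "b")`, pairs values element-wise. An outer splitter, written with square brackets such as `["a", "b"]`, takes every combination. The plain-Python analogy below uses `zip` and `itertools.product` to show the resulting shapes, plus a nested comprehension as a loose picture of combining over `"b"`. It does not use Pydra, and `add` is a hypothetical task body.

```python
from itertools import product

def add(a: int, b: int) -> int:
    return a + b

a_values = [1, 2]
b_values = [10, 20]

# Scalar splitter ("a", "b"): pair values element-wise, like zip.
scalar = [add(a, b) for a, b in zip(a_values, b_values)]

# Outer splitter ["a", "b"]: every combination, like a Cartesian product.
outer = [add(a, b) for a, b in product(a_values, b_values)]

# Combining over "b": one list per value of "a", gathering results across "b".
combined_over_b = [[add(a, b) for b in b_values] for a in a_values]

print(scalar)           # [11, 22]
print(outer)            # [11, 21, 12, 22]
print(combined_over_b)  # [[11, 21], [12, 22]]
```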

<div class="alert alert-info">
<b>Discussion</b>: Syntax simplification of `split`, `combine` and `add`.

https://github.com/nipype/pydra/discussions/692
</div>

### Summary

- Specify complex shell tasks using `xor`, `requires`, `formatter` and `callable`
- Execute shell tasks in a container using `container_info`
- Scale workflow execution with `Submitter` plugins
- Re-use intermediate computation with `cache_dir`
- Design arbitrary workflows by nesting, splitting and combining

## Support channels

- Documentation: `https://nipype.github.io/pydra`
- Issues: `https://github.com/nipype/pydra/issues`
- Discussions: `https://github.com/nipype/pydra/discussions`
- Live chat: `https://mattermost.brainhack.org/brainhack/channels/nipype`
- Cohacking: `https://meet.jit.si/pydra`

### Tutorial

![The Pydra tutorial homepage](./assets/pydra-tutorial.png)

> https://nipype.github.io/pydra-tutorial

### Q&A

![The NeuroStars homepage](./assets/neurostars-homepage.png)

> https://neurostars.org

### Pydra task packages

![Pydra task packages on PyPI](./assets/pypi-packages.png)

> https://pypi.org/search/?q=pydra

## The next steps

- Get comfortable with the core concepts behind Pydra
- Write and run a first workflow for an analysis you already know well
- Start with a linear dataflow, then increase in complexity
- Do not hesitate to reach out to the community for help
  
> https://github.com/aramis-lab/RSECon23-Pydra