# pydra

There are two main types of objects in *pydra*: `Task` and `Workflow`. A `Task` represents an individual unit of work that takes specified inputs and returns specified outputs. You can create a `Task` from various things, e.g. from *python* functions or *shell* commands. In this introduction we will cover `FunctionTask`, which converts any *python* function into a *pydra* `Task`.

Later, we will talk about `Workflow`, which is used to create a pipeline that connects various `Task`s. In *pydra*, every `Workflow` is also a `Task`, so it can be connected to any other `Task` and be part of another `Workflow`.

In [3]:
import pydra

## FunctionTask

A `FunctionTask` can be created from any *python* function by using the *pydra* decorator `pydra.mark.task`:

In [None]:
@pydra.mark.task
def add_var(a, b):
    return a + b
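To make the decorator's role concrete, here is a toy model in plain *python*. `mark_task` and `ToyTask` are hypothetical names, and this is not how *pydra* implements `FunctionTask`; it is only a sketch, assuming the behaviour described in this notebook: calling the decorated function returns a task object that keeps its inputs on `.inputs` and runs the function when the task itself is called.

```python
import functools
from types import SimpleNamespace


def mark_task(func):
    """Hypothetical stand-in for a task decorator (not pydra's implementation).

    Calling the decorated function no longer runs it; it returns a task
    object that stores the inputs and runs the function when called.
    """

    @functools.wraps(func)
    def make_task(**inputs):
        return ToyTask(func, inputs)

    return make_task


class ToyTask:
    def __init__(self, func, inputs):
        self.func = func
        # Inputs live on an attribute namespace, so they can be read or set later
        self.inputs = SimpleNamespace(**inputs)
        self._result = None

    def __call__(self):
        # Run the wrapped function with the stored inputs and remember the result
        self._result = self.func(**vars(self.inputs))
        return self._result

    def result(self):
        return self._result


@mark_task
def add_var(a, b):
    return a + b


task1 = add_var(a=4, b=5)
print(task1.inputs.a, task1.inputs.b)  # 4 5
print(task1())                         # 9
```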

Once we decorate the function, we can create a pydra `Task` and specify the input:

In [None]:
task1 = add_var(a=4, b=5)

We can check the type of `task1`:

In [None]:
type(task1)

Next, we can check that the task has the correct values of `a` and `b`, which are stored in the task's `inputs`:

In [None]:
print(f"a = {task1.inputs.a}")
print(f"b = {task1.inputs.b}")

We can also display the entire `inputs` object:

In [None]:
task1.inputs

As you can see, `task1.inputs` also contains information about the function, which is an inseparable part of the `FunctionTask`.

Once we have the task with set input, we can run it. Since `Task` is a "callable object", we can use the syntax:

In [None]:
task1()

As you can see, the result was returned right away, but we can also access it later:

In [None]:
task1.result()

A `Result` contains more than just the output, so to get the task output itself we can type:

In [None]:
result = task1.result()
result.output.out
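The `Result` repr shown in this notebook, `Result(output=Output(sum_a_b=9), runtime=None, errored=False)`, suggests why the output is reached through `result.output.out`: the return value sits in a named field of a nested `output` object. The classes below are hypothetical `NamedTuple` stand-ins that only mirror that repr; *pydra*'s real `Result` class may be defined differently.

```python
from typing import Any, NamedTuple, Optional


class Output(NamedTuple):
    # Single output field, using the default name "out"
    out: Any


class Result(NamedTuple):
    # Hypothetical stand-in mirroring the fields of the printed Result repr
    output: Output
    runtime: Optional[Any] = None
    errored: bool = False


result = Result(output=Output(out=9))
print(result)             # Result(output=Output(out=9), runtime=None, errored=False)
print(result.output.out)  # 9
```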

### Customizing output names
Note that `out` is the default name for the task output, but we can always customize it. There are two ways of doing this: using a *python* function annotation or using another *pydra* decorator.

Let's start with the function annotation:

In [5]:
import typing as ty

@pydra.mark.task
def add_var_an(a, b) -> ty.NamedTuple("Output", [("sum_a_b", int)]):
    return a + b


task1a = add_var_an(a=4, b=5)
task1a()

Result(output=Output(sum_a_b=9), runtime=None, errored=False)
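Why does the annotation rename the output? `ty.NamedTuple("Output", ...)` is evaluated when the function is defined and produces an ordinary class whose `_fields` attribute lists the field names. The plain-*python* sketch below (no *pydra* involved) shows that a wrapper can read those names from the function and label the raw return value; we assume, without claiming it is *pydra*'s exact mechanism, that this is the kind of information `pydra.mark.task` uses.

```python
import typing as ty


def add_var_an(a, b) -> ty.NamedTuple("Output", [("sum_a_b", int)]):
    return a + b


# The return annotation is an ordinary NamedTuple class...
Output = add_var_an.__annotations__["return"]
print(Output._fields)  # ('sum_a_b',)

# ...so a wrapper can read the field names and label the raw return value
raw = add_var_an(4, 5)
print(Output(raw))     # Output(sum_a_b=9)
```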

The annotation is particularly useful for naming the outputs when the function returns multiple values:

In [None]:
@pydra.mark.task
def modf_an(a) -> ty.NamedTuple("Output", [("fractional", ty.Any), ("integer", ty.Any)]):
    import math
    return math.modf(a)

task2 = modf_an(a=3.5)
task2()
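With several outputs, the names are matched to the returned values by position, so the fields must be listed in the same order the function returns them. A plain-*python* illustration with `math.modf`, which returns `(fractional part, integer part)`:

```python
import math
import typing as ty

Output = ty.NamedTuple("Output", [("fractional", ty.Any), ("integer", ty.Any)])

# math.modf returns a plain 2-tuple: (fractional part, integer part)
raw = math.modf(3.5)
print(raw)       # (0.5, 3.0)

# Pairing the tuple with the declared names, in order, gives labelled outputs
named = Output(*raw)
print(named)     # Output(fractional=0.5, integer=3.0)

labelled = dict(zip(Output._fields, raw))
print(labelled)  # {'fractional': 0.5, 'integer': 3.0}
```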

The second way of customizing the output requires another decorator, `pydra.mark.annotate`:

In [None]:
@pydra.mark.task
@pydra.mark.annotate({"return": {"fractional": ty.Any, "integer": ty.Any}})
def modf(a):
    import math
    return math.modf(a)

task2a = modf(a=3.5)
task2a()

Note that the order of the decorators is important: keep `@pydra.mark.task` as the outermost (top) decorator, as in the cell above.
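*python* applies stacked decorators from the bottom up, so in the cell above `pydra.mark.annotate` wraps `modf` first and `pydra.mark.task` wraps the result. The toy below uses hypothetical `annotate` and `task` stand-ins (not *pydra*'s code) in which the task decorator reads the output names as soon as it is applied; in that model, putting `annotate` on top comes too late and the default name `out` is used instead. It illustrates one way the order can matter, not *pydra*'s exact internals.

```python
import math


def annotate(spec):
    """Hypothetical stand-in for an annotate-style decorator: merges `spec`
    into the function's annotations and returns the same function."""
    def decorator(func):
        func.__annotations__ = {**func.__annotations__, **spec}
        return func
    return decorator


def task(func):
    """Hypothetical stand-in for a task-style decorator.

    It reads the output names once, when it is applied, so annotations added
    to the function afterwards are never seen. Only dict-style return
    annotations are handled in this toy.
    """
    outputs = func.__annotations__.get("return", {"out": object})

    def make_task(*args, **kwargs):
        values = func(*args, **kwargs)
        if len(outputs) == 1:
            values = (values,)  # a single output is not unpacked
        return dict(zip(outputs, values))

    make_task.output_names = list(outputs)
    return make_task


# Correct order: annotate (bottom) runs first, then task (top) sees the names
@task
@annotate({"return": {"fractional": float, "integer": float}})
def modf(a):
    return math.modf(a)


# Reversed order: task runs first and only finds the default name "out"
@annotate({"return": {"fractional": float, "integer": float}})
@task
def modf_wrong(a):
    return math.modf(a)


print(modf.output_names)        # ['fractional', 'integer']
print(modf(3.5))                # {'fractional': 0.5, 'integer': 3.0}
print(modf_wrong.output_names)  # ['out']
```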

### Setting the input

We don't have to provide the input when we create a task; we can always set it later:

In [None]:
task3 = add_var()
task3.inputs.a = 4
task3.inputs.b = 5
task3()

If we forget to specify an input, *pydra* uses `None` as its default value, so the function raises a *python* error:

In [None]:
task3a = add_var()
task3a.inputs.a = 4
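try:
    task3a()
except TypeError as err:
    print(f"TypeError: {err}")
else:
    # Running without `b` should have failed, so fail loudly if it did not
    raise AssertionError("expected a TypeError because `b` was not set")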
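The error itself comes from *python*, not from *pydra*: adding an `int` to `None` is not defined. A plain-*python* reproduction, assuming (as stated above) that the unset input `b` arrives as `None`:

```python
def add_var(a, b):
    return a + b


b_value = None  # stand-in for the input `b` that was never set

try:
    add_var(a=4, b=b_value)
except TypeError as err:
    message = str(err)
    print(f"TypeError: {message}")
    # TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
```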

### Output directory and caching the results

After running the task, we can check where the output directory with the results was created:

In [None]:
task3.output_dir

Within the directory you can find the file with the results: `_result.pklz`.

In [None]:
import os
os.listdir(task3.output_dir)

We can also choose the directory where the results are stored. Let's create a temporary directory with a specific subdirectory, `task4`:

In [None]:
from tempfile import mkdtemp
from pathlib import Path
cache_dir_tmp = Path(mkdtemp()) / "task4"
print(cache_dir_tmp)

Now we can pass this path to the `cache_dir` argument of `FunctionTask`. To observe the execution time, we define a function that sleeps for 5 s:

In [None]:
@pydra.mark.task
def add_var_wait(a, b):
    import time
    time.sleep(5)
    return a + b

task4 = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)


If you're running the cell for the first time, it should take around 5 s.

In [None]:
task4()


We can check the `output_dir` of our task: it should start with the path of `cache_dir_tmp`, and its last part contains the name of the task class (`FunctionTask`) and the task checksum:

In [None]:
task4.output_dir

Let's see what happens when we define an identical task again with the same `cache_dir`:

In [None]:
task4a = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)
task4a()

This time the result should be ready right away! *pydra* reuses the available results and does not recompute the task.

*pydra* not only checks for results in `cache_dir`; you can also provide a list of other locations that should be checked. Let's create another directory to use as `cache_dir`, and pass the previous cache directory in `cache_locations`.

In [None]:
cache_dir_tmp_new = Path(mkdtemp()) / "task4b"

task4b = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp_new, cache_locations=[cache_dir_tmp])
task4b()

This time the results should also be returned quickly! We can also check that `task4b.output_dir` was not created:

In [None]:
task4b.output_dir.exists()
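The lookup described above can be sketched in plain *python*. Everything here is hypothetical: `run_cached` and `toy_checksum` are illustrative names, the directory name only imitates the `FunctionTask_<checksum>` pattern, the search order (`cache_dir` first, then `cache_locations`) is an assumption, and storing `_result.pklz` as a gzip-compressed pickle is a stand-in that may not match *pydra*'s on-disk format. The point is the behaviour: a hit in any location is reused, and a miss computes the result and writes it only under `cache_dir`.

```python
import gzip
import hashlib
import pickle
import tempfile
from pathlib import Path


def toy_checksum(func, inputs):
    # Hypothetical checksum: function name plus inputs sorted by name
    payload = repr((func.__name__, sorted(inputs.items()))).encode()
    return hashlib.sha256(payload).hexdigest()[:16]


def run_cached(func, inputs, cache_dir, cache_locations=()):
    """Toy cache lookup (not pydra's implementation).

    Returns (result, recomputed), where `recomputed` is False on a cache hit.
    """
    dirname = f"FunctionTask_{toy_checksum(func, inputs)}"
    # 1. Look for an existing result: cache_dir first, then each cache location
    for location in [cache_dir, *cache_locations]:
        result_file = Path(location) / dirname / "_result.pklz"
        if result_file.exists():
            with gzip.open(result_file, "rb") as f:
                return pickle.load(f), False
    # 2. Cache miss: run the function and store the result under cache_dir only
    output_dir = Path(cache_dir) / dirname
    output_dir.mkdir(parents=True, exist_ok=True)
    result = func(**inputs)
    with gzip.open(output_dir / "_result.pklz", "wb") as f:
        pickle.dump(result, f)
    return result, True


def add_var(a, b):
    return a + b


first = Path(tempfile.mkdtemp()) / "task4"
second = Path(tempfile.mkdtemp()) / "task4b"

first_run = run_cached(add_var, {"a": 4, "b": 6}, first)
second_run = run_cached(add_var, {"a": 4, "b": 6}, second, cache_locations=[first])
print(first_run)        # (10, True): computed and stored under `first`
print(second_run)       # (10, False): found via cache_locations
print(second.exists())  # False: nothing was written under `second`
```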

Note that if we update the input of the task and run it again, a new directory will be created and the task will be recomputed:

In [None]:
task4b.inputs.a = 1
print(task4b())
print(task4b.output_dir.exists())

If we check `output_dir` again, we can see that its last part is different from last time:

In [None]:
task4b.output_dir

This is because the checksum changes whenever either the input or the function changes.
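A checksum that depends on both the function and its inputs explains this behaviour. The toy `checksum` below (a hypothetical helper, not *pydra*'s algorithm) hashes the function's bytecode and constants together with the inputs sorted by name, so reordering keyword arguments does not change it, while changing an input value or the function body does:

```python
import hashlib


def checksum(func, inputs):
    """Toy checksum (not pydra's algorithm): hash of the function's bytecode,
    its constants, and the inputs sorted by name."""
    code = func.__code__
    h = hashlib.sha256()
    h.update(code.co_code)
    h.update(repr(code.co_consts).encode())
    h.update(repr(sorted(inputs.items())).encode())
    return h.hexdigest()[:16]


def add_var(a, b):
    return a + b


def mul_var(a, b):
    return a * b


base = checksum(add_var, {"a": 4, "b": 6})
print(base == checksum(add_var, {"b": 6, "a": 4}))  # True: same function, same inputs
print(base == checksum(add_var, {"a": 1, "b": 6}))  # False: an input changed
print(base == checksum(mul_var, {"a": 4, "b": 6}))  # False: the function changed
print(f"FunctionTask_{base}")  # name of a hypothetical output directory
```

Because this toy hashes only bytecode and constants, two functions with identical bodies but different names would share a checksum.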

#### Exercise 1
Create a task that takes a list of numbers as input and returns two fields: `mean` with the mean value and `std` with the standard deviation.

In [6]:
@pydra.mark.task
@pydra.mark.annotate({"return": {"mean": ty.Any, "std": ty.Any}})
def mean_dev(my_list):
    import statistics as st
    return st.mean(my_list), st.stdev(my_list)

my_task = mean_dev(my_list=[2, 2, 2])
my_task()

Result(output=Output(mean=2, std=0.0), runtime=None, errored=False)

In [None]:
# write your solution here (you can use statistics module)
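The exercise does not specify which standard deviation to return. The sample solution uses `statistics.stdev`, the *sample* standard deviation (dividing by n - 1), which needs at least two data points; `statistics.pstdev` is the *population* version (dividing by n). A quick check of the difference on a small list:

```python
import math
import statistics as st

data = [2, 4, 4, 4, 5, 5, 7, 9]

print(st.mean(data))    # 5
print(st.pstdev(data))  # 2.0 (population: squared deviations sum to 32, 32 / 8 = 4)
print(st.stdev(data))   # about 2.138 (sample: sqrt(32 / 7))

# The sample version is undefined for a single value
try:
    st.stdev([3])
except st.StatisticsError as err:
    print(f"StatisticsError: {err}")
```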

### Using Audit

*pydra* can record various run-time information, including the workflow provenance, by setting `audit_flags` and the type of messengers.

`AuditFlag.RESOURCE` allows you to monitor resource usage for the `Task`, while `AuditFlag.PROV` tracks the provenance of the `Task`. 

In [None]:
from pydra.utils.messenger import AuditFlag, PrintMessenger

task5 = add_var(a=4, b=5, audit_flags=AuditFlag.RESOURCE)
task5()

One can turn on both audit flags using `AuditFlag.ALL` and print the messages to the terminal using the `PrintMessenger`.

In [None]:
task5 = add_var(a=4, b=5, audit_flags=AuditFlag.ALL, messengers=PrintMessenger())
task5()
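Combining flags with `AuditFlag.ALL` works like a bit mask. Assuming *pydra*'s `AuditFlag` behaves like a standard `enum.Flag` (the class below, `ToyAuditFlag`, is a hypothetical stand-in with made-up values, not *pydra*'s definition), checking whether a given kind of auditing is enabled reduces to a bitwise `&`:

```python
import enum


class ToyAuditFlag(enum.Flag):
    """Hypothetical stand-in illustrating combinable flags (not pydra's class)."""
    NONE = 0
    PROV = 1
    RESOURCE = 2
    ALL = PROV | RESOURCE  # both bits set


def wants(flags, flag):
    # A kind of auditing is "on" if its bit is set in the combined value
    return bool(flags & flag)


print(wants(ToyAuditFlag.RESOURCE, ToyAuditFlag.PROV))            # False
print(wants(ToyAuditFlag.ALL, ToyAuditFlag.PROV))                 # True
print((ToyAuditFlag.PROV | ToyAuditFlag.RESOURCE) == ToyAuditFlag.ALL)  # True
```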