# Tutorial for Pipefunc Package

The `pipefunc` package is a Python library that allows you to define functions as pipelines, with each function providing a single step in the pipeline. In this tutorial, we will explain how to use the package, based on an example notebook.

For the latest documentation, check out [the official documentation](https://pipefunc.readthedocs.io/en/latest/#what-is-this).


## Building a Simple Pipeline

Let's start by importing `pipefunc` and `Pipeline` from the `pipefunc` module.

In [None]:
from pipefunc import pipefunc, Pipeline

We then define some functions using the `@pipefunc` decorator. The `@pipefunc` decorator turns these functions into pipeline steps. For each function, we specify an `output_name` which will be used to refer to the output of that function in the pipeline.

In [None]:
@pipefunc(output_name="c")
def f_c(a, b):
    return a + b


@pipefunc(output_name="d")
def f_d(b, c, x=1):  # "c" is the output of f_c
    return b * c


@pipefunc(output_name="e")
def f_e(c, d, x=1):  # "d" is the output of f_d
    return c * d * x

We now have three functions `f_c`, `f_d`, and `f_e`, which we can use to build a pipeline. Let's create a `Pipeline` object and pass it our functions as a list. The execution order is derived from the dependencies between the functions (their argument and output names), not from the order of the list. We can also enable debugging, profiling, and caching for the entire pipeline:

In [None]:
pipeline = Pipeline([f_c, f_d, f_e], debug=True, profile=True, cache_type="lru")

Now we have a pipeline that adds two numbers (function `f_c`), multiplies two numbers (function `f_d`), and multiplies three numbers (function `f_e`).
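
To make the name-based wiring concrete, here is a minimal plain-Python sketch of the idea. It is not how `pipefunc` is implemented; the `producers` mapping and the `evaluate` helper are hypothetical names used only for illustration. Each argument is looked up by name: first among the values supplied by the caller, then among the outputs of other functions, and finally among the defaults.

```python
import inspect


def f_c(a, b):
    return a + b


def f_d(b, c, x=1):
    return b * c


def f_e(c, d, x=1):
    return c * d * x


# Hypothetical registry: output name -> function that produces it.
producers = {"c": f_c, "d": f_d, "e": f_e}


def evaluate(output_name, **known):
    """Compute `output_name`, resolving missing arguments by name."""
    if output_name in known:
        return known[output_name]
    func = producers[output_name]
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if name in known:  # supplied by the caller
            kwargs[name] = known[name]
        elif name in producers:  # output of another function
            kwargs[name] = evaluate(name, **known)
        elif param.default is not inspect.Parameter.empty:
            kwargs[name] = param.default
        else:
            raise TypeError(f"missing argument {name!r} for {output_name!r}")
    return func(**kwargs)


result_e = evaluate("e", a=2, b=3, x=1)  # c = 5, d = 15, e = 5 * 15 * 1
result_d = evaluate("d", b=3, c=5)  # "c" is given, so f_c is not called
print(result_e, result_d)  # 75 15
```

The sketch recomputes shared intermediate values (here `c`) each time they are needed; avoiding that is one reason a real pipeline library adds caching.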


## Visualizing the Pipeline

You can visualize your pipeline using the `visualize()` method or its `visualize_holoviews()` variant, and print the nodes in the graph using the `graph.nodes` attribute.

In [None]:
pipeline.visualize()
print("Graph nodes:", pipeline.graph.nodes)

In [None]:
pipeline.visualize_holoviews()
print("Graph nodes:", pipeline.graph.nodes)

## Using the Pipeline

To use the pipeline, we first get a handle for each function using the `func` method on the pipeline, passing the output name of the function we want.

In [None]:
pf_d = pipeline.func("d")
pf_e = pipeline.func("e")

We can now use these handles as if they were the original functions. The pipeline will automatically ensure that the functions are called in the correct order, passing the output of one function as the input to the next.

In [None]:
c = f_c(a=2, b=3)
assert c == 5
assert (
    f_d(b=3, c=5)
    == pf_d(a=2, b=3)  # We can call pf_d with different arguments
    == pf_d(b=3, c=5)
    == 15
)
assert pf_e(c=c, d=15, x=1) == pf_e(a=2, b=3, x=1) == pf_e(a=2, b=3, d=15, x=1) == 75

Alternatively, you can call the pipeline object directly (its `__call__` method), passing the output name of the function you want to call followed by the arguments to that function. For example:

In [None]:
pipeline("d", b=3, c=5)

## Function Argument Combinations

To see all the possible combinations of arguments that can be passed to each function, you can use the `all_arg_combinations` property. This will return a dictionary, with function output names as keys and sets of argument tuples as values.

In [None]:
all_args = pipeline.all_arg_combinations
assert all_args == {
    "c": {("a", "b")},
    "d": {("a", "b", "x"), ("b", "c", "x")},
    "e": {("a", "b", "d", "x"), ("a", "b", "x"), ("b", "c", "x"), ("c", "d", "x")},
}
# We can get arguments for a specific function
assert pipeline.root_args("e") == ("a", "b", "x")
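
The root arguments of an output are the inputs that remain once every intermediate result has been traced back to its producer. The following plain-Python sketch shows one way to compute them for the example above. The `deps` dictionary and the `find_root_args` helper are hypothetical, and sorting the result alphabetically is an assumption made to match the tuple shown above, not a statement about how `pipefunc` orders it.

```python
# Hypothetical dependency table: output name -> argument names of its function.
deps = {
    "c": ("a", "b"),
    "d": ("b", "c", "x"),
    "e": ("c", "d", "x"),
}


def find_root_args(output_name):
    """Collect all arguments that are not produced by another function."""
    roots = set()
    stack = [output_name]
    while stack:
        name = stack.pop()
        for arg in deps[name]:
            if arg in deps:  # intermediate result: keep tracing
                stack.append(arg)
            else:  # true input of the pipeline
                roots.add(arg)
    return tuple(sorted(roots))


print(find_root_args("c"))  # ('a', 'b')
print(find_root_args("e"))  # ('a', 'b', 'x')
```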

## Using the call_full_output Method

The `call_full_output()` method can be used to call the function and get all the outputs from the pipeline as a dictionary.

In [None]:
pf_e = pipeline.func("e")
pf_e.call_full_output(a=2, b=3, x=1)

## Direct Calling with Root Arguments (as positional arguments)

You can directly call the functions in the pipeline with the root arguments using the `call_with_root_args()` method. It automatically executes all the dependencies of the function in the pipeline with the given root arguments.

In [None]:
pf_e = pipeline.func("e")
pf_e.call_with_root_args(1, 2, 1)  # note these are now positional args

This executes the function `f_e` with the root arguments `a=1, b=2, x=1`.

For more information about this method, you can use the Python built-in `help` function or the `?` command.

In [None]:
help(pf_e.call_with_root_args)

This shows the signature and the docstring of the `call_with_root_args` method.


## Handling Multiple Outputs

Functions can return multiple results at once. The `output_name` argument allows you to specify multiple outputs by providing a tuple of strings. By default, this assumes the function returns a tuple with one element per output name. If the function returns something else, such as a dictionary or an object with attributes, use the `output_picker` argument to specify how each named output is extracted from the return value.

In [None]:
from pipefunc import pipefunc, Pipeline


# Define a function add_ab with multiple outputs, 'c' and 'const'.
@pipefunc(output_name=("c", "const"))
def add_ab(a, b):
    return (a + b, 1)


# Define a function mul_bc with multiple outputs, 'd' and 'e',
# where output_picker is used to select the output.
@pipefunc(
    output_name=("d", "e"),
    output_picker=dict.__getitem__,
)
def mul_bc(b, c, x=1):
    return {"d": b * c, "e": x}


# Define a function calc_cde with multiple outputs, 'g' and 'h',
# where output_picker is used to select the output.
@pipefunc(
    output_name=("g", "h"),
    output_picker=getattr,
)
def calc_cde(c, d, e, x):
    from types import SimpleNamespace

    return SimpleNamespace(g=c * d * x, h=c + e)


# Define a function add_gh with a single output 'i'.
@pipefunc(output_name="i")
def add_gh(h, g):
    return h + g


# Create a pipeline with the defined functions and visualize it.
pipeline_multiple = Pipeline([add_ab, mul_bc, calc_cde, add_gh])
pipeline_multiple.visualize()
final_func = pipeline_multiple.func("i")
final_func(a=1, b=2, x=3)
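
The pickers used above, `dict.__getitem__` and `getattr`, both take the returned object and an output name. The sketch below illustrates that idea in plain Python and traces the arithmetic of the example by hand. The `pick_outputs` helper is hypothetical, and the assumption that a picker is called as `picker(result, name)` is inferred from the two pickers shown, not taken from the `pipefunc` source.

```python
from types import SimpleNamespace


def pick_outputs(result, names, picker=None):
    """Split one return value into a dict of named outputs."""
    if picker is None:
        # Default case: the result is a tuple in the same order as `names`.
        return dict(zip(names, result))
    # Assumed calling convention: picker(result, name).
    return {name: picker(result, name) for name in names}


a, b, x = 1, 2, 3

out = pick_outputs((a + b, 1), ("c", "const"))
c = out["c"]  # 3

out = pick_outputs({"d": b * c, "e": x}, ("d", "e"), picker=dict.__getitem__)
d, e = out["d"], out["e"]  # 6, 3

out = pick_outputs(
    SimpleNamespace(g=c * d * x, h=c + e), ("g", "h"), picker=getattr
)
g, h = out["g"], out["h"]  # 54, 6

i = h + g
print(i)  # 60
```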

**(Sneak peek of the next section: simplifying the pipeline)**

The pipeline can be simplified by combining `calc_cde` and `add_gh` into a single node.

In [None]:
simplified_pipeline_multiple = pipeline_multiple.simplified_pipeline("i")
simplified_pipeline_multiple.visualize()

Note that, in the simplified pipeline, the full output of `calc_cde` (i.e., `g, h`) is not available. 

In [None]:
# If the full output of calc_cde (g, h) is needed, we can't use the simplified pipeline.
out_full = pipeline_multiple.func("i").call_full_output(a=1, b=2, x=3)
out_full_red = simplified_pipeline_multiple.func("i").call_full_output(a=1, b=2, x=3)
print(f"Full output of i:\n{out_full}")
print(f"Full output of i after simplification:\n{out_full_red}")

## Simplifying Pipelines

Consider the following pipeline (look at the `visualize()` output to see the structure of the pipeline):

In [None]:
from pipefunc import Pipeline


def f1(a, b, c, d):
    return a + b + c + d


def f2(a, b, e):
    return a + b + e


def f3(a, b, f1):
    return a + b + f1


def f4(f1, f3):
    return f1 + f3


def f5(f1, f4):
    return f1 + f4


def f6(b, f5):
    return b + f5


def f7(a, f2, f6):
    return a + f2 + f6


# If the functions are not decorated with @pipefunc,
# they will be wrapped and the output_name will be the function name
pipeline_complex = Pipeline([f1, f2, f3, f4, f5, f6, f7])
pipeline_complex("f7", a=1, b=2, c=3, d=4, e=5)
pipeline_complex.visualize(color_combinable=True)  # combinable functions have the same color

In the example code above, the complex pipeline composed of multiple functions (`f1`, `f2`, `f3`, `f4`, `f5`, `f6`, `f7`) can be simplified by merging the nodes `f1`, `f3`, `f4`, `f5`, `f6` into a single node.
This merging simplifies the pipeline and reduces the number of functions whose results need to be cached or saved.

The `simplified_pipeline` method of the `Pipeline` class generates this simplified version of the pipeline.

In [None]:
simplified_pipeline_complex = pipeline_complex.simplified_pipeline("f7")
simplified_pipeline_complex.visualize()

However, simplifying a pipeline comes with a trade-off. The simplification process removes intermediate nodes that may be necessary for debugging or inspection.

For instance, if a developer wants to monitor the output of `f3` while processing the pipeline, they would not be able to do so in the simplified pipeline as `f3` has been merged into a single node. Hence, while a simplified pipeline can speed up the computation, it may limit the ability to examine intermediate computations.
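
The trade-off can be illustrated without the library. In the plain-Python sketch below, `merged_f6` is a hypothetical hand-written stand-in for the merged node: it chains `f1`, `f3`, `f4`, `f5`, and `f6`, and only the final value leaves the function, so the intermediate result of `f3` can no longer be inspected from outside.

```python
def f1(a, b, c, d):
    return a + b + c + d


def f2(a, b, e):
    return a + b + e


def f3(a, b, f1):
    return a + b + f1


def f4(f1, f3):
    return f1 + f3


def f5(f1, f4):
    return f1 + f4


def f6(b, f5):
    return b + f5


def f7(a, f2, f6):
    return a + f2 + f6


def merged_f6(a, b, c, d):
    """Hypothetical merged node: f1, f3, f4, f5, f6 run back to back."""
    r1 = f1(a, b, c, d)  # 10 for the inputs below
    r3 = f3(a, b, r1)  # 13, no longer visible outside this function
    r4 = f4(r1, r3)  # 23
    r5 = f5(r1, r4)  # 33
    return f6(b, r5)  # 35


a, b, c, d, e = 1, 2, 3, 4, 5
result = f7(a, f2(a, b, e), merged_f6(a, b, c, d))
print(result)  # 44
```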

### Another graph simplification example

In [None]:
from pipefunc import pipefunc, Pipeline


@pipefunc(output_name=("d", "e"))
def calc_de(b, g, x=1):
    pass


@pipefunc(output_name=("g", "h"))
def calc_gh(a, x=1):
    pass


@pipefunc(output_name="gg")
def calc_gg(g):
    pass


@pipefunc(output_name="i")
def calc_i(gg, b, e):
    pass


# Create a pipeline with the defined functions and visualize it
pipe3 = Pipeline([calc_de, calc_gh, calc_i, calc_gg])
pipe3.visualize(color_combinable=True)
pipe3.simplified_pipeline("i").visualize()

## Working with Resources Report

The `resources_report()` method of the `pipeline` provides useful information on the performance of the functions in the pipeline such as CPU usage, memory usage, average time, and the number of times each function was called. This feature is only available if `profile=True` when creating the pipeline.

In [None]:
# This will print the number of times each function was called
# CPU, memory, and time usage is also reported
pipeline.resources_report()

This report can be beneficial in performance tuning and identifying bottlenecks in your pipeline. You can identify which functions are consuming the most resources and adjust your pipeline accordingly.

You can also look at all the stats directly with:

In [None]:
pipeline.profiling_stats
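
As a rough illustration of what per-function profiling involves, the sketch below counts calls and accumulates wall-clock time with a decorator. It uses only the standard library; the `profiled` decorator and the `stats` dictionary are hypothetical names, and the sketch does not track CPU or memory usage the way the report described above does.

```python
import time
from collections import defaultdict
from functools import wraps

# Hypothetical stats store: function name -> call count and total time.
stats = defaultdict(lambda: {"calls": 0, "total_time": 0.0})


def profiled(func):
    """Record the number of calls and the total wall-clock time of `func`."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            entry = stats[func.__name__]
            entry["calls"] += 1
            entry["total_time"] += time.perf_counter() - start

    return wrapper


@profiled
def f_c(a, b):
    return a + b


for _ in range(3):
    f_c(2, 3)

entry = stats["f_c"]
average = entry["total_time"] / entry["calls"]
print(f"f_c: {entry['calls']} calls, average {average:.2e} s per call")
```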

## Parallel Execution and Caching

To enable parallel execution, you can use Python's built-in `concurrent.futures.ProcessPoolExecutor`. To enable caching, simply set the `cache` attribute to `True` for each function. This can be useful to avoid recomputing results when calling the same function with the same arguments multiple times.

In [None]:
from concurrent.futures import ProcessPoolExecutor

for f in pipeline.functions:
    # Enable caching for all functions
    # See next section to only cache based on a certain parameter sweep
    f.cache = True

pf_e = pipeline.func("e")
with ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(pf_e.call_with_dict, [{"a": 2, "b": 3, "x": 1}] * 10)
    print(list(results))

The cache is populated *__even when using parallel execution__*. To see the cache, you can use the `cache` attribute on the pipeline.

The keys of the cache are always in terms of the root arguments.

In [None]:
print(f"Cache object: {pipeline.cache}")
pipeline.cache.cache
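
The idea of keying a cache on root arguments can be sketched with a small least-recently-used (LRU) cache built on `collections.OrderedDict`. This is an illustration only: the `TinyLRU` class and the key layout `(output_name, sorted root arguments)` are assumptions, not the cache implementation or key format that `pipefunc` uses.

```python
from collections import OrderedDict


class TinyLRU:
    """Minimal LRU cache keyed by output name and root arguments."""

    def __init__(self, max_size=128):
        self.max_size = max_size
        self.cache = OrderedDict()

    def get_or_compute(self, output_name, root_kwargs, compute):
        # Sorting makes the key independent of keyword order.
        key = (output_name, tuple(sorted(root_kwargs.items())))
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as most recently used
            return self.cache[key]
        value = compute(**root_kwargs)
        self.cache[key] = value
        if len(self.cache) > self.max_size:
            self.cache.popitem(last=False)  # evict least recently used
        return value


calls = []


def compute_e(a, b, x=1):
    calls.append((a, b, x))  # track real executions
    c = a + b
    d = b * c
    return c * d * x


cache = TinyLRU(max_size=2)
first = cache.get_or_compute("e", {"a": 2, "b": 3, "x": 1}, compute_e)
second = cache.get_or_compute("e", {"b": 3, "x": 1, "a": 2}, compute_e)
print(first, second, len(calls))  # 75 75 1
```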

## Parameter Sweeps

Parameter sweeps are a technique used in computational simulations to explore the parameter space of a model or system. 

In the example below, the `Sweep` class is used to generate combinations of the input parameters `a`, `b`, `c`, `d`, and `e` for the function `f7`.
`Sweep` takes a dictionary that maps each parameter name to a list of values; `sweep.list()` then returns a list of dictionaries, where each dictionary represents one combination of parameters.

In [None]:
from pipefunc import Sweep

combos = {
    "a": [0, 1, 2],
    "b": [0, 1, 2],
    "c": [0, 1, 2],
    "d": [0, 1, 2],
    "e": [0, 1, 2],
}
# This means a Cartesian product of all the values in the lists
# while zipping ("a", "b").
sweep = Sweep(combos, dims=[("a", "b"), "c", "d", "e"])
sweep.list()[:10]  # show the first 10 combinations
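
To see what "a Cartesian product while zipping `("a", "b")`" means, the combinations can be rebuilt with `itertools.product`. The `expand` helper below is a hypothetical plain-Python sketch; it reproduces the set of combinations described in the comment above, but the order in which `Sweep` itself yields them is not assumed to be the same.

```python
from itertools import product

combos = {
    "a": [0, 1, 2],
    "b": [0, 1, 2],
    "c": [0, 1, 2],
    "d": [0, 1, 2],
    "e": [0, 1, 2],
}
dims = [("a", "b"), "c", "d", "e"]


def expand(combos, dims):
    """Cartesian product over dims; names grouped in a tuple are zipped."""
    axes = []
    for dim in dims:
        names = dim if isinstance(dim, tuple) else (dim,)
        # Zip the value lists of all names that share this dimension.
        zipped = zip(*(combos[name] for name in names))
        axes.append([dict(zip(names, values)) for values in zipped])
    result = []
    for parts in product(*axes):
        merged = {}
        for part in parts:
            merged.update(part)
        result.append(merged)
    return result


sweep_list = expand(combos, dims)
print(len(sweep_list))  # 81, i.e. 3 * 3 * 3 * 3 instead of 3**5 = 243
print(sweep_list[0])  # {'a': 0, 'b': 0, 'c': 0, 'd': 0, 'e': 0}
```

Because `a` and `b` are zipped, every combination has `a == b`, which cuts the sweep from 243 to 81 entries.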

The function `set_cache_for_sweep` then enables caching for nodes in the pipeline that are expected to be executed two or more times during the parameter sweep. Caching improves the efficiency of the sweep by storing and reusing results of repeated computations, rather than performing the same computation multiple times.

In [None]:
from pipefunc import set_cache_for_sweep

set_cache_for_sweep(
    "f7", simplified_pipeline_complex, sweep, min_executions=2, verbose=True
)

Large parameter sweeps can be computationally expensive. Reducing the pipeline and utilizing caching, as demonstrated in the example, can help alleviate this cost.

## Calculating Optimal Execution Order

In complex pipelines, especially those involving parameter sweeps, some function nodes may be executed multiple times. 
Precalculating and caching the results of such functions can significantly speed up the pipeline execution.
The `pipefunc` package provides the `get_precalculation_order()` function to determine the optimal execution order of functions in a pipeline, prioritizing those functions which are executed more often.

Let's consider a test scenario with a pipeline composed of four functions and a sweep of parameters `x`, `y`, and `z`.

In [None]:
from pipefunc import pipefunc, Pipeline, Sweep, count_sweep, get_precalculation_order


@pipefunc(output_name=("a", "b"))
def f_ab(y, z=1):
    print(f"🏃 Running f_ab(y={y}, z={z})")
    return y + z, y * z


@pipefunc(output_name=("c", "d"))
def f_cd(x, a, z=1):
    return x + a + z, x * a * z


@pipefunc(output_name="aa")
def f_aa(a):
    print(f"🏃 Running f_aa(a={a})")
    return a + 1


@pipefunc(output_name="i")
def f_i(aa, x, d):
    return aa + x + d


pipeline_order = Pipeline([f_cd, f_ab, f_i, f_aa])
pipeline_order.visualize()

In [None]:
sweep = Sweep({"y": [1, 2], "x": [3, 4], "z": [5, 6]})
cnt = count_sweep("i", sweep, pipeline_order)
print("Number of executions (keys are based on root_args):")
cnt

To determine the optimal order of execution for functions in the pipeline, we call the `get_precalculation_order()` function, passing in our pipeline and the counts of function executions:

In [None]:
precalculation_order = get_precalculation_order(pipeline_order, cnt)
precalculation_order

In the `get_precalculation_order()` function, the order is determined by the topological dependencies of the functions and the count of their executions in the context of a parameter sweep. Only functions that are executed multiple times are included in the precalculation order. This ensures that the computation is most efficient where it matters the most.

In this test scenario, the precalculation order is `[f_ab, f_aa]`, meaning `f_ab` and `f_aa` should be precalculated and cached before executing the rest of the pipeline.
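
To see why some nodes repeat during the sweep, the executions can be counted by hand. The plain-Python sketch below projects every combination of the sweep onto the root arguments of each function and counts duplicates. The `root_args` table is written out manually from the function signatures above, and the threshold of two executions is an assumption chosen for illustration.

```python
from collections import Counter
from itertools import product

# The same sweep as above: every combination of y, x, and z (8 in total).
names = ("y", "x", "z")
sweep_combos = [dict(zip(names, values)) for values in product([1, 2], [3, 4], [5, 6])]

# Root arguments per function, derived by hand from the signatures:
# f_ab(y, z), f_aa(a) with a from f_ab, f_cd(x, a, z), f_i(aa, x, d).
root_args = {
    "f_ab": ("y", "z"),
    "f_aa": ("y", "z"),
    "f_cd": ("x", "y", "z"),
    "f_i": ("x", "y", "z"),
}

counts = {
    func_name: Counter(tuple(combo[arg] for arg in args) for combo in sweep_combos)
    for func_name, args in root_args.items()
}

# Functions whose root-argument combinations occur at least twice.
repeated = [name for name, counter in counts.items() if max(counter.values()) >= 2]
print(repeated)  # ['f_ab', 'f_aa']
print(dict(counts["f_ab"]))  # each (y, z) pair occurs twice, once per x
```

`f_cd` and `f_i` depend on all three swept parameters, so each of their 8 combinations is unique and caching them ahead of time would not save any work.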

In [None]:
print(f"Length of the sweep is {len(sweep)}")
for f in precalculation_order:
    f.cache = True  # enable caching
    func = pipeline_order.func(f.output_name)
    input_args = pipeline_order.root_args(f.output_name)
    sub_sweep = sweep.filtered_sweep(input_args)
    print(f"- Function `{f}` has {len(sub_sweep)} combinations")
    for combo in sub_sweep.generate():  # sweep as generator
        _ = func.call_with_dict(combo)  # We just populate the cache

The cache is now populated. Alternatively, one could run these calculations on a cluster and save the results to disk.

Note that when we are executing the pipeline to get `'i'`, we are not executing `f_ab` and `f_aa` again, but rather loading the results from the cache.

In [None]:
# Note no print statements are shown from `f_ab` and `f_aa`
import pandas as pd

F = pipeline_order.func("i")
results = [F.call_full_output(**combo) for combo in sweep.generate()]
df = pd.DataFrame(results)
df

## Running "Fan-out" / Map-Reduce Pipelines with PipeFunc

This section illustrates how to effectively use PipeFunc for "Fan-out" and Map-Reduce operations, highlighting the use of the `mapspec` attribute to automate and optimize the distribution of data across operations.

### Example: Doubling and Summing Integers

The script below demonstrates a two-step pipeline: doubling each integer in an input list, followed by summing all the doubled values.

In [None]:
from pipefunc import Pipeline, pipefunc
from pipefunc.map import run, load_outputs
import numpy as np
from typing import Any

@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def double_it(x: int) -> int:
    assert isinstance(x, int)
    return 2 * x

@pipefunc(output_name="sum")
def take_sum(y: np.ndarray[Any, np.dtype[np.int_]]) -> int:
    assert isinstance(y, np.ndarray)
    return sum(y)

pipeline = Pipeline([double_it, take_sum])

inputs = {"x": [0, 1, 2, 3]}
run_folder = "my_run_folder"
results = run(pipeline, inputs, run_folder=run_folder)
assert results[-1].output == 12
assert results[-1].output_name == "sum"
assert load_outputs("sum", run_folder=run_folder) == 12

### Key Components

1. **Using `mapspec` for Data Distribution**:
   - The `mapspec` attribute in the `@pipefunc` decorator defines how data is distributed across computations. In `double_it`, `mapspec="x[i] -> y[i]"` specifies that each element `i` of the input array `x` is independently processed to produce the corresponding element `i` in the output array `y`. Because `take_sum` does not have a `mapspec`, it receives the entire array `y` for aggregation.
   - This specification allows PipeFunc to automatically handle the distribution of input elements across multiple processing units, crucial for scaling operations across large datasets or distributed environments.

2. **Function Definitions**:
   - `double_it`: Doubles each integer, demonstrating a simple stateless operation that can be easily parallelized.
   - `take_sum`: Aggregates all elements of the resulting array into a single sum, serving as the reduce step in this Map-Reduce example.

3. **Pipeline Execution**:
   - The `run` function executes the pipeline with specific inputs and a directory for temporary run files, showcasing how PipeFunc manages data flow and execution state across the pipeline. Alternatively, use `Pipeline.map` to run the pipeline on a single input dictionary.

4. **Results Verification**:
   - Assertions check that the final sum of doubled numbers is correct, ensuring both the integrity and correctness of the pipeline's execution.

5. **Output Retrieval**:
   - The `load_outputs` function demonstrates how to retrieve and verify results post-computation, confirming the output is stored and accessible as expected.

In short, `mapspec` declares how array elements map from inputs to outputs: a function with a `mapspec`, such as `double_it`, is applied element by element, while a function without one, such as `take_sum`, receives the whole array.
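
The same map and reduce steps can be written out by hand, which may help clarify what `"x[i] -> y[i]"` expresses. The sketch below uses only the standard library and is not how PipeFunc executes a pipeline; using `ThreadPoolExecutor` for the map step is an illustrative choice that shows the elements can be processed independently.

```python
from concurrent.futures import ThreadPoolExecutor


def double_it(x: int) -> int:
    return 2 * x


def take_sum(y: list[int]) -> int:
    return sum(y)


xs = [0, 1, 2, 3]

# Map step, "x[i] -> y[i]": each element is processed independently,
# so the calls can be spread over several workers.
with ThreadPoolExecutor(max_workers=2) as executor:
    ys = list(executor.map(double_it, xs))  # results keep the input order

# Reduce step: no element-wise mapping, the whole list is passed at once.
total = take_sum(ys)
print(ys, total)  # [0, 2, 4, 6] 12
```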