# ArrayAPI Example of Graph Construction and Execution

## Graph Construction

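Source nodes for the graph are created with the `source` function, which takes a function together with its args and kwargs. Here the args are supplied as an `xr.DataArray`, so each element of the array produces one source node.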

In [None]:
import numpy as np 
import xarray as xr
from cascade.fluent import Fluent

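# 5 x 4 array of argument tuples; each (2, 3) yields a random array of shape (2, 3)
args = xr.DataArray(
    [np.fromiter([(2, 3) for _ in range(4)], dtype=object) for _ in range(5)],
    dims=["x", "y"],
    coords={"x": list(range(5)), "y": 3 * np.array(range(4))},
)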
start = Fluent().source(np.random.rand, args)
start

Plotting the graph at this point shows 20 nodes, one for each element of the 5 x 4 argument array.
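
The count follows from the shape of the argument array: one source node per element. Below is a minimal NumPy-only sketch of that bookkeeping. It does not use cascade itself, and it assumes each argument tuple is unpacked into the function as positional arguments.

```python
import numpy as np

# Build the same 5 x 4 grid of argument tuples, without the xarray wrapper.
arg_grid = np.empty((5, 4), dtype=object)
for idx in np.ndindex(arg_grid.shape):
    arg_grid[idx] = (2, 3)

# One payload per grid element (assumed: the tuple is unpacked as positional args).
payloads = [np.random.rand(*shape) for shape in arg_grid.flat]

print(len(payloads))      # number of source nodes
print(payloads[0].shape)  # shape of the array held by each node
```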

In [None]:
from cascade.graph.pyvis import to_pyvis

net_graph = to_pyvis(start.graph(), notebook=True)
net_graph.show("start.html")

With functions such as `mean`, `std`, `min` and `max`, we can reduce the array of nodes along a specified dimension,
all the way down to a single node.
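
To see what such a chain of reductions does to the shapes, here is a NumPy-only analogy rather than cascade code. It assumes the reductions act element-wise across the arrays held by the nodes, and it models the node dimensions `x` and `y` as the two leading axes of a single stacked array.

```python
import numpy as np

rng = np.random.default_rng(0)

# Stand-in for the node array: dims ("x", "y") = (5, 4), each node holding a (2, 3) array.
node_payloads = rng.random((5, 4, 2, 3))

after_mean = node_payloads.mean(axis=0)  # reduce "x": 4 nodes remain
after_min = after_mean.min(axis=0)       # reduce "y": a single node remains

print(after_mean.shape)
print(after_min.shape)
```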

In [None]:
single = start.mean("x").min("y")

When creating the source nodes, our initial payload produced a random array of shape (2, 3) inside each node. We can use `expand` to expose one of these internal dimensions in the array of nodes. To do this we need to specify a new name for the dimension, its size and the axis of the internal array to take values from. After the operation, each node internally holds an array of shape (2,).
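
The effect on the underlying arrays can be pictured with plain NumPy. This is an analogy under the assumption that each new node receives one slice along the chosen axis. It is not the cascade implementation.

```python
import numpy as np

inner = np.arange(6.0).reshape(2, 3)  # stand-in for the array inside the single node

# Expose axis 1 (size 3) as a new node dimension "z": one slice per new node.
expanded_nodes = {z: np.take(inner, z, axis=1) for z in range(inner.shape[1])}

for z, arr in expanded_nodes.items():
    print(z, arr.shape)
```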

In [None]:
expanded = single.expand("z", 3, 1, 0)
expanded.nodes

We can broadcast to match the shape of another existing set of nodes, which in this case creates duplicates of the single existing
node along the z dimension. Note that this operates purely on the nodes of the graph: no operations are performed on the underlying arrays in each node.
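
A loose NumPy analogy for this node-level duplication is broadcasting an object array that holds a reference to a payload: the payload itself is never copied or modified. The names below are illustrative only and are not part of cascade.

```python
import numpy as np

payload = np.arange(6.0).reshape(2, 3)

# A length-1 array of "nodes", holding a reference to the payload.
single_node = np.empty(1, dtype=object)
single_node[0] = payload

# Broadcast the node array to length 3 along a new "z" dimension.
duplicated = np.broadcast_to(single_node, (3,))

print(duplicated.shape)
print(all(duplicated[i] is payload for i in range(3)))
```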

In [None]:
single.broadcast(expanded).nodes

### Low Level Operations

There are several low-level functions, `map`, `reduce` and `transform`, which apply user-defined functions to the array of nodes. The `map` operation applies a single payload to all nodes or, if an array of payloads with the same shape as the array of nodes is provided, applies a different payload to each node.

In [None]:
from cascade.fluent import Payload 

# Single payload that scales the array in each node by 2
expanded.map(Payload(lambda x: x * 2)).nodes

In [None]:
# Or we can scale the array in each node by a different value 
mapped = expanded.map([Payload(lambda x, a=a: x * a) for a in range(1, 4)])
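
The `a=a` default argument in the lambda matters: Python closures look up free variables when they are called, not when they are defined, so without it every payload would use the last value of `a`. A NumPy-only illustration, independent of cascade:

```python
import numpy as np

x = np.ones(2)

# Late binding: every lambda sees the final value of `a` (3).
late = [lambda x: x * a for a in range(1, 4)]
# Default argument: each lambda captures its own value of `a`.
bound = [lambda x, a=a: x * a for a in range(1, 4)]

late_scales = [float(f(x)[0]) for f in late]
bound_scales = [float(f(x)[0]) for f in bound]

print(late_scales)   # [3.0, 3.0, 3.0]
print(bound_scales)  # [1.0, 2.0, 3.0]
```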

Arbitrary reductions can be applied with the `reduce` operation, which takes a `Payload` and the name of the dimension along which to reduce. If no dimension name is supplied, the reduction is performed along the first axis. The higher-level functions `mean`, `std`, `min` and `max` are all `reduce` operations with a pre-defined payload.
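
As an illustration of a reduction with no pre-defined helper, the sketch below computes a peak-to-peak range across nodes in plain NumPy. The signature cascade expects of a reduction `Payload` is not shown in this notebook, so a function that receives the node arrays already stacked is an assumption, and `peak_to_peak` is a hypothetical name.

```python
import numpy as np

def peak_to_peak(stacked: np.ndarray) -> np.ndarray:
    """Hypothetical reduction: element-wise max minus min across axis 0."""
    return stacked.max(axis=0) - stacked.min(axis=0)

# Three nodes along one dimension, each holding an array of shape (2,).
node_payloads = np.stack([np.array([1.0, 5.0]) * a for a in range(1, 4)])

result = peak_to_peak(node_payloads)
print(result)
```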

Finally, `transform` allows the shape of the array of nodes in the subsequent action to be changed. The operation takes:
- a function of the form `func(action: Action, arg: Any) -> Action`
- a list of values for `arg`
- a name for the new dimension along which `arg` varies

The resulting nodes are the outputs of `func` for the different values of `arg`.
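
In array terms, this resembles applying a function once per value of `arg` and stacking the results along a new axis. The following NumPy-only sketch shows that pattern. `apply_threshold` is a hypothetical function, and no cascade API is used.

```python
import numpy as np

def apply_threshold(x: np.ndarray, threshold: float) -> np.ndarray:
    """Hypothetical per-node function: zero out values not above the threshold."""
    return np.where(x > threshold, x, 0.0)

x = np.array([0.5, 2.0, 20.0])  # stand-in for the array in one node
thresholds = [0, 1, 10]

# One output per threshold, stacked along a new leading "threshold" dimension.
stacked = np.stack([apply_threshold(x, t) for t in thresholds])
print(stacked.shape)
```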

In [None]:
from cascade.fluent import Action

def _example_transform(action: Action, threshold: float) -> Action:
    # x is an array, so threshold element-wise instead of using a scalar `if`
    ret = action.map(Payload(lambda x: np.where(x > threshold, x, 0)))
    ret._add_dimension("threshold", threshold)
    return ret

mapped.transform(_example_transform, [0, 1, 10], "threshold").nodes

## Graph Execution

The following example graph combines the operations described in the Graph Construction section.

In [None]:
import numpy as np 
import xarray as xr
from cascade.fluent import Payload, Fluent

args = xr.DataArray([np.fromiter([(2, 3) for _ in range(4)], dtype=object) for _ in range(5)], dims=["x", "y"])
graph = (
    Fluent().source(np.random.rand, args)
    .mean("x")
    .min("y")
    .expand("z", 3, 1, 0)
    .map([Payload(lambda x, a=a: x * a) for a in range(1, 4)])
    .graph()
)

At the moment, graphs can be executed using either Dask's dynamic scheduling or a static schedule produced by one of the schedulers.
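
To give a feel for what a static schedule is, the sketch below fixes a task order ahead of execution with a depth-first traversal of a small dependency graph. It is a generic illustration in plain Python and is not the implementation of `DepthFirstScheduler`. The task names are hypothetical and the graph is assumed to be acyclic.

```python
from typing import Dict, List

def depth_first_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return tasks in an order where every task follows its dependencies."""
    order: List[str] = []
    visited = set()

    def visit(task: str) -> None:
        if task in visited:
            return
        visited.add(task)
        for dep in dependencies.get(task, []):
            visit(dep)  # finish all inputs before the task itself
        order.append(task)

    for task in dependencies:
        visit(task)
    return order

# Hypothetical task graph: two sources feed a mean, which feeds a scale step.
deps = {"scale": ["mean"], "mean": ["source_0", "source_1"], "source_0": [], "source_1": []}
schedule_order = depth_first_order(deps)
print(schedule_order)  # ['source_0', 'source_1', 'mean', 'scale']
```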

In [None]:
import os
from cascade.executors.dask import DaskLocalExecutor

os.environ["DASK_LOGGING__DISTRIBUTED"] = "debug"
DaskLocalExecutor.execute(graph, memory_limit="5GB", n_workers=1, threads_per_worker=1)

In [None]:
from cascade.executors.dask import DaskLocalExecutor
from cascade.schedulers.depthfirst import DepthFirstScheduler
from cascade.transformers import to_task_graph
from cascade.contextgraph import ContextGraph

context = ContextGraph()
context.add_node(name="cpu1", type="CPU", speed=10, memory=10)
context.add_node(name="cpu2", type="CPU", speed=10, memory=10)
context.add_edge("cpu1", "cpu2", 10, 1)

schedule = DepthFirstScheduler().schedule(to_task_graph(graph, {}), context)
DaskLocalExecutor.execute(schedule, memory_limit="5GB", n_workers=1, threads_per_worker=1)


In [None]:
schedule.task_allocation

In [None]:
from cascade.executors.dask_utils.report import Report

report = Report("performance_report.html")
report.task_stream.exclude_transfer()

# CuPy Example

We can construct graphs whose base objects are CuPy arrays instead of NumPy arrays by changing the input arrays that feature in the payloads of the `source` method. In this case, we need to add an extra set of tasks to the graph that retrieves the outputs from the GPU, so that the results can be returned at the end of the execution.
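
The transfer step relies on `cupy.ndarray.get()`, which copies a device array into a NumPy array on the host. If the same graph should also run with NumPy inputs, a small helper along these lines could be used as the payload instead. `to_host` is a hypothetical name, not part of cascade or CuPy, and the sketch runs without a GPU because it only calls `.get()` when the method is present.

```python
import numpy as np

def to_host(x):
    """Hypothetical helper: return a NumPy array for either a NumPy or CuPy input."""
    # cupy.ndarray exposes .get(), which copies the data to host memory.
    if hasattr(x, "get"):
        return x.get()
    return np.asarray(x)

host = to_host(np.arange(3.0))
print(type(host).__name__)  # ndarray
```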

In [None]:
import numpy as np
import cupy as cp 
import xarray as xr
from cascade.fluent import Payload, Fluent

args = xr.DataArray([np.fromiter([(2, 3) for _ in range(4)], dtype=object) for _ in range(5)], dims=["x", "y"])
graph = (
    Fluent().source(cp.random.rand, args)
    .mean("x")
    .min("y")
    .expand("z", 3, 1, 0)
    .map([Payload(lambda x, a=a: x * a) for a in range(1, 4)])
    .map(Payload(lambda x: x.get())) # Move to CPU
    .graph()
)

In [None]:
import os
from cascade.executors.dask import DaskLocalExecutor
os.environ["DASK_LOGGING__DISTRIBUTED"] = "debug"
DaskLocalExecutor.execute(graph, memory_limit="5GB", n_workers=1, threads_per_worker=1)