# Graph Construction

In [None]:
from ppcascade.fluent import PProcFluent
from ppcascade.utils.request import Request
from ppcascade.utils.window import Range

# Data request: members 1-50 (`number`) at steps 0, 6, 12, 18 and 24.
forecast_request = Request(
    {
        "class": "od",
        "expver": "0001",
        "stream": "enfo",
        "date": "20240226",
        "time": "00",
        "param": 167,
        "levtype": "sfc",
        "type": "pf",
        "number": range(1, 51),
        "step": range(0, 25, 6),
    }
)

# Two windows along the "step" dimension; each one is reduced with "mean" below.
step_windows = [
    Range("0-12", [0, 6, 12]),
    Range("12-24", [12, 18, 24]),
]

graph = (
    PProcFluent()
    .source([forecast_request])
    .window_operation("mean", step_windows, dim="step")
    # Uncomment to additionally chain a "mean" ensemble operation over "number":
    # .ensemble_operation("mean", dim="number")
)

In [None]:
# Show the `coords` of the graph's nodes as the cell output, to inspect the
# structure produced by the construction cell above.
graph.nodes.coords

# EFI Graph Execution Example

In [None]:
from cascade.cascade import Cascade

from ppcascade.entry.parser import get_parser

# Command-line style arguments for the "extreme" entry point: the config file
# plus the forecast and climatology sources.
extreme_cli_args = [
    "-c", "efi.yaml",
    "--forecast", "mars:ens",
    "--climatology", "mars:clim",
]

parser = get_parser("extreme")
graph = Cascade.graph("extreme", parser.parse_args(extreme_cli_args))

In [None]:
import functools

from cascade.graph import pyvis

def node_info_ext(sinks, node):
    """Build the pyvis attributes for ``node``, with colour, shape and tooltip.

    Starts from ``pyvis.node_info(node)`` and then:

    * colours nodes without inputs pink and draws them as diamonds,
    * colours nodes contained in ``sinks`` amber and draws them as triangles,
    * colours every other node blue and leaves its shape untouched,
    * if the node has a payload, extends the tooltip (``title``) with the
      payload's function and the ``repr`` of each remaining payload item.

    Args:
        sinks: Container of nodes to be treated as sinks (tested with ``in``).
        node: Graph node exposing ``inputs`` and ``payload``.

    Returns:
        The attribute mapping returned by ``pyvis.node_info``, updated in place.
    """
    info = pyvis.node_info(node)

    # Classify the node; the membership test only runs for nodes with inputs.
    if not node.inputs:
        shape, colour = "diamond", "#DC267F"
    elif node in sinks:
        shape, colour = "triangle", "#FFB000"
    else:
        shape, colour = None, "#648FFF"

    # Colour is written before shape so the key order of ``info`` is stable.
    info["color"] = colour
    if shape is not None:
        info["shape"] = shape

    payload = node.payload
    if payload is None:
        return info

    # Keep any tooltip already provided and append the payload description.
    tooltip = [info["title"]] if "title" in info else []
    func, *args = payload
    tooltip.append(f"Function: {func}")
    if args:
        tooltip.append("Arguments:")
        tooltip += [f"- {arg!r}" for arg in args]
    info["title"] = "\n".join(tooltip)
    return info

# Convert the graph to a pyvis network for inline display in the notebook.
pyvis_graph = pyvis.to_pyvis(
    graph,
    notebook=True,
    cdn_resources="remote",
    height="1500px",
    # Bind graph.sinks once so each node is styled via node_info_ext(sinks, node).
    node_attrs=functools.partial(node_info_ext, graph.sinks),
    hierarchical_layout=False,
)
# Plain string literal: the original f-string had no placeholders.
# Kept as the last expression so the rendered graph is the cell output.
pyvis_graph.show("efi_graph.html")

In [None]:
from cascade.executors.dask import DaskLocalExecutor 

# Execute the EFI graph with two single-threaded workers of 10GB each.
# Left as a bare expression so the return value is shown as the cell output.
DaskLocalExecutor.execute(
    graph,
    n_workers=2,
    threads_per_worker=1,
    memory_limit="10GB",
    # Plain string literal: the original f-string had no placeholders.
    report="efi_dask_report.html",
)


# Construction with Array API Backend

Note: it is possible to use the Array API backend to work with pure arrays instead of earthkit objects with metadata. However, this does not necessarily work out of the box with the pre-constructed graphs; it depends on how the graph is constructed. In particular, if the internal contents of each node contain more than a single field (e.g. multiple ensemble members) and the computation requires reducing along dimensions of the internal array, then the keyword arguments for the backend functions will diverge. For example, when computing a mean over ensemble members, the Array API requires the keyword argument `axis` in order not to flatten the entire array. If the graph is constructed such that each node contains only a single field, then the backend can be swapped out without any problems.

In [None]:
# Set CASCADE_ARRAY_MODULE=numpy in the kernel's environment.
# NOTE(review): presumably this selects the array module used by the Array API
# backend and must run before the cells below — confirm against cascade.
%env CASCADE_ARRAY_MODULE=numpy

In [None]:
from ppcascade.fluent import PProcFluent
from ppcascade.utils.request import Request
from ppcascade.utils.window import Range
from ppcascade.backends.arrayapi import ArrayAPIBackend

# Two overlapping step windows (0..24 and 12..36, every 6); shared by the
# window mean and the EFI computation below.
windows = [
    Range("0-24", list(range(0, 25, 6))),
    Range("12-36", list(range(12, 37, 6))),
]

# Interpolation settings attached to both requests, so the forecast and the
# climatology are requested on the same target grid.
interpolation = {
    "grid": "O640",
}

fluent = PProcFluent()

# Climatology source: one entry per window ("step") and the quantile labels
# "0:100" ... "100:100".
# NOTE(review): no_expand=("quantile",) presumably keeps the quantile list
# inside a single request rather than expanding it — confirm against Request.
climatology = fluent.source(
    [
        Request(
            {
                "class": "od",
                "expver": "0001",
                "stream": "efhs",
                "date": "20240314",
                "time": "00",
                "param": 228004,
                "levtype": "sfc",
                "type": "cd",
                "step": ["0-24", "12-36"],
                "quantile": [f"{i}:100" for i in range(101)],
                "source": "mars",
                "interpolate": interpolation,
            },
            no_expand=("quantile",),
        )
    ],
    backend=ArrayAPIBackend,
)

# Forecast source (members 1-4, steps 0..36 every 6), averaged per window and
# then combined with the climatology through the "efi" ensemble_extreme step.
actions = (
    fluent.source(
        [
            Request(
                {
                    "class": "od",
                    "expver": "0001",
                    "stream": "enfo",
                    "date": "20240314",
                    "time": "00",
                    "param": 167,
                    "levtype": "sfc",
                    "type": "pf",
                    "number": range(1, 5),
                    "step": range(0, 37, 6),
                    "source": "mars",
                    "interpolate": interpolation,
                }
            )
        ],
        backend=ArrayAPIBackend,
    )
    .window_operation("mean", windows, dim="step")
    .ensemble_extreme("efi", climatology, windows, eps=1e-4)
)

In [None]:
# Show the nodes of the Array API action chain as the cell output.
actions.nodes

In [None]:
from cascade.executors.dask import DaskLocalExecutor 

# Execute the Array API graph with two single-threaded workers of 10GB each
# and keep the return value for inspection.
result = DaskLocalExecutor.execute(
    actions.graph(),
    n_workers=2,
    threads_per_worker=1,
    memory_limit="10GB",
    # Plain string literal: the original f-string had no placeholders.
    # NOTE(review): same report path as the EFI graph execution cell; a rerun
    # likely overwrites that report — confirm this is intended.
    report="efi_dask_report.html",
)

In [None]:
# Show the value returned by DaskLocalExecutor.execute as the cell output.
result