# DASK to ServiceX

In this demo we'll take advantage of DASK and ServiceX.

## Assumptions:

* We don't start anything until we know the number of files that SX will produce
* We are OK with some of the files coming out of SX failing
* We are going to do one partition per file
* When we start we don't necessarily know all the files produced.

## Design Outline

* A single `dask` task/layer that has a single output per partition. The output is just a string.
* The `AwkwardInputLayer` looks at the task input and loads that data from `minio`
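
The data flow in this outline can be mocked up without `dask` at all. The sketch below is plain Python, not the real layers: `fetch_name` and `load_partition` are hypothetical stand-ins for the ServiceX task and the input layer, and a dictionary (`FAKE_STORE`) stands in for `minio`. It only illustrates one string per partition feeding one load per partition, with failed files dropped.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in for the object store: file name -> jet pT values.
FAKE_STORE: Dict[str, List[float]] = {
    "file_0.root": [10.0, 20.0],
    "file_2.root": [30.0],
}


def fetch_name(partition: int) -> str:
    """Stand-in for the ServiceX task: one string output per partition."""
    return f"file_{partition}.root"


def load_partition(name: str) -> Optional[List[float]]:
    """Stand-in for the input layer: load the data named by the task output.

    Returns None when the file is missing, modelling a file that failed.
    """
    return FAKE_STORE.get(name)


def run(n_files: int) -> List[List[float]]:
    """One partition per file; partitions whose file failed are dropped."""
    names = [fetch_name(i) for i in range(n_files)]
    loaded = [load_partition(n) for n in names]
    return [data for data in loaded if data is not None]


partitions = run(3)
print(partitions)
```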

This version of things will work only for local files. Once this works we can move it to an SX prototype.

This was written before any code below.

## Research

A few things to help get started below.

### How does the current uproot/dask work?

The `from_map` function is what `dask_awkward` uses to build the input layer of a collection. One very nice thing: this is a public interface, so we
can rely on it in libraries.

```python
def from_map(
    func: Callable,
    *iterables: Iterable,
    args: tuple[Any, ...] | None = None,
    label: str | None = None,
    token: str | None = None,
    divisions: tuple[int, ...] | tuple[None, ...] | None = None,
    meta: ak.Array | None = None,
    **kwargs: Any,
) -> Array | tuple[Array, Array]:
    """Create an Array collection from a custom mapping.

    Parameters
    ----------
    func : Callable
        Function used to create each partition.
    *iterables : Iterable
        Iterable objects to map to each output partition. All
        iterables must be the same length. This length determines the
        number of partitions in the output collection (only one
        element of each iterable will be passed to `func` for each
        partition).
    args : tuple
        Tuple of positional arguments to append after mapped arguments.
    label : str, optional
        String to use as the function-name label in the output
        collection-key names.
    token : str, optional
        String to use as the "token" in the output collection-key names.
    divisions : tuple[int, ...] | tuple[None, ...], optional
        Partition boundaries (if known).
    meta : Array, optional
        Collection metadata array, if known (the awkward-array type
        tracer)
    **kwargs : Any
        Keyword arguments passed to `func`.

    Returns
    -------
    Array
        Array collection.

    """

    if not callable(func):
        raise ValueError("`func` argument must be `callable`")
    lengths = set()
    iters: list[Iterable] = list(iterables)
    for i, iterable in enumerate(iters):
        if not isinstance(iterable, Iterable):
            raise ValueError(
                f"All elements of `iterables` must be Iterable, got {type(iterable)}"
            )
        try:
            lengths.add(len(iterable))  # type: ignore
        except (AttributeError, TypeError):
            iters[i] = list(iterable)
            lengths.add(len(iters[i]))  # type: ignore
    if len(lengths) == 0:
        raise ValueError("`from_map` requires at least one Iterable input")
    elif len(lengths) > 1:
        raise ValueError("All `iterables` must have the same length")
    if lengths == {0}:
        raise ValueError("All `iterables` must have a non-zero length")

    # Check for `produces_tasks` and `creation_info`
    produces_tasks = kwargs.pop("produces_tasks", False)
    # creation_info = kwargs.pop("creation_info", None)

    if produces_tasks or len(iters) == 1:
        if len(iters) > 1:
            # Tasks are not detected correctly when they are "packed"
            # within an outer list/tuple
            raise ValueError(
                "Multiple iterables not supported when produces_tasks=True"
            )
        inputs = list(iters[0])
        packed = False
    else:
        # Structure inputs such that the tuple of arguments pair each 0th,
        # 1st, 2nd, ... elements together; for example:
        # from_map(f, [1, 2, 3], [4, 5, 6]) --> [f(1, 4), f(2, 5), f(3, 6)]
        inputs = list(zip(*iters))
        packed = True

    # Define collection name
    label = label or funcname(func)
    token = token or tokenize(func, iters, meta, **kwargs)
    name = f"{label}-{token}"

    # Define io_func

    # FIXME: projection etc.
    if packed or args or kwargs:
        func = PackedArgCallable(
            func,
            args=args,
            kwargs=kwargs,
            packed=packed,
        )

    # Special `io_func` implementations can implement mocking and optionally
    # support buffer projection.
    if io_func_implements_mocking(func):
        io_func = func
        array_meta = cast(ImplementsMocking, func).mock()
    # If we know the meta, we can spoof mocking
    elif meta is not None:
        io_func = IOFunctionWithMocking(meta, func)
        array_meta = meta
    # Without `meta`, the meta will be computed by executing the graph
    else:
        io_func = func
        array_meta = None

    dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func)

    hlg = HighLevelGraph.from_collections(name, dsk)
    if divisions is not None:
        result = new_array_object(hlg, name, meta=array_meta, divisions=divisions)
    else:
        result = new_array_object(hlg, name, meta=array_meta, npartitions=len(inputs))

    if io_func_implements_report(io_func):
        if cast(ImplementsReport, io_func).return_report:
            res = result.map_partitions(first, meta=array_meta, output_divisions=1)
            rep = result.map_partitions(second, meta=empty_typetracer())
            return res, rep

    return result
```
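
The argument handling in the middle of `from_map` is easy to miss. As a rough illustration, the plain-Python sketch below mimics only the packing step. It does not import `dask_awkward`, and `pack_inputs` is a hypothetical helper named here for illustration. A single iterable is passed through unchanged, while several iterables are zipped so that each partition receives one element from each.

```python
from typing import Any, Iterable, List, Tuple


def pack_inputs(*iterables: Iterable[Any]) -> Tuple[List[Any], bool]:
    """Mimic how `from_map` turns its iterables into per-partition inputs."""
    iters = [list(it) for it in iterables]
    if len(iters) == 0:
        raise ValueError("at least one iterable is required")
    if len({len(it) for it in iters}) > 1:
        raise ValueError("all iterables must have the same length")
    if len(iters) == 1:
        # One iterable: each element is the single argument for a partition.
        return iters[0], False
    # Several iterables: pair the 0th, 1st, 2nd, ... elements together.
    return list(zip(*iters)), True


inputs, packed = pack_inputs([1, 2, 3], [4, 5, 6])
print(inputs, packed)

single, single_packed = pack_inputs(["a.root", "b.root"])
print(single, single_packed)
```

In `from_map` the length of `inputs` becomes the number of partitions, which is why one entry per file gives one partition per file.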

## Imports

In [1]:
import dask_awkward as dak
import awkward as ak
import dask
import uproot

from dask.highlevelgraph import Layer, HighLevelGraph
from dask.distributed import Client
from typing import AbstractSet

client = Client()

## The `uproot.dask` hack way

### The `awkward.Form` file form

We need the form up front so that our hack never has to open files that do not yet exist. Eventually we'll have to build it from the schema we know exists from the `func_adl` query.

In [2]:
dummy_filename = "0fc6e51a5ea6dea107c195591d20a1b2-15.26710677._000019.pool.root.1"
with uproot.open(dummy_filename) as file:
    file_form = file['treeme'].arrays().layout.form

file_form

RecordForm([ListOffsetForm('i64', NumpyForm('float64'))], ['JetPt'])

Next, let's test it.

In [3]:
test_ar = uproot.dask({dummy_filename: "treeme"}, open_files=False, known_base_form=file_form)
test_ar.JetPt.compute()

### ServiceX Dask Layer

Next a `dask` layer that will eventually poll SX for files that are done.

In [4]:
class SXLayer(Layer):
    def __init__(self, name, output_names):
        super().__init__()
        self.name = name
        self.dependencies = dict()
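        self.tasks = {
            # Bind out_name as a default argument so each task keeps its own name
            # (a bare closure would only ever see the last value of the loop variable).
            out_name: (lambda n=out_name: self.get_file(n),)
            for out_name in output_names
        }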

    def __getitem__(self, key):
        return self.tasks[key]

    def __iter__(self):
        return iter(self.tasks)

    def __len__(self):
        return len(self.tasks)

    def is_materialized(self):
        return False
    
    def get_output_keys(self) -> AbstractSet[str | bytes | int | float]:
        return set(self.tasks.keys())
    
    def get_file(self, name):
        '''Return the info that is needed by uproot to actually open the file'''
        print(f"Returning info for file {name}: {dummy_filename}")
        return (dummy_filename, 'treeme', 0, 1, False)

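One Python detail matters when tasks are built from lambdas inside a comprehension: a lambda looks up a loop variable when it is called, not when it is created. The sketch below is independent of `dask` and uses made-up names (`get_file` here is a free function, and the file names are invented). It shows the late-binding behaviour and the usual workaround of binding the value as a default argument.

```python
def get_file(name: str) -> str:
    """Hypothetical stand-in for a per-file lookup."""
    return f"info-for-{name}"


names = ["a.root", "b.root", "c.root"]

# Late binding: every lambda sees the final value of `n`.
late = {n: (lambda: get_file(n)) for n in names}

# Early binding: the default argument freezes the value for each entry.
early = {n: (lambda n=n: get_file(n)) for n in names}

late_results = [late[key]() for key in names]
early_results = [early[key]() for key in names]
print(late_results)
print(early_results)
```

With a single file, as in the `n_files == 1` assertion used in this notebook, the two forms behave the same. The difference only appears with more than one entry.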

In [5]:
def capture(*args, **kwargs):
    print(f"Captured: {args} {kwargs}")


def sx_open(n_files: int) -> ak.Array:
    assert n_files == 1, "We only know how to do one file for now"

    # Build the high level array.
    files = {
        f"sx_partition_{i}.root": "treeme"
        for i in range(0, n_files)
    }  
    ar = uproot.dask(files, open_files=False, known_base_form=file_form)
    assert len(ar.__dask_layers__()) == 1
    uproot_layer_name = list(ar.__dask_layers__())[0]

    # And the task that will furnish the file names for the uproot layer,
    # and make uproot layer dependent on the sx layer.
    sx_layer = SXLayer("sx_fetcher", files.keys())
    ar.__dask_graph__().layers["sx_fetcher"] = sx_layer
    ar.__dask_graph__().dependencies[uproot_layer_name].add("sx_fetcher")
    ar.__dask_graph__().dependencies["sx_fetcher"] = set()

    # Next, hook up the argument to the uproot functions to the output from the SX layer.
    uproot_layer = ar.__dask_graph__().layers[uproot_layer_name]
    uproot_layer_outputs = list(uproot_layer.keys())
    assert len(uproot_layer_outputs) == n_files
    for layer_out, layer_arg in zip(uproot_layer_outputs, files.keys()):
        print(f"Hooking up {layer_out} to {layer_arg}")
        uproot_layer._dict[layer_out] = (capture, f"{layer_arg}-bogus")
        # uproot_layer._dict[layer_out] = (uproot_layer[layer_out][0], f"{layer_arg}-bogus")
        print (f"  {uproot_layer[layer_out]}")
    print(uproot_layer)

    return ar

In [None]:
a = sx_open(1)
# uproot_key_name = 'from-uproot-8d1ae5c61dd92ce9355fa2e2a5fc2bc5'
# key0 = list(a.__dask_graph__().layers[uproot_key_name].keys())[0]
# print(key0)
# a.__dask_graph__().layers[uproot_key_name][key0][1]
# a.__dask_graph__().layers[uproot_key_name][key0][0]
a.__dask_graph__()



Hooking up ('from-uproot-8d1ae5c61dd92ce9355fa2e2a5fc2bc5', 0) to sx_partition_0.root
  (<function capture at 0x0000025441B67240>, 'sx_partition_0.root-bogus')
AwkwardInputLayer<from-uproot-8d1ae5c61dd92ce9355fa2e2a5fc2bc5>


| layer_type | is_materialized | number of outputs | depends on |
|---|---|---|---|
| SXLayer | False | 1 | |
| AwkwardInputLayer | True | 1 | sx_fetcher |


In [None]:
a.JetPt.compute()

This seems to run into trouble because we can't really use task outputs as inputs to the uproot layer, so we'll need to talk to someone about that.

## Blockwise Approach

Could we start a blockwise approach on its own?

In [None]:
class SXLayerBW(Layer):
    '''Outputs are just the names of the files that we want to open downstream with uproot'''
    def __init__(self, name, n_files):
        super().__init__()
        self.name = name
        self.dependencies = dict()
        self.tasks = {
            "output_0": (lambda: self.get_file(f"output_0"),)
        }

    def __getitem__(self, key):
        return self.tasks[key]

    def __iter__(self):
        return iter(self.tasks)

    def __len__(self):
        return len(self.tasks)

    def is_materialized(self):
        return False
    
    def get_output_keys(self) -> AbstractSet[str | bytes | int | float]:
        return set(self.tasks.keys())
    
    def get_file(self, name):
        '''Return the info that is needed by uproot to actually open the file'''
        print(f"Returning info for file {name}: {dummy_filename}")
        return (dummy_filename, 'treeme', 0, 1, False)


And the layer that will load files from the above.

In [None]:
class URLoaderLayer(Layer):
    def __init__(self, name, sx_layer_name, output_name, n_files):
        super().__init__()
        self.name = name
        self.dependencies = {name: sx_layer_name}
        self.tasks = {
            (name, i): (lambda f_name: self.get_data(f_name), f'output_{i}')
            for i in range(n_files)
        }

    def __getitem__(self, key):
        return self.tasks[key]

    def __iter__(self):
        return iter(self.tasks)

    def __len__(self):
        return len(self.tasks)

    def is_materialized(self):
        return False
    
    def get_output_keys(self) -> AbstractSet[str | bytes | int | float]:
        return set(self.tasks.keys())
    
    def get_data(self, name):
        '''Open the file and return the contents of its tree as an awkward array'''
        print(f"Loading data for file {name}: {dummy_filename}")
        with uproot.open(dummy_filename) as file:
            return file['treeme'].arrays()

OK, let's build up the array.

In [None]:
sx_layer = SXLayerBW("sx_fetcher", 1)



## SX Support

The code below belongs in the `sx-awk` library.

This is the ServiceX layer. It is responsible for all communication with ServiceX, and for finding the files (and URLs) in `minio`.

In [None]:
from collections.abc import Set
from typing import AbstractSet, Any, Dict, Iterator, KeysView, List
import logging

class SXLayer(Layer):
    def __init__(self, sx_query_guid):
        super().__init__()
        self._query_guid = sx_query_guid

        # Create a task that will be executed when the layer is computed,
        # and will fetch the list of files from SX.
        k = f"SX-query-{self._query_guid}"
        self._tasks: Dict[str, Any] = {k: dask.delayed(self._fetch_files, name=k)}

    def _fetch_files(self) -> str:
        # This is where the actual fetching of the files from SX would happen.
        # For now, just return a single hard-coded file name.
        logging.warning("Returning a file")
        return "0fc6e51a5ea6dea107c195591d20a1b2-15.26710677._000019.pool.root.1"

    def __getitem__(self, __key) -> Any:
        return self._tasks[str(__key)]
    
    def keys(self):
        return self._tasks.keys()
    
    def __len__(self) -> int:
        return len(self._tasks)
    
    def get_output_keys(self) -> AbstractSet[str]:
        return {f"SX-query-{self._query_guid}"}

    def __iter__(self):
        return iter(self._tasks)
    
    def is_materialized(self) -> bool:
        return False

In [None]:
l = SXLayer("182382781")
hlg = HighLevelGraph(
    layers={"l1": l},
    dependencies={},
)

In [None]:
r = client.compute(hlg)

In [None]:
type(r)


## The `uproot.dask` way

This is a very simple call - here for reference.

In [None]:
filename = "0fc6e51a5ea6dea107c195591d20a1b2-15.26710677._000019.pool.root.1"
ar = uproot.dask({filename: "treeme"}, open_files=False)

In [None]:
pt = ar.JetPt * 5

And let's look at the layers, etc., for this for reference.

In [None]:
pt.compute()

In [None]:
graph = pt.__dask_graph__()
graph

Let's look at the input layer here.

In [None]:
from_uproot_key = [k for k in graph.layers.keys() if k.startswith("from")][0]
print(f"From uproot key: {from_uproot_key}")
from_uproot_first_output = list(graph.layers[from_uproot_key].keys())[0]
print(f"From uproot first output: {from_uproot_first_output}")
print(f"The function that gets executed and the arguments: {graph.layers[from_uproot_key][from_uproot_first_output]}")

We want the `('0fc6e51a5ea6dea107c195591d20a1b2-15.26710677._000019.pool.root.1', 'treeme', 0, 1, False)` argument to come from the output of an upstream task rather than being hard-coded.

## Toy Demo

Used AI to come up with this (minus a few syntax errors and missing the `Client.get`). This shows how to build everything from scratch.

In [None]:
from collections.abc import Set
import typing


class CustomLayer(Layer):
    def __init__(self, name, dependencies, tasks):
        super().__init__()
        self.name = name
        self.dependencies = dependencies
        self.tasks = tasks

    def __getitem__(self, key):
        return self.tasks[key]

    def __iter__(self):
        return iter(self.tasks)

    def __len__(self):
        return len(self.tasks)

    def is_materialized(self):
        return False
    
    def get_output_keys(self) -> AbstractSet[str | bytes | int | float]:
        return set(self.tasks.keys())

# Define the tasks for each layer
tasks1 = {"output1": (lambda: "result1", ), "output2": (lambda: "result2", )}
tasks2 = {"output3": (lambda x: x + " processed", "output1"), "output4": (lambda x: x + " processed", "output2")}

# Create the layers
layer1 = CustomLayer("layer1", [], tasks1)
layer2 = CustomLayer("layer2", ["layer1"], tasks2)

# Create the high level graph
hlg = HighLevelGraph(
    layers={layer1.name: layer1, layer2.name: layer2},
    dependencies={layer2.name: {layer1.name}, layer1.name: set()},
)

In [None]:
client.get(hlg, ["output4", "output3"])