# Advanced Features

*Before reading this notebook we recommend reading [the basic notebook first!](io-00-basic.ipynb)*

_last updated 2024-01-26_

All of the high-level file format readers in `dask-awkward` are based on a lower-level API: the `from_map` function. This function provides an interface that allows any user-defined function to be used as a source of awkward arrays at the nodes in a Dask graph.

A very simple usage of the `from_map` API would be to re-create `from_parquet`:

```python
dak.from_map(
    ak.from_parquet,
    ["/path/to/some/file1.parquet", "/path/to/some/file2.parquet"],
    label="my-from-parquet",
)
```

This creates a `dask-awkward` collection that calls `ak.from_parquet` on each of those two files. As stated above, it is a simple recreation of `dak.from_parquet`: less flexible and less powerful than the real `from_parquet`, but it conveys the idea.

The power of `from_map` materializes when one would like to take advantage of column optimization, or to fail gracefully at nodes where read issues surface by returning an empty array instead of crashing the program. We can begin to demonstrate these features by defining a function class to be passed in as the first argument to `from_map`.

Our example will be a special Parquet reader that rejects any file that contains a "0" in the filename. For some reason we've found that data to be corrupt, but we still want to be able to process the whole directory without manually skipping those files.
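Before the full class, it may help to see the fail-gracefully idea on its own. The following is a minimal plain-Python sketch, not `dask-awkward` code: `read_or_report` and `fake_reader` are hypothetical names invented for illustration, and a list stands in for an awkward array. It shows a read that either succeeds or, for an allowed exception, returns a fallback value together with a small report record.

```python
from __future__ import annotations

from typing import Any, Callable


def read_or_report(
    reader: Callable[[str], Any],
    source: str,
    allowed_exceptions: tuple[type[BaseException], ...] = (OSError,),
    fallback: Any = None,
) -> tuple[Any, dict]:
    """Call reader(source); absorb allowed exceptions into a report record."""
    try:
        return reader(source), {"source": source, "exception": None}
    except allowed_exceptions as err:
        # Only the listed exception types are absorbed; anything else propagates.
        return fallback, {"source": source, "exception": repr(err)}


def fake_reader(source: str) -> list[int]:
    # Hypothetical stand-in for a file reader: rejects names containing "0".
    if "0" in source:
        raise OSError("cannot read files that contain '0' in the name")
    return [1, 2, 3]


results = [
    read_or_report(fake_reader, name, fallback=[])
    for name in ["part0.parquet", "part1.parquet"]
]
for data, report in results:
    print(report["source"], data, report["exception"])
```

Exceptions outside `allowed_exceptions` are deliberately not caught, so unexpected problems still surface as errors.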

We'll write out the class implementation and then explain each of the methods:

In [1]:
from __future__ import annotations

import os
from typing import Any

import awkward as ak
import dask
import dask_awkward as dak
from awkward.forms import Form
from dask_awkward.lib.io.columnar import ColumnProjectionMixin

class Ignore0ParquetReader(ColumnProjectionMixin):
    def __init__(
        self,
        form: Form,
        report: bool = False,
        allowed_exceptions: tuple[type[BaseException], ...] = (OSError,),
        columns: list[str] | None = None,
        behavior: dict | None = None,
        **kwargs: Any
    ):
        self.form = form
        self.report = report
        self.allowed_exceptions = allowed_exceptions
        self.columns = columns
        self.behavior = behavior
        self.kwargs = kwargs

    @property
    def return_report(self) -> bool:
        return self.report

    @property
    def use_optimization(self) -> bool:
        return True

    @staticmethod
    def report_success(source, columns) -> ak.Array:
        return ak.Array([{"source": source, "exception": None, "columns": columns}])

    @staticmethod
    def report_failure(source, exception) -> ak.Array:
        return ak.Array([{"source": source, "exception": repr(exception), "columns": None}])

    def mock(self) -> ak.Array:
        return ak.typetracer.typetracer_from_form(self.form, highlevel=True)

    def mock_empty(self, backend="cpu") -> ak.Array:
        # Zero-length array so that a failed partition contributes no rows.
        return ak.to_backend(self.form.length_zero_array(highlevel=False), backend=backend, highlevel=True)

    def read_from_disk(self, source: Any) -> ak.Array:
        if "0" in source:
            raise OSError("cannot read files that contain '0' in the name")
        return ak.from_parquet(source, columns=self.columns, **self.kwargs)

    def __call__(self, *args, **kwargs):
        source = args[0]
        if self.return_report:
            try:
                array = self.read_from_disk(source)
                return array, self.report_success(source, self.columns)
            except self.allowed_exceptions as err:
                array = self.mock_empty()
                return array, self.report_failure(source, err)
        else:
            return self.read_from_disk(source)

    def project_columns(self, columns):
        # Build a new reader restricted to `columns`, carrying over all other settings.
        return Ignore0ParquetReader(
            form=self.form.select_columns(columns),
            report=self.return_report,
            allowed_exceptions=self.allowed_exceptions,
            columns=columns,
            behavior=self.behavior,
            **self.kwargs,
        )


def my_read_parquet(path, columns=None, allowed_exceptions=(OSError,)):
    pq_files = [os.path.join(path, f) for f in os.listdir(path) if f.endswith("parquet")]
    meta_from_pq = ak.metadata_from_parquet(pq_files)
    form = meta_from_pq["form"]
    fn = Ignore0ParquetReader(form, report=True, allowed_exceptions=allowed_exceptions)
    return dak.from_map(fn, pq_files)

Here's why we have each of the methods!

- `ColumnProjectionMixin`: inheriting from this mixin makes the class compatible with column optimization.
- `__init__`: takes the starting form that the array should have, a flag for whether to return a report, a tuple of exceptions that may be raised at compute time and that we can gracefully absorb, the columns to read, the awkward-array behavior that should be used, and additional kwargs that are passed to `ak.from_parquet` at each node's call.
- `return_report`: a class property that will tell `from_map` whether or not we will also return a report array
- `use_optimization`: a class property that tells the column optimization that we want this function class to be column optimizable.
- `report_success`: a static method that will be used to construct a report array when the read is successful at a partition.
- `report_failure`: the partner to `report_success`. If one of the allowed exceptions is raised at a partition at array creation time, this method will be called to construct a report array.
- `mock`: a method that "mocks" the array that would be created, returns a dataless typetracer array
- `mock_empty`: a method that mocks the array but is not a typetracer array, it's an empty concrete awkward array. This is the method that is used at nodes that fail with an allowed exception.
- `read_from_disk`: this is the method that will be called to... read data from disk! What actually matters more is the next method:
- `__call__`: we finally get to the "function" part of this class. This method will be called at each partition. You'll notice that we call `read_from_disk` here, but we wrap it in a `try`/`except` block if we want to return the read report that allows for graceful failures.
- `project_columns`: this method returns a new instance of the class that is instructed to read a new set of columns. It is part of the optimization interface.
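The role of `project_columns` can be illustrated without any awkward or Dask machinery. The sketch below is a toy, with a hypothetical `ProjectableReader` class and made-up return values. It assumes only what the class above does: projection returns a fresh instance restricted to the requested columns and leaves the original reader untouched.

```python
from __future__ import annotations


class ProjectableReader:
    """Hypothetical stand-in for a reader that supports column projection."""

    def __init__(self, all_columns: list[str], columns: list[str] | None = None):
        self.all_columns = all_columns
        self.columns = columns  # None means "read every column"

    def __call__(self, source: str) -> dict[str, str]:
        wanted = self.columns if self.columns is not None else self.all_columns
        # A real reader would load data; here we only record what would be read.
        return {col: f"{col} from {source}" for col in wanted}

    def project_columns(self, columns: list[str]) -> ProjectableReader:
        # Return a new, narrower reader instead of mutating this one.
        return ProjectableReader(self.all_columns, columns=list(columns))


full = ProjectableReader(
    ["type", "scoring.distance", "scoring.basket", "scoring.player"]
)
narrow = full.project_columns(["scoring.player"])

print(sorted(full("part1.parquet")))    # all four column names
print(sorted(narrow("part1.parquet")))  # only 'scoring.player'
```

Returning a new object rather than mutating in place means the unprojected reader stays valid for any other computation that still needs all of the columns.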

Finally, `my_read_parquet` is the function that instantiates this function class and passes it to `from_map`.

Let's use it to read our parquet dataset and look at both the resulting array and the post-compute report. Notice that the report itself is a lazily evaluated dask-awkward Array collection that should be computed simultaneously with the collection-of-interest.

In [2]:
pq_dir = os.path.join("data", "parquet")
dataset, report = my_read_parquet(pq_dir)

In [3]:
result, computed_report = dask.compute(dataset, report)

In [4]:
result

In [5]:
computed_report.tolist()

[{'source': 'data/parquet/part0.parquet',
  'exception': 'OSError("cannot read files that contain \'0\' in the name")',
  'columns': None},
 {'source': 'data/parquet/part2.parquet',
  'exception': None,
  'columns': ['type', 'scoring.distance', 'scoring.basket', 'scoring.player']},
 {'source': 'data/parquet/part3.parquet',
  'exception': None,
  'columns': ['type', 'scoring.distance', 'scoring.basket', 'scoring.player']},
 {'source': 'data/parquet/part1.parquet',
  'exception': None,
  'columns': ['type', 'scoring.distance', 'scoring.basket', 'scoring.player']}]

We can see in the report that the file with a "0" in the name indeed failed!
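Once the report has been converted with `tolist()`, it is an ordinary list of dictionaries, so picking out the failed partitions takes a single comprehension. The snippet below is a small sketch that pastes in the report shown above as a literal; the variable names `report_rows` and `failed_sources` are our own.

```python
# The computed report from above, pasted in as plain Python data.
report_rows = [
    {"source": "data/parquet/part0.parquet",
     "exception": 'OSError("cannot read files that contain \'0\' in the name")',
     "columns": None},
    {"source": "data/parquet/part2.parquet",
     "exception": None,
     "columns": ["type", "scoring.distance", "scoring.basket", "scoring.player"]},
    {"source": "data/parquet/part3.parquet",
     "exception": None,
     "columns": ["type", "scoring.distance", "scoring.basket", "scoring.player"]},
    {"source": "data/parquet/part1.parquet",
     "exception": None,
     "columns": ["type", "scoring.distance", "scoring.basket", "scoring.player"]},
]

# A partition failed if its "exception" entry is not None.
failed_sources = [row["source"] for row in report_rows if row["exception"] is not None]
print(failed_sources)  # ['data/parquet/part0.parquet']
```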

You'll see that we added the columns that are read to the report as well, so if we perform a compute that will only need a subset of the columns, we can get confirmation from our report array. We get the column optimization by inheriting from the column optimization mixin!

In [6]:
dak.necessary_columns(dataset.scoring.player)

{'<__main__.Ignore0ParquetReader object at 0x7fbc1c5-98de39e045724a64b44ebd0cc521dc4e': frozenset({'scoring.player'})}

In [7]:
result, computed_report = dask.compute(dataset.scoring.player, report)

In [8]:
computed_report.tolist()

[{'source': 'data/parquet/part0.parquet',
  'exception': 'OSError("cannot read files that contain \'0\' in the name")',
  'columns': None},
 {'source': 'data/parquet/part2.parquet',
  'exception': None,
  'columns': ['scoring.player']},
 {'source': 'data/parquet/part3.parquet',
  'exception': None,
  'columns': ['scoring.player']},
 {'source': 'data/parquet/part1.parquet',
  'exception': None,
  'columns': ['scoring.player']}]
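If we want more than a visual check, the report can be compared against the column set that `necessary_columns` gave us. The following is a sketch under the assumption that the report has the shape shown above; `sources_with_unexpected_columns` is a hypothetical helper, not part of `dask-awkward`.

```python
def sources_with_unexpected_columns(rows, expected):
    """Return sources that were read successfully but with a column set other than `expected`."""
    return [
        row["source"]
        for row in rows
        if row["exception"] is None and frozenset(row["columns"]) != expected
    ]


# The second computed report from above, pasted in as plain Python data.
projected_rows = [
    {"source": "data/parquet/part0.parquet",
     "exception": 'OSError("cannot read files that contain \'0\' in the name")',
     "columns": None},
    {"source": "data/parquet/part2.parquet", "exception": None, "columns": ["scoring.player"]},
    {"source": "data/parquet/part3.parquet", "exception": None, "columns": ["scoring.player"]},
    {"source": "data/parquet/part1.parquet", "exception": None, "columns": ["scoring.player"]},
]

expected_columns = frozenset({"scoring.player"})
mismatched = sources_with_unexpected_columns(projected_rows, expected_columns)
print(mismatched)  # [] because every successful read used only 'scoring.player'
```

Failed partitions are skipped in this check because their `columns` entry is `None`.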