In [None]:
import sys
sys.path.insert(0, '../')

# First Steps

This section outlines the steps required to get started with the main features
of the library. Before starting, make sure the library is configured to run on
your machine.

## Initialization of the environment

First, we create the dataset that will be stored in our collection.

In [None]:
import zcollection.tests.data


def create_dataset():
    generator = zcollection.tests.data.create_test_dataset_with_fillvalue()
    return next(generator)


ds = create_dataset()
ds.to_xarray()

Then we will create a file system in memory.

In [None]:
import fsspec


fs = fsspec.filesystem('memory')

Finally, we create a local Dask cluster using only threads, so that all workers
share the file system stored in memory.

In [None]:
import dask.distributed

cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)
client

## Collection

This introduction describes the main operations on a collection: creating,
opening, loading and modifying it.

Before creating our collection, we define the partitioning of our dataset. In
this example, we will partition the data by **month** using the variable
`time`.

In [None]:
import zcollection

partition_handler = zcollection.partitioning.Date(("time", ), resolution="M")
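To make the effect of a monthly resolution concrete, the sketch below groups a few timestamps by the month they fall in. It is a plain-Python illustration, not the library's implementation: the helper `month_key` is hypothetical, and the `year=YYYY/month=MM` naming is an assumption extrapolated from the `year=2000` directory used later in this section.

```python
import datetime
from collections import defaultdict


def month_key(timestamp: datetime.datetime) -> str:
    """Hypothetical helper: build a partition name from a timestamp."""
    return f"year={timestamp.year}/month={timestamp.month:02d}"


# A few sample timestamps spanning two months.
samples = [
    datetime.datetime(2000, 1, 1),
    datetime.datetime(2000, 1, 16),
    datetime.datetime(2000, 2, 3),
]

# Group the samples by the partition they would belong to.
partitions = defaultdict(list)
for item in samples:
    partitions[month_key(item)].append(item)

for name, items in sorted(partitions.items()):
    print(name, len(items))
```

All records sharing the same year and month end up in the same group, which is what partitioning by month means here.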

Finally, we create our collection.

In [None]:
collection = zcollection.create_collection(axis="time",
                                           ds=ds,
                                           partition_handler=partition_handler,
                                           partition_base_dir="/my_collection",
                                           filesystem=fs)

---
**Note**

The collection created can be accessed using the following command:

    >>> collection = zcollection.open_collection("/my_collection",
    ...                                          filesystem=fs)
---

When the collection is created, a configuration file is written. This file
contains all the metadata needed to ensure that any data inserted later has the
same features as the existing data (data consistency).

In [None]:
collection.metadata.get_config()

Now that the collection has been created, we can insert new records.

In [None]:
collection.insert(ds)

---
**Note**

When inserting data, you can specify the merge strategy for a partition.
By default, the last inserted data overwrite the existing
ones. Other strategies can be defined, for example, to update existing data
(overwrite the updated data, while keeping the existing ones). This last
strategy allows an existing partition to be updated incrementally.

    >>> import zcollection.merging
    >>> collection.insert(
    ...     ds, merge_callable=zcollection.merging.merge_time_series)
---
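The difference between the two behaviours described in the note can be sketched with plain dictionaries. This is a toy model only: the functions `merge_overwrite` and `merge_update` are hypothetical names introduced for the example and are not the library's merge callables.

```python
def merge_overwrite(existing: dict, inserted: dict) -> dict:
    """Overwrite behaviour: the inserted data replace the partition."""
    return dict(inserted)


def merge_update(existing: dict, inserted: dict) -> dict:
    """Incremental behaviour: keep existing records, overwrite updated ones."""
    merged = dict(existing)
    merged.update(inserted)
    return dict(sorted(merged.items()))


# Records of one partition, keyed by an integer time stamp.
existing = {1: "a", 2: "b", 3: "c"}
inserted = {3: "C", 4: "D"}

print(merge_overwrite(existing, inserted))  # existing records are dropped
print(merge_update(existing, inserted))     # records 1 and 2 are kept
```

With the second strategy, a partition can grow as new data arrive without losing what was already stored.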

Let's look at the different partitions thus created.

In [None]:
fs.listdir("/my_collection/year=2000")

This collection is composed of several partitions, but it is always handled as a
single dataset.

### Loading data

To load the dataset, call the `load` method of the collection. By default, the
method loads all partitions stored in the collection.

In [None]:
collection.load()

You can also select the partitions to load by filtering on the keys used for
partitioning.

In [None]:
collection.load("year == 2000 and month == 2")

Note that the `load` method may return `None` if no partition has been selected.

In [None]:
collection.load("year == 2002 and month == 2") is None
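The selection logic can be pictured as evaluating the filter against the keys of each partition. The sketch below is an assumption-laden illustration: the partition list and the `select` function are hypothetical, and the use of `eval` is only a convenient way to mimic the expression syntax (it should not be applied to untrusted input). How the library evaluates the expression is not shown in this section.

```python
from typing import Dict, List, Optional

# Hypothetical partition keys, mirroring a collection partitioned by month.
PARTITIONS: List[Dict[str, int]] = [
    {"year": 2000, "month": 1},
    {"year": 2000, "month": 2},
    {"year": 2000, "month": 3},
]


def select(expression: str) -> Optional[List[Dict[str, int]]]:
    """Return the partitions matching the expression, or None if none match."""
    selected = [
        keys for keys in PARTITIONS
        # Evaluate the expression with the partition keys as variables and
        # without access to builtins.
        if eval(expression, {"__builtins__": {}}, dict(keys))
    ]
    return selected or None


print(select("year == 2000 and month == 2"))
print(select("year == 2002 and month == 2"))
```

Because the result can be `None`, code that loads a filtered selection should check for it before using the returned dataset.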

### Editing variables

*The functions for modifying collections are not usable if the collection is
open in read-only mode.*

It's possible to delete a variable from a collection.

In [None]:
collection.drop_variable("var2")

In [None]:
collection.load()

**Warning**: The variable used for partitioning cannot be deleted.

In [None]:
collection.drop_variable("time")

The `add_variable` method allows you to add a new variable to the collection.

In [None]:
collection.add_variable(ds.metadata().variables["var2"])

The newly created variable is initialized with its default value.

In [None]:
collection.load().variables["var2"].values

Finally, it's possible to update existing variables.

In this example, we will alter the variable `var2` by setting it to 1 anywhere
the variable `var1` is defined.

In [None]:
def ones(ds):
    return ds.variables["var1"].values * 0 + 1


collection.update(ones, "var2")

In [None]:
collection.load().variables["var2"].values
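The expression `values * 0 + 1` is what restricts the update to the places where `var1` is defined. The sketch below shows the arithmetic on a plain list, assuming undefined values behave like NaN; the library may represent them differently (for example with a mask or a fill value), so this is only an illustration of the idea.

```python
import math

# Sample values of a variable; NaN stands for an undefined measurement.
var1 = [0.5, math.nan, 3.0]

# Same arithmetic as `ones`: defined values become 1, NaN stays NaN.
var2 = [value * 0 + 1 for value in var1]

print(var2)
```

Multiplying by zero and adding one maps every defined value to 1 while leaving undefined entries undefined.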

## Views

A view allows you to extend a collection (a view reference) that you are not allowed to modify.

In [None]:
view = zcollection.create_view("/my_view",
                               zcollection.view.ViewReference(
                                   "/my_collection", fs),
                               filesystem=fs)

When the view is created, it has no data of its own: it uses all the data
defined in the reference collection.

In [None]:
fs.listdir("/my_view")

In [None]:
view.load()

In this state, the view is not very interesting. However, it is possible to add
and modify variables in order to enhance the view.

In [None]:
var3 = ds.metadata().variables["var2"]
var3.name = "var3"

In [None]:
view.add_variable(var3)

This step creates all necessary partitions for the new variable.

In [None]:
fs.listdir("/my_view/year=2000")

The new variable is not initialized.

In [None]:
view.load().variables["var3"].values

Variables are updated in the same way as for a collection.

In [None]:
view.update(ones, "var3")

In [None]:
var3 = view.load().variables["var3"].values
var2 = view.load().variables["var2"].values
var2 - var3

**Warning**: the variables of the reference collection cannot be edited.

In [None]:
view.update(ones, "var2")

In [None]:
view.load()

Finally, a method allows you to delete variables from the view.

In [None]:
view.drop_variable("var3")

**Warning**: the variables of the reference collection cannot be deleted.

In [None]:
view.drop_variable("var2")
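The rules above (own variables can be added and dropped, reference variables are read-only) can be summarised with a small toy model. The class `ViewSketch` is hypothetical and written for this example only; in particular, the exception type raised by the library for a forbidden operation is not stated in this section, so `ValueError` is an assumption.

```python
from types import MappingProxyType


class ViewSketch:
    """Toy model of a view: own variables layered over a read-only reference."""

    def __init__(self, reference: dict) -> None:
        self._reference = MappingProxyType(reference)  # read-only access
        self._own = {}

    def add_variable(self, name: str, values: list) -> None:
        if name in self._reference:
            raise ValueError(f"{name!r} belongs to the reference collection")
        self._own[name] = values

    def drop_variable(self, name: str) -> None:
        if name in self._reference:
            raise ValueError(f"{name!r} belongs to the reference collection")
        del self._own[name]

    def load(self) -> dict:
        # The reference variables come first, then the view's own variables.
        return {**self._reference, **self._own}


view_sketch = ViewSketch({"var1": [1, 2], "var2": [3, 4]})
view_sketch.add_variable("var3", [0, 0])
print(sorted(view_sketch.load()))
```

Loading the sketch returns the reference variables together with `var3`, while any attempt to drop `var1` or `var2` is rejected.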

## Indexing

A collection can be indexed. This allows quick access to the data without having
to browse the entire dataset.

We will index another dataset. This one contains measurements from a fictitious
satellite over several half-orbits.

---
**Note**

This module is optional. To use it, you need the `PyArrow` library.
---

In [None]:
ds = zcollection.Dataset.from_xarray(
    zcollection.tests.data.create_test_sequence(5, 20, 10))
ds

In [None]:
collection = zcollection.create_collection(
        "time",
        ds,
        zcollection.partitioning.Date(("time", ), "M"),
        partition_base_dir="/one_other_collection",
        filesystem=fs)
collection.insert(ds, zcollection.merging.merge_time_series)

Here we have created a collection partitioned by month.

In [None]:
fs.listdir("/one_other_collection/year=2000")

The idea of the implementation is to compute, for each visited partition, the
slices of data over which a quantity is constant. In our example, we rely on the
cycle and pass numbers. The first function we implement detects these constant
parts in two vectors containing the cycle and pass numbers.

In [None]:
from typing import Iterator, Tuple

import numpy


def split_half_orbit(
    cycle_number: numpy.ndarray,
    pass_number: numpy.ndarray,
) -> Iterator[Tuple[int, int]]:
    """
    Calculate the indexes of the start and stop of each half-orbit.
    Args:
        cycle_number: Cycle numbers.
        pass_number: Pass numbers.
    Returns:
        Iterator of start and stop indexes.
    """
    assert pass_number.shape == cycle_number.shape
    pass_idx = numpy.where(numpy.roll(pass_number, 1) != pass_number)[0]
    cycle_idx = numpy.where(numpy.roll(cycle_number, 1) != cycle_number)[0]

    # Index 0 is always a start: numpy.roll wraps around, so it would be
    # missed when the first and last values are equal.
    half_orbit = numpy.unique(
        numpy.concatenate(
            (pass_idx, cycle_idx,
             numpy.array([0, pass_number.size], dtype="int64"))))
    del pass_idx, cycle_idx

    yield from zip(half_orbit[:-1], half_orbit[1:])
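To see what the boundaries look like on concrete values, here is a minimal pure-Python sketch of the same idea. The function `split_runs` is a hypothetical stand-in written for this example: it compares each sample with its predecessor and always treats index 0 as the start of the first run.

```python
from typing import Iterator, List, Tuple


def split_runs(
    cycles: List[int],
    passes: List[int],
) -> Iterator[Tuple[int, int]]:
    """Yield (start, stop) for each run of constant (cycle, pass) values."""
    assert len(cycles) == len(passes)
    start = 0
    for idx in range(1, len(cycles) + 1):
        # A run ends at the end of the data or when either value changes.
        if idx == len(cycles) or (cycles[idx], passes[idx]) != (
                cycles[idx - 1], passes[idx - 1]):
            yield start, idx
            start = idx


cycles = [1, 1, 1, 1, 1, 2]
passes = [1, 1, 2, 2, 2, 1]
print(list(split_runs(cycles, passes)))
```

Each pair is a half-open interval `[start, stop)`, so consecutive half-orbits share a boundary index and the last stop equals the number of samples.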

Now we will compute these constant parts from a dataset contained in a
partition.

In [None]:
def _half_orbit(
    ds: zcollection.Dataset,
    *args,
    **kwargs,
) -> numpy.ndarray:
    """
    Return the indexes of the start and stop of each half-orbit.
    Args:
        ds: Dataset stored in a partition to be indexed.
    Returns:
        Structured array holding the start and stop indexes and the cycle
        and pass numbers of each half-orbit.
    """
    pass_number_varname = kwargs.pop('pass_number', 'pass_number')
    cycle_number_varname = kwargs.pop('cycle_number', 'cycle_number')
    pass_number = ds.variables[pass_number_varname].values
    cycle_number = ds.variables[cycle_number_varname].values

    generator = ((
        i0,
        i1,
        cycle_number[i0],
        pass_number[i0],
    ) for i0, i1 in split_half_orbit(cycle_number, pass_number))

    return numpy.fromiter(
        generator, numpy.dtype(HalfOrbitIndexer.dtype()))

Finally, we implement our indexing class. The base class implements the index
update and the associated queries.

In [None]:
import pathlib
from typing import List, Optional, Union

import zcollection.indexing


class HalfOrbitIndexer(zcollection.indexing.Indexer):
    """Index collection by half-orbit.
    """
    #: Column name of the cycle number.
    CYCLE_NUMBER = "cycle_number"

    #: Column name of the pass number.
    PASS_NUMBER = "pass_number"

    @classmethod
    def dtype(cls, /, **kwargs) -> List[Tuple[str, str]]:
        """Return the columns of the index.
        Returns:
            A list of (name, type) pairs.
        """
        return super().dtype() + [
            (cls.CYCLE_NUMBER, "uint16"),
            (cls.PASS_NUMBER, "uint16"),
        ]

    @classmethod
    def create(
        cls,
        path: Union[pathlib.Path, str],
        ds: zcollection.Collection,
        filesystem: Optional[fsspec.AbstractFileSystem] = None,
        **kwargs,
    ) -> "HalfOrbitIndexer":
        """Create a new index.
        Args:
            path: The path to the index.
            ds: The collection to be indexed.
            filesystem: The filesystem to use.
        Returns:
            The created index.
        """
        return super()._create(path,
                               ds,
                               meta=dict(attribute=b"value"),
                               filesystem=filesystem)  # type: ignore

    def update(
        self,
        ds: zcollection.Collection,
        bag_partition_size: Optional[int] = None,
        bag_npartitions: Optional[int] = None,
        **kwargs,
    ) -> None:
        """
        Update the index.
        Args:
            ds: New data stored in the collection to be indexed.
            bag_partition_size: The length of each bag partition.
            bag_npartitions: The number of desired bag partitions.
            cycle_number: The name of the cycle number variable stored in the
                collection. Defaults to "cycle_number".
            pass_number: The name of the pass number variable stored in the
                collection. Defaults to "pass_number".
        """
        super()._update(ds, _half_orbit, bag_partition_size, bag_npartitions,
                        **kwargs)

Now we can create our index and fill it.

In [None]:
indexer = HalfOrbitIndexer.create("/index.parquet", collection, filesystem=fs)
indexer.update(collection)

The following command allows us to view the information stored in our index: for
each registered half-orbit, the first and last indexes of its data within the
partition, together with the identifier of the indexed partition.

In [None]:
indexer.table.to_pandas()

This index can now be used to load a part of a collection.

In [None]:
selection = collection.load(indexers=indexer.query(dict(pass_number=[1, 2])))
selection.to_xarray().compute()
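Conceptually, a query turns index rows into slices to read from each partition, which is why the whole dataset does not have to be scanned. The sketch below illustrates this with made-up rows; `IndexRow`, `INDEX` and this `query` function are hypothetical and do not reflect the actual layout of the index file or the library's query method.

```python
from typing import List, NamedTuple, Sequence, Tuple


class IndexRow(NamedTuple):
    """Hypothetical index row describing one half-orbit in one partition."""
    partition: str
    start: int
    stop: int
    cycle_number: int
    pass_number: int


# Made-up index content, for illustration only.
INDEX = [
    IndexRow("year=2000/month=01", 0, 50, 1, 1),
    IndexRow("year=2000/month=01", 50, 100, 1, 2),
    IndexRow("year=2000/month=02", 0, 40, 1, 3),
]


def query(pass_numbers: Sequence[int]) -> List[Tuple[str, slice]]:
    """Return, for each matching row, the partition and the slice to read."""
    return [(row.partition, slice(row.start, row.stop)) for row in INDEX
            if row.pass_number in pass_numbers]


print(query([1, 2]))
```

Only the partitions and index ranges listed in the result need to be read to build the selection.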