# Tutorial

Xarray-Beam is a library for writing [Apache Beam](http://beam.apache.org/) pipelines consisting of [xarray](http://xarray.pydata.org) Dataset objects. This tutorial (and Xarray-Beam itself) assumes basic familiarity with both Beam and Xarray.

This tutorial will walk you through the basics of writing a pipeline with Xarray-Beam. We also recommend reading through a few [end to end examples](https://github.com/google/xarray-beam/tree/main/examples) to understand what code using Xarray-Beam typically looks like.

```{note}
Before getting started, it's important to understand that although Xarray-Beam tries to make it _straightforward_ to write distributed pipelines with Xarray objects, it doesn't try to hide the distributed magic inside high-level objects like [Xarray with Dask](http://xarray.pydata.org/en/stable/user-guide/dask.html) or Dask/Spark DataFrames.

Xarray-Beam is a lower-level tool. You will be manipulating large datasets piece-by-piece yourself, and you as the developer will be responsible for maintaining Xarray-Beam's internal invariants. This means that to successfully use Xarray-Beam, **you will need to understand how it represents distributed datasets**. This may sound like a lot of responsibility, but we promise it isn't too bad!
```

We'll start off with some standard imports:

In [16]:
import apache_beam as beam
import numpy as np
import xarray_beam as xbeam
import xarray

## Keys in Xarray-Beam

Xarray-Beam is designed around the model that every stage in your Beam pipeline _could_ be stored in a single `xarray.Dataset` object, but is instead represented by a distributed Beam `PCollection` of smaller `xarray.Dataset` objects, distributed in two possible ways:

- Distinct _variables_ in a Dataset may be separated across multiple records.
- Individual arrays can also be split into multiple _chunks_, similar to those used by [dask.array](https://docs.dask.org/en/latest/array.html).

To keep track of how individual records could be combined into a larger (virtual) dataset, Xarray-Beam defines a `Key` object. Key objects consist of:

1. `offsets`: integer offsets for chunks from the origin, stored in an `immutabledict`.
2. `vars`: The subset of variables included in each chunk, either as a `frozenset`, or as `None` to indicate "all variables".

Making a `Key` from scratch is simple:

In [2]:
key = xbeam.Key({'x': 0, 'y': 10}, vars=None)
key

Key(offsets={'x': 0, 'y': 10}, vars=None)

Or given an existing `Key`, you can easily modify it with `replace()` or `with_offsets()`:

In [3]:
key.replace(vars={'foo', 'bar'})

Key(offsets={'x': 0, 'y': 10}, vars={'bar', 'foo'})

In [4]:
key.with_offsets(x=None, z=1)

Key(offsets={'y': 10, 'z': 1}, vars=None)

`Key` objects don't do very much. They are just simple structs with two attributes, along with various special methods required to use them as `dict` keys or as keys in Beam pipelines. You can find more examples of manipulating keys in the docstring.
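To make the "simple struct" idea concrete, here is a minimal sketch of a key-like object in plain Python. `ToyKey` and its `make()` helper are hypothetical names invented for this illustration and are not Xarray-Beam's implementation; the sketch only assumes the behavior shown above (two attributes, hashability, and `with_offsets()` dropping a dimension when passed `None`).

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyKey:
    """Hypothetical stand-in for xbeam.Key, for illustration only."""

    offsets: tuple = ()                # sorted (dim, offset) pairs, so the key is hashable
    vars: Optional[frozenset] = None   # None means "all variables"

    @classmethod
    def make(cls, offsets=None, vars=None):
        pairs = tuple(sorted((offsets or {}).items()))
        return cls(pairs, None if vars is None else frozenset(vars))

    def with_offsets(self, **updates):
        merged = dict(self.offsets)
        for dim, offset in updates.items():
            if offset is None:
                merged.pop(dim, None)  # passing None removes the dimension
            else:
                merged[dim] = offset
        return replace(self, offsets=tuple(sorted(merged.items())))


key = ToyKey.make({'x': 0, 'y': 10})
moved = key.with_offsets(x=None, z=1)

# Frozen dataclasses are hashable, so keys can index a dict.
lookup = {key: 'first chunk'}
```

Because equality and hashing depend only on the two fields, an equal key built elsewhere (for example on another worker) finds the same `dict` entry, which is the property a grouping step relies on.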

## Creating PCollections

The standard inputs & outputs for Xarray-Beam are PCollections of `(xbeam.Key, xarray.Dataset)` pairs. Xarray-Beam provides a number of PTransforms for typical tasks, but many pipelines will still involve some manual manipulation of `Key` and `Dataset` objects, e.g., with builtin Beam transforms like `beam.Map`.

To start off, let's write a helper function for creating our first collection from scratch:

In [5]:
def create_records():
    for offset in [0, 3]:
        key = xbeam.Key({'x': offset})
        chunk = xarray.Dataset({
            'foo': ('x', offset + np.arange(3)),
            'bar': ('x', 10 + offset + np.arange(3)),
        })
        yield key, chunk

Let's take a look at the entries, which are lazily constructed with the generator:

In [6]:
inputs = list(create_records())

In [7]:
inputs

[(Key(offsets={'x': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 3)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 0 1 2
      bar      (x) int64 10 11 12),
 (Key(offsets={'x': 3}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 3)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 3 4 5
      bar      (x) int64 13 14 15)]

```{note}
If desired, we could have set `vars={'foo', 'bar'}` on each of these `Key` objects instead of `vars=None`. This would be an equally valid representation of the same records, since all of our datasets have the same variables.
```

We now have the inputs we need to use Xarray-Beam's helper functions and PTransforms. For example, we can fully consolidate chunks & variables to see what single `xarray.Dataset` these values would correspond to:

In [57]:
xbeam.consolidate_fully(inputs)

(Key(offsets={'x': 0}, vars={'bar', 'foo'}),
 <xarray.Dataset>
 Dimensions:  (x: 6)
 Dimensions without coordinates: x
 Data variables:
     foo      (x) int64 0 1 2 3 4 5
     bar      (x) int64 10 11 12 13 14 15)
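
As a rough mental model of what full consolidation does along a single dimension, the sketch below stitches 1-D chunks together in offset order using plain Python lists. `toy_consolidate` is a hypothetical helper written for this tutorial, not part of Xarray-Beam, and it assumes every variable in a chunk has the same length.

```python
def toy_consolidate(records):
    """Stitch 1-D chunks into one dict of lists, ordered by their 'x' offset."""
    ordered = sorted(records, key=lambda record: record[0]['x'])
    combined = {}
    position = 0  # where the next chunk is expected to start
    for offsets, chunk in ordered:
        if offsets['x'] != position:
            raise ValueError(f"gap or overlap at offset {offsets['x']}")
        for name, values in chunk.items():
            combined.setdefault(name, []).extend(values)
        position += len(next(iter(chunk.values())))
    return combined


# Records may arrive in any order; the offsets say where each one belongs.
records = [
    ({'x': 3}, {'foo': [3, 4, 5], 'bar': [13, 14, 15]}),
    ({'x': 0}, {'foo': [0, 1, 2], 'bar': [10, 11, 12]}),
]
result = toy_consolidate(records)
```

The point of the sketch is that the offsets alone carry enough information to rebuild the larger (virtual) dataset, regardless of the order in which records are seen.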

To execute with Beam, of course, we need to turn Python lists/generators into Beam PCollections, typically with `beam.Create()`, e.g.,

In [62]:
with beam.Pipeline() as p:
    p | beam.Create(create_records()) | beam.Map(print)



(Key(offsets={'x': 0}, vars=None), <xarray.Dataset>
Dimensions:  (x: 3)
Dimensions without coordinates: x
Data variables:
    foo      (x) int64 0 1 2
    bar      (x) int64 10 11 12)
(Key(offsets={'x': 3}, vars=None), <xarray.Dataset>
Dimensions:  (x: 3)
Dimensions without coordinates: x
Data variables:
    foo      (x) int64 3 4 5
    bar      (x) int64 13 14 15)


## Rechunking

We can now make use of transforms for "rechunking" a dataset, so that it is distributed in a different way.

### Adjusting variables

The simplest transformation is splitting (or consolidating) different _variables_ in a Dataset with `SplitVariables()` and `ConsolidateVariables()`, e.g.,

In [33]:
inputs | xbeam.SplitVariables()



[(Key(offsets={'x': 0}, vars={'foo'}),
  <xarray.Dataset>
  Dimensions:  (x: 3)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 0 1 2),
 (Key(offsets={'x': 0}, vars={'bar'}),
  <xarray.Dataset>
  Dimensions:  (x: 3)
  Dimensions without coordinates: x
  Data variables:
      bar      (x) int64 10 11 12),
 (Key(offsets={'x': 3}, vars={'foo'}),
  <xarray.Dataset>
  Dimensions:  (x: 3)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 3 4 5),
 (Key(offsets={'x': 3}, vars={'bar'}),
  <xarray.Dataset>
  Dimensions:  (x: 3)
  Dimensions without coordinates: x
  Data variables:
      bar      (x) int64 13 14 15)]
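
The reverse direction is not shown above, so here is a small plain-Python sketch of the round trip: splitting emits one record per variable, and consolidating groups records that share the same offsets. The `toy_*` functions and the `(offsets, vars, data)` tuple layout are assumptions made for this illustration; they are not how `SplitVariables()` and `ConsolidateVariables()` are implemented.

```python
from collections import defaultdict


def toy_split_variables(records):
    """Emit one (offsets, vars, data) record per variable."""
    for offsets, data in records:
        for name, values in data.items():
            yield offsets, frozenset({name}), {name: values}


def toy_consolidate_variables(records):
    """Merge records that share the same offsets back into one record."""
    grouped = defaultdict(dict)
    for offsets, _, data in records:
        grouped[offsets].update(data)
    return [(o, frozenset(grouped[o]), grouped[o]) for o in sorted(grouped)]


# Offsets are tuples of (dim, offset) pairs so they can be used as dict keys.
records = [
    ((('x', 0),), {'foo': [0, 1, 2], 'bar': [10, 11, 12]}),
    ((('x', 3),), {'foo': [3, 4, 5], 'bar': [13, 14, 15]}),
]
split = list(toy_split_variables(records))
merged = toy_consolidate_variables(split)
```

Two input records become four after splitting, and grouping by offsets recovers the original two, each now labeled with an explicit set of variables.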

### Adjusting chunks

You can also adjust _chunks_ in a dataset to distribute arrays of different sizes. Here you have two choices of API:

1. The lower level {py:class}`~xarray_beam.SplitChunks` and {py:class}`~xarray_beam.ConsolidateChunks`. These transformations apply a single splitting (with indexing) or consolidation (with {py:func}`xarray.concat`) function to array elements.
2. The high level {py:class}`~xarray_beam.Rechunk`, which uses a pipeline of multiple split/consolidate steps to efficiently rechunk a dataset.

For minor adjustments, the more explicit `SplitChunks()` and `ConsolidateChunks()` are good options. They take a dict of _desired_ chunk sizes as a parameter, which can also be `-1` to indicate "no chunking" along a dimension:

In [37]:
inputs | xbeam.ConsolidateChunks({'x': -1})



[(Key(offsets={'x': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 6)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 0 1 2 3 4 5
      bar      (x) int64 10 11 12 13 14 15)]

Note that because these transformations only split _or_ consolidate, they cannot necessarily fully rechunk a dataset in a single step if the new chunk sizes are not multiples of old chunks (with consolidate) or do not evenly divide the old chunks (with split), e.g.,

In [43]:
inputs | xbeam.SplitChunks({'x': 5})  # notice that the first two chunks are still separate!



[(Key(offsets={'x': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 3)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 0 1 2
      bar      (x) int64 10 11 12),
 (Key(offsets={'x': 3}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 2)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 3 4
      bar      (x) int64 13 14),
 (Key(offsets={'x': 5}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 1)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 5
      bar      (x) int64 15)]
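
One way to reason about this result is to look at chunk boundaries: a split can only add cuts, so the pieces it produces are bounded by the union of the old and new boundaries. The sketch below is a hypothetical plain-Python illustration of that reasoning for one dimension with regular chunks; it is not the code `SplitChunks()` runs.

```python
def chunk_bounds(dim_size, chunk_size):
    """Start positions of regular chunks, plus the end of the dimension."""
    return set(range(0, dim_size, chunk_size)) | {dim_size}


def split_intervals(dim_size, source_chunk, target_chunk):
    """Intervals left after cutting source chunks at every target boundary."""
    cuts = sorted(
        chunk_bounds(dim_size, source_chunk) | chunk_bounds(dim_size, target_chunk)
    )
    return list(zip(cuts[:-1], cuts[1:]))


# Same setup as above: 6 elements, chunks of 3, asking for chunks of 5.
pieces = split_intervals(dim_size=6, source_chunk=3, target_chunk=5)
# pieces is [(0, 3), (3, 5), (5, 6)]
```

The existing boundary at 3 survives because splitting never merges neighbors, which is why the first two pieces remain separate.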

For such uneven cases, you'll need to use split followed by consolidate:

In [44]:
inputs | xbeam.SplitChunks({'x': 5}) | xbeam.ConsolidateChunks({'x': 5})



[(Key(offsets={'x': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 5)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 0 1 2 3 4
      bar      (x) int64 10 11 12 13 14),
 (Key(offsets={'x': 5}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 1)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 5
      bar      (x) int64 15)]

Alternatively, `Rechunk()` applies multiple split and consolidate steps based on the [Rechunker](https://github.com/pangeo-data/rechunker) algorithm:

In [49]:
inputs | xbeam.Rechunk(dim_sizes={'x': 6}, source_chunks={'x': 3}, target_chunks={'x': 5}, itemsize=8)



[(Key(offsets={'x': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 5)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 0 1 2 3 4
      bar      (x) int64 10 11 12 13 14),
 (Key(offsets={'x': 5}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 1)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) int64 5
      bar      (x) int64 15)]

`Rechunk` requires specifying a few more parameters, but based on that information it can be _much_ more efficient for more complex rechunking tasks, particularly in cases where data needs to be distributed into a very different shape (e.g., distributing a matrix across rows vs. columns). A naive "splitting" approach in such cases could divide datasets into extremely small tasks corresponding to individual array elements, which adds a huge amount of overhead.
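
To get a feel for the overhead, the following sketch counts how many pieces a single naive split step would produce when moving between two regular chunkings. `naive_split_pieces` is a hypothetical helper for this back-of-the-envelope estimate and is unrelated to how `Rechunk` plans its steps; the 1000 x 1000 matrix is an assumed example.

```python
from math import prod


def n_intervals(dim_size, source_chunk, target_chunk):
    """Number of pieces along one dimension after cutting at all boundaries."""
    starts = set(range(0, dim_size, source_chunk)) | set(range(0, dim_size, target_chunk))
    return len(starts)  # every distinct start position begins one piece


def naive_split_pieces(dim_sizes, source_chunks, target_chunks):
    """Total number of pieces if every source chunk is cut at every target boundary."""
    return prod(
        n_intervals(dim_sizes[dim], source_chunks[dim], target_chunks[dim])
        for dim in dim_sizes
    )


# A 1000 x 1000 matrix stored one row per chunk, to be stored one column per chunk.
count = naive_split_pieces(
    dim_sizes={'row': 1000, 'col': 1000},
    source_chunks={'row': 1, 'col': 1000},
    target_chunks={'row': 1000, 'col': 1},
)
# count is 1_000_000: one piece per array element
```

For the small example in this tutorial (`{'x': 6}`, chunks of 3 to chunks of 5) the same count is only 3, so the simple split-then-consolidate approach is perfectly adequate there.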

TODO: finish this!

- Discuss the nuances of feeding in Dask datasets into DatasetToChunks
- Discuss options for lazy datasets: xarray's lazy indexing vs dask
- ChunksToZarr (including `template`)
- Fancy algorithms for rechunking