# Applying functions to data files in batch

This package contains functionality for defining a Python function and applying it to remote files in batches using the `CloudBatch_apply` class. This notebook gives an example of how to use this class.

In the getting_started notebook, we looked at creating batch objects. These objects describe a list of files (local, or remote in Google buckets), separate them into batches, and then download or upload the data. Here, we extend that to show how to perform a workflow with the following steps:

1. Download data from a Google bucket in batches.
2. Apply a function to this batch of data one file at a time.
3. Save the analysed data to new files.
4. Push the resulting batch of new files back to the Google bucket.
    
The `CloudBatch_apply` object works by performing the following loop on any batch objects that are passed:

1. Download data using `gsutil -m cp` from any batch objects where `source='remote'`. Data is downloaded to the directory given by the `get_dir` argument (required).
2. Pass files from the current batch of all objects to a user-provided function, in the same order that the batch objects were passed to `CloudBatch_apply`.
3. Apply the function to the batch files either one at a time (`pass_args='one'`) or all at once (`pass_args='all'`).
4. Concatenate any output from all function applications in the batch.
5. Upload data using `gsutil -m cp` to the remote directory `put_dir` (required if there are objects with `source='local'`).
6. Delete the temporary downloaded files from this batch.
7. If `delete_put_files == True`, delete any batch files described by objects where `source='local'`.
8. Cycle all objects on to the next batch.
9. Go back to step 1 until all batches have been processed.
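
The batching and function-application part of this loop can be sketched in plain Python. This is only an illustration, not the package's implementation: `split_into_batches` and `apply_in_batches` are hypothetical names, no files are downloaded or uploaded, and the way arguments are passed for `pass_args='all'` (one list per batch object) is an assumption.

```python
def split_into_batches(files, batch_size):
    """Split a list of file names into consecutive batches of at most batch_size."""
    return [files[i:i + batch_size] for i in range(0, len(files), batch_size)]


def apply_in_batches(func, file_lists, batch_size, pass_args="one"):
    """Hypothetical stand-in for the batch loop (no downloads or uploads)."""
    batched = [split_into_batches(files, batch_size) for files in file_lists]
    results = []
    # zip(*batched) yields the current batch of every object, in passing order
    for current in zip(*batched):
        if pass_args == "one":
            # One call per file position: func(file_from_obj0, file_from_obj1, ...)
            outputs = [func(*names) for names in zip(*current)]
        elif pass_args == "all":
            # A single call that receives each object's whole batch as a list
            outputs = [func(*current)]
        else:
            raise ValueError("pass_args must be 'one' or 'all'")
        # Collect the outputs of this batch before cycling to the next one
        results.append(outputs)
    return results


# Made-up file names, purely for illustration
remote = [f"in{i}.nc" for i in range(6)]
local = [f"out{i}.nc" for i in range(6)]
pairs = apply_in_batches(lambda src, dst: (src, dst), [remote, local], batch_size=4)
```

With six files and a batch size of four, `pairs` holds two batches: the first with four `(input, output)` pairs and the second with the remaining two.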

So let's start by importing things:

In [1]:
%load_ext autoreload
%autoreload 2
from cloudbatch import CloudBatch, CloudBatch_apply
import xarray as xr
import os

Next, we're going to create two `CloudBatch` objects. The first describes the data in the Google bucket. The second describes local data that does not exist yet (but will once we have analysed the bucket data).

In [3]:
# Create a batch object describing things on the cloud
gsb_get = CloudBatch(file_dir = "gs://<bucket>/<dir>",
                  file_list = '*.nc',
                  source='remote',
                  batch_size=4,
                  get_dir = "<local directory>")

# Create a local batch object with the same number of files and batch size
# as the remote batch object. The files will be named test0.nc, ..., testN.nc
output_filenames = [f"test{ii}.nc" for ii in range(gsb_get.n_files)]
gsb_put = CloudBatch(file_dir = "<local directory>",
                  file_list = "<anticipated filenames>",
                  source='local',
                  batch_size=4,
                  put_dir = "gs://<bucket>/<dir>")
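
The two objects are given the same number of files and the same batch size so that, presumably, the n-th input file and the n-th output file always fall in the same batch. The sketch below illustrates that reasoning with plain lists. `batches` and `pair_batches` are hypothetical helpers and the file names are made up; none of this is part of the package.

```python
def batches(files, batch_size):
    """Split a list of file names into consecutive batches."""
    return [files[i:i + batch_size] for i in range(0, len(files), batch_size)]


def pair_batches(inputs, outputs, size_in, size_out):
    """Pair input and output batches, raising if they would fall out of step."""
    in_b = batches(inputs, size_in)
    out_b = batches(outputs, size_out)
    # Every batch must hold the same number of files on both sides
    if [len(b) for b in in_b] != [len(b) for b in out_b]:
        raise ValueError("input and output batches do not line up")
    return [list(zip(i, o)) for i, o in zip(in_b, out_b)]


inputs = [f"in{ii}.nc" for ii in range(6)]     # stand-in for the remote files
outputs = [f"test{ii}.nc" for ii in range(6)]  # anticipated local file names
paired = pair_batches(inputs, outputs, size_in=4, size_out=4)
```

Calling `pair_batches(inputs, outputs, 4, 3)` instead raises a `ValueError`, because the first input batch would contain four files but the first output batch only three.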

Next we need to define a function to apply. The function takes two file names, `fp_in` and `fp_out`. It opens `fp_in`, takes the mean of the data along a single dimension (`'lat'`), and saves the result to `fp_out`. We use `xarray` with chunked loading here so that the computation can be done over multiple cores in parallel.

We also return an integer (1) when the routine has been successful, just to show how `CloudBatch_apply` concatenates outputs.

In [4]:
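def batch_func(fp_in, fp_out):
    # Open lazily in chunks (requires dask) so the mean can be computed in parallel
    ds = xr.open_dataset(fp_in, chunks={'lat': 1000, 'lon': 1000})
    # Average over the 'lat' dimension and write the result to the output file
    data_out = ds.mean(dim='lat')
    data_out.to_netcdf(fp_out)
    # Return 1 to flag that this file was processed
    return 1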
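
Note that `batch_func` only reaches `return 1` if nothing raised along the way. If a flag for failed files is also wanted, one possible variant is to wrap the function so that it returns 0 instead of raising. The sketch below is a hypothetical pattern, not something the package provides: `report_success` and `copy_text` are made-up names, and how `CloudBatch_apply` itself handles exceptions is not covered here.

```python
import functools


def report_success(func):
    """Wrap func so that it returns 1 on success and 0 if it raises."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception as err:
            # Report the failure but let the remaining files be processed
            print(f"{func.__name__}{args} failed: {err}")
            return 0
        return 1
    return wrapper


@report_success
def copy_text(fp_in, fp_out):
    # Toy stand-in for an analysis step: copy one text file to another
    with open(fp_in) as src, open(fp_out, "w") as dst:
        dst.write(src.read())


# The input file does not exist, so the wrapped call returns 0 rather than raising
flags = [copy_text("does_not_exist.txt", "out.txt")]
```

Summing such flags over all batches then gives the number of files that were processed successfully.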

Now we call `CloudBatch_apply`:

In [None]:
%%time
app = CloudBatch_apply(func = batch_func, 
                    batch = [gsb_get, gsb_put],
                    verbosity = 2,
                    pass_args = 'one',
                    delete_put_files = True)