# Xarray apply_ufunc
Author: Loic Bachelot loic.bachelot@ifremer.fr\
\
In this notebook, you will find a simple example of how to use the xarray apply_ufunc function (http://xarray.pydata.org/en/stable/generated/xarray.apply_ufunc.html)\
The example: from a dataset with one dimension x and a time dimension, we want the most frequent label over time for each x coordinate.\
This example can easily be modified to perform any operation over a specific dimension (mean, sum, min, max, etc.).

In [9]:
import numpy as np
import xarray as xr
import time
import dask

## Definition of the core function
In this function, we define the transformation to apply over the specific dimension.
The input x is the array we operate on: in our case, the values along the time dimension for one x coordinate.\
We use time.sleep(1) to add some processing time and showcase the parallel execution.

In [42]:
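def core_funct(x):
    time.sleep(1)  # simulate an expensive computation
    res = np.nan
    # Only compute a label when the series contains no NaN values
    if np.count_nonzero(~np.isnan(x)) == len(x):
        # np.bincount counts each integer label; argmax picks the most frequent
        res = np.argmax(np.bincount(x.astype('int')))
    return res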
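
To see what the core function computes without the one-second delay, here is a minimal sketch on plain NumPy arrays. The helper name `most_frequent_label` is hypothetical; it uses logic equivalent to core_funct minus the sleep, and assumes the labels are non-negative integers stored as floats.

```python
import numpy as np


def most_frequent_label(x):
    """Hypothetical helper: equivalent to core_funct, without time.sleep."""
    # Return NaN as soon as the series contains any missing value
    if np.isnan(x).any():
        return np.nan
    # np.bincount counts occurrences of each non-negative integer label
    counts = np.bincount(x.astype("int"))
    # The index of the largest count is the most frequent label
    return np.argmax(counts)


complete = np.array([5.0, 5.0, 10.0])
with_gap = np.array([np.nan, np.nan, 10.0])

label_complete = most_frequent_label(complete)  # 5 appears twice, 10 once
label_with_gap = most_frequent_label(with_gap)  # NaN: the series has gaps
print(label_complete, label_with_gap)
```

Returning NaN for incomplete series is the reason the result must later be stored as a float rather than an integer.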

## Creating an example xarray DataArray
We purposely include np.nan values, as they occur in real-life problems and need to be handled. This is explained further below.\
Note that we also chunk our array along the x dimension, the one we will iterate over. Feel free to change this and see the impact for yourself!

In [43]:
array = xr.DataArray([[np.nan, np.nan, 10], [5, 5, 10], [3, 3, 10]], coords=[("x", [0.1, 0.2, 0.3]), ('time', [10, 11, 12])]).chunk({'x':1})
array

| | Array | Chunk |
|---|---|---|
| Bytes | 72 B | 24 B |
| Shape | (3, 3) | (1, 3) |
| Count | 3 Tasks | 3 Chunks |
| Type | float64 | numpy.ndarray |


# Apply_ufunc
Let's look at the arguments one by one:
- core_funct: the function we defined earlier, which performs the operation on the array of each x coordinate
- array: our input xarray DataArray
- dask="parallelized": explicitly tells xarray to let dask handle the parallelization
- input_core_dims=[['time']]: the core dimension to pass to the function for processing, which should not be broadcast
- vectorize=True: specifies that core_funct only takes arrays defined over the core dimensions as input, so it is vectorized automatically with numpy.vectorize(). In our case, arrays defined over 'time'
- output_dtypes: the type of the output, given as a list such as [float]; a float type is needed to handle np.nan in the case of a chunked array. Note that np.float was only an alias for the built-in float and was removed in NumPy 1.24
\
\
Full documentation here: http://xarray.pydata.org/en/stable/generated/xarray.apply_ufunc.html
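
As a rough illustration of what vectorize=True and input_core_dims=[['time']] do together, the sketch below applies a 1-D function to each row of a plain NumPy array using numpy.vectorize with a signature. It is a simplification on an assumed toy array, not xarray's actual code path, and the helper `label_or_nan` is hypothetical.

```python
import numpy as np


def label_or_nan(x):
    """Hypothetical 1-D core function: most frequent label, or NaN if gaps."""
    if np.isnan(x).any():
        return np.nan
    return np.argmax(np.bincount(x.astype("int")))


# Rows play the role of x coordinates, columns the role of time steps
data = np.array([[np.nan, np.nan, 10.0], [5.0, 5.0, 10.0], [3.0, 3.0, 10.0]])

# "(time)->()" declares that the function consumes the last axis
# (the core dimension) and returns a scalar for each remaining index
vectorized = np.vectorize(label_or_nan, otypes=[float], signature="(time)->()")

result = vectorized(data)  # one value per row: NaN, then 5, then 3
print(result)
```

The float entry in otypes plays the same role as output_dtypes: an integer output array could not hold NaN.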

In [44]:
%%time
# output_dtypes takes a list; np.float was removed in NumPy 1.24, so use the built-in float
a = xr.apply_ufunc(core_funct, array, dask="parallelized", input_core_dims=[['time']], vectorize=True, output_dtypes=[float])
a.compute()

CPU times: user 8.64 ms, sys: 0 ns, total: 8.64 ms
Wall time: 1.01 s
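
The same call pattern works for other reductions over time, as mentioned in the introduction. Here is a minimal sketch, assuming an unchunked copy of the example array and using np.nanmean (a choice made for illustration, not taken from the original notebook). Since np.nanmean already accepts N-dimensional input, vectorize=True is not needed; apply_ufunc moves the core dimension to the last axis, hence axis=-1.

```python
import numpy as np
import xarray as xr

# Unchunked copy of the example array (no dask required for this sketch)
example = xr.DataArray(
    [[np.nan, np.nan, 10], [5, 5, 10], [3, 3, 10]],
    coords=[("x", [0.1, 0.2, 0.3]), ("time", [10, 11, 12])],
)

# 'time' is consumed by the reduction, so the result only keeps 'x'
mean_over_time = xr.apply_ufunc(
    np.nanmean,
    example,
    input_core_dims=[["time"]],
    kwargs={"axis": -1},
)
print(mean_over_time.dims)
print(mean_over_time.values)
```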


# Without apply_ufunc
Here is an example without using apply_ufunc:
- We define a function that loops over the x dimension and calls core_funct on each array defined over 'time'
- We append the results to a list and cast it to an xarray DataArray

In [45]:
def get_most_freq_loop(ds):
    mpblab = []
    for i in ds['x']:
        res = core_funct(ds.sel(x=i))
        mpblab.append(res)
    return xr.DataArray(np.array(mpblab))

In [46]:
%%time
b = get_most_freq_loop(array)
b

CPU times: user 45.3 ms, sys: 2.72 ms, total: 48 ms
Wall time: 3.05 s


# Results
The method using apply_ufunc takes about 1 second in total, which is the sleep(1) of core_funct. We still go through sleep(1) three times, but the three calls run in parallel, so it only takes 1 second overall.\
The execution using the loop takes 3 seconds, because we have to wait 1 second before starting each subsequent computation.\
\
Using this approach helped us achieve a speedup of about 15x for a plot on the same 8-core machine. Of course, it depends on the number of cores and the chunks defined.
I invite you to read the dask best practices guide for more information on chunks: https://docs.dask.org/en/latest/array-best-practices.html
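
The timing reasoning above can be reproduced with the standard library alone. This sketch swaps dask for a ThreadPoolExecutor and shortens the sleep to 0.2 s; `slow_task` is a hypothetical stand-in for core_funct, so the numbers only illustrate the principle.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(i):
    """Hypothetical stand-in for core_funct: wait, then return the input."""
    time.sleep(0.2)
    return i


# Sequential loop: each call waits for the previous one to finish
start = time.perf_counter()
sequential = [slow_task(i) for i in range(3)]
sequential_time = time.perf_counter() - start

# Three workers: the three sleeps overlap
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    parallel = list(pool.map(slow_task, range(3)))
parallel_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f} s, parallel: {parallel_time:.2f} s")
```

Sleeping releases Python's global interpreter lock, so the threads overlap here; CPU-bound pure-Python work would not speed up the same way with threads.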