In [18]:
import numpy as np
import xarray as xr
import bottleneck
from sys import getsizeof

def covariance_gufunc(x, y):
    return ((x - x.mean(axis=-1, keepdims=True))
            * (y - y.mean(axis=-1, keepdims=True))).mean(axis=-1)

def pearson_correlation_gufunc(x, y):
    return covariance_gufunc(x, y) / (x.std(axis=-1) * y.std(axis=-1))

def spearman_correlation_gufunc(x, y):
    x_ranks = bottleneck.rankdata(x, axis=-1)
    y_ranks = bottleneck.rankdata(y, axis=-1)
    return pearson_correlation_gufunc(x_ranks, y_ranks)

def spearman_correlation(x, y, dim):
    return xr.apply_ufunc(
        spearman_correlation_gufunc, x, y,
        input_core_dims=[[dim], [dim]],
        dask='parallelized',
        output_dtypes=[float])

In [19]:
rs = np.random.RandomState(0)

In [20]:
array1 = xr.DataArray(rs.randn(10000, 100000), dims=['place', 'time'])

In [21]:
array2 = array1 + 0.5 * rs.randn(10000, 100000)

In [22]:
array1.nbytes*9.31*10**-10

7.448

### using one core, on NumPy arrays

In [23]:
%time _ = spearman_correlation(array1, array2, 'time')

CPU times: user 3min 41s, sys: 37.3 s, total: 4min 19s
Wall time: 4min


### using 10 cores, with Dask

In [30]:
from dask.distributed import Client, progress

client = Client(n_workers=8, threads_per_worker=2)
client

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


0,1
Client  Scheduler: tcp://127.0.0.1:43572  Dashboard: http://127.0.0.1:40312/status,Cluster  Workers: 8  Cores: 16  Memory: 1.08 TB


In [31]:
chunked1 = array1.chunk({'place': 10})
chunked2 = array2.chunk({'place': 10})

In [32]:
r = spearman_correlation(chunked1, chunked2, 'time')

  import sys
  
  keepdims=keepdims)


In [33]:
%time _ = r.compute()

CPU times: user 27 s, sys: 39.2 s, total: 1min 6s
Wall time: 3min 8s


In [29]:
client.close()