In [1]:
import numpy as np
import xarray as xr

from scipy.sparse import coo_matrix

import dask
import dask.array as da
from dask.distributed import Client, progress
from dask_kubernetes import KubeCluster

import gcsfs

# Prepare weights and function

This notebook extends [scatter_large_weights.ipynb](./scatter_large_weights.ipynb) to a full pipeline. As before, it assumes that the weights have already been generated by [make_large_weights.ipynb](./make_large_weights.ipynb).

In [2]:
# show the on-disk size of the pre-generated regridding weight file
!du -h bilinear_1001x2000_1200x2400.nc

176M	bilinear_1001x2000_1200x2400.nc


In [3]:
# Read the regridding weights from disk and assemble them into a sparse matrix.
# The matrix dimensions match the grid sizes in the weight file name
# (1001x2000 and 1200x2400).
n_in = 1001 * 2000   # 2002000 columns: length of the input axis
n_out = 1200 * 2400  # 2880000 rows: length of the output axis

# Open the file in a context manager so its handle is released once the
# arrays are in memory; `.values` loads each variable as a NumPy array.
with xr.open_dataset("bilinear_1001x2000_1200x2400.nc") as ds:
    # `Dataset.sizes` is the name -> length mapping; dict-style lookup on
    # `Dataset.dims` is deprecated in recent xarray releases.
    n_s = ds.sizes['n_s']
    # Shift the stored indices down by one for scipy's 0-based indexing
    # (presumably the file stores them 1-based -- confirm against the generator).
    col = ds['col'].values - 1
    row = ds['row'].values - 1
    S = ds['S'].values

A = coo_matrix((S, (row, col)), shape=(n_out, n_in))
A.shape

(2880000, 2002000)

In [4]:
def apply_A(data, A):
    """Multiply ``data`` by the weight matrix ``A`` and return the result.

    Computes ``A.dot(data.T).T``. In this notebook ``data`` is a 2-D block
    of shape (n_rows, A.shape[1]) and the result has shape
    (n_rows, A.shape[0]).

    Parameters
    ----------
    data : array
        Input block; its transpose is what gets multiplied by ``A``.
    A : matrix-like object with a ``dot`` method
        Weight matrix (a scipy sparse matrix in this notebook).

    Returns
    -------
    The transposed product ``A.dot(data.T).T``.
    """
    # Keep this a pure function: everything it needs arrives as an argument,
    # nothing is read from the notebook's global state.
    transposed_result = A.dot(data.T)
    return transposed_result.T

# Login

In [5]:
# Write access to the bucket requires an explicit credentials token;
# here it is given as the path to a local credentials JSON file.
fs = gcsfs.GCSFileSystem(
    token='/home/jovyan/application_default_credentials.json',
    project='pangeo-181919',
)

In [6]:
# List the contents of the test directory in the bucket.
# large_array.zarr is pre-generated input data (just a large array of random numbers).
fs.ls('pangeo-data/xESMF_test')

['pangeo-data/xESMF_test/large_array.zarr/',
 'pangeo-data/xESMF_test/output_array.zarr/',
 'pangeo-data/xESMF_test/test_data.zarr/']

In [7]:
# Set credentials for dask distributed: build a second filesystem object
# from the credentials and project of the first one.
# https://github.com/pangeo-data/pangeo/issues/334#issuecomment-404329498
fs2 = gcsfs.GCSFileSystem(
    project=fs.project,
    token=fs.session.credentials,
)

In [8]:
# key-value mapping over the pre-generated input store, backed by fs2
gcsmap = gcsfs.mapping.GCSMap(
    'pangeo-data/xESMF_test/large_array.zarr',
    check=True,
    gcs=fs2,
)

In [9]:
# open the input zarr store as an xarray Dataset and display its summary
ds_gcs = xr.open_zarr(store=gcsmap)
ds_gcs

<xarray.Dataset>
Dimensions:  (dim_0: 1600, dim_1: 2002000)
Dimensions without coordinates: dim_0, dim_1
Data variables:
    x        (dim_0, dim_1) float64 dask.array<shape=(1600, 2002000), chunksize=(5, 2002000)>

In [10]:
# total size of the input dataset, converted from bytes to GB (1e9 bytes)
ds_gcs.nbytes / 1e9 # input in GB

25.6256

# Cluster

In [11]:
# create a KubeCluster with 20 workers requested, then display it
cluster = KubeCluster(n_workers=20)
cluster

VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [14]:
# attach a distributed Client to the cluster, then display its summary
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://10.21.75.17:38777  Dashboard: /user/jiaweizhuang/proxy/8787/status,Cluster  Workers: 20  Cores: 40  Memory: 120.00 GB


# Build pipeline

In [15]:
# take the array backing variable 'x' (a dask array, see the repr below)
x_dask_array = ds_gcs['x'].data
x_dask_array

dask.array<zarr, shape=(1600, 2002000), dtype=float64, chunksize=(5, 2002000)>

In [16]:
# Manually scatter the weights out to the cluster before calling the regridding.
# Make sure this is called after all workers have come alive.
A_future = client.scatter(A, broadcast=True)
# display progress for the scatter operation
progress(A_future)

VBox()

In [17]:
# Regrid block by block: apply_A receives each block of x_dask_array together
# with the scattered weights and returns a block whose second axis has length
# A.shape[0].
# The row chunking is taken from the input array rather than hard-coded as 5,
# so the declared output chunks stay correct if the input is chunked
# differently along axis 0. The second axis is a single chunk of A.shape[0].
out_dask_array = da.map_blocks(apply_A, x_dask_array, A_future,
                               dtype=np.float64,
                               chunks=(x_dask_array.chunks[0], (A.shape[0],)))

In [18]:
# total size of the (still lazy) output array, converted from bytes to GB (1e9 bytes)
out_dask_array.nbytes / 1e9 # output in GB

36.864

In [19]:
# wrap the output dask array as a Dataset holding a single variable named 'x'
ds_out = (
    xr.DataArray(data=out_dask_array, name='x')
    .to_dataset()
)
ds_out

<xarray.Dataset>
Dimensions:  (dim_0: 1600, dim_1: 2880000)
Dimensions without coordinates: dim_0, dim_1
Data variables:
    x        (dim_0, dim_1) float64 dask.array<shape=(1600, 2880000), chunksize=(5, 2880000)>

# Trigger computation

In [20]:
# key-value mapping over the output store, backed by fs2
map_out = gcsfs.mapping.GCSMap(
    'pangeo-data/xESMF_test/output_array.zarr',
    check=True,
    gcs=fs2,
)

In [21]:
%%time
# write the regridded dataset to the output store; this is the step that
# triggers the computation, and %%time reports how long the cell takes
ds_out.to_zarr(store=map_out, mode='w')

CPU times: user 9.04 s, sys: 642 ms, total: 9.68 s
Wall time: 1min 1s


<xarray.backends.zarr.ZarrStore at 0x7f5d74265c88>

# Check output result

In [22]:
# re-create the mapping over the output store and open it to inspect what was written
map_out = gcsfs.mapping.GCSMap(
    'pangeo-data/xESMF_test/output_array.zarr',
    check=True,
    gcs=fs2,
)
xr.open_zarr(store=map_out)

<xarray.Dataset>
Dimensions:  (dim_0: 1600, dim_1: 2880000)
Dimensions without coordinates: dim_0, dim_1
Data variables:
    x        (dim_0, dim_1) float64 dask.array<shape=(1600, 2880000), chunksize=(5, 2880000)>