In [21]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [22]:
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt 
import pandas as pd
import intake 
import pprint
import cftime 
import psutil
import xesmf as xe

In [23]:
import dask
from dask.diagnostics import ProgressBar

In [24]:
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics import visualize

In [25]:
from bokeh.resources import INLINE
import bokeh.io
from bokeh import *
bokeh.io.output_notebook(INLINE)

## Regrid

In [26]:
url = 'https://storage.googleapis.com/cmip6/pangeo-cmip6.json'
raw_cat = intake.open_esm_datastore(url)

In [27]:
cat = raw_cat.search(
    experiment_id=['ssp585'],
    variable_id= 'tas',
    table_id = 'day',
    source_id = 'EC-Earth3'
)

In [28]:
cat

Unnamed: 0,unique
activity_id,1
institution_id,1
source_id,1
experiment_id,1
member_id,58
table_id,1
variable_id,1
grid_label,1
zstore,58
dcpp_init_year,0


In [41]:
dset = cat.to_dataset_dict(zarr_kwargs={'consolidated':True}, storage_options={"anon": True}, aggregate=False)


--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.member_id.table_id.variable_id.grid_label.zstore.dcpp_init_year.version'


In [34]:
sorted(dset.keys())

['ScenarioMIP.EC-Earth-Consortium.EC-Earth3.ssp585.day.gr']

In [36]:
dset['ScenarioMIP.EC-Earth-Consortium.EC-Earth3.ssp585.day.gr']

Unnamed: 0,Array,Chunk
Bytes,4.03 kiB,4.03 kiB
Shape,"(258, 2)","(258, 2)"
Count,10 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 4.03 kiB 4.03 kiB Shape (258, 2) (258, 2) Count 10 Tasks 1 Chunks Type float64 numpy.ndarray",2  258,

Unnamed: 0,Array,Chunk
Bytes,4.03 kiB,4.03 kiB
Shape,"(258, 2)","(258, 2)"
Count,10 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.00 kiB,8.00 kiB
Shape,"(512, 2)","(512, 2)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 8.00 kiB 8.00 kiB Shape (512, 2) (512, 2) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",2  512,

Unnamed: 0,Array,Chunk
Bytes,8.00 kiB,8.00 kiB
Shape,"(512, 2)","(512, 2)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,490.80 kiB,245.41 kiB
Shape,"(31411, 2)","(15706, 2)"
Count,3 Tasks,2 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 490.80 kiB 245.41 kiB Shape (31411, 2) (15706, 2) Count 3 Tasks 2 Chunks Type datetime64[ns] numpy.ndarray",2  31411,

Unnamed: 0,Array,Chunk
Bytes,490.80 kiB,245.41 kiB
Shape,"(31411, 2)","(15706, 2)"
Count,3 Tasks,2 Chunks
Type,datetime64[ns],numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,896.52 GiB,64.50 MiB
Shape,"(58, 31411, 258, 512)","(1, 128, 258, 512)"
Count,186892 Tasks,42050 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 896.52 GiB 64.50 MiB Shape (58, 31411, 258, 512) (1, 128, 258, 512) Count 186892 Tasks 42050 Chunks Type float32 numpy.ndarray",58  1  512  258  31411,

Unnamed: 0,Array,Chunk
Bytes,896.52 GiB,64.50 MiB
Shape,"(58, 31411, 258, 512)","(1, 128, 258, 512)"
Count,186892 Tasks,42050 Chunks
Type,float32,numpy.ndarray


In [77]:
ds1 = list(dset.values())[2].isel(time=slice(0, 10)).load()
ds2 = list(dset.values())[0].isel(time=slice(0, 10)).load()

In [78]:
ds1 = ds1.assign_coords({'member_id': ds1.variant_label}).expand_dims('member_id')
ds2 = ds2.assign_coords({'member_id': ds2.variant_label}).expand_dims('member_id')

In [79]:
ds1 = ds1.chunk({'time': 100_000, 'lon': 5, 'lat': 5, 'member_id': 1000})
ds2 = ds2.chunk({'time': 100_000, 'lon': 5, 'lat': 5, 'member_id': 1000})

In [80]:
del ds1.attrs['intake_esm_varname']
del ds2.attrs['intake_esm_varname']

In [81]:
def drop_bounds_height(ds):
        
        """Drop coordinates like 'time_bounds' from datasets,
        which can lead to issues when merging."""
        drop_vars = [vname for vname in ds.coords
                if (('_bounds') in vname ) or ('_bnds') in vname or ('height') in vname]
        return ds.drop(drop_vars)

In [82]:
ds1 = drop_bounds_height(ds1)
ds2 = drop_bounds_height(ds2)

In [83]:
ds1

Unnamed: 0,Array,Chunk
Bytes,5.00 MiB,0.98 kiB
Shape,"(1, 10, 256, 512)","(1, 10, 5, 5)"
Count,5356 Tasks,5356 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 5.00 MiB 0.98 kiB Shape (1, 10, 256, 512) (1, 10, 5, 5) Count 5356 Tasks 5356 Chunks Type float32 numpy.ndarray",1  1  512  256  10,

Unnamed: 0,Array,Chunk
Bytes,5.00 MiB,0.98 kiB
Shape,"(1, 10, 256, 512)","(1, 10, 5, 5)"
Count,5356 Tasks,5356 Chunks
Type,float32,numpy.ndarray


In [88]:
ds1.to_zarr('gcs://climateai_data_repository/tmp/tests/global_cmip_append_test2.zarr', consolidated=True, mode='a', append_dim='member_id')

ValueError: append_dim='member_id' does not match any existing dataset dimensions {}

In [86]:
ds2.to_zarr('gcs://climateai_data_repository/tmp/tests/global_cmip_append_test1.zarr', consolidated=True, mode='a'ds1 = ds1.assign_coords({'member_id': ds1.variant_label}).expand_dims('member_id'))

<xarray.backends.zarr.ZarrStore at 0x7fd91b6660b0>

In [85]:
xr.open_zarr('gcs://climateai_data_repository/tmp/tests/global_cmip_append_test1.zarr', consolidated=True)

Unnamed: 0,Array,Chunk
Bytes,5.00 MiB,0.98 kiB
Shape,"(1, 10, 256, 512)","(1, 10, 5, 5)"
Count,5357 Tasks,5356 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 5.00 MiB 0.98 kiB Shape (1, 10, 256, 512) (1, 10, 5, 5) Count 5357 Tasks 5356 Chunks Type float32 numpy.ndarray",1  1  512  256  10,

Unnamed: 0,Array,Chunk
Bytes,5.00 MiB,0.98 kiB
Shape,"(1, 10, 256, 512)","(1, 10, 5, 5)"
Count,5357 Tasks,5356 Chunks
Type,float32,numpy.ndarray


In [87]:
xr.open_zarr('gcs://climateai_data_repository/tmp/tests/global_cmip_append_test1.zarr', consolidated=True)

Unnamed: 0,Array,Chunk
Bytes,10.00 MiB,0.98 kiB
Shape,"(2, 10, 256, 512)","(1, 10, 5, 5)"
Count,10713 Tasks,10712 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 10.00 MiB 0.98 kiB Shape (2, 10, 256, 512) (1, 10, 5, 5) Count 10713 Tasks 10712 Chunks Type float32 numpy.ndarray",2  1  512  256  10,

Unnamed: 0,Array,Chunk
Bytes,10.00 MiB,0.98 kiB
Shape,"(2, 10, 256, 512)","(1, 10, 5, 5)"
Count,10713 Tasks,10712 Chunks
Type,float32,numpy.ndarray


In [12]:
dx = 2.5
ds_ref = xr.Dataset(
    {
        "lat": (["lat"], np.arange(-90 + dx, 90, dx)),
        "lon": (["lon"], np.arange(0, 360, dx)),
    }
)
ds_ref

In [13]:
regridder = xe.Regridder(ds, ds_ref, "bilinear", periodic=True, )

In [34]:
ds_out = regridder(ds)

  ds_out = xr.apply_ufunc(


In [35]:
ds_out

Unnamed: 0,Array,Chunk
Bytes,490.80 kiB,245.41 kiB
Shape,"(31411, 2)","(15706, 2)"
Count,3 Tasks,2 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 490.80 kiB 245.41 kiB Shape (31411, 2) (15706, 2) Count 3 Tasks 2 Chunks Type datetime64[ns] numpy.ndarray",2  31411,

Unnamed: 0,Array,Chunk
Bytes,490.80 kiB,245.41 kiB
Shape,"(31411, 2)","(15706, 2)"
Count,3 Tasks,2 Chunks
Type,datetime64[ns],numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.39 GiB,10.14 MiB
Shape,"(31411, 71, 144)","(130, 71, 144)"
Count,969 Tasks,242 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.39 GiB 10.14 MiB Shape (31411, 71, 144) (130, 71, 144) Count 969 Tasks 242 Chunks Type float64 numpy.ndarray",144  71  31411,

Unnamed: 0,Array,Chunk
Bytes,2.39 GiB,10.14 MiB
Shape,"(31411, 71, 144)","(130, 71, 144)"
Count,969 Tasks,242 Chunks
Type,float64,numpy.ndarray


In [39]:
ds_out = ds_out.chunk({'time': 30_000, 'lat': 5, 'lon': 5})

In [40]:
%%time
ds_out.to_zarr('gcs://climateai_data_repository/tmp/global_cmip_test.zarr' ,consolidated=True)

CPU times: user 1min 10s, sys: 26.7 s, total: 1min 36s
Wall time: 1min 7s


<xarray.backends.zarr.ZarrStore at 0x7f94edbb5040>

In [93]:
from src.global_regridding import *

In [94]:
regrid_global(2.5, 'climateai_data_repository', 'tmp/global_cmip_2.5deg', 'MIROC6', 'historical', 'tas')


--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.member_id.table_id.variable_id.grid_label.zstore.dcpp_init_year.version'


Saving: gcs://climateai_data_repository/tmp/global_cmip_2.5deg/MIROC6/historical/day/tas.zarr


  ds_out = xr.apply_ufunc(
  ds_out = xr.apply_ufunc(
  ds_out = xr.apply_ufunc(
distributed.utils - ERROR - ('fetch', 'memory')
Traceback (most recent call last):
  File "/opt/conda/envs/analogs/lib/python3.9/site-packages/distributed/utils.py", line 638, in log_errors
    yield
  File "/opt/conda/envs/analogs/lib/python3.9/site-packages/distributed/worker.py", line 2435, in gather_dep
    self.transition(ts, "memory", value=data[d])
  File "/opt/conda/envs/analogs/lib/python3.9/site-packages/distributed/worker.py", line 1716, in transition
    func = self._transitions[start, finish]
KeyError: ('fetch', 'memory')
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7fb288a6b8b0>>, <Task finished name='Task-328652' coro=<Worker.gather_dep() done, defined at /opt/conda/envs/analogs/lib/python3.9/site-packages/distributed/worker.py:2291> exception=KeyError(('fetch', 'memory')

KeyboardInterrupt: 

## load global dataset

In [2]:
import xarray as xr

In [6]:
xr.open_zarr('gcs://climateai_data_repository/tmp/global_cmip_2.5deg/cesm_lens/ssp585/day/tas.zarr', consolidated=True)

Unnamed: 0,Array,Chunk
Bytes,7.17 GiB,5.99 MiB
Shape,"(3, 31390, 71, 144)","(1, 31390, 5, 5)"
Count,1306 Tasks,1305 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.17 GiB 5.99 MiB Shape (3, 31390, 71, 144) (1, 31390, 5, 5) Count 1306 Tasks 1305 Chunks Type float64 numpy.ndarray",3  1  144  71  31390,

Unnamed: 0,Array,Chunk
Bytes,7.17 GiB,5.99 MiB
Shape,"(3, 31390, 71, 144)","(1, 31390, 5, 5)"
Count,1306 Tasks,1305 Chunks
Type,float64,numpy.ndarray


In [5]:
xr.open_zarr('gcs://climateai_data_repository/tmp/global_cmip_2.5deg/MIROC6/historical/day/tas.zarr', consolidated=True)

Unnamed: 0,Array,Chunk
Bytes,2.64 GiB,6.62 MiB
Shape,"(1, 34699, 71, 144)","(1, 34699, 5, 5)"
Count,436 Tasks,435 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.64 GiB 6.62 MiB Shape (1, 34699, 71, 144) (1, 34699, 5, 5) Count 436 Tasks 435 Chunks Type float64 numpy.ndarray",1  1  144  71  34699,

Unnamed: 0,Array,Chunk
Bytes,2.64 GiB,6.62 MiB
Shape,"(1, 34699, 71, 144)","(1, 34699, 5, 5)"
Count,436 Tasks,435 Chunks
Type,float64,numpy.ndarray


## Compute extremes

In [6]:
from google.api_core import page_iterator
from google.cloud import storage

def _item_to_value(iterator, item):
    return item

def list_directories(bucket_name, prefix):
    if prefix and not prefix.endswith('/'):
        prefix += '/'

    extra_params = {
        "projection": "noAcl",
        "prefix": prefix,
        "delimiter": '/'
    }

    gcs = storage.Client()

    path = "/b/" + bucket_name + "/o"

    iterator = page_iterator.HTTPIterator(
        client=gcs,
        api_request=gcs._connection.api_request,
        path=path,
        items_key='prefixes',
        item_to_value=_item_to_value,
        extra_params=extra_params,
    )

    return [f'gcs://{bucket_name}/{x}' for x in iterator]

In [7]:
zarrs = list_directories('climateai_data_repository', 'tmp/global_cmip_2.5deg/MIROC6/historical/day/tas')

In [8]:
dss = [xr.open_zarr(z, consolidated=True) for z in zarrs]

In [9]:
ds = xr.concat(dss, 'member')

In [10]:
ds

Unnamed: 0,Array,Chunk
Bytes,229.53 GiB,5.72 MiB
Shape,"(50, 60265, 71, 144)","(1, 30000, 5, 5)"
Count,195800 Tasks,65250 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 229.53 GiB 5.72 MiB Shape (50, 60265, 71, 144) (1, 30000, 5, 5) Count 195800 Tasks 65250 Chunks Type float64 numpy.ndarray",50  1  144  71  60265,

Unnamed: 0,Array,Chunk
Bytes,229.53 GiB,5.72 MiB
Shape,"(50, 60265, 71, 144)","(1, 30000, 5, 5)"
Count,195800 Tasks,65250 Chunks
Type,float64,numpy.ndarray


In [11]:
ds = ds.chunk({'time': -1, 'member': -1})

In [12]:
import dask

dask.config.set({"distributed.workers.memory.spill": 0.90})
dask.config.set({"distributed.workers.memory.target": 0.80})
dask.config.set({"distributed.workers.memory.terminate": 0.98})

<dask.config.set at 0x7fda598bf130>

In [13]:
cluster = dask.distributed.LocalCluster(
            n_workers=8,
            threads_per_worker=1,
#             silence_logs=logging.ERROR
)
client = dask.distributed.Client(cluster)

In [14]:
from tqdm import tqdm
def loop_over_chunks(da, f, n_chunks=4, restart_every=10):
    n_lat, n_lon = da.data.shape[2:]
    chunksize_lat, chunksize_lon = da.data.chunksize[2:]
    n_chunks_lat = (n_lat + chunksize_lat) // chunksize_lat
    n_chunks_lon = (n_lon + chunksize_lon) // chunksize_lon
    out_lat = []
    counter = 0
    pbar = tqdm(total=n_chunks_lat*((n_chunks_lon + n_chunks) // n_chunks))
    for i in range(n_chunks_lat):
        lat_start = i * chunksize_lat
        lat_end = (i + 1) * chunksize_lat
        out_lon = []
        for j in range((n_chunks_lon + n_chunks) // n_chunks):
            lon_start = j * (chunksize_lon * n_chunks)
            lon_end = (j + 1) * (chunksize_lon * n_chunks)
    #         print((lat_start, lat_end), (lon_start, lon_end))
            chunk = ds.isel(lat=slice(lat_start, lat_end), lon=slice(lon_start, lon_end))
            result = f(chunk).compute()
            out_lon.append(result)
            counter += 1
            pbar.update(1)
            if counter % restart_every == 0:
                client.restart()
        out_lat.append(xr.concat(out_lon, 'lon'))
    out = xr.concat(out_lat, 'lat')

In [15]:
def quantile_func(da):
    return da.quantile(0.999, ('time', 'member'))

In [17]:
with ResourceProfiler(dt=0.25) as rprof:
    qu = loop_over_chunks(ds.tas, quantile_func)
    

100%|██████████| 120/120 [23:19<00:00, 11.66s/it]


In [20]:
rprof.visualize()

In [14]:
# %%time
# ds.isel(lat=0, lon=0).quantile(0.999, ('time', 'member')).compute()

In [15]:
# dask.config.set(scheduler='synchronous')
# dask.config.set(scheduler='threads')
# from dask.distributed import Client, LocalCluster
# cluster = LocalCluster()
# client = Client(cluster)

In [16]:
# import ctypes

# def trim_memory() -> int:
#     libc = ctypes.CDLL("libc.so.6")
#     return libc.malloc_trim(0)

# client.run(trim_memory)

In [18]:
qu

In [48]:
occ = ds > qu

In [49]:
occ = occ.resample(time='AS').mean().rolling(time=10, center=True).mean()

In [50]:
occ

Unnamed: 0,Array,Chunk
Bytes,1.57 MiB,97.66 kiB
Shape,"(165, 50, 5, 5)","(10, 50, 5, 5)"
Count,198251 Tasks,17 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.57 MiB 97.66 kiB Shape (165, 50, 5, 5) (10, 50, 5, 5) Count 198251 Tasks 17 Chunks Type float64 numpy.ndarray",165  1  5  5  50,

Unnamed: 0,Array,Chunk
Bytes,1.57 MiB,97.66 kiB
Shape,"(165, 50, 5, 5)","(10, 50, 5, 5)"
Count,198251 Tasks,17 Chunks
Type,float64,numpy.ndarray


In [51]:
%%time
occ.load()

CPU times: user 1.95 s, sys: 176 ms, total: 2.13 s
Wall time: 3.84 s


In [52]:
occ

In [24]:
400 * 5 / 60

33.333333333333336

In [34]:
q = ds.isel(lat=slice(0, 100), lon=slice(0, 20)).quantile(0.999, ('time', 'member'))

In [35]:
q

Unnamed: 0,Array,Chunk
Bytes,11.09 kiB,200 B
Shape,"(71, 20)","(5, 5)"
Count,196655 Tasks,60 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 11.09 kiB 200 B Shape (71, 20) (5, 5) Count 196655 Tasks 60 Chunks Type float64 numpy.ndarray",20  71,

Unnamed: 0,Array,Chunk
Bytes,11.09 kiB,200 B
Shape,"(71, 20)","(5, 5)"
Count,196655 Tasks,60 Chunks
Type,float64,numpy.ndarray


In [36]:
x = q.tas.data

In [37]:
x

Unnamed: 0,Array,Chunk
Bytes,11.09 kiB,200 B
Shape,"(71, 20)","(5, 5)"
Count,196655 Tasks,60 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 11.09 kiB 200 B Shape (71, 20) (5, 5) Count 196655 Tasks 60 Chunks Type float64 numpy.ndarray",20  71,

Unnamed: 0,Array,Chunk
Bytes,11.09 kiB,200 B
Shape,"(71, 20)","(5, 5)"
Count,196655 Tasks,60 Chunks
Type,float64,numpy.ndarray


In [38]:
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof, ProgressBar():
    x.compute()

In [39]:
visualize([prof, rprof, cprof])