In [5]:
# libraries

import seaborn as sns
import xarray as xr
import numpy as np
import pandas as pd
import intake
import dask
import warnings
from pathlib import Path
from tqdm import tqdm
from xmip.preprocessing import combined_preprocessing
import matplotlib.pyplot as plt

%matplotlib inline

In [6]:
# data: open the CMIP6 ESM datastore described by the JSON file below

url = "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"

# warnings are silenced only for the duration of the open call
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    col = intake.open_esm_datastore(url)

In [10]:
# load some monthly data from the GFDL-CM4 4-K warming experiment

query = {
    'experiment_id': ['amip', 'amip-p4K'],
    'variable_id': ['ua', 'va'],
    'source_id': ['GFDL-CM4'],
    'table_id': 'Amon',
}

# narrow the catalog to the matching entries and print which source_ids came back
cat = col.search(**query)
print(cat.df['source_id'].unique())

# turn the search result into a dictionary of datasets
# keys are constructed as 'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'

z_kwargs = dict(consolidated=True, decode_times=True)

# the dask settings below apply only while the datasets are being opened
with dask.config.set(**{'scheduler': 'threads', 'array.slicing.split_large_chunks': True}):
    dset_dict = cat.to_dataset_dict(
        preprocess=combined_preprocessing,
        zarr_kwargs=z_kwargs,
    )

['GFDL-CM4']

--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'


In [13]:
# list the keys of the datasets currently held in dset_dict
print(dset_dict.keys())

dict_keys(['CMIP.NOAA-GFDL.GFDL-CM4.amip.Amon.gr1', 'CFMIP.NOAA-GFDL.GFDL-CM4.amip-p4K.Amon.gr1'])


In [11]:
from joblib import Parallel, delayed
import time

def task(x, delay=1):
    """Sleep for ``delay`` seconds, then return ``x`` unchanged.

    Used as a stand-in workload for checking how many jobs run concurrently.

    Parameters
    ----------
    x : any
        Value handed straight back to the caller.
    delay : int or float, optional
        Seconds to sleep before returning. Defaults to 1, matching the
        original fixed one-second pause.

    Returns
    -------
    The same object that was passed in as ``x``.
    """
    time.sleep(delay)
    return x

# Time 50 one-second tasks spread over 50 joblib workers.
# perf_counter() is a monotonic clock, so the measured interval is not
# affected by system clock adjustments the way time.time() can be.
start = time.perf_counter()
results = Parallel(n_jobs=50)(delayed(task)(i) for i in range(50))
print("Done in", time.perf_counter() - start, "seconds")

Done in 1.4684550762176514 seconds


In [14]:
import time
# Open the amip dataset from the current search result and time a small read.
ds = cat['CMIP.NOAA-GFDL.GFDL-CM4.amip.Amon.gr1'].to_dask()

# perf_counter() is monotonic, which makes it the appropriate clock for
# measuring an elapsed interval.
start = time.perf_counter()
# Load the first time step of 'ua'. The result is discarded without
# binding `_`, which IPython uses for the previous cell output.
ds['ua'].isel(time=0).load()
print("Read time:", time.perf_counter() - start)

Read time: 1.9068071842193604


In [15]:
# first rows of the 'zstore' column of the current search result
cat.df['zstore'].head()

0    gs://cmip6/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/amip/...
1    gs://cmip6/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/amip/...
2    gs://cmip6/CMIP6/CFMIP/NOAA-GFDL/GFDL-CM4/amip...
3    gs://cmip6/CMIP6/CFMIP/NOAA-GFDL/GFDL-CM4/amip...
Name: zstore, dtype: object

In [None]:
# load 6-hourly CESM2 data with meridional and zonal wind, air temperature, and surface pressure
# NOTE(review): specific humidity is not part of this query; presumably it
# would be 'hus' — confirm and add it to variable_id if it is needed.

query = dict(source_id=['CESM2'],
             table_id='6hrLev',
             experiment_id=['historical', 'ssp585'],
             variable_id=['va', 'ua', 'ta', 'ps']
)

cat = col.search(**query)

# load data into dictionary
# keys are constructed as 'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'

# NOTE(review): newer intake-esm releases may expect xarray_open_kwargs
# instead of zarr_kwargs — confirm against the installed version.
z_kwargs = {'consolidated': True, 'decode_times':True}

# Silence warnings only while the datasets are opened, instead of installing
# a global "ignore" filter that would hide warnings for the rest of the session.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    with dask.config.set(**{'array.slicing.split_large_chunks': True}):
        dset_dict = cat.to_dataset_dict(zarr_kwargs=z_kwargs)

dset_dict.keys()

In [None]:
# load the current `cat` search result into a dictionary, passing xmip's
# combined_preprocessing as the preprocess hook
# keys are constructed as 'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'

z_kwargs = {'consolidated': True, 'decode_times':True}

# Silence warnings only while the datasets are opened, instead of installing
# a global "ignore" filter that would hide warnings for the rest of the session.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    with dask.config.set(**{'array.slicing.split_large_chunks': True}):
        dset_dict = cat.to_dataset_dict(zarr_kwargs=z_kwargs, preprocess=combined_preprocessing)

dset_dict.keys()

In [None]:
# display the DataFrame behind the current `cat` search result
cat.df