This notebook contains code for calculating the tas anomaly of a given dataset. We use the Pangeo CMIP6 Public Dataset. A lot of this code was modified starting from code from the Pangeo Gallery,found [here](https://gallery.pangeo.io/repos/pangeo-gallery/cmip6/).

Additionally, the [xarray documentation](https://xarray.pydata.org/en/stable/) and [xarray-extras documentation](https://xarray-extras.readthedocs.io/en/latest/), along with the online documentation for any of the other used libraries are a useful resource.

In [2]:
from matplotlib import pyplot as plt
import xarray as xr
import numpy as np
import dask
from dask.diagnostics import progress
from tqdm.autonotebook import tqdm
import intake
import fsspec
import seaborn as sns
import pandas as pd
from tqdm.autonotebook import tqdm  # Fancy progress bars for our loops!
from dask_gateway import Gateway
from dask.distributed import Client
import cftime
import datetime


  from tqdm.autonotebook import tqdm


In [3]:
col = intake.open_esm_datastore("https://storage.googleapis.com/cmip6/pangeo-cmip6.json")

After importing all relevant packages and using intake to open the pangeo cmip6 database, we construct our query. Note that this particular query requires all sources have all 5 experiments. This is due to the passing the .search() function the perameter `require_all_on`. Info at the [intake-esm docs](https://intake-esm.readthedocs.io/en/latest/api.html?highlight=search#intake_esm.core.esm_datastore.search).

In [5]:
expts = ['historical', 'ssp126', 'ssp245', 'ssp370', 'ssp585']

query = dict(
    experiment_id = expts,
    variable_id = ['tas'],
    table_id = ['Amon'],
    source_id = ['ACCESS-ESM1-5','CESM2-WACCM', 'MPI-ESM1-2-HR'],
    member_id = 'r1i1p1f1'
)

# In the other two documents, I used col_subset instead of cat.
cat = col.search(require_all_on=['source_id'], **query)


print(cat.df.groupby("source_id")[
    ["experiment_id", "variable_id", "table_id"]
].nunique())

               experiment_id  variable_id  table_id
source_id                                          
ACCESS-ESM1-5              5            1         1
CESM2-WACCM                5            1         1
MPI-ESM1-2-HR              5            1         1


In [7]:
# Create an xarray dataset using our catalog.
dset_dict = cat.to_dataset_dict(zarr_kwargs={'consolidated': True, 'decode_times': False},cdf_kwargs={'chunks': {}, 'decode_times': False})


--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'


The next block of code is mainly for formatting all of the datasets into a more standard format. This is especially important if you want to make a graph using multiple

In [8]:
def drop_all_bounds(ds):
    """Drop coordinates like 'time_bounds' from datasets,
    which can lead to issues when merging."""
    drop_vars = [vname for vname in ds.coords if (('_bounds') in vname ) or ('_bnds') in vname]
    return ds.drop(drop_vars)

def open_dsets(df):
    """Open datasets from cloud storage and return xarray dataset."""
    dsets = [xr.open_zarr(fsspec.get_mapper(ds_url), consolidated=True).pipe(drop_all_bounds) for ds_url in df.zstore]
    try:
        ds = xr.merge(dsets, join='exact')
        return ds
    except ValueError:
        return None

def open_delayed(df):
    """A dask.delayed wrapper around `open_dsets`.
    Allows us to open many datasets in parallel."""
    return dask.delayed(open_dsets)(df)

from collections import defaultdict

dsets = defaultdict(dict)
for group, df in cat.df.groupby(by=['source_id', 'experiment_id']):
    dsets[group[0]][group[1]] = open_delayed(df)

open_dsets(df)
dsets_ = dask.compute(dict(dsets))[0]

expt_da = xr.DataArray(expts, dims='experiment_id', name='experiment_id', coords={'experiment_id': expts})

dsets_aligned = {}

The main dictionary we will be working with is `dsets_`. Generally, `dsets_[source][scenario]` will be an xarray dataset with its data stored as a dask array.


In [9]:
dsets_

{'ACCESS-ESM1-5': {'historical': <xarray.Dataset>
  Dimensions:  (lat: 145, lon: 192, time: 1980)
  Coordinates:
      height   float64 ...
    * lat      (lat) float64 -90.0 -88.75 -87.5 -86.25 ... 86.25 87.5 88.75 90.0
    * lon      (lon) float64 0.0 1.875 3.75 5.625 7.5 ... 352.5 354.4 356.2 358.1
    * time     (time) datetime64[ns] 1850-01-16T12:00:00 ... 2014-12-16T12:00:00
  Data variables:
      tas      (time, lat, lon) float32 dask.array<chunksize=(589, 145, 192), meta=np.ndarray>
  Attributes: (12/50)
      Conventions:            CF-1.7 CMIP-6.2
      activity_id:            CMIP
      branch_method:          standard
      branch_time_in_child:   0.0
      branch_time_in_parent:  21915.0
      cmor_version:           3.4.0
      ...                     ...
      tracking_id:            hdl:21.14100/d2debfa6-c0e2-4339-bac8-08d97867ae3a
      variable_id:            tas
      variant_label:          r1i1p1f1
      version:                v20191115
      netcdf_tracking_ids:

A lot of code I used ended up needing the line `climatology = v['historical'].sel(time=slice('1981-01-01', '2010-12-31')).groupby('time.month').mean('time')` or something similar. The `sel`, `groupby`, and `mean` methods are all in xarray's documetation (linked at the start of this notebook). For `slice`, see [here](https://docs.python.org/3/library/functions.html#slice).

In [10]:
for k, v in tqdm(dsets_.items()):

    # Check for any missing experiments that we expect
    expt_dsets = v.values()
    if any([d is None for d in expt_dsets]):
        print(f"Missing experiment for {k}")
        continue

    climatology = v['historical'].sel(time=slice('1981-01-01', '2010-12-31')).groupby('time.month').mean('time')

    for i in v:
        if i == 'historical':
            # When working with daily data, it's very easy for objects to run your computer out of memory, so we shorten historical runs because to the date range we need.
            anomaly = v[i].sel(time=slice('1981-01-01', '2010-12-31')).groupby('time.month') - climatology
        else:
            anomaly = v[i].groupby('time.month') - climatology
        # Because these files are too large to store in memory, we use the option compute=False to create a dask delayed object and then compute it later.
        # Remember to change the file location to the relevant folder.
        delayed_obj = anomaly.to_netcdf(path=f"~/tas-anomaly_{k}_{i}.nc", mode='w', compute=False, engine='netcdf4', format='NETCDF4')
        print(f"writing data to ~/tas-anomaly_{k}_{i}.nc")

        with progress.ProgressBar():
            results = delayed_obj.compute()

  0%|          | 0/3 [00:00<?, ?it/s]

writing data to ~/tas-anomaly_ACCESS-ESM1-5_historical.nc
[                                        ] | 0% Completed |  0.1s

  return self.array[key]


[########################################] | 100% Completed |  5.0s
writing data to ~/tas-anomaly_ACCESS-ESM1-5_ssp126.nc
[                                        ] | 0% Completed |  0.1s

  return self.array[key]


[########################################] | 100% Completed | 11.3s
writing data to ~/tas-anomaly_ACCESS-ESM1-5_ssp245.nc
[                                        ] | 0% Completed |  0.1s

  return self.array[key]


[########################################] | 100% Completed | 10.5s
writing data to ~/tas-anomaly_ACCESS-ESM1-5_ssp370.nc
[                                        ] | 0% Completed |  0.1s

  return self.array[key]


[########################################] | 100% Completed | 10.3s
writing data to ~/tas-anomaly_ACCESS-ESM1-5_ssp585.nc
[                                        ] | 0% Completed |  0.1s

  return self.array[key]


[########################################] | 100% Completed |  9.7s
writing data to ~/tas-anomaly_CESM2-WACCM_historical.nc
[                                        ] | 0% Completed |  0.1s

  return self.array[key]


[########################################] | 100% Completed |  8.4s


  return self.array[key]


writing data to ~/tas-anomaly_CESM2-WACCM_ssp126.nc
[########################################] | 100% Completed | 40.4s
writing data to ~/tas-anomaly_CESM2-WACCM_ssp245.nc
[                                        ] | 0% Completed |  0.1s

  return self.array[key]


[########################################] | 100% Completed | 18.4s
writing data to ~/tas-anomaly_CESM2-WACCM_ssp370.nc
[                                        ] | 0% Completed |  0.1s

  return self.array[key]


[########################################] | 100% Completed | 18.7s
writing data to ~/tas-anomaly_CESM2-WACCM_ssp585.nc
[                                        ] | 0% Completed |  0.0s

  return self.array[key]


[########################################] | 100% Completed | 18.9s


  pos_indexers, new_indexes = remap_label_indexers(


KeyError: "cannot represent labeled-based slice indexer for dimension 'time' with a slice over integer positions; the index is unsorted or non-unique"