In [4]:
! pip install netcdf4 xarray[io] cartopy nc-time-axis siphon

Defaulting to user installation because normal site-packages is not writeable


In [5]:
import pandas as pd
import xarray as xr
import numpy as np
import os
import os.path
import copy
from siphon import catalog
from dask.distributed import Client, LocalCluster
overwrite = False


In [6]:
model = 'NorESM2-LM'

experiments = [
               '1pctCO2', 'abrupt-4xCO2', 'historical', 'piControl', # CMIP
               'hist-GHG', 'hist-aer', # DAMIP
               'ssp126', 'ssp245', 'ssp370', 'ssp370-lowNTCF', 'ssp585' #	ScenarioMIP
]

variables = [
             'tas', 'tasmin', 'tasmax', 'pr'
]

In [7]:
def get_MIP(experiment):
  if experiment == 'ssp245-covid':
    return 'DAMIP'
  elif experiment == 'ssp370-lowNTCF':
    return 'AerChemMIP'
  elif experiment.startswith('ssp'):
    return 'ScenarioMIP'
  elif experiment.startswith('hist-'):
    return 'DAMIP'
  else:
    return 'CMIP'

def get_esgf_data(variable, experiment, ensemble_member):
  """
  Inspired by https://github.com/rabernat/pangeo_esgf_demo/blob/master/narr_noaa_thredds.ipynb
  """

  # Get the relevant catalog references
  cat_refs = list({k:v for k,v in full_catalog.catalog_refs.items() if k.startswith(f"CMIP6.{get_MIP(experiment)}.NCC.NorESM2-LM.{experiment}.{ensemble_member}.day.{variable}.")}.values()) 
  # Get the latest version (in case there are multiple)
  print(cat_refs)
  cat_ref = sorted(cat_refs, key=lambda x: str(x))[-1]
  print(cat_ref)
  sub_cat = cat_ref.follow().datasets
  datasets = []
  # Filter and fix the datasets
  for cds in sub_cat[:]:
    # Only pull out the (un-aggregated) NetCDF files
    if (str(cds).endswith('.nc') and ('aggregated' not in str(cds))):
      # For some reason these OpenDAP Urls are not referred to as Siphon expects...
      #cds.access_urls['OPENDAP'] = cds.access_urls['OpenDAPServer']
      datasets.append(cds)
  dsets = [(cds.remote_access(use_xarray=True)
             .reset_coords(drop=True)
             .chunk({'time': 365}))
         for cds in datasets]
  ds = xr.combine_by_coords(dsets, combine_attrs='drop')
  return ds[variable]

In [8]:
print("starting")

from collections import defaultdict
import xarray as xr

full_catalog = catalog.TDSCatalog('http://noresg.nird.sigma2.no/thredds/catalog/esgcet/catalog.xml')
print("Read full catalogue")

tas_by_exp_lists = defaultdict(list)   
pr_by_exp_lists  = defaultdict(list)

for experiment in experiments:
    for i in range(3):
        physics = 2 if experiment == 'ssp245-covid' else 1
        member  = f"r{i+1}i1p1f{physics}"
        print(f"Processing {member} of {experiment}...")

        try:
            ta = get_esgf_data('tas', experiment, member)  
            pr = get_esgf_data('pr',  experiment, member) 
        except (IndexError, KeyError) as e:
            print(f"Skipping {experiment} {member}: {e}")
            continue

        ta = ta.assign_coords(experiment=experiment, member=member).expand_dims('member')
        pr = pr.assign_coords(experiment=experiment, member=member).expand_dims('member')

        tas_by_exp_lists[experiment].append(ta)
        pr_by_exp_lists[experiment].append(pr)

tas_exp = {}
pr_exp  = {}

for exp, arrs in tas_by_exp_lists.items():
    if not arrs:
        continue
    aligned = xr.align(*arrs, join='outer', exclude=('experiment', 'member'))
    tas_exp[exp] = xr.concat(aligned, dim='member') 

for exp, arrs in pr_by_exp_lists.items():
    if not arrs:
        continue
    aligned = xr.align(*arrs, join='outer', exclude=('experiment', 'member'))
    pr_exp[exp] = xr.concat(aligned, dim='member')   

print("Built tas_exp for experiments:", list(tas_exp.keys()))
print("Built pr_exp  for experiments:", list(pr_exp.keys()))


starting
Read full catalogue
Processing r1i1p1f1 of 1pctCO2...
[CMIP6.CMIP.NCC.NorESM2-LM.1pctCO2.r1i1p1f1.day.tas.gn.v20190815]
CMIP6.CMIP.NCC.NorESM2-LM.1pctCO2.r1i1p1f1.day.tas.gn.v20190815
[CMIP6.CMIP.NCC.NorESM2-LM.1pctCO2.r1i1p1f1.day.pr.gn.v20190815]
CMIP6.CMIP.NCC.NorESM2-LM.1pctCO2.r1i1p1f1.day.pr.gn.v20190815
Processing r2i1p1f1 of 1pctCO2...
[]
Skipping 1pctCO2 r2i1p1f1: list index out of range
Processing r3i1p1f1 of 1pctCO2...
[]
Skipping 1pctCO2 r3i1p1f1: list index out of range
Processing r1i1p1f1 of abrupt-4xCO2...
[CMIP6.CMIP.NCC.NorESM2-LM.abrupt-4xCO2.r1i1p1f1.day.tas.gn.v20210118]
CMIP6.CMIP.NCC.NorESM2-LM.abrupt-4xCO2.r1i1p1f1.day.tas.gn.v20210118
[CMIP6.CMIP.NCC.NorESM2-LM.abrupt-4xCO2.r1i1p1f1.day.pr.gn.v20210118]
CMIP6.CMIP.NCC.NorESM2-LM.abrupt-4xCO2.r1i1p1f1.day.pr.gn.v20210118
Processing r2i1p1f1 of abrupt-4xCO2...
[]
Skipping abrupt-4xCO2 r2i1p1f1: list index out of range
Processing r3i1p1f1 of abrupt-4xCO2...
[]
Skipping abrupt-4xCO2 r3i1p1f1: list index out

  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])


[CMIP6.CMIP.NCC.NorESM2-LM.piControl.r1i1p1f1.day.pr.gn.v20210118]
CMIP6.CMIP.NCC.NorESM2-LM.piControl.r1i1p1f1.day.pr.gn.v20210118


  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])
  return provider(self.access_urls[service])


Processing r2i1p1f1 of piControl...
[]
Skipping piControl r2i1p1f1: list index out of range
Processing r3i1p1f1 of piControl...
[]
Skipping piControl r3i1p1f1: list index out of range
Processing r1i1p1f1 of hist-GHG...
[CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r1i1p1f1.day.tas.gn.v20191108, CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r1i1p1f1.day.tas.gn.v20190815]
CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r1i1p1f1.day.tas.gn.v20191108
[CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r1i1p1f1.day.pr.gn.v20191108, CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r1i1p1f1.day.pr.gn.v20190815]
CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r1i1p1f1.day.pr.gn.v20191108
Processing r2i1p1f1 of hist-GHG...
[CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r2i1p1f1.day.tas.gn.v20191108]
CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r2i1p1f1.day.tas.gn.v20191108
[CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r2i1p1f1.day.pr.gn.v20191108]
CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r2i1p1f1.day.pr.gn.v20191108
Processing r3i1p1f1 of hist-GHG...
[CMIP6.DAMIP.NCC.NorESM2-LM.hist-GHG.r3i1p1f1.

    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return self.array[key]
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return self.array[key]


Built tas_exp for experiments: ['1pctCO2', 'abrupt-4xCO2', 'historical', 'piControl', 'hist-GHG', 'hist-aer', 'ssp245', 'ssp370', 'ssp370-lowNTCF', 'ssp585']
Built pr_exp  for experiments: ['1pctCO2', 'abrupt-4xCO2', 'historical', 'piControl', 'hist-GHG', 'hist-aer', 'ssp245', 'ssp370', 'ssp370-lowNTCF', 'ssp585']


    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return self.array[key]
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return self.array[key]


In [9]:
tas_exp_means = copy.deepcopy(tas_exp)

for exp, da in tas_exp.items():
    weights = np.cos(np.deg2rad(da['lat']))

    tas_exp_means[exp] = (
        da.weighted(weights)
          .mean(['lat', 'lon'], skipna=True)
          .groupby('time.year')
          .mean('time', skipna=True)
    )

In [12]:
pr_exp_means = copy.deepcopy(pr_exp)
pr_exp_percentiles = copy.deepcopy(pr_exp)

for exp, da in pr_exp.items():
    weights = np.cos(np.deg2rad(da['lat']))

    pr_exp_means[exp] = (
        da.weighted(weights)
          .mean(['lat', 'lon'], skipna=True)
          .groupby('time.year')
          .mean('time', skipna=True)
    )
    
for exp, da in pr_exp.items():
    dt_sec = (da['time'].diff('time') / np.timedelta64(1, 's'))
    dt_sec = dt_sec.reindex(time=da['time'], method='bfill')
    pr_yearly = (da * dt_sec).groupby('time.year').sum('time', skipna=True)
    p90 = pr_yearly.quantile(0.9, dim=['lat', 'lon'], skipna=True)

    name = 'pr_p90_grid_annual_total_mm'
    order = [d for d in ('member', 'year') if d in p90.dims]
    pr_exp_percentiles[exp] = p90.rename(name).transpose(*order)


In [None]:
import os, time, numpy as np

OUT = "data_store"
os.makedirs(f"{OUT}/tas_means", exist_ok=True)
os.makedirs(f"{OUT}/pr_means",  exist_ok=True)
os.makedirs(f"{OUT}/pr_perc",   exist_ok=True)

os.environ.setdefault("HDF5_USE_FILE_LOCKING", "FALSE")

def _prep_da(da, fallback_name):
    # Ensure a name, dtype, sane coords, single chunks, and materialize
    name = da.name or fallback_name
    da2  = da.rename(name).astype("float32")

    # Make coords writer-friendly
    if "member" in da2.coords:
        da2 = da2.assign_coords(member=da2["member"].astype(str))
    if "year" in da2.coords:
        # int32 is safer than object/int64 on some stacks
        da2 = da2.assign_coords(year=da2["year"].astype(np.int32))

    # Avoid tons of (1,) chunks
    if hasattr(da2.data, "chunks"):
        da2 = da2.chunk({d: -1 for d in da2.dims})

    # Materialize any dask graph
    if hasattr(da2.data, "compute"):
        da2 = da2.compute()

    return name, da2

def _safe_write(da, fname, var_name):
    enc = {var_name: {"zlib": True, "complevel": 4}}

    # Try h5netcdf with phony dims first (skips dim-scales), then fall back to netcdf4
    try:
        da.to_netcdf(
            fname,
            engine="h5netcdf",
            encoding=enc,
            invalid_netcdf=True,   # <- key: use phony dims instead of dim-scales
        )
    except Exception as e1:
        # Fallback: classic netCDF4 writer is very robust
        tmp = fname + ".netcdf4.tmp"
        da.to_netcdf(
            tmp,
            engine="netcdf4",
            encoding=enc,
            format="NETCDF4",
        )
        os.replace(tmp, fname)

# 1) tas_exp_means
for exp, da in tas_exp_means.items():
    var_name, da_to_write = _prep_da(da, "tas_global_annual_mean")
    fname = f"{OUT}/tas_means/{model}_{exp}_{var_name}.nc"
    t0 = time.perf_counter()
    _safe_write(da_to_write, fname, var_name)
    print(f"Wrote {fname} in {time.perf_counter()-t0:.2f}s")

# 2) pr_exp_means (annual global mean *rate*)
for exp, da in pr_exp_means.items():
    var_name, da_to_write = _prep_da(da, "pr_global_annual_mean_flux")
    fname = f"{OUT}/pr_means/{model}_{exp}_{var_name}.nc"
    t0 = time.perf_counter()
    _safe_write(da_to_write, fname, var_name)
    print(f"Wrote {fname} in {time.perf_counter()-t0:.2f}s")

# 3) pr_exp_percentiles (90th percentile of annual TOTAL precip per grid; mm/yr)
for exp, da in pr_exp_percentiles.items():
    var_name, da_to_write = _prep_da(da, "pr_p90_grid_annual_total_mm")
    fname = f"{OUT}/pr_perc/{model}_{exp}_{var_name}.nc"
    t0 = time.perf_counter()
    _safe_write(da_to_write, fname, var_name)
    print(f"Wrote {fname} in {time.perf_counter()-t0:.2f}s")


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_1pctCO2_tas.nc in 0.04s


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_abrupt-4xCO2_tas.nc in 0.01s


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_historical_tas.nc in 0.01s


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_piControl_tas.nc in 0.01s


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_hist-GHG_tas.nc in 0.01s


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_hist-aer_tas.nc in 0.01s


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_ssp245_tas.nc in 0.01s


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_ssp370_tas.nc in 0.01s


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_ssp370-lowNTCF_tas.nc in 0.01s


  self.flush()


Wrote data_store/tas_means/NorESM2-LM_ssp585_tas.nc in 0.01s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_1pctCO2_pr.nc in 0.01s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_abrupt-4xCO2_pr.nc in 0.04s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_historical_pr.nc in 0.01s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_piControl_pr.nc in 0.03s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_hist-GHG_pr.nc in 0.01s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_hist-aer_pr.nc in 0.07s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_ssp245_pr.nc in 0.01s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_ssp370_pr.nc in 0.01s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_ssp370-lowNTCF_pr.nc in 0.01s


  self.flush()


Wrote data_store/pr_means/NorESM2-LM_ssp585_pr.nc in 0.01s


  self.flush()


Wrote data_store/pr_perc/NorESM2-LM_1pctCO2_pr_p90_grid_annual_total_mm.nc in 0.01s


  self.flush()


Wrote data_store/pr_perc/NorESM2-LM_abrupt-4xCO2_pr_p90_grid_annual_total_mm.nc in 0.01s


  self.flush()


Wrote data_store/pr_perc/NorESM2-LM_historical_pr_p90_grid_annual_total_mm.nc in 0.09s


  self.flush()


Wrote data_store/pr_perc/NorESM2-LM_piControl_pr_p90_grid_annual_total_mm.nc in 0.01s
