# CMIP6 Precipitation Frequency Analysis Example

This notebook works through an advanced analysis case. The calculation is inspired by [Angie Pendergrass](https://staff.ucar.edu/users/apgrass)’s work on precipitation statistics, described in the following paper and data guide:
- https://journals.ametsoc.org/doi/full/10.1175/JCLI-D-16-0684.1
- https://climatedataguide.ucar.edu/climate-data/gpcp-daily-global-precipitation-climatology-project

We use [xhistogram](https://xhistogram.readthedocs.io/) to calculate the distribution of precipitation intensity and its changes in a warming climate.

In [1]:
import os
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
import gcsfs
from tqdm.autonotebook import tqdm

from xhistogram.xarray import histogram

%matplotlib inline
plt.rcParams['figure.figsize'] = 12, 6
%config InlineBackend.figure_format = 'retina' 



We assume this notebook is running in a Pangeo environment with the ability to create [Dask Kubernetes](https://kubernetes.dask.org/en/latest/) distributed clusters for processing. If that's not the case, simply skip the cell below. The analysis will run much more slowly but should still work.

In [2]:
from dask.distributed import Client
from dask_kubernetes import KubeCluster

cluster = KubeCluster()
cluster.adapt(minimum=1, maximum=10, interval='2s')
client = Client(cluster)
client

Client: Scheduler: tcp://10.48.2.36:35307, Dashboard: /user/pangeo-data-pan--cmip6-examples-sps0lyi7/proxy/8787/status
Cluster: Workers: 0, Cores: 0, Memory: 0 B
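If a Kubernetes cluster may or may not be available, the choice can be made in code rather than by skipping a cell. The following is a minimal sketch, not part of the original notebook: `make_client` is a hypothetical helper, and treating any failure during cluster creation as "Kubernetes is not usable" is an assumption.

```python
def make_client(minimum=1, maximum=10):
    """Return a Dask client, or None if dask.distributed is not installed.

    Hypothetical helper for illustration only.
    """
    try:
        from dask.distributed import Client
    except ImportError:
        # Without a distributed client, Dask falls back to its default scheduler.
        return None
    try:
        from dask_kubernetes import KubeCluster
        cluster = KubeCluster()
        cluster.adapt(minimum=minimum, maximum=maximum)
        return Client(cluster)
    except Exception:
        # Assumption: any failure here means Kubernetes is not usable,
        # so start a local cluster on this machine instead.
        return Client()
```

Calling `client = make_client()` would then stand in for the cell above. The broad `except` is deliberate in this sketch and would hide real configuration errors, so a production setup may want something stricter.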


We first load the catalog of CMIP6 Zarr stores, which we will then search for all 3-hourly precipitation fields.

In [3]:
df = pd.read_csv('https://storage.googleapis.com/cmip6/cmip6-zarr-consolidated-stores.csv')
df.head()

Unnamed: 0,activity_id,institution_id,source_id,experiment_id,member_id,table_id,variable_id,grid_label,zstore,dcpp_init_year,version
0,AerChemMIP,AS-RCEC,TaiESM1,histSST,r1i1p1f1,AERmon,od550aer,gn,gs://cmip6/AerChemMIP/AS-RCEC/TaiESM1/histSST/...,,20200310
1,AerChemMIP,BCC,BCC-ESM1,histSST,r1i1p1f1,AERmon,mmrbc,gn,gs://cmip6/AerChemMIP/BCC/BCC-ESM1/histSST/r1i...,,20190718
2,AerChemMIP,BCC,BCC-ESM1,histSST,r1i1p1f1,AERmon,mmrdust,gn,gs://cmip6/AerChemMIP/BCC/BCC-ESM1/histSST/r1i...,,20191127
3,AerChemMIP,BCC,BCC-ESM1,histSST,r1i1p1f1,AERmon,mmroa,gn,gs://cmip6/AerChemMIP/BCC/BCC-ESM1/histSST/r1i...,,20190809
4,AerChemMIP,BCC,BCC-ESM1,histSST,r1i1p1f1,AERmon,mmrso4,gn,gs://cmip6/AerChemMIP/BCC/BCC-ESM1/histSST/r1i...,,20191127
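A catalog like this is usually queried with boolean masks on its columns. The following is a minimal sketch on a toy table: `search_catalog` is a hypothetical helper rather than a library function, and the `zstore` paths in the toy data are made up.

```python
import pandas as pd

def search_catalog(catalog, **criteria):
    """Return the rows of `catalog` whose columns equal the given values.

    Hypothetical helper for illustration only.
    """
    mask = pd.Series(True, index=catalog.index)
    for column, value in criteria.items():
        # Narrow the mask one column at a time.
        mask &= catalog[column] == value
    return catalog[mask]

# Toy stand-in for the real catalog; these paths do not exist.
toy = pd.DataFrame({
    'source_id': ['ModelA', 'ModelA', 'ModelB'],
    'experiment_id': ['historical', 'ssp585', 'historical'],
    'table_id': ['3hr', '3hr', 'day'],
    'variable_id': ['pr', 'pr', 'pr'],
    'zstore': ['gs://example/a-hist/', 'gs://example/a-ssp/', 'gs://example/b-hist/'],
})
toy_3hr_pr = search_catalog(toy, table_id='3hr', variable_id='pr')
print(toy_3hr_pr[['source_id', 'experiment_id']])
```

Only the two `ModelA` rows match both criteria, because the `ModelB` row is daily rather than 3-hourly.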



In [5]:
df.experiment_id.unique()

array(['histSST', 'piClim-CH4', 'piClim-NTCF', 'piClim-control', 'ssp370',
       'piClim-2xDMS', 'piClim-2xdust', 'piClim-2xfire', 'piClim-2xss',
       'piClim-BC', 'piClim-HC', 'piClim-N2O', 'piClim-OC', 'piClim-SO2',
       'piClim-aer', '1pctCO2-bgc', 'esm-ssp585', 'hist-bgc',
       'amip-4xCO2', 'amip-future4K', 'amip-m4K', 'amip-p4K', 'amip',
       'abrupt-2xCO2', 'abrupt-solp4p', 'abrupt-0p5xCO2', 'amip-lwoff',
       'amip-p4K-lwoff', 'aqua-4xCO2', 'abrupt-solm4p',
       'aqua-control-lwoff', 'aqua-control', 'aqua-p4K-lwoff', 'aqua-p4K',
       '1pctCO2', 'abrupt-4xCO2', 'historical', 'piControl', 'esm-hist',
       'esm-piControl', 'ssp126', 'ssp245', 'ssp585',
       'esm-piControl-spinup', 'piControl-spinup', 'hist-GHG', 'hist-aer',
       'hist-nat', 'hist-CO2', 'hist-sol', 'hist-stratO3', 'hist-volc',
       'ssp245-GHG', 'ssp245-aer', 'ssp245-nat', 'ssp245-stratO3',
       'dcppA-hindcast', 'dcppA-assim', 'dcppC-hindcast-noAgung',
       'dcppC-hindcast-noElChichon', 


In [6]:
df.table_id.unique()

array(['AERmon', 'Amon', 'LImon', 'Lmon', 'Omon', 'SImon', 'day',
       'AERmonZ', 'CFmon', 'fx', 'Oyr', 'Ofx', 'CFday', '3hr', '6hrLev',
       'Oday', 'Aclim', 'Emon', '6hrPlev', 'CF3hr', 'Oclim', 'SIclim',
       'Eclim', 'Odec', 'AERhr', 'IfxGre', 'ImonGre', '6hrPlevPt', 'E3hr'],
      dtype=object)


In [13]:
df[(df.experiment_id == 'ssp585') & (df.table_id == 'day') & (df.variable_id.isin(['ua', 'va']))]['zstore'].values[:2]

array(['gs://cmip6/ScenarioMIP/BCC/BCC-CSM2-MR/ssp585/r1i1p1f1/day/ua/gn/',
       'gs://cmip6/ScenarioMIP/BCC/BCC-CSM2-MR/ssp585/r1i1p1f1/day/va/gn/'],
      dtype=object)


In [14]:
uri = df[(df.experiment_id == 'ssp585') & (df.table_id == 'day') & (df.variable_id.isin(['ua', 'va']))]['zstore'].values[0]



In [15]:
uri

'gs://cmip6/ScenarioMIP/BCC/BCC-CSM2-MR/ssp585/r1i1p1f1/day/ua/gn/'


In [16]:
gcs = gcsfs.GCSFileSystem(token='anon')
ds = xr.open_zarr(gcs.get_mapper(uri), consolidated=True)
ds


In [None]:
df_3hr_pr = df[(df.table_id == '3hr') & (df.variable_id == 'pr')]
len(df_3hr_pr)

In [None]:
df_3hr_pr.head()

In [None]:
df_3hr_pr.groupby(['experiment_id', 'source_id'])['zstore'].count()

In [None]:
run_counts = df_3hr_pr.groupby(['source_id', 'experiment_id'])['zstore'].count()
run_counts

In [None]:
source_ids = []
experiment_ids = ['historical', 'ssp585']
for name, group in df_3hr_pr.groupby('source_id'):
    if all([expt in group.experiment_id.values
            for expt in experiment_ids]):
        source_ids.append(name)
source_ids
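The selection above keeps only models that provide every required experiment. The same idea can be written as a set comparison; the following is a minimal sketch on toy data, where `models_with_all` is a hypothetical helper and the model names are made up.

```python
import pandas as pd

def models_with_all(catalog, required):
    """Return sorted source_ids that offer every experiment in `required`.

    Hypothetical helper for illustration only.
    """
    required = set(required)
    return sorted(
        name
        for name, group in catalog.groupby('source_id')
        # `<=` on sets tests "is a subset of".
        if required <= set(group['experiment_id'])
    )

toy = pd.DataFrame({
    'source_id': ['ModelA', 'ModelA', 'ModelB', 'ModelC', 'ModelC', 'ModelC'],
    'experiment_id': ['historical', 'ssp585', 'historical',
                      'ssp585', 'historical', 'ssp245'],
})
selected = models_with_all(toy, ['historical', 'ssp585'])
print(selected)
```

`ModelB` is dropped because it has no `ssp585` run, while the extra `ssp245` run of `ModelC` does not matter.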

In [None]:
def load_pr_data(source_id, expt_id):
    """
    Load 3hr precip data for given source and expt ids
    """
    # if several stores match (e.g. multiple ensemble members), take the first
    uri = df_3hr_pr[(df_3hr_pr.source_id == source_id) &
                    (df_3hr_pr.experiment_id == expt_id)].zstore.values[0]
    
    gcs = gcsfs.GCSFileSystem(token='anon')
    ds = xr.open_zarr(gcs.get_mapper(uri), consolidated=True)
    return ds

In [None]:
def precip_hist(ds, nbins=100, pr_log_min=-3, pr_log_max=2):
    """
    Calculate precipitation histogram for a single model. 
    Lazy.
    """
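    # the bin conversion below assumes a precipitation flux in kg m-2 s-1
    assert ds.pr.units == 'kg m-2 s-1'
    
    # bin edges in mm/day: a leading zero edge, then log-spaced edges
    bins_mm_day = np.hstack([[0], np.logspace(pr_log_min, pr_log_max, nbins)])
    # 1 kg m-2 of water is a 1 mm layer, so divide by the seconds per day
    bins_kg_m2s = bins_mm_day / (24*60*60)

    # count values per bin along longitude, then average the counts over time
    pr_hist = histogram(ds.pr, bins=[bins_kg_m2s], dim=['lon']).mean(dim='time')
    
    # width of one log-spaced bin in ln(r); the same for every bin after the first
    log_bin_spacing = np.diff(np.log(bins_kg_m2s[1:3])).item()
    # convert counts to percent of longitude points per unit ln(r)
    pr_hist_norm = 100 * pr_hist / ds.sizes['lon'] / log_bin_spacing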
    pr_hist_norm.attrs.update({'long_name': 'zonal mean rain frequency',
                               'units': '%/Δln(r)'})
    return pr_hist_norm

def precip_hist_for_expts(dsets, experiment_ids):
    """
    Calculate histogram for a suite of experiments.
    Eager.
    """
    # actual data loading and computations happen in this next line;
    # iterate over the datasets passed in, not over notebook globals
    pr_hists = [precip_hist(ds).load()
                for ds in dsets]
    pr_hist = xr.concat(pr_hists, dim=xr.Variable('experiment_id', experiment_ids))
    return pr_hist
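To see what the normalization in `precip_hist` does, it can help to reproduce it with plain NumPy on synthetic data. The following is a minimal sketch: `log_bins` and `rain_frequency` are hypothetical helper names, and the synthetic rain (about 40% dry samples, lognormal wet samples) is an arbitrary assumption, not model output.

```python
import numpy as np

def log_bins(nbins=100, pr_log_min=-3, pr_log_max=2):
    """Bin edges in mm/day: a leading zero edge, then log-spaced edges."""
    return np.hstack([[0], np.logspace(pr_log_min, pr_log_max, nbins)])

def rain_frequency(pr_mm_day, bins):
    """Percent of samples per unit ln(r) in each bin, plus the bin width."""
    counts, _ = np.histogram(pr_mm_day, bins=bins)
    # Width of one log-spaced bin in ln(r); it is a ratio of edges,
    # so it does not depend on the units of the bins.
    dlnr = np.diff(np.log(bins[1:3])).item()
    return 100 * counts / pr_mm_day.size / dlnr, dlnr

rng = np.random.default_rng(0)
n = 10_000
wet = rng.lognormal(mean=0.0, sigma=1.0, size=n)
# Assumed synthetic rain: roughly 40% of samples are exactly dry.
pr = np.where(rng.random(n) < 0.4, 0.0, wet)

bins = log_bins()
freq, dlnr = rain_frequency(pr, bins)
# Undo the normalization to get the fraction of samples that fell in any bin.
covered = freq.sum() * dlnr / 100
print(freq.shape, covered)
```

Multiplying the frequencies by the bin width and summing recovers the fraction of samples that fell inside the bin range, which is a quick sanity check on the normalization. The first bin runs from 0 to `10**pr_log_min` and has no logarithmic width, so its normalized value is only nominal.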

In [None]:
source_ids

In [None]:
results = {}
for source_id in tqdm(source_ids):
    # select roughly 20-year windows; label slices include both end years,
    # so each window spans 21 calendar years
    ds_hist = load_pr_data(source_id, 'historical').sel(time=slice('1980', '2000'))
    ds_ssp = load_pr_data(source_id, 'ssp585').sel(time=slice('2080', '2100'))
    pr_hist = precip_hist_for_expts([ds_hist, ds_ssp], experiment_ids)
    results[source_id] = pr_hist
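Label-based time slices include both endpoints, which affects how long each selected window is. The following is a minimal sketch using a pandas `DatetimeIndex`. As far as I know, `.sel(time=slice(...))` in xarray follows the same inclusive convention, but models with non-standard calendars use a different index type, so treat this as an illustration rather than a guarantee.

```python
import pandas as pd

# Daily time axis on the standard calendar, wider than the window of interest.
times = pd.date_range('1975-01-01', '2005-12-31', freq='D')
series = pd.Series(range(len(times)), index=times)

# Partial-string slicing: both '1980' and '2000' are included in full.
window = series.loc['1980':'2000']
years = window.index.year.unique()
print(years.min(), years.max(), len(years))
```

The window starts on 1 January 1980 and ends on 31 December 2000, so it covers 21 calendar years.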

In [None]:
def plot_precip_changes(pr_hist, vmax=5):
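    """
    Visualize the output: contour lines show the historical rain frequency,
    filled contours show the ssp585 minus historical change.
    """
    pr_hist_diff = (pr_hist.sel(experiment_id='ssp585') - 
                    pr_hist.sel(experiment_id='historical'))
    # [:, 1:] drops the first bin, whose lower edge is zero and cannot be drawn on a log axis
    pr_hist.sel(experiment_id='historical')[:, 1:].plot.contour(xscale='log', colors='0.5', levels=21)
    pr_hist_diff[:, 1:].plot.contourf(xscale='log', vmax=vmax, levels=21)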

In [None]:
title = 'Change in Zonal Mean Rain Frequency'
for source_id, pr_hist in results.items():
    plt.figure()
    plot_precip_changes(pr_hist)
    plt.title(f'{title}: {source_id}')