<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Create-and-Connect-to-Dask-Distributed-Cluster" data-toc-modified-id="Create-and-Connect-to-Dask-Distributed-Cluster-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Create and Connect to Dask Distributed Cluster</a></span></li><li><span><a href="#Load-data-into-xarray-from-an-intake-catalog" data-toc-modified-id="Load-data-into-xarray-from-an-intake-catalog-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Load data into xarray from an intake catalog</a></span></li><li><span><a href="#Compute-Weighted-Means" data-toc-modified-id="Compute-Weighted-Means-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Compute Weighted Means</a></span></li><li><span><a href="#Get-Observations-(HadCRUT4;-Morice-et-al.-2012)" data-toc-modified-id="Get-Observations-(HadCRUT4;-Morice-et-al.-2012)-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Get Observations (HadCRUT4; Morice et al. 2012)</a></span></li><li><span><a href="#Figure-2:-Global-surface-temperature-anomaly-(1961-90-base-period)-for-individual-ensemble-members,-and-observations" data-toc-modified-id="Figure-2:-Global-surface-temperature-anomaly-(1961-90-base-period)-for-individual-ensemble-members,-and-observations-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Figure 2: Global surface temperature anomaly (1961-90 base period) for individual ensemble members, and observations</a></span></li><li><span><a href="#Compute-Linear-Trend-for-Winter-Seasons" data-toc-modified-id="Compute-Linear-Trend-for-Winter-Seasons-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Compute Linear Trend for Winter Seasons</a></span></li><li><span><a href="#Figure-4:-Global-maps-of-historical-(1979---2012)-boreal-winter-(DJF)-surface-air-trends" data-toc-modified-id="Figure-4:-Global-maps-of-historical-(1979---2012)-boreal-winter-(DJF)-surface-air-trends-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Figure 4: Global maps of historical (1979 - 2012) boreal winter (DJF) surface air trends</a></span></li></ul></div>

In [1]:
# TODO: document this notebook
# References: https://journals.ametsoc.org/doi/full/10.1175/BAMS-D-13-00255.1

In [3]:
%matplotlib inline
import warnings

warnings.filterwarnings("ignore")
# Silence dask.distributed logs
import logging

logger = logging.getLogger("distributed.utils_perf")
logger.setLevel(logging.ERROR)

import intake
import pprint
import numpy as np
import pandas as pd
import xarray as xr

## Create and Connect to Dask Distributed Cluster

In [2]:
from dask_kubernetes import KubeCluster
from dask.distributed import Client

cluster = KubeCluster()
cluster.adapt(minimum=2, maximum=100, wait_count=60)
client = Client(cluster)
cluster

VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…


☝️ Don't forget to click the link above to view the scheduler dashboard!

## Load data into xarray from an intake catalog

In [4]:
col = intake.open_esm_datastore("../intake-catalogs/aws-cesm1-le.json")
col

aws-cesm1-le-ESM Collection with 278 entries:
	> 5 component(s)

	> 5 frequency(s)

	> 6 experiment(s)

	> 46 variable(s)

	> 278 path(s)

In [5]:
col.df.head()

Unnamed: 0,component,frequency,experiment,variable,path
0,atm,daily,20C,FLNS,s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FLNS....
1,atm,daily,20C,FLNSC,s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FLNSC...
2,atm,daily,20C,FLUT,s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FLUT....
3,atm,daily,20C,FSNS,s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FSNS....
4,atm,daily,20C,FSNSC,s3://ncar-cesm-lens/atm/daily/cesmLE-20C-FSNSC...


In [6]:
uniques = col.unique(columns=["component", "frequency", "experiment", "variable"])
pprint.pprint(uniques, compact=True, indent=4)

{   'component': {   'count': 5,
                     'values': ['atm', 'ice_nh', 'ice_sh', 'lnd', 'ocn']},
    'experiment': {   'count': 6,
                      'values': [   '20C', 'RCP85', 'CTRL', 'CTRL_AMIP',
                                    'CTRL_SLAB_OCN', 'HIST']},
    'frequency': {   'count': 5,
                     'values': [   'daily', 'hourly6-1990-2005',
                                   'hourly6-2026-2035', 'hourly6-2071-2080',
                                   'monthly']},
    'variable': {   'count': 46,
                    'values': [   'FLNS', 'FLNSC', 'FLUT', 'FSNS', 'FSNSC',
                                  'FSNTOA', 'ICEFRAC', 'LHFLX', 'PRECL',
                                  'PRECSC', 'PRECSL', 'PRECT', 'PRECTMX', 'PSL',
                                  'Q850', 'SHFLX', 'TMQ', 'TREFHT', 'TREFHTMN',
                                  'TREFHTMX', 'TS', 'UBOT', 'WSPDSRFAV', 'Z500',
                                  'PS', 'Q', 'QREFHT', 'T', 'U', 'V', 'Z3',

In [18]:
import s3fs
import xarray as xr
s3 = s3fs.S3FileSystem(anon=True)
store = s3fs.S3Map(root='ncar-cesm-lens/atm/daily/cesmLE-20C-TREFHT.zarr', s3=s3)
ds_20C = xr.open_zarr(store, consolidated=True)['TREFHT']

In [19]:
ds_20C

<xarray.DataArray 'TREFHT' (member_id: 40, time: 31390, lat: 192, lon: 288)>
dask.array<zarr, shape=(40, 31390, 192, 288), dtype=float32, chunksize=(2, 365, 192, 288), chunktype=numpy.ndarray>
Coordinates:
  * lat        (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0
  * lon        (lon) float64 0.0 1.25 2.5 3.75 5.0 ... 355.0 356.2 357.5 358.8
  * member_id  (member_id) int64 1 2 3 4 5 6 7 8 ... 34 35 101 102 103 104 105
  * time       (time) object 1920-01-01 00:00:00 ... 2005-12-31 00:00:00
Attributes:
    cell_methods:  time: mean
    long_name:     Reference height temperature
    units:         K

In [22]:
col_subset = col.search(frequency="daily", component="atm", variable="TREFHT", experiment=["20C"])
col_subset.df

Unnamed: 0,component,frequency,experiment,variable,path
17,atm,daily,20C,TREFHT,s3://ncar-cesm-lens/atm/daily/cesmLE-20C-TREFH...


In [23]:
dsets = col_subset.to_dataset_dict(zarr_kwargs={"consolidated": True}, aggregate=False)

--> The keys in the returned dictionary of datasets are constructed as follows:
	'path'

--> There will be 1 group(s)


KeyError: 'ncar-cesm-lens/atm/daily/cesmLE-20C-TREFHT.zarr/.zmetadata'

In [None]:
# TODO: Remove this section once intake-esm is working
cat = intake.Catalog(
    "https://raw.githubusercontent.com/NCAR/cesm-lens-aws/master/intake-catalogs/atmosphere/daily.yaml"
)

ds_20C = cat["reference_height_temperature_20C"].to_dask()
ds_rcp = cat["reference_height_temperature_RCP85"].to_dask()
t_20c = ds_20C["TREFHT"]
t_rcp = ds_rcp["TREFHT"]

In [None]:
t_ref = t_20c.sel(time=slice("1961", "1990"))
t_ref

In [None]:
areacella = ds_20C.area
total_area = areacella.sum()
areacella

## Compute Weighted Means

In [None]:
t_ref_ts = (
    (t_ref.resample(time="AS").mean("time") * areacella).sum(dim=("lat", "lon"))
    / total_area
).mean(dim=("time", "member_id"))

t_20c_ts = (
    (t_20c.resample(time="AS").mean("time") * areacella).sum(dim=("lat", "lon"))
) / total_area

t_rcp_ts = (
    (t_rcp.resample(time="AS").mean("time") * areacella).sum(dim=("lat", "lon"))
) / total_area

In [None]:
%%time
t_ref_mean = t_ref_ts.load()
t_ref_mean

In [None]:
t_20c_ts_df = t_20c_ts.to_series().unstack().T
t_20c_ts_df.head()

In [None]:
t_rcp_ts_df = t_rcp_ts.to_series().unstack().T
t_rcp_ts_df.head()

## Get Observations (HadCRUT4; Morice et al. 2012)

In [None]:
ds = xr.open_dataset(
    "https://www.esrl.noaa.gov/psd/thredds/dodsC/Datasets/cru/hadcrut4/air.mon.anom.median.nc"
).load()
ds

- **Obs mean: weight by days in each month**

In [None]:
def weighted_temporal_mean(ds):
    time_bound_diff = ds.time_bnds.diff(dim="nbnds")[:, 0]
    wgts = time_bound_diff.groupby("time.year") / time_bound_diff.groupby(
        "time.year"
    ).sum(xr.ALL_DIMS)
    np.testing.assert_allclose(wgts.groupby("time.year").sum(xr.ALL_DIMS), 1.0)
    obs = ds["air"]
    cond = obs.isnull()
    ones = xr.where(cond, 0.0, 1.0)
    obs_sum = (obs * wgts).resample(time="AS").sum(dim="time")
    ones_out = (ones * wgts).resample(time="AS").sum(dim="time")
    obs_s = (obs_sum / ones_out).mean(("lat", "lon")).to_series()
    return obs_s

In [None]:
obs_s = weighted_temporal_mean(ds)
obs_s.head()

In [None]:
all_ts_anom = pd.concat([t_20c_ts_df, t_rcp_ts_df]) - t_ref_mean.data
years = [val.year for val in all_ts_anom.index]

- **Confirm that after using area weighted average, max temp increase is 5k**

In [None]:
np.testing.assert_allclose(all_ts_anom.values.max(), 5.0, rtol=0.02)

## Figure 2: Global surface temperature anomaly (1961-90 base period) for individual ensemble members, and observations

In [None]:
import matplotlib.pyplot as plt

In [None]:
ax = plt.axes()

ax.tick_params(right=True, top=True, direction="out", length=6, width=2, grid_alpha=0.5)
ax.plot(years, all_ts_anom, color="grey")
ax.plot(years, all_ts_anom[1], color="black")
ax.plot(obs_s.index.year.tolist(), obs_s, color="red")

ax.text(
    0.3,
    0.4,
    "observations",
    verticalalignment="bottom",
    horizontalalignment="left",
    transform=ax.transAxes,
    color="red",
    fontsize=10,
)
ax.text(
    0.3,
    0.3,
    "members 1-40",
    verticalalignment="bottom",
    horizontalalignment="left",
    transform=ax.transAxes,
    color="grey",
    fontsize=10,
)

ax.set_xticks([1850, 1920, 1950, 2000, 2050, 2100])
plt.ylim(-1, 5)
plt.xlim(1850, 2100)
plt.ylabel("Global Surface\nTemperature Anomaly (K)")
plt.show()

## Compute Linear Trend for Winter Seasons

In [None]:
def linear_trend(da, dim="time"):
    da_chunk = da.chunk({dim: -1})
    trend = xr.apply_ufunc(
        calc_slope,
        da_chunk,
        vectorize=True,
        input_core_dims=[[dim]],
        output_core_dims=[[]],
        output_dtypes=[np.float],
        dask="parallelized",
    )
    return trend


def calc_slope(y):
    """ufunc to be used by linear_trend"""
    x = np.arange(len(y))
    return np.polyfit(x, y, 1)[0]

In [None]:
t = xr.concat([t_20c, t_rcp], dim="time")
seasons = t.sel(time=slice("1979", "2012")).resample(time="QS-DEC").mean("time")
# Include only full seasons from 1979 and 2012
seasons = seasons.sel(time=slice("1979", "2012")).load()
seasons

In [None]:
winter_seasons = seasons.sel(
    time=seasons.time.where(seasons.time.dt.month == 12, drop=True)
)
winter_trends = linear_trend(
    winter_seasons.chunk({"lat": 20, "lon": 20, "time": -1})
).load() * len(winter_seasons.time)

In [None]:
# Make sure that we have 34 seasons
assert len(winter_seasons.time) == 34

## Figure 4: Global maps of historical (1979 - 2012) boreal winter (DJF) surface air trends

In [None]:
!pip install git+https://github.com/hhuangwx/cmaps.git -q

In [None]:
import cmaps  # for NCL colormaps
import cartopy.crs as ccrs

contour_levels = [-6, -5, -4, -3, -2, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, 2, 3, 4, 5, 6]

color_map = cmaps.ncl_default

In [None]:
lats = winter_trends.lat
lons = winter_trends.lon

numPlotRows = 10
numPlotCols = 4
figWidth = 20 
figHeight = 35  
fig, axs = plt.subplots(numPlotRows, numPlotCols, figsize=(figWidth,figHeight))

for row in range(numPlotRows):
    for col in range(numPlotCols):
        ensemble_index = row * numPlotCols + col
        subplot_index = ensemble_index + 1
        ax = plt.subplot(numPlotRows, numPlotCols, subplot_index, projection = ccrs.Robinson(central_longitude = 180))
        plot_data = winter_trends.isel(member_id = ensemble_index)
        cplot = plt.contourf(lons, lats, plot_data, 
                             levels = contour_levels, 
                             cmap = color_map, 
                             extend = 'both', 
                             transform = ccrs.PlateCarree())
        ax.coastlines(color = 'grey')
        plot_label = str(subplot_index)
        ax.text(0.01, 0.01, plot_label, fontSize = 16, transform = ax.transAxes)
        # Save axes objects for figure colorbar
        axs[row, col] = ax
    
cbar = fig.colorbar(cplot, ax=axs, orientation='horizontal', shrink = 0.7, pad = 0.02)
cbar.ax.set_title('1979-2012 DJF surface air temperature trends (K/34 years)', fontSize = 16)
cbar.set_ticks(contour_levels)
cbar.set_ticklabels(contour_levels)

In [None]:
# TODO: Add obs panel and ensemble mean at the end

In [None]:
# Gracefully destroy/close our cluster
client.close()
cluster.close()