# Computing climate indicators

This notebook will get you started on the use of `xclim` to subset netCDF arrays and compute climate indicators, taking advantage of parallel processing capabilities offered by `xarray` and `dask`. 

`xarray` is a python package making it easy to work with n-dimensional arrays. It labels axes with their names (time, lat, lon, level) instead of indices (0,1,2,3), reducing the likelihood of bugs and making the code easier to understand. One of the key strengths of `xarray` is that it knows how to deal with non-standard calendars (I'm looking at you 360_days) and can easily resample daily time series to weekly, monthly, seasonal or annual periods.  Finally, `xarray` is tightly inegrated with `dask`, a package that can automatically parallelize operations.


In [None]:
# XCLIM and xarray
from __future__ import annotations

import glob

# file handling libraries
import os
import tempfile
import time
from pathlib import Path

import dask
import numpy as np
import xarray as xr

import xclim.atmos as atmos
import xclim.indices as indices

# Output folder
outfolder = Path(tempfile.mkdtemp())

## 1. Setting up the Dask client - Parallel processing / workers

First we create a pool of workers that will wait for jobs. The `xarray` library will automatically connect to these workers and and dispatch them jobs that can be run in parallel. 

The dashboard link lets you see in real time how busy those workers are. 

In [None]:
from distributed import Client

client = Client(
    n_workers=2, threads_per_worker=10, dashboard_address=8788, memory_limit="6GB"
)
# client=Client(n_workers=1)
client

## 2. Finding data files 

In [None]:
infolder = "<path_to_data>/cb-oura-1.0/"

# Get list of files for tasmax
rcps = ["rcp45", "rcp85"]
v = "tasmax"
r = rcps[0]
search_str = os.path.join(infolder, f"{v}*CanESM*{r}*.nc")
sim_files = sorted(glob.glob(search_str))
print(len(sim_files))

## 3. Creating xarray datasets

To open a netCDF file with `xarray`, we use `xr.open_dataset(<path to file>)`. But by default, the entire file is stored in one chunk, so there is no parallelism. To trigger parallel computations, we need to explicitly specify the *chunk* size. 

`Dask`' parallelism is based on memory chunks. We need to tell `xarray` to split our netCDF array into chunks of a given size, and operations on each chunk of the array will automatically be dispatched to the workers. 

In [None]:
# This file is opened as one big chunk: no parallel processing.
ds = xr.open_dataset(sim_files[0])
print(ds.tasmax)

In [None]:
# Chunked in memory along the time dimension.
# Note that the data type is a 'dask.array'. xarray will automatically use client workers
ds = xr.open_dataset(sim_files[0], chunks={"time": 31})
print(ds.tasmax)
ds.tasmax.chunks

### 3.1. Multifile dataset
netCDF files are often split into periods to keep file size manageable. A single dataset can be split in dozens of individual files. `xarray` has a function `open_mfdataset` that can open and aggregate a list of files and construct a unique *logical* dataset. `open_mfdataset` can aggregate files over coordinates (time, lat, lon) and variables. 

Note that opening a multi-file dataset automatically chunks the array (one chunk per file).

Note also that because `xarray` reads every file metadata to place it in a logical order, it can take a while to load. 

In [None]:
# Create multi-file data & chunks
ds = xr.open_mfdataset(sim_files, chunks={"time": 365, "lat": 50 * 2, "lon": 56 * 2})
ds = ds.drop("time_vectors")
ds = ds.drop("ts")
print(ds)

## 4. Subsetting utilities

### subset_bbox : using a latitude-longitude bounding box

In [None]:
from clisops.core import subset

lat_bnds = [45, 60]
lon_bnds = [-55, -82]

ds1 = subset.subset_bbox(ds, lat_bnds=lat_bnds, lon_bnds=lon_bnds)
print(ds1)

### Add start and/or end years

Note that in the next release, we'll use datetime objects instead of a year integer to specify start and end points.

In [None]:
ds2 = subset.subset_bbox(
    ds, lat_bnds=lat_bnds, lon_bnds=lon_bnds, start_yr=1981, end_yr=2010
)
print(ds2)
print(" ")

# subset years only
ds2 = subset.subset_bbox(ds, start_yr=1981, end_yr=2010)
print(ds2)

### Select a single grid point 

In [None]:
lon_pt = -70.0
lat_pt = 50.0

ds3 = subset.subset_gridpoint(ds, lon=lon_pt, lat=lat_pt, start_yr=1981)
print(ds3)

### Nothing has been computed so far !

If you look at the output of those operations, they're identified as `dask.array` objects. What happens is that `dask` creates a chain of operations that when executed, will yield the values we want. But as long as we don't explicitly ask for a value, no computation will occur. 

You can trigger computations by using the `load` or `compute` method, or writing the output to disk. 

## 5. Climate index calculation & resampling frequencies

`xclim` has two layers for the calculation of indicators. The bottom layer is composed of a list of functions that take a `xarray.DataArray` as an input and return an `xarray.DataArray` as output. You'll find these functions in `xclim.indices`. The indicator's logic is contained in this function, as well as potential unit conversions, but it doesn't check if the time frequency is daily, and doesn't not adjust the meta data of the output array. 

The second layer are class instances that you'll find organized by *realm*. So far, there is only one realm (atmospheric) available in `xclim.atmos`, but we'll be working on `ice` and `land` indicators in 2020. Before running computations, these classes check the input data is a daily average of the expected variable: 
1. If an indicator expects a daily mean and you pass it a daily max, a `warning` will be raised. 
2. After the computation, it also checks the number of values per period to make sure there are not missing values or `NaN` in the input data. If there are, the output is going to be set to `NaN`. 
3. The output units are set correctly as well as other properties of the output array, complying as much as possible with CF conventions. 

For new users, we suggest you use the classes found in `xclim.atmos`. If you know what you're doing and you want to circumvent the built-in checks, then you can use the `xclim.indices` directly. 

All `xclim` indicators convert daily data to lower time frequencies, such as monthly or annual values. This is done using `xarray.DataArray.resample` method. Resampling creates a grouped object over which you apply a reduction operation (e.g. mean, min, max). The list of available frequency is given in the link below, but the most often used are: 

- YS: annual starting in January
- YS-JUL: annual starting in July
- MS: monthly
- QS-DEC: seasonal starting in December
- 7D: 7 day (weekly)


http://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-offset-aliases  
Note - not all offsets in the link are supported by cftime objects in `xarray`


In the example below, we're computing the **annual maximum temperature of the daily maximum temperature (tx_max)**

In [None]:
fr = "YS"
ds1.tasmax.attrs["cell_methods"] = "time: maximum within days"
out = atmos.tx_max(ds1.tasmax, freq=fr)
print("Number of time-steps using freq == ", fr, " : ", len(out.time), "\n")
print(out.time)

### Example output using `atmos` vs `indices` modules
The `atmos` module adds CF metadata attributes to the output variable

In [None]:
out30d = atmos.tx_days_above(ds1.tasmax, thresh="25 C", freq=fr)
print(
    "output atmos : \n",
    out30d,
    "\n\n\n",
)

out30d = indices.tx_days_above(ds1.tasmax, thresh="25 C", freq=fr)
print("output indices : \n", out30d)

In [None]:
# We have created an xarray data-array - We can insert this into an output dataset object
# Create an xarray dataset object - copy original dataset global attrs
dsOut = xr.Dataset(data_vars=None, coords=out.coords, attrs=ds1.attrs)
# Add our climate index as a data variable to the dataset
dsOut[out.name] = out
print(dsOut)

## 7. xclim computations are *lazy*

Up until now we have ony created a schedule of tasks with a small preview, not done any actual computations. As mentionned above, writing the output to disk will trigger the cascade of computations on all the chunks. 

In [None]:
outfile = outfolder / "test_tx_max.nc"
start = time.time()
dsOut.to_netcdf(outfile, format="NETCDF4")
end = time.time()
print("calculation took ", end - start, "s")

### Optimizing the chunk size

You can improve performance by being smart about chunk sizes. If chunks are too small, there is a lot of time lost in overhead. If chunks are too large, you may end up exceeding the individual worker memory limit. 

In [None]:
print(ds1)

In [None]:
ds1 = ds1.chunk(chunks={"time": 365, "lon": -1, "lat": -1})
print(ds1)

In [None]:
out = atmos.tx_max(ds1.tasmax, freq=fr)
dsOut = xr.Dataset(data_vars=None, coords=out.coords, attrs=ds1.attrs)
dsOut[out.name] = out

start = time.time()

dsOut.to_netcdf(outfile, format="NETCDF4")

end = time.time()
print("calculation took ", end - start, "s")

### XCLIM unit handling 

A lot of effort has been placed into automatic handling of input data units.  `xclim` will automatically detect the input variable(s) units (e.g. °C versus K or mm/s versus mm/day etc.) and adjust on-the-fly in order to calculate indices in the consistent manner.  This comes with the obvious caveat that input data requires metadata attribute for units

In the example below, we compute weekly total precipitation in mm using inputs of mm/s and mm/d.

In [None]:
dsPr = xr.open_dataset(sim_files[0].replace("tasmax", "pr"), chunks={"time": 31}).drop(
    ["ts", "time_vectors"]
)
dsPr = subset.subset_gridpoint(dsPr, lon=lon_pt, lat=lat_pt)

# Create a copy of the data converted to mm d-1
dsPr_mmd = dsPr.copy()
dsPr_mmd["pr"].values = dsPr.pr.values * 3600 * 24
dsPr_mmd.pr.attrs["units"] = "mm d-1"

print(dsPr.pr.values[0:31], "\n")
print(dsPr_mmd.pr.values[0:31])

In [None]:
out1 = atmos.precip_accumulation(dsPr.pr, freq="MS")
print(
    "1. results using inputs in mm/s : \n\n",
    "units :",
    out1.units,
    "\n",
    out1.values,
    "\n",
)

out2 = atmos.precip_accumulation(dsPr_mmd.pr, freq="MS")
print(
    "2. results using inputs in mm/d : \n\n",
    "units :",
    out2.units,
    "\n",
    out2.values,
    "\n",
)

#### Threshold indices

`xclim` unit handling also applies to threshold indicators. Users can provide threshold in units of choice and `xclim` will adjust automatically.  For example determining the number of days with tasmax > 20°C users can define a threshold input of '20 C' or '20 degC' even if input data is in Kelvin.  Alernatively users could send provide a threshold in Kelvin '293.15 K' (if they really wanted to)

In [None]:
# Original data in Kelvin
dsTasmax = xr.open_dataset(sim_files[0], chunks={"time": 31}).drop(
    ["ts", "time_vectors"]
)
dsTasmax.tasmax.attrs["cell_methods"] = "time: maximum within days"
dsTasmax = subset.subset_gridpoint(dsTasmax, lon=lon_pt, lat=lat_pt)

# Create a copy of the data converted to C
dsTasmax_C = dsTasmax.copy()
dsTasmax_C["tasmax"].values = dsTasmax.tasmax.values - 273.15
dsTasmax_C.tasmax.attrs["units"] = "C"

print(dsTasmax.tasmax.values[0:31], "\n")
print(dsTasmax_C.tasmax.values[0:31])

In [None]:
# Using Kelvin data
out1 = atmos.tx_days_above(dsTasmax.tasmax, thresh="20 C", freq="MS")
print("1. results using inputs in °K  : threshold in °C: \n\n", out1.values, "\n")

# Using Celsius data
out2 = atmos.tx_days_above(dsTasmax_C.tasmax, thresh="20 C", freq="MS")
print("2. results using inputs in °C : threshold in °C\n\n", out2.values)

# Using Celsius but with threshold in Kelvin
out3 = atmos.tx_days_above(dsTasmax_C.tasmax, thresh="293.15 K", freq="MS")
print("\n3. results using inputs in °C : threshold in K: \n\n", out3.values)