In [1]:
import xarray as xr
import xclim as xc
import numpy as np
from clisops.core import subset
from dask.distributed import Client

import requests
from pathlib import Path
from zipfile import ZipFile

from memory_profiler import memory_usage
import time
from datetime import datetime as dt

from xclim.core.calendar import adjust_doy_calendar
import pandas as pd
from math import floor

In [2]:
c = Client(n_workers=1, threads_per_worker=3, memory_limit='3G', dashboard_address=8784)

# Benchmark xr.quantiles and friends

Compare options for computing percentiles when input data has NaNs. The test case will be `percentile_doy`, in various flavors.

In [4]:
# test data 1
basedir = Path('..')
url = 'https://code.dented-planet.net/Indices.zip'
test_files = [basedir / 'temp'/ 'Indices' / 'tasmin.nc', basedir / 'temp' / 'Indices' / 'tasmax.nc']
zipfile = basedir / 'temp' / 'indices.zip'
outdir = basedir / 'temp' / 'out'
outdir.mkdir(exist_ok=True)

if not all([tf.is_file() for tf in test_files]):
    if not zipfile.is_file():
        print('Downloading zip file')
        r = requests.get(url)
        with open(zipfile,'wb') as f:
            f.write(r.content)

    print('Extracting zip')
    with ZipFile(zipfile, 'r') as zipObj:
        zipObj.extractall('../temp')

ds = xr.open_mfdataset((basedir / 'temp' / 'Indices').glob('tasm*.nc'))
ds = ds.assign(tas=xc.atmos.tg(ds=ds)).chunk({'lat': 10})
ds

Unnamed: 0,Array,Chunk
Bytes,5.84 kB,5.84 kB
Shape,"(365, 2)","(365, 2)"
Count,5 Tasks,1 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 5.84 kB 5.84 kB Shape (365, 2) (365, 2) Count 5 Tasks 1 Chunks Type datetime64[ns] numpy.ndarray",2  365,

Unnamed: 0,Array,Chunk
Bytes,5.84 kB,5.84 kB
Shape,"(365, 2)","(365, 2)"
Count,5 Tasks,1 Chunks
Type,datetime64[ns],numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.56 kB,160 B
Shape,"(160, 2)","(10, 2)"
Count,37 Tasks,16 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.56 kB 160 B Shape (160, 2) (10, 2) Count 37 Tasks 16 Chunks Type float64 numpy.ndarray",2  160,

Unnamed: 0,Array,Chunk
Bytes,2.56 kB,160 B
Shape,"(160, 2)","(10, 2)"
Count,37 Tasks,16 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.12 kB,5.12 kB
Shape,"(320, 2)","(320, 2)"
Count,5 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 5.12 kB 5.12 kB Shape (320, 2) (320, 2) Count 5 Tasks 1 Chunks Type float64 numpy.ndarray",2  320,

Unnamed: 0,Array,Chunk
Bytes,5.12 kB,5.12 kB
Shape,"(320, 2)","(320, 2)"
Count,5 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,74.75 MB,4.67 MB
Shape,"(365, 160, 320)","(365, 10, 320)"
Count,34 Tasks,16 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 74.75 MB 4.67 MB Shape (365, 160, 320) (365, 10, 320) Count 34 Tasks 16 Chunks Type float32 numpy.ndarray",320  160  365,

Unnamed: 0,Array,Chunk
Bytes,74.75 MB,4.67 MB
Shape,"(365, 160, 320)","(365, 10, 320)"
Count,34 Tasks,16 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,74.75 MB,4.67 MB
Shape,"(365, 160, 320)","(365, 10, 320)"
Count,34 Tasks,16 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 74.75 MB 4.67 MB Shape (365, 160, 320) (365, 10, 320) Count 34 Tasks 16 Chunks Type float32 numpy.ndarray",320  160  365,

Unnamed: 0,Array,Chunk
Bytes,74.75 MB,4.67 MB
Shape,"(365, 160, 320)","(365, 10, 320)"
Count,34 Tasks,16 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,74.75 MB,4.67 MB
Shape,"(365, 160, 320)","(365, 10, 320)"
Count,38 Tasks,16 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 74.75 MB 4.67 MB Shape (365, 160, 320) (365, 10, 320) Count 38 Tasks 16 Chunks Type float32 numpy.ndarray",320  160  365,

Unnamed: 0,Array,Chunk
Bytes,74.75 MB,4.67 MB
Shape,"(365, 160, 320)","(365, 10, 320)"
Count,38 Tasks,16 Chunks
Type,float32,numpy.ndarray


In [5]:
test2_files = [basedir / 'temp' / 'Indices' / 'taspavics.nc', basedir / 'temp' / 'Indices' / 'taspavics_na.nc']

if not all([tf.is_file() for tf in test2_files]):
    url = 'http://pavics.ouranos.ca:8083/twitcher/ows/proxy/thredds/dodsC/datasets/simulations/bias_adjusted/cmip5/ouranos/cb-oura-1.0/day_CanESM2_historical+rcp85_r1i1p1_na10kgrid_qm-moving-50bins-detrend_1950-2100.ncml'
    tas = subset.subset_bbox(xr.open_dataset(url, chunks=dict(time=256, lon=16)), lon_bnds=[-90, -60], lat_bnds=[45, 48], start_date='1981',end_date='2010').tasmin
    taswna = tas.where((tas.time.dt.dayofyear != 21))
    
    tas.to_netcdf(test2_files[0])
    taswna.to_netcdf(test2_files[1])

tas = xr.open_dataset(test2_files[0], chunks={'lon': 10}).tasmin
taswna = xr.open_dataset(test2_files[0], chunks={'lon': 10}).tasmin

In [6]:
def atomic_test(ds, func, outpath=outdir):
    outname = outpath / f"{func.__name__}_{int(dt.now().timestamp())}.nc"
    out = func(ds)
    out.to_netcdf(outname)
    return out
    
def perform_test(ds, func, outpath=outdir, N=1):
    res = []
    for i in range(N):
        print(f'Performing test with {func.__name__} ({i + 1} / {N})')
        t0wc = time.perf_counter()
        t0sys = time.process_time()
        mem_usage = memory_usage((atomic_test, (ds, func, outpath)), interval=0.1)
        res.append(
            {'wall_time': time.perf_counter() - t0wc,
             'sys_time': time.process_time() - t0sys,
             'max_mem_use': max(mem_usage)}
        )
    return res

In [7]:
# Tests
def xclim_023(
    arr: xr.DataArray, window: int = 5, per: float = 0.1
) -> xr.DataArray:
    """Percentile value for each day of the year.
    Return the climatological percentile over a moving window around each day of the year.
    Parameters
    ----------
    arr : xr.DataArray
      Input data.
    window : int
      Number of days around each day of the year to include in the calculation.
    per : float
      Percentile between [0,1]
    Returns
    -------
    xr.DataArray
      The percentiles indexed by the day of the year.
    """
    # TODO: Support percentile array, store percentile in coordinates.
    #  This is supported by DataArray.quantile, but not by groupby.reduce.
    rr = arr.rolling(min_periods=1, center=True, time=window).construct("window")

    # Create empty percentile array
    g = rr.groupby("time.dayofyear")

    p = g.reduce(np.nanpercentile, dim=("time", "window"), q=per * 100)

    # The percentile for the 366th day has a sample size of 1/4 of the other days.
    # To have the same sample size, we interpolate the percentile from 1-365 doy range to 1-366
    if p.dayofyear.max() == 366:
        p = adjust_doy_calendar(p.sel(dayofyear=(p.dayofyear < 366)), arr)

    p.attrs.update(arr.attrs.copy())
    return p

In [8]:
def xclim_024(
    arr: xr.DataArray,
    window: int = 5,
    per: float = 0.1,
    fill_ends=True,
) -> xr.DataArray:
    """Percentile value for each day of the year.
    Return the climatological percentile over a moving window around each day of the year.
    Parameters
    ----------
    arr : xr.DataArray
      Input data.
    window : int
      Number of days around each day of the year to include in the calculation.
    per : float
      Percentile between [0,1]
    fill_ends : boolean
      Pad first and last windows of the time-series with mean of the window to avoid nan values
    Returns
    -------
    xr.DataArray
      The percentiles indexed by the day of the year.
    """
    # TODO: Support percentile array, store percentile in coordinates.
    #  This is supported by DataArray.quantile, but not by groupby.reduce.
    rr = arr.rolling(min_periods=1, center=True, time=window).construct("window")

    # rolling automatically adds some nans to the windows on the time-series edges
    # Optionally fill nans in first and last n=floor(window/2) time steps with average window values for these
    if fill_ends:
        nstep = floor(window / 2)
        tmp1 = rr.isel(time=slice(0, nstep))
        tmp1 = tmp1.fillna(tmp1.mean(dim="window"))
        tmp2 = rr.isel(time=slice(len(rr.time) - nstep, None))
        tmp2 = tmp2.fillna(tmp2.mean(dim="window"))
        rr = xr.concat(
            [tmp1, rr.isel(time=slice(nstep, len(rr.time) - nstep)), tmp2], dim="time"
        )

    ind = pd.MultiIndex.from_arrays(
        (rr.time.dt.year.values, rr.time.dt.dayofyear.values),
        names=("year", "dayofyear"),
    )
    rrr = (
        rr.assign_coords(time=ind)
        .unstack("time")
        .stack(stack_dim=("year", "window"))
        .chunk(dict(stack_dim=-1))
    )

    # identify data where a xr.quantile(skipna=False) is ok
    mask = np.isnan(rrr).sum(dim="stack_dim")
    mask = (mask == 0) | (mask == len(rrr.stack_dim))
    p = rrr.quantile(q=per, dim="stack_dim", skipna=False)

    # if necessary use xr.quantile(skipna=True) only for grid cells / dayofyear with nans
    if np.any(~mask):
        p_nans = rrr.where(~mask, drop=True).quantile(
            q=per, dim="stack_dim", skipna=True
        )
        p = p.where(mask, p_nans.broadcast_like(p))


    # The percentile for the 366th day has a sample size of 1/4 of the other days.
    # To have the same sample size, we interpolate the percentile from 1-365 doy range to 1-366
    if p.dayofyear.max() == 366:
        p = adjust_doy_calendar(p.sel(dayofyear=(p.dayofyear < 366)), arr)

    p.attrs.update(arr.attrs.copy())
    return p

In [28]:
# This version replaces all NaNs by the mean along the window
def _calc_perc(arr, p=[50]):
    """Ufunc-like computing a percentile over the last axis of the array.

    Processes cases with invalid values separately, which makes it more efficent than np.nanpercentile for array with only a few invalid points.

    Parameters
    ----------
    arr : np.array
        Percentile is computed over the last axis.
    p : sequence of floats
        Percentile to compute, between 0 and 100. (the default is 50)

    Returns
    -------
    np.array
    """
    nan_count = np.isnan(arr).sum(axis=-1)
    out = np.moveaxis(np.percentile(arr, p, axis=-1), 0, -1)
    nans = (nan_count > 0) & (nan_count < arr.shape[-1])
    if np.any(nans):
        out_mask = np.stack([nans] * len(p), axis=-1)
        # arr1 = arr.reshape(int(arr.size / arr.shape[-1]), arr.shape[-1])
        # only use nanpercentile where we need it (slow performance compared to standard) :
        out[out_mask] = np.moveaxis(
            np.nanpercentile(arr[nans], p, axis=-1), 0, -1
        ).ravel()
    return out

def xclim_dev(
    arr: xr.DataArray,
    window: int = 5,
    per: float = 10,
) -> xr.DataArray:
    """Percentile value for each day of the year.

    Return the climatological percentile over a moving window around each day of the year.
    All NaNs are skipped.

    Parameters
    ----------
    arr : xr.DataArray
      Input data, a daily frequency is required.
    window : int
      Number of days around each day of the year to include in the calculation.
    per : float or sequence of floats
      Percentile between [0, 100]

    Returns
    -------
    xr.DataArray
      The percentiles indexed by the day of the year.
      For calendars with 366 days, percentiles of doys 1-365 are interpolated to the 1-366 range.
    """
    rr = arr.rolling(min_periods=1, center=True, time=window).construct("window")

    ind = pd.MultiIndex.from_arrays(
        (rr.time.dt.year.values, rr.time.dt.dayofyear.values),
        names=("year", "dayofyear"),
    )
    rrr = (
        rr.assign_coords(time=ind)
        .unstack("time")
        .stack(stack_dim=("year", "window"))
        .chunk(dict(stack_dim=-1))
    )

    if np.isscalar(per):
        per = [per]

    p = xr.apply_ufunc(
        _calc_perc,
        rrr,
        input_core_dims=[['stack_dim']],
        output_core_dims=[['percentiles']],
        keep_attrs=True,
        kwargs=dict(p=per),
        dask='parallelized',
        output_dtypes=[rrr.dtype],
        output_sizes={"percentiles": len(per)}
    )
    p = p.assign_coords(
        percentiles=xr.DataArray(per, dims=('percentiles',))
    )

    # The percentile for the 366th day has a sample size of 1/4 of the other days.
    # To have the same sample size, we interpolate the percentile from 1-365 doy range to 1-366
    if p.dayofyear.max() == 366:
        p = adjust_doy_calendar(p.sel(dayofyear=(p.dayofyear < 366)), arr)

    p.attrs.update(arr.attrs.copy())
    return p

# Test laziness

In [31]:
%%time
out = xclim_024(ds.tas)
# nothing should have been executed

CPU times: user 780 ms, sys: 42.2 ms, total: 822 ms
Wall time: 2.67 s


In [30]:
%%time
out = xclim_dev(ds.tas)

CPU times: user 35.5 ms, sys: 9.41 ms, total: 44.9 ms
Wall time: 42 ms


# Tests with Test dataset 1

In [32]:
res = perform_test(ds.tas, xclim_024, N=5)
print('xclim 0.24 - Version of Travis with fill_ends')
print(f"Mean wall time : {np.mean([r['wall_time'] for r in res])}")

Performing test with xclim_024 (1 / 5)
Performing test with xclim_024 (2 / 5)
Performing test with xclim_024 (3 / 5)
Performing test with xclim_024 (4 / 5)
Performing test with xclim_024 (5 / 5)
xclim 0.24 - Version of Travis with fill_ends
Mean wall time : 22.82041727179894


In [33]:
res = perform_test(ds.tas, xclim_dev, N=5)
print('xclim dev - Version of Pascal with apply_ufunc')
print(f"Mean wall time : {np.mean([r['wall_time'] for r in res])}")

Performing test with xclim_dev (1 / 5)
Performing test with xclim_dev (2 / 5)
Performing test with xclim_dev (3 / 5)
Performing test with xclim_dev (4 / 5)
Performing test with xclim_dev (5 / 5)
xclim dev - Version of Pascal replacing ALL NaNs
Mean wall time : 35.72202802119428


In [10]:
res = perform_test(ds.tas, xclim_023, N=1)
print('xclim 0.23 - Previous laggy version.')
print(f"Mean wall time : {np.mean([r['wall_time'] for r in res])}")

Performing test with xclim_023 (1 / 1)
































xclim 0.23 - Previous laggy version.
Mean wall time : 2198.434556741995


## Verification of xclim_dev

In [25]:
outdev = atomic_test(ds.tas.isel(lat=0, lon=0), xclim_dev)
out24 = atomic_test(ds.tas.isel(lat=0, lon=0), xclim_024)
diff = (out24 - outdev)
(diff == 0).all().values

array(True)

In [26]:
out23 = atomic_test(ds.tas.isel(lat=0, lon=0), xclim_023)
diff = (out24 - out23)
(diff == 0).all().values































array(False)

# Tests with Test dataset 2

In [10]:
res = perform_test(tas, xclim_024, N=5)
print('xclim 0.24 - Version of Travis with fill_ends')
print(f"Mean wall time : {np.mean([r['wall_time'] for r in res])}")

res = perform_test(taswna, xclim_024, N=5)
print('xclim 0.24 - Version of Travis with fill_ends - with NaN')
print(f"Mean wall time : {np.mean([r['wall_time'] for r in res])}")

Performing test with xclim_024 (1 / 5)
Performing test with xclim_024 (2 / 5)
Performing test with xclim_024 (3 / 5)
Performing test with xclim_024 (4 / 5)
Performing test with xclim_024 (5 / 5)
xclim 0.24 - Version of Travis with fill_ends
Mean wall time : 86.16731039119186
Performing test with xclim_024 (1 / 5)
Performing test with xclim_024 (2 / 5)
Performing test with xclim_024 (3 / 5)
Performing test with xclim_024 (4 / 5)
Performing test with xclim_024 (5 / 5)
xclim 0.24 - Version of Travis with fill_ends - with NaN
Mean wall time : 174.39460554339456


In [12]:
res = perform_test(tas, xclim_dev, N=3)
print('xclim dev - Version of Pascal replacing ALL NaNs')
print(f"Mean wall time : {np.mean([r['wall_time'] for r in res])}")

res = perform_test(taswna, xclim_dev, N=3)
print('xclim dev - Version of Pascal replacing ALL NaNs - with NaN')
print(f"Mean wall time : {np.mean([r['wall_time'] for r in res])}")

Performing test with xclim_dev (1 / 3)
Performing test with xclim_dev (2 / 3)
Performing test with xclim_dev (3 / 3)
xclim dev - Version of Pascal replacing ALL NaNs
Mean wall time : 62.10716049099574
Performing test with xclim_dev (1 / 3)
Performing test with xclim_dev (2 / 3)
Performing test with xclim_dev (3 / 3)
xclim dev - Version of Pascal replacing ALL NaNs - with NaN
Mean wall time : 64.38503600898548


In [None]:
res = perform_test(tas, xclim_023, N=1)
print('xclim 0.23 - Previous laggy version.')
print(f"Mean wall time : {np.mean([r['wall_time'] for r in res])}")

res = perform_test(taswna, xclim_023, N=1)
print('xclim 0.23 - Previous laggy version. - with NaN')
print(f"Mean wall time : {np.mean([r['wall_time'] for r in res])}")

## Verification of xclim_dev

In [None]:
outdev = atomic_test(tas, xclim_dev)
out24 = atomic_test(tas, xclim_024)
diff = (out24 - outdev)
(diff == 0).all().values

In [13]:
outdev = atomic_test(taswna, xclim_dev)
out24 = atomic_test(taswna, xclim_024)
diff = (out24 - outdev)
diff.fillna(0) == 0).all().values

array(False)