In [24]:
import numpy as np
import pandas as pd
import xarray as xr
from datetime import timedelta
import warnings


def _forecast_error_metric(valid, forecast, method, axis, weights, climatology):
    """
    Compute a single error metric between a verification array and a forecast array that are already aligned.
    :param valid: ndarray or DataArray: verification data
    :param forecast: ndarray or DataArray: forecast data, broadcastable against 'valid'
    :param method: str: one of 'mse', 'mae', 'rmse', 'acc', 'cos'
    :param axis: int, tuple, or None: axis/axes over which the metric is reduced
    :param weights: float or DataArray: multiplicative weights (1. when no weighting is requested)
    :param climatology: float, ndarray or DataArray: climatology removed from both inputs for 'acc' and 'cos'; it is
        not read for the other methods
    :return: the reduced error metric
    """
    if method == 'mse':
        return np.nanmean((valid - forecast) ** 2. * weights, axis=axis)
    if method == 'mae':
        return np.nanmean(np.abs((valid - forecast) * weights), axis=axis)
    if method == 'rmse':
        return np.sqrt(np.nanmean((valid - forecast) ** 2. * weights, axis=axis))

    # The remaining methods operate on anomalies with respect to the climatology
    valid_anomaly = valid - climatology
    forecast_anomaly = forecast - climatology
    if method == 'acc':
        return (np.nanmean(valid_anomaly * forecast_anomaly * weights, axis=axis)
                / np.sqrt(np.nanmean(valid_anomaly ** 2. * weights, axis=axis) *
                          np.nanmean(forecast_anomaly ** 2. * weights, axis=axis)))

    # method == 'cos'
    # TODO: be consistent and return a np.ndarray. Need to figure out how to do the dot operation in numpy
    if not isinstance(forecast, xr.DataArray):
        raise TypeError("'cos' method requires xarray DataArrays for now")
    # The DataArray dot product is given dimension names, so translate the integer axes into names
    if axis is None:
        dims = None
    elif isinstance(axis, int):
        dims = [valid.dims[axis]]
    else:
        dims = [valid.dims[a] for a in axis]
    return (forecast_anomaly.dot(valid_anomaly * weights, dims=dims) /
            (np.linalg.norm(forecast_anomaly * weights, axis=axis) *
             np.linalg.norm(valid_anomaly * weights, axis=axis)))


def forecast_error(forecast, valid, method='mse', axis=None, weighted=False, climatology=None):
    """
    Calculate the error of a time series model forecast.
    :param forecast: ndarray or DataArray: forecast from a DLWP model (forecast hour is first axis)
    :param valid: ndarray or DataArray: validation target data for the predictors the forecast was made on. If it has
        the same number of dimensions as 'forecast', its first axis is taken to be the forecast hour. Otherwise it is
        treated as a continuous time series and forecast hour index f is compared against valid[f:].
    :param method: str: method for computing the error. Options are:
        'mse': mean squared error
        'mae': mean absolute error
        'rmse': root-mean-squared error
        'acc': anomaly correlation coefficient
        'cos': cosine similarity score
    :param axis: int, tuple, or None: take the mean of the error along this axis. Regardless of this setting, the
        forecast hour will be the first dimension. Note that for cosine similarity it is recommended to explicitly
        specify the spatial axes. When 'valid' is a continuous time series, the number of compared times shrinks with
        the forecast hour, so 'axis' should include the time axis (or be None) for the per-hour results to stack.
    :param weighted: bool: if True, expects inputs to be DataArrays with 'lat' as one of the dimensions, and weights
        according to the latitude
    :param climatology: ndarray or DataArray: mean climatology state for computing the ACC score. Dimensions other than
        axis 0 (forecast hour) and axis 1 (time) must match that of the forecast/valid arrays. If either of the first
        two axes are included, they must be size 1 or (for time) match the time dimension.
    :return: ndarray: forecast error with forecast hour as the first dimension
    :raises TypeError: if method is 'cos' and the forecast is not an xarray DataArray
    :raises ValueError: if 'valid' has no forecast hour dimension and 'climatology' has a leading non-spatial
        dimension of size > 1
    """
    assert method in ['mse', 'mae', 'rmse', 'acc', 'cos'], "'method' must be one of 'mse', 'mae', 'rmse', 'acc', 'cos'"
    if method in ['acc', 'cos'] and climatology is None:
        warnings.warn("'acc' and 'cos' error methods expect to get a climatology; using 0 instead, which may yield "
                      "unexpected results.")
        climatology = 0.
    n_f = forecast.shape[0]
    if weighted:
        # Cosine-of-latitude weights, normalized to have unit mean
        weights = np.cos(np.deg2rad(valid.lat))
        weights = weights / weights.mean()
    else:
        weights = 1.

    if len(forecast.shape) == len(valid.shape):
        # valid provided with a forecast hour dimension 0
        if axis is None:
            axis = tuple(range(1, len(valid.shape)))
        return _forecast_error_metric(valid, forecast, method, axis, weights, climatology)

    # valid provided as a continuous time series without a forecast hour dimension
    # climatology may legitimately be None (mse/mae/rmse) or the scalar 0. fallback, neither of which has a shape
    climatology_shape = getattr(climatology, 'shape', ())
    if len(climatology_shape) >= len(valid.shape) and climatology_shape[0] > 1:
        raise ValueError("'climatology' cannot have non-spatial dimensions != 1 if the verification data is not "
                         "provided with a forecast hour dimension")
    n_val = valid.shape[0]
    # Forecast hour index f is compared against the verification shifted by f steps; every method contributes one
    # entry per forecast hour so that the forecast hour ends up as the first dimension of the result.
    me = [
        _forecast_error_metric(valid[f:], forecast[f, :(n_val - f)], method, axis, weights, climatology)
        for f in range(n_f)
    ]
    return np.array(me)

def verification_from_samples(ds, all_ds=None, init_times=None, forecast_steps=1, dt=6, f_hour_timedelta_type=True,
                              include_zero=False):
    """
    Generate a DataArray of forecast verification from a validation DataSet built using Preprocessor.data_to_samples().
    :param ds: xarray.Dataset: dataset of verification data. Must contain a 'predictors' variable with 'sample' and
        'time_step' dimensions.
    :param all_ds: xarray.Dataset: optional Dataset containing the same variables/levels/lat/lon as ds but
        including more time steps for more robust handling of data at times outside of the validation selection.
        When given, the verification values are read from it instead of from ds.
    :param init_times: iterable of Timestamps: optional list of verification initialization times. Defaults to the
        'sample' coordinate values of ds.
    :param forecast_steps: int: number of forward forecast iterations
    :param dt: int: forecast time step in hours
    :param f_hour_timedelta_type: bool: if True, converts f_hour dimension into a timedelta type. May not always be
        compatible with netCDF applications.
    :param include_zero: bool: if True, include the 0 forecast hour (initialization)
    :return: xarray.DataArray: verification with forecast hour as the first dimension; entries whose valid time is
        not present in the source data are NaN
    :raises ValueError: if forecast_steps or dt is smaller than 1
    """
    forecast_steps = int(forecast_steps)
    if forecast_steps < 1:
        raise ValueError("'forecast_steps' must be an integer >= 1")
    dt = int(dt)
    if dt < 1:
        raise ValueError("'dt' must be an integer >= 1")
    if init_times is None:
        init_times = ds.sample.values

    # Every predictor dimension other than the sample/time bookkeeping ones is carried through unchanged
    dims = [d for d in ds.predictors.dims if d.lower() not in ['time_step', 'sample', 'time']]
    f_hour = np.arange(0 if include_zero else dt, dt * forecast_steps + 1, dt)
    if f_hour_timedelta_type:
        f_hour = np.array(f_hour).astype('timedelta64[h]')

    # Dataset.sizes is the name -> length mapping; indexing Dataset.dims that way is deprecated in xarray
    shape = [forecast_steps + int(include_zero), len(init_times)] + [ds.sizes[d] for d in dims]
    verification = xr.DataArray(
        np.full(shape, np.nan, dtype=np.float32),
        coords=[f_hour, init_times] + [ds[d] for d in dims],
        dims=['f_hour', 'time'] + dims,
        name='verification'
    )

    source_ds = all_ds if all_ds is not None else ds
    valid_da = source_ds.predictors.isel(time_step=-1)

    step = np.timedelta64(timedelta(hours=dt))
    horizon = np.timedelta64(timedelta(hours=dt * forecast_steps))
    # Lowercase 'h' is the hourly alias; the uppercase 'H' spelling is deprecated in pandas
    freq = '%sh' % int(dt)
    for d, date in enumerate(init_times):
        valid_times = pd.date_range(date if include_zero else date + step, date + horizon, freq=freq)
        # reindex with method=None leaves NaN wherever a requested valid time is missing from the source
        verification[:, d] = valid_da.reindex(sample=valid_times, method=None).values
    return verification

In [2]:
import os
# Move the working directory up one level.
# NOTE(review): this is relative to the current directory, so re-running the cell moves up again.
os.chdir(os.pardir)

# NOTE(review): absolute, user-specific path; adjust before running on another machine.
root_directory = '/Users/calledoux/cubed_sphere_implementation/'
# Predictor data file; opened with xr.open_dataset in a later cell.
predictor_file = os.path.join(root_directory, 'ERA5', 'tutorial_z500_t2m_CS.nc')
# Not read by any cell visible in this notebook.
scale_file = os.path.join(root_directory, 'ERA5', 'tutorial_z500_t2m.nc')

# Path under root_directory for the 'dlwp-cs_tutorial' model; not read by any visible cell.
model = os.path.join(root_directory, 'dlwp-cs_tutorial')
# Pair of map file names; not read by any visible cell.
# NOTE(review): presumably lat-lon -> cubed-sphere and the reverse mapping, judging by the names -- confirm.
map_files = ('map_LL91x180_CS48.nc', 'map_CS48_LL91x180.nc')

In [3]:
# Selection passed as both input_sel and output_sel to SeriesDataGenerator in a later cell.
io_selection = {'varlev': ['z/500', 't2m/0']}
# Passed as add_insolation to SeriesDataGenerator.
add_solar = True
# Passed as both input_time_steps and output_time_steps to SeriesDataGenerator.
io_time_steps = 2

In [4]:
import numpy as np
import pandas as pd
import xarray as xr

# Validation times every 6 hours from 2016-12-31 to 2018-12-31 (inclusive), as a datetime64[ns] ndarray.
# Lowercase 'h' is the hourly alias; the uppercase 'H' spelling is deprecated in pandas.
validation_set = np.array(
    pd.date_range('2016-12-31', '2018-12-31', freq='6h'),
    dtype='datetime64[ns]',
)

In [5]:
# Open the full predictor dataset from predictor_file.
all_ds = xr.open_dataset(predictor_file)
# Keep only the samples whose 'sample' coordinate is in validation_set.
predictor_ds = all_ds.sel(sample=validation_set)

In [44]:
from DLWP.model import SeriesDataGenerator

# NOTE(review): `dlwp` and `is_channels_last` are not defined or imported in any cell visible in this notebook,
# and the recorded run of this cell failed with "ModuleNotFoundError: No module named 'DLWP'" -- confirm where
# they are meant to come from before relying on this cell.
# Use the model's _n_steps as the sequence length only when it exists and is greater than 1; otherwise None.
sequence = dlwp._n_steps if hasattr(dlwp, '_n_steps') and dlwp._n_steps > 1 else None
# Build the validation generator over predictor_ds with the IO options set in the earlier cells.
val_generator = SeriesDataGenerator(dlwp, predictor_ds, rank=3, add_insolation=add_solar,
                                    input_sel=io_selection, output_sel=io_selection,
                                    input_time_steps=io_time_steps, output_time_steps=io_time_steps,
                                    shuffle=False, sequence=sequence, batch_size=32,
                                    load=False, channels_last=is_channels_last(dlwp))

ModuleNotFoundError: No module named 'DLWP'

In [6]:
# Open the saved forecast dataset.
# NOTE(review): absolute, user-specific path that does not use root_directory -- adjust before running elsewhere.
ds_forecast = xr.open_dataset("/Users/calledoux/forecast_dlwp-cs_tutorial.nc.cs")

In [31]:
# Display the validation predictors stacked into a single DataArray
predictor_ds.to_array()

In [42]:
# Display the forecast dataset stacked into a single DataArray
ds_forecast.to_array()

In [None]:
# NaN-ignoring mean squared difference between `valid` and `forecast` over all elements.
# NOTE(review): `valid` and `forecast` are not defined in any cell visible in this notebook, and this cell has no
# execution count -- define them or remove the cell.
np.nanmean((valid - forecast) ** 2.)

In [32]:
# Display a slice of the stacked forecast array: index 0 on axes 0, 2 and 3, everything on the other axes
ds_forecast.to_array()[0, :, 0, 0, :, :, :]

In [43]:
# Display a slice of the stacked predictor array: index 2 on axis 0, 4:24 on axis 4, index 0 on axis 5
predictor_ds.to_array()[2, :, :, :, 4:24, 0]

In [None]:
# Display the validation predictors stacked into a single DataArray
predictor_ds.to_array()