# Distributional Regression Netwrok verification

For predictions out of MLPP

In [2]:
import datetime
import numpy as np
from scipy.stats import norm
import pandas as pd
import xarray as xr
import properscoring as ps
from dt_output import get_dt_output

from verif.obs.utils import get_obs_serie
from verif.models.verif import VerifModelStation

In [4]:

class VerifDRN(VerifModelStation):
    """ Class for DRN verification:
    Args:
        station (str): WMO station id.
        dt_start (datetime.datetime): The start model basetime
        dt_end (datetime.datetime): The end model basetime
        fcast_window (int): number of days to include in the obs windows after dt_end
        obs_source (str): obs data source ('DDB_obs', 'API_obs')
        model (str): model to verify (need to be referenced in the Yaml)
        model_vars (list): list of model variables we want to verify
        freq (str): frequency requested (None: all points, 'hourly', '10min' )
        api_key (str): 1 min obs API key
    """
    def __init__(self,
                 station_id,
                 dt_start,
                 dt_end,
                 obs_source,
                 model,
                 model_vars,
                 fcast_window=16,
                 freq='hourly',
                 api_key=None
                 ):
        super().__init__(station_id,
                         dt_start,
                         dt_end,
                         obs_source,
                         model,
                         model_vars,
                         fcast_window,
                         freq,
                         api_key)
        # obs query
        self.obs_ds = super().query_obs()

        self.verif_ds_list = []

    def verify_vars(self):
        """ DeepThought Prob verification 
            Outputs a list of xr dataset for each requested model_var
        """

        for model_var in self.model_vars:
            # get deepthought forecast from the archive
            try:
                dt_ds = get_dt_output(self.station_id,
                                    self.dt_start,
                                    self.dt_end,
                                    model_var,
                                    source='S3',
                                    dt_kind=self.model,
                                    output=None)
            except:
                print(f"No {self.model} data for station {self.station_id}/{model_var}")
                return
    
            # observation mapping
            mapping = dict(zip(pd.to_datetime(self.obs_ds.time.values),
                               self.obs_ds[f'{model_var}_obs'].values))
            map_func = np.vectorize(lambda x: mapping.get(x, np.nan))

            # apply the mapping function to the 'validtime' variable 
            obs_predictand = map_func(pd.to_datetime(dt_ds['validtime'].values)).astype(np.float32)
            # assign with the same dimensions
            dt_ds[f'{model_var}_obs'] = (('basetime', 'prognosis_period'), obs_predictand)
            # add metadata
            dt_ds[f'{model_var}_obs'].attrs = self.obs_ds[f'{model_var}_obs'].attrs

            # if dt_kind=='ePD':
            # full prob distribution
            if dt_ds.attrs['pdf_type']==3:
                # observation probabiltiy
                p_obs = xr.apply_ufunc(np.interp,
                                dt_ds[f'{model_var}_obs'],
                                dt_ds['pdf'].isel(pdf_parameter=0),
                                dt_ds['pdf'].isel(pdf_parameter=1),
                                exclude_dims=set(('pdf_index',)),
                                input_core_dims=[[], ["pdf_index"], ["pdf_index"]],
                                vectorize=True)
                dt_ds['p_obs'] = p_obs.astype(np.float32)

                # negative log likelihood
                dt_ds['negloglik'] = -np.log(dt_ds['p_obs'])

                # cumulative proba (for PIT histogram)
                cp_obs = xr.apply_ufunc(np.interp,
                                dt_ds[f'{model_var}_obs'],
                                dt_ds['cdf'].isel(pdf_parameter=0),
                                dt_ds['cdf'].isel(pdf_parameter=1),
                                exclude_dims=set(('pdf_index',)),
                                input_core_dims=[[], ["pdf_index"], ["pdf_index"]],
                                vectorize=True)
                dt_ds['cp_obs'] = cp_obs.astype(np.float32)

                # crps
                crps = xr.apply_ufunc(ps.crps_ensemble,
                                dt_ds[f'{model_var}_obs'],
                                dt_ds['pdf'].isel(pdf_parameter=0),
                                dt_ds['pdf'].isel(pdf_parameter=1),
                                exclude_dims=set(('pdf_index',)),
                                input_core_dims=[[], ["pdf_index"], ["pdf_index"]],
                                vectorize=True)
                dt_ds['crps'] = crps.astype(np.float32)

            # elif dt_kind=='DLITE':
            # normal distribution (mean + sd)
            elif dt_ds.attrs['pdf_type'] in [0,8]:
                # observation probabiltiy (for the normal dist)
                p_obs = xr.apply_ufunc(norm.pdf,
                                dt_ds[f'{model_var}_obs'],
                                dt_ds[f'{model_var}_PDF_parameter'].isel(pdf_parameter=0),
                                dt_ds[f'{model_var}_PDF_parameter'].isel(pdf_parameter=1),
                                exclude_dims=set(('pdf_index',)),
                                input_core_dims=[[], ["pdf_index"], ["pdf_index"]],
                                vectorize=True)
                dt_ds['p_obs'] = p_obs.astype(np.float32)

                # negative log likelihood
                dt_ds['negloglik'] = -np.log(dt_ds['p_obs'])

                # cumulative proba (for PIT histogram)
                cp_obs = xr.apply_ufunc(norm.cdf,
                                dt_ds[f'{model_var}_obs'],
                                dt_ds[f'{model_var}_PDF_parameter'].isel(pdf_parameter=0),
                                dt_ds[f'{model_var}_PDF_parameter'].isel(pdf_parameter=1),
                                exclude_dims=set(('pdf_index',)),
                                input_core_dims=[[], ["pdf_index"], ["pdf_index"]],
                                vectorize=True)
                dt_ds['cp_obs'] = cp_obs.astype(np.float32)

                # crps
                crps = xr.apply_ufunc(ps.crps_gaussian,
                                dt_ds[f'{model_var}_obs'],
                                dt_ds[f'{model_var}_PDF_parameter'].isel(pdf_parameter=0),
                                dt_ds[f'{model_var}_PDF_parameter'].isel(pdf_parameter=1),
                                exclude_dims=set(('pdf_index',)),
                                input_core_dims=[[], ["pdf_index"], ["pdf_index"]],
                                vectorize=True)
                dt_ds['crps'] = crps.astype(np.float32)

            self.verif_ds_list.append(dt_ds[['validtime', f'{model_var}_obs', 'mean', 'var', 'std', 'p_obs', 'cp_obs', 'negloglik', 'crps']])
        
        return self.verif_ds_list

In [6]:
verif = VerifDRN(station_id='93439',
                 dt_start=datetime.datetime(2022,11, 1),
                 dt_end=datetime.datetime(2022,11, 30),
                 obs_source='DDB_obs',
                 model='MLPP',
                 model_vars=['TTTTT'],
                 fcast_window=16,
                 freq='hourly')
verif.query_obs()