Comparing several methods for creating out of sample predictive intervals

Methods:
- Quantile of residuals
- Quantile regression
- Jacknife+
- Bayesian regression


In [None]:
import os
from abc import ABC, abstractmethod
import pandas as pd
import numpy as np
import pystan as ps
import statsmodels.api as sm
import statsmodels.formula.api as smf
import matplotlib.pyplot as plt

In [None]:
from typing import Union, Optional, NoReturn, Dict, Tuple

# Data Simulator

Generate random data

In [None]:
class DataSimulator:
    """Class encapsulating process of simulating data from a linear model.
    """
    
    def __init__():
        pass
    
    def generate_lm_data(n_points: int = 10_000,
                         pct_train: float = 0.5,
                         alpha: float = 1.,
                         beta: float = 2.,
                         sigma: float = np.sqrt(3.)) -> Dict[str, Dict[str, np.ndarray]]:
        """Method to generate data according to following linear model:
            y = alpha + beta * x + N(0, sigma^2)
        Once data is generated, split it into training set and test set.

        Parameters
        ----------
        n_points: int
            Number of points we want to generate
        alpha: float, optional, default = 1.
            Intercept parameter of the linear model
        beta: float, optional, default = 2.
            Explanatory variable parameter of the linear model
        sigma: float, optional, default = sqrt(3.)
            Standard deviation of the normal error

        Returns
        -------
        data: Dict[str, np.ndarray]
            Dictionary containing dictionary traing and test data
        """
        # Sample data/realization
        x = np.random.randn(n_points)
        y = alpha + beta*x + sigma * np.random.randn(n_points)

        # Split data into traing and test set
        n_train = int(pct_train * n_points)

        # Create training data and test data
        y_train = y[:n_train]
        x_train = x[:n_train]

        y_test = y[n_train:]
        x_test = x[n_train:]

        # Stack columns together
        data_train = {'y': y_train, 'x': x_train}
        data_test = {'y': y_test, 'x': x_test}

        data = {'train': data_train,
                'test': data_test}
        return data

In [None]:
data_sim = DataSimulator.generate_lm_data()

Experiment parameters

In [None]:
# Number of data points to sample
n = 10000
pct_train = 0.5

# Model parameters
sigma = 3.
mu = 1.
beta = 2.

# quantile
alpha = 0.05

Implement abstract version of a class for predictive intervals

In [None]:
class PredictiveIntervalModel(ABC):
    """Abstract class encapsulating methods reletaed to child classes that implement
    different ways of getting predictive intervals.
    
    """
    
    def __init__(self, alpha: float):
        """
        Parameters
        ----------
        alpha: float, in interval (0,1)
            Probability level at which we want the predictive interval to be, i.e. alpha = 0.1 corresponds
            to 90% probability interval.
            
        """
        self.check_alpha(alpha=alpha)
        self.alpha = alpha
    
    @abstractmethod
    def get_predictive_intervals(self, data: Dict[str, Dict[str, np.ndarray]]) -> np.ndarray:
        """ Method for that encapsulates the creation and evaluation of predictive interval.
        Should print out statistics
        
        Parameters
        ----------
        data: Dictionary
            Data dictionary with keys 'train' and 'test' corresponding to data we want to use for
            training the model and for testing, aka creating predicitve intervals for
            
        Returns
        -------
        pred_ints: np.ndarray
            2-d numpy array with the dimensions being lower and upper bounds of the created predictive intervals
        """
        pass
    
    def check_alpha(self, alpha: float) -> bool:
        """Method for asserting alpha is a float between 0 and 1
        
        Parameters
        ----------
        alpha: float
            Level 
            
        Returns
        -------
        bool
            True if alpha is between 0 and 1
            
        Raises
        ------
        Exception if alpha is outside of (0, 1) interval or not float
        """
        
        if (isinstance(alpha, float) and (0 < alpha < 1.)):
            return True
        else:
            raise Exception("`alpha` must be float between 0. and 1.!")
            
    def print_stats(self, hit_ratio: float, avg_length: float) -> NoReturn:
        """Method for printing statistics for the created predictive intervals.
        
        Parameters
        ----------
        hit_ratio: float
        avg_length: float
        """
        print("The predictive intervals achieved hit ratio of {:.2f}% and have average length of {:.3f}.".format(100*hit_ratio, avg_length))

# Linear Regression - residuals quantile

In [None]:
class LinearModel(PredictiveIntervalModel):
    """
    Class encapsulating methods for related to linear model
    
        y = alpha + beta_1 * x_1 + ... + beta_p * x_p + eps
        
    where eps comes from Normal distribution with zero mean and fixed variance sigma.
    """
    
    def __init__(self, 
                 params: Optional[np.ndarray] = None, 
                 alpha: Optional[float] = 0.1):
        """
        Parameters
        ----------
        alpha: float, in interval (0,1)
            Probability level at which we want the predictive interval to be, i.e. alpha = 0.1 corresponds
            to 90% probability interval.
        params: np.ndarray
            Parameters of the linear model we want to use,
        """
        # Init super
        super(LinearModel, self).__init__(alpha=alpha)
        
        # Set parameters if some were provided
        self.params = params
    
    def fit(self, data: Dict[str, np.ndarray]) -> NoReturn:
        """
        Method that fits linear model based on the provided data.
        
        Parameters
        ----------
        data: Dict[str, np.ndarray]
            Data dictionary with keys `x` and `y` representing exogenous and engenous data, respectively
        """
        lm = sm.OLS(endog=data['y'], exog=sm.add_constant(data['x']), has_intercept=True)
        lm_fit = lm.fit()
        self.params = lm_fit.params
    
    def predict(self, x_new: np.ndarray) -> np.ndarray:
        """
        Method that predicts new values of the target variable using the provided data and fitted model.
        
        Paramaters
        ----------
        x_new: np.ndarray
            Array with new data we want to create prediction for
        
        Returns
        -------
        y_fit: np.ndarray
            Array with predicted values based on x_new
        """
        assert self.params is not None, "Model has not been fitted yet! Run `.fit` method first."      
        assert x_new.ndim == len(self.params) - 1, "Dimension of new data does not correspond to fitted parameters!"
        
        # Create y_fit
        if x_new.ndim == 1:
            y_fit = self.params[0] + self.params[1]*x_new
        else:
            y_fit = self.params[0] + np.dot(np.expand_dims(self.params[:1], 0), x_new.transpose())
        
        # Assert y_fit has the same length as the input data
        assert len(y_fit) == len(x_new), "Wrong predict calculation, lengths don't match!"
        
        return y_fit
    
    def predict_with_residuals(self, data: Dict[str, np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
        """Method that predicts new values of the target variable and also calculates respective residuals
        based on the actual response variable values.
        
        Parameters
        ----------
        data: Dict 
            Dictionary with new regressor data we want to use to create predictions, should contain keys `x` and `y`
        
        Returns
        -------
        y_hat, res: Tuple[np.ndarray, np.ndarray]
            Fitted values of y and residuals w.r.t. to actual y
        """
        # Assert that `x` and `y` are in the data dict keys
        assert 'x' and 'y' in data.keys(), "Data dictionary must have `x` and `y` keys!"
        
        # Assert that x and y have correct lengths
        assert len(data['x']) == len(data['y']), "`x` and `y` data must have the same length!"
        
        # Extract the new x and y data
        x_new = data['x']
        y_new = data['y']
        
        # Predict the new y
        y_hat = self.predict(x_new=x_new)
       
        # Calculate residuals
        res = y_hat - y_new
        
        return y_hat, res
    
    def get_predictive_intervals(self, data: Dict[str, Dict[str, np.ndarray]]) -> np.ndarray:
        """Method that encapsulates process of creating predictive intervals based on train and
        test data provided in the data dictionary.
        
        Parameters
        ----------
        data: Dict
            Data dictionary containing train and test data under `train` and `test` keys.
            
        Returns
        -------
        pred_ints: np.ndarray
            2-d numpy array with the dimensions being lower and upper bounds of the created predictive intervals
        """
        # Fit the model
        self.fit(data=data['train'])
        
        # Create new predictions
        y_hat, res = self.predict_with_residuals(data=data['test'])
        
        # Get (1-alpha)-th quantile of absolute values of residuals   
        q_alpha = np.quantile(np.abs(res),1-self.alpha)
        
        # Create lower and upper bound of the predictive interval
        y_lb = y_hat - q_alpha
        y_ub = y_hat + q_alpha
        
        # Calculate hit ratio
        y_real = data['test']['y']
        hit_ratio = np.mean((y_real >= y_lb) & (y_real <= y_ub))
        
        # Calculate average interval length
        avg_length = np.mean(y_ub - y_lb)
        
        # Print stats
        self.print_stats(hit_ratio=hit_ratio, avg_length=avg_length)
        
        # Stack the lower and upper bound to create predictive intervals
        pred_ints = np.vstack([y_lb, y_ub])
        
        return pred_ints

Instantiate linear model

In [None]:
lm = LinearModel()

Fit the model on training data

In [None]:
lm.fit(data=data_sim['train'])

Predict new data and get residuals

In [None]:
#y_hat, res = lm.predict_with_residuals(data_sim['test']['x'])

Get predictive intervals

In [None]:
aux = lm.get_predictive_intervals(data=data_sim)

Creates prediction band using in sample residuals

In [None]:
pct_list = []

for i in range(N_samples):

    # Fit regression model on the first half of the data
    x1_const = sm.add_constant(x1)
    lm = sm.OLS(y1, x1_const, has_intercept=True)
    lm_fit = lm.fit()

    # Extract alpha quantile of the residuals
    q_alpha = np.quantile(np.abs(lm_fit.resid), 1-alpha)

    # Extract coefficients from the estimated regression
    alpha_fit, beta_fit = tuple(lm_fit.params)

    # Estimate y2 using the estimated coefficients from first half of the dataset
    y2_hat = alpha_fit + beta_fit*x2 

    # Create range for y2
    y2_range = (y2_hat - q_alpha, y2_hat + q_alpha)

    # Calculate percentage of data points actually in the range
    pct_in_range = np.mean((y2 >= y2_range[0]) & (y2 <= y2_range[1]))
    
    pct_list.append(pct_in_range)
    #print("There was {:.2f}% of the data points in the range in experiment {}!".format(100*pct_in_range, i+1))

In [None]:
pct_total = 100*np.mean(np.array(pct_list) > 0.95)
print("Percentage of experiments in which at least {:.1f}% of the predictions fell into the coverage interval = {:.1f}%".format(100*(1-alpha),pct_total))

# Quantile regression

In [None]:
plt.scatter(data_sim['train']['x'], data_sim['train']['y'])
plt.plot(plt_data[1,:], np.dot(res_5.params.values, plt_data), color='r')
plt.plot(plt_data[1,:], np.dot(res_95.params.values, plt_data), color='k')
plt.plot(plt_data[1,:], np.dot(res_med.params.values, plt_data), color='g')
plt.show()

In [None]:
class QuantileRegression(PredictiveIntervalModel):
    """Class encapsulating estimation of predictive intervals using quantile regression
    
    """
    
    def __init__(self, 
                 params_lb: Optional[np.ndarray] = None, 
                 params_ub: Optional[np.ndarray] = None,
                 alpha: Optional[float] = 0.1):
        """
        Parameters
        ----------
        params_lb: Optional np.ndarray or None
            Parameters of the lower bound quantile regression, i.e. alpha level
        params_ub: Optional np.ndarray or None
            Parameters of the upper bound quantile regression, i.e. 1-alpha level
        alpha: float in (0,1), optional, default value = 0.1
            Quantile we want to estimate. Regression will esimate alpha/2-th and (1-alpha/2)-th quantile
            to create predictive interval at 1-alpha level
        """
        super(QuantileRegression, self).__init__(alpha=alpha)
        self.params_lb = params_lb
        self.params_ub = params_ub
    
    def fit(self, data: Dict[str, np.ndarray]) -> NoReturn:
        """Method that fits lower (1-alpha/2) and upper (alpha/2) quantile regression based on the
        provided data.
        
        Parameters
        ----------
        data: Dictionary
            Data dictionary with keys `x` and `y` corresponding to endogenous and exogenous variables.
        """
        assert 'x' and 'y' in data.keys(), "Data dictionary must have `x` and `y` keys!"
        
        # Specify quantile regression model
        mod = smf.quantreg('y ~ x', pd.DataFrame(data))
        
        # Fit the lower and upper quantile regression
        res_lb = mod.fit(q=self.alpha/2)
        res_ub = mod.fit(q=(1-self.alpha/2))
        
        # Save the fitted parameters into self
        self.params_lb = res_lb.params.values
        self.params_ub = res_ub.params.values
        
    def predict(self, data: Dict[str, np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
        """Method that predicts 
        
        Parameters
        ----------
        data: Dictionary
            Data dictionary with keys `x` and `y` corresponding to endogenous and exogenous variables.

        Returns
        -------
        y_lb, y_up: Tuple[np.ndarray, np.ndarray]
            Tuple of numpy array's for lower and upper bound predictions for y
        """
        # Assert that `x` and `y` are in the data dict keys
        assert 'x' and 'y' in data.keys(), "Data dictionary must have `x` and `y` keys!"
        
        # Assert that x and y have correct lengths
        assert len(data['x']) == len(data['y']), "`x` data and `y` data must be of the same length!"
            
        # Extract the new x and y data (have to add intercept to the `x` data)
        x_new = np.vstack([np.ones(len(data['x'])), data['x']])

        # Predict the lower and upper bound for y
        y_lb = np.dot(self.params_lb, x_new)
        y_ub = np.dot(self.params_ub, x_new)
        
        return y_lb, y_ub
    
    def get_predictive_intervals(self,
                                 data: Dict[str, Dict[str, np.ndarray]]) -> np.ndarray:
        """Method that encapsulates process of creating predictive intervals based on train and
        test data provided in the data dictionary.
        
        Parameters
        ----------
        data: Dict
            Data dictionary containing train and test data under `train` and `test` keys.
            
        Returns
        -------
        pred_ints: np.ndarray
            2-d numpy array with the dimensions being lower and upper bounds of the created predictive intervals
        """
        self.fit(data=data['train'])
        y_lb, y_ub = self.predict(data['test'])
        
        # Calculate hit ratio
        y_real = data['test']['y']
        hit_ratio = np.mean((y_real >= y_lb) & (y_real <= y_ub))
        
        # Calculate average interval length
        avg_length = np.mean(y_ub - y_lb)
        
        # Print stats
        self.print_stats(hit_ratio=hit_ratio, avg_length=avg_length)
        
        # Stack the lower and upper bound to create predictive intervals
        pred_ints = np.vstack([y_lb, y_ub])
        
        return pred_ints

In [None]:
qr = QuantileRegression()

In [None]:
qr.fit(data=data_sim['train'])

In [None]:
aux = qr.get_predictive_intervals(data=data_sim)

# Jacknife+

In [None]:
class JacknifePlus(PredictiveIntervalModel):
    """Class encapsulating implementation of the Jacknife+ method
    Jacknife+ paper on arxiv: https://arxiv.org/pdf/1905.02928.pdf
    
    """
    
    def __init__(self, alpha: Optional[float] = 0.1):
        """
        Parameters
        ----------
        alpha: float in (0,1), optional, default value = 0.1
            Quantile we want to estimate. Regression will esimate alpha/2-th and (1-alpha/2)-th quantile
            to create predictive interval at 1-alpha level
        """
        
        super(JacknifePlus, self).__init__(alpha=alpha)
        
        # Attributes for storing residuals and fitted models
        self.residuals = None
        self.fitted_lms = None
    
    def fit_linear_model(self, data: Dict[str, np.ndarray]):
        """
        Method that fits linear model based on the provided data.
        
        Parameters
        ----------
        data: Dict[str, np.ndarray]
            Data dictionary with keys `x` and `y` corresponding to exogenous and endogenous variable, respectively

        Returns
        -------
        lm_fit: 
        """
        lm = sm.OLS(data['y'], sm.add_constant(data['x']), has_intercept=True)
        lm_fit = lm.fit()
        return lm_fit
          
    def fit(self, data: Dict[str, np.ndarray]) -> NoReturn:
        """
        Method that fits linear model on every leave-one-out subset of the data and collects
        residuals for the leave-one-out point based on the trained model.

        Parameters
        ----------
        data: Dict[str, np.ndarray]
        """
        # Extract training data
        y_train = data['y']
        x_train = data['x']
        
        # Get number of data points in the data
        n = len(y_train)

        # Store residuals and fitted models
        res = np.zeros(n)
        fitted_lms = []

        for i in range(n):
            # Single out the point we're not using in this estimation
            y_out = y_train[i]
            if x_train.ndim == 1:
                x_out = x_train[i]
            else:
                x_out = x_train[i, :]

            # Estimate linear regression leaving out the point
            data = {'x': np.delete(x_train, i, axis=0),
                    'y': np.delete(y_train, i, axis=0)}
            lm_fit = self.fit_linear_model(data=data)

            # Estimate the point we left out using the fitted linear model
            y_hat = lm_fit.predict(exog=np.array((1, x_out)))

            # Calculate the residual
            res[i] = y_hat - y_out

            # Append fitted model
            fitted_lms.append(lm_fit)

        # Save to self
        self.residuals = res
        self.fitted_lms = fitted_lms
        
    def get_predictive_intervals(self, 
                                 data: Dict[str, Dict[str, np.ndarray]],
                                 pct_sample: float = 1.) -> np.ndarray:
        """
        
        Paramateres
        -----------
        """
        if self.fitted_lms or self.residuals is None:
            self.fit(data=data['train'])
            
        # Add constant term for intercept
        x_new = sm.add_constant(data['test']['x'])
        
        # Get number of points and number of fitted models
        n_points = int(pct_sample*len(x_new))
        n_models = len(self.fitted_lms)
        
        # Array for predictive intervals
        y_lb = np.zeros(n_points)
        y_ub = np.zeros(n_points)
        
        # Idx sample to subset data
        idx_data = np.random.choice(len(x_new), size=n_points,replace=False)
        
        # Loop over all points in the new data
        for i in range(n_points):
            
            # Dictionary with upper and lower bounds created by k-th model
            y_jacknife = {'lb': np.zeros_like(self.residuals),
                          'ub': np.zeros_like(self.residuals)}
            
            # Loop over all fitted models
            for k in range(n_models):
                
                # Get the jacknife+ point estimate using the k-th fitted model
                y_hat_k = self.fitted_lms[k].predict(exog=x_new[idx_data[i],:])
                
                # Construct the jacknife+ intervals
                y_jacknife['lb'][k] = y_hat_k - self.residuals[k]
                y_jacknife['ub'][k] = y_hat_k + self.residuals[k]
               
            # Calculate the quantile of the jacknife+ quantities 
            y_lb[i] = np.quantile(y_jacknife['lb'], self.alpha/2)
            y_ub[i] = np.quantile(y_jacknife['ub'], 1-self.alpha/2)

        # Calculate hit ratio
        y_real = data['test']['y'][idx_data]
        hit_ratio = np.mean((y_real >= y_lb) & (y_real <= y_ub))
        
        # Calculate average interval length
        avg_length = np.mean(y_ub - y_lb)
        
        # Print stats
        self.print_stats(hit_ratio=hit_ratio, avg_length=avg_length)
        
        # Stack the lower and upper bound to create predictive intervals
        pred_ints = np.vstack([y_lb, y_ub])
        
        return pred_ints

In [None]:
jknf = JacknifePlus()

In [None]:
aux = jknf.get_predictive_intervals(data=data_sim, pct_sample=0.1)

# Bayesian regression

In [None]:
class BayesianRegression(PredictiveIntervalModel):
    """Bayesian regression implemented in PyStan
    
    """
    
    def __init__(self, alpha: Optional[float]=0.1):
        """
        Parameters
        ----------
        alpha: float
        """
        super(BayesianRegression, self).__init__(alpha=alpha)
        
    def get_stan_model(self) -> str:
        """
        Returns
        -------
        model: str
            String specification of Stan model
        """
        
        model = """
        data {
            int<lower=0> N;
            int<lower=0> N_test;
            vector[N] x_train;
            vector[N] y_train;
            vector[N] x_test;
        }
        parameters {
            real alpha;
            real beta;
            real<lower=0> sigma;
            vector[N_test] y_hat;
        }
        model {
            y_train ~ normal(alpha + beta * x_train, sigma);
            y_hat ~ normal(alpha + beta * x_test, sigma);
        }

        """
        return model

    def get_predictive_intervals(self, data: Dict[str, Dict[str, np.ndarray]]) -> np.ndarray:
        """

        Notes
        -----
        Implementation inspired by the following article:
        https://towardsdatascience.com/an-introduction-to-bayesian-inference-in-pystan-c27078e58d53
        
        and the following pystan tutorial:
        https://mc-stan.org/docs/2_21/stan-users-guide/prediction-forecasting-and-backcasting.html
        """
        # Specify stan model
        model = self.get_stan_model()
        
        # Put model data into dictionary
        data_stan = {'N': len(data['train']['x']),
                     'x_train': data['train']['x'],
                     'y_train': data['train']['y'],
                     'N_test': len(data['test']['x']),
                     'x_test': data['test']['x']}

        # Compile pystan model
        sm = ps.StanModel(model_code=model)

        # Fit the model and generate fitted predictions
        fit = sm.sampling(data=data_stan, iter=1000, chains=4, warmup=500, thin=1, seed=101).to_dataframe()
        
        # Extract real y, prepare arrays for lower and upper bound
        y_real = data['test']['y']
        y_lb = np.zeros_like(y_real)
        y_ub = np.zeros_like(y_real)
        
        # Loop over the generated samples and extract their distributional property
        for i in range(data_stan['N_test']):
            y_hat_name = "y_hat[{}]".format(i+1)
            y_lb[i] = np.quantile(fit[y_hat_name], self.alpha/2)
            y_ub[i] = np.quantile(fit[y_hat_name], 1-self.alpha/2)
        
        # Calculate hit ratio
        hit_ratio = np.mean((y_real >= y_lb) & (y_real <= y_ub))
        
        # Calculate average interval length
        avg_length = np.mean(y_ub - y_lb)
        
        # Print stats
        self.print_stats(hit_ratio=hit_ratio, avg_length=avg_length)
        
        # Stack the lower and upper bound to create predictive intervals
        pred_ints = np.vstack([y_lb, y_ub])
        
        return pred_ints

In [None]:
br = BayesianRegression()

In [None]:
ints = br.get_predictive_intervals(data_sim)

# Run Experiments

In [None]:
n_experiments = 10