In [3]:
import numpy as np
import pandas as pd
from scipy.stats import bernoulli, uniform, expon, chi2, norm
from scipy.optimize import fsolve
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error
%matplotlib inline
import seaborn as sns
import math
from sklearn.neighbors import KernelDensity

In [4]:
class DataGenerator:
    def __init__(self, N=200_000, nA=20_000, nB=500, seed=None):
        """
        Initializes the data generator.

        Parameters:
        - N: Population size
        - nA: Target size for non-probability sample
        - nB: Target size for probability sample
        - seed: Random seed for reproducibility
        """
        self.N = N
        self.nA = nA
        self.nB = nB
        self.seed = seed
        if seed is not None:
            np.random.seed(seed)
        
        """
        For this example covariate has been generated from N(0,1)
        """

        self.X1 = norm.rvs(0, 1, size=N) 
        self.epsilon = norm.rvs(0, 2, size=N)
        self.Y = 2 + 3 * self.X1 + self.epsilon
        self.gamma0 = None

        self.pi_A = None
        self.pi_B = None
        self.SA = None
        self.SB = None

    def _solve_gamma0(self):
        """
        Solves for gamma0 to achieve the target non-probability sample size. 
        Here 
        pi_a = 1 / (1 + np.exp(-(gamma0 + 0.5*((x1-4)**2))))
        which ensures covariate shift.
        
        """
        def objective(gamma0):
            pi = 1 / (1 + np.exp(-(gamma0 + 0.5*((self.X1-4)**2))))
            return np.sum(pi) - self.nA

        self.gamma0 = fsolve(objective, x0=0)[0]

    def generate_samples(self):
        """
        Generates the full population and draws non-probability and probability samples.
        """
        self._solve_gamma0()

        self.pi_A = 1 / (1 + np.exp(-(self.gamma0 + 0.5*((self.X1-4)**2))))
        self.SA = np.random.uniform(size=self.N) < self.pi_A
        self.c = fsolve(lambda c: np.max(c + 0.5*self.X1 + 0.03*self.Y) / np.min(c + 0.5*self.X1 + 0.03*self.Y) - 50, x0=1)[0]
        self.pi_B = self.c + self.X1 + 0.03*self.Y
        self.pi_B = (self.pi_B/sum(self.pi_B))*self.nB
        self.SB = np.random.uniform(size=self.N) < (self.pi_B / np.sum(self.pi_B)) * self.nB

    def get_population(self):
        """
        Returns the full population dataframe.
        """
        return pd.DataFrame({
            'x1': self.X1,
            'y': self.Y,
            'pi_A': self.pi_A,
            'pi_B': self.pi_B,
            'SA': self.SA,
            'SB': self.SB
        })

    def get_nonprob_sample(self):
        """
        Returns the non-probability sample dataframe.
        """
        pop = self.get_population()
        return pop.loc[pop['SA'], ['y', 'x1']]

    def get_prob_sample(self):
        """
        Returns the probability sample dataframe.
        
        """
        pop = self.get_population()
        return pop.loc[pop['SB'], ['y', 'x1', 'pi_B']]
    
    def get_gamma0(self):
        
        return(self.gamma0)


In [7]:
class ConformalRegressor:
    
    """
    ######################################################################
    Conformal Regressor for predictive intervals under covariate shift.
    ######################################################################
    
    Computes individual prediction intervals with confidence level alpha using the methodology 
    proposed by Tibshiranni et al. in "Conformal Prediction Under Covariate Shift". This also 
    involves local scoring by modelling the MAD using the same prediction model as used to model 
    the conditional distribution of the response given the covariates. 

    """
    
    def __init__(self, alpha=0.05, random_state=420):
        """
        Initialize conformal regressor with confidence level alpha and RNG seed
        
        """ 
        self.alpha = alpha
        self.random_state = random_state
        self.model = RandomForestRegressor(random_state=random_state)  # Main prediction model
        self.mad_model = RandomForestRegressor(random_state=random_state)  # Model to estimate residual spread (MAD)
        self.q1 = None  # Quantile for equal weighting (used in unweighted baseline interval)

    def fit(self, train_X, train_Y):
        
        """
        Fit both the primary model and the (MAD) model.

        Parameters
        ----------
        train_X : np.ndarray
            Training covariates.
        train_Y : np.ndarray
            Training responses.
        
        """
        # Fit the main model on training data
        self.model.fit(train_X, train_Y)
        # Fit a model to predict the absolute residuals (MAD model)
        residuals = np.abs(self.model.predict(train_X) - train_Y)
        self.mad_model.fit(train_X, residuals)

    def _weighted_quantile(self, values, weights, alpha):
        
        """
        Compute weighted quantile.

        Input:
        ----------
        values : array-like
            Values to compute quantiles over.
        weights : array-like
            Corresponding weights.
        alpha : float
            Desired quantile level.

        Output:
        -------
        float
            The weighted quantile.
        """
        
        df = pd.DataFrame({'values': values, 'weights': weights})
        df = df.sort_values('values')
        df['cum_weight'] = df['weights'].cumsum() / df['weights'].sum()
        return np.interp(alpha, df['cum_weight'], df['values'])

    def calibrate(self, calib_X, calib_Y, calib_weights):
        """
        Calibrate model using conformity scores and weights.

        Input:
        ----------
        calib_X : np.ndarray
             Calibration covariates.
        calib_Y : np.ndarray
             Calibration responses.
        calib_weights : np.ndarray
             Calibration weights 
        """
        
        # Predict responses on calibration set
        calib_y_pred = self.model.predict(calib_X)
        # Compute conformity scores (absolute errors)
        conformity_scores = np.abs(calib_Y - calib_y_pred)
        # Predict local MADs for normalization
        calib_mad = self.mad_model.predict(calib_X)
        norm_scores = conformity_scores / calib_mad

        # Save conformity scores and calibration weights
        self.values_with_inf = np.append(norm_scores, np.inf)
        self.weights = calib_weights

        # Compute equal-weighted quantile (for no-covariate-shift baseline)
        equal_weights = np.ones(len(calib_Y) + 1) / (len(calib_Y) + 1)
        self.q_equal = self._weighted_quantile(self.values_with_inf, equal_weights, 1 - self.alpha)

    def predict(self, test_X, test_weights):
        
        """
        Generate prediction intervals for the unseen test data.

        Input:
        ----------
        test_X : np.ndarray
            Test covariates.
        test_weights : np.ndarray
            Test weights for conformal quantile adjustment.

        Output:
        -------
        dict
            Dictionary containing prediction means and intervals (with and without covariate shift adjustment).
        """
        
        # Predict mean and MAD for test data
        y_pred = self.model.predict(test_X)
        mad_pred = self.mad_model.predict(test_X)

        # Initialize prediction interval lists
        lower = []
        upper = []
        lower_wo = []  # Without covariate shift adjustment
        upper_wo = []

        for i in range(len(test_X)):
            # Append new test weight to calibration weights
            weights_with_test = np.append(self.weights, test_weights[i])
            # Normalize (optional: already handled in quantile function)
            weights_with_test = weights_with_test / np.sum(weights_with_test)
            # Get weighted quantile
            q_weighted = self._weighted_quantile(self.values_with_inf, weights_with_test, 1 - self.alpha)

            # Compute prediction intervals (covariate shift adjusted)
            lower.append(y_pred[i] - q_weighted * mad_pred[i])
            upper.append(y_pred[i] + q_weighted * mad_pred[i])

            # Compute unweighted intervals (no shift correction)
            lower_wo.append(y_pred[i] - self.q_equal * mad_pred[i])
            upper_wo.append(y_pred[i] + self.q_equal * mad_pred[i])

        return {
            'pred': y_pred,
            'lower': np.array(lower),
            'upper': np.array(upper),
            'lower_wo': np.array(lower_wo),
            'upper_wo': np.array(upper_wo)
        }

    def evaluate(self, test_Y, lower, upper, lower_wo, upper_wo):
        """
        Evaluate interval quality using empirical coverage and interval width.

        Input:
        ----------
        test_Y : np.ndarray
            True test responses.
        lower : np.ndarray
            Lower bounds of conformal intervals with covariate shift adjustment.
        upper : np.ndarray
            Upper bounds of conformal intervals with covariate shift adjustment.
        lower_wo : np.ndarray
            Lower bounds without covariate shift adjustment.
        upper_wo : np.ndarray
            Upper bounds without covariate shift adjustment.

        Output:
        -------
        dict
            Dictionary of coverage and average width metrics.
        """
        coverage = np.mean((test_Y >= lower) & (test_Y <= upper))
        coverage_wo = np.mean((test_Y >= lower_wo) & (test_Y <= upper_wo))
        width = np.mean(upper - lower)
        width_wo = np.mean(upper_wo - lower_wo)
        return {
            'coverage': coverage,
            'coverage_wo': coverage_wo,
            'width': width,
            'width_wo': width_wo
        }


In [9]:
gen = DataGenerator(seed=123)
gen.generate_samples()
pop = gen.get_population()
nonprob = gen.get_nonprob_sample()
prob = gen.get_prob_sample()

In [10]:
train_X = nonprob[['x1']]
train_Y = nonprob['y']
val_X = prob[['x1']]
val_Y = prob['y']
calib_weights_val = 1/prob[['pi_B']]

In [11]:
model = ConformalRegressor(alpha=0.05)
model.fit(train_X, train_Y)
model.calibrate(val_X, val_Y, calib_weights_val)

In [12]:
gamma0 = gen.get_gamma0()

In [13]:
# Sample from population
sampled_df = pop.sample(n=500, random_state=420)
test_X = sampled_df[['x1']]
test_Y = sampled_df['y']
calib_weights_test = np.array(1 + np.exp(-(gamma0 + 0.5 * ((test_X - 4) ** 2))))

In [14]:
pred_dict = model.predict(test_X, calib_weights_test)
eval_dict = model.evaluate(
    test_Y,
    lower=pred_dict['lower'],
    upper=pred_dict['upper'],
    lower_wo=pred_dict['lower_wo'],
    upper_wo=pred_dict['upper_wo']
)
