Linear quantile regression

In [1]:
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.patches as patches
import matplotlib.lines as lines
from sklearn.linear_model import QuantileRegressor
from sklearn.linear_model import LinearRegression

import torch
import torch.nn as nn # All neural network modules, nn.Linear, nn.Conv2d, BatchNorm, Loss functions
import torch.optim as optim # For all Optimization algorithms, SGD, Adam, etc.
import torch.nn.functional as F # All functions that don't have any parameters
from torch.autograd import Variable

from sklearn.metrics.pairwise import rbf_kernel
from sklearn.preprocessing import PolynomialFeatures
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from quantile_forest import RandomForestQuantileRegressor

import numpy as np
import cvxpy as cp
from numpy import linalg
import pandas as pd

from scipy.linalg import sqrtm
import math

random_seed = 42

In [2]:
# 1-layer NN
class NN1(nn.Module):
    def __init__(self, input_size, output_size):
        super(NN1, self).__init__()
        self.fc1 = nn.Linear(input_size, 10)
        self.fc2 = nn.Linear(10, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
    
# 2-layer NN
class NN2(nn.Module):
    def __init__(self, input_size, output_size):
        super(NN2, self).__init__()
        self.fc1 = nn.Linear(input_size, 50)
        self.fc2 = nn.Linear(50, 50)
        self.fc3 = nn.Linear(50, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x 

In [3]:
# train mean estimator that estimate E[Y|X]
def mean_est(est_type,X_lin,Y_lin,X_quantile,X_test):
    if est_type == "NN1":
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        torch.manual_seed(42)
        torch.cuda.manual_seed_all(42) 
        model = NN1(input_size=3, output_size=1).to(device)
        criterion=nn.MSELoss()
        learning_rate = 0.001
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        for epoch in range(1000):
            #convert numpy array to torch Variable
            inputs=Variable(torch.from_numpy(X_lin))
            labels=Variable(torch.from_numpy(Y_lin))
            
            #clear gradients wrt parameters
            optimizer.zero_grad()
    
            #Forward to get outputs
            outputs=model(inputs.float())
    
            #calculate loss
            loss=criterion(outputs.float(), labels.float())
    
            #getting gradients wrt parameters
            loss.backward()
    
            #updating parameters
            optimizer.step()
        M_quantile = model(torch.from_numpy(X_quantile).float())
        M_quantile = M_quantile.detach().cpu().numpy().reshape(-1,1)
        M_test = model(torch.from_numpy(X_test).float())
        M_test = M_test.detach().cpu().numpy().reshape(-1,1)
        return M_quantile, M_test
    if est_type == "NN2":
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        torch.manual_seed(42)
        torch.cuda.manual_seed_all(42) 
        model = NN2(input_size=3, output_size=1).to(device)
        criterion=nn.MSELoss()
        learning_rate = 0.001
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        for epoch in range(1000):
            #convert numpy array to torch Variable
            inputs=Variable(torch.from_numpy(X_lin))
            labels=Variable(torch.from_numpy(Y_lin))
    
            #clear gradients wrt parameters
            optimizer.zero_grad()
    
            #Forward to get outputs
            outputs=model(inputs.float())
    
            #calculate loss
            loss=criterion(outputs.float(), labels.float())
    
            #getting gradients wrt parameters
            loss.backward()
    
            #updating parameters
            optimizer.step()
        M_quantile = model(torch.from_numpy(X_quantile).float())
        M_quantile = M_quantile.detach().cpu().numpy().reshape(-1,1)
        M_test = model(torch.from_numpy(X_test).float())
        M_test = M_test.detach().cpu().numpy().reshape(-1,1)
        return M_quantile, M_test

In [4]:
# Generate points uniformly distributed on the surface of a sphere

def generate_points_on_sphere(n):
    phi = np.random.uniform(0, 2*np.pi, size=n)
    cos_theta = np.random.uniform(-1, 1, size=n)
    theta = np.arccos(cos_theta)

    x = np.sin(theta) * np.cos(phi)
    y = np.sin(theta) * np.sin(phi)
    z = np.cos(theta)

    return np.stack((x, y, z), axis=-1)

In [5]:
# Test 1

# Generate i.i.d data
np.random.seed(5)
n_pre = 1000
n_opt = 500
n_adj = 100
n_t = 1000
n = n_pre+n_opt+n_adj+n_t
beta = np.array([1/math.sqrt(3),1/math.sqrt(3),-1/math.sqrt(3)])

X = generate_points_on_sphere(n)
Y = np.sqrt(1+25*np.power(X @ beta, 4))  * np.random.uniform(-1, 1, n)
Y = Y.reshape(-1,1)


X_lin = X[0:600,:]
Y_lin = Y[0:600,:]
x_lin = X_lin[:,0]
y_lin = Y_lin[:,0]
n_lin = X_lin.shape[0]

X_quantile = X[600:n_pre+n_opt+n_adj,:]
Y_quantile = Y[600:n_pre+n_opt+n_adj,:]
n_quantile = X_quantile.shape[0]

X_test = X[n_pre+n_opt+n_adj:,:]
Y_test = Y[n_pre+n_opt+n_adj:,:]
n_test = X_test.shape[0]


# Estimate the quantile
M_quantile = np.zeros(n_quantile).reshape(-1,1)

alpha = 0.05

model_quantile = QuantileRegressor(quantile=1-(alpha/2), alpha=0)
model_quantile.fit(X_quantile, Y_quantile-M_quantile)
Q_test = model_quantile.predict(X_test)


M_test = np.zeros(n_test).reshape(-1,1)
V_test = Q_test**2
V_test = V_test.reshape(-1,1)

coverage = (np.power(Y_test[:,0]-M_test[:,0], 2) <= V_test[:,0]).mean()
bandwidth = np.mean(V_test[:,0])
print("The overall coverage is", coverage)
print("The mean bandwidth for testing data is", bandwidth)

  y = column_or_1d(y, warn=True)


The overall coverage is 0.969
The mean bandwidth for testing data is 10.728016689819828


In [6]:
# Test 2

# Generate i.i.d data
np.random.seed(0)
n_pre = 1000
n_opt = 1000
n_adj = 100
n_t = 1000
n = n_pre+n_opt+n_adj+n_t
beta = np.array([1/math.sqrt(3),1/math.sqrt(3),-1/math.sqrt(3)])

X = generate_points_on_sphere(n)
Y = 1+5*np.power(X @ beta, 3)+np.sqrt(1+25*np.power(X @ beta, 4))  * np.random.uniform(-1, 1, n)
Y = Y.reshape(-1,1)

X_lin = X[0:600,:]
Y_lin = Y[0:600,:]
x_lin = X_lin[:,0]
y_lin = Y_lin[:,0]
n_lin = X_lin.shape[0]

X_quantile = X[600:n_pre+n_opt+n_adj,:]
Y_quantile = Y[600:n_pre+n_opt+n_adj,:]
n_quantile = X_quantile.shape[0]

X_test = X[n_pre+n_opt+n_adj:,:]
Y_test = Y[n_pre+n_opt+n_adj:,:]
n_test = X_test.shape[0]


# Estimate the mean using NN2
est_type = "NN2"
M_quantile, M_test = mean_est(est_type,X_lin,Y_lin,X_quantile,X_test)

# Estimate the quantile
alpha = 0.05

model_quantile = QuantileRegressor(quantile=1-(alpha/2), alpha=0)
model_quantile.fit(X_quantile, Y_quantile-M_quantile)
Q_test = model_quantile.predict(X_test)


V_test = Q_test**2
V_test = V_test.reshape(-1,1)

coverage = (np.power(Y_test[:,0]-M_test[:,0], 2) <= V_test[:,0]).mean()
bandwidth = np.mean(V_test[:,0])
print("The overall coverage is", coverage)
print("The mean bandwidth for testing data is", bandwidth)

  y = column_or_1d(y, warn=True)


The overall coverage is 0.953
The mean bandwidth for testing data is 10.056907548832976


In [7]:
# Test 3

# Generate i.i.d data (Y follows a constrained Laplace)
np.random.seed(1)
n_pre = 1000
n_opt = 500
n_adj = 100
n_t = 1000
n = n_pre+n_opt+n_adj+n_t
beta = np.array([1/math.sqrt(3),1/math.sqrt(3),-1/math.sqrt(3)])

X = generate_points_on_sphere(n)

# Specify the mean and standard deviation for Y
mean_Y = np.power(X @ beta, 2)+5*np.power(X @ beta, 4)
std_dev_Y = np.sqrt(1 + 25 * np.power(X @ beta, 4))
mean_Y = mean_Y.reshape(-1,1)
std_dev_Y  = std_dev_Y .reshape(-1,1)

# Specify the bounds for Y
lower_bound = mean_Y - 2 * std_dev_Y
upper_bound = mean_Y + 2 * std_dev_Y

# Generate all Y values initially
Y = np.random.laplace(mean_Y, std_dev_Y)

# Correct values that fall out of bounds
while True:
    out_of_bounds = (Y < lower_bound) | (Y > upper_bound)
    if not np.any(out_of_bounds):
        break
    Y[out_of_bounds] = np.random.laplace(mean_Y[out_of_bounds], std_dev_Y[out_of_bounds])
    

X_lin = X[0:600,:]
Y_lin = Y[0:600,:]
x_lin = X_lin[:,0]
y_lin = Y_lin[:,0]
n_lin = X_lin.shape[0]

X_quantile = X[600:n_pre+n_opt+n_adj,:]
Y_quantile = Y[600:n_pre+n_opt+n_adj,:]
n_quantile = X_quantile.shape[0]

X_test = X[n_pre+n_opt+n_adj:,:]
Y_test = Y[n_pre+n_opt+n_adj:,:]
n_test = X_test.shape[0]

# Estimate the mean using NN2
est_type = "NN2"
M_quantile, M_test = mean_est(est_type,X_lin,Y_lin,X_quantile,X_test)


# Estimate the quantile
alpha = 0.05

model_quantile = QuantileRegressor(quantile=1-(alpha/2), alpha=0)
model_quantile.fit(X_quantile, Y_quantile-M_quantile)
Q_test = model_quantile.predict(X_test)


V_test = Q_test**2
V_test = V_test.reshape(-1,1)

coverage = (np.power(Y_test[:,0]-M_test[:,0], 2) <= V_test[:,0]).mean()
bandwidth = np.mean(V_test[:,0])
print("The overall coverage is", coverage)
print("The mean bandwidth for testing data is", bandwidth)

  y = column_or_1d(y, warn=True)


The overall coverage is 0.943
The mean bandwidth for testing data is 22.928315704490675


In [8]:
# Test 4

# Generate i.i.d data (Y follows a constrained Laplace)
np.random.seed(1)
n_pre = 1000
n_opt = 500
n_adj = 100
n_t = 1000
n = n_pre+n_opt+n_adj+n_t
beta = np.array([1/math.sqrt(3),1/math.sqrt(3),-1/math.sqrt(3)])

X = generate_points_on_sphere(n)

# Specify the mean and standard deviation for Y
mean_Y = np.power(X @ beta, 2)+5*np.power(X @ beta, 4)
std_dev_Y = np.sqrt(1 + 25 * np.power(X @ beta, 4))
mean_Y = mean_Y.reshape(-1,1)
std_dev_Y  = std_dev_Y .reshape(-1,1)


# Generate all Y values initially
Y = np.random.laplace(mean_Y, std_dev_Y)

    

X_lin = X[0:600,:]
Y_lin = Y[0:600,:]
x_lin = X_lin[:,0]
y_lin = Y_lin[:,0]
n_lin = X_lin.shape[0]

X_quantile = X[600:n_pre+n_opt+n_adj,:]
Y_quantile = Y[600:n_pre+n_opt+n_adj,:]
n_quantile = X_quantile.shape[0]

X_test = X[n_pre+n_opt+n_adj:,:]
Y_test = Y[n_pre+n_opt+n_adj:,:]
n_test = X_test.shape[0]

# Estimate the mean using NN2
est_type = "NN2"
M_quantile, M_test = mean_est(est_type,X_lin,Y_lin,X_quantile,X_test)


# Estimate the quantile
alpha = 0.05

model_quantile = QuantileRegressor(quantile=1-(alpha/2), alpha=0)
model_quantile.fit(X_quantile, Y_quantile-M_quantile)
Q_test = model_quantile.predict(X_test)


V_test = Q_test**2
V_test = V_test.reshape(-1,1)

coverage = (np.power(Y_test[:,0]-M_test[:,0], 2) <= V_test[:,0]).mean()
bandwidth = np.mean(V_test[:,0])
print("The overall coverage is", coverage)
print("The mean bandwidth for testing data is", bandwidth)

  y = column_or_1d(y, warn=True)


The overall coverage is 0.943
The mean bandwidth for testing data is 56.161555004050896
