In [18]:
import numpy as np
import cvxpy as cp
from scipy.optimize import linprog
from sklearn.linear_model import QuantileRegressor

In [19]:
X = np.c_[np.ones(100), np.random.normal(0, 5, size=(100, 5))]
beta = np.array([10, 1, 2, 3, 4, 5])
y = X @ beta + np.random.normal(0, 1, 100)

In [20]:
# print (cp.installed_solvers())

In [36]:
class QuantileRegression:
    """
    Original form:
        
        min. Σ max[(q-1) * (y - yhat)_i, q * (y - yhat)_i]
    
    where y_hat can be estimated by 4 CAViaR functions:
    1. Adaptive:     f(B1)_t = f(B1)_t-1 + B1 * {[1 + exp(G * [y_t-1 - f(B1)_t-1])]^-1 - q}
    2. Symmetric:    f(B)_t = B1 + B2 f(B)_t-1 + B3 |y_t-1|
    3. Assymmetric:  f(B)_t = B1 + B2 f(B)_t-1 + B3 max(y_t-1, 0) + B4 min(y_t-1, 0)
    4. IGRACH(1, 1): f(B)_t = (B1 + B2 f(B)_t-1**2 + B3 y_t-1**2)**0.5
    """
    def __init__(self, theta, tol=1e-10):
        self.theta = theta # theta-quantile
        self.tol = tol # tolerance for loss to break the loop
    
    def fit(self, X, y, fit_intercept=False):
        """
        if X == None => autoregressive; else normal regerssion
        """
        # data preparation
        self.n, self.p = X.shape
        
        # if data itself doesnt contain intercept term
        if fit_intercept:
            self.p += 1
            X = np.c_[np.ones(self.n), X]
            
        # initialize the params to recursively fit the data
        loss = np.inf
        beta_hat = cp.Variable(self.p)
        
        loss_fct = cp.sum(cp.maximum(self.theta * (y - X @ beta_hat),
                                     (self.theta - 1) * (y - X @ beta_hat)))
        prob = cp.Problem(cp.Minimize(loss_fct))
        prob.solve(solver='ECOS')
        
        self.params = beta_hat.value

    def autoregressive_fit(self, returns):
        self.n = returns.shape[0]
        first = True
        count = 0
        
        params = np.random.uniform(0, 1, 4)
        sigmas = abs(np.random.normal(0, 1, self.n+1))
        
        constant_term = np.ones_like(returns)
        while True:
            if not first:
                sigmas = self.asymmetric_slope(returns, self.params)
            else:
                first = False
            X = np.c_[constant_term, sigmas[:-1], np.maximum(returns, 0), np.minimum(returns, 0)]
            y = sigmas[1:]
            self.fit(X, y)
            print(self.params)
            
            if count == 20:
                break
            count += 1

    def fit_LP(self, X, y, fit_intercept=True):
        """
        Epigraph form (LP):

            min. 1Tt
            s.t. max[(q-1) * (y - yhat), q * (y - yhat)]_i <= ti
        
        let mxi == max[(q-1) * (y - yhat), q * (y - yhat)]_i
        since mxi is always a positive value, it implies mxi <= ti only
        
        if y - y_hat positive => q; else q - 1
        """       
        # data preparation
        self.n, self.p = X.shape
        
        # if data itself doesnt contain intercept term
        if fit_intercept:
            self.p += 1
            X = np.c_[np.ones(self.n), X]
            
        # initialize the params to recursively fit the data
        self.params = np.random.uniform(0, 1, self.p)
        
        # fit the data
        loss = np.inf
        
        for i in range(10000):
            # calculate the deviations
            y_hat = X @ self.params
            dev = y - y_hat
            
            # the lower bounds of t
            lowers = np.maximum((self.theta-1) * (dev), self.theta * (dev))
            bounds = [(lb, None) for lb in lowers]
            
            # solve the above LP
            res = linprog(np.ones_like(y), bounds=bounds)
            if loss - res.fun <= self.tol:
                break
            loss = res.fun
            
            # update the betas
            t = res.x
            q_indicator = np.where(dev>0, self.theta, self.theta-1)
            self.params = np.linalg.pinv(X) @ (y - t / q_indicator)
        
    def loss(self, y, y_hat):
        # sum of absolute error
        dev = y - y_hat
        return np.sum(np.maximum(self.theta * dev, (self.theta - 1) * dev))

    def asymmetric_slope(self, returns, params):
        b1, b2, b3, b4 = params
        sigmas = np.zeros(returns.shape[0]+1)
        sigmas[0] = np.std(returns) # im not sure
        for t in range(1, len(sigmas)):
            sigmas[t] = b1 + b2 * sigmas[t-1] + max(b3 * returns[t-1], 0) + b4 * min(b3 * returns[t-1], 0)
        return sigmas

In [37]:
X = np.c_[np.ones(100), np.random.normal(0, 5, size=(100, 5))]
beta = np.array([100, -1, 2, -3, 4, -5])
y = X @ beta + np.random.normal(0, 1, 100)

In [38]:
qr = QuantileRegression(0.01)
qr_sklearn = QuantileRegressor(quantile=0.01, alpha=0, fit_intercept=False)

In [39]:
qr.autoregressive_fit(y)

AttributeError: 'QuantileRegression' object has no attribute 'n'

In [34]:
qr.fit(X, y, fit_intercept=False)

In [None]:
qr_sklearn.fit(X, y)

In [29]:
qr.params

array([98.31215737, -0.94949759,  1.9675716 , -2.94511591,  4.06207583,
       -4.94583376])

In [30]:
qr_sklearn.coef_

array([98.31215737, -0.94949759,  1.9675716 , -2.94511591,  4.06207583,
       -4.94583376])