In [1]:
class SVR_general_cvxopt:
    """Kernel support-vector regressor solved with ``cvxopt.solvers.cp``.

    The Gram matrix is ``mu * linear_kernel + lmbda * kernel``.  ``fit``
    minimises ``0.5 * x'Kx + Ev.|x| - y.x`` with ``Ev = epsilon * y / 100``
    subject to ``sum(x) == 0`` and ``x_i <= 100*C/y_i``, ``-x_i <= 100*C/y_i``.

    Because the bounds divide by the targets, a zero target gives an infinite
    bound and a negative target makes the box for that coefficient empty, so
    the targets should be strictly positive.

    Parameters
    ----------
    C, epsilon : float
        Scale the box bounds and the ``|x|`` penalty respectively.
    mu, lmbda : float
        Weights of the linear kernel and of ``kernel`` in the Gram matrix.
    kernel : str
        Metric name forwarded to ``sklearn.metrics.pairwise.pairwise_kernels``.
    **kernel_param
        Extra keyword arguments forwarded with ``kernel`` (e.g. ``gamma``).
    """

    def __init__(self, C=0.1, epsilon=0.01, mu=0.5, lmbda=0.5, kernel="linear", **kernel_param):
        # Third-party tools are imported lazily and kept on the instance so
        # the class does not depend on module-level imports.
        from cvxopt import matrix, solvers, sparse
        from sklearn.metrics.pairwise import pairwise_kernels
        from sklearn.utils import check_X_y, check_array
        self.sparse = sparse
        self.matrix = matrix
        self.solvers = solvers

        self.C = C
        self.epsilon = epsilon
        self.mu = mu
        self.lmbda = lmbda

        self.kernel = kernel
        self.pairwise_kernels = pairwise_kernels
        self.kernel_param = kernel_param
        self.check_X_y = check_X_y
        self.check_array = check_array

    def _gram(self, A, B):
        """Return ``mu * linear(A, B) + lmbda * kernel(A, B)``."""
        linear_part = self.pairwise_kernels(A, B, metric="linear")
        kernel_part = self.pairwise_kernels(A, B, metric=self.kernel, **self.kernel_param)
        return self.mu * linear_part + self.lmbda * kernel_part

    @staticmethod
    def _objective(x, K, Ev, y):
        """Return ``0.5 * x'Kx + Ev.|x| - y.x`` for 1-D ``x``, ``Ev``, ``y``."""
        import numpy as np
        return 0.5 * x @ K @ x + Ev @ np.abs(x) - y @ x

    @staticmethod
    def _gradient(x, K, Ev, y):
        """Return the (sub)gradient of ``_objective`` as a 1-D array.

        The ``|x|`` term contributes ``Ev * sign(x)`` element-wise; ``sign``
        is 0 at ``x == 0``, which avoids the 0/0 of ``x / |x|``.  ``K`` is
        assumed symmetric (it is a Gram matrix of one sample set with itself).
        """
        import numpy as np
        return K @ x + Ev * np.sign(x) - y

    @staticmethod
    def _support_mask(beta, tol=1e-4):
        """Return a boolean mask of coefficients whose magnitude, relative to
        the largest magnitude, exceeds ``tol``."""
        import numpy as np
        magnitude = np.abs(beta)
        largest = magnitude.max() if magnitude.size else 0.0
        if largest == 0:
            # Nothing to normalise by: no coefficient qualifies.
            return np.zeros(magnitude.shape, dtype=bool)
        return magnitude / largest > tol

    def fit(self, X, y):
        """Solve the optimisation problem and store the support vectors.

        Sets ``self.beta_sv``, ``self.x_sv`` and ``self.b``.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If the solver returns no coefficient above the support threshold.
        """
        import numpy as np

        X, y = self.check_X_y(X, y)
        matrix = self.matrix
        epsilon = self.epsilon
        n_samples = y.shape[0]

        # Starting point handed to the solver (random, as in the original).
        x0 = np.random.rand(n_samples)

        K = self._gram(X, X)
        Ev = (epsilon * y) / 100

        # Equality constraint: the coefficients sum to zero.
        A = matrix(np.ones((1, n_samples)))
        b = matrix(0.0)
        # Box constraints: x_i <= bound_i and -x_i <= bound_i.
        G = self.sparse(matrix(np.concatenate((np.identity(n_samples),
                                               -np.identity(n_samples)))))
        bound = 100 * self.C / y
        h = matrix(np.concatenate((bound, bound)).reshape(-1, 1))

        objective = self._objective
        gradient = self._gradient

        def F(x=None, z=None):
            # Callback protocol of cvxopt.solvers.cp:
            # F() -> (number of nonlinear constraints, starting point).
            if x is None:
                return 0, matrix(x0)
            xv = np.array(x).reshape(-1)
            val = matrix(float(objective(xv, K, Ev, y)))
            # The derivative must be a 1 x n row.
            Df = matrix(gradient(xv, K, Ev, y).reshape(1, -1))
            if z is None:
                return val, Df
            # Hessian of the objective weighted by z[0].
            H = matrix(z[0] * K)
            return val, Df, H

        # NOTE: this mutates cvxopt's global solver options.
        self.solvers.options['show_progress'] = False
        sol = self.solvers.cp(F=F, G=G, h=h, A=A, b=b)

        # Support vectors: coefficients that are not negligible.
        beta = np.array(sol['x']).reshape(-1)
        keep = self._support_mask(beta)
        if not keep.any():
            raise ValueError("solver returned no support vectors")
        beta_sv = beta[keep]
        x_sv = X[keep, :]
        y_sv = y[keep]

        # Intercept: mean residual on the support vectors, with the target
        # shifted by -/+ epsilon percent depending on the coefficient sign.
        k_sv = self._gram(x_sv, x_sv)
        cons = np.where(beta_sv >= 0, 1 - epsilon / 100, 1 + epsilon / 100)
        w_phi = beta_sv @ k_sv

        self.b = np.mean(y_sv * cons - w_phi)
        self.beta_sv = beta_sv
        self.x_sv = x_sv
        return self

    def predict(self, X_):
        """Return ``beta_sv @ gram(x_sv, X_) + b`` for the rows of ``X_``."""
        X_ = self.check_array(X_)
        k_test = self._gram(self.x_sv, X_)
        return self.beta_sv @ k_test + self.b

    def coef_(self):
        """Return the fitted ``(beta_sv, x_sv, b)`` tuple."""
        return self.beta_sv, self.x_sv, self.b

In [2]:
class IterativeRun():
    """
    IterativeRun:
        Iterative and ordered SVR prediction and data storage for time series

    args:
        - folder: folder inside Pickle folder to save the resulting predictions
        - n: number of values to predict by iteration
        - itr: number of iterations to run. From last to first.
                (e.g: n = 2, itr = 10. y_test = -20_-18, ... -2_0)

    --Method--

        bas_optit: iterate, predict and store data

    NOTE: bas_optit reads these names from module scope; they are not defined
    in this class: X, y, y1, np, pd, MaxAbsScaler, datetime and
    SVR_general_cvxopt.

    """

    def __init__(self, folder, n = 1, itr = 1):
        from pathlib import Path
        import pickle
        self.folder = folder
        self.n = n
        self.itr = itr
        self.Path = Path
        self.pickle = pickle

    @staticmethod
    def _window(total, n, i):
        """Return the (start, stop) row bounds of the i-th test window of
        size n, counted from the end of a series with `total` rows."""
        start = total - i * n
        return start, start + n

    def _result_path(self, name):
        """Return <parent of cwd>/Pickles/<folder>/<name>."""
        return self.Path().resolve().parent / "Pickles" / f"{self.folder}" / f"{name}"

    def bas_optit(self, C, epsilon, gamma, mu, lmbda):
        """
        Iterate, predict and store prediction + error

        args: SVR Hyperparameters, and kernel weight (mu*linear_kernel + lmbda*rbf_kernel)
            - C
            - epsilon
            - gamma
            - mu
            - lmbda

        returns: minus the mean (over iterations) of the per-window mean
                 absolute percentage error, in percent.

        Side effect: pickles a dict to ../Pickles/<folder>/<timestamp>, holding
        the hyperparameters under key 0 and one DataFrame per iteration
        (columns: real, predict, resta, error).
        """
        #   n: test size, itr: # of iterations
        n = self.n
        itr = self.itr
        yl = len(y)

        # parameters
        hyperparameters = {
            'kernel' : "rbf",
            'C' : C,
            'epsilon' : epsilon,
            'mu' : mu,
            'lmbda' : lmbda,
            'gamma' : gamma,
        }

        cas = {0: hyperparameters}
        mape = []

        # Walk the windows from the oldest (i = itr) to the most recent (i = 1).
        for i in np.flip(np.arange(1, itr + 1)):
            start, stop = self._window(yl, n, i)

            # y, X split: train on everything before the window, test on it
            X_train = X[:start, :]
            X_test = X[start:stop, :]
            y_train = y[:start]
            y_test = y[start:stop]

            # test index
            y_idx = y1.index[start:stop]

            # rescale X and y; both scalers are fitted on training data only
            scaler = MaxAbsScaler()
            scaler.fit(X_train)
            X_train = scaler.transform(X_train)
            X_test = scaler.transform(X_test)

            # presumably y is 2-D (n_samples, 1), as MaxAbsScaler expects - confirm
            scaler1 = MaxAbsScaler()
            scaler1.fit(y_train)
            y_train = scaler1.transform(y_train).reshape(-1)
            y_test = y_test.reshape(-1)

            # fit and predict
            model = SVR_general_cvxopt(**hyperparameters).fit(X_train, y_train)
            pred = model.predict(X_test)

            # bring the prediction back to the original scale of y
            y_pred = scaler1.inverse_transform(pred.reshape(-1, 1))

            # keep data
            yi = pd.DataFrame(y_test, index = y_idx, columns = ["real"])
            yi["predict"] = y_pred.reshape(-1)
            yi["resta"] = yi.real - yi.predict
            yi["error"] = (np.abs((yi.real - yi.predict)/yi.real))
            cas[i] = yi
            mape.append(yi.error.mean()*100)

        # Store prediction
        name_ = datetime.now().strftime("%d_%m_%Y_%H_%M_%S")
        path_ = self._result_path(name_)
        # Create the target folder if needed so a finished run is not lost
        # to a FileNotFoundError.
        path_.parent.mkdir(parents=True, exist_ok=True)
        with open(path_, 'wb') as to_write:
            self.pickle.dump(cas, to_write)

        mape_mean = sum(mape)/itr

        return -mape_mean