In [42]:
import numpy as np 
import math
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.tree import DecisionTreeRegressor
from sklearn.svm import LinearSVR
from sklearn.model_selection import train_test_split, KFold, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn import datasets
from sklearn.metrics import mean_squared_error, mean_absolute_error
import scipy.stats as st

In [43]:
def tune_bb(algo, X, y, 
            regularization="Dropout", M=10, 
            c=None, K=5, criterion="MSE"):

    """function to automatically tune blackbox regression model
    
    Parameters:
    -----------

    algo : callable
        A learning algorithm that takes as input a matrix X in R nxp
        and a vector of responses Y in Rn and returns a function that
        maps inputs to outputs. Must have methods like .fit() and .predict()
    X : array-like of shape (n,p)
        training data X in R nxp
    y : array-like of shape (n,)
        training labels, Y, in Rn
    regularization : str, default="Dropout"
        regularization method, can be any of "Dropout",
        "NoiseAddition", or "Robust"
    M : int, default=
        A positive integer indicating the number of Monte Carlo
        replicates to be used if the method specified is Dropout or 
        NoiseAddition
    c : default=None
        A vector of column bounds to be used if method specified is "Robust"
    K : int, default=5
        A positive integer indicating the number of CV-folds to be used to 
        tune the amount of regularization, e.g., K = 5 indicates five-fold CV
    criterion : str, default="MSE"
        A criterion to be used to evaluate the method that belongs to the set 
        {MSE, MAD} where MSE encodes mean square error and MAD encodes mean
        absolute deviation.

    Returns:
    -----------
    tuned_mode : callable 
        A tuned predictive model that optimizes the specific criterion using 
        the specified method

    Example:
    -----------
    >>> tune_bb()
    >>> 
    """
    
    # statements here to ensure model has the methods we need to tune it
    assert hasattr(algo, "fit"), "model object must have .fit() method"
    assert hasattr(algo, "predict"), "model object must have .predict() method"

    if criterion == "MSE":
        criterion = "neg_mean_squared_error"
    elif criterion == "MAE":
        criterion = "neg_mean_absolute_error"
    else:
        raise ValueError("Please input either MAE or MSE for criterion.")
    
    # Standardize X (useful for all methods with regularization)
    scaler = StandardScaler()  
    X = scaler.fit_transform(X)
    
    if regularization == "Dropout":
        # dropout notes: topic 1 p. 30 
        # draw Z matrix
        Z = np.random.binomial(1,0.5,size=X.shape)
        dropout_matrix = Z * X 
        
    elif regularization == "NoiseAddition":
        # possible levels of noise
        lambda_levels = np.linspace(0, 5, 100)
        min_metric = None
        best_lambda = None
        for lam in lambda_levels: 
            #CV
            metric = []
            for m in range(M):
                # generate noise matrix with lambda (variance, not std)
                Z = np.random.normal(0, lam**2, size=X.shape)
                kf = KFold(n_splits=K)
                for train_index, test_index in kf.split(X):
                    X_train, y_train = X[train_index], y[train_index]
                    X_test, y_test = X[test_index], y[test_index]
                    X_train_disturbed = X_train + Z[train_index]
                    model = algo
                    model.fit(X_train_disturbed, y_train)
                    if criterion == "neg_mean_squared_error":
                        metric.append(mean_squared_error(y_test, model.predict(X_test)))
                    else:
                        metric.append(mean_absolute_error(y_test, model.predict(X_test)))
            new_metric = np.mean(metric)
            if (min_metric == None or new_metric < min_metric):
                best_lambda = lam
                min_metric = new_metric
                                  
        print('best lambda')
        print(best_lambda)
        
    elif regularization == "Robust":
        #Assertion: we don't actually have any reasonable bounds to argue for robust regression here
        #We could take a sample of many matrix M and then choose 1 based on a criteria to fit our regression
        #If we do this iteratively with changing our c bounds, we can theoretically narrow down our bounds wrt the MSE/MAE
        tol = False #Are we in our tolerance range
        toler = 5e-4 #Sklearn default for tolerance is 1e-4
        wts = [x/2 for x in c] #Initial weights are going to be c/2
        oerror = np.inf #Initial error to not get in the way
        merror = np.inf #Initialize new error so object exists
        itera = 0 #We want to be able to count the number of iterations (since the weights scale by e^-itera
        while not tol:
            print(itera, merror, wts) #Print lines I was using to track what's going on
            #Create a bunch of matrices and choose the best by some score
            maxmatrix = None
            maxnorm = -np.inf
            for i in range(1000):
                matrix = np.random.rand(X.shape[0], X.shape[1])
                for m in range(matrix.shape[1]):
                    matrix[:,m] = (wts[m] / np.linalg.norm(matrix[:,m], 2)) * matrix[:,m]
                fnorm = np.linalg.norm(matrix, 2) #The criteria I'm using here is the two-norm
                if fnorm > maxnorm:
                    maxnorm = fnorm
                    maxmatrix = matrix
            new_X = X + maxmatrix #We add the permuted matrix to our design matrix
            
            #kfold cross validation
            errors = np.abs(cross_val_score(algo, new_X, y, cv=K, scoring=criterion))
            merror = np.mean(errors)
            #I originally thought about this being only if the error was decreasing
            #The problem with that is that we're dependent a random subset of generated matrix or we could get stuck in a lucky minimum
            #Hence I think it's better to allow converence to a step to step change, even if its not at our minimum MSE
            if abs(oerror - merror) > toler:
                print(abs(oerror - merror)) #See if we're converging correctly
                oerror = merror #Save the current error for the next iteration
                #Set our new weights for the next iteration
                wts = np.minimum(wts + (np.random.normal(size = len(wts)) * math.exp(-itera/2)), c)
                wts = np.maximum(0.1, wts) #weights can't be negative
                itera += 1
            else:
                tol = True
        
        #Once we have our best c bounds, let's use them exactly to construct the best model
        maxmatrix = None
        maxnorm = -np.inf
        for i in range(10000):
            matrix = np.random.rand(X.shape[0], X.shape[1])
            for m in range(matrix.shape[1]):
                matrix[:,m] = (wts[m] / np.linalg.norm(matrix[:,m], 2)) * matrix[:,m]
            fnorm = np.linalg.norm(matrix, 2) #The criteria I'm using here is the two-norm
            if fnorm > maxnorm:
                maxnorm = fnorm
                maxmatrix = matrix
        new_X = X + maxmatrix #We add the permuted matrix to our design matrix
        
        model = algo
        tuned_mode = model.fit(new_X, y)
        #print(tuned_mode.coef_)
        return tuned_mode
    
    else: 
        raise ValueError('Please input one of of "Dropout", "NoiseAddition", or "Robust"')



In [27]:
# matrices to play around with and feed into function 
# NO train test split here, just full data
X, y = datasets.load_iris(return_X_y=True)

# test here, maybe start testing with Ridge()
tune_bb(Ridge(alpha = 0.5),
        X,
        y,
        regularization="Robust",
        c = [4,4,6,3],
        criterion="MSE")

0 inf [2.0, 2.0, 3.0, 1.5]
inf
1 0.06977271381719005 [1.19049989 2.61969236 4.00003914 2.11689383]
0.0027639652499021894
2 0.07253667906709224 [1.41860031 2.31808535 3.59495338 2.31971237]
0.0024509479552507624
3 0.074987627022343 [1.92556876 2.00596372 3.46177965 2.35165647]
0.0022522943625744862
4 0.07723992138491749 [1.95431057 1.68963438 3.59960962 2.50723644]
0.002593100792406197
5 0.07983302217732369 [1.75013398 1.71429934 3.52137313 2.68581626]
0.0047517849757485325
6 0.08458480715307222 [1.80451775 1.74482324 3.59974129 2.72391039]
0.0041960007999607835
7 0.08038880635311144 [1.85449716 1.75128116 3.52074712 2.77260743]
0.007004344798177825
8 0.07338446155493361 [1.85191784 1.72949086 3.54049099 2.80313851]
0.006558509919818545
9 0.07994297147475216 [1.84516214 1.71185383 3.51914363 2.81724051]
0.008489208898058001
10 0.07145376257669415 [1.85234825 1.70312722 3.53046095 2.815261  ]
0.0052504776419727145
11 0.07670424021866687 [1.8532827  1.70934529 3.52952753 2.8159873 ]
0.000

### Scratchwork below

In [44]:
# In theory, using linear regression with noise addition is equivalent to ridge

tune_bb(LinearRegression(),
        X,
        y,
        regularization="NoiseAddition",
        c = [2,2,1,1.5],
        criterion="MSE")

# Load package
from sklearn.linear_model import RidgeCV

# Fit ridge regression using LOOCV
alphas = np.linspace(0.0001, 5, 100)
ridge_cv = RidgeCV(alphas = alphas).fit(X,y)
ridge_cv.alpha_

best lambda
0.15151515151515152


0.7071565656565657

In [21]:
# maybe something like this if they all follow the .fit() syntax? 
def model_test(model,X,y):
    return model.fit(X,y)

In [22]:
# this passes the model in ?
model_test(LinearRegression(), X, y).coef_

array([-0.11190585, -0.04007949,  0.22864503,  0.60925205])

In [23]:
model_test(Ridge(), X, y).coef_ # niceeeeeeeeee

array([-0.11346491, -0.03184254,  0.25936799,  0.53764103])

In [20]:
model_test(Lasso(), X, y).coef_

array([ 0.57894737, -0.        , -0.        ])

In [None]:
# ridge tuning parameter, for example
# expand this later
parameter_set = np.linspace(0,10,11) # 0 to 10

cv_score_set = []
# begin cross validation
for alpha in parameter_set:

    algo.alpha = alpha
    cv = cross_val_score(algo, 
                         X, 
                         y, 
                         scoring=criterion,
                         cv=K)

    # cv returns negative values, need abs()
    cv_score = np.mean(np.absolute(cv))
    cv_score_set.append(cv_score)

minimum_score = cv_score_set.index(np.min(cv_score_set))
alpha_value = parameter_set[minimum_score]

In [16]:
X_iris, y_iris = datasets.load_iris(return_X_y=True)

(150, 4)

### Resources and Notes: 

1. https://www.statology.org/k-fold-cross-validation-in-python/

In [67]:
# use this to view doc string
?tune_bb