## Import Library

In [2]:
import pandas as pd
import numpy as np

## Load data

In [10]:
data = pd.read_csv('x28.txt', sep = '\s+', header=None)
data = data.drop(columns = 0)
data.columns = ['A1', 'A2', 'A3', 'A4', 'A5', 'A6', 'A7', 'A8', 'A9', 'A10', 'A11', 'A12', 'A13', 'A14', 'A15', 'B']
data = data.astype("float")
X = data.iloc[:,0:15].to_numpy()
Y = data.iloc[:,15].to_numpy()

## Normalize and add ones

In [13]:
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
ones = np.array([[1] for _ in range(X_normalized.shape[0])])
Xbar = np.column_stack((ones, X_normalized))

## Triển khai RidgeRegression

In [226]:
class RidgeRegression:
    def __init__(self):
        return
    
    def fit(self, X_train, Y_train, LAMBDA):
        assert len(X_train.shape) == 2  and X_train.shape[0] == Y_train.shape[0]
        A = np.dot(X_train.T, X_train) + LAMBDA*np.identity(X_train.shape[1])
        W = np.linalg.inv(A).dot(X_train.T).dot(Y_train)
        return W
    
    def fit_gradient_descent(self, X_train, Y_train, LAMBDA, learning_rate, max_num_epoch=100, batch_size=20):
        W = np.random.rand(X_train.shape[1])
        last_loss = 1e9
        for ep in range(max_num_epoch):
            arr = np.array(range(X_train.shape[0]))
            np.random.shuffle(arr)
            X_train = X_train[arr]
            Y_train = Y_train[arr]
            total_minibatch = int(np.ceil(X_train.shape[0]/batch_size))
            for i in range(total_minibatch):
                index = i * batch_size
                X_train_sub = X_train[index:index+batch_size]
                Y_train_sub = Y_train[index:index+batch_size] 
                grad = np.dot(X_train_sub.T, X_train_sub.dot(W) - Y_train_sub) + LAMBDA*W
                W = W - learning_rate*grad
            new_loss = self.compute_RSS(self.predict(W, X_train), Y_train)
            if np.abs(new_loss - last_loss) <= 1e-5:
                break
            last_loss = new_loss
        return W
    
    def predict(self, W, X_new):
        X_new = np.array(X_new)
        Y_new = np.dot(X_new, W)
        return Y_new
    
    def compute_RSS(self, Y_new, Y_predicted):
        loss = (1/Y_new.shape[0])*np.sum((Y_new - Y_predicted)**2)
        return loss
    
    def get_the_best_LAMBDA(self, X_train, Y_train):
        def cross_validation(num_folds, LAMBDA):
            row_ids = np.array(range(X_train.shape[0]))
            ending_ids = len(row_ids) - len(row_ids)%num_folds
            valid_ids = np.split(row_ids[:ending_ids], num_folds)
            valid_ids[-1] = np.append(valid_ids[-1], row_ids[ending_ids:])
            train_ids = [[k for k in row_ids if k not in valid_ids[i]] for i in range(num_folds)]
            total_RSS = 0
            for i in range(num_folds):
                valid_part = {'X': X_train[valid_ids[i]], 'Y': Y_train[valid_ids[i]]}
                train_part = {'X': X_train[train_ids[i]], 'Y': Y_train[train_ids[i]]}
                W = self.fit_gradient_descent(train_part['X'], train_part['Y'], LAMBDA, learning_rate=1e-3)
                Y_predicted = self.predict(W, valid_part['X'])
                total_RSS += self.compute_RSS(Y_test, Y_predicted)
            return total_RSS/num_folds
            
        
        def range_scan(best_LAMBDA, minimum_RSS, LAMBDA_values):
            for current_LAMBDA in LAMBDA_values:
                aver_RSS = cross_validation(5, current_LAMBDA)
                if aver_RSS < minimum_RSS:
                    minimum_RSS = aver_RSS
                    best_LAMBDA = current_LAMBDA
            return best_LAMBDA, minimum_RSS
        #Initalize
        best_LAMBDA, minimum_RSS = range_scan(best_LAMBDA=0, minimum_RSS=10000**2, LAMBDA_values=range(50))
        LAMBDA_values = [k*1./1000 for k in range(max(0, (best_LAMBDA - 1)*1000), (best_LAMBDA+1)*1000, 1)] #step size = 0.001
        
        #Scan
        best_LAMBDA, minimum_RSS = range_scan(best_LAMBDA=best_LAMBDA, minimum_RSS=minimum_RSS, LAMBDA_values=LAMBDA_values)
        
        return best_LAMBDA

## Main

In [227]:
X_train, Y_train = Xbar[:50], Y[:50]
X_test, Y_test = Xbar[50:], Y[50:]

ridgeRegression = RidgeRegression()

best_LAMBDA = ridgeRegression.get_the_best_LAMBDA(X_train, Y_train)
print('Best LAMBDA:', best_LAMBDA)

W_learned = ridgeRegression.fit(X_train, Y_train, best_LAMBDA)
Y_predicted = ridgeRegression.predict(W_learned, X_test)
RSS = ridgeRegression.compute_RSS(Y_test, Y_predicted)
print('RSS:', RSS)

Best LAMBDA: 1.294
RSS: 2297.672646377029
