In [1]:
import numpy as np
import os.path

In [6]:
path = '..\datasets\death_rates_data.txt'

def get_data(path = path):
    if os.path.exists(path):
        X = []
        y = []
        with open(path, 'r') as f:
            for line in f:
                sample = [float(elem) for elem in line.split()]
                X.append(sample[:-1])
                y.append(sample[-1])
        return X, y
    else:
        print('this path doesn''t exist!')
        return

In [7]:
def normalize_and_add_ones(X):
    X = np.array(X)
    X_max = np.max(X, axis = 0)
    X_min = np.min(X, axis = 0)

    normalized_X = (X - X_min) / (X_max - X_min)
    ones = np.ones((X.shape[0], 1))
    return np.hstack((ones, normalized_X))

In [8]:
class Ridge_regression:
    def __init__(self):
        return
    
    def fit(self, X_train, y_train, LAMBDA):
        assert len(X_train.shape) == 2 and X_train.shape[0] == y_train.shape[0]
        return (np.linalg.inv(X_train.transpose().dot(X_train)) + LAMBDA * np.identity(X_train.shape[1])).dot(X_train.transpose()).dot(y_train)
    
    def fit_grad_descent(self, X_train, y_train, LAMBDA, learning_rate, max_num_epochs = 100, batch_size = 128):
        pass

    def predict(self, W, x_new):
        x_new = np.array(x_new)
        y_new = x_new.dot(W)
        return y_new
        
    def compute_RSS(self, y_new, y_predicted):
        loss =  (1. / y_new.shape[0]) * np.sum ((y_new - y_predicted) ** 2)
        return loss

    def get_the_best_LAMBDA(self, X_train, y_train):
        def cross_validation(num_folds, LAMBDA):
            row_ids = np.array(range(X_train.shape[0]))

            valid_ids = np.split(row_ids[:len(row_ids) - len(row_ids) % num_folds], num_folds)
            valid_ids[-1] = np.append(valid_ids[-1], row_ids[len(row_ids) - len(row_ids) % num_folds:])

            train_ids = [[k for k in row_ids if k not in valid_ids[i]] for i in range(num_folds)]

            aver_RSS = 0
            for i in range(num_folds):
                valid_part = {'X': np.array([X_train[j] for j in valid_ids[i]]), 'y': np.array([y_train[j] for j in valid_ids[i]])}
                train_part = {'X': np.array([X_train[j] for j in train_ids[i]]), 'y': np.array([y_train[j] for j in train_ids[i]])}
                W = self.fit(train_part['X'], train_part['y'], LAMBDA)
                y_predicted = self.predict(W, valid_part['X'])
                aver_RSS += self.compute_RSS(valid_part['y'], y_predicted)

            return aver_RSS / num_folds

        def range_scan(best_LAMBDA, minimum_RSS, LAMBDA_values):
            for current_LAMBDA in LAMBDA_values:
                aver_RSS = cross_validation(5, current_LAMBDA)
                if aver_RSS < minimum_RSS:
                    best_LAMBDA = current_LAMBDA
                    minimum_RSS = aver_RSS
            return best_LAMBDA, minimum_RSS

        best_LAMBDA, minimum_RSS = range_scan(0, minimum_RSS = 10000 ** 2, LAMBDA_values = range(50))

        LAMBDA_values = [k * 1. / 1000 for k in range(max(0, (best_LAMBDA - 1) * 1000), best_LAMBDA * 1000 + 1, 1)]
        
        best_LAMBDA, minimum_RSS = range_scan(best_LAMBDA, minimum_RSS = minimum_RSS, LAMBDA_values = LAMBDA_values)
        
        return best_LAMBDA


In [9]:
X, y = get_data()
X = normalize_and_add_ones(X)
y = np.array(y)
X_train, y_train = X[:50], y[:50]
X_test, y_test = X[50:], y[50:]

ridge_regressor = Ridge_regression()

best_LAMBDA = ridge_regressor.get_the_best_LAMBDA(X_train, y_train)
print('best lambda: ', best_LAMBDA)

W_learned = ridge_regressor.fit(X_train, y_train, best_LAMBDA)
y_predicted = ridge_regressor.predict(W_learned, X_test)

rss = ridge_regressor.compute_RSS(y_test, y_predicted)
print('loss: ', rss)


best lambda:  0
loss:  2191.979180191641
