In [1]:
import pandas as pd
import seaborn as sns
import numpy as np
import math
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from tqdm import tqdm

In [2]:
def process_data(feature_engineer=False, only_red=True):
    if only_red:
        df = pd.read_csv('Downloads/wine+quality/winequality-red.csv', sep=';')
    else:
        # read the csv and one-hot encode the red/white feature
        df_red = pd.read_csv('Downloads/wine+quality/winequality-red.csv', sep=';')
        df_white = pd.read_csv('Downloads/wine+quality/winequality-white.csv', sep=';')
        df_red['color'] = 'red'
        df_white['color'] = 'white'
        df = pd.concat([df_red, df_white])
        df = df.join(pd.get_dummies(df['color']))
        df = df.drop('color', axis=1)

    # optional feature engineering to get some nonlinearity
    if feature_engineer:
        df['fixed/volatile'] = df['fixed acidity'] / df['volatile acidity']
        df['citric/sugar'] = df['citric acid'] / df['residual sugar']
        df['free/total sulfur'] = df['free sulfur dioxide'] / df['total sulfur dioxide']
        df['chlorides_quad'] = (df['chlorides'] - 5) ** 2
        df['sulphates/pH'] = df['sulphates'] / df['pH']
        df['density_aux'] = (1 - df['density'])
    return df

def prep_for_training(df): 
    # split into data and targets and convert to numpy arrays
    y = df['quality']
    X = df.drop('quality', axis=1)
    X = X.to_numpy().astype(np.float32)
    y = y.to_numpy().astype(np.float32)

    # split into 60% train, 20% val, and 10% test
    X_trainval, X_test, y_trainval, y_test = train_test_split(
        X,
        y,
        test_size=0.20,
        random_state=42
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_trainval,
        y_trainval,
        test_size=0.25,
        random_state=42
    )

    # normalize the features so that L^2 regularization affects things equally
    scaler = preprocessing.StandardScaler().fit(X_train)
    X_train = scaler.transform(X_train)
    X_val = scaler.transform(X_val)
    X_test = scaler.transform(X_test)
    return X_train, X_val, X_test, y_train, y_val, y_test

df = process_data(feature_engineer=True)

df.head()

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,fixed/volatile,citric/sugar,free/total sulfur,chlorides_quad,sulphates/pH,density_aux
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,10.571429,0.0,0.323529,24.245776,0.159544,0.0022
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,8.863636,0.0,0.373134,24.029604,0.2125,0.0032
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,10.263158,0.017391,0.277778,24.088464,0.199387,0.003
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,40.0,0.294737,0.283333,24.255625,0.183544,0.002
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,10.571429,0.0,0.323529,24.245776,0.159544,0.0022


In [3]:
# Taken from week 1 notebook

def MSE(y_true, y_pred):
    residues = y_true - y_pred
    return residues @ residues.T / len(y_true)

def R2(y_true, y_pred):
    return 1 - MSE(y_true, y_pred) / y_true.var()

y_true = np.array([1, 5])
y_pred = np.array([2, 4])
assert MSE(y_true, y_pred) == 1
assert R2(y_true, y_pred) == 1 - 1 / 4

In [4]:
def concat_ones(X):
    # Add a 1 in front of every training sample for the bias term.
    return np.concatenate([np.ones(shape=(len(X), 1)), X], axis=1)

class SGD:
    def __init__(self, weights, step_size=0.01):
        self.weights = weights
        self.step_size = step_size

    def step(self, grad):
        self.weights -= self.step_size * grad


class LR:
    def __init__(self, learn_bias=False, L2_coeff=0.0, step_size=1e-5, n_iters=100):
        self.beta = None
        self.learn_bias = learn_bias
        self.L2_coeff = L2_coeff
        self.step_size = step_size
        self.n_iters = n_iters

    def fit(self, X, y, closed_form=True):
        if self.learn_bias:
            X = concat_ones(X)
        if not closed_form: # use gradient descent
            self.beta = np.zeros(X.shape[1])
            optimizer = SGD(self.beta, step_size=1e-5)
            for _ in tqdm(range(self.n_iters)):
                grad = -2 * X.T @ y + 2 * X.T @ X @ self.beta + 2 * self.L2_coeff * self.beta
                optimizer.step(grad)
        else: # use the closed_form solution
            self.beta = np.linalg.inv(X.T @ X + self.L2_coeff) @ X.T @ y

    def predict(self, X_test):
        if self.learn_bias:
            X_test = concat_ones(X_test)
        if self.beta is None:
            raise ValueError('Fit the LR model before predicting.')
        return X_test @ self.beta
    
model = LR()

In [5]:
X_train, X_val, X_test, y_train, y_val, y_test = prep_for_training(df)

# train a LR model on the training data
model = LR(learn_bias=True, L2_coeff=1, step_size=1e-3, n_iters=10000)
model.fit(X_train, y_train, closed_form=False)

# evaluate how well the model does on the training versus validation data
y_pred = model.predict(X_train)
print(f'R^2 train = {R2(y_train, y_pred)}')
print(f'RMSE train = {math.sqrt(MSE(y_train, y_pred))}')

y_pred = model.predict(X_val)
print(f'R^2 val = {R2(y_val, y_pred)}')
print(f'RMSE val = {math.sqrt(MSE(y_val, y_pred))}')

100%|██████████| 10000/10000 [00:02<00:00, 4738.20it/s]

R^2 train = 0.36962178168370474
RMSE train = 0.6419195945931451
R^2 val = 0.284664353922895
RMSE val = 0.6738008838275571





In [6]:
# finally, evaluate on the test data
y_pred = model.predict(X_test)
print(f'R^2 test = {R2(y_test, y_pred)}')
print(f'RMSE test = {math.sqrt(MSE(y_test, y_pred))}')

R^2 test = 0.4128044500153778
RMSE test = 0.6194640416636605
