In [1]:
"""
This is my self implemented ml project.
No ML library used for core ML calculation or logic.
"""

'\nThis is my self implemented ml project.\nNo ML library used for core ML calculation or logic.\n'

In [2]:
import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

In [3]:
def read_dataset(filename: str) -> pd.DataFrame:
    """
    Reads dataset from the project's data directory.
    Works in notebooks and scripts by searching upward.
    """
    current = Path.cwd()

    # search upward for "data" directory
    for parent in [current] + list(current.parents):
        data_path = parent / "data" / filename
        if data_path.exists():
            return pd.read_csv(data_path, sep=";")

    raise FileNotFoundError(
        f"{filename} not found in any parent 'data/' directory."
    )

In [4]:
white = read_dataset("winequality-white.csv")

assert white.shape[1] == 12

In [5]:
red = read_dataset("winequality-red.csv")

assert red.shape[1] == 12

In [6]:
X_white = white.drop(columns=['quality'])
y_white = white['quality']

In [7]:
X_red = red.drop(columns=['quality'])
y_red = red['quality']

In [8]:
X_white = X_white.to_numpy(dtype=float)
y_white = y_white.to_numpy(dtype=float)
X_red = X_red.to_numpy(dtype=float)
y_red = y_red.to_numpy(dtype=float)

In [9]:
assert X_white.ndim == 2
assert y_white.ndim == 1
assert X_white.shape[0] == y_white.shape[0]

assert X_red.ndim == 2
assert y_red.ndim == 1
assert X_red.shape[0] == y_red.shape[0]

In [10]:
X_white_mean = X_white.mean(axis=0)
X_white_std = X_white.std(axis=0)

X_white = (X_white - X_white_mean) / X_white_std

X_red_mean = X_red.mean(axis=0)
X_red_std = X_red.std(axis=0)

X_red = (X_red - X_red_mean) / X_red_std

In [11]:
def test_train_split(dataset, target):
    size = dataset.shape[0]
    train_size = int(size * 0.8)
    return dataset[:train_size, :], dataset[train_size:, :], target[:train_size], target[train_size:]

In [12]:
def init_params(X):
    w = np.zeros(X.shape[1])
    b = 0.0
    params = dict()
    params['w'] = w
    params['b'] = b
    return params

In [13]:
def compute_predictions_loop(x, params):
    w = params['w']
    b = params['b']
    y_hat = np.zeros(x.shape[0])
    for i in range(x.shape[0]):
        mul = 0
        for j in range(x.shape[1]):
            mul += w[j] * x[i, j]
        y_hat[i] = mul + b

    return y_hat

In [14]:
def compute_predictions_vec(x, params):
    w = params['w']
    b = params['b']
    y_hat = x @ w + b
    return y_hat

In [15]:
def loss_loop(target, prediction):
    n = target.shape[0]
    j = 0
    for i in range(n):
        j += (target[i] - prediction[i]) ** 2

    j /= n
    return j

In [16]:
def loss_vec(target, prediction):
    j = np.sum(np.square(target - prediction))
    j/= target.shape[0]

In [17]:
def compute_gradients_loop(target, prediction, dataset):
    m = target.shape[0]
    features = dataset.shape[1]
    dw = np.zeros(features)
    db = 0.0
    for i in range(m):
        for j in range(features):
            dw[j] += dataset[i, j] * (prediction[i] - target[i])
        db += (prediction[i] - target[i])

    dw *= 2/m
    db *= 2/m
    grads = dict()
    grads['dw'] = dw
    grads['db'] = db
    return grads

In [18]:
def compute_gradients_vec(target, prediction, dataset):
    m = target.shape[0]
    dw = 2 / m * dataset.T @ (prediction - target)
    db = np.sum(prediction - target) * 2 / m
    grads = dict()
    grads['dw'] = dw
    grads['db'] = db
    return grads

In [19]:
def updated_params(params, grads, alpha):
    w, b = params['w'], params['b']
    w -= alpha * grads['dw']
    b -= alpha * grads['db']

    updated = dict()
    updated['w'] = w
    updated['b'] = b
    return updated

In [20]:
def test_regression(X_test, y_test, params):
    w, b = params["w"], params["b"]
    
    # predictions
    y_pred = X_test @ w + b

    # Mean Squared Error
    mse = np.mean((y_test - y_pred) ** 2)

    # Root Mean Squared Error (interpretable)
    rmse = np.sqrt(mse)

    # R^2 score
    ss_res = np.sum((y_test - y_pred) ** 2)
    ss_tot = np.sum((y_test - np.mean(y_test)) ** 2)
    r2 = 1 - ss_res / ss_tot

    return {
        "MSE": mse,
        "RMSE": rmse,
        "R2": r2
    }

In [21]:
def test_accuracy_bands(X_test, y_test, params):
    w, b = params["w"], params["b"]
    
    # predictions
    y_pred = X_test @ w + b
    
    # absolute error
    abs_err = np.abs(y_pred - y_test)

    # exact match (after rounding)
    exact_match = np.mean(np.round(y_pred) == y_test)

    # tolerance-based accuracies
    within_05 = np.mean(abs_err <= 0.5)
    within_1  = np.mean(abs_err <= 1.0)
    within_2  = np.mean(abs_err <= 2.0)

    return {
        "Exact match (%)": exact_match * 100,
        "Within ±0.5 (%)": within_05 * 100,
        "Within ±1 (%)":  within_1 * 100,
        "Within ±2 (%)":  within_2 * 100
    }

In [22]:
X_white_train, X_white_test, y_white_train, y_white_test = test_train_split(X_white, y_white)
X_red_train, X_red_test, y_red_train, y_red_test = test_train_split(X_red, y_red)

In [23]:
def regression_loop(dataset, target, alpha, iterations):
    params = init_params(dataset)
    for i in range(iterations):
        prediction = compute_predictions_loop(dataset, params)
        j = loss_loop(target, prediction)
        grads = compute_gradients_loop(target, prediction, dataset)
        params = updated_params(params, grads, alpha)
        assert np.all(np.isfinite(params['w']))
        assert np.isfinite(params['b'])
    return params

In [24]:
def regression_vec(dataset, target, alpha, iterations):
    params = init_params(dataset)
    for i in range(iterations):
        prediction = compute_predictions_vec(dataset, params)
        j = loss_vec(target, prediction)
        grads = compute_gradients_vec(target, prediction, dataset)
        params = updated_params(params, grads, alpha)
        assert np.all(np.isfinite(params['w']))
        assert np.isfinite(params['b'])
    return params

In [25]:
tic = time.perf_counter()
params_white_loops = regression_loop(X_white_train, y_white_train, 0.001, 6000)
tac = time.perf_counter()
print(tac-tic)
print(params_white_loops)

263.7840006819988
{'w': array([-0.02352935, -0.18199308, -0.00685084,  0.22191839, -0.01258503,
        0.0954569 , -0.02440375, -0.16758964,  0.0619726 ,  0.06967165,
        0.40126571]), 'b': np.float64(5.907208610509608)}


In [26]:
tic = time.perf_counter()
params_vac = regression_vec(X_white_train, y_white_train, 0.001, 6000)
tac = time.perf_counter()
print(tac-tic)
print(params_vac)

0.6046303330003866
{'w': array([-0.02352935, -0.18199308, -0.00685084,  0.22191839, -0.01258503,
        0.0954569 , -0.02440375, -0.16758964,  0.0619726 ,  0.06967165,
        0.40126571]), 'b': np.float64(5.907208610509608)}


In [28]:
train_white_loops_result = test_regression(X_white_train, y_white_train, params_white_loops)
print(train_white_loops_result)

{'MSE': np.float64(0.5845326038016652), 'RMSE': np.float64(0.7645473195307568), 'R2': np.float64(0.29567436854323226)}


In [29]:
test_white_loops_result = test_regression(X_white_test, y_white_test, params_white_loops)
print(train_white_loops_result)

{'MSE': np.float64(0.5845326038016652), 'RMSE': np.float64(0.7645473195307568), 'R2': np.float64(0.29567436854323226)}


In [30]:
train_white_vac_result = test_regression(X_white_train, y_white_train, params_vac)
print(train_white_vac_result)

{'MSE': np.float64(0.5845326038016652), 'RMSE': np.float64(0.7645473195307568), 'R2': np.float64(0.29567436854323226)}


In [31]:
test_white_vac_result = test_regression(X_white_train, y_white_train, params_vac)
print(train_white_vac_result)

{'MSE': np.float64(0.5845326038016652), 'RMSE': np.float64(0.7645473195307568), 'R2': np.float64(0.29567436854323226)}


In [32]:
"""
As Vac and loops give same result only vac version is tested to avoid too much code from this point.
params_red_loops will still be calculated for noting down performance.
"""

'\nAs Vac and loops give same result only vac version is tested to avoid too much code from this point.\nparams_red_loops will still be calculated for noting down performance.\n'

In [33]:
metrics_white_train = test_accuracy_bands(X_white_train, y_white_train, params_vac)
for k, v in metrics_white_train.items():
    print(f"{k}: {v:.2f}%")

Exact match (%): 50.94%
Within ±0.5 (%): 50.94%
Within ±1 (%): 83.87%
Within ±2 (%): 98.37%


In [34]:
metrics_white_test = test_accuracy_bands(X_white_test, y_white_test, params_vac)
for k, v in metrics_white_test.items():
    print(f"{k}: {v:.2f}%")

Exact match (%): 53.98%
Within ±0.5 (%): 53.98%
Within ±1 (%): 87.35%
Within ±2 (%): 98.47%


In [35]:
tic = time.perf_counter()
params_red_loops = regression_loop(X_red_train, y_red_train, 0.001, 6000)
tac = time.perf_counter()
print(tac-tic)
print(params_red_loops)

84.4413016439994
{'w': array([ 0.09120673, -0.19295701, -0.04949786,  0.02506003, -0.07379732,
        0.02228939, -0.10519019, -0.07245554, -0.02628482,  0.13827746,
        0.28608893]), 'b': np.float64(5.657515725254137)}


In [36]:
tic = time.perf_counter()
params_red_vac = regression_vec(X_red_train, y_red_train, 0.001, 6000)
tac = time.perf_counter()
print(tac-tic)
print(params_red_vac)

0.2912727860002633
{'w': array([ 0.09120673, -0.19295701, -0.04949786,  0.02506003, -0.07379732,
        0.02228939, -0.10519019, -0.07245554, -0.02628482,  0.13827746,
        0.28608893]), 'b': np.float64(5.657515725254137)}


In [101]:
train_red_result = test_regression(X_red_train, y_red_train, params_red_vac)
print(train_red_result)

{'MSE': np.float64(0.41607907619837975), 'RMSE': np.float64(0.645041918171509), 'R2': np.float64(0.3690494088655838)}


In [102]:
test_red_result = test_regression(X_red_test, y_red_test, params_red_vac)
print(train_red_result)

{'MSE': np.float64(0.41607907619837975), 'RMSE': np.float64(0.645041918171509), 'R2': np.float64(0.3690494088655838)}


In [103]:
metrics_red_train = test_accuracy_bands(X_red_train, y_red_train, params_red_vac)
for k, v in metrics_red_train.items():
    print(f"{k}: {v:.2f}%")

Exact match (%): 58.41%
Within ±0.5 (%): 58.41%
Within ±1 (%): 89.68%
Within ±2 (%): 99.45%


In [104]:
metrics_red_test = test_accuracy_bands(X_red_test, y_red_test, params_red_vac)
for k, v in metrics_red_test.items():
    print(f"{k}: {v:.2f}%")

Exact match (%): 64.38%
Within ±0.5 (%): 64.38%
Within ±1 (%): 88.12%
Within ±2 (%): 99.38%
