In [1]:
import os

import numpy as np
import polars as pl

In [2]:
# Fixed seed for NumPy's legacy global random state, applied once up front.
# NOTE(review): no cell visible here draws random numbers (weights start at
# zero), so this presumably guards code elsewhere in the notebook — confirm.
SEED = 462
np.random.seed(SEED)

In [3]:
# Directory (relative to the working directory) that the CSV split paths
# below are joined onto.
data_path = os.path.join("data", "tabular")

In [4]:
class Dataset:
    """Loads train/validation/test CSV splits and prepares them for training.

    The label encoding and the feature standardisation statistics are fitted
    on the training split only and then reused for the other two splits.
    """

    def __init__(self, train_path, val_path, test_path):
        """Store the three CSV locations; nothing is read until `get_data`."""
        self.train_path, self.val_path, self.test_path = train_path, val_path, test_path
        # Filled in by the fit=True calls of `normalize` / `encode_labels`.
        self.mean = self.std = self.label_map = None

    def load_csv(self, path):
        """Read one CSV with polars and split it into features and raw labels.

        Returns:
            (X, Y_str): every column except the last cast to float, and the
            last column returned as-is.
        """
        table = pl.read_csv(path).to_numpy()
        return table[:, :-1].astype(float), table[:, -1]

    def encode_labels(self, Y_str, fit=False):
        """Translate raw labels into integer codes.

        With ``fit=True`` the mapping is rebuilt from the sorted unique values
        of ``Y_str``; otherwise the previously stored mapping is used. A label
        missing from the mapping raises ``KeyError``.
        """
        if fit:
            distinct = np.unique(Y_str)
            self.label_map = dict(zip(distinct, range(len(distinct))))

        codes = []
        for raw_label in Y_str:
            codes.append(self.label_map[raw_label])
        return np.array(codes)

    def normalize(self, X, fit=False):
        """Standardise features column-wise as ``(X - mean) / std``.

        With ``fit=True`` the per-column mean and standard deviation are
        recomputed from ``X`` and stored; otherwise the stored ones are used.
        """
        if fit:
            self.mean = np.mean(X, axis=0)
            column_std = np.std(X, axis=0)
            # Constant columns would divide by zero; leave them unscaled.
            column_std[column_std == 0] = 1.0
            self.std = column_std

        centered = X - self.mean
        return centered / self.std

    def get_data(self):
        """Load all three splits and return them encoded and standardised.

        Returns:
            ((X_train, Y_train), (X_val, Y_val), (X_test, Y_test))
        """
        split_paths = (self.train_path, self.val_path, self.test_path)
        features, raw_labels = zip(*(self.load_csv(p) for p in split_paths))

        # Index 0 is the training split: the only one used for fitting.
        labels = [
            self.encode_labels(lbl, fit=(idx == 0))
            for idx, lbl in enumerate(raw_labels)
        ]
        scaled = [
            self.normalize(feat, fit=(idx == 0))
            for idx, feat in enumerate(features)
        ]

        return tuple(zip(scaled, labels))

In [5]:
class LogisticRegression:
    """Binary logistic regression fitted with full-batch gradient descent."""

    def __init__(self, learning_rate, num_iters):
        """Store the step size and iteration count; parameters start unset."""
        self.learning_rate, self.num_iters = learning_rate, num_iters
        self.weights, self.bias = None, None

    def sigmoid(self, z):
        """Logistic function, with the input clipped to [-500, 500] first."""
        clipped = np.clip(z, -500, 500)
        denominator = 1 + np.exp(-clipped)
        return 1 / denominator

    def predict_proba(self, X):
        """Return sigmoid(X . weights + bias) for each row of ``X``."""
        logits = np.dot(X, self.weights) + self.bias
        return self.sigmoid(logits)

    def logistic_loss(self, y_true, y_pred):
        """Mean binary cross-entropy between targets and probabilities."""
        eps = 1e-15
        # Keep probabilities strictly inside (0, 1) so the logs stay finite.
        p = np.clip(y_pred, eps, 1 - eps)
        per_example = y_true * np.log(p) + (1 - y_true) * np.log(1 - p)
        return -np.mean(per_example)

    def train(self, X, Y):
        """Fit weights and bias on ``X`` (2-D) and binary targets ``Y``.

        Parameters are reset to zero, then updated ``num_iters`` times with
        the gradient averaged over all rows.
        """
        sample_count, feature_count = X.shape
        self.weights = np.zeros(feature_count)
        self.bias = 0

        for _ in range(self.num_iters):
            residual = self.predict_proba(X) - Y

            weight_step = self.learning_rate * np.dot(X.T, residual) / sample_count
            bias_step = self.learning_rate * np.mean(residual)
            self.weights -= weight_step
            self.bias -= bias_step

In [6]:
class LogisticRegressionOVA:
    """One-vs-all multiclass classifier built from binary logistic models.

    One `LogisticRegression` is fitted per distinct training label, each on
    the binary problem "this class vs. everything else". Prediction picks the
    class whose model outputs the highest probability.
    """

    def __init__(self, learning_rate=0.01, num_iters=1000):
        """Store the hyperparameters shared by every per-class model.

        Args:
            learning_rate: Gradient-descent step size.
            num_iters: Number of gradient-descent iterations per class.
        """
        self.learning_rate = learning_rate
        self.num_iters = num_iters
        self.models = []      # one fitted binary model per entry of `classes`
        self.classes = None   # sorted unique training labels, set by `train`

    def train(self, X_train, Y_train, X_val, Y_val):
        """Fit one binary model per class, printing losses as it goes.

        Args:
            X_train: 2-D training feature matrix.
            Y_train: Training labels; its unique values define the classes.
            X_val: Validation features, used only for the printed loss.
            Y_val: Validation labels, used only for the printed loss.

        Any previously fitted models are discarded.
        """
        self.classes = np.unique(Y_train)
        self.models = []

        for cls in self.classes:
            print(f"Training for class {cls}...")
            # Binary targets: 1.0 for the current class, 0.0 otherwise.
            Y_train_bin = (Y_train == cls).astype(float)
            Y_val_bin = (Y_val == cls).astype(float)

            model = LogisticRegression(self.learning_rate, self.num_iters)

            # Custom training loop (instead of model.train) to print loss
            n_examples, n_features = X_train.shape
            model.weights = np.zeros(n_features)
            model.bias = 0

            for i in range(model.num_iters):
                y_pred = model.predict_proba(X_train)
                y_diff = y_pred - Y_train_bin

                model.weights -= model.learning_rate * np.dot(X_train.T, y_diff) / n_examples
                model.bias -= model.learning_rate * np.mean(y_diff)

                if i % 1000 == 0:
                    # Train loss uses the pre-update predictions from this
                    # iteration; validation loss uses the updated parameters.
                    train_loss = model.logistic_loss(Y_train_bin, y_pred)
                    val_pred = model.predict_proba(X_val)
                    val_loss = model.logistic_loss(Y_val_bin, val_pred)
                    print(f"Iter {i}: Train Loss {train_loss:.4f}, Val Loss {val_loss:.4f}")

            self.models.append(model)

    def predict(self, X):
        """Return the predicted class label for each row of ``X``.

        Raises:
            ValueError: If called before `train` (no fitted models).
        """
        if not self.models:
            raise ValueError("predict() called before train(): no fitted models")

        # Column j holds the probability from the model for classes[j].
        probs = np.column_stack([model.predict_proba(X) for model in self.models])
        # Map the winning column back to its label; a bare argmax is only
        # correct when the labels happen to be exactly 0..K-1.
        return np.asarray(self.classes)[np.argmax(probs, axis=1)]


def accuracy(y_true, y_pred):
    """Return the percentage (0-100) of positions where the labels agree.

    Args:
        y_true: Ground-truth labels (array-like).
        y_pred: Predicted labels (array-like) with the same number of elements.

    Returns:
        Percentage of matching elements; NaN for empty input (NumPy's mean of
        an empty array).

    Raises:
        ValueError: If the two inputs do not hold the same number of elements.
    """
    # NOTE(review): this function is defined again in the following notebook
    # cell; that later definition is the one in effect at call time.
    # Coerce to flat arrays: comparing two plain lists with == yields a single
    # bool, and an (n, 1) vs (n,) pair would silently broadcast to (n, n).
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred differ in size: {y_true.size} vs {y_pred.size}"
        )
    return np.mean(y_true == y_pred) * 100

In [7]:
def accuracy(y_true, y_pred):
    """Return the percentage (0-100) of positions where the labels agree.

    Args:
        y_true: Ground-truth labels (array-like).
        y_pred: Predicted labels (array-like) with the same number of elements.

    Returns:
        Percentage of matching elements; NaN for empty input (NumPy's mean of
        an empty array).

    Raises:
        ValueError: If the two inputs do not hold the same number of elements.
    """
    # NOTE(review): a function with this name is also defined at the end of
    # the preceding cell; this later definition is the one that takes effect.
    # Coerce to flat arrays: comparing two plain lists with == yields a single
    # bool, and an (n, 1) vs (n,) pair would silently broadcast to (n, n).
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred differ in size: {y_true.size} vs {y_pred.size}"
        )
    return np.mean(y_true == y_pred) * 100

In [8]:
# End-to-end run: load the three CSV splits, fit the one-vs-all model, and
# report accuracy on the test split.
if __name__ == "__main__":
    # Paths are resolved under `data_path`; the files are read by get_data().
    dataset = Dataset(
        train_path=os.path.join(data_path, "train_processed.csv"),
        val_path=os.path.join(data_path, "validation_processed.csv"),
        test_path=os.path.join(data_path, "test_processed.csv"),
    )

    # Labels are integer-encoded and features standardised using statistics
    # fitted on the training split only.
    (X_train, Y_train), (X_val, Y_val), (X_test, Y_test) = dataset.get_data()

    # Training prints train/validation loss every 1000 iterations per class;
    # the validation split is used only for that printed loss.
    model = LogisticRegressionOVA(learning_rate=0.001, num_iters=50000)
    model.train(X_train, Y_train, X_val, Y_val)

    # Final evaluation on the held-out test split.
    test_pred = model.predict(X_test)
    print(f"Test Accuracy: {accuracy(Y_test, test_pred):.2f}%")

Training for class 0...
Iter 0: Train Loss 0.6931, Val Loss 0.6929
Iter 1000: Train Loss 0.5217, Val Loss 0.5201
Iter 2000: Train Loss 0.4256, Val Loss 0.4233
Iter 3000: Train Loss 0.3664, Val Loss 0.3636
Iter 4000: Train Loss 0.3268, Val Loss 0.3237
Iter 5000: Train Loss 0.2987, Val Loss 0.2953
Iter 6000: Train Loss 0.2777, Val Loss 0.2740
Iter 7000: Train Loss 0.2615, Val Loss 0.2576
Iter 8000: Train Loss 0.2485, Val Loss 0.2445
Iter 9000: Train Loss 0.2380, Val Loss 0.2339
Iter 10000: Train Loss 0.2292, Val Loss 0.2250
Iter 11000: Train Loss 0.2218, Val Loss 0.2175
Iter 12000: Train Loss 0.2154, Val Loss 0.2111
Iter 13000: Train Loss 0.2099, Val Loss 0.2055
Iter 14000: Train Loss 0.2050, Val Loss 0.2006
Iter 15000: Train Loss 0.2007, Val Loss 0.1963
Iter 16000: Train Loss 0.1969, Val Loss 0.1924
Iter 17000: Train Loss 0.1934, Val Loss 0.1889
Iter 18000: Train Loss 0.1903, Val Loss 0.1858
Iter 19000: Train Loss 0.1875, Val Loss 0.1830
Iter 20000: Train Loss 0.1848, Val Loss 0.1804
It