# Notebook created to train the logistic regression model without having to reload the dataset every time


# Because for some reason, ucimlrepository takes an weirdly long time to load


In [23]:
import pandas as pd
from pandas import DataFrame
from ucimlrepo import fetch_ucirepo
import numpy as np
from numpy import floating as fl, float32 as f32, float64 as f64, int32 as i32
from numpy.typing import NDArray
from pprint import pprint

# Iris dataset
DATASET_ID = 53

iris = fetch_ucirepo(id=DATASET_ID)  # fetch dataset
assert iris.data is not None

DATA: DataFrame = iris.data.original
LAB_NAME: str = iris.data["headers"][-1]

from sklearn.model_selection import train_test_split

# FEAT, FEAT_test, y_train, y_test = train_test_split(iris.data.features, DATA[LAB_NAME], test_size=0.3, random_state=np.random.randint(0, 100))
FEAT, FEAT_test, y_train, y_test = train_test_split(iris.data.features, DATA[LAB_NAME], test_size=0.3, random_state=42)

DATA_train = FEAT.copy(deep=True)
DATA_train["class"] = y_train

DATA_test = FEAT_test.copy(deep=True)
DATA_test["class"] = y_test

# FEAT: DataFrame = X_train
# FEAT_test: DataFrame = X_test

LABELS_STR: DataFrame = DATA[LAB_NAME]  # class value as string
# LABELS_STR_test: DataFrame = DATA_test[LAB_NAME]  # type: ignore

lab_values = LABELS_STR.unique()
# lab_values_test = LLAB_IDX_VALABELS_STR_test.unique()

LAB_IDX_VAL: dict[int, str] = dict(zip(range(len(lab_values)), lab_values))  # class index, class value
LAB_VAL_IDX: dict[str, int] = {v: k for k, v in LAB_IDX_VAL.items()}  # class value, class index

LABELS: NDArray = np.array([LAB_VAL_IDX[class_value] for class_value in y_train])
LABELS_test: NDArray = np.array([LAB_VAL_IDX[class_value] for class_value in y_test])
COL_NAMES = list(FEAT.columns)

In [4]:
FEAT

Unnamed: 0,sepal length,sepal width,petal length,petal width
81,5.5,2.4,3.7,1.0
133,6.3,2.8,5.1,1.5
137,6.4,3.1,5.5,1.8
75,6.6,3.0,4.4,1.4
109,7.2,3.6,6.1,2.5
...,...,...,...,...
71,6.1,2.8,4.0,1.3
106,4.9,2.5,4.5,1.7
14,5.8,4.0,1.2,0.2
92,5.8,2.6,4.0,1.2


## Gradient Descent

`gradient_descent.py`


In [13]:
def grad_desc_ml(
        features: NDArray, labels: NDArray, df, w: NDArray, b: float, alpha: float, num_iters: int
) -> tuple[NDArray, float]:
    """Same gradient descent `gradient_desent` method, but that takes `features` (X) and `labels` (y)
    as additional parameters, since they're obviously going to be need for any kind of learning whatsoever.
    Parameters
    ----------
    `features` : NDArray
        Samples / features.
    `labels` : NDArray
        labels / class associated to each sample.
    `df`: function
        derivative function (i.e. gradient)
    `w` : NDArray
        weights vector.
    `b` : fl (float or NDArray[float])
        bias
    `alpha`: float
        define how the function will converge. Values too big will give bad results and values too small won't converge or will converge too slowly
    `num_iters`: Number of iterations
    Return value
    ------------
    Optimal vector for the initial configuration and parameters"""

    for _ in range(num_iters):
        grad_w, grad_b = df(features, labels, w, b)
        w -= alpha * grad_w
        b -= alpha * grad_b
    return w, b


## Logistic Regression

`log_reg.py`


## Logistic Regression but with CuPy (Nvidia / Cuda)


In [14]:
def compute_metrics(data, predicted_values):
    from sklearn.metrics import precision_score, recall_score, accuracy_score, f1_score
    """ This function calculates the performance metrics for each class in a binary classification problem.
        The metrics calculated are Precision, Recall, and F1 Score.
        :param data: (DataFrame): The DataFrame containing the actual labels.
        :param predicted_values: (list): The list containing the predicted labels.
        :return: dict: A dictionary containing the performance metrics for each class."""
    y_true, y_pred = data, predicted_values

    precision = precision_score(y_true, y_pred, average="micro")
    recall = recall_score(y_true, y_pred, average="micro")
    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="micro")

    return {
        'precision': float(precision),
        'recall': float(recall),
        'accuracy': float(accuracy),
        'f1_score': float(f1)
    }

In [31]:
import cupy as cp
from cupy import ndarray as CPArray
from numpy.random import rand, randint


def z(w: CPArray, X: CPArray, b: float) -> CPArray:
    """:return: ``cp.dot(X, w) + b``.
    i.e. float or CPArray[float] (i.e. cupy.ndarray)
    NOTE: `w` and `X` can be interchanged e.g. (`z(X, w, b)), it won't give
    the same result (in general) but as long as matrix multiplication dimensions
    are respected, it will work."""
    return cp.dot(X, w) + b


def sigmoid(z): return 1 / (1 + cp.exp(-z))


def norm(X: CPArray): return (X - cp.mean(X)) / cp.std(X)


def grad_gpu(X: CPArray, y: CPArray, w: CPArray, b: float):
    """:return: (dw, db). i.e. Computes aforementioned derivatives w.r.t "w" and "b". 
    (on gpu. X, y, w, b are `cupy.ndarray` shortened to `CPArray`)"""

    predictions = sigmoid(z(w, X, b))  # Sigmoid function applied to z
    errors = y - predictions  # Difference between actual and predicted values
    db = -cp.mean(errors)  # Vectorized computation of db component

    X_sum_over_rows = cp.sum(X, axis=1)  # Sum over rows of X

    N = X.shape[0]  # samples number
    dw = -cp.mean(errors @ X)  # Vectorized computation of dw component
    return dw, db


def train_log_reg(X: NDArray, y: NDArray, w: NDArray, b: float, n_it: int, lr: float) -> tuple[NDArray, float]:
    """
    :param X: Feature matrix (covariables)
    :param y: Label vector
    :param w: initial weight vector
    :param b:  initial bias
    :param n_it: iterations number
    :param lr: learning rate
    :return: Trained weight vector and bias to minimize by gradient descent.
    """
    X, y, w = map(cp.array, (X, y, w))
    return grad_desc_ml(X, y, grad_gpu, w, b, lr, n_it)  # type: ignore


def predict_log_reg(X: NDArray, w: NDArray, b):
    """ Predict the class labels for a set of examples X using logistic regression parameters w and b.
    :param X: The input features. 2D Matrix NDArray
    :param w: The weights of the logistic regression model. Vector NDArray
    :param b: The bias of the logistic regression model. float
    :return: Vector of predicted class labels (0 or 1) for each example in X. Vector NDArray
    """
    X, w = map(cp.array, (X, w))
    # return i32(sigmoid(z(w, X, b)).get() >= 0.5)
    # print(sigmoid(z(w, X, b)))
    # predicted= sigmoid((z(w, X, b))).get()
    predicted = sigmoid(norm(z(w, X, b))).get()
    class_nb = len(LAB_VAL_IDX)
    return i32(predicted * class_nb)  # returns k-1 if val < (k/class_nb) i.e. 0 if < 1/3, 1 if < 2/3...


def pred_compute(X_test, Y_test, w, b):
    # if DataFrame or Series is passed, convert to numpy array
    if type(X_test) != np.ndarray: X_test = np.asarray(X_test)
    # for i in range(X_test.shape[1]): X_test[:, i] = norm(X_test[:, i])
    # X_test[:, :] = np.apply_along_axis(norm, 0, X_test[:, :])
    predicted_val_logreg = predict_log_reg(X_test, w, b)
    metrics = compute_metrics(Y_test, predicted_val_logreg)
    return metrics


def test_train_gpu(m, n):
    X, y, w, b = rand(m, n), rand(m), rand(n), rand()
    n_it, lr = 100, 0.03
    w, b = train_log_reg(X, y, w, b, n_it, lr)

In [8]:
#class_nb = len(LAB_VAL_IDX)
#thresholds = np.linspace(1 / class_nb, 1, class_nb - 1, endpoint=False)
#val = 0.34
#idx = 0
## while idx < (class_nb - 1) and val > thresholds[idx]: idx += 1
#idx = int(val * class_nb)
#print(thresholds)
#print(idx)

In [33]:
#_w, _b = np.array([0.57880481, 0.93467781, 0.80771699, 0.89825272]), 0.03075745284540543
from sklearn.metrics import f1_score


w, b = [0.53452349, 0.36463584, 1.16132476, 1.08204578], 0.45146791
pred_compute(FEAT_test, LABELS_test, w, b)

predicted_val_logreg = predict_log_reg(FEAT_test.to_numpy(), w, b)
f1_score(LABELS_test, predicted_val_logreg, average="micro")
# predict_log_reg(FEAT_test, w, b)

1.0

In [36]:
import sklearn.metrics as metrics

m, n = FEAT.shape
init_w = np.random.rand(n)
init_b = np.random.rand()
n_it, lr = 1000, 1e-5

init_w, init_b = [0.53452349, 0.36463584, 1.16132476, 1.08204578], 0.45146791
# w, b = [0.53452349, 0.36463584, 1.16132476, 1.08204578], 0.45146791
w, b = train_log_reg(FEAT.to_numpy(), LABELS, init_w, init_b, n_it, lr)
predicted_val_logreg = predict_log_reg(FEAT_test.to_numpy(), w, b)

score = metrics.f1_score(LABELS_test, predicted_val_logreg, average="weighted")
# pprint(w)
# pprint(b)
print("score:", score)


score: 0.9332870012870013


In [11]:
m, n = FEAT.shape
init_w = np.random.rand(n)
init_b = np.random.rand()
n_it, lr = 100, 1e-10

from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 2**8
# lr = 3.162277660168379e-06
# init_w, init_b = [0.53452349, 0.36463584, 1.16132476, 1.08204578], 0.45146791
# init_w, init_b = np.array([0.57880481, 0.93467781, 0.80771699, 0.89825272]), 0.03075745284540543
# init_w, init_b = [2.72002667, 3.07589967, 2.94893885, 3.03947458], 0.08176500616680844
# init_w, init_b = np.array([0.1911183 , 0.93876337, 0.98824842, 0.7948038]), 0.83589593
# w, b = np.array([0.57880481, 0.93467781, 0.80771699, 0.89825272]), 0.03075745284540543
# w, b = train_log_reg(FEAT.to_numpy(), LABELS, init_w, init_b, n_it, lr)
# w, b = init_w, init_b
# print("\tWeights, \t\t\t\t\t\t\t\t\t\tbias\n", w, b)

def max_lr_1iteration(init_w, init_b, expo, n_it):
    """Needed to be passed to threadpool executor. "submit()" function"""
    lr = 10 ** (-expo)
    X = np.asarray(FEAT.copy(deep=True))
    # X = np.apply_along_axis(norm, 0, np.asarray(FEAT.copy(deep=True)))
    w, b = train_log_reg(X, LABELS, init_w, init_b, n_it, lr)
    score = pred_compute(FEAT_test.to_numpy(), LABELS_test, w, b)
    return (score["f1_score"], lr, w, b) 

def max_lr():
    max_f1_scores = []
    init_w = np.random.rand(n) #* np.random.randint(0, 50)
    init_b = np.random.rand() #* np.random.randint(0, 50)

    start, stop, step = 8, 20, 1/18
    worker_nb = np.ceil((stop-start) / step)
    executor = ThreadPoolExecutor(max_workers=worker_nb)
    
    for expo in np.arange(start, stop, step=step):
        max_f1_scores.append(executor.submit(max_lr_1iteration, init_w, init_b, expo, n_it).result())
    return max(max_f1_scores, key=lambda x: x[0])


def max_it_1iteration(init_w, init_b, iter, lr):
    """Needed to be passed to threadpool executor. "submit()" function"""
    w, b = train_log_reg(FEAT.to_numpy(), LABELS, init_w, init_b, iter, lr)
    score = pred_compute(FEAT_test.to_numpy(), LABELS_test, w, b)
    return (score["f1_score"], iter)


def max_it():
    init_w = np.random.rand(n) #* np.random.randint(0, 50)
    init_b = np.random.rand() #* np.random.randint(0, 50)
    max_f1_scores = []
    print(init_w, init_b)

    start, stop, step = 100, 3000, 50
    worker_nb = np.ceil((stop-start) / step)
    executor = ThreadPoolExecutor(max_workers=worker_nb)

    for iter in range(start, stop, step):
        max_f1_scores.append(executor.submit(max_it_1iteration, init_w, init_b, iter, lr).result())
        # max_f1_scores.append(max_it_1iteration(init_w, init_b, iter, lr))
    return max(max_f1_scores, key=lambda x: x[0])


max_lr()
# pred_compute(FEAT_test.to_numpy(), LABELS_test, w, b)
# max_it()

(0.8000000000000002,
 1e-08,
 array([0.62552569, 0.93126723, 0.53632815, 0.44111999]),
 array(0.15500626))