In [2]:
import numpy as np
import pandas as pd
import random


class MyLogReg():
    def __init__(self, n_iter, learning_rate, weights=None, metric=None, reg=None, l1_coef=0, l2_coef=0, sgd_sample=None, random_state=42):
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = weights
        self.metric = metric
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.random_state = random_state
        self.sgd_sample = sgd_sample

        self.metric = self.metrics(metric)
        self.learning_rate = self.learning_rate_type(learning_rate)

    def __str__(self) -> str:
        return f"MyLogReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}"

    def __repr__(self) -> str:
        return f"MyLogReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}"

    def calculate_gradient(self, X, y, y_pred):
        if self.reg == 'l1':
            assert self.l1_coef != 0
            return (1/X.shape[0]) * np.dot((y_pred - y.values.ravel()), X) + self.l1_coef*np.sign(self.weights)
        elif self.reg == 'l2':
            assert self.l2_coef != 0
            return (1/X.shape[0]) * np.dot((y_pred - y.values.ravel()), X) + self.l2_coef*2*(self.weights)
        elif self.reg == 'elasticnet':
            assert self.l1_coef != 0 and self.l2_coef != 0
            return (1/X.shape[0]) * np.dot((y_pred - y.values.ravel()), X) + self.l1_coef*np.sign(self.weights) + self.l2_coef*2*(self.weights)
        else:
            return (1/X.shape[0]) * np.dot((y_pred - y.values.ravel()), X)

    def learning_rate_type(self, LR):
        if not isinstance(LR, float):
            return LR
        else:
            return lambda x: LR
        
    def sgd_sample_size(self, sgd_sample_num, samples):
        if isinstance(sgd_sample_num, float):
            return int(samples.shape[0] * sgd_sample_num)
        elif isinstance(sgd_sample_num, int):
            return sgd_sample_num
        else:
            return samples.shape[0]

    def metrics(self, metric) -> None:
        if metric:
            if metric == 'accuracy':
                metric = ['accuracy', lambda y, y_pred: (
                    np.sum(y.values.ravel() == (y_pred > 0.5).ravel())) / len(y)]

            elif metric == 'precision':
                # TP / (TP + FP)
                def precision_score_func(y, y_pred):
                    y_pred_binary = (y_pred > 0.5).ravel()
                    y_ravel = y.values.ravel()
                    TP = np.sum((y_ravel == 1) & (y_pred_binary == 1))
                    FP = np.sum((y_ravel == 0) & (y_pred_binary == 1))
                    return TP/(TP+FP)

                metric = ['precision', precision_score_func]

            elif metric == 'recall':
                # TP / (TP + FN)
                def recall_score_func(y, y_pred):
                    y_pred_binary = (y_pred > 0.5)
                    y_ravel = y.values.ravel()
                    TP = np.sum((y_ravel == 1) & (y_pred_binary == 1))
                    FN = np.sum((y_ravel == 1) & (y_pred_binary == 0))
                    return TP/(TP+FN)
                
                metric = ['recall', recall_score_func]

            elif metric == 'f1':
                # 2 * precision * recall / (precision + recall)
                def f1_score_func(y, y_pred):
                    y_pred_binary = (y_pred > 0.5)
                    y_ravel = y.values.ravel()
                    TP = np.sum((y_ravel == 1) & (y_pred_binary == 1))
                    FP = np.sum((y_ravel == 0) & (y_pred_binary == 1))
                    FN = np.sum((y_ravel == 1) & (y_pred_binary == 0))
                    precision = TP/(TP+FP)
                    recall = TP/(TP+FN)
                    return 2 * precision * recall / (precision + recall)
                
                metric = ['f1', f1_score_func]


            elif metric == 'roc_auc':
                def auc_score_def(y, y_pred):
                    data = np.concatenate(
                        (y.to_numpy().reshape(-1, 1), np.round(y_pred.reshape(-1, 1), 10)), axis=1)
                    data = data[data[:, 1].argsort()][::-1]

                    pos_above_iter = 0

                    for y, pred in data:
                        if y == 0:
                            if (data[data[:, 1] == pred]).shape[0] > 1:
                                pos_above_iter += np.sum(
                                    data[data[:, 1] > pred][:, 0], axis=0) / 2
                            pos_above_iter += np.sum(
                                data[data[:, 1] > pred][:, 0], axis=0)
                    return pos_above_iter / (np.sum(data[:, 0] == 1) * np.sum(data[:, 0] == 0))

                metric = ['roc_auc', auc_score_def]

        return metric

    def fit(self, samples: pd.DataFrame, y: pd.Series, verbose=False) -> None:
        random.seed(self.random_state)  # фиксируем рандом сид

        sgd_sample_quantity = self.sgd_sample_size(self.sgd_sample, samples)

        X = samples.copy()
        X.insert(0, 'bias', pd.Series(1, index=X.index))

        self.weights = np.ones(X.shape[1])

        for iter in range(1, self.n_iter+1):
            sample_rows_idx = random.sample(
                range(X.shape[0]), sgd_sample_quantity)
            X_mini_batch = X.iloc[sample_rows_idx]
            y_mini_batch = y.iloc[sample_rows_idx]

            y_pred = np.array(1/(1 + np.exp(-np.dot(X_mini_batch, self.weights))))

            y_pred_for_loss = np.array(1/(1 + np.exp(-np.dot(X, self.weights))))

            loss = -np.mean(np.log(y_pred_for_loss+1e-100)*y.values.ravel() +
                            np.log(1 - y_pred_for_loss+1e-100)*(1-y.values.ravel()))

            # grad = (1/(X.shape[0]) * np.dot((y_pred - y.values.ravel()), X))
            grad = self.calculate_gradient(X_mini_batch, y_mini_batch, y_pred)

            self.weights = self.weights - grad * self.learning_rate(iter)
            if verbose and (iter % verbose) == 0 and self.metric is not None:
                print(
                    f'iter = {iter+1} ||| Loss = {loss} ||| {self.metric[0]} = {self.metric[1](y, y_pred_for_loss)}')
            elif verbose and (iter % verbose) == 0:
                print(f'iter = {iter+1} ||| Loss = {loss}')
        if self.metric:
            self.final_metric = self.metric[1](y, np.array(
                1/(1 + np.exp(-np.dot(X, self.weights)))))

    def predict(self, samples):
        X = samples.copy()
        X.insert(0, 'bias', pd.Series(1, index=range(X.shape[0])))
        return (np.array(1/(1 + np.exp(-np.dot(X, self.weights)))) > 0.5).astype(np.int8)

    def predict_proba(self, samples):
        X = samples.copy()
        X.insert(0, 'bias', pd.Series(1, index=range(X.shape[0])))
        return np.array(1/(1 + np.exp(-np.dot(X, self.weights))))

    def get_coef(self) -> list():
        try:
            assert self.weights is not None
            return np.array(self.weights[1:])
        except:
            return 'fit before!'

    def get_best_score(self) -> int:
        return self.final_metric

In [5]:
x = MyLogReg(50, lambda iter: 0.5 * (0.85 ** iter), metric='precision', sgd_sample=0.1, reg='elasticnet', l1_coef=0.1, l2_coef=0.1)

np.random.seed(42)

X = pd.DataFrame(np.random.randint(-25, 25, (400, 100)))
y = pd.DataFrame({'target': [0 if x<200 else 1 for x in range(400)]})

x.fit(X, y, verbose=10)

np.mean((x.predict_proba(X) > 0.5) == y.values)

iter = 11 ||| Loss = 35.631830863566286 ||| precision = 0.643979057591623
iter = 21 ||| Loss = 16.181827325897686 ||| precision = 0.6911764705882353
iter = 31 ||| Loss = 13.34606526399461 ||| precision = 0.7177033492822966
iter = 41 ||| Loss = 12.627028679628506 ||| precision = 0.7142857142857143
iter = 51 ||| Loss = 12.579146054146257 ||| precision = 0.7142857142857143


0.5