In [80]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler

## Data Preprocessing

In [81]:
class Preprocessor():
    """Generic tabular preprocessing pipeline: clean, encode, scale.

    The CSV at ``_data_path`` is loaded eagerly in the constructor.
    ``preprocess_data`` then splits it into features and labels, encodes the
    categorical feature columns, scales the numerical ones and returns both
    as NumPy arrays. The processed frame/labels are cached on ``self.x`` and
    ``self.y`` (both ``None`` until the pipeline has run).
    """

    # Scaling modes accepted by ``scale_data``.
    _SCALING_MODES = ('standard', 'minmax')

    def __init__(self, _data_path: str, _label_col: str):
        """Read the CSV file and remember which column holds the label."""
        self.label_col = _label_col
        self.data = pd.read_csv(_data_path)
        self.x = None
        self.y = None

    def clean_data(self):
        """Drop rows whose label is missing, then drop exact duplicate rows."""
        self.data = self.data.dropna(subset=[self.label_col])
        self.data = self.data.drop_duplicates()

    def encode_data(self):
        """Encode every categorical (object/string) feature column of ``self.x``.

        Columns with at most two distinct values are label-encoded in place;
        columns with more distinct values are replaced by one-hot indicator
        columns via ``pd.get_dummies``.
        """
        label_encoder = LabelEncoder()
        # ``self.x.columns`` is evaluated once, so re-binding ``self.x`` inside
        # the loop (get_dummies returns a new frame) does not disturb iteration.
        for column in self.x.columns:
            values = self.x[column]
            is_categorical = (pd.api.types.is_object_dtype(values)
                              or pd.api.types.is_string_dtype(values))
            if not is_categorical:
                continue
            if len(values.unique()) <= 2:
                self.x[column] = label_encoder.fit_transform(values)
            else:
                self.x = pd.get_dummies(self.x, columns=[column])

    def scale_data(self, _mode = 'standard'):
        """Scale the numerical feature columns of ``self.x`` in place.

        Args:
            _mode: ``'standard'`` (StandardScaler) or ``'minmax'`` (MinMaxScaler).

        Raises:
            ValueError: if ``_mode`` is not one of the supported modes.
        """
        # Validate first: previously an unknown mode fell through both branches
        # and crashed later with an UnboundLocalError on ``scaler``.
        if _mode not in self._SCALING_MODES:
            raise ValueError(f"Unknown scaling mode {_mode!r}; expected one of {self._SCALING_MODES}")

        # Numerical means numeric and not boolean. The original test compared
        # dtypes against ``type(object)``/``type(bool)`` (both are just ``type``),
        # so boolean indicator columns were never excluded as intended.
        numerical_columns = [
            column for column in self.x.columns
            if pd.api.types.is_numeric_dtype(self.x[column])
            and not pd.api.types.is_bool_dtype(self.x[column])
        ]
        if not numerical_columns:
            # Nothing to scale; the scalers reject an empty feature set.
            return

        scaler = StandardScaler() if _mode == 'standard' else MinMaxScaler()
        self.x[numerical_columns] = scaler.fit_transform(self.x[numerical_columns])

    def preprocess_data(self):
        """Run the full pipeline once and return ``(features, labels)`` arrays.

        The first call cleans the data, splits off and label-encodes the
        label column, then encodes and scales the features. Later calls skip
        that work and return the cached result.

        Returns:
            A pair ``(x, y)``: ``x`` is a float ndarray built from the
            processed feature frame, ``y`` an ndarray of encoded labels.
        """
        # Identity checks are required here: ``DataFrame == None`` is an
        # element-wise comparison whose truth value is ambiguous, so the
        # original ``== None`` test raised ValueError on every repeat call.
        if self.x is None or self.y is None:
            self.clean_data()
            self.x = self.data.drop(columns=[self.label_col])
            self.y = self.data[self.label_col]
            self.y = LabelEncoder().fit_transform(self.y)
            self.encode_data()
            self.scale_data()
        # Explicit float cast: unscaled boolean columns mixed with float
        # columns would otherwise produce an object-dtype array.
        return np.asarray(self.x, dtype=float), np.array(self.y)


In [82]:
class TelcoPreprocessor(Preprocessor):
    """Preprocessor with the extra cleaning steps needed by the Telco churn CSV."""

    def __init__(self, _data_path: str, _label_col: str):
        """Load the CSV through the base-class constructor."""
        super().__init__(_data_path, _label_col)

    def preprocess_data(self):
        """Apply the Telco-specific cleaning, then run the generic pipeline.

        Dataset-specific steps:
          * drop the ``customerID`` column;
          * convert ``TotalCharges`` to numeric, turning unparseable entries
            into NaN (``errors='coerce'``), and drop the rows where that
            produced a missing value.

        These steps run only while ``self.x`` is still unset. Previously they
        ran on every call, so a repeat call raised ``KeyError`` because
        ``customerID`` had already been removed.

        Returns:
            Whatever the base-class ``preprocess_data`` returns.
        """
        if self.x is None:
            self.data = self.data.drop(columns=['customerID'])
            self.data['TotalCharges'] = pd.to_numeric(self.data['TotalCharges'], errors='coerce')
            self.data = self.data.dropna(subset=['TotalCharges'])
        return super().preprocess_data()

## Logistic Regression

In [84]:
class LogisticRegression:
    """Binary logistic regression (weights only, no intercept) trained by gradient descent.

    The training data is supplied at construction time. ``fit`` runs
    mini-batch gradient descent on it, ``predict`` thresholds the predicted
    probabilities and ``test`` reports evaluation metrics on arbitrary data.
    Labels are expected to be 0/1.
    """

    # Probabilities are clipped to [_EPS, 1 - _EPS] before taking logs.
    _EPS = 1e-15

    def __init__(self, _x: np.ndarray, _y: np.ndarray):
        """Store the training data and initialise all weights to zero.

        Args:
            _x: feature matrix of shape (n_samples, n_features).
            _y: label vector with one 0/1 entry per sample.
        """
        self.x = np.asarray(_x, dtype=float)
        self.y = np.asarray(_y)
        self.__w = np.zeros(self.x.shape[1])

    def __h(self, _x: np.ndarray):
        """Return the sigmoid of ``_x @ w`` for a 2-D feature matrix.

        The two-branch form avoids overflow in ``exp`` for large ``|z|``.
        """
        assert _x.shape[1] == self.__w.size
        z = np.asarray(_x @ self.__w, dtype=float)
        out = np.empty_like(z)
        nonneg = z >= 0
        out[nonneg] = 1.0 / (1.0 + np.exp(-z[nonneg]))
        exp_z = np.exp(z[~nonneg])
        out[~nonneg] = exp_z / (1.0 + exp_z)
        return out

    def __BGD(self, _lr: float, _epoch: int):
        """Full-batch gradient descent: one weight update per epoch."""
        for _ in range(_epoch):
            self.__w -= _lr * self.x.T @ (self.__h(self.x) - self.y) / self.y.size

    def __SGD(self, _lr: float, _epoch: int):
        """Stochastic gradient descent: one weight update per sample."""
        for _ in range(_epoch):
            for i in range(self.y.size):
                # Evaluate the sigmoid on the current sample only, instead of
                # on the whole data set for every single update.
                row = self.x[i:i + 1]
                self.__w -= _lr * row[0] * (self.__h(row)[0] - self.y[i])

    def __miniBGD(self, _lr: float, _epoch: int, _batch_size: int, _verbose: bool):
        """Mini-batch gradient descent over consecutive slices of the data.

        Each update averages the gradient over the samples actually present
        in the batch, so a short final batch is not under-weighted. Loss and
        metrics are computed only when ``_verbose`` is set, because they are
        used for printing alone.
        """
        n_samples = self.y.size
        n_batches = -(-n_samples // _batch_size)  # ceiling division
        for ep in range(_epoch):
            for batch_no, start in enumerate(range(0, n_samples, _batch_size), start=1):
                x_batch = self.x[start:start + _batch_size]
                y_batch = self.y[start:start + _batch_size]
                self.__w -= _lr * x_batch.T @ (self.__h(x_batch) - y_batch) / y_batch.size
                if _verbose:
                    print(f'Epoch {ep+1}/{_epoch} | Batch {batch_no}/{n_batches} - Loss: {self.__negative_log_likelihood(x_batch, y_batch)}')
            if _verbose:
                metrics = self.__calculate_metrics(self.x, self.y)
                print(f'Epoch {ep+1}/{_epoch} - Loss: {self.__negative_log_likelihood(self.x, self.y)} | Accuracy: {metrics["accuracy"]} | Sensitivity: {metrics["sensitivity"]} | Specificity: {metrics["specificity"]} | Precision: {metrics["precision"]} | F1: {metrics["f1"]} | AUROC: {metrics["auroc"]} | AUPR: {metrics["aupr"]}')

    def __calculate_confusion_matrix(self, _y_pred: np.ndarray, _y_true: np.ndarray):
        """Return ``(tp, tn, fp, fn)`` counts for 0/1 predictions and labels."""
        tp = np.sum((_y_pred == 1) & (_y_true == 1))
        tn = np.sum((_y_pred == 0) & (_y_true == 0))
        fp = np.sum((_y_pred == 1) & (_y_true == 0))
        fn = np.sum((_y_pred == 0) & (_y_true == 1))
        return tp, tn, fp, fn

    def __cumulative_counts(self, _y_prob: np.ndarray, _y_true: np.ndarray):
        """Return cumulative TP and FP counts at each distinct score.

        Samples are ordered by decreasing score. Entry ``k`` of each returned
        array is the number of positives / negatives whose score is at least
        the k-th largest distinct score, i.e. the confusion counts obtained
        when that score is used as the decision threshold.
        """
        scores = np.asarray(_y_prob, dtype=float).ravel()
        labels = np.asarray(_y_true).ravel()
        if scores.size == 0:
            empty = np.zeros(0, dtype=int)
            return empty, empty
        order = np.argsort(-scores, kind='mergesort')
        scores = scores[order]
        labels = labels[order]
        # Index of the last sample in every run of tied scores.
        run_ends = np.r_[np.flatnonzero(np.diff(scores)), scores.size - 1]
        tps = np.cumsum(labels == 1)[run_ends]
        fps = np.cumsum(labels == 0)[run_ends]
        return tps, fps

    def __calculate_aupr(self, _y_prob: np.ndarray, _y_true: np.ndarray):
        """Area under the precision-recall curve (average precision).

        Computed as ``sum_k (R_k - R_{k-1}) * P_k`` with thresholds visited in
        order of decreasing score, so that recall is non-decreasing. Returns
        NaN when there are no positive labels (recall is undefined).
        """
        tps, fps = self.__cumulative_counts(_y_prob, _y_true)
        if tps.size == 0 or tps[-1] == 0:
            return float('nan')
        precision = tps / np.maximum(tps + fps, 1)
        recall = tps / tps[-1]
        return float(np.sum(np.diff(np.r_[0.0, recall]) * precision))

    def __calculate_auroc(self, _y_prob: np.ndarray, _y_true: np.ndarray):
        """Area under the ROC curve, by the trapezoidal rule.

        The curve runs from (0, 0) to (1, 1) with thresholds visited in order
        of decreasing score. Returns NaN when only one class is present,
        because one of the two rates is then undefined.
        """
        tps, fps = self.__cumulative_counts(_y_prob, _y_true)
        if tps.size == 0 or tps[-1] == 0 or fps[-1] == 0:
            return float('nan')
        tpr = np.r_[0.0, tps / tps[-1]]
        fpr = np.r_[0.0, fps / fps[-1]]
        return float(np.sum(np.diff(fpr) * (tpr[1:] + tpr[:-1]) / 2.0))

    def __calculate_metrics(self, _x: np.ndarray, _y: np.ndarray):
        """Evaluate the current weights on ``(_x, _y)``.

        Returns:
            dict with keys ``accuracy``, ``sensitivity``, ``specificity``,
            ``precision``, ``f1``, ``auroc`` and ``aupr``. Ratios whose
            denominator is zero are reported as 1, except F1, which is
            reported as 0 when precision and sensitivity are both 0.
        """
        y_true = _y
        y_prob = self.__h(_x)
        y_pred = self.predict(_x)
        tp, tn, fp, fn = self.__calculate_confusion_matrix(y_pred, y_true)

        metrics = {}
        metrics['accuracy'] = (tp + tn) / (tp + tn + fp + fn)
        metrics['sensitivity'] = 1 if tp + fn == 0 else tp / (tp + fn)
        metrics['specificity'] = 1 if tn + fp == 0 else tn / (tn + fp)
        metrics['precision'] = 1 if tp + fp == 0 else tp / (tp + fp)
        pr_sum = metrics['precision'] + metrics['sensitivity']
        # Guard the 0/0 case (no true positives at all), which used to yield NaN.
        metrics['f1'] = 0.0 if pr_sum == 0 else 2 * metrics['precision'] * metrics['sensitivity'] / pr_sum
        metrics['auroc'] = self.__calculate_auroc(y_prob, y_true)
        metrics['aupr'] = self.__calculate_aupr(y_prob, y_true)
        return metrics

    def __negative_log_likelihood(self, _x: np.ndarray, _y: np.ndarray):
        """Mean binary cross-entropy of the current weights on ``(_x, _y)``.

        Probabilities are clipped away from 0 and 1 so that a saturated
        sigmoid yields a large finite loss rather than inf/NaN.
        """
        p = np.clip(self.__h(_x), self._EPS, 1.0 - self._EPS)
        return -np.sum(_y * np.log(p) + (1 - _y) * np.log(1 - p)) / _y.size

    def fit(self, _lr: float = 0.01, _epoch: int = 3, _batch_size: int = 1, _verbose: bool = False):
        """Train on the stored data with mini-batch gradient descent.

        Args:
            _lr: learning rate.
            _epoch: number of passes over the training data.
            _batch_size: samples per weight update; must be at least 1.
            _verbose: print per-batch loss and per-epoch metrics.

        Returns:
            The metrics dict for the training data after the final epoch.

        Raises:
            ValueError: if ``_batch_size`` is smaller than 1.
        """
        if _batch_size < 1:
            raise ValueError(f'_batch_size must be >= 1, got {_batch_size}')
        self.__miniBGD(_lr, _epoch, _batch_size, _verbose)
        return self.__calculate_metrics(self.x, self.y)

    def predict(self, _x: np.ndarray, _threshold: float = 0.5):
        """Return 1 where the predicted probability exceeds ``_threshold``, else 0."""
        p = self.__h(_x)
        return np.where(p > _threshold, 1, 0)

    def test(self, _x: np.ndarray, _y: np.ndarray):
        """Return the metrics dict for the given evaluation data."""
        return self.__calculate_metrics(_x, _y)


In [None]:
# Load the Telco churn CSV ('Churn' is the label column) and run the preprocessing pipeline.
tp = TelcoPreprocessor('./Telco-Customer-Churn.csv', 'Churn')
x, y = tp.preprocess_data()
# Sanity check: shapes of the feature matrix / label vector and the first ten encoded labels.
print(x.shape, y.shape)
print(y[:10])
# Train one logistic-regression model on the whole data set and print the metrics dict that fit() returns.
# Note: `lr` here is the model object; the learning rate is the `_lr` keyword argument.
lr = LogisticRegression(x, y)
print(lr.fit(_lr = 0.01, _epoch = 50, _batch_size = 500))

## Ensemble

In [None]:
class BaggingEnsemble:
    def __init__(self, _x: np.ndarray, _y: np.ndarray, _n_estimators: int = 9, _lr: float = 0.01, _epoch: int = 50, _batch_size: int = 500):
        self.x = _x
        self.y = _y
        self.lr = _lr
        self.epoch = _epoch
        self.batch_size = _batch_size
        self.n_estimators = _n_estimators
        self.estimators = []
    
    def __bootstrap_sample(self):
        indices = np.random.choice(self.y.size, size=self.y.size, replace=True)
        return self.x[indices], self.y[indices]
    
    def __fit_estimators(self):
        """Train ``n_estimators`` base models, each on its own bootstrap sample.

        Every trained ``LogisticRegression`` is appended to
        ``self.estimators`` as soon as its ``fit`` call returns.
        """
        trained = 0
        while trained < self.n_estimators:
            # Fresh resample of the training data for this base model.
            boot_x, boot_y = self.__bootstrap_sample()
            learner = LogisticRegression(boot_x, boot_y)
            learner.fit(_lr=self.lr, _epoch=self.epoch, _batch_size=self.batch_size)
            self.estimators.append(learner)
            trained += 1
    
    