In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import random

import utils

%matplotlib inline
pd.options.display.max_columns = None

In [2]:
# dataset = pd.read_csv("../dataset/Student Stress Factors (2).csv")
# dataset.columns = ["Sleep Quality", "Headache Frequency", "Academic Performance", "Study Load", "Extracurricular Frequency", "Stress Level"]
# dataset.iloc[:,-1] = dataset.iloc[:,-1]-1

In [3]:
from sklearn.preprocessing import LabelEncoder

dataset = pd.read_csv("../dataset/diabetes_risk_prediction_dataset.csv")
df = dataset.copy()

for c in df.columns:
    if c != "Age":
        encoder = LabelEncoder()
        dataset[c] = encoder.fit_transform(df[c])

## Train-Validation Split

In [5]:
train_df, valid_df = utils.train_valid_split(dataset, test_size=0.1, random_state=11)

# X_valid = valid_df.copy()
# y_valid = X_valid["Stress Level"]
# X_valid.drop(["Stress Level"], axis=1, inplace=True)

# X_train = train_df.copy()
# y_train = X_train["Stress Level"]
# X_train.drop(["Stress Level"], axis=1, inplace=True)

X_valid = valid_df.copy()
y_valid = X_valid["class"]
X_valid.drop(["class"], axis=1, inplace=True)

X_train = train_df.copy()
y_train = X_train["class"]
X_train.drop(["class"], axis=1, inplace=True)


## Building XGB Model

In [7]:
class DecisionNode:
    def __init__(self, left_child: np.array=None, right_child: np.array=None, split_feature: int=None, split_thresh: float=None, gain: float=-1) -> None:
        self.left_child = left_child
        self.right_child = right_child
        self.split_feature = split_feature
        self.split_thresh = split_thresh
        self.gain = gain
        self.is_leaf = False

In [8]:
class Leaf:
    def __init__(self, output_value: float=None) -> None:
        self.output_value = output_value
        self.is_leaf = True

### XGBTree

In [9]:
class XGBTree:
    def __init__(self, lamda: float=0, max_depth: int=6, min_child_weight: int=0) -> None:
        """
        XGBTree class

        Parameters
        ----------
        lamda : float, optional
            L2 regularization term on weights, by default 0
        max_depth : int, optional
            Maximum tree depth for base learners, by default 6
        min_child_weight : int, optional
            Minimum sum of instance weight(hessian) needed in a child., by default 0
        """
        self.lamda = lamda
        self.max_depth = max_depth
        self.min_child_weight = min_child_weight
        self.root = None

    
    def __getResiduals(self, df: np.ndarray, prev_proba) -> np.ndarray:
        """
        Get residuals from probabilities

        Parameters
        ----------
        df : np.ndarray
            Dataset
        prev_proba : _type_
            Probability calculated by XGB

        Returns
        -------
        np.ndarray
            Array of residuals
        """
        Y = df[:,-1]
        residuals = Y-prev_proba
        return residuals
    

    def __getSimilarityScore(self, residuals: np.array, prev_proba: np.array) -> float:
        """
        Calculate Similarity Score of a node

        Parameters
        ----------
        residuals : np.array
            Array of residuals
        prev_proba : np.array
            Corresponding probabilities calculated by XGB

        Returns
        -------
        float
            Similarity Score
        """
        return np.sum(residuals)**2/(np.sum(prev_proba * (1-prev_proba)) + self.lamda)


    def __getCover(self, prev_proba: np.array) -> float:
        """
        Calculate Cover values of a node

        Parameters
        ----------
        prev_proba : np.array
            Probabilities calculated by XGB

        Returns
        -------
        float
            Cover
        """
        return np.sum(prev_proba * (1-prev_proba))
    

    def __getOutputValue(self, residuals: np.array, prev_proba: np.array) -> float:
        """
        Calculate output values of a leaf

        Parameters
        ----------
        residuals : np.array
            Array of residuals
        prev_proba : np.array
            Corresponding probabilities calculated by XGB

        Returns
        -------
        float
            Output value
        """
        return np.sum(residuals)/(np.sum(prev_proba * (1-prev_proba)) + self.lamda)
    
    
    def __split(self, df: np.ndarray, residuals: np.array, prev_proba: np.array, feature_indx: int, split_thresh: float) -> tuple:
        """
        Split the node based on a feature and threshold

        Parameters
        ----------
        df : np.ndarray
            Dataset
        residuals : np.array
            Array of residuals
        prev_proba : np.array
            Corresponding probabilities calculated by XGB
        feature_indx : int
            Index of feature to split on
        split_thresh : float
            Threshold of feature to split on
        """
        left_residuals = list()
        left_dataset = list()
        right_residuals = list()
        right_dataset = list()
        left_prev_proba = list()
        right_prev_proba = list()

        for row_indx in range(df.shape[0]):
            if df[row_indx, feature_indx] <= split_thresh:
                left_residuals.append(residuals[row_indx])
                left_dataset.append(df[row_indx])
                left_prev_proba.append(prev_proba[row_indx])
            else:
                right_residuals.append(residuals[row_indx])
                right_dataset.append(df[row_indx])
                right_prev_proba.append(prev_proba[row_indx])

        return np.array(left_residuals), np.array(right_residuals), np.array(left_dataset), np.array(right_dataset), np.array(left_prev_proba), np.array(right_prev_proba)
    

    def __calculateGain(self, root: np.array, left: np.array, right: np.array, root_prev_proba, 
                            left_prev_proba: np.array, right_prev_proba: np.array) -> float:
        """
        Calculate Gain for a split

        Parameters
        ----------
        root : np.array
            Residuals in root node
        left : np.array
            Residuals in left node
        right : np.array
            Residuals in right node
        root_prev_proba : _type_
            prev_proba in root node
        left_prev_proba : np.array
            Correcponding prev_proba in left node
        right_prev_proba : np.array
            Correcponding prev_proba in right node

        Returns
        -------
        float
            Gain
        """
        similarity_root = self.__getSimilarityScore(root, root_prev_proba)
        similarity_left = self.__getSimilarityScore(left, left_prev_proba)
        similarity_right = self.__getSimilarityScore(right, right_prev_proba)

        return similarity_left+similarity_right-similarity_root
    

    def __getBestSplit(self, df: np.ndarray, residuals: np.array, prev_proba: np.array, feature_indices: np.array) -> dict:
        """
        Get the best split

        Parameters
        ----------
        df : np.ndarray
            Dataset
        residuals : np.array
            Residuals
        prev_proba : np.array
            Probabilities calculated by XGB
        feature_indices : np.array
            Feature indices to split on

        Returns
        -------
        dict
            best_split
        """
        best_split = {'gain': -1, 'feature': None, 'split_thresh': None}

        for feature_indx in feature_indices:
            feature_values = df[:,feature_indx]
            thresholds = np.unique(feature_values)
            thresholds = np.array([(thresholds[i]+thresholds[i+1])/2 for i in range(len(thresholds)-1)])
            for threshold in thresholds:
                left_res, right_res, left_dataset, right_dataset, left_prev_proba, right_prev_proba = self.__split(df, residuals, prev_proba, feature_indx, threshold)
                if len(left_res) and len(right_res):
                    gain = self.__calculateGain(residuals, left_res, right_res, prev_proba, left_prev_proba, right_prev_proba)
                    if gain > best_split["gain"]:
                        best_split["feature"] = feature_indx
                        best_split["split_thresh"] = threshold
                        best_split["left_residuals"] = left_res
                        best_split["right_residuals"] = right_res
                        best_split["left_dataset"] = left_dataset
                        best_split["right_dataset"] = right_dataset
                        best_split["left_prev_proba"] = left_prev_proba
                        best_split["right_prev_proba"] = right_prev_proba
                        best_split["gain"] = gain
        
        return best_split
    

    def __buildTreeRecur(self, df: np.ndarray, residuals: np.array, prev_proba: np.array, depth: int=0) -> DecisionNode|Leaf:
        """
        Build XGBTree recursively

        Parameters
        ----------
        df : np.ndarray
            Dataset
        residuals : np.array
            Residuals
        prev_proba : np.array
            Probabilities calculated by XGB
        depth : int, optional
            Current depth, by default 0

        Returns
        -------
        DecisionNode|Leaf
        """
        X, y = df[:,:-1], df[:,-1]

        if self.__getCover(prev_proba)>self.min_child_weight and depth<=self.max_depth:
            best_split = self.__getBestSplit(df, residuals, prev_proba, np.arange(X.shape[1]))
            if best_split["gain"] >= 0:
                left_residuals = self.__buildTreeRecur(best_split["left_dataset"], best_split["left_residuals"], best_split["left_prev_proba"], depth+1)
                right_residuals = self.__buildTreeRecur(best_split["right_dataset"], best_split["right_residuals"], best_split["right_prev_proba"], depth+1)
                
                return DecisionNode(left_residuals, right_residuals, best_split["feature"], best_split["split_thresh"], 
                            best_split["gain"])
        
        output_value = self.__getOutputValue(residuals, prev_proba)
        return Leaf(output_value)
    
    
    def fit(self, df: pd.DataFrame, prev_proba: np.array):
        """
        Fit the XGBTree to the data

        Parameters
        ----------
        df : pd.DataFrame
            Dataset
        prev_proba : np.array
            Probability of base estimator
        """
        residuals = self.__getResiduals(df.to_numpy(), prev_proba)
        self.root = self.__buildTreeRecur(df.to_numpy(), residuals, prev_proba)


    def make_prediction(self, X: np.array, node: DecisionNode|Leaf) -> int:
        """
        Make individual prediction

        Parameters
        ----------
        X : np.array
            Data
        node : DecisionNode | Leaf
            root node

        Returns
        -------
        int
            Output value
        """
        if node.is_leaf: 
            return node.output_value
        else:
            feature = X[node.split_feature]
            if feature <= node.split_thresh:
                return self.make_prediction(X, node.left_child)
            else:
                return self.make_prediction(X, node.right_child)
    
    
    def predict(self, X: np.ndarray) -> np.array:
        """
        Make predictions

        Parameters
        ----------
        X : np.ndarray
            Data

        Returns
        -------
        np.array
            Output values
        """
        output_values = [self.make_prediction(x, self.root) for x in X]
        return np.array(output_values)

### XGBClassifier

In [10]:
# import warnings
# warnings.filterwarnings(action="error", category=RuntimeWarning)
# warnings.filterwarnings(action="ignore", category=DeprecationWarning)
from math import log

class XGBClassifier:
    def __init__(self, n_estimators: int=100, max_depth: int=6, eta: float=0.3, lamda: float=0, gamma: float=0, num_class: int=2, 
                    min_child_weight: int=0) -> None:
        """
        XGBClassifier class

        Parameters
        ----------
        n_estimators : int, optional
            Number of gradient boosted trees, by default 100
        max_depth : int, optional
            Maximum tree depth for base learners, by default 6
        eta : float, optional
            Boosting learning rate, by default 0.3
        lamda : float, optional
            L2 regularization term on weights, by default 0
        num_class : int, optional
            Number of class labels, by default 2
        min_child_weight : int, optional
            Minimum sum of instance weight(hessian) needed in a child, by default 0
        """
        self.num_class = num_class
        self.n_estimators = n_estimators
        self.eta = eta
        self.max_depth = max_depth
        self.min_child_weight = min_child_weight
        self.lamda = lamda
        self.estimators = []
        for _ in range(self.num_class):
            self.estimators.append([XGBTree(max_depth=self.max_depth, lamda=self.lamda) for _ in range(n_estimators)])

    
    def __getLogOdds(self, probabilities: float|np.ndarray) -> float:
        """
        Get log(odds)

        Parameters
        ----------
        probabilities : float | np.ndarray
            Probabilities

        Returns
        -------
        float
            log(odds)
        """
        if isinstance(probabilities, np.ndarray):
            odds = []
            for prob in probabilities:
                odds.append(prob/1-prob)
            log_odds = []
            for odd in odds:
                if odd != 0:
                    log_odds.append(log(odd))
                else:
                    log_odds.append(0)
            return np.array(log_odds)
        else:
            odds = probabilities/1-probabilities
            if odds>0:
                return np.log(odds)
            return 0
    
    
    def __sigmoid(self, z: float|np.ndarray) -> float:
        """
        Calculate sigmoid(z)

        Parameters
        ----------
        z : float | np.ndarray
            Data

        Returns
        -------
        float
            sidmoid(z)
        """
        return 1/(1 + np.exp(-z))
    

    def fit(self, df: pd.DataFrame):
        """
        Fit the XGBClassifier to data

        Parameters
        ----------
        df : pd.DataFrame
            Dataset
        """
        self.base_probability = np.array([0.5] * df.shape[0])
        for label in range(self.num_class):
            df_transformed = df.copy()
            df_transformed.iloc[:,-1] = np.where(df_transformed.iloc[:,-1]==label, 1, 0)

            calc_prob = np.array([self.base_probability] * df.shape[0])
            output_values = np.zeros(df.shape[0])
            for estimator in self.estimators[label]:
                estimator.fit(df_transformed, calc_prob)
                output_values += estimator.predict(df_transformed.to_numpy()[:,:-1])
                calc_prob = self.__sigmoid(self.__getLogOdds(self.base_probability) + self.eta*output_values)


    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """
        Make predictions

        Parameters
        ----------
        X : pd.DataFrame
            Input Data

        Returns
        -------
        np.ndarray
            Predictions
        """
        y_hat = []
        for x in X.to_numpy():
            one_vs_all = {}
            for i, eestimators in enumerate(self.estimators):
                output_values = [estimator.make_prediction(x, estimator.root) for estimator in eestimators]
                one_vs_all[i] = np.mean(self.__sigmoid(self.__getLogOdds(self.base_probability) + (self.eta*sum(output_values))))

            y_hat.append(max(one_vs_all, key=one_vs_all.get))

        return np.array(y_hat).astype(int)

## Training the model on training set

In [19]:
xgb_clf = XGBClassifier(n_estimators=100, num_class=2)
xgb_clf.fit(train_df)

## Getting predictions of validation set

In [20]:
y_hat = xgb_clf.predict(X_valid)

## Evaluating Model

In [21]:
print(f"Accuracy Score: {utils.get_accuracy_score(y_valid, y_hat)}")

Accuracy Score: 0.9230769230769231


In [22]:
from sklearn.metrics import f1_score

print(f"F1 Score: {f1_score(y_valid, y_hat, average='weighted')}")

F1 Score: 0.923768884788375
