In [5]:
import pandas as pd
import numpy as np
import math
import time
from numpy.linalg import inv
from typing import Tuple
from IPython.display import Markdown, display
from sklearn.metrics import roc_auc_score

In [6]:
# Sources for IWLS:
# https://github.com/jaeho3690/LogisiticRegression/blob/main/LogisticRegressionIRLS.ipynb

# Sources for GD & SGD:
# https://stackoverflow.com/questions/47795918/logistic-regression-gradient-descent
# https://medium.com/analytics-vidhya/gradient-descent-and-stochastic-gradient-descent-from-scratch-python-1cd93d4def49
# https://github.com/Darshansol9/GD-SGD_FromScratch_Python/blob/master/Code.ipynb

# Sources for ADAM:
# https://medium.com/analytics-vidhya/derivative-of-log-loss-function-for-logistic-regression-9b832f025c2d
# https://stackoverflow.com/questions/67080049/adam-optimization-for-gradient-descent-update-doesnt-seem-to-work-with-logistic?fbclid=IwAR3uW95_Entc1-esFrQtVhBKvIWE43781OW6OGIYWJAGLOa37o_z_tHpV0Q

# Source for cross-validation:
# https://github.com/jaeho3690/LogisiticRegression/blob/main/LogisticRegressionIRLS.ipynb

In [7]:
class LogisticRegression:
    
    def __sigmoid(self,x:float)->float:
        """ Activation function used to map any real value between 0 and 1 """
        return 1/(1+np.exp(-x))

    def loss_function(self,X:np.matrix,y:np.array,params:np.array,b:np.array=None)->float:
        """
        Computes the cost function for all the training samples 
        
        param: X - design matrix
        param: y - target vector comprising boolean value
        param: params - array of weights
        param: b - intercept (optional)
        """
        m,_ = X.shape
        if b:
            fx=self.__sigmoid(X.dot(params)+b)
        else:
            fx=self.__sigmoid(X.dot(params))
        cost=-np.sum(y * np.log(fx) + (1 - y)*np.log(1-fx))*(1/m)
        return cost
    
    def gradient_descent(self,X:np.matrix,y:np.array,params:np.array,iterations:int,alpha:float)->np.array:
        """
        Performs gradient descent optimization.
        
        Works assuming that weights vector contains
        intercept and the corresponding one column 
        has been added to the design matrix before 
        it is given to the method
        
        param: X - design matrix
        param: y - target vector comprising boolean value
        param: params - array of weights
        param: b - intercept (optional)
        param: iterations - number of iterations
        param: alpha - learning rate
        """
        for i in range(iterations):
            params = params + alpha * (X.T.dot(y - self.__sigmoid(X.dot(params))))
        return params
    
    def stochastic_gradient_descent(self,X:np.matrix,y:np.array,params:np.array,iterations:int,alpha:float,sample_size:int=1)->np.array:
        """
        Performs stochastic gradient descent optimization.
        
        Works assuming that weights vector contains
        intercept and the corresponding one column 
        has been added to the design matrix before 
        it is given to the method
        
        param: X - design matrix
        param: y - target vector comprising boolean value
        param: params - array of weights
        param: b - intercept (optional)
        param: iterations - number of iterations
        param: alpha - learning rate
        param: sample_size - batch size
        """
        assert sample_size <= X.shape[0]
        df_X = pd.DataFrame(X)
        df_y = pd.DataFrame(y)
        for i in range(iterations):
            n_samples = math.ceil(df_X.shape[0]/sample_size)
            shuffled = df_X.sample(frac=1)
            samples = np.array_split(shuffled, n_samples)
            for sample in samples:
                X_st = np.array(sample)
                y_st = np.array(y[sample.index])
                # y_st = np.expand_dims(y_st, axis=-1)
                params = params + alpha * (X_st.T.dot(y_st - self.__sigmoid(X_st.dot(params))))
        return params
    
    def irls(self,X:np.matrix,y:np.array,iterations:int=1000)->np.array:
        """
        Performs Iterative-Reweighted Least Squares optimization.
        
        Works assuming that weights vector contains
        intercept and the corresponding one column 
        has been added to the design matrix before 
        it is given to the method
        
        param: X - design matrix
        param: y - target vector comprising boolean value
        param: iterations - number of iterations
        """
        w = np.zeros((X.shape[1],1))
        for i in range(iterations):
            y_ = self.__sigmoid(np.matmul(X,w))
            R = np.diag(np.ravel(y_*(1-y_)))
            grad = np.matmul(X.T,(y_-y))
            hessian = np.matmul(np.matmul(X.T,R),X)+0.001*np.eye(X.shape[1])
            w -= np.matmul(np.linalg.inv(hessian),grad)
        return w
    
    def adam(self,X:np.matrix,y:np.array,b1:float,b2:float,iterations:int,alpha:float,eps:float)->np.array:
        """
        Performs stochastic gradient descent optimization.
        
        Works assuming that weights vector does not 
        contain intercept - it is a separate variable (named b)
        and the design matrix does not include additional 
        one column
        
        param: X - design matrix
        param: y - target vector comprising boolean value
        param: params - array of weights
        param: iterations - number of iterations
        param: alpha - learning rate
        param: sample_size - batch size
        param: b - intercept (optional)
        param: b1, b2 - initial decay rates used when estimating 
            the first and second moments of the gradient
        """
        m,n = X.shape
        W,b = np.random.randn(n,1),np.random.randn(1)
        VW,Vb = np.zeros((n,1)),np.zeros(1)
        SW,Sb = np.zeros((n,1)),np.zeros(1)
        
        y = y.reshape(len(y),1)
        
        for i in range(iterations):
            # sigmoid
            A = self.__sigmoid((X.dot(W)+b))
    
            # binary classification cost
            j = self.loss_function(X=X,y=y,params=W,b=b)  # (-y*np.log(A)-(1-y)*np.log(1-A)).sum()*(1/m)

            # derivative respect to j
            dA = (A-y)/(A*(1-A))
            dZ = A-y

            dW = X.transpose().dot(dZ)
            db = dZ.sum()
            
            # momentum
            VW = b1*VW + (1-b1)*dW
            Vb = b1*Vb + (1-b1)*db
            
            # rmsprop
            SW = b2*SW + (1-b2)*dW**2
            Sb = b2*Sb + (1-b2)*db**2
            
            # update weight
            W -= alpha*VW/(np.sqrt(SW)+eps)
            b -= alpha*Vb/(np.sqrt(Sb)+eps)
        return W,b
    
    
    def fit(self,X:np.matrix,y:np.array,**kwds)->np.array:
        if len(kwds.keys()) == 2:
            display(Markdown('### Gradient Descent\n'))
            # parameters setting
            # X = np.c_[np.ones((X.shape[0], 1)), X]
            params = np.random.randn(X.shape[1])
            params = params[:,np.newaxis]
            iterations = kwds["iterations"]
            learning_rate = kwds["alpha"]
            # optimization
            initial_cost = self.loss_function(X,y,params)
            start = time.time()
            params_optimal  = self.gradient_descent(X,y,params,iterations,learning_rate)
            end = time.time()
            final_cost = self.loss_function(X, y, params_optimal)
            
        elif len(kwds.keys()) == 3:
            display(Markdown('### Stochastic Gradient Descent\n'))
            # parameters setting
            # X = np.c_[np.ones((X.shape[0], 1)), X]
            params = np.random.randn(X.shape[1])
            params = params[:,np.newaxis]
            iterations = kwds["iterations"]
            learning_rate = kwds["alpha"]
            sample_size = kwds["sample_size"]
            # optimization
            initial_cost = self.loss_function(X=X,y=y,params=params)
            start = time.time()
            params_optimal  = self.stochastic_gradient_descent(X,y,params,iterations,learning_rate,sample_size)
            end = time.time()
            final_cost = self.loss_function(X=X,y=y,params=params_optimal)
            
        elif len(kwds.keys()) == 5:
            display(Markdown('### ADAM\n'))
            # parameters setting
            params=np.random.randn(X.shape[1],1)
            b1=kwds["b1"]
            b2=kwds["b2"]
            iterations=kwds["iterations"]
            learning_rate=kwds["alpha"]
            eps=kwds["epsilon"]
            # optimization
            m,n = X.shape
            W,b = np.random.randn(n,1),np.random.randn(1)
            initial_cost=self.loss_function(X=X,y=y,params=W,b=b)
            start=time.time()
            W,b=self.adam(X,y,b1=b1,b2=b2,iterations=iterations,alpha=learning_rate,eps=eps)
            end=time.time()
            final_cost=self.loss_function(X=X,y=y,params=W,b=b)
            params_optimal=(W,b)
        else:
            display(Markdown('### Iterative-Reweighted Least Squares\n'))
            # X = np.c_[np.ones((X.shape[0], 1)), X]
            # parameters setting
            try:
                iterations=kwds["iterations"]
                start = time.time()
                # optimization
                params_optimal  = self.irls(X,y,iterations=iterations)
                end = time.time()
            except Exception as e:
                start = time.time()
                # optimization
                params_optimal  = self.irls(X,y)
                end = time.time()
            params = np.zeros((X.shape[1],1))
            initial_cost = self.loss_function(X=X,y=y,params=params)
            final_cost = self.loss_function(X=X,y=y,params=params_optimal)
            
        print(f'Time eclapsed for fitting: {end-start} secs')
        print('Initial cost ',initial_cost)
        print('Final cost ',final_cost)
        print('\n\n')
        return params_optimal
    
    def predict(self,X:np.matrix,params:np.array,b:np.array=None,threshold=.5)->Tuple[np.array,np.array]:
        if b:
            prob_pred=self.__sigmoid(X.dot(params)+b)
        else:
            # X = np.c_[np.ones((X.shape[0], 1)), X]
            prob_pred=self.__sigmoid(X.dot(params))
        return prob_pred, (prob_pred > threshold).astype(int)

## Test

In [8]:
credit_train_x = pd.read_csv("../datasets/preprocessed/credit_train_x.csv")
credit_train_y = pd.read_csv("../datasets/preprocessed/credit_train_y.csv")

credit_train_x = np.array(credit_train_x)
credit_train_y = np.array(credit_train_y)

X, y = credit_train_x, credit_train_y

In [9]:
# Gradient descent
iterations = 1000
learning_rate = 2e-5
gd_kwds={
    "iterations": iterations,
    "alpha": learning_rate
}

lr = LogisticRegression()
X_ = np.c_[np.ones((X.shape[0], 1)), X]
W=lr.fit(X_,y,**gd_kwds)
prob_pred,y_pred=lr.predict(X_, W)

roc_auc_score(y, prob_pred)

### Gradient Descent


Time eclapsed for fitting: 0.07903933525085449 secs
Initial cost  1.873999212292507
Final cost  0.727610927765672





0.6499978835978836

In [10]:
# Stochastic gradient descent
iterations = 2000
learning_rate = 2e-5
sample_size = 1

sgd_kwds={
    "iterations": iterations,
    "alpha": learning_rate,
    "sample_size": sample_size
}

lr = LogisticRegression()
X_ = np.c_[np.ones((X.shape[0], 1)), X]
W=lr.fit(X_,y,**sgd_kwds)
prob_pred,y_pred=lr.predict(X_, W)

roc_auc_score(y, prob_pred)

### Stochastic Gradient Descent


Time eclapsed for fitting: 410.4774401187897 secs
Initial cost  1.628308204959802
Final cost  0.5838620170392443





0.7580275132275133

In [11]:
# ADAM
b1=0.9
b2=0.999
iterations=20000
alpha=2e-5
eps=1e-8

adam_kwds={
    "iterations": iterations,
    "b1": b1,
    "b2": b2,
    "alpha": alpha,
    "epsilon": eps,
}
lr = LogisticRegression()
W,b=lr.fit(X,y,**adam_kwds)
prob_pred,y_pred=lr.predict(X=X,params=W,b=b)

roc_auc_score(y, prob_pred)

### ADAM


Time eclapsed for fitting: 8.703153133392334 secs
Initial cost  1.2186603092998944
Final cost  0.9949351664089601





0.53954708994709

In [12]:
# Iterative-Reweighted Least Squares
iterations=10
lr = LogisticRegression()
kwds={
    "iterations": iterations,
}

lr = LogisticRegression()
X_ = np.c_[np.ones((X.shape[0], 1)), X]
W=lr.fit(X_,y,**kwds)
prob_pred,y_pred=lr.predict(X_, W)

roc_auc_score(y, prob_pred)

### Iterative-Reweighted Least Squares


Time eclapsed for fitting: 0.07012605667114258 secs
Initial cost  0.6931471805599453
Final cost  0.4397883680889956





0.8394666666666666

In [13]:
# Iterative-Reweighted Least Squares
lr = LogisticRegression()
X_ = np.c_[np.ones((X.shape[0], 1)), X]
W=lr.fit(X_,y)
prob_pred,y_pred=lr.predict(X_, W)

roc_auc_score(y, prob_pred)

### Iterative-Reweighted Least Squares


Time eclapsed for fitting: 8.05920147895813 secs
Initial cost  0.6931471805599453
Final cost  0.43978836808899546





0.8394666666666666