##  MSSV : 18110014 - Họ tên : Nguyễn Phú Thành
# Bài thực hành Nhập môn máy học - Lab 03

In [1]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris

In [2]:
class ClassificationMetric:
    def __init__(self, y_truth, y_predict, positive_label = 1, negative_label = 0):
        # True positive
        self.TP = ((y_truth == positive_label) & (y_predict == positive_label)).astype(np.uint8).sum()
        # True negative
        self.TN = ((y_truth == negative_label) & (y_predict == negative_label)).astype(np.uint8).sum()
        # False positive
        self.FP = ((y_truth == negative_label) & (y_predict == positive_label)).astype(np.uint8).sum()
        # False negative
        self.FN = ((y_truth == positive_label) & (y_predict == negative_label)).astype(np.uint8).sum()
    
    def precision_score(self):
        return (self.TP)/(self.TP + self.FP)
    
    def recall_score(self):
        return (self.TP)/(self.TP + self.FN)
    
    def accuracy_score(self):
        return (self.TP + self.TN)/(self.TP + self.TN + self.FP + self.FN)
    
    def f1_score(self):
        precision = self.precision_score()
        recall = self.recall_score()
        return 2 * precision * recall/(precision + recall)

## 1. Hãy xây dựng mô hình logistic regression bằng tất cả các features trong file heart, so sánh với thư viện sklearn

In [3]:
class LogisticReg:
    @staticmethod
    def getBatches(X, y, batch_size = 10):
        sample_size = X.shape[0]

        for i in range(0, sample_size, batch_size):
            batch_X, batch_y = X[i:(i+batch_size), :], y[i:(i+batch_size)]
            yield batch_X, batch_y

    @classmethod
    def gradient_loss(cls, theta, X, y, lmbda = 0):
        h_theta = 1/(1 + np.exp(-X.dot(theta)))
        number_sample, dimension = X.shape[:2]

        regularization_term = np.zeros((dimension, 1))

        regularization_term[1:] = 2 * theta[1:]

        gradient = 1/number_sample * X.T.dot(h_theta - y) + lmbda * regularization_term

        return gradient

    def __init__(self, nb_epoch, batch_size = None, learning_rate = 1e-3, lmbda = 0):
        '''
            LogisticReg's constructor
            ------------------------------
            Parameters:
                nb_epoch: int
                    Number of epoches
                batch_size: int, default None
                    If batch_size is None then perform Batch Gradient Descent
                    If batch_size == 1 then perform Stochastic Gradient Descent
                    If batch_size > 1 then perform Mini Batch Gradient Descent
                learning_rate: float, default 1e-3
        '''
        self.epoch, self.batch_size, self.rate = nb_epoch, batch_size, learning_rate

        # Define loss's gradient

        self.gradient = lambda theta, batch_X, batch_y: LogisticReg.gradient_loss(theta, batch_X, batch_y, lmbda)

        self.theta = None

    def fit(self, X, y, init_theta = None, random_state = 0):
        '''
            Fit linear model
            ----------------------------
            Parameters:
                X: np.ndarray of shape (sample_size, dimension)
                    Training data
                y: np.ndarray of shape (sample_size, 1)
                    Target values
                init_theta: np.ndarray of shape (dimension, 1), default None
                    Initial value for theta
                    If None, initial value for theta will be chosen by normal distribution N(0, 1)
                random_state: int, default 0
                    Random state to set initial theta and to shuffle data for each epoch
            ----------------------------
            Returns: LogisticReg's instance
        '''
        rnd = np.random.RandomState(random_state)
        X = X.copy()
        y = y.reshape((y.shape[0], 1))
        sample_size, dimension = X.shape[:2]

        if init_theta is None:
            self.theta = rnd.normal(loc = 0, scale = 1, size = dimension).reshape((dimension, 1))
        else:
            self.theta = init_theta.copy()

        # If it is BGD (batch_size = None) then not shuffle else shuffle dataset
        shuffle = True
        if self.batch_size == None:
            self.batch_size = sample_size
            shuffle = False

        for i in range(self.epoch):
            if shuffle:
                # Stack X and y horizontally
                data = np.hstack((X, y))
                # Shuffle inplace
                rnd.shuffle(data)
                # Get back X, y after shuffle
                X, y = data[:, :dimension], data[:, dimension:]

            for batch_X, batch_y in LogisticReg.getBatches(X, y, batch_size = self.batch_size):
                # Update theta
                self.theta = self.theta - self.rate * self.gradient(self.theta, batch_X, batch_y)

        return self

    def predict(self, X):
        '''
            Predict using the linear model.
            ---------------------
            Parameters:
                X: np.ndarray
                    Samples
            ---------------------
            Returns: np.ndarray
        '''

        assert self.theta is not None, 'Model needs to fit to a training set before making prediction'

        if len(X.shape) == 1: # Predict one sample
            dimension, = X.shape
            X = X.reshape((1, dimension))
        predict_y = np.where(X.dot(self.theta) < 0, 0, 1)

        return predict_y

In [4]:
data = pd.read_csv('heart.csv')

In [5]:
data.head()

Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
0,63,1,3,145,233,1,0,150,0,2.3,0,0,1,1
1,37,1,2,130,250,0,1,187,0,3.5,0,0,2,1
2,41,0,1,130,204,0,0,172,0,1.4,2,0,2,1
3,56,1,1,120,236,0,1,178,0,0.8,2,0,2,1
4,57,0,0,120,354,0,1,163,1,0.6,2,0,2,1


In [6]:
data.tail()

Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
298,57,0,0,140,241,0,1,123,1,0.2,1,0,3,0
299,45,1,3,110,264,0,1,132,0,1.2,1,0,3,0
300,68,1,0,144,193,1,1,141,0,3.4,1,2,3,0
301,57,1,0,130,131,0,1,115,1,1.2,1,1,3,0
302,57,0,1,130,236,0,0,174,0,0.0,1,1,2,0


In [7]:
X, y = data.iloc[:, :-1].to_numpy(), data.iloc[:, -1].to_numpy()
y = y.reshape((y.shape[0], 1))

In [8]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 0)

In [9]:
mean = X_train.mean(axis = 0).reshape((1, X_train.shape[1]))
std = np.sqrt(np.cov(X_train, rowvar = False, ddof = 1).diagonal()).reshape((1, X_train.shape[1]))

In [10]:
X_train = (X_train - mean)/std
X_test = (X_test - mean)/std

In [11]:
X_train = np.hstack(
    (
        np.ones((X_train.shape[0], 1)),
        X_train
    )
)

X_test = np.hstack(
    (
        np.ones((X_test.shape[0], 1)),
        X_test
    )
)

In [12]:
clf = LogisticReg(
    nb_epoch = 24200, batch_size = None, learning_rate = 0.01, lmbda = 0
).fit(X_train, y_train, random_state = 0)

In [14]:
metric_train = ClassificationMetric(y_train, clf.predict(X_train).flatten())
metric_test = ClassificationMetric(y_test, clf.predict(X_test).flatten())

summary_table = pd.DataFrame(
    data = {
        'Training dataset': (
            metric_train.precision_score(),
            metric_train.accuracy_score(),
            metric_train.recall_score()
        ),
        'Testing dataset': (
            metric_test.precision_score(),
            metric_test.accuracy_score(),
            metric_test.recall_score()
        )
    },
    index = ('Precision score', 'Accuracy score', 'Recall score')
)

In [15]:
clf_sklearn = LogisticRegression(
    penalty = 'none',
    fit_intercept = False, 
    random_state = 0
).fit(X_train, y_train.flatten())

In [16]:
metric_train = ClassificationMetric(y_train, clf_sklearn.predict(X_train))
metric_test = ClassificationMetric(y_test, clf_sklearn.predict(X_test))

summary_table_sklearn = pd.DataFrame(
    data = {
        'Training dataset': (
            metric_train.precision_score(),
            metric_train.accuracy_score(),
            metric_train.recall_score()
        ),
        'Testing dataset': (
            metric_test.precision_score(),
            metric_test.accuracy_score(),
            metric_test.recall_score()
        )
    },
    index = ('Precision score', 'Accuracy score', 'Recall score')
)

In [17]:
summary_table

Unnamed: 0,Training dataset,Testing dataset
Precision score,0.541322,0.557377
Accuracy score,0.506147,0.512228
Recall score,0.57438,0.606557


In [18]:
summary_table_sklearn

Unnamed: 0,Training dataset,Testing dataset
Precision score,0.541322,0.557377
Accuracy score,0.506147,0.512228
Recall score,0.57438,0.606557


In [19]:
clf_sklearn.coef_

array([[ 0.11145947, -0.0688732 , -0.93543064,  0.85226323, -0.2031756 ,
        -0.28236505, -0.15007363,  0.09670082,  0.53850469, -0.49377506,
        -0.69283718,  0.12972914, -0.94151826, -0.48601289]])

In [20]:
clf.theta.flatten()

array([ 0.11148124, -0.06892789, -0.93543231,  0.85227392, -0.2031619 ,
       -0.28234248, -0.15006828,  0.09668824,  0.53841091, -0.49378617,
       -0.69281688,  0.12977992, -0.94151942, -0.4860068 ])

## 2. Hãy xây dựng mô hình softmax regression trên bộ Iris (nên Normalize data), so sánh với thư viện sklearn

In [21]:
class SoftmaxReg:
    @staticmethod
    def getBatches(X, y, batch_size = 10):
        sample_size = X.shape[0]

        for i in range(0, sample_size, batch_size):
            batch_X, batch_y = X[i:(i+batch_size), :], y[i:(i+batch_size)]
            yield batch_X, batch_y
    @classmethod
    def gradient_loss(cls, theta, index, X, y):
        number_sample, dimension = X.shape[:2]
        
        X_theta = X.dot(theta)
        exponent = np.exp(X_theta)
        softmax = exponent/exponent.sum(axis = 1).reshape((number_sample, 1))
        softmax_index = softmax[:, index].reshape((number_sample, 1))

        bool_y = (y == index).astype(np.int).reshape((number_sample, 1))
        
        gradient = 1/number_sample * ((-bool_y + softmax_index) * X).sum(axis = 0).reshape((dimension, 1))

        return gradient
    
    def __init__(self, nb_epoch, nb_classes, batch_size = None, learning_rate = 1e-3):
        self.epoch, self.classes, self.batch_size, self.rate = nb_epoch, nb_classes, batch_size, learning_rate
        self.gradient = lambda theta, index, batch_X, batch_y: (
            SoftmaxReg.gradient_loss(theta, index, batch_X, batch_y)
        )
        self.theta = None
    
    def fit(self, X, y, init_theta = None, random_state = 0):
        rnd = np.random.RandomState(random_state)
        X = X.copy()
        y = y.reshape((y.shape[0], 1))
        sample_size, dimension = X.shape[:2]
        
        if init_theta is None:
            self.theta = (
                rnd.normal(loc = 0, scale = 1, size = dimension * self.classes)
                .reshape((dimension, self.classes))
            )
            
        else:
            self.theta = init_theta.copy()

        # If it is BGD (batch_size = None) then not shuffle else shuffle dataset
        shuffle = True
        if self.batch_size == None:
            self.batch_size = sample_size
            shuffle = False
        for i in range(self.epoch):
            if shuffle:
                # Stack X and y horizontally
                data = np.hstack((X, y))
                # Shuffle inplace
                rnd.shuffle(data)
                # Get back X, y after shuffle
                X, y = data[:, :dimension], data[:, dimension:]

            for batch_X, batch_y in LogisticReg.getBatches(X, y, batch_size = self.batch_size):
                # Update theta
                for j in range(self.classes):
                    col_theta = self.theta[:, j].reshape((dimension, 1))
                    col_theta = col_theta - self.rate * self.gradient(self.theta, j, batch_X, batch_y)
                    self.theta[:, j] = col_theta.flatten()
        return self
    
    def predict(self, X):
        assert self.theta is not None, 'Model needs to fit to a training set before making prediction'

        if len(X.shape) == 1: # Predict one sample
            dimension, = X.shape
            X = X.reshape((1, dimension))
        
        number_sample, dimension = X.shape[:2]
        X_theta = X.dot(self.theta)
        exponent = np.exp(X_theta)
        softmax = exponent/exponent.sum(axis = 1).reshape((number_sample, 1))
        predict_y = softmax.argmax(axis = 1)

        return predict_y

In [22]:
X, y = load_iris(return_X_y = True)

In [23]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.33, random_state = 42)

In [24]:
mean = X_train.mean(axis = 0).reshape((1, X_train.shape[1]))
std = np.sqrt(np.cov(X_train, rowvar = False, ddof = 1).diagonal()).reshape((1, X_train.shape[1]))

In [25]:
X_train = (X_train - mean)/std
X_test = (X_test - mean)/std

In [26]:
X_train = np.hstack(
    (
        np.ones((X_train.shape[0], 1)),
        X_train
    )
)

X_test = np.hstack(
    (
        np.ones((X_test.shape[0], 1)),
        X_test
    )
)

In [27]:
clf = SoftmaxReg(nb_epoch = 1000, nb_classes = 3, learning_rate = 0.1).fit(X_train, y_train, random_state = 1)

In [29]:
clf_sklearn = LogisticRegression(
    penalty = 'none',
    max_iter = 1000,
    fit_intercept = False,
    random_state = 0
).fit(X_train, y_train.flatten())

In [30]:
np.mean(y_test.flatten() == clf.predict(X_test))

0.96

In [31]:
clf_sklearn.score(X_test, y_test)

0.98