In [20]:
import numpy as np
import matplotlib.pyplot as plt
import random
from typing import Tuple, List
import csv

In [25]:
class Model():
    def __init__(self) -> None:
        pass
    
    def getPredict(self, x, w, b):
        y_head = np.matmul(w, x.transpose()) + b
        act = Activation()
        y_head = act.sigmoid(y_head)
        return np.where(y_head > 0.5, 1, 0)


class Loss():
    def __init__(self) -> None:
        self.value = 0.0

    def L1(self, y, y_pred, N): #MAE
        self.value += np.abs(y - y_pred) * (1 / N)

    def BinaryCrossEntropy(self, y, y_pred, N):
        epsilon = 1e-5
        self.value += -(y * np.log(y_pred + epsilon) + (1 - y) * np.log(1 - y_pred + epsilon)) * (1 / N)


class Activation():
    def __init__(self) -> None:
        pass

    def sigmoid(self, z):
        return 1 / (1 + np.exp(-z))

            
class Optimizer():
    def __init__(self) -> None:
        self.w_gradient = 0.0
        self.b_gradient = 0.0

    def SGD(self, x, y, y_pred, N):
        if y - y_pred >= 0:
            self.w_gradient += -x * (1 / N)
            self.b_gradient += -1 * (1 / N)
        elif y - y_pred < 0:
            self.w_gradient += x * (1 / N)
            self.b_gradient += 1 * (1 / N)
        return [self.w_gradient, self.b_gradient]

    def Update(self, w_in, b_in, learning_rate):
        w_update = w_in - (learning_rate * self.w_gradient)
        b_update = b_in - (learning_rate * self.b_gradient)
        return[w_update, b_update]


class Trainer():
    def __init__(self, data, lr, iter_num, w_init, b_init):
        self.train_data = data
        self.learning_rate = lr
        self.max_iter_num = iter_num
        self.init_weight = w_init
        self.init_bias = b_init
        self.weight_dim = w_init.shape[0]

    def train(self):
        w_upt = self.init_weight
        b_upt = self.init_bias
        for i in range(self.max_iter_num):
            [w_upt, b_upt], epoch_loss = self.epoch_train(w_iter=w_upt, b_iter=b_upt)
            if i % 500 == 0:
                print("iteration[{}], loss={} val_acc is {:.4f}".format(i, epoch_loss, self.calcValAcc(w=w_upt, b=b_upt)))
        return [w_upt, b_upt]


    def epoch_train(self, w_iter, b_iter):
        N = float(len(self.train_data))
        model = Model()
        loss = Loss()
        optim = Optimizer()

        for index in range(0, len(self.train_data)):
            x = self.train_data[index, 0 : self.weight_dim]
            y = self.train_data[index, -1]

            # Get prediction
            y_pred = model.getPredict(x, w_iter, b_iter)
            # Calc gradient
            optim.SGD(x, y, y_pred, N)
            # Calc BinaryCrossEntropy loss
            loss.BinaryCrossEntropy(y, y_pred, N)

        # Update weight and bias
        update = optim.Update(w_iter, b_iter, self.learning_rate)
        return update, loss.value


    def calcValAcc(self, w, b):
        model = Model()
        val_err = 0.0

        for index in range(0, len(self.train_data)):
            x = self.train_data[index, 0 : self.weight_dim]
            y = self.train_data[index, -1]
            y_pred = model.getPredict(x, w, b)
            val_err += np.abs(y - y_pred)
        val_acc = 1 - (val_err / float(len(self.train_data)))
        # # Visualization
        # plt.cla()
        # plt.scatter(self.train_data[:, 0], self.train_data[:, 1])
        # plt.plot(self.train_data[:, 0], model.getPredict(self.train_data[:, 0], w, b), 'r-', lw=5)
        # plt.show()
        return val_acc

    def calcTestAcc(self, w, b, test_data):
        model = Model()
        val_err = 0.0

        for index in range(0, len(test_data)):
            x = test_data[index, 0 : self.weight_dim]
            y = test_data[index, -1]
            y_pred = model.getPredict(x, w, b)
            val_err += np.abs(y - y_pred)
        val_acc = 1 - (val_err / float(len(test_data)))
        # # Visualization
        # plt.cla()
        # plt.scatter(self.train_data[:, 0], self.train_data[:, 1])
        # plt.plot(self.train_data[:, 0], model.getPredict(self.train_data[:, 0], w, b), 'r-', lw=5)
        # plt.show()
        return val_acc


In [29]:
def csv_parser(csv_file: str, split_percent=1.0, seed=42) -> Tuple[List[List[float]], List[float], List[List[float]], List[float]]:
    rand = random.Random(seed)
    x_train = []
    y_train = []
    x_test = []
    y_test = []

    with open(csv_file, newline='') as fin:
        reader = csv.reader(fin)
        next(reader, None)
        for row in reader:
            x_value = [float(row[1]), float(row[2])]
            y_value = float(row[3])
            if rand.random() <= split_percent:
                x_train.append(x_value)
                y_train.append(y_value)
            else:
                x_test.append(x_value)
                y_test.append(y_value)
    return x_train, y_train, x_test, y_test


def main():
    x_train, y_train, x_test, y_test = csv_parser(csv_file='../data/LogisticRegression_data.csv', split_percent=0.8)
    x_train = np.array(x_train, dtype=float)
    y_train = np.array(y_train, dtype=float)
    x_test = np.array(x_test, dtype=float)
    y_test = np.array(y_test, dtype=float)

    # data simulation
    w_tgt = np.array([0.5, 0.3])
    b_tgt = 0.1

    # prepare train data
    points_train = np.zeros((x_train.shape[0], x_train.shape[1] + 1))
    points_train[:, 0 : x_train.shape[1]] = x_train
    points_train[:, -1] = y_train

    # parameters setup
    learning_rate = 0.01
    w_init = np.zeros_like(w_tgt)
    b_init = 0.0
    inter_num = 5001
    lr_trainer = Trainer(points_train, learning_rate, inter_num, w_init, b_init)

    # print
    print("Starting gradient descent at weight={} bias={} val_acc={}".format(w_init, b_init, lr_trainer.calcValAcc(w_init, b_init)))

    # training loop
    print("### START TRAINING ###")
    [w, b] = lr_trainer.train()
    print("### TRAINING STOPPED ###")
    # prepare test data
    points_test = np.zeros((x_test.shape[0], x_test.shape[1] + 1))
    points_test[:, 0 : x_test.shape[1]] = x_test
    points_test[:, -1] = y_test
    print("Trained {} iterations: weight={} bias={} val_acc={} test_acc={}".format(inter_num, w, b, lr_trainer.calcValAcc(w, b), lr_trainer.calcTestAcc(w, b, test_data=points_test)))


if __name__ == '__main__':
    main()

Starting gradient descent at weight=[0. 0.] bias=0.0 val_acc=0.4125
### START TRAINING ###
iteration[0], loss=6.763839585690638 val_acc is 0.5875
iteration[500], loss=4.029517412772082 val_acc is 0.6500
iteration[1000], loss=4.029517412772082 val_acc is 0.6500
iteration[1500], loss=4.029517412772082 val_acc is 0.6500
iteration[2000], loss=4.029517412772082 val_acc is 0.6500
iteration[2500], loss=4.029517412772082 val_acc is 0.6500
iteration[3000], loss=4.029517412772082 val_acc is 0.6500
iteration[3500], loss=4.029517412772082 val_acc is 0.6500
iteration[4000], loss=4.029517412772082 val_acc is 0.6500
iteration[4500], loss=4.029517412772082 val_acc is 0.6500
iteration[5000], loss=4.029517412772082 val_acc is 0.6500
### TRAINING STOPPED ###
Trained 5001 iterations: weight=[0.12827876 0.09962278] bias=0.15005750000000834 val_acc=0.65 test_acc=0.65
