In [23]:
import numpy as np
import csv
import pandas as pd

In [24]:
np.random.seed(123)

In [25]:
RND_MEAN = 0
RND_STD = 0.0030

LEARNING_RATE = 0.003

In [26]:
def binary_classification_exec(epoch_count = 10, mb_size=10, report=1, train_rate = 0.8, adjust_ratio = False):
    binary_load_dataset(adjust_ratio)
    init_model()
    train_and_test(epoch_count, mb_size, report, train_rate)

In [27]:
def binary_load_dataset(adjust_ratio):
    pulsars, stars = [], []
    with open('./Data/pulsar_stars.csv') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader, None)
        # rows = []
        for row in csvreader:
            if row[8] == '1':
                pulsars.append(row)
            else:
                stars.append(row)

    global input_cnt, output_cnt, data

    input_cnt, output_cnt = 8, 1   # input은 독립변수의 갯수, output은 종속변수의 갯수
    # data = np.asarray(rows, dtype='float32')
    star_cnt, pulsar_cnt = len(stars), len(pulsars)
    if adjust_ratio:
        data = np.zeros([2*star_cnt, 9])
        data[0:star_cnt, :] = np.asarray(stars, dtype='float32')        # 절반은 star 데이터를 담는다

        for n in range(star_cnt):
            data[star_cnt+n] = np.asarray(pulsars[n % pulsar_cnt], dtype='float32')        # 남은 절반은 pulsar 데이터의 범위를                                                                                                      반복해서 data 버퍼에 담아준다

    else:
        data = np.zeros([star_cnt + pulsar_cnt, 9])
        data[0:star_cnt, :] = np.asarray(stars, dtype='float32')
        data[star_cnt:, :] = np.asarray(pulsars, dtype='float32')

In [28]:
def init_model():
    global weight, bias, input_cnt, output_cnt
    weight = np.random.normal(RND_MEAN, RND_STD, [input_cnt, output_cnt])
    bias = np.zeros([output_cnt])

In [29]:
def eval_accuracy(output, y):
    est_yes = np.greater(output, 0)
    ans_yes = np.greater(y, 0.5)

    est_no = np.logical_not(est_yes)
    ans_no = np.logical_not(ans_yes)
    
    tp = np.sum(np.logical_and(est_yes, ans_yes))
    fp = np.sum(np.logical_and(est_yes, ans_no))
    fn = np.sum(np.logical_and(est_no, ans_no))
    tn = np.sum(np.logical_and(est_no, ans_yes))
    
    accuracy = safe_div(tp+tn, tp+tn+fp+fn)
    precision = safe_div(tp, tp+fp)
    recall = safe_div(tp, tp+fn)
    
    f1 = 2 * safe_div(recall*precision, recall+precision)
    
    return [accuracy, precision, recall, f1]

In [30]:
# def eval_accuracy(output, y):
#     estimate = np.greater(output,0)
#     answer = np.greater(y,0.5)
#     correct = np.equal(estimate, answer)

#     return np.mean(correct)

In [31]:
def safe_div(p, q):
    '''
    분모가 매우 작은 값, 즉 0인 경우에 분모를 분자의 부호에 맞는 1 혹은 -1로 변환
    '''
    p, q = float(p), float(q)
    if np.abs(q) < 1.0e-20:
        return np.sign(p)

    return p / q

In [32]:
def train_and_test(epoch_count, mb_size, report, train_rate):
    step_count = arrange_data(mb_size, train_rate)
    test_x, test_y = get_test_data()

    for epoch in range(epoch_count):
        losses = []

        for n in range(step_count):
            train_x, train_y = get_train_data(mb_size, n)
            loss, _ = run_train(train_x, train_y)
            losses.append(loss)

        if report > 0 and (epoch+1) % report == 0:
            acc = run_test(test_x, test_y)
            print("Epoch {}: TRAIN-LOSS = {:5.3f},\naccuracy = {:5.3f}, precision:{:5.3f}, recall:{:5.3f}, f1:{:5.3f}".\
                format(epoch+1, np.mean(losses), acc[0], acc[1], acc[2], acc[3]))

    final_acc = run_test(test_x, test_y)
    print('\nFinal Test: accuracy = {:5.3f}, precision:{:5.3f}, recall:{:5.3f}, f1:{:5.3f}'.\
        format(final_acc[0], final_acc[1], final_acc[2], final_acc[3]))

In [33]:
def arrange_data(mb_size, train_rate):
    global data, shuffle_map, test_begin_idx
    shuffle_map = np.arange(data.shape[0])
    np.random.shuffle(shuffle_map)

    step_count = int(data.shape[0] * train_rate) // mb_size
    test_begin_idx = step_count * mb_size
    
    return step_count

In [34]:
def get_test_data():
    global data, shuffle_map, test_begin_idx, output_cnt

    test_data = data[shuffle_map[test_begin_idx:]]
    return test_data[:, :-output_cnt] , test_data[:, -output_cnt:]

In [35]:
def get_train_data(mb_size, nth):
    global data, shuffle_map, test_begin_idx, output_cnt
    if nth == 0:
        np.random.shuffle(shuffle_map[:test_begin_idx])
    train_data = data[shuffle_map[mb_size*nth : mb_size*(nth+1)]]

    return train_data[:, :-output_cnt], train_data[:, -output_cnt:]

In [36]:
def run_train(x, y):
    output, aux_nn = forward_neuralnet(x)
    loss , aux_pp = forward_postproc(output,y)
    accuracy = eval_accuracy(output, y)

    G_loss = 1.0
    G_output = backprop_postproc(G_loss, aux_pp)
    backprop_neuralnet(G_output, aux_nn)

    return loss, accuracy

In [37]:
def run_test(x, y):
    output, _ = forward_neuralnet(x)
    accuracy = eval_accuracy(output, y)

    return accuracy

In [38]:
def forward_neuralnet(x):
    global weight, bias
    output = np.matmul(x, weight) + bias

    return output, x

In [39]:
def backprop_neuralnet(G_output, x):
    global weight, bias
    
    g_output_w = x.transpose()

    G_w = np.matmul(g_output_w, G_output)
    G_b = np.sum(G_output, axis=0)

    weight -= LEARNING_RATE * G_w
    bias -= LEARNING_RATE * G_b

In [40]:
def backprop_postproc(G_loss, aux):
    y, output, CEE = aux
    G_loss = 1.0
    
    g_loss_entropy = 1.0 / np.prod(CEE.shape)
    g_entropy_output = sigmoid_cross_entropy_with_derv(y,output)

    G_entropy = g_loss_entropy * G_loss
    G_output = g_entropy_output * G_entropy

    return G_output

In [41]:
def forward_postproc(output, y):
    CEE = sigmoid_cross_entropy_with_logits(y,output)
    loss = np.mean(CEE)

    return loss, [y, output, CEE]

In [42]:
def sigmoid(x):
    return np.exp(-relu(-x)) / (1.0 + np.exp(-np.abs(x)))

def relu(x):
    return np.maximum(x,0)

def sigmoid_derv(x,y):
    return y*(1-y)

def sigmoid_cross_entropy_with_logits(z,x):
    return relu(x) - x * z + np.log(1+np.exp(-np.abs(x)))

def sigmoid_cross_entropy_with_derv(z,x):
    return -z + sigmoid(x)