In [1]:
import numpy as np
import csv
import time

In [2]:
# Hyperparameter 설정
RND_MEAN = 0 
RND_STD = 0.0030 # 가중치 파라미터를 초기화 할 때 이용함

LEARNING_RATE = 0.0001

In [3]:
def randomize() -> None:
    np.random.seed(time.time())

In [4]:
def load_pulsar_dataset(adjust_ratio):
    pulsars, stars = [], []
    with open('/Users/hyeonjin/workspace/dataset/predicting_pulsar_star/pulsar_data_train.csv') as f:
        reader = csv.reader(f)
        next(reader, None)
        for row in reader:
            if '' in row:
                continue
            if row[8] == '1.0': pulsars.append(row)
            else: stars.append(row)

    # with open('/Users/hyeonjin/workspace/dataset/predicting_pulsar_star/pulsar_data_test.csv') as f:
    #     reader = csv.reader(f)
    #     next(reader, None)
    #     for row in reader:
    #         if '' in row:
    #             continue
    #         if row[8] == '1': pulsars.append(row)
    #         else: stars.append(row)
    
    global data, input_cnt, output_cnt
    input_cnt, output_cnt = 8, 1

    pulsar_cnt, star_cnt = len(pulsars), len(stars)
    if adjust_ratio:
        data = np.zeros([2*star_cnt, 9])
        data[0:star_cnt, :] = np.asarray(stars, dtype=np.float32)
        for n in range(star_cnt):
            data[star_cnt + n] = np.asarray(pulsars[n % pulsar_cnt], dtype=np.float32)
    else:
        data = np.zeros([star_cnt+pulsar_cnt, 9])
        data[0:star_cnt, :] = np.asarray(stars, dtype=np.float32)
        data[star_cnt:, :] = np.asarray(pulsars, dtype=np.float32)

In [5]:
def arrange_data(mb_size):
    global data, shuffle_map, test_begin_idx # load_abalone_data 함수에서 생성한 data 전역 변수
    shuffle_map = np.arange(data.shape[0]) # abalone data의 행 길이 만큼의 1차원 리스트 생성
    np.random.shuffle(shuffle_map) # 리스트를 섞음
    step_count = int(data.shape[0] * 0.8) // mb_size # 전체 데이터의 80퍼센트를 미니 배치 사이즈로 나누어 step_count 정의
    test_begin_idx = step_count * mb_size # 테스트 데이터의 시작 인덱스 번호 생성
    return step_count

def get_test_data():
    global data, shuffle_map, test_begin_idx, output_cnt # load_abalone_data와 arrage_data에서 생성한 변수들 사용
    test_data = data[shuffle_map[test_begin_idx:]] # 테스트 데이터의 시작 위치를 이용해 test_data 생성
    return test_data[:, :-output_cnt], test_data[:, -output_cnt] # 10개의 속성값이 x, 마지막 열이 label 데이터

def get_train_data(mb_size, nth):
    global data, shuffle_map, test_begin_idx, output_cnt # load_abalone_data, arrange_data에서 생성한 변수들 사용
    if nth == 0: # 각 에폭의 첫 번째 호출일때 학습 데이터 부분에 대한 부분적인 순서를 섞어 에폭마다 다른 순서로 학습이 수행되게 함
        np.random.shuffle(shuffle_map[:test_begin_idx])
    
    train_data = data[shuffle_map[mb_size*nth:mb_size*(nth+1)]]
    return train_data[:, :-output_cnt], train_data[:, -output_cnt:] # 10개의 속성값이 x, 마지막 열이 label 데이터

In [6]:
def safe_div(p, q):
    p, q = float(p), float(q)
    if np.abs(q) < 1.0e-20: return np.sign(p)
    return p / q

def eval_accuracy(output, y):
    estimate = np.greater(output, 0)
    answer = np.greater(y, 0.5)
    correct = np.equal(estimate, answer)

    est_yes = np.greater(output, 0)
    ans_yes = np.greater(y, 0.5)
    est_no = np.logical_not(est_yes)
    ans_no = np.logical_not(ans_yes)

    tp = np.sum(np.logical_and(est_yes, ans_yes))
    fp = np.sum(np.logical_and(est_yes, ans_no))
    fn = np.sum(np.logical_and(est_no, ans_no))
    tn = np.sum(np.logical_and(est_no, ans_yes))

    accuracy = safe_div(tp+tn, tp+tn+fp+fn)
    precision = safe_div(tp, tp+fp)
    recall = safe_div(tp, tp+tn)
    f1 = 2 * safe_div(recall*precision, recall + precision)
    return [accuracy, precision, recall, f1]

In [7]:
def relu(x):
    return np.maximum(x, 0)

def sigmoid(x):
    return np.exp(-relu(-x)) / (1.0 + np.exp(-np.abs(x)))

def sigmoid_derv(x, y):
    return y * (1 - y)

def sigmoid_cross_entropy_with_logits(z, x):
    return relu(x) - x * z + np.log(1 + np.exp(-np.abs(x)))

def sigmoid_cross_entropy_with_logits_derv(z, x):
    return -z + sigmoid(x)

In [8]:
def forward_postproc(output, y):
    entropy = sigmoid_cross_entropy_with_logits(output, y)
    loss = np.mean(entropy)
    return loss, [y, output, entropy]

def backprop_postproc(G_loss, aux):
    y, output, entropy = aux

    g_loss_entropy = 1.0 / np.prod(entropy.shape)
    g_entropy_output = sigmoid_cross_entropy_with_logits_derv(y, output)

    G_entropy = g_loss_entropy * G_loss
    G_output = g_entropy_output * G_entropy

    return G_output

def backprop_postproc_oneline(G_loss, diff):
    return 2 * diff / np.prod(diff.shape)

In [9]:
def forward_neuralnet(x):
    global weight, bias # init_model에서 선언한 wight, bias를 이용
    output = np.matmul(x, weight) + bias # XW + b 수행
    return output, x

def backprop_neuralnet(G_output, x): # 역전파 처리 함수, 순전파 출력 output에 대한 손실 기울기 G_output 
    global weight, bias # init_model에서 선언한 wegith, bias를 이용
    g_output_w = x.transpose() # x를 이용해 x와 output 사이의 부분 기울기 g_output_w를 구함

    G_w = np.matmul(g_output_w, G_output) # 부분 기울기와 손실 기울기를 이용해 weight 성분의 손실 기울기 구함
    G_b = np.sum(G_output, axis=0)

    weight -= LEARNING_RATE * G_w
    bias -= LEARNING_RATE * G_b

In [10]:
def run_train(x, y):
    # 순전파 과정 수행
    output, aux_nn = forward_neuralnet(x)
    loss, aux_pp = forward_postproc(output, y)
    accuracy = eval_accuracy(output, y)

    G_loss = 1.0
    G_output = backprop_postproc(G_loss, aux_pp)
    backprop_neuralnet(G_output, aux_nn)

    return loss, accuracy

def run_test(x, y):
    output, _ = forward_neuralnet(x)
    accuracy = eval_accuracy(output, y)
    return accuracy

In [11]:
def train_and_test(epoch_count:int, mb_size:int, report:int) -> None:
    step_count = arrange_data(mb_size)
    test_x, test_y = get_test_data()

    for epoch in range(epoch_count):
        losses = []

        for n in range(step_count):
            train_x, train_y = get_train_data(mb_size, n)
            loss, acc = run_train(train_x, train_y)
            losses.append(loss)
        
        if report > 0 and (epoch+1) % report == 0:
            acc = run_test(test_x, test_y)
            acc_str = ','.join(['%5.3f']*4) % tuple(acc)
            print(f"Epoch {epoch+1} loss={np.mean(losses):5.3f} result={acc_str}.")
    
    acc = run_test(test_x, test_y)
    acc_str = ','.join(['%5.3f']*4) % tuple(acc)
    print(f"\nFinal Test: final accuracy = {acc_str}")

In [12]:
def init_model() -> None:
    global weight, bias, input_cnt, output_cnt # 전역 변수로 wieght, bias 생성, load_abalone_dataset에서 생성한 input_cnt, output_cnt 사용
    weight = np.random.normal(RND_MEAN, RND_STD, [input_cnt, output_cnt]) # RND_MEAN, RND_STD를 이용해 input_cnt * output_cnt 형태의 파라미터 생성
    bias = np.zeros([output_cnt])

In [13]:
def pulsar_exec(epoch_count=10, mb_size=10, report=1, adjust_ratio=False):
    load_pulsar_dataset(adjust_ratio)
    init_model()
    train_and_test(epoch_count, mb_size, report)

In [14]:
pulsar_exec()

NameError: name 'accs' is not defined

In [None]:
pulsar_exec(adjust_ratio=True)

TypeError: unsupported operand type(s) for %: 'list' and 'tuple'