In [1]:
import numpy as np
import csv
import time

np.random.seed(1234) # 훈련과정에서 고정된 값을 위해
def randomize(): np.random.seed(time.time()) # 최종적으로는 랜덤적으로 쓰기 위해

In [2]:
RND_MEAN = 0
RND_STD = 0.0030
# 정규 분포 난수값의 평균과 표준 편차 
# => 가중치 파라미터 초기화에 이용

LEARNING_RATE = 0.001 # 학습률!!

In [5]:
def abalone_exec(epoch_count=10, mb_size=10, report=1):
    load_abalone_dataset() # 데이터셋 불러오기
    init_model() # 모델의 파라미터 초기화
    train_and_test(epoch_count, mb_size, report) # 학습 및 평가 과정을 수행하는 부분

In [7]:
def load_abalone_dataset():
    with open('../../data/chap01/abalone.csv') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader, None) # next, 첫 행 날리기 / 칼럼명 부분
        rows = []
        for row in csvreader:
            rows.append(row)
            
    global data, input_cnt, output_cnt # 전역변수 선언
    input_cnt, output_cnt = 10, 1 # 입력벡터 크기 / 출력벡터 크기
    data = np.zeros([len(rows), input_cnt+output_cnt]) 
    # 0으로 채운다, 오른쪽만큼
    # 개체들의 입출력 벡터정보를 저장할 데이터 행렬 ( row수, column 수(입력 + 출력))

    for n, row in enumerate(rows):
        # one-hot vector(encoding)
        if row[0] == 'I': data[n, 0] = 1
        if row[0] == 'M': data[n, 1] = 1
        if row[0] == 'F': data[n, 2] = 1
        # 성별 외의 데이터들을 복제
        data[n, 3:] = row[1:]

In [5]:
def init_model():
    global weight, bias, input_cnt, output_cnt
    # 가중치 => 정규분포를 갖는 난숫값으로 초기화 ( np.random.normal )
    # 파라미터의 초깃값을 실행 때마다 달라지게 하기 위함 
    # => 매번 다른 결과를 가져와 추세를 살피며 좋은 결과를 고를 수 있도록
    # 또한 local minimum을 피할 가능성을 높이기위함(조금이라도..)
    weight = np.random.normal(RND_MEAN, RND_STD,[input_cnt, output_cnt])
    # 편향 => 초기에 지나친 영향을 주어 학습에 역효과가 오지 않도록
    # 0으로 초기화해준다.
    bias = np.zeros([output_cnt])

In [6]:
def train_and_test(epoch_count, mb_size, report):
    # 데이터를 뒤섞고, 학습용 셋과 평가용 셋을 분리
    step_count = arrange_data(mb_size)
    # 중간, 최종 평가때 동일한 테스트 셋 사용
    test_x, test_y = get_test_data()
    
    # epoch만큼 실행 ( 총 몇번 )
    for epoch in range(epoch_count):
        losses, accs = [], []
        
        # step_count만큼 실행 ( 미니배치사이즈를 횟수만큼 반복 )
        for n in range(step_count):
            # 학습용 미니배치 셋 획득
            train_x, train_y = get_train_data(mb_size, n)
            # 비용 및 정확도 획득, 적재
            loss, acc = run_train(train_x, train_y)
            losses.append(loss)
            accs.append(acc)
            
        # 지정된 보고 주기에 따라 test 셋에서 중간 평가
        if report > 0 and (epoch+1) % report == 0:
            acc = run_test(test_x, test_y)
            print('Epoch {}: loss={:5.3f}, accuracy={:5.3f}/{:5.3f}'. \
                  format(epoch+1, np.mean(losses), np.mean(accs), acc))
    
    # 최종 평가
    final_acc = run_test(test_x, test_y)
    print('\nFinal Test: final accuracy = {:5.3f}'.format(final_acc))

In [7]:
def arrange_data(mb_size):
    global data, shuffle_map, test_begin_idx
    # np.arange(n) => n까지의 수가 나열된 리스트
    shuffle_map = np.arange(data.shape[0])
    # 섞는다!
    np.random.shuffle(shuffle_map)
    # 미니배치 처리 스텝 수를 지정 
    # => 80%만큼 학습용 데이터
    step_count = int(data.shape[0] * 0.8) // mb_size
    # 스텝카운트를 mb_size만큼 곱해주면 train set이 끝나고 test set이 나온다.
    test_begin_idx = step_count * mb_size
    return step_count

def get_test_data():
    global data, shuffle_map, test_begin_idx, output_cnt
    # shuffle_map의 후반부( test set )
    test_data = data[shuffle_map[test_begin_idx:]]
    # output_cnt = 1 -> 마지막 열에 정답벡터
    # 그 이전에까지 열들이 입력 벡터
    return test_data[:, :-output_cnt], test_data[:, -output_cnt:]

def get_train_data(mb_size, nth):
    global data, shuffle_map, test_begin_idx, output_cnt
    # epoch마다 다르게!!
    # eoch마다 첫 실행에서 섞는 동작 수행
    if nth == 0:
        # train set 부분의 인덱스를 다시 섞는다.
        np.random.shuffle(shuffle_map[:test_begin_idx])
    # 해당 회차에서 다음 회차 사이의 mb_size만큼을 획득
    train_data = data[shuffle_map[mb_size*nth:mb_size*(nth+1)]]
    return train_data[:, :-output_cnt], train_data[:, -output_cnt:]

In [8]:
def run_train(x, y):
    # 순전파 처리
    # 신경망 처리
    # 입력 x로부터 신경망 출력 output 획득
    output, aux_nn = forward_neuralnet(x)
    # 회귀분석적 성격에 맞춘 후처리
    # => 이진 판단, 선택 분류쪽으로 가면 이 부분이 달라진다.
    # output과 y로부터 손실함수 loss 계산
    loss, aux_pp = forward_postproc(output, y)
    # aux_nn, aux_pp => 역전파에 전달하기 위한 수치
    # 반드시 전달해야 하는 수치 && 불필요한 반복을 줄여주는 수치 ...etc
    
    # 보고용 정확도 계산
    accuracy = eval_accuracy(output, y)
    
    # 역전파 처리
    # 손실 기울기, 초기값은 1?
    # 역전파의 시작!
    G_loss = 1.0
    # 역순이기에 후처리과정이 먼저다
    # G_loss로부터 G_output을 얻는다. 이때, aux_pp 추가 정보를 사용
    G_output = backprop_postproc(G_loss, aux_pp)
    # 원칙적으로는, G_output으로부터 G_x를 구하는 것
    # x는 딥러닝 알고리즘으로 손댈 수 없는 고정값이면서
    # 더이상 수행될 역전파도 없으니 따로 반환을 할 필요는 없다.
    # ** 중요한 점 **
    # backprop_neuralnet()가 실행되는 중에,
    # 신경망 파라미터값의 변화, 학습이 실제로 일어난다.
    backprop_neuralnet(G_output, aux_nn)
    
    return loss, accuracy

def run_test(x, y):
    # 역전파 전달 데이터가 필요없으니 _로 처리 (dummy)
    # 후처리 또한 필요없다!!
    output, _ = forward_neuralnet(x)
    accuracy = eval_accuracy(output, y)
    return accuracy

In [9]:
def forward_neuralnet(x):
    global weight, bias
    # 입력 행렬 x에 가중치 행렬 weight를 곱하고,
    # 편향 벡터 bias를 더하면 끝
    output = np.matmul(x, weight) + bias
    # x => [N, 10] * weight = [10, 1] = [N, 1]
    # [N, 1] + [1] => 각 행에 합연산 적용
    return output, x

def backprop_neuralnet(G_output, x):
    global weight, bias
    # np.transpose() => 행, 열 바꾸기
    g_output_w = x.transpose() # [10, N]
    
    # weight, bias의 손실기울기 계산
    G_w = np.matmul(g_output_w, G_output)# [10, N] * [N, 1] = [10, 1]
    G_b = np.sum(G_output, axis=0) # [N,1]의 각 행 값의 합 => [1]
    
    # 실제 값에 적용
    weight -= LEARNING_RATE * G_w
    bias -= LEARNING_RATE * G_b

In [10]:
def forward_postproc(output, y):
    # 손실함수, 평균제곱오차값 => MSE( Mean Square Error )
    # 신경망 출력 - 정답행렬 => [N,1]
    diff = output - y 
    # 각 수치들에 제곱 => [N,1]
    square = np.square(diff)
    # 각 수치들의 평균 => [1]
    loss = np.mean(square)
    return loss, diff

def backprop_postproc(G_loss, diff):
    shape = diff.shape
    
    # 아래는 수학의 영역... p.82
    # 입출력간 부분의 기울기
    g_loss_square = np.ones(shape) / np.prod(shape)
    g_square_diff = 2 * diff
    g_diff_output = 1
    
    # 평균, 제곱, 오차 연산에 대한 역전파 처리
    # G_loss로부터 G_output을 얻어내게 된다.
    G_square = g_loss_square * G_loss
    G_diff = g_square_diff * G_square
    G_output = g_diff_output * G_diff
    
    return G_output

In [12]:
def eval_accuracy(output, y):
    # 다양한 방법들이 존재.
    # 여기서는 정답과 오차의 비율을 오류율로 본다.
    mdiff = np.mean(np.abs((output - y)/y))
    return 1 - mdiff

In [11]:
# 코드 정리된 방식 => 위에서는 흐름 전체를 나열
def backprop_postproc_oneline(G_loss, diff):  # backprop_postproc() 대신 사용 가능
    return 2 * diff / np.prod(diff.shape)