In [1]:
from io import BytesIO  
from urllib.request import urlopen 
from zipfile import ZipFile   # manipulate zip files on memory domain (ref) https://codesample-factory.tistory.com/1301

zipurl = 'https://github.com/DoranLyong/NalCoding-For-DeepLearning/raw/main/codes/01_SLP_Regression/data/archive.zip'


with urlopen(zipurl) as zipresp: 
    with ZipFile(BytesIO(zipresp.read())) as zfile: 
        zfile.extractall('./data')

# 전복 고리 수 추정 신경망 (p.68)

## 1. 구현하기 

In [2]:
import time 
import csv   # .csv 형태의 데이터셋을 읽어들이기 위함 

import numpy as np

np.random.seed(1234) # 시드고정 for Reproducibility

In [3]:
# === Hyperparameters 정의 === #

# for init. weights 
RND_MEAN = 0 
RND_STD = 3e-3

# 
LEARNING_RATE = 1e-3

#### 실험용 main() 함수 정의 
* 실행 함수: `exec() `:

In [4]:
def exec(epochs=10, batch_size=10, report=1):
    load_dataset() # 데이터셋 읽어들이기 
    init_model()   # 모델 가중치 초기화 
    train_and_test(epochs, batch_size, report)  # 학습 및 평가 수행 

#### 데이터 적재 함수 정의 
* 데이터셋 파일을 메모리(e.g., RAM)로 읽어들여주는 함수 정의 

In [5]:
def load_dataset(): 
    with open('./data/abalone.csv') as file: 
        csv_reader = csv.reader(file)  # .csv 파일 내용을 메모리로 읽어들임 
        next(csv_reader, None) # 파일의 첫 행을 읽지 않고 건너뛰게 만듬 (p.72)
        rows = [] 
        for row in csv_reader:
            rows.append(row)  # 전복 인스턴스별 정보를 rows 리스트에 수집 
    

    global data, input_cnt, output_cnt
    # === 입출력 사이즈 정의 === # 
    input_cnt = 10  # 입력 벡터 차원 수  
    output_cnt = 1  # 출력 벡터 차원 수 
    data = np.zeros([len(rows), input_cnt + output_cnt]) # 입출력 벡터 정보를 저장할 행렬

    # === data 행렬에 정보 저장 (구조화) === #
    for idx, row in enumerate(rows):
        # one-hot 벡터 레이블링  
        if row[0] == 'I': 
            data[idx, 0] = 1 
        elif row[0] == 'M':
            data[idx, 1] = 1 
        elif row[0] == 'F':
            data[idx, 2] = 1 

        # 이외 항목들 일괄 복사 
        data[idx, 3:] = row[1:]
    

In [6]:
import pandas as pd 

pd.read_csv('./data/abalone.csv') # for check the data format

Unnamed: 0,Sex,Length,Diameter,Height,Whole weight,Shucked weight,Viscera weight,Shell weight,Rings
0,M,0.455,0.365,0.095,0.5140,0.2245,0.1010,0.1500,15
1,M,0.350,0.265,0.090,0.2255,0.0995,0.0485,0.0700,7
2,F,0.530,0.420,0.135,0.6770,0.2565,0.1415,0.2100,9
3,M,0.440,0.365,0.125,0.5160,0.2155,0.1140,0.1550,10
4,I,0.330,0.255,0.080,0.2050,0.0895,0.0395,0.0550,7
...,...,...,...,...,...,...,...,...,...
4172,F,0.565,0.450,0.165,0.8870,0.3700,0.2390,0.2490,11
4173,M,0.590,0.440,0.135,0.9660,0.4390,0.2145,0.2605,10
4174,M,0.600,0.475,0.205,1.1760,0.5255,0.2875,0.3080,9
4175,F,0.625,0.485,0.150,1.0945,0.5310,0.2610,0.2960,10


#### 모델 가중치 초기화 함수 정의 
* `exec()` 함수에서 호출됨 

In [7]:
def init_model():
    global weight, bias, input_cnt, output_cnt  
    
    # === random init. === #
    weight = np.random.normal(RND_MEAN, RND_STD,[input_cnt, output_cnt]) 
    bias = np.zeros([output_cnt])  # 학습 초기에 지나진 영향으로 역효과가 나지 않게 0으로 설정 

#### 학습 및 평가 함수 정의 
* 학습과 평가를 일괄 실행하는 함수

In [8]:
def train_and_test(epochs:int, batch_size:int, report:int):

    step_cnt = arrange_data(batch_size) # 데이터 shuffle & split 등의 데이터 정렬 작업  
    test_x, test_y = get_test_data() 

    for epoch in range(epochs): 
        losses = []
        accs = []

        # === Train === # 
        for idx in range(step_cnt):
            train_x, train_y = get_train_data(batch_size, idx)

            loss, acc = run_train(train_x, train_y) 
            losses.append(loss)
            accs.append(acc)

        # === Validation === # 
        if report > 0 and (epoch+1) % report == 0:
            acc = run_test(test_x, test_y)
            
            print(f"Epoch={epoch+1}: ", 
                  f"loss={np.mean(losses):5.3f},",
                  f"accuracy={np.mean(accs):5.3f}/{acc:5.3f}",
                  )
    
    final_acc = run_test(test_x, test_y)
    print(f"\nFinal Test: final_acc={final_acc:5.3f}")            

#### 학습 및 평가 데이터 획득 함수 
* `train_and_test()` 함수에서 호출됨 

In [9]:
def arrange_data(batch_size):
    global data, shuffle_map, test_begin_idx
    
    shuffle_map = np.arange(data.shape[0]) # 데이터 인스턴스 개수만큼의 일련번호 생성 
    np.random.shuffle(shuffle_map)  # 뒤섞기 

    step_count = int(data.shape[0] * 0.8) // batch_size  # 8:2 로 split 
    test_begin_idx = step_count * batch_size
    return step_count


def get_test_data():
    global data, shuffle_map, test_begin_idx, output_cnt

    test_data = data[shuffle_map[test_begin_idx:]]  # arrange_data() 참고 

    return test_data[:, :-output_cnt], test_data[:, -output_cnt:] # input, label 


def get_train_data(batch_size, nth):
    global data, shuffle_map, test_begin_idx, output_cnt

    if nth == 0:  # epoch 이 새로 시작할 때마다 데이터를 뒤섞음 
        np.random.shuffle(shuffle_map[:test_begin_idx])

    train_data = data[shuffle_map[batch_size*nth:batch_size*(nth+1)]]

    return train_data[:, :-output_cnt], train_data[:, -output_cnt:]

#### 학습 실행 함수와 평가 실행 함수 
* `train_and_test()` 함수에서 호출 

In [10]:
def run_train(x, y):
    # === forward === # 
    output, aux_nn = forward_neuralnet(x)    # 순전파 진행 
    loss, aux_pp = forward_postproc(output, y)  # 손실 함수 계산을 위한 후처리 
    accuracy = eval_accuracy(output, y)
    
    # === backprop === # 
    G_loss = 1.0  # dL/dL = 1 
    G_output = backprop_postproc(G_loss, aux_pp)  # 역전파 
    backprop_neuralnet(G_output, aux_nn)   # step-up 수행 
    
    return loss, accuracy


def run_test(x, y):
    output, _ = forward_neuralnet(x)
    accuracy = eval_accuracy(output, y)
    return accuracy

#### SLP 에 대한 순전파 및 역전파 함수 정의 

In [11]:
def forward_neuralnet(x):
    global weight, bias

    output = np.matmul(x, weight) + bias  # Y = XW + b
    return output, x


# === 계산방법은 p.80 참고 === # 
def backprop_neuralnet(G_output, x):
    """ G_output := 순전파 출력에 대한 손실 기울기 
    """
    global weight, bias

    g_output_w = x.transpose()
    
    G_w = np.matmul(g_output_w, G_output)
    G_b = np.sum(G_output, axis=0)

    # === step-up === # 
    weight -= LEARNING_RATE * G_w
    bias -= LEARNING_RATE * G_b

#### 후처리 과정에 대한 순전파 및 역전파 함수 정의 
* forward propagation에서 생성된 출력으로부터 손실 함수값을 구하는 과정 

In [12]:
def forward_postproc(output, y):
    # === Loss function === # 
    # MSE 계산 
    diff = output - y
    square = np.square(diff)
    loss = np.mean(square)
    return loss, diff


def backprop_postproc(G_loss, diff):
    # === 기울기 도함수 계산 === 
    # 계산 방법 p.82 참고 
    shape = diff.shape
    
    g_loss_square = np.ones(shape) / np.prod(shape)
    g_square_diff = 2 * diff
    g_diff_output = 1

    G_square = g_loss_square * G_loss
    G_diff = g_square_diff * G_square
    G_output = g_diff_output * G_diff
    
    return G_output

In [13]:
# 간략화된 버전 
def backprop_postproc_oneline(G_loss, diff):
    return 2 * diff / np.prob(diff.shape)

#### 정확도 계산 함수 정의 

In [14]:
def eval_accuracy(output, y):
    mdiff = np.mean(np.abs((output - y)/y)) # 평균 오류율 
    return 1 - mdiff

***
## 2. 실행하기 
* 위에서 정의한 구현 프로그램을 실행해 동작 확인하기 

In [15]:
exec()

Epoch=1:  loss=33.875, accuracy=0.557/0.812
Epoch=2:  loss=8.226, accuracy=0.820/0.814
Epoch=3:  loss=7.582, accuracy=0.812/0.809
Epoch=4:  loss=7.475, accuracy=0.808/0.811
Epoch=5:  loss=7.395, accuracy=0.810/0.809
Epoch=6:  loss=7.328, accuracy=0.808/0.810
Epoch=7:  loss=7.269, accuracy=0.808/0.811
Epoch=8:  loss=7.217, accuracy=0.808/0.812
Epoch=9:  loss=7.175, accuracy=0.810/0.810
Epoch=10:  loss=7.135, accuracy=0.809/0.810

Final Test: final_acc=0.810


학습된 파라미터값 확인 

In [16]:
feature = ['유충 여부', '수컷 여부', '암컷 여부', 
           '키', '지름', '높이', '전체 무게', 
           '몸통 무게', '내장 무게', '껍질 무게',
           ]

for idx, item in enumerate(weight):
    print(f"{feature[idx]}: {item}")


유충 여부: [1.02697603]
수컷 여부: [1.47450981]
암컷 여부: [1.66960135]
키: [2.04468668]
지름: [1.62513525]
높이: [0.60292627]
전체 무게: [2.39993815]
몸통 무게: [0.54107313]
내장 무게: [0.46878034]
껍질 무게: [1.01969382]


In [17]:
print(f'편향: {bias}')

편향: [4.16894769]


학습된 가중치 해석:
* '전체 무게', '키' 정보가 나이 예측에 중요함 
* '높이', '몸통 무게', '내장 무게'가 상대적으로 덜 중요함 

#### Hyperparameter 바꾸어서 실행하기

In [20]:
LEARNING_RATE = 0.1
exec(epochs=100, batch_size=100, report=20)

Epoch=20:  loss=5.804, accuracy=0.825/0.831
Epoch=40:  loss=5.259, accuracy=0.834/0.828
Epoch=60:  loss=5.056, accuracy=0.837/0.838
Epoch=80:  loss=4.950, accuracy=0.838/0.840
Epoch=100:  loss=4.910, accuracy=0.840/0.826

Final Test: final_acc=0.826
