# [실습6] AI 기법 성능 향상 방법론 (정답)


---

## 실습 목표
---
- 하이퍼 매개변수 튜닝에 대해 배워봅니다.
- K-fold 교차 검증을 수행해봅니다.
- Residual network를 구현해봅니다.
- Positional encoding을 구현해봅니다.
- 금속분말 데이터셋에 대한 최적의 인공지능 모델을 구현해봅니다.

## 실습 목차
---
1. **Hyper-parameter 튜닝:** 하이퍼 매개변수 튜닝을 수행해봅니다.

2. **K-fold 교차검증:** K-fold 교차검증으로 모델을 평가해봅니다.

3. **Residual network:** Residual network를 구현해봅니다.

4. **Positional encoding:** Positional encoding을 구현해봅니다.

5. **최적의 모델 구현:** 여태까지 배웠던 것들을 종합하여 최적의 인공지능 모델을 만들어봅니다.

## 실습 개요
---

이번 실습에서는 AI 모델의 성능 향상을 위한 다양한 기법을 수행해봅니다.

## 1. Hyper-parameter 튜닝
---
금속분말 데이터셋을 이용하여 하이퍼 매개변수 튜닝을 수행해보겠습니다.


### 1.1 라이브러리 불러오기

In [None]:
import numpy as np
import random
from matplotlib import pyplot as plt
%matplotlib inline
import tensorflow as tf
from tensorflow.keras.layers import Input
import json
import sklearn.metrics

### 1.2 데이터셋 읽어오기

In [None]:
stage1 = {
    'train_X': np.load('./Data/train_data_stage1_X.npy'),
    'train_y': np.load('./Data/train_data_stage1_y.npy'),
    'valid_X': np.load('./Data/valid_data_stage1_X.npy'),
    'valid_y': np.load('./Data/valid_data_stage1_y.npy'),
    'test_X': np.load('./Data/test_data_stage1_X.npy'),
    'test_y': np.load('./Data/test_data_stage1_y.npy'),
}

stage2 = {
    'train_X': np.load('./Data/train_data_stage2_X.npy'),
    'train_y': np.load('./Data/train_data_stage2_y.npy'),
    'valid_X': np.load('./Data/valid_data_stage2_X.npy'),
    'valid_y': np.load('./Data/valid_data_stage2_y.npy'),
    'test_X': np.load('./Data/test_data_stage2_X.npy'),
    'test_y': np.load('./Data/test_data_stage2_y.npy'),
}

# 삭제
# columns = json.load(open('./Data/valid_columns.json', 'r'))

### 1.3 데이터 표준화하기

### 1.3.1 Stage1 데이터 표준화하기

In [None]:
stage1_X_mean = stage1['train_X'].mean(axis = 0)
stage1_y_mean = stage1['train_y'].mean(axis = 0)
print('입력값 평균:', stage1_X_mean)
print('출력값 평균:', stage1_y_mean)

In [None]:
stage1_X_std = stage1['train_X'].std(axis = 0)
stage1_y_std = stage1['train_y'].std(axis = 0)
print('입력값 표준편차:', stage1_X_std)
print('출력값 표준편차:', stage1_y_std)

In [None]:
# 학습 데이터 표준화
stage1['train_X'] = (stage1['train_X'] - stage1_X_mean) / stage1_X_std
stage1['train_y'] = (stage1['train_y'] - stage1_y_mean) / stage1_y_std
# 검증용 데이터 표준화
stage1['valid_X'] = (stage1['valid_X'] - stage1_X_mean) / stage1_X_std
stage1['valid_y'] = (stage1['valid_y'] - stage1_y_mean) / stage1_y_std
# 테스트 데이터 표준화
stage1['test_X'] = (stage1['test_X'] - stage1_X_mean) / stage1_X_std
stage1['test_y'] = (stage1['test_y'] - stage1_y_mean) / stage1_y_std

### 1.3.2 Stage2 데이터 표준화하기

In [None]:
stage2_X_mean = stage2['train_X'].mean(axis = 0)
stage2_y_mean = stage2['train_y'].mean(axis = 0)
print('입력값 평균:', stage2_X_mean)
print('출력값 평균:', stage2_y_mean)

In [None]:
stage2_X_std = stage2['train_X'].std(axis = 0)
stage2_y_std = stage2['train_y'].std(axis = 0)
print('입력값 표준편차:', stage2_X_std)
print('출력값 표준편차:', stage2_y_std)

In [None]:
# 학습 데이터 표준화
stage2['train_X'] = (stage2['train_X'] - stage2_X_mean) / stage2_X_std
stage2['train_y'] = (stage2['train_y'] - stage2_y_mean) / stage2_y_std

# 검증용 데이터 표준화
stage2['valid_X'] = (stage2['valid_X'] - stage2_X_mean) / stage2_X_std
stage2['valid_y'] = (stage2['valid_y'] - stage2_y_mean) / stage2_y_std

# 테스트 데이터 표준화
stage2['test_X'] = (stage2['test_X'] - stage2_X_mean) / stage2_X_std
stage2['test_y'] = (stage2['test_y'] - stage2_y_mean) / stage2_y_std

### 1.4 Hyper-parameter 범위 설정하기

랜덤 서치를 이용한 하이퍼파라미터 설정을 해봅니다. 이때 실습 시간을 고려하여, 학습데이터 중 1000개만 사용해봅니다.

In [None]:
learning_rate = [0.005, 0.03]
dropout_rate = [0.0, 0.2]
trials = 10

In [None]:
# 랜덤한 하이퍼파라미터를 리턴하는 함수
def sampling(parameter_range):
    min_value, max_value = parameter_range
    random_value = np.random.random()
    return random_value * (max_value - min_value) + min_value

In [None]:
lrs = []
drs = []
r2s = []

# 총 10회 반복
# 매 반복마다 seed 를 다르게 설정 -> 매 반복마다 random한 값들로 구성됨
for try_ in range(trials):
    np.random.seed(try_)
    
    # 0.005부터 0.03의 범위에서 랜덤한 값이 도출됨
    lr = sampling(learning_rate)
    
    # 0.0부터 0.2의 범위에서 랜덤한 값이 도출됨
    dr = sampling(dropout_rate)
    print('%d 번째 시도 - 학습률: %.3f, dropout rate: %.3f'%(try_ + 1, lr, dr))
    
    # 모델 정의
    np.random.seed(0)
    random.seed(0)
    tf.random.set_seed(0)
    MLP_model = tf.keras.Sequential([
        Input(shape = stage1['train_X'].shape[1]),
        tf.keras.layers.Dense(64, activation = 'relu'),
        tf.keras.layers.Dropout(rate=dr), 
        tf.keras.layers.Dense(32, activation = 'relu'),
        tf.keras.layers.Dropout(rate=dr), 
        tf.keras.layers.Dense(stage1['train_y'].shape[1])
    ])
    # 모델 컴파일
    MLP_model.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(learning_rate = lr),
    )
    # 모델 학습
    history = MLP_model.fit(stage1['train_X'], stage1['train_y'], epochs = 50, batch_size = 16, verbose = 0)
    # 모델 예측
    pred = MLP_model.predict(stage1['test_X'])
    # 모델 평가
    r2 = sklearn.metrics.r2_score(stage1['test_y'], pred)
    print("    -> R2 score: %f"%r2)
    
    # 빈 리스트에 매 반복마다 학습률, dropout rate, r-score 추가
    # 마지막에 시각화하기 위함
    lrs.append(lr)
    drs.append(dr)
    r2s.append(r2)
    

In [None]:
plt.scatter(lrs, r2s)
plt.xlabel('learning rate')
plt.ylabel('r2 score')
plt.show()

In [None]:
plt.scatter(drs, r2s)
plt.xlabel('dropout rate')
plt.ylabel('r2 score')
plt.show()

In [None]:
np.corrcoef(lrs, r2s)[1,0]

In [None]:
np.corrcoef(drs, r2s)[1,0]

학습률은 모델의 성능과 상관 관계가 큰 반면, dropout rate는 비교적 상관 관계가 약한 것을 확인할 수 있습니다.

**[TODO] Stage 2 데이터에 대해 hyper-parameter 튜닝을 수행해보세요.**

In [None]:
lrs = []
drs = []
r2s = []
for try_ in range(trials):
    np.random.seed(try_)
    lr = sampling(learning_rate)
    dr = sampling(dropout_rate)
    print('%d 번째 시도 - 학습률: %.3f, dropout rate: %.3f'%(try_ + 1, lr, dr))
    
    # 모델 정의
    np.random.seed(0)
    random.seed(0)
    tf.random.set_seed(0)
    MLP_model = tf.keras.Sequential([
        Input(shape = stage2['train_X'].shape[1]),
        tf.keras.layers.Dense(64, activation = 'relu'),
        tf.keras.layers.Dropout(rate=dr), 
        tf.keras.layers.Dense(32, activation = 'relu'),
        tf.keras.layers.Dropout(rate=dr), 
        tf.keras.layers.Dense(stage2['train_y'].shape[1])
    ])
    # 모델 컴파일
    MLP_model.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(learning_rate = lr),
    )
    # 모델 학습
    history = MLP_model.fit(stage2['train_X'], stage2['train_y'], epochs = 50, batch_size = 16, verbose = 0)
    # 모델 예측
    pred = MLP_model.predict(stage2['test_X'])
    # 모델 평가
    r2 = sklearn.metrics.r2_score(stage2['test_y'], pred)
    print("    -> R2 score: %f"%r2)
    lrs.append(lr)
    drs.append(dr)
    r2s.append(r2)

In [None]:
np.corrcoef(lrs, r2s)[1,0]

In [None]:
np.corrcoef(drs, r2s)[1,0]

## 2. K-fold 교차 검증

### 2.1 데이터 합치기

In [None]:
X_stage1 = np.concatenate([stage1['train_X'], stage1['valid_X']])
y_stage1 = np.concatenate([stage1['train_y'], stage1['valid_y']])

In [None]:
# 반복1
# training idxs
# 0 ~ 1/5까지
# 2/5 ~ 끝까지

# valid idxs
# 1/5 ~ 2/5 Rkwl

# 반복2
# training idxs
# 0 ~ 2/5까지
# 3/5 ~ 끝까지

# valid idxs
# 2/5 ~ 3/5까지

In [None]:
# k-fold 데이터셋을 도출하는 함수
# 매개변수로 독립변수, 종속변수, fold 개수를 받음
# k-1개: training, 1개: validation
def get_K_fold_dataset(X, y, K):
    dataset = {}
    len_data = len(X)
    idxs = np.arange(len_data)
    
    # k번 반복수행
    for k in range(K):
        # training: 데이터 길이의 1/5부터 2/5, 2/5부터 
        training_idxs = np.concatenate([idxs[:int(len_data * k/K)], idxs[int(len_data * (k+1)/K):]])
        valid_idxs = idxs[int(len_data * k/K) : int(len_data * (k+1)/K)]
        dataset['%d-fold'%(k+1)] = {
            'train_X': X[training_idxs],
            'valid_X': X[valid_idxs],
            'train_y': y[training_idxs],
            'valid_y': y[valid_idxs]
        }
    return dataset

데이터를 K 개의 학습-테스트 셋으로 분리하는 함수를 정의합니다.

In [None]:
# 5개의 fold
# K_fold는 dictionary 안에 dictionary 형태의 데이터셋
# 총 5개의 key: '1-fold', '2-fold', '3-fold', '4-fold', '5-fold' 가 있음.
# 각각의 key에 대한 value에는 train_X, valid_X, train_y, valid_y라는 또다른 key값이 있음
K_fold = get_K_fold_dataset(X_stage1, y_stage1, 5)

In [None]:
r2s = []
# 매 반복마다 K에는 각각
# train_X, valid_X, train_y, valid_y 이 들어감
for K in K_fold.keys():
    # 첫번째 반복이라 가정 -> 이때 K는 1-fold
    # 따라서 데이터셋은 1-fold 에 해당하는 train_X, valid_X, train_y, valid_y
    dataset = K_fold[K]
    print('%s'%K)
    # 모델 정의
    np.random.seed(0)
    random.seed(0)
    tf.random.set_seed(0)
    MLP_model = tf.keras.Sequential([
        Input(shape = stage1['train_X'].shape[1]),
        tf.keras.layers.Dense(64, activation = 'relu'),
        tf.keras.layers.Dense(32, activation = 'relu'),
        tf.keras.layers.Dense(stage1['train_y'].shape[1])
    ])
    # 모델 컴파일
    MLP_model.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(),
    )
    # 모델 학습
    history = MLP_model.fit(dataset['train_X'], dataset['train_y'], epochs = 50, batch_size = 16, verbose = 0)
    # 모델 예측
    pred = MLP_model.predict(dataset['valid_X'])
    # 모델 평가
    r2 = sklearn.metrics.r2_score(dataset['valid_y'], pred)
    print("    -> R2 score: %f"%r2)
    r2s.append(r2)

In [None]:
# K개 fold에서 나온 성능의 평균
Average_r2 = np.mean(r2s)
print("Average R2 score: %f"%Average_r2)

모델의 최종 성능을 평가할 때에는, K개의 fold에서 나온 성능의 평균을 취합니다.

**[TODO] Stage2 데이터에 대해 K-fold 교차검증을 수행해보세요.**

In [None]:
X_stage2 = np.concatenate([stage2['train_X'], stage2['valid_X']])
y_stage2 = np.concatenate([stage2['train_y'], stage2['valid_y']])

In [None]:
K_fold = get_K_fold_dataset(X_stage2, y_stage2, 5)

In [None]:
r2s = []
for K in K_fold.keys():
    dataset = K_fold[K]
    print('%s'%K)
    # 모델 정의
    np.random.seed(0)
    random.seed(0)
    tf.random.set_seed(0)
    MLP_model = tf.keras.Sequential([
        Input(shape = stage2['train_X'].shape[1]),
        tf.keras.layers.Dense(64, activation = 'relu'),
        tf.keras.layers.Dense(32, activation = 'relu'),
        tf.keras.layers.Dense(stage2['train_y'].shape[1])
    ])
    # 모델 컴파일
    MLP_model.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(),
    )
    # 모델 학습
    history = MLP_model.fit(dataset['train_X'], dataset['train_y'], epochs = 50, batch_size = 16, verbose = 0)
    # 모델 예측
    pred = MLP_model.predict(dataset['valid_X'])
    # 모델 평가
    r2 = sklearn.metrics.r2_score(dataset['valid_y'], pred)
    print("    -> R2 score: %f"%r2)
    r2s.append(r2)

In [None]:
Average_r2 = np.mean(r2s)
print("Average R2 score: %f"%Average_r2)

## 3. Residual network

### 3.1 Residual network 정의

In [None]:
# tf.keras.Model 이라는 부모클래스에서 상속받아서 새로운 클래스 정의
# 상속을 받으면 tf.keras.Model 이 갖고 있는 모든 특성 (메소드, 멤버변수 등) 가져다가 사용할 수 있음
# 붕어빵 틀

# 클래스와 객체
# 클래스가 붕어빵 틀이라면 객체는 붕어빵
# 생성자는 슈크림 붕어빵인지 팥 붕어빵인지 초기에 세팅해주는 것

class ResidualMLP(tf.keras.Model):
    # 생성자
    # self는 객체의 주소값을 받아옴 -> 내가 빵틀에서 빵을 찍어낼건데, 너는 어떤 빵이니?
    # out_dim, use_residual을 생성자로 받아옴
    
    def __init__(self, out_dim, use_residual):
        # 부모클래스의 생성자를 그대로 가져옴 -> Model이 가지고 있는 모든 함수 이용 가능
        super(ResidualMLP, self).__init__()
        self.use_residual = use_residual
        # 총 8개의 fully connected layer
        self.fc1 = tf.keras.layers.Dense(16, activation = 'relu')
        self.fc2 = tf.keras.layers.Dense(16, activation = 'relu')
        self.fc3 = tf.keras.layers.Dense(16, activation = 'relu')
        self.fc4 = tf.keras.layers.Dense(16, activation = 'relu')
        self.fc5 = tf.keras.layers.Dense(16, activation = 'relu')
        self.fc6 = tf.keras.layers.Dense(16, activation = 'relu')
        self.fc7 = tf.keras.layers.Dense(16, activation = 'relu')
        self.fc8 = tf.keras.layers.Dense(16, activation = 'relu')
        self.fc9 = tf.keras.layers.Dense(out_dim)

    def call(self, x):
        # 객체의 use_residual이 True라면 실행
        if self.use_residual:
            h = self.fc1(x)
            h = self.fc2(h) + h
            h = self.fc3(h) + h
            h = self.fc4(h) + h
            h = self.fc5(h) + h
            h = self.fc6(h) + h
            h = self.fc7(h) + h
            h = self.fc8(h) + h
            h = self.fc9(h)
        else:
            h = self.fc1(x)
            h = self.fc2(h)
            h = self.fc3(h)
            h = self.fc4(h)
            h = self.fc5(h)
            h = self.fc6(h)
            h = self.fc7(h)
            h = self.fc8(h)
            h = self.fc9(h)
        return h

Resiudal network 같이 복잡한 네트워크를 정의하기 위해서는 Sequential 보다 위 처럼 직접 class 를 정의하는 것이 편리합니다.

In [None]:
np.random.seed(0)
random.seed(0)
tf.random.set_seed(0)

# 객체 생성 (use_residual 은 True)
MLP_model_residual = ResidualMLP(stage1['train_y'].shape[1], use_residual = True)

### 3.2 Residual network 컴파일

In [None]:
MLP_model_residual.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(),
)

### 3.3 Residual network 학습

In [None]:
history = MLP_model_residual.fit(stage1['train_X'], stage1['train_y'], epochs = 500, batch_size = 16, verbose = 2)

### 3.4 Residual network 예측 및 평가

In [None]:
pred = MLP_model_residual.predict(stage1['test_X'])

In [None]:
r2 = sklearn.metrics.r2_score(stage1['test_y'], pred)
print("R2 score: %f"%r2)

### 3.5 일반 MLP 와의 성능 비교

In [None]:
# 모델 정의
np.random.seed(0)
random.seed(0)
tf.random.set_seed(0)

# 객체 생성 (use_residual 은 False)
MLP_model = ResidualMLP(stage1['train_y'].shape[1], use_residual = False)

use_residual 을 False로 설정하여 일반 MLP 네트워크를 정의합니다.

In [None]:
# 모델 컴파일
MLP_model.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(),
)

In [None]:
# 모델 학습
history = MLP_model.fit(stage1['train_X'], stage1['train_y'], epochs = 50, batch_size = 16, verbose = 2)

In [None]:
# 모델 예측
pred = MLP_model.predict(stage1['test_X'])

In [None]:
# 모델 평가
r2 = sklearn.metrics.r2_score(stage1['test_y'], pred)
print("R2 score: %f"%r2)

Residual network 가 더 성능이 좋은 것을 확인할 수 있습니다.

**[TODO] Stage2 데이터에 대해 Residual network 를 학습해보세요.**

In [None]:
# resiudal network 모델 정의
np.random.seed(0)
random.seed(0)
tf.random.set_seed(0)
MLP_model_residual = ResidualMLP(stage2['train_y'].shape[1], use_residual = True)

# resiudal network 사용하지 않는 모델 정의
np.random.seed(0)
random.seed(0)
tf.random.set_seed(0)
MLP_model = ResidualMLP(stage2['train_y'].shape[1], use_residual = False)

In [None]:
# 모델 컴파일
MLP_model.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(),
)

MLP_model_residual.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(),
)

In [None]:
# residual 모델 학습
MLP_model_residual.fit(stage2['train_X'], stage2['train_y'], epochs = 50, batch_size = 16, verbose = 2)

In [None]:
# 일반 MLP 모델 학습
MLP_model.fit(stage2['train_X'], stage2['train_y'], epochs = 50, batch_size = 16, verbose = 2)

In [None]:
# Residual 모델 예측
pred = MLP_model_residual.predict(stage2['test_X'])

In [None]:
# Residual 모델 평가
r2 = sklearn.metrics.r2_score(stage2['test_y'], pred)
print("R2 score: %f"%r2)

In [None]:
# 일반 MLP 모델 예측
pred = MLP_model.predict(stage2['test_X'])

In [None]:
# 일반 MLP 모델 평가
r2 = sklearn.metrics.r2_score(stage2['test_y'], pred)
print("R2 score: %f"%r2)

## 4. Positional encoding

In [None]:
# L은 위치 인코딩에서 사용되는 주파수(frequency)의 개수를 의미.
# frequency는 주파수를 의미
# 위치 인코딩은 사인(sin)과 코사인(cos) 함수를 사용하여 위치 정보를 인코딩
# 이 때 주파수가 높을수록 더 빠른 주기로 변화
# 주파수는 주기를 반복하는 빈도를 나타내는데, 예를 들어 주파수가 높을수록 더 자주 반복되는 주기가 발생
# 따라서 L 값이 클수록 주파수가 높아지며, 더 많은 주기가 포함된 위치 인코딩 값을 생성
# 이로 인해 더 많은 세부적인 위치 정보가 인코딩되어 모델이 입력 시퀀스의 순서를 더 세밀하게 학습

# X: 입력 시퀀스의 위치 정보를 나타내는 배열. 크기는 (N, d)이며, N은 시퀀스의 길이이고, d는 입력 벡터의 차원
# L: 위치 인코딩의 차원을 결정하는 파라미터로, 정수
def positional_encoding(X, L):
    Xs = []
    # l이 증가할수록 사인과 코사인 함수의 주기가 더 빠르게 변하므로, 더 높은 주파수 정보를 포함합니다.
    for l in range(L):
        # sin, cos의 인자값은 radian이기 때문에 pi를 곱해줌
        Xs.append(np.sin(2 ** l * np.pi * X))
        Xs.append(np.cos(2 ** l * np.pi * X))        
    return np.concatenate(Xs, axis = -1)

입력 데이터를 L개의 frequency를 가진 데이터로 변환하는 함수를 정의합니다.

In [None]:
train_X = positional_encoding(stage1['train_X'], 5)
test_X = positional_encoding(stage1['test_X'], 5)

In [None]:
train_X.shape

In [None]:
np.random.seed(0)
random.seed(0)
tf.random.set_seed(0)
MLP_model_pe = ResidualMLP(stage1['train_y'].shape[1], use_residual = True)

In [None]:
MLP_model_pe.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(),
)

In [None]:
history = MLP_model_pe.fit(train_X, stage1['train_y'], epochs = 50, batch_size = 16, verbose = 2)

In [None]:
pred = MLP_model_pe.predict(test_X)

In [None]:
r2 = sklearn.metrics.r2_score(stage1['test_y'], pred)
print("R2 score: %f"%r2)

금속분말 데이터셋에 대해서 positional encoding은 좋은 효과가 없었습니다. 모든 머신러닝 방법론이 항상 좋은 성능을 보장하진 않습니다. 

**[TODO] Stage2 데이터에 대해 positional encoding 적용해보기**

In [None]:
train_X = positional_encoding(stage2['train_X'], 5)
test_X = positional_encoding(stage2['test_X'], 5)

In [None]:
np.random.seed(0)
random.seed(0)
tf.random.set_seed(0)
MLP_model_pe = ResidualMLP(stage2['train_y'].shape[1], use_residual = True)

In [None]:
MLP_model_pe.compile(loss = 'mse',
              optimizer = tf.keras.optimizers.SGD(),
)

In [None]:
history = MLP_model_pe.fit(train_X, stage2['train_y'], epochs = 50, batch_size = 16, verbose = 2)

In [None]:
pred = MLP_model_pe.predict(test_X)

In [None]:
r2 = sklearn.metrics.r2_score(stage2['test_y'], pred)
print("R2 score: %f"%r2)

## 5. [TODO] 최적의 인공지능 모델 구현

여태까지 배운 내용들을 종합하여 금속분말 데이터셋에 대해 최고 성능을 발휘하는 최적의 인공지능 모델을 만들어보세요.

<span style="color:rgb(120, 120, 120)">본 학습 자료를 포함한 사이트 내 모든 자료의 저작권은 엘리스에 있으며 외부로의 무단 복제, 배포 및 전송을 불허합니다.

Copyright @ elice all rights reserved</span>