In [None]:
from IPython.display import display, HTML, Image

In [None]:
!git clone https://github.com/KU-DIC/LG_time_series_day05.git #코랩 사용

# [머신러닝 기반 시계열 분석 2 실습]
# ANN
## [Deep Neural Networks - Classification(다중 분류)]

##### jupyter notebook 단축키

- ctrl+enter: 셀 실행   
- shift+enter: 셀 실행 및 다음 셀 이동   
- alt+enter: 셀 실행, 다음 셀 이동, 새로운 셀 생성
- a: 상단에 새로운 셀 만들기
- b: 하단에 새로운 셀 만들기
- dd: 셀 삭제(x: 셀 삭제)
- 함수 ( ) 안에서 shift+tab: arguments description. shift+tab+tab은 길게 볼 수 있도록

## 1. 모듈 불러오기

In [None]:
''' 기본 모듈 및 시각화 모듈 '''
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)
import seaborn as sns
import matplotlib.pyplot as plt

''' 데이터 전처리 모듈 '''
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

'''Neural Network을 위한 딥러닝 모듈'''
import torch
import copy
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

''' 결과 평가용 모듈 '''
from sklearn.metrics import accuracy_score, confusion_matrix
    
''' 기타 optional'''
import warnings, itertools
warnings.filterwarnings(action='ignore')

## 2. 분석데이터 : MNIST Handwritten Digits
   
<a href='http://yann.lecun.com/exdb/mnist/'> The MNIST Database Hompage </a>

In [None]:
Image('/content/LG_time_series_day05/image/intro10.png')

<center> <a href='http://colah.github.io/posts/2014-10-Visualizing-MNIST/'> http://colah.github.io/posts/2014-10-Visualizing-MNIST/ </a></center><br>

- 이미지 데이터는 픽셀로 이루어져 있으며, 각 픽셀의 밝기(intensity)를 통해 이미지를 나타낼 수 있다.
- MNIST 데이터는 이런 픽셀 값들을 설명변수(X)로 하여 어떤 숫자(Y)에 해당하는 이미지인지 분류하는 문제
- MNIST 데이터의 이미지는 가로 28 픽셀, 세로 28 픽셀로 이루어져 있으므로 28 x 28 = 784개의 설명변수로 10개의 클래스를 예측하는 다범주 분류문제

### 데이터 불러오기

In [None]:
data = pd.read_csv('/content/LG_time_series_day05/data/MNIST_Example.csv')
# data = pd.read_csv('./data/MNIST_Example.csv') #로컬

## 2.1 데이터 전처리 및 탐색적 데이터 분석

### 데이터 확인

In [None]:
print('Data shape: {}'.format(data.shape))
data.head()

### 클래스 비율 확인

In [None]:
data.groupby('Y')['Y'].count()

### 설명변수(X)와 반응변수 (Y) 정의

In [None]:
x = data.drop(labels='Y', axis=1)
y = data['Y']

In [None]:
display(x.head())
display(y.head())

### 학습데이터 (Training Data), 검증 데이터 (Validation Data) 및 테스트 데이터 (Testing Data) 나누기
- 학습데이터와 검증데이터, 테스트데이터의 클래스 비율이 달라지지 않도록 stratify 옵션 사용

In [None]:
train_x, test_x, train_y, test_y = train_test_split(x, y, stratify=y, test_size=0.2)

In [None]:
train_x, valid_x, train_y, valid_y = train_test_split(train_x, train_y, stratify=train_y, test_size=0.2)

In [None]:
print(f"학습 데이터셋 크기 : {train_x.shape}, 검증 데이터셋 크기 : {valid_x.shape}, 테스트 데이터셋 크기 : {test_x.shape}")

### Matplotlib 을 이용한 클래스 비율 plotting

In [None]:
plt.figure(figsize=(18, 6))

plt.subplot(131)
plt.hist(train_y, bins=np.arange(0, 10, 0.5))
plt.ylim(0, 70)
plt.xticks(np.arange(0 , 11, 1))
plt.title('Training Data')

plt.subplot(132)
plt.hist(valid_y, bins=np.arange(0, 10, 0.5))
plt.ylim(0, 70)
plt.xticks(np.arange(0 , 11, 1))
plt.title('Validation Data')

plt.subplot(133)
plt.hist(test_y, bins=np.arange(0, 10, 0.5))
plt.ylim(0, 70)
plt.xticks(np.arange(0 , 11, 1))
plt.title('Testing Data')
plt.show()

## 2-2. 모델링: Deep Neural Networks (DNN) Classifier

### Deep Neural Networks (DNN) Classifier에 적합한 입력값 형태로 변환
- Tensor : 데이터를 담는 다차원 행렬  
- Float Tensor : 다차원 행렬(Tensor) 내의 값이 실수값인 경우  
- Long Tensor : 다차원 행렬(Tensor) 내의 값이 정수값인 경우
- Tensor Dataset : 다차원 행렬(Tensor) 여러 개를 결합하여 하나의 데이터 셋으로 만드는 과정  
- Data Loader : 결합된 데이터 셋(Tensor Dataset)을 사용자가 지정하는 특정 크기인 배치 사이즈 (batch size)에 맞추어 출력하도록 설정

In [None]:
train_x_torch = torch.FloatTensor(train_x.values) # torch.FloatTensor(numpy)
train_y_torch = torch.LongTensor(train_y.values) # torch.LongTensor(numpy)
trainDataset = torch.utils.data.TensorDataset(train_x_torch, train_y_torch)
trainLoader = torch.utils.data.DataLoader(dataset = trainDataset,
                                         batch_size = 100,
                                         shuffle = True)

valid_x_torch = torch.FloatTensor(valid_x.values) # torch.FloatTensor(numpy)
valid_y_torch = torch.LongTensor(valid_y.values) # torch.LongTensor(numpy)
validDataset = torch.utils.data.TensorDataset(valid_x_torch, valid_y_torch)
validLoader = torch.utils.data.DataLoader(dataset = validDataset,
                                        batch_size = 100,
                                        shuffle = False)

test_x_torch = torch.FloatTensor(test_x.values) # torch.FloatTensor(numpy)
test_y_torch = torch.LongTensor(test_y.values) # torch.LongTensor(numpy)
testDataset = torch.utils.data.TensorDataset(test_x_torch, test_y_torch)
testLoader = torch.utils.data.DataLoader(dataset = testDataset,
                                        batch_size = 100,
                                        shuffle = False)

### DNN Classifier 구조

In [None]:
Image('/content/LG_time_series_day05/image/intro11.png')

In [None]:
class DNNClassifier(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim1, hidden_dim2, hidden_dim3, output_dim): 
        # input_dim = 784, output_dim = 10 (클래스 개수)
        super().__init__()
        
        ''' 모델 구조 만들기'''
        
        self.linear1 = torch.nn.Linear(input_dim, hidden_dim1) # input_dim(784) -> hidden_dim1(500)
        self.linear2 = torch.nn.Linear(hidden_dim1, hidden_dim2) # hidden_dim1(500) -> hidden_dim2(300)
        self.linear3 = torch.nn.Linear(hidden_dim2, hidden_dim3) # hidden_dim2(300) -> hidden_dim3(100)
        self.linear4 = torch.nn.Linear(hidden_dim3, output_dim) # hidden_dim3(100) -> output_dim(10)
        
        self.relu = torch.nn.ReLU() # Relu activation function
        self.dropout = torch.nn.Dropout(p=0.5)
        
    def forward(self, x):
        
        ''' 짜여진 모델에 설명 변수 데이터 x를 입력할 때 진행할 순서 설정'''

        x = self.linear1(x) 
        x = self.relu(x) 
        x = self.linear2(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear3(x)
        x = self.relu(x)
        output = self.linear4(x) 
        
        return output

### 모델 및 비용함수, Solver 설정

In [None]:
train_x.shape[1]

In [None]:
clf_DNN = DNNClassifier(train_x.shape[1], 500, 300, 100, 10) # 10=class 개수

In [None]:
# 비용함수 정의
criterion = torch.nn.CrossEntropyLoss()
# 경사하강법의 종류 정의 (adam)
solver = torch.optim.Adam(clf_DNN.parameters(), lr = 0.001)

### DNN Classifier 학습

In [None]:
num_epochs = 100

print("Start Training !")
print('-'*50)

train_loss_total = []
valid_loss_total = []
best_loss = np.inf

for epoch in range(num_epochs):
    
    train_loss = 0
    valid_loss = 0
    
    ''' Training '''
    clf_DNN.train()
    for x_data, y_data in trainLoader:
        
        # 정확한 학습을 위하여 모든 기울기 값을 0으로 설정
        solver.zero_grad()
        
        y_pred = clf_DNN(x_data)
        
        # 비용함수를 활용하여 오차 계산
        loss = criterion(y_pred, y_data)
        
        # 계산된 오차를 기반으로, 오차를 줄일 수 있는 방향으로 w값 업데이트
        loss.backward()
        solver.step() # forward evaluation, backward propagation, update를 모두 포함하는 step

        train_loss += loss.item()
    
    ''' Validation '''
    clf_DNN.eval()
    for eval_x_data, eval_y_data in validLoader:
        eval_y_pred = clf_DNN(eval_x_data)
        valid_loss += criterion(eval_y_pred, eval_y_data).item() # 딕셔너리에 있는 키와 값들의 쌍을 얻어 저장
        
    print('[%d epoch] Train loss : %.3f, Valid loss : %.3f' % (epoch+1, train_loss/len(trainLoader), valid_loss/len(validLoader)))
    
    if valid_loss/len(validLoader) < best_loss:
        # 로스값 업데이트
        best_loss = valid_loss/len(validLoader)
        # 최적의 epoch 수와 모델 저장하기
        best_epoch = epoch
        best_model = clf_DNN.state_dict()
    
    train_loss_total.append(train_loss/len(trainLoader))
    valid_loss_total.append(valid_loss/len(validLoader))
    
print('-'*50)
print("Finished Training ! Best Epoch is epoch %d." % (best_epoch+1))

### 학습 상태 확인 (learning curve)

In [None]:
plt.figure(figsize=(20,10))

# 학습 및 검증 로스 변동 관찰하기
plt.plot(train_loss_total,label='Train Loss')
plt.plot(valid_loss_total, label='Validation Loss')
# 최적의 모델이 저장된 곳 표시
plt.axvline(x = best_epoch, color='red', label='Best Epoch')
plt.legend(fontsize=20)
plt.title("Learning Curve of trained DNN Classifier", fontsize=18)
plt.show()

## 2-3. Deep Neural Networks 모델 성능 평가

In [None]:
# 최적의 모델 불러오기
best_clf_DNN = DNNClassifier(train_x.shape[1], 500, 300, 100, 10)
best_clf_DNN.load_state_dict(best_model)

In [None]:
# model을 evaluation 모드로 변경
best_clf_DNN.eval()

### 학습된 DNN Classifier 결과 확인 및 성능 평가: Training Data

In [None]:
# clf_mlp(data) == data -> logit -> probability=softmax(logit)
y_train_prob = best_clf_DNN(train_x_torch).softmax(dim=1)

# 가장 큰 확률값에 해당하는 범주를 예측 범주로 저장
y_train_pred = y_train_prob.max(1)[1].numpy()

In [None]:
y_train_prob[0].detach().numpy().tolist()

In [None]:
y_train_pred[0]

In [None]:
train_y.head(1)

In [None]:
train_accuracy = accuracy_score(y_pred=y_train_pred,y_true=train_y)

print(f"훈련 데이터셋 정확도: {train_accuracy:.3f}")

In [None]:
cm_train = confusion_matrix(y_true=train_y, y_pred=y_train_pred)
plt.figure(figsize=(8, 8))
sns.heatmap(data=cm_train, annot=True, fmt='d', annot_kws={'size': 18}, cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

### 학습된 DNN Classifier 결과 확인 및 성능 평가: Validation Data

In [None]:
y_valid_prob = best_clf_DNN(valid_x_torch).softmax(dim=1)
y_valid_pred = y_valid_prob.max(1)[1].numpy()
valid_accuracy = accuracy_score(y_pred=y_valid_pred,y_true=valid_y)

print(f"검증용 데이터셋 정확도: {valid_accuracy:.3f}")

In [None]:
cm_valid = confusion_matrix(y_true=valid_y, y_pred=y_valid_pred)
plt.figure(figsize=(8, 8))
sns.heatmap(data=cm_valid, annot=True, fmt='d', annot_kws={'size': 18}, cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

### 학습된 DNN Classifier 결과 확인 및 성능 평가: Testing Data

In [None]:
y_test_prob = best_clf_DNN(test_x_torch).softmax(dim=1)
y_test_pred = y_test_prob.max(1)[1].numpy()
test_accuracy = accuracy_score(y_pred=y_test_pred,y_true=test_y)

print(f"테스트용 데이터셋 정확도: {test_accuracy:.3f}")

In [None]:
cm_test = confusion_matrix(y_true=test_y, y_pred=y_test_pred)
plt.figure(figsize=(8, 8))
sns.heatmap(data=cm_test, annot=True, fmt='d', annot_kws={'size': 18}, cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

## 2-4. 실제 데이터 이미지로 확인

In [None]:
for_visualization_test_x = test_x_torch.numpy()[:100]
for_visualization_test_y = test_y_torch.numpy()[:100]

f ,axs = plt.subplots(5,5,figsize=(20,20))
plt.subplots_adjust(hspace=1) # subplot 간격 조정
for i in range(5):
    for j in range(5):
        
        x_data = for_visualization_test_x[10*i+j]
        y_data = for_visualization_test_y[10*i+j]

        real_class = str(y_data)
        predicted_class = y_test_pred[10*i+j]
        predicted_prob = np.max(y_test_prob[10*i+j].detach().numpy()).round(2)
            
        # x, y 축의 지점 표시를 안함
        axs[i,j].set_xticks([])
        axs[i,j].set_yticks([])

        # subplot의 제목을 i번째 결과에 해당하는 숫자로 설정
        axs[i,j].set_title(f"True class:{real_class}\nPredicted class:{predicted_class}\nProbability:{predicted_prob:.2f}", fontsize = 16)

        # 입력으로 사용한 i번째 테스트 이미지를 28x28로 재배열하고
        # 이 2차원 배열을 그레이스케일 이미지로 출력
        axs[i,j].imshow(x_data.reshape((28, 28)),cmap=plt.cm.gray_r)

plt.tight_layout()
plt.show()