<a href="https://colab.research.google.com/github/Dohy-Lee/NLP_By_Pytorch/blob/main/ch4_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## CNN(Convolution Neural Network)  
공간상의 부분 구조를 감지하는 데 적합함.  
소수의 가중치를 사용하여 입력 데이터 텐서를 스캔하는 방식으로 이를 수행함. 스캔하면서 부분 구조의 유무를 표현하는 출력 텐서를 만듦  
 * 1차원 합성곱 (Conv1d) : 각 타임 스텝에 특성 벡터가 있는 시계열에 잘 맞음.  
 이 경우 시퀀스 차원을 따라 패턴을 학습할 수 있음.  
 NLP에서 합성곱 연산은 대부분 1차원임
 * 2차원 합성곱 (Conv2d) : 데이터의 두 방향을 따라 시공간 패턴을 감지. 예) 이미지의 높이와 너비 차원  
 이미지 처리 분야에서 많이 쓰임  
 * 3차원 합성곱 (Conv3d) : 데이터의 세 방향을 따라 패턴을 감지.   
 예) 비디오 데이터(두 차원은 이미지 프레임을 나타내고, 하나는 시간 차원은 프레임의 시퀀스)
 * 채널(Channel) : 입력의 각 포인트에 있는 특성 차원을 의미  
 예) 이미지의 경우 픽셀마다 RGB에 해당하는 차원이 3개가 있음  
텍스트 문서의 픽셀이 단어라면 채널 개수는 어휘 크기(이 집합이 어휘 사전을 이룸)
 * 커널 크기 : 커널 행렬의 너비. 커널 크기를 늘리면 출력의 크기가 줄어듦  
NLP에서는 여러 단어를 보면서 언어의 패턴을 감지하는 <u>'n-그램'</u>과 비슷하다 볼 수 있음.  
커널 크기가 작을수록, 작고 자주 등장하는 패턴을 감지. 커널 크기가 커지면 큰 패턴을 감지  
→ 작은 커널은 상세한 특성을 출력, 큰 커널은 성긴 특성을 만듦
 * 스트라이드 : 합성공 간의 스탭 크기 제어. 높이면  출력 텐서의 크기를 의도적으로 줄여서 정보를 요약할 수 있음.  
 스트라이드 = 커널의크기: 커널 연산이 겹치지 않음  
 스트라이드 = 1 : 커널이 가장 많이 겹침. 
 * 패딩 : 입력텐서의 (1D,2D,3D라면) 길이, (2D, 3D라면) 높이, (3D라면) 깊이차원의 앞뒤에 0을 추가하여 인공적으로 늘려줌
 → CNN이 합성곱을 더 많이 수행할 수 있으며, 출력 크기를 조절할 수 있음.
 * 다일레이션(팽창 계수) : 합성곱 커널이 입력 행렬에 적용되는 방식을 제어  
 다일레이션을 디폴트값 1에서 2로 늘리면 입력 행렬에 적용될 때 커널의 원소 사이에 공간이 생김  
즉, dilation이 2라면 커널 사이의 간격이 2가 되는 것이고, 커널의 크기가 (3,3)이라면 (5,5) 커널과 동일한 넓이  
파라미터 개수를 늘리지 않고 넓은 입력 공간을 요약하는 데 유용  
다일레이션 합성곱은 합성곱 층을 쌓을 때 매우 유용하다고 입증되었음.  
연속된 다일레이션 합성곱은 수용장의 크기를 기하급수적으로 늘려줌.  
*&nbsp;수용장 : 신경망이 예측을 만들기 전에 바라보는 입력 공간의 크기

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

seed = 1337

torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

In [3]:
# 인공 데이터와 Conv1d
batch_size = 2
one_hot_size = 10
sequence_width = 7
data = torch.randn(batch_size, one_hot_size, sequence_width) # 크기는 3차원. 벡터로 변경된 텍스트 데이터에서 만든 미니배치의 크기임.
conv1 = nn.Conv1d(in_channels=one_hot_size, out_channels=16, kernel_size=3)
intermediate1 = conv1(data)
print(data.size())
print(intermediate1.size())

torch.Size([2, 10, 7])
torch.Size([2, 16, 5])


In [4]:
print(data,'\n')
print(intermediate1)

tensor([[[ 0.1808, -0.0700, -0.3596, -0.9152,  0.6258,  0.0255,  0.9545],
         [ 0.0643,  0.3612,  1.1679, -1.3499, -0.5102,  0.2360, -0.2398],
         [-0.9211,  1.5433,  1.3488, -0.1396,  0.2858,  0.9651, -2.0371],
         [ 0.4931,  1.4870,  0.5910,  0.1260, -1.5627, -1.1601, -0.3348],
         [ 0.4478, -0.8016,  1.5236,  2.5086, -0.6631, -0.2513,  1.0101],
         [ 0.1215,  0.1584,  1.1340, -1.1539, -0.2984, -0.5075, -0.9239],
         [ 0.5467, -1.4948, -1.2057,  0.5718, -0.5974, -0.6937,  1.6455],
         [-0.8030,  1.3514, -0.2759, -1.5108,  2.1048,  2.7630, -1.7465],
         [ 1.4516, -1.5103,  0.8212, -0.2115,  0.7789,  1.5333,  1.6097],
         [-0.4032, -0.8345,  0.5978, -0.0514, -0.0646, -0.4970,  0.4658]],

        [[-0.2573, -1.0673,  2.0089, -0.5370,  0.2228,  0.6971, -1.4267],
         [ 0.9059,  0.1446,  0.2280,  2.4900, -1.2237,  1.0107,  0.5560],
         [-1.5935, -1.2706,  0.6903, -0.1961,  0.3449, -0.3419,  0.4759],
         [-0.7663, -0.4190, -0.4370,

## 출력 텐서 크기를 줄이는 방법 3가지
 * 1) 합성곱 층을 더 만들어 차례로 적용 

In [5]:
# 데이터에 반복 적용한 합성곱
conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3)
conv3 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3)

intermediate2 = conv2(intermediate1)
intermediate3 = conv3(intermediate2)

print(intermediate2.size())
print(intermediate3.size())

torch.Size([2, 32, 3])
torch.Size([2, 64, 1])


In [6]:
y_output = intermediate3.squeeze() #squeeze : size=1인 차원을 모두 삭제한 결과 반환
print(y_output.size())

torch.Size([2, 64])


 * 2) 남은 값을 특성 벡터로 펼치는 방법
  * 모든 벡터를 파이토치의 view 메서드를 사용해 하나의 벡터로 펼침
  * 모든 정보를 유지하지만, 필요 이상으로 (혹은 적절한 계산량 이상으로) 큰 특성 벡터를 만들수도 있음
 * 3) 특성 맵 차원을 따라 평균을 계산하는 방법
  * 수학 연산을 사용해 정보를 벡터로 요약. 수학 연산으론 산술 평균, 합산, 최댓값 고르는 등이 있지만, 산술 평균을 많이 사용함.
  * 특성 맵 차원의 크기에 상관은 없지만, 일부 정보를 잃을 수 있음

In [7]:
# 특성 벡터를 줄이는 두 가지 추가 방법

# 특성 벡터를 줄이는 방법 2
print(intermediate1.view(batch_size, -1).size())

# 특성 벡터를 줄이는 방법 3
print(torch.mean(intermediate1, dim=2).size())

torch.Size([2, 80])
torch.Size([2, 16])


# CNN으로 성씨 분류하기

In [8]:
from argparse import Namespace
from collections import Counter
import json
import os
import string

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import tqdm

In [9]:
# 데이터 벡터 변환
class Vocabulary(object): # 매핑을 위해 텍스트를 처리하고 어휘 사전을 만듦

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"): # token_to_idx (dict): 기존 토큰-인덱스 매핑 딕셔너리
                                                                            # add_unk (bool): UNK 토큰을 추가할지 지정하는 플래그
                                                                            # unk_token (str): Vocabulary에 추가할 UNK 토큰

        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx: token 
                              for token, idx in self._token_to_idx.items()}
        
        self._add_unk = add_unk
        self._unk_token = unk_token
        
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token) 
        
        
    def to_serializable(self): # 직렬화할 수 있는 딕셔너리 반환
        return {'token_to_idx': self._token_to_idx, 
                'add_unk': self._add_unk, 
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents): #직렬화된 딕셔너리에서 Vocabulary 객체를 만듦
        return cls(**contents)

    def add_token(self, token): # 토큰을 기반으로 매핑 딕셔너리 업데이트
                                # token (str): Vocabulary에 추가할 토큰
                                # 반환값 : index (int): 토큰에 상응하는 정수
        try:
            index = self._token_to_idx[token]
        except KeyError:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index
    
    def add_many(self, tokens): # 토큰 리스트를 Vocabulary에 추가
                                # tokens (list): 문자열 토큰 리스트
                                # 반환값 : indices (list): 토큰 리스트에 상응되는 인덱스 리스트
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token): # 토큰에 대응하는 인덱스를 추출
                                   # token (str): 찾을 토큰 
                                   # 반환값: index (int): 토큰에 해당하는 인덱스
                                   # UNK 토큰을 사용하려면 (Vocabulary에 추가하기 위해) `unk_index`가 0보다 커야함
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index): # 인덱스에 해당하는 토큰을 반환
                                   # index (int): 찾을 인덱스
                                   # 반환값 : token (str): 인텍스에 해당하는 토큰

        if index not in self._idx_to_token: # 인덱스가 Vocabulary에 없을 때 발생
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

In [10]:
class SurnameVectorizer(object): # 어휘 사전을 생성하고 관리

    def __init__(self, surname_vocab, nationality_vocab, max_surname_length): # surname_vocab (Vocabulary): 문자를 정수에 매핑하는 Vocabulary 객체
                                                                              # nationality_vocab (Vocabulary): 국적을 정수에 매핑하는 Vocabulary 객체
                                                                              # max_surname_length (int): 가장 긴 성씨 길이
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab
        self._max_surname_length = max_surname_length

    def vectorize(self, surname): # 성씨에 대한 원-핫 벡터를 만듦
                                  # surname (str): 성씨
                                  # 반환값 : one_hot (np.ndarray): 원-핫 벡터의 행렬
        one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
        one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
                               
        for position_index, character in enumerate(surname):
            character_index = self.surname_vocab.lookup_token(character)
            one_hot_matrix[character_index][position_index] = 1
        
        return one_hot_matrix

    @classmethod
    def from_dataframe(cls, surname_df): # 데이터셋 데이터프레임에서 Vectorizer 객체를 만듦
                                         # surname_df (pandas.DataFrame): 성씨 데이터셋
                                         # 반환값 : SurnameVectorizer 객체

        surname_vocab = Vocabulary(unk_token="@")
        nationality_vocab = Vocabulary(add_unk=False)
        max_surname_length = 0

        for index, row in surname_df.iterrows():
            max_surname_length = max(max_surname_length, len(row.surname))
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab, max_surname_length)

    @classmethod
    def from_serializable(cls, contents):
        surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
        nationality_vocab =  Vocabulary.from_serializable(contents['nationality_vocab'])
        return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab, 
                   max_surname_length=contents['max_surname_length'])

    def to_serializable(self):
        return {'surname_vocab': self.surname_vocab.to_serializable(),
                'nationality_vocab': self.nationality_vocab.to_serializable(), 
                'max_surname_length': self._max_surname_length}

In [11]:
class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer): # surname_df (pandas.DataFrame): 데이터셋
                                                # vectorizer (SurnameVectorizer): SurnameVectorizer 객체
        self.surname_df = surname_df
        self._vectorizer = vectorizer
        self.train_df = self.surname_df[self.surname_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train')
        
        # 클래스 가중치
        class_counts = surname_df.nationality.value_counts().to_dict()
        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])
        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)


    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv): # 데이터셋을 로드하고 새로운 SurnameVectorizer 객체를 만듦
                                                            # review_csv (str): 데이터셋의 위치
                                                            # 반환값 : SurnameDataset의 인스턴스
        surname_df = pd.read_csv(surname_csv)
        train_surname_df = surname_df[surname_df.split=='train']
        return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath): # 데이터셋을 로드하고 새로운 SurnameVectorizer 객체를 만듦. 캐시된 SurnameVectorizer 객체를 재사용할 때 사용
                                                                                 # surname_csv (str): 데이터셋의 위치
                                                                                 # vectorizer_filepath (str): SurnameVectorizer 객체의 저장 위치
                                                                                 # 반환값 : SurnameDataset의 인스턴스
        surname_df = pd.read_csv(surname_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(surname_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath): # 파일에서 SurnameVectorizer 객체를 로드하는 정적 메서드
                                                   # vectorizer_filepath (str): 직렬화된 SurnameVectorizer 객체의 위치
                                                   # 반환값 : SurnameVectorizer의 인스턴스
        with open(vectorizer_filepath) as fp:
            return SurnameVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath): # SurnameVectorizer 객체를 json 형태로 디스크에 저장
                                                    # vectorizer_filepath (str): SurnameVectorizer 객체의 저장 위치
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self): # 벡터 변환 객체를 반환
        return self._vectorizer

    def set_split(self, split="train"): # 데이터프레임에 있는 열을 사용해 분할 세트를 선택
                                        # split (str): "train", "val", "test" 중 하나
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index): # index (int): 데이터 포인트의 인덱스
                                  # 반환값 : 데이터 포인트의 특성(x_surname)과 레이블(y_nationality)로 이루어진 딕셔너리
        row = self._target_df.iloc[index]

        surname_matrix = \
            self._vectorizer.vectorize(row.surname)

        nationality_index = \
            self._vectorizer.nationality_vocab.lookup_token(row.nationality)

        return {'x_surname': surname_matrix,
                'y_nationality': nationality_index}

    def get_num_batches(self, batch_size): # 배치 크기가 주어지면 데이터셋으로 만들 수 있는 배치 개수를 반환
                                           # batch_size (int)
                                           # 반환값 : 배치 개수
        return len(self) // batch_size

    
def generate_batches(dataset, batch_size, shuffle=True, # 파이토치 DataLoader를 감싸고 있는 제너레이터 함수, 각 텐서를 지정된 장치로 이동
                     drop_last=True, device="cpu"):
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

In [12]:
class SurnameClassifier(nn.Module):
    def __init__(self, initial_num_channels, num_classes, num_channels): # initial_num_channels (int): 입력 특성 벡터의 크기
                                                                         # num_classes (int): 출력 예측 벡터의 크기
                                                                         # num_channels (int): 신경망 전체에 사용될 채널 크기
        super(SurnameClassifier, self).__init__()
        
        self.convnet = nn.Sequential( #sequential : 연속적인 선형연산을 캡슐화 해주는 래퍼 클래스. 여기선 연속된 Conv1d층을 캡슐화
            nn.Conv1d(in_channels=initial_num_channels, 
                      out_channels=num_channels, kernel_size=3),
            nn.ELU(), # Relu와 비슷한 비선형 함수지만 0 이하를 지수적으로 감소. 합성곱 층 사이에 쓰기 좋은 비선형 함수임.
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels, 
                      kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels, 
                      kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels, 
                      kernel_size=3),
            nn.ELU()
        )
        self.fc = nn.Linear(num_channels, num_classes)

    def forward(self, x_surname, apply_softmax=False): # 모델의 정방향 계산
                                                       # x_surname (torch.Tensor): 입력 데이터 텐서. x_surname.shape은 (batch, initial_num_channels, max_surname_length)
                                                       # apply_softmax (bool): 소프트맥스 활성화 함수를 위한 플래그. 크로스-엔트로피 손실을 사용하려면 False로 지정
                                                       # 반환값: 결과 텐서. tensor.shape은 (batch, num_classes)
        features = self.convnet(x_surname).squeeze(dim=2)
       
        prediction_vector = self.fc(features)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector

In [13]:
def make_train_state(args):
    return {'stop_early': False,
            'early_stopping_step': 0,
            'early_stopping_best_val': 1e8,
            'learning_rate': args.learning_rate,
            'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1,
            'model_filename': args.model_state_file}

In [14]:
def update_train_state(args, model, train_state): # 훈련 상태를 업데이트
                                                  # args: 메인 매개변수
                                                  # model: 훈련할 모델
                                                  # train_state: 훈련 상태를 담은 딕셔너리
                                                  # 반환값: 새로운 훈련 상태
    
    # 적어도 한 번 모델을 저장
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # 성능이 향상되면 모델을 저장
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # 손실이 나빠지면
        if loss_t >= train_state['early_stopping_best_val']:
            # 조기 종료 단계 업데이트
            train_state['early_stopping_step'] += 1
        # 손실이 감소하면
        else:
            # 최상의 모델 저장
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # 조기 종료 단계 재설정
            train_state['early_stopping_step'] = 0

        # 조기 종료 여부 확인
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state

In [15]:
def compute_accuracy(y_pred, y_target):
    y_pred_indices = y_pred.max(dim=1)[1]
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100

In [16]:
args = Namespace(
    # 날짜와 경로 정보
    surname_csv="data/surnames/surnames_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch4/cnn",
    # 모델 하이퍼파라미터
    hidden_dim=100,
    num_channels=256,
    # 훈련 하이퍼파라미터
    seed=1337,
    learning_rate=0.001,
    batch_size=128,
    num_epochs=100,
    early_stopping_criteria=5,
    dropout_p=0.1,
    # 실행 옵션
    cuda=True,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True,
    catch_keyboard_interrupt=True
)


if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)
    
    print("파일 경로: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))
    
# CUDA 체크
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")
print("CUDA 사용여부: {}".format(args.cuda))

def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)
        
def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)
        
# 재현성을 위해 시드 설정
set_seed_everywhere(args.seed, args.cuda)

# 디렉토리 처리
handle_dirs(args.save_dir)

파일 경로: 
	model_storage/ch4/cnn/vectorizer.json
	model_storage/ch4/cnn/model.pth
CUDA 사용여부: False


In [18]:
!mkdir data
!wget https://git.io/JtaFp -O data/download.py
!wget https://git.io/Jtabe -O data/get-all-data.sh
!chmod 755 data/get-all-data.sh
%cd data
!./get-all-data.sh
%cd ..

--2022-09-11 12:16:19--  https://git.io/JtaFp
Resolving git.io (git.io)... 140.82.112.21
Connecting to git.io (git.io)|140.82.112.21|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/rickiepark/nlp-with-pytorch/main/chapter_4/4_2_mlp_surnames/data/download.py [following]
--2022-09-11 12:16:19--  https://raw.githubusercontent.com/rickiepark/nlp-with-pytorch/main/chapter_4/4_2_mlp_surnames/data/download.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1572 (1.5K) [text/plain]
Saving to: ‘data/download.py’


2022-09-11 12:16:19 (24.9 MB/s) - ‘data/download.py’ saved [1572/1572]

--2022-09-11 12:16:20--  https://git.io/Jtabe
Resolving git.io (git.io)... 140.82.112.21
Connecting to git.io (git.io)|1

In [19]:
if args.reload_from_files:
    # 체크포인트에서 훈련을 다시 시작
    dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
                                                              args.vectorizer_file)
else:
    # 데이터셋과 Vectorizer 만들기
    dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
    dataset.save_vectorizer(args.vectorizer_file)
    
vectorizer = dataset.get_vectorizer()

classifier = SurnameClassifier(initial_num_channels=len(vectorizer.surname_vocab), 
                               num_classes=len(vectorizer.nationality_vocab),
                               num_channels=args.num_channels)

classifer = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)

loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                           mode='min', factor=0.5,
                                           patience=1)

train_state = make_train_state(args)

In [20]:
if args.reload_from_files:
    # 체크포인트에서 훈련을 다시 시작
    dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
                                                              args.vectorizer_file)
else:
    # 데이터셋과 Vectorizer 만들기
    dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
    dataset.save_vectorizer(args.vectorizer_file)
    
vectorizer = dataset.get_vectorizer()

classifier = SurnameClassifier(initial_num_channels=len(vectorizer.surname_vocab), 
                               num_classes=len(vectorizer.nationality_vocab),
                               num_channels=args.num_channels)

classifer = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)

loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                           mode='min', factor=0.5,
                                           patience=1)

train_state = make_train_state(args)

In [21]:
epoch_bar = tqdm.notebook.tqdm(desc='training routine', 
                               total=args.num_epochs,
                               position=0)

dataset.set_split('train')
train_bar = tqdm.notebook.tqdm(desc='split=train',
                               total=dataset.get_num_batches(args.batch_size), 
                               position=1, 
                               leave=True)
dataset.set_split('val')
val_bar = tqdm.notebook.tqdm(desc='split=val',
                             total=dataset.get_num_batches(args.batch_size), 
                             position=1, 
                             leave=True)

try:
    for epoch_index in range(args.num_epochs):
        train_state['epoch_index'] = epoch_index

        # 훈련 세트에 대한 순회

        # 훈련 세트와 배치 제너레이터 준비, 손실과 정확도를 0으로 설정
        dataset.set_split('train')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        classifier.train()

        for batch_index, batch_dict in enumerate(batch_generator):
            # 훈련 과정은 5단계

            # --------------------------------------
            # 단계 1. 그레이디언트를 0으로 초기화
            optimizer.zero_grad()

            # 단계 2. 출력 계산
            y_pred = classifier(batch_dict['x_surname'])

            # 단계 3. 손실 계산
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # 단계 4. 손실을 사용해 그레이디언트 계산
            loss.backward()

            # 단계 5. 옵티마이저로 가중치 업데이트
            optimizer.step()
            # -----------------------------------------

            # 정확도 계산
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)

            # 진행 바 업데이트
            train_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            train_bar.update()

        train_state['train_loss'].append(running_loss)
        train_state['train_acc'].append(running_acc)

        # 검증 세트에 대한 순회

        # 검증 세트와 배치 제너레이터 준비, 손실과 정확도를 0으로 설정
        dataset.set_split('val')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.
        running_acc = 0.
        classifier.eval()

        for batch_index, batch_dict in enumerate(batch_generator):

            # 단계 1. 출력 계산
            y_pred =  classifier(batch_dict['x_surname'])

            # 단계 2. 손실 계산
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # 단계 3. 정확도 계산
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            val_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            val_bar.update()

        train_state['val_loss'].append(running_loss)
        train_state['val_acc'].append(running_acc)

        train_state = update_train_state(args=args, model=classifier,
                                         train_state=train_state)

        scheduler.step(train_state['val_loss'][-1])

        if train_state['stop_early']:
            break

        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()
except KeyboardInterrupt:
    print("Exiting loop")

training routine:   0%|          | 0/100 [00:00<?, ?it/s]

split=train:   0%|          | 0/60 [00:00<?, ?it/s]

split=val:   0%|          | 0/12 [00:00<?, ?it/s]

In [22]:
classifier.load_state_dict(torch.load(train_state['model_filename']))

classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)
loss_func = nn.CrossEntropyLoss(dataset.class_weights)

dataset.set_split('test')
batch_generator = generate_batches(dataset, 
                                   batch_size=args.batch_size, 
                                   device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    # 출력 계산
    y_pred =  classifier(batch_dict['x_surname'])
    
    # 손실 계산
    loss = loss_func(y_pred, batch_dict['y_nationality'])
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)

    # 정확도 계산
    acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
    running_acc += (acc_t - running_acc) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc

In [23]:
print("테스트 손실: {};".format(train_state['test_loss']))
print("테스트 정확도: {}".format(train_state['test_acc']))

테스트 손실: 1.8036395013332365;
테스트 정확도: 55.208333333333336


In [24]:
def predict_nationality(surname, classifier, vectorizer): # 새로운 성씨로 국적 예측
                                                          # surname (str): 분류할 성씨
                                                          # classifier (SurnameClassifer): 분류기 객체
                                                          # vectorizer (SurnameVectorizer): SurnameVectorizer 객체
                                                          # 반환값: 가장 가능성이 높은 국적과 확률로 구성된 딕셔너리
    vectorized_surname = vectorizer.vectorize(surname)
    vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0) # 새로운 데이터 텐서에 배치 차원을 추가할 때, size=1인 배치 차원을 추가
    result = classifier(vectorized_surname, apply_softmax=True)

    probability_values, indices = result.max(dim=1)
    index = indices.item()

    predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
    probability_value = probability_values.item()

    return {'nationality': predicted_nationality, 'probability': probability_value}

In [25]:
new_surname = input("분류하려는 성씨를 입력하세요: ")
classifier = classifier.cpu()
prediction = predict_nationality(new_surname, classifier, vectorizer)
print("{} -> {} (p={:0.2f})".format(new_surname,
                                    prediction['nationality'],
                                    prediction['probability']))

분류하려는 성씨를 입력하세요: Kim
Kim -> Korean (p=0.73)


In [26]:
def predict_topk_nationality(surname, classifier, vectorizer, k=5): # 새로운 성씨에 대한 최상위 K개 국적을 예측
                                                                    # surname (str): 분류하려는 성씨
                                                                    # classifier (SurnameClassifer): 분류기 객체
                                                                    # vectorizer (SurnameVectorizer): SurnameVectorizer 객체
                                                                    # k (int) : 최상위 몇 개를 보여줄 것인지
                                                                    # 딕셔너리 리스트, 각 딕셔너리는 국적과 확률로 구성
    vectorized_surname = vectorizer.vectorize(surname)
    vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0)
    prediction_vector = classifier(vectorized_surname, apply_softmax=True)
    probability_values, indices = torch.topk(prediction_vector, k=k)
    
    # 반환되는 크기는 (1,k)
    probability_values = probability_values[0].detach().numpy()
    indices = indices[0].detach().numpy()
    
    results = []
    for kth_index in range(k):
        nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index])
        probability_value = probability_values[kth_index]
        results.append({'nationality': nationality, 
                        'probability': probability_value})
    return results

new_surname = input("분류하려는 성씨를 입력하세요: ")

k = int(input("얼마나 많은 예측을 보고 싶나요? "))
if k > len(vectorizer.nationality_vocab):
    print("앗! 전체 국적 개수보다 큰 값을 입력했습니다. 모든 국적에 대한 예측을 반환합니다. :)")
    k = len(vectorizer.nationality_vocab)
    
predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)

print("최상위 {}개 예측:".format(k))
print("===================")
for prediction in predictions:
    print("{} -> {} (p={:0.2f})".format(new_surname,
                                        prediction['nationality'],
                                        prediction['probability']))

분류하려는 성씨를 입력하세요: Kim
얼마나 많은 예측을 보고 싶나요? 15
최상위 15개 예측:
Kim -> Korean (p=0.73)
Kim -> Chinese (p=0.14)
Kim -> German (p=0.03)
Kim -> English (p=0.03)
Kim -> Scottish (p=0.03)
Kim -> Czech (p=0.01)
Kim -> Dutch (p=0.01)
Kim -> Japanese (p=0.01)
Kim -> Polish (p=0.00)
Kim -> Russian (p=0.00)
Kim -> Vietnamese (p=0.00)
Kim -> Arabic (p=0.00)
Kim -> Irish (p=0.00)
Kim -> Spanish (p=0.00)
Kim -> French (p=0.00)


# 추가내용
* 풀링 : 고차원 특성 맵을 저차원 특성 맵으로 요약하는 연산
 * 합성곱의 출력은 특성 맵인데, 특성 맵의 값은 입력의 일부 영역을 요약함. 합성곱 연산의 중첩되는 특징 때문에 많은 특성이 중복될 수 있음.   
 → 풀링은 고차원이고 중복 가능성이 높은 특성 맵을 저차원으로 요약함, 통계적으로 약하고 크기가 큰 특성 맵을 강하고 작은 특성 맵으로 개선할 수 있음
 * 특성 맵의 국부적인 영역에 적용하는 합(합 풀링), 평균(평균 풀링), 최댓값(최대 풀링)과 같은 산술 연산
* 배치 정규화 : CNN을 만들 때 자주 사용되는 층.
 * CNN의 출력에 적용되어 활성화값의 평균이 0이고, 단위 분산이 되도록 만듦. 
 * 파라미터 초기화에 덜 민감하게 만들고 학습률 튜닝을 단순화함
* NiN 연결 (1x1 합성곱) : kernel_size=1인 합성곱 커널을 사용
 * 완전 연결층과 비슷 → 채널이 많은 특성 맵을 얕은 특성 맵으로 매핑하는데 유용
 * 소량의 파라미터로 비선형성을 추가로 주입할 수 있는 저렴한 방법
* 잔차 연결(스킵 연결) : 원본 행렬에 합성곱의 출력을 더하는 방법
 * 잔차 블록의 출력 = conv (입력) + 입력