### 4.1 다층 퍼셉트론 (MLP)

* 파이토치로 MLP 구현하기

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
class MultilayerPerceptron(nn.Module):
    
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(MultilayerPerceptron, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x_in, apply_softmax=False): # crossEntropy 사용하려면 False
        intermediate = F.relu(self.fc1(x_in))
        output = self.fc2(intermediate)
        
        if apply_softmax:
            output = F.softmax(output, dim=1)
        return output

In [3]:
batch_size = 2 # 한 번에 입력할 샘플 개수
input_dim = 3
hidden_dim = 100
output_dim = 4

# 모델 생성
mlp = MultilayerPerceptron(input_dim, hidden_dim, output_dim)
print(mlp)

MultilayerPerceptron(
  (fc1): Linear(in_features=3, out_features=100, bias=True)
  (fc2): Linear(in_features=100, out_features=4, bias=True)
)


In [4]:
def describe(x):
    print("타입: {}".format(x.type()))
    print("크기: {}".format(x.shape))
    print("값: \n{}".format(x))

In [5]:
# random 입력
x_input = torch.rand(batch_size, input_dim)
describe(x_input)

타입: torch.FloatTensor
크기: torch.Size([2, 3])
값: 
tensor([[0.0977, 0.5046, 0.1484],
        [0.2781, 0.1258, 0.5622]])


In [6]:
y_output = mlp(x_input, apply_softmax=False)
describe(y_output)

타입: torch.FloatTensor
크기: torch.Size([2, 4])
값: 
tensor([[ 0.0324,  0.2516,  0.0643,  0.0892],
        [-0.1216,  0.3439, -0.0473,  0.1324]], grad_fn=<AddmmBackward0>)


### 4.2 예제 : MLP로 성씨 분류하기

In [7]:
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import tqdm
import pandas as pd

In [10]:
class Vocabulary(object):

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx: token 
                              for token, idx in self._token_to_idx.items()}
        
        self._add_unk = add_unk
        self._unk_token = unk_token
        
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token) 
        
        
    def to_serializable(self):
        return {'token_to_idx': self._token_to_idx, 
                'add_unk': self._add_unk, 
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        return cls(**contents)
    
    def add_token(self, token) :
        try:
            index = self._token_to_idx[token]
        except KeyError:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index
    
    def add_many(self, tokens):
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]
        
    def lookup_index(self, index):
        if index not in self._idx_to_token:
            raise KeyError("Vocabulary에 인덱스(%d)가 없습니다." % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

In [11]:
class SurnameVectorizer(object):

    def __init__(self, surname_vocab, nationality_vocab):
        self.surname_vocab = surname_vocab # 문자를 정수에 매핑
        self.nationality_vocab = nationality_vocab # 국적을 정수에 매핑

    def vectorize(self, surname):
        vocab = self.surname_vocab
        one_hot = np.zeros(len(vocab), dtype=np.float32)
        for token in surname:
            one_hot[vocab.lookup_token(token)] = 1

        return one_hot

    @classmethod
    def from_dataframe(cls, surname_df):
        surname_vocab = Vocabulary(unk_token="@")
        nationality_vocab = Vocabulary(add_unk=False)

        for index, row in surname_df.iterrows():
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab)

    @classmethod
    def from_serializable(cls, contents):
        surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
        nationality_vocab =  Vocabulary.from_serializable(contents['nationality_vocab'])
        return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)

    def to_serializable(self):
        return {'surname_vocab': self.surname_vocab.to_serializable(),
                'nationality_vocab': self.nationality_vocab.to_serializable()}

In [13]:
class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer):
        self.surname_df = surname_df
        self._vectorizer = vectorizer

        self.train_df = self.surname_df[self.surname_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train')
        
        # 클래스 가중치
        class_counts = surname_df.nationality.value_counts().to_dict()
        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])
        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
        
        @classmethod
        def load_dataset_and_make_vectorizer(cls, surname_csv):
            surname_df = pd.read_csv(surname_csv)
            train_surname_df = surname_df[surname_df.split=='train']
            return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

        @classmethod
        def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
            surname_df = pd.read_csv(surname_csv)
            vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
            return cls(surname_df, vectorizer)

        @staticmethod
        def load_vectorizer_only(vectorizer_filepath):
            with open(vectorizer_filepath) as fp:
                return SurnameVectorizer.from_serializable(json.load(fp))

        def save_vectorizer(self, vectorizer_filepath):
            with open(vectorizer_filepath, "w") as fp:
                json.dump(self._vectorizer.to_serializable(), fp)

        def get_vectorizer(self):
            return self._vectorizer

        def set_split(self, split="train"):
            self._target_split = split
            self._target_df, self._target_size = self._lookup_dict[split]

        def __len__(self):
            return self._target_size
        def __getitem(self, index) :
            row = self._target_df.iloc[index]
            surname_vector = self._vectorizer.vectorize(row.surname)
            nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality)
            
            return {'x_surname' : surname_vector,
                   'y_nationality' : nationality_index}
        
        def get_num_batches(self, batch_size) :
            return len(self) // batch_size

In [14]:
def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"): 
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

In [15]:
# 성씨 분류를 위한 다층 퍼셉트론
class surnameClassifier(nn.Module) :
    def __init__(self, input_dim, hidden_dim, output_dim) :
        super(SurnameClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x_in, apply_softmax=False):
        intermediate_vector = F.relu(self.fc1(x_in))
        prediction_vector = self.fc2(intermediate_vector)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector

In [16]:
def make_train_state(args):
    return {'stop_early': False,
            'early_stopping_step': 0,
            'early_stopping_best_val': 1e8,
            'learning_rate': args.learning_rate,
            'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1,
            'model_filename': args.model_state_file}

def update_train_state(args, model, train_state):
    # 적어도 한 번 모델을 저장합니다
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # 성능이 향상되면 모델을 저장합니다
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # 손실이 나빠지면
        if loss_t >= train_state['early_stopping_best_val']:
            # 조기 종료 단계 업데이트
            train_state['early_stopping_step'] += 1
        # 손실이 감소하면
        else:
            # 최상의 모델 저장
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # 조기 종료 단계 재설정
            train_state['early_stopping_step'] = 0

        # 조기 종료 여부 확인
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state

def compute_accuracy(y_pred, y_target):
    _, y_pred_indices = y_pred.max(dim=1)
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100