- 해당 노트북은 "Character-Aware Neural Language Models" 논문을 기반으로 합니다.
- https://github.com/FengZiYjun/CharLM/blob/master/model.py 사이트를 참고하였습니다.

In [260]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F

from torchtext.datasets import TranslationDataset, Multi30k
from torchtext.data import Field, BucketIterator
from torchtext import data
from torchtext import datasets

import spacy

import random
import math
import os
import time

SEED = 1

random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

## Declaring the Fields
- Torchtext 는 데이터를 가져오는 과정에서 선언하는 방식을 사용합니다.
    - 데이터가 어떤 형식을 지닐 것인지에 대한 것에 대해 선언을 해주고 이에 따라 torchtext 는 데이터를 로딩합니다.

## Constructing the Dataset
- fields 객체는 raw data 를 어떻게 가져올 지에 대한 선언이 담겨있습니다.
- TabularDataset 객체를 통해서, 어디서, 어떤 데이터를 가져올 지에 대해 선언을 해줍니다.
- 아래의 소스코드를 통해 형성된 객체는 generator 의 형태를 띕니다.

In [261]:
# set up fields
TEXT = data.Field(sequential=True, lower=True)
LABEL = data.Field(sequential=False)

# make splits for data
train, test = datasets.IMDB.splits(TEXT, LABEL)

# build the vocabulary
TEXT.build_vocab(train);TEXT.build_vocab(test)
LABEL.build_vocab(train);LABEL.build_vocab(test)

# make iterator for splits
train_iter, test_iter = data.BucketIterator.splits(
    (train, test), batch_size=3, device='cpu')

- TabularDatset 을 통해서, tokenizing 까지는 되었지만, word_to_integar process는 아직 이뤄지지 않았습니다. 
- 우리의 경우, train , text 데이터 셋에 대해서 TEXT 부분에 대해서, word_to_integar converting이 필요합니다.
- `TEXT.build_vocab(trn)` 이라는 코드를 통해, converting이 가능합니다.
- 위의 연산은 모든 training set에 있는 모든 엘리먼트들을 torchtext로 만들어줍니다. Torchtext는 vocabulary를 핸들링하는 Vocab이라는 클래스를 가지고 있습니다. Vocab클래스는 word와 id를 stoi attribute에서 mapping 시켜주고, itos attribute에서는 reverse mapping시켜줍니다.
- stoi : word_to_idx default dictionary 
- itos : word list

word dictionary를 만들어줍니다.

In [262]:
word_to_idx_dict = TEXT.vocab.stoi
idx_to_word_dict = {val:idx for idx,val in word_to_idx_dict.items()}

character dictionary를 만들어줍니다.

In [263]:
word_list = list(word_to_idx_dict.keys())[3:]
char_dict = {}
char_dict['<unk>'] = 0
char_dict['<pad>'] = 1
count = 1

for word in word_list : 
    for char in word : 
        if char not in char_dict.keys() :
            count += 1
            char_dict[char] = count
char_dict['<unk>'] = 0
char_dict['<pad>'] = 1

In [264]:
batch = next(iter(train_iter))
batch.text.size()

torch.Size([427, 3])

In [265]:
class CharacterIndex_for_SINGLE() : 
    
    def __init__(self,idx_to_word_dict,char_dict,max_length=10,batch_size=3) :
    
        self.batch_size = batch_size
        self.idx_to_word_dict = idx_to_word_dict
        self.char_dict = char_dict
        self.max_length = max_length
    def return_char_idx(self,text) : 

        inputs_ = []
    
        for idx1 in range(batch_size) : 
            inputs_.append([char_dict[i] for i in idx_to_word_dict[text[idx1].item()]])
        
        for idx,val in enumerate(inputs_) : 
            if len(val) <= self.max_length :
                inputs_[idx] = val + [1]*(self.max_length - len(val))
            else : 
                inputs_[idx] = val[:self.max_length]
        
        t = torch.tensor(inputs_)
        return t

In [266]:
char_idx = CharacterIndex_for_SINGLE(idx_to_word_dict,char_dict)

In [267]:
char_idx.return_char_idx(batch.text[0])

tensor([[ 8, 25, 19, 20,  1,  1,  1,  1,  1,  1],
        [27, 15,  8, 18,  2, 22,  1,  1,  1,  1],
        [ 8,  1,  1,  1,  1,  1,  1,  1,  1,  1]])

In [268]:
class Highway(nn.Module):
    """Highway network"""
    def __init__(self, input_size):
        super(Highway, self).__init__()
        self.fc1 = nn.Linear(input_size, input_size, bias=True)
        self.fc2 = nn.Linear(input_size, input_size, bias=True)

    def forward(self, x):
        x = torch.sigmoid(self.fc1(x))
        return torch.mul(x, F.relu(self.fc2(x))) + torch.mul(1-x, x)

class Char_CNN_for_SINGLE(nn.Module) : 
    
    def __init__(self,word_dict,character_dict,idx_to_word_dict,char_embed_size,lstm_hidden_size,\
                 kernel_size,num_filter,dropout,num_layers) : 
        
        super(Char_CNN_for_SINGLE, self).__init__()
        
        self.word_vocab_size = len(word_dict) 
        self.char_vocab_size = len(character_dict)
        self.idx_to_word_dict = idx_to_word_dict
        self.char_embed_size = char_embed_size 
        self.lstm_hidden_size = lstm_hidden_size 
        self.dropout = dropout 
        self.char_to_idx = CharacterIndex_for_SINGLE(idx_to_word_dict,character_dict)
        
        if type(kernel_size) !=list :
            self.kernel_size = list(kernel_size) # kernel의 사이즈로, 여러개의 kernel_size를 리스트 형태로 넣어줄 수 있습니다.
        else : self.kernel_size = kernel_size # 많을 수록 complex해집니다.
            
        self.num_filter = num_filter # 각각의 kernel 이 몇 개씩 있는지에 대한 파라미터입니다. 많을 수록 complex해집니다.

        self.embedding = nn.Embedding(
            num_embeddings = self.char_vocab_size,
            embedding_dim = char_embed_size,
            padding_idx = 1) 
        
        self.convs = nn.ModuleList([(nn.Conv2d(in_channels = 1,out_channels = self.num_filter,\
        kernel_size = (kernel,self.char_embed_size))) for kernel in self.kernel_size])
    
        self.highway = Highway(len(kernel_size)*num_filter)
        
        self.lstm = nn.LSTM(input_size=len(kernel_size)*num_filter, 
                hidden_size=lstm_hidden_size, 
                num_layers=num_layers,
                dropout=dropout,
                batch_first=True)
        
        self.fc = nn.Linear(lstm_hidden_size,len(self.idx_to_word_dict))
        
    def forward(self,x,hidden) : 
                
        # x : [batch_size]
        char_x = self.char_to_idx.return_char_idx(x) # char_idx : [batch_size, word_length]
        
        (batch_size,word_length) = char_x.size()
        
        embed = self.embedding(char_x) # embed : [batch_size,word_length, embed_dim]
        embed = embed.unsqueeze(1) # embed : [batch_size*sent_length,1,word_length, embed_dim]
        
        convolution = [conv(embed).squeeze(3) for conv in self.convs]
#         [torch.Size([batch_size, num_filter, filter_width+embed_dim-1])

        pooled = [F.max_pool1d(conv,(conv.size(2))).squeeze(2) for conv in convolution]
#         [torch.Size([batch_size, num_filter])

        cat_mat = torch.cat(pooled,dim=1)
#         [torch.Size([batch_size, num_filter * len(kernel_size)])

        highway_net = self.highway(cat_mat)
#         [torch.Size([batch_size, num_filter * len(kernel_size)])

        outputs,hidden = self.lstm(highway_net.unsqueeze(1),hidden)
#       [batch_size, 1, lstm_hidden_size]
#       [num_layer, batch_size, lstm_hidden_size]
        fc_mat = outputs.squeeze(1)
#       [batch_size, lstm_hidden_size]
        return self.fc(fc_mat),hidden
#       self.fc(fc_mat) : [batch_size,char_vocab_size]
#       hidden : [num_layer, batch_size, char_hidden_dim]

In [269]:
word_dict = word_to_idx_dict
character_dict = char_dict
idx_to_word_dict = idx_to_word_dict
char_embed_size = 15
lstm_hidden_size = 256
kernel_size = [1, 2, 3, 4]
num_filter = 25
dropout = 0.5
num_layers = 2

model = \
Char_CNN_for_SINGLE(word_to_idx_dict,char_dict,idx_to_word_dict\
         ,char_embed_size,lstm_hidden_size,kernel_size,num_filter,dropout,num_layers)
model

Char_CNN_for_SINGLE(
  (embedding): Embedding(147, 15, padding_idx=1)
  (convs): ModuleList(
    (0): Conv2d(1, 25, kernel_size=(1, 15), stride=(1, 1))
    (1): Conv2d(1, 25, kernel_size=(2, 15), stride=(1, 1))
    (2): Conv2d(1, 25, kernel_size=(3, 15), stride=(1, 1))
    (3): Conv2d(1, 25, kernel_size=(4, 15), stride=(1, 1))
  )
  (highway): Highway(
    (fc1): Linear(in_features=100, out_features=100, bias=True)
    (fc2): Linear(in_features=100, out_features=100, bias=True)
  )
  (lstm): LSTM(100, 256, num_layers=2, batch_first=True, dropout=0.5)
  (fc): Linear(in_features=256, out_features=247632, bias=True)
)

In [270]:
batch_size = 3

hidden = torch.zeros(num_layers,batch_size,lstm_hidden_size)
cell = torch.zeros(num_layers,batch_size,lstm_hidden_size)
hiddens = (hidden,cell)

outputs,hidden = model(batch.text[0],hiddens)
outputs.size()

torch.Size([3, 247632])

In [278]:
class Char_LM(nn.Module):
    def __init__(self, model, batch_size, lstm_hidden_size, idx_to_word_dict,device):
        super().__init__()
        
        self.device = device
        self.num_layers = 2
        self.batch_size = batch_size
        self.lstm_hidden_size = lstm_hidden_size
        self.model = model
        self.word_vocab_size = len(idx_to_word_dict)
        
    def forward(self, x, teacher_forcing_ratio=0.5):
        
        max_len = x.size()[0]
        
        hidden = torch.zeros(self.num_layers,self.batch_size,self.lstm_hidden_size)
        cell = torch.zeros(self.num_layers,self.batch_size,self.lstm_hidden_size)
        hiddens = (hidden,cell)
        
        input_ = x[0]
        outputs = torch.zeros(max_len, batch_size, self.word_vocab_size).to(self.device)
        
        for t in range(1, max_len):
            # for 문이 돈다는 것은, many-to-many의 네트워크가 한 칸씩 옆으로 이동한다는 뜻과 같습니다.
            output, hiddens = self.model(input_,hiddens)
#            output'dimension : [batch_size , output_dim], 여기서 output_dim 은 출현 가능한 모든 target lang 의 수 입니다.
            outputs[t] = output
    
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.max(1)[1] # 해당 글자의 numericalized index 를 넣어주어야 합니다.
            input_ = (x[t] if teacher_force else top1)
            
        return outputs
    # [sent_length, batch_size, char_vocab_size]

In [281]:
final_model = Char_LM(model,batch_size,lstm_hidden_size,idx_to_word_dict,'cpu')

In [282]:
final_model(batch.text).size()

torch.Size([427, 3, 247632])

In [292]:
class fit() : 
    
    def __init__(self, model, train_iter, test_iter, epoch = 5) : 
        
        self.optimizer = optim.Adam(model.parameters())
        # <pad> 토큰은 임베딩 벡터와, loss_function에 argument 로 들어가서, training 과정에서 제외됩니다.
        self.pad_idx = 1 
        self.criterion = nn.CrossEntropyLoss(ignore_index=self.pad_idx)
        self.device = 'cpu'
#         self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.model = model.to(self.device)
        self.train_iter = train_iter
        self.test_iter = test_iter
        self.epoch = epoch
            
    def train(self,clip):
    
        epoch_loss = 0 # loss per epoch
        self.model.train()
        
        for i, batch in enumerate(self.train_iter):
            print('train batch : ',i,end='\r')
            src = batch.text

            self.optimizer.zero_grad()

            output = self.model(src)        

            loss_output = output[1:].view(-1, output.shape[-1])
            loss_trg = src[1:].view(-1)
            # sos 토큰을 제외하고, 차원을 맞춘 후에, output을 변수에 저장해줍니다.
            
            loss = self.criterion(loss_output, loss_trg)
            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
            # gradient clipping
            self.optimizer.step()
            
            epoch_loss += loss.item()
        
        return epoch_loss / len(self.train_iter)
    
    def test(self):
    
        epoch_loss = 0 # loss per epoch
        self.model.eval()
        
        for i, batch in enumerate(self.test_iter):
            print('test batch : ',i,end='\r')
            src = batch.src

            output = self.model(src)        

            loss_output = output[1:].view(-1, output.shape[-1])
            loss_trg = src[1:].view(-1)
            # sos 토큰을 제외하고, 차원을 맞춘 후에, output을 변수에 저장해줍니다.
            
            loss = self.criterion(loss_output, loss_trg)
            epoch_loss += loss.item()
        
        return epoch_loss / len(self.test_iter)

    
    
    def fit_by_iterate(self,clip) : 
        
        for epoch in range(self.epoch):
            print('epoch : ',epoch + 1)
            train_loss= self.train(clip)
            print("training loss : {}".format(train_loss))
            
            if epoch == self.epoch :  #마지막에 test를 실행합니다.
                test_loss = self.test()
                print('last test : {}'.format(test_loss))
                
            if (epoch % 5 == 0) and (epoch != 0): #5의 배수 epoch마다 test를 실행합니다.
                test_loss = self.test()
                print('testing loss : {}'.format(test_loss))

In [None]:
fitting_process = fit(final_model,train_iter,test_iter,epoch=5)
fitting_process.fit_by_iterate(1)

epoch :  1
train batch :  0