In [1]:
import numpy as np

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from torch import optim
import torch.nn.functional as F

from sklearn.model_selection import train_test_split
import seaborn as sns
from typing import List, Set, Dict, Tuple

In [2]:
def open_process(file_loc):
    file = open(file_loc, 'r')
    file_lines = file.readlines()
    file_lines = [list(map(int, line.split())) for line in file_lines]
    return file_lines

In [3]:
train_source_loc = 'naver/train_source.txt'
train_target_loc = 'naver/train_target.txt'

test_source_loc = 'naver/test_source.txt'
test_target_loc = 'naver/test_target.txt'

In [4]:
SOS_token = 0
EOS_token = 1
PAD_token = -1

train_source = open_process(train_source_loc)
train_target = open_process(train_target_loc)

test_source = open_process(test_source_loc)
test_target = open_process(test_target_loc)

train_src_vocab = set([i for line in train_source for i in line])
train_tgt_vocab = set([i for line in train_target for i in line])

test_src_vocab = set([i for line in test_source for i in line])
test_tgt_vocab = set([i for line in test_target for i in line])

train_src_vocab_dict = {value: idx+2 for idx, value in enumerate(sorted(train_src_vocab))}
train_src_vocab_dict[0] = 'SOS_token'
train_src_vocab_dict[1] = 'EOS_token'
train_tgt_vocab_dict = {value: idx+2 for idx, value in enumerate(sorted(train_tgt_vocab))}
train_tgt_vocab_dict[0] = 'SOS_token'
train_tgt_vocab_dict[1] = 'EOS_token'

test_src_vocab_dict = {value: idx+2 for idx, value in enumerate(sorted(test_src_vocab))}
test_src_vocab_dict[0] = 'SOS_token'
test_src_vocab_dict[1] = 'EOS_token'

test_tgt_vocab_dict = {value: idx+2 for idx, value in enumerate(sorted(test_tgt_vocab))}
train_tgt_vocab_dict[0] = 'SOS_token'
train_tgt_vocab_dict[1] = 'EOS_token'

train_src_vocab_size = len(train_src_vocab)
train_tgt_vocab_size = len(train_tgt_vocab)

test_src_vocab_size = len(test_src_vocab)
test_tgt_vocab_size = len(test_tgt_vocab)

In [5]:
train_source = [[train_src_vocab_dict[i] for i in seq] + [EOS_token] for seq in train_source]
train_target = [[SOS_token] + [train_tgt_vocab_dict[i] for i in seq] for seq in train_target]

test_source = [[test_src_vocab_dict[i] for i in seq] for seq in test_source]
test_target = [[test_tgt_vocab_dict[i] for i in seq] for seq in test_target]

In [7]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        
        self. hid_dim = hid_dim
        self.n_layers = n_layers
        
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout = dropout)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, src):
        # src = [src len, batch_size]
        embedded = self.dropout(self.embedding(src))
        
        # embedded = [src len, batch size, emb dim]
        outputs, (hidden, cell) = self.rnn(embedded)
        
        # outputs = [src len, batch size, hid dim * n directions]
        # hidden = [n layers * n directions, batch size, hid dim]
        # cell = [n layers * n directions, batch size, hid dim]
        
        return hidden, cell
    

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        
        self.output_dim = output_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, input, hidden, cell):
        # input = [batch size]
        # hidden = [n layers * n directions, batch size, hid dim]
        # cell = [n layers * n directions, batch size, hid dim]
        # Decoder에서 항상 n directions = 1
        # 따라서 hidden = [n layers, batch size, hid dim]
        # context = [n layers, batch size, hid dim]
        
        # input = [1, batch size]
        input = input.unsqueeze(0)
        
        # embedded = [1, batch size, emb dim]
        embedded = self.dropout(self.embedding(input))
        
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        
        # output = [seq len, batch size, hid dim * n directions]
        # hidden = [n layers * n directions, batch size, hid dim]
        # cell = [n layers * n directions, batch size, hid dim]
        
        # Decoder에서 항상 seq len = n directions = 1 
        # 한 번에 한 토큰씩만 디코딩하므로 seq len = 1
        # 따라서 output = [1, batch size, hid dim]
        # hidden = [n layers, batch size, hid dim]
        # cell = [n layers, batch size, hid dim]
        
        # prediction = [batch size, output dim]
        prediction = self.fc_out(output.squeeze(0))
        
        return prediction, hidden, cell

In [8]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

        # Encoder와 Decoder의 hidden dim이 같아야 함
        assert encoder.hid_dim == decoder.hid_dim
        # Encoder와 Decoder의 layer 개수가 같아야 함
        assert encoder.n_layers == decoder.n_layers

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
       # src = [src len, batch size]
       # trg = [trg len, batch size]

        trg_len = trg.shape[0]
        batch_size = trg.shape[1]
        trg_vocab_size = self.decoder.ouput_dim

        # decoder 결과를 저장할 텐서
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size)

        # Encoder의 마지막 은닉 상태가 Decoder의 초기 은닉상태로 쓰임
        hidden, cell = self.encoder(src)

        # Decoder에 들어갈 첫 input은 <sos> 토큰
        input = trg[0, :]

        # target length만큼 반복
        # range(0,trg_len)이 아니라 range(1,trg_len)인 이유 : 0번째 trg는 항상 <sos>라서 그에 대한 output도 항상 0 
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[t] = output

            # random.random() : [0,1] 사이 랜덤한 숫자 
            # 랜덤 숫자가 teacher_forcing_ratio보다 작으면 True니까 teacher_force=1
            teacher_force = random.random() < teacher_forcing_ratio

            # 확률 가장 높게 예측한 토큰
            top1 = output.argmax(1) 

            # techer_force = 1 = True이면 trg[t]를 아니면 top1을 input으로 사용
            input = trg[t] if teacher_force else top1

        return outputs

In [12]:
# torch.cuda.is_available() checks and returns a Boolean True if a GPU is available, else it'll return False
is_cuda = torch.cuda.is_available()

# If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
if is_cuda:
    device = torch.device("cuda")
    print("GPU is available")
else:
    device = torch.device("cpu")
    print("GPU not available, CPU used")

GPU is available


In [13]:
input_dim = train_src_vocab_size
output_dim = train_tgt_vocab_size

# Encoder embedding dim
enc_emb_dim = 256
# Decoder embedding dim
dec_emb_dim = 256

hid_dim=512
n_layers=2

enc_dropout = 0.5
dec_dropout=0.5

enc = Encoder(input_dim, enc_emb_dim, hid_dim, n_layers, enc_dropout)
dec = Decoder(output_dim, dec_emb_dim, hid_dim, n_layers, dec_dropout)

model = Seq2Seq(enc, dec, device)

In [15]:
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)

model.apply(init_weights)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(53, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(595, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (fc_out): Linear(in_features=512, out_features=595, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)