# HW02

이름 : 박유나
학번 : 2021-12659 

# 파일 및 모듈 불러오기

In [366]:
!wget https://www.manythings.org/anki/spa-eng.zip

--2024-05-13 03:56:48--  https://www.manythings.org/anki/spa-eng.zip
Resolving www.manythings.org... 173.254.30.110
Connecting to www.manythings.org|173.254.30.110|:443... connected.
OpenSSL: error:1407742E:SSL routines:SSL23_GET_SERVER_HELLO:tlsv1 alert protocol version
Unable to establish SSL connection.


In [4]:
!unzip spa-eng.zip

In [1]:
import torchtext
import torch
SEED = 123

torch.manual_seed(SEED)

<torch._C.Generator at 0x122cd5ddb90>

In [2]:
from torch.utils.data.dataset import random_split
from torchtext.data.functional import to_map_style_dataset

from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.metrics import bleu_score

from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
from torch import nn

import torch.optim as optim

import time

import random



# 데이터 구성하기

In [3]:
def read_file(fname):
    list_iter = []
    with open(fname, 'r', encoding='UTF8') as f:
        lines = f.readlines()
        for line in lines[1:]:
            eng, spa, attribut = line.strip().split('\t')
            list_iter.append([eng, spa])

    return list_iter

In [4]:
# data sampling and splitting
data_iter = read_file('spa.txt')
data_iter = random.sample(data_iter, 10000)
dataset = to_map_style_dataset(data_iter)
num_train = int(len(dataset) * 0.9) 
train_dataset, test_dataset = random_split(dataset, [num_train, len(dataset) - num_train])

컴퓨터 성능이 좋지 않아 10000개를 랜덤 추출하여 진행하였습니다. 데이터를 훈련과 테스트 데이터로 9:1의 비율로 나누었습니다. 

In [5]:
# pre_trained tokenizer사용
import spacy
import spacy.cli.download
spacy.cli.download('es_core_news_sm')
spacy.cli.download('en_core_web_sm')

[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('es_core_news_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [6]:
# tokenizer
sp_nlp = spacy.load('es_core_news_sm')
en_nlp = spacy.load('en_core_web_sm')

def en_yield_tokens(data_iter):
    for en, _ in data_iter:
        yield [token.text for token in en_nlp.tokenizer(en.lower())]

def sp_yield_tokens(data_iter):
    for _, sp in data_iter:
        yield [token.text for token in sp_nlp.tokenizer(sp.lower())]

In [7]:
# vocab 만들기
en_voc = build_vocab_from_iterator(en_yield_tokens(train_dataset),
                                specials=["<unk>", "<pad>", "<sos>", "<eos>"],
                                max_tokens= 25000)
sp_voc = build_vocab_from_iterator(sp_yield_tokens(train_dataset),
                                specials=["<unk>", "<pad>", "<sos>", "<eos>"], #토큰들 추가
                                max_tokens= 25000)

In [8]:
en_token2id = en_voc.get_stoi()
sp_token2id = sp_voc.get_stoi()

In [9]:
en_pipeline = lambda x: [en_token2id.get(token.text, en_token2id['<unk>']) for token in en_nlp.tokenizer(x)]
sp_pipeline = lambda x: [sp_token2id['<sos>']] + [sp_token2id.get(token.text, sp_token2id['<unk>']) for token in sp_nlp.tokenizer(x)] + [sp_token2id['<eos>']]
# target sentence에 sos와 eos 토큰 추가

In [10]:
def custom_collate_fn(batch):
    en_list, sp_list= [], []
    for en, sp in batch:
        processed_en = torch.tensor(en_pipeline(en), dtype=torch.int64)
        en_list.append(processed_en)
        processed_sp = torch.tensor(sp_pipeline(sp), dtype=torch.int64)
        sp_list.append(processed_sp)

    en_list = pad_sequence(en_list, padding_value = 1) #padding
    sp_list = pad_sequence(sp_list, padding_value = 1)
    return en_list, sp_list

In [11]:
# Dataloader 만들기
train_dataloader = DataLoader(train_dataset, batch_size=16,
                              shuffle=True, collate_fn=custom_collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=16,
                              shuffle=True, collate_fn=custom_collate_fn)

# Seq2Seq Model

## Encoder

In [12]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, dropout):
        super().__init__()

        self.hid_dim = hid_dim

        self.embedding = nn.Embedding(input_dim, emb_dim)
        
        self.rnn = nn.GRU(emb_dim, hid_dim) # gru 사용

        self.dropout = nn.Dropout(dropout)


    def forward(self, src):
        embedded = self.dropout(self.embedding(src))

        outputs, hidden = self.rnn(embedded)

        return hidden

## Decoder

In [13]:
class Decoder(nn.Module):
    def __init__(self, ouptput_dim, emb_dim, hid_dim, dropout):
        super().__init__()

        self.output_dim = ouptput_dim
        self.hid_dim = hid_dim

        self.embedding = nn.Embedding(ouptput_dim, emb_dim)
        
        self.rnn = nn.GRU(emb_dim + hid_dim, hid_dim)

        self.fc_out = nn.Linear(emb_dim + hid_dim *2, ouptput_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, context):
        input = input.unsqueeze(0)

        embedded = self.dropout(self.embedding(input))

        emb_con = torch.cat((embedded, context), dim =2)
        
        output, hidden = self.rnn(emb_con, hidden)

        output = torch.cat((embedded.squeeze(0), hidden.squeeze(0), context.squeeze(0)), dim=1)

        prediction = self.fc_out(output.squeeze(0))

        return prediction, hidden

## Seq2seq

In [14]:
class Seq2seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

        assert encoder.hid_dim == decoder.hid_dim, \
            "Hidden dimensions of encoder and decoder must be equal!"

        
    def forward(self, src, trg, teacher_forcing_ratio = 0.5):

        batch_size = trg.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim

        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

        context = self.encoder(src)
        hidden = context

        input = trg[0, :]

        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden, context)

            outputs[t] = output

            teacher_force = random.random() < teacher_forcing_ratio

            topl = output.argmax(1)

            input = trg[t] if teacher_force else topl

        return outputs

# Training

In [15]:
# default setting

input_dim = len(en_voc.get_itos())
output_dim = len(sp_voc.get_itos())
enc_emb_dim = 256
dec_emb_dim = 256
hid_dim = 512
n_layers = 2
enc_dropout = 0.5
dec_dropout = 0.5
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

enc = Encoder(input_dim, enc_emb_dim, hid_dim, enc_dropout)
dec = Decoder(output_dim, dec_emb_dim, hid_dim, dec_dropout)

model = Seq2seq(enc, dec, device).to(device)

In [16]:
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)
        
model.apply(init_weights)

Seq2seq(
  (encoder): Encoder(
    (embedding): Embedding(4707, 256)
    (rnn): GRU(256, 512)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(7463, 256)
    (rnn): GRU(768, 512)
    (fc_out): Linear(in_features=1280, out_features=7463, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

In [17]:
# optimizer와 loss function
optimizer = optim.Adam(model.parameters(), lr=0.001) 
criterion = nn.CrossEntropyLoss(ignore_index=1)


In [19]:
# 훈련을 위한 함수
def train(model, dataloader, optimizer, criterion, clip):
  epoch_loss = 0

  model.train() 

  for en, sp in dataloader:  
    src = en.to(device)
    trg = sp.to(device)

    optimizer.zero_grad()

    output = model(src, trg)

    output_dim = output.shape[-1]
        
    output = output[1:].view(-1, output_dim)
    trg = trg[1:].view(-1)
        
        
    loss = criterion(output, trg)

    loss.backward()  
    torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
    optimizer.step()
        
    epoch_loss += loss.item()

  return epoch_loss / len(dataloader)


In [20]:
def epoch_time(start_time, end_time):
  elapsed_time = end_time - start_time
  elapsed_mins = int(elapsed_time / 60)
  elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
  return elapsed_mins, elapsed_secs

In [28]:
# 모델 훈련

N_EPOCHS = 1
clip = 1

for epoch in range(N_EPOCHS):
  start_time = time.time()

  train_loss= train(model, train_dataloader, optimizer, criterion, clip)

  end_time = time.time()

  epoch_mins, epoch_secs = epoch_time(start_time, end_time)

  print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
  print(f'\tTrain Loss: {train_loss:.3f}')



Epoch: 01 | Epoch Time: 7m 37s
	Train Loss: 4.494


In [364]:
#torch.save(model.state_dict(), 'hw02.pt') # model save

# Evaluation

In [29]:
# 평가함수
def evaluate(model, dataloader, criterion):
  epoch_loss = 0

  model.eval() 

  with torch.no_grad():
    for en, sp in dataloader:
      src = en.to(device)
      trg = sp.to(device)

      output = model(src, trg)
      output_dim = output.shape[-1]
        
      output = output[1:].view(-1, output_dim)
      trg = trg[1:].view(-1)
        
        
      loss = criterion(output, trg)
        
      epoch_loss += loss.item()

  return epoch_loss / len(dataloader)

In [30]:
sp_id2token = sp_voc.get_itos()

en_token2id = en_voc.get_stoi()

In [31]:
# 한 문장을 번역하는 함수
def translate_sentence(model, sentence, src_field, trg_field, device, max_len=50, logging=True):
    model.eval() 

    tokens = [token.text for token in en_nlp.tokenizer(sentence.lower())]

    tokens = ['<sos>'] + tokens + ['<eos>']

    src_indexes = [en_token2id.get(token, en_token2id['<unk>']) for token in tokens]

    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    with torch.no_grad():
        context = model.encoder(src_tensor)
        hidden = context

    trg_indexes = [sp_token2id['<sos>']]

    for _ in range(max_len):
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)

        with torch.no_grad():
            output, hidden = model.decoder(trg_tensor, hidden, context)
            pred_token = output.argmax().item()

        
        trg_indexes.append(pred_token) 

        if pred_token == sp_token2id['<eos>']:
            break

    trg_tokens = [sp_id2token[i] for i in trg_indexes]

    return trg_tokens[1:]

In [32]:
# bleu score를 계산하는 함수
def bleu(dataset, model, SRC, TRG, device):
    targets = []
    outputs = []
    bleu_sc = []

    for en, sp in dataset:

      prediction = translate_sentence( model, en, SRC, TRG, device)
      prediction = prediction[:-1]  # remove <eos> token
      outputs.append(prediction)

      targets.append([[token.text for token in en_nlp.tokenizer(sp)]])

    bleu_sc.append(bleu_score(outputs, targets))

    return bleu_sc

In [33]:
# test loss와 bleu score 계산
#model = model.load_state_dict(torch.load('hw02.pt'))

test_loss = evaluate(model, test_dataloader, criterion)
bleu_sc = bleu(test_dataset, model, en_voc, sp_voc, device)
print(f'Test Loss: {test_loss:.3f} | Bleu Score: {bleu_sc}')

Test Loss: 3.617 | Bleu Score: [0.00906093418598175]


In [34]:
# test sentence 번역
src = 'He is a baseball player .'

print(f'test sentence: {src}')
print("model output:", " ".join(translate_sentence(model, src, en_voc, sp_voc, device)))


test sentence: He is a baseball player .
model output: <unk> es es un un . . <eos>


블루스코어는 매우 낮고, test loss는 높다. 하지만 test sentence에 대해서 '<unk>(남성 : es)는 축구선수이다.'로 번역한 것으로 보아 그럭저럭 비슷하게 잘 번역하였다.  