In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, random_split, Dataset
import numpy as np
import pandas as pd
from nltk.tokenize import word_tokenize
import random

In [2]:
import pandas as pd
from torch.utils.data import DataLoader, random_split
from nltk.tokenize import word_tokenize

# Assuming 'data.csv' is already loaded into 'data' DataFrame
data = pd.read_csv('data.csv', header=None)

# Slice the DataFrame
data_subset = data.iloc[:100]


In [3]:
# 토큰화 및 정수 인코딩
def tokenize(sentence):
    return word_tokenize(sentence)

In [4]:
word2idx = {"<pad>": 0}
for sent in sentences:
    for word in tokenize(sent[0]) + tokenize(sent[1]):
        if word not in word2idx:
            word2idx[word] = len(word2idx)

NameError: name 'sentences' is not defined

In [None]:
def numericalize(sent):
    return [word2idx[word] for word in tokenize(sent)]

In [None]:
class TranslationDataset(Dataset):
    def __init__(self, dataframe):
        self.data = dataframe
        self.src = self.data.iloc[:, 0]  # Assuming the source text is in the first column
        self.trg = self.data.iloc[:, 1]  # Assuming the target text is in the second column

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        src = self.tokenize(self.src.iloc[idx])
        trg = self.tokenize(self.trg.iloc[idx])
        return torch.tensor(src, dtype=torch.long), torch.tensor(trg, dtype=torch.long)

    @staticmethod
    def tokenize(text):
        return [word.lower() for word in word_tokenize(text)]


In [None]:
# 데이터셋 로드
dataset = TranslationDataset(data_subset)

train_size = int(0.7 * len(dataset))
valid_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - valid_size
train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size])


In [None]:
# collate_fn 정의
def collate_fn(batch):
    src_batch, trg_batch = [], []

    for src, trg in batch:
        src_batch.append(torch.tensor(src))
        trg_batch.append(torch.tensor(trg))

    src_batch = torch.nn.utils.rnn.pad_sequence(src_batch, padding_value=0, batch_first=True)
    trg_batch = torch.nn.utils.rnn.pad_sequence(trg_batch, padding_value=0, batch_first=True)

    return src_batch, trg_batch

In [None]:
# DataLoader 설정
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)


In [None]:
class Encoder(torch.nn.Module):
  def __init__(self, input_size, hidden_size):
    super().__init__()
    self.embedding = torch.nn.Embedding(input_size, hidden_size)
    self.rnn = torch.nn.LSTM(hidden_size, hidden_size, batch_first=True)

  def forward(self, input):
    embedded = self.embedding(input)
    output, hidden = self.rnn(embedded)
    return output, hidden

In [None]:
class Decoder(torch.nn.Module):
  def __init__(self, output_size, hidden_size):
    super().__init__()
    self.embedding = torch.nn.Embedding(output_size, hidden_size)
    self.rnn = torch.nn.LSTM(hidden_size, hidden_size, batch_first=True)
    self.out = torch.nn.Linear(hidden_size, output_size)

  def forward(self, input, hidden, encoder_output):
    embedded = self.embedding(input)
    output, hidden = self.rnn(embedded, hidden)
    output = self.out(output)
    return output, hidden

In [None]:
class Seq2Seq(torch.nn.Module):
  def __init__(self, encoder, decoder):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder

  def forward(self, src, trg):
    encoder_output, encoder_hidden = self.encoder(src)
    decoder_output, decoder_hidden = self.decoder(trg, encoder_hidden, encoder_output)
    return decoder_output

In [None]:
# 모델 초기화
input_size = len(word2idx)
hidden_size = 256
encoder = Encoder(input_size, hidden_size)
decoder = Decoder(input_size, hidden_size)
model = Seq2Seq(encoder, decoder)
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=0)

In [None]:
def train(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0
    for src, trg in loader:
        optimizer.zero_grad()
        output = model(src, trg[:, :-1])  # trg input does not include the <eos> token
        output_dim = output.shape[-1]
        
        output = output.contiguous().view(-1, output_dim)
        trg = trg[:, 1:].contiguous().view(-1)  # trg shifted for loss calculation, does not include <sos>

        loss = criterion(output, trg)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / len(loader)

In [None]:
# 평가 함수
def evaluate(model, loader, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for src, trg in loader:
            output = model(src, trg)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(loader)

In [None]:
# 학습 과정
n_epochs = 1
clip = 1
best_valid_loss = float('inf')

for epoch in range(n_epochs):
    train_loss = train(model, train_loader, optimizer, criterion, clip)
    valid_loss = evaluate(model, valid_loader, criterion)
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best_model.pt')
    print(f'Epoch: {epoch+1}, Train Loss: {train_loss:.3f}, Valid Loss: {valid_loss:.3f}')


In [None]:
# 최적 모델 로드 및 테스트
model.load_state_dict(torch.load('best_model.pth'))
test_loss = evaluate(model, test_loader, criterion)
print(f'Test Loss: {test_loss:.3f}')