In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from collections import Counter
import pandas as pd

# Seed 고정 (재현성을 위해)
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

# RNN Encoder-Decoder with GRU (Gated Recurrent Unit)

In [6]:
class GRUCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(GRUCell, self).__init__()
        self.hidden_size = hidden_size

        # Update Gate, Reset Gate, Hidden State
        self.W_z = nn.Linear(input_size + hidden_size, hidden_size)
        self.W_r = nn.Linear(input_size + hidden_size, hidden_size)
        self.W_h = nn.Linear(input_size + hidden_size, hidden_size)

    def forward(self, x, h_prev):
        # Concatenate input and previous hidden state
        combined = torch.cat([x, h_prev], dim=1)

        # Update Gate 계산
        z_t = torch.sigmoid(self.W_z(combined))

        # Reset Gate 계산
        r_t = torch.sigmoid(self.W_r(combined))

        # reset된 h_prev와 input을 이용해 h_tilde 계산
        combined_reset = torch.cat([x, r_t * h_prev], dim=1)
        h_tilde = torch.tanh(self.W_h(combined_reset))

        # 최종 hidden state 계산
        h_t = (1 - z_t) * h_prev + z_t * h_tilde
        return h_t

In [7]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim, n_layers=1, dropout=0.1):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.gru = GRUCell(emb_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, src):
        # src: (batch_size, src_len)
        embedded = self.dropout(self.embedding(src))  # (batch_size, src_len, emb_dim)
        batch_size, src_len, _ = embedded.size()
        hidden = torch.zeros(batch_size, self.gru.hidden_size).to(src.device)
        
        for t in range(src_len):
            hidden = self.gru(embedded[:, t, :], hidden)
        
        return hidden  # (batch_size, hidden_dim)

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, dropout=0.1):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.gru = GRUCell(emb_dim, hidden_dim)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, input, hidden):
        # input: (batch_size)
        input = input.unsqueeze(1)  # (batch_size, 1)
        embedded = self.dropout(self.embedding(input))  # (batch_size, 1, emb_dim)
        embedded = embedded.squeeze(1)  # (batch_size, emb_dim)
        
        hidden = self.gru(embedded, hidden)  # (batch_size, hidden_dim)
        output = self.fc_out(hidden)  # (batch_size, output_dim)
        
        return output, hidden

In [8]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device, teacher_forcing_ratio=0.5):
        super(Seq2Seq, self).__init__()
        
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.teacher_forcing_ratio = teacher_forcing_ratio
    
    def forward(self, src, trg):
        # src: (batch_size, src_len)
        # trg: (batch_size, trg_len)
        
        batch_size = src.size(0)
        trg_len = trg.size(1)
        output_dim = self.decoder.fc_out.out_features
        
        outputs = torch.zeros(batch_size, trg_len, output_dim).to(self.device)
        
        hidden = self.encoder(src)  # (batch_size, hidden_dim)
        
        # 첫 번째 디코더 입력은 <sos> 토큰
        input = trg[:, 0]  # (batch_size)
        
        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden)  # output: (batch_size, output_dim)
            outputs[:, t, :] = output
            
            teacher_force = torch.rand(1).item() < self.teacher_forcing_ratio
            top1 = output.argmax(1)  # (batch_size)
            input = trg[:, t] if teacher_force else top1
        
        return outputs

In [2]:
# 1. 데이터셋 불러오기
file_path = 'english_german_dataset.csv'
dataset_df = pd.read_csv(file_path)
dataset = list(zip(dataset_df['English'], dataset_df['German']))

In [15]:
# 2. 단어 사전 구축
PAD_TOKEN = '<pad>'
SOS_TOKEN = '<sos>'
EOS_TOKEN = '<eos>'

def build_vocab(pairs, min_freq=1):
    counter = Counter()
    for src, trg in pairs:
        for word in src.split():
            counter[word] += 1
        for word in trg.split():
            counter[word] += 1
    vocab = {PAD_TOKEN: 0, SOS_TOKEN: 1, EOS_TOKEN: 2}
    index = 3
    for word, freq in counter.items():
        if freq >= min_freq:
            vocab[word] = index
            index += 1
    return vocab

vocab = build_vocab(dataset, min_freq=1)
inv_vocab = {v: k for k, v in vocab.items()}

print(f"Vocabulary size: {len(vocab)}")
print("Vocabulary:", vocab)

Vocabulary size: 481
Vocabulary: {'<pad>': 0, '<sos>': 1, '<eos>': 2, 'Hello,': 3, 'how': 4, 'are': 5, 'you?': 6, 'Hallo,': 7, 'wie': 8, 'geht': 9, 'es': 10, 'dir?': 11, 'Good': 12, 'morning!': 13, 'Guten': 14, 'Morgen!': 15, 'I': 16, 'love': 17, 'machine': 18, 'learning.': 19, 'Ich': 20, 'liebe': 21, 'maschinelles': 22, 'Lernen.': 23, 'What': 24, 'is': 25, 'your': 26, 'name?': 27, 'Wie': 28, 'heißt': 29, 'du?': 30, 'Nice': 31, 'to': 32, 'meet': 33, 'you.': 34, 'Freut': 35, 'mich,': 36, 'dich': 37, 'kennenzulernen.': 38, 'Thank': 39, 'you': 40, 'very': 41, 'much.': 42, 'Vielen': 43, 'Dank.': 44, 'See': 45, 'later.': 46, 'Bis': 47, 'später.': 48, 'am': 49, 'learning': 50, 'code.': 51, 'lerne': 52, 'zu': 53, 'programmieren.': 54, 'This': 55, 'a': 56, 'pen.': 57, 'Das': 58, 'ist': 59, 'ein': 60, 'Stift.': 61, 'How': 62, 'old': 63, 'alt': 64, 'bist': 65, 'Where': 66, 'the': 67, 'nearest': 68, 'restaurant?': 69, 'Wo': 70, 'das': 71, 'nächste': 72, 'Restaurant?': 73, 'Can': 74, 'help': 75, '

In [17]:
# 3. 문장을 인덱스로 변환하는 함수 정의
def tokenize(sentence, vocab):
    return [vocab.get(word, vocab[PAD_TOKEN]) for word in sentence.split()]

def add_special_tokens(token_ids, sos_token, eos_token, max_len=None):
    token_ids = [sos_token] + token_ids + [eos_token]
    if max_len:
        token_ids += [sos_token] * (max_len - len(token_ids))
    return token_ids

# 4. 데이터 전처리
src_sentences = [pair[0] for pair in dataset]
trg_sentences = [pair[1] for pair in dataset]

src_indices = [tokenize(sentence, vocab) for sentence in src_sentences]
trg_indices = [tokenize(sentence, vocab) for sentence in trg_sentences]

max_src_len = max(len(seq) for seq in src_indices) + 2
max_trg_len = max(len(seq) for seq in trg_indices) + 2

src_indices = [add_special_tokens(seq, vocab[SOS_TOKEN], vocab[EOS_TOKEN], max_len=max_src_len) for seq in src_indices]
trg_indices = [add_special_tokens(seq, vocab[SOS_TOKEN], vocab[EOS_TOKEN], max_len=max_trg_len) for seq in trg_indices]

src_tensor = torch.tensor(src_indices, dtype=torch.long)
trg_tensor = torch.tensor(trg_indices, dtype=torch.long)

print("Source Tensor shape:", src_tensor.shape)
print("Target Tensor shape:", trg_tensor.shape)

# 5. Train/Validation 분할 (80:20)
train_src, val_src, train_trg, val_trg = train_test_split(
    src_tensor, trg_tensor, test_size=0.2, random_state=42
)

Source Tensor shape: torch.Size([103, 9])
Target Tensor shape: torch.Size([103, 8])


In [18]:
# 6. 장치 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

train_src, train_trg = train_src.to(device), train_trg.to(device)
val_src, val_trg = val_src.to(device), val_trg.to(device)

print(f'Training set size: {train_src.size(0)}')
print(f'Validation set size: {val_src.size(0)}')

# 7. 하이퍼파라미터 설정
INPUT_DIM = len(vocab)
OUTPUT_DIM = len(vocab)
ENC_EMB_DIM = 64
DEC_EMB_DIM = 64
HIDDEN_DIM = 128
ENC_DROPOUT = 0.1
DEC_DROPOUT = 0.1
TEACHER_FORCING_RATIO = 0.5
LEARNING_RATE = 0.001
NUM_EPOCHS = 1000

# 8. 모델 초기화
encoder = Encoder(input_dim=INPUT_DIM, emb_dim=ENC_EMB_DIM, hidden_dim=HIDDEN_DIM, dropout=ENC_DROPOUT)
decoder = Decoder(output_dim=OUTPUT_DIM, emb_dim=DEC_EMB_DIM, hidden_dim=HIDDEN_DIM, dropout=DEC_DROPOUT)
model = Seq2Seq(encoder, decoder, device, teacher_forcing_ratio=TEACHER_FORCING_RATIO).to(device)

# 9. 손실 함수 및 옵티마이저 설정
criterion = nn.CrossEntropyLoss(ignore_index=vocab[PAD_TOKEN])
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# 10. 학습 및 검증 함수 정의
def train_epoch(model, optimizer, criterion, src, trg):
    model.train()
    optimizer.zero_grad()
    output = model(src, trg)
    output_dim = output.shape[-1]
    output = output[:, 1:].reshape(-1, output_dim)
    trg = trg[:, 1:].reshape(-1)
    loss = criterion(output, trg)
    loss.backward()
    optimizer.step()
    return loss.item()

def evaluate_epoch(model, criterion, src, trg):
    model.eval()
    with torch.no_grad():
        output = model(src, trg)
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)
        loss = criterion(output, trg)
    return loss.item()

Using device: cuda
Training set size: 82
Validation set size: 21


In [19]:
# 11. 학습 루프
print("=== 학습 시작 ===")
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss = train_epoch(model, optimizer, criterion, train_src, train_trg)
    val_loss = evaluate_epoch(model, criterion, val_src, val_trg)
    
    if epoch % 100 == 0 or epoch == 1:
        print(f'Epoch [{epoch}/{NUM_EPOCHS}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')


=== 학습 시작 ===
Epoch [1/1000], Train Loss: 6.2047, Val Loss: 6.1595
Epoch [100/1000], Train Loss: 2.5352, Val Loss: 4.8587
Epoch [200/1000], Train Loss: 1.7110, Val Loss: 5.0622
Epoch [300/1000], Train Loss: 0.6194, Val Loss: 5.7587
Epoch [400/1000], Train Loss: 0.2437, Val Loss: 5.6741
Epoch [500/1000], Train Loss: 0.0794, Val Loss: 6.0228
Epoch [600/1000], Train Loss: 0.0367, Val Loss: 6.6820
Epoch [700/1000], Train Loss: 0.0206, Val Loss: 6.5860
Epoch [800/1000], Train Loss: 0.0144, Val Loss: 6.9552
Epoch [900/1000], Train Loss: 0.0101, Val Loss: 6.6054
Epoch [1000/1000], Train Loss: 0.0076, Val Loss: 7.0580


In [20]:
# 12. 예측을 위한 디코딩 함수 정의
def decode(indices, inv_vocab):
    sentences = []
    for seq in indices:
        words = []
        for idx in seq:
            word = inv_vocab.get(idx.item(), PAD_TOKEN)
            if word == EOS_TOKEN:
                break
            if word not in [SOS_TOKEN, PAD_TOKEN]:
                words.append(word)
        sentences.append(' '.join(words))
    return sentences

# 13. 검증 데이터에 대한 예측 수행
model.eval()
with torch.no_grad():
    output = model(val_src, val_trg)
    pred_indices = output.argmax(dim=2)

pred_sentences = decode(pred_indices, inv_vocab)
val_sentences = decode(val_trg, inv_vocab)

In [21]:
# 14. 결과 출력
print("=== 검증 결과 ===")
for i in range(len(val_src)):
    src_sentence = decode(val_src[i].unsqueeze(0), inv_vocab)[0]
    trg_sentence = val_sentences[i]
    pred_sentence = pred_sentences[i]
    print(f"Source: {src_sentence}")
    print(f"Target: {trg_sentence}")
    print(f"Predicted: {pred_sentence}")
    print('---')

=== 검증 결과 ===
Source: This book is interesting.
Target: Dieses Buch ist interessant.
Predicted: Das ist mein erstes Mal
---
Source: What are you studying?
Target: Was studierst du?
Predicted: Was machst du gewesen?
---
Source: The movie was amazing.
Target: Der Film war großartig.
Predicted: Der Flug hatte Verspätung.
---
Source: Can I have the bill, please?
Target: Kann ich bitte die Rechnung haben?
Predicted: Kann ich mir einen Stift
---
Source: I am looking for a job.
Target: Ich suche einen Job.
Predicted: Ich gehe jetzt nach Hause.
---
Source: What is your favorite food?
Target: Was ist dein Lieblingsessen?
Predicted: Wie machst du gewesen?
---
Source: I forgot my keys.
Target: Ich habe meine Schlüssel vergessen.
Predicted: Ich habe mein Passwort vergessen.
---
Source: Where is the closest bank?
Target: Wo ist die nächste Bank?
Predicted: Wann bist ist lecker.
---
Source: Where is the nearest restaurant?
Target: Wo ist das nächste Restaurant?
Predicted: Er bist du gewesen?
---
Sou