<a href="https://colab.research.google.com/github/BDH-teacher/Deep_Learning_Audit_code/blob/main/transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import math
import random
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import Transformer

# (Optional) Reproducibility: seed every random number generator used below.
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

# Prefer the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("device =", device)

device = cuda


In [2]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to embeddings, then apply dropout.

    Args:
        d_model: Size of the embedding (last) dimension. Odd values are supported.
        dropout: Dropout probability applied to the summed result.
        max_len: Number of positions precomputed in the encoding table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Column of position indices, shape (max_len, 1): [[0], [1], ..., [max_len - 1]]
        position = torch.arange(0, max_len).unsqueeze(1)
        # One frequency per even column index: 10000^(-2i / d_model)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even columns,
        # so trim the frequencies to fit (a no-op when d_model is even).
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Registered as a non-persistent buffer: the table follows the module through
        # .to()/.cuda() but stays out of state_dict, so checkpoint keys are unchanged.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)  # (1, max_len, d_model)

    def forward(self, x):
        """Return dropout(x + positional table sliced to x's sequence length).

        x: (batch, seq_len, d_model) when batch_first=True
        """
        # .to() is a no-op once the module lives on the same device as x; it is kept so a
        # module that was never moved still accepts inputs on another device.
        x = x + self.pe[:, : x.size(1), :].to(x.device)
        return self.dropout(x)

In [3]:
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder Transformer that maps source token ids to target-vocabulary logits.

    Token ids are embedded, scaled by sqrt(d_model), passed through the positional
    encoding, and fed to a batch-first ``nn.Transformer``; a final linear layer projects
    decoder outputs to ``tgt_vocab_size`` scores.
    """

    def __init__(
        self,
        src_vocab_size,
        tgt_vocab_size,
        d_model=512,
        nhead=8,
        num_encoder_layers=3,
        num_decoder_layers=3,
        dim_feedforward=2048,
        dropout=0.1
    ):
        super().__init__()

        # Submodules are created in a fixed order so seeded initialisation is reproducible.
        transformer_config = dict(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer = nn.Transformer(**transformer_config)

        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout)

        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        self.d_model = d_model

    def forward(
        self,
        src,
        tgt,
        src_mask=None,
        tgt_mask=None,
        memory_mask=None,
        src_key_padding_mask=None,
        tgt_key_padding_mask=None,
        memory_key_padding_mask=None
    ):
        """Encode ``src``, decode ``tgt`` against the encoder memory, and return logits.

        The mask arguments are forwarded unchanged to the transformer's encoder/decoder.
        """
        # Scale embeddings by sqrt(d_model) before adding positional information.
        scale = math.sqrt(self.d_model)
        src_features = self.positional_encoding(self.src_embedding(src) * scale)
        tgt_features = self.positional_encoding(self.tgt_embedding(tgt) * scale)

        encoder = self.transformer.encoder
        decoder = self.transformer.decoder

        memory = encoder(
            src_features,
            mask=src_mask,
            src_key_padding_mask=src_key_padding_mask,
        )
        decoded = decoder(
            tgt_features,
            memory,
            tgt_mask=tgt_mask,
            memory_mask=memory_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask,
        )

        # Project each decoder position onto the target vocabulary.
        return self.fc_out(decoded)

In [4]:
def generate_square_subsequent_mask(size):
    """Return a (size, size) float32 causal mask: 0.0 on/below the diagonal, -inf above it."""
    blocked = torch.full((size, size), float('-inf'), dtype=torch.float32)
    # triu(diagonal=1) keeps only the strictly-upper triangle and zeroes everything else,
    # so each position may attend to itself and earlier positions only.
    return torch.triu(blocked, diagonal=1)

def make_key_padding_mask(x, pad_idx=0):
    """Return a boolean mask that is True wherever ``x`` equals ``pad_idx``.

    x: (batch, seq_len)
    True marks a padding position (to be ignored by attention).
    """
    is_padding = x == pad_idx
    return is_padding

In [5]:
# Small sample dataset (English to French)
# Each entry is an (English source sentence, French target sentence) pair;
# sentences are tokenized later by splitting on whitespace.
data = [
    ("I love you", "Je t'aime"),
    ("I am learning", "J'apprends"),
    ("How are you", "Comment ca va"),
    ("Thank you", "Merci"),
    ("Good morning", "Bonjour"),
]

def build_vocab(sentences):
    """Build a word -> index mapping from whitespace-tokenized sentences.

    Indices 0-3 are reserved for the special tokens "<pad>", "<sos>", "<eos>" and
    "<unk>"; every other distinct word is numbered from 4 upward in sorted order.
    The resulting indices are always contiguous (0 .. len(vocab) - 1), so
    ``len(vocab)`` is a valid embedding-table size.

    Args:
        sentences: Iterable of strings.

    Returns:
        dict mapping each token to its integer index.
    """
    specials = {"<pad>": 0, "<sos>": 1, "<eos>": 2, "<unk>": 3}

    words = set()
    for sentence in sentences:
        words.update(sentence.split())
    # A sentence may contain a literal special-token string. Numbering it as a regular
    # word and then overwriting it would leave a gap in the index range, pushing the
    # largest index to >= len(vocab). Exclude such tokens up front instead.
    words.difference_update(specials)

    vocab_dict = {word: idx for idx, word in enumerate(sorted(words), start=len(specials))}
    vocab_dict.update(specials)
    return vocab_dict

def sentence_to_indices(sentence, vocab, max_len=10):
    """Convert a sentence into a fixed-length list of vocabulary indices.

    The sentence is split on whitespace, unknown words map to "<unk>", and the result
    is wrapped in "<sos>" ... "<eos>". Shorter sequences are right-padded with "<pad>"
    up to ``max_len``; longer ones are cut to the first ``max_len`` entries (which can
    drop the trailing "<eos>").
    """
    word_ids = [vocab.get(token, vocab["<unk>"]) for token in sentence.split()]
    ids = [vocab["<sos>"], *word_ids, vocab["<eos>"]]

    shortfall = max_len - len(ids)
    if shortfall > 0:
        # Pad on the right up to max_len
        return ids + [vocab["<pad>"]] * shortfall
    return ids[:max_len]

# Split the pairs into parallel source/target tuples and build one vocabulary per language.
src_sentences, tgt_sentences = zip(*data)
src_vocab = build_vocab(src_sentences)
tgt_vocab = build_vocab(tgt_sentences)

# Every sentence is encoded to exactly max_len indices so the rows stack into one tensor.
max_len = 10
src_data = torch.tensor(
    [sentence_to_indices(sentence, src_vocab, max_len=max_len) for sentence in src_sentences],
    dtype=torch.long,
)
tgt_data = torch.tensor(
    [sentence_to_indices(sentence, tgt_vocab, max_len=max_len) for sentence in tgt_sentences],
    dtype=torch.long,
)

print("src_data shape:", src_data.shape)  # (N, max_len)
print("tgt_data shape:", tgt_data.shape)

# id -> token (used when printing translations)
tgt_id_to_word = {token_id: token for token, token_id in tgt_vocab.items()}

src_data shape: torch.Size([5, 10])
tgt_data shape: torch.Size([5, 10])


In [6]:
# Vocabulary sizes determine the embedding tables and the output projection.
src_vocab_size = len(src_vocab)
tgt_vocab_size = len(tgt_vocab)

d_model = 128
nhead = 8
num_encoder_layers = 3
num_decoder_layers = 3
dropout = 0.0  # <- use 0.0 if you do not want dropout regularization

model = Seq2SeqTransformer(
    src_vocab_size, tgt_vocab_size,
    d_model=d_model,
    nhead=nhead,
    num_encoder_layers=num_encoder_layers,
    num_decoder_layers=num_decoder_layers,
    dim_feedforward=512,
    dropout=dropout
)

optimizer = optim.Adam(model.parameters(), lr=0.001)
# The loss is computed over target-token predictions, so the ignored index must be the
# TARGET vocabulary's pad id. (It equals the source pad id only because build_vocab
# reserves index 0 for "<pad>" in every vocabulary.)
loss_fn = nn.CrossEntropyLoss(ignore_index=tgt_vocab["<pad>"])  # ignore pad tokens

In [7]:
def train_epoch(model, optimizer, loss_fn, src_data, tgt_data, epochs=200, print_every=20, pad_idx=0):
    """Train ``model`` with teacher forcing, one sample at a time, for ``epochs`` passes.

    Despite the name, this runs the full multi-epoch loop. For every (src, tgt) row the
    target is shifted into decoder input / expected output, a causal mask and padding
    masks are built, and one optimizer step is taken with gradients clipped to norm 1.0.
    The mean per-sample loss is printed on epoch 1 and every ``print_every`` epochs.

    Args:
        model: Called as ``model(src, tgt_input, tgt_mask=..., *_key_padding_mask=...)``.
        optimizer: Optimizer over the model's parameters.
        loss_fn: Loss taking (flattened logits, flattened target ids).
        src_data, tgt_data: Row-aligned collections of index sequences (iterated with zip).
        epochs: Number of passes over the data.
        print_every: Reporting interval in epochs.
        pad_idx: Padding id used for both the source and the target padding masks.
    """
    model.train()

    for epoch in range(1, epochs + 1):
        total_loss = 0.0

        for src_row, tgt_row in zip(src_data, tgt_data):
            # Add a batch dimension: (max_len,) -> (1, max_len)
            src = src_row.unsqueeze(0)
            tgt = tgt_row.unsqueeze(0)

            # Teacher forcing: the decoder sees [<sos> ...] and must predict [... <eos>]
            decoder_input = tgt[:, :-1]
            expected = tgt[:, 1:]

            mask_kwargs = {
                "tgt_mask": generate_square_subsequent_mask(decoder_input.size(1)).to(src.device),
                "src_key_padding_mask": make_key_padding_mask(src, pad_idx=pad_idx),
                "tgt_key_padding_mask": make_key_padding_mask(decoder_input, pad_idx=pad_idx),
                "memory_key_padding_mask": make_key_padding_mask(src, pad_idx=pad_idx),
            }

            optimizer.zero_grad()

            logits = model(src, decoder_input, **mask_kwargs)

            # (batch, tgt_len, vocab) -> (batch*tgt_len, vocab)
            flat_logits = logits.reshape(-1, logits.shape[-1])
            flat_expected = expected.reshape(-1)

            loss = loss_fn(flat_logits, flat_expected)
            loss.backward()

            # Gradient clipping (guards against NaN from exploding gradients)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(src_data)
        should_report = epoch % print_every == 0 or epoch == 1
        if should_report:
            print(f"Epoch {epoch:4d}/{epochs}, Loss: {avg_loss:.4f}")

# Run training. The source vocabulary's pad id is passed as the single pad_idx that
# train_epoch uses for both the source and the target padding masks.
train_epoch(
    model=model,
    optimizer=optimizer,
    loss_fn=loss_fn,
    src_data=src_data,
    tgt_data=tgt_data,
    epochs=200,
    print_every=20,
    pad_idx=src_vocab["<pad>"],
)



Epoch    1/200, Loss: 2.7530
Epoch   20/200, Loss: 0.0036
Epoch   40/200, Loss: 0.0016
Epoch   60/200, Loss: 0.0009
Epoch   80/200, Loss: 0.0006
Epoch  100/200, Loss: 0.0005
Epoch  120/200, Loss: 0.0003
Epoch  140/200, Loss: 0.0003
Epoch  160/200, Loss: 0.0002
Epoch  180/200, Loss: 0.0002
Epoch  200/200, Loss: 0.0001


In [8]:
def translate(sentence, model, src_vocab, tgt_vocab, tgt_id_to_word, max_len=10):
    """Greedily decode a translation of ``sentence`` and return it as a string.

    The model is switched to eval mode (and left there). Decoding starts from "<sos>"
    and appends the arg-max token at each step, stopping after "<eos>" is produced or
    after ``max_len`` generated tokens.

    Args:
        sentence: Source sentence; tokenized by whitespace.
        model: Called as ``model(src, tgt_input, tgt_mask=..., *_key_padding_mask=...)``.
        src_vocab: Source token -> id mapping with "<sos>", "<eos>", "<pad>", "<unk>".
        tgt_vocab: Target token -> id mapping with "<sos>", "<eos>", "<pad>".
        tgt_id_to_word: Target id -> token mapping; missing ids are rendered as "<unk>".
        max_len: Source length after padding/truncation and the maximum number of
            decoding steps.

    Returns:
        The predicted tokens joined by spaces, with "<pad>", "<sos>" and "<eos>" removed.
    """
    model.eval()

    # Build inputs on the device the model lives on, so a model moved to the GPU works
    # too. A parameter-free model falls back to the CPU.
    first_param = next(iter(model.parameters()), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Same <sos>/<eos>/pad/truncate preprocessing that is applied to the training data.
    indices = sentence_to_indices(sentence, src_vocab, max_len=max_len)
    src_tensor = torch.tensor(indices, dtype=torch.long, device=device).unsqueeze(0)  # add batch dim

    sos_id = tgt_vocab["<sos>"]
    eos_id = tgt_vocab["<eos>"]
    pad_id = tgt_vocab["<pad>"]

    # Start the generated sequence with the start token: shape (1, 1)
    tgt_input = torch.tensor([sos_id], dtype=torch.long, device=device).unsqueeze(0)

    # The source never changes while decoding, so its padding mask is computed once and
    # reused for both the encoder input and the decoder's view of the memory.
    src_key_padding_mask = make_key_padding_mask(src_tensor, pad_idx=src_vocab["<pad>"])

    with torch.no_grad():
        for _ in range(max_len):
            tgt_mask = generate_square_subsequent_mask(tgt_input.size(1)).to(device)
            tgt_key_padding_mask = make_key_padding_mask(tgt_input, pad_idx=pad_id)

            output = model(
                src_tensor,
                tgt_input,
                tgt_mask=tgt_mask,
                src_key_padding_mask=src_key_padding_mask,
                tgt_key_padding_mask=tgt_key_padding_mask,
                memory_key_padding_mask=src_key_padding_mask
            )

            # Greedy choice: highest-scoring token at the last decoded position
            next_token = output[:, -1, :].argmax(dim=-1)
            tgt_input = torch.cat((tgt_input, next_token.unsqueeze(0)), dim=1)

            if next_token.item() == eos_id:
                break

    # Convert predicted ids to words, skipping pad/sos/eos
    skipped = {pad_id, sos_id, eos_id}
    words = [
        tgt_id_to_word.get(idx, "<unk>")
        for idx in tgt_input.squeeze(0).tolist()
        if idx not in skipped
    ]
    return " ".join(words)

# Quick test: translate each of the training sentences and print source/prediction pairs.
tests = [
    "I love you",
    "Thank you",
    "Good morning",
    "How are you",
    "I am learning",
]

for s in tests:
    print(f"[SRC] {s}")
    print(f"[PRED] {translate(s, model, src_vocab, tgt_vocab, tgt_id_to_word, max_len=max_len)}")
    print("-" * 50)  # separator line between examples

[SRC] I love you
[PRED] Je t'aime
--------------------------------------------------
[SRC] Thank you
[PRED] Merci
--------------------------------------------------
[SRC] Good morning
[PRED] Bonjour
--------------------------------------------------
[SRC] How are you
[PRED] Comment ca va
--------------------------------------------------
[SRC] I am learning
[PRED] J'apprends
--------------------------------------------------


  output = torch._nested_tensor_from_mask(


In [9]:
# RNN Seq2Seq (GRU)
class RNNEncoder(nn.Module):
    """GRU encoder: embeds source token ids and returns the GRU's final hidden state."""

    def __init__(self, vocab_size, emb_dim=64, hid_dim=128):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings=vocab_size, embedding_dim=emb_dim)
        self.rnn = nn.GRU(input_size=emb_dim, hidden_size=hid_dim)

    def forward(self, src):  # src: (src_len, batch)
        embedded = self.emb(src)  # (src_len, batch, emb_dim)
        # Only the final hidden state is passed on; per-step outputs are discarded.
        _, final_hidden = self.rnn(embedded)  # final_hidden: (1, batch, hid_dim)
        return final_hidden

class RNNDecoder(nn.Module):
    """GRU decoder: embeds target input ids, runs the GRU from a given hidden state,
    and projects every step's output to vocabulary logits."""

    def __init__(self, vocab_size, emb_dim=64, hid_dim=128):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings=vocab_size, embedding_dim=emb_dim)
        self.rnn = nn.GRU(input_size=emb_dim, hidden_size=hid_dim)
        self.fc = nn.Linear(in_features=hid_dim, out_features=vocab_size)

    def forward(self, tgt_inp, hidden):  # tgt_inp: (tgt_len, batch)
        embedded = self.emb(tgt_inp)
        step_outputs, next_hidden = self.rnn(embedded, hidden)
        vocab_logits = self.fc(step_outputs)  # (tgt_len, batch, vocab)
        return vocab_logits, next_hidden

class RNNSeq2Seq(nn.Module):
    """Encoder-decoder wrapper: the encoder's result seeds the decoder's hidden state."""

    def __init__(self, src_vocab_size, tgt_vocab_size, emb_dim=64, hid_dim=128):
        super().__init__()
        # Both halves share the same embedding and hidden sizes.
        shared_dims = dict(emb_dim=emb_dim, hid_dim=hid_dim)
        self.encoder = RNNEncoder(src_vocab_size, **shared_dims)
        self.decoder = RNNDecoder(tgt_vocab_size, **shared_dims)

    def forward(self, src_indices, tgt_input):
        # src_indices: (src_len, batch)
        # tgt_input: (tgt_len, batch)  (teacher forcing input)
        context = self.encoder(src_indices)
        # The decoder also returns its updated hidden state, which is not needed here.
        logits, _unused_hidden = self.decoder(tgt_input, context)
        return logits

# ----- Slide-style vocab: <bos>/<eos>/<unk> -----
def build_vocab_bos_eos(sentences):
    """Build a word -> index mapping, numbering new words in order of first appearance.

    Indices 0-3 are reserved for "<pad>", "<bos>", "<eos>" and "<unk>"; each previously
    unseen whitespace-separated word receives the next free index.
    """
    vocab = {"<pad>": 0, "<bos>": 1, "<eos>": 2, "<unk>": 3}
    for sentence in sentences:
        for word in sentence.split():
            # Indices are contiguous from 0, so the next free index is the current size.
            vocab.setdefault(word, len(vocab))
    return vocab

pairs = data  # (src, tgt)
# Separate vocabularies for the source and target sides of the pairs.
src_vocab2 = build_vocab_bos_eos([src_text for src_text, _ in pairs])
tgt_vocab2 = build_vocab_bos_eos([tgt_text for _, tgt_text in pairs])

rnn_model = RNNSeq2Seq(src_vocab_size=len(src_vocab2), tgt_vocab_size=len(tgt_vocab2))

# Training routine for the RNN-based Seq2Seq model
def train(model, pairs, src_vocab, tgt_vocab, epochs=10, lr=0.01):
    """Train ``model`` on sentence pairs with teacher forcing, one pair per step.

    A fresh Adam optimizer and an unweighted cross-entropy loss are created on each
    call. After every epoch the mean loss over ``pairs`` is printed.

    Args:
        model: Called as ``model(src_indices, tgt_input)``; its output is viewed as
            (-1, len(tgt_vocab)) logits.
        pairs: Sequence of (source sentence, target sentence) strings.
        src_vocab, tgt_vocab: Token -> id mappings containing "<bos>", "<eos>", "<unk>".
        epochs: Number of passes over ``pairs``.
        lr: Adam learning rate.
    """

    def encode(sentence, vocab):
        # <bos> + word ids (unknown words -> <unk>) + <eos>, shaped (seq_len, 1)
        token_ids = [
            vocab["<bos>"],
            *(vocab.get(word, vocab["<unk>"]) for word in sentence.split()),
            vocab["<eos>"],
        ]
        return torch.tensor(token_ids, dtype=torch.long).unsqueeze(1)

    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        total_loss = 0.0

        for src, tgt in pairs:
            src_indices = encode(src, src_vocab)
            tgt_indices = encode(tgt, tgt_vocab)

            # Teacher forcing: feed everything but the last token, predict everything
            # but the first.
            decoder_input = tgt_indices[:-1]
            expected = tgt_indices[1:]

            optimizer.zero_grad()
            outputs = model(src_indices, decoder_input)

            loss = criterion(outputs.view(-1, len(tgt_vocab)), expected.view(-1))
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(pairs):.4f}")

# Run (optional)
train(
    model=rnn_model,
    pairs=pairs,
    src_vocab=src_vocab2,
    tgt_vocab=tgt_vocab2,
    epochs=30,
    lr=0.01,
)

Epoch 1/30, Loss: 2.5393
Epoch 2/30, Loss: 1.1826
Epoch 3/30, Loss: 0.4234
Epoch 4/30, Loss: 0.1564
Epoch 5/30, Loss: 0.0628
Epoch 6/30, Loss: 0.0195
Epoch 7/30, Loss: 0.0089
Epoch 8/30, Loss: 0.0052
Epoch 9/30, Loss: 0.0036
Epoch 10/30, Loss: 0.0027
Epoch 11/30, Loss: 0.0022
Epoch 12/30, Loss: 0.0019
Epoch 13/30, Loss: 0.0017
Epoch 14/30, Loss: 0.0015
Epoch 15/30, Loss: 0.0014
Epoch 16/30, Loss: 0.0013
Epoch 17/30, Loss: 0.0012
Epoch 18/30, Loss: 0.0012
Epoch 19/30, Loss: 0.0011
Epoch 20/30, Loss: 0.0010
Epoch 21/30, Loss: 0.0010
Epoch 22/30, Loss: 0.0010
Epoch 23/30, Loss: 0.0009
Epoch 24/30, Loss: 0.0009
Epoch 25/30, Loss: 0.0008
Epoch 26/30, Loss: 0.0008
Epoch 27/30, Loss: 0.0008
Epoch 28/30, Loss: 0.0007
Epoch 29/30, Loss: 0.0007
Epoch 30/30, Loss: 0.0007
