In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
from time import time
import numpy as np
import tqdm as tqdm

In [22]:
src_vocab = 2500  # 입력 문장에서 사용할 단어(또는 토큰) 수
tgt_vocab = 3000  # 출력 문장에서 사용할 단어(또는 토큰) 수
d_model = 512  # 단어 임베딩 벡터 차원 (단어를 몇 차원으로 표현할지)
num_heads = 8  # 멀티 헤드 어텐션에서의 헤드 수 (병렬로 몇 개의 어텐션을 계산할지)
num_layers = 6  # 인코더와 디코더의 층 수 (각각 몇 층으로 구성할지)

d_ff = 2048  # Feed Forward 네트워크의 은닉층 크기
max_seq = 100  # 문장의 최대 길이 (100 토큰까지 허용)
tot_epoch = 4  # 학습 데이터 전체를 몇 번 반복할지 (에폭 수)
batch_size = 5  # 학습할 때 한 번에 처리할 문장 수 (배치 크기)

In [None]:
# 3번 블럭
# 데이터 임의 생성

# 입력 데이터 (src_data) 생성
# 0부터 src_vocab(2500) 미만의 숫자 중에서 랜덤하게 선택
# 크기: (batch_size, max_seq) → 5개의 문장, 각 문장은 100단어
src_data = torch.randint(0, src_vocab, (batch_size, max_seq))
print(f"입력 데이터 모양 {src_data.shape}")
print(src_data[0])

# 출력 데이터 (tgt_data) 생성
# 0부터 tgt_vocab(3000) 미만의 숫자 중에서 랜덤하게 선택
# 크기: (batch_size, max_seq)
tgt_data = torch.randint(0, tgt_vocab, (batch_size, max_seq))

입력 데이터 모양 torch.Size([5, 100])
tensor([ 684, 2232, 1393,  516, 1232,   38, 2313,  937, 1295,  413,  146,    5,
         583, 1656, 1321,  328,  664,  484,  808, 1208, 1839, 2207,  316, 2005,
        2412,  701,  519, 1068, 2245,  278, 1226, 1153, 1008,  645, 2222,  295,
         959, 1227,  121, 1013,  421, 2171,  388,  568, 1321, 1142,  365, 1370,
         227, 1767,  281, 2167, 1590, 2490, 1745, 1545, 2310,  379, 1686, 2059,
         987,  944,  964,  824, 1014, 1998, 1110,  142,  364, 2079, 1331,  131,
         944, 1172,  989,  387,  305, 1626,  902, 1436,  472, 1263, 1787,  494,
         920,  118, 1954, 1909,  835, 1174, 1639,  614, 2283, 2128,  448, 1479,
         207, 2218, 1392, 2363])


In [None]:
# 4번 블럭
# Multi-head attention class 구현
class My_MHA(nn.Module):
    def __init__(self, d_model, num_heads):
        super(My_MHA, self).__init__()
        self.d_model = d_model  # 512 차원
        self.num_heads = num_heads  # 헤드 8개

        # 각 헤드 당 처리할 차원 수
        # 총 512 개의 차원을 8개의 헤드가 나눠서 처리하기때문에 각 헤드당 64차원 처리함
        self.d_head = d_model // num_heads

        # Q,K,V 행렬 준비 (W_Q, W_K, W_V에 학습가능 파라미터 262,656개 모델 생성)
        self.W_Q = nn.Linear(
            d_model, d_model
        )  # 512 입력 512 출력 (512*512+512=262,656)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)

    # 머리나누기
    # 입력 데이터 모양 : [5, 100, 512]
    # 출력 데이터 모양 : [5, 8, 100, 64]
    def split_heads(self, x):
        batch_size = x.size(0)
        # 데이터 모양 : [5, 100, 8, 64]
        x = x.view(batch_size, -1, self.num_heads, self.d_head)
        x = x.transpose(1, 2)  # 헤드별로 처리하기 위한 교환
        return x

    # 유사성 계산
    def dot_prod(self, Q, K, V, mask):
        score = torch.matmul(Q, K.transpose(-1, -2)) / math.sqrt(self.d_head)

        # 디코더 마스크 처리
        if mask is not None:
            score = score.masked_fill(mask == 0, -1e9)
        prob = torch.softmax(score, dim=-1)

        # V 행렬과 곱셈
        Z = torch.matmul(prob, V)
        return Z

    # 머리 합치기
    # 입력데이터 모양 : [5, 8, 100, 64]
    # 출력 데이터 모양 : [5, 100, 512]
    def combine_heads(self, x):
        batch_size = x.size(0)
        # 데이터 모양 : [5,100,8,64]
        Z = x.transpose(1, 2)

        # 데이터 모양 : [5, 100, 512], 입력과 동일한 출력형태
        Z = Z.contiguous().view(batch_size, -1, self.d_model)
        return Z

    # 앞서 정의한 함수로 forward를 정의

    def forward(self, q, k, v, mask=None):
        # 머리 나누기 실행/생성된 모델에 임베딩된 토큰(문자열 q,k,v) 입력
        Q = self.split_heads(self.W_Q(q))
        K = self.split_heads(self.W_K(k))
        V = self.split_heads(self.W_V(v))

        # 유사성 계산 실행
        attn = self.dot_prod(Q, K, V, mask)

        # 머리 합치기 실행
        Z = self.combine_heads(attn)

        return Z

## 🔍 Multi-Head Attention 클래스 흐름 정리

### ✅ 1. 입력된 단어를 512차원으로 벡터화
```python
self.W_Q(q)
```
- 입력 임베딩 벡터 `q`에 선형 변환(`W_Q`, `W_K`, `W_V`)을 적용해서  
  각각 Query, Key, Value 벡터를 생성한다.
- 입력 shape: `[batch_size, seq_len, d_model]` → 예: `[5, 100, 512]`

---

### ✅ 2. 설정된 헤드로 Q, K, V를 나눔 (차원 변환 포함)
```python
Q = self.split_heads(self.W_Q(q))
```
- 512차원을 8개의 head로 쪼갬 → 각 head는 64차원씩 담당.
- shape 변화: `[5, 100, 512]` → `[5, 8, 100, 64]`
- `transpose(1, 2)`를 통해 head 차원을 앞으로 이동시켜 병렬 연산이 가능하게 함.

---

### ✅ 3. Q, K, V 계산 → 어텐션 연산 수행
```python
attn = self.dot_prod(Q, K, V, mask)
```
- 어텐션 스코어 계산: `score = Q @ Kᵀ / sqrt(d_head)`
- 마스크(mask)가 있을 경우 적용.
- `softmax(score)`를 통해 확률적 가중치 구함.
- `Z = softmax(score) @ V` 계산으로 최종 attention 결과 획득.

---

### ✅ 4. 계산한 결과를 다시 하나로 합침
```python
Z = self.combine_heads(attn)
```
- shape 변화: `[5, 8, 100, 64]` → `[5, 100, 512]`
- 여러 head에서 계산된 결과를 다시 원래 차원(d_model=512)으로 합침.

---

## 🧠 한 줄 요약
> 입력 임베딩을 Q, K, V로 만들어 head마다 attention을 따로 계산하고,  
> 다시 합쳐서 더 풍부한 문맥 정보를 가진 출력으로 만든다.

In [None]:
# 5번 블럭
# feed forward (순전파) class 구현


class My_FFN(nn.Module):
    def __init__(self, d_model, d_ff):
        super(My_FFN, self).__init__()

        self.linear_1 = nn.Linear(d_model, d_ff)
        self.linear_2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.linear_1(x)
        x = self.relu(x)
        x = self.linear_2(x)
        return x

In [None]:
# 6번 블럭
# 자연어 트랜스포머 인코더 구현


class My_Encoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(My_Encoder, self).__init__()

        self.mha = My_MHA(d_model, num_heads)
        self.ffn = My_FFN(d_model, d_ff)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x, mask):
        z = self.mha(x, x, x, mask)
        z = self.layer_norm(x + z)  # 잔차 연결
        w = self.ffn(z)
        z = self.layer_norm(w + z)  # 레이어 정규화, 잔차 연결
        return z

In [None]:
# 7번 블럭
# 위치 인코딩 구현


class My_Position(nn.Module):
    def __init__(self, d_model, max_seq):
        super(My_Position, self).__init__()

        self.d_model = d_model  # 512 차원
        self.max_seq = max_seq  # 입력 문장 최대길이 100

    # 위치 임베딩 계산
    def pos_enc(self, x):
        k = torch.arange(0, self.max_seq, 1).float()  # 0부터 100까지의 실수
        print("전", k.shape)  # [100]
        k = k.unsqueeze(1)
        print("후", k.shape)  # [100, 1] 2차원 텐서로 변형
        result = torch.zeros(self.max_seq, self.d_model)
        print(
            "결과", result.shape
        )  # [100, 512] 텐서 공간에 0으로 세팅, 포지셔널 벡터용 공간
        twoi = torch.arange(0, self.d_model, 2).float()  # 512 차원의 짝수 인덱스 생성
        print(twoi)

        result[:, 0::2] = torch.sin(
            k / (10000 ** (twoi / self.d_model))
        )  # k 는 입력 토큰
        result[:, 1::2] = torch.cos(k / 10000 ** (twoi / self.d_model))
        result = result.unsqueeze(0)
        print("최종 모양", result.shape)
        return result

    def forward(self, x):
        pos = self.pos_enc(x)
        return pos

In [None]:
# 8번 블럭
# 디코더구현
class My_Decoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(My_Decoder, self).__init__()
        self.mha_1 = My_MHA(d_model, num_heads)
        self.mha_2 = My_MHA(d_model, num_heads)
        self.ffn = My_FFN(d_model, d_ff)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x, enc_out, src_mask, tgt_mask):
        print("디코더입력데이터모양", x.shape)
        # 디코더self attention 부분
        z = self.mha_1(x, x, x, tgt_mask)
        z = self.layer_norm(x + z)
        print("self attention 후데이터모양", x.shape)

        # 디코더 cross attention 부분
        y = self.mha_2(z, enc_out, enc_out, src_mask)
        y = self.layer_norm(z + y)
        print("cross attention 후 데이터 모양", y.shape)

        # 마지막feed forward 부분
        w = self.ffn(y)
        z = self.layer_norm(w + y)
        print("feed forward 후데이터모양", z.shape)
        return z

In [29]:
# 9번 블럭
# 전체 자연어 트랜스포머 구성


class My_Transformer(nn.Module):
    def __init__(self, d_model, num_heads, num_layers, d_ff, max_seq):
        super(My_Transformer, self).__init__()

        self.enc_embed = nn.Embedding(src_vocab, d_model)
        self.dec_embed = nn.Embedding(tgt_vocab, d_model)
        self.pos_enc = My_Position(d_model, max_seq)

        # 인코더 쌓기
        self.enc_layers = nn.ModuleList(
            [My_Encoder(d_model, num_heads, d_ff) for _ in range(num_layers)]
        )

        # 디코더 쌓기
        self.dec_layers = nn.ModuleList(
            [My_Decoder(d_model, num_heads, d_ff) for _ in range(num_layers)]
        )

        # 최종 출력층
        self.linear = nn.Linear(d_model, tgt_vocab)
        self.softmax = nn.Softmax(dim=-1)

    # 디코더 마스크 제작
    def make_mask(self, src, tgt):
        src_mask = None
        tgt_mask = tgt.unsqueeze(1).unsqueeze(3)
        tmp = torch.ones(1, max_seq, max_seq)
        mask = torch.tril(tmp).bool()
        print("틀", mask)

        tgt_mask = tgt_mask * mask
        print("마스크 모양", tgt_mask.shape)
        print("마스크 결과", tgt_mask)
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        # 마스크 만들기
        src_mask, tgt_mask = self.make_mask(src, tgt)
        print("마스크 모양", tgt_mask.shape)
        print(tgt_mask[1])

        # 단어 임베딩 및 위치 정보 추가
        src_embed = self.enc_embed(src)
        tgt_embed = self.dec_embed(tgt)
        src_pos = self.pos_enc(src)
        tgt_pos = self.pos_enc(tgt)
        src_embed = src_embed + src_pos
        tgt_embed = tgt_embed + tgt_pos

        # 인코더 연결
        # print('인코더 입력 모양', src_embed.shape)
        enc_out = src_embed
        for layer in self.enc_layers:
            enc_out = layer(enc_out, src_mask)
        # print('인코더 출력 모양', enc_out.shape)

        # 디코더 연결
        dec_out = tgt_embed
        for layer in self.dec_layers:
            dec_out = layer(dec_out, enc_out, src_mask, tgt_mask)

        # 최종 출력
        out = self.linear(dec_out)
        out = self.softmax(out)
        out = torch.argmax(out, dim=-1)
        print("최종 출력 모양", out.shape)
        print("최종 출력 내용", out)
        return out

In [30]:
%pip install prettytable

Note: you may need to restart the kernel to use updated packages.


In [31]:
# 10번 블럭
# 가중치 수 출력하기

from prettytable import PrettyTable


def count_parameters(model):
    table = PrettyTable(["Modules", "Parameters"])
    total_params = 0
    for name, parameter in model.named_parameters():
        if not parameter.requires_grad:
            continue
        params = parameter.numel()
        table.add_row([name, params])
        total_params += params
    print(table)
    print(f"전체 모델 가중치 수: {total_params}")
    return total_params

In [32]:
# 11번 블럭
# 트랜스포머 학습

model = My_Transformer(d_model, num_heads, num_layers, d_ff, max_seq)
count_parameters(model)
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"전체 모델 가중치 수: {total_params}")
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

# 학습 시작
begin = time()
print("학습 시작")
for epoch in range(tot_epoch):
    print("epoch", epoch, "시작")
    for batch in range(batch_size):
        output = model(src_data, tgt_data)
        pred = output.contiguous().view(-1).float().requires_grad_()
        truth = tgt_data.contiguous().view(-1).float()

        loss = criterion(pred, truth)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print("    batch", batch, "done.")
print("학습 종료")
end = time()
print("걸린 시간", end - begin)

+----------------------------------+------------+
|             Modules              | Parameters |
+----------------------------------+------------+
|         enc_embed.weight         |  1280000   |
|         dec_embed.weight         |  1536000   |
|   enc_layers.0.mha.W_Q.weight    |   262144   |
|    enc_layers.0.mha.W_Q.bias     |    512     |
|   enc_layers.0.mha.W_K.weight    |   262144   |
|    enc_layers.0.mha.W_K.bias     |    512     |
|   enc_layers.0.mha.W_V.weight    |   262144   |
|    enc_layers.0.mha.W_V.bias     |    512     |
| enc_layers.0.ffn.linear_1.weight |  1048576   |
|  enc_layers.0.ffn.linear_1.bias  |    2048    |
| enc_layers.0.ffn.linear_2.weight |  1048576   |
|  enc_layers.0.ffn.linear_2.bias  |    512     |
|  enc_layers.0.layer_norm.weight  |    512     |
|   enc_layers.0.layer_norm.bias   |    512     |
|   enc_layers.1.mha.W_Q.weight    |   262144   |
|    enc_layers.1.mha.W_Q.bias     |    512     |
|   enc_layers.1.mha.W_K.weight    |   262144   |


In [33]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
from time import time
import numpy as np

# 2번 블록 - 하이퍼파라미터 정의
src_vocab = 2500  # 입력 어휘 사전 크기 (입력 토큰 종류 수)
tgt_vocab = 3000  # 출력 어휘 사전 크기 (출력 토큰 종류 수)
d_model = 512  # 단어 임베딩 차원 수 (각 단어를 512차원 벡터로 표현)
num_heads = 8  # 멀티헤드 어텐션에서의 헤드 수 (각기 다른 8개의 시선으로 어텐션 수행)
d_ff = 2048  # Feed-Forward Network의 은닉층 차원 수
num_layers = 6  # 인코더와 디코더의 층 수 (각각 6층)
max_seq = 100  # 문장의 최대 길이 (한 문장에 들어갈 수 있는 최대 토큰 수)
tot_epoch = 4  # 학습 epoch 수 (전체 데이터를 4회 반복 학습)
batch_size = 5  # 배치 크기 (한 번에 처리할 문장 수 = 5개)

# 3번 블록 - 임의 데이터 생성
src_data = torch.randint(
    0, src_vocab, (batch_size, max_seq)
)  # [5, 100] 형태의 입력 시퀀스
# 값은 0~2499 범위의 정수. 각 값은 단어를 나타냄.
tgt_data = torch.randint(
    0, tgt_vocab, (batch_size, max_seq)
)  # [5, 100] 형태의 출력 시퀀스


# 4번 블록 - 멀티헤드 어텐션
class My_MHA(nn.Module):
    def __init__(self, d_model, num_heads):
        super(My_MHA, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_head = d_model // num_heads  # 각 헤드가 처리할 차원 수: 512 // 8 = 64

        # Q, K, V 생성을 위한 선형 변환 레이어. 각기 다른 파라미터를 사용하여 독립적으로 학습됨
        self.W_Q = nn.Linear(
            d_model, d_model
        )  # 입력 벡터를 Query 벡터로 변환 (512→512)
        self.W_K = nn.Linear(d_model, d_model)  # 입력 벡터를 Key 벡터로 변환
        self.W_V = nn.Linear(d_model, d_model)  # 입력 벡터를 Value 벡터로 변환

    def split_heads(self, x):
        batch_size = x.size(0)  # 배치 크기 추출
        # [B, T, 512] → [B, T, 8, 64]로 변형 후
        # transpose로 [B, 8, T, 64]로 바꿔 각 헤드에 대해 독립 연산 가능하게 함
        x = x.view(batch_size, -1, self.num_heads, self.d_head)
        return x.transpose(1, 2)

    def dot_prod(self, Q, K, V, mask):
        # 어텐션 스코어 계산: [Q]·[K]^T / sqrt(d_head)
        score = torch.matmul(Q, K.transpose(-1, -2)) / math.sqrt(self.d_head)
        if mask is not None:
            score = score.masked_fill(
                mask == 0, -1e9
            )  # 마스크가 0인 곳은 매우 작은 값으로
        prob = torch.softmax(score, dim=-1)  # 어텐션 확률 분포
        return torch.matmul(prob, V)  # 가중합 결과

    def combine_heads(self, x):
        batch_size = x.size(0)
        # [B, 8, T, 64] → [B, T, 8, 64] → [B, T, 512]
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return x

    def forward(self, q, k, v, mask=None):
        # Q, K, V 선형변환 및 헤드 분리
        Q = self.split_heads(self.W_Q(q))
        K = self.split_heads(self.W_K(k))
        V = self.split_heads(self.W_V(v))
        # 어텐션 적용 및 헤드 결합
        attn = self.dot_prod(Q, K, V, mask)
        return self.combine_heads(attn)


# 5번 블록 - Feed Forward
class My_FFN(nn.Module):
    def __init__(self, d_model, d_ff):
        super(My_FFN, self).__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)  # 512 → 2048
        self.relu = nn.ReLU()  # 비선형성 부여
        self.linear_2 = nn.Linear(d_ff, d_model)  # 2048 → 512

    def forward(self, x):
        return self.linear_2(self.relu(self.linear_1(x)))


# 6번 블록 - 인코더 블록
class My_Encoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(My_Encoder, self).__init__()
        self.mha = My_MHA(d_model, num_heads)
        self.ffn = My_FFN(d_model, d_ff)
        self.layer_norm = nn.LayerNorm(d_model)  # 정규화

    def forward(self, x, mask):
        z = self.mha(x, x, x, mask)  # 셀프 어텐션
        z = self.layer_norm(x + z)  # 잔차 연결 + 정규화
        w = self.ffn(z)
        return self.layer_norm(w + z)  # 잔차 연결 + 정규화


# 7번 블록 - 위치 인코딩
class My_Position(nn.Module):
    def __init__(self, d_model, max_seq):
        super(My_Position, self).__init__()
        self.d_model = d_model
        self.max_seq = max_seq

    def pos_enc(self, x):
        k = torch.arange(0, self.max_seq, 1).float().unsqueeze(1)  # 위치 인덱스
        result = torch.zeros(self.max_seq, self.d_model)
        twoi = torch.arange(0, self.d_model, 2).float()  # 짝수 차원 인덱스
        result[:, 0::2] = torch.sin(k / (10000 ** (twoi / self.d_model)))
        result[:, 1::2] = torch.cos(k / (10000 ** (twoi / self.d_model)))
        return result.unsqueeze(0)  # [1, max_seq, d_model]

    def forward(self, x):
        return self.pos_enc(x)


# 8번 블록 - 디코더 블록
class My_Decoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(My_Decoder, self).__init__()
        self.mha_1 = My_MHA(d_model, num_heads)  # 디코더 셀프 어텐션
        self.mha_2 = My_MHA(d_model, num_heads)  # 인코더-디코더 어텐션
        self.ffn = My_FFN(d_model, d_ff)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x, enc_out, src_mask, tgt_mask):
        z = self.mha_1(x, x, x, tgt_mask)  # 자기 자신에 대한 셀프 어텐션
        z = self.layer_norm(x + z)
        y = self.mha_2(z, enc_out, enc_out, src_mask)  # 인코더 출력에 대한 어텐션
        y = self.layer_norm(z + y)
        w = self.ffn(y)
        return self.layer_norm(w + y)


# 9번 블록 - 전체 트랜스포머
class My_Transformer(nn.Module):
    def __init__(self, d_model, num_heads, num_layers, d_ff, max_seq):
        super(My_Transformer, self).__init__()
        self.enc_embed = nn.Embedding(src_vocab, d_model)
        self.dec_embed = nn.Embedding(tgt_vocab, d_model)
        self.pos_enc = My_Position(d_model, max_seq)
        self.enc_layers = nn.ModuleList(
            [My_Encoder(d_model, num_heads, d_ff) for _ in range(num_layers)]
        )
        self.dec_layers = nn.ModuleList(
            [My_Decoder(d_model, num_heads, d_ff) for _ in range(num_layers)]
        )
        self.linear = nn.Linear(d_model, tgt_vocab)
        self.softmax = nn.Softmax(dim=-1)

    def make_mask(self, src, tgt):
        src_mask = None  # 현재 구현에서는 src_mask 생략
        tgt_mask = tgt.unsqueeze(1).unsqueeze(3)
        tmp = torch.ones(1, max_seq, max_seq)
        mask = torch.tril(tmp).bool()  # 하삼각 마스킹 (미래 정보 차단)
        return src_mask, tgt_mask * mask

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.make_mask(src, tgt)
        src_embed = self.enc_embed(src) + self.pos_enc(src)
        tgt_embed = self.dec_embed(tgt) + self.pos_enc(tgt)
        enc_out = src_embed
        for layer in self.enc_layers:
            enc_out = layer(enc_out, src_mask)
        dec_out = tgt_embed
        for layer in self.dec_layers:
            dec_out = layer(dec_out, enc_out, src_mask, tgt_mask)
        out = self.linear(dec_out)
        out = self.softmax(out)
        return torch.argmax(out, dim=-1)


# 10번 블록 - 파라미터 수 계산
from prettytable import PrettyTable


def count_parameters(model):
    table = PrettyTable(["Modules", "Parameters"])
    total_params = 0
    for name, parameter in model.named_parameters():
        if not parameter.requires_grad:
            continue
        params = parameter.numel()
        table.add_row([name, params])
        total_params += params
    print(table)
    print(f"전체 모델 가중치 수: {total_params}")
    return total_params


# 11번 블록 - 트랜스포머 학습 루프
model = My_Transformer(d_model, num_heads, num_layers, d_ff, max_seq)
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"전체 모델 가중치 수: {total_params}")

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

begin = time()
print("학습 시작")
for epoch in range(tot_epoch):
    print("epoch", epoch, "시작")
    for batch in range(batch_size):
        output = model(src_data, tgt_data)  # [5, 100]
        pred = output.contiguous().view(-1).float().requires_grad_()
        truth = tgt_data.contiguous().view(-1).float()

        loss = criterion(pred, truth)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print("    batch", batch, "done.")
print("학습 종료")
end = time()
print("걸린 시간", end - begin)

전체 모델 가중치 수: 43747256
학습 시작
epoch 0 시작
    batch 0 done.
    batch 1 done.
    batch 2 done.
    batch 3 done.
    batch 4 done.
epoch 1 시작
    batch 0 done.
    batch 1 done.
    batch 2 done.
    batch 3 done.
    batch 4 done.
epoch 2 시작
    batch 0 done.
    batch 1 done.
    batch 2 done.
    batch 3 done.
    batch 4 done.
epoch 3 시작
    batch 0 done.
    batch 1 done.
    batch 2 done.
    batch 3 done.
    batch 4 done.
학습 종료
걸린 시간 3.978344202041626


In [34]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
from time import time
import numpy as np

# 2번 블록 - 하이퍼파라미터 정의
src_vocab = 2500  # 입력 어휘 사전 크기 (입력 토큰 종류 수)
tgt_vocab = 3000  # 출력 어휘 사전 크기 (출력 토큰 종류 수)
d_model = 512  # 단어 임베딩 차원 수 (각 단어를 512차원 벡터로 표현)
num_heads = 8  # 멀티헤드 어텐션에서의 헤드 수 (각기 다른 8개의 시선으로 어텐션 수행)
d_ff = 2048  # Feed-Forward Network의 은닉층 차원 수
num_layers = 6  # 인코더와 디코더의 층 수 (각각 6층)
max_seq = 100  # 문장의 최대 길이 (한 문장에 들어갈 수 있는 최대 토큰 수)
tot_epoch = 4  # 학습 epoch 수 (전체 데이터를 4회 반복 학습)
batch_size = 5  # 배치 크기 (한 번에 처리할 문장 수 = 5개)

# 3번 블록 - 임의 데이터 생성
src_data = torch.randint(
    0, src_vocab, (batch_size, max_seq)
)  # [5, 100] 형태의 입력 시퀀스
# 값은 0~2499 범위의 정수. 각 값은 단어를 나타냄.
tgt_data = torch.randint(
    0, tgt_vocab, (batch_size, max_seq)
)  # [5, 100] 형태의 출력 시퀀스


# 4번 블록 - 멀티헤드 어텐션
class My_MHA(nn.Module):
    def __init__(self, d_model, num_heads):
        super(My_MHA, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_head = d_model // num_heads  # 각 헤드가 처리할 차원 수: 512 // 8 = 64

        # Q, K, V 생성을 위한 선형 변환 레이어. 각기 다른 파라미터를 사용하여 독립적으로 학습됨
        self.W_Q = nn.Linear(
            d_model, d_model
        )  # 입력 벡터를 Query 벡터로 변환 (512→512)
        self.W_K = nn.Linear(d_model, d_model)  # 입력 벡터를 Key 벡터로 변환
        self.W_V = nn.Linear(d_model, d_model)  # 입력 벡터를 Value 벡터로 변환

    def split_heads(self, x):
        batch_size = x.size(0)  # 배치 크기 추출
        # [B, T, 512] → [B, T, 8, 64]로 변형 후
        # transpose로 [B, 8, T, 64]로 바꿔 각 헤드에 대해 독립 연산 가능하게 함
        x = x.view(batch_size, -1, self.num_heads, self.d_head)
        return x.transpose(1, 2)  # 헤드 축 앞으로 이동

    def dot_prod(self, Q, K, V, mask):
        # 어텐션 스코어 계산: [Q]·[K]^T / sqrt(d_head)
        score = torch.matmul(Q, K.transpose(-1, -2)) / math.sqrt(self.d_head)
        if mask is not None:
            score = score.masked_fill(
                mask == 0, -1e9
            )  # 마스크가 0인 곳은 매우 작은 값으로
        prob = torch.softmax(score, dim=-1)  # 어텐션 확률 분포
        return torch.matmul(prob, V)  # 가중합 결과: [B, H, T, D]

    def combine_heads(self, x):
        batch_size = x.size(0)
        # [B, 8, T, 64] → [B, T, 8, 64]로 transpose → view로 다시 합침
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return x  # [B, T, 512]

    def forward(self, q, k, v, mask=None):
        Q = self.split_heads(self.W_Q(q))  # [B, T, 512] → [B, 8, T, 64]
        K = self.split_heads(self.W_K(k))
        V = self.split_heads(self.W_V(v))
        attn = self.dot_prod(Q, K, V, mask)  # 어텐션 연산
        return self.combine_heads(attn)  # 다시 [B, T, 512]로 병합


# 5번 블록 - Feed Forward
class My_FFN(nn.Module):
    def __init__(self, d_model, d_ff):
        super(My_FFN, self).__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)  # 512 → 2048
        self.relu = nn.ReLU()  # 비선형성 부여
        self.linear_2 = nn.Linear(d_ff, d_model)  # 2048 → 512

    def forward(self, x):
        return self.linear_2(self.relu(self.linear_1(x)))  # [B, T, 512] 유지


# 6번 블록 - 인코더 블록
class My_Encoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(My_Encoder, self).__init__()
        self.mha = My_MHA(d_model, num_heads)
        self.ffn = My_FFN(d_model, d_ff)
        self.layer_norm = nn.LayerNorm(d_model)  # 정규화 레이어

    def forward(self, x, mask):
        z = self.mha(x, x, x, mask)  # 셀프 어텐션
        z = self.layer_norm(x + z)  # 잔차 연결 + 정규화
        w = self.ffn(z)  # FFN 통과
        return self.layer_norm(w + z)  # 잔차 연결 + 정규화


# 7번 블록 - 위치 인코딩
class My_Position(nn.Module):
    def __init__(self, d_model, max_seq):
        super(My_Position, self).__init__()
        self.d_model = d_model
        self.max_seq = max_seq

    def pos_enc(self, x):
        k = torch.arange(0, self.max_seq, 1).float().unsqueeze(1)  # 위치 인덱스
        result = torch.zeros(self.max_seq, self.d_model)
        twoi = torch.arange(0, self.d_model, 2).float()
        result[:, 0::2] = torch.sin(k / (10000 ** (twoi / self.d_model)))
        result[:, 1::2] = torch.cos(k / (10000 ** (twoi / self.d_model)))
        return result.unsqueeze(0)  # [1, max_seq, d_model]

    def forward(self, x):
        return self.pos_enc(x)


# 8번 블록 - 디코더 블록
class My_Decoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(My_Decoder, self).__init__()
        self.mha_1 = My_MHA(d_model, num_heads)  # 디코더 셀프 어텐션
        self.mha_2 = My_MHA(d_model, num_heads)  # 인코더-디코더 어텐션
        self.ffn = My_FFN(d_model, d_ff)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x, enc_out, src_mask, tgt_mask):
        z = self.mha_1(x, x, x, tgt_mask)  # 자기 자신에 대한 어텐션
        z = self.layer_norm(x + z)
        y = self.mha_2(z, enc_out, enc_out, src_mask)  # 인코더에 대한 어텐션
        y = self.layer_norm(z + y)
        w = self.ffn(y)
        return self.layer_norm(w + y)


# 9번 블록 - 전체 트랜스포머
class My_Transformer(nn.Module):
    def __init__(self, d_model, num_heads, num_layers, d_ff, max_seq):
        super(My_Transformer, self).__init__()
        self.enc_embed = nn.Embedding(src_vocab, d_model)
        self.dec_embed = nn.Embedding(tgt_vocab, d_model)
        self.pos_enc = My_Position(d_model, max_seq)
        self.enc_layers = nn.ModuleList(
            [My_Encoder(d_model, num_heads, d_ff) for _ in range(num_layers)]
        )
        self.dec_layers = nn.ModuleList(
            [My_Decoder(d_model, num_heads, d_ff) for _ in range(num_layers)]
        )
        self.linear = nn.Linear(d_model, tgt_vocab)
        self.softmax = nn.Softmax(dim=-1)

    def make_mask(self, src, tgt):
        src_mask = None
        tgt_mask = tgt.unsqueeze(1).unsqueeze(3)  # [B, 1, T, 1]
        tmp = torch.ones(1, max_seq, max_seq)
        mask = torch.tril(tmp).bool()  # 미래를 가리지 않도록 하삼각 마스크
        return src_mask, tgt_mask * mask  # [B, T, T] 형태로 적용

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.make_mask(src, tgt)
        src_embed = self.enc_embed(src) + self.pos_enc(src)  # [B, T, 512]
        tgt_embed = self.dec_embed(tgt) + self.pos_enc(tgt)

        enc_out = src_embed
        for layer in self.enc_layers:
            enc_out = layer(enc_out, src_mask)

        dec_out = tgt_embed
        for layer in self.dec_layers:
            dec_out = layer(dec_out, enc_out, src_mask, tgt_mask)

        out = self.linear(dec_out)  # [B, T, 3000]
        out = self.softmax(out)
        return torch.argmax(out, dim=-1)  # 예측 결과: [B, T]


# 10번 블록 - 파라미터 수 계산
from prettytable import PrettyTable


def count_parameters(model):
    table = PrettyTable(["Modules", "Parameters"])
    total_params = 0
    for name, parameter in model.named_parameters():
        if not parameter.requires_grad:
            continue
        params = parameter.numel()
        table.add_row([name, params])
        total_params += params
    print(table)
    print(f"전체 모델 가중치 수: {total_params}")
    return total_params


# 11번 블록 - 트랜스포머 학습 루프
model = My_Transformer(d_model, num_heads, num_layers, d_ff, max_seq)
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"전체 모델 가중치 수: {total_params}")

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

begin = time()
print("학습 시작")
for epoch in range(tot_epoch):
    print("epoch", epoch, "시작")
    for batch in range(batch_size):
        output = model(src_data, tgt_data)  # [5, 100]
        pred = (
            output.contiguous().view(-1).float().requires_grad_()
        )  # 예측 결과를 flat하게
        truth = tgt_data.contiguous().view(-1).float()  # 정답도 같은 형태로

        loss = criterion(pred, truth)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print("    batch", batch, "done.")
print("학습 종료")
end = time()
print("걸린 시간", end - begin)

전체 모델 가중치 수: 43747256
학습 시작
epoch 0 시작
    batch 0 done.
    batch 1 done.
    batch 2 done.
    batch 3 done.
    batch 4 done.
epoch 1 시작
    batch 0 done.
    batch 1 done.
    batch 2 done.
    batch 3 done.
    batch 4 done.
epoch 2 시작
    batch 0 done.
    batch 1 done.
    batch 2 done.
    batch 3 done.
    batch 4 done.
epoch 3 시작
    batch 0 done.
    batch 1 done.
    batch 2 done.
    batch 3 done.
    batch 4 done.
학습 종료
걸린 시간 3.876668930053711
