### Transformer

In [1]:
# 패키지 수입
import torch
import torch.nn as nn
import torch.optim as optim
import math
from time import time
import numpy as np
import tqdm as tqdm

In [2]:
# 하이퍼 파라미터 지정
src_vocab = 2500 # 입력 문장에 사용하는 단어 종류
tgt_vocab = 3000 # 출력 문장에 사용하는 단어 종류
d_model = 512 # 단어 임베딩 차원수
num_heads = 8 # multi-head attention 계산할 머리 수
num_layers = 6 # 인코더/디코더 적층 수

d_ff = 2048 # feed through 층의 뉴런 갯수
max_seq = 100 # 입력 문장의 최대 길이
tot_epoch = 4 # 반복 학습 수
batch_size = 5 # 배치 크기

In [3]:
# 데이터 임의 생성

# 입력 데이터
src_data = torch.randint(0, src_vocab, (batch_size, max_seq))
print(f'입력 데이터 모양 {src_data.shape}')
print(src_data[0])

# 출력 데이터
tgt_data = torch.randint(0, tgt_vocab, (batch_size, max_seq))

입력 데이터 모양 torch.Size([5, 100])
tensor([2012, 1285, 1342,  559, 1166, 1495, 2398, 2420,  964,  713,   94,   88,
        1815, 2471, 1657, 2102, 1635, 1801, 1483, 2324, 1367, 2461, 1227,  684,
        2366,  777, 1187,   39,  370, 1642, 2139, 2250, 1429, 1577,  100,  773,
        1597,  102, 2499,   69, 1866, 2137, 2163,   95,  455, 2012,  959, 2060,
        1290, 1616, 1065,  887, 1457, 1734, 2046, 2228,  543, 1724, 1349,  470,
         700, 2435, 1714, 1646, 2386, 2198, 1453, 1572, 2492, 1733, 2460,  449,
        2191, 1339, 1511, 1299, 1203, 2111,  986, 2149,  385,  704, 1631, 1847,
        1494, 2472, 1533, 1466, 1586, 1727,  895, 2446, 2008,  953, 2405, 2296,
        2102, 1887, 1491,  618])


In [4]:
# Multi-head attention class 구현

class My_MHA(nn.Module):
    def __init__(self, d_model, num_heads):
        super(My_MHA, self).__init__()
        self.d_model = d_model # 512 차원
        self.num_heads = num_heads # 헤드 8개
        
        # 각 머리 당 처리할 차원 수
        self.d_head = d_model // num_heads # 512/8 = 64(헤드당 64개 차원 처리)
        # print(f'각 머리 당 처리할 차원 수 {self.d_head}')
        
        # Q, K, V 행렬 준비(W_Q, W_K, W_V에 학습 가능 파라미터 262, 656개 모델 생성)
        self.W_Q = nn.Linear(d_model, d_model) # 512 입력 512 출력(512*512+512=262,656)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)
        
        # 머리 나누기
        # 입력 데이터 모양 : [5, 100, 512]
        # 출력 데이터 모양 : [5, 8, 100, 64]
        def split_heads(self, x):
            batch_size = x.size(0)
            # PyTorch 텐서의 첫 번째 차원을 읽어서 현재 입력 배치의
            # 크기를 추출, 입력 x의 실제 배치 크기에 맞춰 자동 적용
            # (x:[B, T, d_model] = [5, 100, 512])
            # x = x.view(batch_size, -1, self.num_heads, self.d_head)
            # transpose(dim1, dim2)
            # 텐서에서 **두 개의 차원(axis)**을 서로 교환
            # num_heads를 앞으로 재위치하여 head별 연산을 준비
            # 데이터 모양 : [5, 100, 8, 64]
            x = x.view(batch_size, -1, self.num_heads, self.d_head)
            x = x.transpose(1,2) # 헤드별로 처리하기 위한 교환
            return x
        
        # 유사성 게산
        def dot_prod(self, Q, K, V, mask):
            score = torch.matmul(Q, K.transpose(-1, 2)) / math.sqrt(self.d_head)
            # 어텐션 스코어(attention score)**를 계산
            # K.transpose(-1, -2): Key의 전치
            # K: Shape [batch, heads, seq_len, d_head]
            # K.transpose(-1, -2) → [batch, heads, d_head, seq_len]
            # Q와 K의 내적이 가능하도록 마지막 두 차원 바꿈
            # torch.matmul(Q, Kᵀ)결과 Shape: [batch, heads, seq_len, seq_len]
            # 각 위치 간의 연관성 (attention score)을 계산
            # math.sqrt(self.d_head)스케일링 (정규화):
            # d_head가 클수록 값이 커져 softmax의 gradient가 작아지는
            # 문제를 완화하기 위함, 따라서 √64 = 8로 나누어 스케일 조정
            
            # 디코더 마스크 처리
            if mask is not None:
                score = score.masked_fill(mask == 0, -1e9)
            prob = torch.softmax(score, dim=-1)
            
            # V 행렬과 곱셈
            Z = torch.matmul(prob, V)
            return Z
        
        # 머리 합치기
        # 입력 데이터 모양 : [5, 8, 100, 64]
        # 출력 데이터 모양 : [5, 100, 512]
        def combine_heads(self, x):
            batch_size = x.size(0)
            # 데이터 모양 : [5, 100, 8, 64]
            Z = x.transpose(1,2)
            
            # 데이터 모양 : [5, 100, 512], 입력과 동일한 출력형태
            Z = Z.contiguous().view(batch_size, -1, self.d_model)
            return Z
        
        # 앞서 정의한 함수로 forward를 정의
        def forward(self, q, k, v, mask=None):
            # 머리 나누기 실행/생성된 모델에 임베딩된 토큰(문자열 q,k,v)입력
            Q = self.split_heads(self.W_Q(q))
            K = self.split_heads(self.W_K(k))
            V = self.split_heads(self.W_V(v))
            
            # 유사성 계산 실행
            attn = self.dot_prod(Q, K, V, mask)
            
            # 머리 합치기 실행
            Z = self.combine_heads(attn)
            
            return Z

In [5]:
# feed forward class 구현

class My_FFN(nn.Module):
    def __init__(self, d_model, d_ff):
        super(My_FFN, self).__init__()
        
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.linear_2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()
        
    def forward(self, x):
        x = self.linear_1(x)
        x = self.relu(x)
        x = self.linear_2(x)
        return x

In [6]:
# 자연어 트랜스포머 인코더 구현

class My_Encoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(My_Encoder, self).__init__()
        
        self.mha = My_MHA(d_model, num_heads) # 멀티헤드 셀프 어텐션 -> 입력 토큰끼리 상호 작용
        self.ffn = My_FFN(d_model, d_ff) # 포지션별 피드포워드 신경망 -> 비선형 변환
        self.layer_norm = nn.LayerNorm(d_model) # 레이어 정규화 -> 학습 안정성, 기울기 흐름 개선
        
    def forward(self, x, mask):
        z = self.mha(x,x,x,mask)
        # self.mha(x, x, x)?
        # "Query = Key = Value = 동일한 입력 시퀀스 x"라는 뜻
        # 트랜스포머 구조에서 통상적으로 사용하는 표현 방식으로
        # 형태는 같으나 궁극적으로 서로 달라지는 q,k v
        z = self.layer_norm(x + z) # 잔차 연결
        w = self.ffn(z)
        z = self.layer_norm(w + z) # 레이어 정규화, 잔차 연결
        return z

In [7]:
# 위치 인코딩 구현

class My_Position(nn.Module):
    def __init__(self, d_model, max_seq):
        super(My_Position, self).__init__()
        
        self.d_model = d_model # 512차원
        self.max_seq = max_seq # 입력 문장 최대 길이 100
        
    # 위치 임베딩 계산
    def pos_enc(self, x):
        k = torch.arange(0, self.max_seq, 1).float() # 0부터 100까지의 실수
        print(f'전 {k.shape}') # [100]
        k = k.unsqueeze(1)
        print(f'후 {k.shape}') # [100, 1] 2차원 텐서로 변형
        result = torch.zeros(self.max_seq, self.d_model)
        print(f'결과 {result.shape}') # [100, 512] 텐서 공간에 0으로 세팅, 포지셔널 벡터용 공간
        twoi = torch.arange(0, self.d_model, 2).float() # 512 차원의 짝수 인덱스 생성
        print(twoi)
        
        result[:, 0::2] = torch.sin(k / (10000**(twoi / self.d_model))) # k는 입력 토큰
        result[:, 1::2] = torch.cos(k / (10000**(twoi / self.d_model)))
        result = result.unsqueeze(0)
        print(f'최종 모양 {result.shape}')
        return result
    
    def forward(self, x):
        pos = self.pos_enc(x)
        return pos

In [8]:
# 디코더 구현

class My_Decoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(My_Decoder, self).__init__()
        
        self.mha_1 = My_MHA(d_model, num_heads)
        self.mha_2 = My_MHA(d_model, num_heads)
        self.ffn = My_FFN(d_model, d_ff)
        self.layer_norm = nn.LayerNorm(d_model)
        
    def forward(self, x, enc_out, src_mask, tgt_mask):
        print(f'디코더 입력 데이터 모양 {x.shape}')
        
        # 디코더 self attention 부분
        z = self.mha_1(x,x,x, tgt_mask)
        z = self.layer_norm(x + z)
        print(f'self attention 후 데이터 모양 {x.shape}')
        
        y = self.mha_2(z, enc_out, enc_out, src_mask)
        y = self.layer_norm(z + y)
        
        # feed forward 부분
        w = self.ffn(y)
        z = self.layer_norm(w + y)
        print(f'feed forward 후 데이터 모양 {z.shape}')
        return z

In [9]:
# 전체 자연어 트랜스포머 구성

class My_Transformer(nn.Module):
    def __init__(self, d_model, num_heads, num_layers, d_ff, max_seq):
        super(My_Transformer, self).__init__()
        
        self.enc_embed = nn.Embedding(src_vocab, d_model) # 2500개가 512차원으로 임베딩
        self.dec_embed = nn.Embedding(tgt_vocab, d_model) # 3000개가 512차원으로 임베딩
        self.pos_enc = My_Position(d_model, max_seq)
        
        # 인코더 쌓기
        self.enc_layers = nn.ModuleList(
            [My_Encoder(d_model, num_heads, d_ff)
             for _ in range(num_layers)]
        )
        
        # 디코더 쌓기
        self.dec_layers = nn.ModuleList(
            [My_Decoder(d_model, num_heads, d_ff)
             for _ in range(num_layers)]
        )
        
        # 최종 출력층
        self.linear = nn.Linear(d_model, tgt_vocab)
        self.softmax = nn.Softmax(dim=-1)
        
        # 디코더 마스크 제작
        def make_mask(self, src, tgt):
            src_mask = None
            tgt_mask = tgt.unsqueeze(1).unsqueeze(3)
            tmp = torch.ones(1, max_seq, max_seq)
            mask = torch.tril(tmp).bool()
            print(f'틀 {mask}')
            
            tgt_mask = tgt_mask * mask
            print(f'마스크 모양', tgt_mask.shape)
            print(f'마스크 결과', tgt_mask)
            return src_mask, tgt_mask
        
        def forward(self, src, tgt):
            # 마스크 만들기
            src_mask, tgt_mask = self.make_mask(src, tgt)
            print(f'마스크 모양 {tgt_mask.shape}')
            print(tgt_mask[1])
            
            # 단어 임베딩 및 위치 정보 추가
            src_embed = self.enc_embed(src)
            tgt_embed = self.dec_embed(tgt)
            src_pos = self.pos_enc(src)
            tgt_pos = self.pos_enc(tgt)
            src_embed = src_embed + src_pos
            tgt_embed = tgt_embed + tgt_pos
            
            # 인코더 연결
            enc_out = src_embed
            for layer in self.enc_layer:
                enc_out = layer(enc_out, src_mask)
            
            # 디코더 연결
            dec_out = tgt_embed
            for layer in self.dec_layers:
                dec_out = layer(dec_out, enc_out, src_mask, tgt_mask)
            
            # 최종 출력
            out = self.linear(dec_out)
            out = self.softmax(out)
            out = torch.argmax(out, dim=-1)
            print(f'최종 출력 모양 {out.shape}')
            return out
        
        # 테스트 코드
        temp = My_Transformer(d_model, num_heads, num_layers, d_ff, max_seq)
        z = temp(src_data, tgt_data)

In [None]:
# 가중치 수 출력하기
from prettytable import PrettyTable

def count_parameters(model):
    table = PrettyTable(["Moudles", "Parameters"])
    total_params = 0
    for name, parameter in model.named_parameters():
        if not parameter.requires_grad:
            continue
        params = parameter.numel()
        table.add_row([name, params])
        total_params += params
    print(table)
    print(f'전체 모델 가중치 수 : {total_params}')
    return total_params

: 

In [None]:
# 트랜스포머 학습

model = My_Transformer(d_model, num_heads, num_layers, d_ff, max_seq)
# count_parameter(model)
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'전체 모델 가중치수 : {total_params}')
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

# 학습 시작
begin = time()
print('학습 시작')
for epoch in range(tot_epoch):
    print('epoch', epoch, '시작')
    for batch in range(batch_size):
        output = model(src_data, tgt_data)
        pred = output.view(-1, tgt_vocab)
        truth = tgt_data.view(-1).long()
        
        loss = criterion(pred, truth)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(' batch', batch, 'done.')
print('학습 종료')
end=time()
print('걸린 시간', end-begin)