In [None]:
import torch
import torch.nn as nn

In [None]:
batch_size = 2  # 문장2개
str_len =4  #문장 길이 4
vocab_size = 100 # 전체 단어 개수
d_model = 512 # 임베딩 벡터 차원

## Attention Layer의 가장 큰 특징은 바로 문장의 길이를 가변적으로 가져갈 수 있다는 것인데, 왜 str_len을 고정으로 두는 걸까?
-> GPU 내에서 배치 단위로 묶어서 병렬 연산을 하고, 추후 위치 인코딩을 위해서 우선은 문장의 길이를 가변적으로 가져간다!

In [None]:
# "I love AI <pad>" -> [1,2,3,0]
# "Hello world <pad> <pad>" -> [4.5.0.0]

In [None]:
input_data = torch.LongTensor([
    [1,2,3,0],
    [4,5,0,0]
])

In [None]:
print(f"Input Shape: {input_data.shape}") #(Batch_Size, Seq_Len)

Input Shape: torch.Size([2, 4])


In [None]:
embedding_layer = nn.Embedding(
    num_embeddings = vocab_size,
    embedding_dim = d_model
    )

In [None]:
embedded_data = embedding_layer(input_data)

In [None]:
print(f"Embedded Shape: {embedded_data.shape}")

Embedded Shape: torch.Size([2, 4, 512])


[문장 개수,문장 길이]
->[문장 개수, 문장 길이, 512]

#Position Encoding

In [None]:
import math

class PositionalEncoding(nn.Module):
  def __init__(self, d_model = 512, max_len = str_len, dropout = 0.1):
    '''
    d_model: 임베딩 차원(기본값 512),
    max_len: 모델이 받아들일 수 있는 최대 문장 길이
    dropout: 과적합 방지
    '''
    super().__init__()
    self.dropout = nn.Dropout(p=dropout)

    pe = torch.zeros(max_len, d_model) #[str_len, 512]

    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) #위치 인덱스 생성

    # 주기를 결정하는 분모 계산(10000^(2i/d_model))
    div_term = torch.exp(0,d_model,2).float() * (-math.log(10000.0)/d_model)

    #짝수 인덱스는 Sin
    pe[:,0::2] = torch.sin(position * div_term)

    #홀수 인덱스는 Cos
    pe[:,1::2] = torch.cos(position * div_term)

    #차원 맞추기
    '''
    (batch_size,str_len,d_model)과 더하기 위해 pe의 차원을
    (1.str_len,d_model)로 바꾼다.
    '''
    pe = pe.unsqueeze(0)
    self.register_buffer('pe',pe)

    def forward(self, x):
      x = x + self.pe[:,:x.size(1)]

      return self.dropout(x)

In [None]:
class TransformerInput(nn.Module):
  def __init__(self, vocab_size, d_model, max_len, dropout):
    super().__init__()
    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_encoder = PositionalEncoding(d_model, max_len)
    self.d_model = d_model

  def forward(self, x):
    x = self.embedding(x) * math.sqrt(self.d_model) #보정
    x = self.pos_encoder(x)

    return x

# 아무튼 지금까지의 차원은 [문장 개수, 문장 길이, 512]

In [None]:
class MultiHeadAttention(nn.Module):
  def __init__(self, d_model, n_head):
    super().__init__()
    self.d_model = d_model #512
    self.n_head = n_head #논문은 일단 8개.
    self.d_k = d_model // n_head #각 헤드의 차원 (정수 나누기로 변경)

    assert d_model % n_head == 0

    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)

    self.w_o = nn.Linear(d_model, d_model)

  def forward(self,q,k,v,mask=None):
    batch_size = q.size(0)

    q = self.w_q(q)
    k = self.w_k(k)
    v = self.w_v(v)

    # [batch, seq_len , d_model] -> [batch, seq_len, n_head, d_k] -> [batch, n_head, seq_len, d_k]
    q = q.view(batch_size, -1, self.n_head, self.d_k).transpose(1,2)
    k = k.view(batch_size, -1, self.n_head, self.d_k).transpose(1,2)
    v = v.view(batch_size, -1, self.n_head, self.d_k).transpose(1,2)

    scores = torch.matmul(q, k.transpose(-2,-1)) / math.sqrt(self.d_k)

    if mask is not None:
      scores = scores.masked_fill(mask==0.-1e9)

    attn_weights = torch.softmax(scores,dim=-1)

    output = torch.matmul(attn_weights, v) #[batch, n_head, seq_len, d_k]

    output = output.transpose(1,2).contiguous().view(batch_size, -1, self.d_model)
    #[batch, seq_len, n_head, d_k] -> [batch,seq_len, d_model]

    output = self.w_o(output)

    return output, attn_weights

### Self-Attention에서의 Q, K, V 생성

`MultiHeadAttention` 클래스에서 `forward` 메서드의 `q`, `k`, `v` 인자는 Self-Attention의 경우 모두 동일한 입력(예: 인코더의 경우 입력 임베딩)을 받게 됩니다. 각 `w_q`, `w_k`, `w_v` 선형 레이어가 이 동일한 입력을 각각 Q, K, V로 변환합니다.

In [None]:
n_head = 8 # 예시로 헤드 개수를 8개로 설정
multi_head_attn = MultiHeadAttention(d_model, n_head)

# Self-Attention에서는 Q, K, V가 모두 동일한 embedded_data로부터 파생됩니다.
# 즉, embedded_data를 각각 Q, K, V의 입력으로 사용합니다.
output_attn, attn_weights = multi_head_attn(embedded_data, embedded_data, embedded_data)

print(f"Output of MultiHeadAttention (Self-Attention): {output_attn.shape}")
print(f"Attention Weights Shape: {attn_weights.shape}")

Output of MultiHeadAttention (Self-Attention): torch.Size([2, 4, 512])
Attention Weights Shape: torch.Size([2, 8, 4, 4])


#멀티 헤드 어텐션
[문장 개수, 문장 길이, d_model]
-> [문장 개수, 문장 길이, 헤드 개수, 헤드 별 차원]
#헤드 별 차원: d_model / 헤드 개수
-> [문장 개수, 헤드 개수, 문장 길이, 헤드 별 차원]
멀티 헤드 어텐션 연산 이후

[문장 개수, 문장 길이, d_model]

In [None]:
class AddNorm(nn.Module):
  def __init__(self, d_model, dropout=0.1):
        super().__init__()
        self.layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

  def forward(self, x, sub_layer_output):

    #sub_layer_output: 어텐션이나 FFN을 통과하고 나온 결과값.
    sub_layer_output = self.dropout(sub_layer_output)

    output = x + sub_layer_output

    #정규화
    output = self.layer_norm(output)

    return output

#Batch Nom vs Layer Nom:
Batch Nom -> 모든 문장의 첫 번째 단어끼리 평균
Layer Nom -> 한 문장 혹은 벡터 안에서 평균

#Post-Ln vs Pre-LN:
Post LN: 높은 성능, 하지만 어려운 초기화
Pre LN; 안정적인 성능

#왜? 512->2048로 늘려서 ReLU 및 Dropout을 하는가?
- 영역을 넓혀서 복잡한 패턴 파악
- 이후 ReLU(비선형 함수)로 표현력을 키운다

In [None]:
class PositionwiseFeedForward(nn.Module):
  def __init__(self, d_model, d_ff, dropout = 0.1):
    super().__init__()
    self.w_1 = nn.Linear(d_model, d_ff) # 512 -> 2048
    self.w_2 = nn.Linear(d_ff, d_model) # 2048 -> 512
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):

    x = self.w_1(x) #512 -> 2048
    x = self.relu(x)
    x = self.dropout(x)
    x = self.w_2(x) #2048 -> 512
    return x

In [None]:
class EncoderLayer(nn.Module):
 def __init__(self, d_model, n_head, d_ff, dropout):
    super().__init__()

    self.self_attn =  MultiHeadAttention(d_model, n_head)

    self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)

    self.sublayer1 = AddNorm(d_model, dropout) #어텐션 뒤에
    self.sublayer2 = AddNorm(d_model, dropout) #FFN 뒤에

 def forward(self, x, mask):

   attn_output, _ = self.self_attn(x,x,x,mask) #어텐션 수행, 인코더의 mask는 패딩용!
   x = self.sublayer1(x, attn_output)# Reisudal 및 Normalization

   ffn_output = self.feed_forward(x)
   x = self.sublayer2(x, ffn_output)

   return x

In [None]:
import copy

def get_clones(module, N):
  return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

In [None]:
class Encoder(nn.Module):
  def __init__(self, vocab_size, d_model, n_head, d_ff, max_len , n_layers, dropout):
    super().__init__()

    self.input_embedding = TransformerInput(vocab_size, d_model, max_len, dropout)

    self.layers = get_clones(EncoderLayer(d_model, n_head, d_ff, dropout), n_layers)

    self.norm = nn.LayerNorm(d_model)

  def forward(self, x, mask):

    x = self.input_embedding(x)

    for layer in self.layers:
      x = layer(x, mask)

    return self.norm(x)

##디코더
- 일단 임베딩에 쓰이는 Class를 재탕할 수 있다!

- 디코더의 목표는 정답 생성이므로 한칸 밀린(Shifted Right) 문장이 들어가게 된다.

In [None]:
'''
인코더에 넣을 마스크를 생성할 함수.
'''
def generate_sqaure_subsequent_mask(sz):
  '''
  sz: 문장의 길이
  '''
  mask = torch.ones(sz,sz) #  sz * sz 크기의 모든 원소가 1인 행렬.

  mask = torch.tril(mask)
  #하삼각행렬 형태로 mask를 만듦.

  return mask


In [None]:
class DecoderLayer(nn.Module):
  def __init__(self, d_model, n_head, d_ff, dropout):
    super().__init__()

    #마스크가 들어간 어텐션
    self.self_attn = MultiHeadAttention(d_model, n_head)
    #인코더의 정보를 보는 어텐션
    self.enc_dec_attn = MultiHeadAttention(d_model, n_head)

    self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)

    self.sublayer1 = AddNorm(d_model, dropout)
    self.sublayer2 = AddNorm(d_model, dropout)
    self.sublayer3 = AddNorm(d_model, dropout)

    def forward(self, tgt, memory, tgt_mask, memory_mask):
      # tgt : 디코더 입력
      # memory : 인코더 결과값(K,V가 enc_dec_attn에서 쓰임)
      # tgt_mask : 마스크
      # memory_mask : 인코더의 패딩용 마스크

      #Masked Multi- Head Attn
      tgt2, _ = self.self_attn(tgt, tgt, tgt, tgt_mask)
      tgt = self.sublayer1(tgt, tgt2)

      #Enc-Dec Attn
      tgt2, _ = self.enc_dec_attn(tgt, memory, memory, memory_mask)
      tgt = self.sublayer2(tgt, tgt2)

      # FFN
      tgt2 = self.feed_forward(tgt)
      tgt = self.sublayer3(tgt, tgt2)

      return tgt


#디코더의 인풋, 그리고 마스크.
디코더는 다음 토큰을 예측하기 위해 실제로 '한 칸 밀린' 타겟 시퀀스를 입력으로 받습니다. 그리고 이 입력의 self-attention 과정에서는 '룩-어헤드 마스크'를 적용하여 현재 시점 이후의 토큰을 참조하지 못하게 함으로써, 진정한 의미의 순차적 예측을 가능하게 합니다. 두 가지 메커니즘이 합쳐져서 디코더가 문장을 생성하는 데 필요한 조건을 만족시킵니다.
- Shifted Right:모델이 시퀀스의 각 위치에서 다음 토큰(단어)을 예측하도록 학습
- Mask: 현재 예측하려는 위치의 토큰이 미래의 토큰을 보지 못하도록(미래 정보를 참조하지 못하도록) 강제합니다. 만약 현재 위치에서 미래의 토큰을 볼 수 있다면, 모델은 그저 미래 토큰을 복사하는 것과 같아져서 실제로 예측하는 능력을 학습할 수 없게 됩니다.

In [None]:
class Decoder(nn.Module):
  def __init__(self, vocab_size, d_model, n_head, d_ff, max_len, n_layers, dropout):
    super().__init__()

    self.input_embedding = TransformerInput(vocab_size, d_model, max_len, dropout)
    self.layers = get_clones(DecoderLayer(d_model, n_head, d_ff, dropout), n_layers)

    self.norm = nn.LayerNorm(d_model)

  def forward(self,tgt,memory, tgt_mask, memory_mask):
    x = self.input_embedding(tgt)

    for layer in self.layers:
      x = layer(x, memory, tgt_mask, memory_mask)

    return self.norm(x)

In [None]:
class Transformer(nn.Module):
  def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, src_max_len, tgt_max_len, n_layers, dropout, src_pad_idx, tgt_pad_idx):
    super().__init__()

    self.src_pad_idx = src_pad_idx
    self.tgt_pad_idx = tgt_pad_idx
    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    self.encoder = Encoder(src_vocab_size, d_model, n_head, d_ff, src_max_len, n_layers, dropout)
    self.decoder = Decoder(tgt_vocab_size, d_model, n_head, d_ff, tgt_max_len, n_layers, dropout)

    #출력 프로젝션: d_model -> vocab_size
    self.projection = nn.Linear(d_model, tgt_vocab_size)

    self._init_weights() #가중치 초기화: Xavier Initialization

   def _init_weights(self):
    for p in self.parameters():
      if p.dim() > 1:
        nn.init.xavier_uniform_(p)

   def make_src_mask(self, src):
    # src : [batch. src_len]
    # 인코더에서 SelfAttention의 마스킹 부분을 땜빵하기 위해 쓰이는 패딩.
    return (src != self.src_pad_idx).unsqueeze(1).usqueeze(2)
    # [batch,1,1,src_len] 형태에서 브로드캐스팅.

   def make_tgt_mask(self, tgt):
    # tgt : [batch, tgt_len]

    # 패딩 마스크(문장 끝부분 가리기) & [batch, 1, 1, tgt_len]
    tgt_pad_mask = (tgt != self.tgt_pad_idx).unsqueeze(1).unsqueeze(2)

    # 룩 - 어헤드 마스크 (미래 단어 가리기)
    tgt_len = tgt.size(1)
    tgt_sub_mask = torch.tril(torch.ones(tgt_len, tgt_len, device = self.device)).bool()

    # [batch, 1, tgt_len, tgt_len]
    tgt_mask = tgt_pad_mask & tgt_sub_mask #패딩이 아니면서 & 현재보다 과거의 것만 True

    return tgt_mask

   def forward(self, src, tgt):

    src_mask = self.mask_src_mask(src)
    tgt_mask = self.mask_tgt_mask(tgt)

    enc_output = self.encoder(src, src_mask)
    dec_output = self.decoder(tgt,enc_output, tgt_mask, src_mask)

    output = self.projection(dec_output) #[batch, sequence_length, d_model]  -> [batch,sequence_length, tgt_vocab_size]

    return output

| 모델 | n_layers (쌓은 층수) | d_model (벡터 차원) | n_head (헤드 개수) | 파라미터 수 |
|------|---------------------|--------------------|--------------------|------------|
| Original Transformer | 6 | 512 | 8 | 65M |
| BERT-Base | 12 | 768 | 12 | 110M |
| BERT-Large | 24 | 1024 | 16 | 340M |

#BERT

##GELU랑 사전 정규화는 여건상 구현을 못햇음...

In [None]:
class BertEmbeddings(nn.Module):
  def __init__(self, vocab_size, d_model, max_len, dropout=0.1):
      super().__init__()

      #1. Token Embedding(기존과 동일)
      self.token_embedding = nn.Embedding(vocab_size, d_model)

      #2. Position Embedding
      self.position_embedding = PositionalEncoding(d_model, max_len, dropout)

      #3. Segment Embedding
      self.segment_embedding = nn.Embedding(2, d_model)

      self.dropout = nn.Dropout(dropout)

   def forward(self, input_ids, segment_ids):
     # input_ids : [batch, seq_len]
     # segment_ids : [batch, seq_len]
     x = self.token_embedding(input_ids) +  self.segment_embedding(segment_ids)

     x = self.position_embedding(x)

     return self.dropout(x)

In [None]:
class BertModel(nn.Module):
  def __init__(self, vocab_size, d_model = 768, n_head = 12, d_ff = 3072,
               max_len = 512, n_layers = 12, dropout = 0.1):

    self.embeddings = BertEmbeddings(vocab_size, d_model, max_len, dropout)

    self.encoder = Encoder(vocab_size, d_model, n_head, d_ff,
                           max_len, n_layers, dropout)
    #[CLS] 토큰 처리용
    self.pooler_dense = nn.Linear(d_model, d_model)
    self.pooler_activation = nn.Tanh()
  def forward(self, input_ids, segment_ids, mask = None):
    #임베딩
    x = self.embeddings(input_ids, segment_ids)

    #인코더 스택 12층
    for layer in self.encoder.layers:
      x = layer(x,mask)

    #[CLS] 토큰 가공
    cls_token = x[:,0]
    pooled_output = self.pooler_activation(self.pooler_ense(cls_token))

    return x, pooled_output