출처: Tunstall, Lewis, Leandro Von Werra, and Thomas Wolf. Natural language processing with transformers. " O'Reilly Media, Inc.", 2022.

# Attention Visualization
- BertViz for Jupyter 라이브러리를 이용해서 어텐션 가중치를 시각화
- 트랜스포머 모델에서 어텐션의 다양한 측면을 볼 수 있게 시각화 지원
- 언텐션 가중치는 nueron_veiw 모듈을 사용하자 -> 가중치 계산 과정을 시각화
- show()는 특정 인코더 층과 어텐션 헤드에 대한 시각화를 제공해준다.


In [1]:
! pip install transformers
! pip install bertviz



In [2]:
from transformers import AutoTokenizer
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show

model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
model = BertModel.from_pretrained(model_ckpt)
text = "time files like an arrow"
show(model, "bert", tokenizer, text, display_mode="light", layer=0, head=8)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [3]:
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
inputs.input_ids

tensor([[2051, 6764, 2066, 2019, 8612]])

In [4]:
from torch import nn
from transformers import AutoConfig

config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
token_emb

Embedding(30522, 768)

- input_ids가 nn.Embedding에 저장된 30,522개 임베딩 벡터 중 하나에 매핑되고, 각 벡터의 크기는 768개이다.

In [5]:
inputs_embeds = token_emb(inputs.input_ids)
inputs_embeds.size() # [batch_size, seq_len, hidden_dim]

torch.Size([1, 5, 768])

In [6]:
import torch
from math import sqrt

query = key = value = inputs_embeds
dim_k = key.size(-1) # 768
scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k) # tf.matmul # sqrt -> 소프트맥스 함수의 포화 방지
scores.size() # 배치에 있는 샘플마다 5x5 크기의 어텐션 점수 행렬이 만들어짐

torch.Size([1, 5, 5])

In [7]:
scores

tensor([[[30.3134,  0.4272, -0.4861,  0.0349, -1.1678],
         [ 0.4272, 27.9497,  1.8670,  0.4196,  0.2047],
         [-0.4861,  1.8670, 28.7729, -1.3207, -0.9034],
         [ 0.0349,  0.4196, -1.3207, 28.3409, -1.2165],
         [-1.1678,  0.2047, -0.9034, -1.2165, 26.5357]]],
       grad_fn=<DivBackward0>)

In [8]:
import torch.nn.functional as F
weights = F.softmax(scores, dim=-1)
weights.sum(dim=-1)

tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)

In [9]:
# 어텐션 가중치를 곱하기
attn_outputs = torch.bmm(weights, value)
attn_outputs.shape

torch.Size([1, 5, 768])

In [10]:
def scaled_dot_product_attention(query, key, value):
  dim_k = query.size(-1)
  scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
  weights = F.softmax(scores, dim=-1)
  return torch.bmm(weights, value)

# Multi-Head Attention
- 셀프 어텐션 층은 각 임베딩에 독립적인 선형 변환 세 개를 적용해 쿼리, 키, 값 벡터를 생성
- 여러 개의 헤드를 통해 모델은 동시에 여러 측면을 고려할 수 있음. 예를 들어 인접한 용어, 주어-동사와 같은 관계성을 파악 가능하다.
- 컴퓨터 비전의 합성곱 신경망 필터와 유사하다.

In [11]:
from torch import nn
class AttentionHead(nn.Module): # keras.layers.Layer
  def __init__(self, embed_dim, head_dim):
    super().__init__()
    self.q = nn.Linear(embed_dim, head_dim) # keras.layers.Dense
    self.k = nn.Linear(embed_dim, head_dim)
    self.v = nn.Linear(embed_dim, head_dim)
  def forward(self, hidden_state):
    attn_outputs = scaled_dot_product_attention(
        self.q(hidden_state), self.k(hidden_state), self.v(hidden_state))
    return attn_outputs

- head_dim 투영하려는 차원의 크기, embed_dim과 배수가 되게 선택한다.
- BERT에서는 어텐션 헤드가 12 개이니까 769/12 = 64가 된다.

In [12]:
class MultiHeadAttention(nn.Module):
  def __init__(self, config):
    super().__init__()
    embed_dim = config.hidden_size
    num_heads = config.num_attention_heads
    head_dim = embed_dim // num_heads
    self.heads = nn.ModuleList(
        [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
    )
    self.output_linear = nn.Linear(embed_dim, embed_dim)

  def forward(self, hidden_state):
    x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
    x = self.output_linear(x)
    return x

In [13]:
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(inputs_embeds)
attn_output.size()

torch.Size([1, 5, 768])

In [14]:
from bertviz import head_view
from transformers import AutoModel
model = AutoModel.from_pretrained(model_ckpt, output_attentions=True)

sentence_a = "time flies like an arrow"
sentence_b = "fruit flies like a banana"

viz_inputs = tokenizer(sentence_a, sentence_b, return_tensors='pt')
attention = model(**viz_inputs).attentions
sentence_b_start = (viz_inputs.token_type_ids == 0).sum(dim=1)
tokens = tokenizer.convert_ids_to_tokens(viz_inputs.input_ids[0])

head_view(attention, tokens, sentence_b_start, heads=[8])

<IPython.core.display.Javascript object>

- 직선은 어텐션 가중치의 정도를 나타남. 짙은 색은 1에 가깝고 흐린 색은 0에 가까움
- 동일한 문장에 속한 단어끼리 강하게 나타남

# Feed Forward Layer
- fully-connected layer
- 각 임베딩을 독립적으로 처리하여 position-wise feed-forward layer라고 함.

In [15]:
class FeedForward(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
    self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
    self.gelu = nn.GELU()
    self.dropout = nn.Dropout(config.hidden_dropout_prob)

  def forward(self, x):
    x = self.linear_1(x)
    x = self.gelu(x)
    x = self.linear_2(x)
    x = self.dropout(x)
    return x

In [16]:
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_outputs)
ff_outputs.size()

torch.Size([1, 5, 768])

# layer normalization & skip connection

In [20]:
class TransformerEncoderLayer(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
    self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
    self.attention = MultiHeadAttention(config)
    self.feed_forward = FeedForward(config)

  def forward(self, x):
    # layer normalization 적용하고 입력을 쿼리, 키, 값으로 복사
    hidden_state = self.layer_norm_1(x)
    # 어텐션에 스킵 연결을 적용
    x = x + self.attention(hidden_state)
    # skip connection + feed_forward layer
    x = x + self.feed_forward(self.layer_norm_2(x))
    return x

In [21]:
encoder_layer = TransformerEncoderLayer(config)
inputs_embeds.shape, encoder_layer(inputs_embeds).size()

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

- 트랜스포머 인코더 층 구현을 했으나 토큰 위치에 대한 정보를 넣어줘야 함.

# 위치 임베딩

In [24]:
class Embedding(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.token_embeddings = nn.Embedding(config.vocab_size,
                                         config.hidden_size)
    self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                            config.hidden_size)
    self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
    self.dropout = nn.Dropout()

  def forward(self, input_ids):
    # 입력 시퀀스에 대해 position_ids 만들기
    seq_length = input_ids.size(1)
    position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)
    # token_emb, position_emb 만들기
    token_embeddings = self.token_embeddings(input_ids)
    position_embeddings = self.position_embeddings(position_ids)
    # token_emb + position_emb
    embeddings = token_embeddings + position_embeddings
    embeddings = self.layer_norm(embeddings)
    embeddings = self.dropout(embeddings)
    return embeddings

embedding_layer = Embedding(config)
embedding_layer(inputs.input_ids).size()

torch.Size([1, 5, 768])

# 임베딩 + 인코더 층 연결 = 트랜스포머 인코더

In [27]:
class TransformerEncoder(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.embeddings = Embedding(config)
    self.layers = nn.ModuleList([TransformerEncoderLayer(config)
                                 for _ in range(config.num_hidden_layers)])
  def forward(self, x):
    x = self.embeddings(x)
    for layer in self.layers:
      x = layer(x)
    return x

In [28]:
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

torch.Size([1, 5, 768])

# 분류 헤드 추가하기
- 작업에 독립적인 바디 + 작업에 특화된 헤드
- 텍스트 분류 모델을 만들고 싶다면 바디에 연결할 분류 헤드가 필요하다
- 시퀀스 분류를 위해 기존 인코더를 확장한 클래스

In [29]:
# 시퀀스 분류를 위해 기존 인코더를 확장한 클래스
class TransformerForSequenceClassification(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.encoder = TransformerEncoder(config)
    self.dropout = nn.Dropout(config.hidden_dropout_prob)
    self.classifier = nn.Linear(config.hidden_size, config.num_labels)

  def forward(self, x):
    x = self.encoder(x)[:, 0, :] # [CLS]토큰의 은닉 상태를 선택
    x = self.dropout(x)
    x = self.classifier(x)
    return x

In [30]:
config.num_labels = 3 # 예측하려는 클래수 개수
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()

torch.Size([1, 3])

# 디코더
- 인코더와 디코더의 차이는 디코더에는 두개의 어텐션 층이 있음
- 마스크드 멀티 헤드 셀프 어텐션 층
  - 타임스텝마다 지난 출력과 예측한 현재 토큰만 사용해서 토큰을 생성
  - 모델의 부정행위를 방지
- 인코더-디코더 어텐션 층
  - 디코더의 중간 표현을 쿼리처럼 사용해서 인코더 스택의 출력 키와 값 벡터에 멀티헤드어텐션 적용
  - 두개의 다른 언어를 사용할 때 토큰을 연관짓는 방법을 학습한다.
  - 디코더가 인코더의 키와 값을 참조

In [31]:
seq_len = inputs.input_ids.size(-1)
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0) # lower triangular matrix
mask[0]

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])

In [33]:
# masked_fill()를 사용해 0을 -inf로 바꿔서 부정행위 방지
scores.masked_fill(mask == 0, -float("inf"))

tensor([[[30.3134,    -inf,    -inf,    -inf,    -inf],
         [ 0.4272, 27.9497,    -inf,    -inf,    -inf],
         [-0.4861,  1.8670, 28.7729,    -inf,    -inf],
         [ 0.0349,  0.4196, -1.3207, 28.3409,    -inf],
         [-1.1678,  0.2047, -0.9034, -1.2165, 26.5357]]],
       grad_fn=<MaskedFillBackward0>)

- softmax(e^-inf) = 0
- 즉, 어텐션 가중치가 모두 0이 됨.

In [34]:
def scaled_dot_product_attention(query, key, value, maks=None):
  dim_k = query.size(-1)
  scores = torch.bmm(query. key.transpose(1,2))/squrt(dim_k)
  if mask is not None:
    scores = scores.masked_fill(maks ==0, float("-inf"))
  weights = F.softmax(scores, dim=-1)
  return weights.bmm(value)