# (틈새) 파이토치 이용한 Positional Encoding

In [None]:

import torch
import torch.nn as nn

max_sequence_length = 10 # 입력 문장 길이
d_model = 6 # 임베딩 차원

even_i = torch.arange(0, d_model, 2).float()
even_i

tensor([0., 2., 4.])

In [None]:
even_denominator = torch.pow(10000, even_i/d_model)
even_denominator

tensor([  1.0000,  21.5443, 464.1590])

In [None]:
odd_i = torch.arange(1, d_model, 2).float()
print(odd_i)

odd_denominator = torch.pow(10000, (odd_i - 1)/d_model)
print(odd_denominator)

tensor([1., 3., 5.])
tensor([  1.0000,  21.5443, 464.1590])


In [None]:
denominator = even_denominator

position = torch.arange(max_sequence_length, dtype=torch.float).reshape(max_sequence_length, 1)
even_PE = torch.sin(position / denominator)
odd_PE = torch.cos(position / denominator)

In [None]:
even_PE

tensor([[ 0.0000,  0.0000,  0.0000],
        [ 0.8415,  0.0464,  0.0022],
        [ 0.9093,  0.0927,  0.0043],
        [ 0.1411,  0.1388,  0.0065],
        [-0.7568,  0.1846,  0.0086],
        [-0.9589,  0.2300,  0.0108],
        [-0.2794,  0.2749,  0.0129],
        [ 0.6570,  0.3192,  0.0151],
        [ 0.9894,  0.3629,  0.0172],
        [ 0.4121,  0.4057,  0.0194]])

In [None]:
odd_PE

tensor([[ 1.0000,  1.0000,  1.0000],
        [ 0.5403,  0.9989,  1.0000],
        [-0.4161,  0.9957,  1.0000],
        [-0.9900,  0.9903,  1.0000],
        [-0.6536,  0.9828,  1.0000],
        [ 0.2837,  0.9732,  0.9999],
        [ 0.9602,  0.9615,  0.9999],
        [ 0.7539,  0.9477,  0.9999],
        [-0.1455,  0.9318,  0.9999],
        [-0.9111,  0.9140,  0.9998]])

In [None]:
stacked = torch.stack([even_PE, odd_PE], dim=2)
print(stacked.shape)

PE = torch.flatten(stacked, start_dim=1, end_dim=2)
PE

torch.Size([10, 3, 2])


tensor([[ 0.0000,  1.0000,  0.0000,  1.0000,  0.0000,  1.0000],
        [ 0.8415,  0.5403,  0.0464,  0.9989,  0.0022,  1.0000],
        [ 0.9093, -0.4161,  0.0927,  0.9957,  0.0043,  1.0000],
        [ 0.1411, -0.9900,  0.1388,  0.9903,  0.0065,  1.0000],
        [-0.7568, -0.6536,  0.1846,  0.9828,  0.0086,  1.0000],
        [-0.9589,  0.2837,  0.2300,  0.9732,  0.0108,  0.9999],
        [-0.2794,  0.9602,  0.2749,  0.9615,  0.0129,  0.9999],
        [ 0.6570,  0.7539,  0.3192,  0.9477,  0.0151,  0.9999],
        [ 0.9894, -0.1455,  0.3629,  0.9318,  0.0172,  0.9999],
        [ 0.4121, -0.9111,  0.4057,  0.9140,  0.0194,  0.9998]])

# Positional Encoding & Attention


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

In [None]:
# Positional Encoding

class PositionalEncoding(tf.keras.layers.Layer):
  def __init__(self, position, d_model):
    super(PositionalEncoding, self).__init__()
    self.pos_encoding = self.positional_encoding(position, d_model)

    # calculates the angles used in the positional encoding formula.
    def get_angles(self, position, i, d_model):
      angles = 1 / tf.pow(10000, (2 * (1 // 2) / tf.cast(d_model, tf.float32)))
      return position * angles

    def positional_encoding(self, position, d_model):
      angle_rads = self.get_angles(
          # adds an extra dimension to the tensor, transforming its shape from (position, ) to (position, 1).
          position = tf.range(position, dtype=tf.float32)[:, tf.newaxis],
          i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
          d_model=d_model)

      # 배열의 짝수 인덱스 - 사인 함수
      sines = tf.math.sin(angle_rads[:, 0::2])
      # 0 :: 2 - It specifies the step size or the number of columns to skip in each iteration. In this case, it skips one column and selects the next one.

      # 배열의 홀수 인덱스 - 코사인 함수
      cosines = tf.math.cos(angle_rads[:, 1::2])

      angle_rads = np.zeros(angle_rads.shape)
      angle_rads[:, 0::2] = sines
      angle_rads[:, 1::2] = cosines
      pos_encoding = tf.constant(angle_rads)
      pos_encoding = tf.pos_encoding[tf.newaxis, ...] # adds 1 dimension in the front

      print(pos_encoding.shape)
      return tf.cast(pos_encoding, tf.float32)


  def call(self, inputs):
    return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :] # 3-dimensions Slicing


In [None]:
# Scaled Dot-Product Attention Implementation

def scaled_dot_product_attention(query, key, value, mask):
  # query 크기 : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
  # key 크기 : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
  # value 크기 : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
  # padding_mask : (batch_size, 1, 1, key의 문장 길이)

  # Q 와 K 의 곱 (=어텐션 스코어 행렬)
  matmul_qk = tf.matmul(query, key, transpose_b=True)

  # 스케일링
  depth = tf.cast(tf.shape(key)[-1], tf.float32)
  logits = matmul_qk / tf.math.sqrt(depth)

  # 마스킹 (어텐션 스코어 행렬의 마스킹 할 위치에 매우 작은 음수값을 넣어준다. - 소프트맥스 함수를 지나면 0이 되는 값)
  if mask is not None:
    logits += (mask * -1e9)

  # 소프트맥스 함수는 마지막 차원인 key의 문장 길이 방향으로 수행된다.
  # attention weight : (batch_size, num_heads, query의 문장 길이, key의 문장 길이)
  # 소프트맥스 함수를 통해 어텐션 분포 행렬을 얻는다.
  attention_weights = tf.nn.softmax(logits, axis=-1)

  # output : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
  output = tf.matmul(attention_weights, value)

  return output, attention_weights

In [None]:
# 임의의 Query, Key, Value인 Q, K, V 행렬 생성

np.set_printoptions(suppress=True)
temp_k = tf.constant([[10,0,0],
                      [0,10,0],
                      [0,0,10],
                      [0,0,10]], dtype=tf.float32)  # (4, 3)

temp_v = tf.constant([[   1,0],
                      [  10,0],
                      [ 100,5],
                      [1000,6]], dtype=tf.float32)  # (4, 2)
temp_q = tf.constant([[0, 0, 10], [0, 10, 0], [10, 10, 0]], dtype=tf.float32)  # (3, 3)

In [None]:
# 함수 실행

temp_out, temp_attn = scaled_dot_product_attention(temp_q, temp_k, temp_v, None)
print(temp_attn) # 어텐션 분포 (어텐션 가중치 나열)
print(temp_out) # 어텐션 값

tf.Tensor(
[[0.  0.  0.5 0.5]
 [0.  1.  0.  0. ]
 [0.5 0.5 0.  0. ]], shape=(3, 4), dtype=float32)
tf.Tensor(
[[550.    5.5]
 [ 10.    0. ]
 [  5.5   0. ]], shape=(3, 2), dtype=float32)


# Multi-head Attnetion Implementing

멀티 헤드 어텐션의 구현은 크게 다섯 가지 파트로 구성됩니다.

1. WQ, WK, WV에 해당하는 d_model 크기의 밀집층(Dense layer)을 지나게한다.
2. 지정된 헤드 수(num_heads)만큼 나눈다(split).
3. 스케일드 닷 프로덕트 어텐션.
4. 나눠졌던 헤드들을 연결(concatenatetion)한다.
5. WO에 해당하는 밀집층을 지나게 한다.

In [None]:
class MutiHeadAttention(tf.keras.layers.Layer):

  def __init__(self, d_model, num_heads, name="multi_head_attention"):
    super(MultiHeadAttention, self).__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0 # if not, AssertionError

    # d_model 을 num_heads 로 나눈 값
    self.depth = d_model // self.num_heads

    # WQ, WK, WV에 해당하는 밀집층 정의
    self.query_dense = tf.keras.layers.Dense(units=d_model)
    self.key_dense = tf.keras.layers.Dense(units=d_model)
    self.value_dense = tf.keras.layers.Dense(units=d_model)

    # WO에 해당하는 밀집층 정의
    self.dense = tf.keras.layers.Dense(units=d_model)

  # num_heads 의 개수만큼 q, k, v를 split 하는 함수
  def split_heads(self, inputs, batch_size):
    inputs = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth)) # desired shape of the tensor after reshaping
    return tf.transpose(inputs, perm=[0, 2, 1, 3]) # desired order of dimensions after the transpose operation.

  def call(self, inputs):
    query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
    batch_size = tf.shape(query)[0]

    # 1. WQ, WK, WV에 해당하는 밀집층 지나기
    # q : (batch_size, query의 문장 길이, d_model)
    # k : (batch_size, key의 문장 길이, d_model)
    # v : (batch_size, value의 문장 길이, d_model)
    # 참고) 인코더(k, v)-디코더(q) 어텐션에서는 query 길이와 key, value의 길이는 다를 수 있다.
    query = self.query_dense(query)
    key = self.key_dense(key)
    value = self.value_dense(value)

    # 2. 헤드 나누기
    # q : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
    # k : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
    # v : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
    query = self.split_heads(query, batch_size)
    key = self.split_heads(key, batch_size)
    value = self.split_heads(value, batch_size)

    # 3. 스케일드 닷 프로덕트 어텐션. 앞서 구현한 함수 사용.
    # (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
    scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask)
    # (batch_size, query의 문장 길이, num_heads, d_model/num_heads)
    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

    # 4. 헤드 연결(concatenate)하기
    # (batch_size, query의 문장 길이, d_model)
    concat_attention = tf.reshape(scaled_attention,
                                  (batch_size, -1, self.d_model))

    # 5. WO에 해당하는 밀집층 지나기
    # (batch_size, query의 문장 길이, d_model)
    outputs = self.dense(concat_attention)

    return outputs

In [None]:
# Padding Mask
def create_padding_mask(x):
  mask = tf.cast(tf.math.equal(x, 0), tf.float32)
  # (batch_size, 1, 1, key의 문장 길이)
  return mask[:, tf.newaxis, tf.newaxis, :]

# Example
print(create_padding_mask(tf.constant([[1, 21, 777, 0, 0]])))

tf.Tensor([[0. 0. 0. 1. 1.]], shape=(1, 5), dtype=float32)
tf.Tensor([[[[0. 0. 0. 1. 1.]]]], shape=(1, 1, 1, 5), dtype=float32)
