# Env

In [None]:
import os
import random
import shutil
import json
import zipfile
import math
import copy
import collections
import re

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as KK

from tqdm.notebook import tqdm

In [None]:
# Fix the seed of every random number generator used in this notebook
# (Python, NumPy, TensorFlow) so that reruns give the same numbers.
random_seed = 1234
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    seed_fn(random_seed)

In [None]:
# IPython shell escape: run nvidia-smi to report the GPU / driver status of this runtime
!nvidia-smi

NVIDIA-SMI has failed because it couldn't communicate with the NVIDIA driver. Make sure that the latest NVIDIA driver is installed and running.



# Config

In [None]:
class Config(dict):
    """
    Dictionary whose keys can also be read and written as attributes,
    so that a JSON config can be used as ``config.d_model``.

    :param dict: config dictionary
    """

    def __getattr__(self, name):
        """
        Fall back to the dictionary keys when normal attribute lookup fails.

        Raising AttributeError (not KeyError) for unknown names is what
        ``hasattr``, ``getattr(obj, name, default)`` and ``copy.deepcopy``
        rely on when they probe for optional attributes.

        :param name: attribute / key name
        :return: value stored under ``name``
        :raises AttributeError: if ``name`` is not a key of the config
        """
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None

    # attribute assignment stores the value as a dictionary item
    __setattr__ = dict.__setitem__

    @classmethod
    def load(cls, file):
        """
        Create a Config from a JSON file.

        :param file: filename
        :return: instance of ``cls`` holding the parsed JSON object
        """
        # JSON is UTF-8 by specification; do not depend on the platform default encoding
        with open(file, 'r', encoding='utf-8') as f:
            return cls(json.load(f))

In [None]:
# Create the config used by every layer below.
#   d_model : model hidden dim
#   n_head  : number of heads in multi head attention
#   d_head  : dim of each attention head
#   dropout : dropout rate
#   d_ff    : feed forward dim
#   norm_eps: layer normalization epsilon
#   n_layer : number of layers
#   n_seq   : max sequence length
#   n_vocab : vocab count
#   i_pad   : vocab pad id
config = Config(
    d_model=8,
    n_head=2,
    d_head=4,
    dropout=0.1,
    d_ff=32,
    norm_eps=0.001,
    n_layer=6,
    n_seq=16,
    n_vocab=16,
    i_pad=0,
)
config

{'d_ff': 32,
 'd_head': 4,
 'd_model': 8,
 'dropout': 0.1,
 'i_pad': 0,
 'n_head': 2,
 'n_layer': 6,
 'n_seq': 16,
 'n_vocab': 16,
 'norm_eps': 0.001}

# Input

In [None]:
# Input sentences: each entry is a [question, answer] pair
sentences = [
    ['나는 오늘 행복해', '나도 기분이 좋아'],
    # ['나는 오늘 기분이 좋아', '나도 매우 행복하다'],
]

In [None]:
# Split every sentence on whitespace and drop duplicate words;
# dict.fromkeys keeps the order of first appearance.
words = list(dict.fromkeys(
    token
    for sentence_pair in sentences
    for text in sentence_pair
    for token in text.split()
))

# Give every word a unique id; the special tokens occupy ids 0-3.
word_to_id = {'[PAD]': 0, '[UNK]': 1, '[BOS]': 2, '[EOS]': 3}
for token in words:
    word_to_id[token] = len(word_to_id)

# Reverse mapping: id -> word
id_to_word = {index: token for token, index in word_to_id.items()}

word_to_id, id_to_word

({'[BOS]': 2,
  '[EOS]': 3,
  '[PAD]': 0,
  '[UNK]': 1,
  '기분이': 8,
  '나는': 4,
  '나도': 7,
  '오늘': 5,
  '좋아': 9,
  '행복해': 6},
 {0: '[PAD]',
  1: '[UNK]',
  2: '[BOS]',
  3: '[EOS]',
  4: '나는',
  5: '오늘',
  6: '행복해',
  7: '나도',
  8: '기분이',
  9: '좋아'})

In [None]:
# Convert questions and answers to lists of token ids
question_list = [[word_to_id[token] for token in qa[0].split()] for qa in sentences]
answer_list = [[word_to_id[token] for token in qa[1].split()] for qa in sentences]

# Build the training inputs:
#   encoder input = question (same row objects as question_list, padded in place below)
#   decoder input = [BOS] + answer
#   label         = answer + [EOS]
train_enc_inputs = list(question_list)
train_dec_inputs = [[word_to_id['[BOS]']] + ids for ids in answer_list]
train_labels = [ids + [word_to_id['[EOS]']] for ids in answer_list]

# Pad every row with 0 up to a fixed length:
# encoder inputs to 4, decoder inputs and labels to 6
for rows, max_len in ((train_enc_inputs, 4), (train_dec_inputs, 6), (train_labels, 6)):
    for row in rows:
        row.extend([0] * (max_len - len(row)))

# Convert to numpy arrays
train_enc_inputs, train_dec_inputs, train_labels = map(
    np.array, (train_enc_inputs, train_dec_inputs, train_labels)
)

train_enc_inputs, train_dec_inputs, train_labels

(array([[4, 5, 6, 0]]),
 array([[2, 7, 8, 9, 0, 0]]),
 array([[7, 8, 9, 3, 0, 0]]))

In [None]:
# Embedding table with random weights: integers drawn from [-9, 9], divided by 10
embed_weight = np.random.randint(-9, 10, size=(config.n_vocab, config.d_model)) / 10

embed = tf.keras.layers.Embedding(
    input_dim=config.n_vocab,
    output_dim=config.d_model,
    weights=[embed_weight],
)
embed_weight

array([[ 0.6, -0.3,  0.3,  0.6,  0.8,  0. ,  0.2,  0.3],
       [ 0.7, -0.4,  0.7,  0. ,  0.6,  0.9,  0.7,  0.3],
       [-0.4, -0.7, -0.3, -0.6, -0.2,  0.2, -0.9,  0. ],
       [ 0.2,  0.7, -0.6, -0.7,  0.3, -0.8,  0.2,  0.2],
       [ 0.8,  0.5, -0.2,  0.1,  0.2,  0.5,  0.8,  0.4],
       [-0.9,  0.3, -0.4,  0.8, -0.4,  0.4,  0.7,  0. ],
       [-0.1,  0.3, -0.3,  0.3,  0.6,  0.8,  0.9,  0.5],
       [-0.7, -0.4,  0.4, -0.3, -0.2, -0.5, -0.6, -0.4],
       [ 0.5,  0.6,  0.6,  0.6, -0.7,  0.1, -0.5,  0.9],
       [-0.2,  0.2,  0.5,  0.9,  0. , -0.9, -0.7, -0.8],
       [ 0.9,  0.8, -0.2, -0.5, -0.2,  0.8, -0.9,  0. ],
       [ 0.9,  0. , -0.8,  0.5, -0.6,  0.3,  0. ,  0.4],
       [-0.9, -0.5, -0.5, -0.9, -0.1,  0.3,  0.8,  0. ],
       [ 0.6, -0.1, -0.7,  0.7,  0.2, -0.7,  0.9,  0.6],
       [-0.6,  0.5, -0.7, -0.5, -0.8,  0.1, -0.7,  0.4],
       [-0.6,  0.9,  0.3, -0.4,  0.8,  0.7, -0.1,  0.4]])

In [None]:
# encoder hidden: embedding lookup for every encoder input token
hidden_enc = embed(train_enc_inputs)
hidden_enc

<tf.Tensor: shape=(1, 4, 8), dtype=float32, numpy=
array([[[ 0.8,  0.5, -0.2,  0.1,  0.2,  0.5,  0.8,  0.4],
        [-0.9,  0.3, -0.4,  0.8, -0.4,  0.4,  0.7,  0. ],
        [-0.1,  0.3, -0.3,  0.3,  0.6,  0.8,  0.9,  0.5],
        [ 0.6, -0.3,  0.3,  0.6,  0.8,  0. ,  0.2,  0.3]]], dtype=float32)>

In [None]:
# decoder hidden: embedding lookup for every decoder input token (same table as the encoder)
hidden_dec = embed(train_dec_inputs)
hidden_dec

<tf.Tensor: shape=(1, 6, 8), dtype=float32, numpy=
array([[[-0.4, -0.7, -0.3, -0.6, -0.2,  0.2, -0.9,  0. ],
        [-0.7, -0.4,  0.4, -0.3, -0.2, -0.5, -0.6, -0.4],
        [ 0.5,  0.6,  0.6,  0.6, -0.7,  0.1, -0.5,  0.9],
        [-0.2,  0.2,  0.5,  0.9,  0. , -0.9, -0.7, -0.8],
        [ 0.6, -0.3,  0.3,  0.6,  0.8,  0. ,  0.2,  0.3],
        [ 0.6, -0.3,  0.3,  0.6,  0.8,  0. ,  0.2,  0.3]]], dtype=float32)>

# Mask

## PAD Mask

In [None]:
# token ids the masks below are computed from (0 is the pad id)
train_enc_inputs, train_dec_inputs

(array([[4, 5, 6, 0]]), array([[2, 7, 8, 9, 0, 0]]))

In [None]:
def get_pad_mask(tokens, i_pad=0):
    """
    Compute the padding mask of a batch of token ids.
    :param tokens: tokens (bs, n_seq)
    :param i_pad: id of pad
    :return mask: pad mask (pad: 1, other: 0) with an extra axis inserted at position 1
    """
    # 1.0 where the token equals the pad id, 0.0 elsewhere
    is_pad = tf.cast(tf.math.equal(tokens, i_pad), tf.float32)
    # insert an axis at position 1 so the mask can broadcast over the query positions
    return tf.expand_dims(is_pad, axis=1)

In [None]:
# pad mask of the encoder inputs (1 marks the pad positions)
enc_pad_mask = get_pad_mask(train_enc_inputs)
enc_pad_mask

<tf.Tensor: shape=(1, 1, 4), dtype=float32, numpy=array([[[0., 0., 0., 1.]]], dtype=float32)>

## Causal Mask

In [None]:
def get_causal_mask(tokens, i_pad=0):
    """
    Compute the combined causal (look-ahead) and padding mask.
    :param tokens: tokens (bs, n_seq)
    :param i_pad: id of pad
    :return mask: causal and pad mask (causal or pad: 1, other: 0)
    """
    n_seq = tf.shape(tokens)[1]
    # lower triangle incl. diagonal = positions a query may look at;
    # inverting it marks every future position with 1
    visible = tf.linalg.band_part(tf.ones((n_seq, n_seq)), -1, 0)
    # leading axis of size 1 so the mask broadcasts over the batch
    ahead_mask = tf.expand_dims(1 - visible, axis=0)
    # a position is masked when it lies ahead OR is a pad token
    return tf.maximum(ahead_mask, get_pad_mask(tokens, i_pad))

In [None]:
# causal + pad mask of the decoder inputs
dec_causal_mask = get_causal_mask(train_dec_inputs)
dec_causal_mask

<tf.Tensor: shape=(1, 6, 6), dtype=float32, numpy=
array([[[0., 1., 1., 1., 1., 1.],
        [0., 0., 1., 1., 1., 1.],
        [0., 0., 0., 1., 1., 1.],
        [0., 0., 0., 0., 1., 1.],
        [0., 0., 0., 0., 1., 1.],
        [0., 0., 0., 0., 1., 1.]]], dtype=float32)>

## Mask 생성

In [None]:
# Encoder self-attention mask: pad mask of the encoder inputs
enc_self_mask = get_pad_mask(train_enc_inputs)
enc_self_mask

<tf.Tensor: shape=(1, 1, 4), dtype=float32, numpy=array([[[0., 0., 0., 1.]]], dtype=float32)>

In [None]:
# Decoder self-attention mask: causal + pad mask of the decoder inputs
dec_self_mask = get_causal_mask(train_dec_inputs)
dec_self_mask

<tf.Tensor: shape=(1, 6, 6), dtype=float32, numpy=
array([[[0., 1., 1., 1., 1., 1.],
        [0., 0., 1., 1., 1., 1.],
        [0., 0., 0., 1., 1., 1.],
        [0., 0., 0., 0., 1., 1.],
        [0., 0., 0., 0., 1., 1.],
        [0., 0., 0., 0., 1., 1.]]], dtype=float32)>

In [None]:
# Encoder-decoder attention mask: built from the encoder inputs, so it marks the encoder's pad positions
enc_dec_mask = get_pad_mask(train_enc_inputs)
enc_dec_mask

<tf.Tensor: shape=(1, 1, 4), dtype=float32, numpy=array([[[0., 0., 0., 1.]]], dtype=float32)>

# Scaled dot product attention

In [None]:
class ScaleDotProductAttention(tf.keras.layers.Layer):
    """
    Scaled dot-product attention: softmax(Q K^T / sqrt(d_k) - 1e9 * mask) V
    """
    def __init__(self, name="scale_dot_product_attention"):
        """
        Constructor
        :param name: layer name
        """
        super().__init__(name=name)

    def call(self, inputs):
        """
        Run the layer
        :param inputs: Q, K, V, attn_mask tuple
        :return attn_out: result of the attention
        """
        Q, K, V, attn_mask = inputs
        # scale = sqrt of the size of K's last axis
        d_k = tf.cast(tf.shape(K)[-1], tf.float32)
        # similarity of every query with every key, divided by the scale
        scores = tf.matmul(Q, K, transpose_b=True) / tf.math.sqrt(d_k)
        # subtract 1e9 where the mask is 1 so softmax gives those keys ~0 probability
        scores = scores - 1.e9 * attn_mask
        # attention probabilities over the keys
        probs = tf.nn.softmax(scores, axis=-1)
        # weighted sum of V
        return tf.matmul(probs, V)

In [None]:
# Encoder self-attention: Q, K and V are all the encoder hidden states
Q = K = V = hidden_enc

attention = ScaleDotProductAttention()
attn_out = attention((Q, K, V, enc_self_mask))
attn_out

<tf.Tensor: shape=(1, 4, 8), dtype=float32, numpy=
array([[[ 0.09644104,  0.383483  , -0.28066254,  0.32853726,
          0.20899351,  0.5851593 ,  0.81345034,  0.34623823],
        [-0.28741056,  0.34327316, -0.3261309 ,  0.49556422,
          0.03577897,  0.54402035,  0.7828285 ,  0.23952606],
        [-0.03932257,  0.36284068, -0.29634288,  0.37597537,
          0.19668652,  0.5946861 ,  0.81305325,  0.3297636 ],
        [ 0.0439067 ,  0.3756832 , -0.28674185,  0.3472341 ,
          0.2027991 ,  0.5881414 ,  0.8129915 ,  0.33924115]]],
      dtype=float32)>

In [None]:
# Decoder self-attention: Q, K and V are all the decoder hidden states
Q = K = V = hidden_dec

attn_out = attention((Q, K, V, dec_self_mask))
attn_out

<tf.Tensor: shape=(1, 6, 8), dtype=float32, numpy=
array([[[-0.4       , -0.7       , -0.3       , -0.6       ,
         -0.2       ,  0.2       , -0.9       ,  0.        ],
        [-0.5661127 , -0.5338873 ,  0.08759623, -0.43388736,
         -0.2       , -0.18759623, -0.7338874 , -0.22148357],
        [ 0.1065836 ,  0.17015488,  0.39534226,  0.20757586,
         -0.5128951 ,  0.00528993, -0.5930563 ,  0.48770773],
        [-0.22024183,  0.00745238,  0.3843947 ,  0.36241618,
         -0.19720611, -0.47977903, -0.6690711 , -0.29774258],
        [-0.13489895,  0.01090932,  0.3456014 ,  0.2524991 ,
         -0.29308322, -0.29094657, -0.6598883 , -0.04186695],
        [-0.13489895,  0.01090932,  0.3456014 ,  0.2524991 ,
         -0.29308322, -0.29094657, -0.6598883 , -0.04186695]]],
      dtype=float32)>

In [None]:
# Encoder-decoder attention: queries come from the decoder, keys and values from the encoder
Q = hidden_dec
K = V = hidden_enc

attn_out = attention((Q, K, V, enc_dec_mask))
attn_out

<tf.Tensor: shape=(1, 6, 8), dtype=float32, numpy=
array([[[-0.13165587,  0.35934266, -0.30766588,  0.4273435 ,
          0.10794223,  0.561637  ,  0.7956541 ,  0.28364244],
        [-0.20692162,  0.35365945, -0.31671894,  0.46408376,
          0.05719474,  0.54531634,  0.786073  ,  0.25542712],
        [-0.0051144 ,  0.37713844, -0.29296046,  0.3805099 ,
          0.13042648,  0.5581737 ,  0.7983715 ,  0.30378246],
        [-0.20495296,  0.35653824, -0.31665277,  0.4680712 ,
          0.03770464,  0.5355051 ,  0.7818871 ,  0.24712145],
        [ 0.0439067 ,  0.3756832 , -0.28674185,  0.3472341 ,
          0.2027991 ,  0.5881414 ,  0.8129915 ,  0.33924115],
        [ 0.0439067 ,  0.3756832 , -0.28674185,  0.3472341 ,
          0.2027991 ,  0.5881414 ,  0.8129915 ,  0.33924115]]],
      dtype=float32)>

# Multi Head Attention

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """
    Multi Head Attention Class
    """
    def __init__(self, config, name="multi_head_attention"):
        """
        Constructor
        :param config: Config object
        :param name: layer name
        """
        super().__init__(name=name)

        self.d_model = config.d_model
        self.n_head = config.n_head
        self.d_head = config.d_head

        # Q, K, V input dense layer
        self.W_Q = tf.keras.layers.Dense(config.n_head * config.d_head)
        self.W_K = tf.keras.layers.Dense(config.n_head * config.d_head)
        self.W_V = tf.keras.layers.Dense(config.n_head * config.d_head)
        # Scale Dot Product Attention class
        self.attention = ScaleDotProductAttention(name="self_attention")
        # output dense layer
        self.W_O = tf.keras.layers.Dense(config.d_model)

    def call(self, inputs):
        """
        Run the layer
        :param inputs: Q, K, V, attn_mask tuple
        :return attn_out: result of the attention, last axis of size d_model
        """
        Q, K, V, attn_mask = inputs

        # project and split into heads
        Q_m = self._split_heads(self.W_Q(Q))  # (bs, n_head, Q_len, d_head)
        K_m = self._split_heads(self.W_K(K))  # (bs, n_head, K_len, d_head)
        V_m = self._split_heads(self.W_V(V))  # (bs, n_head, K_len, d_head)
        # add a head axis so the same mask is broadcast to every head
        attn_mask_m = tf.expand_dims(attn_mask, axis=1)

        # attention is computed independently per head
        attn_out_m = self.attention((Q_m, K_m, V_m, attn_mask_m))  # (bs, n_head, Q_len, d_head)

        # move the head axis back next to d_head and concatenate the heads
        attn_out_t = tf.transpose(attn_out_m, [0, 2, 1, 3])  # (bs, Q_len, n_head, d_head)
        attn_out = tf.reshape(attn_out_t, [-1, tf.shape(Q)[1], self.n_head * self.d_head])

        # output projection back to d_model
        return self.W_O(attn_out)

    def _split_heads(self, projected):
        """
        Split the last axis of a projected tensor into heads
        :param projected: (bs, n_seq, n_head * d_head)
        :return: (bs, n_head, n_seq, d_head)
        """
        per_head = tf.reshape(projected, [-1, tf.shape(projected)[1], self.n_head, self.d_head])
        return tf.transpose(per_head, [0, 2, 1, 3])

In [None]:
# Encoder self-attention with multiple heads: Q, K and V are all the encoder hidden states
Q = K = V = hidden_enc

attention = MultiHeadAttention(config)
attn_out = attention((Q, K, V, enc_self_mask))
attn_out

(1, 4, 8)
(1, 4, 2, 4)


In [None]:
# Decoder self-attention with multiple heads: Q, K and V are all the decoder hidden states
Q = K = V = hidden_dec

attn_out = attention((Q, K, V, dec_self_mask))
attn_out

(1, 6, 8)
(1, 6, 2, 4)


In [None]:
# Encoder-decoder attention with multiple heads: queries from the decoder, keys and values from the encoder
Q = hidden_dec
K = V = hidden_enc

attn_out = attention((Q, K, V, enc_dec_mask))
attn_out

(1, 6, 8)
(1, 6, 2, 4)


# Feed Forward

In [None]:
class PositionWiseFeedForward(tf.keras.layers.Layer):
    """
    Position-wise feed forward block: Dense(d_ff, relu) followed by Dense(d_model)
    """
    def __init__(self, config, name="feed_forward"):
        """
        Constructor
        :param config: Config object
        :param name: layer name
        """
        super().__init__(name=name)

        # expand to d_ff with ReLU, then project back to d_model
        self.W_1 = tf.keras.layers.Dense(config.d_ff, activation=tf.nn.relu)
        self.W_2 = tf.keras.layers.Dense(config.d_model)

    def call(self, inputs):
        """
        Run the layer
        :param inputs: inputs
        :return ff_val: result of the feed forward block
        """
        return self.W_2(self.W_1(inputs))

In [None]:
# check that the feed-forward class works
feed_forward = PositionWiseFeedForward(config)
ff_val = feed_forward(hidden_enc)
ff_val.shape

TensorShape([1, 4, 8])

# Layer Normalization
- https://arxiv.org/abs/1607.06450

In [None]:
# create a hidden with large values (each row is on a different scale)
hidden = np.array([[1, 2, 3],
                   [11, 11, 13],
                   [111, 122, 133]]).astype(np.float32)
hidden

array([[  1.,   2.,   3.],
       [ 11.,  11.,  13.],
       [111., 122., 133.]], dtype=float32)

In [None]:
# run layer normalization (default arguments)
layer_norm = tf.keras.layers.LayerNormalization()
layer_norm(hidden)

<tf.Tensor: shape=(3, 3), dtype=float32, numpy=
array([[-1.2238274 ,  0.        ,  1.2238274 ],
       [-0.70670974, -0.70670974,  1.4134184 ],
       [-1.2247372 ,  0.        ,  1.2247372 ]], dtype=float32)>

In [None]:
# weights of the layer: two vectors, one entry per feature
layer_norm.get_weights()

[array([1., 1., 1.], dtype=float32), array([0., 0., 0.], dtype=float32)]

In [None]:
# mean over the last axis (one value per row)
mean = np.mean(hidden, axis=-1, keepdims=True)
mean

In [None]:
# sigma = sqrt(var + epsilon), with epsilon = 0.001
sigma = np.sqrt(np.var(hidden, axis=-1, keepdims=True) + 0.001)
sigma

In [None]:
# layer normalization computed by hand: (x - mean) / sigma
(hidden - mean) / sigma

# Encoder Layer

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
    """
    Encoder layer: self attention and feed forward, each followed by
    dropout, residual add and layer normalization
    """
    def __init__(self, config, name='encoder_layer'):
        """
        Constructor
        :param config: Config object
        :param name: layer name
        """
        super().__init__(name=name)

        # sub-layer 1: multi head self attention
        self.self_attention = MultiHeadAttention(config)
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon=config.norm_eps)

        # sub-layer 2: position wise feed forward
        self.ffn = PositionWiseFeedForward(config)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon=config.norm_eps)

        # one dropout layer shared by both sub-layers
        self.dropout = tf.keras.layers.Dropout(config.dropout)

    def call(self, inputs):
        """
        Run the layer
        :param inputs: enc_hidden, self_mask tuple
        :return enc_out: result of the EncoderLayer
        """
        enc_hidden, self_mask = inputs

        # self attention -> dropout -> residual add -> layer normalization
        attn_val = self.self_attention((enc_hidden, enc_hidden, enc_hidden, self_mask))
        attn_normed = self.norm1(enc_hidden + self.dropout(attn_val))

        # feed forward -> dropout -> residual add -> layer normalization
        ffn_val = self.ffn(attn_normed)
        return self.norm2(attn_normed + self.dropout(ffn_val))

In [None]:
# check that EncoderLayer works
encoder_layer = EncoderLayer(config)
enc_out = encoder_layer((hidden_enc, enc_self_mask))
enc_out.shape

# Decoder Layer

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
    """
    Decoder layer: self attention, encoder-decoder attention and feed forward,
    each followed by dropout, residual add and layer normalization
    """
    def __init__(self, config, name='decoder_layer'):
        """
        Constructor
        :param config: Config object
        :param name: layer name
        """
        super().__init__(name=name)

        # sub-layer 1: multi head self attention
        self.self_attention = MultiHeadAttention(config)
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon=config.norm_eps)

        # sub-layer 2: encoder-decoder attention
        self.ende_attn = MultiHeadAttention(config)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon=config.norm_eps)

        # sub-layer 3: position wise feed forward
        self.ffn = PositionWiseFeedForward(config)
        self.norm3 = tf.keras.layers.LayerNormalization(epsilon=config.norm_eps)

        # one dropout layer shared by all sub-layers
        self.dropout = tf.keras.layers.Dropout(config.dropout)

    def call(self, inputs):
        """
        Run the layer
        :param inputs: dec_hidden, enc_out, self_mask, ende_mask tuple
        :return dec_out: result of the DecoderLayer
        """
        dec_hidden, enc_out, self_mask, ende_mask = inputs

        # self attention -> dropout -> residual add -> layer normalization
        self_val = self.self_attention((dec_hidden, dec_hidden, dec_hidden, self_mask))
        self_normed = self.norm1(dec_hidden + self.dropout(self_val))

        # encoder-decoder attention: queries from the decoder, keys/values from enc_out
        cross_val = self.ende_attn((self_normed, enc_out, enc_out, ende_mask))
        cross_normed = self.norm2(self_normed + self.dropout(cross_val))

        # feed forward -> dropout -> residual add -> layer normalization
        ffn_val = self.ffn(cross_normed)
        return self.norm3(cross_normed + self.dropout(ffn_val))

In [None]:
# run the decoder layer (hidden_enc is passed in the enc_out position here)
decoder_layer = DecoderLayer(config)
dec_out = decoder_layer((hidden_dec, hidden_enc, dec_self_mask, enc_dec_mask))
dec_out.shape

# Weight Shared Embedding

In [None]:
class SharedEmbedding(tf.keras.layers.Layer):
    """
    Weight Shared Embedding Class

    One (n_vocab, d_model) weight matrix is used both for the embedding
    lookup and for the final linear projection onto the vocabulary.
    """
    def __init__(self, config, name='weight_shared_embedding'):
        """
        Constructor
        :param config: Config object
        :param name: layer name
        """
        super().__init__(name=name)

        self.n_vocab = config.n_vocab
        self.d_model = config.d_model

    def build(self, input_shape):
        """
        Create the shared weight
        :param input_shape: Tensor Shape (not used)
        """
        with tf.name_scope('shared_embedding_weight'):
            # pass the name by keyword: the first positional parameter of
            # add_weight is not the name in every Keras version
            self.shared_weights = self.add_weight(
                name='weights',
                shape=[self.n_vocab, self.d_model],
                initializer=tf.keras.initializers.TruncatedNormal(stddev=self.d_model ** -0.5)
            )

    def call(self, inputs, mode='embedding'):
        """
        Run the layer
        :param inputs: inputs
        :param mode: 'embedding' for the lookup, 'linear' for the projection
        :return: result of the embedding lookup or of the linear projection
        :raises ValueError: if mode is neither 'embedding' nor 'linear'
        """
        # embedding lookup
        if mode == 'embedding':
            return self._embedding(inputs)
        # linear projection
        elif mode == 'linear':
            return self._linear(inputs)
        # any other mode is an error
        else:
            raise ValueError(f'mode {mode} is not valid.')

    def _embedding(self, inputs):
        """
        Embedding lookup
        :param inputs: token ids
        :return: embedding vectors scaled by d_model ** 0.5
        """
        # lookup by gather
        embed = tf.gather(self.shared_weights, tf.cast(inputs, tf.int32))
        # multiply by d_model ** 0.5
        embed *= self.d_model ** 0.5
        return embed

    def _linear(self, inputs):  # (bs, n_seq, d_model)
        """
        Linear projection with the transposed shared weight
        :param inputs: hidden vectors whose last axis has size d_model
        :return: one score per vocabulary entry on the last axis
        """
        # matmul inputs, shared_weights (transpose_b=True)
        outputs = tf.matmul(inputs, self.shared_weights, transpose_b=True)
        return outputs

In [None]:
# embedding mode (default): decoder token ids -> hidden vectors
embedding = SharedEmbedding(config)
hidden_dec = embedding(train_dec_inputs)
hidden_dec.shape

In [None]:
# linear mode: project the hidden vectors with the same shared weight
linear_outputs = embedding(hidden_dec, mode="linear")
linear_outputs.shape

# Positional Encoding

In [None]:
class PositionalEmbedding(tf.keras.layers.Layer):
    """
    Positional embedding backed by a fixed (non-trainable) sinusoid table
    """
    def __init__(self, config, name='position_embedding'):
        """
        Constructor
        :param config: Config object
        :param name: layer name
        """
        super().__init__(name=name)

        # precompute the table once and freeze it inside an Embedding layer
        table = PositionalEmbedding.get_sinusoid_encoding(config.n_seq, config.d_model)
        self.embedding = tf.keras.layers.Embedding(config.n_seq, config.d_model, trainable=False, weights=[table])

    def call(self, inputs):
        """
        Run the layer
        :param inputs: inputs
        :return embed: result of the positional embedding lookup
        """
        # exclusive cumulative sum of ones along axis 1 gives 0, 1, 2, ... per row
        ones = tf.ones_like(inputs)
        position = tf.cast(tf.math.cumsum(ones, axis=1, exclusive=True), tf.int32)
        # embedding lookup
        return self.embedding(position)

    @staticmethod
    def get_sinusoid_encoding(n_seq, d_model):
        """
        Create the sinusoid encoding table
        :param n_seq: sequence number
        :param d_model: model hidden dimension
        :return: positional encoding table
        """
        # columns 2i and 2i+1 share the divisor 10000 ** (2i / d_model)
        divisors = np.array([np.power(10000, 2 * (i_ang // 2) / d_model) for i_ang in range(d_model)])
        # one row per position: pos / divisor
        pos_encoding = np.arange(n_seq)[:, np.newaxis] / divisors
        # sin on even columns
        pos_encoding[:, 0::2] = np.sin(pos_encoding[:, 0::2])
        # cos on odd columns
        pos_encoding[:, 1::2] = np.cos(pos_encoding[:, 1::2])
        return tf.cast(pos_encoding, tf.float32)

In [None]:
# check the position encoding on a small table (4 positions, depth 4)
pos_encoding = PositionalEmbedding.get_sinusoid_encoding(4, 4)
pos_encoding

In [None]:
# display the table as a heat map (x: depth, y: position)
plt.pcolormesh(pos_encoding, cmap='RdBu')
plt.xlabel('Depth')
# take the x range from the table being plotted, not from config.d_model
plt.xlim((0, pos_encoding.shape[1]))
plt.ylabel('Position')
plt.colorbar()
plt.show()

In [None]:
# test the PositionalEmbedding class (position lookup for the encoder inputs)
pos_embedding = PositionalEmbedding(config)
dec_pos = pos_embedding(train_enc_inputs)
dec_pos.shape

In [None]:
# create a 512x512 position encoding table
pos_encoding = PositionalEmbedding.get_sinusoid_encoding(512, 512)
# display the table as a heat map (x: depth, y: position)
plt.pcolormesh(pos_encoding, cmap='RdBu')
plt.xlabel('Depth')
plt.xlim((0, 512))
plt.ylabel('Position')
plt.colorbar()
plt.show()

# Transformer

In [None]:
class Transformer(tf.keras.Model):
    """
    Transformer Class

    Encoder-decoder model: shared token embedding + positional embedding,
    n_layer encoder layers, n_layer decoder layers, and a final projection
    onto the vocabulary that reuses the embedding weight.
    """
    def __init__(self, config, name='transformer'):
        """
        Constructor
        :param config: Config object
        :param name: layer name
        """
        super().__init__(name=name)

        self.i_pad = config.i_pad
        self.embedding = SharedEmbedding(config)
        self.position = PositionalEmbedding(config)

        self.encoder_layers = [EncoderLayer(config, name=f'encoder_layer_{i}') for i in range(config.n_layer)]
        self.decoder_layers = [DecoderLayer(config, name=f'decoder_layer_{i}') for i in range(config.n_layer)]

        self.dropout = tf.keras.layers.Dropout(config.dropout)

    def call(self, inputs):
        """
        Run the model
        :param inputs: enc_tokens, dec_tokens tuple
        :return logits: next-token prediction logits for dec_tokens
        """
        enc_tokens, dec_tokens = inputs
        # encoder self attention mask
        enc_self_mask = get_pad_mask(enc_tokens, self.i_pad)
        # decoder self attention mask
        dec_self_mask = get_causal_mask(dec_tokens, self.i_pad)
        # encoder and decoder attention mask
        enc_dec_mask = get_pad_mask(enc_tokens, self.i_pad)

        # enc_tokens, dec_tokens embedding lookup
        enc_embed = self.get_embedding(enc_tokens)
        dec_embed = self.get_embedding(dec_tokens)

        # run the encoder stack on the (dropped-out) encoder embeddings
        enc_hidden = self.dropout(enc_embed)
        for encoder_layer in self.encoder_layers:
            enc_hidden = encoder_layer((enc_hidden, enc_self_mask))

        # run the decoder stack; every layer attends to the final encoder output
        dec_hidden = self.dropout(dec_embed)
        for decoder_layer in self.decoder_layers:
            dec_hidden = decoder_layer((dec_hidden, enc_hidden, dec_self_mask, enc_dec_mask))

        # call weight shared embedding (mode=linear)
        logits = self.embedding(dec_hidden, mode='linear')
        return logits

    def get_embedding(self, tokens):
        """
        Token embedding plus position embedding lookup
        :param tokens: input tokens
        :return embed: sum of token embedding and position embedding
        """
        embed = self.embedding(tokens) + self.position(tokens)
        return embed

In [None]:
# check that the Transformer works; the final result should be (bs, n_seq(dec), n_vocab)
transformer = Transformer(config)
logits = transformer((train_enc_inputs, train_dec_inputs))
logits.shape