In [1]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

### Positional Encoding

In [2]:
class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, position, d_model):
        super(PositionalEncoding, self).__init__()
        self.pos_encoding = self.positional_encoding(position, d_model)
        
    def get_angles(self, position, i, d_model):
        angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))  ## 왜 i // 2 ??
        return position * angles
    
    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            position = tf.range(position, dtype = tf.float32)[:, tf.newaxis],  ## tf.newaxis ?
            i = tf.range(d_model, dtype = tf.float32)[tf.newaxis, :],
            d_model = d_model)
        
        sines = tf.math.sin(angle_rads[:, 0::2])
        
        cosines = tf.math.cos(angle_rads[:, 1::2])
        
        angle_rads = np.zeros(angle_rads.shape)
        angle_rads[:, 0::2] = sines
        angle_rads[:, 1::2] = cosines
        pos_encoding = tf.constant(angle_rads)
        pos_encoding = pos_encoding[tf.newaxis, ...]
        ## a = np.array([[1, 2, 3, 4, 5]], dtype = np.float32)
        ## a.shape --> (1, 5)
        ## a[tf.newaxis, ...].shape --> (1, 1, 5)
        ## a[tf.newaxis, tf.newaxis, ...].shape --> (1, 1, 1, 5)
        
        print(pos_encoding.shape)
        return tf.cast(pos_encoding, tf.float32)
    

    def call(self, inputs):
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
        

### Scaled Dot-Product Attention

In [3]:
def scaled_dot_product_attention(query, key, value, mask):
    
    matmul_qk = tf.matmul(query, key, transpose_b = True)
    
    depth = tf.cast(tf.shape(key)[-1], tf.float32)
    logits = matmul_qk / tf.math.sqrt(depth)
    
    # 매우 작은 음수를 어텐션 스코어 행렬에 넣어주므로 softmax 함수를 지나면 해당 위치는 0이 된다.
    if mask is not None:
        logits += (mask * -1e9)
        
    attention_weights = tf.nn.softmax(logits, axis = -1)
    
    output = tf.matmul(attention_weights, value)
    
    return output, attention_weights
    
    

### Multi-head Attention

In [4]:
class MultiHeadAttention(tf.keras.layers.Layer):
    
    def __init__(self, d_model, num_heads, name = "multi_head_attention"):
        super(MultiHeadAttention, self).__init__(name = name)
        self.num_heads = num_heads
        self.d_model = d_model
        
        assert d_model % self.num_heads == 0
        
        self.depth = d_model // self.num_heads
        
        # WQ, WK, WV 에 해당하는 dense layer 정의
        self.query_dense = tf.keras.layers.Dense(units = d_model) ## units : Positive integer, dimensionality of the output space
        self.key_dense = tf.keras.layers.Dense(units = d_model)
        self.value_dense = tf.keras.layers.Dense(units = d_model)
        
        # W0 에 해당하는 dense layer 정의
        self.dense = tf.keras.layers.Dense(units = d_model)
        
    def split_heads(self, inputs, batch_size):
        inputs = tf.reshape(
            inputs, shape = (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(inputs, perm = [0, 2, 1, 3]) # perm : index of dimension list
    
    
    def call(self, inputs):
        query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
        batch_size = tf.shape(query)[0]
        
        
        
        # 1. WQ, WK, WV 에 해당하는 dense layer 지나기
        # q : (batch_size, query 의 문장 길이, d_model)
        # k : (batch_size, key 의 문장 길이, d_model)
        # v : (batch_size, value 의 문장 길이, d_model)
        
        query = self.query_dense(query)
        key = self.key_dense(key)
        value = self.value_dense(value)
        
        
        # 2. head split
        # q : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
        # k : (batch_size, num_heads, key 의 문장 길이, d_model/num_heads)
        # v : (batch_size, num_heads, value 의 문장 길이, d_model/num_heads)
        
        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)
        
        # 3. Scaled dot product attention
        # (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
        scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask)
        # (batch_size, query의 문장 길이, num_heads, d_model/num_heads)
        scaled_attention = tf.transpose(scaled_attention, perm = [0, 2, 1, 3])
        
        
        # 4. head concatenate
        # (batch_size, query 의 문장 길이, d_model)
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
        
        
        # 5. W0 에 해당하는 dense laeyr 지나기 
        # (batch_size, query 의 문장 길이, d_model)
        outputs = self.dense(concat_attention)
        
        return outputs
        

### Padding Mask

In [5]:
def create_padding_mask(x):
    mask = tf.cast(tf.math.equal(x, 0), tf.float32)
    # (batch_size, 1, 1, key 의 문장 길이)
    return mask[:, tf.newaxis, tf.newaxis, :]


## 리턴된 벡터를 통해서 1의 값을 가진 위치의 열을 어텐션 스코어 행렬에서 마스킹하는 용도로 사용할 수 있다.
## 리턴된 벡터를 scaled dot product attention 메서드에 전달하면 해당 열에 매우 작은 음수 값을 더해줘 마스킹 하게 된다.

In [6]:
print(create_padding_mask(tf.constant([[1, 21, 777, 0, 0]])))

tf.Tensor([[[[0. 0. 0. 1. 1.]]]], shape=(1, 1, 1, 5), dtype=float32)


### Position-wise FFNN

In [7]:
# outputs = tf.keras.layers.Dense(units = dff, activation = 'relu')(attention)
# outputs = tf.keras.layers.Dense(units = d_model)(outputs)

### Residual Connection & Layer Normalization

### Encoder

In [8]:
def encoder_layer(dff, d_model, num_heads, dropout, name = "encoder_layer"):
    inputs = tf.keras.Input(shape = (None, d_model), name = "inputs")
    # 배치(batch)의 크기에 해당하는 첫 번째 차원 크기의 None은 크기를 여기서 정하지 않는다(어떤 배치 크기라도 가능하다)는 것을 의미
    
    # encoder 는 padding mask 사용
    padding_mask = tf.keras.Input(shape = (1, 1, None), name = "padding_mask")
    
    # multi-head attention (첫 번째 서브층 / 셀프 어텐션)
    attention = MultiHeadAttention(
        d_model, num_heads, name = "attention")({
            'query' : inputs, 'key' : inputs, 'value' : inputs, 'mask' : padding_mask
    })
    
    # dropout + residual connection + layer normalization
    attention = tf.keras.layers.Dropout(rate = dropout)(attention)
    attention = tf.keras.layers.LayerNormalization(epsilon = 1e-6)(inputs + attention)
    
    # position wise FFNN
    outputs = tf.keras.layers.Dense(units = dff, activation = 'relu')(attention)
    outputs = tf.keras.layers.Dense(units = d_model)(outputs)
    
    # dropout + residual connection + layer normalization
    outputs = tf.keras.layers.Dropout(rate = dropout)(outputs)
    outputs = tf.keras.layers.LayerNormalization(epsilon = 1e-6)(attention + outputs)
    
    return tf.keras.Model(inputs = [inputs, padding_mask], outputs = outputs, name = name)
    

### Encoder Stack

In [9]:
def encoder(vocab_size, num_layers, dff, d_model, num_heads, dropout, name = "encoder"):
    inputs = tf.keras.Input(shape = (None, ), name = "inputs")
    
    # encoder는 패딩 마스크 사용
    padding_mask = tf.keras.Input(shape = (1, 1, None), name = "padding_mask")
    
    # positioanl encoding + dropout
    embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))                   ## 왜 sqrt(d_model) ?
    embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
    outputs = tf.keras.layers.Dropout(rate = dropout)(embeddings)
    
    # encoder를 num_layers개 쌓기
    for i in range(num_layers):
        outputs = encoder_layer(dff = dff, d_model = d_model, num_heads = num_heads, 
                                dropout = dropout, name = "encoder_layer_{}".format(i),)([outputs, padding_mask])
        
        
    return tf.keras.Model(inputs = [inputs, padding_mask], outputs = outputs, name = name)

### Decoder first sublayer

In [10]:
## transformer는 seq2seq와 마찬가지로 Teacher Forcing을 사용하여 훈련되므로 학습 과정에서 디코더는 번역할 문장을
## 행렬로 한 번에 입력 받는다. 여기서 트랜스포머는 현재 시점의 단어를 예측하고자 할 때, 입력 문장 행렬로부터 
## 미래 시점의 단어까지도 참고할 수 있는 현상이 발생한다.
## 트랜스포머는 디코더에서 현재 시점의 예측에서 미래에 있는 단어들을 참고하지 못하도록 look-ahead mask를 도입했다.

In [11]:
## encoder의 sublayer인 multi head self attention과 동일한 역할을 하는데,
## 한 가지 차이는 어텐션 스코어 행렬에서 masking을 적용한다는 것이다.
## 자기 자신보다 미래에 있는 단어들은 참고하지 못하도록 마스킹한다.

In [12]:
## look ahead mask는 padding mask와 마찬가지로 scaled dot product attention 함수에 mask라는 인자로 전달된다.
## padding making을 써야할 땐, padding mask를 전달, look ahead masking을 해야할 땐, look ahead mask를 전달

```
    - 인코더의 셀프 어텐션 : 패딩 마스크 전달
    - 디코더 첫 번째 서브층 셀프 어텐션 : 룩 어헤드 마스크 전달
    - 디코더 두 번째 서브층 인코더-디코더 어텐션 : 패딩 마스크 전달
```

In [13]:
# decoder의 첫 번째 sublayer에서 미래 토큰을 mask하는 함수
def create_look_ahead_mask(x):
    seq_len = tf.shape(x)[1]
    # tf.linalg.band_part(input, num_lower, num_upper) --> num_lower : # of subdiagonals to keep, if -1, keep entire lower triangle
    look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)  # --> upper 다 0으로
    # padding mask도 포함 (look ahead mask를 한다고 padding mask가 필요 없는 것은 아님)
    padding_mask = create_padding_mask(x) # --> 0인 부분 1로, 0이 아닌 부분 0으로
    
    return tf.maximum(look_ahead_mask, padding_mask) #

### Decoder second sublayer

```
인코더의 첫번째 서브층 : Query = Key = Value
디코더의 첫번째 서브층 : Query = Key = Value
디코더의 두번째 서브층 : Query : 디코더 행렬 / Key = Value : 인코더 행렬

```

In [14]:
## 디코더의 두 번째 서브층은 멀티 헤드 어텐션을 수행한다는 점에서 이전 어텐션들과 같지만, 셀프 어텐션은 아니다.
## 셀프 어텐션은 query, key, value가 같은 경우를 말한다.
## 인코더-디코더 어텐션은 query가 디코더 행렬인 반면, key와 value는 인코더 행렬이기 때문

### decoder 구현

In [15]:
def decoder_layer(dff, d_model, num_heads, dropout, name = "decoder_layer"):
    inputs = tf.keras.Input(shape = (None, d_model), name = "inputs")
    enc_outputs = tf.keras.Input(shape = (None, d_model), name = "encoder_outputs")
    
    # look ahead mask(first layer)
    look_ahead_mask = tf.keras.Input(shape = (1, None, None), name = "look_ahead_mask")
    
    # padding mask(second layer)
    padding_mask = tf.keras.Input(shape = (1, 1, None), name = "padding_mask")
    
    # multi-head attention (첫 번째 서브층 / masked self attention)
    attention1 = MultiHeadAttention(
        d_model, num_heads, name = "attention_1")(inputs = {
            'query' : inputs, 'key' : inputs, 'value' : inputs, # Q = K = V
            'mask' : look_ahead_mask
    })
    
    # residual connection & layer norm
    attention1 = tf.keras.layers.LayerNormalization(epsilon = 1e-6)(attention1 + inputs)
    
    # multi-head attention (두 번째 서브층 / encoder-decoder attention)
    attention2 = MultiHeadAttention(
        d_model, num_heads, name = "attention_2")(inputs = {
            'query' : attention1, 'key' : enc_outputs, 'value' : enc_outputs, # Q != K = V
            'mask' : padding_mask
    })
    
    # dropout + resisual connection & layer norm
    attention2 = tf.keras.layers.Dropout(rate = dropout)(attention2)
    attention2 = tf.keras.layers.LayerNormalization(epsilon = 1e-6)(attention2 + attention1)
    
    # position wise FFNN (세 번째 서브층)
    outputs = tf.keras.layers.Dense(units = dff, activation = 'relu')(attention2)
    outputs = tf.keras.layers.LayerNormalization(epsilon = 1e-6)(outputs + attention2)
    
    return tf.keras.Model(inputs = [inputs, enc_outputs, look_ahead_mask, padding_mask],
                         outputs = outputs,
                         name = name)


```
디코더는 총 세 개의 서브층으로 구성됩니다. 첫번째와 두번째 서브층 모두 멀티 헤드 어텐션이지만, 첫번째 서브층은 mask의 인자값으로 look_ahead_mask가 들어가는 반면, 두번째 서브층은 mask의 인자값으로 padding_mask가 들어가는 것을 확인할 수 있습니다. 이는 첫번째 서브층은 마스크드 셀프 어텐션을 수행하기 때문입니다. 세 개의 서브층 모두 서브층 연산 후에는 드롭 아웃, 잔차 연결, 층 정규화가 수행되는 것을 확인할 수 있습니다.

```

### Decoder Stack

In [16]:
def decoder(vocab_size, num_layers, dff, d_model, num_heads, dropout, name = 'decoder'):
    inputs = tf.keras.Input(shape = (None, ), name = 'inputs')
    enc_outputs = tf.keras.Input(shape = (None, d_model), name = 'encoder-outputs')
    
    look_ahead_mask = tf.keras.Input(shape = (1, None, None), name = 'look_ahead_mask')
    padding_mask = tf.keras.Input(shape = (1, 1, None), name = 'padding_mask')
    
    # positional encoding + dropout
    embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
    embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
    outputs = tf.keras.layers.Dropout(rate = dropout)(embeddings)
    
    # 디코더를 num_layers개 쌓기
    for i in range(num_layers):
        outputs = decoder_layer(dff = dff, d_model = d_model, num_heads = num_heads,
                               dropout = dropout, name = 'decoder_layer_{}'.format(i),
                               )(inputs = [outputs, enc_outputs, look_ahead_mask, padding_mask])
        
    return tf.keras.Model(inputs = [inputs, enc_outputs, look_ahead_mask, padding_mask],
                         outputs = outputs,
                         name = name)

### Transformer 구현

In [17]:
def transformer(vocab_size, num_layers, dff, d_model, num_heads, dropout, name = 'transformer'):
    
    # encoder의 입력
    inputs = tf.keras.Input(shape = (None, ), name = 'inputs')
    
    # decoder의 입력
    dec_inputs = tf.keras.Input(shape = (None, ), name = 'dec_inputs')
    
    # encoder의 padding mask
    enc_padding_mask = tf.keras.layers.Lambda(create_padding_mask, output_shape = (1, 1, None),
                                             name = 'enc_padding_mask')(inputs)
    
    # decoder의 look ahead mask(첫 번째 서브층)
    look_ahead_mask = tf.keras.layers.Lambda(create_look_ahead_mask, output_shape = (1, None, None),
                                            name = 'look_ahead_mask')(dec_inputs)
    
    # decoder의 padding mask(두 번째 서브층)
    dec_padding_mask = tf.keras.layers.Lambda(create_padding_mask, output_shape = (1, 1, None),
                                             name = 'dec_padding_mask')(inputs)
    
    # encoder의 출력은 enc_outputs, decoder로 전달
    enc_outputs = encoder(vocab_size = vocab_size, num_layers = num_layers, dff = dff, d_model = d_model,
                         num_heads = num_heads, dropout = dropout,)(inputs = [inputs, enc_padding_mask])
    
    # decoder의 출력은 dec_outputs, 출력층으로 전달
    dec_outputs = decoder(vocab_size = vocab_size, num_layers = num_layers, dff = dff, d_model = d_model,
                         num_heads = num_heads, dropout = dropout,)(inputs = [dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])
    
    # 다음 단어 예측을 위한 출력층
    outputs = tf.keras.layers.Dense(units = vocab_size, name = 'outputs')(dec_outputs)
    
    return tf.keras.Model(inputs = [inputs, dec_inputs], outputs = outputs, name = name)

### Transformer 하이퍼 파라미터 setting

In [49]:
small_transformer = transformer(
    vocab_size = 9000,
    num_layers = 4,
    dff = 128,
    d_model = 128,
    num_heads = 4,
    dropout = 0.3,
    name = "small_transformer")

# tf.keras.utils.plot_model(small_transformer, to_file = 'small_transformer.png')

(1, 9000, 128)
(1, 9000, 128)


### 손실 함수 정의하기

In [50]:
def loss_function(y_true, y_pred):
    y_true = tf.reshape(y_true, shape = (-1, MAX_LENGTH - 1))
    
    loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits = True, reduction = 'none')(y_true, y_pred)
    
    mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
    
    loss = tf.multiply(loss, mask)
    
    return tf.reducde_mean(loss)