In [1]:
import numpy as np
import tensorflow as tf 

In [2]:
class PositionalEncoding(tf.keras.layers.Layer):
    """Adds a fixed sinusoidal positional encoding to its inputs.

    A table of shape ``(N, d_model)`` is computed once at construction time
    and the rows matching the input sequence length are added in ``call``.

    Args:
        N: Number of positions to precompute, i.e. the longest sequence
            the layer can encode.
        d_model: Size of the last (feature) axis of the inputs.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, N, d_model, **kwargs):
        super(PositionalEncoding, self).__init__(**kwargs)
        self.N = N
        self.d_model = d_model
        self.pos_encoding = self.get_positional_encoding(N, d_model)

    def get_positional_encoding(self, N, d_model):
        """Build the ``(N, d_model)`` sinusoidal table as a float32 tensor.

        Even feature columns hold ``sin(pos / 10000^(2i/d_model))`` and odd
        columns hold ``cos(pos / 10000^(2i/d_model))`` where ``i`` is the
        index of the (sin, cos) pair, so both members of a pair share one
        frequency.
        """
        positions = np.arange(N, dtype=np.float64)[:, np.newaxis]
        dims = np.arange(d_model, dtype=np.float64)[np.newaxis, :]

        # 2 * (dim // 2) maps columns (0, 1) -> 0, (2, 3) -> 2, ... so that a
        # sine column and the cosine column next to it use the same exponent.
        exponents = 2.0 * np.floor(dims / 2.0) / float(d_model)
        pos_encoding = positions / np.power(10000.0, exponents)

        pos_encoding[:, 0::2] = np.sin(pos_encoding[:, 0::2])
        pos_encoding[:, 1::2] = np.cos(pos_encoding[:, 1::2])

        print("Positional Encoding", pos_encoding.shape)
        return tf.cast(pos_encoding, tf.float32)

    def call(self, inputs):
        """Add the encoding for the first ``seq_len`` positions to ``inputs``.

        ``inputs`` is indexed as (batch, seq_len, d_model); only the rows of
        the table that correspond to the actual sequence length are used, so
        any sequence no longer than ``N`` is accepted.
        """
        seq_len = tf.shape(inputs)[1]
        return inputs + self.pos_encoding[tf.newaxis, :seq_len, :]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super(PositionalEncoding, self).get_config()
        config.update({"N": self.N, "d_model": self.d_model})
        return config

In [3]:
def create_padding_mask(multiple_qk):
    """Return an additive mask that is -1e9 wherever the input equals 0.

    Args:
        multiple_qk: Tensor to inspect; entries exactly equal to 0 are
            treated as padding.

    Returns:
        A float32 tensor with the same shape as ``multiple_qk`` holding
        -1e9 at the zero-valued entries and 0 everywhere else.

    NOTE(review): the callers in this file pass attention scores rather than
    token ids, so "padding" here means "score is exactly 0" - confirm that
    this is the intended definition.
    """
    is_zero = tf.math.equal(multiple_qk, 0)
    large_negative = -1e9
    return tf.cast(is_zero, tf.float32) * large_negative

In [4]:
def create_look_ahead_mask(attention_score_matrix):
    """Return an additive mask hiding future positions and zero-valued scores.

    The sequence lengths are read from the LAST two axes of the score
    tensor, so the function works both for (batch, seq, seq) scores and for
    the (batch, num_heads, seq, seq) scores produced by multi-head attention.
    (Reading axis 1 would pick up ``num_heads`` in the latter case.)

    Args:
        attention_score_matrix: Score tensor whose last two axes are
            (query positions, key positions).

    Returns:
        A float32 tensor broadcastable to the score tensor: -1e9 where a
        query would attend to a later key or where
        ``create_padding_mask`` flags the entry, 0 elsewhere.
    """
    score_shape = tf.shape(attention_score_matrix)
    query_len, key_len = score_shape[-2], score_shape[-1]

    # band_part(-1, 0) keeps the lower triangle including the diagonal, i.e.
    # the positions each query IS allowed to see; the complement is the future.
    allowed = tf.linalg.band_part(
        tf.ones(shape=(query_len, key_len), dtype=tf.float32), -1, 0
    )
    mask = (1 - allowed)[tf.newaxis, :, :] * -1e9

    pad_mask = create_padding_mask(attention_score_matrix)
    # Both masks contain only 0 or -1e9, so the minimum is their union.
    return tf.minimum(mask, pad_mask)

In [6]:
def scaled_dot_product_attention(query, key, value, mask_type):
    """Compute softmax(Q K^T / sqrt(d_k)) V with an optional additive mask.

    Args:
        query: Query tensor; its last axis must match the last axis of ``key``.
        key: Key tensor; the size of its last axis is used as ``d_k``.
        value: Value tensor combined using the attention weights.
        mask_type: ``'padding'`` to add the padding mask, ``'look_ahead'`` to
            add the look-ahead mask; any other value (e.g. ``None``) applies
            no mask.

    Returns:
        A tuple ``(attention_value, attention_weights)``.
    """
    # Attention score: Q * K^T
    attention_score_matrix = tf.matmul(query, key, transpose_b=True)

    # Scaling: divide by sqrt(d_k)
    d_k = tf.cast(key.shape[-1], tf.float32)
    scaled_matrix = attention_score_matrix / tf.math.sqrt(d_k)

    # Compare by value (==): identity comparison (`is`) against a string
    # literal depends on interning and is not a reliable equality test.
    if mask_type == 'padding':
        scaled_matrix += create_padding_mask(scaled_matrix)
    elif mask_type == 'look_ahead':
        scaled_matrix += create_look_ahead_mask(scaled_matrix)

    # Softmax over the key axis turns scores into attention weights.
    attention_weights = tf.nn.softmax(scaled_matrix, axis=-1)

    # Weighted sum of the values.
    attention_value = tf.matmul(attention_weights, value)

    return attention_value, attention_weights

In [7]:
def printShape(Q, K, V, status):
    """Print the ``.shape`` of Q, K and V on one line, tagged with ``status``.

    Args:
        Q, K, V: Objects exposing a ``shape`` attribute.
        status: Label shown in square brackets at the start of the line.
    """
    fields = {"status": status, "q": Q.shape, "k": K.shape, "v": V.shape}
    print("[{status}] Q shape : {q}, K shape : {k}, V shape : {v}\n".format(**fields))

In [8]:
class MultiHeadAttention:
    """Multi-head attention built from four Dense projections.

    Args:
        num_heads: Number of attention heads; must divide ``d_model``.
        d_model: Output width of every Dense projection.
    """

    def __init__(self, num_heads, d_model):
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % num_heads == 0, "depth가 정수 형식이 아닙니다."
        self.depth = self.d_model // self.num_heads

        # Query, key, value and output projections; each has d_model units.
        self.WQ, self.WK, self.WV, self.WO = (
            tf.keras.layers.Dense(units=self.d_model) for _ in range(4)
        )

    def get_attention(self, query, key, value, mask_type=None):
        """Project the inputs, attend per head, then merge and project again.

        Args:
            query, key, value: Input tensors; axis 0 of ``query`` is taken
                as the batch size.
            mask_type: Passed through to ``scaled_dot_product_attention``.

        Returns:
            The output of the final Dense projection applied to the
            concatenated heads.
        """
        printShape(query, key, value, "Input")

        # Leading axis of the query is used as the batch size for reshaping.
        batch_size = tf.shape(query)[0]

        # Linear projections of Q, K and V.
        projected = (self.WQ(query), self.WK(key), self.WV(value))
        printShape(*projected, "Dense")

        # Split the feature axis into heads:
        # (batch, seq, d_model) -> (batch, seq, heads, d_model/heads)
        #                       -> (batch, heads, seq, d_model/heads)
        per_head_shape = (batch_size, -1, self.num_heads, self.d_model // self.num_heads)
        reshaped = [tf.reshape(tensor, per_head_shape) for tensor in projected]
        Q_heads, K_heads, V_heads = [
            tf.transpose(tensor, perm=[0, 2, 1, 3]) for tensor in reshaped
        ]
        printShape(Q_heads, K_heads, V_heads, "Splited")

        # Attention is computed for all heads at once; weights are discarded.
        attended, _ = scaled_dot_product_attention(Q_heads, K_heads, V_heads, mask_type)

        # Put the head axis back next to the feature axis and merge them:
        # (batch, heads, seq, d_model/heads) -> (batch, seq, d_model)
        heads_last = tf.transpose(attended, perm=[0, 2, 1, 3])
        merged = tf.reshape(heads_last, (batch_size, -1, self.d_model))

        # Final output projection.
        return self.WO(merged)
        
        

In [9]:
class Encoder:
    """Builds the encoder stack as Keras functional models.

    Args:
        N: Number of positions handed to ``PositionalEncoding``.
        layer_num: How many encoder layers to stack.
        dff: Hidden width of the position-wise feed-forward network.
        d_model: Feature width of the inputs and of every sub-layer output.
        num_heads: Number of attention heads per layer.
        dropout: Dropout rate used after each sub-layer; ``None`` keeps the
            previous hard-coded rate of 0.1.
    """

    def __init__(self, N, layer_num, dff, d_model, num_heads, dropout=None):
        self.N = N
        self.layer_num = layer_num

        self.dff = dff
        self.d_model = d_model
        self.num_heads = num_heads
        self.dropout = dropout

    def _dropout_rate(self):
        """Return the configured dropout rate, falling back to 0.1."""
        return 0.1 if self.dropout is None else self.dropout

    def stack_encode_layer(self, layer_name):
        """Create one encoder layer as a ``tf.keras.Model`` named ``layer_name``.

        The layer is self-attention followed by a feed-forward network; each
        sub-layer is wrapped in dropout, a residual connection and layer
        normalization.
        """
        # Single input: the encoder-layer input.
        inputs = tf.keras.Input(shape=(None, self.d_model), name="encode_inputs")

        print(layer_name, "sub-layer 1")
        # Self-attention: query, key and value are all the layer input.
        query = key = value = inputs

        # Multi-head attention
        multi_head_attention = MultiHeadAttention(self.num_heads, self.d_model)
        attention_value = multi_head_attention.get_attention(query, key, value, mask_type='padding')

        attention_value = tf.keras.layers.Dropout(rate=self._dropout_rate())(attention_value)
        # Residual connection
        attention_value += inputs
        # Normalization
        sublayer_output = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention_value)

        print(layer_name, "sub-layer 2")
        # Feed-forward network: hidden width dff, output width back to d_model.
        feed_forward_net = tf.keras.layers.Dense(units=self.dff, activation='relu')(sublayer_output)
        feed_forward_net = tf.keras.layers.Dense(units=self.d_model)(feed_forward_net)

        feed_forward_net = tf.keras.layers.Dropout(rate=self._dropout_rate())(feed_forward_net)
        # Residual connection
        feed_forward_net += sublayer_output
        # Normalization
        encoder_layer_output = tf.keras.layers.LayerNormalization(epsilon=1e-6)(feed_forward_net)

        return tf.keras.Model(inputs=[inputs], outputs=encoder_layer_output, name=layer_name)

    def get_encoder(self):
        """Return the full encoder: positional encoding plus ``layer_num`` layers.

        With ``layer_num == 0`` the model output is the positionally encoded
        input (previously this case raised ``NameError``).
        """
        inputs = tf.keras.Input(shape=(None, self.d_model), name="encoder_inputs")

        # Positional encoding
        encoder_output = PositionalEncoding(self.N, self.d_model)(inputs)

        # Stack layer_num encoder layers, feeding each with the previous output.
        for idx in range(self.layer_num):
            encoder_layer = self.stack_encode_layer(layer_name="encoder_layer_{}".format(idx))
            encoder_output = encoder_layer(inputs=[encoder_output])

        return tf.keras.Model(inputs=[inputs], outputs=encoder_output, name="Encoder")


In [12]:
class Decoder:
    """Builds the decoder stack as Keras functional models.

    Args:
        N: Number of positions handed to ``PositionalEncoding``.
        layer_num: How many decoder layers to stack.
        dff: Hidden width of the position-wise feed-forward network.
        d_model: Feature width of the inputs and of every sub-layer output.
        num_heads: Number of attention heads per attention block.
        dropout: Dropout rate used after each sub-layer; ``None`` keeps the
            previous hard-coded rate of 0.1.
    """

    def __init__(self, N, layer_num, dff, d_model, num_heads, dropout=None):
        self.N = N
        self.layer_num = layer_num

        self.dff = dff
        self.d_model = d_model
        self.num_heads = num_heads
        self.dropout = dropout

    def _dropout_rate(self):
        """Return the configured dropout rate, falling back to 0.1."""
        return 0.1 if self.dropout is None else self.dropout

    def stack_decode_layer(self, layer_name):
        """Create one decoder layer as a ``tf.keras.Model`` named ``layer_name``.

        The layer takes two inputs (decoder-layer input, encoder output) and
        applies masked self-attention, encoder-decoder attention and a
        feed-forward network; each sub-layer is wrapped in dropout, a
        residual connection and layer normalization.
        """
        print(layer_name, "sub-layer 1")

        # Two inputs: the decoder-layer input and the encoder output.
        decoder_input = tf.keras.Input(shape=(None, self.d_model), name="decoder_layer_input")
        encoder_output = tf.keras.Input(shape=(None, self.d_model), name="encoder_output")

        # Masked multi-head self-attention: query, key and value all come
        # from the decoder input.
        query = key = value = decoder_input

        self_attention = MultiHeadAttention(self.num_heads, self.d_model)
        attention_value = self_attention.get_attention(query, key, value, mask_type='look_ahead')

        attention_value = tf.keras.layers.Dropout(rate=self._dropout_rate())(attention_value)
        # Residual connection
        attention_value += decoder_input
        # Normalization
        sublayer_output_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention_value)

        # Encoder-decoder attention: the query is the previous sub-layer
        # output, keys and values are the encoder output.
        key_from_encoder = value_from_encoder = encoder_output
        query_from_decoder = sublayer_output_1

        print(layer_name, "sub-layer 2")
        # The encoder-decoder attention uses the padding mask.
        encoder_decoder_attention = MultiHeadAttention(self.num_heads, self.d_model)
        attention_value = encoder_decoder_attention.get_attention(query_from_decoder, key_from_encoder, value_from_encoder, mask_type='padding')

        attention_value = tf.keras.layers.Dropout(rate=self._dropout_rate())(attention_value)
        # Residual connection
        attention_value += sublayer_output_1
        # Normalization
        sublayer_output_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention_value)

        print(layer_name, "sub-layer 3")
        # Feed-forward network: hidden width dff, output width back to d_model.
        feed_forward_net = tf.keras.layers.Dense(units=self.dff, activation='relu')(sublayer_output_2)
        feed_forward_net = tf.keras.layers.Dense(units=self.d_model)(feed_forward_net)

        feed_forward_net = tf.keras.layers.Dropout(rate=self._dropout_rate())(feed_forward_net)
        # Residual connection
        feed_forward_net += sublayer_output_2
        # Normalization
        decoder_layer_output = tf.keras.layers.LayerNormalization(epsilon=1e-6)(feed_forward_net)

        return tf.keras.Model(inputs=[decoder_input, encoder_output], outputs=decoder_layer_output, name=layer_name)

    def get_decoder(self):
        """Return the full decoder: positional encoding plus ``layer_num`` layers.

        With ``layer_num == 0`` the model output is the positionally encoded
        decoder input (previously this case raised ``NameError``).
        """
        # Two inputs: the decoder input and the encoder output.
        decoder_input = tf.keras.Input(shape=(None, self.d_model), name="decoder_inputs")
        encoder_output = tf.keras.Input(shape=(None, self.d_model), name="encoder_outputs")

        # Positional encoding
        decoder_output = PositionalEncoding(self.N, self.d_model)(decoder_input)

        # Stack layer_num decoder layers; every layer sees the same encoder output.
        for idx in range(self.layer_num):
            decoder_layer = self.stack_decode_layer(layer_name="decoder_layer_{}".format(idx))
            decoder_output = decoder_layer(inputs=[decoder_output, encoder_output])

        return tf.keras.Model(inputs=[decoder_input, encoder_output], outputs=decoder_output, name="Decoder")


In [11]:
class Transformer:
    """Assembles the encoder, the decoder and the output projection.

    Args:
        vocab_size: Width of the final Dense layer. It is also passed as the
            first (``N``) argument of ``Encoder`` and ``Decoder``.
        layer_num: Number of layers in both the encoder and the decoder.
        dff: Hidden width of the feed-forward networks.
        d_model: Feature width of the model inputs.
        num_heads: Number of attention heads.

    NOTE(review): ``vocab_size`` doubles as the positional-encoding length
    ``N`` - confirm that this is intended rather than a maximum sequence
    length.
    """

    def __init__(self, vocab_size, layer_num, dff, d_model, num_heads):
        self.vocab_size = vocab_size
        self.layer_num = layer_num

        self.dff = dff
        self.d_model = d_model
        self.num_heads = num_heads

    def get_transformer(self):
        """Build and return the complete model as a ``tf.keras.Model``.

        Per the original notes, the encoder input is meant to be the
        embedded tokens of the un-summarized sentence and the decoder input
        the embedded tokens of the summarized sentence; both inputs are
        declared with shape ``(None, d_model)``.
        """
        # Two inputs: encoder input and decoder input.
        encoder_input = tf.keras.Input(shape=(None, self.d_model), name="encoder_inputs")
        decoder_input = tf.keras.Input(shape=(None, self.d_model), name="decoder_inputs")

        # Encoder
        encoder = Encoder(self.vocab_size, self.layer_num, self.dff, self.d_model, self.num_heads)
        encoder_output = encoder.get_encoder()(inputs=[encoder_input])

        # Decoder. It is constructed here with the same hyper-parameters as
        # the encoder; previously `decoder` was referenced without ever being
        # defined, which raised NameError on a fresh kernel.
        decoder = Decoder(self.vocab_size, self.layer_num, self.dff, self.d_model, self.num_heads)
        decoder_output = decoder.get_decoder()(inputs=[decoder_input, encoder_output])

        # Output layer for next-word prediction: one unit per vocabulary
        # entry. No activation is applied, so the values are unnormalized
        # scores rather than probabilities.
        output = tf.keras.layers.Dense(units=self.vocab_size, name="Output")(decoder_output)

        return tf.keras.Model(inputs=[encoder_input, decoder_input], outputs=output, name="Transformer")

In [10]:
class LearningRate(tf.keras.callbacks.LearningRateScheduler):
    """Warm-up learning-rate rule.

    Calling the object with a step number returns

        d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

    Args:
        d_model: Model width used in the scaling factor.
        warmup_steps: Step count that sets the slope of the linear term.

    NOTE(review): the ``__call__(step)`` signature matches
    ``tf.keras.optimizers.schedules.LearningRateSchedule`` rather than the
    callback this class derives from, and the callback's ``__init__`` is not
    invoked. The base class is presumably meant to be the schedule class -
    confirm before passing an instance to an optimizer or to ``fit``.
    """

    def __init__(self, d_model, warmup_steps):
        self.d_model = d_model
        self.warmup_steps = warmup_steps

    def __call__(self, step_num):
        """Return the learning rate for ``step_num`` as a float32 tensor."""
        # Cast everything to float32 first: a negative fractional power of an
        # integer step (Python 0 or an integer tensor such as an optimizer's
        # iteration counter) raises instead of producing a rate.
        step = tf.cast(step_num, tf.float32)
        d_model = tf.cast(self.d_model, tf.float32)
        warmup_steps = tf.cast(self.warmup_steps, tf.float32)

        # At step 0 the first term is +inf and the second is 0, so the
        # minimum (and therefore the rate) is 0.
        min_val = tf.math.minimum(tf.math.pow(step, -0.5),
                                  step * tf.math.pow(warmup_steps, -1.5))
        lrate = tf.math.pow(d_model, -0.5) * min_val
        return lrate