In [1]:
import os
# Directory holding the pickled tokenizer/dataset and the checkpoint files.
# The original hard-coded location stays the default so existing setups keep
# working; set TRANSFORMER_DATA_DIR to run the notebook on another machine.
path = os.environ.get("TRANSFORMER_DATA_DIR", "C:/pytest/data/transformer/")
# Later cells open their files by bare name, so they rely on this chdir.
os.chdir(path)

In [2]:
import tensorflow as tf
import pandas as pd
import numpy as np

In [3]:
import pickle
# Restore the pickled tokenizer from the working directory (presumably a Keras
# Tokenizer, given the .word_index / .texts_to_sequences calls further down).
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# load this file if it comes from a trusted source.
with open('transformer.pickle', 'rb') as f:
    loaded_tokenizer = pickle.load(f)

In [4]:
# Restore the pickled object that is indexed by string keys in the next cell.
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# load this file if it comes from a trusted source.
with open('transformer_dict.pickle','rb') as f:
    transformer_dict = pickle.load(f)

In [5]:
# Pull the four stored entries out of the loaded dictionary in one unpacking
# step (presumably padded encoder/decoder sequences plus the padded length).
(encoder_input_pad,
 decoder_input_pad,
 decoder_target_pad,
 sentence_max_length) = (
    transformer_dict[key]
    for key in ('encoder_input_pad',
                'decoder_input_pad',
                'decoder_target_pad',
                'sentence_max_length')
)

In [6]:
# Vocabulary mapping taken from the loaded tokenizer (token -> integer index,
# judging by how it is inverted and extended in the following cells).
# NOTE(review): presumably this is the tokenizer's own dict, not a copy, so
# later in-place edits would also change the tokenizer — confirm.
word_index = loaded_tokenizer.word_index

In [7]:
# Reserved token ids, named for padding, start and end of a sequence.
PAD_INDEX, STD_INDEX, END_INDEX = 0, 1, 2

# Shorter aliases for the padded arrays loaded above.
index_inputs, index_outputs, index_targets = (
    encoder_input_pad,
    decoder_input_pad,
    decoder_target_pad,
)

# char2idx_dict is bound to the same dict object as word_index (no copy is
# made), while idx2char_dict is a new dict holding the inverted mapping.
char2idx_dict = word_index
idx2char_dict = {index: token for token, index in word_index.items()}

In [8]:
# Register the special symbols under their bracketed names.
# char2idx_dict is the same object as word_index, so these edits are visible
# through word_index as well (len(word_index) grows by one for '<PAD>').
char2idx_dict['<PAD>'] = 0

# Rename 'SOS' -> '<SOS>' and 'EOS' -> '<END>', keeping their indices.
# The membership check makes the cell safe to re-run: a second run used to
# fail with KeyError because the old key had already been deleted.
for old_token, new_token in (('SOS', '<SOS>'), ('EOS', '<END>')):
    if old_token in char2idx_dict:
        char2idx_dict[new_token] = char2idx_dict.pop(old_token)
    elif new_token not in char2idx_dict:
        # Neither spelling is present: fail here, as the original did.
        raise KeyError(old_token)

# Reverse lookup for the reserved ids 0, 1 and 2.
# NOTE(review): ids 1 and 2 are hard-coded; this assumes the tokenizer
# assigned 1 to 'SOS' and 2 to 'EOS' — confirm against the tokenizer.
idx2char_dict[0] = '<PAD>'
idx2char_dict[1] = '<SOS>'
idx2char_dict[2] = '<END>'

In [9]:
# Bundle the vocabulary lookups and special-symbol strings into one dict.
# vocab_size is len(word_index) at the moment this cell runs.
prepro_configs = {
    'char2idx': char2idx_dict,
    'idx2char': idx2char_dict,
    'vocab_size': len(word_index),
    'pad_symbol': '<PAD>',
    'std_symbol': '<SOS>',
    'end_symbol': '<END>',
}

In [10]:
# Fixed hyper-parameters and the model name.
MAX_SEQUENCE, BATCH_SIZE, EPOCHS = 25, 2, 30
VALID_SPLIT = 0.1
model_name = 'transformer'

# Values read back from the preprocessing config.
char2idx = prepro_configs['char2idx']
vocab_size = prepro_configs['vocab_size']
# NOTE: despite its name, end_index holds the end *symbol* string ('<END>');
# it is turned into an integer id via char2idx when kargs is built.
end_index = prepro_configs['end_symbol']

In [11]:
# Keyword arguments shared by every model component below.
kargs = {
    'model_name': model_name,
    'num_layers': 2,
    # Dimension of a word vector (the embedding dimension).
    'd_model': 512,
    'num_heads': 8,
    # Units of the first Dense layer in feed_forward_network.
    'dff': 2048,
    # Size of the vocabulary (same value for input and target).
    'input_vocab_size': vocab_size,
    'target_vocab_size': vocab_size,
    # Number of positions the positional encoding table is built for.
    'maximum_position_encoding': MAX_SEQUENCE,
    # Integer id of the end symbol.
    'end_token_idx': char2idx[end_index],
    # Rate passed to every Dropout layer.
    'rate': 0.1,
}

In [12]:
def create_padding_mask(seq):
    """Return a float32 mask that is 1.0 where `seq` equals 0, else 0.0.

    Two singleton axes are inserted after the first axis, so a 2-D input of
    shape (a, b) produces a mask of shape (a, 1, 1, b).
    """
    is_zero = tf.math.equal(seq, 0)
    return tf.cast(is_zero, tf.float32)[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(size):
    """Return a (size, size) matrix with 1.0 strictly above the diagonal.

    The lower triangle (diagonal included) is 0.0, so entry [i, j] is 1.0
    exactly when j > i.
    """
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangle

def create_masks(inp, tar):
    """Build the three masks used by the model.

    Returns:
        (enc_padding_mask, combined_mask, dec_padding_mask) where the first
        and last are both the padding mask of `inp`, and `combined_mask` is
        the element-wise maximum of the padding mask of `tar` and a
        look-ahead mask sized by the second axis of `tar`.
    """
    enc_padding_mask = create_padding_mask(inp)
    # Also derived from `inp`: it is handed to the decoder's second attention.
    dec_padding_mask = create_padding_mask(inp)

    tar_len = tf.shape(tar)[1]
    combined_mask = tf.maximum(
        create_padding_mask(tar),
        create_look_ahead_mask(tar_len),
    )

    return enc_padding_mask, combined_mask, dec_padding_mask

In [13]:
# Build the three masks for the complete padded input/output arrays.
# NOTE(review): Transformer.call and Transformer.inference create their own
# masks, and nothing else visible here reads these module-level ones —
# confirm whether this cell is still needed.
enc_padding_mask, look_ahead_mask, dec_padding_mask = create_masks(index_inputs, index_outputs)

In [14]:
def get_angles(pos, i, d_model):
    """Return pos / 10000 ** ((2*i//2) / d_model).

    NOTE(review): `2*i//2` parses as `(2*i)//2`, which equals `i` for integer
    `i`, so each dimension gets its own rate. The usual formulation is
    `2*(i//2)`, where a sin/cos pair shares one rate. It is left as written
    because the checkpoint loaded later was presumably trained with this
    exact encoding — confirm before changing.
    """
    exponent = (2*i//2) / np.float32(d_model)
    return pos * (1 / np.power(10000, exponent))

def positional_encoding(position, d_model):
    """Return a float32 tensor of shape (1, position, d_model).

    The angle table from get_angles is overwritten in place: even columns
    receive the sine of their angle, odd columns the cosine.
    """
    positions = np.arange(position)[:, np.newaxis]
    dimensions = np.arange(d_model)[np.newaxis, :]
    table = get_angles(positions, dimensions, d_model)

    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Leading axis of size 1 so the table broadcasts over the first axis.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

def scaled_dot_product_attention(q, k, v, mask):
    """Compute softmax(q @ k^T / sqrt(dk)) @ v.

    `dk` is the size of the last axis of `k`. When `mask` is not None,
    `mask * -1e9` is added to the scaled logits before the softmax, so
    positions where the mask is 1 receive a weight close to zero.

    Returns:
        (output, attention_weights)
    """
    key_depth = tf.cast(tf.shape(k)[-1], tf.float32)
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_depth)

    if mask is not None:
        logits = logits + (mask * -1e9)

    weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(weights, v), weights

In [15]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on scaled_dot_product_attention.

    Reads 'num_heads' and 'd_model' from the keyword arguments.

    NOTE: attribute names (wq, wk, wv, dense) and their creation order are
    kept as they were; presumably the checkpoint loaded later is matched
    against them — verify before renaming anything.
    """

    def __init__(self, **kargs):
        super().__init__()
        self.num_heads = kargs['num_heads']
        self.d_model = kargs['d_model']

        # d_model has to split evenly across the heads.
        assert self.d_model % self.num_heads == 0

        # Width of the slice each head works on.
        self.depth = self.d_model // self.num_heads

        # Projections for query, key and value, each d_model units wide.
        self.wq = tf.keras.layers.Dense(self.d_model)
        self.wk = tf.keras.layers.Dense(self.d_model)
        self.wv = tf.keras.layers.Dense(self.d_model)

        # Projection applied to the concatenated head outputs.
        self.dense = tf.keras.layers.Dense(self.d_model)

    def split_heads(self, x, batch_size):
        """Reshape to (batch_size, -1, num_heads, depth), then swap axes 1 and 2.

        The -1 axis is inferred by tf.reshape; the result is laid out as
        (batch_size, num_heads, <inferred axis>, depth).
        """
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        """Project q/k/v, attend per head, merge heads and project again.

        Returns:
            (output, attention_weights); the weights are the softmax result
            returned by scaled_dot_product_attention.
        """
        batch_size = tf.shape(q)[0]

        # Project, then split into heads.
        q = self.split_heads(self.wq(q), batch_size)
        k = self.split_heads(self.wk(k), batch_size)
        v = self.split_heads(self.wv(v), batch_size)

        per_head_output, attention_weights = scaled_dot_product_attention(q, k, v, mask)

        # Undo the axis swap from split_heads, then fold the heads back into
        # one last axis of size d_model.
        per_head_output = tf.transpose(per_head_output, perm=[0, 2, 1, 3])
        merged = tf.reshape(per_head_output, (batch_size, -1, self.d_model))

        return self.dense(merged), attention_weights

In [16]:
def feed_forward_network(**kargs):
    """Return a Sequential of Dense(dff, relu) followed by Dense(d_model)."""
    hidden = tf.keras.layers.Dense(kargs['dff'], activation='relu')
    projection = tf.keras.layers.Dense(kargs['d_model'])
    return tf.keras.Sequential([hidden, projection])

In [17]:
class EncoderLayer(tf.keras.layers.Layer):
    """Self-attention block followed by a feed-forward block.

    Each block applies dropout, adds its input back and layer-normalises.

    NOTE: attribute names and creation order are kept as they were;
    presumably the checkpoint loaded later is matched against them.
    """

    def __init__(self, **kargs):
        super().__init__()
        self.mha = MultiHeadAttention(**kargs)
        self.ffn = feed_forward_network(**kargs)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(kargs['rate'])
        self.dropout2 = tf.keras.layers.Dropout(kargs['rate'])

    def call(self, x, mask):
        """Return the layer output for input `x` under attention mask `mask`."""
        # Self-attention: value, key and query are all `x`.
        attended, _ = self.mha(x, x, x, mask)
        out1 = self.layernorm1(x + self.dropout1(attended))

        transformed = self.dropout2(self.ffn(out1))
        return self.layernorm2(out1 + transformed)

In [18]:
class DecoderLayer(tf.keras.layers.Layer):
    """Self-attention, attention over the encoder output, then feed-forward.

    Each of the three blocks applies dropout, adds its input back and
    layer-normalises.

    NOTE: attribute names and creation order are kept as they were;
    presumably the checkpoint loaded later is matched against them.
    """

    def __init__(self, **kargs):
        super().__init__()
        self.mha1 = MultiHeadAttention(**kargs)
        self.mha2 = MultiHeadAttention(**kargs)
        self.ffn = feed_forward_network(**kargs)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(kargs['rate'])
        self.dropout2 = tf.keras.layers.Dropout(kargs['rate'])
        self.dropout3 = tf.keras.layers.Dropout(kargs['rate'])

    def call(self, x, enc_output, look_ahead_mask, padding_mask):
        """Return (output, self_attention_weights, encoder_attention_weights)."""
        # Block 1: self-attention over `x` using the look-ahead mask.
        self_attn, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        out1 = self.layernorm1(self.dropout1(self_attn) + x)

        # Block 2: values and keys come from the encoder, the query is out1.
        cross_attn, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)
        out2 = self.layernorm2(self.dropout2(cross_attn) + out1)

        # Block 3: feed-forward network.
        transformed = self.dropout3(self.ffn(out2))
        out3 = self.layernorm3(transformed + out2)

        return out3, attn_weights_block1, attn_weights_block2

In [19]:
class Encoder(tf.keras.layers.Layer):
    """Embedding + positional encoding followed by a stack of EncoderLayer.

    NOTE: attribute names and creation order are kept as they were;
    presumably the checkpoint loaded later is matched against them.
    """

    def __init__(self, **kargs):
        super().__init__()
        self.d_model = kargs['d_model']
        self.num_layers = kargs['num_layers']

        self.embedding = tf.keras.layers.Embedding(
            input_dim=kargs['input_vocab_size'],
            output_dim=self.d_model,
        )

        # Fixed table covering 'maximum_position_encoding' positions.
        self.pos_encoding = positional_encoding(
            position=kargs['maximum_position_encoding'],
            d_model=self.d_model,
        )

        self.enc_layers = [EncoderLayer(**kargs) for _ in range(self.num_layers)]

        self.dropout = tf.keras.layers.Dropout(kargs['rate'])

    def call(self, x, mask):
        """Embed `x`, add positions, apply dropout and run every layer."""
        seq_len = tf.shape(x)[1]

        x = self.embedding(x)
        # Scale the embeddings by sqrt(d_model) before adding positions.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x)

        for layer in self.enc_layers:
            x = layer(x, mask)

        return x

In [20]:
class Decoder(tf.keras.layers.Layer):
    """Embedding + positional encoding followed by a stack of DecoderLayer.

    NOTE: attribute names and creation order are kept as they were;
    presumably the checkpoint loaded later is matched against them.
    """

    def __init__(self, **kargs):
        super().__init__()
        self.d_model = kargs['d_model']
        self.num_layers = kargs['num_layers']

        self.embedding = tf.keras.layers.Embedding(
            input_dim=kargs['target_vocab_size'],
            output_dim=self.d_model,
        )

        # Fixed table covering 'maximum_position_encoding' positions.
        self.pos_encoding = positional_encoding(
            position=kargs['maximum_position_encoding'],
            d_model=self.d_model,
        )

        self.dec_layers = [DecoderLayer(**kargs) for _ in range(self.num_layers)]

        self.dropout = tf.keras.layers.Dropout(kargs['rate'])

    def call(self, x, enc_output, look_ahead_mask, padding_mask):
        """Return (output, attention_weights).

        `attention_weights` maps 'decoder_layer{n}_block1' and
        'decoder_layer{n}_block2' (n counted from 1) to the weights returned
        by the n-th layer's two attention blocks.
        """
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)
        # Scale the embeddings by sqrt(d_model) before adding positions.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x)

        for layer_no, layer in enumerate(self.dec_layers, start=1):
            x, block1, block2 = layer(x, enc_output, look_ahead_mask, padding_mask)
            attention_weights['decoder_layer{}_block1'.format(layer_no)] = block1
            attention_weights['decoder_layer{}_block2'.format(layer_no)] = block2

        return x, attention_weights

In [21]:
class Transformer(tf.keras.Model):
    """Encoder-decoder model with a final Dense projection to the vocabulary.

    NOTE: attribute names and creation order are kept as they were;
    presumably the checkpoint loaded later is matched against them.
    """

    def __init__(self, **kargs):
        super().__init__(name=kargs['model_name'])
        self.end_token_idx = kargs['end_token_idx']

        self.encoder = Encoder(**kargs)
        self.decoder = Decoder(**kargs)

        self.final_layer = tf.keras.layers.Dense(kargs['target_vocab_size'])

    def call(self, x):
        """Return logits for `x`, an (encoder_input, decoder_input) pair."""
        inp, tar = x

        enc_mask, combined_mask, cross_mask = create_masks(inp, tar)

        memory = self.encoder(inp, enc_mask)
        decoded, _ = self.decoder(tar, memory, combined_mask, cross_mask)

        return self.final_layer(decoded)

    def inference(self, x):
        """Greedily decode a token list for encoder input `x`.

        Decoding starts from the module-level STD_INDEX and runs for at most
        MAX_SEQUENCE steps (also module-level), stopping early when the
        predicted id equals `self.end_token_idx`. Only the first element of
        the batch axis is read at each step. `.numpy()` is called on the
        argmax result, so presumably this must run eagerly.

        Returns:
            list of predicted token ids, without the start and end tokens.
        """
        inp = x
        tar = tf.expand_dims([STD_INDEX], axis=0)
        enc_mask, combined_mask, cross_mask = create_masks(inp, tar)

        # The encoder output is computed once and reused at every step.
        memory = self.encoder(inp, enc_mask)

        predict_tokens = []

        for step in range(0, MAX_SEQUENCE):
            decoded, _ = self.decoder(tar, memory, combined_mask, cross_mask)
            logits = self.final_layer(decoded)
            # Most likely id at the last position of the first batch element.
            pred_token = tf.argmax(logits, axis=-1).numpy()[0][-1]

            if pred_token == self.end_token_idx:
                break

            predict_tokens.append(pred_token)
            # Feed everything decoded so far back in as the decoder input.
            tar = tf.expand_dims([STD_INDEX] + predict_tokens, axis=0)
            _, combined_mask, cross_mask = create_masks(inp, tar)

        return predict_tokens

In [22]:
# Cross-entropy on raw logits. reduction='none' keeps one loss value per
# target position, so padded positions can be masked out before averaging.
# With the default reduction the object returns a single scalar, and a
# position-wise mask applied afterwards cannot exclude padding.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True, reduction = 'none')
# Sparse categorical accuracy metric object, reported under the name 'accuracy'.
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name = 'accuracy')

In [23]:
def loss(real, pred):
    """Return the mean of `loss_object(real, pred)` times a non-zero-label mask.

    The mask is 1 where `real` is not 0 and 0 where it is 0.

    NOTE(review): the mask only removes individual positions if
    `loss_object` returns one value per position (reduction='none'); if it
    returns a scalar, the mask merely rescales it — check how `loss_object`
    is configured.
    """
    per_position = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_position.dtype)
    return tf.reduce_mean(per_position * keep)

def accuracy(real, pred):
    """Update `train_accuracy` with this batch, ignoring positions labelled 0.

    Positions where `real` is 0 get weight 0 via `sample_weight`. The former
    approach multiplied the predictions of those positions by 0; an all-zero
    row has argmax 0, which equals the label 0, so every such position was
    counted as a correct prediction.

    Args:
        real: integer labels; 0 marks positions to ignore.
        pred: per-class scores with one more axis than `real`.

    Returns:
        The current value of the `train_accuracy` metric.
    """
    mask = tf.cast(tf.math.logical_not(tf.math.equal(real, 0)), dtype=pred.dtype)
    acc = train_accuracy(real, pred, sample_weight=mask)

    return tf.reduce_mean(acc)

In [24]:
# Instantiate the model from the shared keyword-argument dict.
model = Transformer(**kargs)

In [26]:
# Restore weights from 'weights_cp', resolved against the working directory.
# The call's return value (a load-status object) is shown as the cell output.
model.load_weights('weights_cp')

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x1962877c5c8>

In [27]:
# One-sentence input wrapped in a Series so it can be passed to the tokenizer.
text1 = pd.Series(['가스불 끈 것을 까먹은 것은 아니겠죠?'],name = 'sentence')

In [28]:
# Display the input Series.
text1

0    가스불 끈 것을 까먹은 것은 아니겠죠?
Name: sentence, dtype: object

In [29]:
# Convert the sentence to a list of token ids with the loaded tokenizer.
text1_sequencing = loaded_tokenizer.texts_to_sequences(text1)

In [30]:
# Display the token ids produced for the sentence.
text1_sequencing

[[12]]

In [31]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
# Pad at the end ('post') up to sentence_max_length, the length loaded from
# the pickled dictionary.
text1_padding = pad_sequences(text1_sequencing, maxlen = sentence_max_length, padding = 'post')

In [32]:
# Display the padded id array.
text1_padding

array([[12,  0,  0,  0,  0,  0,  0,  0]])

In [33]:
# inference() returns a flat list of predicted token ids; wrapping it in a
# list gives the list-of-sequences form passed to sequences_to_texts below.
text1_inferencing =[model.inference(text1_padding)]

In [34]:
# Map the predicted ids back to text with the tokenizer and display it.
loaded_tokenizer.sequences_to_texts(text1_inferencing)

['다시 새로 사는 게 마음 편해요']