# transformer.ipynb
* 모델 구현 (Model implementation: GRU encoder-decoder seq2seq with Bahdanau attention)

In [None]:
import tensorflow as tf
import numpy as np

from konlpy.tag import Twitter
import pandas as pd
import tensorflow as tf
import enum
import os
import re
import json
from sklearn.model_selection import train_test_split

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import matplotlib.pyplot as plt

from preprocess import *

In [None]:
# Visualization helper
def plot_graphs(history, string):
    """Plot a metric and its validation counterpart against the epoch index.

    Args:
        history: Object exposing a ``history`` mapping from metric name to a
            sequence of values (presumably the object returned by
            ``model.fit`` -- confirm against the caller).
        string: Name of the metric to plot; the series stored under
            ``'val_' + string`` is drawn on the same axes.
    """
    val_key = 'val_' + string

    plt.plot(history.history[string])
    plt.plot(history.history[val_key], '')
    plt.xlabel("Epochs")
    plt.ylabel(string)
    plt.legend([string, val_key])
    plt.show()

In [None]:
# Directories and file names of the preprocessed training artifacts.
# NOTE(review): `path` is not defined in the visible cells -- presumably it is
# provided by `from preprocess import *` or an earlier cell; confirm.
data_in_path = path + '/data_in/'
data_out_path = './data_out/'
# File names below are joined onto data_in_path when the data is loaded.
train_inputs = 'train_inputs.npy'
train_outputs = 'train_outputs.npy'
train_targets = 'train_targets.npy'
data_configs = 'data_configs.json'

In [None]:
# Fix TensorFlow's global random seed so repeated runs start from the same state.
seed_num = 1234
tf.random.set_seed(seed_num)

In [None]:
# Load the preprocessed index arrays and the preprocessing configuration.
# np.load receives the path so it opens and closes the file itself, and the
# JSON file is read inside a `with` block, so no file handle is left open.
index_inputs = np.load(data_in_path + train_inputs)
index_outputs = np.load(data_in_path + train_outputs)
index_target = np.load(data_in_path + train_targets)
with open(data_in_path + data_configs, 'r') as config_file:
    prepro_configs = json.load(config_file)

In [6]:
# Run-wide settings: model name (also used as the output sub-directory name),
# batch size, maximum sequence length, epoch count and validation fraction.
model_name = 'transformer'
BATCH_SIZE = 64
max_sequence = 25
EPOCH = 20
VALIDATION_SPLIT = 0.1

# Vocabulary information produced by the preprocessing step.
char2idx = prepro_configs['char2idx']
end_index = prepro_configs['end_symbol']
vocab_size = prepro_configs['vocab_size']

# NOTE(review): later cells reference `std_index`, `idx2char`, `MAX_SEQUENCE`,
# `embedding_dim` and `units`, none of which are defined in the visible cells.
# They may come from `from preprocess import *`; otherwise they must be
# defined here before the model is built -- confirm.
kargs = {'model_name': model_name,
         'num_layers': 2,
         'd_model': 512}

In [7]:
class Encoder(tf.keras.layers.Layer):
    """Embed input token ids and encode them with a single GRU layer.

    Args:
        vocab_size: Number of entries in the embedding table.
        embedding_dim: Size of each embedding vector.
        enc_units: Output/state size of the GRU.
        batch_sz: Batch size; stored on the instance but not used by the
            methods of this class.
    """

    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super().__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim

        # Turns each vocabulary id into an embedding vector of size
        # self.embedding_dim.
        self.embedding = tf.keras.layers.Embedding(
            self.vocab_size, self.embedding_dim)

        # GRU settings:
        #   self.enc_units        -- size of the GRU output
        #   return_sequences=True -- return the output of every time step
        #   return_state=True     -- also return the final state
        #   recurrent_initializer -- 'glorot_uniform' (Glorot/Xavier
        #                            initialization, which depends on the
        #                            number of input and output nodes)
        self.gru = tf.keras.layers.GRU(
            self.enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )

    def call(self, x, hidden):
        """Encode ``x`` starting from the initial GRU state ``hidden``.

        Returns:
            A ``(output, state)`` pair: the per-step GRU outputs and the final
            GRU state.
        """
        embedded = self.embedding(x)
        output, state = self.gru(embedded, initial_state=hidden)
        return output, state

    def initialize_hidden_state(self, inp):
        """Return an all-zero initial state with one row per row of ``inp``."""
        batch_size = tf.shape(inp)[0]
        return tf.zeros((batch_size, self.enc_units))

In [8]:
class BahdanauAttention(tf.keras.layers.Layer):
    """Additive attention: score = V(tanh(W1(values) + W2(query))).

    Args:
        units: Output size of the W1 and W2 projections.
    """

    def __init__(self, units):
        super().__init__()

        # W1, W2 and V are Dense layers whose weights are learned in training.
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, query, values):
        """Compute the context vector and the attention weights.

        Args:
            query: State tensor used to score ``values``; an axis is inserted
                at position 1 so it can be added to the projected values.
                (Assumes a (batch, units) tensor -- TODO confirm.)
            values: Tensor that is scored and summed over axis 1.
                (Assumes (batch, time, features) -- TODO confirm.)

        Returns:
            ``(context_vector, attention_weights)``.
        """
        query_with_time_axis = tf.expand_dims(query, 1)

        # Project both inputs, add them element-wise, squash with tanh and map
        # the result to a single score per position with V.
        projected_values = self.W1(values)
        projected_query = self.W2(query_with_time_axis)
        score = self.V(tf.nn.tanh(projected_values + projected_query))

        # Normalise the scores along axis 1 to obtain the attention weights.
        attention_weights = tf.nn.softmax(score, axis=1)

        # Weight the values and sum them along axis 1.
        context_vector = tf.reduce_sum(attention_weights * values, axis=1)

        return context_vector, attention_weights

In [9]:
class Decoder(tf.keras.layers.Layer):
    """Attention-based GRU decoder producing vocabulary-sized scores.

    Args:
        vocab_size: Number of entries in the embedding table and size of the
            final Dense output.
        embedding_dim: Size of each embedding vector.
        dec_units: Output/state size of the GRU and of the attention
            projections.
        batch_sz: Batch size; stored on the instance but not used by the
            methods of this class.
    """

    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super().__init__()

        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim

        self.embedding = tf.keras.layers.Embedding(
            self.vocab_size, self.embedding_dim)
        self.gru = tf.keras.layers.GRU(
            self.dec_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        # Fully connected layer whose output size is the vocabulary size.
        self.fc = tf.keras.layers.Dense(self.vocab_size)

        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, hidden, enc_output):
        """Run one decoding call.

        Args:
            x: Decoder input token ids.
            hidden: State used as the attention query.
            enc_output: Encoder outputs attended over.

        Returns:
            ``(x, state, attention_weights)``: the Dense-layer output, the GRU
            state and the attention weights.
        """
        context_vector, attention_weights = self.attention(hidden, enc_output)

        # Embed the input and concatenate the context vector onto it along
        # the last axis.
        embedded = self.embedding(x)
        gru_input = tf.concat(
            [tf.expand_dims(context_vector, 1), embedded], axis=-1)

        # NOTE(review): the GRU is called without an initial state, so
        # `hidden` only influences the result through the attention query.
        output, state = self.gru(gru_input)

        # Collapse the leading axes so that every row is one GRU output
        # vector, then map each row to vocabulary-sized scores.
        flat_output = tf.reshape(output, (-1, output.shape[2]))
        logits = self.fc(flat_output)

        return logits, state, attention_weights

In [10]:
# NOTE(review): this optimizer instance is not the one passed to
# model.compile later in the file, which builds its own Adam(1e-3).
optimizer = tf.keras.optimizers.Adam()

# Per-element loss on logits (reduction='none'); masking and averaging are
# applied by the `loss` function that uses this object.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

# Module-level metric object shared by every call to `accuracy`.
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='accuracy')

def loss(real, pred):
    """Return the mean cross-entropy with positions where ``real == 0`` zeroed.

    The per-element loss comes from the module-level ``loss_object``. Masked
    positions contribute zero, but the mean is still taken over every
    element, masked ones included.
    """
    per_token_loss = loss_object(real, pred)
    is_real_token = tf.math.logical_not(tf.math.equal(real, 0))
    weights = tf.cast(is_real_token, dtype=per_token_loss.dtype)
    per_token_loss = per_token_loss * weights
    return tf.reduce_mean(per_token_loss)

def accuracy(real, pred):
    """Return the accuracy after zeroing predictions where ``real == 0``.

    The value is computed by the module-level ``train_accuracy`` metric
    object. Because that object is shared between calls, the result
    presumably accumulates over calls rather than describing a single batch
    -- confirm against the Keras metric documentation.
    """
    is_real_token = tf.math.logical_not(tf.math.equal(real, 0))
    # Add a trailing axis so the mask broadcasts over the class dimension.
    pred_mask = tf.expand_dims(
        tf.cast(is_real_token, dtype=pred.dtype), axis=-1)
    pred *= pred_mask
    running_acc = train_accuracy(real, pred)

    return tf.reduce_mean(running_acc)

In [11]:
class seq2seq(tf.keras.Model):
    """Encoder-decoder model built from ``Encoder`` and ``Decoder``.

    Args:
        vocab_size: Vocabulary size passed to both sub-layers.
        embedding_dim: Embedding size passed to both sub-layers.
        enc_units: GRU size of the encoder.
        dec_units: GRU size of the decoder.
        batch_sz: Batch size passed to both sub-layers.
        end_token_idx: Token id that stops generation in ``inference``.
    """

    def __init__(self, vocab_size, embedding_dim, enc_units, dec_units, batch_sz, end_token_idx=2):
        super().__init__()
        self.end_token_idx = end_token_idx
        self.encoder = Encoder(vocab_size, embedding_dim, enc_units, batch_sz)
        self.decoder = Decoder(vocab_size, embedding_dim, dec_units, batch_sz)

    def call(self, x):
        """Decode with the target sequence as decoder input at every step.

        Args:
            x: Pair ``(inp, tar)`` of encoder inputs and decoder inputs.

        Returns:
            The per-step decoder outputs stacked along axis 1.
        """
        inp, tar = x

        initial_state = self.encoder.initialize_hidden_state(inp)
        enc_output, enc_hidden = self.encoder(inp, initial_state)

        # The decoder starts from the encoder's final state.
        dec_hidden = enc_hidden

        step_outputs = []
        for t in range(tar.shape[1]):
            # Feed column t of the target as a (batch, 1) decoder input.
            dec_input = tf.dtypes.cast(tf.expand_dims(tar[:, t], 1), tf.float32)
            predictions, dec_hidden, _ = self.decoder(dec_input, dec_hidden, enc_output)
            step_outputs.append(tf.dtypes.cast(predictions, tf.float32))
        return tf.stack(step_outputs, axis=1)

    def inference(self, x):
        """Greedily generate token ids for the encoder input ``x``.

        Generation starts from ``char2idx[std_index]``, runs for at most
        ``MAX_SEQUENCE`` steps and stops early when ``end_token_idx`` is
        predicted.

        NOTE(review): ``char2idx``, ``std_index`` and ``MAX_SEQUENCE`` are
        read from module scope; ``std_index`` and ``MAX_SEQUENCE`` are not
        defined in the visible cells -- confirm where they come from.

        Returns:
            The generated token ids as a NumPy array (end token excluded).
        """
        initial_state = self.encoder.initialize_hidden_state(x)
        enc_output, enc_hidden = self.encoder(x, initial_state)

        dec_hidden = enc_hidden

        dec_input = tf.expand_dims([char2idx[std_index]], 1)

        generated = []
        for step in range(MAX_SEQUENCE):
            predictions, dec_hidden, _ = self.decoder(dec_input, dec_hidden, enc_output)
            predict_token = tf.argmax(predictions[0])

            if predict_token == self.end_token_idx:
                break

            generated.append(predict_token)
            # The token just predicted becomes the next decoder input.
            dec_input = tf.dtypes.cast(tf.expand_dims([predict_token], 0), tf.float32)

        return tf.stack(generated, axis=0).numpy()

In [13]:
# Build the model; the same `units` value is used for the encoder and decoder
# GRU sizes, and the end-token id is looked up through char2idx.
# NOTE(review): `embedding_dim` and `units` are not defined in the visible
# cells -- confirm they are provided elsewhere before this cell runs.
model = seq2seq(vocab_size, embedding_dim, units, units, BATCH_SIZE, char2idx[end_index])
# Train with the masked `loss` / `accuracy` functions defined in this file.
model.compile(loss=loss, optimizer=tf.keras.optimizers.Adam(1e-3), metrics=[accuracy])

In [14]:
# Output directory for this model's artifacts; create it if it is missing.
PATH = data_out_path + model_name
os.makedirs(PATH, exist_ok=True)

# Save the weights inside PATH, i.e. <data_out_path>/<model_name>/weights.h5,
# which is the location the weights are loaded from later in the file.
checkpoint_path = os.path.join(PATH, 'weights.h5')

# Keep only the weights of the epoch with the best validation accuracy.
cp_callback = ModelCheckpoint(
    checkpoint_path, monitor='val_accuracy', verbose=1, save_best_only=True, save_weights_only=True)

# Stop when validation accuracy has not improved by min_delta for 10 epochs.
earlystop_callback = EarlyStopping(monitor='val_accuracy', min_delta=0.0001, patience=10)

history = model.fit([index_inputs, index_outputs], index_target,
                    batch_size=BATCH_SIZE, epochs=EPOCH,
                    validation_split=VALIDATION_SPLIT, callbacks=[earlystop_callback, cp_callback])

Epoch 1/30
 953/5320 [====>.........................] - ETA: 2:00:06 - loss: 1.3319 - accuracy: 0.8581

KeyboardInterrupt: 

In [None]:
# Plot training vs. validation accuracy recorded in `history`.
plot_graphs(history, 'accuracy')

In [None]:
# Plot training vs. validation loss recorded in `history`.
plot_graphs(history, 'loss')

In [None]:
# Restore the weights stored at <data_out_path>/<model_name>/weights.h5.
save_file_name = 'weights.h5'
model.load_weights(os.path.join(data_out_path, model_name, save_file_name))

In [None]:
# Run one query through the model and print the generated token ids and text.
query = "남자친구 승진 선물로 뭐가 좋을까?"

# Convert the query into encoder input indices; the second return value of
# enc_processing is not needed here.
test_index_inputs, _ = enc_processing([query], char2idx)
predict_tokens = model.inference(test_index_inputs)
print(predict_tokens)

# NOTE(review): `idx2char` is not defined in the visible cells -- confirm it
# is provided elsewhere. Its keys are looked up as strings.
decoded_tokens = [idx2char[str(t)] for t in predict_tokens]
print(' '.join(decoded_tokens))