In [None]:
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras import optimizers
import tensorflow.keras.backend as K
import sentencepiece as spm
import pickle
import numpy as np

from transformer import Encoder, Decoder, PaddingMask, PaddingAndLookaheadMask

# Commented out IPython magic to ensure Python compatibility.
# %cd '/content/drive/My Drive/Colab Notebooks'

# Load the sub-word vocabulary: a (piece -> id, id -> piece) pair.
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load vocabulary files that come from a trusted source.
with open('data/chatbot_voc.pkl', 'rb') as f:
    word2idx,  idx2word = pickle.load(f)

# Fixed encoder input length; also the maximum number of decoding steps.
MAX_LEN = 15
MODEL_PATH = 'data/transformer_model.h5'
SPM_MODEL = "data/chatbot_model.model"
VOCAB_SIZE = len(word2idx)

# SentencePiece tokenizer used to split questions and to join answer pieces.
sp = spm.SentencePieceProcessor()
sp.Load(SPM_MODEL)

# Model
# -----
# Drop any Keras state left over from a previous run before building the graph.
K.clear_session()

# Encoder input (source). The input sentence length is fixed to MAX_LEN.
src = Input(batch_shape = (None, MAX_LEN), dtype="int32", name="src")

# Decoder input (target). It starts as [[<BOS>]], then [[<BOS>, first prediction]], ...
# The last dimension starts at 1 and grows by one per step, so it is left
# unspecified in the input shape.
tar = Input(batch_shape = (None, None), dtype="int32", name="tar")

# Encoder
# -------
padding_mask = PaddingMask()(src)
encoder = Encoder(num_layers=4, d_model=128, num_heads=8, d_ff=512, vocab_size=VOCAB_SIZE, dropout_rate=0.1)
# The per-layer attention weights returned by the encoder are not needed here.
enc_output, _ = encoder(src, padding_mask)

# Decoder
# -------
lookahead_mask = PaddingAndLookaheadMask()(tar)
decoder = Decoder(num_layers=4, d_model=128, num_heads=8, d_ff=512, vocab_size=VOCAB_SIZE, dropout_rate=0.1)
# Cross-attention is masked with the encoder's padding mask.
dec_output, _, _ = decoder(tar, enc_output, lookahead_mask, padding_mask)

# Final output: a softmax over the vocabulary for every target position.
final_output = Dense(VOCAB_SIZE, activation='softmax')(dec_output)

model = Model(inputs=[src, tar], outputs=final_output)
# Optimizer and loss are configured although this script only runs inference.
model.compile(optimizer=optimizers.Adam(learning_rate=0.001), loss='sparse_categorical_crossentropy')
# NOTE(review): the graph built above presumably has to match the one the
# weights in MODEL_PATH were saved from — confirm before changing any layer.
model.load_weights(MODEL_PATH)

model.summary()

# Generate an answer for an encoded question.
def genAnswer(question):
    """Greedily decode an answer for one encoded question.

    Args:
        question: Array of vocabulary ids for the question (the caller in
            this file passes a 1-D array of length ``MAX_LEN``).

    Returns:
        The answer text obtained by joining the predicted pieces with
        ``sp.decode_pieces``.
    """
    # Add a leading batch axis and seed the decoder with <BOS>.
    enc_input = question[np.newaxis, :]
    dec_input = np.array(sp.bos_id()).reshape(1, 1)
    stop_ids = (sp.eos_id(), sp.pad_id())

    pieces = []
    for _ in range(MAX_LEN):
        preds = model.predict_on_batch([enc_input, dec_input])

        # The model emits scores over the vocabulary for every target
        # position; take the best-scoring id at the last position.
        next_id = np.argmax(preds[:, -1:, :], axis=-1)
        token = next_id[0][0]

        # <EOS> or <PAD> means there is nothing left to predict.
        if token in stop_ids:
            break

        # Record the predicted piece.
        pieces.append(idx2word[token])

        # Feed the prediction back in as the next decoder input.
        dec_input = np.concatenate([dec_input, next_id], axis=-1)

    return sp.decode_pieces(pieces)

# Chatting
def chatting(n=100):
    """Run an interactive question/answer loop on stdin/stdout.

    Each question is split into SentencePiece pieces, mapped to vocabulary
    ids, padded or truncated to ``MAX_LEN`` and passed to ``genAnswer``; the
    generated answer is printed.  The loop stops after ``n`` questions, when
    the user types ``quit``, or when standard input is closed.

    Args:
        n: Maximum number of questions to answer.
    """
    for _ in range(n):
        try:
            question = input('Q : ')
        except EOFError:
            # stdin was closed (Ctrl-D / end of piped input): leave the loop
            # cleanly instead of aborting with a traceback.
            break

        if question == 'quit':
            break

        # Pieces that are not in the vocabulary map to the <UNK> id (OOV).
        unk_id = sp.unk_id()
        q_idx = [
            word2idx[piece] if piece in word2idx else unk_id
            for piece in sp.encode_as_pieces(question)
        ]

        # Cut over-long questions, then right-pad with <PAD> up to MAX_LEN.
        # (A non-positive repeat count yields an empty list, so a question
        # that is already MAX_LEN long is left as is.)
        q_idx = q_idx[:MAX_LEN]
        q_idx.extend([sp.pad_id()] * (MAX_LEN - len(q_idx)))

        answer = genAnswer(np.array(q_idx))
        print('A :', answer)

####### Start chatting #######
print("\nTransformer ChatBot (ver. 1.0)")
print("Chatting 모듈을 로드하고 있습니다 ...")

# The first prediction takes a while, so a dummy question can be fed here to
# warm the model up before the user is prompted (currently disabled).
# answer = genAnswer(np.zeros(MAX_LEN))
print("ChatBot이 준비 됐습니다.\n")

# Answer at most 100 questions, then return.
chatting(100)

# train data:
# [('이별 후 1년 그리고 선물', '이별하신게 맞나요'),
#  ('허기져', '챙겨 드세요'),
#  ('맥주 소주 어떤거 마실까', '소맥이요'),
#  ('교양 수업 재밌어', '저도 듣고 싶어요'),
#  ('권태기 이별', '극복하거나 이별하거나 둘 중 하나죠'),
#  ('읽씹은 아프네', '상대방에 대한 예의가 없네요'),
#  ('신혼여행 어디로 갈까', '못 가본 곳으로 가보세요'),
#  ('반 배정 잘 될까', '잘 되길 바랍니다'),
#  ('친구가 다 떠나서 내가 못났나 싶어', '지난 인연에 연연해하지 마세요'),
#  ('뒤돌아 보지 말고 나가야 하는데', '살짝 뒤돌아봐도 괜찮아요')]

# test data:
# [('소오름 쫙', '좋은 일이길 바랍니다'),
#  ('고백은 어떻게 하는거야', '솔직한 마음으로 다가가는 거죠'),
#  ('참 잘낫네', '진정하셔요'),
#  ('늘 빡빡하게 살기 힘드네', '여유가 생기길 바랍니다'),
#  ('집까지 데려다줬는데 호감 그냥 매너', '호감이 있을 수도 있어요 그렇지만 조금 더 상황을 지켜보세요'),
#  ('짝녀가 연락 안 되고 있는데 자나', '자고 있을지도 모르겠어요'),
#  ('마음도 춥고 날씨도 춥고', '마음 감기 조심하세요'),
#  ('죽었던 연애세포가 살아나는 것 같아', '좋은 소식이네요'),
#  ('겨울에는 온천이지', '몸은 뜨겁고 머리는 차갑게'),
#  ('소개팅 하고싶다', '친구한테 부탁해보세요')]

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Add, Dense, Dropout, Embedding
from tensorflow.keras.layers import Layer, LayerNormalization
from tensorflow.keras.layers import Permute, Reshape

class Transformer:
    """Full encoder-decoder Transformer wired from Keras layers.

    This is a plain Python object (not a Keras ``Layer``): it owns an
    ``Encoder``, a ``Decoder``, the two mask layers and the final softmax
    projection, and connects them when called.

    Args:
        num_layers: Number of layers in each of the encoder and decoder.
        d_model: Model / embedding width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layers.
        input_vocab_size: Vocabulary size of the encoder embedding.
        target_vocab_size: Vocabulary size of the decoder embedding and of
            the output projection.
        dropout_rate: Rate handed to the dropout layers.
        ffn_activation: Activation of the feed-forward hidden layer.
        scope: Name prefix for every layer created here.
    """

    def __init__(self,
                 num_layers,
                 d_model,
                 num_heads,
                 d_ff,
                 input_vocab_size,
                 target_vocab_size,
                 dropout_rate,
                 ffn_activation=tf.keras.activations.relu,
                 scope="transformer"):
        # Hyper-parameters shared by the encoder and the decoder stacks.
        shared = dict(num_layers=num_layers,
                      d_model=d_model,
                      num_heads=num_heads,
                      d_ff=d_ff,
                      dropout_rate=dropout_rate,
                      ffn_activation=ffn_activation)

        self.encoder = Encoder(vocab_size=input_vocab_size,
                               scope="%s/encoder" % scope,
                               **shared)

        self.decoder = Decoder(vocab_size=target_vocab_size,
                               scope="%s/decoder" % scope,
                               **shared)

        # Projects decoder states onto a distribution over target tokens.
        self.final_layer = Dense(target_vocab_size,
                                 activation='softmax',
                                 name="%s/dense" % scope)

        self.padding_mask = PaddingMask(name="%s/padding_mask" % scope)
        self.lookahead_mask = PaddingAndLookaheadMask(
            name="%s/lookahead_mask" % scope)

    def __call__(self, inputs, target):
        """Connect source and target token tensors through the model.

        Args:
            inputs: Source token ids fed to the encoder.
            target: Target token ids fed to the decoder.

        Returns:
            A 4-tuple ``(final_output, enc_attention, dec_attention,
            enc_dec_attention)``; the last three are the per-layer attention
            dictionaries returned by the encoder and decoder.
        """
        src_mask = self.padding_mask(inputs)
        tgt_mask = self.lookahead_mask(target)

        memory, enc_attention = self.encoder(inputs, src_mask)

        # The source padding mask is reused for encoder-decoder attention.
        decoded = self.decoder(target, memory, tgt_mask, src_mask)
        dec_output, dec_attention, enc_dec_attention = decoded

        return (self.final_layer(dec_output), enc_attention, dec_attention,
                enc_dec_attention)

class Decoder:
    """Decoder stack: embedding, positional signal and N decoder layers.

    A plain Python wrapper (not a Keras ``Layer``) that owns its Keras
    layers and wires them together when called.

    Args:
        num_layers: Number of stacked ``DecoderLayer`` blocks.
        d_model: Embedding width / model dimension.
        num_heads: Number of attention heads in each attention block.
        d_ff: Hidden width of the feed-forward sub-layer.
        vocab_size: Number of rows of the embedding table.
        dropout_rate: Rate handed to the dropout layers.
        ffn_activation: Activation of the feed-forward hidden layer.
        scope: Name prefix for every layer created here.
    """

    def __init__(self,
                 num_layers,
                 d_model,
                 num_heads,
                 d_ff,
                 vocab_size,
                 dropout_rate,
                 ffn_activation=tf.keras.activations.relu,
                 scope="decoder"):
        self.d_model = d_model
        self.num_layers = num_layers
        self.scope = scope

        self.embedding = Embedding(input_dim=vocab_size,
                                   output_dim=d_model,
                                   name="%s/embedding" % scope)
        self.pos_encoding = PositionalEncoding(
            d_model, name="%s/positional_encoding" % scope)

        self.dec_layers = []
        for i in range(num_layers):
            self.dec_layers.append(
                DecoderLayer(d_model=d_model,
                             num_heads=num_heads,
                             d_ff=d_ff,
                             dropout_rate=dropout_rate,
                             ffn_activation=ffn_activation,
                             scope="%s/decoder_layer_%d" % (scope, i)))

        self.dropout = Dropout(dropout_rate, name="%s/dropout" % self.scope)

    def __call__(self, x, enc_output, lookahead_mask, padding_mask):
        """Run target tokens through the decoder stack.

        Args:
            x: Target token ids.
            enc_output: Encoder output attended to by every layer.
            lookahead_mask: Mask for decoder self-attention.
            padding_mask: Mask for encoder-decoder attention.

        Returns:
            ``(output, self_attention, cross_attention)`` where the two
            attention values are dicts keyed ``"layer_<i>"``.
        """
        x = self.embedding(x)
        # NOTE(review): the embedding is multiplied by d_model itself, whereas
        # the reference Transformer scales by sqrt(d_model). Left as is because
        # changing it would alter the outputs of already-trained weights.
        x = MultiplyConstant(self.d_model, name="%s/multiply" % self.scope)(x)
        x = Add(name="%s/add" % self.scope)([x, self.pos_encoding(x)])
        # NOTE(review): self.dropout is created in __init__ but never applied.

        dec_attention_weights = {}
        enc_dec_attention_weights = {}

        for i, layer in enumerate(self.dec_layers):
            key = "layer_%d" % i
            x, self_attn, cross_attn = layer(x, enc_output, lookahead_mask,
                                             padding_mask)
            dec_attention_weights[key] = self_attn
            enc_dec_attention_weights[key] = cross_attn

        return x, dec_attention_weights, enc_dec_attention_weights


class Encoder:
    """Encoder stack: embedding, positional signal and N encoder layers.

    A plain Python wrapper (not a Keras ``Layer``) that owns its Keras
    layers and wires them together when called.

    Args:
        num_layers: Number of stacked ``EncoderLayer`` blocks.
        d_model: Embedding width / model dimension.
        num_heads: Number of attention heads in each attention block.
        d_ff: Hidden width of the feed-forward sub-layer.
        vocab_size: Number of rows of the embedding table.
        dropout_rate: Rate handed to the dropout layers.
        ffn_activation: Activation of the feed-forward hidden layer.
        scope: Name prefix for every layer created here.
    """

    def __init__(self,
                 num_layers,
                 d_model,
                 num_heads,
                 d_ff,
                 vocab_size,
                 dropout_rate,
                 ffn_activation=tf.keras.activations.relu,
                 scope="encoder"):
        self.d_model = d_model
        self.num_layers = num_layers
        self.scope = scope

        self.embedding = Embedding(input_dim=vocab_size,
                                   output_dim=d_model,
                                   name="%s/embedding" % scope)
        self.pos_encoding = PositionalEncoding(
            d_model, name="%s/positional_encoding" % scope)

        self.enc_layers = []
        for i in range(num_layers):
            self.enc_layers.append(
                EncoderLayer(d_model=d_model,
                             num_heads=num_heads,
                             d_ff=d_ff,
                             dropout_rate=dropout_rate,
                             ffn_activation=ffn_activation,
                             scope="%s/encoder_layer_%d" % (scope, i)))

        self.dropout = Dropout(dropout_rate, name="%s/dropout" % self.scope)

    def __call__(self, x, padding_mask):
        """Run source tokens through the encoder stack.

        Args:
            x: Source token ids.
            padding_mask: Mask applied in every self-attention block.

        Returns:
            ``(output, attention)`` where ``attention`` is a dict keyed
            ``"layer_<i>"`` holding each layer's attention weights.
        """
        x = self.embedding(x)
        # NOTE(review): the embedding is multiplied by d_model itself, whereas
        # the reference Transformer scales by sqrt(d_model). Left as is because
        # changing it would alter the outputs of already-trained weights.
        x = MultiplyConstant(self.d_model, name="%s/multiply" % self.scope)(x)
        x = Add(name="%s/add" % self.scope)([x, self.pos_encoding(x)])
        # NOTE(review): self.dropout is created in __init__ but never applied.

        enc_attention_weights = {}

        for i, layer in enumerate(self.enc_layers):
            x, self_attn = layer(x, padding_mask)
            enc_attention_weights["layer_%d" % i] = self_attn

        return x, enc_attention_weights


class DecoderLayer:
    """One decoder block: self-attention, cross-attention and feed-forward.

    Each of the three sub-layers is followed by dropout, a residual add and
    layer normalization.

    Args:
        d_model: Model dimension.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout_rate: Rate of the three dropout layers.
        ffn_activation: Activation of the feed-forward hidden layer.
        scope: Name prefix for every layer created here.
    """

    def __init__(self,
                 d_model,
                 num_heads,
                 d_ff,
                 dropout_rate,
                 ffn_activation=tf.keras.activations.relu,
                 scope="decoder_layer"):
        self.scope = scope

        # Sub-layers: masked self-attention, encoder-decoder attention, FFN.
        self.mha1 = MultiHeadAttention(
            d_model, num_heads, scope="%s/multi_head_attention_1" % scope)
        self.mha2 = MultiHeadAttention(
            d_model, num_heads, scope="%s/multi_head_attention_2" % scope)
        self.ffn = PointwiseFeedForwardNetwork(
            d_model,
            d_ff,
            activation=ffn_activation,
            scope="%s/pointwise_feed_forward_network" % scope)

        # One normalization per sub-layer.
        self.layernorm1 = LayerNormalization(
            epsilon=1e-6, name="%s/layer_norm_1" % scope)
        self.layernorm2 = LayerNormalization(
            epsilon=1e-6, name="%s/layer_norm_2" % scope)
        self.layernorm3 = LayerNormalization(
            epsilon=1e-6, name="%s/layer_norm_3" % scope)

        # One dropout per sub-layer.
        self.dropout1 = Dropout(dropout_rate, name="%s/dropout_1" % scope)
        self.dropout2 = Dropout(dropout_rate, name="%s/dropout_2" % scope)
        self.dropout3 = Dropout(dropout_rate, name="%s/dropout_3" % scope)

    def __call__(self, x, enc_output, lookahead_mask, padding_mask):
        """Apply the block.

        Args:
            x: Decoder hidden states.
            enc_output: Encoder output used as keys and values of the
                second attention block.
            lookahead_mask: Mask for the self-attention block.
            padding_mask: Mask for the encoder-decoder attention block.

        Returns:
            ``(output, self_attention_weights, cross_attention_weights)``.
        """
        # 1) Masked self-attention over the decoder states.
        self_out, dec_dec_attention = self.mha1(x, x, x, lookahead_mask)
        self_out = self.dropout1(self_out)
        h1 = Add(name="%s/add_1" % self.scope)([x, self_out])
        h1 = self.layernorm1(h1)

        # 2) Attention from decoder states (queries) to the encoder output.
        cross_out, enc_dec_attention = self.mha2(h1, enc_output, enc_output,
                                                 padding_mask)
        cross_out = self.dropout2(cross_out)
        h2 = Add(name="%s/add_2" % self.scope)([h1, cross_out])
        h2 = self.layernorm2(h2)

        # 3) Position-wise feed-forward network.
        ffn_out = self.ffn(h2)
        ffn_out = self.dropout3(ffn_out)
        h3 = Add(name="%s/add_3" % self.scope)([h2, ffn_out])
        h3 = self.layernorm3(h3)

        return h3, dec_dec_attention, enc_dec_attention

class EncoderLayer:
    """One encoder block: self-attention followed by a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual add and
    layer normalization.

    Args:
        d_model: Model dimension.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout_rate: Rate of the two dropout layers.
        ffn_activation: Activation of the feed-forward hidden layer.
        scope: Name prefix for every layer created here.
    """

    def __init__(self,
                 d_model,
                 num_heads,
                 d_ff,
                 dropout_rate,
                 ffn_activation=tf.keras.activations.relu,
                 scope="encoder_layer"):
        self.scope = scope

        # Sub-layers: self-attention and the feed-forward network.
        self.mha1 = MultiHeadAttention(
            d_model, num_heads, scope="%s/multi_head_attention_1" % scope)
        self.ffn = PointwiseFeedForwardNetwork(
            d_model,
            d_ff,
            activation=ffn_activation,
            scope="%s/pointwise_feed_forward_network" % scope)

        # One normalization per sub-layer.
        self.layernorm1 = LayerNormalization(
            epsilon=1e-6, name="%s/layer_norm_1" % scope)
        self.layernorm2 = LayerNormalization(
            epsilon=1e-6, name="%s/layer_norm_2" % scope)

        # One dropout per sub-layer.
        self.dropout1 = Dropout(dropout_rate, name="%s/dropout_1" % scope)
        self.dropout2 = Dropout(dropout_rate, name="%s/dropout_2" % scope)

    def __call__(self, x, padding_mask):
        """Apply the block.

        Args:
            x: Encoder hidden states.
            padding_mask: Mask for the self-attention block.

        Returns:
            ``(output, self_attention_weights)``.
        """
        # 1) Self-attention over the encoder states.
        attn_out, enc_enc_attention = self.mha1(x, x, x, padding_mask)
        attn_out = self.dropout1(attn_out)
        h1 = Add(name="%s/add_1" % self.scope)([x, attn_out])
        h1 = self.layernorm1(h1)

        # 2) Position-wise feed-forward network.
        ffn_out = self.ffn(h1)
        ffn_out = self.dropout2(ffn_out)
        h2 = Add(name="%s/add_2" % self.scope)([h1, ffn_out])
        h2 = self.layernorm2(h2)

        return h2, enc_enc_attention


class PointwiseFeedForwardNetwork:
    """Two stacked ``Dense`` layers: ``d_ff`` units, then ``d_model`` units.

    Args:
        d_model: Output width (second dense layer, no activation).
        d_ff: Hidden width (first dense layer).
        activation: Activation of the first dense layer.
        scope: Name prefix for both dense layers.
    """

    def __init__(self,
                 d_model,
                 d_ff,
                 activation=tf.keras.activations.relu,
                 scope="pointwise_feed_forward_network"):
        # Expansion layer with the configurable non-linearity.
        self.dense_1 = Dense(
            d_ff, activation=activation, name="%s/dense_1" % scope)
        # Linear projection back to the model width.
        self.dense_2 = Dense(
            d_model, activation=None, name="%s/dense_2" % scope)

    def __call__(self, x):
        """Return ``dense_2(dense_1(x))``."""
        hidden = self.dense_1(x)
        return self.dense_2(hidden)


class MultiHeadAttention:
    """Multi-head attention assembled from stock Keras layers.

    Queries, keys and values are projected by separate ``Dense`` layers,
    split into ``num_heads`` heads with ``Reshape`` + ``Permute``, passed to
    ``Attention``, merged back and projected once more.

    Args:
        d_model: Width of every projection; must be divisible by
            ``num_heads``.
        num_heads: Number of attention heads.
        scope: Name prefix for every layer created here.
    """

    def __init__(self, d_model, num_heads, scope="multi_head_attention"):
        assert d_model % num_heads == 0

        # Per-sample target shape that splits the feature axis into heads.
        split_shape = (-1, num_heads, d_model // num_heads)
        # Swaps the first two non-batch axes (positions <-> heads).
        swap_axes = (2, 1, 3)

        self.wq = Dense(d_model, name="%s/dense_q" % scope)
        self.wk = Dense(d_model, name="%s/dense_k" % scope)
        self.wv = Dense(d_model, name="%s/dense_v" % scope)

        self.reshapeq = Reshape(split_shape, name="%s/reshape_q" % scope)
        self.reshapek = Reshape(split_shape, name="%s/reshape_k" % scope)
        self.reshapev = Reshape(split_shape, name="%s/reshape_v" % scope)

        self.transposeq = Permute(swap_axes, name="%s/transpose_q" % scope)
        self.transposek = Permute(swap_axes, name="%s/transpose_k" % scope)
        self.transposev = Permute(swap_axes, name="%s/transpose_v" % scope)

        # Merges the heads back into a single feature axis.
        self.reshape_output = Reshape(
            (-1, d_model), name="%s/reshape_output" % scope)

        self.transpose_output = Permute(
            swap_axes, name="%s/transpose_output" % scope)

        self.dense = Dense(d_model, name="%s/dense" % scope)

        self.attention = Attention(name="%s/attention" % scope)

    def __call__(self, q, k, v, mask):
        """Attend from ``q`` to ``k``/``v``.

        Args:
            q: Query tensor.
            k: Key tensor.
            v: Value tensor.
            mask: Mask forwarded unchanged to ``Attention``.

        Returns:
            ``(output, attention_weights)``.
        """
        # Linear projections.
        q_proj = self.wq(q)
        k_proj = self.wk(k)
        v_proj = self.wv(v)

        # Split the feature axis into heads ...
        q_split = self.reshapeq(q_proj)
        k_split = self.reshapek(k_proj)
        v_split = self.reshapev(v_proj)

        # ... and move the head axis in front of the position axis.
        q_heads = self.transposeq(q_split)
        k_heads = self.transposek(k_split)
        v_heads = self.transposev(v_split)

        context, attention_weights = self.attention(
            [q_heads, k_heads, v_heads, mask])

        # Undo the head split and apply the output projection.
        context = self.transpose_output(context)
        context = self.reshape_output(context)
        output = self.dense(context)

        return output, attention_weights


class Attention(Layer):
    """Scaled dot-product attention with an additive mask.

    NOTE(review): this class overrides ``__call__`` rather than ``call``, so
    the base ``Layer.__call__`` is not executed when an instance is called.
    """

    def __call__(self, inputs):
        """Compute attention.

        Args:
            inputs: A sequence ``[q, k, v, mask]``. ``mask`` holds 1 where
                attention must be suppressed and has to broadcast against
                the ``q @ k^T`` logits.

        Returns:
            ``(output, attention_weights)``: the weighted sum of ``v`` and
            the softmax weights over the last axis of the logits.
        """
        q, k, v, mask = inputs

        # Similarity of every query with every key.
        matmul_qk = tf.matmul(q, k, transpose_b=True)

        # Scale by sqrt of the size of the last axis of k.
        dk = tf.cast(tf.shape(k)[-1], tf.float32)
        scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

        # Masked entries receive a large negative logit so that the softmax
        # assigns them (almost) zero weight.
        scaled_attention_logits += mask * -1e9

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)

        output = tf.matmul(attention_weights, v)

        return output, attention_weights


class PositionalEncoding(Layer):
    """Sinusoidal positional encoding of width ``d_model``.

    NOTE(review): this class overrides ``__call__`` rather than ``call``, so
    the base ``Layer.__call__`` is not executed when an instance is called.
    """

    def __init__(self, d_model, *args, **kwargs):
        """Store ``d_model``; remaining arguments go to ``Layer.__init__``."""
        super().__init__(*args, **kwargs)
        self.d_model = d_model

    def __call__(self, inputs):
        """Build the encoding for as many positions as ``inputs`` has on axis 1.

        Only the shape of ``inputs`` is used, not its values.

        Returns:
            A float32 tensor with a leading axis of size 1, one row per
            position and ``d_model`` columns.
        """
        # Number of positions = size of axis 1 of the input.
        position = tf.shape(inputs)[1]

        # Column vector of positions and row vector of embedding indices.
        position_dims = tf.range(position)[:, tf.newaxis]
        embed_dims = tf.range(self.d_model)[tf.newaxis, :]
        # 1 / 10000^(2 * (i // 2) / d_model) for each embedding index i.
        angle_rates = 1 / tf.pow(
            10000.0, tf.cast(
                (2 * (embed_dims // 2)) / self.d_model, tf.float32))
        angle_rads = tf.cast(position_dims, tf.float32) * angle_rates

        # Sine of the even-indexed columns, cosine of the odd-indexed ones.
        sines = tf.sin(angle_rads[:, 0::2])
        cosines = tf.cos(angle_rads[:, 1::2])

        # The two halves are concatenated (all sines first, then all
        # cosines), not interleaved.
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        # Leading axis of size 1 so the result broadcasts over the batch.
        pos_encoding = pos_encoding[tf.newaxis, ...]
        return tf.cast(pos_encoding, tf.float32)

    def get_config(self):
        """Return the base layer config extended with ``d_model``."""
        base = super().get_config()
        return dict(list(base.items()) + [("d_model", self.d_model)])


class MultiplyConstant(Layer):
    """Multiply the input by a fixed constant ``c``.

    NOTE(review): this class overrides ``__call__`` rather than ``call``, so
    the base ``Layer.__call__`` is not executed when an instance is called.
    """

    def __init__(self, c, *args, **kwargs):
        """Store the constant ``c``; remaining arguments go to ``Layer.__init__``."""
        super().__init__(*args, **kwargs)
        self.c = c

    def __call__(self, inputs):
        """Return ``inputs * c``."""
        return inputs * self.c

    def get_config(self):
        """Return the base layer config extended with ``c``."""
        base = super().get_config()
        return dict(list(base.items()) + [("c", self.c)])


class PaddingMask(Layer):
    """Mask that is 1.0 wherever the input equals 0 and 0.0 elsewhere.

    NOTE(review): the padding id is hard-coded as 0; inputs padded with a
    different id would not be masked.
    """

    def __call__(self, inputs):
        """Return the mask with two size-1 axes inserted after the first axis."""
        seq = tf.cast(tf.math.equal(inputs, 0), tf.float32)
        # For 2-D inputs this gives a 4-D mask whose two middle axes have
        # size 1, so it can broadcast against attention logits.
        return seq[:, tf.newaxis, tf.newaxis, :]


class PaddingAndLookaheadMask(Layer):
    """Combined look-ahead and padding mask (1.0 = masked).

    NOTE(review): the padding id is hard-coded as 0; inputs padded with a
    different id would not be masked.
    """

    def __call__(self, inputs):
        """Return the element-wise maximum of the look-ahead and padding masks."""
        size = tf.shape(inputs)[1]
        # band_part(..., -1, 0) keeps the lower triangle, so `lhm` is 1 only
        # strictly above the diagonal, i.e. for positions after the current one.
        lhm = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)

        # 1.0 where the input equals 0, with two size-1 axes inserted so it
        # broadcasts against the (size, size) look-ahead mask.
        seq = tf.cast(tf.math.equal(inputs, 0), tf.float32)
        seq = seq[:, tf.newaxis, tf.newaxis, :]

        # A position is masked if either mask marks it.
        return tf.maximum(lhm, seq)