<a href="https://colab.research.google.com/github/dellaanima/NLP_GoingDeeper/blob/main/12/Project_Transformer_Chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project: Transformer 로 챗봇 만들기

<br>

## 데이터 분석


In [None]:
import re   #정규식
import random   #데이터 증강용 난수
import numpy as np   #행렬 연산
import pandas as pd   #데이터프레임
import tensorflow as tf   #신경망
import matplotlib.pyplot as plt   #데이터 시각화
from tqdm.notebook import tqdm   #학습 과정 출력


import gensim   #Word2Vec
from konlpy.tag import Mecab   #형태소 분석


from nltk.translate.bleu_score import sentence_bleu   #bleu 연산
from nltk.translate.bleu_score import SmoothingFunction   #bleu 연산

In [None]:
# Load the chatbot question/answer corpus and drop the "label" column,
# leaving only the "Q" and "A" columns used by the rest of the notebook.
dataset = pd.read_csv("./dataset/ChatbotData.csv")
dataset.drop(("label"), axis="columns", inplace=True)

# Preview five random rows. `display` is not defined in this file;
# presumably the IPython/Jupyter builtin - confirm when running as a script.
display(dataset.sample(5))

Unnamed: 0,Q,A
8725,헤이진지 이제 딱 한달,후폭풍이 지나갔길 바랄게요.
11697,친구들이 짝녀를 별로라고 하네요.,무슨 이유에서 그랬을까요.
4928,피자 막막 먹고 싶어.,배달 시키세요.
2172,불 날 뻔했어,조심하세요.
11707,친구의 남친 좋아하게 된 거 같아,사랑은 쟁취하는 거예요.


In [None]:
# Print the frame summary, then count rows that are exact duplicates.
dataset.info()
# duplicated() returns a boolean Series; summing it counts the True entries
# and yields 0 when nothing is duplicated. The previous
# value_counts()[True] lookup raised KeyError in that case.
dup = int(dataset.duplicated().sum())

print(f"Dupliacted Data Num: {dup:,}")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11823 entries, 0 to 11822
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Q       11823 non-null  object
 1   A       11823 non-null  object
dtypes: object(2)
memory usage: 184.9+ KB
Dupliacted Data Num: 73


<br>

## 데이터 전처리


In [None]:
# Remove exact duplicate rows in place (row labels are not reset here).
dataset.drop_duplicates(inplace=True)
print(f"Data Num: {len(dataset):,}")

Data Num: 11,750


In [None]:

def preprocess_sentence(sentence):
    """Normalise a raw sentence before morphological analysis.

    The text is lower-cased and trimmed, every digit and each of ``? . ! ,``
    is surrounded by spaces, runs of spaces/double quotes are collapsed, and
    any run of characters outside Latin letters, Hangul, digits and
    ``? . ! ,`` is replaced by a single space.

    Args:
        sentence: Raw input string.

    Returns:
        The cleaned string without leading or trailing whitespace.
    """
    text = sentence.lower().strip()

    # Applied strictly in this order; each step feeds the next one.
    substitutions = (
        (r"([0-9?.!,])", r" \1 "),
        (r'[" "]+', " "),
        (r"[^a-zA-Zㄱ-ㅎ가-힣0-9?.!,]+", " "),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)

    return text.strip()


# Clean every question and answer with preprocess_sentence.
dataset["Q"] = dataset["Q"].apply(preprocess_sentence)
dataset["A"] = dataset["A"].apply(preprocess_sentence)

# Preview five random cleaned rows.
display(dataset.sample(5))

<br>

#### Mecab을 이용한 형태소 기준 문장 분할


In [None]:
# Morphological analyser shared by the two column transforms below.
m = Mecab()

# Replace each sentence with its morphemes joined by single spaces, so later
# steps can tokenise by splitting on whitespace.
dataset["Q"] = dataset["Q"].apply(lambda x: " ".join(m.morphs(x)))
dataset["A"] = dataset["A"].apply(lambda x: " ".join(m.morphs(x)))

display(dataset.sample(5))

Unnamed: 0,Q,A
1771,문 안 열림,힘껏 밀 어 보 세요 .
6234,마지막 이 겠 죠,마지막 이 아닐 지도 몰라요 .
4301,직구 로 사 야지,좋 은 거 사 나 봐요 .
8779,회복 되 는 게 하루하루 가 다른 거 같 아 ! !,회복력 이 좋 으시 군요 .
9210,나이 먹 고 짝사랑 하 는 내 가 한심 해 .,사랑 에 는 나이 가 상관 없 어요 .


<br>

#### 디코더 문장 &lt;SOS&gt;, &lt;EOS&gt; 토큰 추가


In [None]:
# Wrap every answer (the decoder-side sentence) with start/end markers.
dataset["A"] = dataset["A"].apply(lambda x: "<sos> " + x + " <eos>")

display(dataset.sample(5))

Unnamed: 0,Q,A
749,남자 친구 는 어디 서 만나,<sos> 원 하 는 사람 이 있 는 장소 에 가 보 세요 . <eos>
8618,헤어진지 1 년 .,<sos> 아직 도 힘들 지 않 았 으면 좋 겠 어요 . <eos>
11663,첫 사랑 을 추억 해,<sos> 첫 사랑 은 항상 추억 의 대상 이 죠 . <eos>
2632,술 좀 그만 마셔야 지,<sos> 술 은 적당히 즐기 세요 . <eos>
10489,어떻게 여러 명 을 좋아할 수 있 어 ?,<sos> 저 도 이해 는 안 갑니다 . <eos>


<br>

## 데이터 토큰화


#### 토크나이저 생성


In [None]:
def get_tokenizer(corpus, vocab_size):
    """Fit a Keras ``Tokenizer`` on whitespace-split sentences.

    Args:
        corpus: Iterable of sentences whose tokens are separated by spaces.
        vocab_size: Passed to the tokenizer as ``num_words``. When not
            ``None``, every ``word_index`` entry whose index is greater than
            ``vocab_size`` is also deleted from ``word_index`` and
            ``word_counts``.

    Returns:
        The fitted tokenizer (no character filtering, ``"<UNK>"`` as the
        out-of-vocabulary token).
    """
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=vocab_size,
        filters='',
        oov_token="<UNK>",
    )
    tokenizer.fit_on_texts([sentence.split() for sentence in corpus])

    if vocab_size is None:
        return tokenizer

    # The values of word_index are indices (ranks), not occurrence counts.
    # NOTE(review): the entry with index == vocab_size is kept here - confirm
    # that matches how num_words limits the emitted ids.
    overflow_words = [
        word
        for word, index in tokenizer.word_index.items()
        if index > vocab_size
    ]
    for word in overflow_words:
        del tokenizer.word_index[word]
        del tokenizer.word_counts[word]

    return tokenizer


# Fit a tokenizer with no vocabulary limit on questions and answers together
# to see how many distinct tokens the corpus contains.
concat = pd.concat([dataset["Q"], dataset["A"]])
tokenizer = get_tokenizer(concat, None)

print("Tokenizer Vocab Size:", f"{len(tokenizer.word_index):,}")

Tokenizer Vocab Size: 6,810


<br>

#### 문장 정수화


In [None]:
def encoding_sentence(copus, tokenizer):
    """Convert sentences to integer id sequences padded at the end.

    Args:
        copus: Sentences to encode (parameter spelling kept for callers).
        tokenizer: Fitted tokenizer providing ``texts_to_sequences``.

    Returns:
        The result of ``pad_sequences`` with ``padding='post'``; no explicit
        maximum length is passed.
    """
    sequences = tokenizer.texts_to_sequences(copus)
    return tf.keras.preprocessing.sequence.pad_sequences(
        sequences, padding='post'
    )


# Encode questions (encoder side) and answers (decoder side) with the
# current tokenizer. Both tensors are recomputed later, after augmentation.
enc_tensor = encoding_sentence(dataset["Q"], tokenizer)
dec_tensor = encoding_sentence(dataset["A"], tokenizer)

<br>

#### 어휘 크기를 제한한 토크나이저 재생성 및 문장 길이 필터링


In [None]:

# Refit the tokenizer, this time limiting the vocabulary to 5872 entries.
concat = pd.concat([dataset["Q"], dataset["A"]])
tokenizer = get_tokenizer(concat, 5872)



# Sentence length filter: keep rows whose question has at most 15 token ids
# and whose answer (including <sos>/<eos>) has at most 18.
q = dataset["Q"].apply(lambda x: len(tokenizer.texts_to_sequences([x])[0]) <= 15)
a = dataset["A"].apply(lambda x: len(tokenizer.texts_to_sequences([x])[0]) <= 18)
dataset = dataset[q & a]



print("Tokenizer Vocab Size:", f"{len(tokenizer.word_index):,}")

Tokenizer Vocab Size: 5,872


<br>

#### 테스트 데이터 분할


In [None]:
# Hold out the first 100 rows (by position) as the test set; everything
# after them is kept for augmentation and training.
test_dataset = dataset[:100]
dataset = dataset[100:]

display(test_dataset.sample(5))

Unnamed: 0,Q,A
74,같이 놀 러 갈 친구 가 없 어,<sos> 혼자 도 좋 아요 . <eos>
87,개념 도 놓 고 옴,<sos> 그게 제일 중요 한 건데요 . <eos>
68,강아지 키우 고 싶 어,<sos> 책임 질 수 있 을 때 키워 보 세요 . <eos>
79,같이 할 수 있 는 취미 생활 뭐 있 을까,<sos> 함께 하 면 서로 를 더 많이 알 게 될 거 예요 . <eos>
88,개념 이 없 어,<sos> 그게 제일 중요 한 건데요 . <eos>


<br>

## 데이터 증강


#### 한국어 Word2Vec 불러오기


In [None]:
# Load the saved Word2Vec model used below for lexical substitution.
w2v = gensim.models.Word2Vec.load('./dataset/ko.bin')

<br>

#### 데이터 증강 함수 생성


In [None]:
# Lexical substitution
def lexical_sub(sentence, word2vec, enc_arg=True):
    """Replace one randomly chosen token with its nearest Word2Vec neighbour.

    Every occurrence of the chosen token in the sentence is replaced.

    Args:
        sentence: Space-separated tokens.
        word2vec: Object exposing ``most_similar(word)`` either directly or
            through a ``wv`` attribute (the Word2Vec model form).
        enc_arg: ``True`` for encoder sentences. ``False`` for decoder
            sentences, whose first and last tokens (``<sos>``/``<eos>``) are
            never chosen for replacement.

    Returns:
        The augmented sentence joined by single spaces, or the sentinel
        ``"_"`` when there is no token to pick or the picked token has no
        neighbour in the embedding vocabulary.
    """
    tokens = sentence.split()
    # Exclude the <sos>/<eos> markers from the candidates on the decoder side.
    candidates = tokens if enc_arg else tokens[1:-1]
    if not candidates:
        # random.choice would raise IndexError on an empty sequence.
        return "_"

    target = random.choice(candidates)

    # Prefer the keyed-vectors object when the model exposes one; calling
    # most_similar on the model itself is deprecated in gensim.
    vectors = getattr(word2vec, "wv", word2vec)
    try:
        replacement = vectors.most_similar(target)[0][0]
    except (KeyError, IndexError):
        # KeyError: token missing from the vocabulary.
        # IndexError: lookup returned no neighbours.
        return "_"

    return " ".join(
        replacement if token == target else token for token in tokens
    )


# Augmentation for either the question column or the answer column
def argument_data(dataset, word2vec, enc_arg=True):
    """Return a copy of ``dataset`` with one column lexically substituted.

    Args:
        dataset: DataFrame with "Q" and "A" columns.
        word2vec: Embedding object forwarded to ``lexical_sub``.
        enc_arg: ``True`` augments column "Q", ``False`` augments column "A".

    Returns:
        A new DataFrame holding only the rows whose substitution succeeded,
        i.e. rows where ``lexical_sub`` did not return ``"_"``.
    """
    column = "Q" if enc_arg else "A"
    substituted = dataset[column].apply(
        lambda text: lexical_sub(text, word2vec, enc_arg)
    )

    augmented = dataset.copy()
    augmented[column] = substituted

    succeeded = augmented[column] != "_"
    return augmented[succeeded]


<br>

#### 데이터 증강 수행


In [None]:
# One augmented copy with substituted questions, one with substituted answers.
enc_alpha = argument_data(dataset, w2v, True)
dec_alpha = argument_data(dataset, w2v, False)


# Row labels present in the original data and in each augmented frame.
enc_idx = set(dataset.index)
enc_alpha_idx = set(enc_alpha.index)
dec_alpha_idx = set(dec_alpha.index)

# Pick one label found in all three to print a before/after example.
# NOTE(review): list(vet)[0] raises IndexError if the intersection is empty.
vet = enc_idx & enc_alpha_idx & dec_alpha_idx
vet = list(vet)[0]

print(f"Question Sentence: {dataset['Q'][vet]} ======> {enc_alpha['Q'][vet]}")
print(f"Answer Sentence: {dataset['A'][vet]} ======> {dec_alpha['A'][vet]}")


  # Remove the CWD from sys.path while we load stuff.




<br>

#### 기존 데이터와 증강 데이터 합치기


In [None]:
# Stack the original rows with both augmented copies, then shuffle all rows
# (sample(frac=1) returns every row in random order).
dataset = pd.concat([dataset, enc_alpha, dec_alpha])
dataset = dataset.sample(frac=1)

print(f"Dataset Num: {len(dataset):,}")
display(dataset[:5])

Dataset Num: 29,951


Unnamed: 0,Q,A
9539,내일 대관식 이 야,<sos> 떨리 겠 어요 . <eos>
882,내 사랑 은 어디 있 나,<sos> 같 은 하늘 아래 묻히 에 . <eos>
2311,산 뛰어넘 어 산 이 네,<sos> 그래도 넘 을 수 있 을 거 예요 . <eos>
7158,오늘 도 보 고 왔 어서,<sos> 그것 이 최선 의 선택 일거 라 생각 해요 . <eos>
107,건강 관리,<sos> 운동 을 해의 보 세요 . <eos>


<br>

#### 문장 정수 인코딩 수행


In [None]:
# Re-encode the merged (original + augmented) data; these tensors are what
# model_fit trains on and what later cells read the sequence lengths from.
enc_tensor = encoding_sentence(dataset["Q"], tokenizer)
dec_tensor = encoding_sentence(dataset["A"], tokenizer)

print("Data num:", f"{len(enc_tensor):,}")

Data num: 29,951


<br>

## Transformer 모델 생성


In [None]:

def positional_encoding(pos, d_model):
    """Build a sinusoidal positional-encoding table.

    Entry ``[p, i]`` starts as ``p / 10000 ** (i / d_model)``; even columns
    are then passed through ``sin`` and odd columns through ``cos``.

    Args:
        pos: Number of positions (rows).
        d_model: Embedding width (columns).

    Returns:
        A NumPy array of shape ``(pos, d_model)``.
    """
    angle_rows = []
    for position in range(pos):
        angle_rows.append([
            position / np.power(10000, int(dim) / d_model)
            for dim in range(d_model)
        ])
    table = np.array(angle_rows)

    # Even feature indices get sine, odd feature indices get cosine.
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    return table




class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are linearly projected to ``d_model`` features,
    split into ``num_heads`` heads of ``d_model // num_heads`` features each,
    attended per head, merged back and passed through a final linear layer.

    Args:
        d_model: Width of the projections and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        if d_model % num_heads != 0:
            # Otherwise the head reshape would mis-shape the features or fail
            # later with a much less helpful error.
            raise ValueError(
                f"d_model ({d_model}) must be divisible by "
                f"num_heads ({num_heads})"
            )

        self.num_heads = num_heads
        self.d_model = d_model

        self.depth = d_model // self.num_heads

        self.W_q = tf.keras.layers.Dense(d_model)
        self.W_k = tf.keras.layers.Dense(d_model)
        self.W_v = tf.keras.layers.Dense(d_model)

        self.linear = tf.keras.layers.Dense(d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask):
        """Return ``(softmax(QK^T / sqrt(d_k) + mask * -1e9) V, weights)``.

        ``mask`` may be ``None``; otherwise entries equal to 1 mark positions
        that must receive (almost) zero attention.
        """
        d_k = tf.cast(K.shape[-1], tf.float32)
        QK = tf.matmul(Q, K, transpose_b=True)

        scaled_qk = QK / tf.math.sqrt(d_k)

        if mask is not None:
            # A large negative logit drives the softmax weight towards zero.
            scaled_qk += (mask * -1e9)

        attentions = tf.nn.softmax(scaled_qk, axis=-1)
        out = tf.matmul(attentions, V)

        return out, attentions

    def split_heads(self, x):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, heads, seq, depth)``."""
        # tf.shape gives the runtime batch size; the static x.shape[0] can be
        # None for symbolic inputs, which tf.reshape cannot accept.
        batch_size = tf.shape(x)[0]
        split_x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        split_x = tf.transpose(split_x, perm=[0, 2, 1, 3])

        return split_x

    def combine_heads(self, x):
        """Inverse of ``split_heads``: back to ``(batch, seq, d_model)``."""
        batch_size = tf.shape(x)[0]
        combined_x = tf.transpose(x, perm=[0, 2, 1, 3])
        combined_x = tf.reshape(combined_x, (batch_size, -1, self.d_model))

        return combined_x

    def call(self, Q, K, V, mask):
        """Attend from ``Q`` over ``K``/``V``.

        Returns:
            ``(output, attention_weights)`` where ``output`` has ``d_model``
            features and ``attention_weights`` are the per-head softmax
            weights.
        """
        WQ = self.W_q(Q)
        WK = self.W_k(K)
        WV = self.W_v(V)

        WQ_splits = self.split_heads(WQ)
        WK_splits = self.split_heads(WK)
        WV_splits = self.split_heads(WV)

        out, attention_weights = self.scaled_dot_product_attention(
            WQ_splits, WK_splits, WV_splits, mask
        )
        out = self.combine_heads(out)
        out = self.linear(out)

        return out, attention_weights


class PoswiseFeedForwardNet(tf.keras.layers.Layer):
    """Two dense layers applied to the last axis: ReLU expansion to ``d_ff``
    followed by a linear projection back to ``d_model``."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.w_1 = tf.keras.layers.Dense(d_ff, activation='relu')
        self.w_2 = tf.keras.layers.Dense(d_model)

    def call(self, x):
        hidden = self.w_1(x)
        return self.w_2(hidden)

def generate_padding_mask(seq):
    """Return a float mask that is 1.0 where ``seq`` equals 0, else 0.0.

    Two singleton axes are inserted after the first axis so the mask can
    broadcast against attention logits.
    """
    is_pad = tf.math.equal(seq, 0)
    mask = tf.cast(is_pad, tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]

def generate_causality_mask(src_len, tgt_len):
    """Return a ``(src_len, tgt_len)`` float32 look-ahead mask.

    Entry ``[i, j]`` is 0 when ``j <= i`` (visible) and 1 otherwise (blocked).
    """
    identity = np.eye(src_len, tgt_len)
    # Cumulative sum down the rows turns the diagonal into a filled lower
    # triangle of ones; subtracting from 1 flips it to "blocked" positions.
    visible = np.cumsum(identity, 0)
    return tf.cast(1 - visible, tf.float32)

def generate_masks(src, tgt):
    """Build the three attention masks used by the Transformer.

    Args:
        src: Encoder input ids; 0 marks padding.
        tgt: Decoder input ids; 0 marks padding. ``tgt.shape[1]`` must be a
            known integer.

    Returns:
        ``(enc_mask, dec_enc_mask, dec_mask)``:

        * ``enc_mask`` - source padding mask for encoder self-attention.
        * ``dec_enc_mask`` - mask for encoder-decoder attention. It is the
          source padding mask only and broadcasts over decoder positions.
        * ``dec_mask`` - target padding mask combined with the look-ahead
          mask, for decoder self-attention.
    """
    enc_mask = generate_padding_mask(src)
    dec_padding_mask = generate_padding_mask(tgt)

    # Every decoder position may attend to every non-padding source token.
    # The previous version also applied a look-ahead mask here, which hid
    # source tokens after index i from decoder step i (so the first decoding
    # step could only see the first source token).
    dec_enc_mask = enc_mask

    # Only decoder self-attention must not look at future target tokens.
    dec_causality_mask = generate_causality_mask(tgt.shape[1], tgt.shape[1])
    dec_mask = tf.maximum(dec_padding_mask, dec_causality_mask)

    return enc_mask, dec_enc_mask, dec_mask



In [None]:

class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: layer norm -> self-attention -> dropout -> residual,
    then layer norm -> feed-forward -> dropout -> residual."""

    def __init__(self, d_model, n_heads, d_ff, dropout):
        super().__init__()

        self.enc_self_attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = PoswiseFeedForwardNet(d_model, d_ff)

        self.norm_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        # A single Dropout layer is reused after both sub-layers.
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, x, mask):
        """Return ``(output, self_attention_weights)`` for input ``x``."""
        # Self-attention sub-layer on the normalised input.
        normed = self.norm_1(x)
        attn_out, enc_attn = self.enc_self_attn(normed, normed, normed, mask)
        after_attn = self.dropout(attn_out) + x

        # Position-wise feed-forward sub-layer.
        ffn_out = self.ffn(self.norm_2(after_attn))
        result = self.dropout(ffn_out) + after_attn

        return result, enc_attn


class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block: self-attention, encoder-decoder attention and a
    feed-forward net, each preceded by layer norm and followed by dropout
    and a residual add."""

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()

        self.dec_self_attn = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attn = MultiHeadAttention(d_model, num_heads)

        self.ffn = PoswiseFeedForwardNet(d_model, d_ff)

        self.norm_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        # A single Dropout layer is reused after all three sub-layers.
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, x, enc_out, causality_mask, padding_mask):
        """Run the block.

        Note the argument roles: ``padding_mask`` is the mask given to
        decoder self-attention and ``causality_mask`` is the mask given to
        encoder-decoder attention.

        Returns:
            ``(output, self_attention_weights, enc_dec_attention_weights)``.
        """
        # Decoder self-attention.
        normed = self.norm_1(x)
        self_out, dec_attn = self.dec_self_attn(
            normed, normed, normed, padding_mask
        )
        after_self = self.dropout(self_out) + x

        # Attention over the encoder output.
        normed = self.norm_2(after_self)
        cross_out, dec_enc_attn = self.enc_dec_attn(
            normed, enc_out, enc_out, causality_mask
        )
        after_cross = self.dropout(cross_out) + after_self

        # Position-wise feed-forward.
        ffn_out = self.ffn(self.norm_3(after_cross))
        result = self.dropout(ffn_out) + after_cross

        return result, dec_attn, dec_enc_attn


In [None]:

class Encoder(tf.keras.Model):
    """Stack of ``n_layers`` EncoderLayer blocks applied in sequence."""

    def __init__(self, n_layers, d_model, n_heads, d_ff, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.enc_layers = [
            EncoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(n_layers)
        ]

    def call(self, x, mask):
        """Return the final hidden states and each layer's attention weights."""
        hidden = x
        enc_attns = []

        for layer in self.enc_layers:
            hidden, attn = layer(hidden, mask)
            enc_attns.append(attn)

        return hidden, enc_attns

class Decoder(tf.keras.Model):
    """Stack of ``n_layers`` DecoderLayer blocks applied in sequence."""

    def __init__(self, n_layers, d_model, n_heads, d_ff, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.dec_layers = [
            DecoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(n_layers)
        ]

    def call(self, x, enc_out, causality_mask, padding_mask):
        """Return the final hidden states plus per-layer self-attention and
        encoder-decoder attention weights. Both masks are forwarded to every
        layer unchanged."""
        hidden = x
        dec_attns = []
        dec_enc_attns = []

        for layer in self.dec_layers:
            hidden, self_attn, cross_attn = layer(
                hidden, enc_out, causality_mask, padding_mask
            )
            dec_attns.append(self_attn)
            dec_enc_attns.append(cross_attn)

        return hidden, dec_attns, dec_enc_attns


In [None]:
class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer producing per-position vocabulary logits.

    Args:
        n_layers: Number of encoder layers and of decoder layers.
        d_model: Embedding / hidden width.
        n_heads: Attention heads per layer.
        d_ff: Hidden width of the position-wise feed-forward nets.
        src_vocab_size: Size of the encoder embedding table.
        tgt_vocab_size: Size of the decoder embedding table and of the output.
        pos_len: Number of positions in the positional-encoding table.
        dropout: Dropout rate used throughout.
        shared: When true, embeddings are scaled by ``sqrt(d_model)`` and the
            output logits are computed with the decoder embedding matrix
            (weight tying) instead of the separate ``fc`` layer.
    """

    def __init__(
        self,
        n_layers, d_model, n_heads, d_ff,
        src_vocab_size, tgt_vocab_size,
        pos_len,
        dropout=0.2,
        shared=True
    ):
        super(Transformer, self).__init__()
        self.d_model = tf.cast(d_model, tf.float32)

        self.enc_emb = tf.keras.layers.Embedding(src_vocab_size, d_model)
        self.dec_emb = tf.keras.layers.Embedding(tgt_vocab_size, d_model)

        self.pos_encoding = positional_encoding(pos_len, d_model)
        self.dropout = tf.keras.layers.Dropout(dropout)

        self.encoder = Encoder(n_layers, d_model, n_heads, d_ff, dropout)
        self.decoder = Decoder(n_layers, d_model, n_heads, d_ff, dropout)

        # Output projection used when shared is false.
        self.fc = tf.keras.layers.Dense(tgt_vocab_size)

        self.shared = shared
        # The former fc.set_weights(tf.transpose(dec_emb.weights)) call ran
        # before either layer had created its variables, so it copied nothing.
        # Sharing is now done in call() by reusing the embedding matrix.

    def embedding(self, emb, x):
        """Embed ids ``x`` with ``emb``, add positional encodings, apply dropout."""
        seq_len = x.shape[1]
        out = emb(x)

        if self.shared: out *= tf.math.sqrt(self.d_model)

        out += self.pos_encoding[np.newaxis, ...][:, :seq_len, :]
        out = self.dropout(out)

        return out

    def call(self, enc_in, dec_in, enc_mask, causality_mask, dec_mask):
        """Run the full model.

        ``enc_mask`` goes to the encoder; ``causality_mask`` and ``dec_mask``
        are forwarded to the decoder in that order.

        Returns:
            ``(logits, enc_attns, dec_attns, dec_enc_attns)``.
        """
        enc_in = self.embedding(self.enc_emb, enc_in)
        dec_in = self.embedding(self.dec_emb, dec_in)

        enc_out, enc_attns = self.encoder(enc_in, enc_mask)

        dec_out, dec_attns, dec_enc_attns = \
        self.decoder(dec_in, enc_out, causality_mask, dec_mask)

        if self.shared:
            # Project with the transposed decoder embedding table:
            # (batch, time, d_model) x (vocab, d_model) -> (batch, time, vocab).
            # dec_emb has been built by the embedding() call above.
            logits = tf.einsum(
                "btd,vd->btv", dec_out, self.dec_emb.embeddings
            )
        else:
            logits = self.fc(dec_out)

        return logits, enc_attns, dec_attns, dec_enc_attns

<br>

#### 모델 생성


In [None]:
# Instantiate the model. Both vocabulary sizes match the 5872 limit passed to
# get_tokenizer, since questions and answers share one tokenizer.
transformer = Transformer(
    n_layers=2,
    d_model=128,
    n_heads=8,
    d_ff=128,
    dropout=0.5,
    pos_len=200,
    shared=True,
    src_vocab_size=5872, tgt_vocab_size=5872
)

<br>

## 모델 학습

In [None]:
class LearningRateScheduler(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Warm-up then inverse-square-root decay learning-rate schedule.

    ``lr(step) = d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)``

    Args:
        d_model: Model width used as the scale factor.
        warmup_steps: Number of steps over which the rate increases linearly.
    """

    def __init__(self, d_model, warmup_steps=4000):
        super(LearningRateScheduler, self).__init__()
        self.d_model = d_model
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        # Optimizers may pass the iteration counter as an integer tensor;
        # a negative fractional power needs a floating-point base.
        step = tf.cast(step, tf.float32)

        arg1 = step ** -0.5
        arg2 = step * (self.warmup_steps ** -1.5)

        return (self.d_model ** -0.5) * tf.math.minimum(arg1, arg2)

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialised."""
        return {
            "d_model": self.d_model,
            "warmup_steps": self.warmup_steps,
        }


def loss_function(real, pred):
    """Average the per-token loss over positions where ``real`` is non-zero.

    Uses the module-level ``loss_object`` to compute the unreduced loss.
    """
    per_token = loss_object(real, pred)

    # 1.0 for real tokens, 0.0 where the target id is 0 (padding).
    is_token = tf.math.logical_not(tf.math.equal(real, 0))
    weights = tf.cast(is_token, dtype=per_token.dtype)

    return tf.reduce_sum(per_token * weights)/tf.reduce_sum(weights)


# Warm-up learning-rate schedule handed to Adam.
# NOTE(review): the schedule is built with 512 while the Transformer in this
# file is created with d_model=128 - confirm the mismatch is intentional.
learning_rate = LearningRateScheduler(512)
optimizer = tf.keras.optimizers.Adam(
    learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9
)

# Unreduced (per-token) cross-entropy on raw logits; loss_function applies
# the padding mask and does the averaging itself.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

In [None]:
@tf.function()
def train_step(src, tgt, model, optimizer):
    """Run one optimisation step with teacher forcing.

    Args:
        src: Batch of encoder input ids.
        tgt: Batch of decoder ids including the start and end tokens.
        model: Model called as ``model(src, tgt_in, enc_mask, dec_enc_mask,
            dec_mask)`` and returning logits plus three attention lists.
        optimizer: Optimizer that applies the gradients.

    Returns:
        ``(loss, enc_attns, dec_attns, dec_enc_attns)``.
    """
    tgt_in = tgt[:, :-1]  # decoder input: everything except the last token
    gold = tgt[:, 1:]     # target: the same sequence shifted left by one

    enc_mask, dec_enc_mask, dec_mask = generate_masks(src, tgt_in)

    with tf.GradientTape() as tape:
        # training=True sets the Keras call context so the nested Dropout
        # layers are active; without it they run in inference mode and the
        # configured dropout rate has no effect during training.
        predictions, enc_attns, dec_attns, dec_enc_attns = \
        model(src, tgt_in, enc_mask, dec_enc_mask, dec_mask, training=True)
        loss = loss_function(gold, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    return loss, enc_attns, dec_attns, dec_enc_attns


<br>

#### 모델 학습


In [None]:
def model_fit(enc_train, dec_train, model, epochs, batch_size):
    """Train ``model`` for ``epochs`` passes over the data.

    Batches are contiguous slices of ``batch_size`` rows; only the order in
    which the slices are visited is shuffled each epoch. Uses the
    module-level ``optimizer`` and shows the running mean loss on a tqdm bar.
    """
    num_samples = enc_train.shape[0]

    for epoch in range(epochs):
        running_loss = 0

        batch_starts = list(range(0, num_samples, batch_size))
        random.shuffle(batch_starts)
        progress = tqdm(batch_starts)

        for step, start in enumerate(progress, start=1):
            stop = start + batch_size
            batch_loss, enc_attns, dec_attns, dec_enc_attns = train_step(
                enc_train[start:stop],
                dec_train[start:stop],
                model,
                optimizer
            )

            running_loss += batch_loss

            progress.set_description_str('Epoch %2d' % (epoch + 1))
            progress.set_postfix_str(
                'Loss %.4f' % (running_loss.numpy() / step)
            )


# Train on the encoded (original + augmented) data: 10 epochs, batches of 64.
model_fit(enc_tensor, dec_tensor, transformer, epochs=10, batch_size=64)

  0%|          | 0/468 [00:00<?, ?it/s]

  0%|          | 0/468 [00:00<?, ?it/s]

  0%|          | 0/468 [00:00<?, ?it/s]

  0%|          | 0/468 [00:00<?, ?it/s]

  0%|          | 0/468 [00:00<?, ?it/s]

  0%|          | 0/468 [00:00<?, ?it/s]

  0%|          | 0/468 [00:00<?, ?it/s]

  0%|          | 0/468 [00:00<?, ?it/s]

  0%|          | 0/468 [00:00<?, ?it/s]

  0%|          | 0/468 [00:00<?, ?it/s]

### 결과 확인

In [None]:
def translate(sentence, model, tokenizer, enc_tensor, dec_tensor):
    """Greedily decode an answer for a raw input sentence.

    Args:
        sentence: Raw question text.
        model: Trained model called with ids and the three attention masks.
        tokenizer: Fitted tokenizer containing ``<sos>`` and ``<eos>``.
        enc_tensor: Encoded encoder data; only its last dimension is used,
            as the padded source length.
        dec_tensor: Encoded decoder data; only its last dimension is used,
            as the maximum number of decoding steps.

    Returns:
        A one-element list with the decoded tokens joined into a string.
    """
    source_len = enc_tensor.shape[-1]
    max_steps = dec_tensor.shape[-1]

    start_id = tokenizer.word_index['<sos>']
    end_id = tokenizer.word_index['<eos>']

    # Same pipeline as the training data: clean, then split into morphemes.
    morphemes = Mecab().morphs(preprocess_sentence(sentence))

    encoder_input = tf.keras.preprocessing.sequence.pad_sequences(
        tokenizer.texts_to_sequences([morphemes]),
        maxlen=source_len,
        padding='post'
    )

    generated = []
    decoder_input = tf.expand_dims([start_id], 0)

    for _step in range(max_steps):
        enc_padding_mask, combined_mask, dec_padding_mask = generate_masks(
            encoder_input, decoder_input
        )

        predictions, enc_attns, dec_attns, dec_enc_attns = model(
            encoder_input, decoder_input,
            enc_padding_mask, combined_mask, dec_padding_mask
        )

        # Most probable token at the last decoded position.
        next_id = tf.argmax(
            tf.math.softmax(predictions, axis=-1)[0, -1]
        ).numpy().item()

        if next_id == end_id:
            break

        generated.append(next_id)
        decoder_input = tf.concat(
            [decoder_input, tf.expand_dims([next_id], 0)], axis=-1
        )

    return tokenizer.sequences_to_texts([generated])


print("=" * 100)
# Sample questions for a quick qualitative check of greedy decoding.
test_sentences = [
    "지루하다, 놀러가고 싶어.",
    "오늘 일찍 일어났더니 피곤하다.",
    "간만에 여자친구랑 데이트 하기로 했어.",
    "집에 있는다는 소리야."
]

for sentence in test_sentences:
    # translate returns a list; its first element is the decoded answer.
    ans = translate(sentence, transformer, tokenizer, enc_tensor, dec_tensor)[0]
    print(f"Quenstion: {sentence:<30}\tAnswer: {ans:<30}")
print("=" * 100)

Quenstion: 지루하다, 놀러가고 싶어.                	Answer: 시간 이 필요 한가 봐요 .               
Quenstion: 오늘 일찍 일어났더니 피곤하다.             	Answer: 아침 일 이 많 았 나 봅니다 .            
Quenstion: 간만에 여자친구랑 데이트 하기로 했어.         	Answer: 좋 은 친구 가 여기 가 여기 가 길 바랄게요 .   
Quenstion: 집에 있는다는 소리야.                  	Answer: 익숙 <UNK> 을 <UNK> 군요 .         


<br>

## 모델 평가


#### Beam Search 및 BLEU 계산 함수 정의


In [None]:
# Model input/output helper
def calc_prob(src_ids, tgt_ids, model):
    """Return softmax probabilities over the vocabulary for every decoder
    position, given source ids and the decoder ids produced so far."""
    masks = generate_masks(src_ids, tgt_ids)

    # The model returns (logits, enc_attns, dec_attns, dec_enc_attns).
    logits = model(src_ids, tgt_ids, *masks)[0]
    return tf.math.softmax(logits, axis=-1)
def beam_search_decoder(
    sentence, model, tokenizer,
    enc_maxlen, dec_maxlen,
    beam_size
):
    """Decode ``sentence`` with beam search.

    Args:
        sentence: Source sentence, already split into space-separated tokens.
        model: Trained model, queried through ``calc_prob``.
        tokenizer: Fitted tokenizer containing ``<sos>`` and ``<eos>``.
        enc_maxlen: Length the source ids are padded to.
        dec_maxlen: Maximum length of a hypothesis, including ``<sos>``.
        beam_size: Number of hypotheses kept at every step.

    Returns:
        An int64 array of shape ``(beam_size, dec_maxlen)``. Each row is one
        hypothesis starting with the ``<sos>`` id; positions after a row's
        ``<eos>`` stay 0.
    """
    sos_idx = tokenizer.word_index['<sos>']
    eos_idx = tokenizer.word_index['<eos>']

    tokens = tokenizer.texts_to_sequences([sentence])
    src_in = tf.keras.preprocessing.sequence.pad_sequences(
        tokens,
        maxlen=enc_maxlen,
        padding='post'
    )

    # np.long was removed from NumPy; use an explicit 64-bit integer dtype.
    pred_cache = np.zeros((beam_size * beam_size, dec_maxlen), dtype=np.int64)
    pred = np.zeros((beam_size, dec_maxlen), dtype=np.int64)

    finished = np.zeros((beam_size, ), dtype=bool)
    scores = np.ones((beam_size, ))

    pred[:, 0] = sos_idx

    dec_in = tf.expand_dims(pred[0, :1], 0)
    prob = calc_prob(src_in, dec_in, model)[0, -1].numpy()

    for seq_pos in range(1, dec_maxlen):
        if finished.all():
            # Every hypothesis already ended with <eos>.
            break

        score_cache = np.ones((beam_size * beam_size, ))

        # Seed each branch's beam_size candidate rows with its score and prefix.
        for branch_idx in range(beam_size):
            cache_pos = branch_idx*beam_size

            score_cache[cache_pos:cache_pos+beam_size] = scores[branch_idx]
            pred_cache[cache_pos:cache_pos+beam_size, :seq_pos] = \
            pred[branch_idx, :seq_pos]

        # Expand every unfinished branch with its beam_size best next tokens.
        for branch_idx in range(beam_size):
            cache_pos = branch_idx*beam_size

            if finished[branch_idx]:
                # A hypothesis that already emitted <eos> must not be extended
                # or picked again for another slot.
                score_cache[cache_pos:cache_pos+beam_size] = -1
                continue

            if seq_pos != 1:
                # Condition on this branch's own prefix. Reading
                # pred_cache[branch_idx] returned beam 0's prefix for every
                # branch, because rows 0..beam_size-1 all belong to branch 0.
                dec_in = tf.expand_dims(pred[branch_idx, :seq_pos], 0)

                prob = calc_prob(src_in, dec_in, model)[0, -1].numpy()
            # At seq_pos == 1 all branches share the single <sos> distribution;
            # the in-place -1 marking below makes each branch take different
            # tokens instead of all starting with the same one.

            for beam_idx in range(beam_size):
                max_idx = np.argmax(prob)

                score_cache[cache_pos+beam_idx] *= prob[max_idx]
                pred_cache[cache_pos+beam_idx, seq_pos] = max_idx

                # Exclude this token from the next argmax.
                prob[max_idx] = -1

        # Keep the best remaining candidates, one per unfinished slot.
        for beam_idx in range(beam_size):
            if finished[beam_idx]: continue

            max_idx = np.argmax(score_cache)
            prediction = pred_cache[max_idx, :seq_pos+1]

            pred[beam_idx, :seq_pos+1] = prediction
            scores[beam_idx] = score_cache[max_idx]
            score_cache[max_idx] = -1

            if prediction[-1] == eos_idx:
                finished[beam_idx] = True
    return pred

def calculate_bleu(reference, candidate, weights=[0.25, 0.25, 0.25, 0.25]):
    """Return the sentence-level BLEU of ``candidate`` against a single
    ``reference`` (both token lists), using NLTK smoothing method 1."""
    smoother = SmoothingFunction().method1
    references = [reference]
    return sentence_bleu(
        references,
        candidate,
        weights=weights,
        smoothing_function=smoother
    )

def beam_bleu(reference, ids, tokenizer, verbose=False):
    """Average BLEU of several decoded hypotheses against one reference.

    Args:
        reference: Reference sentence as space-separated tokens, without
            ``<sos>``/``<eos>``.
        ids: Sequence of id sequences, one per hypothesis.
        tokenizer: Fitted tokenizer used to turn ids back into text.
        verbose: When true, print every reference/candidate pair and its score.

    Returns:
        The mean BLEU over all hypotheses, or 0.0 when ``ids`` is empty.
    """
    reference = reference.split()

    # len() rather than truthiness: ids is normally a NumPy array.
    if len(ids) == 0:
        return 0.0

    total_score = 0.0
    for _id in ids:
        # Id 0 is the padding id (see generate_padding_mask); drop it so it
        # is not converted into a token.
        token_ids = [int(token_id) for token_id in _id if token_id != 0]
        candidate = tokenizer.sequences_to_texts([token_ids])[0].split()

        # Remove the leading <sos> and everything from the first <eos> on.
        # Working on tokens avoids the old character slicing, which cut off
        # the last character whenever no <eos> was present (find() == -1).
        if candidate and candidate[0] == "<sos>":
            candidate = candidate[1:]
        if "<eos>" in candidate:
            candidate = candidate[:candidate.index("<eos>")]

        score = calculate_bleu(reference, candidate)

        if verbose:
            print("=" * 100)
            print("Reference:".ljust(10), " ".join(reference))
            print("Candidate:".ljust(10), " ".join(candidate), end="\n\n")
            print("BLEU:".ljust(10), f"{score:.3f}")
            print("=" * 100, end="\n\n")

        total_score += score

    return total_score / len(ids)


<br>

#### 예문의 Beam search 문장과 BLEU 출력


In [None]:
# Position (not row label) of the test example to inspect.
idx = 15
# test_dataset keeps its original row labels, so the question must be picked
# by position to match test_dec_tensor[idx] below, which is positional.
test_enc_sentence = test_dataset["Q"].iloc[idx]

# Round-trip the reference answer through the tokenizer, then strip the
# leading "<sos> " (6 characters) and everything from "<eos>" onwards.
test_dec_tensor = encoding_sentence(test_dataset["A"], tokenizer)
test_dec_sentence = tokenizer.sequences_to_texts([test_dec_tensor[idx]])[0]
_idx = test_dec_sentence.find("<eos>")
test_dec_sentence = test_dec_sentence[6:_idx]


ids = beam_search_decoder(
    test_enc_sentence,
    transformer, tokenizer,
    enc_tensor.shape[-1], dec_tensor.shape[-1],
    beam_size=5
)

bleu = beam_bleu(test_dec_sentence, ids, tokenizer, verbose=True)

Reference: 땀 을 식혀 주 세요 .
Candidate: 잘 하 셨 어요 .

BLEU:      0.044

Reference: 땀 을 식혀 주 세요 .
Candidate: 잘 하 셨 어요 는데

BLEU:      0.000

Reference: 땀 을 식혀 주 세요 .
Candidate: 잘 하 셨 나 .

BLEU:      0.044

Reference: 땀 을 식혀 주 세요 .
Candidate: 잘 <UNK> 셨 어요 .

BLEU:      0.044

Reference: 땀 을 식혀 주 세요 .
Candidate: 잘 하 셨 나 는데

BLEU:      0.000



<br>

#### 테스트 데이터 BLEU 구하기


In [None]:
enc_maxlen = enc_tensor.shape[-1]
dec_maxlen = dec_tensor.shape[-1]

# Sum of per-sentence BLEU; turned into a mean after the loop.
aver_bleu = 0
for _, que, ans in test_dataset.itertuples():
    ids = beam_search_decoder(
        que,
        transformer, tokenizer,
        enc_maxlen, dec_maxlen,
        beam_size=5
    )

    # The reference is the ANSWER, round-tripped through the tokenizer so it
    # uses the same vocabulary as the candidates. The previous code passed the
    # question string to sequences_to_texts, which expects id sequences, so
    # the reference was never the real answer.
    ref_ids = tokenizer.texts_to_sequences([ans])[0]
    ref_tokens = tokenizer.sequences_to_texts([ref_ids])[0].split()
    test_dec_sentence = " ".join(
        tok for tok in ref_tokens if tok not in ("<sos>", "<eos>")
    )

    aver_bleu += beam_bleu(test_dec_sentence, ids, tokenizer, verbose=False)

# Report the mean over the test set rather than the raw sum.
if len(test_dataset) > 0:
    aver_bleu /= len(test_dataset)

print("=" * 100)
print(f"Test Data BLEU: {aver_bleu:.3f}")
print("=" * 100)

Test Data BLEU: 0.419
