In [1]:
import pandas as pd

# CSV 파일 읽기
data = pd.read_csv("ChatbotData.csv")

# 질문과 답변을 각각 questions, answers 변수에 저장
questions = data['Q']
answers = data['A']


In [2]:
import re

def preprocess_sentence(sentence):
    # 영문자는 소문자로 변환
    sentence = sentence.lower()
    
    # 영문자, 한글, 숫자를 제외한 문자를 공백으로 치환
    # 특수문자는 '!?', '.,' 이 포함되어야 하므로 이를 추가합니다
    sentence = re.sub(r"[^a-zㄱ-ㅎㅏ-ㅣ가-힣0-9!?.,]", " ", sentence)

    return sentence


In [3]:
from konlpy.tag import Mecab
import numpy as np

mecab = Mecab()

def build_corpus(src, tgt, tokenizer, length_limit):
    que_corpus = []
    ans_corpus = []

    # 정제 및 토큰화
    for s, t in zip(src, tgt):
        que_tokens = tokenizer(preprocess_sentence(s))
        ans_tokens = tokenizer(preprocess_sentence(t))
        
        # 토큰의 개수가 일정 길이 이상인 문장은 제외
        if len(que_tokens) <= length_limit and len(ans_tokens) <= length_limit:
            que_corpus.append(que_tokens)
            ans_corpus.append(ans_tokens)
    
    # 중복 제거
    que_corpus = list(set([" ".join(sentence) for sentence in que_corpus]))
    ans_corpus = list(set([" ".join(sentence) for sentence in ans_corpus]))
    que_corpus = [sentence.split(" ") for sentence in que_corpus]
    ans_corpus = [sentence.split(" ") for sentence in ans_corpus]

    return que_corpus, ans_corpus


# 데이터에 적용
length_limit = 40 # 이 값은 필요에 따라 변경 가능
que_corpus, ans_corpus = build_corpus(questions, answers, mecab.morphs, length_limit)


In [None]:


import random
from gensim.models import Word2Vec

def lexical_sub(sentence, word2vec):
    
    try:
        _from = random.choice(sentence)
        _to = word2vec.wv.most_similar(_from)[0][0]
    except:
        return None
    
    res = []
    for x in sentence:
        if x is _from: res.append(_to)
        else: res.append(x)

    return res

def augment_corpus(src_corpus, tgt_corpus, wv):
    new_src_corpus = []
    new_tgt_corpus = []
    corpus_size = len(src_corpus)
    
    for i in range(corpus_size):
        q = src_corpus[i]
        a = tgt_corpus[i]
        
        new_src = lexical_sub(q, wv)
        new_tgt = lexical_sub(a, wv)
        
        if new_src: 
            new_src_corpus.append(new_src)
            new_tgt_corpus.append(a)
            
        if new_tgt: 
            new_src_corpus.append(q)
            new_tgt_corpus.append(new_tgt)
    
    return new_src_corpus, new_tgt_corpus


# Word2Vec 모델 로드
word2vec = Word2Vec.load('/aiffel/aiffel/transformer_chatbot/data/word2vec_ko.model')
# que_corpus = [" ".join(sentence) for sentence in que_corpus]
# ans_corpus = [" ".join(sentence) for sentence in ans_corpus]

aug_que, aug_anw = augment_corpus(que_corpus, ans_corpus, word2vec)

que_corpus += aug_que
ans_corpus += aug_anw
    
len(que_corpus), len(ans_corpus)




In [5]:
import random
from gensim.models import Word2Vec

def lexical_sub(sentence, word2vec):
    words = sentence # 이미 리스트라고 가정

    # 단어가 없는 경우 처리
    if not words:
        return []
    
    # 랜덤하게 대체할 단어 선택
    idx = random.choice(range(len(words)))
    word = words[idx]

    # Word2Vec 모델을 사용하여 가장 가까운 단어를 찾음
    if word in word2vec.wv.key_to_index: # 이 부분 수정
        similar_words = word2vec.wv.most_similar(word)
        # 유사 단어가 없는 경우 처리
        if not similar_words:
            return words

        # 가장 유사한 단어로 대체
        words[idx] = similar_words[0][0]
    
    return words


# Word2Vec 모델 로드
word2vec = Word2Vec.load('/aiffel/aiffel/transformer_chatbot/data/word2vec_ko.model')

# Augmentation
aug_que_corpus = [lexical_sub(s, word2vec) for s in que_corpus]
aug_ans_corpus = [lexical_sub(s, word2vec) for s in ans_corpus]

# 병렬 구조 유지를 위한 데이터 병합
total_que_corpus = que_corpus + aug_que_corpus + que_corpus
total_ans_corpus = ans_corpus + ans_corpus + aug_ans_corpus


AttributeError: 'Word2VecKeyedVectors' object has no attribute 'key_to_index'

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# 타겟 데이터에 토큰 추가
ans_corpus = [["<start>"] + s + ["<end>"] for s in ans_corpus]

# que_corpus와 ans_corpus 결합
total_corpus = que_corpus + ans_corpus

# 토큰화
tokenizer = Tokenizer(filters="", lower=False, oov_token="<unk>")
tokenizer.fit_on_texts(total_corpus)

# 단어 사전 크기 설정 (padding 고려 +1)
vocab_size = len(tokenizer.word_index) + 1

# 벡터화
que_seqs = tokenizer.texts_to_sequences(que_corpus)
ans_seqs = tokenizer.texts_to_sequences(ans_corpus)

# 패딩
enc_train = pad_sequences(que_seqs, padding="post")
dec_train = pad_sequences(ans_seqs, padding="post")


In [None]:

def positional_encoding(pos, d_model):
    def cal_angle(position, i):
        return position / np.power(10000, (2*(i//2)) / np.float32(d_model))

    def get_posi_angle_vec(position):
        return [cal_angle(position, i) for i in range(d_model)]

    sinusoid_table = np.array([get_posi_angle_vec(pos_i) for pos_i in range(pos)])

    sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])
    sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])

    return sinusoid_table

In [None]:
def generate_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]

def generate_lookahead_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask

def generate_masks(src, tgt):
    enc_mask = generate_padding_mask(src)
    dec_enc_mask = generate_padding_mask(src)

    dec_lookahead_mask = generate_lookahead_mask(tgt.shape[1])
    dec_tgt_padding_mask = generate_padding_mask(tgt)
    dec_mask = tf.maximum(dec_tgt_padding_mask, dec_lookahead_mask)

    return enc_mask, dec_enc_mask, dec_mask

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        
        self.depth = d_model // self.num_heads
        
        self.W_q = tf.keras.layers.Dense(d_model)
        self.W_k = tf.keras.layers.Dense(d_model)
        self.W_v = tf.keras.layers.Dense(d_model)
        
        self.linear = tf.keras.layers.Dense(d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask):
        d_k = tf.cast(K.shape[-1], tf.float32)
        QK = tf.matmul(Q, K, transpose_b=True)

        scaled_qk = QK / tf.math.sqrt(d_k)

        if mask is not None: scaled_qk += (mask * -1e9)  

        attentions = tf.nn.softmax(scaled_qk, axis=-1)
        out = tf.matmul(attentions, V)

        return out, attentions
        

    def split_heads(self, x):
        bsz = x.shape[0]
        split_x = tf.reshape(x, (bsz, -1, self.num_heads, self.depth))
        split_x = tf.transpose(split_x, perm=[0, 2, 1, 3])

        return split_x

    def combine_heads(self, x):
        bsz = x.shape[0]
        combined_x = tf.transpose(x, perm=[0, 2, 1, 3])
        combined_x = tf.reshape(combined_x, (bsz, -1, self.d_model))

        return combined_x

    
    def call(self, Q, K, V, mask):
        WQ = self.W_q(Q)
        WK = self.W_k(K)
        WV = self.W_v(V)
        
        WQ_splits = self.split_heads(WQ)
        WK_splits = self.split_heads(WK)
        WV_splits = self.split_heads(WV)
        
        out, attention_weights = self.scaled_dot_product_attention(
            WQ_splits, WK_splits, WV_splits, mask)
                        
        out = self.combine_heads(out)
        out = self.linear(out)
            
        return out, attention_weights

In [None]:
class PoswiseFeedForwardNet(tf.keras.layers.Layer):
    def __init__(self, d_model, d_ff):
        super(PoswiseFeedForwardNet, self).__init__()
        self.d_model = d_model
        self.d_ff = d_ff

        self.fc1 = tf.keras.layers.Dense(d_ff, activation='relu')
        self.fc2 = tf.keras.layers.Dense(d_model)

    def call(self, x):
        out = self.fc1(x)
        out = self.fc2(out)
            
        return out


In [None]:
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, n_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()

        self.enc_self_attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = PoswiseFeedForwardNet(d_model, d_ff)

        self.norm_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.do = tf.keras.layers.Dropout(dropout)
        
    def call(self, x, mask):
        '''
        Multi-Head Attention
        '''
        residual = x
        out = self.norm_1(x)
        out, enc_attn = self.enc_self_attn(out, out, out, mask)
        out = self.do(out)
        out += residual
        
        '''
        Position-Wise Feed Forward Network
        '''
        residual = out
        out = self.norm_2(out)
        out = self.ffn(out)
        out = self.do(out)
        out += residual
        
        return out, enc_attn

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()

        self.dec_self_attn = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attn = MultiHeadAttention(d_model, num_heads)

        self.ffn = PoswiseFeedForwardNet(d_model, d_ff)

        self.norm_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.do = tf.keras.layers.Dropout(dropout)
    
    def call(self, x, enc_out, dec_enc_mask, padding_mask):
        '''
        Masked Multi-Head Attention
        '''
        residual = x
        out = self.norm_1(x)
        out, dec_attn = self.dec_self_attn(out, out, out, padding_mask)
        out = self.do(out)
        out += residual

        '''
        Multi-Head Attention
        '''
        residual = out
        out = self.norm_2(out)
        # Q, K, V 순서에 주의하세요!
        out, dec_enc_attn = self.enc_dec_attn(Q=out, K=enc_out, V=enc_out, mask=dec_enc_mask)
        out = self.do(out)
        out += residual
        
        '''
        Position-Wise Feed Forward Network
        '''
        residual = out
        out = self.norm_3(out)
        out = self.ffn(out)
        out = self.do(out)
        out += residual

        return out, dec_attn, dec_enc_attn

In [None]:
class Encoder(tf.keras.Model):
    def __init__(self,
                    n_layers,
                    d_model,
                    n_heads,
                    d_ff,
                    dropout):
        super(Encoder, self).__init__()
        self.n_layers = n_layers
        self.enc_layers = [EncoderLayer(d_model, n_heads, d_ff, dropout) 
                        for _ in range(n_layers)]
    
        self.do = tf.keras.layers.Dropout(dropout)
        
    def call(self, x, mask):
        out = x
    
        enc_attns = list()
        for i in range(self.n_layers):
            out, enc_attn = self.enc_layers[i](out, mask)
            enc_attns.append(enc_attn)
        
        return out, enc_attns

In [None]:
class Decoder(tf.keras.Model):
    def __init__(self,
                    n_layers,
                    d_model,
                    n_heads,
                    d_ff,
                    dropout):
        super(Decoder, self).__init__()
        self.n_layers = n_layers
        self.dec_layers = [DecoderLayer(d_model, n_heads, d_ff, dropout) 
                            for _ in range(n_layers)]
                            
    def call(self, x, enc_out, dec_enc_mask, padding_mask):
        out = x
    
        dec_attns = list()
        dec_enc_attns = list()
        for i in range(self.n_layers):
            out, dec_attn, dec_enc_attn = \
            self.dec_layers[i](out, enc_out, dec_enc_mask, padding_mask)

            dec_attns.append(dec_attn)
            dec_enc_attns.append(dec_enc_attn)

        return out, dec_attns, dec_enc_attns

In [None]:
class Transformer(tf.keras.Model):
    def __init__(self,
                    n_layers,
                    d_model,
                    n_heads,
                    d_ff,
                    src_vocab_size,
                    tgt_vocab_size,
                    pos_len,
                    dropout=0.2,
                    shared_fc=True,
                    shared_emb=False):
        super(Transformer, self).__init__()
        
        self.d_model = tf.cast(d_model, tf.float32)

        if shared_emb:
            self.enc_emb = self.dec_emb = \
            tf.keras.layers.Embedding(src_vocab_size, d_model)
        else:
            self.enc_emb = tf.keras.layers.Embedding(src_vocab_size, d_model)
            self.dec_emb = tf.keras.layers.Embedding(tgt_vocab_size, d_model)

        self.pos_encoding = positional_encoding(pos_len, d_model)
        self.do = tf.keras.layers.Dropout(dropout)

        self.encoder = Encoder(n_layers, d_model, n_heads, d_ff, dropout)
        self.decoder = Decoder(n_layers, d_model, n_heads, d_ff, dropout)

        self.fc = tf.keras.layers.Dense(tgt_vocab_size)

        self.shared_fc = shared_fc

        if shared_fc:
            self.fc.set_weights(tf.transpose(self.dec_emb.weights))

    def embedding(self, emb, x):
        seq_len = x.shape[1]

        out = emb(x)

        if self.shared_fc: out *= tf.math.sqrt(self.d_model)

        out += self.pos_encoding[np.newaxis, ...][:, :seq_len, :]
        out = self.do(out)

        return out

        
    def call(self, enc_in, dec_in, enc_mask, dec_enc_mask, dec_mask):
        enc_in = self.embedding(self.enc_emb, enc_in)
        dec_in = self.embedding(self.dec_emb, dec_in)

        enc_out, enc_attns = self.encoder(enc_in, enc_mask)
        
        dec_out, dec_attns, dec_enc_attns = \
        self.decoder(dec_in, enc_out, dec_enc_mask, dec_mask)
        
        logits = self.fc(dec_out)
        
        return logits, enc_attns, dec_attns, dec_enc_attns

In [None]:
transformer = Transformer(
    n_layers=2,
    d_model=512,
    n_heads=8,
    d_ff=2048,
    src_vocab_size=VOCAB_SIZE,
    tgt_vocab_size=VOCAB_SIZE,
    pos_len=200,
    dropout=0.3,
    shared_fc=True,
    shared_emb=True)

d_model = 512

In [None]:
class LearningRateScheduler(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super(LearningRateScheduler, self).__init__()
        
        self.d_model = d_model
        self.warmup_steps = warmup_steps
    
    def __call__(self, step):
        arg1 = step ** -0.5
        arg2 = step * (self.warmup_steps ** -1.5)
        
        return (self.d_model ** -0.5) * tf.math.minimum(arg1, arg2)


In [None]:

learning_rate = LearningRateScheduler(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate,
                                        beta_1=0.9,
                                        beta_2=0.98, 
                                        epsilon=1e-9)

In [None]:
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_sum(loss_)/tf.reduce_sum(mask)

In [None]:
@tf.function()
def train_step(src, tgt, model, optimizer):
    tgt_in = tgt[:, :-1]  # Decoder의 input
    gold = tgt[:, 1:]     # Decoder의 output과 비교하기 위해 right shift를 통해 생성한 최종 타겟

    enc_mask, dec_enc_mask, dec_mask = generate_masks(src, tgt_in)

    with tf.GradientTape() as tape:
        predictions, enc_attns, dec_attns, dec_enc_attns = \
        model(src, tgt_in, enc_mask, dec_enc_mask, dec_mask)
        loss = loss_function(gold, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)    
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    return loss, enc_attns, dec_attns, dec_enc_attns

In [None]:
EPOCHS = 3

for epoch in range(EPOCHS):
    total_loss = 0

    dataset_count = tf.data.experimental.cardinality(train_dataset).numpy()
    tqdm_bar = tqdm(total=dataset_count)
    
    for (src, tgt) in train_dataset:
        loss, enc_attns, dec_attns, dec_enc_attns = train_step(src, tgt, transformer, optimizer)
        total_loss += loss
        tqdm_bar.set_description(f'Epoch {epoch + 1} Loss {total_loss.numpy():.4f}')
        tqdm_bar.update()
    tqdm_bar.close()

    print(f'Epoch {epoch + 1} Loss {total_loss.numpy():.4f}')
