<a href="https://colab.research.google.com/github/as9786/NLP/blob/main/Attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Attention 신경망 구현 및 학습

In [1]:
!pip install konlpy

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting konlpy
  Downloading konlpy-0.6.0-py2.py3-none-any.whl (19.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.4/19.4 MB[0m [31m51.9 MB/s[0m eta [36m0:00:00[0m
Collecting JPype1>=0.7.0
  Downloading JPype1-1.4.1-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (465 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m465.3/465.3 KB[0m [31m39.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: JPype1, konlpy
Successfully installed JPype1-1.4.1 konlpy-0.6.0


In [2]:
import random
import tensorflow as tf
from konlpy.tag import Okt
import pandas as pd

# 초매개변수

In [3]:
EPOCHS = 200
NUM_WORDS = 2000

# Encoder

In [5]:
class Encoder(tf.keras.Model):

  def __init__(self):
    super(Encoder,self).__init__()
    self.emb = tf.keras.layers.Embedding(NUM_WORDS,64) 
    self.lstm = tf.keras.layers.LSTM(512,return_state=True,return_sequences=True)

  def call(self,x,training=False,mask=None):
    x = self.emb(x)
    H,h,c = self.lstm(x)
    return H,h,c

# Decoder

In [35]:
class Decoder(tf.keras.Model):

  def __init__(self):
    super(Decoder,self).__init__()
    self.emb = tf.keras.layers.Embedding(NUM_WORDS,64)
    self.lstm = tf.keras.layers.LSTM(512,return_sequences=True,return_state=True)
    self.att = tf.keras.layers.Attention() 
    self.dense = tf.keras.layers.Dense(NUM_WORDS,activation='softmax')

  def call(self,inputs,training=False,mask=None): 
    x,s0,c0,H = inputs  # h와 c는 context
    x = self.emb(x) 
    S, h, c = self.lstm(x,initial_state=[s0,c0])

    s_ = tf.concat([s0[:,tf.newaxis,:],S[:,:-1,:]],axis=1)
    A = self.att([s_,H])
    y = tf.concat([S,A],axis=-1)
    return self.dense(y),h,c

# Seq2seq

In [41]:
class Seq2seq(tf.keras.Model):

  def __init__(self,sos,eos):
    super(Seq2seq,self).__init__()
    self.enc = Encoder()
    self.dec = Decoder()
    self.sos = sos
    self.eos = eos
  
  def call(self, inputs, training=False, mask=None): 
    if training is True:
      x,y = inputs # 학습을 위해서는 정답 data도 같이 알아야 함
      H,h,c = self.enc(x) # Hidden state, cell state
      y, _, _ = self.dec((y,h,c,H))
      return y #전체 문장장

    else:
      x = inputs # Test 시에는 정답이 있으면 안됨됨
      H,h,c = self.enc(x)
      y = tf.convert_to_tensor(self.sos) # 첫 번째 입력
      y = tf.reshape(y,(1,1))

      seq = tf.TensorArray(tf.int32,64)

      for idx in tf.range(64):
        y, h, c = self.dec([y,h,c,H])
        y = tf.cast(tf.argmax(y,axis=-1),dtype=tf.int32)
        y = tf.reshape(y,(1,1))
        seq = seq.write(idx,y)

        if y == self.eos:
          break
      return tf.reshape(seq.stack(),(1,64))

# 학습, test loop

In [42]:
# Implement training loop
@tf.function
def train_step(model, inputs, labels, loss_object, optimizer, train_loss, train_accuracy):
    output_labels = labels[:, 1:]
    shifted_labels = labels[:, :-1]
    with tf.GradientTape() as tape:
        predictions = model([inputs, shifted_labels], training=True)
        loss = loss_object(output_labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss(loss)
    train_accuracy(output_labels, predictions)

# Implement algorithm test
@tf.function
def test_step(model, inputs):
    return model(inputs, training=False)

In [30]:
dataset_file = '/content/drive/MyDrive/chatbot_data.csv' # acquired from 'http://www.aihub.or.kr' and modified
okt = Okt()

with open(dataset_file, 'r') as file:
    lines = file.readlines()
    seq = [' '.join(okt.morphs(line)) for line in lines]

questions = seq[::2]
answers = ['\t ' + lines for lines in seq[1::2]]

num_sample = len(questions)

perm = list(range(num_sample))
random.seed(0)
random.shuffle(perm)

train_q = list()
train_a = list()
test_q = list()
test_a = list()

for idx, qna in enumerate(zip(questions, answers)):
    q, a = qna
    if perm[idx] > num_sample//5:
        train_q.append(q)
        train_a.append(a)
    else:
        test_q.append(q)
        test_a.append(a)

tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=NUM_WORDS,
                                                  filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~')

tokenizer.fit_on_texts(train_q + train_a)

train_q_seq = tokenizer.texts_to_sequences(train_q)
train_a_seq = tokenizer.texts_to_sequences(train_a)

test_q_seq = tokenizer.texts_to_sequences(test_q)
test_a_seq = tokenizer.texts_to_sequences(test_a)

x_train = tf.keras.preprocessing.sequence.pad_sequences(train_q_seq,
                                                        value=0,
                                                        padding='pre',
                                                        maxlen=64)
y_train = tf.keras.preprocessing.sequence.pad_sequences(train_a_seq,
                                                        value=0,
                                                        padding='post',
                                                        maxlen=65)


x_test = tf.keras.preprocessing.sequence.pad_sequences(test_q_seq,
                                                       value=0,
                                                       padding='pre',
                                                       maxlen=64)
y_test = tf.keras.preprocessing.sequence.pad_sequences(test_a_seq,
                                                       value=0,
                                                       padding='post',
                                                       maxlen=65)

train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(10000).batch(32).prefetch(1024)
test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(1).prefetch(1024)

# 학습 환경 정의

In [43]:
# Create model
model = Seq2seq(sos=tokenizer.word_index['\t'],
                eos=tokenizer.word_index['\n'])

# Define loss and optimizer
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

# Define performance metrics
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

In [44]:
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

In [45]:
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

# 학습

In [46]:
for epoch in range(EPOCHS):
    for seqs, labels in train_ds:
        train_step(model, seqs, labels, loss_object, optimizer, train_loss, train_accuracy)

    template = 'Epoch {}, Loss: {}, Accuracy: {}'
    print(template.format(epoch + 1,
                          train_loss.result(),
                          train_accuracy.result() * 100))

    train_loss.reset_states()
    train_accuracy.reset_states()

Epoch 1, Loss: 2.906686544418335, Accuracy: 83.16102600097656
Epoch 2, Loss: 0.6184918880462646, Accuracy: 90.45661163330078
Epoch 3, Loss: 0.5658604502677917, Accuracy: 91.19282531738281
Epoch 4, Loss: 0.5496801137924194, Accuracy: 91.14974975585938
Epoch 5, Loss: 0.5448196530342102, Accuracy: 91.14974975585938
Epoch 6, Loss: 0.5266262292861938, Accuracy: 91.14974975585938
Epoch 7, Loss: 0.5261322855949402, Accuracy: 91.21631622314453
Epoch 8, Loss: 0.5129985809326172, Accuracy: 91.23590087890625
Epoch 9, Loss: 0.4967699348926544, Accuracy: 91.39253997802734
Epoch 10, Loss: 0.47819092869758606, Accuracy: 91.53743743896484
Epoch 11, Loss: 0.4633048474788666, Accuracy: 91.81156158447266
Epoch 12, Loss: 0.4567403197288513, Accuracy: 92.03868865966797
Epoch 13, Loss: 0.4474528133869171, Accuracy: 92.285400390625
Epoch 14, Loss: 0.4389043152332306, Accuracy: 92.35980224609375
Epoch 15, Loss: 0.429273784160614, Accuracy: 92.50469970703125
Epoch 16, Loss: 0.4283314347267151, Accuracy: 92.508

In [47]:
for test_seq, test_labels in test_ds:
    prediction = test_step(model, test_seq)
    test_text = tokenizer.sequences_to_texts(test_seq.numpy())
    gt_text = tokenizer.sequences_to_texts(test_labels.numpy())
    texts = tokenizer.sequences_to_texts(prediction.numpy())
    print('_')
    print('q: ', test_text)
    print('a: ', gt_text)
    print('p: ', texts)

_
q:  ['여기 기프티콘 되죠 \n']
a:  ['\t 네 현금영수증 해드릴까 요 \n']
p:  ['여기 카드 카드 역 면 사용 주시 입니다 \n']
_
q:  ['네 에 테이크 아웃 도 가능한가요 \n']
a:  ['\t 네 로 오시 면 테이크 아웃 잔 에 담아 드려요 \n']
p:  ['네 고객 님 시럽 는 아 주문 됩니다 \n']
_
q:  ['아메리카노 톨 사이즈 로 주세요 \n']
a:  ['\t 따뜻한 거 로 드릴 까요 \n']
p:  ['다른 건 필요 없으신 가요 \n']
_
q:  ['진동 을 따로 주시나요 \n']
a:  ['\t 주 번호 로 드리겠습니다 \n']
p:  ['스콘 은 데워 드릴 까요 \n']
_
q:  ['자리 있나요 \n']
a:  ['\t 네 있습니다 \n']
p:  ['아니요 치즈케이크 는 지금 없어요 \n']
_
q:  ['그럼 루이보스 밀크 티 하나 \n']
a:  ['\t 네 알겠습니다 \n']
p:  ['네 4500원 입니다 \n']
_
q:  ['다음 에 무료 로 하고 엔 도장 찍어주세요 \n']
a:  ['\t 네 \n']
p:  ['네 쿠폰 만 주시 면 적립 드리겠습니다 \n']
_
q:  ['아메리카노 한 잔 에 얼마 죠 \n']
a:  ['\t 입니다 \n']
p:  ['4000원 입니다 \n']
_
q:  ['얼마나 \n']
a:  ['\t 바로 만들어 드릴게요 \n']
p:  ['10분 정도 걸려요 \n']
_
q:  ['카푸치노 는 로 주시 고 아메리카노 는 로 \n']
a:  ['\t 네 더 없으세요 \n']
p:  ['아이스 아메리카노 한잔 주문 도 와 드리겠습니다 \n']
_
q:  ['아메리카노 는 어떤 종류 가 있나요 \n']
a:  ['\t 디카 페인 과 기본 아메리카노 2 종류 있습니다 \n']
p:  ['네 고객 님 티 종류 다 가능해요 \n']
_
q:  ['카카오 페이 로 결제 가능한가요 \n']
a:  ['\t 네 가능합니다 \n']
p:  ['네 있습니다 \n']
_
q:  