<a href="https://colab.research.google.com/github/dagyeom23658/project_dayeom_chatbot/blob/main/%EA%B0%90%EC%84%B1%EB%B6%84%EC%84%9D_%ED%95%99%EC%8A%B5%EB%90%9C%EB%AA%A8%EB%8D%B8%EB%A1%9C_%EC%98%88%EC%B8%A1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# pip install konlpy

In [None]:
# pip install git+https://github.com/ssut/py-hanspell.git

In [None]:
import numpy as np
import tensorflow as tf
import pandas as pd
import re
from konlpy.tag import Okt
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [None]:
@tf.keras.utils.register_keras_serializable()
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product self-attention.

    The same input tensor is projected to queries, keys and values, split
    into ``num_heads`` heads of size ``embedding_dim // num_heads``, attended
    per head, re-concatenated and passed through a final dense projection.

    Args:
        embedding_dim: Width of the input/output feature axis (d_model).
            Must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        **kwargs: Standard Keras ``Layer`` arguments (``name``, ``dtype``,
            ``trainable``, ...), forwarded to the base class.
    """

    # Keys that earlier versions of get_config() wrote into saved models.
    # They are not constructor arguments, so they are discarded on load to
    # keep previously saved configs loadable.
    _LEGACY_CONFIG_KEYS = (
        'projection_dim',
        'query_dense',
        'key_dense',
        'self.value_dense',
        'self.dense',
    )

    def __init__(self, embedding_dim, num_heads=8, **kwargs):
        for legacy_key in MultiHeadAttention._LEGACY_CONFIG_KEYS:
            kwargs.pop(legacy_key, None)
        # Forward the remaining kwargs so name/dtype/trainable survive a
        # save -> load round trip.
        super().__init__(**kwargs)
        self.embedding_dim = embedding_dim  # d_model
        self.num_heads = num_heads

        assert embedding_dim % self.num_heads == 0, (
            "embedding_dim must be divisible by num_heads"
        )

        self.projection_dim = embedding_dim // num_heads
        self.query_dense = tf.keras.layers.Dense(embedding_dim)
        self.key_dense = tf.keras.layers.Dense(embedding_dim)
        self.value_dense = tf.keras.layers.Dense(embedding_dim)
        self.dense = tf.keras.layers.Dense(embedding_dim)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer.

        Only ``__init__`` arguments belong here; the sub-layers are recreated
        by ``__init__`` and their weights are restored separately.
        """
        config = super().get_config()
        config.update({
            'embedding_dim': self.embedding_dim,
            'num_heads': self.num_heads,
        })
        return config

    def scaled_dot_product_attention(self, query, key, value):
        """Compute ``softmax(Q K^T / sqrt(d_k)) V``.

        Returns:
            A tuple ``(output, attention_weights)``.
        """
        matmul_qk = tf.matmul(query, key, transpose_b=True)
        # Scale by sqrt of the key depth (size of the last axis of `key`).
        depth = tf.cast(tf.shape(key)[-1], tf.float32)
        logits = matmul_qk / tf.math.sqrt(depth)
        attention_weights = tf.nn.softmax(logits, axis=-1)
        output = tf.matmul(attention_weights, value)
        return output, attention_weights

    def split_heads(self, x, batch_size):
        """Reshape ``x`` to (batch_size, num_heads, seq_len, projection_dim)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply self-attention to ``inputs``.

        Args:
            inputs: Tensor of shape (batch_size, seq_len, embedding_dim).

        Returns:
            Tensor of shape (batch_size, seq_len, embedding_dim).
        """
        batch_size = tf.shape(inputs)[0]

        # (batch_size, seq_len, embedding_dim)
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)

        # (batch_size, num_heads, seq_len, projection_dim)
        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        scaled_attention, _ = self.scaled_dot_product_attention(query, key, value)
        # (batch_size, seq_len, num_heads, projection_dim)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

        # Merge the heads back: (batch_size, seq_len, embedding_dim)
        concat_attention = tf.reshape(
            scaled_attention, (batch_size, -1, self.embedding_dim)
        )
        return self.dense(concat_attention)

In [None]:
@tf.keras.utils.register_keras_serializable()
class TransformerBlock(tf.keras.layers.Layer):
    """Transformer encoder block: self-attention + position-wise feed-forward.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalization.

    Args:
        embedding_dim2: Width of the input/output feature axis.
        num_heads: Number of attention heads.
        dff: Hidden width of the feed-forward network.
        rate: Dropout rate applied after each sub-layer.
        **kwargs: Standard Keras ``Layer`` arguments (``name``, ``dtype``,
            ``trainable``, ...), forwarded to the base class.
    """

    # Keys that earlier versions of get_config() wrote into saved models.
    # They are not constructor arguments, so they are discarded on load to
    # keep previously saved configs loadable.
    _LEGACY_CONFIG_KEYS = (
        'att',
        'ffn',
        'layernorm1',
        'layernorm2',
        'dropout1',
        'dropout2',
    )

    def __init__(self, embedding_dim2, num_heads, dff, rate=0.1, **kwargs):
        for legacy_key in TransformerBlock._LEGACY_CONFIG_KEYS:
            kwargs.pop(legacy_key, None)
        # Forward the remaining kwargs so name/dtype/trainable survive a
        # save -> load round trip.
        super().__init__(**kwargs)
        self.embedding_dim2 = embedding_dim2
        self.num_heads = num_heads
        self.dff = dff
        self.rate = rate  # kept so get_config() can serialize it
        self.att = MultiHeadAttention(embedding_dim2, num_heads)
        self.ffn = tf.keras.Sequential(
            [
                tf.keras.layers.Dense(dff, activation="relu"),
                tf.keras.layers.Dense(embedding_dim2),
            ]
        )
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer.

        Only ``__init__`` arguments belong here; the sub-layers are recreated
        by ``__init__`` and their weights are restored separately.
        """
        config = super().get_config()
        config.update({
            'embedding_dim2': self.embedding_dim2,
            'num_heads': self.num_heads,
            'dff': self.dff,
            'rate': self.rate,
        })
        return config

    def call(self, inputs, training=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Input tensor; it is added to the attention output, so its
                last axis must match ``embedding_dim2``.
            training: Forwarded to the dropout layers to select training or
                inference behaviour.

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        # First sub-layer: multi-head self-attention.
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)  # Add & Norm

        # Second sub-layer: position-wise feed-forward network.
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)  # Add & Norm


In [None]:
@tf.keras.utils.register_keras_serializable()
class TokenAndPositionEmbedding(tf.keras.layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        max_len: Number of rows in the position-embedding table.
        vocab_size: Number of rows in the token-embedding table.
        embedding_dim2: Width of both embeddings.
        **kwargs: Standard Keras ``Layer`` arguments (``name``, ``dtype``,
            ``trainable``, ...), forwarded to the base class.
    """

    # Keys that earlier versions of get_config() wrote into saved models.
    # They are not constructor arguments, so they are discarded on load to
    # keep previously saved configs loadable.
    _LEGACY_CONFIG_KEYS = ('token_emb', 'pos_emb')

    def __init__(self, max_len, vocab_size, embedding_dim2, **kwargs):
        for legacy_key in TokenAndPositionEmbedding._LEGACY_CONFIG_KEYS:
            kwargs.pop(legacy_key, None)
        # Forward the remaining kwargs so name/dtype/trainable survive a
        # save -> load round trip.
        super().__init__(**kwargs)
        # Stored so get_config() can hand them back to __init__ on load.
        self.max_len = max_len
        self.vocab_size = vocab_size
        self.embedding_dim2 = embedding_dim2
        self.token_emb = tf.keras.layers.Embedding(vocab_size, embedding_dim2)
        self.pos_emb = tf.keras.layers.Embedding(max_len, embedding_dim2)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer.

        Loading a saved model calls ``__init__(**config)``, so every required
        constructor argument must be present here; leaving them out is what
        produces "TypeError: __init__() missing 3 required positional
        arguments" on load. The embedding sub-layers do not belong in the
        config: ``__init__`` recreates them and their weights are restored
        separately.
        """
        config = super().get_config()
        config.update({
            'max_len': self.max_len,
            'vocab_size': self.vocab_size,
            'embedding_dim2': self.embedding_dim2,
        })
        return config

    def call(self, x):
        """Embed token ids and add the embedding of each position.

        Args:
            x: Integer tensor of token ids whose last axis is the sequence
                axis.

        Returns:
            Token embeddings plus position embeddings (broadcast over the
            leading axes).
        """
        # Use the actual sequence length of the batch, not self.max_len.
        seq_len = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        position_vectors = self.pos_emb(positions)
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

In [None]:
# Read the stop-word sheet (no header row) from Google Drive.
stop_word = pd.read_excel(
    '/content/drive/MyDrive/프로젝트1/stop_words.xlsx',
    header=None,
)
# Collect the first column into a set for fast membership tests.
stop_words = {*stop_word.iloc[:, 0].values.tolist()}

In [None]:
# Emotion label -> integer class index.
feel_bic_dic = {'기쁨': 0, '당황': 2, '분노': 4, '불안': 1, '상처': 5, '슬픔': 3}
# Integer class index -> emotion label, derived from the mapping above and
# ordered by index.
feel_bic_dic_reverse = {
    index: label
    for label, index in sorted(feel_bic_dic.items(), key=lambda pair: pair[1])
}
# Restore the trained classifier from its HDF5 file.
model = tf.keras.models.load_model('feel_analysis_model.h5')

In [None]:
# Try out a prediction
from hanspell import spell_checker
okt = Okt()
# NOTE(review): this Tokenizer is created empty and is never fitted in the
# visible code. Unless the tokenizer fitted on the training corpus is
# restored here, texts_to_sequences() maps every word to nothing and the
# model only ever receives padding.
tokenizer = Tokenizer()


def sentiment_predict(new_sentence):
    """Predict the emotion label of a Korean sentence.

    The sentence is spell-checked, stripped of non-Hangul characters,
    morpheme-tokenized with stemming, filtered against ``stop_words``,
    integer-encoded with ``tokenizer``, padded to length 25 and passed to
    ``model``.

    Args:
        new_sentence: Raw input sentence.

    Returns:
        The label from ``feel_bic_dic_reverse`` for the highest-scoring class.

    Warns:
        RuntimeWarning: If ``tokenizer`` has no vocabulary, in which case the
            prediction does not depend on the input text.
    """
    import warnings

    if not tokenizer.word_index:
        warnings.warn(
            "tokenizer has an empty vocabulary; fit it on the training corpus "
            "(or load the saved tokenizer) before predicting, otherwise every "
            "sentence is encoded as pure padding.",
            RuntimeWarning,
            stacklevel=2,
        )

    # Chatbot input is often ungrammatical, so fix spelling and spacing first;
    # the original author noted that this step improved accuracy considerably.
    spelled_sent = spell_checker.check(new_sentence)
    hanspell_sent = spelled_sent.checked
    # Keep only Hangul jamo, Hangul syllables and spaces.
    cleaned = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]','', hanspell_sent)
    tokens = okt.morphs(cleaned, stem=True)  # tokenize into stemmed morphemes
    tokens = [word for word in tokens if word not in stop_words]  # drop stop words
    encoded = tokenizer.texts_to_sequences([tokens])  # integer encoding
    pad_new = pad_sequences(encoded, maxlen=25)  # pad/truncate to length 25
    score = model.predict(pad_new)  # class scores for the single sentence
    return feel_bic_dic_reverse[int(score.argmax())]

In [None]:
# Example: classify a sample sentence (roughly "I received an award").
sentiment_predict('상장을 받았어')

'기쁨'