# 모델

In [1]:
import tensorflow as tf
import numpy as np
# from transformer.Layers import EncoderLayer, DecoderLayer

# 함수 정의
# 필요한 함수 및 클래스 정의
def create_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)

def create_look_ahead_mask(size):
    # size = tf.shape(size)[1]
    look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return look_ahead_mask[tf.newaxis, tf.newaxis, :, :]


class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, position, d_model, **kwargs):
        super(PositionalEncoding, self).__init__(**kwargs)
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, position, i, d_model):
        angles = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
        return position * angles

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            np.arange(position)[:, np.newaxis],
            np.arange(d_model)[np.newaxis, :],
            d_model)

        sines = np.sin(angle_rads[:, 0::2])
        cosines = np.cos(angle_rads[:, 1::2])

        pos_encoding = np.concatenate([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[np.newaxis, ...]

        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, inputs):
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
    
    def get_config(self):
        config = super().get_config()
        config.update({
            "position": self.pos_encoding.shape[1],
            "d_model": self.pos_encoding.shape[2],
        })
        return config
    
    @classmethod
    def from_config(cls, config):
        return cls(**config)

class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1, **kwargs):
        super(Decoder, self).__init__(**kwargs)

        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.target_vocab_size = target_vocab_size
        self.maximum_position_encoding = maximum_position_encoding
        self.rate = rate

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(maximum_position_encoding, d_model)
        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, look_ahead_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.pos_encoding(x)
        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.dec_layers[i](x, training, look_ahead_mask)

        return x  # (batch_size, target_seq_len, d_model)
    
    def get_config(self):
        config = super().get_config()
        config.update({
            "num_layers": self.num_layers,
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "target_vocab_size": self.target_vocab_size,
            "maximum_position_encoding": self.maximum_position_encoding,
            "rate": self.rate,
        })
        return config
    
    @classmethod
    def from_config(cls, config):
        return cls(**config)
    
def transformer(vocab_size, num_layers, units, d_model, num_heads, dropout, name="transformer"):
    # inputs = tf.keras.Input(shape=(None,), name="inputs")
    dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

    # enc_padding_mask = tf.keras.layers.Lambda(create_padding_mask, output_shape=(1, 1, None), name='enc_padding_mask')(inputs)
    # look_ahead_mask = tf.keras.layers.Lambda(create_look_ahead_mask, output_shape=(1, None, None), name='look_ahead_mask')(dec_inputs)
    look_ahead_mask = tf.keras.layers.Lambda(
        lambda x: create_look_ahead_mask(tf.shape(x)[1])
    )(dec_inputs)
    # dec_padding_mask = tf.keras.layers.Lambda(create_padding_mask, output_shape=(1, 1, None), name='dec_padding_mask')(inputs)

    # enc_output = Encoder(num_layers, d_model, num_heads, units, vocab_size, maximum_position_encoding=vocab_size, rate=dropout)(inputs, training=True, mask=enc_padding_mask)
    # dec_output = Decoder(num_layers, d_model, num_heads, units, vocab_size, maximum_position_encoding=vocab_size, rate=dropout)(dec_inputs, enc_output, training=True, look_ahead_mask=look_ahead_mask, padding_mask=dec_padding_mask)
    dec_output = Decoder(num_layers, d_model, num_heads, units, vocab_size, maximum_position_encoding=vocab_size, rate=dropout)(dec_inputs, training=True, look_ahead_mask=look_ahead_mask)

    outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_output)

    # return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)
    return tf.keras.Model(inputs=dec_inputs, outputs=outputs, name=name)



In [2]:
# from transformer.SubLayers import MultiHeadAttention, point_wise_feed_forward_network

class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1, **kwargs):
        super(EncoderLayer, self).__init__(**kwargs)

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        attn_output = self.mha(x, x, x, mask)  # (batch_size, input_seq_len, d_model)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)  # (batch_size, input_seq_len, d_model)

        ffn_output = self.ffn(out1)  # (batch_size, input_seq_len, d_model)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)  # (batch_size, input_seq_len, d_model)

        return out2
    
    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.mha.d_model,
            "num_heads": self.mha.num_heads,
            "dff": self.ffn.layers[0].units,
            "rate": self.dropout1.rate,
        })
        return config
    
    @classmethod
    def from_config(cls, config):
        return cls(**config)
    
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1, **kwargs):
        super(DecoderLayer, self).__init__(**kwargs)

        self.mha1 = MultiHeadAttention(d_model, num_heads)
        # self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        # self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        # self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, look_ahead_mask):
        attn1 = self.mha1(x, x, x, look_ahead_mask)  # (batch_size, target_seq_len, d_model)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(x + attn1)  # (batch_size, target_seq_len, d_model)

        # attn2 = self.mha2(enc_output, enc_output, out1, padding_mask)  # (batch_size, target_seq_len, d_model)
        # attn2 = self.dropout2(attn2, training=training)
        # out2 = self.layernorm2(out1 + attn2)  # (batch_size, target_seq_len, d_model)

        ffn_output = self.ffn(out1)  # (batch_size, target_seq_len, d_model)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(out1 + ffn_output)  # (batch_size, target_seq_len, d_model)

        return out3
    
    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.mha1.d_model,
            "num_heads": self.mha1.num_heads,
            "dff": self.ffn.layers[0].units,
            "rate": self.dropout1.rate,
        })
        return config
    
    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [3]:
# 스케일드 닷 프로덕트 어텐션 함수 정의
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)

    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)

    return output, attention_weights

In [7]:
import numpy as np
from transformer.Modules import scaled_dot_product_attention
import tensorflow as tf

class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, _ = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
        output = self.dense(concat_attention)

        return output
    
    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
        })
        return config
    
    @classmethod
    def from_config(cls, config):
        return cls(**config)
    
def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        tf.keras.layers.Dense(dff, activation='relu'),  # (batch_size, seq_len, dff)
        tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    ])

# 모델 훈련

In [4]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # ERROR 메시지만 출력하도록 설정
# GPU 설정 로그 억제
import tensorflow as tf
tf.get_logger().setLevel('ERROR')
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import tensorflow as tf
import tensorflow_datasets as tfds
import datetime
import re
from transformer.Models import transformer
from transformer.Loss import loss_function

# 하이퍼파라미터
MAX_SAMPLES = 50000
MAX_LENGTH = 40
BATCH_SIZE = 64
BUFFER_SIZE = 20000

# 전처리 함수
def preprocess_sentence(sentence):
  # 입력받은 sentence의 양쪽 공백을 제거
  sentence = sentence.strip()

  # 단어와 구두점(punctuation) 사이의 거리를 만듭니다.
  # 예를 들어서 "저는 학생입니다." => "저는 학생 입니다 ."와 같이
  # 학생과 마침표 사이에 거리를 만듭니다.
  sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
  sentence = re.sub(r'[" "]+', " ", sentence)
  # (한글, ".", "?", "!", ",")를 제외한 모든 문자를 공백인 ' '로 대체합니다.
  sentence = re.sub(r"[^가-힣?.!,]+", " ", sentence)
  sentence = sentence.strip()
  return sentence

# 데이터 로드 및 전처리
path_to_dataset = os.path.join(os.getcwd(),'data\ChatbotData.csv')

def load_conversation():
    # 논문에서 초기에 라벨이 없는 데이터 셋을 받는다고 하여 라벨을 제거하고 진행
    inputs = []
    with open(path_to_dataset, 'rt', encoding='UTF8') as file:
        lines = file.readlines()
        for line in lines[1:]:
            parts = line.split(',')
            # inputs.append(preprocess_sentence(parts[0]))
            # outputs.append(preprocess_sentence(parts[1]))
            inputs.append(preprocess_sentence(parts[1]))

            if len(inputs) >= MAX_SAMPLES:
                return inputs
    return inputs

# 데이터를 로드하고 전처리하여 질문을 questions, 답변을 answers에 저장합니다.
# questions, answers = load_conversation()
inputs = load_conversation()

# 질문과 답변 데이터셋에 대해서 Vocabulary 생성
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(inputs, target_vocab_size=2**13)

# 시작 토큰과 종료 토큰에 고유한 정수를 부여합니다.
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]

# 시작 토큰과 종료 토큰을 고려하여 +2를 하여 단어장의 크기를 산정합니다.
VOCAB_SIZE = tokenizer.vocab_size + 2



# 정수 인코딩, 최대 길이를 초과하는 샘플 제거, 패딩
def tokenize_and_filter(inputs):
  tokenized_inputs = []
  
  for sentence in inputs:
    # 정수 인코딩 과정에서 시작 토큰과 종료 토큰을 추가
    sentence = START_TOKEN + tokenizer.encode(sentence) + END_TOKEN
    # sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN

    # 최대 길이 40 이하인 경우에만 데이터셋으로 허용
    if len(sentence) <= MAX_LENGTH:
      tokenized_inputs.append(sentence)
      #tokenized_outputs.append(sentence2)
  
  # 최대 길이 40으로 모든 데이터셋을 패딩
  tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(
      tokenized_inputs, maxlen=MAX_LENGTH, padding='post')
  #tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(
  #    tokenized_outputs, maxlen=MAX_LENGTH, padding='post')
  
  return tokenized_inputs

inputs = tokenize_and_filter(inputs)

# 디코더는 이전의 target을 다음의 input으로 사용합니다.
# 이에 따라 outputs에서는 START_TOKEN을 제거하겠습니다.
"""
dataset = tf.data.Dataset.from_tensor_slices((
    {
        # 'inputs': questions,
        # 'dec_inputs': answers[:, :-1]
        'dec_inputs': answers[:, :-1]
    },
    {
        'outputs': answers[:, 1:]
    },
))
"""
dataset = tf.data.Dataset.from_tensor_slices((inputs, inputs))
dataset = dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

# 하이퍼파라미터
VOCAB_SIZE = 8000  # 예시 값
NUM_LAYERS = 6  # 예시 값
D_MODEL = 256  # 예시 값
NUM_HEADS = 8  # 예시 값
UNITS = 512  # 예시 값
DROPOUT = 0.15  # 예시 값
EPOCHS = 20  # 예시 값



if __name__ == '__main__':

    model = transformer(
        vocab_size=VOCAB_SIZE,
        num_layers=NUM_LAYERS,
        units=UNITS,
        d_model=D_MODEL,
        num_heads=NUM_HEADS,
        dropout=DROPOUT
    )

    # 모델 컴파일
    learning_rate = 1e-5
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate),
                # loss = loss_function,
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

    # 모델 요약 정보 출력
    model.summary()

    # 데이터셋 사용하여 모델 학습
    model.fit(dataset, epochs=EPOCHS)
    time = datetime.datetime.now().strftime('%Y%m%d%H%M')
    model_name = f'model/{time}_{EPOCHS}epoch_transformer.h5'

    model.save(model_name)

  from .autonotebook import tqdm as notebook_tqdm


Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 dec_inputs (InputLayer)        [(None, None)]       0           []                               
                                                                                                  
 lambda (Lambda)                (1, 1, None, None)   0           ['dec_inputs[0][0]']             
                                                                                                  
 decoder (Decoder)              (None, None, 256)    5210624     ['dec_inputs[0][0]',             
                                                                  'lambda[0][0]']                 
                                                                                                  
 outputs (Dense)                (None, None, 8000)   2056000     ['decoder[0][0]']      

In [6]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # ERROR 메시지만 출력하도록 설정
# GPU 설정 로그 억제
import tensorflow as tf
tf.get_logger().setLevel('ERROR')

# from train import preprocess_sentence, tokenizer, START_TOKEN, END_TOKEN, MAX_LENGTH
import tensorflow_datasets as tfds

def decoder_inference(sentence, model, tokenizer):
  sentence = preprocess_sentence(sentence)

  # 입력된 문장을 정수 인코딩 후, 시작 토큰과 종료 토큰을 앞뒤로 추가.
  # ex) Where have you been? → [[8331   86   30    5 1059    7 8332]]
  sentence = tf.expand_dims(
      START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)

  # 디코더의 현재까지의 예측한 출력 시퀀스가 지속적으로 저장되는 변수.
  # 처음에는 예측한 내용이 없음으로 시작 토큰만 별도 저장. ex) 8331
  output_sequence = tf.expand_dims(START_TOKEN, 0)

  # 디코더의 인퍼런스 단계
  for i in range(MAX_LENGTH):
    # 디코더는 최대 MAX_LENGTH의 길이만큼 다음 단어 예측을 반복합니다.
    # combined_input = tf.concat([sentence, output_sequence], axis=-1)
    predictions = model(inputs=output_sequence, training=False)
    predictions = predictions[:, -1:, :]

    # 현재 예측한 단어의 정수
    predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

    # 만약 현재 예측한 단어가 종료 토큰이라면 for문을 종료
    if tf.equal(predicted_id, END_TOKEN[0]):
      break

    # 예측한 단어들은 지속적으로 output_sequence에 추가됩니다.
    # 이 output_sequence는 다시 디코더의 입력이 됩니다.
    output_sequence = tf.concat([output_sequence, predicted_id], axis=-1)

  return tf.squeeze(output_sequence, axis=0)

def sentence_generation(sentence, model, tokenizer):
  # 입력 문장에 대해서 디코더를 동작 시켜 예측된 정수 시퀀스를 리턴받습니다.
  prediction = decoder_inference(sentence, model, tokenizer)

  # 정수 시퀀스를 다시 텍스트 시퀀스로 변환합니다.
  predicted_sentence = tokenizer.decode(
      [i for i in prediction if i < tokenizer.vocab_size])

  # print('나 : {}'.format(sentence))
  print('챗봇 : {}'.format(predicted_sentence))

  return predicted_sentence

def real_time_translation(model, tokenizer):
  print("종료를 원하시면 '종료'를 입력해주세요.")
  while True:
    sentence = input("나 : ")
    if sentence == '종료':
      break
    sentence_generation(sentence, model, tokenizer)

In [9]:
tokenizer = tfds.deprecated.text.SubwordTextEncoder.load_from_file('tokenizer/tokenizer')
model_location = 'model/'

model = 'transformer_15epoch_202406211422.h5'

model_name = model_location + model

# 저장된 모델 불러오기
loaded_model = tf.keras.models.load_model(model_name,
                                          custom_objects={#'loss_function': loss_function,
                                                          'PositionalEncoding': PositionalEncoding,
                                                          'MultiHeadAttention': MultiHeadAttention,
                                                          # 'EncoderLayer': EncoderLayer,
                                                          'DecoderLayer': DecoderLayer,
                                                          # 'Encoder': Encoder,
                                                          'Decoder': Decoder})


sentence_generation('지금까지 어디 있었어? 좀 우울한데 어떻게 해야 할까?', loaded_model, tokenizer)

# real_time_translation(loaded_model, tokenizer)

챗봇 : 0여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여


'0여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여여'