In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
import os
import re
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Input, Attention, Dense, LSTM, Embedding, \
    Bidirectional, Concatenate
from tensorflow.keras.models import Model

# Report the installed TensorFlow version
print(tf.__version__)

# Load the chatbot question/answer CSV from the user's home directory.
# expanduser('~') is used rather than os.getenv('HOME') + '...': getenv
# returns None when HOME is unset, which would make the string concatenation
# raise TypeError before the real problem (a missing file) could be reported.
# When HOME is set, the resulting path is the same as before.
data_path = os.path.join(
    os.path.expanduser('~'),
    'aiffel', 'transformer_chatbot', 'data', 'ChatbotData.csv',
)
data = pd.read_csv(data_path)

# Sentence preprocessing
def preprocess_sentence(sentence):
    """Normalize one raw sentence before tokenization.

    The sentence is lowercased and trimmed, then three substitutions run in
    order:

    1. each of ``? . ! ,`` is surrounded by spaces;
    2. runs of spaces and double quotes collapse to a single space;
    3. every run of characters other than ASCII letters, Hangul syllables
       (가-힣) and ``? . ! ,`` becomes a single space.

    Digits are not in the allowed set, so they are removed by step 3.
    The result is trimmed once more before being returned.
    """
    substitutions = (
        (r"([?.!,])", r" \1 "),
        (r'[" "]+', " "),
        (r"[^a-zA-Z가-힣?.!,]+", " "),
    )
    cleaned = sentence.lower().strip()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()

def preprocess_data(data):
    """Run preprocess_sentence over the 'Q' and 'A' columns of ``data``.

    The two columns are overwritten on the object that was passed in (no copy
    is made), and that same object is returned.
    """
    for column in ('Q', 'A'):
        data[column] = data[column].apply(preprocess_sentence)
    return data

# Clean the question and answer text
data = preprocess_data(data)

# Pull out the question and answer columns
questions, answers = data['Q'], data['A']

# Fit a single tokenizer on questions and answers together so that both
# sides share one vocabulary
tokenizer = Tokenizer()
tokenizer.fit_on_texts([*questions, *answers])

# Vocabulary size: one more than the number of indexed words
vocab_size = 1 + len(tokenizer.word_index)
print('단어 집합 크기:', vocab_size)

# Tokenization and padding
def tokenize_and_pad(texts, tokenizer, max_len):
    """Convert ``texts`` to integer sequences of length ``max_len``.

    ``tokenizer.texts_to_sequences`` maps each text to a list of word
    indices; ``pad_sequences`` then brings every list to ``max_len``, adding
    padding at the end (``padding='post'``). The padded result is returned.
    """
    return pad_sequences(
        tokenizer.texts_to_sequences(texts),
        maxlen=max_len,
        padding='post',
    )

# Fixed sequence length used for every question and answer
max_len = 30

# Turn the question and answer texts into padded index sequences
questions = tokenize_and_pad(questions, tokenizer, max_len)
answers = tokenize_and_pad(answers, tokenizer, max_len)

# Split into training and validation sets: the first 80% of the rows (in
# file order, no shuffling) are used for training, the rest for validation
train_ratio = 0.8
num_samples = len(questions)
train_samples = int(num_samples * train_ratio)

train_questions, val_questions = questions[:train_samples], questions[train_samples:]
train_answers, val_answers = answers[:train_samples], answers[train_samples:]

# Transformer model construction
def transformer_model(vocab_size, max_len, embed_dim, num_heads, ff_dim, num_blocks):
    """Build the Keras model: embedding + positions -> blocks -> softmax.

    Args:
        vocab_size: size of the embedding table and of the output softmax.
        max_len: length of every input sequence.
        embed_dim: width of the token embeddings and of each block.
        num_heads: number of attention heads per block.
        ff_dim: hidden width of each block's feed-forward layers.
        num_blocks: how many transformer blocks to stack.

    Returns:
        A ``Model`` taking inputs of shape ``(batch, max_len)`` and applying
        a ``Dense(vocab_size, softmax)`` to the output of the last block.
    """
    inputs = Input(shape=(max_len,))

    # Token embedding
    embedding = Embedding(vocab_size, embed_dim)(inputs)

    # Positional encoding for positions 0..max_len-1. The leading axis has
    # size 1 so the addition below broadcasts over the batch; tiling by the
    # (symbolic) batch size is not needed.
    positions = tf.expand_dims(tf.range(start=0, limit=max_len, delta=1), axis=0)
    positional_encoding = get_positional_encoding(positions, embed_dim)
    x = embedding + positional_encoding

    # Stacked transformer blocks. transformer_block takes
    # (embed_dim, num_heads, ff_dim) and returns a Keras Model, so each block
    # is built first and then applied to x. Passing x as the first argument
    # would put it in the embed_dim slot and never apply the block.
    for _ in range(num_blocks):
        x = transformer_block(embed_dim, num_heads, ff_dim)(x)

    # Output projection: a distribution over the vocabulary at every position
    x = Dense(vocab_size, activation='softmax')(x)

    # Assemble the model
    model = Model(inputs=inputs, outputs=x)
    return model

# Positional encoding function
def get_positional_encoding(positions, d_model):
    """Return sinusoidal positional encodings for ``positions``.

    For position ``pos`` and feature index ``i`` the angle is
    ``pos / 10000 ** (2 * (i // 2) / d_model)``. Sine is applied to the
    even feature indices and cosine to the odd ones; the two halves are
    concatenated along the last axis (all sines first, then all cosines).

    The computation uses TensorFlow ops only, so ``positions`` may be a
    NumPy array, an eager tensor or a symbolic Keras tensor, and the result
    does not depend on any module-level variable.

    Args:
        positions: position indices of any shape ``(...)``.
        d_model: width of the encoding (number of features per position).

    Returns:
        A float32 tensor of shape ``(..., d_model)``.
    """
    # Add a trailing feature axis so positions broadcast against the rates
    positions = tf.cast(positions, tf.float32)[..., tf.newaxis]

    # One angle rate per feature index; consecutive pairs share a rate
    feature_index = tf.range(d_model, dtype=tf.float32)
    exponent = (2.0 * tf.floor(feature_index / 2.0)) / tf.cast(d_model, tf.float32)
    angle_rates = 1.0 / tf.pow(10000.0, exponent)

    angle_rads = positions * angle_rates

    # Sine on the even feature indices
    sines = tf.sin(angle_rads[..., 0::2])

    # Cosine on the odd feature indices
    cosines = tf.cos(angle_rads[..., 1::2])

    # Join the two halves along the feature axis
    return tf.concat([sines, cosines], axis=-1)

# Multi-head attention layer
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product attention.

    The layer is called with a dict holding ``'query'``, ``'key'`` and
    ``'value'`` tensors and, optionally, ``'mask'``. Each of query/key/value
    is projected with its own ``Dense(embed_dim)``, split into ``num_heads``
    heads of width ``embed_dim // num_heads``, attended, merged back to
    ``embed_dim`` and passed through a final ``Dense(embed_dim)``.
    """

    def __init__(self, embed_dim, num_heads, **kwargs):
        """Create the projection layers.

        Args:
            embed_dim: total feature width; must be divisible by num_heads.
            num_heads: number of attention heads.
            **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).

        Raises:
            ValueError: if ``embed_dim`` is not divisible by ``num_heads``.
        """
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError("Embedding dimension should be divisible by number of heads.")
        self.projection_dim = embed_dim // num_heads
        self.query_dense = Dense(embed_dim)
        self.key_dense = Dense(embed_dim)
        self.value_dense = Dense(embed_dim)
        self.combine_heads = Dense(embed_dim)

    def attention(self, query, key, value, mask=None):
        """Scaled dot-product attention.

        Args:
            query, key, value: per-head tensors as produced by separate_heads.
            mask: optional tensor broadcastable to the score tensor
                ``(batch, num_heads, query_len, key_len)``. Non-zero entries
                mark positions to hide: ``mask * -1e9`` is added to the
                scores before the softmax.

        Returns:
            ``(output, weights)``: the weighted values and the softmax
            attention weights.
        """
        score = tf.matmul(query, key, transpose_b=True)
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = score / tf.math.sqrt(dim_key)
        if mask is not None:
            # A large negative score drives masked weights to ~0 after softmax
            scaled_score += tf.cast(mask, scaled_score.dtype) * -1e9
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        """Reshape ``x`` to (batch, num_heads, seq_len, projection_dim)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply attention to a dict of 'query', 'key', 'value' and 'mask'.

        ``'mask'`` may be absent or ``None``, in which case no masking is
        applied. Returns a tensor whose last dimension is ``embed_dim``.
        """
        query = inputs['query']
        key = inputs['key']
        value = inputs['value']
        mask = inputs.get('mask')
        batch_size = tf.shape(query)[0]

        # Project query, key and value with their own Dense layers
        query = self.query_dense(query)
        key = self.key_dense(key)
        value = self.value_dense(value)

        # Split into heads
        query = self.separate_heads(query, batch_size)
        key = self.separate_heads(key, batch_size)
        value = self.separate_heads(value, batch_size)

        # Attend (the mask, when given, is applied to the scores)
        attention, weights = self.attention(query, key, value, mask)
        # Bring the head axis back next to the feature axis and merge them
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))

        # Final output projection
        outputs = self.combine_heads(concat_attention)
        return outputs

# Transformer block
def transformer_block(embed_dim, num_heads, ff_dim, rate=0.1):
    """Build one transformer block as a standalone Keras Model.

    The block takes inputs of shape ``(batch, None, embed_dim)`` and applies
    self-attention (query, key and value are all the block input, mask is
    None), dropout, a residual add with layer normalization, then a
    two-layer feed-forward network (``ff_dim`` with ReLU, back to
    ``embed_dim``), dropout and a second residual add with layer
    normalization.

    Args:
        embed_dim: feature width of the block's input and output.
        num_heads: number of attention heads.
        ff_dim: hidden width of the feed-forward network.
        rate: dropout rate used after attention and after the feed-forward.

    Returns:
        A ``tf.keras.Model`` mapping the block input to the block output.
    """
    block_input = Input(shape=(None, embed_dim))

    # Self-attention sub-layer
    self_attention = MultiHeadAttention(embed_dim, num_heads)
    attended = self_attention(inputs={
        'query': block_input,
        'key': block_input,
        'value': block_input,
        'mask': None
    })
    attended = tf.keras.layers.Dropout(rate)(attended)
    attention_normed = tf.keras.layers.LayerNormalization(epsilon=1e-6)(
        block_input + attended
    )

    # Position-wise feed-forward sub-layer
    hidden = tf.keras.layers.Dense(ff_dim, activation='relu')(attention_normed)
    projected = tf.keras.layers.Dense(embed_dim)(hidden)
    projected = tf.keras.layers.Dropout(rate)(projected)
    block_output = tf.keras.layers.LayerNormalization(epsilon=1e-6)(
        attention_normed + projected
    )

    return tf.keras.Model(inputs=block_input, outputs=block_output)

# Hyperparameters
embed_dim, num_heads, ff_dim, num_blocks = 256, 8, 1024, 4

# Build the model
model = transformer_model(
    vocab_size, max_len, embed_dim, num_heads, ff_dim, num_blocks
)

# Loss and optimizer
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

# Train, validating on the held-out question/answer pairs after each epoch
history = model.fit(
    train_questions,
    train_answers,
    batch_size=128,
    epochs=20,
    validation_data=(val_questions, val_answers),
)

# Prediction helper
def predict(sentence):
    """Run the trained model on one sentence.

    The sentence is cleaned with preprocess_sentence, tokenized and padded
    to ``max_len`` with the module-level ``tokenizer``, and fed to
    ``model.predict``. The return value is the argmax over the last axis of
    the model output, i.e. an array of vocabulary indices (not decoded
    text).
    """
    cleaned = preprocess_sentence(sentence)
    model_input = tokenize_and_pad([cleaned], tokenizer, max_len)
    probabilities = model.predict(model_input)
    return np.argmax(probabilities, axis=-1)

# Check the prediction for a sample greeting; predict() returns vocabulary
# indices, so this prints an integer array rather than text
print(predict("안녕하세요"))

2.6.0


FileNotFoundError: [Errno 2] No such file or directory: '/aiffel/aiffel/transformer_chatbot/data/ChatbotData.csv'