In [85]:
import pandas as pd
import numpy as np
import os
import json
import random
import re
import warnings
import tensorflow as tf
from tensorflow.keras.saving import register_keras_serializable
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import  Input, LSTM, Embedding, Dense, Masking, Layer, Attention, Concatenate, TimeDistributed
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping

%matplotlib inline
warnings.filterwarnings('ignore') 

In [86]:
# JSON 파일이 있는 디렉토리 경로
directory_path = r"C:\Users\user\Desktop\hansol\4번째 프로젝트\TL_QA_seoul"

# JSON 데이터를 저장할 리스트
json_data_list = []

# 디렉토리 내의 모든 파일을 순회
for file_name in os.listdir(directory_path):
    if file_name.endswith(".json"):
        file_path = os.path.join(directory_path, file_name)
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)
                
                # 이미지 정보가 저장된 부분 삭제
                if 'images' in data:
                    del data['images']
                
                # 수정된 데이터를 리스트에 추가
                json_data_list.append(data)
                
                # 수정된 JSON 파일 저장
                with open(file_path, 'w', encoding='utf-8') as file:
                    json.dump(data, file, ensure_ascii=False, indent=4)

        except FileNotFoundError:
            print(f"File {file_name} not found.")
        except json.JSONDecodeError:
            print(f"Error decoding JSON from file {file_name}.")

# 데이터 전처리

In [87]:
questions = []
answers = []

for item in json_data_list:
    # 'annotations' 리스트가 있는지 확인
    if 'annotations' in item:
        for annotation in item['annotations']:
            # 'question' 리스트가 있는지 확인
            if 'question' in annotation:
                for question in annotation['question']:
                    questions.append(question['question'])
                    answers.append(question['answer'])

print("Questions:", questions[:5])
print("Answers:", answers[:5])

Questions: ['이 곳의 주소를 알 수 있나요?', '이 곳의 주요 메뉴는 무엇이 있나요?', '이 곳에 인접한 시설이 있나요?', '주차시설이 있나요?', '이 곳의 주소는 어떻게 되나요?']
Answers: ['서울 강서구 우장산로16길 26', '생대구탕, 특대구탕', '강서구청, 커피바알아, 우리은행', '없음', '서울 강서구 우장산로16길 26']


In [88]:
# questions와 answers 리스트를 데이터프레임으로 변환
my_data = pd.DataFrame({
    'Source': questions,
    'Target': answers
})

# 데이터프레임 출력
print(my_data)

                       Source             Target
0           이 곳의 주소를 알 수 있나요?  서울 강서구 우장산로16길 26
1        이 곳의 주요 메뉴는 무엇이 있나요?         생대구탕, 특대구탕
2           이 곳에 인접한 시설이 있나요?  강서구청, 커피바알아, 우리은행
3                  주차시설이 있나요?                 없음
4           이 곳의 주소는 어떻게 되나요?  서울 강서구 우장산로16길 26
...                       ...                ...
36055       이 곳의 주소를 알 수 있나요?  서울 동작구 동작대로35길 10
36056           연락처를 알 수 있나요?      070-8876-6424
36057              주차시설이 있나요?                 없음
36058          영업시간은 어떻게 되나요?      12:00 - 22:00
36059  이 곳의 대표적인 메뉴는 무엇이 있나요?        아메리카노, 카페라떼

[36060 rows x 2 columns]


In [89]:
# 기본적인 전처리.
my_data['Source'] = my_data['Source'].str.replace('\W',' ',regex=True)      # 특수 문자 공백으로 대체.
my_data['Source'] = my_data['Source'].str.replace('\s+',' ',regex=True)     # 잉여 공백 제거.
my_data['Source'] = my_data['Source'].apply(lambda x: x.lower()) # 소문자 변환

my_data['Target'] = my_data['Target'].str.replace('\W',' ',regex=True)      # 특수 문자 공백으로 대체.
my_data['Target'] = my_data['Target'].str.replace('\s+',' ',regex=True)     # 잉여 공백 제거.
my_data['Target'] = my_data['Target'].apply(lambda x: x.lower()) # 소문자 변환

In [90]:
# 단어 단위로 토큰화 하고, decoder 입력과 출력에 필요한 Target에는 <sos>태그와 <eos> 태그를 붙여준다.
my_src_in = []
my_tar_in = []
my_tar_out = []
for i in range(my_data.shape[0]):
    a_src = my_data.Source.loc[i].split()           # Encoder에 입력되는 Source는 있는 그대로 split해 둘 수 있다.
    a_tar = my_data.Target.loc[i]
    a_tar_in = [a_word for a_word in ('<sos> ' + a_tar).split()]
    a_tar_out = [a_word for a_word in (a_tar + ' <eos>').split()]
    my_src_in.append(a_src)
    my_tar_in.append(a_tar_in)
    my_tar_out.append(a_tar_out)

In [91]:
# 전처리 이후 출력.
print(my_src_in[10:20])
print(my_tar_in[10:20])
print(my_tar_out[10:20])

[['이', '곳의', '주소를', '알', '수', '있나요'], ['이', '곳의', '대표적인', '메뉴는', '무엇이', '있나요'], ['이', '곳에', '인접한', '시설이', '있나요'], ['연락처가', '어떻게', '되나요'], ['이', '곳의', '대표적인', '메뉴는', '무엇이', '있나요'], ['영업시간을', '알', '수', '있나요'], ['휴무일은', '어떻게', '되나요'], ['영업시간을', '알', '수', '있나요'], ['이', '곳의', '대표적인', '메뉴를', '알', '수', '있나요'], ['연락처를', '알', '수', '있나요']]
[['<sos>', '서울', '강서구', '우장산로16길', '26'], ['<sos>', '생대구탕', '특대구탕'], ['<sos>', '강서구청', '커피바알아', '우리은행'], ['<sos>', '02', '2605', '3248'], ['<sos>', '생대구탕', '특대구탕'], ['<sos>', '10', '00', '22', '00'], ['<sos>', '없음'], ['<sos>', '10', '00', '22', '00'], ['<sos>', '생대구탕', '특대구탕'], ['<sos>', '02', '2605', '3248']]
[['서울', '강서구', '우장산로16길', '26', '<eos>'], ['생대구탕', '특대구탕', '<eos>'], ['강서구청', '커피바알아', '우리은행', '<eos>'], ['02', '2605', '3248', '<eos>'], ['생대구탕', '특대구탕', '<eos>'], ['10', '00', '22', '00', '<eos>'], ['없음', '<eos>'], ['10', '00', '22', '00', '<eos>'], ['생대구탕', '특대구탕', '<eos>'], ['02', '2605', '3248', '<eos>']]


In [92]:
# 정수 인코딩 Source.
my_tokenizer_src = Tokenizer()
my_tokenizer_src.fit_on_texts(my_src_in)
my_encoder_in = my_tokenizer_src.texts_to_sequences(my_src_in)

# 정수 인코딩 Target. In과 Out 별도
my_tokenizer_tar = Tokenizer()
my_tokenizer_tar.fit_on_texts(my_tar_in)         # 학습.
my_tokenizer_tar.fit_on_texts(my_tar_out)        # 추가 학습.
my_decoder_in = my_tokenizer_tar.texts_to_sequences(my_tar_in)
my_decoder_out = my_tokenizer_tar.texts_to_sequences(my_tar_out)

In [93]:
# 패딩; 가장 긴 문장에 맞추어진다.
my_encoder_in_pad = pad_sequences(my_encoder_in, padding='post')
my_decoder_in_pad = pad_sequences(my_decoder_in, padding='post')
my_decoder_out_pad = pad_sequences(my_decoder_out, padding='post')

# 다음과 같이 tar_in에만 들어있는 '<sos>'와 '<eos>'가 제대로 학습되었음을 확인해 본다.
print( my_tokenizer_tar.word_index['<sos>'] )
print( my_tokenizer_tar.word_index['<eos>'] )

1
2


In [94]:
# Vocabulary 크기.
SRC_VOCAB_SIZE = len(my_tokenizer_src.word_index) + 1     # 0이 패딩용으로 추가되어 +1.
TAR_VOCAB_SIZE = len(my_tokenizer_tar.word_index) + 1     # 0이 패딩용으로 추가되어 +1.
print('Source vocabulary size : {:d}, Target vocabulary size : {:d}'.format(SRC_VOCAB_SIZE, TAR_VOCAB_SIZE))

Source vocabulary size : 162, Target vocabulary size : 8649


In [95]:
# 변환 사전을 간추려 둔다.
src_to_index = my_tokenizer_src.word_index
index_to_src = my_tokenizer_src.index_word

tar_to_index = my_tokenizer_tar.word_index
index_to_tar = my_tokenizer_tar.index_word

In [96]:
# 데이터 준비
encoder_input_data = np.array(my_encoder_in_pad)
decoder_input_data = np.array(my_decoder_in_pad)
decoder_output_data = np.array(my_decoder_out_pad)

In [97]:
# 인코더 설계
encoder_inputs = Input(shape=(None,)) # 입력 시퀀스
enc_emb = Embedding(SRC_VOCAB_SIZE, 50)(encoder_inputs)
encoder_lstm = LSTM(100, return_state=True)
_, state_h, state_c = encoder_lstm(enc_emb)
encoder_states = [state_h, state_c]

# 디코더 설계
decoder_inputs = Input(shape=(None,))
dec_emb_layer = Embedding(TAR_VOCAB_SIZE, 100)
dec_emb = dec_emb_layer(decoder_inputs)
decoder_lstm = LSTM(100, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(dec_emb, initial_state=encoder_states)
decoder_dense = Dense(TAR_VOCAB_SIZE, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

In [98]:
# 모델 정의
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

# 모델 컴파일
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

# Callback 설정
# 분류 문제에서는 val_accuracy를 더 사용하며, 회귀 문제에서는 val_loss를 사용하는 것이 일반적 (val_loss를 사용할 경우는 mode='min')
early_stopping = EarlyStopping(monitor='val_accuracy', mode='max', patience=5, restore_best_weights=True) # 성능 개선이 없을 때 최대 5번의 에포크를 더 훈련

# 모델 학습
history = model.fit(
    [encoder_input_data, decoder_input_data], 
    np.expand_dims(decoder_output_data, -1), # 디코더 출력 데이터를 3차원으로 변환 => 손실함수 sparse_categorical_ crossentropy의 경우 디코더의 출력 데이터가 (samples, timesteps, features)의 형태여야함
                                             # samples = 배치 크기, timesteps = 출력 시퀀스의 길이, features = 각 시간 단계에서의 클래스 수(타겟 어휘 크기)
    batch_size=64,
    epochs=200,
    validation_split=0.2,
    callbacks=[early_stopping]
)

Epoch 1/200
[1m451/451[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 87ms/step - loss: 3.4612 - val_loss: 1.5893
Epoch 2/200
[1m451/451[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 85ms/step - loss: 1.4290 - val_loss: 1.3079
Epoch 3/200
[1m451/451[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 85ms/step - loss: 1.0974 - val_loss: 1.2662
Epoch 4/200
[1m451/451[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 85ms/step - loss: 0.9882 - val_loss: 1.2851
Epoch 5/200
[1m451/451[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 85ms/step - loss: 0.9283 - val_loss: 1.3091
Epoch 6/200
[1m451/451[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 85ms/step - loss: 0.8837 - val_loss: 1.3269
Epoch 7/200
[1m451/451[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 85ms/step - loss: 0.8457 - val_loss: 1.3453
Epoch 8/200
[1m451/451[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 86ms/step - loss: 0.8136 - val_loss: 1.3651
Epoch 9/200
[1m

In [102]:
# 인퍼런스 모델 설계
encoder_model = Model(encoder_inputs, encoder_states)

# 디코더 설계
decoder_state_input_h = Input(shape=(100,))
decoder_state_input_c = Input(shape=(100,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

dec_emb2 = dec_emb_layer(decoder_inputs) # 입력 단어를 임베딩 벡터로 변환
decoder_outputs2, state_h2, state_c2 = decoder_lstm(
    dec_emb2, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]
decoder_outputs2 = decoder_dense(decoder_outputs2)

# 디코더 모델 정의
decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs2] + decoder_states2
)

In [103]:
def decode_sequence(input_seq):
    # 입력 문장을 인코딩하여 상태를 얻음
    states_value = encoder_model.predict(input_seq)

    # <sos> 토큰으로 디코더 입력 시작
    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = tar_to_index['<sos>']

    stop_condition = False
    decoded_sentence = ''
    
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict(
            [target_seq] + states_value)

        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = index_to_tar[sampled_token_index]

        if sampled_char != '<eos>':
            decoded_sentence += ' ' + sampled_char

        if (sampled_char == '<eos>' or
           len(decoded_sentence) > 30): # 출력 시퀀스의 최대 길이 = 30
            stop_condition = True

        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = sampled_token_index

        states_value = [h, c]

    return decoded_sentence

In [104]:
# 테스트
for seq_index in range(10):
    input_seq = encoder_input_data[seq_index: seq_index + 1]
    decoded_sentence = decode_sequence(input_seq)
    print('-')
    print('Input sentence:', ' '.join([index_to_src[idx] for idx in input_seq[0] if idx != 0]))
    print('Decoded sentence:', decoded_sentence)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 132ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 132ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
-
Input sentence: 이 곳의 주소를 알 수 있나요
Decoded sentence:  서울 광진구 아차산로 413 6
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
-
Input sentence: 이 곳의 주요 메뉴는 무엇이 있나요
Decoded sentence:  아메리카노 카페라떼
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms