In [10]:
!pip install konlpy

import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from konlpy.tag import Okt
from sklearn.model_selection import train_test_split

# 태그 단어
PAD = "<PADDING>"   # 패딩
STA = "<START>"     # 시작
END = "<END>"       # 끝
OOV = "<OOV>"       # 없는 단어(Out of Vocabulary)

# 태그 인덱스
PAD_INDEX = 0
STA_INDEX = 1
END_INDEX = 2
OOV_INDEX = 3

# 데이터 타입
ENCODER_INPUT  = 0
DECODER_INPUT  = 1
DECODER_TARGET = 2

# 한 문장에서 단어 시퀀스의 최대 개수
max_sequences = 30

# 임베딩 벡터 차원
embedding_dim = 100

# LSTM 히든레이어 차원
lstm_hidden_dim = 512

# 정규 표현식 필터
RE_FILTER = re.compile("[.,!?\"':;~()]")

# Data Frame Create
story_df = pd.read_csv('/content/drive/MyDrive/my_ws/project/Final/Data/story.csv', encoding='cp949')

# 1. 정규식을 사용해서 특수문자, 숫자 제거
# story_df의 각 열에 대해서 정규 표현식을 사용하여 문자열 치환을 수행하는 반복문입니다.
for i in range(10):  # 0부터 9까지의 열 인덱스
    story_df[str(i)] = story_df[str(i)].str.replace('[^A-Za-z가-힣ㄱ-ㅎㅏ-ㅣ ]', '')

story_df.drop(['결론'], axis=1, inplace=True)

# 결과를 저장할 빈 데이터프레임 리스트를 초기화합니다.
dataframes = []

# story_df에서 9열까지 있다고 가정합니다 (0부터 9까지, 총 10개의 열).
# 0열부터 8열까지를 input으로 하고, 그 다음 열을 output으로 하는 데이터프레임을 생성합니다.
for i in range(9):  # 0부터 8까지 (9는 포함되지 않음)
    # 각 행에 대해 원하는 열을 공백으로 구분하여 합치기 위한 람다 함수를 apply 메서드에 사용합니다.
    input_column = story_df.iloc[:, :(i + 1)].apply(lambda row: ' '.join(row.values.astype(str)), axis=1)
    output_column = story_df.iloc[:, i + 1]

    # 새로운 데이터프레임을 만들고 dataframes 리스트에 추가합니다.
    df = pd.DataFrame({'input': input_column, 'output': output_column})
    dataframes.append(df)

# 모든 데이터프레임을 세로 방향으로 합쳐서 result_df를 생성합니다.
result_df = pd.concat(dataframes, ignore_index=True)

# 결과를 출력합니다.
result_df

x_data_df = result_df['input'];
t_data_df = result_df['output'];

x_data_train, x_data_test, t_data_train, t_data_test = \
train_test_split(x_data_df,
                 t_data_df,
                 test_size=0.2,
                 random_state=42)

question, answer = list(x_data_train), list(t_data_train)
# 챗봇 데이터 출력
for i in range(5):
    print('Q : ' + question[i])
    print('A : ' + answer[i])
    print()

Collecting konlpy
  Downloading konlpy-0.6.0-py2.py3-none-any.whl (19.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.4/19.4 MB[0m [31m52.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting JPype1>=0.7.0 (from konlpy)
  Downloading JPype1-1.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (488 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m488.6/488.6 kB[0m [31m50.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: JPype1, konlpy
Successfully installed JPype1-1.5.0 konlpy-0.6.0
Q : 알렉스는 특별한 능력을 가진 청년으로 그의 능력은 남들이 볼 수 없는 꿈의 세계를 볼 수 있다는 것입니다
A : 그는 이 능력을 숨기며 평범한 삶을 살고 있었지만 어느 날 그의 꿈에 미지의 여성 에밀리가 나타납니다

Q : 영화는 평범한 삶을 살던 주인공 제이크의 이야기로 시작된다 제이크는 어느 날 잠에서 깨어나면 다른 세상에 떨어진 것을 발견한다 그곳은 고대 시대로 제이크는 거대한 동물들 사이에서 생존을 위한 투쟁을 시작한다 그러나 그는 점차 그 세상의 독특한 문화와 사람들에게 매료되어 간다
A : 그리고 그는 고대 세상의 여왕 레나와 사랑에 빠진다

Q : 이야기는 외계 행성에 살고 있는 과학자 훈이의 일상에서 시작된다 훈은 외계 생명체 연구를 통해 새로운 에너지원을 발견하게 된다 그러나 그의 발견은 비밀스럽게 행성을 지배하는 조직의 이목을 끈다
A : 조직은 훈의 연구를 이

  story_df[str(i)] = story_df[str(i)].str.replace('[^A-Za-z가-힣ㄱ-ㅎㅏ-ㅣ ]', '')


In [11]:
# 형태소분석 함수
def pos_tag(sentences):

    # KoNLPy 형태소분석기 설정
    tagger = Okt()

    # 문장 품사 변수 초기화
    sentences_pos = []

    # 모든 문장 반복
    for sentence in sentences:
        # 특수기호 제거
        sentence = re.sub(RE_FILTER, "", sentence)

        # 배열인 형태소분석의 출력을 띄어쓰기로 구분하여 붙임
        sentence = " ".join(tagger.morphs(sentence))
        sentences_pos.append(sentence)

    return sentences_pos

# 형태소분석 수행
question = pos_tag(question)
answer = pos_tag(answer)

# 형태소분석으로 변환된 챗봇 데이터 출력
for i in range(10):
    print('Q : ' + question[i])
    print('A : ' + answer[i])
    print()

Q : 알렉스 는 특별한 능력 을 가진 청년 으로 그 의 능력 은 남 들 이 볼 수 없는 꿈 의 세계 를 볼 수 있다는 것 입니다
A : 그 는 이 능력 을 숨기 며 평범한 삶 을 살 고 있었지만 어느 날 그 의 꿈 에 미지 의 여성 에밀리 가 나타납니다

Q : 영화 는 평범한 삶 을 살던 주인공 제이크 의 이야기 로 시작 된다 제이크 는 어느 날 잠 에서 깨어나면 다른 세상 에 떨어진 것 을 발견 한다 그 곳 은 고대 시대 로 제이크 는 거대한 동물 들 사이 에서 생존 을 위 한 투쟁 을 시작 한 다 그러나 그 는 점차 그 세상 의 독특한 문화 와 사람 들 에게 매료 되어 간다
A : 그리고 그 는 고대 세상 의 여왕 레나 와 사랑 에 빠진다

Q : 이야기 는 외계 행성 에 살 고 있는 과학자 훈이 의 일상 에서 시작 된다 훈 은 외계 생명체 연구 를 통해 새로운 에너지 원 을 발견 하게 된다 그러나 그 의 발견 은 비밀 스럽게 행성 을 지배 하는 조직 의 이목 을 끈다
A : 조직 은 훈 의 연구 를 이용 해 행성 을 지배 하려는 계획 을 세운다

Q : 조용한 작은 마을 에서 살던 청년 톰 은 어느 날 자신 이 마법 의 힘 을 가진 사람 임 을 알 게 된다 이는 그 의 할아버지 로부터 전해진 비밀 로 그 는 금방 이 힘 을 제어 하는 법 을 배우지 못 한다 그 의 마법 은 자주 부주의하게 행동 하며 톰 을 곤경 에 빠뜨린다 그러나 그 는 이 힘 을 바른 방향 으로 사용 하기 위해 노력 하며 자신 만의 능력 을 발견 한다 그 는 주변 사람 들 의 도움 을 받아 마법 의 힘 을 제어 하는 법 을 배우고 그 힘 을 사용 해 사람 들 을 도와주기 시작 한 다 그 의 마법 은 마을 사람 들 에게 기쁨 과 행복 을 주며 그 는 마을 의 영웅 이 된다 그러나 어둠 의 마법사 가 그 의 마법 의 힘 을 원하며 마을 을 위협 하기 시작 한 다
A : 톰 은 마을 을 지키기 위해 어둠 의 마법사 와 대결 하며 그 의 진정한 용기 를 보여준다

Q : 서울 의 한 가운데

In [13]:
# 질문과 대답 문장들을 하나로 합침
sentences = []
sentences.extend(question)
sentences.extend(answer)

words = []

# 단어들의 배열 생성
for sentence in sentences:
    for word in sentence.split():
        words.append(word)

# 길이가 0인 단어는 삭제
words = [word for word in words if len(word) > 0]

# 중복된 단어 삭제
words = list(set(words))

# 제일 앞에 태그 단어 삽입
words[:0] = [PAD, STA, END, OOV]

# 단어와 인덱스의 딕셔너리 생성
word_to_index = {word: index for index, word in enumerate(words)}
index_to_word = {index: word for index, word in enumerate(words)}

In [14]:
# 문장을 인덱스로 변환
def convert_text_to_index(sentences, vocabulary, type):

    sentences_index = []

    # 모든 문장에 대해서 반복
    for sentence in sentences:
        sentence_index = []

        # 디코더 입력일 경우 맨 앞에 START 태그 추가
        if type == DECODER_INPUT:
            sentence_index.extend([vocabulary[STA]])

        # 문장의 단어들을 띄어쓰기로 분리
        for word in sentence.split():
            if vocabulary.get(word) is not None:
                # 사전에 있는 단어면 해당 인덱스를 추가
                sentence_index.extend([vocabulary[word]])
            else:
                # 사전에 없는 단어면 OOV 인덱스를 추가
                sentence_index.extend([vocabulary[OOV]])

        # 최대 길이 검사
        if type == DECODER_TARGET:
            # 디코더 목표일 경우 맨 뒤에 END 태그 추가
            if len(sentence_index) >= max_sequences:
                sentence_index = sentence_index[:max_sequences-1] + [vocabulary[END]]
            else:
                sentence_index += [vocabulary[END]]
        else:
            if len(sentence_index) > max_sequences:
                sentence_index = sentence_index[:max_sequences]

        # 최대 길이에 없는 공간은 패딩 인덱스로 채움
        sentence_index += (max_sequences - len(sentence_index)) * [vocabulary[PAD]]

        # 문장의 인덱스 배열을 추가
        sentences_index.append(sentence_index)

    return np.asarray(sentences_index)

In [69]:
# 인덱스를 문장으로 변환
def convert_index_to_text(indexs, vocabulary):

    sentence = ''

    # 모든 문장에 대해서 반복
    for index in indexs:
        if index == END_INDEX:
            # 종료 인덱스면 중지
            break;
        elif vocabulary.get(index) is not None:
            # 사전에 있는 인덱스면 해당 단어를 추가
            sentence += vocabulary[index]
        else:
            # 사전에 없는 인덱스면 OOV 단어를 추가
            sentence += vocabulary[OOV_INDEX]

        # 빈칸 추가
        sentence += ' '

    return sentence

In [72]:
# 인코더 입력 인덱스 변환
x_encoder = convert_text_to_index(question, word_to_index, ENCODER_INPUT)

# 첫 번째 인코더 입력 출력 (12시 땡)
print(x_encoder[0])

# 디코더 입력 인덱스 변환
x_decoder = convert_text_to_index(answer, word_to_index, DECODER_INPUT)

# 첫 번째 디코더 입력 출력 (START 하루 가 또 가네요)
print(x_decoder[0])

# 디코더 목표 인덱스 변환
y_decoder = convert_text_to_index(answer, word_to_index, DECODER_TARGET)

# 첫 번째 디코더 목표 출력 (하루 가 또 가네요 END)
print(y_decoder[0])

# 원핫인코딩 초기화
one_hot_data = np.zeros((len(y_decoder), max_sequences, len(words)))

# 디코더 목표를 원핫인코딩으로 변환
# 학습시 입력은 인덱스이지만, 출력은 원핫인코딩 형식임
for i, sequence in enumerate(y_decoder):
    for j, index in enumerate(sequence):
        one_hot_data[i, j, index] = 1

# 디코더 목표 설정
y_decoder = one_hot_data

# 첫 번째 디코더 목표 출력
print(y_decoder[0])

[ 664  229  260 1231 1279 1037 1110 2589 1345 2072 1231 2427 1152 2176
  849 2569  191  982  283 2072 2355 2157 2569  191 1208 1548  175    0
    0    0]
[   1 1345  229  849 1231 1279   10  565 1135 2204 1279  188 1937 1804
  752 2130 1345 2072  283 1648  575 2072 2508  791  859 1868    0    0
    0    0]
[1345  229  849 1231 1279   10  565 1135 2204 1279  188 1937 1804  752
 2130 1345 2072  283 1648  575 2072 2508  791  859 1868    2    0    0
    0    0]
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]]


In [1]:
import tensorflow as tf
from tensorflow import keras
from keras import layers

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim #입력 토큰 벡터의 크기
        self.dense_dim = dense_dim #내부 밀집 층의 크기
        self.num_heads = num_heads #어텐션 헤드의 개수
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None): #call()메서드에 연산을 수행한다.
        if mask is not None: #Embedding층에서 생성하는 마스크는 2D이지만 어텐션 층은 3D또는 4D를 기대한다.
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self): #모델 저장을 위한 직렬화 구현
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

In [2]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
    #위치 임베딩의 단점은 시퀀스 길이를 미리 알아야한다는 것
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding( #토큰 인덱스를 위한 Embedding층을 준비한다.
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions #두 임베딩 벡터를 더한다.

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

In [3]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True #이 속성은 층이 입력 마스킹을 출력으로 전달하도록 만든다.
        #케라스에서 마스킹을 사용하려면 명시적으로 설정을 해야한다.

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

In [73]:
vocab_size = 20000
sequence_length = 30
embed_dim = 256
dense_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
#소스 문장을 인코딩한다.

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
#타깃 시퀀스를 인코딩하고 인코딩된 소스 문장과 합친다.
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x) #출력 위치마다 하나의 단어를 예측한다

transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [74]:
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])

y_decoder_reshaped = np.reshape(y_decoder, (-1, sequence_length))

history = transformer.fit(
    [x_encoder, x_decoder],
    y_decoder_reshaped,
    epochs=10,
    validation_split=0.2
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [59]:
max_decoded_sentence_length = 30

def decode_sequence(input_sentence):
    tokenized_input_sentence = input_sentence
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = convert_text_to_index(
            [decoded_sentence])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = convert_index_to_text(sampled_token_index, index_to_word)
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

In [None]:
# 텍스트 생성
def generate_text(input_seq):

    # 입력을 인코더에 넣어 마지막 상태 구함
    states = encoder_model.predict(input_seq)

    # 목표 시퀀스 초기화
    target_seq = np.zeros((1, 1))

    # 목표 시퀀스의 첫 번째에 <START> 태그 추가
    target_seq[0, 0] = STA_INDEX

    # 인덱스 초기화
    indexs = []

    # 디코더 타임 스텝 반복
    while 1:
        # 디코더로 현재 타임 스텝 출력 구함
        # 처음에는 인코더 상태를, 다음부터 이전 디코더 상태로 초기화
        decoder_outputs, \
        state_text_forward_h, state_text_forward_c, \
        state_text_backward_h, state_text_backward_c = decoder_model.predict(
                                                [target_seq] + states)

        # 결과의 원핫인코딩 형식을 인덱스로 변환
        index = np.argmax(decoder_outputs[0, 0, :])
        indexs.append(index)

        # 종료 검사
        if len(indexs) >= max_sequences:
            break

        # 목표 시퀀스를 바로 이전의 출력으로 설정
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = index

        # 디코더의 이전 상태를 다음 디코더 예측에 사용
        states = [state_text_forward_h, state_text_forward_c, state_text_backward_h, state_text_backward_c]

    # 인덱스를 문장으로 변환
    sentence = convert_index_to_text(indexs, index_to_word)
    return sentence

In [26]:
# 예측을 위한 입력 생성
def make_predict_input(sentence):

    sentences = []
    sentences.append(sentence)
    sentences = pos_tag(sentences)
    input_seq = convert_text_to_index(sentences, word_to_index, ENCODER_INPUT)

    return input_seq

In [67]:
# 문장을 인덱스로 변환
input_seq = make_predict_input('이야기 거리가 없다')
input_seq

array([[ 178, 1708,  859,    3,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0]])

In [71]:
# 예측 모델로 텍스트 생성
predictions = decode_sequence(input_seq)
predictions

TypeError: convert_text_to_index() missing 2 required positional arguments: 'vocabulary' and 'type'

In [None]:
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip
#파일 다운받기

from sklearn.model_selection import train_test_split

text_file = "spa-eng/spa.txt"
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    english, spanish = line.split("\t") #각 라인은 영어 구절과 이에 해당하는 스페인 번역을 포함해 탭으로 구분됨.
    spanish = "[start] " + spanish + " [end]" #형식 맞추기 위해 스타트와 엔드 추가
    text_pairs.append((english, spanish))
    #파일 파싱해보기

import random
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples]