In [2]:
from google.colab import drive
drive.mount('/content/drive')

import os
os.chdir('drive/MyDrive/DL202010821/Attention')

%load_ext autoreload
%autoreload 2

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import os
import shutil
import zipfile

import pandas as pd
import tensorflow as tf
import urllib3
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

In [4]:
# Read the tab-separated file into three named columns, then discard the
# 'lic' column so that only the 'src' and 'tar' columns remain.
lines = pd.read_csv('dataset/fra.txt', sep='\t', names=['src', 'tar', 'lic'])
lines = lines.drop(columns='lic')
print('전체 샘플의 개수 :', len(lines))

전체 샘플의 개수 : 232736


In [5]:
# Keep the 'src'..'tar' columns and only the first 30,000 rows,
# then preview ten random rows.
lines = lines.loc[:, 'src':'tar'].iloc[:30000]
lines.sample(10)

Unnamed: 0,src,tar
5010,Have a snack.,Prenez un en-cas !
12153,Is this common?,C'est courant ?
5715,I'm so sorry.,Je suis tellement désolée !
7065,You're upset.,Tu es contrariée.
1230,I'm broke.,Je suis fauché.
23615,Tom isn't a snob.,Tom n'est pas snob.
2934,Be prepared.,Tenez-vous prêtes !
27122,I lost my glasses.,J'ai perdu mes lunettes.
28734,No one was crying.,Personne ne pleurait.
20358,How tall you are!,Comme tu es grand !


In [6]:
# Surround every target sentence with '\t ' at the front and ' \n' at the back;
# the decoding code later uses '\t' as the start symbol and '\n' as the stop symbol.
lines['tar'] = lines['tar'].map(lambda sentence: '\t ' + sentence + ' \n')
lines.sample(10)

Unnamed: 0,src,tar
10636,Do as you like.,\t Fais comme il te plaira ! \n
10795,Don't waste it.,\t Ne le gaspille pas. \n
21889,Is this your car?,\t Est-ce votre voiture ? \n
12157,Is this for me?,\t Est-ce que c'est pour moi ? \n
24487,What's your name?,\t Comment tu t’appelles ? \n
29334,That's pretty big.,\t C'est plutôt gros. \n
19037,You're generous.,\t Tu es généreux. \n
12172,It doesn't fit.,\t Ça ne convient pas. \n
1123,I hate it.,\t Je déteste ça. \n
22933,That's too risky.,\t C'est trop risqué. \n


In [7]:
# Build the character sets: every distinct character seen in the source
# sentences and in the target sentences.
src_vocab = {char for sentence in lines.src for char in sentence}
tar_vocab = {char for sentence in lines.tar for char in sentence}

# One extra slot on top of the number of distinct characters.
src_vocab_size = len(src_vocab) + 1
tar_vocab_size = len(tar_vocab) + 1
print('source 문장의 char 집합 :', src_vocab_size)
print('target 문장의 char 집합 :', tar_vocab_size)


source 문장의 char 집합 : 77
target 문장의 char 집합 : 102


In [8]:
# Turn both character sets into sorted lists so each character has a fixed position.
src_vocab, tar_vocab = sorted(src_vocab), sorted(tar_vocab)
# Show the same window of each sorted vocabulary.
vocab_window = slice(45, 75)
print(src_vocab[vocab_window])
print(tar_vocab[vocab_window])


['W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
['V', 'W', 'X', 'Y', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']


In [9]:
# Map each character to an integer id starting at 1 (id 0 is never assigned).
src_to_index = {char: idx for idx, char in enumerate(src_vocab, start=1)}
tar_to_index = {char: idx for idx, char in enumerate(tar_vocab, start=1)}
print(src_to_index)
print(tar_to_index)

{' ': 1, '!': 2, '"': 3, '$': 4, '%': 5, '&': 6, "'": 7, ',': 8, '-': 9, '.': 10, '/': 11, '0': 12, '1': 13, '2': 14, '3': 15, '4': 16, '5': 17, '6': 18, '7': 19, '8': 20, '9': 21, ':': 22, '?': 23, 'A': 24, 'B': 25, 'C': 26, 'D': 27, 'E': 28, 'F': 29, 'G': 30, 'H': 31, 'I': 32, 'J': 33, 'K': 34, 'L': 35, 'M': 36, 'N': 37, 'O': 38, 'P': 39, 'Q': 40, 'R': 41, 'S': 42, 'T': 43, 'U': 44, 'V': 45, 'W': 46, 'X': 47, 'Y': 48, 'Z': 49, 'a': 50, 'b': 51, 'c': 52, 'd': 53, 'e': 54, 'f': 55, 'g': 56, 'h': 57, 'i': 58, 'j': 59, 'k': 60, 'l': 61, 'm': 62, 'n': 63, 'o': 64, 'p': 65, 'q': 66, 'r': 67, 's': 68, 't': 69, 'u': 70, 'v': 71, 'w': 72, 'x': 73, 'y': 74, 'z': 75, 'é': 76}
{'\t': 1, '\n': 2, ' ': 3, '!': 4, '"': 5, '$': 6, '%': 7, '&': 8, "'": 9, ',': 10, '-': 11, '.': 12, '0': 13, '1': 14, '2': 15, '3': 16, '4': 17, '5': 18, '6': 19, '7': 20, '8': 21, '9': 22, ':': 23, '?': 24, 'A': 25, 'B': 26, 'C': 27, 'D': 28, 'E': 29, 'F': 30, 'G': 31, 'H': 32, 'I': 33, 'J': 34, 'K': 35, 'L': 36, 'M': 3

In [10]:
# Integer-encode every source sentence: one id per character, one list per sentence.
encoder_input = [
    [src_to_index[char] for char in sentence]
    for sentence in lines.src
]
print('source 문장의 정수 인코딩 :', encoder_input[:5])


source 문장의 정수 인코딩 : [[30, 64, 10], [30, 64, 10], [30, 64, 10], [30, 64, 10], [31, 58, 10]]


In [11]:
# Integer-encode every target sentence (including its '\t' / '\n' markers).
decoder_input = [
    [tar_to_index[char] for char in sentence]
    for sentence in lines.tar
]
print('target 문장의 정수 인코딩 :', decoder_input[:5])


target 문장의 정수 인코딩 : [[1, 3, 46, 50, 3, 4, 3, 2], [1, 3, 37, 50, 67, 52, 57, 54, 12, 3, 2], [1, 3, 29, 63, 3, 67, 64, 70, 69, 54, 3, 4, 3, 2], [1, 3, 26, 64, 70, 56, 54, 3, 4, 3, 2], [1, 3, 43, 50, 61, 70, 69, 3, 4, 3, 2]]


In [12]:
# Labels are the target sentence without its first character, i.e. the decoder
# input shifted left by one step.
decoder_target = [
    [tar_to_index[char] for char in sentence[1:]]
    for sentence in lines.tar
]
print('target 문장 레이블의 정수 인코딩 :', decoder_target[:5])


target 문장 레이블의 정수 인코딩 : [[3, 46, 50, 3, 4, 3, 2], [3, 37, 50, 67, 52, 57, 54, 12, 3, 2], [3, 29, 63, 3, 67, 64, 70, 69, 54, 3, 4, 3, 2], [3, 26, 64, 70, 56, 54, 3, 4, 3, 2], [3, 43, 50, 61, 70, 69, 3, 4, 3, 2]]


In [13]:
# Longest sentence (in characters) on each side; used as the padding length.
max_src_len = max(map(len, lines.src))
max_tar_len = max(map(len, lines.tar))
print('source 문장의 최대 길이 :', max_src_len)
print('target 문장의 최대 길이 :', max_tar_len)


source 문장의 최대 길이 : 18
target 문장의 최대 길이 : 61


In [14]:
# Pad each id sequence at the end ('post') up to the maximum length and then
# one-hot encode it. The labels are padded to max_tar_len as well so they line
# up step for step with the decoder input.
encoder_input = to_categorical(
    pad_sequences(encoder_input, maxlen=max_src_len, padding='post'))
decoder_input = to_categorical(
    pad_sequences(decoder_input, maxlen=max_tar_len, padding='post'))
decoder_target = to_categorical(
    pad_sequences(decoder_target, maxlen=max_tar_len, padding='post'))


# Attention layer를 추가한 Seq2Seq 모델 학습해보기

In [15]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Concatenate, Dot, Activation, Lambda, Softmax
from tensorflow.keras.optimizers import RMSprop
import numpy as np
import tensorflow as tf

In [16]:
# Attention layer
class AttentionLayer(tf.keras.layers.Layer):
    """Dot-product attention with no trainable weights.

    Scores are the matrix product of ``query`` with the transposed ``key``,
    normalised with a softmax over the last axis, and used to take a weighted
    sum of ``value``.
    """

    def __init__(self, **kwargs):
        # Forward the standard Layer options (name, dtype, trainable, ...).
        super().__init__(**kwargs)

    def call(self, query, key, value):
        """Compute the attended context for every query step.

        Args:
            query: tensor whose last dimension matches that of ``key``
                (assumed (batch, query_steps, units) — TODO confirm with callers).
            key: tensor scored against ``query``
                (assumed (batch, key_steps, units)).
            value: tensor that is averaged using the attention weights; its
                second-to-last dimension must equal ``key_steps``.

        Returns:
            A ``(context_vector, attention_weights)`` tuple, where
            ``attention_weights`` sums to 1 along its last axis and
            ``context_vector = attention_weights @ value``.
        """
        scores = tf.matmul(query, key, transpose_b=True)
        # Use the functional softmax rather than building a new Keras Softmax
        # layer object on every call.
        attention_weights = tf.nn.softmax(scores, axis=-1)
        context_vector = tf.matmul(attention_weights, value)
        return context_vector, attention_weights

In [17]:
# Encoder definition
# Input: one-hot source characters; the time dimension is None (variable length).
encoder_inputs = Input(shape=(None, src_vocab_size))
# return_sequences=True keeps the output of every time step and
# return_state=True additionally returns the final hidden and cell states.
encoder_lstm = LSTM(256, return_sequences=True, return_state=True)
encoder_outputs, state_h, state_c = encoder_lstm(encoder_inputs)

# Decoder definition
# Input: one-hot target characters; the time dimension is None (variable length).
decoder_inputs = Input(shape=(None, tar_vocab_size))
decoder_lstm = LSTM(256, return_sequences=True, return_state=True)
# The decoder is initialised with the encoder's final states; the states it
# returns itself are discarded here.
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=[state_h, state_c])


In [19]:

# Add the attention layer
# 1) Instantiate AttentionLayer
# 2) Capture its outputs in context_vector and attention_weights
# The decoder outputs are the queries; the encoder outputs serve as both keys and values.
attention_layer = AttentionLayer()
context_vector, attention_weights = attention_layer(decoder_outputs, encoder_outputs, encoder_outputs)
# Concatenate the context vector with the decoder output along the feature axis
decoder_concat_input = Concatenate(axis=-1)([context_vector, decoder_outputs])

# Output layer: a softmax over the target character vocabulary at every step
decoder_dense = Dense(tar_vocab_size, activation='softmax')
# Bind the prediction tensor to its own name instead of overwriting
# decoder_outputs. Overwriting it made this cell fail when re-run, because the
# attention query would then be the softmax output rather than the LSTM output.
decoder_predictions = decoder_dense(decoder_concat_input)

# Full training model: (source one-hot, target one-hot) -> per-step character distribution
model = Model([encoder_inputs, decoder_inputs], decoder_predictions)

# Compile the model
model.compile(optimizer=RMSprop(), loss='categorical_crossentropy', metrics=['accuracy'])

# Model summary
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, None, 77)]           0         []                            
                                                                                                  
 input_2 (InputLayer)        [(None, None, 102)]          0         []                            
                                                                                                  
 lstm (LSTM)                 [(None, None, 256),          342016    ['input_1[0][0]']             
                              (None, 256),                                                        
                              (None, 256)]                                                        
                                                                                              

In [20]:
# Train the model
# Inputs are the one-hot source sequences and the one-hot decoder inputs; the
# labels are the decoder inputs shifted by one step.
# NOTE(review): per the Keras docs, validation_split holds out the last 20% of
# the samples (taken before shuffling) — confirm that slice is representative.
model.fit(
    [encoder_input, decoder_input],
    decoder_target,
    batch_size=64,
    epochs=40,
    validation_split=0.2
)



Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
Epoch 30/40
Epoch 31/40
Epoch 32/40
Epoch 33/40
Epoch 34/40
Epoch 35/40
Epoch 36/40
Epoch 37/40
Epoch 38/40
Epoch 39/40
Epoch 40/40


<keras.src.callbacks.History at 0x7af07a300c10>

In [21]:
# Inverse lookups: integer id -> character, for both vocabularies.
index_to_src = {idx: char for char, idx in src_to_index.items()}
index_to_tar = {idx: char for char, idx in tar_to_index.items()}


In [22]:
# Encoder model for inference: source one-hot sequence -> per-step encoder
# outputs plus the final hidden and cell states.
encoder_model = Model(encoder_inputs, [encoder_outputs, state_h, state_c])

# Decoder model for inference.
# Everything created below gets its own name. The original cell rebound
# state_h / state_c / decoder_outputs, so re-running it built encoder_model
# from the decoder's states instead of the encoder's.

# Input definitions: the previous LSTM states and the encoder outputs to attend over.
# The encoder outputs get a dedicated Input (256 = encoder LSTM units) rather than
# reusing an intermediate tensor of the training graph as a model input.
decoder_state_input_h = Input(shape=(256,), name="decoder_state_input_h")
decoder_state_input_c = Input(shape=(256,), name="decoder_state_input_c")
encoder_outputs_input = Input(shape=(None, 256), name="encoder_outputs_input")

# Decoder LSTM (shares the trained weights), started from the supplied states
inference_decoder_outputs, inference_state_h, inference_state_c = decoder_lstm(
    decoder_inputs, initial_state=[decoder_state_input_h, decoder_state_input_c])

# Attention over the supplied encoder outputs (same layer instance as in training)
inference_context_vector, inference_attention_weights = attention_layer(
    inference_decoder_outputs, encoder_outputs_input, encoder_outputs_input)

# Combine the context vector with the decoder output
inference_concat_input = Concatenate(name="concatenate_layer")(
    [inference_context_vector, inference_decoder_outputs])

# Final output layer (shares the trained Dense weights)
decoder_final_output = decoder_dense(inference_concat_input)

# Build the decoder model; input and output order is what decode_sequence expects
decoder_model = Model(
    inputs=[decoder_inputs, encoder_outputs_input, decoder_state_input_h, decoder_state_input_c],
    outputs=[decoder_final_output, inference_state_h, inference_state_c, inference_attention_weights]
)


# Print both model summaries to inspect how the inference graphs are wired
encoder_model.summary()
decoder_model.summary()



Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, None, 77)]        0         
                                                                 
 lstm (LSTM)                 [(None, None, 256),       342016    
                              (None, 256),                       
                              (None, 256)]                       
                                                                 
Total params: 342016 (1.30 MB)
Trainable params: 342016 (1.30 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, None, 102)]          0         []  

In [None]:

# Decode one encoded source sentence into its translation
def decode_sequence(input_seq):
    """Greedily translate a single source sentence character by character.

    Args:
        input_seq: encoder input for exactly one sentence (assumed to be a
            one-hot array of shape (1, src_len, src_vocab_size) — TODO confirm).

    Returns:
        The generated string. It ends with '\n' when the model emitted the end
        symbol; otherwise generation stops once the string is longer than
        ``max_tar_len`` characters.
    """
    # Encode the source once: per-step outputs for attention plus the initial states
    encoder_output, state_h, state_c = encoder_model.predict(input_seq, verbose=0)

    # First decoder input: the one-hot start symbol
    target_seq = np.zeros((1, 1, tar_vocab_size))
    target_seq[0, 0, tar_to_index['\t']] = 1.

    decoded_sentence = ''
    while True:
        # verbose=0 keeps predict from printing a progress bar for every character
        output_tokens, state_h, state_c, _ = decoder_model.predict(
            [target_seq, encoder_output, state_h, state_c], verbose=0)

        # Greedy sampling. Index 0 is the padding slot and has no entry in
        # index_to_tar, so it is excluded from the argmax; otherwise a padding
        # prediction would raise KeyError on the lookup below.
        sampled_token_index = int(np.argmax(output_tokens[0, -1, 1:])) + 1
        sampled_char = index_to_tar[sampled_token_index]
        decoded_sentence += sampled_char

        # Stop on the end symbol or once the maximum length is exceeded
        if sampled_char == '\n' or len(decoded_sentence) > max_tar_len:
            break

        # Feed the sampled character back in as the next decoder input
        target_seq = np.zeros((1, 1, tar_vocab_size))
        target_seq[0, 0, sampled_token_index] = 1.

    return decoded_sentence

In [None]:
# Usage example: translate a handful of sentences and compare with the reference
for seq_index in (3, 50, 100, 300, 1001):  # row labels of the input sentences
  source_text = lines.src[seq_index]
  reference_text = lines.tar[seq_index]
  input_seq = encoder_input[seq_index:seq_index + 1]  # the slice keeps the leading batch axis
  decoded_sentence = decode_sequence(input_seq)
  print("-" * 35)
  print('입력 문장:', source_text)
  print('정답 문장:', reference_text[2:-1])  # drop the leading '\t ' and the trailing '\n'
  print('번역 문장:', decoded_sentence[1:-1])  # drop the first character and the trailing '\n'
