<a href="https://colab.research.google.com/github/MunkiPark/Deeplearning-Application/blob/main/seq2seq_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##Tatoeba Project의 프랑스어-영어 이중 언어 데이터셋을 이용하여 영어를 프랑스어로 번역하는 seq2seq 모델

입력 : 영어 문장<br>
출력 : 프랑스어 문장

##seq2seq 개요
- 기존의 RNN은 입력 시퀀스와 똑같은 길이의 시퀀스를 출력(입력token마다 출력 token을 출력)
- 언어 번역 시 입력과 출력의 길이가 달라지는 경우에는 사용하기 어렵다는 단점이 존재
- seq2seq는 입력 시퀀스를 통째로 입력하고 출력 시퀀스를 통째로 받아 해당 문제점을 해결

##seq2seq 구성
- 인코더와 디코더로 구성
- 인코더와 디코더 모두 RNN 계열(LSTM, GRU등)으로 구성
- 인코더 : 입력 시퀀스를 입력받아 context vector를 생성
 - context vector : 입력 시퀀스의 의미를 담은 인코더의 마지막 hidden state vector
-디코더 : context vector를 바탕으로 출력 시퀀스를 출력
 - h0로 context vector를 받고, t1에서의 input으로 start를 받아 EOS가 나올 때까지 y(t-1)을 입력으로 받으면서 시퀀스 출력

In [22]:
import nltk
import numpy as np
import re
import shutil
import tensorflow as tf
import os
import unicodedata
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.bleu_score import SmoothingFunction
num_sent_pairs = 30000 # 문장 개수

In [23]:
from google.colab import drive
drive.mount('/content/drive') #drive 연결

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [24]:
def clean_up_logs(data_dir):
    checkpoint_dir = os.path.join(data_dir, "checkpoints")
    if os.path.exists(checkpoint_dir):
        shutil.rmtree(checkpoint_dir, ignore_errors=True)
        os.makedirs(checkpoint_dir)
    return checkpoint_dir

data_dir = "./data"
checkpoint_dir = clean_up_logs(data_dir)

###전처리
- 데이터셋 : dataset 폴더 안의 fra.txt
- 인코더의 입력 : 영어 단어의 시퀀스
- 디코더의 입력 : 프랑스어 단어 집합
- 디코더의 출력 : 1칸씩 밀린 프랑스어 단어 시퀀스(EOS 포함)
- 입력 전처리
 - 문자는 모두 아스키화
 - 특정 문장 부호 분리
 - 알파벳, 특정 문장 보호 이외의 모든 문자 제거
 - 문장은 모두 소문자로 변화

In [25]:
def preprocess_sentence(sent):
  sent = "".join([c for c in unicodedata.normalize('NFD', sent) if unicodedata.category(c) != 'Mn']) # 영어 알파벳에 없는 문자들을 정규화(조합형 문자 분리 및 결합형 문자 제거)
  sent = re.sub(r"([!.?])", r" \1", sent) # 문장에서 [!.?]가 나오면 뒤에 공백 추가
  sent = re.sub(r"[^a-zA-Z!.?]+", r" ", sent) # !.?을 제외한 특수문자, 숫자들을 공백으로 변환(=삭제) -> 모델이 영어 알파벳과 !.?만 다루도록 전처리
  sent = re.sub(r"\s+", " ", sent) # 2칸 이상의 공백을 모두 1칸의 공백으로 변경"
  return sent.lower() #모든 대문자를 소문자로 변경하여 return

- txt파일 불러오기 및 문자 시퀀스로 변경


In [26]:
def download_and_read():
  en_sents, fr_sents_in, fr_sents_out = [], [], [] # en 입력 시퀀스, fr 입력 시퀀스, fr 출력 시퀀스 리스트 선언
  local_file = os.path.join('/',"content","drive","MyDrive","intern ai","dataset", "fra.txt") # local_file에 저장될 fra.txt의 경로를 저장
  with open(local_file, "r") as fin: #local_file를 open
    for i, line in enumerate(fin): # i : index, line : 영어-프랑스어 라인
      en_sent, fr_sent, _ = line.strip().split('\t') # 영어와 프랑스어 문장에서 앞 뒤의 공백들 제거한 후 tab을 기준으로 분리해서 각각 en_sent, fr_sent에 저장(문장 전체를 하나의 element로 취급하여 저장)
      en_sent = [w for w in preprocess_sentence(en_sent).split()] # 영어 문장을 위의 전처리 함수로 처리한 후 단어들로 분리하여 en_sent에 저장(단어 1개가 list의 element1개로 취급)
      fr_sent = preprocess_sentence(fr_sent) # 프랑스어 문장을 전처리 함수로 처리(여전히 문장 1개가 list의 element 1개)
      fr_sent_in = [w for w in ("BOS "+ fr_sent).split()] # 입력 문장은 앞에 BOS를 추가하여 단어별로 분리 -> BOS를 포함한 단어들로 list 구성
      fr_sent_out = [w for w in (fr_sent + " EOS").split()] # 출력 문장은 뒤에 EOS를 추가하여 단어별로 분리 _> EOS를 포함한 단어들로 list 구성
      en_sents.append(en_sent) # en_sents에 en_sent리스트를 element로 추가(2차원 list)
      fr_sents_in.append(fr_sent_in) #fr_snets_in에 fr_sent_in 리스트를 element로 추가
      fr_sents_out.append(fr_sent_out) #fr_snets_out에 fr_sent_out 리스트를 element로 추가
      if i >= num_sent_pairs -1: # index가 문장 개수를 넘어가면 for문 탈출
        break
  return en_sents, fr_sents_in, fr_sents_out# 생성한 2차원 리스트들을 리턴
sents_en, sents_fr_in, sents_fr_out = download_and_read() #위의 함수를 실행하여 실제 시퀀스 sents_en, setns_fr_in, sents_fr_out 생성

- 입력 토큰화 및 어휘 생성
 - 문장의 각 단어들에 숫자를 할당 -> 1차원 embading

In [27]:
tokenizer_en = tf.keras.preprocessing.text.Tokenizer(filters="", lower=False) # 영어 tokenizer 객체 생성
tokenizer_en.fit_on_texts(sents_en) # 만든 tokenizer 객체를 sents_en에 적용
data_en = tokenizer_en.texts_to_sequences(sents_en) # 문장의 임베딩 리스트를 data_en에 저장
data_en = tf.keras.preprocessing.sequence.pad_sequences(data_en, padding="post") # 모든 시퀀스들의 길이를 맞추기 위한 패딩 추가

tokenizer_fr = tf.keras.preprocessing.text.Tokenizer(filters="", lower=False)# 프랑스어 tokenizer 객체 생성
# 만든 tokenizer 객체를 입력, 출력 시퀀스에 각각 적용
tokenizer_fr.fit_on_texts(sents_fr_in)
tokenizer_fr.fit_on_texts(sents_fr_out)
# 문장에 임베딩을 적용하여 저장
data_fr_in = tokenizer_fr.texts_to_sequences(sents_fr_in)
data_fr_out = tokenizer_fr.texts_to_sequences(sents_fr_out)
# 모든 시퀀스들의 길이를 맞추기 위한 패딩 추가(패딩이 시퀀스 앞에 붙음)
data_fr_in = tf.keras.preprocessing.sequence.pad_sequences(data_fr_in, padding="post")
data_fr_out = tf.keras.preprocessing.sequence.pad_sequences(data_fr_out, padding="post")

# 인덱스 길이=단어 종류수 저장
vocab_size_en = len(tokenizer_en.word_index)
vocab_size_fr = len(tokenizer_fr.word_index)

# word와 index간의 변환을 위한 딕셔너리 생성
word2idx_en = tokenizer_en.word_index
idx2word_en = {v:k for k, v in word2idx_en.items()}
word2idx_fr = tokenizer_fr.word_index
idx2word_fr = {v:k for k, v in word2idx_fr.items()}
print("vocab size (en): {:d}, vocab size (fr): {:d}".format(vocab_size_en, vocab_size_fr))

# 영어, 프랑스어 각 문장들의 최대 길이
maxlen_en = data_en.shape[1]
maxlen_fr = data_fr_out.shape[1]
print("seqlen (en): {:d}, (fr): {:d}".format(maxlen_en, maxlen_fr))

vocab size (en): 4285, vocab size (fr): 7474
seqlen (en): 7, (fr): 16


- 데이터를 tensor로 변환한 후 training set과 test set으로 분류

In [28]:
batch_size = 64 #minibatch 크기
dataset = tf.data.Dataset.from_tensor_slices((data_en, data_fr_in, data_fr_out)) #data들을 slice한 후 tensor로 변환
# 하나의 element가 (en, fr_in, fr_out)으로 구성
# en, fr_in, fr_out은 모두 영단어 임베딩(숫자 1개로 구성된 벡터)으로 구성된 시퀀스(문장)

dataset = dataset.shuffle(10000) # dataset을 무작위로 shuffle
test_size = num_sent_pairs // 4 # test set의 크기 지정 : 전체 데이터의 1/4
test_dataset = dataset.take(test_size).batch(batch_size, drop_remainder=True) # dataset에서 test_size만큼의 tensor들을 가져와 batch_size만큼의 minibatch로 분할
train_dataset = dataset.skip(test_size).batch(batch_size, drop_remainder=True) #  dataset에서 test_size만큼 skip하여(test_size 이후의) tensor들을 가져와 batch_size만큼의 minibatch로 분할

- 인코더, 디코더 클래스 작성

In [29]:
# vocab_size : 시퀀스의 단어 종류 수
# num_timesteps : 시퀀스(문장)의 길이
# embedding_dim : 임베딩 레이어의 차원(단어 1개에 배정되는 값의 개수)
# encoder_dim : GRU 레이어의 state 차원
class Encoder(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, num_timesteps, encoder_dim, **kwargs):
    super(Encoder, self).__init__(**kwargs)
    self.encoder_dim = encoder_dim
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim, input_length=num_timesteps) # 임베딩 레이어 : 주어진 값들의 시퀀스를 embedding_dim 차원의 벡터 시퀀스로 변환
    self.rnn = tf.keras.layers.GRU(encoder_dim, return_sequences=False, return_state=True) # GRU 레이어 : encoder_dim 개수의 노드를 가진 레이어
  def call(self, x, state): # 실제 RNN 계산
    x = self.embedding(x)
    x, state = self.rnn(x, initial_state=state)
    return x, state
  def init_state(self, batch_size): # 모든 param을 0으로 초기화
    return tf.zeros((batch_size, self.encoder_dim)) # batch_size * encoder_dim 크기의 0으로 초기화된 tensor 리턴

class Decoder(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, num_timesteps, decoder_dim, **kwargs):
    super(Decoder, self).__init__(**kwargs)
    self.decoder_dim = decoder_dim
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim, input_length=num_timesteps)
    self.rnn = tf.keras.layers.GRU(decoder_dim, return_sequences=True, return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size) # RNN 노드에서의 예측 단어 확률을 출력
  def call(self, x, state):
    x = self.embedding(x)
    x, state = self.rnn(x, state)
    x = self.dense(x) # 입력 x : RNN 노드의 출력(벡터) / 출력 x : RNN 노드의 출력 벡터를 일반 레이어에 넣어서 뽑아낸 각 단어들의 예측 확률
    return x, state

embedding_dim = 256
encoder_dim, decoder_dim = 1024, 1024
encoder = Encoder(vocab_size_en+1, embedding_dim, maxlen_en, encoder_dim) # 인코더 객체 생성
decoder = Decoder(vocab_size_fr+1, embedding_dim, maxlen_fr, decoder_dim) # 디코더 객체 생성



- 인코더, 디코더의 입력, 출력의 형태

In [30]:
for encoder_in, decoder_in, decoder_out in train_dataset:
  encoder_state = encoder.init_state(batch_size) # 인코더의 초기 state 생성
  encoder_out, encoder_state = encoder(encoder_in, encoder_state)
  decoder_state = encoder_state
  decoder_pred, decoder_state = decoder(decoder_in, decoder_state)
  break
print("encoder input :", encoder_in.shape)
print("encoder output :", encoder_out.shape,"state  :",encoder_state.shape)
print("decoder output(logits) :", decoder_pred.shape, "state :", decoder_state.shape)
print("decoder output(labels) :", decoder_out.shape)

encoder input : (64, 7)
encoder output : (64, 1024) state  : (64, 1024)
decoder output(logits) : (64, 16, 7475) state : (64, 1024)
decoder output(labels) : (64, 16)


- cost func 정의

In [31]:
def loss_fn(ytrue, ypred): # ytrue : 실제값 / ypred : 예측값
  scce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) # 다중클래스 분류에서 사용되는 cost func 객체 생성/ from_logits=True : ypred가 softmax를 거치지 않은 logit이기 때문에 True로 설정
  mask = tf.math.logical_not(tf.math.equal(ytrue, 0)) # 패딩값을 무시하는 마스크를 생성
  mask = tf.cast(mask, dtype=tf.int64) # bool 타입인 마스크를 tf.int64 타입으로 변환 -> 손실 계산에서 가중치로 계산되어 마스크가 적용
  loss = scce(ytrue, ypred, sample_weight=mask) # cost func에 마스크를 적용하여 계산
  return loss

- training 함수 정의

In [32]:
@tf.function #tf의 그래프 실행을 활성화하는 decorator
def train_step(encoder_in, decoder_in, decoder_out, encoder_state):
  with tf.GradientTape() as tape: # 아래 block들에서의 미분을 자동으로 계산 및 저장
    decoder_state = encoder_state # decoder에 context vector 전달
    decoder_pred, decoder_state = decoder(decoder_in, decoder_state) # 디코더 실행
    loss = loss_fn(decoder_out, decoder_pred) # cost func 실행하여 loss 계산
  variables = (encoder.trainable_variables + decoder.trainable_variables) # encoder, decoder에서 훈련 가능한 param들을 리스트로 저장
  gradients = tape.gradient(loss, variables) # param들과 loss들로 gradient를 계산
  optimizer.apply_gradients(zip(gradients, variables)) # gradient를 이용하여 param들을 update
  return loss

- 실제 예측을 실행하는 함수 정의
 - dataset에서 무작위로 영어 문장을 샘플링
 - 참조를 위하여 대응되는 프랑스어 문장의 label도 표시

In [33]:
def predict(encoder, decoder, batch_size, sents_en, data_en, sents_fr_out, word2idx_fr, idx2word_fr):
  random_id = np.random.choice(len(sents_en)) # 무작위 영어 문장의 인덱스로 사용할 숫자 1개를 sampling
  print("input : ", " ".join(sents_en[random_id])) # 숫자를 index num으로 사용하는 영어 문장 출력
  print("label : ", " ".join(sents_fr_out[random_id])) # 해당 영어 문장에 대응되는 정답 프랑스어 문장 출력
  encoder_in = tf.expand_dims(data_en[random_id], axis=0) # 해당하는 data_en의 시퀀스를 batch dim을 추가하여 2D tensor로 변환 -> 형태 : [1, sequence_length]
  decoder_out = tf.expand_dims(sents_fr_out[random_id], axis=0) # 해당하는 정답 프랑스어 문장의 시퀀스를 위와 똑같이 처리 = 2D tensor로 변환
  encoder_state = encoder.init_state(1) # 인코더의 초기 상태 설정(문장 1개만 넣을 것이므로 batch_size=1)
  encoder_out, encoder_state = encoder(encoder_in, encoder_state) # 인코더 실행
  decoder_state = encoder_state # context vector 전달
  decoder_in = tf.expand_dims(tf.constant([word2idx_fr["BOS"]]), axis=0) # 디코더에 입력할 BOS를 tensor로 변환
  pred_sent_fr = [] # 디코더의 출력(=예측 프랑스어 시퀀스)를 저장할 리스트
  while True:
    decoder_pred, decoder_state = decoder(decoder_in, decoder_state) # 디코더 실행
    decoder_pred = tf.argmax(decoder_pred, axis=-1) # 위에서 얻은 출력을 argmax에 넣어 가장 확률이 높은 단어 하나 선택(단, index가 들어감)
    pred_word = idx2word_fr[decoder_pred.numpy()[0][0]] # 선택한 index에 해당하는 단어 저장
    pred_sent_fr.append(pred_word) # 예측한 단어를 리스트에 추가
    if pred_word == "EOS": # 디코더 종료 조건 : 결과값이 EOS
      break
    decoder_in = decoder_pred # t-1에서의 결과를 t에서의 입력으로 저장
  print("predicted : ", " ".join(pred_sent_fr)) # 예측 문장 출력

- 예측된 문장을 평가하는 함수 정의
 - 정답과 예측값을 BLEU 점수로 계산
 - 정답과 예측값에서 최대 4-gram에서의 유사도를 평가

In [34]:
def evaluate_bleu_score(encoder, decoder, test_dataset, word2idx_fr, idx2word_fr):
  bleu_scores = [] # 점수들을 저장할 리스트
  smooth_fn = SmoothingFunction() # bleu 점수 계산에 사용되는 smoothing func 선택
  for encoder_in, decoder_in, decoder_out in test_dataset: # test_dataset(시퀀스 30000개)만큼 반복
    encoder_state = encoder.init_state(batch_size) # 인코더 param 초기화(batch_size만큼)
    encoder_out, encoder_state = encoder(encoder_in, encoder_state) # 인코더 실행
    decoder_state = encoder_state # context vector 전달
    decoder_pred, decoder_state = decoder(decoder_in, decoder_state) # 디코더 실행 -> 각 단어들의 확률 분포가 pred에 저장
    decoder_out = decoder_out.numpy() # tensor로 되어있는 확률 분포를 numpy array로 변환
    decoder_pred = tf.argmax(decoder_pred, axis=-1).numpy() # 가장 확률이 높은 단어를 선택하여 numpy array로 변환
    for i in range(decoder_out.shape[0]): # batch_size(시퀀스 64개)만큼 반복
      ref_sent = [idx2word_fr[j] for j in decoder_out[i, :].tolist() if j > 0] # 정답 단어 인덱스값을 실제 단어로 변환하여 실제 문장을 리스트 형태로 저장(패딩 제외)
      hyp_sent = [idx2word_fr[j] for j in decoder_pred[i, :].tolist() if j > 0] # 예측값 단어 인덱스값을 실제 단어로 변환하여 실제 문장을 리스트 형태로 저장(패딩 제외)
      ref_sent = ref_sent[0:-1] # 마지막의 EOS 제거
      bleu_score = sentence_bleu([ref_sent], hyp_sent, smoothing_function=smooth_fn.method1) # blue 점수 계산
      bleu_scores.append(bleu_score) # minibatch 안의 시퀀스들의 점수를 리스트에 저장
  return np.mean(np.array(bleu_scores)) # 전체 test_dataset의 모든 시퀀스들의 점수들의 평균을 계산

- 훈련 및 예측, 평가 실행(main 함수)

In [35]:
optimizer = tf.keras.optimizers.Adam()
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(optimizer=optimizer, encoder=encoder, decoder=decoder)
num_epoch = 250
eval_scores= []

for e in range(num_epoch):
    encoder_state = encoder.init_state(batch_size)

    for batch, data in enumerate(train_dataset):
        encoder_in, decoder_in, decoder_out = data
        # print(encoder_in.shape, decoder_in.shape, decoder_out.shape)
        loss = train_step(
            encoder_in, decoder_in, decoder_out, encoder_state)

    print("Epoch: {}, Loss: {:.4f}".format(e + 1, loss.numpy()))

    if e % 10 == 0:
        checkpoint.save(file_prefix=checkpoint_prefix)

    predict(encoder, decoder, batch_size, sents_en, data_en,
        sents_fr_out, word2idx_fr, idx2word_fr)

    eval_score = evaluate_bleu_score(encoder, decoder, test_dataset, word2idx_fr, idx2word_fr)
    print("Eval Score (BLEU): {:.3e}".format(eval_score))
    # eval_scores.append(eval_score)

checkpoint.save(file_prefix=checkpoint_prefix)




Epoch: 1, Loss: 1.3132
input :  i like sauerkraut .
label :  j aime la choucroute . EOS
predicted :  je suis un peu . EOS
Eval Score (BLEU): 1.530e-02
Epoch: 2, Loss: 1.1809
input :  i m working .
label :  je suis en train de travailler . EOS
predicted :  je ne suis pas si . EOS
Eval Score (BLEU): 1.773e-02


KeyboardInterrupt: 