In [1]:
# google colab전용
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# exploration 6번째 과제</br>
@ 황한용(3기/쏘카)

## 라이브러리 선언

In [2]:
import glob
import os
import re
from typing import List, Tuple, Union

import tensorflow as tf
from pathlib import Path
from sklearn.model_selection import train_test_split

## 상수선언

In [3]:
DATA_PATH = "/content/drive/MyDrive/Colab Notebooks/data/lyrics/*" # 데이터 기본경로

TOKENIZER_NUM_WORDS = 12000 # tokenizer 사전단어의 갯수. 12000개의 단어를 사전으로 사용
MAX_WORD_OF_SENTENCE = 17 # <start>, <end> 를 포함한 한 문장 당 최대 단어의 갯수
BATCH_SIZE = 256 # 학습시의 batch사이즈
EMBEDDING_SIZE = 512 # 워드 벡터의 차원수
HIDDEN_SIZE = 1024 # hidden layer수
RUNNING_RATE = 0.005
BASE_SENTENCE_DICT = { # 문장의 기본 dictonary
    "start":"<start>"
    ,"end":"<end>"
    ,"unknown":"<unk>"
    ,"sep":" "
}
TRAIN_TEST_SPLIT_KWARGS = {
    "test_size":0.2, "random_state":2022
}
fit_kwargs = {
    "epochs":10 # epoch 횟수
    ,"validation_data": None # 추후 추가예정
    , "validation_freq":2 # 검증빈도
    , "shuffle" : True #epoch당 셔플을 할지의 여부
}

## 함수/class선언

In [4]:
class TextGenerator(tf.keras.Model):
    """
    문장 학습모델

    Attributes
    ----------
    None

    Methods
    -------
        __init__(self, vocab_size, embedding_size, hidden_size)
            사전의 단어수, 단어 임베이딩 수, hidden layer의 노드 수로
            모델을 구성한다.
        call(self, x)
            모델의 포워드 패스 과정을 구현한 함수
    """

    def __init__(self, vocab_size:int, embedding_size:int, hidden_size:int) -> tf.keras.Model:
        """
        생성자
        사전의 단어수, 단어 임베이딩 수, hidden layer의 노드 수로
        모델을 구성한다.

        Parameters
        ----------
        self : self
          self
        vocab_size : int
          사전의 단어수
        embedding_size: int
          단어 임베이딩 수
        hidden_size: int
          hidden layer의 노드 수

        Returns
        -------
        None
        """
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size) 
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True, dropout=0.3)  
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True, dropout=0.3)
        # self.rnn_3 = tf.keras.layers.LSTM(hidden_size, return_sequences=True, dropout=0.4)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x) -> Union[List, List[List]]:
        """
        모델의 포워드 패스 과정을 구현한 함수

        Parameters
        ----------
        self : self
            self
        x : array like
            모델에 연산을 수행할 tensor 데이터

        Returns
        -------
        out : array like or list of array like
            단일 혹은 리스트 형태의 tensor 데이터
        """
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        # out = self.rnn_3(out)
        out = self.linear(out)
        
        return out

def preprocess_sentence(sentence:str) -> str:
    """
    입력된 문장을 다음과 같은 전처리를 진행한다
    1. 소문자로 바꾸고, 양쪽 공백을 지움
    2. 특수문자 양쪽에 padding
    3. 여러개의 공백은 하나의 `BASE_SENTENCE_DICT["sep"]`으로 변환
    4. a-zA-Z?.!,¿가 아닌 모든 문자를 하나의 `BASE_SENTENCE_DICT["sep"]`으로 변환
    5. 2 ~ 4과정서 생긴 양쪽 공백을 지움
    6. `BASE_SENTENCE_DICT["sep"]` 구분자로
       문장 시작에는 `BASE_SENTENCE_DICT["start"]`,
       끝에는 `BASE_SENTENCE_DICT["end"]`를 추가
  
    Parameters
    ----------
    sentence : str
        전처리를 진행할 문장

    Returns
    -------
    sentence : str
        전처리가 완료된 문장
    """
    sentence = sentence.lower().strip() # 1
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence) # 2
    sentence = re.sub(r'[" "]+', BASE_SENTENCE_DICT["sep"], sentence) # 3
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", BASE_SENTENCE_DICT["sep"], sentence) # 4
    sentence = sentence.strip() # 5
    sentence = BASE_SENTENCE_DICT["sep"].join([BASE_SENTENCE_DICT["start"], sentence, BASE_SENTENCE_DICT["end"]]) # 6

    return sentence

def tokenize(corpus:List[List[str]], num_words:int, filters:str=BASE_SENTENCE_DICT["sep"]):
    """
    tensor flow`의 `Tokenizer`와 `pad_sequences`를 사용
    다음과 같은 전처리를 진행한다
        1. 다음과 같은 tokenize를 하는 tokenizer 생성
        - `num_words`갯수만큼의 단어를 사전화
            초과하는 단어에 대해서는 `BASE_SENTENCE_DICT["unknown"]`(unknown)취급
        - 단어의 구분은 `filters`의 규칙대로 생성
        2. 문자 데이터를 입력받아 tokenizer의 규칙대로 사전을 내부적으로 생성
        3. 사전을 기반으로 텍스트 안의 단어들을 숫자의 시퀀스 형태로 변환
        4. 문장 뒤에 padding을 붙여 입력 데이터의 시퀀스 길이를 일정하게 맞춤
  
    Parameters
    ----------
    sentence : str
        전처리를 진행할 문장
    num_words : int
    tokenizer 사전단어의 갯수
    filters : str, default = `BASE_SENTENCE_DICT["sep"]`
        단어를 구분지을 규칙

    Returns
    -------
    tensor : str
        숫자의 시퀀스 형태의 tensor 데이터
    tokenizer : str
        tokenizer
  """
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=num_words
        , filters=filters
        , oov_token=BASE_SENTENCE_DICT["unknown"]
    ) # 1
    tokenizer.fit_on_texts(corpus) # 2
    tensor = tokenizer.texts_to_sequences(corpus) # 3
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post') # 4
    
    print(type(tensor))
    print(type(tokenizer))

    return tensor, tokenizer

def generate_text(model:TextGenerator, tokenizer, init_sentence:str=BASE_SENTENCE_DICT["start"], max_len:int=20):
    """
    모델에게 시작 문장을 전달하면 모델이 시작 문장을 바탕으로 작문을 진행
    다음과 같은 과정으로 문장을 생성한다
        1. `init_sentence`의 단어들을 숫자의 시퀀스의 형태로 변환 후 tensor 데이터로 변경
        2. 루프를 돌면서 다음의 문장 시행
            - 입력받은 문장의 tensor 데이터를 `model`에 입력
            - 예측된 값 중 가장 높은 확률인 word index를 탐색
            - 전 과정서 예측된 word index를 문장 뒤에 붙입니다
            - 모델이 문장의 끝을 예측했거나, `max_len`에 도달했다면 문장 생성을 종료
        3. `tokenizer`를 이용해 word index를 단어로 변환 

    Parameters
    ----------
    sentence : str
        전처리를 진행할 문장
    tokenizer : tokenizer
        tokenizer

    Returns
    -------
    str
        완성된 문장
    """

    # 1    
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)

    # 2
    end_token = tokenizer.word_index[BASE_SENTENCE_DICT["end"]]
    while True:
        predict = model(test_tensor)
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1]
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1) 
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    return BASE_SENTENCE_DICT["sep"].join([tokenizer.index_word[word_index] for word_index in test_tensor[0].numpy()]) # 3


모델클레스 설계시 기본 입력데이터의 0.3정도를 드랍함으로써 과학습을 방지하였다.</br>
실험결과 레이어를 늘려 학습시키는 것은 별반 도움이 되지 않았다. 

## 메인

In [5]:
raw_corpus = [] 
for txt_file in glob.glob(DATA_PATH):
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:3])

데이터 크기: 187088
Examples:
 ['I. LIFE.', '', '']


여러개의 txt 파일을 모두 읽어서 `raw_corpus` 에 문자열단위로 저장

In [6]:
corpus = []
for sentence in raw_corpus:
    
    # 공백을 제외하고 문자열 길이가 0일때
    if len(sentence.strip()) == 0: continue
    # 등장인물의 이름을 제외할때
    if sentence[-1] == ":": continue
    # 언어 전처리
    prep_sentence = preprocess_sentence(sentence)
    # new! <start>, <end>을 포함한 토큰의 길이가 17를 넘을 때
    if len(prep_sentence.split(" ")) > MAX_WORD_OF_SENTENCE: continue
    corpus.append(prep_sentence)

print("데이터 크기:", len(corpus))
print("Examples:\n", corpus[:3])

데이터 크기: 163462
Examples:
 ['<start> i . life . <end>', '<start> i . <end>', '<start> success . <end>']


raw_corpus list에 저장된 문장들을 순서대로 전처리하여 sentence에 저장
다음의 문장을 순서대로 제외한다.
 - 공백을 제외하고 문자열 길이가 0일때
 - 각본의 등장인물의 이름을 제외
 - `preprocess_sentence`함수로 전처리가 끝난 문장의 길이가 `<start>`, `<end>`를 포함하여 17이 넘을 때<br>
  ※ 15단어의 기준을 처음과 끝을 제외한 15단어라고 해석<br>
     총 길이가 17미만인 문장만 학습

그 후 총 학습할 문장의 수와 예시문장 출력

In [7]:
tensor, tokenizer = tokenize(corpus, TOKENIZER_NUM_WORDS)

src_input = tensor[:, :-1]
tgt_input = tensor[:, 1:]

vocab_size = tokenizer.num_words + 1

<class 'numpy.ndarray'>
<class 'keras_preprocessing.text.Tokenizer'>


`src_input(data)`: `<start>`를 제외한 문장을 학습데이터로 생성</br>
`tgt_input(target)`: 마지막 토큰을 제외한 문장을 타겟데이터로 생성</br>

In [8]:
enc_train, enc_val, dec_train, dec_val = train_test_split(src_input, tgt_input, **TRAIN_TEST_SPLIT_KWARGS)
dataset = tf.data.Dataset.from_tensor_slices((enc_train, dec_train)).shuffle(len(src_input)).batch(BATCH_SIZE, drop_remainder=True)
fit_kwargs["validation_data"] = (enc_val, dec_val)

학습과 검증은 각각 2:8비율로 나누었으며,</br>
완벽한 shuffle을 위해 전체 학습문장의 수만큼을 버퍼사이즈로 설정하고 섞었다.</br> 
한번당 학습할 횟수(`BATCH_SIZE`)를 256으로 설정하였다.</br>

In [9]:
model = TextGenerator(tokenizer.num_words + 1, EMBEDDING_SIZE , HIDDEN_SIZE)

model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, # 모델의 계산식에 nomalize하는 부분이 없으므로 `True`로 설정
        reduction=tf.keras.losses.Reduction.NONE  # 출력해서 나오는 값을 모두 원하므로 `reduction`은 설정하지 않음
    )
    , optimizer=tf.keras.optimizers.Adam(
        learning_rate=RUNNING_RATE # running rate
    ) # optimizer로 Adam방식. NPL일시 Adam을 사용하는게 일반적이다.
)
model.fit(
    dataset
    , **fit_kwargs
)
model.summary()

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Model: "text_generator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  6144512   
                                                                 
 lstm (LSTM)                 multiple                  6295552   
                                                                 
 lstm_1 (LSTM)               multiple                  8392704   
                                                                 
 dense (Dense)               multiple                  12301025  
                                                                 
Total params: 33,133,793
Trainable params: 33,133,793
Non-trainable params: 0
_________________________________________________________________


`vocab_size`는 전체 `단어 사이즈 + 공백`으로 설정</br>
`optimzier`는 `NPL`에서 가장많이 사용하는 방식이 `Adam` 이므로 Adam으로 설정하였다.</br>
학습시 `epoch`당 단어를 셔플함으로써 학습에 오버피팅을 방지하였다.</br>
`running rate`는 0.005로 설정하여 오히려 학습률을 올렸다.(기본은 0.001)</br>

In [10]:
generate_text(model, tokenizer, init_sentence="<start> i love", max_len=20)

'<start> i love you so much <end>'

학습한 데이터를 출력한 결과 해석하는데 지장이 없는 문장이 완성되었다.</br>
대체로 학습률이 낮을수록 문장이 해석이 안되는 현상이 보였지만 여기에서는 따로 언급을 안하겠다.</br>

## 회고

1. `running rate`, `batch size`, `dropout` 등 많은 부분에서</br>
 학습에 관해 조정이 가능한 부분이 있었지만</br>
`epoch`이 10으로 제한되어있어 많은 어려움이 있었다.
2. layer의 `dropout`, 학습의 `shuffle`이 오버피팅을 방지하는데 큰 도움이 되었다.
3. `epoch`이 10인 관계로 validation loss가 2.2이상 2.3이하로 측정되는 케이스가 많았다.</br> 이는 학습률을 올리면서 깔끔하게 해결되었다.