# 함수로 코드짜기

In [1]:
# 라이브러리
import os
import re
import glob
import numpy as np

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.callbacks import LearningRateScheduler
from sklearn.model_selection import train_test_split

In [2]:
# seed 고정 함수
def set_seed(seed):
    tf.random.set_seed(seed)
    np.random.seed(seed)

In [3]:
# os.getenv('HOME')+'/aiffel//lyricist/data/lyrics/*'
# 데이터 불러오기 함수
def data_load(file_path):
    """
    glob으로 받을 것이기 때문에 끝단에 디렉토리/* 로 설정
    """
    
    # 파일읠 경로를 받아서 glob으로 불러오기
    txt_file_path = file_path
    txt_list = glob.glob(txt_file_path)

    # 코퍼스를 저장할 리스트 생성
    raw_corpus = []

    # 여러개의 txt 파일을 모두 읽어서 raw_corpus 에 담습니다.
    # txt_list에는 text파일들의 개별 경로가 저장되어 있음
    for txt_file in txt_list:
        # 하나 씩 받아서 오픈
        with open(txt_file, "r") as f:
            # 텍스트 파일 전체를 읽어서 라인별로 저장
            raw = f.read().splitlines()
            # 코퍼스 리스트에 저장
            raw_corpus.extend(raw)

    print("데이터 크기:", len(raw_corpus))
    print("Examples:\n", raw_corpus[:3])
    return raw_corpus

In [4]:
# 데이터 전처리 함수
# raw_corpus를 받아서 전처리된 corpus를 내보내줌
def corpus_preprocessing(raw_corpus):
    '''
    raw_corpus를 넣어주면 전처리가 되어서 나온다
    '''
    # corpus 안에 문장을 전처리하는 함수
    def preprocess_sentence(sentence):
        sentence = sentence.lower().strip()       # 소문자로 바꾸고 양쪽 공백을 삭제
        
        # 아래 3단계를 거쳐 sentence는 스페이스 1개를 delimeter로 하는 소문자 단어 시퀀스로 바뀝니다.
        sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence)        # 패턴의 특수문자를 만나면 특수문자 양쪽에 공백을 추가
        sentence = re.sub(r'[" "]+', " ", sentence)                  # 공백 패턴을 만나면 스페이스 1개로 치환
        sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence)  # a-zA-Z?.!,¿ 패턴을 제외한 모든 문자(공백문자까지도)를 스페이스 1개로 치환

        sentence = sentence.strip() # 양 끝단 공백 삭제

        sentence = '<start> ' + sentence + ' <end>'      # 이전 스텝에서 본 것처럼 문장 앞뒤로 <start>와 <end>를 단어처럼 붙여 줍니다

        return sentence
    # 전처리 완료된 코퍼스를 받을 리스트 생성
    preprocessed_corpus = []
    # 코퍼스에서 문장을 하나씩 받아서
    for sentence in raw_corpus:
        # 문장이 없거나
        if len(sentence) == 0: 
            continue
        # 마지막이 : 로 끝나면 반복문을 넘어감
        if sentence[-1] == ":": 
            continue
        # 문장을 전처리함수에 넣고 전처리된 corpus에 추가
        preprocessed_corpus.append(preprocess_sentence(sentence))
    
    # 전처리 끝난 코퍼스 리턴
    return preprocessed_corpus

In [5]:
# 토크나이징 함수
def tokenize(corpus):
    # 텐서플로우에서 제공하는 Tokenizer 패키지를 생성
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=20000,  # 전체 단어의 개수 
        filters=' ',    # 별도로 전처리 로직을 추가할 수 있습니다. 이번에는 사용하지 않겠습니다.
        oov_token="<unk>"  # out-of-vocabulary, 사전에 없었던 단어는 어떤 토큰으로 대체할지
    )
    tokenizer.fit_on_texts(corpus)   # 우리가 구축한 corpus로부터 Tokenizer가 사전을 자동구축하게 됩니다.

    # 이후 tokenizer를 활용하여 모델에 입력할 데이터셋을 구축하게 됩니다.
    tensor = tokenizer.texts_to_sequences(corpus)   # tokenizer는 구축한 사전으로부터 corpus를 해석해 Tensor로 변환합니다.

    # 입력 데이터의 시퀀스 길이를 일정하게 맞추기 위한 padding  메소드를 제공합니다.
    # maxlen의 디폴트값은 None입니다. 이 경우 corpus의 가장 긴 문장을 기준으로 시퀀스 길이가 맞춰집니다.
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post', maxlen=14)  

    print(tensor,'\n',tokenizer)
    return tensor, tokenizer

In [6]:
# 데이터셋 분리 함수
def split_dataset(data, test_size):
    """
    학습을 위한 데이터셋 분리
    """
    # source와 target을 분리
    src_input = data[:, :-1]
    tgt_input = data[:, 1:]
    
    # sklearn train_test_split 함수를 써서 분리 (X_train, X_test, Y_train, Y_test)
    enc_train, enc_val, dec_train, dec_val = train_test_split(src_input, tgt_input, test_size=test_size, random_state=1234)
    
    # shape확인
    print(enc_train.shape, enc_val.shape, dec_train.shape, dec_val.shape)
    
    return enc_train, enc_val, dec_train, dec_val

In [7]:
# 하이퍼파라미터 셋팅 함수
def hyper_params(BUFFER_SIZE, BATCH_SIZE, steps_per_epoch, VOCAB_SIZE, embedding_size, hidden_size, learning_rate, patience):
    '''
    0 : BUFFER_SIZE,
    1 : BATCH_SIZE,
    2 : steps_per_epoch,
    3 : VOCAB_SIZE,
    4 : embedding_size,
    5 : hidden_size,
    6 : learning_rate,
    7 : patience
    '''
    # 하이퍼파라미터 셋팅
    BUFFER_SIZE = BUFFER_SIZE
    BATCH_SIZE = BATCH_SIZE
    steps_per_epoch = steps_per_epoch
    VOCAB_SIZE = VOCAB_SIZE
    embedding_size = embedding_size
    hidden_size = hidden_size
    
    return [BUFFER_SIZE, BATCH_SIZE, steps_per_epoch, VOCAB_SIZE, embedding_size, hidden_size, learning_rate, patience]

In [8]:
# tf data 만들기 함수
def make_tfdata(train, val, BUFFER_SIZE, BATCH_SIZE):
    '''
    train : (enc_train, dec_train)
    val : (enc_val, dec_val)
    '''
    # train set
    # from_tensor_slices로 tfdata를 만들고 shuffle로 섞고 batch로 batch_size 만큼 분리
    train_dataset = tf.data.Dataset.from_tensor_slices((train[0], train[1])).shuffle(BUFFER_SIZE)
    train_dataset = train_dataset.batch(BATCH_SIZE, drop_remainder=True)

    # validation set
    val_dataset = tf.data.Dataset.from_tensor_slices((val[0], val[1])).shuffle(BUFFER_SIZE)
    val_dataset = val_dataset.batch(BATCH_SIZE, drop_remainder=True)
    
    return train_dataset, val_dataset

In [9]:
# 모델
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super(TextGenerator, self).__init__()
        # 필요한 레이어를 원하는 만큼 만든다
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True, recurrent_initializer='glorot_uniform')
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True, recurrent_initializer='glorot_uniform')
        self.linear = tf.keras.layers.Dense(vocab_size)
        self.batchnorm_1 = tf.keras.layers.BatchNormalization()
        self.batchnorm_2 = tf.keras.layers.BatchNormalization()
        self.dropout = tf.keras.layers.Dropout(0.2)
        
    def call(self, x):
        # 그리고 쌓는다
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.batchnorm_1(out)
        out = self.rnn_2(out)
        out = self.batchnorm_2(out)
        out = self.dropout(out)
        out = self.linear(out)
        
        return out

In [10]:
# 에포크마다 학습률을 약 10%씩 감소시켜보자
def simple_learning_rate_decay(epoch, lr):
    return lr * np.exp(-0.1)

In [11]:
def train(train_dataset, val_dataset, hyper_params):
    # 콜백함수 등 선언
    # 러닝레이트 스케쥴러
    lr_scheduler = LearningRateScheduler(simple_learning_rate_decay)
    # 옵티마이저
    optimizer = tf.keras.optimizers.Adam(learning_rate=hyper_params[6])
    # 로스
    loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')
    # 얼리스탑
    early_stopping = keras.callbacks.EarlyStopping(monitor="val_loss", patience=hyper_params[7])
    
    model = TextGenerator(hyper_params[3], hyper_params[4] , hyper_params[5])
    model.compile(loss=loss, optimizer=optimizer)
    model.fit(train_dataset, epochs=30, validation_data=val_dataset, callbacks=[lr_scheduler, early_stopping])
    return model

In [12]:
# text생성 테스트
def generate_text(model, tokenizer, init_sentence="<start>", max_len=20):
    # 테스트를 위해서 입력받은 init_sentence도 일단 텐서로 변환합니다.
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    # 텍스트를 실제로 생성할때는 루프를 돌면서 단어 하나씩 생성해야 합니다. 
    while True:
        predict = model(test_tensor)  # 입력받은 문장의 텐서를 입력합니다. 
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1]   # 우리 모델이 예측한 마지막 단어가 바로 새롭게 생성한 단어가 됩니다. 

        # 우리 모델이 새롭게 예측한 단어를 입력 문장의 뒤에 붙여 줍니다. 
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)

        # 우리 모델이 <END>를 예측했거나, max_len에 도달하지 않았다면  while 루프를 또 돌면서 다음 단어를 예측해야 합니다.
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # 생성된 tensor 안에 있는 word index를 tokenizer.index_word 사전을 통해 실제 단어로 하나씩 변환합니다. 
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated   # 이것이 최종적으로 모델이 생성한 자연어 문장입니다.

In [13]:
# 데이터 불러오기부터 학습까지 통째로 함수로 만들기
def main():
    print("작사가 인공지능 만들기 시작")
    data_path = os.getenv('HOME')+'/aiffel//lyricist/data/lyrics/*'
    # 코퍼스 만들기
    raw_corpus = data_load(data_path)
    # 코퍼스 전처리
    preprocessed_corpus = corpus_preprocessing(raw_corpus)
    # data와 tokenizer 분리
    data, tokenizer = tokenize(preprocessed_corpus)
    # dataset분리
    enc_train, enc_val, dec_train, dec_val = split_dataset(data, 0.2)
    # 하이퍼파라미터 셋팅 shift + tab으로 인자보면서 넣어줌
    params = hyper_params(len(enc_train), 256, len(enc_train) // 256, tokenizer.num_words + 1, 512, 1024, 0.001, 7)
    # tf data만들기
    train_dataset, val_dataset = make_tfdata((enc_train, dec_train), (enc_val, dec_val), params[0], params[1])
    # model 학습
    print("... 모델 학습중")
    train_model = train(train_dataset,val_dataset, params)
    print("... 학습 끝")
    return train_model, tokenizer

In [14]:
set_seed(1234)
# 빠르게 오버피팅되는데 TextGenerator에서 모델 구조와 hyper_params 함수를 수정하고
# main() 을 실행하면 쉽게 재실험을 해볼 수 있음
train_model, tokenizer = main()

작사가 인공지능 만들기 시작
데이터 크기: 187088
Examples:
 ['Looking for some education', 'Made my way into the night', 'All that bullshit conversation']
[[   2  304   28 ...    0    0    0]
 [   2  221   13 ...    0    0    0]
 [   2   24   17 ...    0    0    0]
 ...
 [   2   48   16 ...    0    0    0]
 [   9 2883   14 ...  264   19    3]
 [   2    6  179 ...    0    0    0]] 
 <keras_preprocessing.text.Tokenizer object at 0x7f5ea261d390>
(140599, 13) (35150, 13) (140599, 13) (35150, 13)
... 모델 학습중
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
... 학습 끝


<__main__.TextGenerator at 0x7f5ea35cec10>

In [None]:
# 테스트
generate_text(train_model, tokenizer, init_sentence="<start> he")