In [1]:
import glob
import os
import re
import tensorflow as tf
from sklearn.model_selection import train_test_split

In [2]:
# 문장 전처리
# 1. 소문자로 변환, 양쪽 공백을 제거
# 2. 특수문자 양쪽에 공백 추가
# 3. 여러개의 공백은 하나의 공백으로 변환
# 4. a-zA-Z?.!,¿가 아닌 모든 문자를 하나의 공백으로 변환
# 5. 양쪽 공백 제거
# 6. 문장 시작에는 <start>, 끝에는 <end>를 추가
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip() # 1
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence) # 2
    sentence = re.sub(r'[" "]+', " ", sentence) # 3
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence) # 4
    sentence = sentence.strip() # 5
    sentence = '<start> ' + sentence + ' <end>' # 6
    return sentence

In [3]:
# 문장 토큰화
def tokenize(corpus):
    # 12000단어의 tokenizer 제작
    # 12000단어에 포함되지 못한 단어는 '<unk>'로 변환
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words = 17000, 
        filters = ' ',
        oov_token = "<unk>"
    )
    tokenizer.fit_on_texts(corpus)
    tensor = tokenizer.texts_to_sequences(corpus)   
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')    
    return tensor, tokenizer

In [4]:
# 모델 정의
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out

In [5]:
# 문장 생성
def generate_text(model, tokenizer, init_sentence="<start>", max_len=20):
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    # 1. 입력받은 문장의 텐서를 입력
    # 2. 예측된 값 중 가장 높은 확률인 word index를 추출
    # 3. 2에서 예측된 word index를 문장 뒤에 추가
    # 4. 모델이 <end>를 예측했거나, max_len에 도달했다면 문장 생성 종료
    while True:
        # 1
        predict = model(test_tensor) 
        # 2
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        # 3 
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        # 4
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # tokenizer를 이용해 word index를 단어로 하나씩 변환
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

In [6]:
# file load
txt_file_path = os.getenv('HOME')+'/aiffel/Exploration/4_LyricsMaking/lyricist/data/lyrics/*'
txt_list = glob.glob(txt_file_path)

In [7]:
# raw_corpus에 txt 파일 저장
raw_corpus = []
for txt_file in txt_list:
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

# print("데이터 크기:", len(raw_corpus))
# print("Examples:\n", raw_corpus[:3])

In [8]:
# 문장 전처리
corpus = []
for sentence in raw_corpus:
    if len(sentence)==0:
        continue
    # 토큰 갯수가 15개 초과되면 제외
    if len(set(sentence.split(' '))) > 15:
        continue
    preprocessed_sentence = preprocess_sentence(sentence)
    corpus.append(preprocessed_sentence)
        
# print(corpus[:10])
# print(len(corpus))

# 문장 토큰화
tensor, tokenizer = tokenize(corpus)
# print(tensor,tokenizer)
# print(tensor[:3, :10])
# for idx in tokenizer.index_word:
#     print(idx, ":", tokenizer.index_word[idx])
#     if idx >= 10: break

In [9]:
# input 생성
src_input = tensor[:, :-1]
tgt_input = tensor[:, 1:]    
# print(src_input[0])
# print(tgt_input[0])

In [10]:
# 데이터셋 객체 생성
BUFFER_SIZE = len(src_input)
BATCH_SIZE = 256
steps_per_epoch = BUFFER_SIZE // BATCH_SIZE
VOCAB_SIZE = tokenizer.num_words + 1 # tokenizer가 구축한 단어사전 12000개와, 여기 포함되지 않은 0:<pad>를 포함하여 12001개

dataset = tf.data.Dataset.from_tensor_slices((src_input, tgt_input))
dataset = dataset.shuffle(BUFFER_SIZE)
# dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
# dataset

In [11]:
# train set과 validation set 나누기
embedding_size = 256
hidden_size = 1024
model = TextGenerator(VOCAB_SIZE, embedding_size , hidden_size)
train_size = int(0.8 * len(dataset))
train_dataset = dataset.take(train_size).batch(BATCH_SIZE, drop_remainder=True)
val_dataset = dataset.skip(train_size).batch(BATCH_SIZE, drop_remainder=True)

In [13]:
# 학습
optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
model.compile(loss=loss, optimizer=optimizer)
model.fit(train_dataset, epochs=8, validation_data=val_dataset)
model.summary()

Epoch 1/8
Epoch 2/8
Epoch 3/8
Epoch 4/8
Epoch 5/8
Epoch 6/8
Epoch 7/8
Epoch 8/8
Model: "text_generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        multiple                  4352256   
_________________________________________________________________
lstm (LSTM)                  multiple                  5246976   
_________________________________________________________________
lstm_1 (LSTM)                multiple                  8392704   
_________________________________________________________________
dense (Dense)                multiple                  17426025  
Total params: 35,417,961
Trainable params: 35,417,961
Non-trainable params: 0
_________________________________________________________________


In [16]:
# 문장 생성
generate_text(model, tokenizer, init_sentence="<start> i love", max_len=20)

'<start> i love you so much , i love you so much , i love you so much , i '

# 회고
이번 프로젝트에서 어려웠던 점<br/>
우선 학습 시간이 너무 길어서 다양한 조작을 시도해보기 어려웠고, tf.data.Dataset을 처음 접해봐서 dataset을 편집하는데 많은 시간이 걸렸다.

프로젝트를 진행하면서 알아낸 점 혹은 아직 모호한 점<br/>
LSTM이 어떤 구조로 학습을 진행하는지 아직 이해를 못했다.

자기 다짐<br/>
LSTM의 내부 구조에 대해 좀 더 알아봐야겠다