# [ NEW ] 
# 작사가 모델 만들기
## Step 1. 데이터 읽어오기

In [1]:
import os, glob, re
import numpy as np
import tensorflow as tf

txt_file_path = os.getenv("HOME") + "/aiffel/lyricist/data/lyrics/*"
txt_list = glob.glob(txt_file_path)

# 원본 데이터를 raw_corpus 안에 담습니다
raw_corpus = []

for txt_file in txt_list:
    with open(txt_file, 'r') as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)
        
print("데이터크기 : ", len(raw_corpus))
print("Examples:\n", raw_corpus[:3])

데이터크기 :  187088
Examples:
 ['[Hook]', "I've been down so long, it look like up to me", 'They look up to me']


* 샘플 데이터 10줄 정도 읽어오기

In [2]:
for idx, sentence in enumerate(raw_corpus):
    if len(sentence) == 0: continue
    if sentence[-1] == ':': continue
        
    if idx > 9: break
    
    print(sentence)

[Hook]
I've been down so long, it look like up to me
They look up to me
I got fake people showin' fake love to me
Straight up to my face, straight up to my face
I've been down so long, it look like up to me
They look up to me
I got fake people showin' fake love to me
Straight up to my face, straight up to my face [Verse 1]
Somethin' ain't right when we talkin'


## Step 2. 데이터 정제
1. 처리하기 까다로운 특수문자, 대문자 등의 정보를 제거함
2. 문장 앞,뒤에 시작-종료 토큰을 추가함
3. 효율적인 학습을 위해 지나치게 긴 문장을 제거 -> 토큰 개수 15개 미만 문장만 사용

In [3]:
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip()
    sentence = re.sub(r"([?.!,¿])", r"\1", sentence)
    sentence = re.sub(r'[""]+', "", sentence)
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+"," ",sentence)
    sentence = sentence.strip()
    sentence = '<start> ' + sentence + ' <end>'
    return sentence

print(preprocess_sentence("This @_is ;;; sample   setnence"))

<start> this is sample setnence <end>


In [4]:
print(raw_corpus[0].split())
print(len(raw_corpus[1].split()))

['[Hook]']
11


In [5]:
corpus = []

for sentence in raw_corpus:
    if len(sentence) == 0: continue
    if len(sentence.split()) >= 15: continue
    if sentence[-1] == ":": continue
    if sentence == "[Hook]": continue
  
    preprocessed_sentence = preprocess_sentence(sentence)
    corpus.append(preprocessed_sentence)
    
print(corpus[:10])
print(len(corpus))

['<start> i ve been down so long, it look like up to me <end>', '<start> they look up to me <end>', '<start> i got fake people showin fake love to me <end>', '<start> straight up to my face, straight up to my face <end>', '<start> i ve been down so long, it look like up to me <end>', '<start> they look up to me <end>', '<start> i got fake people showin fake love to me <end>', '<start> straight up to my face, straight up to my face verse <end>', '<start> somethin ain t right when we talkin <end>', '<start> somethin ain t right when we talkin <end>']
166091


In [6]:
# count = 0
# for idx, sent in enumerate(raw_corpus):
#     if len(sent.split()) >= 15:
# #         print(idx,':',sent)
# #         print(idx, ":", len(sent.split()))
#         count += 1
#         if count > 50: break
# print(count)

In [7]:
def tokenize(corpus):
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
    num_words=12000,
    filters='',
    oov_token='<unk>'
    )
    
    tokenizer.fit_on_texts(corpus)
    tensor = tokenizer.texts_to_sequences(corpus)
    
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')
    
    print(tensor.shape,tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)


(166091, 30) <keras_preprocessing.text.Tokenizer object at 0x7fde5737f5d0>


In [8]:
for idx in tokenizer.index_word:
    print(idx, ":", tokenizer.index_word[idx])
    
    if idx >= 10: break
    

1 : <unk>
2 : <start>
3 : <end>
4 : i
5 : the
6 : you
7 : and
8 : a
9 : to
10 : it


## Step 3. 평가 데이터셋 분리
* 단어장 크기 12000 개 이상
* 총 데이터 20 % 사용

In [9]:
from sklearn.model_selection import train_test_split

src_input = tensor[:, :-1]

tgt_input = tensor[:, 1:]

#print(src_input[0])
#print(tgt_input[0])

enc_train, enc_val, dec_train, dec_val = train_test_split(src_input,
                                                          tgt_input, 
                                                          test_size=0.2,
                                                          random_state=42
                                                         )
#print(enc_train[0])
#print(dec_train[0])
print("Source Train:", enc_train.shape)
print("Target Train", dec_train.shape)

Source Train: (132872, 29)
Target Train (132872, 29)


## Step 4. 모델 빌드 & 학습하기
1. Hyperparameter : `Embedding Size`, `Hidden Size`
2. 10 epochs 안에 val_loss 2.2 미만 수준으로 학습

In [10]:
BUFFER_SIZE = len(enc_train)
BATCH_SIZE = 256
steps_per_epoch = len(enc_train) // BATCH_SIZE

VOCAB_SIZE = tokenizer.num_words + 1

dataset = tf.data.Dataset.from_tensor_slices((enc_train, dec_train))
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
dataset

<BatchDataset shapes: ((256, 29), (256, 29)), types: (tf.int32, tf.int32)>

In [11]:
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    
embedding_size = 19
hidden_size = 1024
lyricist = TextGenerator(tokenizer.num_words + 1, embedding_size, hidden_size)

In [12]:
optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, 
    reduction='none'
)

lyricist.compile(loss=loss, optimizer=optimizer)

# 학습 중 loss 값 비교를 위해 history 객체로 정보 저장
# validation 추가
history = lyricist.fit(dataset, 
                       epochs=10,
                       batch_size=256,
                       validation_data=(enc_val, dec_val),
                       verbose=1
                      )

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [13]:
import matplotlib.pyplot as plt

plot_curve(history.epoch, history.history, ['loss', 'val_loss'])

NameError: name 'plot_curve' is not defined

In [16]:
def generate_text(model, tokenizer, init_sentence='<start>', max_len=20):
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index['<end>']
    
    while True:
        predict = model(test_tensor)
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1]
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break
            
    generated = ''
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "
        
    return generated

In [18]:
generate_text(lyricist, tokenizer, init_sentence="<start> i need", max_len=15)

'<start> i need to sunroof top <end> '

## 회고
* 학습이 잘 됐다고 판단하는 근거가 필요함 -> 그리드 탐색 후 결과물 비교
* 