# lyricist

## Read data

In [1]:
import glob, os, re
import tensorflow as tf
from sklearn.model_selection import train_test_split

#################
### Read data ###
#################
txt_file_path = r"/content/drive/MyDrive/AIFFEL/Exploration/lyricist/data/lyrics/*"

txt_list = glob.glob(txt_file_path)

raw_corpus = []

# 여러개의 txt 파일을 모두 읽어서 raw_corpus 에 담습니다.
for txt_file in txt_list:
    with open(txt_file, "r") as f: # 읽기전용으로 file을 불러온다. file as f
        raw = f.read().splitlines() # file을 한줄씩 읽어오는데 .splitlines()로 종료문자 \n을 포함하지 않음.
        raw_corpus.extend(raw)

print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:3]) # 0~3 인덱스까지

데이터 크기: 187088
Examples:
 ['Well, summer slipped us underneath her tongue', 'Our days and nights are perfumed with obsession', 'Half of my wardrobe is on your bedroom floor']


## Data cleansing

In [2]:
################################################
### Data cleansing using regular expressions ###
################################################
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip() # 소문자로 바꾸고(.lower()), 양쪽 공백 제거(.strip())
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence) # 특수문자 양쪽에 공백 삽입
    sentence = re.sub(r'[" "]+', " ", sentence) # 여러공백은 하나의 공백으로
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence) # a-zA-Z?.!,¿ 가 아닌 모든문자를 하나의 공백으로 바꾼다
    sentence = sentence.strip() # 다시 양쪽공백지움
    sentence = '<start> ' + sentence + ' <end>' # 문장 시작에는 <start>추가, 끝에는 <end>추가
    return sentence

print(preprocess_sentence("This @_is ;;;sample        sentence."))

corpus = []   #빈 리스트 생성

for sentence in raw_corpus:   #raw_corpus: 한 줄 단위로 저장된 배열
    if len(sentence) == 0: continue   #한 글자도 없으면 continue
    if len(sentence.split()) > 15: continue   # 토큰의 개수가 15개를 넘어가는 문장 제외
    if sentence[-1] == ":": continue   #마지막 글자가 ":"이면 continue
    
    preprocessed_sentence = preprocess_sentence(sentence)   #정규식에 따라 문자열 변환
    corpus.append(preprocessed_sentence)   #리스트에 추가하기
        
print(corpus[:10])   #0 ~ 9의 요소

<start> this is sample sentence . <end>
['<start> well , summer slipped us underneath her tongue <end>', '<start> our days and nights are perfumed with obsession <end>', '<start> half of my wardrobe is on your bedroom floor <end>', '<start> use our eyes , throw our hands overboard i am your sweetheart psychopathic crush <end>', '<start> drink up your movements , still i can t get enough <end>', '<start> i overthink your p punctuation use <end>', '<start> not my fault , just a thing that my mind do a rush at the beginning <end>', '<start> i get caught up , just for a minute <end>', '<start> but lover , you re the one to blame , all that you re doing <end>', '<start> can you hear the violence ? <end>']


## Tokenize

In [3]:
################
### Tokenize ###
################
def tokenize(corpus):
    # 토큰화 시 텐서플로우의 Tokenizer와 pad_sequences 사용
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=12000, # 12000단어를 기억할 수 있는 tokenizer를 만들기 (각 단어에 인덱스를 부여)
        filters=' ', # filter == empty, (default값: '!"#$%&()*+,-./:;<=>?@[\]^_`{|}~\t\n')
        oov_token="<unk>" # 12000단어에 포함되지 못하면 <unk>로 변환
    )
    tokenizer.fit_on_texts(corpus) # 문자데이터를 입력받아서 리스트의 형태로 변환
    tensor = tokenizer.texts_to_sequences(corpus) # 텍스트 안의 단어들을 숫자의 시퀀스형태(tensor)로 변환  

    # 입력 데이터의 시퀀스 길이를 일정하게 맞추되, 시퀀스가 짧으면 문장 뒤에 패딩 <pad> 붙여서 길이 맞추기
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')  # 0을 이용하여 같은 길이의 시퀀스로 변환
    # 만약 문장 앞에 패딩을 붙여 길이를 맞추고 싶다면 padding='pre'사용
    
    print('tensor :', '\n', tensor, '\n', 'tokenizer :', '\n', tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)

tensor : 
 [[  2 142   4 ...   0   0   0]
 [  2 153 365 ...   0   0   0]
 [  2 540  19 ...   0   0   0]
 ...
 [  2   3   0 ...   0   0   0]
 [  2   3   0 ...   0   0   0]
 [  2   3   0 ...   0   0   0]] 
 tokenizer : 
 <keras_preprocessing.text.Tokenizer object at 0x7f7bca0ea750>


In [4]:
print(tensor[:3, :10])

for idx in tokenizer.index_word:
    print(idx, ":", tokenizer.index_word[idx]) # tokenizer변수에 저장되어 있는 단어 사전의 인덱스

    if idx >= 10: break # word_index가 10일 때 까지 출력하기

[[   2  142    4  557 3121  126 1217   69  957    3]
 [   2  153  365    8  833   77 9158   31 9159    3]
 [   2  540   19   13 5081   26   18   21 1454  357]]
1 : <unk>
2 : <start>
3 : <end>
4 : ,
5 : i
6 : the
7 : you
8 : and
9 : a
10 : to


## Data splitting - train/test

In [5]:
src_input = tensor[:, :-1]  # tensor 의 모든 행에서 마지막열을 제외한 src_input생성
tgt_input = tensor[:, 1:] # tensor의 모든 행에서 첫번째열(start)을 제외한 tgt_input생성

print(src_input[0])
print(tgt_input[0])

[   2  142    4  557 3121  126 1217   69  957    3    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0]
[ 142    4  557 3121  126 1217   69  957    3    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0]


In [6]:
# 훈련 데이터와 평가 데이터 분리, 총 데이터의 20% 를 평가 데이터셋
enc_train, enc_test, dec_train, dec_test = train_test_split(src_input, tgt_input, test_size=0.2, ram)
print(f"enc_train: {enc_train.shape}")
print(f"enc_test: {enc_test.shape}")
print(f"dec_train: {dec_train.shape}")
print(f"dec_test: {dec_test.shape}")

enc_train: (134685, 32)
enc_test: (33672, 32)
dec_train: (134685, 32)
dec_test: (33672, 32)


In [7]:
BUFFER_SIZE = len(src_input) #입력 문장 수
BATCH_SIZE = 256
steps_per_epoch = len(src_input) // BATCH_SIZE #입력 문장 수를 배치사이즈로 에포크 시행마다 나누어 훈련
VOCAB_SIZE = tokenizer.num_words + 1   # num_words + 0:<pad>를 포함


dataset = tf.data.Dataset.from_tensor_slices((src_input, tgt_input)) # tensor --> tf.data.Dataset으로 변환
dataset = dataset.shuffle(BUFFER_SIZE) # 완벽한 셔플링을 위해서는 데이터셋의 전체 크기보다 크거나 같은 Buffersize필요
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True) # 256개씩 묶고 나머지 제거 drop_remainder = True 요소개수가 부족한 마지막 배치 삭제
dataset

<BatchDataset shapes: ((256, 32), (256, 32)), types: (tf.int32, tf.int32)>

## Train model

In [8]:
###################
### Train model ###
###################
"""
자연어 처리에서 특징 추출을 통해 수치화를 해줘야 하는데
이때 사용하는 것이 "언어의 벡터화"이다.
이런 벡터화의 과정을 Word Embedding


RNN의 일종인 Long Short-Term Memory models(LSTM)
 Long Short Term Memory의 줄임말로 주로 시계열 처리나 자연어 처리(현재는 잘 사용 안 하지만)를 사용하는 데 사용한다
"""
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size): 
        super().__init__()
        # embedding Layer
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size) 
        # 인덱스 값을 해당 인덱스 번째의 워드벡터로 바꿔준다. embedding_size : 단어가 추상적으로 표현되는 크기
        # 2개의 LSTM Layer
        # #return_sequence:불리언. 아웃풋 시퀀스의 마지막 아웃풋을 반환할지, 혹은 시퀀스 전체를 반환할지 여부.
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)

        # 1개의 Dense Layer
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    
embedding_size = 256
hidden_size = 1024
model = TextGenerator(tokenizer.num_words + 1, embedding_size , hidden_size)

In [9]:
#모델의 input_size 설정을 위한 데이터 일부분 입력
for src_sample, tgt_sample in dataset.take(1): break   #dataset.take(n) n번 불러옴
model(src_sample)   #모델에 소스 데이터를 넣어준다.

<tf.Tensor: shape=(256, 32, 12001), dtype=float32, numpy=
array([[[-1.14694973e-04, -1.58335053e-04, -2.34046063e-04, ...,
         -1.45526850e-04, -1.23740356e-05,  8.73077806e-05],
        [ 1.45598708e-04, -2.77503947e-04, -3.33121599e-04, ...,
         -4.43905214e-04, -1.25581108e-04,  2.24001531e-04],
        [ 2.05124961e-04, -4.47647530e-04, -1.98332520e-04, ...,
         -6.65084983e-04,  2.33842802e-05,  1.77574897e-04],
        ...,
        [ 1.11394773e-04, -1.91318046e-03,  2.41728779e-03, ...,
         -5.67316497e-03, -1.18044112e-03,  1.71276450e-04],
        [ 1.51501445e-04, -1.92936638e-03,  2.43444368e-03, ...,
         -5.70128625e-03, -1.18233426e-03,  1.81382158e-04],
        [ 1.87054553e-04, -1.94308278e-03,  2.44889292e-03, ...,
         -5.72412508e-03, -1.18478888e-03,  1.90147257e-04]],

       [[-1.14694973e-04, -1.58335053e-04, -2.34046063e-04, ...,
         -1.45526850e-04, -1.23740356e-05,  8.73077806e-05],
        [-1.16429495e-04, -1.91800485e-04, -1

In [10]:
model.summary()

Model: "text_generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        multiple                  3072256   
_________________________________________________________________
lstm (LSTM)                  multiple                  5246976   
_________________________________________________________________
lstm_1 (LSTM)                multiple                  8392704   
_________________________________________________________________
dense (Dense)                multiple                  12301025  
Total params: 29,012,961
Trainable params: 29,012,961
Non-trainable params: 0
_________________________________________________________________


In [11]:
optimizer = tf.keras.optimizers.Adam() # 어떤 최적화 방법을 사용해서 loss function 값을 줄여 나갈 것인지, 여기서는 Adam 사용
loss = tf.keras.losses.SparseCategoricalCrossentropy( # 다중분류 손실함수로, 여기서 사용한 방법의 경우..
    # sparse_categorical_crossentropy: 입력되는 출력 실측값을 그대로 사용 (지금과 같이 흔히 데이터가 각 클래스에 명확히 분류되는 경우)
    # categorical_crossentropy: one-hot vector형태로 입력 됨 (확률적인 개념이 추가되어, 하나의 데잍터가 여러 클래스에 해단하는 경우)
    from_logits=True, # 모델의 출력값이 확률인지(logit=False), 아닌지(logit=True)
    reduction='none' #  모델의 출력값을 합쳐서('sum') 사용할 지 아니면 그냥 각자의 값을('none') 사용할지
)

model.compile(loss=loss, optimizer=optimizer)
model.fit(enc_train, dec_train, validation_data=(enc_test, dec_test),epochs=10, batch_size=512)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f7bc70bcd90>

## Generate text & test model

In [12]:
#####################
### Generate text ###
#####################
def generate_text(model, tokenizer, init_sentence="<start>", max_len=20):
    # 테스트를 위해서 init_sentence를 텐서로 변환합니다
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    while True:
        # 1 init_sentence의 텐서를 입력합니다.
        predict = model(test_tensor) 
        # 2 init_sentence 이후에 나올 수 있는 가장 확률 높은 단어의 word_index를 뽑아냅니다.
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        # 3 2에서 예측한 word_index를 이후에 붙입니다.
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        # 4 <end> 토크이 나오거나 max_len = 20일 경우 문장의 생성을 종료합니다.
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

In [13]:
generate_text(model, tokenizer, init_sentence="<start> i love")

'<start> i love you , i m a fool <end> '

In [14]:
generate_text(model, tokenizer, init_sentence="<start> you")

'<start> you re the only one <end> '

## 결론

- 대표적인 순환 신경망 LSTM을 활용하여 특정 text를 입력받았을 때 그 뒤에 이어지는 sequential text를 예측하는 모델을 학습시킴
- 학습이 진행될수록 train loss는 1.2###에 수렴 후 더 이상 쉽게 내려가지 않음을 확인
- 학습된 모델은 "i love"를 입력받아 "i love you , i m a fool"의 가사를, 그리고 "you"를 입력받아 "you re the only one"라는 가사를 생성
- 데이터 전처리 과정에서 특수문자를 제거하였으며, 토큰화 시 패딩처리 등의 과정을 수행함
- 특히 tokenizer 생성 시 토큰 개수가 15개를 넘어가는 문장을 학습 데이터에서 제외하였으며, 데이터셋은 train:test를 8:2의 비율로 나눔
- 총 학습 epoch는 10으로 설정하였으며, 최종 학습 loss는 1.2318가 나옴