In [1]:
#데이터 불러오기
import glob
import os
import re
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

txt_file_path = os.getenv('HOME')+'/aiffel/lyricist/data/lyrics/*'

txt_list = glob.glob(txt_file_path)

raw_corpus = []

# 여러개의 txt 파일을 모두 읽어서 raw_corpus 에 담습니다.
for txt_file in txt_list:
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:3])

데이터 크기: 187088
Examples:
 ['"Don\'t worry about a thing,', "'Cause every little thing gonna be all right.", 'Singin\': "Don\'t worry about a thing,']


In [2]:
# 문장 indexing
for idx, sentence in enumerate(raw_corpus):
    if len(sentence) == 0: continue   # 길이가 0인 문장은 스킵

    if idx > 9: break
        
    print(sentence)

"Don't worry about a thing,
'Cause every little thing gonna be all right.
Singin': "Don't worry about a thing,
'Cause every little thing gonna be all right!" Rise up this mornin',
Smiled with the risin' sun,
Three little birds
Perch by my doorstep
Singin' sweet songs
Of melodies pure and true,
Sayin', ("This is my message to you-ou-ou:") Singin': "Don't worry 'bout a thing,


In [3]:
# 문장 전처리 함수
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip() # 소문자로 바꾸고 양쪽 공백을 삭제

    # 정규식을 이용하여 문장 처리
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence) # 패턴의 특수문자를 만나면 특수문자 양쪽에 공백을 추가
    sentence = re.sub(r'[" "]+', " ", sentence) # 공백 패턴을 만나면 스페이스 1개로 치환
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence) # a-zA-Z?.!,¿ 패턴을 제외한 모든 문자(공백문자까지도)를 스페이스 1개로 치환

    sentence = sentence.strip()

    sentence = '<start> ' + sentence + ' <end>'
    return sentence

In [4]:
corpus = []

# 모든 문장에 전처리 함수 적용
for sentence in raw_corpus:
    if len(sentence) == 0:
        continue
    if sentence[-1] == ":":
        continue
    temp = preprocess_sentence(sentence)
    if len(temp.split()) <= 15:
        corpus.append(temp)
        
print(corpus[:2])
print(len(corpus))

['<start> don t worry about a thing , <end>', '<start> cause every little thing gonna be all right . <end>']
156013


In [5]:
print(corpus[0].split())
len(corpus[0].split())

['<start>', 'don', 't', 'worry', 'about', 'a', 'thing', ',', '<end>']


9

In [6]:
def tokenize(corpus):
    # 텐서플로우에서 제공하는 Tokenizer 패키지를 생성
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=15000,  # 전체 단어의 개수 
        filters=' ',      # 전처리 로직
        oov_token="<unk>" # out-of-vocabulary, 사전에 없는 단어
    )
    tokenizer.fit_on_texts(corpus) # 우리가 구축한 corpus로부터 Tokenizer가 사전을 자동구축

    # tokenizer를 활용하여 모델에 입력할 데이터셋을 구축
    tensor = tokenizer.texts_to_sequences(corpus) # tokenizer는 구축한 사전으로부터 corpus를 해석해 Tensor로 변환

    # 입력 데이터의 시퀀스 길이를 일정하게 맞추기 위한 padding 메소드
    # maxlen의 디폴트값은 None입니다. 이 경우 corpus의 가장 긴 문장을 기준으로 시퀀스 길이가 맞춰진다
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor,
                                                           padding='pre',
                                                           maxlen=15)

    print(tensor,tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)

[[  0   0   0 ... 183   5   3]
 [  0   0   0 ...  87  22   3]
 [  0   0   0 ... 183   5   3]
 ...
 [  0   0   0 ...  13  86   3]
 [  0   0   0 ...  20 211   3]
 [  0   0   0 ... 648 138   3]] <keras_preprocessing.text.Tokenizer object at 0x7f9704c7c7d0>


In [7]:
print(len(tensor[10,:])) # 생성된 텐서 데이터 확인

15


In [8]:
# 단어 사전의 index
for idx in tokenizer.index_word:
    print(idx, ":", tokenizer.index_word[idx])

    if idx >= 20: break

1 : <unk>
2 : <start>
3 : <end>
4 : i
5 : ,
6 : the
7 : you
8 : and
9 : a
10 : to
11 : it
12 : me
13 : my
14 : in
15 : that
16 : t
17 : s
18 : on
19 : your
20 : of


In [9]:
# 문장 생성
src_input = tensor[:, :-1] # tensor에서 마지막 토큰을 잘라내서 소스 문장을 생성. 마지막 토큰은 <end>가 아니라 <pad>일 가능성이 높다.
tgt_input = tensor[:, 1:]  # tensor에서 <start>를 잘라내서 타겟 문장을 생성.

In [10]:
# 생성된 문장 확인
print(src_input[0])
print(tgt_input[0])

[  0   0   0   0   0   0   2  40  16 700 111   9 183   5]
[  0   0   0   0   0   2  40  16 700 111   9 183   5   3]


In [11]:
enc_train, enc_val, dec_train, dec_val = train_test_split(src_input,
                                                          tgt_input,
                                                          test_size=0.2,
                                                          shuffle=True)

In [12]:
# 분리된 데이터 확인
print("Source Train:", enc_train.shape)
print("Target Train:", dec_train.shape)

Source Train: (124810, 14)
Target Train: (124810, 14)


In [13]:
# 데이터셋 구축
BUFFER_SIZE = len(enc_train)
BATCH_SIZE = 1024
steps_per_epoch = len(enc_train) // BATCH_SIZE

VOCAB_SIZE = tokenizer.num_words + 1    # 0:<pad>를 포함하여 dictionary 갯수 + 1

dataset = tf.data.Dataset.from_tensor_slices((enc_train, dec_train)).shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
dataset

<BatchDataset shapes: ((1024, 14), (1024, 14)), types: (tf.int32, tf.int32)>

In [14]:
# 모델 생성 함수
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super(TextGenerator, self).__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out 

In [15]:
# 모델 생성
embedding_size = 256 # 워드 벡터의 차원 수
hidden_size = 1024 # LSTM Layer의 hidden 차원 수
model = TextGenerator(tokenizer.num_words + 1, embedding_size , hidden_size)

In [16]:
# 모델의 데이터 확인
for src_sample, tgt_sample in dataset.take(1): break
model(src_sample)

<tf.Tensor: shape=(1024, 14, 15001), dtype=float32, numpy=
array([[[-9.43810210e-06,  7.45531070e-05, -1.69945196e-07, ...,
          2.23979805e-04,  1.20146569e-05, -5.51804587e-05],
        [-3.36680387e-05,  2.09059479e-04, -1.21870007e-05, ...,
          5.69494965e-04, -3.31099691e-05, -1.03232749e-04],
        [-7.05335988e-05,  3.83212930e-04, -4.12597474e-05, ...,
          9.66588384e-04, -1.54205743e-04, -1.14841263e-04],
        ...,
        [-1.36596663e-03,  7.84573902e-04, -1.07793731e-03, ...,
          5.31776677e-05, -1.64403464e-03,  5.52472949e-04],
        [-1.65708666e-03,  7.34062283e-04, -1.12860184e-03, ...,
         -7.47543672e-05, -1.59776700e-03,  4.48618579e-04],
        [-1.81829429e-03,  7.21130578e-04, -1.12979312e-03, ...,
         -2.39322282e-04, -1.45445240e-03,  3.67796049e-04]],

       [[-9.43810210e-06,  7.45531070e-05, -1.69945196e-07, ...,
          2.23979805e-04,  1.20146569e-05, -5.51804587e-05],
        [-3.36680387e-05,  2.09059479e-04, -

In [17]:
# 모델의 최종 출력 shape는 (256, 14, 15001)
# 256은 batch_size, 14는 squence_length, 15001은 단어의 갯수(Dense Layer 출력 차원 수)

model.summary() # sequence_length를 모르기 때문에 Output shape를 정확하게 모른다.

Model: "text_generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        multiple                  3840256   
_________________________________________________________________
lstm (LSTM)                  multiple                  5246976   
_________________________________________________________________
lstm_1 (LSTM)                multiple                  8392704   
_________________________________________________________________
dense (Dense)                multiple                  15376025  
Total params: 32,855,961
Trainable params: 32,855,961
Non-trainable params: 0
_________________________________________________________________


In [18]:
# 모델 학습
optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction='none'
)

model.compile(loss=loss, optimizer=optimizer)
model.fit(dataset, epochs=30)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<tensorflow.python.keras.callbacks.History at 0x7f9704433190>

In [19]:
def generate_text(model, tokenizer, init_sentence="<start>", max_len=20):
    # 테스트를 위해서 입력받은 init_sentence도 텐서로 변환
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    # 단어를 하나씩 생성
    while True:
        predict = model(test_tensor)  # 입력받은 문장
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1]   # 예측한 단어가 새로 생성된 단어 

        # 우리 모델이 새롭게 예측한 단어를 입력 문장의 뒤에 붙이기
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)

        # 우리 모델이 <end>를 예측했거나, max_len에 도달하지 않았다면  while 루프를 또 돌면서 다음 단어를 예측
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # 생성된 tensor 안에 있는 word index를 tokenizer.index_word 사전을 통해 실제 단어로 하나씩 변환
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated   # 최종 생성된 자연어 문장

In [20]:
generate_text(model, tokenizer, init_sentence="<start> i love", max_len=20)

'<start> i love you to be a shot , i m not a man <end> '