In [1]:
import glob                                                #대구_주민규
import os, re
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split       #train_test_split 사용하기 위해서 임폴트

txt_file_path = os.getenv('HOME')+'/aiffel/lyricist/data/lyrics/*'        #이부분은 과정중에 데이터와 위치가 주어졌기 때문에 그대로

txt_list = glob.glob(txt_file_path)    

raw_corpus = []

# 여러개의 txt 파일을 모두 읽어서 raw_corpus 에 담습니다.
for txt_file in txt_list:
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:3])

데이터 크기: 187088
Examples:
 ["Now I've heard there was a secret chord", 'That David played, and it pleased the Lord', "But you don't really care for music, do you?"]


In [2]:
# 입력된 문장을                                                         #이러한 형태로 만드는 이유는 대소문자를 다른 문장으로 구분
#     1. 소문자로 바꾸고, 양쪽 공백을 지웁니다                          #대소문자 같은것으로 판단 못하기 때문에 하나의 형태로 만들어 같은 단어인걸 알려줌
#     2. 특수문자 양쪽에 공백을 넣고                                    #이래야 문장들이 문장부호들로 나누어지지 않는다
#     3. 여러개의 공백은 하나의 공백으로 바꿉니다                       #의미 없는 공백의 길이를 줄여준다.
#     4. a-zA-Z?.!,¿가 아닌 모든 문자를 하나의 공백으로 바꿉니다       #특문으로 이어지는 문자를 하나의 단어로 인식하기때문에 특문을 없애어준다.
#     5. 다시 양쪽 공백을 지웁니다                                      #공백으로 치환했던것을 없에주어 문장을 정리
#     6. 문장 시작에는 <start>, 끝에는 <end>를 추가합니다
                                                                        #문장안 요소에 대한 전처리             
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip() 
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence)
    sentence = re.sub(r'[" "]+', " ", sentence) 
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence) 
    sentence = sentence.strip() 
    sentence = '<start> ' + sentence + ' <end>' 
    return sentence

# 이 문장이 어떻게 필터링되는지 확인해 보세요.               #제대로 나오는 것을 학인
print(preprocess_sentence("This @_is ;;;sample        sentence."))

<start> this is sample sentence . <end>


In [3]:
corpus = []

for sentence in raw_corpus:
    # 우리가 원하지 않는 문장은 건너뜁니다             문장 자체에 대한 전처리
    if len(sentence) == 0: continue               #문장의 길이가 0인것은 제외해줘야 된다.
    if sentence[-1] == ":": continue              #:로 끝나는 문장은 화자의 이름 혹은 정보의 이름 따라서 필요 없다.
    if len(sentence.split())>15: continue            #단어가 16 이상인것 제외(학습속도 향상을 위해) 처음에 이것을 설정하지 않고 실행을 하니
                                                         #한번의 학습당12분이상이 예상되는걸 보고 어디에 제한을 되야되나 찾다 여기에서 단어의 수를 제한하면
    preprocessed_sentence = preprocess_sentence(sentence) #학습해야 할 문장의 수가 줄어드므로 단어 16이상의 문장을 제외 시켰다 그러니 학습당 2분정도가 소요되었다
    corpus.append(preprocessed_sentence)
        
# 정제된 결과를 10개만 확인/ 잘나옴           
corpus[:10]

['<start> now i ve heard there was a secret chord <end>',
 '<start> that david played , and it pleased the lord <end>',
 '<start> but you don t really care for music , do you ? <end>',
 '<start> it goes like this <end>',
 '<start> the fourth , the fifth <end>',
 '<start> the minor fall , the major lift <end>',
 '<start> the baffled king composing hallelujah hallelujah <end>',
 '<start> hallelujah <end>',
 '<start> hallelujah <end>',
 '<start> hallelujah your faith was strong but you needed proof <end>']

In [4]:
def tokenize(corpus):
    # 12000단어를 기억할 수 있는 tokenizer*요구사항)
    # 위에서 필터로 이미 문장들을 정제
    # 12000단어에 포함되지 못한 단어는 '<unk>'로 변환
    tokenizer = tf.keras.preprocessing.text.Tokenizer(      #옆의 패키지는 단어 사전을 만들어주며, 데이터를 숫자로 변환(벡터화(vectorize))숫자로 변환된 데이터: 텐서
        num_words=12000, 
        filters=' ',
        oov_token="<unk>"
    )
    # corpus를 이용해 tokenizer 내부의 단어장을 완성합니다
    tokenizer.fit_on_texts(corpus)
    # 준비한 tokenizer를 이용해 corpus를 Tensor로 변환합니다
    tensor = tokenizer.texts_to_sequences(corpus)   
    # 입력 데이터의 시퀀스 길이를 일정하게 맞춰줍니다
    # 시퀀스의 길이가 다르다면 에러가 뜨기 때문에 패딩을 넣어서 각각의 시퀀스 길이를 동일하게 만듭니다. 패딩은 앞으로 혹은 뒤로 넣을 수 있는데 
    # 문장 앞에 패딩을 붙여 길이를 맞추고 싶다면 padding='pre'를 사용합니다
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')  
    
    print(tensor,tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)

[[   2   50    5 ...    0    0    0]
 [   2   17 2706 ...    0    0    0]
 [   2   34    7 ...    0    0    0]
 ...
 [   2  259  194 ...    0    0    0]
 [   2  132    5 ...    0    0    0]
 [   2    7   33 ...    0    0    0]] <keras_preprocessing.text.Tokenizer object at 0x7f1f5f1673d0>


In [5]:
# tensor에서 끝을 잘라 소스 문장을 만든다.
src_input = tensor[:, :-1]  
# tensor에서 <start>를 잘라내서 타겟 문장을 생성합니다.
tgt_input = tensor[:, 1:]    

print(src_input[0])
print(tgt_input[0])

[   2   50    5   91  307   62   57    9  957 5739    3    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0]
[  50    5   91  307   62   57    9  957 5739    3    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0]


In [6]:
BUFFER_SIZE = len(src_input)                             #
BATCH_SIZE = 256                                         #
steps_per_epoch = len(src_input) // BATCH_SIZE

 # tokenizer가 구축한 단어사전 내 12000개와, 여기 포함되지 않은 0:<pad>를 포함하여 12001개
VOCAB_SIZE = tokenizer.num_words + 1   

# 준비한 데이터 소스로부터 데이터셋을 만듭니다
# 데이터셋에 대해서는 아래 문서를 참고하세요
# 자세히 알아둘수록 도움이 많이 되는 중요한 문서입니다
#dataset = tf.data.Dataset.from_tensor_slices((src_input, tgt_input)) 의형태로 슬라이스 하여 소스와 타겟 데이터로 분리 하였지만
#dataset = dataset.shuffle(BUFFER_SIZE)                          이번 ex의 조건이
#sklearn 모듈의 train_test_split() 함수를 사용해 훈련 데이터와 평가 데이터를 분리가 조건 4:1로 랜덤으로 분리 하였다 랜덤 state의 숫자는 큰 영향은 없고 일정하게만 해주면 된다고 한다.

enc_train, enc_val, dec_train, dec_val = train_test_split(src_input,tgt_input,test_size=0.2,random_state=7)
                                
print('enc_train 개수: ', len(enc_train),', enc_val 개수: ', len(enc_val))

enc_train 개수:  134685 , enc_val 개수:  33672


In [7]:
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):               #클래스 객체를 만들때 init으로 인스턴스 초기화 특성들을 설정 
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)   #텐서에는 단어 사전의 인덱스를 Embedding 레이어는 이 인덱스 값을 해당 인덱스 번째의 워드 벡터로 바꿔 줘 단어의 표현으로 사용된다.
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):                                          #call로 객체 호출
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    
embedding_size = 256                                 #지금 2가지 파라미터를 조절해서 loss값이 2.2 이하로 만들면 된다. 그런데 처음부터 그 범위안으로 들어와서 다시한번 실행 해 보고 튀는 값이 나왔나 확인해 보았다.
hidden_size = 1024
model = TextGenerator(tokenizer.num_words + 1, embedding_size , hidden_size)

In [8]:
optimizer = tf.keras.optimizers.Adam()              #embedding_size 256 hidden size 1024
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction='none'
)

model.compile(loss=loss, optimizer=optimizer)  
model.fit(enc_train, dec_train, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f1e8f8ecfa0>

In [9]:
def generate_text(model, tokenizer, init_sentence="<start>", max_len=20):
    # 테스트를 위해서 입력받은 init_sentence도 텐서로 변환합니다
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    # 단어 하나씩 예측해 문장을 만듭니다
    #    1. 입력받은 문장의 텐서를 입력합니다
    #    2. 예측된 값 중 가장 높은 확률인 word index를 뽑아냅니다
    #    3. 2에서 예측된 word index를 문장 뒤에 붙입니다
    #    4. 모델이 <end>를 예측했거나, max_len에 도달했다면 문장 생성을 마칩니다 이번엔 15까지만 나오게 하였으므로 16에서 멈추거나 end가 나오면 종료
    while True:
        # 1
        predict = model(test_tensor) 
        # 2
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        # 3 
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        # 4
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # tokenizer를 이용해 word index를 단어로 하나씩 변환합니다 결국 start 단어 단어 .......<end>의 형태 혹은 <start> 단어 *15개 의형태가 가능
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

In [10]:
generate_text(model, tokenizer, init_sentence="<start> i love", max_len=20)#1차 i love it when you call me big poppa 2차 i love you

'<start> i love you <end> '

In [12]:
#후기 만약 결과 loss값이 요구한 2.2이하가 되지 않거나 이상한 가사가 출력이 되면 파라미터들을 바꿔 가면서 해보려 했는데 결과가 좋게 나와 스탑하였다.
#RNN을 이용한 자연어 처리를 해 보았다.Embedding 레이어의 인풋이 되는 문장 벡터는 그 길이가 일정하여야 함으로 길이를 일정하게 바꿔주는
#텐서의 tf.keras.preprocessing.sequence.pad_sequences를 이용 빈 공간을 padding으로 채워 길이를 같게 하였다. 처음에는 가사의 길이를 설정하지 않았는데
#한번의 학습에 12분~13분이 걸렸다. 상황에 맞게 데이터를 전 처리하고 파라미터를 설정해야 된다는 것을 확실히 알게 되었다. 다시 위에 길이 0인걸 
#과 :로 끝나는 문장을 넘어가는 조건을 만드는데 토큰이 15보다 많을때 넘어가는 조건을 만들어 너무 긴 문장들을 전 처리 해 주니 많은 시간을 단축 할 수 있었다다

IndentationError: unexpected indent (3136560311.py, line 5)

In [None]:
#이번 과정은 자연어 처리를 하는것이 였지만 과정중에 데이터를 전 처리 해 주는 과정이 있어 편하게 하였다. 만약 그런 과정이 나와있지 않고 직접 전처리를 해야 된다면 할 수 있을까란 생각이 들었다.
#아직 코딩이 익숙하지 않아 읽어는 지는데 작성하라고 하면 쫌 막히는 경향이 있다. 직접 짜는건 아직 어려우니 다른 좋은 코딩들을 읽어보기라도 해서 다양한 예를 늘려야 겠다.