<a href="https://colab.research.google.com/github/Ohsoo46/AIFFEL_QUEST/blob/main/%EC%9E%91%EC%82%AC%EA%B0%80%5B%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8%5D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
# Mount Google Drive into the Colab filesystem.

from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [15]:
import os, glob, re
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split


In [3]:
# Folder on the mounted Drive that holds one .txt file of lyrics per source.
lyrics_dir = "/content/drive/MyDrive/ds6_4SEC/lyrics"

# glob.glob returns matches in arbitrary, filesystem-dependent order. Sorting
# fixes the order of the lines in raw_corpus, so the seeded train/val/test
# split made from it later selects the same rows on every run.
txt_list = sorted(glob.glob(os.path.join(lyrics_dir, "*.txt")))

# Concatenate the lines of every file into one flat list of raw lyric lines.
raw_corpus = []
for p in txt_list:
    with open(p, "r", encoding="utf-8") as f:
        raw_corpus.extend(f.read().splitlines())

print("txt files:", len(txt_list))
print("raw lines:", len(raw_corpus))
print("sample:", raw_corpus[:3])

txt files: 49
raw lines: 187088
sample: ['[Verse 1]', 'They come from everywhere', 'A longing to be free']


In [4]:
def preprocess_sentence(sentence: str) -> str:
    """Normalize one raw lyric line and wrap it in <start>/<end> markers.

    The line is lower-cased and trimmed, each of ? . ! , is surrounded by
    spaces so it stands alone, runs of spaces/double quotes are squeezed to a
    single space, and every run of characters other than a-z and ? . ! , is
    replaced by a single space.
    """
    cleaning_rules = (
        (r"([?.!,])", r" \1 "),     # put spaces around punctuation marks
        (r'[" "]+', " "),           # squeeze spaces and double quotes
        (r"[^a-z?.!,]+", " "),      # replace anything outside the allowed set
    )
    text = sentence.lower().strip()
    for pattern, replacement in cleaning_rules:
        text = re.sub(pattern, replacement, text)
    return "<start> " + text.strip() + " <end>"

# Preprocess every entry of raw_corpus that is a string with non-whitespace
# content; everything else is skipped.
corpus = list(
    map(
        preprocess_sentence,
        (line for line in raw_corpus if isinstance(line, str) and line.strip()),
    )
)
print("corpus:", len(corpus))
print("sample:", corpus[:2])


corpus: 175960
sample: ['<start> verse <end>', '<start> they come from everywhere <end>']


In [5]:
def tokenize(corpus, vocab_size=12000, max_len=15):
    """Fit a word-level tokenizer on ``corpus`` and encode it as a padded matrix.

    Args:
        corpus: list of already-cleaned sentences whose tokens are separated
            by spaces.
        vocab_size: passed to the tokenizer as ``num_words``; out-of-vocabulary
            words are encoded with the ``<unk>`` token.
        max_len: number of token ids per row after padding/truncation.

    Returns:
        A ``(tensor, tokenizer)`` pair: the integer matrix with one row per
        sentence and ``max_len`` columns (0 is the padding value), and the
        fitted tokenizer.
    """
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=vocab_size,
        filters="",     # the text was already cleaned in preprocessing, so no extra filtering
        oov_token="<unk>"
    )
    tokenizer.fit_on_texts(corpus)
    tensor = tokenizer.texts_to_sequences(corpus)
    # pad_sequences truncates from the FRONT by default, which would cut the
    # leading <start> token and the opening words off every sentence longer
    # than max_len. Truncate at the end instead so each row still begins at
    # the start of its line (over-long lines lose their tail and <end>).
    tensor = tf.keras.preprocessing.sequence.pad_sequences(
        tensor, padding="post", truncating="post", maxlen=max_len
    )
    return tensor, tokenizer

# Vocabulary cap and padded sequence length; both are reused by later cells.
VOCAB_SIZE = 12000
MAX_LEN = 15

tensor, tokenizer = tokenize(corpus, VOCAB_SIZE, MAX_LEN)

print("tensor shape:", tensor.shape)
# +1 accounts for id 0, which is reserved for padding and not in word_index.
print("vocab size (actual):", min(VOCAB_SIZE, len(tokenizer.word_index)+1))


tensor shape: (175960, 15)
vocab size (actual): 12000


In [6]:
# Next-token setup: the target is the input shifted left by one position.
dec_targets = tensor[:, 1:]     # (N, 14) every row without its first token
enc_inputs = tensor[:, :-1]     # (N, 14) every row without its last token

# Seeded 80/20 train/validation split.
enc_train, enc_val, dec_train, dec_val = train_test_split(
    enc_inputs,
    dec_targets,
    test_size=0.2,
    shuffle=True,
    random_state=42,
)

print(*(part.shape for part in (enc_train, dec_train, enc_val, dec_val)))


(140768, 14) (140768, 14) (35192, 14) (35192, 14)


In [25]:
import tensorflow as tf

class TextGenerator(tf.keras.Model):
    """Embedding -> two stacked LSTMs -> Dense model producing per-step logits."""

    def __init__(self, vocab_size, embedding_size, hidden_size):
        """Create the layers.

        Args:
            vocab_size: number of token ids; sizes both the embedding table
                and the output layer.
            embedding_size: width of each token embedding.
            hidden_size: number of units in each of the two LSTM layers.
        """
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(
            input_dim=vocab_size, output_dim=embedding_size
        )
        # Both LSTMs return the whole sequence so there is an output per time step.
        self.rnn_1 = tf.keras.layers.LSTM(units=hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(units=hidden_size, return_sequences=True)
        # No activation: the layer emits raw logits, computed in float32.
        self.linear = tf.keras.layers.Dense(units=vocab_size, dtype="float32")

    def call(self, x):
        """Run token ids through all layers in order and return the logits."""
        hidden = x
        for layer in (self.embedding, self.rnn_1, self.rnn_2, self.linear):
            hidden = layer(hidden)
        return hidden


# Hyperparameters
vocab_size = min(VOCAB_SIZE, len(tokenizer.word_index) + 1)
hidden_size = 512
embedding_size = 256

# Build the model
model = TextGenerator(
    vocab_size=vocab_size,
    embedding_size=embedding_size,
    hidden_size=hidden_size,
)

# Loss with PAD (id 0) masked out. reduction="none" keeps one loss value per
# target position so padded positions can be zeroed before averaging.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction="none")

def masked_loss(y_true, y_pred):
    """Mean cross-entropy over the target positions that are not padding.

    Args:
        y_true: integer target token ids; 0 marks padding.
        y_pred: logits produced by the model for the same positions.

    Returns:
        Scalar: sum of the per-position losses at non-padding positions
        divided by the number of such positions, or 0 when there are none.
    """
    loss = loss_fn(y_true, y_pred)
    mask = tf.cast(tf.not_equal(y_true, 0), loss.dtype)
    # divide_no_nan yields 0 instead of NaN if a batch has no real target
    # tokens, so one degenerate batch cannot poison the weights with NaNs.
    return tf.math.divide_no_nan(tf.reduce_sum(loss * mask), tf.reduce_sum(mask))

# Adam with its default settings, trained against the padding-aware loss.
model.compile(loss=masked_loss, optimizer=tf.keras.optimizers.Adam())

print(" model 정의 완료")


 model 정의 완료


In [18]:
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf

# 1) Build inputs and targets (target = input shifted left by one token)
dec_targets = tensor[:, 1:]     # (N, 14)
enc_inputs = tensor[:, :-1]     # (N, 14)

# 2) 80% train, 20% held out
enc_train, enc_temp, dec_train, dec_temp = train_test_split(
    enc_inputs,
    dec_targets,
    test_size=0.2,
    shuffle=True,
    random_state=42,
)

# 3) Split the held-out 20% in half: validation (10%) and test (10%)
enc_val, enc_test, dec_val, dec_test = train_test_split(
    enc_temp,
    dec_temp,
    test_size=0.5,
    shuffle=True,
    random_state=42,
)

for split_name, split_x, split_y in (
    ("train:", enc_train, dec_train),
    ("val  :", enc_val, dec_val),
    ("test :", enc_test, dec_test),
):
    print(split_name, split_x.shape, split_y.shape)


train: (140768, 14) (140768, 14)
val  : (17596, 14) (17596, 14)
test : (17596, 14) (17596, 14)


In [19]:
BATCH_SIZE = 256
AUTOTUNE = tf.data.AUTOTUNE

# Training pipeline: cache, reshuffle the whole set every epoch, then emit
# full batches only (the final partial batch is dropped).
train_ds = tf.data.Dataset.from_tensor_slices((enc_train, dec_train))
train_ds = train_ds.cache()
train_ds = train_ds.shuffle(len(enc_train), reshuffle_each_iteration=True)
train_ds = train_ds.batch(BATCH_SIZE, drop_remainder=True)
train_ds = train_ds.prefetch(AUTOTUNE)

# Validation pipeline: fixed order, the last partial batch is kept.
val_ds = tf.data.Dataset.from_tensor_slices((enc_val, dec_val))
val_ds = val_ds.cache()
val_ds = val_ds.batch(BATCH_SIZE)
val_ds = val_ds.prefetch(AUTOTUNE)

# Test pipeline: same construction as validation.
test_ds = tf.data.Dataset.from_tensor_slices((enc_test, dec_test))
test_ds = test_ds.cache()
test_ds = test_ds.batch(BATCH_SIZE)
test_ds = test_ds.prefetch(AUTOTUNE)

print(" train/val/test dataset ready")


✅ train/val/test dataset ready


In [20]:
# Quick-experiment budget: each epoch runs only this many training and
# validation batches instead of a full pass over the datasets.
FAST_VAL_STEPS = 10
FAST_STEPS = 40        # 10-50 recommended (fast experiment)

# Stop when val_loss has not improved for 2 epochs (restoring the best
# weights) and halve the learning rate after 1 epoch without improvement.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=2, restore_best_weights=True
)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor="val_loss", patience=1, factor=0.5
)
callbacks = [early_stopping, reduce_lr]

history = model.fit(
    train_ds,
    epochs=5,
    steps_per_epoch=FAST_STEPS,
    validation_data=val_ds,
    validation_steps=FAST_VAL_STEPS,
    callbacks=callbacks,
)


Epoch 1/5
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 241ms/step - loss: 7.6917 - val_loss: 6.2531 - learning_rate: 0.0010
Epoch 2/5
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 235ms/step - loss: 6.2155 - val_loss: 6.1404 - learning_rate: 0.0010
Epoch 3/5
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 233ms/step - loss: 6.1186 - val_loss: 6.0967 - learning_rate: 0.0010
Epoch 4/5
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 235ms/step - loss: 6.0693 - val_loss: 6.0766 - learning_rate: 0.0010
Epoch 5/5
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 236ms/step - loss: 6.0674 - val_loss: 6.0612 - learning_rate: 0.0010


In [21]:
# 4) Evaluate on the held-out test set
test_loss = model.evaluate(x=test_ds)
print(" test_loss:", test_loss)


[1m69/69[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 77ms/step - loss: 6.0540
✅ test_loss: 6.054206371307373


In [23]:
def generate_text_ilove(
    model,
    tokenizer,
    max_len=30,
    temperature=0.85,
    top_k=50
):
    """Sample a lyric line that always begins with "I love".

    Starting from the fixed seed "<start> i love", the model is queried for
    the next token repeatedly. Each token is drawn with temperature scaling
    and top-k filtering until <end> is drawn or ``max_len`` tokens were added.

    Args:
        model: callable that maps an integer tensor of token ids to logits
            with one vocabulary-sized vector per input position.
        tokenizer: fitted tokenizer providing ``texts_to_sequences``,
            ``word_index`` (with "<start>" and "<end>") and ``index_word``.
        max_len: maximum number of tokens generated after the seed.
        temperature: divisor applied to the logits; must be positive.
        top_k: number of highest-scoring tokens kept as sampling candidates;
            clamped to the vocabulary size.

    Returns:
        The generated words joined by single spaces, starting with "I love".

    Raises:
        ValueError: if ``temperature`` is not positive or ``top_k`` < 1.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    if top_k < 1:
        raise ValueError("top_k must be at least 1")

    # Forced opening of every generated line.
    seed = "<start> i love"
    seq = tokenizer.texts_to_sequences([seed])[0]

    start_id = tokenizer.word_index["<start>"]
    end_id = tokenizer.word_index["<end>"]

    # Ids that must never be sampled: 0 is padding, and <start> only ever
    # opens a line, so it has no place in the middle of generated text.
    banned_ids = [[0], [start_id]]
    banned_scores = [-1e10] * len(banned_ids)

    # Input length used when the training inputs were built (tensor[:, :-1]).
    window = MAX_LEN - 1

    generated = ["I", "love"]

    for _ in range(max_len):
        context = seq[-window:]
        x = tf.keras.preprocessing.sequence.pad_sequences(
            [context],
            maxlen=window,
            padding="post"
        )
        x = tf.convert_to_tensor(x, dtype=tf.int32)

        # The input is padded on the right, so index -1 is a PAD position
        # whenever the context is shorter than the window. Read the logits at
        # the last REAL token instead; the LSTMs run left to right, so the
        # trailing padding cannot influence that position.
        logits = model(x)[0, len(context) - 1, :]
        logits = logits / temperature

        # Make the banned ids effectively impossible to draw.
        logits = tf.tensor_scatter_nd_update(
            logits,
            indices=banned_ids,
            updates=banned_scores
        )

        # Top-k: keep the k best scores and suppress everything below them.
        k = min(top_k, int(logits.shape[-1]))
        values, _ = tf.math.top_k(logits, k=k)
        logits = tf.where(logits < values[-1], -1e10, logits)

        next_id = int(tf.random.categorical(
            tf.expand_dims(logits, 0), 1
        )[0, 0].numpy())

        if next_id == end_id:
            break

        word = tokenizer.index_word.get(next_id, "")
        if word:  # skip ids without a word so the output has no double spaces
            generated.append(word)
        seq.append(next_id)

    return " ".join(generated)


In [24]:
# Draw one sample line with the default sampling settings.
print(generate_text_ilove(model=model, tokenizer=tokenizer))


I love what me the can
