In [None]:
%matplotlib inline
import pandas as pd
from tqdm.notebook import tqdm
import numpy as np
import matplotlib.pyplot as plt
import csv

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
import os
import re
import string
import random

In [None]:
df = pd.read_excel('/content/drive/MyDrive/Colab Notebooks/동화 데이터.xlsx')

In [None]:
df.head()

Unnamed: 0,storyid,storyname,storytext,storyimage,story
0,1,선녀와 나무꾼,"옛날 옛적, 가난해서 장가를 못간 나무꾼이 있었습니다.\n하루는 나무를 하러 산에 ...",,
1,2,"금도끼, 은도끼",옛날 한 작은 마을에 착한 나무꾼이 살았습니다. 나무꾼은 가난했지만 착하고 정직한 ...,,
2,3,심청전,옛날 한 마을에 심봉사 라고 불리는 장님에게 심청이라는 착하고 이쁜 딸이 있었습니다...,,
3,4,은혜갚은 까치,옛날 한 선비가 과거를 보러 한양를 가기 위해 산을 오르다 구렁이에게 잡혀먹을 뻔한...,,
4,5,흥부전,"흥부전은 작자·연대 미상의 고전소설입니다. 조선 후기 소설로 흥보전,박흥보전,놀부전...",,


In [None]:
# spacing 이 2번이상 인경우를 제외하기 위함
comment_preprocessing_list = []
for i, row in tqdm(df.iterrows()):
    comment = row['storytext'] # 댓글 불러오기
    # 띄어쓰기대로 나눠 어절만 가져오기, 가져온 어절을 " " 1칸 단위로 합쳐주기
    comment = ' '.join(comment.split()) 
    comment = comment.strip() # 앞뒤 띄어쓰기 존재시 제거 위함
    comment_preprocessing_list.append(comment) # 새로운 dataframe 만들기 위한 리스트 추가

df = pd.DataFrame(comment_preprocessing_list, columns= ['comment']) # 새로운 df 생성
df.head()

0it [00:00, ?it/s]

Unnamed: 0,comment
0,"옛날 옛적, 가난해서 장가를 못간 나무꾼이 있었습니다. 하루는 나무를 하러 산에 갔..."
1,옛날 한 작은 마을에 착한 나무꾼이 살았습니다. 나무꾼은 가난했지만 착하고 정직한 ...
2,옛날 한 마을에 심봉사 라고 불리는 장님에게 심청이라는 착하고 이쁜 딸이 있었습니다...
3,옛날 한 선비가 과거를 보러 한양를 가기 위해 산을 오르다 구렁이에게 잡혀먹을 뻔한...
4,"흥부전은 작자·연대 미상의 고전소설입니다. 조선 후기 소설로 흥보전,박흥보전,놀부전..."


In [None]:
# comment list build
comment_list = []
for i, row in tqdm(df.iterrows()):
    comment = row
    comment_list.append(comment)

# tensorflow 입력값에 맞추기 위해 데이터셋 객체로 변환
text_ds = tf.data.Dataset.from_tensor_slices(comment_list)

# batchsize 설정해 메모리 터지는거 방지
batch_size = 128
text_ds = text_ds.shuffle(buffer_size=256)
text_ds = text_ds.batch(batch_size)

vocab_size = 200000
maxlen = 40

vectorize_layer = TextVectorization(
    # standardize=custom_standardization,
    max_tokens=vocab_size - 1,
    output_mode="int",
    output_sequence_length=maxlen+1,
)
vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary()  # To get words back from token indices

0it [00:00, ?it/s]

In [None]:
def prepare_lm_inputs_labels(text):
    """
    Shift word sequences by 1 position so that the target for position (i) is
    word at position (i+1). The model will use all words up till position (i)
    to predict the next word.
    """
    #text = tf.expand_dims(text, -1)
    tokenized_sentences = vectorize_layer(text)
    x = tokenized_sentences[:, :-1]
    y = tokenized_sentences[:, 1:]
    return x, y
    
text_ds = text_ds.map(prepare_lm_inputs_labels)
text_ds = text_ds.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
text_ds

<PrefetchDataset element_spec=(TensorSpec(shape=(None, 40), dtype=tf.int64, name=None), TensorSpec(shape=(None, 40), dtype=tf.int64, name=None))>

In [None]:
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        # token 위치에 따른 embedding을 하기위함
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

In [None]:
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Mask the upper half of the dot product matrix in self attention.
    This prevents flow of information from future tokens to current token.
    1's in the lower triangle, counting from the lower right corner.
    """
    i = tf.range(n_dest)[:, None]
    j = tf.range(n_src)
    m = i >= j - n_src + n_dest
    mask = tf.cast(m, dtype)
    mask = tf.reshape(mask, [1, n_dest, n_src])
    mult = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
    )
    return tf.tile(mask, mult)

In [None]:
class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = layers.MultiHeadAttention(num_heads, embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        causal_mask = causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        attention_output = self.att(inputs, inputs, attention_mask=causal_mask)
        attention_output = self.dropout1(attention_output)
        out1 = self.layernorm1(inputs + attention_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output)
        return self.layernorm2(out1 + ffn_output)

In [None]:
# 변수 정의 및 모델 함수 정의

embed_dim = 256  # Embedding size for each token
num_heads = 2  # Number of attention heads
feed_forward_dim = 256  # Hidden layer size in feed forward network inside transformer

def create_model():
    inputs = layers.Input(shape=(maxlen,), dtype=tf.int32) 
    # input 정의
    
    embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim) 
    # token Embedding + positional Embedding layer class 정의
    
    x = embedding_layer(inputs) 
    # 선언한 Embedding layer class 이용해 Embedding
    
    transformer_block = TransformerBlock(embed_dim, num_heads, feed_forward_dim) 
    # transformer block layer class 정의
    
    x = transformer_block(x) 
    # 선언한 transformer layer class 이용해 학습
    
    outputs = layers.Dense(vocab_size)(x) 
    # 압축된 결과를 vocab에 맞춰 팽창 (후보단어 선별을 위한 각 단어에 대한 결과치 도출)
    
    model = keras.Model(inputs=inputs, outputs=[outputs, x])  
    # model 정의
    
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) 
    model.compile(
        "adam", loss=[loss_fn, None],
    )  # No loss and optimization based on word embeddings from transformer block
    model.summary()
    return model

In [None]:
class TextGenerator(keras.callbacks.Callback):
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.
    """

    def __init__(
        self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1
    ):
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        return self.index_to_word[number]

    def on_epoch_end(self, epoch, logs=None):
        start_tokens = [_ for _ in self.start_tokens]
        if (epoch + 1) % self.print_every != 0:
            return
        num_tokens_generated = 0
        tokens_generated = []
        while num_tokens_generated <= self.max_tokens:
            pad_len = maxlen - len(start_tokens)
            sample_index = len(start_tokens) - 1
            if pad_len < 0:
                x = start_tokens[:maxlen]
                sample_index = maxlen - 1
            elif pad_len > 0:
                x = start_tokens + [0] * pad_len
            else:
                x = start_tokens
            x = np.array([x])
            y, _ = self.model.predict(x)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)
            num_tokens_generated = len(tokens_generated)
        txt = " ".join(
            [self.detokenize(_) for _ in self.start_tokens + tokens_generated]
        )
        print(f"generated text:\n{txt}\n")

In [None]:
# Tokenize starting prompt
word_to_index = {}
for index, word in enumerate(vocab):
    word_to_index[word] = index

start_prompt = "옛날 옛날 바보가 살았어요."
start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]
num_tokens_generated = 40
text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

In [None]:
model = create_model()

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 40)]              0         
                                                                 
 token_and_position_embeddin  (None, 40, 256)          51210240  
 g_1 (TokenAndPositionEmbedd                                     
 ing)                                                            
                                                                 
 transformer_block_1 (Transf  (None, 40, 256)          26424064  
 ormerBlock)                                                     
                                                                 
 dense_5 (Dense)             (None, 40, 200000)        51400000  
                                                                 
Total params: 129,034,304
Trainable params: 129,034,304
Non-trainable params: 0
_____________________________________________

In [None]:
model.fit(text_ds, verbose=1, epochs=10, callbacks=[text_gen_callback])

Epoch 1/10
옛날 옛날 바보가 [UNK] 옛날에 마을에 말했어요 살고 어느 한 명 한 마리가 하지만 어느 마을에 있었습니다 있었습니다 어느 살고 말했어요 한 살고 있는 살고 있었습니다 한 살고 있었어요 있었습니다 살고 그 살고 있었습니다 하지만 이 그 한 마리가 한 분 명이 마리 분 분

Epoch 2/10
옛날 옛날 바보가 [UNK] 어느 살고 있었어요 살고 하지만 어느 날 한 번 한 명 있었어요 한 명이 날 어느 날 그가 있었어요 수 살고 한 번 들어볼래 살고 있었어요 있었습니다 말했어요 살고 있었습니다 한 마리 하지만 한 명이 있었어요 그런데 살고 수 살고 있었어요

Epoch 3/10
옛날 옛날 바보가 [UNK] 어느 날 한 살고 있었어요 있었어요 있었습니다 살고 있었습니다 말했어요 하지만 한 번은 있었어요 그래서 말했어요 “충신 하지만 있었어요 그래서 있었어요 그 씨앗들이 한 번 이 그래서 그래서 난로 옆에 수 있게 옛적에 그 적들이 들이 살고 있었어요 닫쳐 하지만 수

Epoch 4/10
옛날 옛날 바보가 [UNK] 둔 아주 한 가난한 살고 있었어요 살고 있었어요 한 분 있었어요 있었어요 하지만 있었어요 그가 있었습니다 어느 날 그 말했어요 “난 아무 살고 있었습니다 어느 고을에 이 한 번 해봐도 한 번은 그 이 그 모자만 한 그가 그가 수 살고

Epoch 5/10
generated text:
옛날 옛날 바보가 [UNK] 어느 옛적에 살고 있었어요 있었습니다 어느 마을에 한 번 마을에 살고 하지만 어느 마을에 어느 마을에 있었습니다 살고 살고 그래서 수 밖에 있었어요 하지만 한 번은 한 명이 그 돈을 어느 마을에 어느 마을에 있었어요 그는 한 그 마을에 그 그

Epoch 6/10
옛날 옛날 바보가 [UNK] 어느 아주 오랜 날 그 어느 날 그 아이는 날 살고 그 돈으로는 어느 마을에 한 번은 그가 어디로 가시는 길이냐고 아주 날 꼬부랑 살고 있었어요 그는 부모님께 깔깔댑니다 말했어요 “네가 날 살고 한 마리가 살고 한 계셨어요 있었지요 있

<keras.callbacks.History at 0x7fcd345a6410>