# LSTM 写作模型
## 引入 & 环境参数

In [None]:
import re
import os
import numpy as np
from IPython.display import clear_output
import tensorflow.keras as keras

# Notebook switches: load a previously saved model and/or run training.
load_saved_model = True
train_model = False

# Output folder for this run: run/<SECTION>/<RUN_ID>_aesop
SECTION = 'write'
RUN_ID = '0001'
RUN_FOLDER = 'run/{}/'.format(SECTION)
RUN_FOLDER += '_'.join([RUN_ID, 'aesop'])

# exist_ok=True makes creation idempotent and avoids the race between a
# separate existence check and the makedirs call.
os.makedirs(RUN_FOLDER, exist_ok=True)

## 数据的加载和预处理

In [None]:
# Load in the text and perform some cleanup
token_type = 'word'
seq_length = 20

filename = './data/aesop/data.txt'
with open(filename, encoding='utf-8') as f:
    text = f.read()

# Removing text before and after the main stories
start = text.find("THE FOX AND THE GRAPES\n\n\n")
end = text.find("ILLUSTRATIONS\n\n\n[")
if start == -1 or end == -1:
    # str.find returns -1 when a marker is missing; slicing with -1 would
    # silently keep the wrong part of the file, so fail loudly instead.
    raise ValueError(
        'Could not locate the story start/end markers in {}'.format(filename)
    )
text = text[start:end]

# Story separator: seq_length "|" tokens, so a full input window of
# separators marks the beginning of a story.
start_story = '| ' * seq_length

text = start_story + text
text = text.lower()
# Five consecutive newlines are replaced by the story separator.
text = text.replace('\n\n\n\n\n', start_story)
text = text.replace('\n', ' ')
# Runs of two or more spaces (left over from blank lines) become ". ".
text = re.sub('  +', '. ', text).strip()
text = text.replace('..', '.')

# Surround every punctuation character with spaces so that it becomes its own
# token. Raw strings avoid invalid-escape warnings; "-" and "[" are escaped so
# the character class contains exactly the listed characters.
text = re.sub(r'([!"#$%&()*+,\-./:;<=>?@\[\]^_`{|}~])', r' \1 ', text)
# Collapse any repeated whitespace introduced above into a single space.
text = re.sub(r'\s{2,}', ' ', text)

print(len(text))
print(text)

In [None]:
# Tokenization
# Word mode uses a word-level tokenizer; any other mode tokenizes per
# character and disables lower-casing. filters='' turns off the tokenizer's
# own character filtering in both modes.
tokenizer = (
    keras.preprocessing.text.Tokenizer(char_level=False, filters='')
    if token_type == 'word'
    else keras.preprocessing.text.Tokenizer(char_level=True, filters='', lower=False)
)

# Learn the vocabulary from the cleaned corpus, then encode the corpus itself.
tokenizer.fit_on_texts([text])
token_list = tokenizer.texts_to_sequences([text])[0]
# One more than the vocabulary size; generate_text treats index 0 as "no word".
total_words = len(tokenizer.word_index) + 1

print(total_words)
print(tokenizer.word_index)
print(token_list)

In [None]:
def generate_sequences(token_list, step):
    """Cut a token stream into fixed-length inputs and next-token targets.

    Every window of ``seq_length`` consecutive tokens (module-level setting),
    taken every ``step`` positions, becomes one input; the token immediately
    after the window is its target.

    Args:
        token_list: Sequence of integer token ids.
        step: Stride between the starts of consecutive windows.

    Returns:
        A tuple ``(X, y, num_seq)``: ``X`` is a list of token windows, ``y``
        is the one-hot encoded targets with ``total_words`` classes, and
        ``num_seq`` is the number of windows produced.
    """
    window_starts = range(0, len(token_list) - seq_length, step)

    X = [token_list[begin : begin + seq_length] for begin in window_starts]
    targets = [token_list[begin + seq_length] for begin in window_starts]
    y = keras.utils.to_categorical(targets, num_classes=total_words)

    num_seq = len(X)
    print('Number of sequences: ', num_seq)
    return X, y, num_seq

# Build one training example per token position (stride 1) and convert the
# results to NumPy arrays; the printed shapes are a quick sanity check.
X, y, num_seq = generate_sequences(token_list, step=1)
X = np.array(X)
y = np.array(y)
print(X.shape)
print(y.shape)

## 搭建模型 or 加载模型

In [None]:
# Either restore a previously saved model from RUN_FOLDER or build a new one.
if load_saved_model:
    model = keras.models.load_model(os.path.join(RUN_FOLDER, 'aesop_no_dropout_1000.h5'))
else:
    n_units = 256
    embedding_size = 100

    # Variable-length sequence of token ids -> embedding -> single LSTM ->
    # softmax over the whole vocabulary (next-token distribution).
    text_in = keras.Input(shape=(None,))
    embedding = keras.layers.Embedding(total_words, embedding_size)
    x = embedding(text_in)
    x = keras.layers.LSTM(n_units)(x)
    text_out = keras.layers.Dense(total_words, activation='softmax')(x)

    model = keras.Model(text_in, text_out)
    optim = keras.optimizers.RMSprop(learning_rate=0.001)
    # categorical_crossentropy matches the one-hot targets produced by
    # generate_sequences.
    model.compile(loss='categorical_crossentropy', optimizer=optim)

model.summary()

## 辅助函数

In [None]:
def sample_with_temp(preds, temperature=1.0):
    """Sample an index from a probability array after temperature scaling.

    The probabilities are re-weighted as ``p ** (1 / temperature)`` and
    renormalised, then a single index is drawn from the result. Temperatures
    below 1 sharpen the distribution, temperatures above 1 flatten it.

    Args:
        preds: 1-D array-like of probabilities.
        temperature: Positive scaling factor.

    Returns:
        The sampled index (NumPy integer).

    Raises:
        ValueError: If ``temperature`` is not strictly positive.
    """
    if temperature <= 0:
        raise ValueError('temperature must be > 0, got {}'.format(temperature))

    preds = np.asarray(preds, dtype='float64')
    # log(0) is -inf, which is the desired "never sample this" value, so the
    # divide-by-zero warning is suppressed rather than treated as a problem.
    with np.errstate(divide='ignore'):
        scaled = np.log(preds) / temperature
    # Subtracting the maximum leaves the normalised result unchanged but keeps
    # exp() from underflowing to all zeros (0/0 = NaN) at small temperatures.
    scaled = scaled - np.max(scaled)
    exp_preds = np.exp(scaled)
    preds = exp_preds / np.sum(exp_preds)
    probs = np.random.multinomial(1, preds, 1)
    return np.argmax(probs)

def generate_text(seed_text, next_word, model, max_sequence_len, temp):
    """Generate text by repeatedly sampling the model's next-token prediction.

    The seed is prefixed with the module-level ``start_story`` separator and
    tokenised once; after that only the sampled token ids are appended to a
    rolling window, instead of re-tokenising the whole growing string on
    every step.

    Args:
        seed_text: Text the generation starts from; it is included in the
            returned string.
        next_word: Maximum number of tokens to generate.
        model: Model whose ``predict`` returns a probability vector over the
            vocabulary for a ``(1, max_sequence_len)`` batch of token ids.
        max_sequence_len: Number of most recent tokens fed to the model.
        temp: Sampling temperature passed to ``sample_with_temp``.

    Returns:
        ``seed_text`` followed by the generated tokens. Generation stops early
        when the story separator token ``'|'`` is sampled.
    """
    # Words are joined with a trailing space; characters are joined directly.
    separator = ' ' if token_type == 'word' else ''

    context = tokenizer.texts_to_sequences([start_story + seed_text])[0]
    context = context[-max_sequence_len:]
    pieces = [seed_text]

    for _ in range(next_word):
        model_input = np.reshape(context, (1, max_sequence_len))

        probs = model.predict(model_input, verbose=0)[0]
        y_class = sample_with_temp(probs, temperature=temp)

        # Index 0 is treated as "no word" (presumably the padding slot, which
        # has no vocabulary entry) and contributes an empty token.
        output_word = '' if y_class == 0 else tokenizer.index_word[y_class]

        if output_word == '|':
            break

        pieces.append(output_word + separator)
        if y_class != 0:
            # Same effect as re-tokenising the extended text: the new id is
            # appended and the window keeps only the most recent tokens.
            context = (context + [int(y_class)])[-max_sequence_len:]

    return ''.join(pieces)

## 训练模型

In [None]:
def on_epoch_end(epoch, logs):
    """Print sample generations at several temperatures after each epoch.

    Used as the ``on_epoch_end`` hook of a Keras ``LambdaCallback``; both
    arguments are accepted for the callback signature and are not used.
    """
    seed_text = ''
    gen_words = 500

    samples = (
        ('Temp 0.2', 0.2),
        ('Temp 0.33', 0.33),
        ('Temp 0.5', 0.5),
        ('Temp 1.0', 1),
    )
    for heading, temperature in samples:
        print(heading)
        print(generate_text(seed_text, gen_words, model, seq_length, temp=temperature))

# Train only when explicitly enabled, then persist the model to RUN_FOLDER.
if train_model:
    epochs = 1000
    batch_size = 32
    # Number of full batches per epoch; not used by the code visible here.
    num_batches = len(X) // batch_size
    # Print sample generations at the end of every epoch.
    callback = keras.callbacks.LambdaCallback(on_epoch_end=on_epoch_end)
    model.fit(
        X,
        y,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[callback],
        shuffle=True,
    )
    model.save(os.path.join(RUN_FOLDER, 'aesop_no_dropout_1000.h5'))

## 验证模型

In [None]:
# Generate up to 500 tokens from a story-title prompt at a low temperature
# (0.2), which sharpens the sampling distribution towards the likeliest tokens.
seed_text = "the ass and his lion . "
gen_words = 500
temp = 0.2

print(generate_text(seed_text, gen_words, model, seq_length, temp))

In [None]:
# Same experiment at temperature 1.0, i.e. sampling from the model's
# predicted distribution without sharpening or flattening it.
seed_text = "the great man and the time . "
gen_words = 500
temp = 1.0

print(generate_text(seed_text, gen_words, model, seq_length, temp))

In [None]:
def top10_next_words(seed_text, model):
    """Print the ten most probable next tokens for a prompt.

    The prompt is echoed, prefixed with the module-level ``start_story``
    separator, tokenised, and cut to the last ``seq_length`` tokens before
    being passed to ``model.predict``. The ten highest-probability tokens are
    printed from most to least likely, one per line, as ``probability : token``.

    Args:
        seed_text: Prompt text.
        model: Model returning a probability vector over the vocabulary.
    """
    print(seed_text)

    padded_prompt = start_story + seed_text
    window = tokenizer.texts_to_sequences([padded_prompt])[0][-seq_length:]
    model_input = np.reshape(window, (1, seq_length))

    probs = model.predict(model_input, verbose=0)[0]

    # argsort is ascending: keep the last ten entries and reverse them.
    best_ids = np.argsort(probs)[-10:][::-1]
    best_words = tokenizer.sequences_to_texts([[token_id] for token_id in best_ids])

    for token_id, word in zip(best_ids, best_words):
        print('{:<6.1%} : {}'.format(probs[token_id], word))

In [None]:
# Probe: most likely continuations right after "there was a".
top10_next_words('the fox and the stag . there was a', model)

In [None]:
# Probe: most likely continuations after a subject ("one day a fox").
top10_next_words('the fox and the snake . one day a fox', model)

In [None]:
# Probe: most likely continuations after "the dog was lying".
top10_next_words('the dog and the hare . the dog was lying', model)

In [None]:
# Probe: most likely continuations after "a farmer was".
top10_next_words('the farmer and his sheep . a farmer was', model)

In [None]:
# Probe: the prompt ends at the full stop after the title, so this shows the
# model's predictions for the first token that follows it.
top10_next_words('the eagle and the sea .', model)

In [None]:
# Probe: two prompts that differ only by a trailing "and", to compare how one
# extra token changes the predicted distribution.
top10_next_words('the lion said ,', model)
top10_next_words('the lion said , and', model)

In [None]:
def generate_human_led_text(model, max_sequence_len):
    """Interactively write a story, one user-chosen token at a time.

    On every round the ten most probable next tokens are printed from most to
    least likely, then the user types the text to append. Entering ``'|'``
    ends the session. After each choice the notebook output is cleared and
    the text written so far is printed.

    Args:
        model: Model returning a probability vector over the vocabulary.
        max_sequence_len: Number of most recent tokens fed to the model.
    """
    story_so_far = ''
    context_text = start_story

    while True:
        recent_ids = tokenizer.texts_to_sequences([context_text])[0][-max_sequence_len:]
        model_input = np.reshape(recent_ids, (1, max_sequence_len))

        probs = model.predict(model_input, verbose=0)[0]

        # argsort is ascending: keep the last ten entries and reverse them.
        best_ids = np.argsort(probs)[-10:][::-1]
        best_words = tokenizer.sequences_to_texts([[token_id] for token_id in best_ids])

        for token_id, word in zip(best_ids, best_words):
            print('{:<6.1%} : {}'.format(probs[token_id], word))

        chosen_word = input()
        if chosen_word == '|':
            break

        addition = chosen_word + ' '
        context_text += addition
        story_so_far += addition

        clear_output()

        print(story_so_far)

In [None]:
# Use the configured window length instead of a hard-coded 20: the function
# reshapes the last tokens to (1, max_sequence_len), and the start_story
# prefix only guarantees seq_length tokens.
generate_human_led_text(model, seq_length)