## 创建训练数据集

In [1]:
import keras
shakespeare_url = "https://homl.info/shakespeare"
filepath = keras.utils.get_file("shakespeare.txt",shakespeare_url)
with open(filepath) as f:
  shakespeare_text = f.read()

Downloading data from https://homl.info/shakespeare


In [2]:
!pip install keras-preprocessing --upgrade

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting keras-preprocessing
  Downloading Keras_Preprocessing-1.1.2-py2.py3-none-any.whl (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.6/42.6 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: keras-preprocessing
Successfully installed keras-preprocessing-1.1.2


In [3]:

from keras_preprocessing import text

tokenizer = text.Tokenizer(char_level=True)
tokenizer.fit_on_texts([shakespeare_text])
sequences = tokenizer.texts_to_sequences(["First"])




In [4]:
sequences

[[20, 6, 9, 8, 3]]

In [5]:
tokenizer.sequences_to_texts([[20, 6, 9, 8, 3]])

['f i r s t']

In [6]:
max_id = len(tokenizer.word_index)
max_id

39

In [7]:
dataset_size = tokenizer.document_count
dataset_size

1

In [8]:
import numpy as np
[encoded] = np.array(tokenizer.texts_to_sequences([shakespeare_text])) -1
encoded

array([19,  5,  8, ..., 20, 26, 10])

## 如何拆分顺序数据集

In [9]:
train_size = dataset_size * 90 // 100

In [10]:
import tensorflow as tf
dataset = tf.data.Dataset.from_tensor_slices(encoded[:train_size])

## 将顺序数据集切成多个窗口

In [11]:
n_steps = 100
window_length = n_steps + 1
dataset  = dataset.window(window_length,shift=1,drop_remainder=True)

## 展平数据集

In [12]:
dataset = dataset.flat_map(lambda window: window.batch(window_length))

In [13]:
batch_size = 32
dataset = dataset.shuffle(10000).batch(batch_size)
dataset = dataset.map(lambda windows:(windows[:,:-1],windows[:,1:]))

In [14]:
dataset = dataset.map(
    lambda X_batch,Y_batch:(tf.one_hot(X_batch,depth=max_id),Y_batch)
)
dataset = dataset.prefetch(1)

## 创建和训练Char-RNN模型

In [15]:
model = keras.models.Sequential([
    keras.layers.GRU(128, return_sequences=True, input_shape=[None, max_id],
                     dropout=0.2, recurrent_dropout=0.2),
    keras.layers.GRU(128, return_sequences=True, dropout=0.2,
                     recurrent_dropout=0.2),
    keras.layers.TimeDistributed(keras.layers.Dense(max_id,
                                                    activation="softmax"))
])



In [16]:
# model.compile(loss="sprase_categorical_corssentropy",optimizer="adam")
# history = model.fit(dataset,epochs=10)

In [19]:
import tensorflow as tf

# tf.config.run_functions_eagerly(True) # 启用 run_eagerly 模式

model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", run_eagerly=True)
history = model.fit(dataset, epochs=10)

Epoch 1/10


ValueError: ignored

In [18]:
dataset

<_PrefetchDataset element_spec=(TensorSpec(shape=(None, None, 39), dtype=tf.float32, name=None), TensorSpec(shape=(None, None), dtype=tf.int64, name=None))>

In [4]:
import tensorflow as tf
import numpy as np
import os


# 加载和处理数据
def load_data():
    import keras
    shakespeare_url = "https://homl.info/shakespeare"
    filepath = keras.utils.get_file("shakespeare.txt",shakespeare_url)
    with open(filepath) as f:
        text = f.read()
        # 转为小写
        text = text.lower()
        # 去除标点符号
        text = text.translate(str.maketrans('', '', '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'))
    # 构建字符表
    chars = sorted(set(text))
    char_to_index = {c: i for i, c in enumerate(chars)}
    index_to_char = {i: c for i, c in enumerate(chars)}
    # 将文本转换为数字
    encoded_text = [char_to_index[c] for c in text]
    return encoded_text, char_to_index, index_to_char


# 生成批次数据
def get_batch(encoded_text, batch_size, seq_length):
    # 计算每个批次中字符的数量
    n = batch_size * seq_length
    # 将编码后的文本转为矩阵
    batches = np.array(encoded_text[:len(encoded_text) // n * n], dtype=np.int32)
    # 设定输入和目标矩阵
    x = batches.reshape([batch_size, -1])
    y = np.roll(batches, -1).reshape([batch_size, -1])
    # 分割每个批次的矩阵
    for i in range(0, x.shape[1], seq_length):
        yield x[:, i: i + seq_length], y[:, i: i + seq_length]


# 定义Char-RNN模型
class CharRNN(tf.keras.Model):
    def __init__(self, num_chars, embedding_size, hidden_size):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(num_chars, embedding_size)
        self.lstm = tf.keras.layers.LSTM(hidden_size, return_sequences=True, return_state=True)
        self.dense = tf.keras.layers.Dense(num_chars)

    def call(self, inputs, states=None, training=False):
        # 输入经过嵌入层
        x = self.embedding(inputs)
        # LSTM单元处理输入和状态
        output, state_h, state_c = self.lstm(x, initial_state=states)
        # 全连接层输出各字符的概率分布
        logits = self.dense(output)
        return logits, state_h, state_c


# 训练模型
def train():
    # 加载和处理数据
    encoded_text, char_to_index, index_to_char = load_data()
    num_chars = len(char_to_index)
    # 定义训练超参数
    batch_size = 32
    seq_length = 100
    learning_rate = 0.01
    num_epochs = 5
    # 创建模型
    model = CharRNN(num_chars, 128, 256)
    # 定义损失函数和优化器
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    optimizer = tf.keras.optimizers.Adam(learning_rate)
    # 定义检查点对象
    checkpoint_directory = './training_checkpoints'
    checkpoint_prefix = os.path.join(checkpoint_directory, 'ckpt_{epoch}')
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True)

    # 迭代训练模型
    for epoch in range(num_epochs):
        states = None
        for batch_x, batch_y in get_batch(encoded_text, batch_size, seq_length):
            with tf.GradientTape() as tape:
                # 计算模型输出和损失
                logits, state_h, state_c = model(batch_x, states=states, training=True)
                loss = loss_fn(batch_y, logits)
            # 计算梯度并更新模型参数
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            states = (state_h, state_c)
        # 保存检查点
        model.save_weights(checkpoint_prefix.format(epoch=epoch))
        # 输出每个轮次的损失值
        print('Epoch {}, Loss {:.4f}'.format(epoch + 1, loss))

    return model, index_to_char


def predict(model, index_to_char, char_to_index, seed_text, length):
    """
    使用训练好的 Char-RNN 模型生成新文本

    参数:
    - model: 训练好的 Char-RNN 模型
    - index_to_char: 将数字映射回字符的字典
    - char_to_index: 将字符映射到数字的字典
    - seed_text: 用于生成新文本的种子文本
    - length: 生成文本的长度
    """

    # 将种子文本转换为数字
    x = [char_to_index[c] for c in seed_text]
    x = tf.expand_dims(x, 0)

    # 初始化状态
    states = None

    # 生成新文本
    for i in range(length):
        # 在模型上执行一步推理，并获取预测结果和新状态
        logits, state_h, state_c = model(x, states=states, training=False)

        # 将 logits 转换为概率分布，并从中采样
        logits = tf.squeeze(logits, 0)
        prob = tf.nn.softmax(logits / 0.5).numpy()
        index = np.random.choice(len(index_to_char), p=prob)

        # 添加新字符
        x = tf.expand_dims([index], 0)

        # 更新状态
        states = (state_h, state_c)

        # 输出新字符
        print(index_to_char[index], end='')


# 载入数据
encoded_text, char_to_index, index_to_char = load_data()

# 训练模型
model, _ = train()

predict(model, index_to_char, char_to_index, "shall i compare thee to a summer's day?\n", 500)



Epoch 1, Loss 1.5997
Epoch 2, Loss 1.4801
Epoch 3, Loss 1.4317
Epoch 4, Loss 1.4129
Epoch 5, Loss 1.4001


KeyError: ignored

In [None]:
def preprocess(texts):
  

In [None]:
model = keras.models.Sequential([
    keras.layers.GRU(128, return_sequences=True, input_shape=[None, max_id],
                     dropout=0.2, recurrent_dropout=0.2),
    keras.layers.GRU(128, return_sequences=True, dropout=0.2,
                     recurrent_dropout=0.2),
    keras.layers.TimeDistributed(keras.layers.Dense(max_id,
                                                    activation="softmax"))
])

model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", run_eagerly=True)
history = model.fit(dataset, epochs=10)