In [8]:
%matplotlib inline
import math
import torch
import torch.nn.functional as F
from torch import nn
import numpy as np
import os
import re
from collections import Counter
import numpy as np
from visualization import TrainingVisualizer

In [6]:
# 检查GPU是否可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


# 处理文本

## 1.直接读

In [None]:
# 加载文本数据
with open('data/time_machine_txt/timemachine.txt', 'r') as f:
    text = f.read()

# 创建字符映射表
chars = sorted(list(set(text))) # 字母级别的token
char_to_idx = {ch: idx for idx, ch in enumerate(chars)}
idx_to_char = {idx: ch for idx, ch in enumerate(chars)}

# 转换文本为索引
text_as_int = np.array([char_to_idx[c] for c in text])

## 2.考虑大小写和阈值

In [None]:
# 设置阈值
threshold = 5

# 加载文本数据
with open('data/time_machine_txt/timemachine.txt', 'r') as f:
    text = f.read()

# 将所有字符转换为小写
text = text.lower()

# 计算字符的词频
char_counts = Counter(text)

# 创建字符映射表，保留词频不低于阈值的字符，其他字符设为<unk>
chars = sorted([ch for ch, count in char_counts.items() if count >= threshold])
chars.append('<unk>')  # 添加<unk>标记

char_to_idx = {ch: idx for idx, ch in enumerate(chars)}
idx_to_char = {idx: ch for idx, ch in enumerate(chars)}
unk_idx = char_to_idx['<unk>']

# 转换文本为索引，如果字符词频低于阈值，则转换为<unk>
text_as_int = np.array([char_to_idx.get(c, unk_idx) for c in text])

# 打印结果示例
print("Unique characters:", len(chars))
print("Character to Index mapping:", char_to_idx)
print("First 100 characters as indices:", text_as_int[:10])

## 3.考虑大小写和阈值，不考虑标点符号

In [63]:
# 设置阈值
threshold = 5

# 加载文本数据
with open('data/time_machine_txt/timemachine.txt', 'r') as f:
    text = f.read()

# 将所有字符转换为小写
text = text.lower()

# 移除所有标点符号
text = re.sub(r'[^\w\s]', '', text)  # 仅保留字母、数字和空格

# 计算字符的词频
char_counts = Counter(text)

# 创建字符映射表，保留词频不低于阈值的字符，其他字符设为<unk>
chars = sorted([ch for ch, count in char_counts.items() if count >= threshold])
chars.append('<unk>')  # 添加<unk>标记

char_to_idx = {ch: idx for idx, ch in enumerate(chars)}
idx_to_char = {idx: ch for idx, ch in enumerate(chars)}
unk_idx = char_to_idx['<unk>']

# 转换文本为索引，如果字符词频低于阈值，则转换为<unk>
text_as_int = np.array([char_to_idx.get(c, unk_idx) for c in text])

# 打印结果示例
print("Unique characters:", len(chars))
print("Character to Index mapping:", char_to_idx)
print("First 100 characters as indices:", text_as_int[:10])

Unique characters: 30
Character to Index mapping: {'\n': 0, ' ': 1, '_': 2, 'a': 3, 'b': 4, 'c': 5, 'd': 6, 'e': 7, 'f': 8, 'g': 9, 'h': 10, 'i': 11, 'j': 12, 'k': 13, 'l': 14, 'm': 15, 'n': 16, 'o': 17, 'p': 18, 'q': 19, 'r': 20, 's': 21, 't': 22, 'u': 23, 'v': 24, 'w': 25, 'x': 26, 'y': 27, 'z': 28, '<unk>': 29}
First 100 characters as indices: [22 10  7  1 22 11 15  7  1 15]


# 设置统一的参数

In [10]:
# 定义超参数
vocab_size = len(chars)  # 字符的个数
seq_size = 50  # 序列长度 - 一个句子100个单词
batch_size = 128
hidden_size = 256
embedding_size = 64  # input_size 嵌入向量的大小
num_layers = 1
learning_rate = 0.001
num_epochs = 10

# 转化成适合训练的张量

In [12]:
def create_dataset(text_as_int, seq_size, batch_size):
    sequences = []
    targets = []
    for i in range(0, len(text_as_int) - seq_size):
        sequences.append(text_as_int[i:i + seq_size])
        targets.append(text_as_int[i + 1:i + seq_size + 1])
    sequences = torch.tensor(sequences, dtype=torch.long)
    targets = torch.tensor(targets, dtype=torch.long)
    dataset = torch.utils.data.TensorDataset(sequences, targets)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
    return dataloader

dataloader = create_dataset(text_as_int, seq_size, batch_size)

In [14]:
print(len(dataloader))
# test - 查看 DataLoader 中的具体元素
for batch_idx, (data, labels) in enumerate(dataloader):
    if batch_idx >= 100:
        print(f"Batch {batch_idx + 1}:")
        print("Data:")
        print(data.shape)
        print("Labels:")
        print(labels.shape)
        print()
        break

1355
Batch 101:
Data:
torch.Size([128, 50])
Labels:
torch.Size([128, 50])



# 从矩阵相乘开始重构RNN

In [16]:
def initialize_Wb(embedding_size, hidden_size, vocab_size, device):
    """
    hidden_size：神经元个数
    注意：虽然一般的X为(batch_size, seq_size, vocab_size)，但是由于RNN的特性，每次循环是对于一个时间步，所以X的size一般需要转化成(seq_size, batch_size, vocab_size)，
    那么每次循环我们需要处理的输入的size就是：(batch_size, vocab_size)；此外，由于我们需要经过一个嵌入层，所以我们实际需要处理的size为：(batch_size, embedding_size)
    """
    # 隐藏层参数
    # X's size: (batch_size, seq_size, vocab_size) -> (seq_size, batch_size, vocab_size) -> (seq_size, batch_size, embedding_size)
    W_xh =  torch.normal(0, 1, (embedding_size, hidden_size), device = device) * 0.01
    # H = torch.normal(0, 1, (batch_size, hidden_size))
    W_hh = torch.normal(0, 1, (hidden_size, hidden_size), device = device) * 0.01
    b_h = torch.zeros((1, hidden_size), device = device) # 注意这里的偏置项是加在神经元上，不是batch_size*hidden_size

    # 输出层参数
    W_hq = torch.normal(0, 1, (hidden_size, vocab_size), device = device) * 0.01
    b_q = torch.zeros((1, vocab_size), device = device)
    # 然后经过softmax，得到每个vocab的输出概率？？？

    # 附加梯度
    params = [W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    
    return params

In [17]:
# 定义一个函数初始化隐变量H
def initialize_H(batch_size, hidden_size, device):
    H = torch.zeros((batch_size, hidden_size), device = device)
    return H

In [21]:
# 定义矩阵计算
def RNN_calculate(inputs, params, H, device):
    outputs = []
    # inputs的形状：(seq_size，batch_size, embedding_size)
    W_xh, W_hh, b_h, W_hq, b_q = params
    for X in inputs: # X的形状：(batch_size, embedding_size)
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        O = torch.mm(H, W_hq) + b_q
        outputs.append(O) # outputs是每一个时间步的输出 - [tensor(batch_size, vocab_size), tensor(batch_size, vocab_size), ......]
        
    outputs = torch.stack(outputs)
    return outputs, H # outputs 的 size：(seq_size, batch_size, vocab_size)；H 的 size：(batch_size, hidden_size)

In [22]:
# test
inputs = torch.normal(0, 1, (seq_size, batch_size, embedding_size), device = device)

params = initialize_Wb(embedding_size, hidden_size, vocab_size, device)
H = initialize_H(batch_size, hidden_size, device)

outputs, H = RNN_calculate(inputs, params, H, device)

outputs.shape, H.shape

(torch.Size([50, 128, 30]), torch.Size([128, 256]))

# 定义RNNModel

In [95]:
class RNNModel(nn.Module):
    def __init__(self, embedding_size, hidden_size, vocab_size, initialize_Wb, initialize_H, RNN_calculate, device):
        super(RNNModel, self).__init__()
        self.device = device
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.params = initialize_Wb(embedding_size, hidden_size, vocab_size, device)
        self.initialize_H = initialize_H
        self.RNN_calculate = RNN_calculate

    def forward(self, X, H): 
        # 输入的X的形状为：(batch_size, seq_size)；H的形状为：(batch_size, hidden_size)
        X = self.embedding(X) # 此时X的形状变为：(batch_size, seq_size, embedding_size)
        X = X.permute(1, 0, 2).to(self.device) # 此时X的形状变为：(seq_size, batch_size, embedding_size)
        Y, H = self.RNN_calculate(X, self.params, H, self.device)
        return Y, H # 返回Y的size：(seq_size, batch_size, vocab_size)

    def begin_H(self, batch_size, hidden_size, device):
        return self.initialize_H(batch_size, hidden_size, device)

In [96]:
# test
X = torch.randint(0, vocab_size, (batch_size, seq_size))

net = RNNModel(embedding_size, hidden_size, vocab_size, initialize_Wb, initialize_H, RNN_calculate, device)

H =  net.begin_H(batch_size, hidden_size, device)

Y, H = net(X, H)

Y.shape, H.shape

(torch.Size([50, 128, 30]), torch.Size([128, 256]))

# 预测

In [82]:
print(char_to_idx, "\n", idx_to_char)

{'\n': 0, ' ': 1, '_': 2, 'a': 3, 'b': 4, 'c': 5, 'd': 6, 'e': 7, 'f': 8, 'g': 9, 'h': 10, 'i': 11, 'j': 12, 'k': 13, 'l': 14, 'm': 15, 'n': 16, 'o': 17, 'p': 18, 'q': 19, 'r': 20, 's': 21, 't': 22, 'u': 23, 'v': 24, 'w': 25, 'x': 26, 'y': 27, 'z': 28, '<unk>': 29} 
 {0: '\n', 1: ' ', 2: '_', 3: 'a', 4: 'b', 5: 'c', 6: 'd', 7: 'e', 8: 'f', 9: 'g', 10: 'h', 11: 'i', 12: 'j', 13: 'k', 14: 'l', 15: 'm', 16: 'n', 17: 'o', 18: 'p', 19: 'q', 20: 'r', 21: 's', 22: 't', 23: 'u', 24: 'v', 25: 'w', 26: 'x', 27: 'y', 28: 'z', 29: '<unk>'}


In [97]:
def text_prediction(prefix, num_preds, net, device):
    """在prefix后面生成新字符"""
    H =  net.begin_H(1, hidden_size, device) # def begin_H(self, batch_size, hidden_size, device):
    outputs = [char_to_idx[prefix[0]]] # 初始化为第一个字符的数字表示
    
    # 定义一个匿名函数（lambda 函数）。从变量 outputs 的最后一个元素创建一个新的张量，并将其形状调整为 (1, 1)。
    get_input = lambda: torch.tensor([outputs[-1]]).reshape((1, 1))
    
    for char in prefix[1:]:  # 预热期 - 逐渐把outputs中的元素添加进model，更新H - 看看H的计算公式，理解是如何更新H的
        _, H = net(get_input(), H)
        outputs.append(char_to_idx[char])
    # 到这里，outputs为prefix每个字母转化为它的数字表示的一维list
     
    for _ in range(num_preds):  # 预测num_preds步
        Y, H = net(get_input(), H) # # 返回Y的size：(seq_size, batch_size, vocab_size)
        # print(Y.argmax(dim=2).reshape(1))
        outputs.append(int(Y.argmax(dim=2).reshape(1)))
    return ''.join([idx_to_char[i] for i in outputs])

In [98]:
# test
text_prediction('time traveller ', 10, net, device)

'time traveller <unk>gba\nnh<unk>e<unk>'

# 梯度裁剪

In [99]:
def grad_clipping(net, theta):
    """裁剪梯度"""
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm # 整体进行一个缩放

# 训练

In [100]:
# test - 查看 DataLoader 中的具体元素
for batch_idx, (data, labels) in enumerate(dataloader):
    print(f"Batch {batch_idx + 1}:")
    print("Data:")
    print(data.shape)
    print("Labels:")
    print(labels.shape)
    print()
    break

Batch 1:
Data:
torch.Size([128, 50])
Labels:
torch.Size([128, 50])



In [None]:
def RNN_train_epoch(net, train_iter, loss, updater, device, use_random_iter):
    """训练网络一个迭代周期"""
    H = None
    metric = Accumulator(2)  # 训练损失之和,词元数量 - 作用暂时未知
    for X, Y in train_iter:
        # print(X.shape, Y.shape) # torch.Size([128, 50]) torch.Size([128, 50])
        # print(type(H)) # - <class 'NoneType'> -> <class 'torch.Tensor'>

        if H is None or use_random_iter or H.size(0) != X.size(0):
            H = net.begin_H(X.shape[0], hidden_size, device)
        # if H is None or use_random_iter:
        #     # 在第一次迭代或使用随机抽样时初始化state
        #     H = net.begin_H(X.shape[0], hidden_size, device) # batch_size 等于 X 的第一个维度（从一开始）
        else: # 如果不是第一次迭代，并且没有使用随机抽样，需要维护 state 的梯度信息
            if isinstance(net, nn.Module) and not isinstance(H, tuple):  
                # state对于nn.GRU是个张量 - 如果 net 是 nn.Module 的实例，并且 state 不是元组（即 state 是一个张量，通常用于 nn.GRU），则直接对 state 调用 detach_()
                H.detach_()
            else: 
                # state对于nn.LSTM或对于我们从零开始实现的模型是个张量
                # 如果 state 是一个元组（通常用于 nn.LSTM 或自定义的 RNN 实现），则需要对元组中的每个张量调用 detach_()。
                for h in H:
                    h.detach_()

        y = Y.T # Y的size从(batch_size, seq_size) -> (seq_size, batch_size)
        X, y = X.to(device), y.to(device)
        y_hat, H = net(X, H)
        # 为了计算交叉熵损失，我们需要调整y_hat和y的size，这也是课本代码中使用reshape的原因
        # print(y_hat.shape, y.shape)
        y_hat = y_hat.reshape(-1, y_hat.shape[2])
        y = y.reshape(-1) # 把y变为一维的张量
        # print(y_hat.shape, y.shape)
        
        l = loss(y_hat, y.long()).mean()
        # print(l)
        
        if isinstance(updater, torch.optim.Optimizer):
            updater.zero_grad()
            l.backward()
            grad_clipping(net, 1)
            updater.step()
        else:
            l.backward()
            grad_clipping(net, 1)
            # 因为已经调用了mean函数
            updater(batch_size=1)
        metric.add(l * y.numel(), y.numel())
        
    return math.exp(metric[0] / metric[1])

In [None]:
def sgd(params, lr, batch_size):  
    """小批量随机梯度下降"""
    with torch.no_grad():
        for param in params:
            param -= lr * param.grad / batch_size
            param.grad.zero_()

def RNN_train(net, train_iter, char_to_idx, lr, num_epochs, device, use_random_iter=False):
    """训练模型"""
    loss = nn.CrossEntropyLoss()
    visualizer = TrainingVisualizer(xlabel='Epoch', ylabel='Value', title='Train RNN_scratch in Time_Machine', legend=['ppl'])  

    # 初始化 - 为啥要这样处理，分为是不是nn.Module? - 不是nn.Module当然没法用nn库里的梯度下降方法 maybe
    if isinstance(net, nn.Module):
        updater = torch.optim.SGD(net.parameters(), lr)
    else:
        updater = lambda batch_size: sgd(net.params, lr, batch_size)
        
    predict = lambda prefix: text_prediction(prefix, 50, net, char_to_idx, device) # 匿名函数，输入prefix预测50个token
    
    # 训练和预测
    for epoch in range(num_epochs):
        ppl = RNN_train_epoch(net, train_iter, loss, updater, device, use_random_iter)
        
        # if (epoch + 1) % 10 == 0:
        #     print(predict('time traveller'))
        #     # 更新可视化
        #     visualizer.add(epoch, [ppl])
        visualizer.add(epoch, [ppl])
             
    # print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')
    print(predict('time traveller'))
    print(predict('traveller'))

In [None]:
num_epochs, lr = 500, 1
RNN_train(net, dataloader, char_to_idx, lr, num_epochs, device)

# 出现了最后一个batch的大小小于设置的batch_size

将 `__call__` 方法改成 `forward` 方法有多个好处，特别是在使用 PyTorch 框架进行深度学习模型开发时：

### PyTorch 的设计约定

1. **符合 PyTorch 的约定**：
   - 在 PyTorch 中，所有继承自 `nn.Module` 的类都应定义 `forward` 方法，而不是 `__call__` 方法。
   - 当调用模型实例时，例如 `output = model(input)`，PyTorch 会自动调用 `forward` 方法。这是 PyTorch 的设计约定，确保代码的可读性和一致性。

2. **支持内置功能**：
   - 使用 `forward` 方法可以使模型更好地与 PyTorch 的许多内置功能和模块集成，如 `torch.jit.trace`、`torch.nn.DataParallel` 等。
   - 这些功能通常会依赖于 `forward` 方法的存在，而不是 `__call__` 方法。

### 代码示例

以下是你的修改后的 `RNNModel` 类，使用 `forward` 方法：

```python
class RNNModel(nn.Module):
    def __init__(self, vocab_size, hidden_size, seq_size, embedding_size, device, 
                 initialize_Wb, initialize_H, RNN_calculate):
        super(RNNModel, self).__init__()  # 调用 nn.Module 的构造函数
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.seq_size = seq_size
        self.embedding_size = embedding_size
        self.device = device
        
        self.params = initialize_Wb(vocab_size, hidden_size, embedding_size, device)
        self.initial_H = initialize_H
        self.RNN_calculate = RNN_calculate

        self.embedding = nn.Embedding(vocab_size, embedding_size).to(device)

    def forward(self, X, H):  # 将 __call__ 方法改为 forward
        X = X.to(self.device)  # 确保输入在正确的设备上
        X = self.embedding(X).type(torch.float32)
        Y, H = self.RNN_calculate(X, self.params, H, self.device)
        return Y, H

    def begin_H(self, seq_size, hidden_size, device):
        return self.initial_H(seq_size, hidden_size, device)
```

### 调用模型

当定义了 `forward` 方法后，可以像调用函数一样调用模型实例，PyTorch 会自动调用 `forward` 方法：

```python
# 初始化模型
model = RNNModel(vocab_size, hidden_size, seq_size, embedding_size, device, 
                 initialize_Wb, initialize_H, RNN_calculate)

# 示例输入
X = torch.randint(0, vocab_size, (batch_size, seq_size), dtype=torch.long).to(device)
H = model.begin_H(batch_size, hidden_size, device)

# 调用模型
output, H = model(X, H)
```

### 主要好处总结

1. **代码一致性和可读性**：
   - 遵循 PyTorch 的设计约定，保持代码风格一致，使其他开发者更容易理解和维护代码。

2. **支持 PyTorch 的高级功能**：
   - 使用 `forward` 方法可以与 PyTorch 的许多高级功能和模块更好地集成，如模型的序列化、并行计算和优化等。

3. **简化调用**：
   - 通过定义 `forward` 方法，可以直接调用模型实例进行前向传播，而不需要额外定义 `__call__` 方法。

### 总结

使用 `forward` 方法是 PyTorch 中定义模型前向传播的标准方式。它不仅符合 PyTorch

    # def __call__(self, X, H): # state 就是 H
    #     # print("进入嵌入层之前的X的维度为", X.shape)
    #     X = self.embedding(X).type(torch.float32)
    #     # print("经过嵌入层之后的X的维度为", X.shape)
    #     return self.RNN_calculate(X, self.params, H, self.device) 的设计约定，提高代码的一致性和可读性，还能更好地支持 PyTorch 的许多内置功能和工具。通过遵循这些约定，可以使你的代码更加规范和易于维护。