In [14]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import time  # 用于计时

# Fix PyTorch's global RNG seed so that runs are repeatable.
SEED = 42
torch.manual_seed(SEED)

def load_data_fashion_mnist(batch_size, resize=None):
    """Build the Fashion-MNIST training and test data loaders.

    NOTE(review): `transforms`, `datasets` and `DataLoader` are not imported
    in the visible part of this file -- presumably they come from
    torchvision / torch.utils.data; confirm they are in scope.

    Args:
        batch_size: Batch size handed to both loaders.
        resize: Optional target size. When truthy, a ``Resize(resize)`` step
            is applied before the tensor conversion.

    Returns:
        A ``(train_loader, test_loader)`` tuple. Only the training loader
        shuffles its data.
    """
    # Preprocessing pipeline: optional resize first, tensor conversion last.
    steps = [transforms.Resize(resize)] if resize else []
    steps.append(transforms.ToTensor())
    pipeline = transforms.Compose(steps)

    # Both splits share the same storage root and preprocessing pipeline.
    train_set, test_set = [
        datasets.FashionMNIST(root='./data', train=is_train, download=True, transform=pipeline)
        for is_train in (True, False)
    ]

    def make_loader(dataset, shuffle):
        # Wrap one split in a DataLoader using the shared batch/worker settings.
        return DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=get_dataloader_workers(),
        )

    return make_loader(train_set, True), make_loader(test_set, False)


def get_dataloader_workers():  #@save
    """Return the fixed number of worker processes used for data loading."""
    worker_count = 10
    return worker_count



# Pick the compute device: CUDA GPU first, then Apple's MPS backend, else CPU.
# `torch.backends.mps` is looked up defensively because PyTorch builds that
# predate the MPS backend do not expose that attribute at all.
_mps_backend = getattr(torch.backends, 'mps', None)
if torch.cuda.is_available():
    device = torch.device('cuda')
elif _mps_backend is not None and _mps_backend.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")


 

# Generate the sine-wave samples and move inputs and targets to `device`.
# NOTE(review): generate_sine_wave is defined outside this section; the
# recorded run shows X with shape (990, 10) and y with shape (990,).
seq_len = 10
X, y = (tensor.to(device) for tensor in generate_sine_wave(seq_len))
print(f"Input shape: {X.shape}, Target shape: {y.shape}")

# Train/test split without shuffling: the first 80% of the samples are used
# for training and the remainder for testing.
train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# 3. Define the RNN model
class SimpleRNN(nn.Module):
    """Single-layer tanh RNN with a linear read-out of the last hidden state.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the hidden state.
        output_dim: Size of the prediction produced per sequence.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.hidden_dim = hidden_dim

        def small_normal(*shape):
            # Weight matrix drawn from N(0, 0.01^2), wrapped as a parameter.
            return nn.Parameter(torch.normal(mean=0.0, std=0.01, size=shape))

        # Creation order is kept (W_xh, W_hh, b_h, W_hy, b_y): it fixes both
        # the random draws under a given seed and the parameter ordering.
        self.W_xh = small_normal(input_dim, hidden_dim)
        self.W_hh = small_normal(hidden_dim, hidden_dim)
        self.b_h = nn.Parameter(torch.zeros(hidden_dim))
        self.W_hy = small_normal(hidden_dim, output_dim)
        self.b_y = nn.Parameter(torch.zeros(output_dim))

    def forward(self, x):
        """Run the recurrence over dimension 1 of ``x`` and project the result.

        Args:
            x: Input indexed as ``x[:, t, :]``, i.e. (batch, time, features).

        Returns:
            Tensor of shape (batch, output_dim) computed from the hidden
            state after the final time step.
        """
        num_rows, num_steps = x.size(0), x.size(1)
        # The hidden state starts at zero, on the same device as the input.
        hidden = torch.zeros(num_rows, self.hidden_dim, device=x.device)

        for step in range(num_steps):
            current = x[:, step, :]
            # h_t = tanh(x_t W_xh + h_{t-1} W_hh + b_h)
            pre_activation = current @ self.W_xh + hidden @ self.W_hh + self.b_h
            hidden = torch.tanh(pre_activation)

        # Output layer: y = h_T W_hy + b_y
        return hidden @ self.W_hy + self.b_y

# 4. Instantiate the model, the loss function and the optimizer
input_dim, hidden_dim, output_dim = 1, 32, 1
# The model is moved to the selected device before the optimizer is built.
model = SimpleRNN(input_dim, hidden_dim, output_dim).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# 5. Train the model
# Each epoch is one full-batch gradient step over the whole training split.
num_epochs = 100
model.train()
# The (num_samples, seq_len, 1) view of the inputs is identical in every
# epoch, so it is built once instead of inside the loop.
X_train_reshaped = X_train.unsqueeze(-1)
start_time = time.time()
for epoch in range(num_epochs):
    optimizer.zero_grad()
    outputs = model(X_train_reshaped)
    # Drop only the trailing output dimension; a bare squeeze() would also
    # remove the batch dimension when there is a single sample.
    loss = criterion(outputs.squeeze(-1), y_train)
    loss.backward()
    optimizer.step()

    # Report progress every 20 epochs.
    if (epoch + 1) % 20 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.6f}")
print(f"Training time: {time.time() - start_time:.2f} seconds")

# 6. Evaluate the model on the test split
model.eval()
with torch.no_grad():  # no gradients are needed for evaluation
    X_test_reshaped = X_test.unsqueeze(-1)
    # Drop only the trailing output dimension so predictions keep their batch
    # dimension even when the test split holds a single sample.
    predictions = model(X_test_reshaped).squeeze(-1)
    test_loss = criterion(predictions, y_test)
    print(f"Test Loss: {test_loss.item():.6f}")



Using device: cpu
Input shape: torch.Size([990, 10]), Target shape: torch.Size([990])


  return torch.tensor(sequences, dtype=torch.float32), torch.tensor(targets, dtype=torch.float32)


Epoch [20/100], Loss: 0.043456
Epoch [40/100], Loss: 0.014001
Epoch [60/100], Loss: 0.006185
Epoch [80/100], Loss: 0.003927
Epoch [100/100], Loss: 0.002676
Training time: 0.10 seconds
Test Loss: 0.002982
