In [13]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

# 示例数据（假设每天一个值）
data = np.array([10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130], dtype=np.float32)

# 创建特征-标签对
def create_dataset(data, window_size=4):
    X, y = [], []
    for i in range(len(data)-window_size):
        X.append(data[i:i+window_size])
        y.append(data[i+window_size])
    return np.array(X), np.array(y)

X, y = create_dataset(data)

# 划分训练集和测试集
split = int(0.8 * len(X))
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

print(f"训练样本: {X_train.shape}, 测试样本: {X_test.shape}")

训练样本: (7, 4), 测试样本: (2, 4)


In [15]:
class TimeSeriesDataset(Dataset):
    def __init__(self, features, labels):
        self.features = torch.tensor(features)
        self.labels = torch.tensor(labels)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

# 创建数据集和数据加载器
train_dataset = TimeSeriesDataset(X_train, y_train)
test_dataset = TimeSeriesDataset(X_test, y_test)

batch_size = 2
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1)

In [17]:
class DNNRegressor(torch.nn.Module):
    def __init__(self, input_size):
        super().__init__()
        self.network = torch.nn.Sequential(
            torch.nn.Linear(input_size, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 1)
        )
    
    def forward(self, x):
        return self.network(x)

# 初始化模型
model = DNNRegressor(input_size=4)
print(model)

DNNRegressor(
  (network): Sequential(
    (0): Linear(in_features=4, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=64, bias=True)
    (3): ReLU()
    (4): Linear(in_features=64, out_features=1, bias=True)
  )
)


In [19]:
# 设置训练参数
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
epochs = 200

# 训练循环
for epoch in range(epochs):
    model.train()
    for batch_features, batch_labels in train_loader:
        # 前向传播
        outputs = model(batch_features)
        loss = criterion(outputs.squeeze(), batch_labels)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # 每20个epoch打印损失
    if (epoch+1) % 20 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

Epoch [20/200], Loss: 150.2244
Epoch [40/200], Loss: 4.8982
Epoch [60/200], Loss: 21.9907
Epoch [80/200], Loss: 0.4987
Epoch [100/200], Loss: 2.4936
Epoch [120/200], Loss: 0.6840
Epoch [140/200], Loss: 0.0228
Epoch [160/200], Loss: 0.0001
Epoch [180/200], Loss: 0.0031
Epoch [200/200], Loss: 0.0018


In [20]:
model.eval()
with torch.no_grad():
    test_loss = 0
    for features, labels in test_loader:
        outputs = model(features)
        test_loss += criterion(outputs.squeeze(), labels).item()
    
    print(f'测试集平均损失: {test_loss/len(test_loader):.4f}')

# 示例预测
sample_input = torch.tensor([110, 120, 130, 140], dtype=torch.float32)
prediction = model(sample_input)
print(f'预测结果: {prediction.item():.2f}')

测试集平均损失: 7.6491
预测结果: 157.12
