In [3]:
import numpy as np
import torch
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset, random_split
import torch.nn as nn
import torch.optim as optim
from datetime import datetime, timedelta

ticker = "000001.SS"
stock = yf.Ticker(ticker)
data = stock.history(start="2010-01-01", end="2024-05-10")

# 过滤数据，只包含交易日
data = data[data['Volume'] > 0]
data.drop(columns=['Dividends', 'Stock Splits'], inplace=True)

# 数据归一化
scaler = MinMaxScaler()
features = ['Open', 'High', 'Low', 'Close', 'Volume']
data[features] = scaler.fit_transform(data[features])

# 定义时间序列长度
time_steps =10  # 根据模型需求调整

# 创建序列数据
def create_sequences(data, time_steps):
    X, y = [], []
    for i in range(len(data) - time_steps):
        # 获取原始特征数据
        X_seq = data.iloc[i:(i + time_steps)][features].values
        # 计算新特征：收盘价-开盘价
        diff_close_open = X_seq[:, 3] - X_seq[:, 0]
        # 计算新特征：今日收盘价-昨日收盘价
        diff_close_prev_close = np.diff(X_seq[:, 3], prepend=0)
        # 将新特征与原始特征拼接起来
        X_seq = np.concatenate([X_seq, diff_close_open.reshape(-1, 1), diff_close_prev_close.reshape(-1, 1)], axis=1)
        X.append(X_seq)
        y.append(data.iloc[i + time_steps][features].values)
    return np.array(X), np.array(y)

# 创建训练和测试数据
train_data = data.loc[:'2022-12-31'] # 训练集包括 2023 年之前和 2024 年之后的数据
test_data = data.loc['2023-01-01':]  # 测试集包括 2023 年的数据

X_train, y_train = create_sequences(train_data, time_steps)
X_test, y_test = create_sequences(test_data, time_steps)

# 转换为张量
X_train_tensor = torch.Tensor(X_train)
y_train_tensor = torch.Tensor(y_train)
X_test_tensor = torch.Tensor(X_test)
y_test_tensor = torch.Tensor(y_test)

# DataLoader
train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=64, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=64, shuffle=True)

# 定义模型
class LSTMNet(nn.Module):
    def __init__(self, input_size=7, hidden_size=128, num_layers=2, output_size=5):  # 修改 input_size 为 7
        super(LSTMNet, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)  # 修改全连接层输入维度为 hidden_size

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))

        out = self.fc(out[:, -1, :])  # 只选择序列的最后一个时间步作为输出

        return out

# 训练函数
def train_model(model, train_loader, val_loader, epochs=100, patience=10):
    best_val_loss = float('inf')
    epochs_no_improve = 0
    early_stop = False
    for epoch in range(epochs):
        if early_stop:
            print(f"Early stopping at epoch {epoch+1}")
            break
        model.train()
        train_loss = 0
        for seq, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(seq)
            loss = criterion(outputs, labels)
            loss.backward()  # 反向传播
            optimizer.step()  # 梯度下降
            train_loss += loss.item()
        train_loss /= len(train_loader)

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for seq, labels in val_loader:
                outputs = model(seq)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
            val_loss /= len(val_loader)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve == patience:
            early_stop = True

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch + 1}, Training Loss: {train_loss:.5f}, Validation Loss: {val_loss:.5f}")

    # 模型评估
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for seq, labels in test_loader:
            outputs = model(seq)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
        test_loss /= len(test_loader)
    print(f"Test Loss: {test_loss:.5f}")

# 定义模型
model = LSTMNet(input_size=7, hidden_size=128, num_layers=2, output_size=5)

# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)  # 使用Adam优化器

# 划分验证集
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
total_size = len(train_dataset)
val_size = int(0.2 * total_size)
train_size = total_size - val_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# 创建 DataLoader
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=True)

# 训练模型
train_model(model, train_loader, val_loader, epochs=400, patience=10)


Epoch 10, Training Loss: 0.00092, Validation Loss: 0.00070
Epoch 20, Training Loss: 0.00060, Validation Loss: 0.00049
Epoch 30, Training Loss: 0.00053, Validation Loss: 0.00049
Epoch 40, Training Loss: 0.00049, Validation Loss: 0.00049
Epoch 50, Training Loss: 0.00048, Validation Loss: 0.00045
Early stopping at epoch 52
Test Loss: 0.00070


In [24]:
def predict_future(model, start_date, days):
    # 将开始日期转换为模型所需的序列数据
    start_index = data.index[data.index == start_date][0]
    start_index -= time_steps  # 考虑到时间步长

    # 准备模型输入
    input_seq = torch.Tensor(X_test[start_index:start_index+time_steps]).unsqueeze(0)  # 添加批次维度

    # 预测未来days天的数据
    predictions = []
    with torch.no_grad():
        for _ in range(days):
            # 使用模型预测下一个时间步
            output = model(input_seq)
            predictions.append(output.squeeze().tolist())

            # 更新输入序列，移除第一个时间步，添加预测值到最后一个时间步
            input_seq = torch.cat([input_seq[:, 1:, :], output.unsqueeze(0).unsqueeze(0)], dim=1)

    # 将预测值转换为DataFrame，并添加日期索引
    prediction_dates = [start_date + timedelta(days=i) for i in range(1, days+1)]
    predictions_df = pd.DataFrame(predictions, columns=features, index=prediction_dates)

    return predictions_df


In [25]:
# 加载最佳模型权重
model.load_state_dict(torch.load('best_model.pth'))

# 设置预测开始日期和预测天数
start_date_str = "2024-05-15"
days = 14

# 进行预测
predictions = predict_future(model, start_date_str, days)

print(predictions)


IndexError: index 0 is out of bounds for axis 0 with size 0