In [ ]:
from model.LSTM_data_handle import data_for_lstm
from data.get_data import get_common_data
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

In [ ]:


symbol = 'SHSE.510300'

# 2006-2020作为模型的训练集
# 2020-2021作为模型的验证集
# 2021-2024作为模型的测试集
# 比例为  14：1：3
# 虽然这个比例相对较大，但是由于量化交易数据的动态性，历史数据本来就远大于模型预期要应用于的数据

train_start_date = '2006-01-01'
train_end_date = '2020-01-01'
val_start_date = '2020-01-01'
val_end_date = '2021-01-01'
test_start_date = '2021-01-01'
test_end_date = '2024-01-01'

In [ ]:
T = 3
train_data = get_common_data(symbol, train_start_date, train_end_date)

In [ ]:
# 为LSTM模型处理好数据
window_size = 40
X, Y = data_for_lstm(train_data, window_size)

In [ ]:
# 定义模型参数
input_dim = X.shape[2] # X的形状为(样本数量, 窗口大小, 特征数量)，y的形状为(样本数量,)。
hidden_dim = 50
num_layers = 3
output_dim = 1

In [ ]:
# 检查CUDA是否可用，并选择设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=0.2)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # 初始化隐藏状态和细胞状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device).requires_grad_()
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device).requires_grad_()
        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))
        out = self.fc(out[:, -1, :])  # 取LSTM最后一个时间步的输出
        return out

In [ ]:
model = LSTMModel(input_dim, hidden_dim, num_layers, output_dim).to(device)  # 移动模型到GPU

In [ ]:
# 定义损失函数和优化器
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

In [ ]:
# 准备数据
X_tensor = torch.tensor(X, dtype=torch.float32).to(device)  # 移动数据到GPU
Y_tensor = torch.tensor(Y.reshape(-1, 1), dtype=torch.float32).to(device)  # 移动数据到GPU
train_data = TensorDataset(X_tensor[:-T], Y_tensor[:-T])
train_loader = DataLoader(dataset=train_data, batch_size=16, shuffle=True)

In [ ]:
# 训练模型
epochs = 100
model.train()  # 设置模型为训练模式
for epoch in range(epochs):
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        y_pred = model(inputs)
        loss = criterion(y_pred, targets)
        loss.backward()
        optimizer.step()
    if (epoch + 1) % 10 == 0:
        print(f'Epoch {epoch + 1}, Loss: {loss.item()}')


In [ ]:
# 预测
model.eval()  # 设置模型为评估模式
with torch.no_grad():
    X_test_tensor = torch.tensor(X[-T:], dtype=torch.float32).to(device)  # 确保测试数据也在GPU上
    predicted_returns = model(X_test_tensor)
    predicted_returns = predicted_returns.cpu().numpy()  # 将预测结果移回CPU