In [6]:

import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import preprocessing
from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt


ModuleNotFoundError: No module named 'torch'

In [3]:
import torch

# 检查是否能够使用 GPU
print("CUDA Available:", torch.cuda.is_available())

# 检查 GPU 的名称
if torch.cuda.is_available():
    print("GPU Name:", torch.cuda.get_device_name(0))
    print("CUDA Version:", torch.version.cuda)


CUDA Available: True
GPU Name: NVIDIA GeForce RTX 4060 Laptop GPU
CUDA Version: 12.1


In [4]:
window = 5
lstm_units = 16
dropout = 0.01
epoch = 60


In [5]:
df1 = pd.read_csv('data.csv')
df1 = df1.iloc[:, 2:]
df1.tail()

# 数据归一化
min_max_scaler = preprocessing.MinMaxScaler()
df0 = min_max_scaler.fit_transform(df1)
df = pd.DataFrame(df0, columns=df1.columns)
input_size = len(df.iloc[1, :])

# 构建 LSTM 输入
stock = df
seq_len = window
amount_of_features = len(stock.columns)
data = stock.to_numpy()  # pd.DataFrame(stock) 表格转化为矩阵
sequence_length = seq_len + 1
result = []
for index in range(len(data) - sequence_length):
    result.append(data[index: index + sequence_length])
result = np.array(result)
row = round(0.9 * result.shape[0])  # 划分训练集测试集
train = result[:int(row), :]
x_train = train[:, :-1]
y_train = train[:, -1][:, -1]
x_test = result[int(row):, :-1]
y_test = result[int(row):, -1][:, -1]

# reshape 成 6天*3特征
X_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], amount_of_features))
X_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], amount_of_features))


In [6]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, lstm_units, dropout):
        super(LSTMModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=input_size, out_channels=lstm_units, kernel_size=1)
        self.pool = nn.MaxPool1d(pool_size=window)
        self.dropout = nn.Dropout(dropout)
        self.lstm = nn.LSTM(input_size=lstm_units, hidden_size=lstm_units, bidirectional=True, batch_first=True)
        self.attention = nn.Linear(lstm_units * 2, lstm_units * 2)
        self.fc = nn.Linear(lstm_units * 2, 1)

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = self.pool(x)
        x = self.dropout(x)
        x, _ = self.lstm(x)
        attention_weights = torch.sigmoid(self.attention(x))
        x = x * attention_weights
        x = self.fc(x[:, -1, :])  # 获取 LSTM 的最后一个输出
        return x


In [7]:
# 将数据转为 Tensor
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

# 初始化模型，损失函数，优化器
model = LSTMModel(input_size=amount_of_features, lstm_units=lstm_units, dropout=dropout)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# 训练模型
for epoch in range(epoch):
    model.train()
    optimizer.zero_grad()

    # 前向传播
    outputs = model(X_train_tensor)

    # 计算损失
    loss = criterion(outputs, y_train_tensor)

    # 反向传播
    loss.backward()

    # 更新参数
    optimizer.step()

    # 打印每个 epoch 的损失
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{epoch}], Loss: {loss.item():.4f}')


TypeError: _MaxPoolNd.__init__() got an unexpected keyword argument 'pool_size'

In [None]:
plt.plot(range(epoch), loss_values, label="Train Loss")
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()


In [None]:
model.eval()
with torch.no_grad():
    y_train_predict = model(X_train_tensor).numpy()
    y_test_predict = model(X_test_tensor).numpy()

print('训练集上的MAE/MSE/MAPE/涨跌准确率')
print(mean_absolute_error(y_train_predict, y_train))
print(mean_squared_error(y_train_predict, y_train))
print(mape(y_train_predict, y_train))
print(up_down_accuracy(y_train_predict, y_train))

print('测试集上的MAE/MSE/MAPE/涨跌准确率')
print(mean_absolute_error(y_test_predict, y_test))
print(mean_squared_error(y_test_predict, y_test))
print(mape(y_test_predict, y_test))
print(up_down_accuracy(y_test_predict, y_test))
