### 1.1 lstm+全连接层

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset


# 假设数据存储在一个名为 data.csv 的文件中，文件包含 30 个 x 列和 1 个 y 列
data = pd.read_csv('data.csv')
x = data.iloc[:, :-1].values
y = data.iloc[:, -1].values


# 数据标准化
scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()
x_scaled = scaler_x.fit_transform(x)
y_scaled = scaler_y.fit_transform(y.reshape(-1, 1))


def create_sequences(input_data, target_data, seq_length):
    """
    创建时间序列数据窗口
    :param input_data: 输入数据
    :param target_data: 目标数据
    :param seq_length: 时间序列长度
    :return: 序列输入和序列目标
    """
    sequences_x = []
    sequences_y = []
    for i in range(len(input_data) - seq_length):
        sequences_x.append(input_data[i:i + seq_length])
        sequences_y.append(target_data[i + seq_length])
    return np.array(sequences_x), np.array(sequences_y)


# 时间网络窗口大小
time_windows = [10, 15, 20]


# 存储不同窗口大小下的结果
results = {}


for t in time_windows:
    sequences_x, sequences_y = create_sequences(x_scaled, y_scaled, t)


    # 转换为 PyTorch 张量
    sequences_x = torch.FloatTensor(sequences_x)
    sequences_y = torch.FloatTensor(sequences_y)


    # 数据集划分
    train_size = int(0.7 * len(sequences_x))
    val_size = int(0.2 * len(sequences_x))
    test_size = len(sequences_x) - train_size - val_size


    train_x = sequences_x[:train_size]
    train_y = sequences_y[:train_size]
    val_x = sequences_x[train_size:train_size + val_size]
    val_y = sequences_y[train_size:train_size + val_size]
    test_x = sequences_x[train_size + val_size:]
    test_y = sequences_y[train_size + val_size:]


    train_dataset = TensorDataset(train_x, train_y)
    val_dataset = TensorDataset(val_x, val_y)
    test_dataset = TensorDataset(test_x, test_y)


    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32)
    test_loader = DataLoader(test_dataset, batch_size=32)


    # LSTM 模型定义
    class LSTMModel(nn.Module):
        def __init__(self, input_size, hidden_size, num_layers, output_size):
            super(LSTMModel, self).__init__()
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
            self.fc = nn.Linear(hidden_size, output_size)


        def forward(self, x):
            out, _ = self.lstm(x)
            out = self.fc(out[:, -1, :])
            return out


    # 超参数
    input_size = x.shape[1]
    hidden_size = 50
    num_layers = 2
    output_size = 1
    num_epochs = 100
    learning_rate = 0.001


    model = LSTMModel(input_size, hidden_size, num_layers, output_size)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)


    # 训练模型
    for epoch in range(num_epochs):
        model.train()
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()


        # 在验证集上评估
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                outputs = model(inputs)
                val_loss += criterion(outputs, targets).item()
        val_loss /= len(val_loader)
        print(f'Time window {t}, Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss}')


    # 在测试集上评估
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            outputs = model(inputs)
            test_loss += criterion(outputs, targets).item()
    test_loss /= len(test_loader)
    print(f'Time window {t}, Test Loss: {test_loss}')


    results[t] = {'model': model, 'test_loss': test_loss}


# 保存最佳模型（假设使用最小的测试损失作为最佳模型）
best_t = min(results, key=lambda x: results[x]['test_loss'])
best_model = results[best_t]['model']
torch.save(best_model.state_dict(), 'best_lstm_model.pth')


# 加载模型进行预测
def predict(model, x):
    model.eval()
    with torch.no_grad():
        output = model(x)
        return output


# 示例预测
sample_x = torch.FloatTensor(sequences_x[0].unsqueeze(0))
prediction = predict(best_model, sample_x)
prediction = scaler_y.inverse_transform(prediction.numpy())
print(f'Prediction: {prediction}')

### 1.2 lstm神经网络模型+全连接层（单时间窗口、单输入文件）

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset

# 使用header参数指定第2行作为列名（索引），跳过第1行，读取从第2行开始的数据
file='E:/2412毕业论文/0104数据特征/wind250109/0109沪深300股票1/处理2/处理/000001.SZ_平安银行_all2.xlsx'

# 假设我们想要跳过名为'Column2'和'Column4'的列
skip_column_names = ['所属申万行业名称(2014)_[行业级别]一级行业', '股票收益率1']

# 获取所有列名，除了要跳过的那些
all_columns = pd.read_excel(file, nrows=0).columns
columns_to_use = [col for col in all_columns if col not in skip_column_names]

# 读取Excel文件，只使用未被跳过的列
data = pd.read_excel(file, usecols=columns_to_use)
data = data.iloc[7:,1:]
data.head()

x = data.iloc[:, 3:11].values
y = data.iloc[:, -5].values

# 数据标准化
scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()
x_scaled = scaler_x.fit_transform(x)
y_scaled = scaler_y.fit_transform(y.reshape(-1, 1))

def create_sequences(input_data, target_data, seq_length):
    """
    创建时间序列数据窗口
    :param input_data: 输入数据
    :param target_data: 目标数据
    :param seq_length: 时间序列长度
    :return: 序列输入和序列目标
    """
    sequences_x = []
    sequences_y = []
    for i in range(len(input_data) - seq_length):
        sequences_x.append(input_data[i:i + seq_length])
        sequences_y.append(target_data[i + seq_length])
    return np.array(sequences_x), np.array(sequences_y)


# 时间网络窗口大小
time_windows = 10

sequences_x, sequences_y = create_sequences(x_scaled, y_scaled, time_windows)


# 转换为 PyTorch 张量
sequences_x = torch.FloatTensor(sequences_x)
sequences_y = torch.FloatTensor(sequences_y)


# 数据集划分
train_size = int(0.7 * len(sequences_x))
val_size = int(0.2 * len(sequences_x))
test_size = len(sequences_x) - train_size - val_size


train_x = sequences_x[:train_size]
train_y = sequences_y[:train_size]
val_x = sequences_x[train_size:train_size + val_size]
val_y = sequences_y[train_size:train_size + val_size]
test_x = sequences_x[train_size + val_size:]
test_y = sequences_y[train_size + val_size:]


train_dataset = TensorDataset(train_x, train_y)
val_dataset = TensorDataset(val_x, val_y)
test_dataset = TensorDataset(test_x, test_y)


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

# LSTM 模型定义
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)


    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])
        return out


# 超参数
input_size = x.shape[1]
hidden_size = 50
num_layers = 2
output_size = 1
num_epochs = 100
learning_rate = 0.001


model = LSTMModel(input_size, hidden_size, num_layers, output_size)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 训练模型
for epoch in range(num_epochs):
    model.train()
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()


    # 在验证集上评估
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            outputs = model(inputs)
            val_loss += criterion(outputs, targets).item()
    val_loss /= len(val_loader)
    print(f'Time window {time_windows}, Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss}')

# 在测试集上评估
model.eval()
test_loss = 0
with torch.no_grad():
    for inputs, targets in test_loader:
        outputs = model(inputs)
        test_loss += criterion(outputs, targets).item()
test_loss /= len(test_loader)
print(f'Time window {time_windows}, Test Loss: {test_loss}')
torch.save(model.state_dict(), 'best_lstm_model.pth')

# 加载模型进行预测
def predict(model, x):
    model.eval()
    with torch.no_grad():
        output = model(x)
        return output


# 示例预测
sample_x = torch.FloatTensor(sequences_x[0].unsqueeze(0))
prediction = predict(model, sample_x)
prediction = scaler_y.inverse_transform(prediction.numpy())
print(f'Prediction: {prediction}')

### 2.多个基础模型对比

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset


# 假设数据存储在一个名为 data.csv 的文件中，文件包含 30 个 x 列和 1 个 y 列
data = pd.read_csv('data.csv')
x = data.iloc[:, :-1].values
y = data.iloc[:, -1].values


# 数据标准化
scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()
x_scaled = scaler_x.fit_transform(x)
y_scaled = scaler_y.fit_transform(y.reshape(-1, 1))


def create_sequences(input_data, target_data, seq_length):
    """
    创建时间序列数据窗口
    :param input_data: 输入数据
    :param target_data: 目标数据
    :param seq_length: 时间序列长度
    :return: 序列输入和序列目标
    """
    sequences_x = []
    sequences_y = []
    for i in range(len(input_data) - seq_length):
        sequences_x.append(input_data[i:i + seq_length])
        sequences_y.append(target_data[i + seq_length])
    return np.array(sequences_x), np.array(sequences_y)


# 时间网络窗口大小
time_windows = [10, 15, 20]


# 存储不同窗口大小下的结果
results = {}


for t in time_windows:
    sequences_x, sequences_y = create_sequences(x_scaled, y_scaled, t)


    # 转换为 PyTorch 张量
    sequences_x = torch.FloatTensor(sequences_x)
    sequences_y = torch.FloatTensor(sequences_y)


    # 数据集划分
    train_size = int(0.7 * len(sequences_x))
    val_size = int(0.2 * len(sequences_x))
    test_size = len(sequences_x) - train_size - val_size


    train_x = sequences_x[:train_size]
    train_y = sequences_y[:train_size]
    val_x = sequences_x[train_size:train_size + val_size]
    val_y = sequences_y[train_size:train_size + val_size]
    test_x = sequences_x[train_size + val_size:]
    test_y = sequences_y[train_size + val_size:]


    train_dataset = TensorDataset(train_x, train_y)
    val_dataset = TensorDataset(val_x, val_y)
    test_dataset = TensorDataset(test_x, test_y)


    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32)
    test_loader = DataLoader(test_dataset, batch_size=32)


    # LSTM 模型定义
    class LSTMModel(nn.Module):
        def __init__(self, input_size, hidden_size, num_layers, output_size):
            super(LSTMModel, self).__init__()
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
            self.fc = nn.Linear(hidden_size, output_size)


        def forward(self, x):
            out, _ = self.lstm(x)
            out = self.fc(out[:, -1, :])
            return out


    # GRU 模型定义
    class GRUModel(nn.Module):
        def __init__(self, input_size, hidden_size, num_layers, output_size):
            super(GRUModel, self).__init__()
            self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
            self.fc = nn.Linear(hidden_size, output_size)


        def forward(self, x):
            out, _ = self.gru(x)
            out = self.fc(out[:, -1, :])
            return out


    # Bi-LSTM 模型定义
    class BiLSTMModel(nn.Module):
        def __init__(self, input_size, hidden_size, num_layers, output_size):
            super(BiLSTMModel, self).__init__()
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
            self.fc = nn.Linear(hidden_size * 2, output_size)


        def forward(self, x):
            out, _ = self.lstm(x)
            out = self.fc(out[:, -1, :])
            return out


    # Bi-GRU 模型定义
    class BiGRUModel(nn.Module):
        def __init__(self, input_size, hidden_size, num_layers, output_size):
            super(BiGRUModel, self).__init__()
            self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
            self.fc = nn.Linear(hidden_size * 2, output_size)


        def forward(self, x):
            out, _ = self.gru(x)
            out = self.fc(out[:, -1, :])
            return out


    # CNN-LSTM-Attention 模型定义
    class CNNLSTMAttentionModel(nn.Module):
        def __init__(self, input_size, hidden_size, num_layers, output_size):
            super(CNNLSTMAttentionModel, self).__init__()
            self.conv = nn.Conv1d(input_size, hidden_size, kernel_size=3, padding=1)
            self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
            self.attention = nn.Linear(hidden_size, 1)
            self.fc = nn.Linear(hidden_size, output_size)


        def forward(self, x):
            # 调整输入形状以适应 CNN 层
            x = x.permute(0, 2, 1)
            out = torch.relu(self.conv(x))
            out = out.permute(0, 2, 1)
            out, _ = self.lstm(out)
            attention_weights = torch.softmax(self.attention(out), dim=1)
            out = torch.sum(out * attention_weights, dim=1)
            out = self.fc(out)
            return out


    # 超参数
    input_size = x.shape[1]
    hidden_size = 64
    num_layers = 2
    output_size = 1
    num_epochs = 100
    learning_rate = 0.001


    models = {
        'LSTM': LSTMModel(input_size, hidden_size, num_layers, output_size),
        'GRU': GRUModel(input_size, hidden_size, num_layers, output_size),
        'Bi-LSTM': BiLSTMModel(input_size, hidden_size // 2, num_layers, output_size),
        'Bi-GRU': BiGRUModel(input_size, hidden_size // 2, num_layers, output_size),
        'CNN-LSTM-Attention': CNNLSTMAttentionModel(input_size, hidden_size, num_layers, output_size)
    }


    for model_name, model in models.items():
        print(f'Training {model_name} with time window {t}')
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)


        # 训练模型
        for epoch in range(num_epochs):
            model.train()
            for inputs, targets in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()


            # 在验证集上评估
            model.eval()
            val_loss = 0
            with torch.no_grad():
                for inputs, targets in val_loader:
                    outputs = model(inputs)
                    val_loss += criterion(outputs, targets).item()
            val_loss /= len(val_loader)
            print(f'{model_name}, Time window {t}, Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss}')


        # 在测试集上评估
        model.eval()
        test_loss = 0
        with torch.no_grad():
            for inputs, targets in test_loader:
                outputs = model(inputs)
                test_loss += criterion(outputs, targets).item()
        test_loss /= len(test_loader)
        print(f'{model_name}, Time window {t}, Test Loss: {test_loss}')


        results[(model_name, t)] = {'model': model, 'test_loss': test_loss}


# 保存最佳模型（假设使用最小的测试损失作为最佳模型）
best_result = min(results.items(), key=lambda x: x[1]['test_loss'])
best_model_name, best_t = best_result[0]
best_model = best_result[1]['model']
torch.save(best_model.state_dict(), 'best_model.pth')


# 加载模型进行预测
def predict(model, x):
    model.eval()
    with torch.no_grad():
        output = model(x)
        return output


# 示例预测
sample_x = torch.FloatTensor(sequences_x[0].unsqueeze(0))
prediction = predict(best_model, sample_x)
prediction = scaler_y.inverse_transform(prediction.numpy())
print(f'Prediction: {prediction}')