In [None]:
import torch.nn.functional as F
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import r2_score, mean_squared_error
import numpy as np
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import torch
from torch.utils.data import TensorDataset, DataLoader
import pandas as pd

class Attention(nn.Module):
    """Softmax-weighted pooling over dimension 1 of a rank-3 tensor.

    A single linear layer scores every step of the sequence; the scores are
    normalised with a softmax along dimension 1 and used to form a weighted
    sum of the steps.

    Args:
        hidden_dim: Hidden size of one GRU direction. The scoring layer takes
            ``2 * hidden_dim`` features because the GRU feeding it is
            bidirectional.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        # Doubled input width: forward and backward GRU states are concatenated.
        self.attention_weights = nn.Linear(2 * hidden_dim, 1)

    def forward(self, gru_output):
        """Collapse dimension 1 of ``gru_output`` into one context vector.

        Args:
            gru_output: Tensor whose last dimension is ``2 * hidden_dim``;
                with the batch-first GRU used in this file that is
                ``(batch, seq_len, 2 * hidden_dim)``.

        Returns:
            Tensor of shape ``(batch, 2 * hidden_dim)``.
        """
        # One scalar score per step; drop the trailing singleton dimension.
        step_scores = self.attention_weights(gru_output)
        step_scores = step_scores.squeeze(2)
        # Normalise across the steps so the weights of each sample sum to one.
        step_weights = F.softmax(step_scores, dim=1)
        # Broadcast the weights over the feature axis and sum the steps away.
        return (gru_output * step_weights.unsqueeze(2)).sum(dim=1)


class CNN_GRUModel(nn.Module):
    """Conv1d -> BatchNorm -> ReLU -> Dropout -> bidirectional GRU -> attention -> linear head.

    Args:
        input_dim: Number of features per time step (Conv1d input channels).
        hidden_dim: Hidden size of one GRU direction.
        output_dim: Width of the final linear layer.
        num_layers: Number of stacked GRU layers.
        cnn_out_channels: Conv1d output channels (also the GRU input size).
        cnn_kernel_size: Conv1d kernel size; padding is ``kernel_size // 2``.
        pool_kernel_size: Kernel size of the max-pool layer stored on the model.
        dropout_prob: Dropout probability after the convolution, and between
            GRU layers when ``num_layers > 1``.
        fc_layers: Optional sequence of hidden widths for the fully connected
            head. ``None`` (the default) or an empty sequence means the head
            is a single ``Linear(2 * hidden_dim, output_dim)``.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, cnn_out_channels,
                 cnn_kernel_size, pool_kernel_size, dropout_prob, fc_layers=None):
        super(CNN_GRUModel, self).__init__()

        # ``None`` replaces the former mutable ``[]`` default; copying also
        # decouples the model from a list the caller may change later.
        fc_layers = list(fc_layers) if fc_layers is not None else []

        # Convolution over the time axis.
        self.cnn = nn.Conv1d(in_channels=input_dim,
                             out_channels=cnn_out_channels,
                             kernel_size=cnn_kernel_size,
                             stride=1,
                             padding=cnn_kernel_size // 2)

        # Pooling layer.
        # NOTE(review): forward() never applies this layer. It is kept so the
        # attribute stays available; confirm whether pooling was intended.
        self.pool = nn.MaxPool1d(kernel_size=pool_kernel_size)

        # Batch normalisation over the convolution channels.
        self.batch_norm = nn.BatchNorm1d(cnn_out_channels)

        # Dropout applied after the activation.
        self.dropout = nn.Dropout(dropout_prob)

        # Bidirectional GRU; nn.GRU only uses inter-layer dropout when stacked.
        self.gru = nn.GRU(input_size=cnn_out_channels,
                          hidden_size=hidden_dim,
                          num_layers=num_layers,
                          batch_first=True,
                          dropout=dropout_prob if num_layers > 1 else 0,
                          bidirectional=True)

        # Attention pooling over the GRU outputs.
        self.attention = Attention(hidden_dim)

        # Fully connected head: 2*hidden_dim -> fc_layers... -> output_dim.
        # NOTE(review): the Linear layers are stacked without activations in
        # between, so the head is mathematically a single affine map. Adding
        # non-linearities would change the trained model and the Sequential
        # indices in state_dict keys, so it is only flagged here.
        widths = [2 * hidden_dim, *fc_layers, output_dim]
        self.fc_layers = nn.Sequential(
            *(nn.Linear(n_in, n_out) for n_in, n_out in zip(widths[:-1], widths[1:]))
        )

    def forward(self, x):
        """Run the network on ``x`` of shape ``(batch, seq_len, input_dim)``.

        Returns:
            Tensor of shape ``(batch, output_dim)``.
        """
        # Conv1d expects channels first: (batch, input_dim, seq_len).
        x = x.permute(0, 2, 1)

        x = self.cnn(x)
        x = self.batch_norm(x)
        x = F.relu(x)
        x = self.dropout(x)

        # Back to (batch, seq_len, channels) for the batch-first GRU.
        x = x.permute(0, 2, 1)
        x, _ = self.gru(x)

        # Collapse the time axis into one context vector per sample.
        x = self.attention(x)

        # Fully connected head.
        x = self.fc_layers(x)
        return x



# Model hyperparameters
cnn_out_channels = 256   # number of output channels of the convolution layer
cnn_kernel_size = 3      # kernel size of the convolution layer
dropout_prob = 0.2       # dropout probability
input_dim = len(features_columns)
hidden_dim = 512
output_dim = 1
pool_kernel_size = 2
num_layers = 3           # number of stacked GRU layers
fc_layers = [256, 256]   # widths of the additional fully connected layers

# Optimisation settings
lr = 1e-4
num_epochs = 200

# Instantiate the model
model = CNN_GRUModel(
    input_dim=input_dim,
    hidden_dim=hidden_dim,
    output_dim=output_dim,
    num_layers=num_layers,
    cnn_out_channels=cnn_out_channels,
    cnn_kernel_size=cnn_kernel_size,
    pool_kernel_size=pool_kernel_size,
    dropout_prob=dropout_prob,
    fc_layers=fc_layers,
)

# Loss function
criterion = nn.HuberLoss()

# Move the model to the GPU when one is available, otherwise keep it on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=lr)
# 训练模型的函数
def train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs):
    """Train ``model`` and score it with ``evaluate_model`` after every epoch.

    Args:
        model: Network to optimise; it is updated in place.
        train_loader: Iterable yielding ``(data, target)`` batches for training.
        test_loader: Iterable passed to ``evaluate_model`` after each epoch.
        criterion: Callable ``criterion(output, target)`` returning a scalar loss.
        optimizer: Optimizer already bound to ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        ``(best_epoch, best_rmse, best_r2)``: the 1-based epoch with the lowest
        RMSE (ties broken by the higher R^2) and its two metrics. With
        ``num_epochs == 0`` this is ``(-1, inf, -inf)``.

    Raises:
        ValueError: If ``train_loader`` yields no batches during an epoch.
    """
    # Follow the model's own device rather than a notebook-level ``device``
    # global, which may be missing or stale after a kernel restart.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    best_rmse = np.inf
    best_r2 = -np.inf
    best_epoch = -1

    for epoch in range(num_epochs):
        model.train()  # evaluate_model() leaves the model in eval mode
        train_loss = 0.0
        num_batches = 0
        for data, target in train_loader:
            data, target = data.to(run_device), target.to(run_device)
            optimizer.zero_grad()
            output = model(data)
            # A (N,) target against a (N, 1) output would silently broadcast
            # to (N, N) inside the loss; align shapes when the element counts
            # already agree.
            if target.shape != output.shape and target.numel() == output.numel():
                target = target.reshape(output.shape)
            loss = criterion(output, target)
            train_loss += loss.item()
            loss.backward()
            optimizer.step()
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_loader yielded no batches; nothing to train on")

        # Mean training loss over the batches of this epoch.
        train_loss /= num_batches
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}')

        # Score the model after every epoch.
        rmse, r2 = evaluate_model(model, test_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}], Test RMSE: {rmse:.4f}, Test R^2: {r2:.4f}')

        # Keep the epoch with the lowest RMSE; break ties on the higher R^2.
        if rmse < best_rmse or (rmse == best_rmse and r2 > best_r2):
            best_rmse = rmse
            best_r2 = r2
            best_epoch = epoch + 1

    print(f'Best Test RMSE: {best_rmse:.4f} at epoch {best_epoch}')
    print(f'Best Test R^2: {best_r2:.4f} at epoch {best_epoch}')
    return best_epoch, best_rmse, best_r2

def evaluate_model(model, test_loader):
    """Compute RMSE and R^2 of ``model`` over every batch of ``test_loader``.

    The model is switched to evaluation mode and left in that mode.

    Args:
        model: Network to evaluate.
        test_loader: Iterable yielding ``(data, target)`` batches.

    Returns:
        ``(rmse, r2)`` computed over all flattened predictions and targets.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    # Use the model's own device rather than a notebook-level ``device`` global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    predictions = []
    actuals = []
    with torch.no_grad():  # no gradients are needed for scoring
        for data, target in test_loader:
            output = model(data.to(run_device))
            # reshape(-1) also copes with non-contiguous tensors; targets are
            # only turned into Python floats, so they need not be moved.
            predictions.extend(output.reshape(-1).tolist())
            actuals.extend(target.reshape(-1).tolist())

    if not actuals:
        raise ValueError("test_loader yielded no samples; cannot compute metrics")

    predictions = np.array(predictions)
    actuals = np.array(actuals)
    rmse = np.sqrt(mean_squared_error(actuals, predictions))
    r2 = r2_score(actuals, predictions)
    return rmse, r2


# Start training; the model is scored on the test loader after every epoch
best_epoch, best_rmse, best_r2 = train_model(
    model=model,
    train_loader=train_loader,
    test_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=num_epochs,
)


In [None]:
def GOA(train_func, num_grasshoppers, num_iterations, param_space):
    """Minimise ``train_func`` over a box-bounded space with a grasshopper swarm.

    The position update is modelled on the Grasshopper Optimisation Algorithm
    of Saremi, Mirjalili & Lewis (2017): each grasshopper moves to the best
    position found so far plus a social term summed over all other
    grasshoppers, with both scaled by a coefficient ``c`` that decreases
    linearly from ``c_max`` towards ``c_min``.

    The search runs in a normalised unit cube, so parameters with very
    different ranges contribute equally to distances. Positions are mapped
    back to ``[lower, upper]`` before being passed to ``train_func``.

    Differences from the previous implementation in this notebook:
      * the update is anchored on the best position found so far and uses
        unit direction vectors, instead of moving to a raw pairwise sum that
        was immediately clipped to the bounds;
      * the initial population is evaluated, so ``train_func`` is called
        ``num_grasshoppers * (num_iterations + 1)`` times in total;
      * all grasshoppers move from the same snapshot of the swarm;
      * costs are converted with ``float()``, so a 0-dim tensor returned by
        ``train_func`` does not keep its autograd graph alive;
      * no external ``Grasshopper`` class is needed.

    Args:
        train_func: Callable taking a 1-D ``numpy.ndarray`` of parameter
            values (ordered like ``param_space``) and returning a scalar cost.
        num_grasshoppers: Population size; must be at least 1.
        num_iterations: Number of swarm updates after the initial evaluation.
        param_space: Mapping ``name -> [lower, upper]``.

    Returns:
        ``numpy.ndarray`` of shape ``(len(param_space),)`` holding the
        lowest-cost parameter vector that was evaluated. If no evaluation
        returned a finite comparable cost, the first grasshopper's initial
        position is returned.

    Raises:
        ValueError: If ``num_grasshoppers`` is smaller than 1.
    """
    if num_grasshoppers < 1:
        raise ValueError("num_grasshoppers must be at least 1")

    bounds = list(param_space.values())
    lb = np.array([bound[0] for bound in bounds], dtype=float)
    ub = np.array([bound[1] for bound in bounds], dtype=float)
    dim = len(bounds)
    span = ub - lb

    c_max = 1
    c_min = 0.00001
    eps = 1e-12                  # guards the division for coincident grasshoppers
    attraction, length_scale = 0.5, 1.5   # s-function constants from the GOA paper

    def social_strength(r):
        # Positive values attract, negative values repel.
        return attraction * np.exp(-r / length_scale) - np.exp(-r)

    # Population in the unit cube; row i is grasshopper i.
    swarm = np.random.uniform(0.0, 1.0, size=(num_grasshoppers, dim))

    best_cost = float('inf')
    best_unit = swarm[0].copy()
    best_params = lb + best_unit * span

    def evaluate(unit_position):
        """Score one grasshopper and update the best-so-far record."""
        nonlocal best_cost, best_unit, best_params
        params = lb + unit_position * span
        # Hand out a copy so train_func cannot alter the stored position.
        cost = float(train_func(params.copy()))
        if cost < best_cost:
            best_cost = cost
            best_unit = unit_position.copy()
            best_params = params

    # Evaluate the initial population so the first update has a real target.
    for unit_position in swarm:
        evaluate(unit_position)

    for iteration in range(num_iterations):
        c = c_max - iteration * ((c_max - c_min) / num_iterations)

        # Build all new positions from the same snapshot of the swarm.
        moved = np.empty_like(swarm)
        for i in range(num_grasshoppers):
            social = np.zeros(dim)
            for j in range(num_grasshoppers):
                if i == j:
                    continue
                offset = swarm[j] - swarm[i]
                distance = np.linalg.norm(offset)
                direction = offset / (distance + eps)
                # Fold the distance into [2, 4), as the reference
                # implementation does, so the s-function stays in its
                # informative range.
                folded = 2.0 + np.remainder(distance, 2.0)
                # The (ub - lb) / 2 factor is 1/2 in the unit cube.
                social += (c / 2.0) * social_strength(folded) * direction
            moved[i] = np.clip(c * social + best_unit, 0.0, 1.0)
        swarm = moved

        for unit_position in swarm:
            evaluate(unit_position)

        print(f"Iteration {iteration}: Best Cost = {best_cost}")

    return best_params


def train_and_evaluate(params):
    """Train a fresh ``CNN_GRUModel`` for one hyperparameter vector and score it.

    Used as the objective function of the hyperparameter search: lower is
    better.

    Args:
        params: Sequence ``(hidden_dim, num_layers, cnn_out_channels,
            dropout_prob, lr)``. The first three are rounded to the nearest
            integer; the last two are converted to Python floats.

    Returns:
        float: RMSE of the trained model on ``test_loader`` as computed by
        ``evaluate_model``.

    Notes:
        Relies on the notebook-level names ``features_columns``,
        ``train_loader``, ``test_loader``, ``num_epochs`` and ``criterion``.
        Previously this function returned the loss tensor of the last training
        mini-batch, which is noisy, favours over-fitting and failed with an
        unbound ``loss`` when no batch was processed.
        NOTE(review): ``test_loader`` is the only held-out loader visible in
        this notebook; selecting hyperparameters on it biases the final test
        metrics. Prefer a separate validation loader if one is available.
    """
    # Unpack the search vector.
    hidden_dim, num_layers, cnn_out_channels, dropout_prob, lr = params

    # Settings that are not part of the search.
    input_dim = len(features_columns)
    output_dim = 1
    cnn_kernel_size = 3
    pool_kernel_size = 2
    fc_layers = [256, 256]

    # The optimiser proposes continuous values; round the integer-valued ones
    # (rather than truncating) and hand plain Python scalars to PyTorch.
    model = CNN_GRUModel(input_dim=input_dim,
                         hidden_dim=int(round(float(hidden_dim))),
                         output_dim=output_dim,
                         num_layers=int(round(float(num_layers))),
                         cnn_out_channels=int(round(float(cnn_out_channels))),
                         cnn_kernel_size=cnn_kernel_size,
                         pool_kernel_size=pool_kernel_size,
                         dropout_prob=float(dropout_prob),
                         fc_layers=fc_layers)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=float(lr))

    # Training loop.
    model.train()
    for epoch in range(num_epochs):
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()

            outputs = model(inputs)
            # Avoid silent (N,) vs (N, 1) broadcasting inside the loss.
            if targets.shape != outputs.shape and targets.numel() == outputs.numel():
                targets = targets.reshape(outputs.shape)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

    # Score on held-out data and return a plain float for the optimiser.
    rmse, _ = evaluate_model(model, test_loader)
    return float(rmse)


# Hyperparameter search space: name -> [lower bound, upper bound].
# The key order must match the unpacking order in train_and_evaluate().
param_space = dict(
    hidden_dim=[32, 512],
    num_layers=[1, 4],
    cnn_out_channels=[16, 256],
    dropout_prob=[0.1, 0.5],
    lr=[0.0001, 0.001],
)

# Run the grasshopper optimisation algorithm over the search space
best_params = GOA(
    train_and_evaluate,
    num_grasshoppers=30,
    num_iterations=100,
    param_space=param_space,
)
print(f"Best Parameters: {best_params}")