In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, ConcatDataset
import random
import numpy as np
import pickle
import xarray as xr
import matplotlib.pyplot as plt
import datetime
import scipy.stats
import pandas as pd
import torch.nn.functional as F
from torchvision import transforms
from torch.optim import lr_scheduler

import warnings
warnings.filterwarnings('ignore')

In [2]:
def setup_seed(seed):
    """Seed every random number generator used by this notebook.

    The same ``seed`` is handed, in order, to ``torch.manual_seed``,
    ``torch.cuda.manual_seed_all``, ``np.random.seed`` and ``random.seed``.

    Args:
        seed: Integer seed forwarded unchanged to each generator.
    """
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
        random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    # Not enabled here: torch.backends.cudnn.deterministic = True


setup_seed(906)  # fixed seed for the whole notebook

In [3]:
# Select the compute device: CUDA when it is available, otherwise the CPU.
# The bare expression on the last line displays the chosen device as the cell output.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [5]:
# CBAM结合了通道注意力和空间注意力，分别关注"什么"特征重要和"哪里"重要
 
class ChannelAttention(nn.Module):
    """Channel-attention gate: one weight in (0, 1) per input channel.

    The input is squeezed to 1x1 per channel by both average and max pooling;
    each pooled vector passes through the same two 1x1 convolutions
    (``fc1`` -> ReLU -> ``fc2``), the two results are summed and a sigmoid
    is applied.

    Args:
        in_planes: Number of channels of the input feature map.
        ratio: Reduction factor of the bottleneck; the hidden width is
            ``in_planes // ratio``, clamped to at least 1.
    """

    def __init__(self, in_planes, ratio=16):
        super().__init__()
        # Without the clamp, in_planes < ratio yields a zero-channel
        # bottleneck and the gate degenerates (no learnable signal).
        hidden_planes = max(in_planes // ratio, 1)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        self.fc1 = nn.Conv2d(in_planes, hidden_planes, 1, bias=False)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Conv2d(hidden_planes, in_planes, 1, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the attention weights; the caller multiplies them into ``x``."""
        # The same fc1/fc2 weights are shared by both pooling branches.
        avg_out = self.fc2(self.relu1(self.fc1(self.avg_pool(x))))
        max_out = self.fc2(self.relu1(self.fc1(self.max_pool(x))))
        return self.sigmoid(avg_out + max_out)
 
class SpatialAttention(nn.Module):
    """Spatial-attention gate: one weight in (0, 1) per spatial position.

    The channel axis (dim 1) is reduced by mean and by max, the two resulting
    maps are stacked into a 2-channel tensor, and a single bias-free
    convolution followed by a sigmoid produces the weights.

    Args:
        kernel_size: Size of the convolution kernel; the padding is
            ``kernel_size // 2``.
    """

    def __init__(self, kernel_size=7):
        super().__init__()
        half_kernel = kernel_size // 2
        self.conv1 = nn.Conv2d(
            in_channels=2,
            out_channels=1,
            kernel_size=kernel_size,
            padding=half_kernel,
            bias=False,
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the attention weights; the caller multiplies them into ``x``."""
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True).values
        pooled = torch.cat((channel_mean, channel_max), dim=1)
        return self.sigmoid(self.conv1(pooled))
 
class CBAM(nn.Module):
    """Channel attention followed by spatial attention, applied to the input.

    Args:
        in_planes: Number of channels of the input feature map.
        ratio: Reduction factor passed to ``ChannelAttention``.
        kernel_size: Kernel size passed to ``SpatialAttention``.
    """

    def __init__(self, in_planes, ratio=16, kernel_size=7):
        super().__init__()
        self.ca = ChannelAttention(in_planes=in_planes, ratio=ratio)
        self.sa = SpatialAttention(kernel_size=kernel_size)

    def forward(self, x):
        """Return ``x`` scaled by the channel weights, then by the spatial weights."""
        channel_refined = self.ca(x) * x
        # The spatial gate is computed from the channel-refined features.
        return self.sa(channel_refined) * channel_refined

In [6]:
class ConvNetwork(nn.Module):
    """Convolutional feature extractor with an optional CBAM attention stage.

    Three conv -> batch-norm -> ReLU stages (136 -> 64 -> 128 -> 64 channels),
    with 2x2 max pooling after the first two, followed by optional attention
    and a flatten to ``(batch, features)``.

    Args:
        M_Num: Number of convolutional filters. Only stored as ``self.M``;
            the layer widths below are fixed.
        N_Num: Number of neurons. Only stored as ``self.N``.
        is_se: When True a ``CBAM(64)`` block is created and applied in
            ``forward``; when False the attention stage is skipped.
    """

    def __init__(self, M_Num, N_Num, is_se=True):
        super().__init__()
        self.M = M_Num
        self.N = N_Num
        self.is_se = is_se
        self.conv = nn.Sequential(
            nn.Conv2d(136, 64, kernel_size=(4, 8), padding="same"),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
            nn.Conv2d(64, 128, kernel_size=(4, 8), padding="same"),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(stride=(2, 2), kernel_size=(2, 2)),
            nn.Conv2d(128, 64, kernel_size=(4, 8), stride=(1, 1), padding="same"),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),)
        if self.is_se:
            self.se = CBAM(64)
        # NOTE(review): this head is never called in forward(). It is kept so
        # the module's parameters / state_dict keys stay as they were.
        # 76800 = 64 channels x 1200 positions, i.e. it presumes an input whose
        # pooled spatial size is 1200 -- TODO confirm against the dataset.
        self.dense = nn.Sequential(
            nn.Linear(76800, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),  # regularisation against over-fitting
            nn.Linear(128, 64),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(64, 19),)

    def forward(self, InData):
        """Return the flattened feature map of ``InData``.

        Args:
            InData: Tensor with 136 channels on dim 1 (required by the first
                convolution).

        Returns:
            Tensor of shape ``(batch, 64 * H' * W')`` where ``H'``/``W'`` are
            the spatial sizes after the two 2x2 poolings.
        """
        x = self.conv(InData)
        if self.is_se:
            # self.se only exists when is_se is True; guarding avoids an
            # AttributeError for is_se=False.
            # NOTE(review): CBAM.forward already returns the attention-weighted
            # features, so this multiplies by the pre-attention features a
            # second time. Left unchanged to keep numerics -- confirm intent.
            x = self.se(x) * x
        return torch.flatten(x, 1)

In [7]:
class ConvTransformer(nn.Module):
    """CNN feature extractor followed by a Transformer encoder and a linear head.

    Each sample's flattened CNN features are projected to a single
    256-dimensional token, passed through the Transformer encoder as a
    length-1 sequence, and mapped to ``num_classes`` outputs.

    Args:
        is_se: Forwarded to ``ConvNetwork`` to enable/disable its attention stage.
        num_classes: Size of the output vector per sample.
        num_heads: Number of attention heads in each encoder layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, is_se=True, num_classes=19, num_heads=8, num_layers=2):
        super().__init__()
        self.cnn = ConvNetwork(96, 96, is_se=is_se)
        # Project the flattened CNN features to the Transformer width.
        # 76800 must equal the flattened feature size produced by self.cnn.
        self.linear_proj = nn.Linear(76800, 256)
        # batch_first=True so the (batch, seq, feature) tensor built in
        # forward() is read as intended. With the default layout the batch
        # axis would be treated as the sequence and samples would attend to
        # each other.
        # The template layer stays an attribute so the state_dict layout is unchanged.
        self.transformer_encoder_layer = nn.TransformerEncoderLayer(
            d_model=256, nhead=num_heads, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(self.transformer_encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return a ``(batch, num_classes)`` tensor for the input batch ``x``."""
        features = self.cnn(x)
        tokens = self.linear_proj(features)
        tokens = tokens.unsqueeze(1)  # add the sequence-length axis: (batch, 1, 256)
        encoded = self.transformer_encoder(tokens)
        encoded = encoded.squeeze(1)
        return self.fc(encoded)

In [8]:
# Build the CNN + Transformer model with its default arguments and move it to the selected device.
model = ConvTransformer().to(device)

In [9]:
# Training loop with validation-driven learning-rate reduction.
def train_model(model, DL, valDataL, criterion, optimizer, num_epochs, patience=3,
                save_path="/root/autodl-tmp/model/best_model/cnnbest_model.pth"):
    """Train ``model``, validate after every epoch and checkpoint the best weights.

    Accuracy is measured by comparing ``argmax(dim=1)`` of the model output
    with ``argmax(dim=1)`` of the target, for both training and validation.

    Args:
        model: Module to train; batches are moved to the device of its parameters.
        DL: Iterable of ``(X, y)`` training batches; must support ``len()``.
        valDataL: Iterable of ``(X, y)`` validation batches.
        criterion: Called as ``criterion(outputs, y.float())``.
            NOTE(review): the ``Loss`` class in this notebook names its
            parameters ``(y_true, y_pred)``, so predictions bind to
            ``y_true`` with this call order -- confirm that is intended.
        optimizer: Optimizer over ``model``'s parameters.
        num_epochs: Number of epochs to run.
        patience: Currently unused (no early stopping is performed); accepted
            for call compatibility. The scheduler's own patience is fixed at 10.
        save_path: Where the best ``state_dict`` is written with ``torch.save``.

    Returns:
        None. Progress and the best validation accuracy are printed.
    """
    # Use the model's own device rather than a notebook global so data and
    # weights are guaranteed to sit on the same device.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    best_acc = 0.0
    # Multiply the learning rate by 0.1 when validation accuracy has not
    # improved for 10 epochs.
    scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=10, factor=0.1)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct_preds = 0
        total_preds = 0
        for X, y in DL:
            X = X.to(run_device, dtype=torch.float32)
            y = y.to(run_device)

            optimizer.zero_grad()

            outputs = model(X).float()
            loss = criterion(outputs, y.float())
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # Accuracy: index of the largest entry per sample, prediction vs. target.
            preds = outputs.argmax(dim=1)
            labels = y.argmax(dim=1)
            correct_preds += (preds == labels).sum().item()
            total_preds += labels.size(0)

        epoch_loss = running_loss / len(DL)
        epoch_acc = correct_preds / total_preds * 100

        # Evaluate on the validation set.
        model.eval()
        val_correct_preds = 0
        val_total_preds = 0
        with torch.no_grad():
            for X, y in valDataL:
                # Same float32 cast as in training; without it non-float32
                # inputs would fail only in the validation pass.
                X = X.to(run_device, dtype=torch.float32)
                y = y.to(run_device)
                preds = model(X).argmax(dim=1)
                labels = y.argmax(dim=1)
                val_correct_preds += (preds == labels).sum().item()
                val_total_preds += labels.size(0)

        val_acc = val_correct_preds / val_total_preds * 100

        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.2f}%, Val Accuracy: {val_acc:.2f}%")

        # Keep the weights of the best-performing epoch.
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), save_path)

        # Let the scheduler react to the validation accuracy.
        scheduler.step(val_acc)

    print(f"训练完成。验证集最高准确率为: {best_acc:.2f}%")

In [11]:
class Loss(nn.Module):
    """Quantile-style loss whose quantile switches on a threshold of ``y_true``.

    With ``residual = y_pred - y_true`` each element contributes
    ``max((q - 1) * residual, q * residual)``, where ``q`` is ``quantile1``
    if ``y_true > -0.501`` and ``quantile2`` otherwise. The result is the mean
    over all elements.

    Args:
        quantile1: Quantile used where ``y_true > -0.501``.
        quantile2: Quantile used elsewhere.
    """

    def __init__(self,quantile1 = 0.81,quantile2 = 0.31):
        super().__init__()
        self.quantile1 = quantile1
        self.quantile2 = quantile2

    def forward(self, y_true, y_pred):
        """Return the scalar mean loss.

        NOTE(review): ``train_model`` in this notebook calls
        ``criterion(outputs, y)``, which binds the model output to ``y_true``
        and the target to ``y_pred`` -- confirm the intended argument order.
        """
        residual = y_pred - y_true
        # Compute each quantile term once and select between them per element.
        quantileloss1 = torch.max((self.quantile1 - 1) * residual, self.quantile1 * residual)
        quantileloss2 = torch.max((self.quantile2 - 1) * residual, self.quantile2 * residual)
        quantileloss = torch.where(y_true > -0.501, quantileloss1, quantileloss2)
        return torch.mean(quantileloss)
 
# Instantiate the custom loss with its default quantiles (0.81 / 0.31) and move it to the selected device.
loss_fn = Loss().to(device)

In [11]:
# AdamW over all model parameters with weight decay 0.001; no learning rate is passed, so the optimizer default applies.
optimizer = torch.optim.AdamW(model.parameters(), weight_decay=0.001) 

In [None]:
# Train the model for 30 epochs with the custom loss and the AdamW optimizer.
# NOTE(review): DL and valDataL are not defined in the cells visible here -- confirm they are created before this cell runs.
train_model(model, DL, valDataL, loss_fn, optimizer, num_epochs=30, patience=3)

In [None]:
# Run the trained model on the dataset built with type_="OBStest": batches of up to 300 samples, unshuffled.
# NOTE(review): ENSODataset is not defined in the cells visible here -- confirm it is defined before this cell runs.
Data=DataLoader(ENSODataset(type_="OBStest"), batch_size=300, shuffle=False)
model.eval()
with torch.no_grad():
    # Only the first batch of the loader is evaluated.
    X,y = next(iter(Data))
    # Cast inputs to float32 before moving them to the device.
    X = X.type(torch.FloatTensor)
    X,y = X.to(device),y.to(device)
    # Compute prediction
    pred = model(X)
    pred = pred.float()
    pred = pred.to(device)