Training on AutoDL

In [None]:
import os
import pickle
import polars as pl
import numpy as np
import pandas as pd

In [None]:
# Parquet file holding the training data (path is relative to the working directory).
train_path = 'autodl-tmp/train.parquet'

# Column-name definitions.
# NOTE(review): `feature_cols` is not referenced anywhere else in the visible
# notebook; the model cells use `xgb_feature_cols` instead.
feature_cols = [f"feature_{i:02d}" for i in range(79)]
feature_cols = feature_cols + [f"responder_{idx}_lag_1" for idx in range(9)]
target_col = 'responder_6'
weight_col = 'weight'

# First date_id that belongs to the validation frame.
VALID_START_DATE_ID = 1650

# `pl` is polars in this cell.  A later cell rebinds `pl` to pytorch_lightning,
# so this cell only works when it is executed before that one.
train_lazy = pl.scan_parquet(train_path)

# `df` holds every row of the file; `valid` holds the rows with
# date_id >= VALID_START_DATE_ID.
# NOTE(review): `df` is not filtered, so every row of `valid` is also present in
# `df` -- confirm that validating on rows that may also be trained on is intended.
df = train_lazy.collect().to_pandas()
valid = train_lazy.filter(pl.col('date_id')>=VALID_START_DATE_ID).collect().to_pandas()

In [None]:
# Model input columns: the 79 raw features followed by the "last lag" value of
# each of the 9 responders.
xgb_feature_cols = [
    *(f"feature_{idx:02d}" for idx in range(79)),
    *(f"responder_{idx}_last_lag" for idx in range(9)),
]

# Further lag-statistic column families that are currently NOT part of the input
# (kept here as a reminder of the naming scheme):
#   feature_{idx:02d}_{mean,std,max,min,first,last,chg}_lag   for idx in range(79)
#   responder_{idx}_{mean,std,max,last,chg}_lag               for idx in range(9)

In [None]:
# Features / target / sample weights taken from the full frame ...
X_train, y_train, w_train = df[xgb_feature_cols], df[target_col], df[weight_col]

# ... and the same three pieces taken from the validation frame.
X_valid, y_valid, w_valid = valid[xgb_feature_cols], valid[target_col], valid[weight_col]

# Cell output: shapes of the three training pieces.
X_train.shape, y_train.shape, w_train.shape

In [None]:
import os
import warnings
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from pytorch_lightning import (LightningDataModule, LightningModule, Trainer)
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint, Timer
from pytorch_lightning.loggers import WandbLogger

import pandas as pd
import polars
import numpy as np
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader


class custom_args():
    """Plain attribute container holding the run configuration of this notebook."""

    def __init__(self):
        # ---- runtime ----
        self.usegpu = True            # request a GPU when one is available
        self.seed = 55                # random seed
        self.model = 'nn'             # model tag
        # ---- experiment tracking ----
        self.use_wandb = False        # enable Weights & Biases logging
        self.project = 'js-xs-nn-with-lags'
        self.dname = "./input_df/"
        # ---- data loading ----
        self.loader_workers = 4       # number of DataLoader workers
        self.bs = 8192                # batch size
        # ---- optimisation ----
        self.lr = 1e-3                # learning rate
        self.weight_decay = 5e-4      # L2 regularisation coefficient
        # ---- network shape ----
        self.dropouts = [0.1, 0.1]    # dropout rates, one per hidden block that uses dropout
        self.n_hidden = [512, 512, 256]  # output width of each hidden linear layer
        # ---- training schedule ----
        self.patience = 25            # early-stopping patience
        self.max_epochs = 2000
        self.N_fold = 5               # number of cross-validation folds
        self.gpuid = 0                # index of the GPU to use


# Single configuration instance used by the cells below.
my_args = custom_args()

In [None]:
# Dataset wrapper
class CustomDataset(Dataset):
    """Row-indexable dataset built from a DataFrame.

    The feature columns (`xgb_feature_cols`), the target column (`target_col`)
    and the weight column (`weight_col`) are converted to float tensors once,
    at construction time, and moved with ``.to(accelerator)``.
    """

    def __init__(self, df, accelerator):
        def to_float_tensor(columns):
            # Convert the selected column(s) to a float tensor on the requested device.
            return torch.FloatTensor(df[columns].values).to(accelerator)

        self.features = to_float_tensor(xgb_feature_cols)
        self.labels = to_float_tensor(target_col)
        self.weights = to_float_tensor(weight_col)

    def __len__(self):
        """Number of rows, taken from the label tensor."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(features, label, weight)`` triple stored at ``idx``."""
        return self.features[idx], self.labels[idx], self.weights[idx]


class DataModule(LightningDataModule):
    """Builds per-fold training / validation datasets from a DataFrame.

    Folds are formed over the unique ``date_id`` values of ``train_df``: the
    date at position ``ii`` belongs to fold ``ii % N_fold``.

    Args:
        train_df: frame containing a ``date_id`` column plus the columns
            ``CustomDataset`` reads.
        batch_size: batch size used by both dataloaders.
        valid_df: optional independent validation frame.  When omitted, the
            dates held out of the current fold are used for validation.
        accelerator: forwarded unchanged to ``CustomDataset``.
    """

    def __init__(self, train_df, batch_size, valid_df=None, accelerator='cpu'):
        super().__init__()
        self.df = train_df
        self.batch_size = batch_size
        self.dates = self.df['date_id'].unique()
        self.accelerator = accelerator
        self.train_dataset = None
        self.valid_df = valid_df
        self.val_dataset = None

    def setup(self, fold=0, N_fold=3, stage=None):
        """Create the datasets for one fold.

        Args:
            fold: index of the fold whose dates are excluded from training.
            N_fold: total number of folds.
            stage: accepted for compatibility with the Lightning hook; unused.
        """
        # Training rows: every date whose position is NOT in the current fold.
        selected_dates = [date for ii, date in enumerate(self.dates) if ii % N_fold != fold]
        train_mask = self.df['date_id'].isin(selected_dates)
        self.train_dataset = CustomDataset(self.df.loc[train_mask], self.accelerator)

        # Validation rows: the independent validation frame when one was given,
        # otherwise the rows held out of this fold.  Without this fallback
        # `val_dataset` stayed None and `val_dataloader()` wrapped None.
        if self.valid_df is not None:
            df_valid = self.valid_df
        else:
            df_valid = self.df.loc[~train_mask]
        self.val_dataset = CustomDataset(df_valid, self.accelerator)

    def train_dataloader(self, n_workers=0):
        """Training loader; rows are reshuffled at the start of every epoch."""
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=n_workers)

    def val_dataloader(self, n_workers=0):
        """Validation loader; row order is preserved (shuffle=False)."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False, num_workers=n_workers)

In [None]:
def r2_val(y_true, y_pred, sample_weight):
    """Weighted, zero-mean R² score.

    Computes ``1 - avg_w((y_pred - y_true)^2) / avg_w(y_true^2)``, where
    ``avg_w`` is the ``sample_weight``-weighted average.  The denominator uses
    the raw second moment of ``y_true`` (no mean subtraction) plus a tiny
    constant to avoid division by zero.
    """
    squared_error = (y_pred - y_true) ** 2
    weighted_error = np.average(squared_error, weights=sample_weight)
    weighted_energy = np.average((y_true) ** 2, weights=sample_weight)
    return 1 - weighted_error / (weighted_energy + 1e-38)

# LightningModule subclass: bundles the network with its training, validation
# and optimiser configuration.
class NN(LightningModule):
    """Feed-forward regressor trained with a sample-weighted MSE loss.

    Layer stack, exactly as built in ``__init__``::

        for i, hidden_dim in enumerate(hidden_dims):
            BatchNorm1d(in_dim)
            SiLU()                    # only when i > 0
            Dropout(dropouts[i])      # only when i < len(dropouts)
            Linear(in_dim, hidden_dim)
        Linear(hidden_dims[-1], 1)
        Tanh()

    ``forward`` multiplies the Tanh output by 5, so predictions lie in (-5, 5).

    Args:
        input_dim: number of input features.
        hidden_dims: output width of each hidden ``Linear`` layer.
        dropouts: dropout rates; entry ``i`` is applied in hidden block ``i``.
        lr: Adam learning rate.
        weight_decay: Adam weight decay (L2 regularisation coefficient).
    """

    def __init__(self, input_dim, hidden_dims, dropouts, lr, weight_decay):
        super().__init__()
        self.save_hyperparameters()  # stores the __init__ arguments in self.hparams
        layers = []
        in_dim = input_dim
        for i, hidden_dim in enumerate(hidden_dims):
            layers.append(nn.BatchNorm1d(in_dim))
            if i > 0:
                layers.append(nn.SiLU())
            if i < len(dropouts):
                layers.append(nn.Dropout(dropouts[i]))
            layers.append(nn.Linear(in_dim, hidden_dim))
            in_dim = hidden_dim
        layers.append(nn.Linear(in_dim, 1))  # output layer
        layers.append(nn.Tanh())
        self.model = nn.Sequential(*layers)
        self.lr = lr
        self.weight_decay = weight_decay
        # (y_hat, y, w) triples collected by validation_step for the epoch-end metric.
        self.validation_step_outputs = []

    def forward(self, x):
        """Return one prediction per row, scaled to the open interval (-5, 5)."""
        return 5 * self.model(x).squeeze(-1)

    def _weighted_mse(self, y_hat, y, w):
        """Mean over the batch of ``w * (y_hat - y)^2``."""
        return (F.mse_loss(y_hat, y, reduction='none') * w).mean()

    def training_step(self, batch):
        """Compute and log the weighted MSE of one training batch."""
        x, y, w = batch
        y_hat = self(x)
        loss = self._weighted_mse(y_hat, y, w)
        self.log('train_loss', loss, on_step=False, on_epoch=True, batch_size=x.size(0))
        return loss

    def validation_step(self, batch):
        """Compute and log the weighted MSE of one validation batch and keep
        its predictions for the epoch-end R² computation."""
        x, y, w = batch
        y_hat = self(x)
        loss = self._weighted_mse(y_hat, y, w)
        self.log('val_loss', loss, on_step=False, on_epoch=True, batch_size=x.size(0))
        self.validation_step_outputs.append((y_hat, y, w))
        return loss

    def on_validation_epoch_end(self):
        """Log the weighted zero-mean R² (``val_r_square``) over the whole
        validation epoch, then clear the per-batch cache.

        Nothing is logged during Lightning's sanity-check pass.
        """
        if self.validation_step_outputs and not self.trainer.sanity_checking:
            # Transpose the list of (y_hat, y, w) triples into three arrays.
            y_hat, y, w = (
                torch.cat(parts).cpu().numpy()
                for parts in zip(*self.validation_step_outputs)
            )
            val_r_square = r2_val(y, y_hat, w)
            self.log("val_r_square", val_r_square, prog_bar=True, on_step=False, on_epoch=True)
        self.validation_step_outputs.clear()

    def configure_optimizers(self):
        """Adam plus a plateau scheduler that halves the learning rate when
        ``val_loss`` has not improved for 5 epochs."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr, weight_decay=self.weight_decay)
        # The `verbose` argument is deprecated in recent PyTorch releases
        # (NOTE(review): newest releases appear to reject it -- confirm against
        # the installed version), so it is not passed; the current learning
        # rate is reported in on_train_epoch_end instead.
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
        return {
            'optimizer': optimizer,
            'lr_scheduler': {
                'scheduler': scheduler,
                'monitor': 'val_loss',
            }
        }

    def on_train_epoch_end(self):
        """Print the metrics logged so far plus the current learning rate."""
        if self.trainer.sanity_checking:
            return
        epoch = self.trainer.current_epoch
        metrics = {k: v.item() if isinstance(v, torch.Tensor) else v for k, v in self.trainer.logged_metrics.items()}
        formatted_metrics = {k: f"{v:.5f}" for k, v in metrics.items()}
        # Learning rate of the first parameter group of the first optimiser.
        current_lr = self.trainer.optimizers[0].param_groups[0]['lr']
        formatted_metrics['lr'] = f"{current_lr:.2e}"
        print(f"Epoch {epoch}: {formatted_metrics}")

In [None]:
args = my_args

# Resolve the compute device: use the configured GPU only when CUDA is
# available AND the configuration asks for it; otherwise fall back to CPU.
device = torch.device(f'cuda:{args.gpuid}' if torch.cuda.is_available() and args.usegpu else 'cpu')
accelerator = 'gpu' if torch.cuda.is_available() and args.usegpu else 'cpu'
# Device on which the dataset tensors are created.
loader_device = 'cpu'


# Fill missing feature values: forward fill (carry the previous valid value
# down), then replace whatever is still missing with 0.
# `.ffill()` is used instead of `fillna(method='ffill')`, which pandas has
# deprecated; the result is the same.
# NOTE(review): the forward fill follows the row order of the whole frame with
# no grouping, so values can be carried across date_id boundaries -- confirm
# that this is intended.
df[xgb_feature_cols] = df[xgb_feature_cols].ffill().fillna(0)
valid[xgb_feature_cols] = valid[xgb_feature_cols].ffill().fillna(0)

# Data module: folds are drawn from `df`, validation always uses `valid`.
data_module = DataModule(df, batch_size=args.bs, valid_df=valid, accelerator=loader_device)

In [None]:
import gc

# Drop the notebook-level name `df`.  `data_module` still holds its own
# reference to the frame (it stores the training frame it was given), so this
# does not actually release the frame's memory.
# NOTE: because of the `del`, this cell cannot be re-run without first
# re-running the cells that create `df` and `data_module`.
del df
gc.collect()
torch.set_float32_matmul_precision('medium')  # float32 matmul precision setting
# `pl` must be pytorch_lightning here (the NN imports cell rebinds the alias
# that the first cell used for polars).
pl.seed_everything(args.seed)  # fix the random seeds for reproducibility
for fold in range(args.N_fold):
    # Build the datasets for this fold.
    data_module.setup(fold, args.N_fold)
    # Input dimension = number of feature columns.
    input_dim = data_module.train_dataset.features.shape[1]
    # Fresh model for every fold.
    model = NN(
        input_dim=input_dim,
        hidden_dims=args.n_hidden,
        dropouts=args.dropouts,
        lr=args.lr,
        weight_decay=args.weight_decay
    )
    # Logger
    if args.use_wandb:
        # Imported here because the notebook's import cell does not import
        # `wandb`; without this the branch raised NameError.
        import wandb
        wandb_run = wandb.init(project=args.project, config=vars(args), reinit=True)
        logger = WandbLogger(experiment=wandb_run)
    else:
        logger = None
    # Callbacks
    early_stopping = EarlyStopping('val_loss', patience=args.patience, mode='min', verbose=False)
    # Keep only the best checkpoint of the fold.  `filename` is a name template
    # (the ".ckpt" extension is appended), not a path, so the target directory
    # is given separately through `dirpath`.
    checkpoint_callback = ModelCheckpoint(
        monitor='val_loss',
        mode='min',
        save_top_k=1,
        verbose=False,
        dirpath="./models",
        filename=f"nn_{fold}.model",
    )
    timer = Timer()
    # Trainer.  The device list is derived from the resolved `accelerator`, so
    # a CPU fallback (no CUDA, or usegpu=False) gets 'auto' instead of a GPU
    # index list or None.
    trainer = Trainer(
        max_epochs=args.max_epochs,
        accelerator=accelerator,
        devices=[args.gpuid] if accelerator == 'gpu' else 'auto',
        logger=logger,
        callbacks=[early_stopping, checkpoint_callback, timer],
        enable_progress_bar=True
    )
    # Train
    trainer.fit(model, data_module.train_dataloader(args.loader_workers), data_module.val_dataloader(args.loader_workers))
    print(f'Fold-{fold} Training completed in {timer.time_elapsed("train"):.2f}s')
    print(f'Fold-{fold} best checkpoint: {checkpoint_callback.best_model_path}')
