# 导入相关依赖库.
第一个代码块导入依赖, 并打印各库的版本号以便核对运行环境.

In [1]:
import os

import numpy as np
import pandas as pd
import timm
import torch
import torchvision

from torch.nn.functional import softmax
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import Resize

# Report the versions of the key libraries, one per line, so the runtime
# environment can be checked against the one the checkpoints were built with.
print(
    f'numpy: {np.__version__}',
    f'pandas: {pd.__version__}',
    f'timm: {timm.__version__}',
    f'torch: {torch.__version__}',
    f'torchvision: {torchvision.__version__}',
    sep='\n',
)

numpy: 1.26.4
pandas: 2.2.2
timm: 0.9.16
torch: 2.1.2
torchvision: 0.16.2


# 相关路径信息和超参数.

In [2]:
# Checkpoints of the cross-validation folds; predictions from all of them
# are averaged at inference time.
ALL_MODEL_PATH = [
    '/kaggle/input/efficientnet-b0-cv5-epochs3/fold01-model.pth',
    '/kaggle/input/efficientnet-b0-cv5-epochs3/fold02-model.pth',
    '/kaggle/input/efficientnet-b0-cv5-epochs3/fold03-model.pth',
    '/kaggle/input/efficientnet-b0-cv5-epochs3/fold04-model.pth',
    '/kaggle/input/efficientnet-b0-cv5-epochs3/fold05-model.pth',
]

# Root directory of the competition data (CSV files and spectrograms).
DATA_DIR = '/kaggle/input/hms-harmful-brain-activity-classification'

# Run on the GPU when one is available, otherwise on the CPU.
DEVICE = (torch.device('cuda')
          if torch.cuda.is_available()
          else torch.device('cpu'))

# Names of the target columns, in the order used for the model outputs.
LABELS = [
    'seizure_vote', 'lpd_vote', 'gpd_vote',
    'lrda_vote', 'grda_vote', 'other_vote',
]

# Passed to create_model as its ``pre_trained`` flag.
PRE_TRAINED = False

# 数据预处理.
1. 准备数据(合并测试集元数据与提交模板, 并生成频谱图文件路径).
2. 创建数据集,包含数据预处理和信号处理.

In [3]:
def prepare_data(data_dir):
    """Build the table that drives inference on the test set.

    Reads ``test.csv`` and ``sample_submission.csv`` from ``data_dir``,
    inner-joins them on ``eeg_id`` (submission columns first), and appends a
    ``spec_path`` column holding the parquet path derived from each row's
    ``spectrogram_id``.

    Args:
        data_dir: Directory containing the two CSV files; also used as the
            prefix of the generated spectrogram paths.

    Returns:
        The merged DataFrame with the extra ``spec_path`` column.
    """
    def _parquet_path(spectrogram_id):
        # Location of the spectrogram file for one spectrogram id.
        return f'{data_dir}/test_spectrograms/{spectrogram_id}.parquet'

    test_meta = pd.read_csv(f'{data_dir}/test.csv')
    template = pd.read_csv(f'{data_dir}/sample_submission.csv')

    # Keep only the rows whose eeg_id appears in both files.
    merged = template.merge(test_meta, how='inner', on='eeg_id')
    merged['spec_path'] = merged['spectrogram_id'].map(_parquet_path)

    return merged


class SpectrogramDataset(Dataset):
    """Dataset that turns each row's spectrogram file into a model input.

    Every item is read from the parquet file named in the row's
    ``spec_path`` column, log-scaled and standardised by ``preprocess``,
    and resized to 512x512.
    """

    def __init__(self, dataframe):
        # One row per sample; __getitem__ reads ``spec_path`` and the LABELS
        # columns from it.
        self.dataframe = dataframe

    def __len__(self):
        """Return the number of rows (samples)."""
        return len(self.dataframe)

    def __getitem__(self, index):
        """Return the ``(image, label)`` tensor pair for row ``index``."""
        record = self.dataframe.iloc[index]

        # Load the spectrogram and replace missing values with -1; the later
        # clip in preprocess raises those to the lower bound.
        frame = pd.read_parquet(record.spec_path).fillna(-1)

        # Drop the first column (an index column according to the original
        # author's note) and transpose the remaining matrix.
        matrix = frame.values[:, 1:].T

        # Prepend a length-1 axis so the tensor is (1, rows, cols) before it
        # is normalised.
        image = self.preprocess(torch.from_numpy(matrix)[None, :])

        # Bring every sample to the same 512x512 size, without antialiasing.
        resize = Resize([512, 512], antialias=False)
        image = resize(image)

        # Per the original author these label values are placeholders only.
        target = np.asarray(record.loc[LABELS].values, np.float32)

        return image, torch.from_numpy(target)

    @staticmethod
    def preprocess(signal):
        """Log-scale and standardise a spectrogram tensor.

        Values are clipped to ``[exp(-6), exp(10)]``, the natural logarithm
        is taken, and the result is shifted and scaled by the mean and
        standard deviation computed over the whole tensor.
        """
        lower, upper = np.exp(-6), np.exp(10)
        log_power = torch.log(torch.clip(signal, lower, upper))

        centered = log_power - torch.mean(log_power)
        # The small constant keeps the division finite when std is zero.
        return centered / (torch.std(log_power) + 1e-6)

# 加载测试数据集.

In [4]:
test_df = prepare_data(DATA_DIR)
test_ds = SpectrogramDataset(test_df)
# os.cpu_count() returns None when the CPU count cannot be determined;
# fall back to 0 (load in the main process) so DataLoader always gets an int.
test_dl = DataLoader(test_ds, num_workers=os.cpu_count() or 0)

# Pull one batch and print it as a quick sanity check of the pipeline.
x, y = next(iter(test_dl))
print(x)
print(y)

tensor([[[[ 2.3325,  2.2601,  2.1483,  ...,  1.9688,  2.1143,  2.2084],
          [ 2.3929,  2.2949,  2.1435,  ...,  2.1120,  2.1597,  2.1906],
          [ 2.3089,  2.2396,  2.1324,  ...,  2.2200,  2.1558,  2.1143],
          ...,
          [-1.4682, -1.5644, -1.7131,  ...,  0.0490,  0.0909,  0.1181],
          [-1.4864, -1.6086, -1.7976,  ..., -0.0120,  0.0149,  0.0323],
          [-1.3885, -1.6152, -1.9658,  ..., -0.1403, -0.2011, -0.2405]]]])
tensor([[0.1667, 0.1667, 0.1667, 0.1667, 0.1667, 0.1667]])


# 创建模型.

In [5]:
def create_model(pre_trained=True,
                 device=torch.device('cpu')):
    """Create the classifier network and move it to ``device``.

    Args:
        pre_trained: Forwarded to timm as ``pretrained``.
        device: Device the model is moved to before it is returned.

    Returns:
        The ``tf_efficientnet_b0.ns_jft_in1k`` model built by timm with a
        single input channel and six output classes.
    """
    options = {
        'pretrained': pre_trained,
        'num_classes': 6,
        'in_chans': 1,
    }
    net = timm.create_model('tf_efficientnet_b0.ns_jft_in1k', **options)
    net.to(device)

    return net

In [6]:
# Build the network on the selected device; uncomment the print to inspect it.
model = create_model(pre_trained=PRE_TRAINED, device=DEVICE)
# print(model)

In [7]:
def test(model, dataframe, dataloader, all_model_path, device=torch.device('cpu')):
    """Run ensemble inference and write ``submission.csv``.

    Each checkpoint in ``all_model_path`` is loaded into ``model`` in turn,
    softmax probabilities are collected over ``dataloader``, and the
    per-checkpoint probabilities are averaged.

    Args:
        model: Network whose state-dict layout matches the checkpoints.
        dataframe: Table with an ``eeg_id`` column and one row per sample.
            Predictions are assigned to its rows positionally, in the order
            the dataloader yields them. Its LABELS columns are overwritten
            in place.
        dataloader: Yields ``(inputs, _)`` batches; the second element is
            ignored.
        all_model_path: Non-empty sequence of checkpoint file paths.
        device: Device used for loading the weights and for the forward pass.

    Raises:
        ValueError: If ``all_model_path`` is empty. Without this check the
            average would divide by zero and an all-NaN file would be written.
    """
    num_models = len(all_model_path)
    if num_models == 0:
        raise ValueError('all_model_path must contain at least one checkpoint path')

    # Running sum of the per-checkpoint probabilities.
    summed_probs = np.zeros([len(dataframe), len(LABELS)])

    for model_path in all_model_path:
        # NOTE(review): torch.load unpickles the file, so only checkpoints
        # from a trusted source should be listed in all_model_path.
        model.load_state_dict(torch.load(model_path, map_location=device))

        model.eval()
        with torch.no_grad():
            # One probability array per batch, moved back to the CPU.
            batch_probs = [
                softmax(model(x.to(device)), dim=1).cpu().numpy()
                for x, _ in dataloader
            ]

        summed_probs += np.concatenate(batch_probs)

    # Store the ensemble mean, then keep only the columns to be submitted.
    dataframe[LABELS] = summed_probs / num_models
    dataframe = dataframe[['eeg_id'] + LABELS]
    dataframe.to_csv('submission.csv', index=False)
    print(dataframe)

In [8]:
# Average the predictions of all fold checkpoints and write submission.csv.
test(model, test_df, test_dl, all_model_path=ALL_MODEL_PATH, device=DEVICE)

       eeg_id  seizure_vote  lpd_vote  gpd_vote  lrda_vote  grda_vote  \
0  3911565283      0.148325  0.139162  0.001145   0.208682     0.0084   

   other_vote  
0    0.494287  
