In [None]:
import ast
import gc
import glob
import os
import random
import time

import cv2
import matplotlib.pyplot as plt
from PIL import Image

# For data manipulation
import numpy as np
import pandas as pd

# Pytorch Imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader
from torch.cuda import amp
from pytorch_toolbelt import losses as L

# Utils
from tqdm.auto import tqdm

# For Image Models
import timm

# Albumentations for augmentations
import albumentations as A
from albumentations.pytorch import ToTensorV2

import warnings
warnings.filterwarnings("ignore")

## Expose only GPU index 0 to this process (an earlier note said "gpu:1", but the value set here is '0')
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

def seed_everything(seed=123):
    """Seed the Python, NumPy and PyTorch random generators and pin cuDNN flags.

    Args:
        seed: Integer seed applied to every generator (defaults to 123).
    """
    # Record the seed in the environment; the variable must hold a string.
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Python's and NumPy's global generators.
    random.seed(seed)
    np.random.seed(seed)

    # PyTorch generators: CPU, the current CUDA device, then every CUDA device.
    for seed_fn in (torch.manual_seed,
                    torch.cuda.manual_seed,
                    torch.cuda.manual_seed_all):
        seed_fn(seed)

    # cuDNN: no benchmark-based algorithm selection, deterministic mode on.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


seed_everything()

In [None]:
class WidthAttention(nn.Module):
    """Scale a feature map by the weights returned from ``self.attention``.

    NOTE(review): ``__init__`` registers no sub-modules in this definition, so
    ``self.attention`` has to be provided from elsewhere before ``forward`` is
    called - presumably when a saved model object is restored; confirm.
    """

    def __init__(self, in_ch, width: int):
        # ``in_ch`` and ``width`` are accepted but unused in this definition.
        super().__init__()

    def forward(self, x):
        """Return ``x`` multiplied element-wise by the broadcast attention weights."""
        weights = self.attention(x)
        # Insert two singleton axes at position 1 so the weights broadcast over ``x``.
        weights = weights[:, None, None]
        return x * weights

class Customize_Model(nn.Module):
    """Wrapper whose forward pass delegates entirely to ``self.model``.

    NOTE(review): ``self.model`` is not created in ``__init__`` here, so it
    must be attached from elsewhere before use - presumably when a saved
    model object is restored; confirm.
    """

    def __init__(self, model_name, cls):
        # ``model_name`` and ``cls`` are accepted but unused in this definition.
        super().__init__()

    def forward(self, image):
        """Run ``image`` through ``self.model`` and return its output unchanged."""
        return self.model(image)

class Slide_Window_Model(nn.Module):
    """Four-stage pipeline ``model -> att -> gp -> out`` with a mode-dependent output shape.

    NOTE(review): ``self.model``, ``self.att``, ``self.gp``, ``self.out`` and
    ``self.cls`` are not set in ``__init__`` here; they must be attached from
    elsewhere before use - presumably when a saved model object is restored;
    confirm.
    """

    def __init__(self, model_name, cls):
        # ``model_name`` and ``cls`` are accepted but unused in this definition.
        super().__init__()

    def forward(self, image):
        """Return the head output; outside training mode it is reshaped to ``(-1, cls, 1, 1)``.

        Shape notes in the comments below are the original author's examples
        for one specific input and are not enforced by this code.
        """
        features = self.model(image)   ## e.g. (1,1280,13,21)
        attended = self.att(features)  ## e.g. (1,1280,13,21)
        pooled = self.gp(attended)     ## e.g. (1,1280)
        logits = self.out(pooled)      ## e.g. (1,6)

        if self.training:
            return logits
        # Outside training mode, add two trailing singleton axes.
        return logits.view(-1, self.cls, 1, 1)

In [None]:
from preprocessing import spectrogram_from_eeg

def get_test_transform(img_size):
    """Build the evaluation-time albumentations pipeline.

    Args:
        img_size: Accepted for interface compatibility; not used by this function.

    Returns:
        An ``A.Compose`` that pads the image to at least 400 rows and
        ``CFG['img_crop']`` columns, then converts it with ``ToTensorV2``.
    """
    # border_mode=0 is the constant-fill border mode (cv2.BORDER_CONSTANT).
    pad_step = A.PadIfNeeded(min_height=400, min_width=CFG['img_crop'], border_mode=0, p=1)
    tensor_step = ToTensorV2(p=1.0)
    return A.Compose([pad_step, tensor_step])

class Customize_Dataset(Dataset):
    """Dataset that fuses an EEG-derived image with a pre-computed spectrogram image.

    Each item is built from one row of ``df``; the row must provide the
    ``image_path``, ``eeg_id`` and ``spectrogram_id`` fields.

    Args:
        df: DataFrame with one sample per row. Rows are addressed by position,
            so the index does not have to be a fresh ``RangeIndex``.
        transforms: Optional callable invoked as ``transforms(image=img)`` that
            returns a mapping with an ``"image"`` entry.
    """

    def __init__(self, df, transforms=None):
        self.df = df
        self.transforms = transforms

    @staticmethod
    def _standardize(img):
        """Scale ``img`` to zero mean / unit std (NaN-aware); remaining NaNs become 0."""
        eps = 1e-6  # keeps the division finite when the std is 0
        mean = np.nanmean(img)
        std = np.nanstd(img)
        return np.nan_to_num((img - mean) / (std + eps), nan=0.0)

    def read_data(self, data):
        """Load and fuse both image sources for one metadata row.

        Args:
            data: Row (mapping / Series) with ``image_path``, ``eeg_id`` and
                ``spectrogram_id``.

        Returns:
            ``(H, W, 3)`` NumPy array whose three channels are identical: the
            EEG image on the left and the spectrogram image on the right.
        """
        ## pre-computed spectrograms: derive the parquet path from the EEG path
        path = data['image_path'].replace('train_eegs', 'train_spectrograms')
        if 'test_eegs' in data['image_path']:
            path = data['image_path'].replace('test_eegs', 'test_spectrograms')
        path = path.replace(str(data['eeg_id']), str(data['spectrogram_id']))
        raw = pd.read_parquet(path).fillna(0)

        # One log-scaled panel per column group, stacked top-to-bottom in the
        # order LL, LP, RP, RL.
        panels = [
            np.log1p(raw[list(raw.filter(like=key, axis=1))].T.values)
            for key in ('LL', 'LP', 'RP', 'RL')
        ]
        # Standardising the single-channel image gives the same statistics as
        # standardising three identical copies and keeping one of them.
        img_spectrograms = self._standardize(np.concatenate(panels, axis=0))
        img_spectrograms = np.expand_dims(img_spectrograms, axis=2)

        ## spectrograms computed from the raw EEG (10 s and 30 s windows)
        def stacked_eeg(duration):
            # assumes spectrogram_from_eeg returns an array with at least 4
            # entries on its last axis - TODO confirm against preprocessing.py
            spec = spectrogram_from_eeg(data['image_path'], duration=duration, height=100)
            return np.concatenate([spec[..., ch] for ch in range(4)], axis=0)

        img_eeg = np.concatenate([stacked_eeg(10), stacked_eeg(30)], axis=1)
        img_eeg = np.expand_dims(img_eeg, axis=2)

        ## fuse: EEG image left, spectrogram image right, then replicate to 3 channels
        # The two images must have the same number of rows for this to succeed.
        img = np.concatenate([img_eeg, img_spectrograms], axis=1)
        return np.repeat(img, 3, axis=2)

    def __getitem__(self, index):
        """Return ``{'image': float32 tensor}`` for the sample at position ``index``."""
        # Positional lookup: DataLoader samplers yield positions 0..len-1, which
        # only coincide with index labels when the frame has a fresh RangeIndex.
        data = self.df.iloc[index]
        img = self.read_data(data)

        if self.transforms:
            img = self.transforms(image=img)["image"]

        # as_tensor accepts both ndarrays and tensors without re-wrapping a
        # tensor that the transform pipeline already produced.
        return {
            'image': torch.as_tensor(img, dtype=torch.float32),
        }

    def __len__(self):
        return len(self.df)

# CFG

In [None]:
# Notebook-wide settings. Keys are read by the cells and functions that follow.
CFG = dict(
    fold=0,              # fold held out as the validation split
    img_size=None,       # forwarded to get_test_transform (unused there)
    img_crop=656,        # pad width and sliding-window width
    TTA=1,               # number of test-time-augmentation views
    stage=1,             # stage 2 keeps only rows with voter > 7
    pseudo_label=False,  # when True, the whole table is treated as the validation fold
    model=[
        './train_model/cv0_best.ts',

        # './test_model/effb0_lb34/cv0_best.ts',
    ],
)

# Swap every checkpoint path for the object loaded from it, placed on cuda:0.
# torch.load unpickles the file, so only load checkpoints you trust.
loaded_models = []
for checkpoint_path in CFG['model']:
    loaded_models.append(torch.load(checkpoint_path, map_location='cuda:0'))
CFG['model'] = loaded_models
print(f"length of model: {len(CFG['model'])}")

# Prepare Dataset

In [None]:
# Load the metadata table and apply the stage / pseudo-label rules from CFG.
df = pd.read_csv('../Data/train_eeg.csv')
if CFG['stage'] == 2:
    # Stage 2 keeps only rows with more than 7 voters.
    df = df[df['voter'] > 7]
if CFG['pseudo_label']:
    # Put every row into the validation fold so the whole table gets predicted.
    df['fold'] = CFG['fold']
else:
    # Otherwise keep a single row per spectrogram.
    df = df.drop_duplicates(subset=['spectrogram_id'])

# Split by fold; both frames get a fresh 0..n-1 index.
in_valid_fold = df['fold'] == CFG['fold']
train_df = df[~in_valid_fold].reset_index(drop=True)
valid_df = df[in_valid_fold].reset_index(drop=True)
print(f'train dataset: {len(train_df)}')
print(f'valid dataset: {len(valid_df)}')

# One sample per batch, in row order.
valid_dataset = Customize_Dataset(valid_df, get_test_transform(CFG['img_size']))
valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=0)
df.head()

In [None]:
from monai.inferers import sliding_window_inference

def slide_inference(model, img):
    """Ensemble sliding-window prediction for a single image.

    Args:
        model: Non-empty sequence of models. Each one is switched to eval mode
            and used as the ``predictor`` of ``sliding_window_inference``.
        img: Image tensor without a batch axis; the batch axis is added here
            and the tensor is moved to the current CUDA device.

    Returns:
        NumPy array with the softmax probabilities averaged over all models.

    Raises:
        ValueError: If ``model`` is empty or ``CFG['TTA']`` is smaller than 1.
    """
    if len(model) == 0:
        raise ValueError("slide_inference requires at least one model")
    # At most four views exist (identity plus three flips), so larger values are capped.
    n_views = min(int(CFG['TTA']), 4)
    if n_views < 1:
        raise ValueError(f"CFG['TTA'] must be >= 1, got {CFG['TTA']}")

    img = torch.unsqueeze(img, 0).cuda()
    # Build only the requested views, once, instead of all four per model.
    flip_dims = [(), (-1,), (-2,), (-1, -2)][:n_views]
    imgs = torch.cat([img.flip(dims) if dims else img for dims in flip_dims], dim=0)

    preds = None
    with torch.no_grad():
        for m in model:
            m.eval()
            # roi_size=(-1, crop): a non-positive component makes MONAI use the
            # full image size along that axis, so windows slide along the width only.
            pred = sliding_window_inference(imgs,
                                            roi_size=(-1, CFG['img_crop']),
                                            mode='gaussian',
                                            sw_batch_size=4,
                                            overlap=0.25,
                                            predictor=m)
            # Average over all spatial positions, then over the views. The class
            # count is read from the prediction rather than hard-coded.
            pred = pred.reshape(pred.shape[0], pred.shape[1], -1).mean(dim=-1)
            pred = pred.mean(dim=0)

            prob = pred.softmax(dim=-1)
            preds = prob if preds is None else preds + prob

    preds = preds / len(model)
    return preds.cpu().numpy()

def inference(model, img):
    """Ensemble whole-image prediction for a single image.

    Args:
        model: Non-empty sequence of models. Each one is switched to eval mode
            and called directly on the batch of views.
        img: Image tensor without a batch axis; the batch axis is added here
            and the tensor is moved to the current CUDA device.

    Returns:
        NumPy array with the softmax probabilities averaged over all models.

    Raises:
        ValueError: If ``model`` is empty or ``CFG['TTA']`` is smaller than 1.
    """
    if len(model) == 0:
        raise ValueError("inference requires at least one model")
    # At most four views exist (identity plus three flips), so larger values are capped.
    n_views = min(int(CFG['TTA']), 4)
    if n_views < 1:
        raise ValueError(f"CFG['TTA'] must be >= 1, got {CFG['TTA']}")

    img = torch.unsqueeze(img, 0).cuda()
    # Build only the requested views, once, instead of all four per model.
    flip_dims = [(), (-1,), (-2,), (-1, -2)][:n_views]
    imgs = torch.cat([img.flip(dims) if dims else img for dims in flip_dims], dim=0)

    preds = None
    with torch.no_grad():
        for m in model:
            m.eval()
            # Average the raw outputs over the views before the softmax.
            pred = m(imgs).mean(dim=0)

            prob = pred.softmax(dim=-1)
            preds = prob if preds is None else preds + prob

    preds = preds / len(model)
    return preds.cpu().numpy()

In [None]:
# Predict every validation sample and store the result on its row of valid_df.
valid_df['pred_cls'] = None
count = 0  # row label in valid_df that receives the next prediction
for data in tqdm(valid_loader):
    for img in data['image']:
        prob = slide_inference(CFG['model'], img)

        # Hard label plus the full probability vector, serialised as a string.
        valid_df.loc[count, 'pred_cls'] = prob.argmax(0)
        valid_df.loc[count, 'prob'] = str(prob.tolist())
        count += 1
valid_df.head()

# Pseudo Label 

In [None]:
# When pseudo-labelling is enabled, append this fold's predicted probabilities
# as a new column of the pseudo-label table and write the table back.
if CFG['pseudo_label']:
    # Note: this rebinds the notebook-level name `df` to the pseudo-label table.
    df= pd.read_csv('../Data/train_npy_PL1.csv')
    # NOTE(review): column assignment aligns on the index, so this presumes the
    # CSV rows correspond one-to-one, in order, with valid_df - confirm.
    df[f"PL_prob_cv{CFG['fold']}"]= valid_df['prob']
    # The same file that was just read is overwritten in place.
    df.to_csv('../Data/train_npy_PL1.csv',index=False)

# Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn. metrics import roc_auc_score

# Confusion matrix of hard predictions against the hard labels.
true_cls = valid_df['label'].values
pred_cls = valid_df['pred_cls'].astype(np.int64).values
cm_df = pd.DataFrame(confusion_matrix(true_cls, pred_cls))
print(f'row: label, column: pred')

# Take the raw counts before the metric columns are appended to the frame.
counts = cm_df.to_numpy()
hits = np.diag(counts)
cm_df['recall'] = hits / counts.sum(axis=1)     # correct / all samples with that label
cm_df['precision'] = hits / counts.sum(axis=0)  # correct / all samples predicted as that class

# F1 is reported for class index 1 only.
recall = cm_df.loc[1, 'recall']
precision = cm_df.loc[1, 'precision']
f1_score = (2 * recall * precision) / (recall + precision)
print(f'f1_score: {f1_score}')
print(f"mean recall: {cm_df['recall'].mean()}")
cm_df

In [None]:
from metrics import *

# Parse the stringified probability vectors back into float arrays.
# ast.literal_eval accepts Python literals only, so a malformed or malicious
# cell coming from the CSV cannot execute code the way eval() could.
# NOTE(review): assumes every 'soft_label' cell is a plain list literal such as
# "[0.1, 0.2, ...]" - confirm against the CSV.
pred = np.array([ast.literal_eval(cell) for cell in valid_df['prob']])
soft_label = np.array([ast.literal_eval(cell) for cell in valid_df['soft_label']])

kl_divergence(soft_label, pred)

# Grad-CAM