In [None]:
# ====================================================
# CFG
# ====================================================
class CFG:
    debug=False
    num_workers=42
    model_name='tf_efficientnet_b7_ns'
    model_dir='../final_models/b7_3d_v2/'
    batch_size=42 # 64
    seed=42
    target_size=1
    target_col='target'
    n_fold=5
    trn_fold=[0]
    # [0, 1, 2, 3, 4]

In [None]:
# ====================================================
# Library
# ====================================================
import sys
# sys.path.append('../input/timm-pytorch-image-models/pytorch-image-models-master')

import os
import math
import time
import random
import shutil
from pathlib import Path
from contextlib import contextmanager
from collections import defaultdict, Counter

import scipy as sp
import numpy as np
import pandas as pd

from sklearn import preprocessing
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold, GroupKFold, KFold

from tqdm.auto import tqdm
from functools import partial

import cv2
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD
import torchvision.models as models
from torch.nn.parameter import Parameter
from torch.utils.data import DataLoader, Dataset
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, CosineAnnealingLR, ReduceLROnPlateau

import albumentations as A
from albumentations.pytorch import ToTensorV2
from albumentations import ImageOnlyTransform

import timm

from torch.cuda.amp import autocast, GradScaler

from nnAudio.Spectrogram import CQT1992v2

import warnings
warnings.filterwarnings('ignore')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# ====================================================
# Utils
# ====================================================
def get_score(y_true, y_pred):
    score = roc_auc_score(y_true, y_pred)
    return score


def get_result(result_df):
    preds = result_df['preds'].values
    labels = result_df['target'].values
    score = get_score(labels, preds)
    return score
    

def seed_torch(seed=42):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

seed_torch(seed=CFG.seed)

In [None]:
test = pd.read_csv('../input/sample_submission.csv')

# if CFG.debug:
#     test = test.sample(n=1000, random_state=CFG.seed).reset_index(drop=True)

def get_test_file_path(image_id):
    return "../input/test/{}/{}/{}/{}.npy".format(
        image_id[0], image_id[1], image_id[2], image_id)

test['file_path'] = test['id'].apply(get_test_file_path)

display(test.head())

In [None]:

class ThreeTrainDataset(Dataset):
    def __init__(self, CFG, df, transform=False):
        self.df = df
        self.file_names = df["file_path"].values
        self.labels = df["target"].values
        self.transform = transform

        self.no_waves = df[df['target'] == 0].reset_index(drop=True)
        self.no_waves_file_names = self.no_waves["file_path"].values

        self.X_0_mean = np.load("input/X_0_mean.npy")
        self.X_1_mean = np.load("input/X_1_mean.npy")
        self.X_2_mean = np.load("input/X_2_mean.npy")


    def __len__(self):
        return len(self.df)

    def apply_qtransform(self, waves):
        hide = random.randint(0, 14)
        reverser = random.randint(0, 1)
        scale = 1

        w0 = waves[0]
        if self.transform:
            # if reverser == 1:
            #     w0 = np.flip(w0)
            if hide == 0:
                random_no_wave_idx = random.randint(0, len(self.no_waves) - 1)
                random_no_wave = np.load(self.no_waves_file_names[random_no_wave_idx])
                w0 = random_no_wave[0]
                # w0 = np.zeros(w0.shape)
                
        w0 = scale * w0 / 5e-20 #3e-21 #4.615211621383077e-20# 7.422368145063434e-21
        # w0 = scale * w0 / (self.X_0_mean / 1e-6)
        w0 = torch.from_numpy(w0).float()

        w1 = waves[1]
        if self.transform:
            # if reverser == 1:
            #     w1 = np.flip(w1)
            if hide == 1:
                random_no_wave_idx = random.randint(0, len(self.no_waves) - 1)
                random_no_wave = np.load(self.no_waves_file_names[random_no_wave_idx])
                w1 = random_no_wave[1]
                # w1 = np.zeros(w1.shape)
        w1 = scale * w1 / 5e-20 #2e-21 # 4.1438353591025024e-20 #7.418562450079042e-21

        w1 = torch.from_numpy(w1).float()

        w2 = waves[2]
        if self.transform:
            # if reverser == 1:
            #     w2 = np.flip(w2)
            if hide == 2:
                random_no_wave_idx = random.randint(0, len(self.no_waves) - 1)
                random_no_wave = np.load(self.no_waves_file_names[random_no_wave_idx])
                w2 = random_no_wave[2]
                # w2 = np.zeros(w2.shape)
        w2 = scale * w2 / 6e-20 #3.5e-21 #6e-20 #1.837612126304118e-21

        w2 = torch.from_numpy(w2).float()
        return w0, w1, w2

    def __getitem__(self, idx):
        file_path = self.file_names[idx]
        waves = np.load(file_path)
        
        w0, w1, w2 = self.apply_qtransform(waves)
        
        label = torch.tensor(self.labels[idx]).float()
        return w0, w1, w2, label

In [None]:

class V2Model(nn.Module):
    def __init__(self, cfg, pretrained=False):
        super().__init__()
        self.cfg = cfg
        self.model = timm.create_model(
            self.cfg.model_name, pretrained=pretrained, in_chans=3
        )
        self.n_features = self.model.classifier.in_features
        self.model.classifier = nn.Linear(self.n_features, self.cfg.target_size, bias=False)

        self.wave_transform = CQT1992v2(sr=2048, fmin=20, fmax=512, hop_length=16)



    def forward(self, h_raw, l_raw, v_raw):
        with autocast():
            h = self.wave_transform(h_raw)
            l = self.wave_transform(l_raw)
            v = self.wave_transform(v_raw)

            x = torch.stack([h, l, v], 1)
            output = self.model(x)
            return output

In [None]:
# ====================================================
# inference
# ====================================================
def inference(model, states, test_loader, device):
    model.to(device)
    tk0 = tqdm(enumerate(test_loader), total=len(test_loader))
    probs = []
    for i, (w0, w1, w2) in tk0:
        w0 = w0.cuda()
        w1 = w1.cuda()
        w2 = w2.cuda()
        avg_preds = []
        for state in states:
            model.load_state_dict(state['model'])
            model.eval()
            with torch.no_grad():
                with autocast():
                    y_preds = model(w0, w1, w2)
            avg_preds.append(y_preds.sigmoid().to('cpu').numpy())
        avg_preds = np.mean(avg_preds, axis=0)
        probs.append(avg_preds)
    probs = np.concatenate(probs)
    return probs


In [None]:
model = V2Model(CFG, pretrained=False)
states = [torch.load(CFG.model_dir+f'{CFG.model_name}_fold{fold}_best_score.pth') for fold in CFG.trn_fold]
test_dataset = ThreeTrainDataset(test, transform=False)
test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
predictions = inference(model, states, test_loader, device)

In [None]:
test['target'] = predictions
test[['id', 'target']].to_csv(model_dir + 'submission.csv', index=False)
test.head()