In [None]:
!pip install -q timm
!pip install -q git+https://github.com/PyFstat/PyFstat@python37

In [None]:
import os, gc, re, h5py, cv2
from PIL import Image
from pathlib import Path
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import numpy as np, pandas as pd
from collections import defaultdict
from sklearn.model_selection import KFold
from sklearn.metrics import roc_auc_score

import timm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision

import skimage
from skimage import io

# Run on the GPU when torch reports one, otherwise fall back to the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Notebook-wide settings, read as plain class attributes (CFG.<name>).
# Only model, apex, folds, batch, hog and dim are read by the cells visible in
# this notebook; the remaining entries are not referenced here (presumably
# training-time settings — TODO confirm against the training notebook).
class CFG:
    wandb=False
    competition='G2Net'
    model='inception_v4'  # timm architecture name; default `name_model` of Net
    apex=False  # forwarded as `enabled=` to torch.autocast in evaluate()
    max_grad_norm=1.36
    seed=13
    positive_rate=0.5
    # NOTE(review): signal_low (0.2) is larger than signal_high (0.1) — confirm
    # the two values are not swapped.
    signal_low=0.2
    signal_high=0.1
    folds=10  # number of per-fold checkpoints loaded and averaged at inference
    lr=0.00056
    dropout=0.25
    epochs=3
    gaussian_noise=2.
    one_cycle_pct_start=0.1
    one_cycle=True
    batch=32  # DataLoader batch size used for the test loader
    hog=True  # default for Net's `hog` flag (append HOG histogram features)
    # Default for Net's `dim`. NOTE(review): Net computes `1000 + dim` when hog is
    # True, which fails for None — set this to the value used when the
    # checkpoints were trained.
    dim=None

In [None]:
def make_image(data: np.array) -> np.array:
    """Turn a 2-D array into a standardised magnitude map.

    The last 360 rows and the first 256 columns are kept, element-wise
    magnitudes are taken, and the result is shifted to zero mean and scaled
    so that its standard deviation becomes 255.

    data: 2-D array (real or complex).
    return: float array with at most 360 rows and 256 columns.
    """
    magnitude = np.abs(data[-360:, :256])
    centred = magnitude - magnitude.mean()
    # The scale factor uses the spread of the already-centred values.
    return centred * 255 / centred.std()


def make_data(name_file: str) -> dict:
    r"""Load one sample from an HDF5 file.

    name_file: file name (or path) joined onto the module-level `path`,
        which is looked up when the function is called.
    return: mapping keyed by the first top-level key of the file:
        {id: {'freq': array, 'L1': [SFTs, timestamps_GPS], 'H1': [SFTs, timestamps_GPS]}}
    """
    sample = defaultdict(dict)
    with h5py.File(path / name_file, "r") as h5:
        # Only the first top-level group of the file is read.
        sample_id = list(h5.keys())[0]
        group = h5[sample_id]
        sample[sample_id]['freq'] = np.array(group['frequency_Hz'])
        for detector in ('L1', 'H1'):
            det_group = group[detector]
            sfts = np.array(det_group['SFTs'])
            timestamps = np.array(det_group['timestamps_GPS'])
            sample[sample_id][detector] = [sfts, timestamps]
    return sample


def bin_noise(data: np.array, ts: np.array, bin: int = 256) -> list:
    """Average the magnitude of `data` inside `bin` consecutive time windows.

    The span of `ts` is cut into `bin` windows of equal integer width, starting
    at ts[0]. The columns of `data` falling in each window are reduced to the
    mean of their absolute values; a window that contains no column receives
    the mean magnitude of the whole input instead.

        data: 2-D amplitudes, columns aligned with `ts`
        ts: timestamps (used with searchsorted, so expected in ascending order)
        bin: number of windows
    return: 1-D numpy array with one value per window
    """
    window = (ts.max() - ts.min()) // bin
    window_starts = [ts[0] + window * k for k in range(bin)]
    # The first start coincides with column 0, so it is not a cut point.
    cut_points = np.searchsorted(ts, window_starts)[1:]
    fallback = np.mean(np.abs(data))

    levels = []
    for chunk in np.array_split(data, cut_points, axis=1):
        if chunk.shape[1] > 0:
            levels.append(np.mean(np.abs(chunk)))
        else:
            levels.append(fallback)
    return np.array(levels)


def to_img(data: np.array, ts: np.array, bin: int = 256) -> np.array:
    """Convert one detector's amplitudes into an 8-bit image array.

    The amplitudes are first time-binned with bin_noise, then standardised
    with make_image, clipped to [0, 255] and cast to uint8.

    data: amplitudes, columns aligned with `ts`
    ts: timestamps
    bin: number of time bins; forwarded to bin_noise (it used to be accepted
        but ignored, so any non-default value had no effect)
    return: uint8 numpy array

    NOTE(review): bin_noise, as written in this file, returns one value per
    bin (a 1-D array), while make_image indexes its input along two axes —
    confirm the intended intermediate shape.
    """
    smooth_noise = bin_noise(data, ts, bin)
    img = make_image(smooth_noise)
    # make_image centres the values on zero, so the clip discards the negative half.
    return np.clip(img, 0, 255).astype(np.uint8)


def net_hog_features(img: "torch.Tensor", dim: int = 257) -> np.ndarray:
    """Histogram of the HOG descriptor of one channel-first image tensor.

    img: tensor laid out as (channels, height, width); it is copied to the CPU
        and rearranged to (height, width, channels) for scikit-image.
    dim: number of histogram bin EDGES spread evenly over [0, 1], so the
        result has `dim - 1` entries.
    return: integer numpy array of bin counts (descriptor values outside
        [0, 1] are not counted by np.histogram).
    """
    # Import the submodule explicitly: a bare `import skimage` does not
    # guarantee that `skimage.feature` has been loaded.
    from skimage.feature import hog

    pixels = np.transpose(img.cpu().numpy(), (1, 2, 0))
    bin_edges = np.linspace(0, 1, dim)
    hog_kwargs = dict(
        orientations=8, pixels_per_cell=(16, 16),
        cells_per_block=(3, 3), visualize=False,
    )
    try:
        # Current scikit-image spelling for "last axis holds the channels".
        descriptor = hog(pixels, channel_axis=-1, **hog_kwargs)
    except TypeError:
        # Older scikit-image releases only know the legacy keyword.
        descriptor = hog(pixels, multichannel=True, **hog_kwargs)
    counts, _ = np.histogram(descriptor, bins=bin_edges)
    return counts


class G2NETDataset(torch.utils.data.Dataset):
    """Dataset yielding the H1 and L1 images of each row of a dataframe.

    The base class is referenced through `torch.utils.data` because the
    notebook never imports a bare `Dataset` name.

    df: dataframe with an `id` column; one sample per row.
    image_path: directory holding `<id>_h1.png` and `<id>_l1.png`.
    """

    def __init__(self, df: pd.DataFrame, image_path: str) -> None:
        self.df = df
        self.image_path = image_path
        # get_transforms() is defined outside this class; it is called once
        # and the resulting transform is reused for every sample.
        self.transforms = get_transforms()

    def __len__(self):
        """Number of samples, i.e. rows of the dataframe."""
        return len(self.df)

    def __getitem__(self, index):
        """Return the transformed H1 and L1 images concatenated along axis 0."""
        sample_id = self.df.iloc[index].id
        h1 = self.transforms(Image.open(f'{self.image_path}/{sample_id}_h1.png'))
        l1 = self.transforms(Image.open(f'{self.image_path}/{sample_id}_l1.png'))
        # H1 first, then L1.
        return np.concatenate([h1, l1])


class Net(nn.Module):
    """timm backbone with a single-logit head, optionally fed HOG features.

    name_model: timm architecture name.
    dim: length of the HOG histogram appended to the backbone output;
        required when `hog` is True.
    hog: when True, the head sees the L2-normalised backbone output
        concatenated with the L2-normalised HOG histogram of the input;
        when False, it sees the raw backbone output only.

    Raises ValueError when `hog` is True and `dim` is None.
    """

    def __init__(
        self,
        name_model: str = CFG.model,
        dim: int = CFG.dim,
        hog: bool = CFG.hog
    ):
        super().__init__()
        if hog and dim is None:
            # Fail with a readable message instead of the TypeError that
            # `1000 + None` would raise further down.
            raise ValueError("Net(hog=True) needs an integer `dim` (HOG histogram length)")
        self.model = timm.create_model(
            name_model,
            in_chans=2,
            pretrained=False
        )
        self.dim = dim
        self.hog = hog
        # The backbone is created without overriding its classifier size; the
        # head below assumes it emits 1000 values per sample.
        head_inputs = 1000 + self.dim if self.hog else 1000
        self.s = nn.Linear(head_inputs, 1)

    def forward(self, x):
        """Return one logit per sample for a batch of 2-channel images."""
        x1 = self.model(x)
        if not self.hog:
            return self.s(x1)
        # HOG is computed per sample with numpy/scikit-image; no gradient
        # flows through this branch.
        hog_rows = [
            torch.tensor(net_hog_features(sample, self.dim + 1), dtype=torch.float)
            for sample in x
        ]
        # Follow the input's device (not a global) and transfer once per batch.
        xx = torch.stack(hog_rows).to(x.device)
        xx = nn.functional.normalize(xx, p=2.0, dim=1)
        x1 = nn.functional.normalize(x1, p=2.0, dim=1)
        x3 = torch.cat((x1, xx), dim=1)
        return self.s(x3)


@torch.no_grad()
def evaluate(
    model: nn.Module,
    loader: torch.utils.data.DataLoader
) -> np.ndarray:
    """Run `model` over `loader` and return sigmoid probabilities.

    model: network returning one logit per sample; it is moved to DEVICE and
        switched to eval mode.
    loader: iterable of input batches (no labels).
    return: 1-D numpy array with one probability per sample, in loader order.
    """
    model.to(DEVICE)
    model.eval()
    pred = []
    # NOTE(review): tqdm's `mininterval` is a refresh period in seconds; if a
    # "refresh every N batches" throttle was intended, that is `miniters`.
    pbar = tqdm(
        loader,
        desc=f"Test: ",
        total=len(loader),
        mininterval= len(loader)//20
    )
    for X in pbar:
        # torch.autocast requires the device type as its first argument.
        with torch.autocast(device_type=DEVICE, enabled=CFG.apex):
            y_ = model(X.to(DEVICE))
        # reshape(-1) keeps a size-1 batch one-dimensional (squeeze() would
        # yield a 0-d tensor that cannot be concatenated); float() undoes any
        # reduced precision produced under autocast before leaving torch.
        pred.append(y_.float().cpu().reshape(-1))
    pred = torch.cat(pred)
    return torch.sigmoid(pred).numpy()

In [None]:
# Render every test sample to a pair of PNGs (H1 and L1) under data/test_img.
os.makedirs('data/test_img', exist_ok=True)
for sample_id in tqdm(df_test['id'], total=len(df_test)):
    h1_png = f'data/test_img/{sample_id}_h1.png'
    l1_png = f'data/test_img/{sample_id}_l1.png'
    # Skip only when BOTH files exist, so a run interrupted between the two
    # writes is repaired on the next pass.
    if os.path.exists(h1_png) and os.path.exists(l1_png):
        continue
    data = make_data(f'/kaggle/input/g2net-detecting-continuous-gravitational-waves/test/{sample_id}.hdf5')
    h1 = to_img(data[sample_id]['H1'][0], data[sample_id]['H1'][1])
    l1 = to_img(data[sample_id]['L1'][0], data[sample_id]['L1'][1])
    # cv2.imwrite reports failure through its return value instead of raising.
    for png_path, img in ((h1_png, h1), (l1_png, l1)):
        if not cv2.imwrite(png_path, img):
            raise IOError(f'could not write {png_path}')

In [None]:
# Score the test set with every fold checkpoint and average the probabilities.
test = G2NETDataset(df_test, paths)
loader_test = torch.utils.data.DataLoader(
    test,
    batch_size=CFG.batch,
    shuffle=False,
    # os.cpu_count() may return None; fall back to loading in the main process.
    num_workers=os.cpu_count() or 0
)
fold_preds = []
for fold in range(CFG.folds):
    model = Net()
    # map_location lets checkpoints saved on a GPU load on a CPU-only machine.
    # NOTE(review): the checkpoint directory is named after efficientnetv2-m
    # while CFG.model is 'inception_v4' — confirm they match.
    state = torch.load(
        f'/kaggle/input/efficientnetv2-m-hog/models/model-f{fold}.tph',
        map_location=DEVICE
    )
    model.load_state_dict(state)
    preds = evaluate(model, loader_test)
    fold_preds.append(preds)
    # Release the model before building the next fold's.
    del model, preds, state
    gc.collect()
    torch.cuda.empty_cache()

# Average over the fold axis only; an extra squeeze() here would drop that
# axis when CFG.folds == 1 and collapse the result to a single number.
preds = np.stack(fold_preds).mean(axis=0)
df_test['target'] = preds
df_test.target.plot.hist();