In [None]:
import os
import shutil
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import warnings
import pandas as pd
from torchvision import transforms

In [None]:
import random

# Reproducibility knobs: a single seed shared by every random number generator
# used in this notebook, plus a switch for deterministic cuDNN kernels.
seed = 40
deterministic = True

# Seed Python, NumPy, the PyTorch CPU generator and every CUDA generator in turn.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(seed)

if deterministic:
    # Disable cuDNN autotuning and request deterministic algorithms.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Silence all Python warnings for the rest of the session.
warnings.filterwarnings('ignore')

In [None]:
# Use the GPU when torch can see one; otherwise run everything on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Names of the Kaggle input dataset and of the two checkpoint files inside it
# (extension-less; '.pt' and the '/kaggle/input/' prefix are added when loading).
model_name = '{}{}'.format('model', '51')
model_path = '{}/model ({})'.format(model_name, '25')
eeg_path = '{}/eeg_state_dict ({})'.format(model_name, '17')

In [None]:
# Load the serialized backbone onto the CPU.
# NOTE(review): this unpickles a whole object from disk, which executes pickled
# code; only do this with checkpoints you trust. Newer PyTorch releases may also
# require an explicit `weights_only=False` here -- confirm against the installed version.
model = torch.load(f'/kaggle/input/{model_path}.pt', map_location='cpu')

In [None]:
class Egg(nn.Module):
  """Backbone plus a small upsampling head that outputs six class probabilities.

  The backbone output is viewed as a 1x32x32 map, upsampled by three transposed
  convolutions (spatial size 32 -> 59 -> 113 -> 224) and reduced by two linear
  layers to one 6-way softmax per sample.

  Args:
    input_size: Stored on the instance only; `forward` hard-codes the 32x32
      (= 1024 values) reshape and does not read this attribute.
    backbone: Module used as feature extractor. When omitted, the notebook-level
      `model` is used, which keeps `Egg()` working exactly as before.

  Attribute names and registration order are kept stable on purpose so that
  previously saved state dicts still load.
  """

  def __init__(self , input_size = 1024, backbone = None):
    super().__init__()
    self.input_size = input_size
    # Explicit backbone wins; otherwise fall back to the global `model`.
    self.backbone = model if backbone is None else backbone
    self.softmax = nn.Softmax(dim=1)
    self.leakyrelu = nn.LeakyReLU()
    # Not used by `forward`; retained so the module layout stays unchanged.
    self.dropout = nn.Dropout(0.2)

    self.l1 = nn.Linear(224 , 1)
    self.l2 = nn.Linear(224 , 6)

    self.convt1 = nn.ConvTranspose2d(1,1,kernel_size=3,stride=2,padding=3)
    self.convt2 = nn.ConvTranspose2d(1,1,kernel_size=3,stride=2,padding=3)
    self.convt3 = nn.ConvTranspose2d(1,1,kernel_size=6,stride=2,padding=3)

    # A single BatchNorm layer is shared by all three upsampling stages.
    self.b = nn.BatchNorm2d(1)

  def forward(self , x):
    """Return `(probabilities, feature_map)` for a batch `x`.

    `probabilities` has shape (batch, 6) and each row sums to 1 (softmax over
    dim 1); `feature_map` is the (batch, 1, 224, 224) upsampled tensor.
    """
    # The backbone is expected to yield 1024 values per sample (batch_num, 1024).
    vit = self.backbone(x)
    vit = vit.reshape(-1,1,32,32)

    # conv-transpose -> shared batch norm -> leaky ReLU, three times.
    for upsample in (self.convt1, self.convt2, self.convt3):
      vit = self.leakyrelu(self.b(upsample(vit)))

    ssl = self.l1(vit)            # (batch, 1, 224, 1): collapse the last axis
    ssl = ssl.permute(0,1,3,2)    # (batch, 1, 1, 224)
    ssl = self.l2(ssl)            # (batch, 1, 1, 6)
    ssl = ssl.reshape(-1,6)
    ssl = self.softmax(ssl)

    return ssl , vit

In [None]:
# Build the classifier; with no arguments it wraps the already-loaded `model` as backbone.
egg = Egg()

In [None]:
# Restore the saved weights for `egg` from the Kaggle input checkpoint,
# mapping all tensors to the CPU while loading.
egg.load_state_dict(
    torch.load(f'/kaggle/input/{eeg_path}.pt', map_location='cpu')
)

In [None]:
# Freeze every weight: this notebook only runs inference, so no parameter needs
# gradients. The flag has to be set as an attribute on each parameter object.
for param in model.parameters():
  param.requires_grad = False
for param in egg.parameters():
  param.requires_grad = False

In [None]:
def memory_decreasing(df_train):
    """Downcast the numeric columns of a DataFrame to smaller dtypes.

    Integer columns (signed or unsigned) are cast to the narrowest of the
    8/16/32/64-bit widths whose signed range strictly contains the column's
    min and max; the unsigned variant of that width is used when the minimum
    is non-negative. Floating-point columns are cast to float32 when their
    values lie strictly inside the float32 range, otherwise to float64.

    Columns that are not plain NumPy integer/float dtypes (bool, object,
    datetime, categorical, pandas extension dtypes, ...) are left untouched.

    Args:
        df_train: DataFrame to shrink. It is modified in place.

    Returns:
        The same DataFrame object, for call chaining.
    """
    # Signed widths to try, narrowest first, with their unsigned counterparts.
    int_candidates = (
        (np.int8, np.uint8),
        (np.int16, np.uint16),
        (np.int32, np.uint32),
        (np.int64, np.uint64),
    )

    for col in list(df_train):
        col_type = df_train[col].dtype

        # Only NumPy ints/uints/floats are safe to compare against numeric limits.
        if not isinstance(col_type, np.dtype) or col_type.kind not in 'iuf':
            continue

        c_min = df_train[col].min()
        c_max = df_train[col].max()

        if col_type.kind in 'iu':
            # An empty column reports NaN bounds; there is nothing to downcast.
            if pd.isna(c_min) or pd.isna(c_max):
                continue
            # Plain Python ints avoid overflow when comparing unsigned values
            # with negative limits.
            lo, hi = int(c_min), int(c_max)
            for signed_type, unsigned_type in int_candidates:
                limits = np.iinfo(signed_type)
                if lo > limits.min and hi < limits.max:
                    target = unsigned_type if lo >= 0 else signed_type
                    df_train[col] = df_train[col].astype(target)
                    break
        else:
            float32_limits = np.finfo(np.float32)
            if c_min > float32_limits.min and c_max < float32_limits.max:
                df_train[col] = df_train[col].astype(np.float32)
            else:
                # Also reached for all-NaN columns, whose bounds compare False.
                df_train[col] = df_train[col].astype(np.float64)

    return df_train

In [None]:
# Directory holding one parquet spectrogram per `spectrogram_id`.
test_spec_dir = '/kaggle/input/hms-harmful-brain-activity-classification/test_spectrograms'

# Read the test index, shrink its numeric columns, then attach each row's parquet path.
df_test = memory_decreasing(
    pd.read_csv('/kaggle/input/hms-harmful-brain-activity-classification/test.csv')
)
df_test['spec_path'] = df_test['spectrogram_id'].apply(
    lambda spec_id: os.path.join(test_spec_dir, f'{spec_id}.parquet')
)

In [None]:
def test_load_spectrogram_data(spec_path):
    if os.path.isfile(spec_path):
        spec_data = pd.read_parquet(spec_path)
        return spec_data
    else:
        print(f"Invalid file path: {spec_path}")
        return None

In [None]:
class test_Spec():
    """Loads a contiguous run of test spectrograms listed in `df_test`.

    Args:
        start: Index label into `df_test['spec_path']` of the first row to load.
        batch: Number of consecutive rows to load.
    """

    def __init__(self, start , batch):
        self.start = start
        self.batch = batch

    def spec_input(self):
        """Return one 2-D NumPy array per requested spectrogram.

        For each file, column 0 is dropped (presumably a time column -- confirm
        against the data), at most the next 400 columns are kept, and missing
        values are replaced with 0.

        Raises:
            FileNotFoundError: if a listed parquet file does not exist.
        """
        result = []

        for i in range(self.start , self.start + self.batch):
            spec_path = df_test['spec_path'][i]
            spec = test_load_spectrogram_data(spec_path)
            if spec is None:
                # The loader signals a missing file with None; fail with the path
                # instead of an opaque AttributeError further down.
                raise FileNotFoundError(f"Spectrogram file not found: {spec_path}")

            result.append(spec.iloc[: , 1:401].fillna(0).values)

        return result

In [None]:
def making_dataset(image):
  """Turn raw spectrograms into (3, 224, 224) model inputs.

  Each item of `image` (a 2-D array or tensor) is processed as follows:

  1. items with zero rows are dropped (including the first item);
  2. the natural log is taken after adding 1e-16 so that zeros stay finite;
  3. the result is resized to 672 x 224 with `transforms.Resize`;
  4. values are rescaled as (x - max) / (max - min), which maps the maximum
     to 0 and the minimum to -1; a constant image becomes all zeros;
  5. the 672 rows are split into three 224-row channels.

  NOTE(review): step 4 is not the usual min-max scaling to [0, 1]. It is kept
  unchanged because the loaded weights presumably expect this range -- confirm
  against the training code before altering it.

  Args:
    image: Sequence (or batched tensor) of 2-D spectrograms.

  Returns:
    A list of tensors of shape (3, 224, 224), one per non-empty input.
  """
  resize = transforms.Resize((672,224))

  result = []
  for spec in image:
    spec = torch.as_tensor(spec)
    if spec.shape[0] < 1:
      # Nothing to resize; skip empty spectrograms.
      continue

    log_spec = torch.log(spec + 0.0000000000000001)
    # Resize expects a leading channel axis: (1, rows, cols).
    resized = resize(log_spec.unsqueeze(0))

    top, bottom = resized.max(), resized.min()
    if top != bottom:
      scaled = (resized - top) / (top - bottom)
    else:
      # Avoid a division by zero for constant images.
      scaled = torch.zeros(672,224)

    result.append(scaled.reshape(3,224,224))

  return result

In [None]:
# Put the network in evaluation mode and move it to the selected device.
# `pred` is the same module object as `egg`.
pred = egg.eval().to(device)

# One row of six class scores per test recording, initialised to zero.
dfl = len(df_test['spec_path'])
np_test_preds = np.zeros((dfl , 6))

In [None]:
# Score the test set one spectrogram at a time and store the six class
# probabilities in row `i` of `np_test_preds`.
# Gradients are never needed here, so the whole loop runs under `no_grad`
# to avoid building an autograd graph on every forward pass.
with torch.no_grad():
    for i in range(dfl):
        test_spec = test_Spec(i , 1).spec_input()
        if len(test_spec) == 0:
            continue

        # Stack once into a float32 batch instead of re-wrapping tensors repeatedly.
        test_spec = torch.as_tensor(np.stack(test_spec), dtype=torch.float32)
        test_spec = making_dataset(test_spec)
        if len(test_spec) == 0:
            # Every spectrogram in the batch was empty; leave the row as zeros.
            continue

        test_spec = torch.stack([torch.as_tensor(item) for item in test_spec])
        test_spec = test_spec.to(device)

        p1,p2 = pred(test_spec)
        pre = p1.tolist()
        np_test_preds[i, :] = pre[0]

In [None]:
# Make each prediction row sum to 1 by adding the shortfall (or removing the
# excess) at the largest entry; ties resolve to the last maximal column.
for row in np_test_preds:
    row_total = sum(row)
    if row_total == 1:
        continue

    peak = max(row)
    peak_columns = [col for col in range(0,6) if row[col] == peak]
    if peak_columns:
        max_idx = peak_columns[-1]
    # `row` is a view, so this updates `np_test_preds` in place.
    row[max_idx] += 1 - row_total

In [None]:
# Show the final (dfl, 6) prediction matrix as the cell output.
np_test_preds