In [None]:
import pandas as pd
from os.path import join, isdir, exists
from os import mkdir
import torch as th

from tqdm import tqdm
from random import shuffle, seed
from torchaudio import functional as th_audio_f

In [None]:
# Root directory of the dataset; "train.csv" and the "train_eegs" folder are
# read from here by later cells.
dataset_path = "/run/media/samuel/M2_nvme_gen4/hms-harmful-brain-activity-classification"

# Destination folders for the generated tensors, kept next to the raw data.
train_output_folder = join(dataset_path, "train_eeg_fft_data")
valid_output_folder = join(dataset_path, "valid_eeg_fft_data")

# Create each destination if it is missing. A path that exists but is not a
# directory (e.g. a regular file) cannot be used, so fail early with a message
# that describes the actual problem.
for _folder in (train_output_folder, valid_output_folder):
    if not exists(_folder):
        mkdir(_folder)
    elif not isdir(_folder):
        raise NotADirectoryError(
            "Output path '{}' exists but is not a directory!".format(_folder)
        )

In [None]:
# Fraction of the distinct eeg_id values assigned to the training split.
train_ratio = 0.8

# Seed Python's `random` module so the shuffle below is repeatable.
seed(314159)

all_data_df = pd.read_csv(join(dataset_path, "train.csv"), sep=",")
eeg_id_col = all_data_df["eeg_id"]

# Shuffle the distinct ids (not the rows) so that every row sharing an eeg_id
# lands on the same side of the split.
unique_egg_ids = list(eeg_id_col.unique())
shuffle(unique_egg_ids)

# Single cut point: ids before it go to training, the rest to validation.
n_train_ids = int(len(unique_egg_ids) * train_ratio)
train_eeg_ids, valid_eeg_ids = unique_egg_ids[:n_train_ids], unique_egg_ids[n_train_ids:]

train_df = all_data_df[eeg_id_col.isin(train_eeg_ids)]
valid_df = all_data_df[eeg_id_col.isin(valid_eeg_ids)]

In [None]:
# Number of rows per second in an EEG file, and the duration (in seconds) of
# the window kept from each recording; their product is the number of rows
# sliced out of every EEG.
sample_rate_eegs = 200
nb_seconds_eegs = 50

# FFT size of the spectrogram; also used as the window length, and half of it
# as the hop length.
n_fft = 32

# Vote columns of train.csv that are summed per eeg_id and normalised into the
# label vector, in this order.
target_cols = [
    "seizure_vote", "lpd_vote", "gpd_vote",
    "lrda_vote", "grda_vote", "other_vote",
]

In [None]:
def eeg_feature_to_spectrogram(feature_series: pd.Series) -> th.Tensor:
    """Compute the magnitude spectrogram of one EEG column.

    The series is converted to a float tensor with NaNs replaced by 0, then
    passed through ``torchaudio.functional.spectrogram`` using a Hann window
    of ``n_fft`` samples and a hop of ``n_fft // 2``. The complex result is
    transposed, its first row and first column are dropped, and the absolute
    value is returned.

    NOTE(review): torchaudio documents the spectrogram layout as
    (freq, time), so after the transpose the dropped row/column should be the
    first time frame and the lowest-frequency bin -- confirm if this matters.

    :param feature_series: samples of a single EEG channel.
    :return: real-valued 2-D tensor of spectrogram magnitudes.
    """
    signal = th.from_numpy(feature_series.to_numpy()).to(th.float).nan_to_num()

    # power=None keeps the complex STFT; the magnitude is taken below.
    complex_spec = th_audio_f.spectrogram(
        signal,
        pad=0,
        window=th.hann_window(n_fft),
        n_fft=n_fft,
        hop_length=n_fft // 2,
        win_length=n_fft,
        power=None,
        normalized=True,
    )

    trimmed = complex_spec.T[1:, 1:]
    return th.abs(trimmed)


def extract_eeg_to_fft(input_df: pd.DataFrame, output_folder: str) -> None:
    """Write one spectrogram tensor and one label tensor per EEG recording.

    Rows of ``input_df`` are grouped by ``eeg_id`` and their vote columns
    (``target_cols``) are summed. For each resulting recording, the parquet
    file ``<dataset_path>/train_eegs/<eeg_id>.parquet`` is loaded, cropped to
    its central ``nb_seconds_eegs * sample_rate_eegs`` rows, and every column
    is turned into a spectrogram; the per-column spectrograms are concatenated
    along dim 1. The summed votes are normalised to sum to 1 (all zeros when
    there are no votes).

    Outputs are saved as ``<i>_eeg.pt`` and ``<i>_classes.pt`` in
    ``output_folder``, where ``i`` is the position of the recording in the
    grouped frame (not the eeg_id).

    :param input_df: rows with an ``eeg_id`` column and all ``target_cols``.
    :param output_folder: existing directory that receives the ``.pt`` files.
    """
    per_eeg_labels_df = (
        input_df.groupby("eeg_id")
        .agg({col: "sum" for col in target_cols})
        .reset_index()
    )

    # Number of rows kept from each recording; constant across the loop.
    window_len = nb_seconds_eegs * sample_rate_eegs

    for i, row in tqdm(per_eeg_labels_df.iterrows(), total=len(per_eeg_labels_df)):
        eeg = pd.read_parquet(
            join(dataset_path, "train_eegs", f"{int(row['eeg_id'])}.parquet"),
            engine="pyarrow",
        )

        # Centre the crop. Clamp at 0: for a recording shorter than the window
        # the raw offset is negative, and a negative iloc start counts from
        # the end, which would silently keep only a short tail of the signal.
        offset = max((len(eeg) - window_len) // 2, 0)
        eeg = eeg.iloc[offset : offset + window_len]

        eeg_spec_t = th.cat(
            [eeg_feature_to_spectrogram(eeg[ft_col]) for ft_col in eeg.columns],
            dim=1,
        )

        votes = row[target_cols].astype(float).fillna(0.0)
        # 0 / 0 yields NaN when a recording has no votes; nan_to_num below
        # turns that into an all-zero label vector.
        classes = votes / votes.sum()
        classes_t = th.nan_to_num(th.tensor(classes.to_numpy(), dtype=th.float))

        th.save(eeg_spec_t, join(output_folder, f"{i}_eeg.pt"))
        th.save(classes_t, join(output_folder, f"{i}_classes.pt"))

In [None]:
# Generate the spectrogram / label tensors for the training split.
extract_eeg_to_fft(input_df=train_df, output_folder=train_output_folder)

In [None]:
# Generate the spectrogram / label tensors for the validation split.
extract_eeg_to_fft(input_df=valid_df, output_folder=valid_output_folder)