In [None]:
!pip install -q nnAudio -qq

import os
import gc
import pickle
import numpy as np
import pandas as pd
import time
import random
from random import shuffle
import math
from matplotlib import pyplot as plt
from matplotlib.pyplot import figure
from matplotlib.gridspec import GridSpec
import seaborn as sns

import tensorflow as tf

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dropout

from keras.preprocessing import sequence
from keras.layers import Dropout
from keras.layers import Flatten
from keras.layers import Bidirectional
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from keras.layers import BatchNormalization

from keras.utils import Sequence

In [None]:
# Competition metadata tables.
# `training` supplies the 'id' and 'target' columns used below to build the train/validation sets;
# `sample_submission` supplies the 'id' column of the samples to predict and is reused as the
# template for the final submission file.
training = pd.read_csv('/kaggle/input/g2net-gravitational-wave-detection/training_labels.csv')
sample_submission = pd.read_csv('/kaggle/input/g2net-gravitational-wave-detection/sample_submission.csv')

In [None]:
def idx2path(idx: str, is_train: bool = True) -> str:
    """Return the location of the ``.npy`` file holding sample ``idx``.

    Files are nested three directories deep, one level per leading character of the id:
    ``<split dir>/<idx[0]>/<idx[1]>/<idx[2]>/<idx>.npy``.

    Args:
        idx: sample identifier; must contain at least three characters.
        is_train: pick the train directory when truthy, the test directory otherwise.

    Returns:
        The path of the sample file as a string.

    Raises:
        IndexError: if ``idx`` has fewer than three characters.
    """
    split_dir = (
        '/kaggle/input/g2net-gravitational-wave-detection/train/'
        if is_train
        else '/kaggle/input/g2net-gravitational-wave-detection/test/'
    )
    # One sub-directory per leading character of the id.
    level_1, level_2, level_3 = idx[0], idx[1], idx[2]
    file_name = idx + '.npy'
    return os.path.join(split_dir, level_1, level_2, level_3, file_name)

In [None]:
# Quick look at the raw signals of the first training sample.
# Each of the first three rows of the loaded array is drawn as its own trace; the traces are
# labelled by row index so they can be told apart in the legend.
# NOTE(review): the rows presumably correspond to the individual detectors — confirm the order
# against the competition data description before naming them.
data = np.load(idx2path(training['id'].iloc[0]))
figure(figsize = (18, 6), dpi = 80)
plt.plot(data[0], label = 'channel 0')
plt.plot(data[1], label = 'channel 1')
plt.plot(data[2], label = 'channel 2')
plt.title('Raw signals for training sample {}'.format(training['id'].iloc[0]))
plt.xlabel('sample index')
plt.ylabel('amplitude')
plt.legend()
plt.show()

In [None]:
import torch
from nnAudio.Spectrogram import CQT1992v2

# Shared constant-Q transform, built once at module level and reused by transform() for every
# waveform instead of being re-created per call.
# sr is the sampling rate handed to the transform, fmin/fmax bound the analysed frequency band
# and hop_length is the stride between consecutive frames.
# NOTE(review): sr = 2048 presumably matches the sampling rate of the .npy recordings — confirm.
Q_TRANSFORM = CQT1992v2(sr = 2048, fmin = 20, fmax = 1024, hop_length = 64)

def transform(idx: str, is_train: bool = True) -> list:
    """Load one sample and convert its first three rows into spectrograms.

    The sample file is located with ``idx2path``. Each of the first three rows is divided by
    its own maximum value, converted to a float torch tensor, passed through ``Q_TRANSFORM``
    and returned as a numpy array with singleton dimensions removed and its first two axes
    swapped.

    Args:
        idx: sample identifier.
        is_train: forwarded to ``idx2path`` to choose between the train and test directories.

    Returns:
        A list with three numpy arrays, one per row of the loaded sample.
        NOTE(review): each array is presumably laid out as (time, frequency) after the axis
        swap — confirm against the output layout of ``Q_TRANSFORM``.
    """
    channels = np.load(idx2path(idx, is_train))

    def _to_spectrogram(signal):
        # Scale by the maximum value of the signal before handing it to torch.
        scaled = torch.from_numpy(signal / np.max(signal)).float()
        cqt = np.squeeze(np.array(Q_TRANSFORM(scaled)))
        return np.swapaxes(cqt, 0, 1)

    return [_to_spectrogram(channels[row]) for row in range(3)]

In [None]:
class Dataset(Sequence):
    '''Keras ``Sequence`` that turns sample ids into batches of RNN-ready spectrograms.

    Batches are produced lazily in ``__getitem__`` by running ``transform`` on every id of the
    batch. When labels are supplied the dataset yields ``(inputs, labels)`` pairs and reads
    from the train directory; without labels it yields inputs only and reads from the test
    directory.
    '''

    def __init__(self, x, y = None, batch_size = 256, shuffle = True):
        '''x is array of id and y is array of the associated class.

        Args:
            x: array-like of sample ids.
            y: array-like of labels aligned with ``x``, or ``None`` for unlabelled data.
            batch_size: maximum number of samples per batch (the last batch may be smaller).
            shuffle: reshuffle the (id, label) pairs at the end of every epoch; only applied
                when labels are present.

        Raises:
            ValueError: if ``y`` is given and its length differs from that of ``x``.
        '''
        # Stored as numpy arrays so that slicing and index-based shuffling always work and
        # always hand arrays (never tuples or lists) back to Keras.
        self.x = np.asarray(x)
        self.y = None if y is None else np.asarray(y)
        if self.y is not None and len(self.x) != len(self.y):
            raise ValueError(
                'x and y must have the same length, got {} and {}'.format(len(self.x), len(self.y))
            )
        self.batch_size = batch_size
        self.shuffle = shuffle
        # Labelled data lives in the train directory, unlabelled data in the test directory.
        self.train = self.y is not None

    def __len__(self):
        '''Number of batches per epoch, counting a trailing partial batch.'''
        return math.ceil(len(self.x)/self.batch_size)

    def __getitem__(self, idx):
        '''Build batch number ``idx``.

        Returns:
            ``(batch_X, batch_y)`` when labels are available, otherwise ``batch_X`` alone.
            ``batch_X`` is 3-D: the 4-D stack of per-sample spectrogram lists is flattened
            over its two middle axes while the last axis is kept.
        '''
        window = slice(idx * self.batch_size, (idx + 1) * self.batch_size)

        # transform() returns a list of arrays per sample: stack that list first, then stack
        # the samples, giving a single 4-D array in one pass.
        batch_X = np.stack(
            [np.stack(transform(sample_id, self.train)) for sample_id in self.x[window]]
        )

        # Input for RNN
        batch_X = batch_X.reshape(batch_X.shape[0], -1, batch_X.shape[3])

        if self.train:
            return batch_X, self.y[window]
        return batch_X

    def on_epoch_end(self):
        '''Method called at the end of every epoch.'''
        if self.shuffle and self.train:
            # Apply one shared permutation to ids and labels. Indexing keeps both as numpy
            # arrays, so later batches still slice to arrays rather than tuples.
            order = list(range(len(self.x)))
            random.shuffle(order)
            self.x = self.x[order]
            self.y = self.y[order]

In [None]:
# Ids and targets of the labelled samples, pulled out of the training table as arrays.
x, y = (training[column].values for column in ('id', 'target'))
# Ids of the samples that need a prediction in the submission file.
x_test = sample_submission['id'].values

In [None]:
# Hold out 10% of the labelled ids for validation; stratifying on the target keeps the class
# balance identical in both parts, and the fixed random_state makes the split repeatable.
x_train, x_val, y_train, y_val = train_test_split(
    x,
    y,
    stratify=y,
    test_size=0.1,
    random_state=42,
)

In [None]:
# Batch generators fed to Keras. The training generator keeps the default per-epoch reshuffle.
train_dataset = Dataset(x_train, y_train)
# Validation metrics do not depend on sample order, so skip the per-epoch reshuffle.
valid_dataset = Dataset(x_val, y_val, shuffle=False)
# No labels: yields inputs only and reads the samples from the test directory.
test_dataset = Dataset(x_test)

In [None]:
# Initialising the RNN
model=Sequential()

# Per-sample input shape (everything after the batch axis), read from a one-sample batch.
# Probing a single sample once avoids loading and transforming a full training batch
# each time a shape entry is needed.
rnn_input_shape = Dataset(x_train[:1], y_train[:1], batch_size=1, shuffle=False)[0][0].shape[1:]

# Adding the first Bi-Directional LSTM layer
model.add(Bidirectional(LSTM(128, return_sequences=True), input_shape=rnn_input_shape))
model.add(Dropout(0.2))

# Adding the second Bi-Directional LSTM layer
model.add(Bidirectional(LSTM(128)))
model.add(Dropout(0.2))

# Adding the output layer
model.add(Dense(1, activation='sigmoid'))

# Compiling the RNN
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy', 'AUC'])

# summary() prints the table itself and returns None, so it must not be wrapped in print().
model.summary()

In [None]:
# Keep only the best weights seen so far (weights only, not the full model) on disk.
chkpt = tf.keras.callbacks.ModelCheckpoint(
    "bidir_lstm_weights.h5",
    save_weights_only=True,
    save_best_only=True,
)

# Train for three epochs, building batches in four worker processes, and time the whole run.
start_time = time.time()
train_history = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=3,
    callbacks=[chkpt],
    workers=4,
    use_multiprocessing=True,
)
end_time = time.time()

print('Model training took {} seconds'.format(end_time - start_time))

In [None]:
# Restore the weights written by the training checkpoint before predicting.
model.load_weights('bidir_lstm_weights.h5')

# Predict on the unlabelled test generator and flatten the model output to one score per id.
predictions = model.predict(
    test_dataset,
    verbose=1,
    workers=4,
    use_multiprocessing=True,
).reshape(-1)

# Pair every test id with its score and write the submission file without the index column.
submission = pd.DataFrame({'id': sample_submission['id'], 'target': predictions})
submission.to_csv('bidir_lstm_submission.csv', index=False)