<a href="https://colab.research.google.com/github/hanggao811/AnomalyDetectionLIGO/blob/main/CNNClassifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import os
from google.colab import drive
drive.mount('/content/drive')
os.chdir('/content/drive/My Drive/LIGO/DatasetSplitting/Datasplitting01/dataset')
import tensorflow as tf
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from tensorflow.keras import layers, optimizers, Input, Model
from tensorflow.keras.metrics import AUC
from sklearn.base import BaseEstimator
from sklearn.model_selection import RandomizedSearchCV,GridSearchCV
import random
N = 42
random.seed(N)
np.random.seed(N)
tf.random.set_seed(N)
tf.config.experimental.enable_op_determinism()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
#Load data and normalize them

# ----- Background Data -----
background = np.load('background.npz')['data']
stds = np.std(background, axis=-1)[:, :, np.newaxis]
background = background/stds
background = np.swapaxes(background, 1, 2)

# ----- BBH Data -----
bbh = np.load('bbh_for_challenge.npy')
stds = np.std(bbh, axis=-1)[:, :, np.newaxis]
bbh = bbh/stds
bbh = np.swapaxes(bbh, 1, 2)

# ----- SGLF Data -----
sglf = np.load('sglf_for_challenge.npy')
stds = np.std(sglf, axis=-1)[:, :, np.newaxis]
sglf = sglf/stds
sglf = np.swapaxes(sglf, 1, 2)

# Create train and test input and target datasets and then shuffle them
X = np.concatenate((background, bbh, sglf), axis=0)
y = np.concatenate((np.zeros(background.shape[0]), np.ones(bbh.shape[0]), np.ones(sglf.shape[0])), axis=0)
perm = np.random.permutation(X.shape[0])
X = X[perm]
y = y[perm]

# Check下先0w0
print(f'X train shape:{X.shape}')
print(f'y train shape:{y.shape}')

X train shape:(300000, 200, 2)
y train shape:(300000,)


In [7]:
class CNNClassifier(BaseEstimator):
    def __init__(self, input_shape,f1,f2,k1,k2,s1,s2,d,lr,epochs,batch_size):
        self.input_shape = input_shape
        self.f1 = f1
        self.f2 = f2
        self.k1 = k1
        self.k2 = k2
        self.s1 = s1
        self.s2 = s2
        self.d = d
        self.lr = lr
        self.epochs = epochs
        self.batch_size = batch_size
        self.classifier = None

    def build_classifier(self):
        input = Input(shape=self.input_shape)
        x = layers.Conv1D(filters = self.f1, kernel_size = self.k1, strides = self.s1, padding='same', activation='relu')(input)
        x = layers.MaxPooling1D()(x)
        x = layers.Conv1D(filters = self.f2, kernel_size= self.k2 ,strides= self.s2, padding='same', activation='relu')(x)
        x = layers.MaxPooling1D()(x)
        x = layers.Flatten()(x)
        x = layers.Dense(self.d, activation='relu')(x)
        output = layers.Dense(1,activation='sigmoid')(x)

        self.classifier = Model(input,output)
        self.classifier.compile(optimizer=optimizers.Adam(learning_rate=self.lr), loss='binary_crossentropy', metrics=['accuracy'])

    def score(self, X, y):
        preds = self.classifier.predict(X)
        auc = AUC()
        auc.update_state(y, preds)  # preds are probabilities
        return auc.result().numpy()

    def fit(self,X,y):
        self.build_classifier()
        self.classifier.fit(X,y, epochs=self.epochs, batch_size=self.batch_size, verbose=0)
        return self



In [8]:
# # Grid Search
# param_dist = {
#     'f1': [16, 32, 64],
#     'f2': [16, 32, 64],
#     'k1': [3,6,9,18],
#     'k2': [3,6,9,18],
#     's1': [1,3,5],
#     's2': [1,3,5],
#     'd': [64, 128],
#     'lr': [1e-3,1e-4,5e-4],
#     'epochs': [35,60],
#     'batch_size': [64,128,512],
# }
param_dist = {
    'f1': [16, 32],
    'f2': [16, 32],
    'k1': [3],
    'k2': [3],
    's1': [1],
    's2': [1],
    'd': [64, 128],
    'lr': [1e-3,1e-4,5e-4],
    'epochs': [60],
    'batch_size': [128,700],
}

model = CNNClassifier(input_shape=(200, 2), f1=16, f2=16, k1=3, k2=3, s1=1, s2=1, d=64, lr=1e-3, epochs=60, batch_size=128)
random_search = RandomizedSearchCV(
    estimator=model,
    param_distributions=param_dist,
    n_iter=10,       # Try 30 random combinations
    cv=3,            # 3-fold cross-validation
    verbose=2,       # Print progress
    n_jobs=1,        # You can set >1 if system allows parallel
)
random_search.fit(X, y)
best_model = random_search.best_estimator_
print("Best AUC:", random_search.best_score_)
print("Best Params:", random_search.best_params_)

Fitting 3 folds for each of 10 candidates, totalling 30 fits
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 1ms/step
[CV] END batch_size=128, d=128, epochs=60, f1=32, f2=32, k1=3, k2=3, lr=0.001, s1=1, s2=1; total time= 9.5min
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 2ms/step
[CV] END batch_size=128, d=128, epochs=60, f1=32, f2=32, k1=3, k2=3, lr=0.001, s1=1, s2=1; total time= 9.4min
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 1ms/step
[CV] END batch_size=128, d=128, epochs=60, f1=32, f2=32, k1=3, k2=3, lr=0.001, s1=1, s2=1; total time= 9.0min
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 2ms/step
[CV] END batch_size=700, d=128, epochs=60, f1=16, f2=32, k1=3, k2=3, lr=0.0001, s1=1, s2=1; total time= 2.5min
[1m3125/3125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 2ms/step
[CV] END batch_size=700, d=128, epochs=60, f1=16, f2=32, k1=3, k2=3, lr=0.0001, s1=1, s2=1; total time= 2.4min
[

In [9]:
# Log results to csv

results_df = pd.DataFrame(random_search.cv_results_)
results_df.to_csv("random_search_results.csv", index=False)

In [10]:
# Test the best hyper-parameter on test dataset
# Load test data and normalize them

data = np.load('ligo_bb_final.npz')
test_data = data['data']
stds = np.std(test_data, axis=-1)[:, :, np.newaxis]
test_data = test_data/stds
test_data = np.swapaxes(test_data, 1, 2)
data_label = data['ids']
# indices1 = np.where(data_label == 1)[0]
# indices0 = np.where(data_label == 0)[0]
# background_test_data = test_data[indices0]
# signal_test_data = test_data[indices1]

# Evaluation

auc_score = best_model.score(test_data, data_label)
print("AUC on test set:", auc_score)

[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 2ms/step
AUC on test set: 0.03658789
