In [2]:
import numpy as np
import pandas as pd
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv1D, MaxPooling1D, Flatten, Dropout
from tensorflow.keras.callbacks import Callback
from sklearn.model_selection import train_test_split
import time
import keras
import matplotlib.pyplot as plt

In [3]:
class PlotValidationLoss(Callback):
    """Keras callback that records the validation loss and periodically saves a plot of it.

    After every epoch the reported ``val_loss`` is printed and stored. After the
    first epoch and after every 15th epoch the stored history is plotted and
    written to ``Training-CNN_Validation_Error_<bin_factor>.png``.

    Parameters
    ----------
    bin_factor
        Value embedded (via ``str``) in the name of the saved figure.

    Attributes
    ----------
    val_losses : list
        Validation losses collected so far, one entry per epoch that reported one.
    """

    def __init__(self, bin_factor):
        # Let the Keras base class initialise its own bookkeeping first.
        super().__init__()
        self.bin_factor = bin_factor
        self.val_losses = []

    def on_epoch_end(self, epoch, logs=None):
        """Store this epoch's validation loss and refresh the plot on schedule.

        Parameters
        ----------
        epoch : int
            Zero-based epoch index.
        logs : dict or None
            Metrics for the epoch; only the ``'val_loss'`` entry is read.
        """
        val_loss = (logs or {}).get('val_loss')
        if val_loss is None:
            # No validation metric was reported (e.g. training without validation
            # data); there is nothing to log or plot for this epoch.
            return

        print(f"Epoch {epoch+1}, Validation Loss: {val_loss:.4f}")
        self.val_losses.append(val_loss)
        if (epoch+1) % 15 == 0 or epoch == 0:
            self.plot_validation_loss()

    def plot_validation_loss(self):
        """Plot the collected validation losses and save the figure to disk."""
        # Use a dedicated figure so repeated calls do not stack curves and legend
        # entries on one shared pyplot figure, and close it afterwards so figures
        # do not accumulate in memory over a long training run.
        fig, ax = plt.subplots()
        epochs_range = range(0, len(self.val_losses))
        ax.plot(epochs_range, self.val_losses, 'b', label='Validation Loss')
        ax.set_xlabel('Epochs')
        ax.set_ylabel('Loss')
        ax.set_title('Validation Loss')
        ax.legend()
        fig.savefig('Training-CNN_Validation_Error_'+str(self.bin_factor)+'.png')
        plt.close(fig)

In [4]:
# Wall-clock reference taken before the data is loaded.
start_time = time.time()
import os
from preprocessing import Preprocessing

# Binning factor handed to the preprocessing step; also reused in output file names.
bin_factor = 10

preprocessor = Preprocessing()

# One directory per source class.
path_BH = os.path.join('data', 'BH')
path_NS = os.path.join('data', 'NS')

# Collect the power spectra of each class; the BH flag tells the preprocessor which class it is reading.
BH_powerspectra = preprocessor.collect_all_powerspectra(path_BH, bin_factor=bin_factor, BH=True)
NS_powerspectra = preprocessor.collect_all_powerspectra(path_NS, bin_factor=bin_factor, BH=False)

# Stack both classes row-wise into a single array.
# Columns are presumably freq, power, error, BH flag (taken from the old DataFrame line below) - TODO confirm.
#data=pd.DataFrame(powerspectra,columns=['freq','power','error','BH?'])
powerspectra = np.concatenate((np.array(BH_powerspectra), np.array(NS_powerspectra)), axis=0)

In [9]:
# Sanity check: dimensions of the combined array.
np.shape(powerspectra)

(131072, 4)

In [10]:
# Sanity check: dimensions of the first two columns, which are used as model inputs.
powerspectra[:, :2].shape

(131072, 2)

In [57]:
# Inputs are the first two columns, the target is column 3.
# Presumably these are (freq, power) and the BH flag, going by the column names in
# the commented-out DataFrame line of the loading cell - TODO confirm.
X = powerspectra[:, 0:2]
y = powerspectra[:, 3].flatten()

# Split first so that the standardisation statistics below are estimated from the
# training rows only; computing them on the full array would leak information
# about the test rows into training. The split depends only on the number of rows
# and the seed, so the train/test partition is the same as before.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Per-column mean and standard deviation of the training inputs.
mean = np.mean(X_train_raw, axis=0)
std_dev = np.std(X_train_raw, axis=0)
# A constant column has zero spread; divide by 1 instead so it maps to 0 rather than NaN/inf.
std_dev = np.where(std_dev == 0, 1.0, std_dev)

# Apply the same training-derived transform to every subset.
X_train = (X_train_raw - mean) / std_dev
X_test = (X_test_raw - mean) / std_dev
# Full standardised array, kept under its original name for any cell that still uses it.
X_standardized = (X - mean) / std_dev

In [58]:
# One-hot encode the labels of both subsets into two columns (one per class).
y_train, y_test = (
    keras.utils.to_categorical(labels, num_classes=2)
    for labels in (y_train, y_test)
)

In [59]:
# Fold the training inputs into windows of 500 values with a single channel,
# the (length, channels) layout the Conv1D input below is built from.
# NOTE(review): this merges rows and columns of X_train into the windows, so the
# number of windows no longer equals the number of rows in y_train (compare the
# shapes printed by the next cell) - confirm the intended sample layout.
# The matching X_test reshape is still disabled here:
#X_test=X_test.reshape((-1,500,1))
X_train = np.reshape(X_train, (-1, 500, 1))

In [60]:
# Show the shapes of inputs and labels for both subsets, in train/test order.
tuple(arr.shape for arr in (X_train, y_train, X_test, y_test))

((367, 500, 1), (91750, 2), (39322, 2), (39322, 2))

In [61]:
# Small 1D CNN classifier: one convolution + pooling stage, then a dense head.
model = Sequential()

# Convolution over each input window; the input shape (window length, channels)
# is read from the already reshaped training array.
model.add(Conv1D(filters=128, kernel_size=32, strides=2, use_bias=True, activation='relu',
                 input_shape=(X_train.shape[1], X_train.shape[2])))

# Halve the temporal resolution.
model.add(MaxPooling1D(pool_size=2))

# Collapse the feature maps into a single vector for the dense layers.
model.add(Flatten())

# Fully connected head with dropout for regularisation.
model.add(Dense(32, activation='relu'))
model.add(Dropout(0.2))

# Output layer: two mutually exclusive classes. The labels are one-hot encoded
# with to_categorical(num_classes=2), so use softmax (outputs sum to 1) rather
# than two independent sigmoid units.
model.add(Dense(2, activation='softmax'))

# categorical_crossentropy is the loss that pairs with softmax and one-hot targets.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [62]:
# Callback that prints the validation loss each epoch and periodically saves the loss curve.
plot_validation_loss = PlotValidationLoss(bin_factor)

# Train the model, holding out 5% of the training data for validation.
epochs, batch_size = 150, 64
model.fit(
    X_train,
    y_train,
    epochs=epochs,
    batch_size=batch_size,
    validation_split=0.05,
    # The callback has to be registered here; otherwise it is constructed but never invoked.
    callbacks=[plot_validation_loss],
)

Epoch 1/150
Epoch 2/150
Epoch 3/150
Epoch 4/150
Epoch 5/150
Epoch 6/150
Epoch 7/150
Epoch 8/150
Epoch 9/150
Epoch 10/150
Epoch 11/150
Epoch 12/150
Epoch 13/150
Epoch 14/150
Epoch 15/150
Epoch 16/150
Epoch 17/150
Epoch 18/150
Epoch 19/150
Epoch 20/150
Epoch 21/150
Epoch 22/150
Epoch 23/150
Epoch 24/150
Epoch 25/150
Epoch 26/150
Epoch 27/150
Epoch 28/150
Epoch 29/150
Epoch 30/150
Epoch 31/150
Epoch 32/150
Epoch 33/150
Epoch 34/150
Epoch 35/150
Epoch 36/150
Epoch 37/150
Epoch 38/150
Epoch 39/150
Epoch 40/150
Epoch 41/150
Epoch 42/150
Epoch 43/150
Epoch 44/150
Epoch 45/150
Epoch 46/150
Epoch 47/150
Epoch 48/150
Epoch 49/150
Epoch 50/150
Epoch 51/150
Epoch 52/150
Epoch 53/150
Epoch 54/150
Epoch 55/150
Epoch 56/150
Epoch 57/150
Epoch 58/150
Epoch 59/150
Epoch 60/150
Epoch 61/150
Epoch 62/150
Epoch 63/150
Epoch 64/150
Epoch 65/150
Epoch 66/150
Epoch 67/150
Epoch 68/150
Epoch 69/150
Epoch 70/150
Epoch 71/150
Epoch 72/150
Epoch 73/150
Epoch 74/150
Epoch 75/150
Epoch 76/150
Epoch 77/150
Epoch 78

<keras.src.callbacks.History at 0x7f3fc4b53d90>

In [86]:
# Run the trained model over X_test with a batch size of 1.
# NOTE(review): the execution counts (In [84] -> In [85] -> In [86]) indicate this
# cell was run after the two cells below it, i.e. after X_test had been sliced and
# reshaped. On a top-to-bottom run X_test has not been reshaped at this point -
# confirm and reorder the cells.
prediction = model.predict(X_test, batch_size=1)



In [85]:
# Fold X_test into windows of 500 values with a single channel, mirroring the
# reshape applied to X_train.
# NOTE(review): this only succeeds when the total number of elements in X_test is
# a multiple of 500, and it overwrites X_test in place, so re-running the cell
# operates on the already reshaped array - confirm the intended input here.
X_test=X_test.reshape((-1,500,1))

In [84]:
# Keep at most the first 39000 entries along the second axis of X_test.
# NOTE(review): the shape printout earlier in this notebook shows X_test as
# (39322, 2); if that is the array reaching this cell, slicing the second axis
# changes nothing - confirm whether a row slice (first axis) was intended.
X_test=X_test[:,:39000]

In [87]:
# Evaluate the model on the held-out test set.
# NOTE(review): the error recorded below this cell reports 156 input samples
# against 39322 labels; X_test and y_test must contain the same number of samples
# before this call can succeed.
test_loss, test_acc = model.evaluate(X_test, y_test)

# Make predictions from the test inputs; y_test holds the targets and is not a model input.
predictions = model.predict(X_test)

ValueError: Data cardinality is ambiguous:
  x sizes: 156
  y sizes: 39322
Make sure all arrays contain the same number of samples.

In [None]:
# Persist the trained network to an .h5 file whose name carries the bin factor.
model.save('CNN_model_'+str(bin_factor)+'.h5')

# Write a short text report: the test accuracy, a separator line, then the layer summary.
with open('CNN_model_'+str(bin_factor)+'_test_accuracy_and_parameters.txt', 'w') as f:
    f.writelines([
        f'Test Accuracy: {test_acc:.4f} \n',
        '____________________________\n',
        'Model Architecture:\n\n',
    ])
    # Each summary line is passed to print_fn; write it to the report followed by a newline.
    model.summary(print_fn=lambda line: f.write(line + '\n'))