This is the python code for the CNN in our paper, "*Convolutional Neural Networks for Direct Detection of Dark Matter*". 

The CNN was origionally run using Google Colab GPU and so data was stored in a google drive. 

This code (and final model, final_model.h5) is for just the "HitPeak" images - the "Hit" and "Peak" images require their own individual models.

This file should not be run all at once:
*   The hyperparameters first need to be optimised using the 'Train' dataset (1). 
*   Once these are optimised, the CNN is run again using these hyperparamters on the 'Train' dataset (2) - This saves the model in a h5 file.
*   The saved model is run using the 'Test' dataset (3).


In [0]:
!pip install tensorflow-gpu
!pip install keras
!pip install matplotlib
!pip install scikit-learn
!pip install pandas
!pip install imageio
!pip install h5py

In [0]:
from google.colab import drive
drive.mount('/content/drive')
!unzip drive/My\ Drive/Colab\ Notebooks/Train.zip
!unzip drive/My\ Drive/Colab\ Notebooks/Test.zip

In [0]:
import glob
import numpy as np
import os.path as path
import imageio
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import Activation, Dropout, Flatten, Dense, Conv2D, MaxPooling2D
from keras.callbacks import EarlyStopping, TensorBoard
from datetime import datetime
import keras
from keras import regularizers
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, average_precision_score, precision_recall_curve, auc,recall_score
from keras.optimizers import Adam
from keras.regularizers import l2
import h5py
from sklearn.model_selection import GridSearchCV
from keras.backend import cast, greater, clip, floatx,epsilon
from keras.wrappers.scikit_learn import KerasClassifier
from keras import backend as K
from sklearn.model_selection import StratifiedKFold

In [0]:
# Define image path (e.g.)

IMAGE_PATH = '/content/Train'
file_paths = glob.glob(path.join(IMAGE_PATH, '*.png'))

# Load the images into a single variable and convert to a numpy array
images = [imageio.imread(path) for path in file_paths]
images = np.asarray(images)

# Get image size
image_size = np.asarray([images.shape[1], images.shape[2], images.shape[3]])
print(image_size)

# Scale images so values are between 0 and 1
images = images / 255


In [0]:
# Read the labels from the filenames

n_images = images.shape[0]
labels = np.zeros(n_images)
for i in range(n_images):
    filename = path.basename(file_paths[i])[0]
    if filename[0] == 'W':                          #Every file that begins with W is assigned a 1
        labels[i] = 1
    else:
        labels[i] = 0

# Background = 0 = FALSE
# WIMPS = 1 = TRUE


For gridsearch the data does not need to be split into "train" and "validation" sets as this is done automatically. 

This split only needs to be done when running the final model.

In [0]:
# Split into test and training sets

TRAIN_TEST_SPLIT = 0.7            

# Split at the given index
split_index = int(TRAIN_TEST_SPLIT * n_images)
shuffled_indices = np.random.permutation(n_images)
train_indices = shuffled_indices[0:split_index]
test_indices = shuffled_indices[split_index:]

# Split the images and the labels
x_train = images[train_indices, :, :, :]
y_train = labels[train_indices]
x_val = images[test_indices, :, :, :]
y_val = labels[test_indices]

#x_train = images
#y_train = labels


(1) Grid search run for testing hyperparameters:
*   Convolutional layers - 1, 2, 3
*   Epochs - 20, 30, 40, 50, 60
*   Batch size - 100, 200, 300
*   Learning rate - 0.005, 0.001, 0.01
*   Alpha value - 0.001, 0.05, 0.01, 0.1









In [0]:
# Grid search
from keras import backend as K
np.random.seed(0)

# Define recall, precision and f1 functions
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))


def create_model(conv_layers, learning_rate, alpha_value):
  model = Sequential()
  for i in range(conv_layers):
    model.add(Conv2D(16, (3,3), strides = (1,1), kernel_regularizer=l2(0.005)))
    model.add(LeakyReLU(alpha = alpha_value))
    model.add(MaxPooling2D(pool_size = (2,2), strides = (1,1)))
    
  model.add(Dropout(0.25))
  model.add(Flatten())

  model.add(Dense(32, bias_regularizer=l2(0.001), kernel_regularizer=l2(0.001)))
  model.add(LeakyReLU(alpha=alpha_value))
  
  model.add(Dropout(0.5))
  model.add(Dense(1))
  model.add(Activation('sigmoid'))
  model.compile(loss='binary_crossentropy', optimizer=keras.optimizers.adam(lr=learning_rate), metrics=['acc', f1_m, precision_m, recall_m])
  return model

model = KerasClassifier(build_fn=create_model, verbose=1)

params_to_test_gen = {'epochs':[20,30,40,50,60], 'batch_size': [100,200,300], 'conv_layers':[1,2,3], 'learning_rate':[0.005,0.001,0.01], 'alpha_value':[0.001, 0.05, 0.01, 0.1]}

gsearch_0 = GridSearchCV(estimator = model, param_grid = params_to_test_gen, scoring='recall', n_jobs= None, iid=False, cv=StratifiedKFold(n_splits=3, shuffle = True, random_state = 0), verbose = 5)
gsearch_0.fit(x_train, y_train, verbose=1, validation_data=(x_val, y_val))
print("Best score: ", gsearch_0.best_score_)
print("Best parameters: ", gsearch_0.best_params_)


(2) Run CNN again using 'optimal hyperparameters' and save model as "final_model.h5"

In [0]:
from keras import backend as K
np.random.seed(0)

# Define recall, precision and f1 functions
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))
  
# 1 conv layer / 100 bs / 50 epoch HitPeak images
def CNN_model():
    shape = (image_size[0], image_size[1], image_size[2])
    model = Sequential()
    
    # 1 conv layer
    model.add(Conv2D(16, (3,3), strides = (1,1), input_shape=shape,kernel_regularizer=l2(0.005)))
    model.add(keras.layers.LeakyReLU(alpha=0.05))
    model.add(MaxPooling2D(pool_size=(2, 2), strides = (1,1)))

    # 2 conv layers
    #model.add(Conv2D(16,(3,3),kernel_regularizer=l2(0.005)))
    #model.add(keras.layers.LeakyReLU(alpha=0.05))
    #model.add(MaxPooling2D(pool_size=(2, 2)))

    # 3 conv layers
    #model.add(Conv2D(16,(3,3),kernel_regularizer=l2(0.005)))
    #model.add(keras.layers.LeakyReLU(alpha=0.05))
    #model.add(MaxPooling2D(pool_size=(2, 2)))
    
    model.add(Dropout(0.25)) 
    model.add(Flatten())

    model.add(Dense(32, bias_regularizer=regularizers.l2(0.001),kernel_regularizer=regularizers.l2(0.001)))
    model.add(keras.layers.LeakyReLU(alpha=0.05))    
    model.add(Dropout(0.5)) 

    # Output layer
    model.add(Dense(1))
    model.add(Activation('sigmoid'))

    # Compile the model
    model.compile(loss='binary_crossentropy',
                    optimizer='adam',
                    metrics=['acc', f1_m, precision_m, recall_m])
    return model

def run_CNN():
    model = CNN_model()
    model.fit(x_train, y_train, epochs=50, batch_size=100, verbose=1, validation_data=(x_val, y_val))
    model.save('final_model_3.h5')
    #model.save_weights('final_weights.h5')
    score = model.evaluate(x_val, y_val, verbose=1)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])
    print('Test f1:', score[2])
    print('Test precision:', score[3])
    print('Test recall:', score[4])

run_CNN()


(3) Call the saved model ("final_model.h5") and run using the test data set

In [0]:
# Plotting/Printing Results
from keras.models import load_model
from keras import backend as K
np.random.seed(0)

# Define recall, precision and f1 functions
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))
    
IMAGE_PATH = '/content/Test'
file_paths = glob.glob(path.join(IMAGE_PATH, '*.png'))

# Load the images into a single variable and convert to a numpy array 
images = [imageio.imread(path) for path in file_paths]
images = np.asarray(images)

# Get image size 
image_size = np.asarray([images.shape[1], images.shape[2], images.shape[3]])
print(image_size)

# Scale images so values are between 0 and 1 
images = images / 255

n_images = images.shape[0]
labels = np.zeros(n_images)
for i in range(n_images):
    filename = path.basename(file_paths[i])[0]
    if filename[0] == 'W':            #Every file that begins with W is assigned a 1
        labels[i] = 1
    else:
        labels[i] = 0

x_test = images
y_test = labels

dependencies = {
    'f1_m' : f1_m, 
    'precision_m' : precision_m,
    'recall_m': recall_m,
}

model = keras.models.load_model('/content/drive/My Drive/Colab Notebooks/final_model.h5', custom_objects=dependencies)

# Summarize the model 
model.summary()
score = model.evaluate(x_test, y_test, verbose=1)
print("%s: %.2f%%" % (model.metrics_names[1], score[1]*100))


from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import f1_score
from sklearn.metrics import auc
from matplotlib import pyplot


lr_probs = model.predict_proba(x_test)    # Predict probabilities
yhat = model.predict(x_test)      # Predict class values
yhat = np.round(yhat)
lr_precision, lr_recall, _ = precision_recall_curve(y_test, lr_probs)
lr_f1, lr_auc = f1_score(y_test, yhat), auc(lr_recall, lr_precision)
print('Logistic: f1=%.3f auc=%.7f' % (lr_f1, lr_auc))   # Summarise scores
''' plot the precision-recall curves '''
no_skill = len(y_test[y_test==1]) / len(y_test)
pyplot.plot(lr_recall, lr_precision, 'r-', label='F1 = %0.2f' %(lr_f1))
pyplot.xlabel('Recall')
pyplot.ylabel('Precision')
pyplot.legend(loc='lower left')
plt.title('Precision-Recall (PR) Curve')
pyplot.show()

# Save precision and recall data to plot PR curve seperately
np.savetxt("/content/drive/My Drive/Colab Notebooks/precision_HP.txt",lr_precision)
np.savetxt("/content/drive/My Drive/Colab Notebooks/recall_HP.txt",lr_recall)


test_predictions = yhat             # Make a prediction on the test set
accuracy = accuracy_score(y_test, test_predictions)
print("Accuracy: " + str(accuracy))
average_precision = average_precision_score(y_test, test_predictions)
print("Average precision: " + str(average_precision))

from sklearn.metrics import precision_recall_curve

precision, recall, threshold = precision_recall_curve(y_test, test_predictions)
auc = auc(recall, precision)
recall1 = recall_score(y_test, np.round(test_predictions))
print("recall: " + str(recall1))
print('AUC:' +str(auc))


# Report Confusion Matrix
y_actu = pd.Series(y_test.ravel(), name='Actual')
y_pred = pd.Series(np.round(test_predictions.ravel()), name='Predicted')
df_confusion = pd.crosstab(y_actu, y_pred)
print(df_confusion)

# Plot confusion matrix
def plot_confusion_matrix(df_confusion, cmap='YlGn'):
    plt.matshow(df_confusion, cmap=cmap) # imshow
    plt.colorbar()
    tick_marks = np.arange(0,len(df_confusion.columns))
    plt.xticks(tick_marks, df_confusion.columns)
    plt.yticks(tick_marks, df_confusion.index)
    plt.ylabel(df_confusion.index.name)
    plt.xlabel(df_confusion.columns.name)
    for i in range(len(df_confusion.index)):
        for j in range(len(df_confusion.columns)):
            plt.text(j,i,str(df_confusion.iloc[i,j]))
    plt.show()

plot_confusion_matrix(df_confusion)

from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score

def plot_roc_curve(fpr, tpr):
    plt.plot(fpr, tpr, color='darkblue', label='AUC = %0.2f' %(auc))
    plt.plot([0, 1], [0, 1], color='orange', linestyle='--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend()
    plt.show()
    
fpr, tpr, thresholds = roc_curve(y_test, test_predictions)
# Save tpr and fpr to plot seperate ROC curve
np.savetxt("/content/drive/My Drive/Colab Notebooks/tpr.txt",tpr)
np.savetxt("/content/drive/My Drive/Colab Notebooks/fpr.txt",fpr)
plot_roc_curve(fpr, tpr)
