In [None]:
import numpy as np
import h5py
import math
import os
import pathlib
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, BatchNormalization, Activation, Layer, ReLU, LeakyReLU
from tensorflow.keras import backend as K

In [None]:
from func import load_model, save_model

## Load dataset

In [None]:
filename = 'BKG_dataset.h5'

In [None]:
# make sure input data has correct input shape
with h5py.File(filename, 'r') as file:
    X_train = np.array(file['X_train'])
    X_train = X_train.reshape((X_train.shape[0], -1))
    X_test = np.array(file['X_test'])
    X_test = X_test.reshape((X_test.shape[0], -1))
    X_val = np.array(file['X_val'])
    X_val = X_val.reshape((X_val.shape[0], -1))

## Define Dense NN architecture

In [None]:
input_shape = X_train.shape[-1]
latent_dimension = 3
num_nodes=[16,8]

In [None]:
#encoder
inputArray = Input(shape=(input_shape))
x = Dense(num_nodes[0], use_bias=False)(inputArray)
x = BatchNormalization()(x)
x = Activation('relu')(x)
x = Dense(num_nodes[1], use_bias=False)(x)
x = BatchNormalization()(x)
x = Activation('relu')(x)
x = Dense(latent_dimension, use_bias=False)(x)
x = BatchNormalization()(x)
encoder = Activation('relu')(x)

#decoder
x = Dense(num_nodes[1], use_bias=False)(encoder)
x = BatchNormalization()(x)
x = Activation('relu')(x)
x = Dense(num_nodes[0], use_bias=False)(x)
x = BatchNormalization()(x)
x = Activation('relu')(x)
decoder = Dense(input_shape)(x)

#create autoencoder
autoencoder = Model(inputs = inputArray, outputs=decoder)
autoencoder.summary()

In [None]:
autoencoder.compile(optimizer = keras.optimizers.Adam(), loss='mse')

## Train model

In [None]:
# callbacks=[] - optional

EPOCHS = 1
BATCH_SIZE = 1024

In [None]:
history = autoencoder.fit(X_train, X_train, epochs = EPOCHS, batch_size = BATCH_SIZE,
                  validation_data=(X_val, X_val),
                  #callbacks=callbacks
                 )

In [None]:
model_name = 'model_name'
model_directory = 'model_directory'
save_model(model_directory+model_name, autoencoder)

## Prediction - background

In [None]:
bkg_prediction = autoencoder.predict(X_test)

## Prediction - signals

In [None]:
# add correct signal labels
signal_labels = ['GluGluToHHTo4B', 'HTo2LongLivedTo4mu_1000', 'HTo2LongLivedTo4mu_125_12', 'HTo2LongLivedTo4mu_125_25', 'HTo2LongLivedTo4mu_125_50', 'VBFHToTauTau', 'VBF_HH', 'VBF_HToInvisible_M125', 'VBF_HToInvisible_M125_private', 'VectorZPrimeToQQ__M100', 'VectorZPrimeToQQ__M200', 'VectorZPrimeToQQ__M50', 'ZprimeToZH_MZprime1000', 'ZprimeToZH_MZprime600', 'ZprimeToZH_MZprime800']

In [None]:
# add correct path to signal files
signal_file = 'BSM_preprocessed.h5'

In [None]:
# read signal data
signal_data = []
for i, label in enumerate(signal_labels):
    with h5py.File(signal_file, 'r') as file:
        test_data = np.array(file[label])
        test_data = test_data.reshape((test_data.shape[0],-1))
    signal_data.append(test_data)

In [None]:
signal_results = []

for i, label in enumerate(signal_labels):
    signal_prediction = autoencoder.predict(signal_data[i])
    signal_results.append([label, signal_data[i], signal_prediction]) # save [label, true, prediction] for signal

## Save results

In [None]:
save_file = 'save_file'

In [None]:
with h5py.File(save_file, 'w') as file:
    file.create_dataset('BKG_input', data=X_test)
    file.create_dataset('BKG_predicted', data = bkg_prediction)
    for i, sig in enumerate(signal_results):
        file.create_dataset('%s_input' %sig[0], data=sig[1])
        file.create_dataset('%s_predicted' %sig[0], data=sig[2])

## Evaluate results

1. Plot reconstruction of input data against input data distribution
2. Plot loss distribution after prediction (check loss value for signals)
3. Plot ROC curves - how good is anomaly detection for chosen FPR threshold

# 1.

In [None]:
from func import make_feature_plots

In [None]:
# for one of the input features - BACKGROUND
make_feature_plots(X_test[:,0:1].reshape(X_test.shape[0]*1),\
                   bkg_prediction[:,0:1].reshape(bkg_prediction.shape[0]*1),\
                   '$p_{T}$', 'MET', 100, True)

In [None]:
# for one of the input features - ONE SIGNAL (Leptoquark)
make_feature_plots(signal_data[0][:,0:1].reshape(signal_data[0].shape[0]*1),\
                   signal_results[0][2][:,0:1].reshape(signal_results[0][2].shape[0]*1),\
                   '$p_{T}$', 'MET', 100, True)

# 2.

In [None]:
from func import mse_loss

In [None]:
# compute loss value (true, predicted)
total_loss = []
total_loss.append(mse_loss(X_test, bkg_prediction.astype(np.float32)).numpy())
for i, signal_X in enumerate(signal_data):
    total_loss.append(mse_loss(signal_X, signal_results[i][2].astype(np.float32)).numpy())

In [None]:
bin_size=100

plt.figure(figsize=(10,8))
for i, label in enumerate(np.concatenate((['BKG'],signal_labels),axis=0):
    plt.hist(total_loss[i], bins=bin_size, label=label, density = True, histtype='step', fill=False, linewidth=1.5)
plt.yscale('log')
plt.xlabel("Autoencoder Loss")
plt.ylabel("Probability (a.u.)")
plt.grid(True)
plt.title('MSE loss')
plt.legend(loc='best')
plt.show()

# 3.

In [None]:
from sklearn.metrics import roc_curve, auc

In [None]:
labels = np.concatenate([['Background'], np.array(signal_labels)])

In [None]:
target_background = np.zeros(total_loss[0].shape[0])

plt.figure(figsize=(10,8))
for i, label in enumerate(labels):
    if i == 0: continue # background events
    
    trueVal = np.concatenate((np.ones(total_loss[i].shape[0]), target_background)) # anomaly=1, bkg=0
    predVal_loss = np.concatenate((total_loss[i], total_loss[0]))

    fpr_loss, tpr_loss, threshold_loss = roc_curve(trueVal, predVal_loss)

    auc_loss = auc(fpr_loss, tpr_loss)
    
    plt.plot(fpr_loss, tpr_loss, "-", label='%s (auc = %.1f%%)'%(label,auc_loss*100.), linewidth=1.5)
    
    plt.semilogx()
    plt.semilogy()
    plt.ylabel("True Positive Rate")
    plt.xlabel("False Positive Rate")
    plt.legend(loc='center right')
    plt.grid(True)
    plt.tight_layout()
plt.plot(np.linspace(0, 1),np.linspace(0, 1), '--', color='0.75')
plt.axvline(0.00001, color='red', linestyle='dashed', linewidth=1) # threshold value for measuring anomaly detection performance
plt.title("ROC AE")
plt.show()