In [1]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np

import h5py

In [2]:
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  1


In [3]:
print(tf.__version__)

2.10.0


In [4]:
sys_details = tf.sysconfig.get_build_info()
sys_details["cuda_version"]

'64_112'

In [5]:
photon_file_path = "SinglePhotonPt50_IMGCROPS_n249k_RHv1.hdf5"
electron_file_path = "SingleElectronPt50_IMGCROPS_n249k_RHv1.hdf5"

# Load data from the provided HDF5 files
with h5py.File(photon_file_path, 'r') as f:
    X_photon = f['X'][:]
    y_photon = f['y'][:]

with h5py.File(electron_file_path, 'r') as f:
    X_electron = f['X'][:]
    y_electron = f['y'][:]

# Concatenate the datasets
X = np.concatenate([X_photon, X_electron], axis=0)
y = np.concatenate([y_photon, y_electron], axis=0)

# Split into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert labels to one-hot encoding
y_train = tf.keras.utils.to_categorical(y_train, num_classes=2)
y_test = tf.keras.utils.to_categorical(y_test, num_classes=2)


In [6]:
from tensorflow.keras import layers

class ResBlock(tf.keras.Model):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResBlock, self).__init__()
        self.conv1 = layers.Conv2D(out_channels, kernel_size=3, strides=stride, padding="same", use_bias=False)
        self.bn1 = layers.BatchNormalization()
        self.conv2 = layers.Conv2D(out_channels, kernel_size=3, strides=1, padding="same", use_bias=False)
        self.bn2 = layers.BatchNormalization()
        self.shortcut = tf.keras.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = tf.keras.Sequential([
                layers.Conv2D(out_channels, kernel_size=1, strides=stride, use_bias=False),
                layers.BatchNormalization()
            ])

    def call(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = tf.nn.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += self.shortcut(x)
        out = tf.nn.relu(out)
        return out


class ResNet(tf.keras.Model):
    def __init__(self, num_classes=2):
        super(ResNet, self).__init__()
        self.in_channels = 16

        self.conv1 = layers.Conv2D(16, kernel_size=3, strides=1, padding="same", use_bias=False)
        self.bn1 = layers.BatchNormalization()
        self.relu = tf.nn.relu
        self.layer1 = self._make_layer(ResBlock, 16, 2, stride=1)
        self.layer2 = self._make_layer(ResBlock, 32, 2, stride=2)
        self.layer3 = self._make_layer(ResBlock, 64, 2, stride=2)
        self.avgpool = layers.GlobalAveragePooling2D()
        self.fc = layers.Dense(num_classes, activation='softmax')

    def _make_layer(self, block, out_channels, blocks, stride=1):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels
        for i in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return tf.keras.Sequential(layers)

    def call(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)

        out = self.avgpool(out)
        out = self.fc(out)

        return out

In [None]:
# Create the model
model = ResNet()
# Compile the model
model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])

# Train the model
model.fit(X_train, y_train, batch_size=32, epochs=30, validation_split=0.1)


Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30

In [None]:
# Evaluate the model
test_loss, test_acc = model.evaluate(X_test, y_test)

In [None]:
# Get probabilities and true labels
probs = model.predict(X_test,batch_size=32)[:, 1]

In [None]:
# Compute ROC curve and AUC score
y_test_original = np.argmax(y_test, axis=1)
fpr, tpr, _ = roc_curve(y_test_original, probs)
roc_auc = auc(fpr, tpr)

# Plot ROC curve
plt.figure()
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()