<a href="https://colab.research.google.com/github/Charan6924/Deep-Learning/blob/main/ElectronPhotonClassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
from sklearn.model_selection import train_test_split

In [2]:
import h5py
import numpy as np
import torch
import gc

# Locations of the two HDF5 inputs (Colab working directory).
PHOTON_PATH = "/content/SinglePhotonPt50_IMGCROPS_n249k_RHv1.hdf5"
ELECTRON_PATH = "/content/SingleElectronPt50_IMGCROPS_n249k_RHv1.hdf5"


def _fill_from_hdf5(h5_file, X_out, y_out):
    """Copy the "X" and "y" datasets of an open HDF5 file into preallocated tensors.

    Args:
        h5_file: Open ``h5py.File`` exposing the datasets "X" and "y".
        X_out: Destination view for the images; written in place.
        y_out: Destination view for the labels; written in place.

    The temporary NumPy copy of "X" is a local, so it becomes collectable as
    soon as this function returns.
    """
    # The destination is laid out channels-first (N, 2, 32, 32), so the last
    # axis of the stored array is moved to position 1 before copying.
    X_temp = np.moveaxis(h5_file["X"][:], -1, 1)
    X_out[:] = torch.from_numpy(X_temp)
    y_out[:] = torch.from_numpy(h5_file["y"][:])


# `with` guarantees both files are closed even if loading fails part-way.
with h5py.File(PHOTON_PATH, "r") as f_p, h5py.File(ELECTRON_PATH, "r") as f_e:
    n_p = f_p["X"].shape[0]
    n_e = f_e["X"].shape[0]
    total_n = n_p + n_e

    print(f"Total samples: {total_n}")

    # Preallocate once and fill slice by slice instead of concatenating,
    # which would need a second full-size copy of the data.
    X_tensor = torch.empty((total_n, 2, 32, 32), dtype=torch.float32)
    y_tensor = torch.empty((total_n,), dtype=torch.float32)

    # Photons occupy the first n_p rows, electrons the remaining n_e rows.
    print("Processing Photons...")
    _fill_from_hdf5(f_p, X_tensor[:n_p], y_tensor[:n_p])
    gc.collect()

    print("Processing Electrons...")
    _fill_from_hdf5(f_e, X_tensor[n_p:], y_tensor[n_p:])

gc.collect()

print("✅ Data successfully loaded into Tensors without crashing!")

Total samples: 498000
Processing Photons...
Processing Electrons...
✅ Data successfully loaded into Tensors without crashing!


In [5]:
from torch.utils.data import TensorDataset, DataLoader, random_split

# Split/loader settings gathered in one place so they are easy to tune.
SPLIT_SEED = 42        # fixes the train/validation partition across runs
TRAIN_FRACTION = 0.8   # share of samples used for training
BATCH_SIZE = 128

full_dataset = TensorDataset(X_tensor, y_tensor)
train_size = int(TRAIN_FRACTION * len(full_dataset))
val_size = len(full_dataset) - train_size

# Without an explicit generator the partition depends on the global RNG state,
# so validation metrics from different runs would be computed on different
# samples. A dedicated seeded generator makes the split reproducible.
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, val_dataset = random_split(
    full_dataset, [train_size, val_size], generator=split_generator
)

# Only the training loader shuffles; validation order does not affect the metric.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [6]:
# Sanity check: draw a single training batch and show the image tensor's shape.
(inputs, labels) = next(iter(train_loader))
print(inputs.size())

torch.Size([128, 2, 32, 32])


In [7]:
# Sanity check: draw a single validation batch and show the image tensor's shape.
(inputs, labels) = next(iter(val_loader))
print(inputs.size())

torch.Size([128, 2, 32, 32])


In [8]:
import torch.nn as nn

In [9]:
class ResBlock(nn.Module):
    """Two-convolution residual block with an optional projection shortcut.

    The main path is conv3x3 -> BN -> ReLU -> conv3x3 -> BN. Its result is
    added to the shortcut branch and passed through a final ReLU.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the projection
            shortcut, when one is needed). Defaults to 1.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super(ResBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Identity shortcut by default; a 1x1 conv + BN projection is used when
        # the main path changes the spatial size or the channel count, so that
        # both branches can be summed.
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        """Apply the residual block to ``x`` and return the activated sum."""
        out = self.relu(self.bn1(self.conv1(x)))
        # The second conv/BN pair must take part in the forward pass;
        # otherwise its parameters are never trained.
        out = self.bn2(self.conv2(out))
        # Out-of-place add so the block input `x` is never modified by the
        # in-place ReLU that follows.
        out = out + self.shortcut(x)
        return self.relu(out)

In [10]:
class ResNet15(nn.Module):
    """Small residual network for 2-channel images producing one score per sample.

    Layout: a 3x3 stem convolution (2 -> 16 channels), three stages of two
    ``ResBlock``s each (16, 32 and 64 channels; the last two stages use
    stride 2 in their first block), global average pooling and a linear
    layer mapping 64 features to a single output.
    """

    def __init__(self):
        super().__init__()
        # Channel count fed to the next stage; advanced by `_make_layer`.
        self.in_channels = 16

        # Stem
        self.conv1 = nn.Conv2d(2, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)

        # Residual stages
        self.layer1 = self._make_layer(16, 2, stride=1)
        self.layer2 = self._make_layer(32, 2, stride=2)
        self.layer3 = self._make_layer(64, 2, stride=2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64, 1)

    def _make_layer(self, out_channels, num_blocks, stride):
        """Build one stage of ``num_blocks`` residual blocks.

        Only the first block receives ``stride`` and the current
        ``self.in_channels``; the remaining blocks keep the resolution and
        map ``out_channels`` to ``out_channels``. ``self.in_channels`` is
        updated to ``out_channels`` as a side effect.
        """
        head_block = ResBlock(self.in_channels, out_channels, stride)
        self.in_channels = out_channels
        tail_blocks = [
            ResBlock(out_channels, out_channels, stride=1)
            for _ in range(1, num_blocks)
        ]
        return nn.Sequential(head_block, *tail_blocks)

    def forward(self, x):
        """Run the network on a batch and return a tensor of shape (batch, 1)."""
        features = self.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            features = stage(features)
        pooled = self.avgpool(features)
        # Collapse the pooled (batch, 64, 1, 1) map into (batch, 64) for the linear head.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

In [12]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# nn.Module.to moves the parameters in place and returns the same module.
model = ResNet15()
model = model.to(device)
print(f"Model created and moved to {device}")

Model created and moved to cpu


In [13]:
from sklearn.metrics import roc_auc_score
import torch.optim as optim

In [14]:
# Training setup: number of passes over the data, binary cross-entropy on raw
# model outputs, and Adam over all model parameters.
epochs = 10
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [15]:
def train_model():
    """Train the global ``model`` for ``epochs`` epochs, validating after each.

    Relies on the notebook-level names ``model``, ``optimizer``, ``criterion``,
    ``train_loader``, ``val_loader``, ``device`` and ``epochs``. After every
    epoch the mean training loss and the validation ROC AUC are printed.

    Returns:
        list[dict]: One entry per completed epoch with the keys ``"epoch"``
        (1-based), ``"train_loss"`` and ``"val_auc"``, so the metrics remain
        available after the printed log has scrolled away.
    """
    print(f"Starting training on {device}...")
    history = []

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            # Model output is (batch, 1); labels are (batch,), hence unsqueeze.
            loss = criterion(outputs, labels.unsqueeze(1))
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # ---- validation pass ----
        model.eval()
        label_batches = []
        score_batches = []

        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs.to(device))

                # Keep one flat tensor per batch instead of extending a Python
                # list with one tiny array per sample.
                label_batches.append(labels.reshape(-1).cpu())
                score_batches.append(torch.sigmoid(outputs).reshape(-1).cpu())

        val_labels = torch.cat(label_batches).numpy()
        val_preds = torch.cat(score_batches).numpy()

        epoch_auc = roc_auc_score(val_labels, val_preds)
        print(f"Epoch {epoch+1}/{epochs} | Loss: {running_loss/len(train_loader):.4f} | Val AUC: {epoch_auc:.4f}")

        history.append({
            "epoch": epoch + 1,
            "train_loss": running_loss / len(train_loader),
            "val_auc": float(epoch_auc),
        })

    return history


# Assigned so the notebook does not echo the returned list as cell output.
history = train_model()

Starting training on cpu...


KeyboardInterrupt: 