In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import QuantileTransformer
from simpleDNN import SimpleDNN
from torch.nn import functional as F


In [2]:
# weights=[10264.0,5151.0,5128.0,5069.0,
#  4998.0,
#  4935.0,
#  4909.0,
#  4901.0,
#  4785.0,
#  4613.0,
#  4601.0,
#  4393.0,
#  4376.0,
#  4305.0,
#  4120.0,
#  4083.0,
#  4061.0,
#  4019.0,
#  3906.0,
#  3891.0,
#  3817.0,
#  3599.0,
#  3431.0,
#  3405.0,
#  3397.0,
#  3361.0,
#  3349.0,
#  3327.0,
#  3319.0,
#  3232.0,
#  3190.0,
#  3028.0,
#  3018.0,
#  2896.0,
#  2892.0,
#  2759.0,
#  2715.0,
#  2613.0,
#  2587.0,
#  2550.0,
#  2398.0,
#  2304.0,
#  2228.0,
#  2094.0,
#  2063.0,
#  1930.0,
#  1192.0,
#  345.0]
# weights=np.array(weights)
# weights=weights/np.max(weights)
# weights

In [3]:

class CustomDataset(Dataset):
    """Tabular two-class dataset loaded from a CSV file.

    The CSV must contain the one-hot style columns ``label_proton`` and
    ``label_pion`` plus every column named in ``features``. After construction:

    * ``self.data`` - the cleaned frame (no NaN rows, label columns replaced
      by a single integer ``class_label`` column: 0 = proton, 1 = pion),
    * ``self.X``    - quantile-transformed feature matrix (numpy array),
    * ``self.y``    - integer class labels (numpy array).

    Args:
        csv_file: Path of the CSV file to read.
        features: Names of the columns used as model inputs.
    """

    def __init__(self, csv_file:str, features:list):
        # Load the data and turn the two label columns into one class index
        self.data = self._prepare_frame(pd.read_csv(csv_file))
        self.X = self.data[features].values
        # self.X = StandardScaler().fit_transform(self.X)
        # NOTE(review): the transformer is fitted on ALL rows here, i.e. before
        # create_dataloader() splits them into train/valid/test, so statistics
        # of the held-out rows leak into the scaling. Fitting on the training
        # rows only would need the split to happen before this point.
        self.X = QuantileTransformer().fit_transform(self.X)
        self.y = self.data['class_label'].values

    @staticmethod
    def _prepare_frame(data):
        """Drop incomplete rows, then collapse the label columns into ``class_label``.

        The NaN filter must run BEFORE the argmax: ``np.argmax`` treats NaN as
        the maximum, so a row with a missing label would otherwise receive a
        class pointing at the NaN column and, because the label columns are
        removed afterwards, would never be filtered out.
        """
        label_columns = ['label_proton', 'label_pion']
        complete = data.dropna()
        class_label = np.argmax(complete[label_columns].values, axis=1)
        return complete.drop(label_columns, axis=1).assign(class_label=class_label)

    def __len__(self):
        """Number of samples that survived the NaN filter."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a ``(float32 features, int64 label)`` tensor pair."""
        X_sample = torch.tensor(self.X[idx], dtype=torch.float32)
        y_sample = torch.tensor(self.y[idx], dtype=torch.long)
        return X_sample, y_sample

# Step 2: Use DataLoader

def create_dataloader(csv_file, batch_size=32, shuffle=True,num=1,train=0.6,valid=0.2,test=0.2,features=[]):
    """Build train/validation/test DataLoaders over one CustomDataset.

    The rows are assigned to the three splits by a random permutation (driven
    by NumPy's global RNG, so seed it beforehand for reproducible splits).

    Args:
        csv_file: Path of the CSV file handed to CustomDataset.
        batch_size: Batch size of all three loaders.
        shuffle: If True (default) every loader draws its subset in a fresh
            random order on each pass (SubsetRandomSampler). If False the
            loaders iterate their subset in a fixed order. The random
            assignment of rows to splits happens in both cases.
        num: ``num_workers`` of all three loaders.
        train: Fraction of rows used for training.
        valid: Fraction of rows used for validation.
        test: Nominal test fraction; only reported in the first log line. The
            test split actually receives every row left over after the train
            and validation rows have been taken.
        features: Feature column names forwarded to CustomDataset. The default
            list is never mutated here.

    Returns:
        ``(train_loader, valid_loader, test_loader)``.

    Raises:
        ValueError: If a fraction is negative or ``train + valid`` exceeds 1.
    """
    if min(train, valid, test) < 0 or train + valid > 1 + 1e-9:
        raise ValueError(
            f"invalid split fractions: train={train}, valid={valid}, test={test}")

    dataset = CustomDataset(csv_file, features=features)

    n_samples = len(dataset)
    indices = np.arange(n_samples)
    np.random.shuffle(indices)
    n_train = int(train*n_samples)
    n_valid = int(valid*n_samples)
    n_test = int(test*n_samples)
    print(f"Train: {n_train}, Valid: {n_valid}, Test: {n_test}")
    train_indices = indices[:n_train]
    valid_indices = indices[n_train:n_train+n_valid]
    test_indices = indices[n_train+n_valid:]
    print(f"Train: {len(train_indices)}, Valid: {len(valid_indices)}, Test: {len(test_indices)}")

    def _subset_loader(subset_indices):
        # DataLoader accepts any sized iterable of indices as `sampler`; a
        # plain list gives a fixed order while keeping len(loader.sampler)
        # equal to the subset size, just like SubsetRandomSampler.
        if shuffle:
            sampler = torch.utils.data.SubsetRandomSampler(subset_indices)
        else:
            sampler = subset_indices.tolist()
        return DataLoader(dataset, batch_size=batch_size, sampler=sampler, num_workers=num)

    train_loader = _subset_loader(train_indices)
    valid_loader = _subset_loader(valid_indices)
    test_loader = _subset_loader(test_indices)
    return train_loader,valid_loader,test_loader

      


In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

## Training and validation function

In [5]:
from tqdm import tqdm

def train(model, device, train_loader, optimizer):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Network to train; switched to training mode.
        device: Device every batch is moved to before the forward pass.
        train_loader: Iterable yielding ``(data, target)`` batches.
        optimizer: Optimizer stepping the parameters of ``model``.

    Returns:
        Sum over all batches of (mean batch loss * batch size), i.e. the
        summed per-sample loss; divide by the sample count for the epoch mean.
    """
    # Imported here on purpose: the notebook binds the global name `tqdm` once
    # to the class (`from tqdm import tqdm`) and later to the module
    # (`import tqdm`), so a global lookup works only for one cell order.
    from tqdm import tqdm as progress_bar

    train_loss_ep = 0.

    model.train()
    with progress_bar(train_loader, ascii=True) as tq:
        for data, target in tq:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            # loss = F.nll_loss(output, target)
            # cross_entropy applies log-softmax itself, so `output` is assumed
            # to be raw class scores - TODO confirm against SimpleDNN.
            loss= F.cross_entropy(output, target)
            loss.backward()
            optimizer.step()
            # loss is the batch mean; weight it by the batch size so a smaller
            # final batch does not distort the epoch total.
            train_loss_ep += loss.item() * data.size(0)

    return train_loss_ep

def test(model, device, valid_loader1):
    
    test_loss_ep = 0.
    model.eval()
    with tqdm.tqdm(valid_loader1, ascii=True) as tq:
        for data, target in tq:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # loss = F.nll_loss(output, target)
            loss= F.cross_entropy(output, target)
            
            test_loss_ep += loss.item() * data.size(0)
        
    return test_loss_ep
        



In [None]:


csv_file = "/mnt/tsv/smeared_50_50_50/pp_smeared_50_50_50.csv"  # Replace with your actual CSV file
f=['speed',  
 'AsymmetryY',
 'centralTowerFraction_cell',
 'AsymmetryX',
 'VarianceAtVertex',
 'weightedTime',
 'AsymmetryX_plain',
 'EfractionCell',
 'AsymmetryY_plain',
 'TotalEnergy',
 'numCellBeforeVertex',
 'radialSigma',
 'Asymmetry',
 'RatioEcell',
 'PostVertexEnergyFraction',
 'Asymmetry_plain',
 'longitudinalSigma',
 'longitudinalSigma_plain',
 'radius',
 'DeltaEcell_secondMax',
 'NumberOfUniqueCells',
 'TotalEnergyCloseToVertex',
 'radialSigma_plain',
 'length',
 'theta2',
 'Z_vertex',
 'd2',
 'MaxEnergyInCell',
 'MaxEnergyCloseVertex',
 'E2',
 'distanceMaxFromVertex',
 'EnergyFractionCloseToVertex',
 'radius_plain',
 'E1',
 'length_plain',
 'SecondMaxEnergyInCell',
 'R1',
 'VertexTime',
 'Aplanarity',
 'R2',
 'distanceFirstSecondMaxEnergy',
 'DeltaT',
 'E3',
 'theta3',
 'd3',
 'R3',
 'time0',
 'NumPeaks']
# f is the list of feature columns used by the model (you can add or remove time0 here)

train_loader,valid_loader,test_loader = create_dataloader(csv_file, batch_size=128,num=28,features=f)
# The dataset builds its feature matrix from exactly the columns in `f`, so the
# network needs one input per entry. Reading the width from the list avoids
# starting all loader workers just to peek at one batch, and it also works
# when the training split is empty.
input_size = len(f)
hidden_layers = [96,32,16]  # Example of hidden layer sizes
# The training/evaluation code moves every batch to `device`, so the network
# has to live there as well; otherwise the forward pass fails on CUDA machines.
model = SimpleDNN(input_size, hidden_layers).to(device) # I also tried to use weights=weights in the model but it did not work


In [None]:
import pandas as pd
import tqdm
n_epochs = 200
loss_data = []

valid_loss_min = np.inf  # set initial "min" to infinity

# NOTE: `weight_decay` does double duty (unchanged): it is Adam's L2 penalty
# AND the coefficient of the hand-written learning-rate decay below.
weight_decay=0.001
base_lr=0.0009

# The batches are moved to `device` inside train()/test(); make sure the
# parameters are there too, and do it before the optimizer captures them.
model.to(device)

# Build the optimizer ONCE. Re-creating Adam at the start of every epoch throws
# away its running moment estimates and step counter at each epoch boundary.
optimizer = optim.Adam(model.parameters(), lr=base_lr,weight_decay=weight_decay)
# optimizer = optim.SGD(model.parameters(), lr=base_lr,weight_decay=weight_decay,momentum=0.9)  #SGD optimizer is pretty slow compared to Adam

for epoch in range(n_epochs):

    # Learning-rate schedule: polynomial decay of the base rate, except that
    # every 10th epoch (including the very first) uses a fixed small rate.
    lr=base_lr/(1+weight_decay*(epoch)**2.5)
    if epoch%10==0:
        lr=0.00005
    print(f" LR: {np.round(lr,6)}")
    # Apply this epoch's rate to the existing optimizer instead of rebuilding it.
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

    train_loss = train(model, device, train_loader, optimizer)
    valid_loss = test(model, device, valid_loader)

    # train()/test() return summed per-sample losses; normalise by split size.
    train_loss = train_loss / len(train_loader.sampler)
    valid_loss = valid_loss / len(valid_loader.sampler)

    loss_data.append({'Epoch': epoch + 1, 'Training Loss': train_loss, 'Validation Loss': valid_loss})

    print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
        epoch+1, 
        train_loss,
        valid_loss
        ))
    
    # save model if validation loss has decreased
    if valid_loss <= valid_loss_min:
        print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
        valid_loss_min,
        valid_loss))
        torch.save(model.state_dict(), 'model_hcal.pt')
        valid_loss_min = valid_loss
    # Rewrite the history file every epoch so progress survives an interrupted run.
    loss_history_df = pd.DataFrame(loss_data)
    loss_history_df.to_csv('loss_history.csv', index=False)

In [None]:
# Rebuild the architecture on the evaluation device and restore the weights of
# the checkpoint with the lowest validation loss. `map_location` lets a
# checkpoint written on a GPU machine load on a CPU-only one (and vice versa);
# the evaluation helpers move every batch to `device`, so the model must be
# there as well.
model_test = SimpleDNN(input_size, hidden_layers).to(device)
model_test.load_state_dict(torch.load('model_hcal.pt',weights_only=True,map_location=device))


In [13]:
def eval_model(model, data_loader):
    """Print the classification accuracy of ``model`` over ``data_loader``.

    Batches are moved to the notebook-level ``device`` before the forward
    pass. The predicted class of a sample is the index of its largest output.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        data_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        None. The accuracy is printed; if the loader yields no samples a
        notice is printed instead of dividing by zero.
    """
    model.eval() # Set model to eval mode
    # Plain Python integers: exact for any dataset size and free of device tensors.
    true_preds, num_preds = 0, 0

    with torch.no_grad(): # Deactivate gradients for the following code
        for data_inputs, data_labels in data_loader:
            data_inputs, data_labels = data_inputs.to(device), data_labels.to(device)
            preds = model(data_inputs)

            pred_labels = torch.argmax(preds, dim=-1) # Index of the highest-scoring class

            # Keep records of predictions for the accuracy metric (true_preds=TP+TN, num_preds=TP+TN+FP+FN)
            true_preds += (pred_labels == data_labels).sum().item()
            num_preds += data_labels.shape[0]

    if num_preds == 0:
        print("Accuracy of the model: n/a (data_loader yielded no samples)")
        return

    acc = true_preds / num_preds
    print(f"Accuracy of the model: {100.0*acc:4.2f}%")

In [None]:
# Print the accuracy of the reloaded checkpoint (model_test) on the test split.
eval_model(model_test, test_loader)

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

def plot_confusion_matrix(model, data_loader):
    """Draw the proton/pion confusion matrix of ``model`` over ``data_loader``.

    Batches are moved to the notebook-level ``device`` before the forward
    pass. The predicted class of a sample is the index of its largest output;
    class 0 is shown as "proton" and class 1 as "pion".

    Args:
        model: Network to evaluate; switched to evaluation mode.
        data_loader: Iterable yielding ``(inputs, labels)`` batches.
    """
    model.eval() # Set model to eval mode
    true_labels = []
    pred_labels = []

    with torch.no_grad(): # Deactivate gradients for the following code
        for data_inputs, data_labels in data_loader:

            data_inputs, data_labels = data_inputs.to(device), data_labels.to(device)
            preds = model(data_inputs)
            pred_labels.extend(torch.argmax(preds, dim=-1).cpu().numpy()) # Index of the highest-scoring class
            true_labels.extend(data_labels.cpu().numpy())

    # Pin the class order explicitly: without `labels`, a split in which only
    # one class occurs yields a 1x1 matrix that no longer matches the two
    # hard-coded tick labels below.
    cm = confusion_matrix(true_labels, pred_labels, labels=[0, 1])
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='coolwarm', xticklabels=['proton', 'pion'], yticklabels=['proton', 'pion'])
    plt.xticks(weight='bold',size=12)
    plt.yticks(weight='bold',size=12)
    plt.xlabel("Prediction",weight="bold",size=12)
    plt.ylabel('Truth',weight='bold',size=12)
    plt.title('Confusion Matrix',weight='bold',size=15)
    plt.show()

# Show the confusion matrix of the reloaded checkpoint (model_test) on the test split.
plot_confusion_matrix(model_test, test_loader)