In [1]:
import os
import math
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from utilities import import_data_files
import torch
import torch.nn as nn
import torch.optim as optim
import awkward as ak

In [2]:
# Kernel SVM Model
class KernelSVM(nn.Module):
    """Linear scorer over the squared outer product of two feature vectors.

    ``forward`` flattens both inputs to 1D, builds the matrix
    ``outer(x1, x2) ** 2``, flattens it and passes it through a single
    ``nn.Linear`` layer. ``input_size`` must therefore equal
    ``x1.numel() * x2.numel()`` (the default 9801 is 99 * 99).

    Args:
        input_size: Number of input features of the linear layer.
        output_size: Number of outputs of the linear layer.
    """

    def __init__(self, input_size=9801, output_size=1):
        super().__init__()
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, x1, x2):
        """Score one pair of samples.

        Args:
            x1: Tensor of any shape; flattened to 1D before use.
            x2: Tensor of any shape; flattened to 1D before use.

        Returns:
            Tensor of shape ``(output_size,)``.
        """
        # reshape() instead of view(): view() raises on non-contiguous inputs
        # (e.g. transposed or sliced tensors), reshape() copies when needed.
        x1 = x1.reshape(-1)
        x2 = x2.reshape(-1)

        # torch.outer is the supported name; torch.ger is a deprecated alias.
        kernel = torch.outer(x1, x2) ** 2

        # Flatten the (len(x1), len(x2)) matrix and pass it through the linear layer.
        return self.linear(kernel.reshape(-1))

In [3]:
# Data preparation
def prepare_data(DFs, train_ratio=0.7, val_ratio=0.15):
    """Turn two event collections into train/validation/test data loaders.

    The ``'SuperCell_ET'`` field of ``DFs[0]`` is labelled 0 and that of
    ``DFs[1]`` is labelled 1. Both are converted to float32 tensors,
    concatenated, shuffled and then randomly split.

    Args:
        DFs: Indexable holding two records that expose a ``'SuperCell_ET'``
            field convertible with ``ak.to_numpy``.
        train_ratio: Fraction of all samples assigned to the training split.
        val_ratio: Fraction of all samples assigned to the validation split;
            whatever remains becomes the test split.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)`` of ``DataLoader``
        objects with ``batch_size=1``; only the training loader shuffles.
    """
    print("Data loaded from files")
    print("Converting data to numpy array...")

    # Pull the field out of both awkward arrays (index 0 first, then index 1).
    et_arrays = [ak.to_numpy(DFs[source]['SuperCell_ET']) for source in (0, 1)]

    print("Converting data to torch tensors...")
    class0_x, class1_x = [torch.tensor(arr, dtype=torch.float32) for arr in et_arrays]

    print("Generating labels...")
    class0_y = torch.zeros(len(class0_x), dtype=torch.float32)
    class1_y = torch.ones(len(class1_x), dtype=torch.float32)

    # Stack both classes into a single feature tensor and a single label tensor.
    print("Concatenating datasets...")
    features = torch.cat([class0_x, class1_x], dim=0)
    targets = torch.cat([class0_y, class1_y], dim=0)

    print("Shuffling data...")
    order = torch.randperm(len(features))
    features, targets = features[order], targets[order]

    # Carve the shuffled samples into three disjoint subsets.
    print("Splitting data...")
    full_set = torch.utils.data.TensorDataset(features, targets)
    n_total = len(full_set)
    n_train = int(train_ratio * n_total)
    n_val = int(val_ratio * n_total)
    n_test = n_total - n_train - n_val
    subsets = torch.utils.data.random_split(full_set, [n_train, n_val, n_test])

    # One sample per batch everywhere; only the training loader reshuffles.
    train_loader, val_loader, test_loader = [
        torch.utils.data.DataLoader(subset, batch_size=1, shuffle=reshuffle)
        for subset, reshuffle in zip(subsets, (True, False, False))
    ]

    return train_loader, val_loader, test_loader

In [4]:
# Hinge loss function
def hinge_loss(output, target):
    """Return the mean hinge loss ``mean(max(0, 1 - output * target))``.

    Args:
        output: Tensor of raw model scores.
        target: Tensor with the same number of elements as ``output``; it is
            viewed with ``output``'s shape before the product is taken.

    Returns:
        Scalar tensor holding the mean loss.
    """
    aligned_target = target.view_as(output)
    margin = 1 - output * aligned_target
    return margin.clamp(min=0).mean()

In [5]:
# Training function
def train_model(train_loader, val_loader, input_size=9801, learning_rate=0.01, num_epochs=10):
    """Train a ``KernelSVM`` on every ordered pair of training samples.

    For each pair ``(x1, x2)`` of distinct samples the target is +1 when both
    carry the same label and -1 otherwise; one SGD step on the hinge loss is
    taken per pair. The number of steps per epoch is therefore
    ``N * (N - 1)`` for ``N`` training samples, i.e. quadratic in ``N``.

    Args:
        train_loader: Iterable of ``(x, y)`` batches with a leading batch
            dimension of size 1 on ``x`` (it is removed with ``squeeze(0)``).
        val_loader: Loader handed to ``validate_model`` after each epoch.
        input_size: Input width of the model's linear layer.
        learning_rate: SGD learning rate.
        num_epochs: Number of passes over all sample pairs.

    Returns:
        Tuple ``(model, epoch_losses)`` where ``epoch_losses`` is a list of
        ``(train_loss, val_loss)`` tuples, one per epoch. ``train_loss`` is
        the mean hinge loss per trained pair (0.0 if there were no pairs).

    Side effects:
        Prints one progress line per epoch and writes the final state dict to
        ``svm_model_test2_in{input_size}_lr{learning_rate}_ep{num_epochs}.pth``
        (relative path).
    """
    model = KernelSVM(input_size=input_size, output_size=1)
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)

    epoch_losses = []

    for epoch in range(num_epochs):
        # Validation may leave the model in eval mode, so training mode has
        # to be re-entered at the start of every epoch, not just once.
        model.train()

        # Draw this epoch's sample order exactly once. Iterating a shuffling
        # loader in a nested loop would reshuffle on every inner pass, so the
        # index test below would no longer identify "the same sample".
        # This keeps one reference per batch in memory for the epoch.
        batches = list(train_loader)

        epoch_loss = 0.0
        num_pairs = 0

        for i, (x1, y1) in enumerate(batches):
            for j, (x2, y2) in enumerate(batches):
                if i == j:
                    # Skip pairing a sample with itself.
                    continue

                optimizer.zero_grad()

                output = model(x1.squeeze(0), x2.squeeze(0))
                target = 2 * (y1 == y2).float() - 1  # +1 for same class, -1 for different class

                loss = hinge_loss(output, target)
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()
                num_pairs += 1

        # Average over the pairs actually trained on (not over the number of
        # samples), and do not divide by zero when there are fewer than two.
        train_loss = epoch_loss / num_pairs if num_pairs else 0.0

        val_loss = validate_model(model, val_loader)
        epoch_losses.append((train_loss, val_loss))

        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

    torch.save(model.state_dict(), f'svm_model_test2_in{input_size}_lr{learning_rate}_ep{num_epochs}.pth')
    return model, epoch_losses

In [6]:
# Validation function
def validate_model(model, val_loader):
    """Return the mean hinge loss over all ordered pairs of distinct samples.

    Each pair ``(x1, x2)`` gets target +1 when both labels match and -1
    otherwise. The model is evaluated under ``torch.no_grad()`` in eval mode;
    its previous training/eval mode is restored before returning.

    Args:
        model: Module called as ``model(x1, x2)``.
        val_loader: Iterable of ``(x, y)`` batches with a leading batch
            dimension of size 1 on ``x`` (it is removed with ``squeeze(0)``).

    Returns:
        Mean loss per evaluated pair as a float; 0.0 when the loader yields
        fewer than two samples (no pairs exist).
    """
    was_training = model.training
    model.eval()

    total_loss = 0.0
    num_pairs = 0

    try:
        # Materialise once so pairs are identified by position. Comparing
        # tensors with torch.equal would also drop distinct samples that
        # happen to have identical features.
        batches = list(val_loader)

        with torch.no_grad():
            for i, (x1, y1) in enumerate(batches):
                for j, (x2, y2) in enumerate(batches):
                    if i == j:
                        continue

                    output = model(x1.squeeze(0), x2.squeeze(0))
                    target = 2 * (y1 == y2).float() - 1

                    total_loss += hinge_loss(output, target).item()
                    num_pairs += 1
    finally:
        # Do not leak eval mode into a caller that was training.
        model.train(was_training)

    # Normalise by the number of pairs summed, guarding the empty case.
    return total_loss / num_pairs if num_pairs else 0.0

In [7]:
# Testing function
def test_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    
    with torch.no_grad():
        for x1, y1 in test_loader:
            for x2, y2 in test_loader:
                if torch.equal(x1, x2):
                    continue
                
                output = model(x1.squeeze(0), x2.squeeze(0))
                predicted = (output.item() > 0).float()  # Binary classification threshold
                
                target = 2 * (y1 == y2).float() - 1
                correct += (predicted == (target > 0)).sum().item()
                total += 1
    
    print(f'Accuracy: {correct / total * 100:.2f}%')

In [8]:
# Plot loss
def plot_loss(epoch_losses):
    """Plot training and validation loss against the epoch number.

    Args:
        epoch_losses: Sequence of ``(train_loss, val_loss)`` pairs, one per
            epoch, in epoch order.
    """
    train_losses, val_losses = zip(*epoch_losses)

    # Both series come out of the same zip, so they share one x axis (1-based).
    epochs = range(1, len(train_losses) + 1)
    curves = (
        (train_losses, 'Train Loss', 'o'),
        (val_losses, 'Val Loss', 'x'),
    )
    for series, curve_label, curve_marker in curves:
        plt.plot(epochs, series, label=curve_label, marker=curve_marker)

    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss Over Epochs')
    plt.legend()
    plt.grid()
    plt.show()

In [9]:
# Script-style entry point, kept commented out: it runs the same steps
# (load -> prepare -> train -> plot -> test) that the cells below run one
# at a time.
# # Main Execution
# if __name__ == "__main__":
#     DFs = import_data_files(["l1calo_hist_ZMUMU_extended.root", "l1calo_hist_EGZ_extended.root"])
#     train_loader, val_loader, test_loader = prepare_data(DFs)
    
#     model, epoch_losses = train_model(train_loader, val_loader)
#     plot_loss(epoch_losses)
    
#     test_model(model, test_loader)

# Load both ROOT files. Order matters: prepare_data() labels DFs[0] as
# class 0 and DFs[1] as class 1.
DFs = import_data_files(["l1calo_hist_ZMUMU_extended.root", "l1calo_hist_EGZ_extended.root"])
print("Data Files Imported")

Data Files Imported


In [10]:
# Build the three loaders with prepare_data()'s default split
# (train_ratio=0.7, val_ratio=0.15, remainder used for testing).
train_loader, val_loader, test_loader = prepare_data(DFs)
print("Data Prepared")

Data loaded from files
Converting data to numpy array...
Converting data to torch tensors...
Generating labels...
Concatenating datasets...
Shuffling data...
Splitting data...
Data Prepared


In [12]:
# Train with train_model()'s defaults (input_size=9801, learning_rate=0.01,
# num_epochs=10).
# NOTE: train_model() loops over every ordered pair of training samples in
# each epoch, so the run time grows quadratically with the training-set size.
# `model` and `epoch_losses` are only bound if this cell runs to completion.
model, epoch_losses = train_model(train_loader, val_loader)
print("Model Trained")

KeyboardInterrupt: 

In [None]:
# Needs `epoch_losses` from a completed train_model() call in the cell above.
plot_loss(epoch_losses)
print("Loss Obtained")

In [None]:
# Needs `model` from a completed train_model() call and `test_loader` from
# prepare_data().
test_model(model, test_loader)
print("Model Tested")