In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
import os
from mpl_toolkits.mplot3d import Axes3D
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import plotly.io as pio
from sklearn.cluster import KMeans
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn 
import torch.nn.functional as F
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from torchsummary import summary

Project_Data = "C:\\Users\\yazee\\Desktop\\PhD-Studies\\Data"
Project_Figure = "C:\\Users\\yazee\\Desktop\\PhD-Studies\\Figure"

Project_Paths = [Project_Data, Project_Figure]
for path in Project_Paths:
    if not os.path.exists(path):
        print(f"Path {path} does not exists. Creating it ....")
        os.mkdir(path)

file_names = [f for f in os.listdir(Project_Data) if f.endswith(".txt")]
data_list = {}
for fName in file_names:
    key = fName.split('.')[0]
    data_list[key] = pd.read_csv(os.path.join(Project_Data,fName), delimiter = " ")

In [None]:
class ComptonNetV2(nn.Module):
    def __init__(self):
        super(ComptonNetV2, self).__init__()
        
        # MLP for both-reacted events (6 features)
        self.mlp_both = nn.Sequential(
            nn.Linear(6, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU()
        )
        
        # MLP for scatter-only events (3 features)
        self.mlp_scatter = nn.Sequential(
            nn.Linear(3, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU()
        )
        
        # MLP for absorber-only events (3 features)
        self.mlp_absorber = nn.Sequential(
            nn.Linear(3, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU()
        )
        
        # Convolutional layers for feature extraction and upsampling
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        self.conv3 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(32)
        self.conv5 = nn.Conv2d(32, 1, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(1)
        
        # Upsampling layers
        self.upsample1 = nn.Upsample(size=(16, 128), mode='bilinear', align_corners=True)
        self.upsample2 = nn.Upsample(size=(32, 256), mode='bilinear', align_corners=True)
        self.upsample3 = nn.Upsample(size=(64, 256), mode='bilinear', align_corners=True)
        self.upsample4 = nn.Upsample(size=(256, 256), mode='bilinear', align_corners=True)
        
        self.dropout = nn.Dropout(0.5)

    def forward(self, x_both, x_scatter, x_absorber):
        # Process both-reacted events
        features_both = self.mlp_both(x_both)
        features_both = torch.max(features_both, dim=0, keepdim=True)[0]
        
        # Process scatter-only events
        features_scatter = self.mlp_scatter(x_scatter)
        features_scatter = torch.max(features_scatter, dim=0, keepdim=True)[0]
        
        # Process absorber-only events
        features_absorber = self.mlp_absorber(x_absorber)
        features_absorber = torch.max(features_absorber, dim=0, keepdim=True)[0]
        
        # Concatenate features from all event types
        combined_features = torch.cat([features_both, features_scatter, features_absorber], dim=1)
        combined_features = combined_features.unsqueeze(1).unsqueeze(1)
        
        # Pass through the convolutional layers with upsampling
        x = F.relu(self.bn1(self.conv1(combined_features)))
        x = self.dropout(x)
        x = self.upsample1(x)
        
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.dropout(x)
        x = self.upsample2(x)
        
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.dropout(x)
        x = self.upsample3(x)
        
        x = F.relu(self.bn4(self.conv4(x)))
        x = self.dropout(x)
        x = self.upsample4(x)
        
        x = self.bn5(self.conv5(x))
        
        return x

def custom_summary(model):
    print(f"{'Layer':<25} {'Output Shape':<20} {'Param #':<10}")
    print("="*55)
    
    x_both = torch.zeros(1, 6)
    x_scatter = torch.zeros(1, 3)
    x_absorber = torch.zeros(1, 3)
    
    features_both = model.mlp_both(x_both)
    features_both = torch.max(features_both, dim=0, keepdim=True)[0]
    print(f"{'mlp_both':<25} {str(features_both.shape):<20} {sum(p.numel() for p in model.mlp_both.parameters()):<10}")
    
    features_scatter = model.mlp_scatter(x_scatter)
    features_scatter = torch.max(features_scatter, dim=0, keepdim=True)[0]
    print(f"{'mlp_scatter':<25} {str(features_scatter.shape):<20} {sum(p.numel() for p in model.mlp_scatter.parameters()):<10}")
    
    features_absorber = model.mlp_absorber(x_absorber)
    features_absorber = torch.max(features_absorber, dim=0, keepdim=True)[0]
    print(f"{'mlp_absorber':<25} {str(features_absorber.shape):<20} {sum(p.numel() for p in model.mlp_absorber.parameters()):<10}")
    
    combined_features = torch.cat([features_both, features_scatter, features_absorber], dim=1)
    combined_features = combined_features.unsqueeze(1).unsqueeze(1)
    print(f"{'Concatenated Features':<25} {str(combined_features.shape):<20} {'-':<10}")
    
    x = F.relu(model.bn1(model.conv1(combined_features)))
    print(f"{'conv1':<25} {str(x.shape):<20} {sum(p.numel() for p in model.conv1.parameters()):<10}")
    
    x = model.dropout(x)
    x = model.upsample1(x)
    print(f"{'upsample1':<25} {str(x.shape):<20} {'-':<10}")
    
    x = F.relu(model.bn2(model.conv2(x)))
    print(f"{'conv2':<25} {str(x.shape):<20} {sum(p.numel() for p in model.conv2.parameters()):<10}")
    
    x = model.dropout(x)
    x = model.upsample2(x)
    print(f"{'upsample2':<25} {str(x.shape):<20} {'-':<10}")
    
    x = F.relu(model.bn3(model.conv3(x)))
    print(f"{'conv3':<25} {str(x.shape):<20} {sum(p.numel() for p in model.conv3.parameters()):<10}")
    
    x = model.dropout(x)
    x = model.upsample3(x)
    print(f"{'upsample3':<25} {str(x.shape):<20} {'-':<10}")
    
    x = F.relu(model.bn4(model.conv4(x)))
    print(f"{'conv4':<25} {str(x.shape):<20} {sum(p.numel() for p in model.conv4.parameters()):<10}")
    
    x = model.dropout(x)
    x = model.upsample4(x)
    print(f"{'upsample4':<25} {str(x.shape):<20} {'-':<10}")
    
    x = model.bn5(model.conv5(x))
    print(f"{'conv5':<25} {str(x.shape):<20} {sum(p.numel() for p in model.conv5.parameters()):<10}")
      
def generate_ground_truth_images(pi_data, grid_size=256):
    ground_truth_images = []
    for _, pi_info in pi_data.iterrows():
        x_idx = int((pi_info['PI_Vx'] + grid_size / 2) * (grid_size / grid_size))
        y_idx = int((pi_info['PI_Vy'] + grid_size / 2) * (grid_size / grid_size))
        
        ground_truth_image = np.zeros((grid_size, grid_size))
        ground_truth_image[y_idx, x_idx] = 1
        
        ground_truth_images.append(ground_truth_image)
        
    return np.array(ground_truth_images)

In [None]:
model = ComptonNetV2()
custom_summary(model)

In [None]:
pi_data = data_list['PI_data']
incident_gamma_energy = 1.5 
grid_size = 256

processed_data_both = []
processed_data_scatter = []
processed_data_absorber = []
ground_truth_images = []

for event_id in data_list['SG_data']['SG_ID'].unique():
    sg_mask = data_list['SG_data']['SG_ID'] == event_id
    ag_mask = data_list['AG_data']['AG_ID'] == event_id
    ce_mask = data_list['CE_data']['CE_ID'] == event_id
    
    event_both = [0, 0, 0, 0, 0, 0]
    event_scatter = [0, 0, 0]
    event_absorber = [0, 0, 0]
    
    if sg_mask.any():
        sg_data = data_list['SG_data'][sg_mask].iloc[0]
        event_scatter = [sg_data['SG_Vz'], sg_data['SG_Vy'], sg_data['SG_energy']]
    
    if ag_mask.any():
        ag_data = data_list['AG_data'][ag_mask].iloc[0]
        if 'SG_Back' in data_list['AG_vol']['VolName'][ag_mask].values[0]:
            event_absorber = [ag_data['AG_Vz'], ag_data['AG_Vy'], sg_data['SG_energy']]

    if sg_mask.any() and ag_mask.any() and ce_mask.any():
        ag_layer_name = data_list['AG_vol']['VolName'][ag_mask].values[0]
        if 'SG_Back' in ag_layer_name:
            ce_data = data_list['CE_data'][ce_mask].iloc[0]
            event_both = [sg_data['SG_Vz'], sg_data['SG_Vy'], ce_data['CE_energy'], ag_data['AG_Vz'], ag_data['AG_Vy'], sg_data['SG_energy']]
    
    processed_data_both.append(event_both)
    processed_data_scatter.append(event_scatter)
    processed_data_absorber.append(event_absorber)

    
    pi_mask = pi_data['PI_ID'] == event_id
    pi_info = pi_data[pi_mask].iloc[0]
    x_idx = int((pi_info['PI_Vx'] + grid_size / 2) * (grid_size / grid_size))
    y_idx = int((pi_info['PI_Vy'] + grid_size / 2) * (grid_size / grid_size))

    ground_truth_image = np.zeros((grid_size, grid_size))
    ground_truth_image[y_idx, x_idx] = 1

    ground_truth_images.append(ground_truth_image)

columns_both = ['z_sca', 'y_sca', 'e_sca', 'z_abs', 'y_abs', 'e_abs']
columns_scatter = ['z_sca', 'y_sca', 'e_sca']
columns_absorber = ['z_abs', 'y_abs', 'e_abs']

processed_df_both = pd.DataFrame(processed_data_both, columns=columns_both)
processed_df_scatter = pd.DataFrame(processed_data_scatter, columns=columns_scatter)
processed_df_absorber = pd.DataFrame(processed_data_absorber, columns=columns_absorber)

input_data_both = torch.tensor(processed_df_both.values, dtype=torch.float32)
input_data_scatter = torch.tensor(processed_df_scatter.values, dtype=torch.float32)
input_data_absorber = torch.tensor(processed_df_absorber.values, dtype=torch.float32)

ground_truth_images_tensor = torch.tensor(np.array(ground_truth_images), dtype=torch.float32).unsqueeze(1)

assert input_data_both.size(0) == input_data_scatter.size(0) == input_data_absorber.size(0) == ground_truth_images_tensor.size(0)

dataset = TensorDataset(input_data_both, input_data_scatter, input_data_absorber, ground_truth_images_tensor)
data_loader = DataLoader(dataset, batch_size=32, shuffle=True)

In [None]:
input_data_both = torch.tensor(processed_df_both.values, dtype=torch.float32)
input_data_scatter = torch.tensor(processed_df_scatter.values, dtype=torch.float32)
input_data_absorber = torch.tensor(processed_df_absorber.values, dtype=torch.float32)

ground_truth_images_tensor = torch.tensor(np.array(ground_truth_images), dtype=torch.float32).unsqueeze(1)

dataset = TensorDataset(input_data_both, input_data_scatter, input_data_absorber, ground_truth_images_tensor)
data_loader = DataLoader(dataset, batch_size=32, shuffle=True)

model = ComptonNetV2()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
num_samples_to_visualize = 5
num_epochs = 50

for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (inputs_both, inputs_scatter, inputs_absorber, labels) in enumerate(data_loader):
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(inputs_both, inputs_scatter, inputs_absorber)
        
        if outputs.shape != labels.shape:
            print(f"Warning: Mismatch in output and label shapes: {outputs.shape} vs {labels.shape}")
            outputs = outputs.expand_as(labels)
        
        loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(data_loader)}")

    if epoch % 10 == 0:
        model.eval()
        with torch.no_grad():
            test_output = model(inputs_both[:num_samples_to_visualize], inputs_scatter[:num_samples_to_visualize], inputs_absorber[:num_samples_to_visualize])
            
            # Make sure the output has the correct shape
            if test_output.shape[1] != 1:
                test_output = test_output.view(-1, 1, 256, 256)
            
            test_output_np = test_output.squeeze(1).cpu().numpy()  # Shape should be (N, 256, 256)
            num_samples_in_batch = test_output_np.shape[0]  # Number of actual samples in this batch

            for j in range(num_samples_in_batch):  # Only loop over available samples
                plt.imshow(test_output_np[j], cmap='hot', interpolation='nearest')
                plt.title(f"Predicted Heatmap at Epoch {epoch}, Sample {j}")
                plt.colorbar()
                plt.show()
        model.train()

print("Training completed.")

model.eval()
with torch.no_grad():
    inputs_both, inputs_scatter, inputs_absorber, _ = next(iter(data_loader))
    output_images = model(inputs_both, inputs_scatter, inputs_absorber)
    
    for i in range(output_images.size(0)):
        output_image = output_images[i].squeeze().cpu().numpy()
        
        max_position = np.unravel_index(np.argmax(output_image, axis=None), output_image.shape)
        print(f"Predicted source position for sample {i}: {max_position}")