<a href="https://colab.research.google.com/github/Christian-Stefan/Team-Internship/blob/Chris/ContextModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Fusion Model

## Context Branch

In [None]:
class ContextBranch(nn.Module):
    """3D convolutional encoder for the context volume.

    Takes a single-channel 5D tensor [B, 1, D, H, W] and returns one
    32-dimensional feature vector per sample, i.e. a [B, 32] tensor.
    """

    def __init__(self):
        # nn.Module.__init__ has to run before any layer is assigned, otherwise
        # the submodules (and their parameters) cannot be registered.
        super().__init__()

        downsampling_stage = [
            nn.Conv3d(1, 16, kernel_size=3, padding=1),
            nn.BatchNorm3d(16),
            nn.ReLU(),
            nn.MaxPool3d(2),  # e.g. (7,256,256) → (3,128,128)
        ]
        pooling_stage = [
            nn.Conv3d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm3d(32),
            nn.ReLU(),
            nn.AdaptiveAvgPool3d((1, 1, 1)),  # collapses to [B, 32, 1, 1, 1]
        ]
        self.conv = nn.Sequential(*downsampling_stage, *pooling_stage)

    def forward(self, x):
        """Encode `x` and drop the singleton spatial axes -> [B, 32]."""
        features = self.conv(x)
        batch_size = features.size(0)
        return features.view(batch_size, -1)

## Radiomics Branch

In [None]:
class RadiomicsBranch(nn.Module):
    """Two-layer MLP that embeds a radiomics feature vector into 32 dimensions.

    Parameters:
        - input_dim: Number of radiomics features per sample.
    """

    def __init__(self, input_dim):
        super().__init__()
        hidden_layer = nn.Linear(input_dim, 64)
        activation = nn.ReLU()
        output_layer = nn.Linear(64, 32)
        self.fc = nn.Sequential(hidden_layer, activation, output_layer)

    def forward(self, x):
        """Map [B, input_dim] features to a [B, 32] embedding."""
        embedding = self.fc(x)
        return embedding

## Local Branch

In [None]:
class LocalBranch(nn.Module):
    """3D convolutional encoder for the local volume.

    Consumes a single-channel 5D tensor [B, 1, D, H, W] and produces a
    [B, 32] feature matrix.
    """

    def __init__(self):
        super().__init__()
        layers = []
        # Stage 1: 1 -> 16 channels, then halve every spatial axis.
        layers.append(nn.Conv3d(1, 16, kernel_size=3, padding=1))
        layers.append(nn.BatchNorm3d(16))
        layers.append(nn.ReLU())
        layers.append(nn.MaxPool3d(2))  # e.g. (5,64,64) → (2,32,32)
        # Stage 2: 16 -> 32 channels, then global average pooling.
        layers.append(nn.Conv3d(16, 32, kernel_size=3, padding=1))
        layers.append(nn.BatchNorm3d(32))
        layers.append(nn.ReLU())
        layers.append(nn.AdaptiveAvgPool3d((1, 1, 1)))  # -> [B, 32, 1, 1, 1]
        self.conv = nn.Sequential(*layers)

    def forward(self, x):
        """Run the encoder and flatten the pooled output to [B, 32]."""
        pooled = self.conv(x)
        return pooled.view(pooled.shape[0], -1)

# Model Header

In [None]:
class TripleFusionModel(nn.Module):
    """Late-fusion classifier over local, context and radiomics features.

    Each input is encoded by its own branch (LocalBranch, ContextBranch,
    RadiomicsBranch); the three feature vectors are concatenated and passed
    through a small MLP head that outputs `num_classes` logits.

    Parameters:
        - num_classes: Number of output classes (size of the final layer).
        - radiomics_dim: Number of radiomics features per sample. Default is 25.
    """

    def __init__(self, num_classes, radiomics_dim=25):
        # Must run before any submodule is assigned: without it nn.Module
        # raises "cannot assign module before Module.__init__() call".
        super().__init__()

        self.local_branch = LocalBranch()
        self.context_branch = ContextBranch()
        self.radiomics_branch = RadiomicsBranch(radiomics_dim)

        self.classifier = nn.Sequential(
            nn.Linear(32 + 32 + 32, 64),  # fuse outputs (32 features per branch)
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, num_classes)
        )

    def forward(self, volume_local, volume_context, radiomics):
        """
        Encode the three inputs, fuse them and classify.

        Parameters:
            - volume_local: Input for the local branch.
            - volume_context: Input for the context branch.
            - radiomics: Input for the radiomics branch.

        Returns:
            - out: Classifier output with `num_classes` values per sample.
        """
        local_feat = self.local_branch(volume_local)
        context_feat = self.context_branch(volume_context)
        radio_feat = self.radiomics_branch(radiomics)

        # Concatenate along the feature axis; the batch axis (dim 0) must match.
        fused = torch.cat([local_feat, context_feat, radio_feat], dim=1)
        out = self.classifier(fused)
        return out

# Model Selection

## Particle Swarm Optimization (PSO) - Hyperparameter Optimization Initialization & Explanation

In Particle Swarm Optimization (PSO), **particles** represent candidate solutions—for example, a randomly initialized set of hyperparameters for a machine learning model.

- Each particle moves within a **D-dimensional bounded continuous search space**.
- Particles collaborate to discover the optimal solution (i.e., the hyperparameter combination that minimizes a loss function).

Each particle $i$ is defined by three key vectors at each iteration $t$:

- **Position**: $\mathbf{x}_i(t) \in \mathbb{R}^D$ — current location of particle $i$
- **Velocity**: $\mathbf{v}_i(t) \in \mathbb{R}^D$ — the movement direction and speed of particle $i$
- **Best Position**: $\mathbf{b}_i(t) \in \mathbb{R}^D$ — the best position found so far by particle $i$ based on the fitness (e.g., validation loss)

### Position Update Rule

The new position is computed by adding the current velocity to the current position:

$$
\mathbf{x}_i(t + 1) = \mathbf{x}_i(t) + \mathbf{v}_i(t)
$$

### Velocity Update Rule (Conceptual Overview)

The velocity is influenced by:
- **Cognitive component**: Particle's own best-known position
- **Social component**: Best-known position among all particles (global best)

The combination of these two attractions causes particles to explore and exploit the search space, ideally converging on an optimal or near-optimal solution over iterations.


In [30]:
import random as rd

# g(t): best position visited by any particle up to iteration t.
best_position_container: list = []

# 1. Candidate values used for the random initialization of particles, i.e.
#    hyperparameter subsets (learning rate, optimizer, loss function).
low_lr: float = 0.01  # Lower bound of the learning-rate range
up_lr: float = 0.1  # Upper bound of the learning-rate range - initially

# Available optimizers in torch, referenced by name.
optimizers: list = [
    "Adadelta", "Adafactor", "Adagrad", "Adam", "AdamW",
    "SparseAdam", "Adamax", "ASGD", "LBFGS", "NAdam",
    "RAdam", "RMSprop", "Rprop", "SGD",
]

# Candidate loss functions, referenced by name.
loss_functions: list = [
    "L1Loss", "MSELoss", "CrossEntropyLoss", "CTCLoss",
    "NLLLoss", "PoissonNLLLoss", "GaussianNLLLoss", "KLDivLoss",
    "BCELoss", "BCEWithLogitsLoss", "MarginRankingLoss",
    "HingeEmbeddingLoss", "MultiLabelMarginLoss", "HuberLoss",
    "SmoothL1Loss", "SoftMarginLoss", "MultiLabelSoftMarginLoss",
    "CosineEmbeddingLoss", "MultiMarginLoss", "TripletMarginLoss",
    "TripletMarginWithDistanceLoss",
]

# Discrete search grid, one entry per hyperparameter. The grid holds the full
# candidate lists; a single random draw per entry would instead be
#   rd.uniform(low_lr, up_lr)
#   optimizers[rd.randint(0, len(optimizers)-1)]
#   loss_functions[rd.randint(0, len(loss_functions)-1)]
hyperparameters_grid: dict = {}
hyperparameters_grid['Learning_rate'] = [0.1, 0.01, 0.005, 0.0010, 0.00020]
hyperparameters_grid['Optimizer'] = optimizers
hyperparameters_grid['Loss Function'] = loss_functions

### Particle initialization/estimator - Method

In [37]:
def _init_Populaton():
    """
    Build the hyperparameter search grid.

    Returns:
        - A dict mapping each hyperparameter name to its list of candidate
          values. The optimizer and loss-function entries are the module-level
          `optimizers` and `loss_functions` lists themselves (not copies).
    """
    learning_rates = [0.1, 0.01, 0.005, 0.0010, 0.00020]
    return {
        'Learning_rate': learning_rates,
        'Optimizer': optimizers,
        'Loss Function': loss_functions,
    }

# Usage example - Initialize a population of 10 particles/individuals:
# for x in range(10):
#     particle = _init_Populaton()
#     print(particle)

### Evaluate fitness - Method

In [None]:
def evaluate_fitness(self,
                     X_train,
                     X_test,
                     y_train,
                     y_test,
                     hyperparameters):
    """
    Score one set of hyperparameters by fitting and testing an estimator.

    The estimator is obtained from `self._create_estimator(hyperparameters)`,
    fitted on the training split and scored on the test split.

    Parameters:
        - X_train: Training features.
        - X_test: Testing features.
        - y_train: Training labels.
        - y_test: Testing labels.
        - hyperparameters: The set of hyperparameters to evaluate.

    Returns:
        - The accuracy score of the fitted estimator on the test split.
    """
    # Build a fresh estimator configured with the candidate hyperparameters.
    model = self._create_estimator(hyperparameters)
    model.fit(X_train, y_train)

    predictions = model.predict(X_test)
    return accuracy_score(y_test, predictions)

### Create the searching space - Method

In [46]:
def pso_hyperparameter_optimization(self,
                                    X_train,
                                    X_test,
                                    y_train,
                                    y_test,
                                    num_particles,
                                    num_iterations,
                                    c1 = 2.05,
                                    c2 = 2.05,
                                    num_jobs=-1,
                                    w=0.72984):
    """
    Perform hyperparameter optimization using Particle Swarm Optimization (PSO).

    Particles are drawn at random from the grid returned by
    `self._init_Populaton()`, scored in parallel with `self.evaluate_fitness`,
    and the best-scoring particle seen so far is tracked and returned.

    NOTE: the velocity/position update step is not implemented yet, so the
    same population is re-evaluated on every iteration and `c1`, `c2` and `w`
    are currently unused.

    Parameters:
        - X_train: Training features.
        - X_test: Testing features.
        - y_train: Training labels.
        - y_test: Testing labels.
        - num_particles: Number of particles in the population.
        - num_iterations: Number of iterations for the PSO algorithm.
        - c1: Acceleration constant. Default value is c1 = 2.05
        - c2: Acceleration constant. Default value is c2 = 2.05
        - num_jobs: Number of parallel jobs for fitness evaluation.
        - w: Inertia constant. Default value is w=0.72984 according to the paper by M. Clerc and J. Kennedy

    Returns:
        - global_best_position: The best set of hyperparameters found
          (an empty list if no particle was evaluated).
        - global_best_fitness: The best fitness found (-inf if no particle
          was evaluated).
    """
    if self.random_seed is not None:
        np.random.seed(self.random_seed)

    # 1. Initialize the population of particles: one random draw per
    #    hyperparameter for each particle. The `num_particles` argument is used
    #    so the population always matches the velocity container below.
    hyperparameter_space = self._init_Populaton()
    population: list = [
        [np.random.choice(hyperparameter_space[param]) for param in hyperparameter_space]
        for _ in range(num_particles)
    ]

    # 2. Initialize velocity and best position
    # Velocity of every hyperparameter stays zero at the initialization phase.
    velocity = [[0] * len(hyperparameter_space) for _ in range(num_particles)]
    # Personal bests start at the initial positions; each particle gets its own
    # list so later in-place moves cannot alias the population.
    best_position = [list(particle) for particle in population]
    global_best_fitness = -float("inf")  # Best fitness reached by any particle
    global_best_position = []  # Position at which that fitness was reached

    # PSO optimization loop. The context manager closes the progress bar even
    # if a fitness evaluation raises.
    with tqdm(total=num_iterations, desc="PSO Progress") as progress_bar:
        for _ in range(num_iterations):
            fitness = Parallel(n_jobs=num_jobs)(
                delayed(self.evaluate_fitness)(X_train, X_test, y_train, y_test, particle)
                for particle in population
            )

            # Keep the best-scoring particle seen so far (higher is better).
            for particle, score in zip(population, fitness):
                if score > global_best_fitness:
                    global_best_fitness = score
                    global_best_position = list(particle)

            # TODO: update `velocity`, `population` and `best_position` using
            # w, c1 and c2 (velocity/position update rule).
            progress_bar.update(1)

    return global_best_position, global_best_fitness





# Dependencies

In [47]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
import numpy as np
from tqdm import tqdm
from joblib import Parallel, delayed
from sklearn.metrics import accuracy_score, classification_report, precision_score