# Data Loading and Feature Extraction

### library imports

In [None]:
import os
import numpy as np
import nibabel as nib
from nilearn import image, input_data
from nilearn.datasets import fetch_atlas_aal
from sklearn.model_selection import train_test_split
import pandas as pd
from scipy.stats import skew, kurtosis
from skimage.feature import graycomatrix, graycoprops
import torch
import torch.nn as nn
import pennylane as qml
import torch.optim as optim
from sklearn.metrics import accuracy_score
import json
import logging


### Adding paths

In [None]:
mri_pet_data_path = r"C:\Users\ishsh\OneDrive\Desktop\QML_Project\data\processed\mri_pet\derivatives"
mri_data_path = r"C:\Users\ishsh\OneDrive\Desktop\QML_Project\data\processed\mri\derivatives\cleaned_skullstrip"

### Configure Logging

In [None]:
logging.basicConfig(filename='processing_log.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

### Fetch the AAL Atlas

In [None]:
aal_atlas = fetch_atlas_aal()
atlas_filename = aal_atlas.maps

### Function to extract ROI-based features


### Light imports

In [None]:
import numpy as np
import nibabel as nib
from nilearn import input_data
from scipy.stats import skew, kurtosis # Add this import statement
from skimage.feature import graycomatrix, graycoprops

## MRI

In [None]:

def extract_mri_features(mri_path, atlas_filename, surviving_labels=None):
    try:
        img = nib.load(mri_path)
        masker = input_data.NiftiLabelsMasker(labels_img=atlas_filename, standardize=False)
        roi_data = masker.fit_transform(img)

        all_roi_features = []
        if surviving_labels is None:
            surviving_labels = range(roi_data.shape[1])

        for i, roi_values in enumerate(roi_data.T):
            if i in surviving_labels:
                mean_val = np.mean(roi_values)
                std_val = np.std(roi_values)
                skew_val = np.skew(roi_values)
                kurt_val = np.kurtosis(roi_values)

                if roi_values.max() == roi_values.min():
                    roi_features = [mean_val, std_val, skew_val, kurt_val, 0, 0, 0, 0]
                else:
                    roi_values_int = (roi_values - roi_values.min()) / (roi_values.max() - roi_values.min()) * 255
                    roi_values_int = roi_values_int.astype(np.uint8)

                    size = int(np.sqrt(len(roi_values_int)))
                    if size * size != len(roi_values_int):
                        size += 1
                    reshaped_roi_values = np.pad(roi_values_int, (0, size * size - len(roi_values_int)), 'constant').reshape(size, size)

                    glcm = graycomatrix(reshaped_roi_values, distances=[1], angles=[0, np.pi/4, np.pi/2, 3*np.pi/4], levels=256, symmetric=True, normed=True)
                    contrast = graycoprops(glcm, 'contrast').mean()
                    correlation = graycoprops(glcm, 'correlation').mean()
                    energy = graycoprops(glcm, 'energy').mean()
                    homogeneity = graycoprops(glcm, 'homogeneity').mean()

                    roi_features = [mean_val, std_val, skew_val, kurt_val, contrast, correlation, energy, homogeneity]
                all_roi_features.extend(roi_features)

        logging.info(f"MRI features extracted from {mri_path}")
        return np.array(all_roi_features), surviving_labels

    except Exception as e:
        logging.error(f"Error processing MRI file {mri_path}: {e}")
        return None, None

## PET

In [None]:

def extract_pet_features(pet_path, atlas_filename, surviving_labels=None):
    try:
        img = nib.load(pet_path)
        masker = input_data.NiftiLabelsMasker(labels_img=atlas_filename, standardize=False)
        roi_data = masker.fit_transform(img)

        all_roi_features = []
        if surviving_labels is None:
            surviving_labels = range(roi_data.shape[1])

        for i, roi_values in enumerate(roi_data.T):
            if i in surviving_labels:
                mean_val = np.mean(roi_values)
                std_val = np.std(roi_values)
                skew_val = np.skew(roi_values)
                kurt_val = np.kurtosis(roi_values)

                if roi_values.max() == roi_values.min():
                    roi_features = [mean_val, std_val, skew_val, kurt_val, 0, 0, 0, 0]
                else:
                    roi_values_int = (roi_values - roi_values.min()) / (roi_values.max() - roi_values.min()) * 255
                    roi_values_int = roi_values_int.astype(np.uint8)

                    size = int(np.sqrt(len(roi_values_int)))
                    if size * size != len(roi_values_int):
                        size += 1
                    reshaped_roi_values = np.pad(roi_values_int, (0, size * size - len(roi_values_int)), 'constant').reshape(size, size)

                    glcm = graycomatrix(reshaped_roi_values, distances=[1], angles=[0, np.pi/4, np.pi/2, 3*np.pi/4], levels=256, symmetric=True, normed=True)
                    contrast = graycoprops(glcm, 'contrast').mean()
                    correlation = graycoprops(glcm, 'correlation').mean()
                    energy = graycoprops(glcm, 'energy').mean()
                    homogeneity = graycoprops(glcm, 'homogeneity').mean()

                    roi_features = [mean_val, std_val, skew_val, kurt_val, contrast, correlation, energy, homogeneity]
                all_roi_features.extend(roi_features)

        logging.info(f"PET features extracted from {pet_path}")
        return np.array(all_roi_features), surviving_labels

    except Exception as e:
        logging.error(f"Error processing PET file {pet_path}: {e}")
        return None, None

### Find the reference MRI image for MRI-only data


In [None]:
reference_mri_only_path = None
max_rois_mri_only = 0

for subject_folder in os.listdir(mri_data_path):
    mri_path = os.path.join(mri_data_path, subject_folder, "anat", f"{subject_folder}_T1w_skullstripped.nii.gz")
    img = nib.load(mri_path)
    masker = input_data.NiftiLabelsMasker(labels_img=atlas_filename, standardize=False)
    roi_data = masker.fit_transform(img)
    num_rois = roi_data.shape[1]

    if num_rois > max_rois_mri_only:
        max_rois_mri_only = num_rois
        reference_mri_only_path = mri_path

_, surviving_mri_only_labels = extract_mri_features(reference_mri_only_path, atlas_filename)

for subject_folder in os.listdir(os.path.join(mri_pet_data_path, "cleaned_skullstrip")):
    mri_path = os.path.join(mri_pet_data_path, "cleaned_skullstrip", subject_folder, "anat", f"{subject_folder}_T1w_skullstripped.nii.gz")
    pet_path = os.path.join(mri_pet_data_path, "preprocessed_pet", subject_folder, "pet", f"{subject_folder}_space-MNI152NLin2009aPET.nii.gz")

    mri_features, _ = extract_mri_features(mri_path, atlas_filename, surviving_mri_pet_labels)
    pet_features, _ = extract_pet_features(pet_path, atlas_filename, surviving_mri_pet_labels)

    if mri_features is not None and pet_features is not None:
        combined_features = np.concatenate((mri_features, pet_features))
        mri_pet_features.append(combined_features)
        mri_pet_labels.append(1 if "patient" in subject_folder else 0)

mri_pet_features = np.array(mri_pet_features)
mri_pet_labels = np.array(mri_pet_labels)


### Extract MRI-PET features

In [None]:
mri_pet_features = []
mri_pet_labels = []
surviving_mri_pet_labels = None

for subject_folder in os.listdir(os.path.join(mri_pet_data_path, "cleaned_skullstrip")):
    mri_path = os.path.join(mri_pet_data_path, "cleaned_skullstrip", subject_folder, "anat", f"{subject_folder}_T1w_skullstripped.nii.gz")
    pet_path = os.path.join(mri_pet_data_path, "preprocessed_pet", subject_folder, "pet", f"{subject_folder}_space-MNI152NLin2009aPET.nii.gz")

    mri_features, surviving_mri_pet_labels = extract_mri_features(mri_path, atlas_filename, surviving_mri_pet_labels)
    pet_features, surviving_mri_pet_labels = extract_pet_features(pet_path, atlas_filename, surviving_mri_pet_labels)

    combined_features = np.concatenate((mri_features, pet_features))
    mri_pet_features.append(combined_features)
    mri_pet_labels.append(1 if "patient" in subject_folder else 0)

mri_pet_features = np.array(mri_pet_features)
mri_pet_labels = np.array(mri_pet_labels)

### Extract MRI-only features and corrected labels

In [None]:
mri_only_features = []
mri_only_labels = []
surviving_mri_only_labels = None

for subject_folder in os.listdir(mri_data_path):
    mri_path = os.path.join(mri_data_path, subject_folder, "anat", f"{subject_folder}_T1w_skullstripped.nii.gz")
    mri_features, surviving_mri_only_labels = extract_mri_features(mri_path, atlas_filename, surviving_mri_only_labels)
    mri_only_features.append(mri_features)
    subject_id = int(subject_folder.replace("sub-", ""))
    mri_only_labels.append(0 if subject_id > 4000 else 1)

mri_only_features = np.array(mri_only_features)
mri_only_labels = np.array(mri_only_labels)

## Saving intermediates

In [None]:
import numpy as np
import pandas as pd

# ... (Feature extraction code) ...

# Save MRI-PET features and labels
np.save("mri_pet_features.npy", mri_pet_features)
np.save("mri_pet_labels.npy", mri_pet_labels)

# Save MRI-only features and labels
np.save("mri_only_features.npy", mri_only_features)
np.save("mri_only_labels.npy", mri_only_labels)

#Save the training and testing sets
np.save("X_train_mri_pet.npy", X_train_mri_pet)
np.save("X_test_mri_pet.npy", X_test_mri_pet)
np.save("y_train_mri_pet.npy", y_train_mri_pet)
np.save("y_test_mri_pet.npy", y_test_mri_pet)

#Or using pandas
df_mri_pet = pd.DataFrame(np.concatenate((mri_pet_features, mri_pet_labels.reshape(-1,1)), axis = 1))
df_mri_pet.to_csv("mri_pet_data.csv", index=False)

# Quantum Feature Encoding

In [None]:
from sklearn.decomposition import PCA
import numpy as np
import torch

# Function for PCA and angle encoding
def quantum_encode(features, n_qubits):
    pca = PCA(n_components=n_qubits)
    reduced_features = pca.fit_transform(features)
    normalized_features = reduced_features / np.linalg.norm(reduced_features, axis=1, keepdims=True)

    angles = np.arcsin(normalized_features)
    return angles

# Model Architecture

### Classical MRI and PET Encoders (Simple Neural Networks)

In [None]:
class ClassicalEncoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(ClassicalEncoder, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size // 2)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

### Quantum Variational Circuit (QVC)

In [None]:
def qvc(inputs, weights):
    n_qubits = len(inputs)
    for i in range(n_qubits):
        qml.RX(inputs[i], wires=i)
    for i in range(n_qubits - 1):
        qml.CNOT(wires=[i, i + 1])
    for i in range(n_qubits):
        qml.Rot(weights[i, 0], weights[i, 1], weights[i, 2], wires=i)
    return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]


### Hybrid Model

In [None]:
class HybridModel(nn.Module):
    def __init__(self, mri_input_size, pet_input_size, hidden_size, n_qubits):
        super(HybridModel, self).__init__()
        self.mri_encoder = ClassicalEncoder(mri_input_size, hidden_size)
        self.pet_encoder = ClassicalEncoder(pet_input_size, hidden_size)
        self.dev = qml.device('default.qubit', wires=n_qubits)
        self.qnode = qml.QNode(qvc, self.dev)
        self.weights = torch.nn.Parameter(torch.rand(n_qubits, 3))
        self.fc_final = nn.Linear(n_qubits, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, mri_inputs, pet_inputs):
        mri_encoded = self.mri_encoder(mri_inputs)
        pet_encoded = self.pet_encoder(pet_inputs)
        combined = torch.cat((mri_encoded, pet_encoded), dim=1)
        quantum_inputs = quantum_encode(combined.detach().numpy(), len(self.weights))
        quantum_inputs = torch.tensor(quantum_inputs, dtype=torch.float32)
        quantum_output = torch.tensor(self.qnode(quantum_inputs), dtype=torch.float32)
        output = self.fc_final(quantum_output)
        output = self.sigmoid(output)
        return output

# Model Training

### light imports

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn

### Hyperparameters

In [None]:
mri_input_size = mri_only_features.shape[1]
pet_input_size = mri_pet_features.shape[1] // 2
hidden_size = 64
n_qubits = 10
learning_rate = 0.001
epochs = 50

# Saving Hyperparameters

In [None]:
import json

# ... (Hyperparameter definitions) ...

hyperparameters = {
    "mri_input_size": mri_input_size,
    "pet_input_size": pet_input_size,
    "hidden_size": hidden_size,
    "n_qubits": n_qubits,
    "learning_rate": learning_rate,
    "epochs": epochs,
    # Add other hyperparameters
}

with open("hyperparameters.json", "w") as f:
    json.dump(hyperparameters, f, indent=4)

### Initialize Model, Loss Function, and Optimizer


In [None]:
model = HybridModel(mri_input_size, pet_input_size, hidden_size, n_qubits)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

## Pretrain MRI Encoder (using MRI-only data)


In [None]:
mri_encoder_optimizer = optim.Adam(model.mri_encoder.parameters(), lr=learning_rate)
for epoch in range(epochs):
    model.mri_encoder.train()
    optimizer.zero_grad()
    mri_inputs = torch.tensor(mri_only_features, dtype=torch.float32)
    mri_outputs = model.mri_encoder(mri_inputs)
    loss = criterion(mri_outputs.mean(axis=1).unsqueeze(1), torch.tensor(mri_only_labels, dtype=torch.float32).unsqueeze(1))
    loss.backward()
    mri_encoder_optimizer.step()
    print(f"Pretrain MRI Encoder Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")


## Fusion Training (MRI-PET data)


In [None]:
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    mri_inputs = torch.tensor(X_train_mri_pet[:, :pet_input_size], dtype=torch.float32)
    pet_inputs = torch.tensor(X_train_mri_pet[:, pet_input_size:], dtype=torch.float32)
    outputs = model(mri_inputs, pet_inputs)
    loss = criterion(outputs, torch.tensor(y_train_mri_pet, dtype=torch.float32).unsqueeze(1))
    loss.backward()
    optimizer.step()
    print(f"Fusion Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")

## Saving Training Progress

In [None]:
# ... (Model training code) ...

train_losses = []
#Pretrain MRI Encoder
for epoch in range(epochs):
    # ... (training code) ...
    train_losses.append(loss.item())
#Fusion Training
for epoch in range(epochs):
    # ... (training code) ...
    train_losses.append(loss.item())

np.save("train_losses.npy", np.array(train_losses))

## Saving outputs

In [None]:
import torch

# ... (Model training code) ...

# Save the trained models
torch.save(model.mri_encoder.state_dict(), "mri_encoder.pth")
torch.save(model.state_dict(), "hybrid_model.pth")

# Model evaluation

### light imports

In [None]:
import torch
from sklearn.metrics import accuracy_score


### Evaluation


In [None]:
model.eval()
with torch.no_grad():
    mri_inputs_test = torch.tensor(X_test_mri_pet[:, :pet_input_size], dtype=torch.float32)
    pet_inputs_test = torch.tensor(X_test_mri_pet[:, pet_input_size:], dtype=torch.float32)
    test_outputs = model(mri_inputs_test, pet_inputs_test)
    test_predictions = (test_outputs > 0.5).float()
    accuracy = accuracy_score(y_test_mri_pet, test_predictions.numpy())
    print(f"Test Accuracy: {accuracy}")

## Saving Evaluation report

In [None]:
from sklearn.metrics import accuracy_score

# ... (Model evaluation code) ...

# Save evaluation report
evaluation_report = {
    "test_accuracy": accuracy,
    # Add other metrics as needed
}

with open("evaluation_report.txt", "w") as f:
    for key, value in evaluation_report.items():
        f.write(f"{key}: {value}\n")