#Voxel Selection

##Written by Gergana Slaveykova s1070004
##Radboud University- B3 Thesis project

# Imports

In [None]:
#Mount Google Drive to acess data and storing purposes
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#Installs
!pip install nibabel
!pip install nilearn
!pip install git+https://github.com/nipy/nipy.git

#Imports
import nibabel as nib
import numpy as np
from nipy.modalities.fmri.experimental_paradigm import load_paradigm_from_csv_file
from nipy.modalities.fmri.design_matrix import make_dmtx
from nipy.labs.viz import plot_map, cm
import matplotlib.pyplot as plt
import pickle
from __future__ import annotations
import os
from types import ModuleType
from typing import Tuple, Union
from nilearn import masking, plotting
from PIL import Image
from scipy.stats import t, zscore
from sklearn.linear_model import RidgeCV
import h5py
import PIL.Image
from scipy import sparse, stats
from scipy.stats import t, zscore
from sklearn.kernel_ridge import KernelRidge
from numpy.linalg import svd
import time
from scipy import signal
from sklearn.model_selection import KFold

#Features loading

In [None]:
# Define the number of layers
num_layers = 5

#Set the feature paths
train_file_paths = [f'/content/drive/My Drive/newDataExperimentationGOD/objects_features_vgg_tr_{i}.npy' for i in range(0, num_layers + 1)]
test_file_paths = [f'/content/drive/My Drive/newDataExperimentationGOD/objects_features_vgg_te_{i}.npy' for i in range(0, num_layers + 1)]

#feature array initialization
train_features = []
test_features = []

# Load and process train features
for i in range(num_layers):
    print(np.load(train_file_paths[i]).shape)
    train_features.append(np.load(train_file_paths[i]).reshape(1200 if i < 5 else 1200, -1).squeeze())
    print(f"Layer {i + 1} - Train features: {train_features[i].shape}")

# Print separator
print(20 * "=")

# Load and process test features
for i in range(num_layers):
    print(np.load(test_file_paths[i]).shape)
    print(np.load(test_file_paths[i]).reshape(50 if i < 5 else 50, -1).shape)
    print(np.load(test_file_paths[i]).reshape(50 if i < 5 else 50, -1).squeeze().shape)
    test_features.append(np.load(test_file_paths[i]).reshape(50 if i < 5 else 50, -1).squeeze())
    print(f"Layer {i + 1} - Test features: {test_features[i].shape}")

#Linear model

In [None]:
#Arhitecture of Kernel CV Rigde regression
class KernelRidgeCV:
    def __init__(self, kernel, target, n_lambdas):
        self.kernel = kernel  # Precomputed kernel matrix
        self.target = target  # Target values for the regression
        self.n_lambdas = n_lambdas  # Number of lambda (regularization) values to consider
        self._lambdas = None  # Placeholder for lambda values
        self._df = None  # Placeholder for degrees of freedom

    @property
    def lambdas(self):
        if self._lambdas is not None:
            return self._lambdas

        # Singular Value Decomposition of the kernel matrix
        s = svd(self.kernel)[1]
        s = s[s > 0]

        self._lambdas = np.full((self.n_lambdas), np.nan)
        length = s.shape[0]
        self._df = np.linspace(length, 1, self.n_lambdas)
        mean = np.mean(1/s)

        # Function to find the difference between desired and actual degrees of freedom
        f = lambda df, lamb: df - np.sum(s / (s + lamb))
        f_prime = lambda lamb: np.sum(s / (s + lamb)**2)

        # get all the lambdas
        for i in range(1, self.n_lambdas):
            if i == 1:
                self._lambdas[i] = 0
            else:
                self._lambdas[i] = self._lambdas[i-1]
            self._lambdas[i] = max(self._lambdas[i], (length / self._df[i] - 1) / mean)
            temp = f(self._df[i], self._lambdas[i])
            # Use Newton-Raphson method to refine lambda values
            while abs(temp) > 1e-10:
                self._lambdas[i] = max(0, self._lambdas[i] - temp / f_prime(self._lambdas[i]))
                temp = f(self._df[i], self._lambdas[i])
        return self._lambdas[1:]


    #train loop
    def train(self, X):
        best_model, best_error = None, np.inf

        # Cross-validation over all lambda values
        for lambda_, df_ in zip(self.lambdas, self._df):
            # Initialize Kernel Ridge Regression model with the current lambda
            kernel_ridge = KernelRidge(alpha=lambda_)
            kernel_ridge.fit(X, self.target)
            y = kernel_ridge.predict(X)

            # Compute the error, avoiding division by zero
            print((1 - df_ / self.kernel.shape[0]))
            if  (1 - df_ / self.kernel.shape[0]) != 0:
              error = np.sum(((self.target - y) / (1 - df_ / self.kernel.shape[0])) ** 2)
            else:
            # Set the error to negative infinity to penalize this case
              error = np.
            #this version was having division by 0 which was giving a warning for our case
            #error = np.sum(((self.target - y) / (1 - df_ / self.kernel.shape[0])) ** 2)
            print(f"curr error: {error}")
            if error < best_error:
                best_error = error
                best_model = kernel_ridge
        print("Best error:", best_error, "Alpha: ", best_model.alpha)
        return best_model

In [None]:

def get_correlations_KERNEL(features_train_layer,features_test_layer,x_tr,x_te):
    """
    Computes the Pearson correlation coefficients between predicted and actual test target values using
    Kernel Ridge Regression with cross-validated lambda (regularization) values.

    Parameters:
    - features_train_layer (np.array): Training set feature representations. These are the features extracted from a particular layer of a model.
    - features_test_layer (np.array): Test set feature representations. These are the features extracted from a particular layer of a model.
    - x_tr (np.array): Target values corresponding to the training set.
    - x_te (np.array): Target values corresponding to the test set.

    Returns:
    - correlation_coefficients (list): List of Pearson correlation coefficients between predicted and actual test target values.
    - x_hat (np.array): Predicted target values for the test set.
    """
    n_te = np.array(x_te).shape[0]# nr examples, test set
    n_tr = np.array(x_tr).shape[0] # nr examples. training set


    # number of lambda values to test w/ grid search
    n = 10
    print(f"Shaape of features test {np.array(features_test_layer).shape}")
    print(f"Shaape of features test {np.array(features_train_layer).shape}")
    # Reshape feature matrices
    f_te = features_test_layer.reshape(n_te, -1)
    f_tr = features_train_layer.reshape(n_tr, -1)

    # k-Ridge @ multiplication brain2gan methods neuroencoding
    # Compute the kernel matrix
    kernel = f_tr @ f_tr.T
    kernel = kernel.astype(float)
    # Initialize Kernel Ridge Regression with cross-validation
    ridge_cv = KernelRidgeCV(kernel, x_tr, n)
    model = ridge_cv.train(f_tr)

    # Predict target values for the test set
    x_hat = model.predict(f_te)
    print("Alpha:", model.alpha)

    y_test = x_te
    # Calculate the Pearson correlation coefficient for the entire response vectors
    correlation_coefficients =pearson_correlation_coefficient(x_hat,y_test,0)
    print(len(correlation_coefficients[0]))
    return correlation_coefficients,x_hat

In [None]:
def pearson_correlation_coefficient(x: np.ndarray, y: np.ndarray, axis: int) -> np.ndarray:
  """
    Calculates the Pearson correlation coefficient and the corresponding p-values between two arrays along the specified axis.

    Parameters:
    - x (np.ndarray): predicted data
    - y (np.ndarray): original data

    Returns:
    - r (np.ndarray): Pearson correlation coefficients.
    - p (np.ndarray): p-values for testing non-correlation.
    """
    # Standardize x and y using z-score normalization
    r = (np.nan_to_num(zscore(x)) * np.nan_to_num(zscore(y))).mean(axis)
    # Calculate p-values for the correlation coefficients
    p = 2 * t.sf(np.abs(r / np.sqrt((1 - r ** 2) / (x.shape[0] - 2))), x.shape[0] - 2)
    return r, p

In [None]:
def clip_and_z_score(train,test):
    """
    Applies clipping and z-score normalization to training and test data.

    Parameters:
    - train (np.ndarray): Training data.
    - test (np.ndarray): Test data.

    Returns:
    - x_tr_normalized (np.ndarray): Clipped and z-score normalized training data.
    - x_te_pt_normalized (np.ndarray): Clipped and z-score normalized test data.
    """
    # Clip the values of the training and test data to be within the range [-3, 3]
    x_tr = np.clip(train, -3, 3)
    x_te_pt = np.clip(test, -3, 3)

    # Z-score normalization
    # Calculate mean and standard deviation on training data
    norm_mean_x = np.mean(x_tr, axis=0)
    norm_std_x = np.std(x_tr, axis=0, ddof=1)
    # Avoid division by zero
    norm_std_x[norm_std_x == 0] = 1

    # Normalization
    x_tr_normalized = (x_tr - norm_mean_x) / norm_std_x
    x_te_pt_normalized = (x_te_pt - norm_mean_x) / norm_std_x

    # Z-scoring the whole dataset if needed
    #x_whole_normalized = (x - np.mean(x, axis=0)) / np.std(x, axis=0, ddof=1)

    return x_tr_normalized,x_te_pt_normalized

#Main logic

In [None]:
#regions of interest
regions=['V1','V2','V3','V4', 'LOC','FFA','PPA']

layers_final=[]
layers_final_full=[]
indices_all_areas=[]
important_indeces=[]
number_of_model=0
important_correlations_total=[]

for area in regions:
  # Load hyperaligned data for the given brain area
  x = np.load(f"/content/drive/MyDrive/GOD-NEW-3p/my_experiment_{area}_hyperaligned.npy")
  print(x.shape)
  # Split the data to get training set
  x_tr = x[:1200]
  correlation_coefficients_list = []
  y_pred_list = []
  # Placeholder for predictions
  y_heat = np.zeros_like(x_tr)
  # Placeholder for cumulative correlations
  corr_total= np.zeros_like(x_tr)


  # Iterate through the layers
  for i in range(len(train_features)):
      # Initialize K-Fold cross-validation with 5 splits
      kf = KFold(n_splits=5)
      # Perform K-Fold cross-validation
      for train_index, test_index in kf.split(x_tr):
        print(test_index)
        # Split the data into training and test sets for this fold
        x_train, x_test = x_tr[train_index], x_tr[test_index]
        x_train, x_test=clip_and_z_score( x_train, x_test)
        # Get the corresponding features for this layer
        trainfeatures, testfeatures = train_features[i][train_index], train_features[i][test_index]
        # Compute correlations using Kernel Ridge Regression
        correlation_coefficients, y_pred = get_correlations_KERNEL(trainfeatures, testfeatures, x_train , x_test)
        # Store predictions
        y_heat[test_index] = y_pred
        print(f"the MODEL is: {number_of_model}")
        number_of_model += 1

      # Store predictions for the current layer
      y_pred_list.append(y_heat)
      #Compute correlations
      correlation_coefficients_new =pearson_correlation_coefficient(y_heat,x_tr,0)
      correlation_coefficients_list.append(correlation_coefficients_new)



  correlation_coefficients=np.array(correlation_coefficients_list)
  print(correlation_coefficients.shape)
  y_pred_list=np.array(y_pred_list)

  # Define layers to study in detail
  layers_to_study_full = [1,2,3,4, 5]
  important_correlations_total.append(correlation_coefficients_list)
  print("I am added")




In [None]:
def filter_first_part(correlations_list, layers_to_study):
    """
    Filters out significant voxels based on correlation and p-values, and identifies the most significant layer for each voxel.

    Parameters:
    - correlations_list (list): A list of tuples containing correlation coefficients and p-values for each layer.
    - layers_to_study (list): A list of layers to study, where layer indices are 1-based.

    Returns:
    - layers_result (list): A list indicating the most significant layer index for each voxel.
    - significant_voxels (list): A list of voxel indices that have at least one significant correlation.
    """
    # Convert 1-based layer indices to 0-based for indexing
    layers_to_study = [layer - 1 for layer in layers_to_study]
    dummy_conter=0

    layers_result = []
    significant_voxels=[]

    # Iterate through each voxel
    for voxel in range(len(correlations_list[0][0])):
        # Extract correlation coefficients and p-values for the current voxel across specified layers
        correlation_coefficients_voxel = np.array([correlations_list[layer][0][voxel] for layer in layers_to_study])
        p_values_array = np.array([correlations_list[layer][1][voxel] for layer in layers_to_study])
        # Check if any p-value is less than or equal to 0.05 (significant)
        if np.any(p_values_array <= 0.05):
          significant_voxels.append(voxel)
          # Identify the layer with the maximum correlation coefficient for this voxel
          layer_index = np.argmax(correlation_coefficients_voxel)
          layers_result.append(layer_index + 1) #store layers
    return layers_result,significant_voxels



In [None]:
tot_result=[]
# List of layers to study
layers_to_study_full = [1,2,3,4, 5]
sig_v=[]
# Iterate through each brain area in important_correlations_total
for area in important_correlations_total:
  result_per_area=[]
  # Filter significant voxels and get the most significant layer indices for the current area
  index_gery,significant_voxels=filter_first_part(area, layers_to_study_full)
  # Append the results to the total results list
  tot_result.append(index_gery)
  sig_v.append(significant_voxels)
  # Print the number of significant voxels found for the current area
  print(len(index_gery))

In [None]:
# Iterate through each list of significant layer indices in tot_result
for i in tot_result:
  # Get the unique elements and their counts
  unique_elements, counts = np.unique(i, return_counts=True)
  # Create a dictionary mapping each unique element to its count
  element_counts = dict(zip(unique_elements, counts))
  # Print the dictionary of element counts
  print(element_counts)

In [None]:
#If you want to save your results
folder_path="/content/drive/MyDrive/GOD-NEW-3p/Neural-DecodingPartProgresive/"
with open(f'{folder_path}indeces_filtered_clipped3.dat', 'wb') as fp:
            pickle.dump(sig_v, fp)
