In [54]:
import os
import nibabel as nib
import numpy as np
import pandas as pd
import scipy.io
import time

from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from joblib import Parallel, delayed

In [2]:
parent_dir = '/Users/qingfangliu/Library/CloudStorage/Dropbox/KahntLab/Project_SPEEDTMS/Analysis'
external = '/Volumes/QF10TB/SPEEDTMS_results'

In [3]:
# Load subject information
sub_info_file = os.path.join(parent_dir, 'SubInfo', 'SubjectConds.XLSX')
sub_info = pd.read_excel(sub_info_file)

In [4]:
# Extract subject indices and exclusion information
subs = sub_info['Sub']
excluded = sub_info['Excluded']
subs = subs[excluded == 0]  # Keep only subjects not excluded
nsubs = len(subs)

In [5]:
# Load behavioral data
# Assuming Alld.mat is a standard MATLAB .mat file (not v7.3 HDF5 format)
alld_path = os.path.join(external, 'Behavior', 'Alld.mat')
alld = scipy.io.loadmat(alld_path)

In [7]:
modelname = 'Outcome_pseudoconcat_Nz'
decodetype = 'LOOCV'

In [8]:
# info to prepare data loading
nruns = 3
nSess = 2
nallruns = nruns * nSess

In [None]:
# Define mask path and name
maskmap = os.path.join(parent_dir, 'Masks', 'Group_mask.nii')
maskname = 'WB'

# Load the mask NIfTI file
mask_img = nib.load(maskmap)
mask_data = mask_img.get_fdata()

# Get the shape (dimensions)
sz = mask_data.shape

In [13]:
sz

(79, 95, 79)

In [10]:
# Get the linear indices of non-zero voxels (in flattened array)
lin_index = np.flatnonzero(mask_data)

In [14]:
lin_index.shape

(203528,)

In [15]:
dat_use = 'beta'
dat_scale = 'raw'
rget = 4

In [16]:
# prepare to do searchlight
# based on which mask map to use

# define reference voxel
ref_vox = np.round(np.array(sz) / 2).astype(int)

In [19]:
ref_vox

array([40, 48, 40])

In [18]:
# Create 3D coordinate grid
MX, MY, MZ = np.meshgrid(
    np.arange(1, sz[0]+1),  # MATLAB is 1-indexed, so add 1
    np.arange(1, sz[1]+1),
    np.arange(1, sz[2]+1),
    indexing='ij'
)

In [22]:
# Compute Euclidean distance from each voxel to the reference voxel
radii = np.sqrt((MX - ref_vox[0])**2 + (MY - ref_vox[1])**2 + (MZ - ref_vox[2])**2)

In [24]:
radii.shape

(79, 95, 79)

In [25]:
# Get prototype sphere index for voxels within `rget` radius
ref_index = np.ravel_multi_index(tuple(ref_vox - 1), sz)  # adjust for 0-indexing
radius_index = np.flatnonzero(radii < rget) - ref_index

In [27]:
radius_index.shape

(251,)

In [28]:
# Compute number of searchlights (i.e., number of voxels in the mask)
nvox = len(lin_index)

In [29]:
# Create conversion map: 3D volume with consecutive indices at mask locations
linindexconv = np.zeros(sz, dtype=int)
linindexconv_flat = linindexconv.ravel()
linindexconv_flat[lin_index] = np.arange(1, len(lin_index) + 1)
linindexconv = linindexconv_flat.reshape(sz)

In [32]:
# looping through subjects
# this analysis is done within each subject

i = 1

# Assume Subs is a pandas Series or list of subject numbers
subno = f'Sub{subs[i]}'  # Create subject string
print(subno)

Sub2


In [34]:
# Path to subject decoding results
subjpath = os.path.join(external, 'Decoding_Searchlight_python', subno)

In [35]:
# Construct result path
res_path = os.path.join(subjpath, modelname, maskname, dat_use, dat_scale, f'r{rget}')

# Create directory if it doesn't exist
if not os.path.exists(res_path):
    os.makedirs(res_path)

# Print summary info
print(f'{subno} - {dat_use} - {dat_scale}')

Sub2 - beta - raw


In [40]:
# to load data of beta estimates for decoding

# Prepare to organize betas into runs
data = [None] * nallruns
Labels = [None] * nallruns
nconds = 3
rctr = 0

In [37]:
data

[None, None, None, None, None, None]

In [38]:
Labels

[None, None, None, None, None, None]

In [51]:
for ss in range(nSess):
    for r in range(nruns):
        print(f'Data organization: {subno}-r{rctr+1}')
        temp = np.zeros((nconds,nvox))  
        
        for n in range(nconds):
            bidx = n + 3 * rctr + 1
            bname = os.path.join(external, 'Decoding_GLM', subno, 'fxMultivariate', modelname, f'beta_{bidx:04d}.nii')
            vol = nib.load(bname).get_fdata()
            temp[n,:] = vol.flatten()[lin_index] # extract ROI voxel data
            
        data[rctr] = temp
        Labels[rctr] = np.arange(1, nconds+1)
        rctr += 1 

Data organization: Sub2-r1
Data organization: Sub2-r2
Data organization: Sub2-r3
Data organization: Sub2-r4
Data organization: Sub2-r5
Data organization: Sub2-r6


In [52]:
data

[array([[            nan,             nan,             nan, ...,
          7.07192659e-01,  1.26317009e-01, -1.57458365e-01],
        [            nan,             nan,             nan, ...,
          4.76203799e-01, -1.24924707e+00, -4.51054802e-04],
        [            nan,             nan,             nan, ...,
          7.43021131e-01, -5.25924027e-01, -5.68005443e-01]]),
 array([[        nan,         nan,         nan, ..., -0.21465586,
         -1.04216063, -1.01983523],
        [        nan,         nan,         nan, ..., -0.35943496,
         -1.30116093, -0.95837456],
        [        nan,         nan,         nan, ...,  0.01606934,
          0.18507093, -0.13777131]]),
 array([[        nan,         nan,         nan, ...,  0.49997833,
         -0.75954312, -0.64725244],
        [        nan,         nan,         nan, ..., -0.18594971,
         -1.01889813, -1.01164329],
        [        nan,         nan,         nan, ...,  0.24840146,
         -0.27893299, -0.01248497]]),
 arr

In [53]:
Labels

[array([1, 2, 3]),
 array([1, 2, 3]),
 array([1, 2, 3]),
 array([1, 2, 3]),
 array([1, 2, 3]),
 array([1, 2, 3])]

In [55]:
resultvol_vol = np.zeros(sz)  # volume to save decoding accuracy
temp_sl_res = np.zeros(nvox)  # vector to save searchlight results

In [60]:
def searchlight_single_voxel(cnt):
    if cnt % 1000 == 0:
        print(f'Searchlight - V: {cnt}/{nvox}')
        
    # move prototype sphere to position of current voxel
    indexindex = radius_index + lin_index[cnt]
    
    # eliminate indices outside brain mask
    Useindex = np.intersect1d(lin_index, indexindex)
    
    # Map to data indices
    dat_index = linindexconv.flatten()[Useindex]

    runvec = np.arange(nallruns)  # 0-indexed in Python
    accvals = np.zeros(len(runvec))
    
    for idx, r in enumerate(runvec):
        rtrain = runvec[runvec != r]  # Leave-one-run-out
        rtest = r

        vectors_train = np.vstack([data[t][:, dat_index] for t in rtrain])
        labels_train = np.hstack([Labels[t] for t in rtrain])

        vectors_test = data[rtest][:, dat_index]
        labels_test = Labels[rtest]
        
        # remove samples with NANs
        train_nan_mask = ~np.isnan(vectors_train).any(axis=1)
        test_nan_mask = ~np.isnan(vectors_test).any(axis=1)

        vectors_train = vectors_train[train_nan_mask]
        labels_train = labels_train[train_nan_mask]

        vectors_test = vectors_test[test_nan_mask]
        labels_test = labels_test[test_nan_mask]
        
        # Only proceed if there is enough data after cleaning
        if len(labels_train) == 0 or len(labels_test) == 0:
            accvals[idx] = 33.33  # assume chance level
            continue

        # Train and test SVM
        clf = SVC(kernel='linear', C=0.0001)
        clf.fit(vectors_train, labels_train)
        pred_label = clf.predict(vectors_test)

        accvals[idx] = accuracy_score(labels_test, pred_label) * 100

    outcome = np.mean(accvals)  # average accuracy
    return outcome - 33.33  # subtract chance level (3 classes: 33.33%)

In [61]:
start_time = time.time()

temp_sl_res = Parallel(n_jobs=-1)(delayed(searchlight_single_voxel)(cnt) for cnt in range(nvox))
temp_sl_res = np.array(temp_sl_res)  # back to numpy array

end_time = time.time()

print(f"Execution time: {end_time - start_time:.4f} seconds")

IndexError: index 203528 is out of bounds for axis 1 with size 203528

In [None]:
# Assign the searchlight results back into the full 3D space
resultvol_vol = np.zeros(sz)
resultvol_vol_flat = resultvol_vol.flatten()
resultvol_vol_flat[lin_index] = temp_sl_res
resultvol_vol = resultvol_vol_flat.reshape(sz)

In [None]:
# Load a header (affine and header info) from an existing beta file
ref_img = nib.load(bname)  # bname from your last loaded beta

# Create a new NIfTI image with the result data and original affine/header
result_img = nib.Nifti1Image(resultvol_vol, affine=ref_img.affine, header=ref_img.header)

# Define the output file path
Resultname = f'Acc_{subno}_{decodetype}.nii'
result_path = os.path.join(res_path, Resultname)

# Save the NIfTI file
nib.save(result_img, result_path)

print(f"Result saved to {result_path}")