In [None]:
import nibabel as nib
import numpy as np
from os.path import join
import math
import matplotlib.pyplot as plt
from scipy.stats import ttest_1samp
from statsmodels.stats.multitest import multipletests
from nilearn import plotting
from nilearn.image import resample_to_img
from scipy import ndimage
from scipy.spatial.distance import cdist
from scipy.sparse import csgraph
import cvxpy as cp
from sklearn.model_selection import KFold
from itertools import product
import scipy.io as sio
import h5py
from sklearn.decomposition import PCA
import scipy.sparse as sp
import matplotlib.ticker as mticker
from mpl_toolkits.axes_grid1.inset_locator import inset_axes, mark_inset

In [None]:
import psutil, os
p = psutil.Process(os.getpid())
p.cpu_affinity({0,1,2,3,4})   # use only core 0

Loading Datasets and Masks

In [None]:
ses = 1
sub = '04'
run = 1

base_path = '/mnt/TeamShare/Data_Masterfile/H20-00572_All-Dressed/PRECISIONSTIM_PD_Data_Results/fMRI_preprocessed_data/Rev_pipeline/derivatives'
anat_img = nib.load(f'/mnt/TeamShare/Data_Masterfile/H20-00572_All-Dressed/PRECISIONSTIM_PD_Data_Results/fMRI_preprocessed_data/Rev_pipeline/derivatives/sub-pd0{sub}/ses-{ses}/anat/sub-pd0{sub}_ses-{ses}_T1w_brain.nii.gz')

data_name = f'sub-pd0{sub}_ses-{ses}_run-{run}_task-mv_bold_corrected_smoothed_reg.nii.gz'
BOLD_path_org = join(base_path, f'sub-pd0{sub}', f'ses-{ses}', 'func', data_name)
bold_img = nib.load(BOLD_path_org)
bold_data = bold_img.get_fdata()
bold_data = bold_data.astype(np.float16)

mask_path = f'/mnt/TeamShare/Data_Masterfile/H20-00572_All-Dressed/PRECISIONSTIM_PD_Data_Results/fMRI_preprocessed_data/Rev_pipeline/derivatives/sub-pd0{sub}/ses-{ses}/anat/sub-pd0{sub}_ses-{ses}_T1w_brain_mask.nii.gz'
back_mask = nib.load(mask_path)
back_mask = back_mask.get_fdata()
back_mask = back_mask.astype(np.float16)

mask_path = f'/mnt/TeamShare/Data_Masterfile/H20-00572_All-Dressed/PRECISIONSTIM_PD_Data_Results/fMRI_preprocessed_data/Rev_pipeline/derivatives/sub-pd0{sub}/ses-{ses}/anat/sub-pd0{sub}_ses-{ses}_T1w_brain_pve_0.nii.gz'
csf_mask = nib.load(mask_path)
csf_mask = csf_mask.get_fdata()
csf_mask = csf_mask.astype(np.float16)

mask_path = f'/mnt/TeamShare/Data_Masterfile/H20-00572_All-Dressed/PRECISIONSTIM_PD_Data_Results/fMRI_preprocessed_data/Rev_pipeline/derivatives/sub-pd0{sub}/ses-{ses}/anat/sub-pd0{sub}_ses-{ses}_T1w_brain_pve_1.nii.gz'
white_mask = nib.load(mask_path)
white_mask = white_mask.get_fdata()
white_mask = white_mask.astype(np.float16)

print(anat_img.shape)
# print(bold_data.shape)
# print(back_mask.shape)
# print(csf_mask.shape)

Apply Masks on Bold Dataset

In [None]:
back_mask_data = back_mask > 0
csf_mask_data = csf_mask > 0
white_mask_data = white_mask > 0.5
mask = np.logical_and(back_mask_data, ~csf_mask_data)
nonzero_mask = np.where(mask)

white_mask_flat = white_mask_data[nonzero_mask]
keep_voxels = ~white_mask_flat

bold_flat = bold_data[nonzero_mask]
masked_bold = bold_flat[keep_voxels]

print(f"number of selected voxels after masking: {masked_bold.shape[0]/math.prod(bold_data.shape[:3])*100:.2f}%")
print('bold_data masked shape:', masked_bold.shape)

Load Beta values, Mask it, Find threshold to determine and delete outlier voxels

In [None]:
glm_dict = np.load(f'TYPED_FITHRF_GLMDENOISE_RR_sub{sub}.npy', allow_pickle=True).item()
beta_glm = glm_dict['betasmd']
beta_run1, beta_run2 = beta_glm[:,0,0,:90], beta_glm[:,0,0,90:]

if run == 1:
    beta = beta_run1[keep_voxels]
else:
    beta = beta_run2[keep_voxels]
print("Beta Range:[", np.nanmin(beta), np.nanmax(beta), "], Mean: ", np.nanmean(beta))


check how many trials for each voxel have outlier beta (shows my previous method is incorrect)

In [None]:
# lower_thr, upper_thr = np.nanpercentile(beta, [1, 99])
# print(f'low_thr: {lower_thr:.2f}, high_thr: {upper_thr:.2f}') #low_thr: -4.64, high_thr: 4.60
# beta_extreme_mask = np.logical_or(beta < lower_thr, beta > upper_thr)
# voxels_with_extreme_beta = np.any(beta_extreme_mask, axis=1)

# print(f"percentage of voxels with extreme beta values: {np.sum(voxels_with_extreme_beta)/beta.shape[0]*100:.2f}%")

# mask = np.logical_and(back_mask_data, ~csf_mask_data)
# mask &= ~white_mask_data
# nonzero_mask = np.where(mask)

# clean_beta = beta[~voxels_with_extreme_beta]
# print('clean_beta shape:', clean_beta.shape)

# # This figure is related to my previous calculation (remove voxels even if they have one trial)
# tmp = np.sum(beta_extreme_mask[voxels_with_extreme_beta, :], axis=1)
# plt.hist(tmp)
# plt.xlabel('Num_trials')
# plt.ylabel("voxel count")
# plt.title('Trial-level Outlier Counts in Voxels with Extreme Beta')
# plt.show()

In [None]:
# fig, axs = plt.subplots(1,2)
# axs[0].hist(np.mean(beta, axis=1))
# axs[0].set_title('Mean')
# axs[1].hist(np.var(beta, axis=1))
# axs[1].set_title('Var')
# plt.suptitle('Original Beta')
# plt.tight_layout()
# plt.show()

Normalize beta value, remove voxels with most of trials have bad beta value, interpolate rest of the beta

In [None]:
# detect outlier beta after normalization
med = np.nanmedian(beta, keepdims=True)
mad = np.nanmedian(np.abs(beta - med), keepdims=True)
scale = 1.4826 * np.maximum(mad, 1e-9)    
beta_norm = (beta - med) / scale      
thr = np.nanpercentile(np.abs(beta_norm), 99.9)
outlier_mask = np.abs(beta_norm) > thr      
print(f"{np.sum(np.any(outlier_mask, axis=1))/beta.shape[0]*100:.2f}% voxels with at least one outlier beta")

# detect how many trials are bad for selected voxels
beta_clean = beta.copy()
trials = np.arange(beta.shape[1])
voxel_outlier_fraction = np.sum(outlier_mask, axis=1)/outlier_mask.shape[1]*100
valid_voxels = voxel_outlier_fraction <= 50
beta_clean[~valid_voxels] = np.nan 

# save the selected voxels indices for later
keeped_voxels_mask = ~np.all(np.isnan(beta_clean), axis=1)
keeped_voxels_indices = np.flatnonzero(keeped_voxels_mask)

# fig, axs = plt.subplots(1,2)
# axs[0].hist(np.mean(beta_clean, axis=1))
# axs[0].set_title('Mean')
# axs[1].hist(np.var(beta_clean, axis=1))
# axs[1].set_title('Var')
# plt.suptitle('After removing voxels (with 50% or higher trials with high beta)')
# plt.tight_layout()
# plt.show()

# interpolate beta value for voxels which have less than 50% of trials as bad
orig_outlier = []
intrp_outlier = []
for v in np.flatnonzero(valid_voxels):
    mask = outlier_mask[v]
    if not mask.any():
        continue
    good = ~mask
    orig_outlier.append(beta[v, mask])
    beta_clean[v, mask] = np.interp(trials[mask], trials[good], beta[v, good])
    intrp_outlier.append(beta_clean[v, mask])

# remove voxels that have more than 50% of trials bad beta value
beta_clean1 = beta_clean[~np.all(np.isnan(beta_clean), axis=1)]
print(f"{(beta_clean.shape[0]-beta_clean1.shape[0])/beta_clean.shape[0]*100:.3f}% voxels have more than 50% trials with outlier")
clean_beta = beta_clean1
clean_beta.shape

In [None]:
# fig, axs = plt.subplots(1,2)
# axs[0].hist(np.mean(clean_beta, axis=1))
# axs[0].set_title('Mean')
# axs[1].hist(np.var(clean_beta, axis=1))
# axs[1].set_title('Var')
# plt.suptitle('After Interpolation')
# plt.tight_layout()
# plt.show()

In [None]:
# from scipy.stats import skew
# from scipy.stats import kurtosis

# skew_value = skew(clean_beta, axis=1)
# kurt_value = kurtosis(clean_beta, axis=1)

# fig, axs = plt.subplots(1,2)
# axs[0].hist(skew_value)
# axs[0].set_title('Skewness')
# axs[1].hist(kurt_value)
# axs[1].set_title('kurtosis ')
# plt.suptitle('After Interpolation')
# plt.tight_layout()
# plt.show()

In [None]:
clean_bold = masked_bold[keeped_voxels_mask] 
num_trials = 90
trial_len = 9
clean_bold_reshape = np.zeros((clean_bold.shape[0], num_trials, trial_len))
start = 0

for i in range(num_trials):
        end = start + trial_len
        clean_bold_reshape[:, i, :] = clean_bold[:, start:end]
        start += trial_len
        if start == 270 or start == 560:   # your skips
            start += 20

In [None]:
bold_var = np.mean(np.var(clean_bold_reshape, axis=1), axis=-1)
beta_var = np.var(clean_beta, axis=-1)

np.corrcoef(bold_var, beta_var)

Plot beta, clean_beta, interpolated beta

In [None]:
# def hist_with_zoom_ranges(x, fig_title, zoom_ranges, main_bins=200, inset_bins=60, density=False):
#     fig, ax = plt.subplots(figsize=(9,7))
#     ax.hist(x, bins=main_bins, density=density, color='C0', alpha=0.85, edgecolor='none')
#     ax.set_title(fig_title)
#     ax.set_xlabel('beta')
#     ax.set_ylabel('density' if density else 'count')

#     inset_data = []
#     for (lo, hi) in zoom_ranges:
#         xin = x[(x >= lo) & (x <= hi)]
#         counts, edges = np.histogram(xin, bins=inset_bins, range=(lo, hi), density=density)
#         inset_data.append((lo, hi, xin, edges, counts))

#     boxes = [(0.08, 0.08, 0.4, 0.34),
#              (0.2, 0.3, 0.4, 0.4),
#              (0.64, 0.08, 0.4, 0.34)]

#     for (left, bottom, w, h), (lo, hi, xin, edges, counts) in zip(boxes, inset_data):
#         axins = inset_axes(ax, width=f"{w*100:.0f}%", height=f"{h*100:.0f}%", bbox_to_anchor=(left, bottom, w, h), bbox_transform=ax.transAxes,
#             loc='upper left', borderpad=0.8)

#         if xin.size:
#             axins.hist(xin, bins=edges, density=density, color='C0', alpha=0.95, edgecolor='none')
#         else:
#             axins.text(0.5, 0.5, 'no data', ha='center', va='center', fontsize=8, transform=axins.transAxes)

#         axins.set_xlim(lo, hi)

#         if counts.size:
#             ymax = counts.max()
#             axins.set_ylim(0, ymax * 1.1)
#         else:
#             axins.set_ylim(0, 1)

#         axins.set_title(f"[{lo}, {hi}]", fontsize=9)
#         axins.tick_params(labelsize=8)

#         pp, p1, p2 = mark_inset(ax, axins, loc1=3, loc2=4, fc='none', ec='none')
#         pp.set_visible(False)
#         for con in (p1, p2):
#             con.set_color('0.4')
#             con.set_linewidth(1.2)

#     fig.tight_layout()
#     return fig, ax


# x = np.concatenate(orig_outlier)
# hist_with_zoom_ranges(x, 'Original Beta (for selected voxels)', ((-1500,-1000), (-100,100), (1000,2000)))
# plt.show()

# # x = np.concatenate(intrp_outlier)
# # hist_with_zoom_ranges(x, 'Interpolated Beta (for selected voxels)')
# # plt.show()

In [None]:
# before = np.concatenate(orig_outlier)     
# after  = np.concatenate(intrp_outlier)  

# n_bins = 200
# xmin, xmax = np.percentile(before, [0.1, 99.9])
# edges = np.linspace(xmin, xmax, n_bins + 1)
# b_counts, _ = np.histogram(before, bins=edges)
# a_counts, _ = np.histogram(after,   bins=edges)

# fig, axs = plt.subplots(1, 2, figsize=(15, 4), gridspec_kw={"wspace": 0.3})
# axs[0].hist(before, bins=edges, alpha=0.45, label='Before', color='C0')
# axs[0].hist(after,   bins=edges, alpha=0.45, label='After',  color='C1')
# axs[0].set_title('Beta bafore & after interpolation')
# axs[0].set_ylabel('count')
# axs[0].legend()

# centers = 0.5 * (edges[:-1] + edges[1:])
# delta = a_counts - b_counts
# axs[1].bar(centers, delta, width=np.diff(edges), align='center', color='C2', alpha=0.8)
# axs[1].axhline(0, color='0.3', lw=1)
# axs[1].set_title('Per-bin change (After − Before)')
# axs[1].set_xlabel('beta')
# axs[1].set_ylabel('Δ count')

# zoom_ranges = [(-800, -250), (250, 1000)]
# positions = [(0.1, 0.4, 0.5, 0.5),   
#              (0.6, 0.4, 0.5, 0.5)]   

# for (lo, hi), (lx, ly, lw, lh) in zip(zoom_ranges, positions):
#     axins = inset_axes(axs[1],
#                        width=f"{lw*100:.0f}%",
#                        height=f"{lh*100:.0f}%",
#                        bbox_to_anchor=(lx, ly, lw, lh),
#                        bbox_transform=axs[1].transAxes,
#                        loc='upper left',
#                        borderpad=0.8)
#     mask = (centers >= lo) & (centers <= hi)
#     axins.bar(centers[mask], delta[mask], width=np.diff(edges)[mask], align='center', color='C2', alpha=0.8)
#     axins.axhline(0, color='0.3', lw=1)
#     axins.set_xlim(lo, hi)
#     axins.set_title(f"[{lo},{hi}]", fontsize=8)
#     axins.tick_params(labelsize=7)

#     pp, p1, p2 = mark_inset(axs[1], axins, loc1=3, loc2=4, fc='none', ec='none')
#     pp.set_visible(False)
#     for con in (p1, p2):
#         con.set_color('0.4')
#         con.set_linewidth(1.2)

# plt.show()

Plot outlier voxel on the brain

In [None]:
# extreme_volume = np.zeros(bold_img.shape[:3], dtype=np.float32)
# nonzero_mask_kept = tuple(axis[keep_voxels] for axis in nonzero_mask)

# extreme_volume[nonzero_mask_kept] = (~valid_voxels).astype(np.float32)
# extreme_img = nib.Nifti1Image(extreme_volume, bold_img.affine, bold_img.header)
# extreme_img = resample_to_img(extreme_img, anat_img, interpolation='nearest')
# view = plotting.view_img(extreme_img, bg_img=anat_img, cmap='autumn', symmetric_cmap=False, threshold=0.5, vmax=1, opacity=0.9, title='Voxels with Extreme Betas')
# view
# view.save_as_html(file_name=f'rejected_voxels_sub{sub}_ses{ses}_run{run}.html')

Apply t-test and FDR, detect & remove non-active voxels

In [None]:
# one sample t-test against 0
tvals, pvals = ttest_1samp(clean_beta, popmean=0, axis=1, nan_policy='omit')

# FDR correction
tested = np.isfinite(pvals)
alpha=0.05
rej, q, _, _ = multipletests(pvals[tested], alpha=alpha, method='fdr_bh')

n_voxel = clean_beta.shape[0]
qvals  = np.full(n_voxel, np.nan)
reject = np.zeros(n_voxel, dtype=bool)
reject[tested] = rej
qvals[tested]  = q

# reject non-active voxels
clean_active_beta = clean_beta[reject]
print(f"{clean_active_beta.shape[0]/clean_beta.shape[0]*100:.2f}% of voxels are active at FDR q<{alpha}")
clean_active_beta.shape

Create 3-D beta dataset to use it for filtering

In [None]:
clean_active_beta_4d = np.full(bold_img.shape[:3]+(clean_beta.shape[1],), np.nan)
# active_indices = keeped_voxels_indices[reject]
# active_coords = tuple(axis[active_indices] for axis in nonzero_mask)

kept_flat_indices = np.flatnonzero(keep_voxels)
kept_coords = tuple(axis[kept_flat_indices] for axis in nonzero_mask)
active_flat_indices = kept_flat_indices[keeped_voxels_indices[reject]]
active_coords = tuple(axis[active_flat_indices] for axis in nonzero_mask)

clean_active_beta_4d[active_coords + (slice(None), )] = clean_active_beta
clean_active_beta_4d.shape

In [None]:
# # transfer back beta value on the volume
# clean_mask = ~np.asarray(voxels_with_extreme_beta, dtype=bool)
# clean_indices = np.nonzero(clean_mask)[0]
# active_indices = clean_indices[np.asarray(reject, dtype=bool)]

# spatial_shape = bold_img.shape[:3]
# n_trials = clean_active_beta.shape[1]
# beta_volume = np.full(spatial_shape + (n_trials,), np.nan, dtype=np.float32)

# coords = tuple(axis[active_indices] for axis in nonzero_mask)
# beta_volume[coords[0], coords[1], coords[2], :] = clean_active_beta.astype(np.float32)

Map mean beta for active voxels back into anatomical space

In [None]:
clean_active_beta_4d.shape


In [None]:

# clean_mask = ~np.asarray(voxels_with_extreme_beta, dtype=bool)
# clean_indices = np.flatnonzero(clean_mask)
# active_mask = np.asarray(reject, dtype=bool)
# active_indices = clean_indices[active_mask]


# active_voxel_coords = tuple(idx[active_indices] for idx in nonzero_mask)

# mean_active_beta = np.nanmean(clean_active_beta, axis=1).astype(np.float32)
# mean_beta_volume = np.full(bold_img.shape[:3], np.nan, dtype=np.float32)
# mean_beta_volume[active_voxel_coords] = mean_active_beta

mean_beta_volume = np.nanmean(clean_active_beta_4d, axis=-1)

mean_beta_img = nib.Nifti1Image(mean_beta_volume, bold_img.affine, bold_img.header)
mean_beta_img = resample_to_img(mean_beta_img, anat_img, interpolation='linear')

# finite_vals = np.isfinite(mean_active_beta)
# if np.any(finite_vals):
#     vmax = np.nanpercentile(mean_active_beta[finite_vals], 99)
#     vmin = np.nanpercentile(mean_active_beta[finite_vals], 1)
# else:
#     vmax = vmin = 0.0

active_beta_view = plotting.view_img(
    mean_beta_img,
    bg_img=anat_img,
    cmap='seismic',
    symmetric_cmap=False,
    threshold=1e-6,
    # vmax=vmax,
    # vmin=vmin,
    colorbar=True,
    title='Mean beta for active voxels'
)
active_beta_view
active_beta_view.save_as_html(file_name=f'active_beta_map_sub{sub}_ses{ses}_run{run}.html')

Apply Hampel Filter to have smooth beta value (remove voxels that have 0-2 neighbour, smooth beta for voxles with more than 2 neighbour)

In [None]:
def hampel_filter_image(image, window_size, threshold_factor, return_stats=False):
    if window_size % 2 == 0:
        raise ValueError("window_size must be odd")

    filtered = image.astype(float).copy()
    footprint = np.ones((window_size,) * 3, dtype=bool)

    insufficient_counts = []
    corrected_indices_parts = []

    for t in range(image.shape[3]):
        vol = image[..., t]
        med = ndimage.generic_filter(vol, np.nanmedian, footprint=footprint, mode='constant', cval=np.nan)
        mad = ndimage.generic_filter(np.abs(vol - med), np.nanmedian, footprint=footprint, mode='constant', cval=np.nan)
        counts = ndimage.generic_filter(np.isfinite(vol).astype(np.float32), np.sum, footprint=footprint, mode='constant', cval=0)

        scaled_mad = 1.4826 * mad
        insufficient = counts < 3
        insufficient_counts.append(int(np.count_nonzero(insufficient)))

        filtered[..., t][insufficient] = np.nan

        valid = np.isfinite(vol)
        enough_data = (~insufficient) & valid
        outliers = enough_data & (np.abs(vol - med) > threshold_factor * scaled_mad)

        if np.any(outliers):
            coords = np.argwhere(outliers)
            t_column = np.full((coords.shape[0], 1), t, dtype=int)
            corrected_indices_parts.append(np.hstack((coords, t_column)))

        filtered[..., t][outliers] = med[outliers]

    if return_stats:
        insufficient_counts_arr = np.array(insufficient_counts, dtype=int)
        if corrected_indices_parts:
            corrected_indices = np.vstack(corrected_indices_parts)
        else:
            corrected_indices = np.empty((0, 4), dtype=int)

        stats = {
            'insufficient_counts': insufficient_counts_arr,
            'insufficient_total': int(insufficient_counts_arr.sum()),
            'corrected_indices': corrected_indices,
            'corrected_total': int(corrected_indices.shape[0]),
        }
        return filtered, stats

    return filtered


beta_volume_filter, hampel_stats = hampel_filter_image(beta_volume, window_size=5, threshold_factor=3, return_stats=True)
print('Insufficient neighbours per frame:', hampel_stats['insufficient_counts'], flush=True)
print('Total voxels with <3 neighbours:', hampel_stats['insufficient_total'], flush=True)
print('Total corrected voxels:', hampel_stats['corrected_total'], flush=True)
if hampel_stats['corrected_total'] > 0:
    preview = hampel_stats['corrected_indices'][:5]
    print('Sample corrected voxel indices (x, y, z, t):', preview, flush=True)

# save cleaned beta volume
np.save(f'cleaned_beta_volume_sub{sub}_ses{ses}_run{run}.npy', beta_volume_filter)


In [None]:
# convert cleaned beta volume to a 2D array for optimization
beta_volume_filter = np.load('beta_valume_clean_2d.npy')
beta_volume_filter = beta_volume_filter.astype(np.float16)
spatial_shape = beta_volume_filter.shape[:-1]
voxels_with_any_nan = np.zeros(spatial_shape, dtype=bool)
voxels_with_all_nan = np.ones(spatial_shape, dtype=bool)

# Sweep the time dimension once
for t in range(beta_volume_filter.shape[-1]):
    frame_nan = np.isnan(beta_volume_filter[..., t])
    voxels_with_any_nan |= frame_nan
    voxels_with_all_nan &= frame_nan

print(np.sum(voxels_with_any_nan), np.sum(voxels_with_all_nan), flush=True)

n_trial = beta_volume_filter.shape[-1]
beta_volume_filter_2d = beta_volume_filter.reshape(-1, n_trial)
print(beta_volume_filter_2d.shape, flush=True)
mask_2d = voxels_with_all_nan.reshape(-1)
beta_valume_clean_2d = beta_volume_filter_2d[~mask_2d]
print(beta_valume_clean_2d.shape, flush=True)
np.save(f'beta_valume_clean_2d.npy', beta_valume_clean_2d)

In [None]:
del anat_img, back_mask, csf_mask, white_mask, mask, nonzero_mask, masked_bold
del beta_run1, beta_run2, beta, clean_beta, voxels_with_all_nan, voxels_with_any_nan
del beta_volume_filter_2d, beta_valume_clean_2d

Plot finall beta value on the brain

In [None]:
from nilearn import plotting
import nibabel as nib
import numpy as np

# choose what to visualize; here we use the across-trial mean
beta_to_plot = np.nanmean(beta_valume_clean_2d.astype(np.float32), axis=1)

# lift the 1-D beta array back into the 3-D volume
beta_volume_overlay = np.full(mask_2d.shape, np.nan, dtype=np.float32)
beta_volume_overlay[~mask_2d] = beta_to_plot
beta_volume_overlay = beta_volume_overlay.reshape(bold_img.shape[:3])

beta_img = nib.Nifti1Image(beta_volume_overlay, bold_img.affine, header=bold_img.header)

# match anatomical space before plotting
beta_img_on_anat = resample_to_img(beta_img, anat_img, interpolation='nearest')

display = plotting.plot_stat_map(
    beta_img_on_anat,
    bg_img=anat_img,
    cmap='cold_hot',
    threshold=None,        # set to e.g. 0.5 if you want to mask small values
    symmetric_cbar=True,
    colorbar=True,
    title='Mean beta overlay'
)
plotting.show()


Load the clean beta, which is ready for optimization problem

In [None]:
beta_valume_clean_2d = np.load(f'beta_valume_clean_2d.npy')
beta_valume_clean_2d.shape

Create Matrices for Optim problem

In [None]:
def calculate_matrices(
    beta_valume_clean_2d,
    bold_data,
    mask_2d,
    trial_indices=None,
    trial_len=9,
    num_components=600,
    pca_components=None,
    pca_mean=None):
    print("begin", flush=True)
    print(type(mask_2d))
    num_trials = beta_valume_clean_2d.shape[-1]
    trial_idx = np.arange(num_trials) if trial_indices is None else np.unique(np.asarray(trial_indices, int).ravel())

    # ----- reshape BOLD into trials -----
    bold_data_reshape = bold_data.reshape(-1, bold_data.shape[-1])
    print(bold_data.reshape(-1, bold_data.shape[-1]).shape[0], mask_2d.dtype, mask_2d.size)
    bold_data_selected = bold_data_reshape[~mask_2d]         # keep voxels of interest
    bold_data_selected_reshape = np.zeros((bold_data_selected.shape[0], num_trials, trial_len), dtype=np.float32)
    start = 0
    
    for i in range(num_trials):
        end = start + trial_len
        if end > bold_data_selected.shape[1]:
            raise ValueError("BOLD data does not contain enough timepoints for all trials")
        bold_data_selected_reshape[:, i, :] = bold_data_selected[:, start:end]
        start += trial_len
        if start == 270 or start == 560:   # your skips
            start += 20
    X = bold_data_selected_reshape[:, trial_idx, :]          # [Nvox, Ntrials, T]
    print("BOLD reshaped before PCA", X.shape, flush=True)

    # ----- apply PCA -----
    print("PCA...", flush=True)
    X_reshap = X.reshape(X.shape[0], -1).astype(np.float32)

    if pca_components is None or pca_mean is None:
        pca = PCA()
        X_pca_full = pca.fit_transform(X_reshap.T).astype(np.float32)
        components = pca.components_.astype(np.float32)
        mean = pca.mean_.astype(np.float32)
        n_components = min(num_components, components.shape[0])
        components = components[:n_components]
        X_pca = X_pca_full[:, :n_components]
    else:
        components = pca_components.astype(np.float32)
        mean = pca_mean.astype(np.float32)
        n_components = components.shape[0]
        X_centered = X_reshap.T - mean
        X_pca = (X_centered @ components.T).astype(np.float32)

    beta_reduced = (beta_valume_clean_2d.T - mean) @ components.T
    beta_reduced = beta_reduced.T


    # ----- L_task (same idea as yours) -----
    print("L_task...", flush=True)
    beta_selected = beta_reduced[:, trial_idx]
    counts = np.count_nonzero(np.isfinite(beta_selected), axis=-1)
    sums = np.nansum(beta_selected, axis=-1, dtype=np.float64)
    mean_beta = np.zeros(beta_selected.shape[0], dtype=np.float32)
    m = counts > 0
    mean_beta[m] = (sums[m] / counts[m]).astype(np.float32)
    L_task = np.zeros_like(mean_beta, dtype=np.float32)
    v = np.abs(mean_beta) > 0
    L_task[v] = (1.0 / np.abs(mean_beta[v])).astype(np.float32)

    # ----- L_var: variance of trial differences, as sparse diagonal -----
    print("L_var...", flush=True)
    X_pca = X_pca[:, :n_components].T
    num_trials = len(trial_idx)
    X = X_pca.reshape(X_pca.shape[0], num_trials, trial_len)
    L_var = np.zeros((X.shape[0], X.shape[0]), dtype=np.float32)
    for i in range(num_trials-1):
        x1 = X[:, i, :]
        x2 = X[:, i+1, :]
        L_var += (x1-x2) @ (x1-x2).T
    L_var /= (num_trials - 1)

    selected_BOLD_flat = X.reshape(X.shape[0], -1).astype(np.float32)
    return L_task.astype(np.float32), L_var, selected_BOLD_flat, components, mean

# %%
def objective_func(w, L_task, L_var, alpha_var):
    print("Calculating objective...", flush=True)
    quad = (w.T @ np.diag(L_task) @ w + alpha_var * (w.T @ L_var @ w))
    return quad

# %%
def optimize_voxel_weights(L_task, L_var, alpha_var):
    print("Optimizing voxel weights...", flush=True)
    L_total = np.diag(L_task) + alpha_var * L_var
    n = L_total.shape[0]
    L_total = np.nan_to_num(L_total)
    L_total = 0.5*(L_total + L_total.T) + 1e-6*np.eye(n)
    eigvals, eigvecs = np.linalg.eigh(L_total)
    eigvals[eigvals < 0] = 0.0  # clip the numerical negatives
    L_total_psd = (eigvecs @ np.diag(eigvals) @ eigvecs.T).astype(np.float64)

    w = cp.Variable(n, nonneg=True)
    constraints = [cp.sum(w) == 1]

    objective = cp.Minimize(cp.quad_form(w, cp.psd_wrap(L_total_psd)))
    problem = cp.Problem(objective, constraints)
    problem.solve(solver=cp.OSQP, verbose=True)
    return w.value


# %%
def calculate_weight(param_grid, beta_valume_clean_2d, bold_data, mask_2d, trial_len):
    kf = KFold(n_splits=5, shuffle=True, random_state=0)
    best_score = np.inf
    best_alpha_var = None
    num_trials = beta_valume_clean_2d.shape[-1]

    for a_var in param_grid["alpha_var"]:
        fold_scores = []
        print(f"a_var: {a_var}", flush=True)
        count = 1

        for train_idx, val_idx in kf.split(np.arange(num_trials)):
            print(f"k-fold num: {count}", flush=True)
            print(type(mask_2d), flush=True)
            L_task_train, L_var_train, _, pca_components, pca_mean = calculate_matrices(
                beta_valume_clean_2d,
                bold_data,
                mask_2d,
                train_idx,
                trial_len,
            )
            w = optimize_voxel_weights(L_task_train, L_var_train, alpha_var=a_var)

            L_task_val, L_var_val, _, _, _ = calculate_matrices(
                beta_valume_clean_2d,
                bold_data,
                mask_2d,
                val_idx,
                trial_len,
                pca_components=pca_components,
                pca_mean=pca_mean,
            )

            fold_scores.append(objective_func(w, L_task_val, L_var_val, a_var))
            print(f"fold_scores: {fold_scores}", flush=True)
            count += 1

        mean_score = np.mean(fold_scores)
        print(mean_score)
        if mean_score < best_score:
            best_score = mean_score
            best_alpha_var = a_var

    print("Best alpha_var:", best_alpha_var, "with CV loss:", best_score, flush=True)
    return best_alpha_var, best_score

In [None]:
param_grid = {"alpha_var":   [0.1, 0.5, 0.9, 1.0, 2]}

trial_len = 9
best_alpha_var, best_score = calculate_weight(param_grid, beta_valume_clean_2d, bold_data, mask_2d, trial_len)
L_task, L_var, selected_BOLD_flat, pca_components, pca_mean = calculate_matrices(beta_valume_clean_2d, bold_data, mask_2d, None, trial_len)
print(f"best alpha_var: {best_alpha_var}, best_score: {best_score}", flush=True)
weights = optimize_voxel_weights(L_task, L_var, alpha_var=best_alpha_var)
voxel_space_weights = pca_components.T @ weights
y = selected_BOLD_flat.T @ weights

np.save('weights.npy', weights)
np.save('voxel_space_weights.npy', voxel_space_weights)
np.save('pca_components.npy', pca_components)
np.save('pca_mean.npy', pca_mean)
np.save('y.npy', y)
print("Finished!", flush=True)

# %%


Analysis the Optim Output

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
L_task = np.load('L_task.npy') # (600,)
L_var = np.load('L_var.npy') # (600,600)
voxel_space_weights = np.load('voxel_space_weights.npy') #(296838,)
weights = np.load('weights.npy') # (600,)
y = np.load('y.npy') #(810)

In [None]:
fig, ax = plt.subplots(figsize=(6,4))
sns.kdeplot(voxel_space_weights)

In [None]:
L_var.shape

In [None]:

for arr, lab in [(weights,'weights'), (L_task,'L_task')]:
    fig, ax = plt.subplots(figsize=(6,4))
    sns.kdeplot(arr, ax=ax, label=lab, fill=True, alpha=0.2)
    ax.legend(); ax.set_title('Distribution comparison')

In [None]:
fig, ax = plt.subplots()
lower = np.percentile(L_var, 1)
upper = np.percentile(L_var, 99)
tmp = np.clip(L_var, lower, upper)
im = ax.imshow(tmp, cmap='jet')
fig.colorbar(im, ax=ax).set_label('L_var values')

ax.invert_yaxis()
ax.set_title('Heatmap of L_var Matrix')
ax.set_xlabel('Component Index')
ax.set_ylabel('Component Index')
ax.grid(alpha=0.3)

plt.show()


In [None]:
tmp = np.sum(L_var, axis=0)
plt.figure()
plt.plot(tmp)
plt.show()

In [None]:
tmp[tmp>1e6].shape

In [None]:
plt.figure(figsize=(6,4))
plt.hist(L_var.ravel(), bins=100, alpha=0.7)
plt.xlabel('voxel_space_weights'); plt.ylabel('count'); plt.title('Histogram')
plt.yscale('log')

In [None]:
plt.figure(figsize=(6,4))
plt.hist(voxel_space_weights, bins=100, alpha=0.7)
plt.xlabel('voxel_space_weights'); plt.ylabel('count'); plt.title('Histogram')
plt.yscale('log')

In [None]:
plt.figure(figsize=(6,3))
plt.plot(y)
plt.xlabel('index'); plt.ylabel('y'); plt.title('Target values')