In [None]:
from dotenv import load_dotenv
load_dotenv()
import os
import numpy as np
from tqdm import tqdm
from nilearn import plotting
import pickle
import matplotlib.pyplot as plt
import hcp_utils as hcp

### This script shows you how to reshape the fmri data into a matrix organized by conditions x repetitions x numvertices. So, indexing such a matrix like fmri_data[0,:,15000] yields all repeated instances (10 for test set, 2 for train) for the condition number 0 (find the name of the 2s clip in the associated variable 'condition_order') at voxel number 15000. This script then calculates a noiseceiling based on the variability of the data within repetitions. Be careful of interpreting this though as we are not working with beta values, but timeseries estimates. You may want to do some inter-subject and intra-subject correlations on the data too which won't be too difficult once the data is in this nice matrix format. This script also shows you how to visualize the results on an inflated and flat brain.

In [2]:
def calculate_noiseceiling(betas, n: int=1):
    """
    Calculate the standard deviation across trials, square the result,
    average across images, and then take the square root. The result is
    the estimate of the 'noise standard deviation'. 
    Parameters:
    betas: beta estimates in shape (vertices, num_reps, num_stimuli)
    Returns:
    ncsnr: noise-ceiling SNR at each voxel in shape (voxel_x, voxel_y, voxel_z) as ratio between signal std and noise std
    noiseceiling: noise ceiling at each voxel in shape (voxel_x, voxel_y, voxel_z) as % of explainable variance 
    Code adapted from GLMsingle example: https://github.com/cvnlab/GLMsingle/blob/main/examples/example9_noiseceiling.ipynb
    """
    assert(len(betas.shape) == 3)
    numvertices = betas.shape[0]
    num_reps = betas.shape[1]
    num_vids = betas.shape[2]
    noisesd = np.sqrt(np.mean(np.power(np.std(betas,axis=1,keepdims=1,ddof=1),2),axis=2)).reshape((numvertices,))

    # Calculate the total variance of the single-trial betas.
    totalvar = np.power(np.std(np.reshape(betas, (numvertices , num_reps*num_vids)), axis=1),2)

    # Estimate the signal variance and positively rectify.
    signalvar = totalvar - np.power(noisesd,2)

    signalvar[signalvar < 0] = 0
    # Compute ncsnr as the ratio between signal standard deviation and noise standard deviation.
    ncsnr = np.sqrt(signalvar) / noisesd

    # Compute noise ceiling in units of percentage of explainable variance
    noiseceiling = 100 * (np.power(ncsnr,2) / (np.power(ncsnr,2) + 1/n))
    return ncsnr, noiseceiling

def list_rep(myList: list, reps: int):
    #returns a list of items in "mylist" that are repeated "reps" number of times
    repList = []
    # traverse for all elements
    for x in myList:
        if x not in repList: 
            count = myList.count(x)
            if count == reps:
                repList.append(x)
    return repList

In [None]:
#setup paths and housekeeping
dataset_root = os.path.join(os.getenv("DATASETS_ROOT", "/default/path/to/datasets"),"CC2017") #use default if DATASETS_ROOT env variable is not set.
project_root = os.getenv("PROJECT_ROOT", "/default/path/to/datasets")

print(f"dataset_root: {dataset_root}")

fmri_path = os.path.join(dataset_root,"video_fmri_dataset")
nvertices = 91282

In [None]:
#define subject and session
n_task = {'train': [1,2], 'test': [1,10]}
ext_list = ['eps','png']
save_flag = False
for subject in range(1,4):
    save_root = os.path.join(project_root, "src", "fmriDatasetPreparation", "datasets", "CC2017", "validation", "output", "noiseceiling", f"subject{subject}")
    if not os.path.exists(save_root) and save_flag:
        os.makedirs(save_root)

    for task in ['test','train']:#load ts estimates
        with open(os.path.join(fmri_path, "TSTrialEstimates", f"subject{subject}", "estimates-prepared", "step01", f"subject{subject}_z=1_TSTrialEstimates_task-{task}.pkl"), 'rb') as f:
            estimates, condition_order = pickle.load(f) #shape nvideos, nvertices 
        #reshape the trial estimates into a matrix organized by condition and its repeates
        numreps = max(n_task[task])
        repeated_conditions = list_rep(condition_order, numreps) #just get the conditions repeated exactly numpreps times (10 for test, 2 for train)
        fmri = np.zeros((len(repeated_conditions), numreps, nvertices))
        for cond_idx, cond in tqdm(enumerate(repeated_conditions), total=len(repeated_conditions), desc=f"Reorganizing data into condition x repeats x vertices matrix"):
            indices = [i for i, x in enumerate(condition_order) if x == cond]
            fmri[cond_idx, :, :] = estimates[np.array(indices).astype(int), :]

        fmri = fmri.T
        print(f"shape of fmri data in vertices x repeats x conditions matrix: {fmri.shape}")
        for n in n_task[task]:
            print("calculating noiseceiling...")
            ncsnr, noiseceiling = calculate_noiseceiling(fmri, n=n)
            print(f"subject{subject} {task} n={n} max noiseceiling: {np.nanmax(noiseceiling)}")
            
            views = ['lateral', 'medial'] #['lateral', 'medial', 'dorsal', 'ventral', 'anterior', 'posterior']
            noiseceiling[noiseceiling < 0] = 0
            stat = noiseceiling.copy()
            for hemi in ['left','right']:
                mesh = hcp.mesh.inflated
                cortex_data = hcp.cortex_data(stat)
                bg = hcp.mesh.sulc
                for view in views:
                    display = plotting.plot_surf_stat_map(mesh, cortex_data, hemi=hemi,
                    threshold=1, bg_map=bg, view=view, cmap='hot')
                    if save_flag:
                        for ext in ext_list:
                            if ext == 'png':
                                plt.savefig(os.path.join(save_root, f"subject{subject}_task-{task}_mesh-inflated_view-{view}_hemi-{hemi}_n-{n}_trialestimates.{ext}"),dpi=300)
                            else:
                                plt.savefig(os.path.join(save_root, f"sub-{subject:02}_task-{task}_mesh-inflated_view-{view}_hemi-{hemi}_n-{n}_trialestimates.{ext}"))

                #flattened brain
                if hemi == 'left':
                    cortex_data = hcp.left_cortex_data(stat)
                    display = plotting.plot_surf(hcp.mesh.flat_left, cortex_data,
                    threshold=1, bg_map=hcp.mesh.sulc_left, colorbar=True, cmap='hot')
                    if save_flag:
                        for ext in ext_list:
                            if ext == 'png':
                                plt.savefig(os.path.join(save_root, f"sub-{subject:02}_task-{task}_mesh-flat_hemi-{hemi}_n-{n}_trialestimates.{ext}"),dpi=300)
                            else:
                                plt.savefig(os.path.join(save_root, f"sub-{subject:02}_task-{task}_mesh-flat_hemi-{hemi}_n-{n}_trialestimates.{ext}"))
                    plt.show()

                if hemi == 'right':
                    cortex_data = hcp.right_cortex_data(stat)
                    display = plotting.plot_surf(hcp.mesh.flat_right, cortex_data,
                    threshold=1, bg_map=hcp.mesh.sulc_right, colorbar=True, cmap='hot')
                    if save_flag:
                        for ext in ext_list:
                            if ext == 'png':
                                plt.savefig(os.path.join(save_root, f"sub-{subject:02}_task-{task}_mesh-flat_hemi-{hemi}_n-{n}_trialestimates.{ext}"),dpi=300)
                            else:
                                plt.savefig(os.path.join(save_root, f"sub-{subject:02}_task-{task}_mesh-flat_hemi-{hemi}_n-{n}_trialestimates.{ext}"))
                    plt.show()