## Null model for receptor regressions
- spatial permutation aka "spin test" 
- documentation on null models: https://netneurolab.github.io/neuromaps/user_guide/nulls.html 
- code for spin test: https://github.com/netneurolab/neuromaps/blob/main/neuromaps/nulls/nulls.py
- implement this on the group level only?

! this notebook only contains the group level code as a sanitiy check

In [1]:
import os
import tempfile
import nibabel as nib
import numpy as np
from scipy import ndimage
from scipy.stats import zscore
from scipy.spatial.distance import cdist
from sklearn.linear_model import LinearRegression
from neuromaps import nulls, transforms
from nilearn import datasets, surface
from params_and_paths import Paths, Params, Receptors
import fmri_funcs as fun

paths = Paths()
params = Params()
rec = Receptors()

In [3]:
EXPLORE_MODEL = 'noEntropy_noER'



In [4]:
def get_reg_r_sq(X, y):
    lin_reg = LinearRegression()
    lin_reg.fit(X, y)
    yhat = lin_reg.predict(X)
    SS_Residual = sum((y - yhat) ** 2)
    SS_Total = sum((y - np.mean(y)) ** 2)
    r_squared = 1 - (float(SS_Residual)) / SS_Total
    return r_squared

def get_reg_r_pval(X, y, spins, nspins):
    mask =  np.logical_not(np.isclose(y, 0))
    y_masked = y[mask]
    X_masked = X[mask,:]
    emp = get_reg_r_sq(X, y)
    null = np.zeros((nspins, ))
    for s in range(nspins):   #remove the zeros from the (permutated) effect file and the corresponding values from receptor 
        y_spin = zscore(spins[:,s])
        mask = np.logical_not(np.isclose(y_spin, 0))
        y_masked = y_spin[mask]
        X_masked = X[mask,:]
        null[s] = get_reg_r_sq(X_masked, y_masked)
    return emp, (1 + sum(null > emp))/(nspins + 1)

In [5]:
output_dir = os.path.join(paths.home_dir, 'variance_explained')
if not os.path.exists(output_dir):
        os.makedirs(output_dir)

In [9]:
#load group level effect maps 
#by study and latent variable 
tasks = ['EncodeProb', 'NAConf', 'PNAS', 'Explore']  
with open(os.path.join(output_dir,'regression_null_models_grouplevel_spin_nullremoved.txt'), "w") as outfile:
    for task in tasks:
        outfile.write(f'{task}:\n')
        if task == 'Explore':
            task_path = os.path.join(paths.home_dir, task, 'schaefer', 'second_level', EXPLORE_MODEL)
        else:
            task_path = os.path.join(paths.home_dir, task, 'schaefer', 'second_level')

        receptor_dir = os.path.join(paths.home_dir, 'receptors', rec.source) 
        receptor_data =np.load(os.path.join(receptor_dir,f'receptor_density_{params.mask}_surf.pickle'), allow_pickle=True)
                    
        if task in ['NAConf']:
            add_info = '_firstTrialsRemoved'
        elif not params.zscore_per_session:
            add_info = '_zscoreAll'
        else:
            add_info = ""

        for contrast in ['confidence', 'surprise']:
            fname = f'{contrast}_schaefer_effect_map{add_info}.nii.gz'
            data_vol = nib.load(os.path.join(task_path, fname))
            effect_data = transforms.mni152_to_fsaverage(data_vol, fsavg_density='41k')
            data_gii = []
            for img in effect_data:
                data_hemi = img.agg_data()
                data_hemi = np.asarray(data_hemi).T
                data_gii += [data_hemi]
                effect_array = np.hstack(data_gii)
            spins = nulls.alexander_bloch(effect_array, atlas='fsaverage', density='41k',
                                    n_perm=1000)
            emp, model_pval = get_reg_r_pval(zscore(receptor_data),
                                    zscore(effect_array), 
                                    spins, 1000)
            
            outfile.write(f'{contrast}:\n')
            outfile.write(f'R2:{emp}  p:{model_pval}\n')
        outfile.write('\n\n')


In [6]:
tasks = ['EncodeProb']  
for task in tasks:
    if task == 'Explore':
        task_path = os.path.join(paths.home_dir, task, 'schaefer', 'second_level', EXPLORE_MODEL)
    else:
        task_path = os.path.join(paths.home_dir, task, 'schaefer', 'second_level')

    receptor_dir = os.path.join(paths.home_dir, 'receptors', rec.source) 
    receptor_data =np.load(os.path.join(receptor_dir,f'receptor_density_{params.mask}_surf.pickle'), allow_pickle=True)
                
    if task in ['NAConf']:
        add_info = '_firstTrialsRemoved'
    elif not params.zscore_per_session:
        add_info = '_zscoreAll'
    else:
        add_info = ""

    for contrast in ['confidence', 'surprise']:
        fname = f'{contrast}_schaefer_effect_map{add_info}.nii.gz'
        data_vol = nib.load(os.path.join(task_path, fname))
        effect_data = transforms.mni152_to_fsaverage(data_vol, fsavg_density='41k')
        data_gii = []
        for img in effect_data:
            data_hemi = img.agg_data()
            data_hemi = np.asarray(data_hemi).T
            data_gii += [data_hemi]
            effect_array = np.hstack(data_gii)
        mask = np.isclose(effect_array, 0)
        


In [None]:
receptor_data =np.load(os.path.join(receptor_dir,f'receptor_density_{params.mask}.pickle'), allow_pickle=True)


In [None]:
#sanity check: get the R2 group level from the volumne regression

tasks = ['EncodeProb', 'NAConf', 'PNAS', 'Explore']  
with open(os.path.join(output_dir,'regression_grouplevel_emp.txt'), "w") as outfile:
    for task in tasks:
        outfile.write(f'{task}:\n')
        if task == 'Explore':
            task_path = os.path.join(paths.home_dir, task, 'schaefer', 'second_level', EXPLORE_MODEL)
        else:
            task_path = os.path.join(paths.home_dir, task, 'schaefer', 'second_level')

        receptor_dir = os.path.join(paths.home_dir, 'receptors', rec.source) 
        receptor_data =np.load(os.path.join(receptor_dir,f'receptor_density_{params.mask}.pickle'), allow_pickle=True)
                    
        if task in ['NAConf']:
            add_info = '_firstTrialsRemoved'
        elif not params.zscore_per_session:
            add_info = '_zscoreAll'
        else:
            add_info = ""

        for contrast in ['confidence', 'surprise']:
            fname = f'{contrast}_schaefer_effect_map{add_info}.nii.gz'
            data_vol = nib.load(os.path.join(task_path, fname))
            
            #masker
            masker = fun.get_masker()
            masker.fit()
            data_masked = masker.fit_transform(data_vol)
            
            emp = get_reg_r_sq(zscore(receptor_data), zscore(data_masked.reshape(-1,1), axis=None))
            
            outfile.write(f'{contrast}:\n')
            outfile.write(f'R2:{emp}\n')
        outfile.write('\n\n')

In [5]:
tasks = ['EncodeProb', 'NAConf', 'PNAS', 'Explore']  
for task in tasks:
    if task == 'Explore':
        task_path = os.path.join(paths.home_dir, task, 'schaefer', 'second_level', EXPLORE_MODEL)
    else:
        task_path = os.path.join(paths.home_dir, task, 'schaefer', 'second_level')

    receptor_dir = os.path.join(paths.home_dir, 'receptors', rec.source) 
    receptor_data =np.load(os.path.join(receptor_dir,f'receptor_density_{params.mask}.pickle'), allow_pickle=True)
                
    if task in ['NAConf']:
        add_info = '_firstTrialsRemoved'
    elif not params.zscore_per_session:
        add_info = '_zscoreAll'
    else:
        add_info = ""

    for contrast in ['confidence']:
        fname = f'{contrast}_schaefer_effect_map{add_info}.nii.gz'
        data_vol = nib.load(os.path.join(task_path, fname))
        
        #masker
        masker = fun.get_masker()
        masker.fit()
        data_masked = masker.fit_transform(data_vol)

NameError: name 'os' is not defined

In [12]:
sum(np.isclose(data_masked.reshape(-1,1), 0))

array([354])

debug compare image and null ditribution in neuromaps 

In [33]:
from neuromaps import datasets, images, nulls, resampling
receptor = receptor_data[:,0] 
rotated = nulls.alexander_bloch(effect_array, atlas='fsaverage', density='41k',
                                n_perm=100, seed=1234)


In [34]:
from neuromaps import stats
corr, pval = stats.compare_images(effect_array, receptor, nulls=rotated)

In [None]:
zeromask = np.zeros(len(effect_array), dtype=bool)
ignore_zero = True
nan_policy = 'omit'
if ignore_zero:
    zeromask = np.logical_or(np.isclose(effect_array, 0),
                                np.isclose(receptor, 0))
nanmask = np.logical_or(np.isnan(effect_array), np.isnan(receptor))
if nan_policy == 'raise':
    if np.any(nanmask):
        raise ValueError('Inputs contain nan')
elif nan_policy == 'omit':
    mask = np.logical_and(np.logical_not(zeromask),
                            np.logical_not(nanmask))
elif nan_policy == 'propagate':
    mask = np.logical_not(zeromask)
srcdata, trgdata = effect_array[mask], receptor[mask]

if rotated is not None:
    n_perm = rotated.shape[-1]
    if ignore_zero:
        zeromask = np.logical_or(np.isclose(rotated, 0),  #remove the values that are zero in source 
                                    np.isclose(receptor, 0)) #remove the values that are removed in trg
    rotated_masked = rotated[np.logical_or(np.isclose(receptor, 0))] 



#! the mask is applied to permutated data (including permutated nulls)

#TODO: possible solution
    #!don't remove anything in the the compare_img function but than in permtest_metric
    

    


In [4]:
output_dir = os.path.join(paths.home_dir, 'variance_explained')

reg_null = np.load(os.path.join(output_dir,f'EncodeProb_surprise_all_regression_null_cv_r2'), allow_pickle=True)

this should not be a problem with the variogram -> keeps the nulls fixed 

In [20]:
import pandas as pd

tasks = ['EncodeProb', 'NAConf', 'PNAS', 'Explore']  

proj = '_on_surf'

latent_vars = ['surprise', 'confidence'] 
df = pd.DataFrame(index=tasks, columns=latent_vars)

task = 'NAConf'
latent_var = 'surprise'
output_dir = os.path.join(paths.home_dir, 'variance_explained')

#get the empirical r2 (on surf)
emp = np.load(os.path.join(output_dir,f'{task}_{latent_var}_all_regression_cv_r2_on_surf.pickle'), allow_pickle=True)
#get the null r2
null = np.load(os.path.join(output_dir,f'{task}_{latent_var}_all_regression_null_cv_r2.pickle'), allow_pickle=True)
null_df = pd.DataFrame(null)
#null_df = null_df.drop(range(0,1000))
null_means = null_df.mean().tolist()

#summarize the null r2 as we did with the empirical r2 above (for table overview in paper)
r2 = sum(null_means) / len(null_means)
df.loc[task, latent_var] = r2

from scipy.stats import zscore, sem, ttest_rel

ttest = ttest_rel(null_means, emp, alternative='less') 

ttest.pvalue


0.9996773519545895