In [1]:
import nilearn.image, nilearn.decoding
import numpy as np
import pandas as pd
import nibabel as nib
import scipy.stats as stats
from sklearn.model_selection import GroupKFold
from sklearn.svm import LinearSVC
from tqdm import tqdm # precessing-bar

# import matplotlib.pyplot as plt
# import plotly.graph_objects as go
# from plotly.subplots import make_subplots
# import plotly.express as px #interactive-plot
# import cufflinks as cf
# from plotly.offline import iplot
# cf.go_offline()



In [2]:
# Ver. Local
# python >= 3.6 -> fString

fmri_dir='/Volumes/clmnlab/G7T/main/fmri_data'

def getColPos_IF(subj_id,df=False):
    ex_group= {'WG_P1P2' : ['IF02', 'IF03', 'IF04', 'IF07'],
               'WG_P2P1' : ['IF05', 'IF06', 'IF08', 'IF10'],
               'GW_P1P2' : ['IF12', 'IF15'],
               'GW_P2P1' : ['IF13', 'IF14']}
    if df:
        ex_color={'WG':['White','Gray','White','Gray'],'GW':['Gray','White','Gray','White']}
        ex_pos={'P1P2':['x','x','+','+'],'P2P1':['+','+','x','x']}
    else:
        ex_color={'WG':[0,1,0,1],'GW':[1,0,1,0]}
        ex_pos={'P1P2':[0,0,1,1],'P2P1':[1,1,0,0]}
    ColPos=[keys for keys in ex_group if subj_id in ex_group[keys]][0]
    color=ex_color[ColPos[:2]]
    pos=ex_pos[ColPos[3:]]
    return color, pos

def load_Beta_IF(subj_id, run_n):
    data_dir=f'{fmri_dir}/glm_results/G7T_{subj_id}/LSSpb04_unmask/betasLSS.G7T_{subj_id}.run0{run_n}.nii.gz'
    img = nilearn.image.load_img(data_dir)
    return img

def load_Behavior_IF(subj_id, run_n):
    group_id=subj_id[0:2]
    Subj_id=group_id+subj_id[-2:]
    EL = subj_id[2]
    regressor_dir=f'{fmri_dir}/regressors/G7T_{group_id}/G7T_{subj_id}/totalReward_run0{run_n}.txt'
    hit_Dat=pd.read_csv(regressor_dir,sep=' ',header=None).T.rename(columns={0:'hit'})
    handpos=pd.DataFrame({'hand':['R']*(len(hit_Dat)),
                          'position':[0,0,1,1]*(len(hit_Dat)//4),
                          'trial':np.repeat(list(range(1,21)),2),
                          'mapping':[0,1,0,1]*(len(hit_Dat)//4),
                         'EarlyLate':[EL]*len(hit_Dat)})
    [group_color,group_position]=getColPos_IF(Subj_id,True)
    colpos=pd.DataFrame({'colorGroup':group_color*(len(hit_Dat)//4),'posGroup':group_position*(len(hit_Dat)//4)})
    y_dat=pd.concat([hit_Dat,handpos,colpos],axis=1)
    return y_dat

def totalize_behaviorIF(subj_id, runlist):
    bavDat=load_BehaviorIF(subj_id, runlist[0]).assign(session=runlist[0])
    for i in runlist[1:]:
        bavDat0=load_BehaviorIF(subj_id, i).assign(session=i)
        bavDat=pd.concat([bavDat,bavDat0])
    return bavDat

def get_XYG_IF(subj_id, runlist,target):
    Xs = [load_Beta_IF(subj_id, run) for run in runlist]
    Ys = [load_HitN_IF(subj_id, run, target) for run in runlist]
    
    group = [i for i, y in enumerate(Ys) for j in range(len(y))]
    
    xDat = nilearn.image.concat_imgs(Xs)
    yDat = np.concatenate(Ys)
    
    assert xDat.shape[-1] == yDat.shape[0]
    assert yDat.shape[0] == len(group)
    
    return xDat, yDat, group

# ROI Fan Mask Setting
def num2str3(num):
    numstr=str(num)
    zerolen=3-len(numstr)
    str3='0'*zerolen+numstr
    return str3
fanlist=['Fan_resam'+num2str3(i)+'.nii' for i in range(1,281)]

def fast_masking(img, roi):
    img_data = img.get_fdata()
    roi_mask = roi.get_fdata().astype(bool)
    
    if img_data.shape[:3] != roi_mask.shape:
        raise ValueError(f'different shape while masking! img={img_data.shape} and roi={roi_mask.shape}')
    return img_data[roi_mask, :].T

def load_HitN_IF(subj_id, run_n, target):
    group_id=subj_id[0:2]
    regressor_dir=f'{fmri_dir}/regressors/G7T_{group_id}/G7T_{subj_id}/totalReward_run0{run_n}.txt'
    hit_Dat=pd.read_csv(regressor_dir,sep=' ',header=None).T.rename(columns={0:'hit'})
    handpos=pd.DataFrame({'mapping':[0,1,0,1]*(len(hit_Dat)//4),'position':[0,0,1,1]*(len(hit_Dat)//4),'trial':np.repeat(list(range(1,21)),2)})
    [group_color,group_position]=getColPos_IF(group_id+subj_id[-2:])
    colpos=pd.DataFrame({'colorGroup':group_color*(len(hit_Dat)//4),'posGroup':group_position*(len(hit_Dat)//4)})
    y_dat=pd.concat([hit_Dat,handpos,colpos],axis=1)
    
    return list(y_dat[target])

In [7]:
X,Y,group=get_XYG_IF('IFE02',list(range(1,7)),'mapping')

In [3]:
def run_searchlight(fmri_mask, X, y, group, estimator, group_k, radius, chance_level):
    cv = GroupKFold(group_k)

    searchlight = nilearn.decoding.SearchLight(
        fmri_mask,
        radius=radius,
        estimator=estimator,
        n_jobs=4,
        verbose=False,
        cv=cv,
        scoring='balanced_accuracy'
    )

    searchlight.fit(X, y, group)
    scores = searchlight.scores_ - chance_level

    return nilearn.image.new_img_like(fmri_mask, scores)

In [6]:
subj_list = ['IFE02','IFL02','IFE03','IFL03','IFE05','IFL05','IFE06','IFL06','IFE07','IFL07','IFE08','IFL08','IFE13','IFL13','IFE15','IFL15']
runlist = [1,2,3,4,5,6]
for subj in tqdm(subj_list):
    estimator = LinearSVC(max_iter=1e5)
    estimator_name = 'svc'
    radius = 5  # 적절한 크기를 사용하세요.
#     mask_dir = f'/Volumes/clmnlab/G7T/pilot/fmri_data/preproc_data/7T_anat/G7T_{subj}/full_mask.G7T_{subj}.nii.gz'
    mask_dir = 'test2.nii'
    mask_img = nilearn.image.load_img(mask_dir)

    X, y, group = get_XYG_IF(subj, runlist,'mapping')

    searchlight_img = run_searchlight(mask_img, X, y, group, group_k=6, estimator=estimator, radius=radius, chance_level=1/2)
    searchlight_img.to_filename(f'{subj}_r{radius}_{estimator_name}_parahippocam.nii.gz')
    print(f"{subj} finished")

  6%|▋         | 1/16 [51:30<12:52:38, 3090.57s/it]

IFE02 finished


 12%|█▎        | 2/16 [2:01:52<13:20:20, 3430.05s/it]

IFL02 finished


 19%|█▉        | 3/16 [2:58:59<12:22:59, 3429.16s/it]

IFE03 finished


 25%|██▌       | 4/16 [5:42:38<17:49:11, 5345.98s/it]

IFL03 finished


 31%|███▏      | 5/16 [6:55:33<15:26:43, 5054.83s/it]

IFE05 finished


 38%|███▊      | 6/16 [9:48:21<18:28:05, 6648.54s/it]

IFL05 finished


 44%|████▍     | 7/16 [10:28:33<13:26:40, 5377.79s/it]

IFE06 finished


 50%|█████     | 8/16 [11:31:05<10:51:59, 4889.96s/it]

IFL06 finished


 56%|█████▋    | 9/16 [12:35:26<8:54:29, 4581.41s/it] 

IFE07 finished


 62%|██████▎   | 10/16 [13:30:50<7:00:24, 4204.13s/it]

IFL07 finished


 69%|██████▉   | 11/16 [15:04:42<6:26:01, 4632.38s/it]

IFE08 finished


 75%|███████▌  | 12/16 [16:04:24<4:47:49, 4317.43s/it]

IFL08 finished


 81%|████████▏ | 13/16 [17:55:07<4:10:44, 5014.90s/it]

IFE13 finished


 88%|████████▊ | 14/16 [21:05:03<3:50:58, 6929.23s/it]

IFL13 finished


 94%|█████████▍| 15/16 [22:45:01<1:50:50, 6650.06s/it]

IFE15 finished


100%|██████████| 16/16 [25:05:34<00:00, 5645.90s/it]  

IFL15 finished





In [None]:
subj_list = ['IFE02','IFL02','IFE03','IFL03','IFE05','IFL05','IFE06','IFL06','IFE07','IFL07','IFE08','IFL08','IFE13','IFL13','IFE15','IFL15']
runlist = [1,2,3,4,5,6]
for subj in tqdm(subj_list):
    estimator = LinearSVC(max_iter=1e5)
    estimator_name = 'svc'
    radius = 5  # 적절한 크기를 사용하세요.
    mask_dir = '/Volumes/clmnlab/G7T/main/fmri_data/masks/Cerebellum/mask.Cerebellum.nii.gz'
    mask_img = nilearn.image.load_img(mask_dir)

    X, y, group = get_XYG_IF(subj, runlist,'mapping')

    searchlight_img = run_searchlight(mask_img, X, y, group, group_k=6, estimator=estimator, radius=radius, chance_level=1/2)
    searchlight_img.to_filename(f'{subj}_r{radius}_{estimator_name}_cereb.nii.gz')
    print(f"{subj} finished")

  6%|▋         | 1/16 [8:14:03<123:30:49, 29643.29s/it]

IFE02 finished


 12%|█▎        | 2/16 [13:59:14<104:51:30, 26963.62s/it]

IFL02 finished


 19%|█▉        | 3/16 [19:11:28<88:27:10, 24494.64s/it] 

IFE03 finished


 25%|██▌       | 4/16 [34:16:49<111:28:30, 33442.58s/it]

IFL03 finished


 31%|███▏      | 5/16 [37:36:44<82:31:31, 27008.36s/it] 

IFE05 finished


 38%|███▊      | 6/16 [40:05:11<59:56:20, 21578.04s/it]

IFL05 finished


 44%|████▍     | 7/16 [43:30:44<47:00:41, 18804.62s/it]

IFE06 finished


 50%|█████     | 8/16 [47:15:23<38:14:15, 17206.93s/it]

IFL06 finished


 56%|█████▋    | 9/16 [52:51:01<35:10:03, 18086.19s/it]

IFE07 finished
