In [8]:
#import packages
from nipype.interfaces.io import DataSink, SelectFiles, DataGrabber # Data i/o
from nipype.interfaces.utility import IdentityInterface, Function     # utility
from nipype.pipeline.engine import Node, Workflow, JoinNode,MapNode      # pypeline engine

from nipype.interfaces.fsl.model import Randomise, GLM, Cluster
from nipype.interfaces.freesurfer.model import Binarize
from nipype.interfaces.fsl.utils import ImageMeants, Merge, Split
from nipype.interfaces.fsl.maths import ApplyMask
from pandas import read_csv, DataFrame

#set output file type for FSL to NIFTI
from nipype.interfaces.fsl.preprocess import FSLCommand
FSLCommand.set_default_output_type('NIFTI_GZ')

# Set study variables
setup = "Lucy"

if setup == "sherlock":
    studyhome = '/oak/stanford/groups/iang/BABIES_data/BABIES_rest/'
    preproc_dir = studyhome + 'processed/preproc'
    output_dir = studyhome + 'processed/sbc_analysis'
    workflow_dir = studyhome + 'workflows'
    roi_dir = studyhome + 'ROIs'
    group_con = studyhome + 'misc/tcon.con'
    template_brain = studyhome + 'templates/T2w_BABIES_template_2mm.nii.gz'
    gm_template = studyhome + 'templates/BABIES_gm_mask_2mm.nii.gz'


if setup == "Lucy":
    studyhome = '/Volumes/iang/BABIES_data/BABIES_rest/'
    preproc_dir = studyhome + 'processed/preproc'
    output_dir = studyhome + 'processed/sbc_analysis'
    workflow_dir = studyhome + 'workflows'
    roi_dir = studyhome + 'ROIs'
    group_con = studyhome + 'misc/tcon.con'
    template_brain = studyhome + 'templates/T2w_BABIES_template_2mm.nii.gz'
    gm_template = studyhome + 'templates/BABIES_gm_mask_2mm.nii.gz'
    

elif setup == "Cat":
    studyhome = '/Users/catcamacho/Box/SNAP/BABIES/BABIES_rest/'
    preproc_dir = studyhome + 'processed/preproc'
    output_dir = studyhome + 'processed/sbc_analysis'
    workflow_dir = studyhome + 'workflows'
    roi_dir = studyhome + 'ROIs'
    group_con = studyhome + 'misc/tcon.con'
    template_brain = studyhome + 'templates/T2w_BABIES_template_2mm.nii.gz'
    gm_template = studyhome + 'templates/BABIES_gm_mask_2mm.nii.gz'

subject_info = read_csv(studyhome + 'misc/subject_info.csv', index_col=None) #csv with columns 'subject_id','age', and any group lists like 'lena'
subject_info = subject_info.sort_values(by=['subject_id'])
subjects_list = subject_info['subject_id'].tolist()
list_names = ['lena','sens', 'postnatal_stress'] #replace with actual group list labels

proc_cores = 2

# ROIs for connectivity analysis
Lamyg = roi_dir + '/L_amyg_6mm.nii.gz'
Ramyg = roi_dir + '/R_amyg_6mm.nii.gz'
lipl = roi_dir + '/l_ipl_8mm.nii.gz'
ripl = roi_dir + '/r_ipl_8mm.nii.gz'
mpfc = roi_dir + '/mPFC_8mm.nii.gz'

target_rois = [mpfc, lipl, ripl]

ROIs = [Lamyg, Ramyg]
rois = ['L_amyg','R_amyg']

#Lifg = roi_dir + '/l_ifg_10mm.nii.gz'
#Rifg = roi_dir + '/r_ifg_10mm.nii.gz'

#ROIs = [Lifg, Rifg]
#rois = ['l_ifg','r_ifg']

min_clust_size = 10

In [9]:
## File handling
# Identity node- select subjects
infosource = Node(IdentityInterface(fields=['subject_id','ROIs']),
                     name='infosource')
infosource.iterables = [('subject_id', subjects_list),('ROIs',ROIs)]


# Data grabber- select fMRI and ROIs
templates = {'func': preproc_dir + '/proc_func/{subject_id}/func_filtered_smooth.nii*'}
selectfiles = Node(SelectFiles(templates), name='selectfiles')

# Datasink- where our select outputs will go
datasink = Node(DataSink(), name='datasink')
datasink.inputs.base_directory = output_dir
datasink.inputs.container = output_dir
substitutions = [('_subject_id_', ''),
                ('_ROIs_..Users..catcamacho..Box..SNAP..BABIES..BABIES_rest..ROIs..',''), 
                 ('_ROIs_..share..iang..active..BABIES..BABIES_rest..ROIs..',''), 
                  ('_ROIs_..oak..stanford..groups..iang..BABIES_data..BABIES_rest..ROIs..',''),
                 ('_ROIs_..Volumes..iang..BABIES_data..BABIES_rest..ROIs..',''),
                 ('_6mm.nii.gz','_'),('_8mm.nii.gz','_'),('_10mm.nii.gz','_')]
datasink.inputs.substitutions = substitutions

In [10]:
## Seed-based level 1

# Extract ROI timeseries
ROI_timeseries = Node(ImageMeants(), name='ROI_timeseries', iterfield='mask')

# model ROI connectivity
glm = Node(GLM(out_file='betas.nii.gz',out_cope='cope.nii.gz'), name='glm', iterfield='design')

In [12]:
sbc_workflow = Workflow(name='sbc_workflow')
sbc_workflow.connect([(infosource,selectfiles,[('subject_id','subject_id')]),
                      (selectfiles,ROI_timeseries,[('func','in_file')]),
                      (infosource,ROI_timeseries,[('ROIs','mask')]),
                      (ROI_timeseries,glm,[('out_file','design')]),
                      (selectfiles,glm,[('func','in_file')]),
                      (glm,datasink,[('out_file','glm_betas')])
                     ])
sbc_workflow.base_dir = workflow_dir
#sbc_workflow.write_graph(graph2use='flat')
sbc_workflow.run('MultiProc', plugin_args={'n_procs': proc_cores})

190122-09:38:39,349 workflow INFO:
	 Workflow sbc_workflow settings: ['check', 'execution', 'logging', 'monitoring']
190122-09:38:43,143 workflow INFO:
	 Running in parallel.
190122-09:38:43,161 workflow INFO:
	 [MultiProc] Running 0 tasks, and 94 jobs ready. Free memory (GB): 14.40/14.40, Free processors: 2/2.
190122-09:38:43,688 workflow INFO:
	 [Node] Setting-up "sbc_workflow.selectfiles" in "/Volumes/iang/BABIES_data/BABIES_rest/workflows/sbc_workflow/_ROIs_..Volumes..iang..BABIES_data..BABIES_rest..ROIs..R_amyg_6mm.nii.gz_subject_id_143-C-T1/selectfiles".
190122-09:38:43,836 workflow INFO:
	 [Node] Setting-up "sbc_workflow.selectfiles" in "/Volumes/iang/BABIES_data/BABIES_rest/workflows/sbc_workflow/_ROIs_..Volumes..iang..BABIES_data..BABIES_rest..ROIs..L_amyg_6mm.nii.gz_subject_id_143-C-T1/selectfiles".
190122-09:38:45,163 workflow INFO:
	 [MultiProc] Running 2 tasks, and 92 jobs ready. Free memory (GB): 14.00/14.40, Free processors: 0/2.
                     Currently running:
 

<networkx.classes.digraph.DiGraph at 0x11d19a470>

In [13]:
def extract_cluster_betas(cluster_index_file, sample_betas, min_clust_size, subject_ids):
    from nipype import config, logging
    config.enable_debug_mode()
    logging.update_logging(config)
    from nibabel import load, save, Nifti1Image
    from pandas import DataFrame, Series
    from numpy import unique, zeros_like
    from nipype.interfaces.fsl.utils import ImageMeants
    from os.path import abspath
    
    subject_ids = sorted(subject_ids)
    sample_data = DataFrame(subject_ids, index=None, columns=['Subject'])
    
    cluster_nifti = load(cluster_index_file[0])
    cluster_data = cluster_nifti.get_data()
    clusters, cluster_sizes = unique(cluster_data, return_counts=True)
    
    final_clusters = clusters[cluster_sizes>=min_clust_size]
    for clust_num in final_clusters[1:]:
        temp = zeros_like(cluster_data)
        temp[cluster_data==clust_num] = 1
        temp_nii = Nifti1Image(temp,cluster_nifti.affine)
        temp_file = 'temp_clust_mask.nii.gz'
        save(temp_nii, temp_file)
        
        eb = ImageMeants()
        eb.inputs.in_file = sample_betas
        eb.inputs.mask = temp_file
        eb.inputs.out_file = 'betas.txt'
        eb.run()
        betas = open('betas.txt').read().splitlines()
        sample_data['clust' + str(clust_num)] = Series(betas, index=sample_data.index)
    
    sample_data.to_csv('extracted_betas.csv')
    extracted_betas_csv = abspath('extracted_betas.csv')
    return(extracted_betas_csv)

def separate_sub_lists(subjects_df, list_name, output_dir, roi):
    from nipype import config, logging
    config.enable_debug_mode()
    logging.update_logging(config)
    from subprocess import check_call
    from os.path import abspath
    from nipype.interfaces.fsl import Merge
    
    included_subs = subjects_df[subjects_df[list_name]==1]
    included_subs = included_subs.sort_values(by=['subject_id'])
    subjects_list = included_subs['subject_id'].tolist()
    subjects_ages = included_subs['age'].tolist()
    
    beta_list = []
    text_file = open('temp_text.txt','w')
    for a in range(0,len(subjects_list)):
        beta_list.append(output_dir+'/glm_betas/'+roi+'_'+subjects_list[a]+'/betas.nii.gz')
        text_file.write('1 {0}\n'.format(subjects_ages[a]))
    
    text_file.close()
    file = abspath('temp_text.txt')
    check_call(['Text2Vest',file,list_name + '_design.mat'])
    design_file = abspath('{0}_design.mat'.format(list_name))
        
    me=Merge()
    me.inputs.dimension='t'
    me.inputs.in_files=beta_list
    me.inputs.merged_file='betas_merged.nii.gz'
    me.run()
    
    betas = abspath('betas_merged.nii.gz')
    
    return(betas,design_file,subjects_list)

In [14]:
## Level 2
# new identity node
infosource2 = Node(IdentityInterface(fields=['roi','list_name']),
                   name='infosource2')
infosource2.iterables = [('roi',rois),('list_name',list_names)]

# Data grabber- select fMRI and ROIs
datagrabber = Node(Function(input_names=['subjects_df', 'list_name', 'output_dir', 'roi'], 
                            output_names=['betas','design_file', 'subjects_list'], 
                            function=separate_sub_lists), name='datagrabber')
datagrabber.inputs.output_dir = output_dir
datagrabber.inputs.subjects_df = subject_info

# FSL randomise for higher level analysis
highermodel = Node(Randomise(raw_stats_imgs= True,
                             mask=gm_template,
                             tcon=group_con),
                   name = 'highermodel')

## Cluster results
# make binary masks of sig clusters
binarize = MapNode(Binarize(min=0.95), 
                   name='binarize', 
                   iterfield=['in_file'])

# mask T-map before clustering
mask_tmaps = MapNode(ApplyMask(), 
                     name='mask_tmaps',
                     iterfield=['in_file','mask_file'])

# clusterize and extract cluster stats/peaks
clusterize = MapNode(Cluster(threshold=3.56,
                             pthreshold=0.05,
                             dlh=4,
                             volume=97532, #for GM mask
                             out_index_file='outindex.nii.gz', 
                             out_localmax_txt_file='localmax.txt'), 
                     name='clusterize',
                     iterfield=['in_file'])

extract_betas = Node(Function(input_names=['cluster_index_file','sample_betas',
                                           'min_clust_size','subject_ids'],
                              output_names=['extracted_betas_csv'],
                              function=extract_cluster_betas),
                     name='extract_betas')
extract_betas.inputs.min_clust_size = min_clust_size

In [15]:
group_workflow = Workflow(name='group_workflow')
group_workflow.connect([(infosource2,datagrabber,[('roi','roi')]),
                        (infosource2,datagrabber,[('list_name','list_name')]),
                        (datagrabber, highermodel,[('betas','in_file'),('design_file','design_mat')]),
                        (highermodel, clusterize, [('tstat_files','in_file')]),
                        (datagrabber, extract_betas, [('betas','sample_betas'),
                                                      ('subjects_list','subject_ids')]),
                        (clusterize, extract_betas, [('index_file','cluster_index_file')]),

                        (highermodel,datasink,[('t_corrected_p_files','rand_corrp_files')]),
                        (highermodel,datasink,[('tstat_files','rand_tstat_files')]),
                        (clusterize,datasink,[('index_file','cluster_index_file')]),
                        (clusterize,datasink,[('localmax_txt_file','localmax_txt_file')]),
                        (datagrabber, datasink, [('betas','merged_betas')]),
                        (extract_betas, datasink, [('extracted_betas_csv','all_cluster_betas')])
                       ])
group_workflow.base_dir = workflow_dir
#group_workflow.write_graph(graph2use='flat')
group_workflow.run('MultiProc', plugin_args={'n_procs': proc_cores})

190122-10:01:11,418 workflow INFO:
	 Workflow group_workflow settings: ['check', 'execution', 'logging', 'monitoring']
190122-10:01:15,680 workflow INFO:
	 Running in parallel.
190122-10:01:15,688 workflow INFO:
	 [MultiProc] Running 0 tasks, and 6 jobs ready. Free memory (GB): 14.40/14.40, Free processors: 2/2.
190122-10:01:16,32 workflow INFO:
	 [Node] Outdated cache found for "group_workflow.datagrabber".
190122-10:01:16,42 workflow INFO:
	 [Node] Setting-up "group_workflow.datagrabber" in "/Volumes/iang/BABIES_data/BABIES_rest/workflows/group_workflow/_list_name_postnatal_stress_roi_R_amyg/datagrabber".
190122-10:01:16,130 workflow INFO:
	 [Node] Outdated cache found for "group_workflow.datagrabber".
190122-10:01:16,193 workflow INFO:
	 [Node] Outdated cache found for "group_workflow.datagrabber".
190122-10:01:16,203 workflow INFO:
	 [Node] Setting-up "group_workflow.datagrabber" in "/Volumes/iang/BABIES_data/BABIES_rest/workflows/group_workflow/_list_name_sens_roi_R_amyg/datagrabb

<networkx.classes.digraph.DiGraph at 0x11d6705f8>

In [17]:
def target_roi_betas(target_rois, sample_betas, subject_ids):
    from nipype.interfaces.fsl.utils import ImageMeants
    from os.path import abspath, basename
    from pandas import DataFrame, Series
    from nipype import config, logging

    config.enable_debug_mode()
    logging.update_logging(config)

    subject_ids = sorted(subject_ids)
    sample_data = DataFrame(subject_ids, index=None, columns=['Subject'])

    for roi in target_rois: 
        roi_file = basename(roi)
        eb = ImageMeants()
        eb.inputs.in_file = sample_betas
        eb.inputs.mask = roi_file
        eb.inputs.out_file = 'betas.txt'
        eb.run()
        betas = open('betas.txt').read().splitlines()
        sample_data[roi_file[:-7]] = Series(betas, index=sample_data.index)

    sample_data.to_csv('extracted_betas.csv')
    extracted_betas_csv = abspath('extracted_betas.csv')
    return(extracted_betas_csv)

target_betas = Node(Function(input_names=['target_rois','sample_betas','subject_ids'], 
                             output_names=['extracted_betas_csv'], 
                             function=target_roi_betas),
                    name='target_betas')
target_betas.inputs.target_rois = target_rois

In [19]:
pullbetas_workflow = Workflow(name='pullbetas_workflow')
pullbetas_workflow.connect([(infosource2,datagrabber,[('roi','roi'),
                                                      ('list_name','list_name')]),
                            (datagrabber, target_betas,[('betas','sample_betas'),
                                                        ('subjects_list','subject_ids')]),
                            (target_betas, datasink, [('extracted_betas_csv','roi_target_betas')])
                           ])
pullbetas_workflow.base_dir = workflow_dir
#pullbetas_workflow.write_graph(graph2use='flat')
pullbetas_workflow.run('MultiProc', plugin_args={'n_procs': proc_cores})

190122-10:34:58,846 workflow INFO:
	 Workflow pullbetas_workflow settings: ['check', 'execution', 'logging', 'monitoring']
190122-10:35:01,335 workflow INFO:
	 Running in parallel.
190122-10:35:01,341 workflow INFO:
	 [MultiProc] Running 0 tasks, and 6 jobs ready. Free memory (GB): 14.40/14.40, Free processors: 2/2.
190122-10:35:02,391 workflow INFO:
	 [Node] Outdated cache found for "group_workflow.datagrabber".
190122-10:35:02,401 workflow INFO:
	 [Node] Setting-up "group_workflow.datagrabber" in "/Volumes/iang/BABIES_data/BABIES_rest/workflows/group_workflow/_list_name_postnatal_stress_roi_R_amyg/datagrabber".
190122-10:35:02,475 workflow INFO:
	 [Node] Outdated cache found for "group_workflow.datagrabber".
190122-10:35:02,509 workflow INFO:
	 [Node] Outdated cache found for "group_workflow.datagrabber".
190122-10:35:02,521 workflow INFO:
	 [Node] Setting-up "group_workflow.datagrabber" in "/Volumes/iang/BABIES_data/BABIES_rest/workflows/group_workflow/_list_name_sens_roi_R_amyg/dat

RuntimeError: Workflow did not execute cleanly. Check log for details