### Instructions

Refer to `Example script.ipynb` for examples of using this notebook.

In [2]:
import os, glob, json, copy
import pandas as pd
import numpy as np
import nibabel as nib

##### Update first level model script and singularity paths if necessary (unlikely)

In [4]:
L1_path = os.getcwd()
script_path = os.path.join(L1_path, 'l1analysis_SPM.py')
singularity_path = '/data00/tools/singularity_images/neurodocker.sqsh'

### GENERATE SLURM JOBS FOR INDIVIDUAL PARTICIPANT FIRST-LEVEL MODELS

In [108]:
def ensure_exist(path, file):
    if os.path.exists(os.path.join(path, file)):
        return file
    else:
        raise Exception("File missing: " + file)

def ensure_relative(path):
    if path.startswith('/'):
        raise Exception("Make sure path is relative to the data path: " + path)
    else:
        return path
        
def ensure_list(obj):
    if type(obj) is not list:
        return [obj]
    else:
        return obj
    
def remove_key(key, obj):
    if obj.get(key):
        del obj[key]
        
def copy_from_template(target, template):
    for key, item in template.items():
        if key not in target.keys():
            target[key] = copy.deepcopy(template[key])
        elif type(item) is dict:
            copy_from_template(target[key], template[key])

In [109]:
with open(model_path, 'r') as f:
    model = json.load(f)

descriptions = model.get("Description", [])

for template_path in ensure_list(model.get("Template", [])):
   
    with open(template_path, 'r') as f:
        template = json.load(f)
    
    print(f"Using template {os.path.basename(template_path)}: ", end="")
    print("\n\t".join(ensure_list(template.get("Description", ["No description found"]))))

    descriptions += ensure_list(template.get("Description", []))
    
    copy_from_template(model, template)
    
if len(descriptions) > 0:
    model['Description'] = descriptions

remove_key('SecondLevel', model)

print("----")

Using template template-beta.json: Single-trial beta
	No FWHM smoothing
	No global scaling
	FAST correlation
	6 motion parameters and framewise displacement
	Trash FD >= 0.75
Using template task-ARF.json: product ARF
----


In [110]:
task = model['Info']['task']
job_name = 'task-{}_model-{}'.format(model['Info']['task'], model['Info']['model'])

env = model['Environment']

os.makedirs(env['output_path'], exist_ok=True)
os.makedirs(env['working_path'], exist_ok=True)

try:
    os.chmod(env['output_path'], 0o0777)
    os.chmod(env['working_path'], 0o0777)
except:
    pass
    
subs = []
if model['Info'].get('sub'):
    subs = ensure_list(model['Info']['sub'])
elif model['Info'].get('sub_container'):
    sub_container = ensure_relative(model['Info']['sub_container'])
    subs = [x.split(os.path.sep)[-2].replace('sub-','') for x in glob.glob(os.path.join(env['data_path'], sub_container, 'sub-*'+os.path.sep))]

exclude_subs = model['Info'].get('exclude',{}).get('sub',[])
for es in exclude_subs:
    es = es.replace('sub-','')
    if es in subs:
        subs.remove(es)

if len(subs) == 0:
    raise Exception("No subjects found.")
else:
    print(f"Processing {len(subs)} participants:")
    print(subs)

print("----")

Processing 29 participants:
['PRD003', 'PRD001', 'PRD002', 'PRD005', 'PRD006', 'PRD007', 'PRD008', 'PRD009', 'PRD010', 'PRD012', 'PRD013', 'PRD014', 'PRD015', 'PRD016', 'PRD017', 'PRD019', 'PRD020', 'PRD021', 'PRD023', 'PRD024', 'PRD029', 'PRD026', 'PRD034', 'PRD032', 'PRD028', 'PRD030', 'PRD033', 'PRD027', 'PRD025']
----


In [111]:
def customize_event(env, job_name, spm_model, event_path, tr_max):
    
    output_path = os.path.join(ensure_relative(env['job_path']), job_name, 'events') 
    os.makedirs(os.path.join(env['data_path'], output_path), exist_ok=True)
    
    event_df = (pd.read_csv(os.path.join(env['data_path'], event_path), sep='\t')
                .query('onset >= 0')
                .query(f'onset <= {tr_max}')
                .reset_index(drop=True))
    
    for operation, params in spm_model.get("event_options", {}).items():
        
        if operation == 'map_event':
            
            condition_map = {}
            for trial_type in event_df['trial_type'].unique():
                condition_map[trial_type] = trial_type
                
            for new_trial_type, trial_types in params.items():
                for trial_type in ensure_list(trial_types):
                    condition_map[trial_type] = new_trial_type
            
            event_df['trial_type'] = event_df['trial_type'].map(condition_map)
            
        elif operation == 'melt_event':
            
            for trial_type, trial_data in params.items():
                for idx, row in event_df.query(f'trial_type == "{trial_type}"').iterrows():
                    event_df.loc[idx, 'trial_type'] = trial_type + "_" + str(row[trial_data])

        elif operation == 'include_event':
            
            include_events = ensure_list(params)
            event_df = event_df[event_df['trial_type'].isin(include_events)].reset_index(drop=True)
                                
        elif operation == 'exclude_event':
            
            exclude_events = ensure_list(params)
            event_df = event_df[~event_df['trial_type'].isin(exclude_events)].reset_index(drop=True)

    all_pmods = []
    for trial_type, pmods in spm_model.get('pmod',{}).items():
        
        trial_df = event_df.query(f'trial_type == "{trial_type}"')
                
        for pmod in ensure_list(pmods):
            all_pmods.append(pmod)
            
            pmod_values = trial_df[pmod].copy()
            
            for operation in ensure_list(spm_model.get("pmod_options", [])):
                
                if operation == "rank":
                    pmod_values = pmod_values.rank()
                elif operation == "minmax_scale":
                    pmod_values = (pmod_values - pmod_values.min()) / (pmod_values.max() - pmod_values.min())
                elif operation == "zscore":
                    pmod_values = (pmod_values - pmod_values.mean()) / pmod_values.std()
                elif operation == "fillna":
                    pmod_values = pmod_values.fillna(pmod_values.mean())
            
            event_df.loc[trial_df.index, pmod] = pmod_values.tolist()
        
        pmod_isna = trial_df[ensure_list(pmods)].isna().any(axis=1)
        for idx, row in trial_df[pmod_isna].iterrows():
            event_df.loc[idx, 'trial_type'] = trial_type + '_nopmod'
        
    (event_df[['onset', 'duration', 'trial_type'] + np.unique(all_pmods).tolist()]
         .sort_values(by='onset')
         .to_csv(os.path.join(env['data_path'], output_path, os.path.basename(event_path)), sep='\t', index=False))
    
    return os.path.join(output_path, os.path.basename(event_path))

In [112]:
spm_model = model['SpecifySPMModel']
env['job_path'] = ensure_relative(env['job_path'])

job_path = os.path.join(env['data_path'], env['job_path'], job_name, 'jobs') 
os.makedirs(job_path, exist_ok=True)

if spm_model.get("outlier"):
    outlier_path = os.path.join(env['job_path'], job_name, 'outlier') 
    os.makedirs(os.path.join(env['data_path'], outlier_path), exist_ok=True)
    
for regressors_path in ensure_list(spm_model.get('regressors', [])):
    if (not regressors_path.endswith('txt')) and (not regressors_path.endswith('tsv')):
        raise Exception("Regressors: only TSV or TXT file supported.")

regressor_names = ensure_list(spm_model.get('regressor_names',[]))
    
for sub in subs:
    print(sub, ": ", end="")
    
    job = copy.deepcopy(model)
    
    format_args = {}
    format_args['sub'] = sub
    format_args['task'] = task
    
    issues = []
    
    runs = ensure_list(job['Info'].get('run',-1))
    for exclude_sub, exclude_runs in job['Info'].get('exclude',{}).get('run',{}).items():
        exclude_sub = exclude_sub.replace('sub-','')
        if sub == exclude_sub:
            for exclude_run in ensure_list(exclude_runs):
                if exclude_run in runs:
                    runs.remove(exclude_run)

    for include_sub, include_runs in job['Info'].get('include',{}).get('run',{}).items():
        if sub == include_sub:
            runs = runs + ensure_list(include_runs)

    functional_runs = []
    regressors = []
    event_files = []
    outlier_files = []

    for run in runs:

        if run != -1:
            format_args['run'] = run
        
        func_path = ensure_exist(env['data_path'], spm_model['functional_runs'].format(**format_args))
        functional_runs.append(func_path)
            
        regressors_path = ensure_exist(env['data_path'], spm_model['regressors'].format(**format_args))
        regressors.append(regressors_path)

        if regressors_path.endswith('tsv'):
            
            regressor_df = pd.read_csv(os.path.join(env['data_path'], regressors_path), sep='\t')

            chosen_regressor_names = []
            
            col_exists = regressor_df.columns.isin(regressor_names)
            if col_exists.sum() == 0:
                raise Exception("No regressors found.")
                
            chosen_regressor_names = regressor_df.columns[col_exists].tolist()
            if col_exists.sum() != len(regressor_names):
                issues.append(f"Only some of the regressors are found: {chosen_regressor_names}")

        if spm_model.get("outlier"):
            
            outlier_indices = list(range(spm_model['outlier'].get("dummy_scan", 0)))
            
            if spm_model["outlier"].get("regressor_names"):
                
                outlier_names = ensure_list(spm_model["outlier"]["regressor_names"])
                
                col_exists = regressor_df.columns.isin(outlier_names)
                if col_exists.sum() > 0:
                    chosen_outlier_names = regressor_df.columns[col_exists].tolist()
                    outlier_indices = outlier_indices + list(np.ravel(np.where((regressor_df[chosen_outlier_names] != 0).any(axis=1))))
                
                    if col_exists.sum() != len(outlier_names):
                        issues.append(f"Only some of the outlier variables are found: {chosen_regressor_names}")

                else:
                    issues.append(f"No outlier variables found.")

            outlier_indices = np.unique(np.array(outlier_indices, dtype=int))
            
            if len(outlier_indices) > 0:
                
                if run != -1:
                    outlier_file = os.path.join(outlier_path, f"sub-{sub}_task-{task}_run-{run}_outliers.txt")
                else:
                    outlier_file = os.path.join(outlier_path, f"sub-{sub}_task-{task}_outliers.txt")

                outlier_files.append(outlier_file)               
                np.savetxt(os.path.join(env['data_path'], outlier_file), outlier_indices, fmt="%d")
        
        elif spm_model.get("outlier_files"):
            
            outlier_files.append(ensure_exist(env['data_path'], spm_model['outlier_files'].format(**format_args)))
            
        event_path = ensure_exist(env['data_path'], spm_model['event_files'].format(**format_args))
        event_df = pd.read_csv(os.path.join(env['data_path'], event_path), sep='\t')
        
        event_clip = False
        tr_max = (nib.load(os.path.join(env['data_path'], func_path)).header.get_data_shape()[-1] - 1) * job["Info"]["tr"]
        
        if (event_df.onset.min() < 0):
            issues.append("Events with negative onset time will be discarded.")
            event_clip = True
        
        if (event_df.onset.max() > tr_max):
            issues.append("Events with onset time longer than scan length will be discarded.")
            event_clip = True
        
        pmod_isna = False
        
        if spm_model.get("pmod"):
            
            for trial_type, pmods in spm_model['pmod'].items():
                trial_df = event_df.query(f'trial_type == "{trial_type}"')
                
                for pmod in ensure_list(pmods):
                    if trial_df[pmod].isna().any():
                        issues.append(f"{event_path}: '{pmod}' has missing values for '{trial_type}' event.")
                        pmod_isna = True
                    if trial_df[pmod].var() == 0:
                        issues.append(f"{event_path}: '{pmod}' has zero variance for '{trial_type}' event, and will be excluded during analysis.")
                
        if (spm_model.get("event_options") is None) and (spm_model.get("pmod_options") is None) and (pmod_isna == False) and (event_clip == False):
            event_files.append(event_path)
        else:            
            event_files.append(customize_event(env, job_name, spm_model, event_path, tr_max))
                
    job['SpecifySPMModel']['functional_runs'] = functional_runs
    job['SpecifySPMModel']['event_files'] = event_files
    job['SpecifySPMModel']['regressors'] = regressors
    job['SpecifySPMModel']['regressor_names'] = chosen_regressor_names
    
    if len(outlier_files) > 0:
        job['SpecifySPMModel']['outlier_files'] = outlier_files
    
    job['SpecifySPMModel']['time_repetition'] = job["Info"]["tr"]
    job['Level1Design']['interscan_interval'] = job["Info"]["tr"]
    
    for key in ["event_options", "pmod_options", "outlier"]:
        remove_key(key, job["SpecifySPMModel"])

    all_events = pd.concat([pd.read_csv(os.path.join(env['data_path'], x), sep='\t') for x in job['SpecifySPMModel']['event_files']], 
                           ignore_index=True)
    all_trial_types = all_events['trial_type'].unique()

    if job.get('EstimateContrast', {}).get('basic_contrasts', False):
        
        contrasts = [] 
                               
        for trial_type in all_trial_types:
            contrasts.append([trial_type, "T", [trial_type], [1]])
        
        for cond, pmods in spm_model.get('pmod',{}).items():
            for pmod in ensure_list(pmods):
                contrasts.append([f'{cond}x{pmod}^1', "T", [f'{cond}x{pmod}^1'], [1]])
                
        contrasts = contrasts + ensure_list(job['EstimateContrast'].get('contrasts',[]))
            
        job['EstimateContrast']['contrasts'] = contrasts
        del job['EstimateContrast']['basic_contrasts']
    
    removed_contrast = []
    
    for contrast in job.get('EstimateContrast', {}).get('contrasts', []):
        
        contrast_available = True
        
        for trial_type in contrast[2]:
            
            if trial_type.endswith('^1'):
                trial_type = trial_type.split('x')[0]
            
            if trial_type not in all_trial_types:
                contrast_available = False
                
        if not contrast_available:
            removed_contrast.append(contrast)
            
    for rc in removed_contrast:
        job['EstimateContrast']['contrasts'].remove(rc)
    
    if len(removed_contrast) > 0:
        issues.append("These contrasts have been removed: " + str([rc[0] for rc in removed_contrast]))
    
    job["Info"]["sub"] = sub

    for key in ["job_path"]:
        remove_key(key, job["Environment"])

    for key in ["sub_container", "exclude", "run", "tr"]:
        remove_key(key, job["Info"])

    job_output = os.path.join(job_path, f"sub-{sub}.json")
    with open(job_output, 'w') as f:
        json.dump(job, f)
        
    if len(issues) == 0:
        print("job created")
    else:
        print("issues found - \n\t" + "\n\t".join(issues))
        
print("----")

PRD003 : 

Exception: File missing: derivatives/fmriprep/sub-PRD003/func/sub-PRD003_task-ARF_run-01_events.tsv

In [95]:
slurm_path = os.path.join(env['data_path'], env['job_path'], job_name, 'slurm') 
os.makedirs(slurm_path, exist_ok=True)
try:
    os.chmod(slurm_path, 0o0777)
    
    os.makedirs(os.path.join(slurm_path, 'out'), exist_ok=True)
    os.chmod(os.path.join(slurm_path, 'out'), 0o0777)
except:
    pass
    

for sub in subs:
    
    slurm_header = []
    slurm_header.append("#!/bin/bash")
    slurm_header.append(f"#SBATCH --job-name=sub-{sub}.job")
    slurm_header.append(f"#SBATCH --output=out/sub-{sub}.job")
    slurm_header.append(f"#SBATCH --error=out/sub-{sub}.err")
    slurm_header.append("#SBATCH --time=5-00:00")
    slurm_header.append("")
    slurm_header.append("srun ")

    data_path = env['data_path']
    output_path = env['output_path']
    working_path = env['working_path']

    json_path = os.path.join(job_path, f"sub-{sub}.json")
    
    cmd = []
    cmd.append("singularity run --cleanenv")
    cmd.append(f"-B {script_path}:/worker.py")
    cmd.append(f"-B {data_path}:/data")
    cmd.append(f"-B {output_path}:/output")
    cmd.append(f"-B {working_path}:/working")
    cmd.append(f"-B {json_path}:/job.json")
    cmd.append(f"{singularity_path} python /worker.py /job.json")
    
    slurm_output = os.path.join(output_path, job_name, f"sub-{sub}")
    slurm_working = os.path.join(working_path, job_name, f"sub-{sub}")
    
    slurm_footer = []
    slurm_footer.append("")
    slurm_footer.append("")
    slurm_footer.append(f"chmod -R 775 {slurm_output}")
    slurm_footer.append(f"chmod -R 775 {slurm_working}")
    
    slurm_cmd = "\n".join(slurm_header) + " \\\n  ".join(cmd) + "\n".join(slurm_footer) 
    
    with open(os.path.join(slurm_path, f"sub-{sub}.job"), 'w') as f:
        f.write(slurm_cmd)    

### TEST DRIVE ONE MODEL (SLURM JOB)

In [96]:
print(f"To test drive one job (sub-{sub}), copy and paste the following lines in terminal:")
print("")

print(" \\\n  ".join(cmd))

print("----")

To test drive one job (sub-DP6), copy and paste the following lines in terminal:

singularity run --cleanenv \
  -B /data00/projects/megameta/scripts/jupyter_megameta/cnlab_pipeline/cnlab/GLM/l1analysis_SPM.py:/worker.py \
  -B /data00/projects/megameta/darpa1:/data \
  -B /data00/projects/megameta/darpa1/derivatives/nipype:/output \
  -B /data00/projects/megameta/darpa1/working/nipype:/working \
  -B /data00/projects/megameta/darpa1/models/task-share_model-beta/jobs/sub-DP6.json:/job.json \
  /data00/tools/singularity_images/neurodocker.sqsh python /worker.py /job.json
----


### RUN ALL MODELS (SLURM JOBS)

In [97]:
print("Alternatively, submit the whole batch by copying and pasting the following lines in terminal:")
print("")

print(f"cd {slurm_path}")
for sub in subs:
    print(f"sbatch -D {slurm_path} -c 8 sub-{sub}.job")
print(" ")

Alternatively, submit the whole batch by copying and pasting the following lines in terminal:

cd /data00/projects/megameta/darpa1/models/task-share_model-beta/slurm
sbatch -D /data00/projects/megameta/darpa1/models/task-share_model-beta/slurm -c 8 sub-DP111.job
sbatch -D /data00/projects/megameta/darpa1/models/task-share_model-beta/slurm -c 8 sub-DP113.job
sbatch -D /data00/projects/megameta/darpa1/models/task-share_model-beta/slurm -c 8 sub-DP121.job
sbatch -D /data00/projects/megameta/darpa1/models/task-share_model-beta/slurm -c 8 sub-DP122.job
sbatch -D /data00/projects/megameta/darpa1/models/task-share_model-beta/slurm -c 8 sub-DP147.job
sbatch -D /data00/projects/megameta/darpa1/models/task-share_model-beta/slurm -c 8 sub-DP157.job
sbatch -D /data00/projects/megameta/darpa1/models/task-share_model-beta/slurm -c 8 sub-DP186.job
sbatch -D /data00/projects/megameta/darpa1/models/task-share_model-beta/slurm -c 8 sub-DP21.job
sbatch -D /data00/projects/megameta/darpa1/models/task-shar