In [None]:
def pybabyafq(subject, session):
    """
    Run the pediatric pyAFQ pipeline for a single dHCP subject/session.

    The function works through the following steps:

    0. fetch the pediatric ROI templates from figshare,
    1. define how the pediatric bundle dictionary is built from them,
    2. download the subject's MRtrix derivatives from the ``dhcp-afq``
       S3 bucket into ``~/AFQ_data/dhcp``,
    3. run pyAFQ (with ``max_bval=1000``) and export all of its outputs,
    4. request the DKI FA/MD/MK scalars,
    5. delete the tract profiles file and regenerate it with both the DTI
       and the DKI scalars,
    6. upload the resulting ``derivatives/afq`` directory back to S3.

    Parameters
    ----------
    subject : str
        Subject label, without the ``sub-`` prefix.
    session : str
        Session label, without the ``ses-`` prefix.

    Returns
    -------
    None
        All results are written to disk and uploaded to S3.
    """
    # Imports are kept local to the function.
    # NOTE(review): presumably so that the function is self-contained when
    # cloudknot ships it to a remote worker -- confirm before hoisting them.
    import os.path as op
    import os
    from dipy.data.fetcher import _make_fetcher
    import nibabel as nib
    import s3fs
    import json
    from AFQ import api
    from AFQ.definitions.mapping import AffMap
    from AFQ.definitions.mask import MaskFile

    fs = s3fs.S3FileSystem()
    afq_home = op.join(op.expanduser('~'), 'AFQ_data')
    os.makedirs(afq_home, exist_ok=True)

    ##########################################################################
    # Step 0: Pediatric templates
    ##########################################################################

    print('Step 0: Fetching pediatric templates...', flush=True)

    # The three lists below are parallel: entry i of each list describes the
    # same file (local name, expected md5, figshare file id).
    pediatric_fnames = [
        "ATR_roi1_L.nii.gz", "ATR_roi1_R.nii.gz",
        "ATR_roi2_L.nii.gz", "ATR_roi2_R.nii.gz",
        "ATR_roi3_L.nii.gz", "ATR_roi3_R.nii.gz",
        "CGC_roi1_L.nii.gz", "CGC_roi1_R.nii.gz",
        "CGC_roi2_L.nii.gz", "CGC_roi2_R.nii.gz",
        "CGC_roi3_L.nii.gz", "CGC_roi3_R.nii.gz",
        "CST_roi1_L.nii.gz", "CST_roi1_R.nii.gz",
        "CST_roi2_L.nii.gz", "CST_roi2_R.nii.gz",
        "FA_L.nii.gz", "FA_R.nii.gz",
        "FP_L.nii.gz", "FP_R.nii.gz",
        "HCC_roi1_L.nii.gz", "HCC_roi1_R.nii.gz",
        "HCC_roi2_L.nii.gz", "HCC_roi2_R.nii.gz",
        "IFO_roi1_L.nii.gz", "IFO_roi1_R.nii.gz",
        "IFO_roi2_L.nii.gz", "IFO_roi2_R.nii.gz",
        "IFO_roi3_L.nii.gz", "IFO_roi3_R.nii.gz",
        "ILF_roi1_L.nii.gz", "ILF_roi1_R.nii.gz",
        "ILF_roi2_L.nii.gz", "ILF_roi2_R.nii.gz",
        "LH_Parietal.nii.gz", "RH_Parietal.nii.gz",
        "MdLF_roi1_L.nii.gz", "MdLF_roi1_R.nii.gz",
        "SLF_roi1_L.nii.gz", "SLF_roi1_R.nii.gz",
        "SLF_roi2_L.nii.gz", "SLF_roi2_R.nii.gz",
        "SLFt_roi2_L.nii.gz", "SLFt_roi2_R.nii.gz",
        "SLFt_roi3_L.nii.gz", "SLFt_roi3_R.nii.gz",
        "UNC_roi1_L.nii.gz", "UNC_roi1_R.nii.gz",
        "UNC_roi2_L.nii.gz", "UNC_roi2_R.nii.gz",
        "UNC_roi3_L.nii.gz", "UNC_roi3_R.nii.gz",
        "VOF_box_L.nii.gz", "VOF_box_R.nii.gz",
        "UNCNeo-withCerebellum-for-babyAFQ.nii.gz",
        "UNCNeo_JHU_tracts_prob-for-babyAFQ.nii.gz",
        "mid-saggital.nii.gz"
    ]

    pediatric_md5_hashes = [
        "2efe0deb19ac9e175404bf0cb29d9dbd", "c2e07cd50699527bd7b0cbbe88703c56",
        "76b36d8d6759df58131644281ed16bd2", "645102225bad33da30bafd41d24b3ab0",
        "45ec94d42fdc9448afa6760c656920e9", "54e3cb1b8c242be279f1410d8bb3c383",
        "1ee9f7e8b21ef8ceee81d5a7a178ef33", "4f11097f7ae317aa8d612511be79e2f1",
        "1c4c0823c23b676d6d35004d93b9c695", "d4830d558cc8f707ebec912b32d197a5",
        "c405e0dbd9a4091c77b3d1ad200229b4", "ec0aeccc6661d2ee5ed79259383cdcee",
        "2802cd227b550f6e85df0fec1d515c29", "385addb999dc6d76957d2a35c4ee74bb",
        "b79f01829bd95682faaf545c72b1d52c", "b79f01829bd95682faaf545c72b1d52c",
        "e49ba370edca96734d9376f551d413db", "f59e9e69e06325198f70047cd63c3bdc",
        "ae3bd2931f95adae0280a8f75cd3ca9b", "c409a0036b8c2dd4d03d11fbc6bfbdcd",
        "c2597a474ea5ec9e3126c35fd238f6b2", "67af59c934147c9f9ff6e0b76c4cc6eb",
        "72d0bbc0b6162e9291fdc450c103a1f0", "51f5041be63ad0ac10d1ac09e3bf1a8e",
        "6200f5cdc1402dce46dedd505468b147", "83cb5bf6b9b1eda63c8643871e84a6d4",
        "2a5d8d309b1256d6e48958e112201c2c", "ba24d0915fdff215a403480d0a7745c9",
        "1001e833d1344274d543ffd02a66af80", "03e20c5eebcd4d439c4ffb36d26a10d9",
        "6200f5cdc1402dce46dedd505468b147", "83cb5bf6b9b1eda63c8643871e84a6d4",
        "a6ae325cce2dc4bb21b52ee4c6ca844f", "a96a31df8f96ccf1c5f871a18e4a2d72",
        "65b7378ca85689a5f23b1b84a6ed78f0", "ce0d0ea696ef51c671aa118e11192e2d",
        "ce4918086ca1e829698576bcf95db62b", "96168d2eff74ec2acf9065f499732526",
        "6b20ba319d44472ec21d6ece937605bb", "26b1cf6ec8bd365dde42e3efe9beeac2",
        "0b3ccf06564d973bfcfff9a87e74f8b5", "84f3426033a2b225b0920b2622864375",
        "5351b3cb7efa9aa8e83e266342809ebe", "4e8a34aaba4e0f22a6149f38620dc39d",
        "682c08f66e8c2cf9e4e60f5ce308c76c", "9077affd4f3a8a1d6b44678cde4b3bf4",
        "5adf36f00669cc547d5eb978acf46266", "66a8002688ffdf3943722361da90ec6a",
        "efb5ae138df92019541861f9aa6a4d57", "757ec61078b2e9f9a073871b3216ff7a",
        "ff1e238c52a21f8cc5d44ac614d9627f", "cf16dd2767c6ab2d3fceb2890f6c3e41",
        "6016621e244b60b9c69fd44b055e4a03", "fd495a2c94b6b13bfb4cd63e293d3fc0",
        "bf81a23d80f55e5f1eb0c16717193105",
        "6f8bf8f70216788d14d9a49a3c664b16",
        "19df0297d6a2ac21da5e432645d63174",
    ]

    pediatric_remote_fnames = [
        "24880625", "24880628", "24880631", "24880634", "24880637", "24880640",
        "24880643", "24880646", "24880649", "24880652", "24880655", "24880661",
        "24880664", "24880667", "24880670", "24880673", "24880676", "24880679",
        "24880685", "24880688", "24880691", "24880694", "24880697", "24880700",
        "24880703", "24880706", "24880712", "24880715", "24880718", "24880721",
        "24880724", "24880727", "24880730", "24880733", "24880736", "24880748",
        "24880739", "24880742", "24880754", "24880757", "24880760", "24880763",
        "24880769", "24880772", "24880775", "24880778", "24880781", "24880787",
        "24880790", "24880793", "24880796", "24880802", "24880805", "24880808",
        "24880616", "24880613", "24986396"
    ]

    template_home = op.join(afq_home, 'pediatric_templates')
    os.makedirs(template_home, exist_ok=True)

    fetch_pediatric_templates = _make_fetcher(
        'fetch_pediatric_templates',
        template_home,
        'https://ndownloader.figshare.com/files/',
        pediatric_remote_fnames,
        pediatric_fnames,
        md5_list=pediatric_md5_hashes,
        doc='Download pediatric templates'
    )

    def read_pediatric_templates():
        """
        Load pediatric pyAFQ templates.

        Used to create pediatric `bundle_dict`.

        Returns
        -------
        dict :
            keys = names of template ROIs, and
            values = `Nifti1Image` from each of the ROI nifti files.
        """
        files, folder = fetch_pediatric_templates()

        print('Loading pediatric templates...', flush=True)
        pediatric_templates = {}
        for fname in files:
            # key is the file name up to its first '.', e.g. 'ATR_roi1_L'
            pediatric_templates[fname.split('.')[0]] = nib.load(
                op.join(folder, fname))

        # For the arcuate (AF/ARC), reuse the SLF ROIs
        pediatric_templates['ARC_roi1_L'] = pediatric_templates['SLF_roi1_L']
        pediatric_templates['ARC_roi1_R'] = pediatric_templates['SLF_roi1_R']
        pediatric_templates['ARC_roi2_L'] = pediatric_templates['SLFt_roi2_L']
        pediatric_templates['ARC_roi2_R'] = pediatric_templates['SLFt_roi2_R']
        pediatric_templates['ARC_roi3_L'] = pediatric_templates['SLFt_roi3_L']
        pediatric_templates['ARC_roi3_R'] = pediatric_templates['SLFt_roi3_R']

        # For the middle longitudinal fasciculus (MdLF) reuse ILF ROI
        pediatric_templates['MdLF_roi2_L'] = pediatric_templates['ILF_roi2_L']
        pediatric_templates['MdLF_roi2_R'] = pediatric_templates['ILF_roi2_R']

        return pediatric_templates

    pediatric_templates = read_pediatric_templates()

    ##########################################################################
    # Step 1: Pediatric bundle specification
    ##########################################################################

    print('Step 1: Creating pediatric bundle specification...', flush=True)

    pediatric_bundle_names = [
        "ARC",  # 'Arcuate Fasciculus'
        "ATR",  # 'Thalamic Radiation'
        "CGC",  # 'Cingulum Cingulate'
        "CST",  # 'Corticospinal'
        "FA",   # 'Forceps Minor'
        "FP",   # 'Forceps Major'
        "IFO",  # 'Inferior Fronto-occipital'
        "ILF",  # 'Inferior Longitudinal Fasciculus'
        "MdLF", # 'Middle Longitudinal Fasciculus'
        "SLF",  # 'Superior Longitudinal Fasciculus'
        "UNC"   # 'Uncinate Fasciculus'
    ]

    def make_pediatric_bundle_dict(bundle_names=pediatric_bundle_names):
        """
        Create pyAFQ bundle dictionary object for pediatric subjects.

        Parameters
        ----------
        bundle_names : (optional) list of pediatric bundle names, used to
        generate `bundle_dict`. by default all pediatric bundles are included.

        Returns
        -------
        dict : pyAFQ `bundle_dict`. Callosal bundles ("FA", "FP") get a single
        entry; every other bundle gets one entry per hemisphere, keyed
        `<name>_R` and `<name>_L`.
        """
        # pediatric probability maps: position in this list is the index of
        # the corresponding volume along the last axis of the prob-map image
        prob_map_order = ["ATR_L", "ATR_R", "CST_L", "CST_R", "CGC_L", "CGC_R",
                          "HCC_L", "HCC_R", "FP", "FA", "IFO_L", "IFO_R", "ILF_L",
                          "ILF_R", "SLF_L", "SLF_R", "UNC_L", "UNC_R",
                          "ARC_L", "ARC_R", "MdLF_L", "MdLF_R"]

        prob_maps = pediatric_templates['UNCNeo_JHU_tracts_prob-for-babyAFQ']
        prob_map_data = prob_maps.get_fdata()

        def prob_map_for(label):
            # Select the probability-map volume that belongs to `label`.
            return prob_map_data[..., prob_map_order.index(label)]

        # pediatric bundle dict
        pediatric_bundles = {}

        # each bundle gets a digit identifier (to be stored in the tractogram)
        uid = 1

        for name in bundle_names:
            # ROIs that cross the mid-line
            if name in ["FA", "FP"]:
                pediatric_bundles[name] = {
                    'ROIs': [pediatric_templates[name + "_L"],
                             pediatric_templates[name + "_R"],
                             pediatric_templates["mid-saggital"]],
                    'rules': [True, True, True],
                    'cross_midline': True,
                    'prob_map': prob_map_for(name),
                    'uid': uid}
                uid += 1
                continue

            # Hemispheric bundles differ only in which ROIs they use and
            # whether each ROI is an inclusion (True) or exclusion (False).
            if name == "SLF":
                # SLF is a special case, because it has an exclusion ROI
                roi_stems = [name + '_roi1', name + '_roi2', "SLFt_roi2"]
                rules = [True, True, False]
            elif name in ["ARC", "ATR", "CGC", "IFO", "UNC"]:
                # Third ROI for curvy tracts
                roi_stems = [name + '_roi1', name + '_roi2', name + '_roi3']
                rules = [True, True, True]
            else:
                # Default (and MdLF): two ROIs within hemisphere
                roi_stems = [name + '_roi1', name + '_roi2']
                rules = [True, True]

            # MdLF reuses the probability map from ILF
            prob_name = "ILF" if name == "MdLF" else name

            for hemi in ['_R', '_L']:
                pediatric_bundles[name + hemi] = {
                    'ROIs': [pediatric_templates[stem + hemi]
                             for stem in roi_stems],
                    # fresh list per bundle so entries never share state
                    'rules': list(rules),
                    'cross_midline': False,
                    'prob_map': prob_map_for(prob_name + hemi),
                    'uid': uid}
                uid += 1

        return pediatric_bundles

    ##########################################################################
    # Step 2: Retrieve mrtrix derivatives
    ##########################################################################
    print('Step 2: Downloading mrtrix derivatives...', flush=True)

    dhcp_home = op.join(afq_home, 'dhcp')
    os.makedirs(dhcp_home, exist_ok=True)

    dataset_description = {
        "Name" : "dHCP",
        "BIDSVersion" : "1.4.0"
    }

    # top-level dataset description for the local BIDS directory
    with open(op.join(dhcp_home, 'dataset_description.json'), 'w') as f:
        json.dump(dataset_description, f)

    mrtrix_derivatives = op.join(dhcp_home, 'derivatives', 'mrtrix')
    os.makedirs(mrtrix_derivatives, exist_ok=True)

    # the derivatives' own dataset description is copied from the bucket
    fs.get(
        'dhcp-afq/mrtrix/dataset_description.json',
        op.join(mrtrix_derivatives, 'dataset_description.json')
    )

    subject_session_dir = op.join(mrtrix_derivatives, f'sub-{subject}', f'ses-{session}')
    os.makedirs(subject_session_dir, exist_ok=True)

    files = [
        f'sub-{subject}_ses-{session}_desc-csd_tractography.tck',
        f'sub-{subject}_ses-{session}_desc-preproc_resliced_aligned_dwi.bval',
        f'sub-{subject}_ses-{session}_desc-preproc_resliced_aligned_dwi.bvec',
        f'sub-{subject}_ses-{session}_desc-preproc_resliced_aligned_dwi.nii.gz',
        f'sub-{subject}_ses-{session}_desc-preproc_space_dwi_resliced_aligned_brainmask.nii.gz'
    ]

    for fname in files:
        fs.get(
            f'dhcp-afq/mrtrix/sub-{subject}/ses-{session}/{fname}',
            op.join(subject_session_dir, fname)
        )

    def build_afq(**extra_kwargs):
        # Create an AFQ object with the settings shared by every step;
        # `extra_kwargs` carries the few options that differ between steps.
        return api.AFQ(
            bids_path=dhcp_home,
            dmriprep='dHCP neonatal MRtrix pipeline',
            custom_tractography_bids_filters={
                'scope': 'dHCP neonatal MRtrix pipeline',
                'suffix': 'tractography'},
            reg_template=pediatric_templates['UNCNeo-withCerebellum-for-babyAFQ'],
            reg_subject='b0',
            mapping=AffMap(),
            brain_mask=MaskFile('brainmask'),
            # a new bundle dict is built for every AFQ object
            bundle_info=make_pediatric_bundle_dict(),
            segmentation_params={'filter_by_endpoints': False,
                                 'dist_to_waypoint': 0.75},
            clean_params={'distance_threshold': 3},
            **extra_kwargs
        )

    ##########################################################################
    # Step 3: DTI
    ##########################################################################
    print('Step 3: DTI', flush=True)

    # NOTE(review): max_bval=1000 is only passed for this step; presumably it
    # restricts the data used for the DTI fit -- confirm against pyAFQ docs.
    myafq_dti = build_afq(max_bval=1000)

    # export all AFQ artifacts
    myafq_dti.export_all()

    ##########################################################################
    # Step 4: DKI
    ##########################################################################

    print('Step 4: DKI', flush=True)
    myafq_dki = build_afq()

    # The values are discarded; the attributes are accessed for their side
    # effect (presumably computing the DKI scalar maps for the subject).
    myafq_dki.dki_fa[0]
    myafq_dki.dki_md[0]
    myafq_dki.dki_mk[0]

    ##########################################################################
    # Step 5: Tract Profiles
    ##########################################################################

    print('Step 5: Tract Profiles', flush=True)

    # delete tract profile file prior to running; skip quietly when it is
    # absent so a missing file does not abort the job
    row = myafq_dki.data_frame.iloc[0]
    tract_profiles_fname = myafq_dki._get_fname(row, '_profiles.csv')
    if op.exists(tract_profiles_fname):
        os.remove(tract_profiles_fname)

    myafq_all = build_afq(
        scalars=["dti_fa", "dti_md", "dki_fa", "dki_md", "dki_mk"])

    # regenerate tract profiles for all scalars
    myafq_all.tract_profiles

    ##########################################################################
    # Step 6: Upload pybabyafq derivatives
    ##########################################################################

    print('Step 6: Uploading pyAFQ derivatives...', flush=True)

    afq_derivatives = op.join(dhcp_home, 'derivatives', 'afq')

    fs.put(afq_derivatives, 'dhcp-afq/afq/', recursive=True)

In [None]:
def get_subject_session_pair(bucket_path):
    """
    Collect (subject, session) tuples by walking an S3 prefix.

    Not every subject has session data and some have several sessions.
    No metadata file lists these pairs, so the bucket is traversed two
    levels deep (``bucket_path/sub-<subid>/ses-<sesid>``) to find them.

    Parameters
    ----------
    bucket_path : string
        S3 path whose immediate sub-directories are subject directories.

    Returns
    -------
    list of tuples containing subject_id and session_id, in listing order
    """
    import s3fs

    s3 = s3fs.S3FileSystem()

    def _label(path):
        # last path component, then the text after its last '-':
        # '.../sub-ABC' -> 'ABC'
        return path.split('/')[-1].split('-')[-1]

    # Only directories are considered at both levels; plain files are skipped.
    return [
        (_label(subject_dir), _label(session_dir))
        for subject_dir in s3.ls(bucket_path) if s3.isdir(subject_dir)
        for session_dir in s3.ls(subject_dir) if s3.isdir(session_dir)
    ]


def check_anat_requirements(args):
    """
    Keep only the subject/session pairs that have a ribbon file on S3.

    Not all subjects have a ribbon file, and its absence is a common point
    of failure in the pipeline, so such pairs are dropped up front.

    Parameters
    ----------
    args : iterable of (subject_id, session_id)

    Returns
    -------
    list with the entries of `args` whose ribbon file exists, in input order
    """
    import s3fs

    s3 = s3fs.S3FileSystem()

    ribbon_template = (
        'dhcp-afq/dhcp_anat_pipeline/sub-{sub}/ses-{ses}/anat/'
        'sub-{sub}_ses-{ses}_desc-ribbon_space-T2w_dseg.nii.gz'
    )

    with_ribbon = []
    for pair in args:
        ribbon_path = ribbon_template.format(sub=pair[0], ses=pair[1])
        if s3.exists(ribbon_path):
            with_ribbon.append(pair)

    return with_ribbon


# The anat and dmri pipelines cover different subject/session pairs
# (558 and 490 respectively); only pairs present in both are usable.
dhcp_anat_sub_ses = get_subject_session_pair('dhcp-afq/dhcp_anat_pipeline/')
dhcp_dmri_sub_ses = get_subject_session_pair('dhcp-afq/dhcp_dmri_pipeline/')

# Intersect, sort for a stable job order, then drop pairs that are missing
# the ribbon file.
args = check_anat_requirements(
    sorted(set(dhcp_anat_sub_ses).intersection(dhcp_dmri_sub_ses))
)

In [None]:
import cloudknot as ck

In [None]:
from datetime import datetime

# Minute-resolution timestamp such as '2021-03-04T12-34', used to make the
# knot name unique. strftime is used instead of slicing isoformat() because
# isoformat() drops the microseconds part when it is zero, which would make
# fixed-offset slicing cut into the time itself.
timestamp = datetime.now().strftime('%Y-%m-%dT%H-%M')
timestamp

In [None]:
# vCPUs requested for each job; the overall cap scales with the number of
# subject/session pairs so that every job can be scheduled at once.
vcpus_per_job = 8

knot = ck.Knot(
    name=f'dhcp-{timestamp}',
    func=pybabyafq,
    base_image='python:3.7',
    image_github_installs='https://github.com/yeatmanlab/pyAFQ.git',
    pars_policies=('AmazonS3FullAccess',),
    job_def_vcpus=vcpus_per_job,
    max_vcpus=vcpus_per_job * len(args),
    memory=64000,  # in MB
    volume_size=250,  # in GB
    # use spot instance
    # NOTE(review): 105 is above 100 -- confirm how cloudknot / AWS Batch
    # interpret a bid percentage greater than 100.
    bid_percentage=105
)

In [None]:
# Submit one job per entry of `args`. Each entry is a (subject, session)
# tuple; starmap=True is presumably what unpacks it into pybabyafq's two
# parameters -- confirm against the cloudknot documentation.
result_futures = knot.map(args, starmap=True)