arguments

**NOTE `662551` not in `HCP_1200`**

`s3://profile-hcp-west/hcp_reliability/single_shell/hcp_1200_afq/`

In [1]:
subjects = [
    '103818', '105923', '111312', '114823', '115320',
    '122317', '125525', '130518', '135528', '137128',
    '139839', '143325', '144226', '146129', '149337',
    '149741', '151526', '158035', '169343', '172332',
    '175439', '177746', '185442', '187547', '192439',
    '194140', '195041', '200109', '200614', '204521',
    '250427', '287248', '341834', '433839', '562345',
    '599671', '601127', '627549', '660951', # '662551', 
    '783462', '859671', '861456', '877168', '917255'
]
session_names = ['HCP_1200', 'HCP_Retest']
bundle_names = ['SLF_L']
# bundle_names = ['SLF_L', 'SLF_R']
# bundle_names = ['SLF_L', 'SLF_R', 'ARC_L', 'ARC_R', 'CST_L', 'CST_R']
# n_clusters = [2, 3, 4]
n_clusters = [3]

In [8]:
import itertools
# args = list(itertools.product(subjects, session_names, bundle_names))
args = list(itertools.product(subjects, session_names, bundle_names, n_clusters))

In [21]:


def job_args(ids):
    import numpy as np
    
    return list(map(tuple, np.array(args)[ids].tolist()))
    
# TODO use knot to determine failed and running jobs
# may want to stop running and rerun these with larger memory and disk allocations
def failed_jobs():
    import numpy as np
    # failed_job_ids = [19,22,81,101,148,149]
    failed_job_ids = [19,22,81,101]
#     print(len(failed_job_ids))
    return job_args(np.array(failed_job_ids)+8)

def running_jobs():
    import numpy as np
    running_job_ids = [1,3,13,15,35,59,83,117,119,131,137,139,141,143,147,163]
#     print(len(running_job_ids))
    return job_args(np.array(running_job_ids)+8)

def rerun_targets():
    import numpy as np
    # failed twice
    targets = list(map(tuple, np.array([*failed_jobs(), *running_jobs()])[[0,5,6,7,10,12]]))
    return targets

# print([*failed_jobs(), *running_jobs()])
# print(rerun_targets())
print(job_args([14,15,62,63]))

[('130518', 'HCP_1200', 'SLF_L', '3'), ('130518', 'HCP_Retest', 'SLF_L', '3'), ('287248', 'HCP_1200', 'SLF_L', '3'), ('287248', 'HCP_Retest', 'SLF_L', '3')]


all-in-one: `subbundle3`, `subbundle4`, and `subbundle5`

In [17]:
def subbundle(subject, session, bundle_name, n_clusters):
    import time
    import s3fs
    import numpy as np
    import pandas as pd
    from dipy.io.streamline import load_tractogram, save_tractogram
    from dipy.io.stateful_tractogram import StatefulTractogram
    from dipy.io.utils import create_nifti_header, get_reference_info
    from dipy.tracking.streamline import set_number_of_points, values_from_volume, bundles_distances_mdf
    import dipy.tracking.utils as dtu
    import nibabel as nib
    from fastdtw import fastdtw
    from sklearn.cluster import SpectralClustering
    from graspologic.embed import MultipleASE as MASE
    from graspologic.cluster import KMeansCluster
    
    def rrss(y, yhat):        
        residuals = y - yhat
        rss = np.dot(residuals.T, residuals)
        rrss = np.sqrt(rss)
        
        return rrss

    def relabel_clusters(cluster_labels):
        from_values = np.flip(np.argsort(np.bincount(cluster_labels))[-(np.unique(cluster_labels).size):])
        to_values = np.arange(from_values.size)

        d = dict(zip(from_values, to_values))

        new_cluster_labels = np.copy(cluster_labels)

        for k, v in d.items():
            new_cluster_labels[cluster_labels == k] = v

        return new_cluster_labels
        
    def density_map(tractogram):
        affine, vol_dims, voxel_sizes, voxel_order = get_reference_info(tractogram)
        tractogram_density = dtu.density_map(tractogram.streamlines, np.eye(4), vol_dims)
        tractogram_density = np.uint8(tractogram_density)
        nifti_header = create_nifti_header(affine, vol_dims, voxel_sizes)
        density_map_img = nib.Nifti1Image(tractogram_density, affine, nifti_header)
        
        return density_map_img
        
    def save_cluster_tractograms_and_density_maps(bundle_tractogram, model_prefix, cluster_labels):        
        for cluster_label in np.unique(cluster_labels):
            # tractogram
            f_name = f'{model_prefix}_cluster_{cluster_label}.trk'
            cluster_indicies = np.array(np.where(cluster_labels == cluster_label)[0])
            tg = StatefulTractogram.from_sft(bundle_tractogram.streamlines[cluster_indicies], bundle_tractogram)
            save_tractogram(tg, f_name, bbox_valid_check=False)
            print('saving cluster tractogram: ', f_name)
            
            # density map -- 8-bit unsigned int and gz
            f_name = f'{model_prefix}_cluster_{cluster_label}_density_map.nii.gz'
            tg.to_vox()
            nib.save(density_map(tg), f_name)
            print('saving cluster density map: ', f_name)
            
            # TODO save cluster afq_profile
            

    print("begin", subject, session, bundle_name, n_clusters)
    
    fs = s3fs.S3FileSystem()
    
    ### fractional anisotropy scalar file ###
    scalar_filename = 'FA.nii.gz'
    print('loading scalar file: ', scalar_filename)
    tic = time.perf_counter()
    
    fs.get(
        (
            f'profile-hcp-west/hcp_reliability/single_shell/'
            f'{session.lower()}_afq/sub-{subject}/ses-01/'
            f'sub-{subject}_dwi_model-DTI_FA.nii.gz'
        ),
        f'{scalar_filename}'
    )
    
    scalar_data = nib.load(scalar_filename).get_fdata()
    toc = time.perf_counter()
    print(f'scalar file: {toc - tic:0.4f} seconds')
    
    ### clean single shell deterministic bundle tractography ###
    tractogram_filename = f'{bundle_name}.trk'
    print('loading tractogram: ', tractogram_filename)
    tic = time.perf_counter()

    fs.get(
        (
            f'profile-hcp-west/hcp_reliability/single_shell/'
            f'{session.lower()}_afq/sub-{subject}/ses-01/'
            f'clean_bundles/sub-{subject}_dwi_space-RASMM_model-DTI_desc-det-afq-{bundle_name}_tractography.trk'
        ),
        f'{tractogram_filename}'
    )
    
    tractogram = load_tractogram(tractogram_filename, 'same')
    toc = time.perf_counter()
    print(f'tractogram file: {toc - tic:0.4f} seconds')
    
    # TODO save bundle_density map
    
    # TODO save bundle afq_profile/values_from_volume
    
    ### streamline profile ###
    print('calculating streamline profiles')
    tic = time.perf_counter()
    n_points = 100
    
    fgarray = set_number_of_points(tractogram.streamlines, n_points)
    
    if len(fgarray) == 0:
        return
    
    # TODO compare to afq_profile
    values = np.array(values_from_volume(scalar_data, fgarray, tractogram.affine))
    f_name = 'streamline_profile_fa.npy'
    np.save(f_name, values)
    print('saving: ', f_name)
    toc = time.perf_counter()

    print('values:', values.shape)
    print(f'streamline profile: {toc - tic:0.4f} seconds')
    
    ### pairwise warped streamline profile ###
    print('calculating pairwise warp')
    tic = time.perf_counter()
    dtw_values = np.zeros((values.shape[0], values.shape[0], values.shape[1]))
                
    for i, a in enumerate(values):
        for j, b in enumerate(values):
            _, path = fastdtw(a,b)
            path = np.array(path)
            dtw_value = a[np.append(path[np.where(path[:,1][:-1] != path[:,1][1:]),0][0], len(values.T)-1)]
            dtw_values[i,j] = dtw_value
            
    f_name = 'streamline_profile_dtw_pairwise_warped_fa.npy'
    np.save(f_name, dtw_values)
    print('saving: ', f_name)
    toc = time.perf_counter()
    
    print('dtw_values:', dtw_values.shape)
    print(f'dtw {toc - tic:0.4f} seconds')
    
    ### inverse scaled mdf (minimum average direct-flip) ###
    print('calculating mdf')
    tic = time.perf_counter()
    mdf = bundles_distances_mdf(fgarray, fgarray)
    
    # enforce symmetry
    mdf = (mdf + mdf.T) / 2
    
    # inverse scale
    is_mdf = (mdf.max() - mdf)
    is_mdf = is_mdf / is_mdf.max()

    f_name = 'adjacency_is_mdf.npy'
    np.save(f_name, is_mdf)
    print('saving: ', f_name)
    toc = time.perf_counter()
    
    print('is_mdf:', is_mdf.shape)
    print(f'mdf {toc - tic:0.4f} seconds')
    
    ### pairwise warped fa z-score(sqrt(RSS)) ###
    print('calculating nnrrss')
    tic = time.perf_counter()
    dtw_pairwise_fa_rrss = np.array(
        [
            [
                rrss(dtw_values[i][j], values[i]) 
                 for j in np.ndindex(dtw_values.shape[1])
            ]
            for i in np.ndindex(dtw_values.shape[0])
        ]
    )
    
    # standardize and invert
    from scipy import stats
    dtw_pairwise_fa_nnrrss = -1*stats.zscore(dtw_pairwise_fa_rrss)
    
    # enforce symmetry
    dtw_pairwise_fa_nnrrss += dtw_pairwise_fa_nnrrss.T
    dtw_pairwise_fa_nnrrss = dtw_pairwise_fa_nnrrss/2
    
    f_name = 'adjacency_pairwise_fa_nnrrss.npy'
    np.save(f_name, dtw_pairwise_fa_nnrrss)
    print('saving: ', f_name)
    toc = time.perf_counter()

    print('dtw_pairwise_fa_nnrrss:', dtw_pairwise_fa_nnrrss.shape)
    print(f'neg_normalized_rrss: {toc - tic:0.4f} seconds')
    
    ### weighted adjacencies ###
    print('calculating weighted adjacencies')
    tic = time.perf_counter()
    alphas = np.linspace(0,10,11)/10
    
    for alpha in alphas:
        wt_dtw_pairwise_fa_nnrrss_is_mdf = alpha * dtw_pairwise_fa_nnrrss + (1 - alpha) * is_mdf
        f_name = f'adjacency_wt_{int(alpha*10)}_pairwise_fa_nnrrss_{int((1 - alpha)*10)}_is_mdf.npy'
        np.save(f_name, wt_dtw_pairwise_fa_nnrrss_is_mdf)
        print('saving: ', f_name)
        print('wt_dtw_pairwise_fa_nnrrss_is_mdf:', wt_dtw_pairwise_fa_nnrrss_is_mdf.shape)

    toc = time.perf_counter()
    print(f'weighted adjacencies {toc - tic:0.4f} seconds')
    
    ### spectral clustering ###
    print('spectral clustering')
    tic = time.perf_counter()
    sc = SpectralClustering(affinity="precomputed", n_clusters=n_clusters)
    
    alphas = np.linspace(0,10,11)/10
    
    for alpha in alphas:
        name = f'wt_{int(alpha*10)}_pairwise_fa_nnrrss_{int((1 - alpha)*10)}_is_mdf'
        
        adjacency = np.load(f'adjacency_{name}.npy')
        
        sc_idx = sc.fit(np.absolute(adjacency)).labels_
        
        sc_idx = relabel_clusters(sc_idx)
    
        f_name = f'sc_{name}_idx.npy'
        np.save(f_name, sc_idx)
        print('saving: ', f_name)
        
        save_cluster_tractograms_and_density_maps(tractogram, f'sc_{name}', sc_idx)
        
    toc = time.perf_counter()
    print(f'spectral clustering {toc - tic:0.4f} seconds')
    
    ### mase ###
    print('mase')
    tic = time.perf_counter()
    embedder = MASE()
    
    tissue = np.load('adjacency_pairwise_fa_nnrrss.npy')
    distance = np.load('adjacency_is_mdf.npy')
    
    V_hat = embedder.fit_transform([tissue, distance])
    clusterer = KMeansCluster(n_clusters)
    mase_idx = clusterer.fit_predict(V_hat)
    
    mase_idx = relabel_clusters(mase_idx)
    
    f_name = 'mase_pairwise_fa_nnrrss_is_mdf_idx.npy'
    np.save(f_name, mase_idx)
    print('saving: ', f_name)
    
    save_cluster_tractograms_and_density_maps(tractogram, 'mase_pairwise_fa_nnrrss_is_mdf', mase_idx)
    
    toc = time.perf_counter()
    print(f'mase {toc - tic:0.4f} seconds')
    
    ### upload everything to s3 ###
    fs.put('*.npy', f'hcp-subbundle/{session}/{bundle_name}/{subject}/{n_clusters}')
    fs.put('*.nii.gz', f'hcp-subbundle/{session}/{bundle_name}/{subject}/{n_clusters}')
    fs.put('*.trk', f'hcp-subbundle/{session}/{bundle_name}/{subject}/{n_clusters}')
    
    print("end", subject, session, bundle_name, n_clusters)

In [4]:
import cloudknot as ck
ck.set_region('us-west-2')

In [18]:
from datetime import datetime
knot = ck.Knot(
    name='hcp-subbundle-' + datetime.now().isoformat()[:-7].replace(':','-'),
    func=subbundle,
    base_image='python:3.8',
    pars_policies=('AmazonS3FullAccess',),
    memory=32000,  # in MB
    volume_size=50,  # in GB
    bid_percentage=105)



In [19]:
result_futures = knot.map(args, starmap=True)
# result_futures = knot.map([*failed_jobs(), *running_jobs()], starmap=True)
# result_futures = knot.map(rerun_targets(), starmap=True)

In [20]:
knot.view_jobs()

Job ID              Name                        Status   
---------------------------------------------------------
6d3b4c54-64b1-4a8e-82f2-35d70ec26ae9        hcp-subbundle-2021-01-16T17-25-31-0        SUBMITTED


In [22]:
knot.jobs[0].status

{'status': 'FAILED',
 'statusReason': 'Array Child Job failed',
 'attempts': [],
 'arrayProperties': {'statusSummary': {'STARTING': 0,
   'FAILED': 4,
   'RUNNING': 0,
   'SUCCEEDED': 84,
   'RUNNABLE': 0,
   'SUBMITTED': 0,
   'PENDING': 0},
  'size': 88}}