In [1]:
# Standard Library Imports
import os
import sys
from subprocess import call, check_output
import json
import time

# Third-Party Library Imports
import numpy as np
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import webdataset as wds
import nibabel as nib
import pickle as pkl
import h5py
from PIL import Image
import matplotlib.pyplot as plt

In [2]:
worker_id = int(sys.argv[1])

In [3]:
temp_dir = os.getcwd() + f"/temp{worker_id}" # the folder where the AFNI container will do its work
mni_dir = os.getcwd() + f"/MNIs{worker_id}" # the folder where MNI outputs will go
os.makedirs(temp_dir, exist_ok=True)
os.makedirs(mni_dir, exist_ok=True)
print(temp_dir)
print(mni_dir)

# if any files currently in temp_dir, remove it
command = f"rm {temp_dir}/*"
call(command,shell=True)

/weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0
/weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/MNIs0


rm: cannot remove '/weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000001': Is a directory
rm: cannot remove '/weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000002': Is a directory
rm: cannot remove '/weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000003': Is a directory
rm: cannot remove '/weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000005': Is a directory
rm: cannot remove '/weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000006': Is a directory
rm: cannot remove '/weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000007': Is a directory
rm: cannot remove '/weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000008': Is a directory
rm: cannot remove '/weka/proj-fmri/paulscotti/fMRI-foundation-model/d

1

In [4]:
# def afni_worker():
s3 = boto3.client('s3')
bucket_name = 'proj-fmri'
prefix = 'fmri_foundation_datasets/parallel_openneuro/'

if os.path.exists(f"discarded_dataset_ids_{worker_id}.npy"):
    discarded_dataset_ids = np.load(f"discarded_dataset_ids_{worker_id}.npy").tolist()
else:
    discarded_dataset_ids = []
print("discarded_dataset_ids",discarded_dataset_ids)

# sample_idx = -1
# mni_count = 0
# sink = wds.TarWriter(f"{mni_dir}/{mni_count}.tar")

paginator = s3.get_paginator('list_objects_v2')
file_name_list = []
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
    for obj in page.get('Contents', []):
        file_name = obj['Key']
        file_name_list.append(file_name)
print("len(file_name_list) =", len(file_name_list))

# subset to current worker
worker_id_idx = np.linspace(0,len(file_name_list),30)[worker_id:worker_id+2].astype(np.int32).tolist()
file_name_list = file_name_list[worker_id_idx[0]:worker_id_idx[1]]
print("len(file_name_list) =", len(file_name_list))

discarded_dataset_ids ['ds000002', 'ds000007', 'ds000009', 'ds000017', 'ds000030', 'ds000031', 'ds000051', 'ds000105']
len(file_name_list) = 167200
len(file_name_list) = 5765


In [5]:
print("starting...")
for file_name in file_name_list:
    if file_name.endswith('_bold.nii.gz'):
        dataset_id = file_name.split('/')[2]

        if np.any(np.isin(dataset_id, discarded_dataset_ids)):
            continue

        func_path = file_name.split('/')[-1]
        temp_file_path = temp_dir + '/' + dataset_id + '/' + func_path
        mni_file_path = mni_dir + '/' + dataset_id + '/' + func_path
        
        os.makedirs(temp_dir + '/' + dataset_id, exist_ok=True)
        os.makedirs(mni_dir + '/' + dataset_id, exist_ok=True)

        if os.path.exists(f"{temp_file_path.split('.nii.gz')[0] + '.txt'}") or os.path.exists(mni_file_path.split(".nii.gz")[0] + "_overlap.txt"):
            continue
           
        # download from s3
        print(f"downloading {temp_file_path}")
        try:
            s3.download_file(bucket_name, file_name, temp_file_path)
        except:
            print("failed to download? 1")

        while not os.path.exists(f"{temp_file_path}"):
            print(f"s3 download failed. trying again... {temp_file_path}")
            try:
                s3.download_file(bucket_name, file_name, temp_file_path)
            except:
                print("failed to download? 2")
            time.sleep(10)

        # make temp lock file so other parallel jobs dont do duplicate work
        command = f"touch {temp_file_path.split('.nii.gz')[0] + '.txt'}"
        call(command, shell=True)

        # ### AFNI COMMANDS (see afni_watch.sh) ###
        # # define func variable
        # command = f"export func='{func_path}'"
        # call(command,shell=True)

        # # define suffix variable
        # command = f"export suffix='_MNI'"
        # call(command,shell=True)

        # # motion correction and nonlinear alignment to 2mm resolution MNI T1w brain
        # command = "align_epi_anat.py -anat tpl-MNI152NLin2009cAsym_res-02_T1w_brain.nii.gz -epi $func'.nii.gz' -epi_base 0 -epi_strip 3dAutomask -epi2anat -ginormous_move -anat_has_skull no -suffix $suffix -volreg on -tshift off -save_resample -master_epi 2.00"
        # call(command,shell=True)

        # # convert AFNI outputs to NIFTI
        # command = "3dAFNItoNIFTI -prefix $func$suffix'.nii.gz' $func$suffix'+tlrc'"

        # # remove leftover AFNI files
        # call("rm *+tlrc.*",shell=True)
        # call("rm *vr_motion.*",shell=True)
        # call("rm *mat.aff*",shell=True)

        # # create overlap txt to depict values for %(A \ B), the percent voxels from T1w that ARENT in func
        # call("3dABoverlap -no_automask tpl-MNI152NLin2009cAsym_res-02_T1w_brain.nii.gz ${func}.nii.gz | awk 'NR==3 {print $7}' >> ${func}_overlap.txt",shell=True)
        # ### END OF AFNI COMMANDS 

        # Wait for AFNI to be complete
        print('waiting...')
        waiting_time = 0
        while not os.path.exists(mni_file_path.split(".nii.gz")[0] + "_overlap.txt"):
            time.sleep(5)     
            waiting_time += 5
            if waiting_time > 180:
                break

        if waiting_time <= 180:
            time.sleep(10) # wait to ensure txt file was fully created
            with open(mni_file_path.split(".nii.gz")[0] + "_overlap.txt", 'r') as file:
                try:
                    overlap = file.readlines()
                    overlap = np.array(overlap).astype(np.float32)[0]
                except:
                    print("overlap error!")
                    overlap = 0 # in case some weird error occurs where overlap txt is empty, assume its ok
            
            # if overlap >20%, discard outputs and skip this dataset
            if overlap>20:
                discarded_dataset_ids.append(dataset_id)
                print("discarded_dataset_ids")
                print(discarded_dataset_ids)
                np.save(f"discarded_dataset_ids_{worker_id}.npy",discarded_dataset_ids)
            else:
                afni_filename = mni_dir + '/' + dataset_id + '/' + func_path.split(".nii.gz")[0] + "_MNI.nii.gz"
    
                command = f"aws s3 cp {afni_filename} s3://proj-fmri/fmri_foundation_datasets/openneuro_MNI/{dataset_id}/{func_path.split('.nii.gz')[0] + '_MNI.nii.gz'}"
                call(command,shell=True)
    
                # load contents of Nifti
                # func_nii = nib.load(afni_filename).get_fdata()
                
                # for batch in np.arange(0,func_nii.shape[-1],24):
                #     batch_nii = func_nii[:,:,:,batch:batch+24]
                #     if len(batch_nii)<4:
                #         continue
    
                #     # send to aws s3
                #     np.save(f"{mni_dir}/npy/{func_path.split('.nii.gz')[0]}_{batch}_to_{batch+24}.npy", batch_nii)
                    
                # command = f"aws s3 sync {mni_dir}/npy s3://proj-fmri/fmri_foundation_datasets/openneuro_MNI_npy"
                # call(command,shell=True)
    
                # # remove npy files from local
                # command = f"rm {mni_dir}/npy/*.npy"
                # call(command,shell=True)
    
                # print("Done! Creating done txt file...")
                # command = f"touch {mni_file_path.split('.nii.gz')[0] + '.txt'}"
                # call(command,shell=True)
    
                print(f"Removing _MNI file... {afni_filename}")
                command = f"rm {afni_filename}"
                call(command,shell=True)
    
                time.sleep(5)
                    
                    # sample_idx += 1
                    # sink.write({
                    #     "__key__": "%06d" % sample_idx,
                    #     "func.npy": batch_nii,
                    # })
                # if sample_idx > 1024:
                #     sink.close()
                #     sample_idx = -1
                #     mni_count += 1
                #     sink = wds.TarWriter(f"{mni_dir}/{mni_count}.tar")
                # print(f"wrote {func_path.split('.nii.gz')[0]} to {mni_dir}/0.tar")
        else:
            print("waiting time exceeded...")
            
            # remove all files
            command = f"rm {temp_dir}/{dataset_id}/*"
            call(command,shell=True)

            # write placeholder txt overlap file
            with open(mni_file_path.split(".nii.gz")[0] + "_overlap.txt", 'w') as file:
                file.write('-999')

starting...
downloading /weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000107/sub-13_task-onebacktask_run-01_bold.nii.gz
waiting...
waiting time exceeded...
downloading /weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000107/sub-13_task-onebacktask_run-02_bold.nii.gz
waiting...
upload: MNIs0/ds000107/sub-13_task-onebacktask_run-02_bold_MNI.nii.gz to s3://proj-fmri/fmri_foundation_datasets/openneuro_MNI/ds000107/sub-13_task-onebacktask_run-02_bold_MNI.nii.gz
Removing _MNI file... /weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/MNIs0/ds000107/sub-13_task-onebacktask_run-02_bold_MNI.nii.gz
downloading /weka/proj-fmri/paulscotti/fMRI-foundation-model/dataset_creation/afni_conversion/temp0/ds000107/sub-14_task-onebacktask_run-01_bold.nii.gz
waiting...
upload: MNIs0/ds000107/sub-14_task-onebacktask_run-01_bold_MNI.nii.gz to s3://proj-fmri/fmri_foundation_datasets/openneuro_MN


KeyboardInterrupt

