# OpenNeuro Data Loader
A data loader for open neuro MRI datasets https://openneuro.org/

Getting usable data from open neuro was more difficult than it should be. I aim to create a 3 part system to expedite this process.

The architecture is as follows:
1. Given a dataset ID (ds#######) download the dataset to a specified folder and extract it using datalad
1. A 'patient' class to hold data relevant to model training as well as data related to the patient
1. A dataset class that has various dataset-related methods (preprocessing, train-val-test splits or stratified k-fold cross validation, ect)

## Todos
1. Using datalad and git, download dataset
1. Figure out memory measuring tool
1. Load batch of n scans based on available memory
1. Create generator of m batches of n scans which load on demand

## Install Packages

In [1]:
!pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com


You should consider upgrading via the 'D:\Side_Projects\MRI_Project\env_mri\Scripts\python.exe -m pip install --upgrade pip' command.


In [2]:
import time
import nibabel as nib
import numpy as np
import os
import json
import random
import SimpleITK as sitk
import psutil
# from datalad.api import get, drop
import datalad.api as dl
import shutil
from datalad.api import install
import subprocess


In [39]:
class patient:
    '''
    Struct for holding patient information and scan data
    '''
    def __init__(self,path):
        self.info = {} #data-metadata pairs using pre-extension name
        self.folder_path = path
        self.date_loaded = time.time()
        self.parse_and_assign_filenames(self.folder_path)
        self.loaded_onto_disk = False
        
    def __str__(self):
        return f'{len(self.info.keys())} scans from {self.folder_path}'
        
    def parse_and_assign_filenames(self,path):
        patient_scans=[]
        for root,dirs,files in os.walk(path):
            # compressed_files = [file for file in files if file.split('.')[-2] == 'nii' and file.split('.')[-1] == 'gz']
            compressed_files = [file for file in files if file.split('.')[-1] == 'gz']
            for file in compressed_files:
                self.info[file.split('.')[0]] = {
                    'scan':os.path.join(root,file),
                    'metadata':os.path.join(root,file.split('.')[0]+'.json') if os.path.exists(os.path.join(root,file.split('.')[0]+'.json')) else None,
                }

    
    def load(self):
        #return 4D set of values [(H,W,Scans(Depth),N),metadata]
        def load_json(path:str):
            if path==None or path=='':
                return None
            with open(path) as f:
                out = json.load(f)
            return out
        def load_scan(path):
            img = nib.load(path)
            data = np.asarray(img.dataobj)
            return sitk.GetImageFromArray(data)

        #use datalad to fetch unavailable data
        if self.loaded_onto_disk == False:
            # for k,v in self.info.items():
            #     dl.get(v['scan'])
            dl.get(self.folder_path,jobs=4)
            self.loaded_onto_disk = True

        for k,v in self.info.items():
            v['data'] = load_scan(v['scan'])
            v['metadata_loaded'] = load_scan(v['scan'])
        
    def unload(self):
        if self.loaded_onto_disk == False:
            print("Unloading data that was never loaded...strange")
        #use datalad to unload scan
        self.loaded_onto_disk = False
        dl.drop(self.folder_path,recursive=True)

    # def unload_from_ram(self):
    #     if self.info['data'] == None or self.info['metadata_loaded'] == None:
    #         print("Unloading data from RAM that was never loaded...strange")
    #     self.info['data'] = None 
    #     self.info['metadata_loaded'] = None
        
class patient_dataset:
    '''
    Responsible for organizing and grouping scans + metadata per patient
    Passes path to patient class 
    Also responsible for image preprocessing methods
    '''
    def __init__(self,path,standard_size=(256,256,200)):
        #where path is the path to the dataset (should end in ds007045 or similar)
        self.ds = dl.install(
            path=path,
            source=f"https://github.com/OpenNeuroDatasets/{path}.git"
        )
        assert ds.is_installed()
        self.run(["git-annex", "init"],dataset_path=path)
        self.run(["git", "annex", "enableremote", "s3-PUBLIC"],dataset_path=path)
        
        self.path = path
        self.standard_size = standard_size
        self.patients = []
        for folder in os.listdir(self.path):
            if self._is_folder(folder) == False:
                continue
            p = patient(os.path.join(self.path,folder))
            if len(p.info) != 0: #filter non-patient folders
                self.patients.append(p)
        print('length patients', len(self.patients))
        self.length = len(self.patients)
        self.loaded_idxs = []#if slow, replace with a deque
        self.ram_loaded_idxs = []#if slow, replace with a deque
        
    def run(self,cmd, check=True,dataset_path=''):
            print(f"$ {' '.join(cmd)}")
            return subprocess.run(' '.join(cmd),cwd=dataset_path, check=False, capture_output=True)
        
    def _is_folder(self,folder):
        is_folder = True
        if 'sub' not in folder.split('-'): #temp fix for picking up non-patient folders
            is_folder = False
        if os.path.isdir(os.path.join(self.path,folder)) == False:
            is_folder = False
        return is_folder
    
    def __iter__(self):
        """
        Stream samples one-by-one without holding everything in memory.
        """
        for file_id in range(self.length):
            yield self.get(file_id)
    
    def __getitem__(self, file_id):
        print('here in getitem')
        if isinstance(file_id, slice):
            start, stop, step = file_id.indices(self.length)
            return [self.get(i) for i in range(start, stop, step)]
        elif isinstance(file_id, list):
            return [self.get(i) for i in file_id]
        elif isinstance(file_id, int):
            if file_id < 0 or file_id >= self.length:
                raise IndexError("patient index out of range")
            return self.get(file_id)
        else:
            raise TypeError("Indices must be integers, slices, or a list")
    
    def get(self,file_id):#wilo 2/16 6pm, test the item loading and dropping. Consider replacing the "what to drop" with the datalad process
        print('here in get')
        #check available memory
        # available_ram,total_ram,percent_ram_used = self.get_ram_info()
        available_disk,total_disk,percent_disk_used = self.get_disk_info('D:\\')#hardcoded disk
        available_disk = 10e+9 #10GB
        file_stats = ds.status(#wilo use this to detemine if we should go get other data
            # path="sub-01/ses-T1",
            path=self.folder_path,
            annex="all",
            recursive=True,
            return_type="list",
        )
        patient_folder_size = [r["bytesize"] for r in ds.status(path=self.folder_path,annex="all",recursive=True,return_type="list") if "bytesize" in r]
        
        #This will fail with OOM if the file size is more than 10% of RAM or disk space
        while available_disk < patient_folder_size :
            self.drop_an_item()
            patient_folder_size =[r["bytesize"] for r in ds.status(path=self.folder_path,annex="all",recursive=True,return_type="list") if "bytesize" in r]
        self.loaded_idxs.append(file_id)
        
        # while percent_ram_used > 0.9 : #this is imperfect and should check how large the incoming data is. 
        #     self.patients[self.ram_loaded_idxs[0]].unload_from_ram()
        #     self.ram_loaded_idxs.pop(0)
        #     print('popping item from working memory')
        #     available_ram,total_ram,percent_ram_used = self.get_ram_info()
        # self.ram_loaded_idxs.append(file_id)
        self.patients[file_id].load()
        return self.patients[file_id].info
        
    def drop_an_item(self):
        '''
        Drop an item from disk using datalad
        '''
        print('dropping file with ID:',self.loaded_idxs[0])
        self.patients[self.loaded_idxs[0]].unload()
        self.loaded_idxs.pop(0)#if slow, replace with a deque
        
    def get_ram_info(self):
        vm = psutil.virtual_memory()
        total_ram = vm.total      # bytes
        available_ram = vm.available  # bytes
        return available_ram, total_ram, available_ram/total_ram

    def get_disk_info(self, path="/"):
        usage = shutil.disk_usage(path)
        total = usage.total      # bytes
        available = usage.free   # bytes
        return available, total, available/total

    def sample(self):
        #get one random patient obj and call get method
        random_idx = random.randint(0,self.length)
        return self.get(random_idx)
        
    def resample_to_shape(
        self,
        images, #list of sitk images
        out_size,
        interpolator=sitk.sitkLinear
    ):
        resampled_images = []
        for img in images:
            original_size = img.GetSize()
            original_spacing = [1.0,1.0,1.0] #change to grabbing this from metadata
            # original_spacing = self. #change to grabbing this from metadata
        
            new_spacing = [
                (original_size[i] * original_spacing[i]) / out_size[i]
                for i in range(3)
            ]
            
            resampler = sitk.ResampleImageFilter()
            
            resampler.SetSize(out_size)
            resampler.SetOutputSpacing(new_spacing)
            resampler.SetInterpolator(interpolator)
            resampler.SetOutputDirection(img.GetDirection())
            resampler.SetOutputOrigin(img.GetOrigin())
            resampled_images.append(resampler.Execute(img))
        return resampled_images
    
    def preprocess(self,idx,count):
        #standardize size
        scan_sets = self.patients[idx:idx+count]
        patient_scan_sets = [p['data'] for p in scan_sets]
        resized_patient_scans = [self.resample_to_shape(patient_scans,self.standard_size) for patient_scans in patient_scan_sets]
    
    def generate_folds(self,k=10):
        #Create an array from 0 to self.length, shuffle, and make k-1 even cuts 
        assignments = [i for i in range(self.length)]
        random.shuffle(assignments)
        fold_size = self.length//k #last fold will have extra items from excluded by rounding
        self.folds = {}
        for foldnum in range(k-2):
            self.folds[foldnum] = assignments[fold_size*foldnum:fold_size*(foldnum+1)]
        self.folds[k-1] = assignments[fold_size*(foldnum+1):]

    def get_fold(self,fold_num):
        assert len(self.folds.keys()) > 0
        return self.__getitem__(self.folds[fold_num])#what if this ALSO returned a generator??
# dataset = patient_dataset('ds007045')
# dataset = patient_dataset('ds007156')
dataset = patient_dataset('ds002424')
# dataset = patient_dataset('ds002424',gb_limit=10)


$ git-annex init
$ git annex enableremote s3-PUBLIC
length patients 79


In [40]:
start = time.time()
dataset.generate_folds(20)
end = time.time()


In [41]:
start = time.time()
fold = dataset.get_fold(1)
fold = dataset.get_fold(2)
fold = dataset.get_fold(3)
end = time.time()
(end-start)/60, "Minutes for ",len(fold)," Patients" #2.7min per patient (9 scans) for scans and metadata

here in getitem
here in get
here in get
here in get
here in getitem
here in get
here in get
here in get
here in getitem
here in get
here in get
here in get


(3.644381908575694, 'Minutes for ', 3, ' Patients')

In [42]:
from pympler import asizeof
asizeof.asizeof(fold) #23680

14360

104

[None, None, None]


In [None]:
# wilo 2/9: I have to rethink how im keeping/dropping items for RAM/disk
#if the dataset is too large for the disk, its certainly too large for RAM. 
#so i definitely have to manage both.
#i should NOT rely on fold size for memory management. That is bad
#i should not wait until model-run time to dl get

#what if i worked on a batching system that was unrelated to the folds
#it would be assumed that you have 2x disc space than your batch size and x RAM to your batch size
#and while one batch was being used, another is getting downloaded with datalad

In [None]:
#The problem statement is: Make a dataloader for OpenNeuro where the dataset is larger than available disk (and RAM) space

In [None]:
!pip install pympler


In [47]:
from datalad.api import Dataset

ds = Dataset("ds002424")

results = ds.status(#wilo use this to detemine if we should go get other data
    # path="sub-01/ses-T1",
    path="",
    annex="all",
    recursive=True,
    return_type="list",
)

total_bytes = 0

for r in results:
    if "bytesize" in r:
        total_bytes += r["bytesize"]
        # print(r["path"], r["bytesize"], r.get("has_content"))

print("Total MB:", total_bytes/1e+6)


untracked: sub-03\ses-T1\func\.ipynb_checkpoints (directory)
638 annex'd files (39.9 GB/40.5 GB present/total size)
Total MB: 43436.320562


In [43]:
print(type(results))

<class 'list'>


In [44]:
print(r)

{'type': 'file', 'gitshasum': '9ac951ec58d53650cbe7887b623fe18aadf09b52', 'bytesize': 5025, 'prev_gitshasum': '9ac951ec58d53650cbe7887b623fe18aadf09b52', 'state': 'clean', 'path': 'D:\\Side_Projects\\MRI_Project\\OpenNeuroDataLoader\\ds002424\\sub-01\\ses-T1\\func\\sub-01_ses-T1_task-SSI_events.tsv', 'parentds': 'D:\\Side_Projects\\MRI_Project\\OpenNeuroDataLoader\\ds002424', 'status': 'ok', 'refds': 'D:\\Side_Projects\\MRI_Project\\OpenNeuroDataLoader\\ds002424', 'action': 'status'}


In [45]:
ds.status?

[1;31mSignature:[0m
[0mds[0m[1;33m.[0m[0mstatus[0m[1;33m([0m[1;33m
[0m    [1;33m*[0m[1;33m,[0m[1;33m
[0m    [0mdataset[0m[1;33m=[0m[1;32mNone[0m[1;33m,[0m[1;33m
[0m    [0mannex[0m[1;33m=[0m[1;32mNone[0m[1;33m,[0m[1;33m
[0m    [0muntracked[0m[1;33m=[0m[1;34m'normal'[0m[1;33m,[0m[1;33m
[0m    [0mrecursive[0m[1;33m=[0m[1;32mFalse[0m[1;33m,[0m[1;33m
[0m    [0mrecursion_limit[0m[1;33m=[0m[1;32mNone[0m[1;33m,[0m[1;33m
[0m    [0meval_subdataset_state[0m[1;33m=[0m[1;34m'full'[0m[1;33m,[0m[1;33m
[0m    [0mreport_filetype[0m[1;33m=[0m[1;32mNone[0m[1;33m,[0m[1;33m
[0m[1;33m)[0m[1;33m[0m[1;33m[0m[0m
[1;31mDocstring:[0m
Report on the state of dataset content.

This is an analog to `git status` that is simultaneously crippled and more
powerful. It is crippled, because it only supports a fraction of the
functionality of its counter part and only distinguishes a subset of the
states that Git knows about. B