In [1]:
import os
import sys
import itk
import glob
import json
import shutil
import dicom2nifti 
from directory_tree import display_tree  # Nice tool to display directory trees (https://pypi.org/project/directory-tree/)

In [2]:
def list_folder_content(path, show_hidden=False):
    if show_hidden:
        ddfldrlst = os.listdir(path)
    else:
        ddfldrlst = list(filter(lambda item: not item.startswith('.'),os.listdir(path)))      
    return ddfldrlst

def display_folder_list(file_list):
    print('\n'.join(f'[{idx}] - {file_idx}' for idx, file_idx in enumerate(file_list)))

def getenv():
    """
    Requires sys and os modules:
    import sys
    import os
    """
    if sys.platform == 'win32':
        env_home = 'HOMEPATH'
    elif (sys.platform == 'darwin') | (sys.platform == 'linux'):
        env_home = 'HOME'
    HOMEPATH = os.getenv(env_home)
    
    return HOMEPATH

def check_path_exist(path, file=False):
    """
    Flag FILE indicates the path contains a file name (FLAG=TRUE) or the path only points to a folder (FLAG=FALSE (Default))
    """
    if file:
        is_path = os.path.isfile(path)
    else:
        is_path = os.path.isdir(path)

    print(f'{"OK:" if is_path else "ERROR:"} Path to {"file" if file else "folder"} {path} does{"" if is_path else " NOT"} exist')

    return is_path

In [3]:
HOMEPATH = getenv()
SRCPATH = os.path.join(HOMEPATH, 'Data', 'fMRIBreastData')
INPUTPATH = os.path.join(SRCPATH, 'StudyData')

_ = check_path_exist(INPUTPATH)

OK: Path to folder /Users/joseulloa/Data/fMRIBreastData/StudyData does exist


Setting the flag concat_vol=TRUE will concatenate the dce timepoints into a single 4D Nifti volume per visit and then will delete the intermediate sub-folders. Depending on the workflow, I recommend the following options:

1. If converting setting up the Nifti folder for the analyses, where the original Nifti files are recommended to use, use the following combination of variables:
    * `OUTPUTFLDR` = 'NiftiData'
    * `concat_vol` = False
2. If setting up the folder to be uploaded in the S3 bucket for sharing, use the following:
    * `OUTPUTFLDR` = os.path.join('tests', 'Test000')
    * `concat_vol` = True


In [4]:
# Pick up an option:
options = {1, 2}
option = None
while option not in options:
    option = input(f'Pick up a valid option {tuple(options)} or type "x" to quit (see paragraph above): ')
    if option == 'x':
        break
    else:
        option = int(option)
if option == 1:
    OUTPUTFDLR = 'NiftiData'
    concat_vol = False
elif option == 2:
    concat_vol = True
    description = {'TestID': 'Test000',
                   'Summary': f'Test000 is not an evaulation of registration, is just to copy the raw unregistered data into the same folder structure so is easier to compare the following tests. The data are not generated by the notebook GENERALREGISTRATION.ipynb, but with the Dicom2NiftiCONVERTER.ipynb',
                   'Intended Platform': 'any',
                   'Run Platform': sys.platform,
                   'Registration Details': {'algorithm': 'Raw',
                                            'configuration parameters': [''],
                                            'reference volume': 1,
                                            'register fixed': False
                                            },
                    'Preprocessing': {'bias_correction': None, 
                                      'histogram_matching': False
                                      }
                  }  
    OUTPUTFDLR = os.path.join('tests', description['TestID'], 'datasets')
    DESCPATH = os.path.join('tests', description['TestID'], 'description.json')
else:
    print(f'ERROR!: option "{option}" is not valid')

OUTPUTPATH = os.path.join(SRCPATH, OUTPUTFDLR)

# if OUTPUTPATH does not exist, creates one:
os.makedirs(OUTPUTPATH, exist_ok=True)
if option == 2:
    OUTPUTDESC = os.path.join(SRCPATH, DESCPATH)
    # Save the description as a JSON file in the output directory:
    with open(OUTPUTDESC, 'w') as fp:
        json.dump(description, fp)

In [5]:
print(display_tree(INPUTPATH, header=True, string_rep=True, show_hidden=False, max_depth=4))

StudyData/
├── CR-ANON68760/
│   ├── CR-Post-Treatment-20230120/
│   │   └── 301/
│   │       ├── 1/
│   │       ├── 2/
│   │       ├── 3/
│   │       ├── 4/
│   │       ├── 5/
│   │       └── 6/
│   └── CR-Pre-Treatment-20221212/
│       └── 301/
│           ├── 1/
│           ├── 2/
│           ├── 3/
│           ├── 4/
│           ├── 5/
│           └── 6/
├── DC-ANON97378/
│   ├── DC-Post-Treatment-20230726/
│   │   └── 301/
│   │       ├── 1/
│   │       ├── 2/
│   │       ├── 3/
│   │       ├── 4/
│   │       ├── 5/
│   │       └── 6/
│   └── DC-Pre-Treatment-20230621/
│       └── 301/
│           ├── 1/
│           ├── 2/
│           ├── 3/
│           ├── 4/
│           ├── 5/
│           └── 6/
├── EilB-ANON98269/
│   ├── EilB-Post-Treatment-20230726/
│   │   └── 301/
│   │       ├── 1/
│   │       ├── 2/
│   │       ├── 3/
│   │       ├── 4/
│   │       ├── 5/
│   │       └── 6/
│   └── EilB-Pre-Treatment-20230621/
│       └── 501/
│           ├── 1/
│           ├── 2/
│     

In [10]:
# Expected sub-folder tree follows the pattern:
# PATIENT_INITIALS+"-"+PATIENTID -> PATIENT_NAME+"-"+DATE_OF_VISIT -> DCE_MRI_SEQUENCE
# Valid PATIENT_NAMES contains at most 3 dashes (i.e. to exclude "Motion Corrected" one, but keeping "RICE001")
# 
PATIENTSID = list_folder_content(INPUTPATH)
for patientID in PATIENTSID:
    path2patientID = os.path.join(INPUTPATH, patientID)
    patient_visits = list_folder_content(path2patientID)
    for visit_name in patient_visits:
        path2visit = os.path.join(path2patientID, visit_name)
        if visit_name.count('-') > 3:
            print(f'Folder Name is too long to match the criteria. Skipping {path2visit}')
            continue
        print(f'Processing {visit_name}. Please wait...')
        seq_nro = os.path.join(path2visit, list_folder_content(path2visit)[0])
        dce_time_points = list_folder_content(seq_nro)
        if concat_vol:
            tseries_volume = [None]*len(dce_time_points)
        for time_point_i in dce_time_points:
            path2time_point = os.path.join(seq_nro, time_point_i)
            print(f'Replicating sub-folder "{time_point_i}" in the output directory {OUTPUTPATH}:')
            ipath = path2time_point
            opath = ipath.replace(INPUTPATH, OUTPUTPATH)
            print(f'Creating folder {opath}...')
            try:
                os.makedirs(opath, exist_ok=True)
            except Exception as err:
                print(f'ERROR: Cannot create the folder {opath}')
                break
            print(f'Performing conversion from {ipath}, please wait...')
            dicom2nifti.convert_directory(ipath, opath)
            if concat_vol:
                tseries_volume[int(time_point_i)-1] = itk.imread(glob.glob(os.path.join(opath,'*.nii.gz'))[0])
        if concat_vol:
            print(f"Conversion done successfully, now will concatenate the timeseries into a single 4D volume saved at {os.path.join(path2visit.replace(INPUTPATH, OUTPUTPATH),'.'.join([visit_name,'nii.gz']))}. Please wait a little bit more :) ...")
            # Initialise the 4d volume with the first time point (pre-contrast)
            t0_nii = tseries_volume[0]
            in_dim = t0_nii.GetImageDimension()
            pixel_type = itk.template(t0_nii)[1][0]
            out_dim = in_dim + 1

            input_image_type = itk.Image[pixel_type, in_dim]
            output_image_type = itk.Image[pixel_type, out_dim]

            layout = [1, 1, 1, len(dce_time_points)]
            vol_tiles = itk.TileImageFilter[input_image_type, output_image_type].New()
            vol_tiles.SetLayout(layout)
            for idx in range(len(dce_time_points)):
                vol_tiles.SetInput(idx, tseries_volume[idx])
            # Write 4D Volume:
            volume_writer = itk.ImageFileWriter[output_image_type].New()
            volume_writer.SetFileName(os.path.join(path2visit.replace(INPUTPATH, OUTPUTPATH),'.'.join([visit_name,'nii.gz'])))
            volume_writer.SetInput(vol_tiles.GetOutput())
            volume_writer.Update()
            # And removing useless data:
            print(f'Removing {seq_nro} and its content...')
            shutil.rmtree(seq_nro.replace(INPUTPATH, OUTPUTPATH))
        else:
            print('Conversion done successfully, moving to the next data folder.')
        print(''.join(['*']*100))

print(f'All done, check the folders to review the data. Bye!')

Processing DC-Pre-Treatment-20230621. Please wait...
Replicating sub-folder "6" in the output directory /Users/joseulloa/Data/fMRIBreastData/tests/Test000/dataset:
Creating folder /Users/joseulloa/Data/fMRIBreastData/tests/Test000/dataset/DC-ANON97378/DC-Pre-Treatment-20230621/301/6...
Performing conversion from /Users/joseulloa/Data/fMRIBreastData/StudyData_v2/DC-ANON97378/DC-Pre-Treatment-20230621/301/6, please wait...
Replicating sub-folder "1" in the output directory /Users/joseulloa/Data/fMRIBreastData/tests/Test000/dataset:
Creating folder /Users/joseulloa/Data/fMRIBreastData/tests/Test000/dataset/DC-ANON97378/DC-Pre-Treatment-20230621/301/1...
Performing conversion from /Users/joseulloa/Data/fMRIBreastData/StudyData_v2/DC-ANON97378/DC-Pre-Treatment-20230621/301/1, please wait...
Replicating sub-folder "4" in the output directory /Users/joseulloa/Data/fMRIBreastData/tests/Test000/dataset:
Creating folder /Users/joseulloa/Data/fMRIBreastData/tests/Test000/dataset/DC-ANON97378/DC-P

In [None]:
vol_tiles