In [None]:
# default_exp utils

# Utils

> This module offers useful utilities.

In [None]:
#hide
#slow
from nbverbose.showdoc import *

In [None]:
#export
import os
import ast
import wandb
import numpy as np
import pandas as pd
from tqdm import tqdm
from pathlib import Path
# from fastcore.xtras import globtastic

# pydicom related imports
import pydicom
from pydicom.pixel_data_handlers.util import apply_voi_lut

# kagglerecipes imports
from kagglerecipes.data import TINY_DATA_PATH

# multi processing with mpire
import mpire
from mpire import WorkerPool

## Temporary Utils
These helpers are added here temporarily and will be removed once the next fastcore release is available.

In [None]:
#export
'''
TEMPORARY UTILS ADDED HERE UNTIL THE NEXT fastcore RELEASE
'''
from fastcore.imports import *
from fastcore.foundation import *
from fastcore.basics import *
from functools import wraps

from typing import Iterable,Generator,Sequence,Iterator,List,Set,Dict,Union,Optional

import mimetypes,pickle,random,json,subprocess,shlex,bz2,gzip,zipfile,tarfile
import imghdr,struct,distutils.util,tempfile,time,string,collections,shutil
from copy import copy
from contextlib import contextmanager,ExitStack
from pdb import set_trace
from datetime import datetime, timezone
from timeit import default_timer
from fnmatch import fnmatch

def globtastic(
    path:Union[Path,str], # path to start searching
    recursive:bool=True, # search subfolders
    symlinks:bool=True, # follow symlinks?
    file_glob:str=None, # Only include files matching glob
    file_re:str=None, # Only include files matching regex
    folder_re:str=None, # Only enter folders matching regex
    skip_file_glob:str=None, # Skip files matching glob
    skip_file_re:str=None, # Skip files matching regex
    skip_folder_re:str=None # Skip folders matching regex
)->L: # Paths to matched files
    "Search `path` for files, filtering by glob/regex include and skip rules; returns an `L` of the matches"
    start = Path(path)
    # A path that is itself a file is returned as-is, without applying any filter
    if start.is_file(): return L([start])
    # The regex '.' matches every non-empty folder name, so no subfolder is entered
    if not recursive: skip_folder_re = '.'
    include_file_re,include_folder_re = compile_re(file_re),compile_re(folder_re)
    exclude_file_re,exclude_folder_re = compile_re(skip_file_re),compile_re(skip_folder_re)

    def _keep_file(root, name):
        # Each rule only applies when it was supplied; the first failing rule rejects the file
        if file_glob and not fnmatch(name, file_glob): return False
        if include_file_re and not include_file_re.search(name): return False
        if skip_file_glob and fnmatch(name, skip_file_glob): return False
        if exclude_file_re and exclude_file_re.search(name): return False
        return True

    def _keep_folder(root, name):
        if include_folder_re and not include_folder_re.search(name): return False
        if exclude_folder_re and exclude_folder_re.search(name): return False
        return True

    return L(walk(start, symlinks=symlinks, keep_file=_keep_file, keep_folder=_keep_folder))

In [None]:
#export
def compile_re(pat):
    "Return `pat` compiled as a regex, or None when `pat` is None"
    if pat is None:
        return None
    return re.compile(pat)

def walk(
    path:(Path,str), # path to start searching
    symlinks:bool=True, # follow symlinks?
    keep_file:callable=noop, # function that returns True for wanted files
    keep_folder:callable=noop, # function that returns True for folders to enter
    func:callable=os.path.join # function to apply to each matched file
): # Generator of `func` applied to matched files
    "Lazily traverse `path` with `os.walk`, yielding `func(root, name)` for kept files and pruning rejected folders"
    for root,dirs,files in os.walk(path, followlinks=symlinks):
        for name in files:
            if keep_file(root,name): yield func(root, name)
        # Prune in place: `os.walk` only descends into the names left in `dirs`
        dirs[:] = [name for name in dirs if keep_folder(root,name)]

## Constants

Column names from Kaggle Brain Tumor DICOM files

In [None]:
#export
# Names of the DICOM attributes that `get_dicom_metadata` looks up on each file
# when this list is passed as its `meta_cols` argument.
KAGGLE_BRAINTUMOR_META_COLS = [
    'SpecificCharacterSet',
    'ImageType',
    'SOPClassUID',
    'SOPInstanceUID',
    'AccessionNumber',
    'Modality',
    'SeriesDescription',
    'PatientID',
    'MRAcquisitionType',
    'SliceThickness',
    'EchoTime',
    'NumberOfAverages',
    'ImagingFrequency',
    'ImagedNucleus',
    'MagneticFieldStrength',
    'SpacingBetweenSlices',
    'EchoTrainLength',
    'PercentSampling',
    'PercentPhaseFieldOfView',
    'PixelBandwidth',
    'TriggerWindow',
    'ReconstructionDiameter',
    'AcquisitionMatrix',
    'FlipAngle',
    'SAR',
    'PatientPosition',
    'StudyInstanceUID',
    'SeriesInstanceUID',
    'SeriesNumber',
    'InstanceNumber',
    'ImagePositionPatient',
    'ImageOrientationPatient',
    'Laterality',
    'PositionReferenceIndicator',
    'SliceLocation',
    'InStackPositionNumber',
    'SamplesPerPixel',
    'PhotometricInterpretation',
    'Rows',
    'Columns',
    'PixelSpacing',
    'BitsAllocated',
    'BitsStored',
    'HighBit',
    'PixelRepresentation',
    'WindowCenter',
    'WindowWidth',
    'RescaleIntercept',
    'RescaleSlope',
    'RescaleType',
]

## DICOM Processing

Returns the metadata of a single dicom file as a dictionary.

In [None]:
#export
def get_dicom_metadata(
    path_to_dicom_file:str, # path to the dicom file
    meta_cols:list # list of metadata columns to extract from the dicom file
):
    "Read one dicom file and return a dict mapping each name in `meta_cols` to its value as a string ('NaN' when the attribute is missing)."

    dataset = pydicom.dcmread(path_to_dicom_file)

    def _as_text(attr_name):
        # Attributes that the file does not carry are recorded as the string "NaN"
        try:
            return str(getattr(dataset, attr_name))
        except AttributeError:
            return "NaN"

    return {attr_name: _as_text(attr_name) for attr_name in meta_cols}

In [None]:
# Smoke test: read the metadata of one dicom file from the bundled tiny dataset.
sample_files = globtastic(TINY_DATA_PATH, file_glob='*.*dcm*')
# Fail with a readable message instead of a bare IndexError when no fixture file is found
assert len(sample_files) > 0, f"no dicom files found under {TINY_DATA_PATH}"
sample_file = sample_files[0]
dicom_metadata = get_dicom_metadata(sample_file, KAGGLE_BRAINTUMOR_META_COLS)
assert isinstance(dicom_metadata, dict)
# Every requested column gets a key, whether or not the file carries the attribute
assert set(dicom_metadata) == set(KAGGLE_BRAINTUMOR_META_COLS)

Retrieve metadata for each BraTS21ID and return as a dataframe.

Returns the patient id as a zero-padded string, matching the patient folder names of the Kaggle Brain Tumor data (e.g. `20` becomes `00020`).

In [None]:
#export
def get_patient_id(
    patient_id:int # patient id of the dicom file
):
    "Returns a patient id as a string, zero-padded to five characters as the Kaggle Brain Tumor competition data expects e.g 20 will return 00020"

    # `zfill` pads up to width 5 and leaves longer ids untouched. The previous
    # if/elif chain prefixed a '0' to every id >= 1000, which produced a
    # six-character result for ids >= 10000.
    return str(patient_id).zfill(5)

In [None]:
# Exercise every padding width (1 to 4 digit ids), not just the single-digit case
assert get_patient_id(1) == '00001'
assert get_patient_id(20) == '00020'
assert get_patient_id(999) == '00999'
assert get_patient_id(1000) == '01000'

Get path to patient folder

In [None]:
#export
def get_patient_BraTS21ID_path(
    row,  # Row from a DataFrame
    path:str # Path to patient folders, e.g. could be "train" or "test"
):
    "Build the folder path `<path>/<zero-padded BraTS21ID>/` for the patient described by `row`"

    raw_id = int(row.BraTS21ID)
    folder_name = get_patient_id(raw_id)
    return f'{path}/{folder_name}/'

In [None]:
TRAIN_PATH = TINY_DATA_PATH/'train'
train_df = pd.read_csv(TINY_DATA_PATH/'train_labels.csv')
# Compare as `Path` objects rather than raw strings, so the check does not depend on
# the platform's path separator (a plain string comparison fails on Windows, where
# `TRAIN_PATH` renders with backslashes).
first_row = train_df.iloc[0]
assert Path(get_patient_BraTS21ID_path(first_row, TRAIN_PATH)) == TRAIN_PATH/get_patient_id(int(first_row.BraTS21ID))

In [None]:
#export
def get_patient_metadata(
    meta_cols_scan_types:list,  # list of [meta_cols, scan_types]
    patient_path:str,  # Path to patient folder 
    BraTS21ID:int  # BraTS21ID of patient
):
    "Returns a DataFrame with one row of dicom metadata for every dicom file in a single patient folder"

    meta_cols = meta_cols_scan_types[0]
    scan_types = meta_cols_scan_types[1]
    patient_dir = Path(patient_path)

    # Locate the dicom files of every scan type first, remembering which
    # scan-type subfolder each file came from
    tagged_files = []
    for scan_type in scan_types:
        for dcm_path in globtastic(patient_dir / scan_type, file_glob='*.*dcm*'):
            tagged_files.append((dcm_path, scan_type))

    # Then read each file and tag its metadata with the scan type and patient id
    rows = []
    for dcm_path, scan_type in tagged_files:
        row = get_dicom_metadata(dcm_path, meta_cols)
        row['scan_type'] = scan_type
        row['id'] = BraTS21ID
        rows.append(row)

    return pd.DataFrame(rows)

In [None]:
#export
def get_all_BraTS21_dicom_meta(
    df,  # Dataframe with path to patient folder and BraTS21ID
    meta_cols:list,  # Metadata columns to extract from the dicom file metadata
    scan_types:list=['FLAIR', 'T1w', 'T1wCE', 'T2w'],  # The subfolders in the patient data to loop through
    n_jobs:int=4,  # Number of jobs for multiprocessing, set to 0 to disable multiprocessing (default:4)
    progress_bar:bool=True,  # Show a progress bar (mpire's built-in tqdm bar when multiprocessing, plain tqdm otherwise)
    enable_insights:bool=False  # Returns timings from multiprocessing tasks, from mpire. The higher the working ratio the more efficient your multiprocessing setup is.
):
    "Extract the dicom metadata of every patient folder listed in `df` and return it as one dataframe. Multiprocessing is used when n_jobs > 0. Raises ValueError when n_jobs is negative."

    # Validate up front: a negative value used to only print this message and then
    # crash on the undefined `results` at the final `pd.concat`.
    if n_jobs < 0:
        raise ValueError("n_jobs must be greater than or equal to 0")

    n_items = len(df)
    # `pd.concat` raises on an empty list, so an empty input yields an empty frame
    if n_items == 0:
        return pd.DataFrame()

    patient_path_ls = df.path.values
    BraTS21ID_ls = df.BraTS21ID.values

    if n_jobs == 0:
        # Serial path; `disable` makes the bar honour `progress_bar` like the pool path does
        results = [
            get_patient_metadata([meta_cols, scan_types], patient_path, brats_id)
            for patient_path, brats_id in tqdm(zip(patient_path_ls, BraTS21ID_ls),
                                               total=n_items, disable=not progress_bar)
        ]
    else:
        # One (patient_path, BraTS21ID) task per row; the shared objects are handed to
        # `get_patient_metadata` as its first argument by the pool
        data = list(zip(patient_path_ls, BraTS21ID_ls))

        with WorkerPool(n_jobs=n_jobs, shared_objects=[meta_cols, scan_types]) as pool:
            results = pool.map(get_patient_metadata, data,
                               progress_bar=progress_bar, enable_insights=enable_insights)
            if enable_insights:
                pool.print_insights()

    return pd.concat(results)

In [None]:
# n_jobs = 0

# data_patient_path = TRAIN_PATH / os.listdir(TRAIN_PATH)[0]

# # construct path based on csv and TRAIN_PATH
# train_df['path'] = train_df.apply(lambda row: get_patient_BraTS21ID_path(row, TRAIN_PATH), axis=1)
# train_meta_df = get_all_BraTS21_dicom_meta(train_df[:2], KAGGLE_BRAINTUMOR_META_COLS, scan_types, n_jobs, True)

# # folder_exists = train_df['path'] == str(data_patient_path)+'/'
# # train_df = train_df[folder_exists]

# # Again failing because of windows path issue
# # assert len(train_df) >= 1
# # assert len(get_all_dicom_metadata(train_df, kaggle_braintumor_meta_cols)) == 8

Returns the MRI plane (`coronal`, `axial` or `sagittal`) based on the `ImageOrientationPatient` value in the dicom metadata.


In [None]:
#export
def get_image_plane(
    data:dict  # Dicom metadata: a dict, or an object (e.g. a DataFrame row) exposing `ImageOrientationPatient`
):
    "Returns the MRI's plane ('coronal', 'axial' or 'sagittal') from the dicom data, or None when the orientation is not one of the recognised patterns."

    # Accept the dict produced by `get_dicom_metadata` (key access) as well as
    # attribute-style objects; attribute access alone fails on a plain dict.
    if isinstance(data, dict):
        orientation = data['ImageOrientationPatient']
    else:
        orientation = data.ImageOrientationPatient

    # `orientation` is the string form of six numbers; only the first two of
    # each group of three are needed to tell the planes apart.
    x1,y1,_,x2,y2,_ = [round(j) for j in ast.literal_eval(orientation)]

    planes = {
        (1,0,0,0): 'coronal',
        (1,0,0,1): 'axial',
        (0,1,0,0): 'sagittal',
    }
    return planes.get((x1,y1,x2,y2))

In [None]:
#hide
# Run nbdev's `notebook2script`; the cell output lists the notebooks it converted.
from nbdev.export import notebook2script; notebook2script()

Converted 00_data.ipynb.
Converted 01_preprocess.ipynb.
Converted 02_utils.ipynb.
Converted 03_wandb_utils.ipynb.
Converted 04_wandb_viz.ipynb.
Converted index.ipynb.
