In [1]:
import os
import sys
import yaml
import glob
import re
import pydicom
from copy import deepcopy
from shutil import copy
from typing import (
    Dict,
    List, 
    Optional,
    Tuple,
    Union
)

In [2]:
# Show the current working directory; a later cell joins the notebook file name onto it.
os.getcwd()

'/Users/adebayobraimah/Desktop/projects/convert_source/work_dir'

In [3]:
# Define __file__ by hand as <cwd>/read_header.ipynb (presumably this notebook's own path —
# TODO confirm) so that the following cells can build paths relative to it.
__file__ = os.path.join(os.getcwd(),"read_header.ipynb")

In [4]:
# Parent of the directory that contains __file__ (left un-normalized, so it still ends in '..').
# Presumably the repository root that holds the convert_source package — TODO confirm.
mod_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"..")

In [5]:
# Add mod_path to the import search path; the convert_source imports that follow rely on it
# (assumption: the package is not otherwise installed — verify).
sys.path.append(mod_path)

In [6]:
from convert_source.cs_utils.bids_info import (
    construct_bids_dict,
    construct_bids_name,
    search_bids
)

In [7]:
from convert_source.cs_utils.utils import (
    BIDSimg,
    collect_info,
    convert_image_data,
    dict_multi_update,
    get_metadata,
    list_in_substr,
    SubInfoError,
    SubDataInfo,
    read_json,
    write_json,
    # new functions
    comp_dict,
    depth,
    list_dict,
    get_bvals,
    gzip_file,
    gunzip_file
)

In [8]:
from convert_source.cs_utils.fileio import(
    LogFile,
    TmpDir,
    ConversionError,
    NiiFile
)

In [9]:
from convert_source.cs_utils.const import(
    DEFAULT_CONFIG,
    BIDS_PARAM
)

In [10]:
from convert_source.imgio.dcmio import (
    is_valid_dcm
)

In [11]:
from convert_source.imgio.niio import (
    get_data_params,
    get_num_frames
)

In [12]:
from convert_source.batch_convert import (
    read_config
)

In [13]:
# Absolute path of the test configuration file: one level above the directory that
# contains __file__, then notebooks.old/config.test/config.default.yml.
test_config = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '..', 'notebooks.old', 'config.test', 'config.default.yml')
)
test_config

'/Users/adebayobraimah/Desktop/projects/convert_source/notebooks.old/config.test/config.default.yml'

In [14]:
# List the files available in ../tests/tests.data (relative to the directory of __file__).
os.listdir(
    os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'tests', 'tests.data'))
)

['2_DTI_B0_TE93_FAT_SHIFT_A_MB4_SENSE_1.3_8_1.PAR',
 '2_AXIAL_3_3.PAR',
 '2_AX_SWIP_MPR_14_2.PAR',
 '2_6_DIR_B0_A_TE88_SENSE_NO_MB_NO_6_1.PAR']

In [15]:
# Absolute path of the first test PAR file in ../tests/tests.data.
test_file = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '..', 'tests', 'tests.data', '2_AXIAL_3_3.PAR')
)
test_file

'/Users/adebayobraimah/Desktop/projects/convert_source/tests/tests.data/2_AXIAL_3_3.PAR'

In [16]:
# Absolute path of the second test PAR file in ../tests/tests.data.
test_file2 = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '..', 'tests', 'tests.data', '2_6_DIR_B0_A_TE88_SENSE_NO_MB_NO_6_1.PAR')
)
test_file2

'/Users/adebayobraimah/Desktop/projects/convert_source/tests/tests.data/2_6_DIR_B0_A_TE88_SENSE_NO_MB_NO_6_1.PAR'

In [17]:
# Parse the test configuration file; read_config yields five items that are unpacked in order.
search_dict, bids_search, bids_map, meta_dict, exclusion_list = read_config(
    config_file=test_config
)

In [18]:
# Display the nested search dictionary obtained from read_config.
search_dict

{'anat': {'T1w': ['T1', 'T1w', 'TFE'], 'T2w': ['T2', 'T2w', 'TSE']},
 'func': {'bold': {'rest': ['rsfMR',
    'rest',
    'FFE',
    'FEEPI',
    'rs-func',
    'func']}}}

In [24]:
def get_par_scan_tech(par_file: str,
                      search_dict: Dict
                      ) -> Tuple[str,str,str]:
    '''Searches a PAR file header for the scan technique/MR modality used, in accordance with the search terms
    provided by the nested heuristic search dictionary. A regular expression (regEx) is matched against every
    line of the PAR file to locate the 'Technique' entry; that line is then tested against the search terms.

    Usage example:
        >>> [modality_type, modality_label, task] = get_par_scan_tech(par_file,
        ...                                                           search_dict)
        ...

    Arguments:
        par_file: PAR filename.
        search_dict: Nested heuristic search dictionary (from the `read_config` function).

    Returns:
        Tuple of strings that consist of:
            * modality_type: Modality type (e.g. 'anat', 'func', etc.)
            * modality_label: Modality label (e.g. 'T1w','bold', etc.)
            * task: Task name.
        All three strings are empty when the file contains no 'Technique' line, or when none of the
        search terms match that line.

    Raises:
        FileNotFoundError: If `par_file` does not exist.
    '''
    par_file: str = os.path.abspath(par_file)

    # Define regEx search string
    regexp = re.compile(r'.    Technique                          :  .*', re.M | re.I)

    # Start from an empty string so that a header WITHOUT a 'Technique' line falls through to the
    # empty-tuple return below instead of raising UnboundLocalError.
    par_scan_tech_str: str = ""

    # Open and search PAR header file. If several lines match, the last one is kept.
    with open(par_file) as f:
        for line in f:
            match_ = regexp.match(line)
            if match_:
                par_scan_tech_str = match_.group()

    if not par_scan_tech_str:
        return "","",""

    # list_dict is expected to return a list of dictionaries (each element is iterated with .items() below).
    search_arr: List[Dict] = list_dict(d=search_dict)

    mod_found: bool = False

    # Set returns to empty strings
    modality_type: str = ""
    modality_label: str = ""
    task: str = ""

    # Use matching string in search dictionary.
    # NOTE(review): the search stops after the first element of search_arr that produced a match, but
    # inside that element the inner loops keep going, so the LAST matching label/task wins — confirm
    # that this is the intended precedence.
    for entry in search_arr:
        if mod_found:
            break
        for mod_type, labels in entry.items():
            if depth(entry) == 3:
                # Entry without a task level: modality type -> modality label -> search terms
                for mod_label, terms in labels.items():
                    if list_in_substr(in_list=terms,in_str=par_scan_tech_str):
                        mod_found = True
                        modality_type = mod_type
                        modality_label = mod_label
                        task = ""
            elif depth(entry) == 4:
                # Entry with a task level: modality type -> modality label -> task -> search terms
                for mod_label, tasks in labels.items():
                    for mod_task, terms in tasks.items():
                        if list_in_substr(in_list=terms,in_str=par_scan_tech_str):
                            mod_found = True
                            modality_type = mod_type
                            modality_label = mod_label
                            task = mod_task

    return (modality_type, 
            modality_label, 
            task)

In [21]:
# Run the PAR header search on the first test file.
get_par_scan_tech(test_file,search_dict)

('anat', 'T1w', '')

In [22]:
# Display the path of the second test PAR file.
test_file2

'/Users/adebayobraimah/Desktop/projects/convert_source/tests/tests.data/2_6_DIR_B0_A_TE88_SENSE_NO_MB_NO_6_1.PAR'

In [23]:
# Run the PAR header search on the second test file; the recorded output is a tuple of empty
# strings, i.e. no modality was identified for it.
get_par_scan_tech(test_file2,search_dict)

('', '', '')

In [97]:
# Call with a file name that does not exist in the working directory; per the recorded output
# this raises FileNotFoundError.
get_par_scan_tech('test.PAR',search_dict)

FileNotFoundError: [Errno 2] No such file or directory: '/Users/adebayobraimah/Desktop/projects/convert_source/work_dir/test.PAR'

In [46]:
# Display the search dictionary again for reference.
search_dict

{'anat': {'T1w': ['T1', 'T1w', 'TFE'], 'T2w': ['T2', 'T2w', 'TSE']},
 'func': {'bold': {'rest': ['rsfMR',
    'rest',
    'FFE',
    'FEEPI',
    'rs-func',
    'func']}}}

In [51]:
# Scratch cell: displays a tuple of three None values.
(None,None,None)

(None, None, None)

In [25]:
def _match_scan_tech(search_arr: List[Dict],
                     header_str: str
                     ) -> Optional[Tuple[str,str,str]]:
    '''Tests one header string against every element of the search list.

    Arguments:
        search_arr: List of nested search dictionaries (as returned by `list_dict`).
        header_str: Header text to test the search terms against.

    Returns:
        Tuple of (modality_type, modality_label, task) taken from the first element of `search_arr`
        that produced a match, or `None` when nothing matched.
    '''
    for entry in search_arr:
        found: Optional[Tuple[str,str,str]] = None
        # NOTE(review): inside one element the loops do not stop at the first hit, so the LAST
        # matching label/task of that element wins — confirm this is the intended precedence.
        for mod_type, labels in entry.items():
            if depth(entry) == 3:
                # Entry without a task level: modality type -> modality label -> search terms
                for mod_label, terms in labels.items():
                    if list_in_substr(in_list=terms,in_str=header_str):
                        found = (mod_type, mod_label, "")
            elif depth(entry) == 4:
                # Entry with a task level: modality type -> modality label -> task -> search terms
                for mod_label, tasks in labels.items():
                    for mod_task, terms in tasks.items():
                        if list_in_substr(in_list=terms,in_str=header_str):
                            found = (mod_type, mod_label, mod_task)
        if found is not None:
            return found
    return None

def get_dcm_scan_tech(dcm_file: str,
                      search_dict: Dict
                      ) -> Tuple[str,str,str]:
    '''Searches DICOM file header for scan technique/MR modality used in accordance with the search terms provided by the
    nested heuristic search dictionary. The DICOM header field searched first is a Philips DICOM private tag (2001,1020)
    [Scanning Technique Description MR]. In the case that no matches are found in that field, or the field does not
    exist - then common DICOM tags are searched, in this order:
        * Series Description
        * Image Type
        * Protocol Name.
    Fields that are absent from the file are skipped. The first field that yields a match determines the result.

    Usage example:
        >>> [modality_type, modality_label, task] = get_dcm_scan_tech(dcm_file,
        ...                                                           search_dict)
        ...

    Arguments:
        dcm_file: DICOM filename.
        search_dict: Nested heuristic search dictionary (from the `read_config` function).

    Returns:
        Tuple of strings that consist of:
            * modality_type: Modality type (e.g. 'anat', 'func', etc.)
            * modality_label: Modality label (e.g. 'T1w','bold', etc.)
            * task: Task name.
        All three strings are empty when no searched field matches any search term.
    '''
    dcm_file: str = os.path.abspath(dcm_file)

    # list_dict is expected to return a list of dictionaries (each element is iterated with .items()).
    search_arr: List[Dict] = list_dict(d=search_dict)

    ds = pydicom.dcmread(dcm_file)

    # Primary search: Private Tag/Field (2001, 1020) [Scanning Technique Description MR].
    # Indexing a dataset with a tag it does not contain raises KeyError, which previously aborted the
    # function before the secondary searches could run.
    try:
        dcm_scan_tech_str: str = str(ds[0x2001,0x1020])
    except KeyError:
        dcm_scan_tech_str: str = ""

    if dcm_scan_tech_str:
        result = _match_scan_tech(search_arr, dcm_scan_tech_str)
        if result is not None:
            return result

    # Secondary searches in the case that the Private Tag/Field (2001, 1020) [Scanning Technique Description MR] search is unsuccessful

    # Define list of DICOM header fields to search
    dcm_fields = ['SeriesDescription', 'ImageType', 'ProtocolName']

    for dcm_field in dcm_fields:
        # Plain attribute lookup replaces the former eval() of a formatted string; a field missing from
        # this file is skipped rather than raising AttributeError.
        try:
            field_value = getattr(ds, dcm_field)
        except AttributeError:
            continue

        result = _match_scan_tech(search_arr, str(field_value))
        if result is not None:
            return result

    return "","",""

In [None]:
def header_search(img_file: str, 
                  search_dict: Dict
                  ) -> Tuple[str,str,str]:
    '''Searches a DICOM or PAR file header for relevant scan technique/parameter information provided a nested heuristic search dictionary
    of search terms to map scan acquisitions of interest. Any other image file passed as an argument will return a tuple of empty strings.

    The file type is decided from the file NAME only ('.dcm' or '.par' anywhere in the base name, case-insensitive);
    directory names in the path are ignored.

    Usage example:
        >>> [modality_type, modality_label, task] = header_search(img_file,
        ...                                                       search_dict)
        ...

    Arguments:
        img_file: Input image data file path.
        search_dict: Nested heuristic dictionary of (BIDS) related search terms.

    Returns: 
        Tuple of strings that consist of:
            * modality_type: Modality type (e.g. 'anat', 'func', etc.)
            * modality_label: Modality label (e.g. 'T1w','bold', etc.)
            * task: Task name.
    '''
    img_file: str = os.path.abspath(img_file)

    # Inspect only the base name: testing the whole absolute path would misclassify any file that merely
    # lives below a directory whose name contains '.dcm' or '.par'.
    file_name: str = os.path.basename(img_file).lower()

    if '.dcm' in file_name:
        [ modality_type, modality_label, task ] = get_dcm_scan_tech(dcm_file=img_file,
                                                                    search_dict=search_dict)
    elif '.par' in file_name:
        [ modality_type, modality_label, task ] = get_par_scan_tech(par_file=img_file,
                                                                    search_dict=search_dict)
    else:
        # NIFTI files and every other file type carry no header that is searched here.
        return "","",""

    return (modality_type, 
            modality_label, 
            task)