# Diagnose and Fix PET Import Problems

### Imports

In [1]:
from pathlib import Path
from typing import Callable, Iterator, List

import pydicom


### Utility Functions

In [2]:
# Module-level record of every message reported while repairing files.
repair_log = []


def status_output(message):
    '''Store a repair message in the module-level `repair_log` list.

    The message is recorded only; nothing is printed.

    Args:
        message (str): Repair message.
    '''
    repair_log.append(message)


#### Read the DICOM file.

In [3]:
def load_header(file_path: Path, status_output: Callable)->pydicom.Dataset:
    '''Read a DICOM file and return its dataset.

    The file is read with `pydicom.dcmread` using its default options.

    Args:
        file_path (Path): Path to the DICOM file.
        status_output (Callable): A function taking one string parameter.
            Called with a "Skipped" message when the file is not valid DICOM.

    Returns:
        pydicom.Dataset: The dataset read from the DICOM file, or None when
            pydicom reports that the file does not contain valid DICOM data.
    '''
    try:
        return pydicom.dcmread(file_path)
    except pydicom.errors.InvalidDicomError:
        # Report the rejected file by name and signal "nothing loaded".
        status_output(
            f'{file_path.name} did not contain valid DICOM data.  Skipped.')
        return None


#### Scan a directory of DICOM files, yielding the DICOM datasets.

In [4]:
def get_dicom_images(data_path: Path,
                     status_output=print,
                     include_subdirectories=True)->Iterator[pydicom.Dataset]:
    '''Yield the DICOM dataset for each valid DICOM file in a directory.

    Every regular file found is announced through `status_output` and then
    read with `load_header`.  Files that do not produce a dataset are skipped,
    so only usable datasets are yielded.

    Args:
        data_path (Path): Path to the directory containing DICOM files.
        status_output (Callable): A function taking one string parameter.  Used
            for reporting the results of trying to load each DICOM file.
        include_subdirectories (bool): If True, subdirectories of the supplied
            directory will also be scanned.

    Yields:
        pydicom.Dataset: The full dataset read from each valid DICOM file.
    '''
    # DICOM files frequently have no file extension, so match every entry
    # name ('*') rather than only the names that contain a dot ('*.*').
    scan_pattern = '**/*' if include_subdirectories else '*'
    for dicom_file in data_path.glob(scan_pattern):
        if not dicom_file.is_file():
            # Directories and other non-file entries are not candidates.
            continue
        status_output(f'Checking file {dicom_file.name}')
        dataset = load_header(dicom_file, status_output)
        # load_header returns None for files that are not valid DICOM.
        if dataset:
            yield dataset


#### Generate a DICOM filename.

In [5]:
def build_dicom_file_name(dataset: pydicom.Dataset)->str:
    '''Build the file name used to store a DICOM dataset.

    The name is the value of the "Modality" element, immediately followed by
    the value of the "SOPInstanceUID" element and the '.dcm' extension.  No
    separator is placed between the modality and the UID.

    Args:
        dataset (pydicom.Dataset): The DICOM dataset requiring a corresponding
            file name.

    Returns:
        str: A filename based on the information in the DICOM dataset.
    '''
    prefix = dataset.data_element('Modality').value
    uid = dataset.data_element('SOPInstanceUID').value
    return prefix + uid + '.dcm'


#### Save a DICOM file

In [6]:
def save_dicom_dataset(dataset: pydicom.Dataset, output_path: Path):
    '''Write a DICOM dataset into a directory under a generated file name.

    The file name comes from `build_dicom_file_name`, so it is derived from
    the contents of the dataset itself.

    Args:
        dataset (pydicom.Dataset): The DICOM dataset to be saved.
        output_path (Path): Path to the directory where the DICOM dataset
            will be saved.
    '''
    destination = output_path / build_dicom_file_name(dataset)
    dataset.save_as(destination)


#### Apply Repair Functions

In [7]:
def perform_repairs(input_path: Path, output_path: Path,
                    repair_methods: List[Callable],
                    status_output: Callable):
    '''Identify and apply necessary DICOM metadata repairs.

    Iterate through each DICOM file in the `input_path` folder.  Apply every
    repair method in turn and then save the resulting DICOM dataset once in
    the directory specified by `output_path`.  Invalid DICOM files will not be
    saved and directory structure is not maintained.

    If a repair method rejects a dataset (returns None), the remaining repair
    methods are not run on it and nothing is saved for that file.

    Args:
        input_path (Path): Path to the directory containing DICOM files.
        output_path (Path): Path to the directory where the repaired DICOM
            files will be stored.  Created if it does not already exist.
        repair_methods (List[Callable]): DICOM dataset repair functions.  The
            function must take the following parameters:
            - dataset (pydicom.Dataset): The DICOM dataset to be repaired.
            - status_output (Callable): A function taking one string parameter,
                used for reporting the results of the repair.
            and return the repaired dataset, or None to discard the file.
        status_output (Callable): A function taking one string parameter.
            Passed to each of the repair_methods. Used for reporting the
            results of the repair.
    '''
    # Saving into a missing directory would fail, so make sure it exists.
    output_path.mkdir(parents=True, exist_ok=True)
    for dataset in get_dicom_images(input_path, status_output):
        for repair_method in repair_methods:
            dataset = repair_method(dataset, status_output)
            if not dataset:
                # A repair rejected the file.  Later repairs cannot work on
                # a missing dataset, so stop here and do not save anything.
                break
        else:
            # Every repair ran.  Save once, after all repairs, so that no
            # partially repaired copy is written under an outdated name.
            save_dicom_dataset(dataset, output_path)


## Repair Functions

### Repair Invalid Character

*Problems*
- The "Body Part Examined" tag (0018, 0015) or the "Other Patient IDs" tag (0010, 1000) contain invalid characters.

*Repair Process*
- Invalid characters are identified by the tag value beginning with a "/"
- Tag values containing invalid characters are replaced with blank text ""

In [8]:
def fix_invalid_characters(dataset: pydicom.Dataset,
                           status_output: Callable)->pydicom.Dataset:
    '''Identify and blank certain elements containing invalid characters.

    Two DICOM elements are checked for invalid characters:
    - "Body Part Examined" (0018, 0015)
    - "Other Patient IDs" (0010, 1000)

    Invalid characters are identified by the element's value beginning with
    a "/", which is treated as the beginning of an escape sequence for
    non-printable text.  Element values containing invalid characters are
    replaced with blank text "".

    Values that are not plain strings (for example a None value or a
    multi-valued element) cannot be checked this way and are left unchanged.

    Args:
        dataset (pydicom.Dataset): The full DICOM dataset.
        status_output (Callable): Function for reporting the results of the
            repair attempt.

    Returns:
        pydicom.Dataset: The DICOM dataset with any repairs made.
    '''
    data_elements = ('BodyPartExamined', 'OtherPatientIDs')
    message_format = ('Invalid Character found in element {name}.'
                      '\tReplaced with blank string.')
    for element_name in data_elements:
        if element_name not in dataset:
            continue
        data_element = dataset.data_element(element_name)
        value = data_element.value
        # Guard the string test: calling startswith on a non-string value
        # would raise AttributeError and abort the whole repair run.
        if isinstance(value, str) and value.startswith('/'):
            status_output(message_format.format(name=element_name))
            data_element.value = ''
    return dataset


### Repair Incorrect Modality

*Problems*
1. The DICOM Modality tag (0008, 0060) of the PET image series is labeled “CT”.
or
2. The DICOM Modality tag (0008, 0060) of the CT image series is labeled "NM" or "OT".

*Repair Process*
- CT images are identified by the presence of the KVP tag (0018, 0060)
- PET images are identified by the presence of the Radiopharmaceutical Information Sequence (0054, 0016)
- If the image type does not match with the modality value it is corrected.


In [9]:
def fix_incorrect_modality(dataset: pydicom.Dataset,
                           status_output: Callable)->pydicom.Dataset:
    '''Make the "Modality" element agree with the actual image type.

    The DICOM "Modality" element (0008, 0060) must be "CT" for CT images and
    "PT" for PET images.  The image type is inferred from surrogate elements
    that hold parameters specific to one image type:
    - CT images: presence of the "KVP" (0018, 0060) element.
    - PET images: presence of the
      "Radiopharmaceutical Information Sequence" (0054, 0016) element.

    When the inferred image type is not found in the modality value, the
    modality is overwritten with the expected code and the change reported.

    Args:
        dataset (pydicom.Dataset): The full DICOM dataset.
        status_output (Callable): Function for reporting the results of the
            repair attempt.

    Returns:
        pydicom.Dataset: The DICOM dataset with any repairs made, or None if
            the DICOM dataset does not contain the "Modality" element.
    '''
    # Without a 'Modality' element the DICOM dataset is treated as invalid.
    if 'Modality' not in dataset:
        status_output('Modality element not found. File not used.')
        return None

    change_report = ('Incorrect Modality found.'
                     '\tModality changed from "{old_modality}"'
                     ' to "{new_modality}"')
    # Both checks compare against the modality as read here, before any
    # correction; the checks run in this order (CT first, then PET).
    found_modality = dataset.data_element('Modality').value
    image_type_markers = (
        ('KVP', 'CT'),
        ('RadiopharmaceuticalInformationSequence', 'PT'),
        )
    for marker, expected_modality in image_type_markers:
        if marker in dataset and expected_modality not in found_modality:
            status_output(change_report.format(
                old_modality=found_modality,
                new_modality=expected_modality))
            dataset.data_element('Modality').value = expected_modality
    return dataset


### Repair Mismatched Addresses

*Problems*
- The "Institution Address" tag (0008, 0081) of the PET image series is different from the CT image series.
- Other related tags that should match, but don't cause the same import error, are:
  - Manufacturer (0008,0070)
  - Manufacturer Model Name (0008,1090)
  - Device Serial Number (0018,1000)
  - Software Versions (0018,1020)
  - Institution Name (0008,0080)
  - Institutional Department Name (0008,1040)
  - Station Name (0008,1010)
- The problem appears to be limited to PET CTs coming from Mississauga

*Repair Process*
- Ideally, the "Institution Address" for all images should be read and verified as identical.
- In practice, since the problem appears to be limited to PET CTs coming from Mississauga and the "Institution Address" is not a critical piece of information, any "Institution Address" containing "Mississauga" will be cleared.


In [10]:
def fix_incorrect_address(dataset: pydicom.Dataset,
                          status_output: Callable)->pydicom.Dataset:
    '''Blank institution addresses suspected of being mismatched.

    Eclipse requires that the DICOM "InstitutionAddress" element (0008, 0081)
    be the same for all images (CT and PET) in the same study.  From our
    experience, this problem only occurs with PET-CTs coming from Mississauga.
    For the sake of speed, the PET and CT addresses are not compared.  Since
    "Institution Address" is not a critical piece of information, any
    "Institution Address" containing the text "Mississauga" is set to a blank
    string.

    Args:
        dataset (pydicom.Dataset): The full DICOM dataset.
        status_output (Callable): Function for reporting the results of the
            repair attempt.

    Returns:
        pydicom.Dataset: The DICOM dataset with any repairs made.
    '''
    # Nothing to repair when the element is absent.
    if 'InstitutionAddress' not in dataset:
        return dataset

    current_address = dataset.data_element('InstitutionAddress').value
    if 'Mississauga' in current_address:
        report = ('Mismatched Institution Addresses Suspected.'
                  '\tAddress: "{address}"'
                  ' Replaced with blank string.')
        status_output(report.format(address=current_address))
        dataset.data_element('InstitutionAddress').value = ''
    return dataset


## Repair all Images in a directory

### Data Input and Output paths

In [11]:
# Input folder of DICOM files to check, and the folder that receives the
# repaired copies.  Both are located in the current working directory.
data_path = Path.cwd().joinpath('DICOM Test Data')
output_path = Path.cwd().joinpath('Output')


In [12]:
# The repair functions handed to perform_repairs, in the order listed.
repair_methods = [fix_invalid_characters,
                  fix_incorrect_modality,
                  fix_incorrect_address]
perform_repairs(input_path=data_path,
                output_path=output_path,
                repair_methods=repair_methods,
                status_output=status_output)


In [None]:
from collections import Counter
from pprint import pprint
def drop_name(text: str)->str:
    '''Reduce a log message to a form suitable for counting.

    Messages beginning with 'Checking file' are collapsed to just
    'Checking file', dropping the file name.  In every other message each
    carriage return and line feed is replaced with a single space.

    Args:
        text (str): A message taken from the repair log.

    Returns:
        str: The normalized message.
    '''
    if text.startswith('Checking file'):
        return 'Checking file'
    # Map each line-break character to one space in a single pass.
    return text.translate({ord('\r'): ' ', ord('\n'): ' '})
# Normalize every logged message, then tally how often each one occurred.
stats_log = list(map(drop_name, repair_log))
stats = Counter(stats_log)

# Plain listing, one message per line, followed by the pretty-printed Counter.
for message_text, occurrences in stats.items():
    print(message_text, occurrences)
pprint(stats)
