Environment Setup

In [1]:
# Install the notebook's extra dependencies in one step. %pip (rather than !pip)
# installs into the environment of the running kernel.
%pip install ipywidgets matplotlib boto3

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [2]:
import csv
import gc
import glob
import logging
import os
from collections import defaultdict
from datetime import datetime
from io import BytesIO

import boto3
import botocore
import pandas as pd
import pydicom
from pydicom.datadict import dictionary_VR
from pydicom.dataset import Dataset
from pydicom.multival import MultiValue
from pydicom.sequence import Sequence
from tqdm import tqdm

Application Beginning

In [3]:
# Function to create log file
def create_log_file(log_file_name):
    """
    Configure logging so that records are written to a file.

    Logging is set up through ``logging.basicConfig`` at INFO level, with each
    line formatted as "<timestamp> - <level> - <message>".

    Args:
    - log_file_name (str): Path of the file that receives the log records.

    Returns:
    - None
    """
    log_settings = {
        "filename": log_file_name,
        "level": logging.INFO,
        "format": '%(asctime)s - %(levelname)s - %(message)s',
    }
    logging.basicConfig(**log_settings)

# Function to prompt user for information
def prompt_user():
    """
    Interactively collect the run parameters from the user.

    The questions are asked one after another with ``input`` and the answers
    are returned exactly as typed (no validation is performed).

    Returns:
    - tuple: (name, project_name, log_date, output_path, source_path)
    """
    questions = (
        "Enter your name: ",
        "Enter project name: ",
        "Enter log file date (YYYY-MM-DD): ",
        "Enter the output directory path: ",
        "Enter the source directory path (local or S3 bucket): ",
    )
    answers = [input(question) for question in questions]
    return tuple(answers)



In [4]:
# Function to append to project report CSV file
def append_to_project_report(module_name, summary, output_path, user_name):
    """
    Append one timestamped entry to ``project_report.csv`` inside ``output_path``.

    The row written is: current local time ("YYYY-MM-DD HH:MM:SS"), user name,
    module name, summary. The file is opened in append mode and no header row
    is written.

    Args:
    - module_name (str): Name of the module that was executed.
    - summary (str): Short description of what was done.
    - output_path (str): Directory that holds ``project_report.csv``.
    - user_name (str): Name of the user to record in the entry.

    Returns:
    - None
    """
    report_path = os.path.join(output_path, "project_report.csv")
    entry = [
        datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        user_name,
        module_name,
        summary,
    ]
    with open(report_path, "a", newline="") as report:
        csv.writer(report).writerow(entry)

Functions

In [5]:
# Define functions from the second notebook

# AWS S3 Setup
# Module-level S3 client, created once when this cell runs. verify_dicom_iod_data
# uses it to list and download objects when it is called with location='s3'.
s3 = boto3.client('s3')
# Function 1: Verify DICOM files
def verify_dicom_files(directory, output_path, project_report_file=None):
    """
    Verify that every ``.dcm`` file under a local directory can be parsed.

    The tree rooted at ``directory`` is walked with ``os.walk`` and every file
    whose name ends in ".dcm" is read with ``pydicom.dcmread``. A file that is
    read without raising is recorded as "Passed"; otherwise it is recorded as
    "Failed" and the exception is written to the log. The results are saved to
    ``dicom_verification_results.csv`` in ``output_path``; the CSV always
    starts with the ``File,Verification`` header, even when no file was found.

    Note: only the local filesystem is scanned; this function makes no S3 calls.

    Args:
    - directory (str): The root directory to search for DICOM files.
    - output_path (str): The path to save the output CSV file and log file.
    - project_report_file (str): Optional path of a project report CSV; when
      given, one row (module, summary, timestamp) is appended to it.

    Returns:
    - None
    """
    # Setup logging. basicConfig is a no-op once the root logger already has
    # handlers, so only the first call in a session selects the log file.
    log_file = os.path.join(output_path, "dicom_processing.log")
    logging.basicConfig(filename=log_file, level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')

    logging.info(f"Verifying DICOM files in directory: {directory}")

    # One {"File", "Verification"} record per .dcm file found
    verification_results = []

    for root, dirs, files in os.walk(directory):
        for file in files:
            if not file.endswith(".dcm"):
                continue
            file_path = os.path.join(root, file)
            try:
                # Parsing is the verification; the dataset itself is not needed
                pydicom.dcmread(file_path)
                status = "Passed"
            except Exception as e:
                logging.error(f"Error verifying DICOM file {file_path}: {str(e)}")
                status = "Failed"
            verification_results.append({"File": file_path, "Verification": status})

    # Name the columns explicitly so that an empty result still has a header row
    verification_df = pd.DataFrame(verification_results, columns=["File", "Verification"])

    # Save verification results to a CSV file
    verification_csv_file = os.path.join(output_path, "dicom_verification_results.csv")
    verification_df.to_csv(verification_csv_file, index=False)

    logging.info(f"Verification results saved to {verification_csv_file}")
    print(f"Verification results saved to {verification_csv_file}")

    if project_report_file:
        try:
            with open(project_report_file, 'a', newline='') as csvfile:
                fieldnames = ['Module', 'Summary', 'Timestamp']
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writerow({
                    'Module': 'Verify DICOM Files',
                    'Summary': f"Executed Verify DICOM Files {directory}",
                    'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
        except PermissionError as e:
            print(f"PermissionError when writing to project report file: {e}. Please ensure it is not open in another program and that you have the necessary permissions.")


# Function 2: Check for duplicate SOP Instance UIDs
def check_duplicate_sop_uids(directory, output_path, project_report_file=None):
    """
    Find SOP Instance UIDs that are shared by more than one DICOM file.

    The tree rooted at ``directory`` is walked with ``os.walk``; every file
    whose name ends in ".dcm" is read with ``pydicom.dcmread`` and grouped by
    its ``SOPInstanceUID``. Every UID that occurs in two or more files is
    written to ``duplicate_sop_instance_uids.csv`` in ``output_path`` together
    with ALL file paths that carry it (joined with ", "). Files that cannot be
    read, or that have no SOPInstanceUID, are logged and skipped.

    Note: only the local filesystem is scanned; this function makes no S3 calls.

    Args:
    - directory (str): The root directory to search for DICOM files.
    - output_path (str): The path to save the output CSV file and log file.
    - project_report_file (str): Optional path of a project report CSV; when
      given, one row (module, summary, timestamp) is appended to it.

    Returns:
    - None
    """
    # Setup logging. basicConfig is a no-op once the root logger already has
    # handlers, so only the first call in a session selects the log file.
    log_file = os.path.join(output_path, "dicom_processing.log")
    logging.basicConfig(filename=log_file, level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')

    logging.info(f"Checking for duplicate SOP Instance UIDs in directory: {directory}")

    # Every file path seen so far, grouped by SOP Instance UID
    paths_by_uid = defaultdict(list)

    for root, dirs, files in os.walk(directory):
        for file in files:
            if not file.endswith(".dcm"):
                continue
            file_path = os.path.join(root, file)
            try:
                dicom_data = pydicom.dcmread(file_path)
                paths_by_uid[dicom_data.SOPInstanceUID].append(file_path)
            except Exception as e:
                logging.error(f"Error processing DICOM file {file_path}: {str(e)}")

    # A UID is a duplicate as soon as two files share it. The first file is
    # part of the group too, so it is reported alongside the later ones.
    duplicates_dict = {uid: paths for uid, paths in paths_by_uid.items() if len(paths) > 1}

    # Save duplicate SOP Instance UIDs and corresponding paths to a CSV file
    duplicate_uids_csv_file = os.path.join(output_path, "duplicate_sop_instance_uids.csv")
    with open(duplicate_uids_csv_file, "w", newline="") as csvfile:
        fieldnames = ["SOPInstanceUID", "FilePaths"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for uid, file_paths in duplicates_dict.items():
            writer.writerow({"SOPInstanceUID": uid, "FilePaths": ", ".join(file_paths)})

    logging.info(f"Duplicate SOP Instance UIDs and corresponding paths saved to {duplicate_uids_csv_file}")
    print(f"Duplicate SOP Instance UIDs and corresponding paths saved to {duplicate_uids_csv_file}")

    if project_report_file:
        try:
            with open(project_report_file, 'a', newline='') as csvfile:
                fieldnames = ['Module', 'Summary', 'Timestamp']
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writerow({
                    'Module': 'Check Duplicate SOP Instance UIDs',
                    'Summary': f"Executed Check Duplicate SOP Instance UIDs {directory}",
                    'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
        except PermissionError as e:
            print(f"PermissionError when writing to project report file: {e}. Please ensure it is not open in another program and that you have the necessary permissions.")

    
    
    


# Function 3: Check DICOM consistency
def _hashable_position(position):
    """Return an ImagePositionPatient value as a tuple so it can be stored in a set."""
    try:
        return tuple(position)
    except TypeError:
        # Not iterable (e.g. a single number or None): already usable as-is
        return position


def check_dicom_consistency(directory, output_path, project_report_file=None):
    """
    Perform basic and extended consistency checks for a directory containing folders of DICOM files.

    Every immediate sub-folder of ``directory`` is treated as one series. For
    each ``.dcm`` file directly inside a series folder the function reports:
    - each missing essential attribute (PatientID, StudyInstanceUID,
      SeriesInstanceUID, SOPInstanceUID);
    - a file at which the ImagePositionPatient values read so far in that
      series are not all identical;
    - any exception raised while reading or inspecting the file.
    A series folder without ``.dcm`` files is reported as well. All findings
    are written to ``dicom_consistency_errors.csv`` in ``output_path``.

    Args:
    - directory: Path to the directory containing folders of DICOM files.
    - output_path: Path to save the output CSV file.
    - project_report_file (str): Optional path of a project report CSV; when
      given, one row (module, summary, timestamp) is appended to it.

    Returns:
    - csv_file_path: Path to the output CSV file.
    """

    # series folder -> list of (file path or None, message)
    errors = defaultdict(list)

    # Iterate over each folder (DICOM series) in the directory
    for series_folder in os.listdir(directory):
        series_path = os.path.join(directory, series_folder)

        if not os.path.isdir(series_path):
            continue

        # Collect DICOM files within the series folder
        dicom_files = [f for f in os.listdir(series_path) if f.endswith('.dcm')]

        # Check if there are DICOM files in the series folder
        if not dicom_files:
            errors[series_folder].append((None, "No DICOM files found in this series folder."))
            continue

        # Values collected per attribute for this series.
        # NOTE(review): these lists are filled but never compared anywhere in
        # this function - confirm whether a per-series comparison is intended.
        attributes = defaultdict(list)
        image_positions = []

        # Iterate over DICOM files in the series folder
        for dicom_file in tqdm(dicom_files, desc=f'Processing {series_folder}'):
            file_path = os.path.join(series_path, dicom_file)

            try:
                # Read DICOM file
                dicom_data = pydicom.dcmread(file_path)

                # Check presence of essential attributes
                essential_attributes = ['PatientID', 'StudyInstanceUID', 'SeriesInstanceUID', 'SOPInstanceUID']
                for attribute in essential_attributes:
                    if attribute not in dicom_data:
                        errors[series_folder].append((file_path, f"Missing {attribute} in DICOM file: {dicom_file}"))
                    else:
                        attributes[attribute].append(dicom_data[attribute].value)

                # Additional attributes collected when present
                if 'StudyDate' in dicom_data:
                    attributes['StudyDate'].append(dicom_data.StudyDate)
                if 'StudyTime' in dicom_data:
                    attributes['StudyTime'].append(dicom_data.StudyTime)
                if 'SeriesDate' in dicom_data:
                    attributes['SeriesDate'].append(dicom_data.SeriesDate)
                if 'SeriesTime' in dicom_data:
                    attributes['SeriesTime'].append(dicom_data.SeriesTime)
                if 'Modality' in dicom_data:
                    attributes['Modality'].append(dicom_data.Modality)
                if 'ImagePositionPatient' in dicom_data:
                    # Multi-valued elements are list-like and unhashable; store a
                    # tuple so the set() comparison below works instead of raising.
                    image_positions.append(_hashable_position(dicom_data.ImagePositionPatient))
                if 'ImageOrientationPatient' in dicom_data:
                    attributes['ImageOrientationPatient'].append(dicom_data.ImageOrientationPatient)
                if 'PixelSpacing' in dicom_data:
                    attributes['PixelSpacing'].append(dicom_data.PixelSpacing)
                if 'SOPClassUID' in dicom_data:
                    attributes['SOPClassUID'].append(dicom_data.SOPClassUID)

                # Check image consistency.
                # NOTE(review): this flags a series whose slices sit at different
                # positions - confirm that identical positions are really expected.
                if len(image_positions) > 1 and len(set(image_positions)) != 1:
                    errors[series_folder].append((file_path, "Inconsistent ImagePositionPatient values across DICOM files."))

            except Exception as e:
                errors[series_folder].append((file_path, f"Error processing DICOM file {dicom_file}: {str(e)}"))

    # Write errors to CSV file
    csv_file_path = os.path.join(output_path, 'dicom_consistency_errors.csv')
    with open(csv_file_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['SeriesFolder', 'FilePath', 'Error'])
        for series_folder, error_list in errors.items():
            for error in error_list:
                writer.writerow([series_folder, error[0], error[1]])

    print(f"Output CSV file saved to: {csv_file_path}")

    if project_report_file:
        try:
            with open(project_report_file, 'a', newline='') as csvfile:
                fieldnames = ['Module', 'Summary', 'Timestamp']
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writerow({
                    'Module': 'Check DICOM Consistency',
                    'Summary': f"Executed Check DICOM Consistency: Checking DICOM consistency in {directory}",
                    'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
        except PermissionError as e:
            print(f"PermissionError when writing to project report file: {e}. Please ensure it is not open in another program and that you have the necessary permissions.")


    return csv_file_path

# Function 4: Verify DICOM IOD data consistency
# Modality-specific rules
def _modality_entry(modality_code, extra_required=None, conditional=None):
    """
    Build one ``modality_rules`` entry.

    Every entry shares the same six required attributes; only the accepted
    Modality value, any additional required attributes and the conditional
    attributes differ. When ``conditional`` is None the entry gets no
    'conditional' key at all.
    """
    required = {
        'PatientID': 'UI', 'StudyInstanceUID': 'UI', 'SeriesInstanceUID': 'UI',
        'SOPInstanceUID': 'UI', 'Modality': ('CS', [modality_code]), 'Manufacturer': 'LO',
    }
    if extra_required:
        required.update(extra_required)

    entry = {'required': required}
    if conditional is not None:
        entry['conditional'] = dict(conditional)
    return entry


# Rule table keyed by modality name. 'required' maps an attribute to either a
# VR code or a (VR, allowed values) pair; 'conditional' maps an attribute to the
# marker 'required'. Note that 'MRI' and 'PET' accept the Modality values 'MR'
# and 'PT', and that 'PET' carries no 'conditional' key.
modality_rules = {
    'CT': _modality_entry('CT', conditional={'SliceThickness': 'required'}),
    'MRI': _modality_entry('MR', conditional={}),
    'US': _modality_entry('US', conditional={}),
    'PET': _modality_entry('PT'),
    'PT': _modality_entry('PT', conditional={'FrameReferenceTime': 'required'}),
    'DX': _modality_entry('DX', conditional={'Exposure': 'required'}),
    'MR': _modality_entry(
        'MR',
        extra_required={
            'MagneticFieldStrength': 'DS',
            'EchoTime': 'DS',
            'RepetitionTime': 'DS',
        },
        conditional={'FlipAngle': 'required'},
    ),
    'RF': _modality_entry('RF', conditional={'FrameTime': 'required'}),
    'XA': _modality_entry('XA', conditional={'ExposureTime': 'required'}),
    'CR': _modality_entry('CR', conditional={'Exposure': 'required'}),
    'NM': _modality_entry('NM', conditional={'FrameReferenceTime': 'required'}),
}

def validate_dicom_modality(ds, modality):
    """
    Validate a dataset against the 'required' rules of one modality.

    Rules come from the module-level ``modality_rules`` table. Each required
    attribute maps to either a VR code (e.g. 'LO') or a ``(VR, allowed_values)``
    pair. Attributes are read with ``hasattr``/``getattr``:
    - a missing attribute is reported unless its expected VR is 'UI';
    - a plain ``str`` value is checked against the allowed values (if any), or
      reported as a type mismatch when the expected VR is neither 'LO' nor 'UI';
    - a value exposing ``.VR`` has that VR compared with the expected VR
      (mismatches on 'UI' are ignored) and then its ``.value`` checked against
      the allowed values (if any);
    - any other kind of value is accepted without checks.
    The 'conditional' rules are not evaluated here.

    NOTE(review): on a pydicom Dataset, attribute access presumably yields the
    element's value rather than the element itself, so the ``.VR`` branch is
    likely reached only for callers that pass element-like objects - confirm.

    Args:
    - ds: Object holding the DICOM attributes.
    - modality: Key into ``modality_rules`` (e.g. 'CT'); may be None.

    Returns:
    - list[str]: Human-readable error messages; empty when nothing was found.
    """
    errors = []

    # Check if the modality is recognized and in the dictionary
    if modality not in modality_rules:
        errors.append(f"Unsupported or undefined modality: {modality}")
        return errors  # Return early as further validation doesn't make sense

    rules = modality_rules[modality]

    for attr, attr_type in rules['required'].items():
        # Split the rule once so that both value branches see the same VR and
        # allowed-values list.
        if isinstance(attr_type, tuple):
            expected_vr, expected_values = attr_type
        else:
            expected_vr, expected_values = attr_type, None

        if not hasattr(ds, attr):
            # Missing "UI" attributes are deliberately not reported
            if expected_vr != "UI":
                errors.append(f"Missing attribute: {attr}")
            continue

        actual_value = getattr(ds, attr)

        if isinstance(actual_value, str):
            if expected_values is not None:
                if expected_vr != "LO" and actual_value not in expected_values:
                    errors.append(f"Attribute {attr} has an incorrect value: {actual_value} not in {expected_values}")
            elif expected_vr not in ("LO", "UI"):
                errors.append(f"Attribute {attr} has incorrect type: expected {attr_type}, got str")
        elif hasattr(actual_value, 'VR'):  # Element-like object carrying its own VR
            if actual_value.VR != expected_vr:
                # Compare against the VR part only: comparing with the whole
                # (VR, values) pair would flag every such rule as a mismatch.
                if expected_vr != "UI":
                    errors.append(f"Attribute {attr} has incorrect type: expected {expected_vr}, got {actual_value.VR}")
            elif expected_values is not None and actual_value.value not in expected_values:
                errors.append(f"Attribute {attr} has an incorrect value: {actual_value.value} not in {expected_values}")

    return errors

def _record_iod_findings(dicom_data, modality, file_id, report):
    """Run the per-file IOD checks and store any finding in ``report`` under ``file_id``."""
    if "PixelData" not in dicom_data:
        logging.warning(f"Missing PixelData in DICOM file: {file_id}")
        report[file_id] = "Missing PixelData"
        return

    errors = validate_dicom_modality(dicom_data, modality)
    if errors:
        report[file_id] = ", ".join(errors)
        logging.warning(f"Verification issues in {file_id}: {', '.join(errors)}")


def verify_dicom_iod_data(directory, output_path, location='local', project_report_file=None):
    """
    Verify DICOM files against the modality rules and report the issues found.

    Each ``.dcm`` file is read with ``pydicom.dcmread``. A file without
    PixelData is reported as "Missing PixelData"; otherwise it is checked with
    ``validate_dicom_modality`` using its own Modality value (None when the
    attribute is absent). Files that cannot be read are logged and left out of
    the report. The report is saved to ``iod_verification_report.csv`` in
    ``output_path``.

    Args:
    - directory (str): Root directory to walk when ``location`` is not 's3';
      the bucket name when ``location`` is 's3'.
    - output_path (str): The path to save the output CSV file and log file.
    - location (str): 's3' to read objects through the module-level ``s3``
      client; any other value reads from the local filesystem.
    - project_report_file (str): Optional path of a project report CSV; when
      given, one row (module, summary, timestamp) is appended to it.

    Returns:
    - dict: Maps each file path (or S3 key) that has issues to its issue text.
    """
    log_file = os.path.join(output_path, "dicom_processing.log")
    logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    logging.info(f"Verifying DICOM IOD data consistency in {location} directory: {directory}")

    iod_verification_report = {}

    if location == 's3':
        # Process files from an S3 bucket
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=directory):
            for item in tqdm(page.get('Contents', [])):
                file_key = item['Key']
                if not file_key.endswith('.dcm'):
                    continue
                try:
                    obj = s3.get_object(Bucket=directory, Key=file_key)
                    # The object body is wrapped in an in-memory buffer so
                    # that dcmread receives a file-like object.
                    dicom_data = pydicom.dcmread(BytesIO(obj['Body'].read()))

                    modality = dicom_data.Modality if 'Modality' in dicom_data else None
                    if modality:
                        print(f"Modality found: {modality}")
                    else:
                        print("No Modality attribute found")

                    _record_iod_findings(dicom_data, modality, file_key, iod_verification_report)
                except Exception as e:
                    logging.error(f"Error processing DICOM file {file_key}: {str(e)}")
    else:
        # Process files from a local directory
        for root, dirs, files in tqdm(os.walk(directory)):
            for file in files:
                if not file.endswith(".dcm"):
                    continue
                file_path = os.path.join(root, file)
                try:
                    dicom_data = pydicom.dcmread(file_path)

                    modality = dicom_data.Modality if 'Modality' in dicom_data else None
                    if modality is None:
                        print("No Modality attribute found")

                    _record_iod_findings(dicom_data, modality, file_path, iod_verification_report)
                except Exception as e:
                    logging.error(f"Error processing DICOM file {file_path}: {str(e)}")

    # Save verification report to a CSV file. newline='' lets the csv module
    # control line endings, as the other writers in this notebook do.
    verification_report_csv = os.path.join(output_path, "iod_verification_report.csv")
    with open(verification_report_csv, "w", newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["File", "Issue"])
        for file_path, issue in iod_verification_report.items():
            writer.writerow([file_path, issue])

    logging.info(f"IOD verification report saved to {verification_report_csv}")
    print(f"IOD verification report saved to {verification_report_csv}")

    if project_report_file:
        try:
            with open(project_report_file, 'a', newline='') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=['Module', 'Summary', 'Timestamp'])
                writer.writerow({
                    'Module': 'Verify DICOM IOD Data Consistency',
                    'Summary': f"Executed Verify DICOM IOD Data Consistency in {directory}",
                    'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
        except PermissionError as e:
            print(f"PermissionError when writing to project report file: {e}")

    return iod_verification_report

# Function 5: Remove PHI info from DICOM metadata

def convert_to_tuple(data):
    """
    Recursively turn list-like values into tuples.

    - Sequences, lists and multi-valued items become a tuple of their converted items.
    - A Dataset becomes a tuple of (keyword, converted value) pairs, one per
      keyword returned by its ``dir()``.
    - A dict keeps its keys and stays a dict; only its values are converted.
    - Anything else is returned unchanged.
    """
    if isinstance(data, (Sequence, list, MultiValue)):
        return tuple(map(convert_to_tuple, data))

    if isinstance(data, Dataset):
        converted = {}
        for keyword in data.dir():
            converted[keyword] = convert_to_tuple(data.get(keyword))
        return tuple(converted.items())

    if isinstance(data, dict):
        return {name: convert_to_tuple(item) for name, item in data.items()}

    return data

def convert_data(data):
    """
    Reduce a DICOM value to plain Python data.

    - Multi-valued items and lists become a tuple of their converted items.
    - A Dataset becomes a dict mapping each keyword from its ``dir()`` to the
      converted value of that element.
    - A string is returned unchanged.
    - Any other object that has a ``value`` attribute is replaced by that attribute.
    - Everything else is returned unchanged.
    """
    if isinstance(data, (pydicom.multival.MultiValue, list)):
        return tuple(map(convert_data, data))

    if isinstance(data, pydicom.dataset.Dataset):
        simplified = {}
        for keyword in data.dir():
            simplified[keyword] = convert_data(data[keyword].value)
        return simplified

    # Strings are checked first so they are never unwrapped through ``.value``
    if not isinstance(data, str) and hasattr(data, "value"):
        return data.value

    return data

def _make_hashable(value):
    """Return ``value`` in a form that can be stored in a set."""
    if isinstance(value, dict):
        return tuple((key, _make_hashable(item)) for key, item in value.items())
    if isinstance(value, (list, tuple)):
        return tuple(_make_hashable(item) for item in value)
    try:
        hash(value)
    except TypeError:
        # Last resort for any other unhashable object: keep its text form
        return str(value)
    return value


def aggregate_dicom_metadata(directory, output_path, batch_size=10, location='local', project_report_file=None):
    """
    Aggregate unique DICOM metadata from the source directory and save it to a transposed CSV file.

    The tree rooted at ``directory`` is walked with ``os.walk``; every file
    whose name ends in ".dcm" is read (stopping before the pixel data) and, for
    each keyword in the dataset plus a synthetic 'File' entry holding the file
    name, the distinct values are collected. The output
    ``aggregated_dicom_metadata.csv`` has one row per parameter with its unique
    values, sorted by their text form and joined with ",".

    Args:
    - directory (str): The root directory containing DICOM files.
    - output_path (str): The path to save the aggregated metadata CSV file and log file.
    - batch_size (int): Accepted for compatibility; not used by the current implementation.
    - location (str): Only used in the log message; files are always read from
      the local filesystem.
    - project_report_file (str): Optional path of a project report CSV; when
      given, one row (module, summary, timestamp) is appended to it.

    Returns:
    - None
    """
    # Setup logging
    log_file = os.path.join(output_path, "dicom_processing.log")
    logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    logging.info(f"Aggregating DICOM metadata from {location} directory: {directory}")

    # List of metadata parameters to exclude
    exclude_parameters = [
        "AccessionNumber", "AcquisitionDate", "AcquisitionDateTime", "AcquisitionTime",
        "Columns", "ContentTime", "ContentDate", "BluePaletteColorLookupTableData",
        "AcquisitionMatrix", "CineRate"
    ]

    # Open the output CSV file in advance
    aggregated_metadata_file = os.path.join(output_path, "aggregated_dicom_metadata.csv")
    with open(aggregated_metadata_file, "w", newline='') as csvfile:
        writer = csv.writer(csvfile)

        # Write headers once at the top
        writer.writerow(["Parameter", "Unique Values"])

        # parameter name -> set of unique values
        metadata_dicts = defaultdict(set)

        # Iterate over all DICOM files in the directory
        for root, dirs, files in tqdm(os.walk(directory), desc="Processing folders"):
            for file in files:
                if not file.endswith(".dcm"):
                    continue
                file_path = os.path.join(root, file)
                try:
                    # Only metadata is aggregated, so the pixel data is not read
                    dicom_data = pydicom.dcmread(file_path, stop_before_pixels=True)

                    # Convert DICOM metadata to a dictionary
                    dicom_dict = {tag: dicom_data.get(tag) for tag in dicom_data.dir()}
                    dicom_dict['File'] = file

                    for key, value in dicom_dict.items():
                        if key in exclude_parameters:
                            continue

                        # Simplify complex data types, then freeze nested
                        # dicts/lists: a set cannot hold them, and one
                        # unhashable value would otherwise abort the rest of
                        # this file's parameters.
                        value = _make_hashable(convert_data(value))
                        metadata_dicts[key].add(value)

                except Exception as e:
                    logging.error(f"Error processing DICOM file {file_path}: {str(e)}")

        # Sort the values by their text form so repeated runs give identical output
        for key, values in metadata_dicts.items():
            writer.writerow([key, ",".join(sorted(map(str, values)))])

        # Trigger garbage collection after writing
        gc.collect()

    logging.info(f"Aggregated transposed DICOM metadata saved to: {aggregated_metadata_file}")
    print(f"Aggregated transposed DICOM metadata saved to: {aggregated_metadata_file}")

    if project_report_file:
        try:
            with open(project_report_file, 'a', newline='') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=['Module', 'Summary', 'Timestamp'])
                # NOTE(review): this entry is labelled 'Remove PHI' although this
                # function only aggregates metadata - confirm the intended label.
                writer.writerow({
                    'Module': 'Remove PHI',
                    'Summary': f"Executed Remove PHI in {directory}",
                    'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
        except PermissionError as e:
            print(f"PermissionError when writing to project report file: {e}")

            
def _value_contains_phi(value, needles):
    """Return True when the text form of ``value`` (or of one of its items) contains a needle."""
    # Raw bytes and nested datasets are not inspected
    if value is None or isinstance(value, (bytes, bytearray, pydicom.dataset.Dataset)):
        return False
    if isinstance(value, (list, tuple, pydicom.multival.MultiValue)):
        return any(_value_contains_phi(item, needles) for item in value)
    # Non-str values are compared through their text form; a plain
    # isinstance(value, str) test would never match them.
    text = value if isinstance(value, str) else str(value)
    return any(needle in text for needle in needles)


def remove_phi(directory, output_path, values_to_remove, location='local', project_report_file=None):
    """
    Remove top-level DICOM elements whose value contains any of the given strings.

    The tree rooted at ``directory`` is walked with ``os.walk``; every file
    whose name ends in ".dcm" is read, each top-level keyword whose value (or
    any item of a multi-valued value) contains one of ``values_to_remove`` as a
    substring is deleted, and the result is saved NEXT TO the original as
    ``modified_<original name>``. The original file is left untouched. Elements
    nested inside sequences and raw byte values are not inspected.

    Args:
    - directory (str): The root directory containing DICOM files.
    - output_path (str): Directory for the log file (``dicom_processing.log``);
      the modified DICOM files are not written here.
    - values_to_remove (list[str]): Strings to look for. A single string is
      treated as a one-item list; empty strings are ignored.
    - location (str): Only used in the log message; files are always read from
      the local filesystem.
    - project_report_file (str): Optional path of a project report CSV; when
      given, one row (module, summary, timestamp) is appended to it.

    Returns:
    - None
    """
    # Setup logging
    log_file = os.path.join(output_path, "dicom_processing.log")
    logging.basicConfig(filename=log_file, level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')

    logging.info(f"Processing DICOM metadata from {location} directory: {directory}")

    # Normalise the search terms once. A bare string must not be iterated
    # character by character, and an empty string would match every value.
    if isinstance(values_to_remove, str):
        values_to_remove = [values_to_remove]
    needles = [str(item) for item in (values_to_remove or []) if str(item)]

    # Iterate over all DICOM files in the directory
    for root, dirs, files in os.walk(directory):
        for file in files:
            if not file.endswith(".dcm"):
                continue
            file_path = os.path.join(root, file)
            try:
                dicom_data = pydicom.dcmread(file_path)

                # Collect first, delete afterwards, so the dataset is not
                # modified while its keywords are being listed.
                tags_to_delete = [
                    tag for tag in dicom_data.dir()
                    if _value_contains_phi(dicom_data.get(tag), needles)
                ]
                for tag in tags_to_delete:
                    delattr(dicom_data, tag)

                # Save the modified DICOM file alongside the original
                new_file_path = os.path.join(root, f"modified_{file}")
                dicom_data.save_as(new_file_path)

                logging.info(f"Processed and saved modified DICOM file to: {new_file_path}")
            except Exception as e:
                logging.error(f"Error processing DICOM file {file_path}: {str(e)}")

    print(f"Processed DICOM files with specified PHI removed.")
    logging.info(f"Processed DICOM files with specified PHI removed.")

    if project_report_file:
        try:
            with open(project_report_file, 'a', newline='') as csvfile:
                fieldnames = ['Module', 'Summary', 'Timestamp']
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writerow({
                    'Module': 'Remove PHI',
                    'Summary': f"Removed specified values from DICOM metadata in {directory}",
                    'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
        except PermissionError as e:
            print(f"PermissionError when writing to project report file: {e}. Please ensure it is not open in another program and that you have the necessary permissions.")

# Function 6: Generate summary of DICOM tags
def generate_summary(directory, output_path, project_report_file=None):
    """
    Write a CSV listing a few identifying tags for every DICOM file under a directory.

    Args:
    - directory (str): Root directory, walked recursively for files ending in ".dcm".
    - output_path (str): Directory in which "dicom_summary.csv" is written.
    - project_report_file (str, optional): CSV file to which one report row
      (Module, Summary, Timestamp) is appended when given.

    Returns:
    - str: Path of the summary CSV file that was written.
    """
    logging.info(f"Generating summary of DICOM tags in {directory}")

    # One dictionary per readable DICOM file; becomes one CSV row each.
    summary_rows = []

    for folder, _subfolders, filenames in os.walk(directory):
        for filename in filenames:
            if not filename.endswith(".dcm"):
                continue

            file_path = os.path.join(folder, filename)
            try:
                dataset = pydicom.dcmread(file_path)
                summary_rows.append({
                    "File": file_path,
                    "PatientID": dataset.get("PatientID", ""),
                    "PatientName": dataset.get("PatientName", ""),
                    "StudyDate": dataset.get("StudyDate", "")
                })
            except Exception as exc:
                # Files that cannot be read are logged and left out of the summary.
                logging.error(f"Error processing DICOM file {file_path}: {str(exc)}")

    # Build the table in one step and write it next to the other outputs.
    output_file_path = os.path.join(output_path, "dicom_summary.csv")
    pd.DataFrame(summary_rows).to_csv(output_file_path, index=False)

    logging.info(f"Summary of DICOM tags saved to {output_file_path}")
    print(f"Summary of DICOM tags saved to {output_file_path}")

    if project_report_file:
        try:
            with open(project_report_file, 'a', newline='') as report_handle:
                report_writer = csv.DictWriter(
                    report_handle, fieldnames=['Module', 'Summary', 'Timestamp']
                )
                report_writer.writerow({
                    'Module': 'Generate Summary',
                    'Summary': f"Executed Generate Summary: Generating summary of DICOM tags in {directory}",
                    'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
        except Exception as exc:
            # A failed report write is logged; the summary itself was already saved.
            logging.error(f"Error writing to project report file {project_report_file}: {str(exc)}")

    return output_file_path

# Function 7: Rename DICOM files
def rename_dicom_files(input_directory, output_path, project_report_file=None):
    """
    Rename every ".dcm" file under a directory to "<SOPInstanceUID>_<PatientAge>.dcm".

    The age part is left empty when the PatientAge tag is missing, blank,
    unparseable, or represents more than 89 years. Files are renamed in place
    (same folder). A file is skipped, with an error logged, when it cannot be
    read, has no SOPInstanceUID, or when the target name already exists.

    Args:
    - input_directory (str): Root directory, walked recursively.
    - output_path (str): Directory in which "dicom_rename_log.csv" is written.
    - project_report_file (str, optional): CSV file to which one report row
      (Module, Summary, Timestamp) is appended when given.

    Returns:
    - str: Path of the rename log CSV file.
    """
    logging.info(f"Renaming DICOM files in {input_directory}")

    # (old path, new path) for every file that was actually renamed
    rename_log = []

    for root, dirs, files in os.walk(input_directory):
        for file in files:
            if not file.endswith(".dcm"):
                continue

            file_path = os.path.join(root, file)
            try:
                dicom_data = pydicom.dcmread(file_path)

                # A missing SOPInstanceUID raises here and the file is skipped.
                sop_instance_uid = dicom_data.SOPInstanceUID

                # PatientAge is optional and is a DICOM Age String such as "045Y",
                # so it must not be fed to int() directly.
                patient_age = _age_token_for_filename(dicom_data.get("PatientAge", ""))

                new_filename = f"{sop_instance_uid}_{patient_age}.dcm"
                new_file_path = os.path.join(root, new_filename)

                # os.rename can silently replace an existing file; never overwrite
                # another DICOM file (e.g. duplicate SOP Instance UIDs).
                if new_file_path != file_path and os.path.exists(new_file_path):
                    logging.error(
                        f"Error renaming DICOM file {file_path}: target {new_file_path} already exists"
                    )
                    continue

                os.rename(file_path, new_file_path)

                rename_log.append((file_path, new_file_path))
                logging.info(f"Renamed {file_path} to {new_file_path}")
            except Exception as e:
                logging.error(f"Error renaming DICOM file {file_path}: {str(e)}")
                continue

    # Save the rename log to a CSV file at the specified output path
    csv_file_path = os.path.join(output_path, 'dicom_rename_log.csv')
    with open(csv_file_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Previous Name', 'New Name', 'File Path'])
        for old_name, new_name in rename_log:
            writer.writerow([os.path.basename(old_name), os.path.basename(new_name), new_name])

    logging.info(f"Rename log saved to {csv_file_path}")
    print(f"Rename log saved to {csv_file_path}")

    if project_report_file:
        try:
            with open(project_report_file, 'a', newline='') as csvfile:
                fieldnames = ['Module', 'Summary', 'Timestamp']
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writerow({
                    'Module': 'Rename DICOM Files',
                    # The parameter is input_directory; the previous reference to
                    # an undefined "directory" raised NameError here.
                    'Summary': f"Executed Rename DICOM Files in {input_directory}",
                    'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
        except PermissionError as e:
            print(f"PermissionError when writing to project report file: {e}. Please ensure it is not open in another program and that you have the necessary permissions.")

    return csv_file_path


def _age_token_for_filename(raw_age, max_years=89):
    """
    Return the PatientAge text to embed in a filename, or '' when it must be withheld.

    Accepts plain digits ("45", treated as years) or a DICOM Age String: digits
    followed by D, W, M or Y ("045Y"). Returns '' for missing, blank or
    unparseable values and for ages above max_years.
    """
    age_text = "" if raw_age is None else str(raw_age).strip()
    if not age_text:
        return ""

    # Units per year, used to convert the numeric part into years.
    units_per_year = {"D": 365.0, "W": 52.0, "M": 12.0, "Y": 1.0}

    if age_text[-1].isalpha():
        digits, unit = age_text[:-1], age_text[-1].upper()
    else:
        digits, unit = age_text, "Y"

    if not digits.isdigit() or unit not in units_per_year:
        # Unknown format: withhold rather than risk leaking an identifying value.
        return ""

    if int(digits) / units_per_year[unit] > max_years:
        return ""

    return age_text


# Function 8: Generate DICOM metadata Extract CSV file
def generate_DME(directory, output_path, location='local', project_report_file=None):
    """
    Extract a fixed set of DICOM tags from every ".dcm" file into "DME.csv".

    Args:
    - directory (str): Root directory, walked recursively.
    - output_path (str): Directory in which "DME.csv" is written.
    - location (str): Label for the source; only used in the start-up log message.
    - project_report_file (str, optional): CSV file to which one report row
      (Module, Summary, Timestamp) is appended when given.

    Returns:
    - str: Path of the generated CSV file.
    """
    logging.info(f"Generating DICOM metadata CSV file from {location} directory: {directory}")

    metadata_list = []

    # Count only the files that will actually be processed so the bar can reach 100%.
    total_files = sum(
        1
        for _, _, files in os.walk(directory)
        for file in files
        if file.endswith(".dcm")
    )
    progress_bar = tqdm(total=total_files, desc='Processing DICOM files', unit='files')

    for root, dirs, files in os.walk(directory):
        for file in files:
            if not file.endswith(".dcm"):
                continue

            file_path = os.path.join(root, file)
            try:
                dicom_data = pydicom.dcmread(file_path)
                metadata = {
                    "file_name": os.path.basename(file_path),
                    "accession_number": str(dicom_data.get("AccessionNumber", "")),
                    "acquisition_type": str(dicom_data.get("AcquisitionType", "")),
                    "body_part_examined": str(dicom_data.get("BodyPartExamined", "")),
                    "case_ids": str(dicom_data.get("PatientID", "")),
                    "contrast_bolus_agent": str(dicom_data.get("ContrastBolusAgent", "")),
                    "patient_position": str(dicom_data.get("PatientPosition", "")),
                    # May be a single string or a multi-valued element; see helper.
                    "convolution_kernel": _join_multivalue(dicom_data.get("ConvolutionKernel")),
                    "detector_type": str(dicom_data.get("DetectorType", "")),
                    "exposure_modulation_type": str(dicom_data.get("ExposureModulationType", "")),
                    "image_type": _join_multivalue(dicom_data.get("ImageType")),
                    "imager_pixel_spacing": str(dicom_data.get("ImagerPixelSpacing", "")),
                    "lossy_image_compression": str(dicom_data.get("LossyImageCompression", "")),
                    "manufacturer": str(dicom_data.get("Manufacturer", "")),
                    "manufacturer_model_name": str(dicom_data.get("ManufacturerModelName", "")),
                    "modality": str(dicom_data.get("Modality", "")),
                    "sop_instance_uid": str(dicom_data.get("SOPInstanceUID", "")),
                    "pixel_spacing": str(dicom_data.get("PixelSpacing", "")),
                    "series_description": str(dicom_data.get("SeriesDescription", "")),
                    "series_uid": str(dicom_data.get("SeriesInstanceUID", "")),
                    "slice_thickness": str(dicom_data.get("SliceThickness", "")),
                    "spacing_between_slices": str(dicom_data.get("SpacingBetweenSlices", "")),
                    "spatial_resolution": str(dicom_data.get("SpatialResolution", "")),
                    "study_description": str(dicom_data.get("StudyDescription", "")),
                    "study_uid": str(dicom_data.get("StudyInstanceUID", "")),
                    "view_position": str(dicom_data.get("ViewPosition", "")),
                    "study_date": str(dicom_data.get("StudyDate", ""))
                }
                metadata_list.append(metadata)
            except Exception as e:
                logging.error(f"Error processing DICOM file {file_path}: {str(e)}")
            finally:
                # Advance for failures too, otherwise the bar stalls below its total.
                progress_bar.update(1)

    progress_bar.close()
    metadata_df = pd.DataFrame(metadata_list)
    output_file = os.path.join(output_path, "DME.csv")
    metadata_df.to_csv(output_file, index=False)

    logging.info(f"Generated DICOM metadata CSV file: {output_file}")
    print(f"Generated DICOM metadata CSV file: {output_file}")

    if project_report_file:
        try:
            with open(project_report_file, 'a', newline='') as csvfile:
                fieldnames = ['Module', 'Summary', 'Timestamp']
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writerow({
                    'Module': 'Generate DME',
                    'Summary': f"Executed Generate DME in {directory}",
                    'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
        except PermissionError as e:
            print(f"PermissionError when writing to project report file: {e}. Please ensure it is not open in another program and that you have the necessary permissions.")

    return output_file


def _join_multivalue(value, separator="_"):
    """
    Render a DICOM element value as one string, joining multiple values with separator.

    A plain string is returned unchanged: joining it would insert the separator
    between its characters ("B30f" -> "B_3_0_f"). Missing or empty values give ''.
    """
    if not value:
        return ""
    if isinstance(value, str):
        return value
    try:
        return separator.join(str(item) for item in value)
    except TypeError:
        # Not iterable (a single non-string value): fall back to its text form.
        return str(value)

Module Execution

In [6]:

# Function to execute individual modules
def execute_module(module_number, directory, output_path, project_report_file=None):
    """
    Execute a specific DICOM processing module.

    Args:
    - module_number (int): The number corresponding to the module to execute (1-8).
    - directory (str): The root directory containing DICOM files.
    - output_path (str): The path to save the output files.
    - project_report_file (str, optional): The path to the project report file;
      forwarded to the selected module.

    Returns:
    - None

    An unknown module number prints an error message and runs nothing.
    """
    module_names = {
        1: "Verify DICOM Files",
        2: "Check Duplicate SOP Instance UIDs",
        3: "Check DICOM Consistency",
        4: "Verify DICOM IOD Data Consistency",
        5: "Remove PHI",
        6: "Generate Summary",
        7: "Rename DICOM Files",
        8: "Generate DME"
    }

    if module_number not in module_names:
        print("Invalid choice. Please enter a number between 1 and 8.")
        return

    print(f"Executing Module {module_number}: {module_names[module_number]}")
    if module_number == 1:
        verify_dicom_files(directory, output_path, project_report_file)
    elif module_number == 2:
        check_duplicate_sop_uids(directory, output_path, project_report_file)
    elif module_number == 3:
        check_dicom_consistency(directory, output_path, project_report_file)
    elif module_number == 4:
        verify_dicom_iod_data(directory, output_path, project_report_file)
    elif module_number == 5:
        # PHI Removal Tool
        # NOTE(review): the "Remove PHI" entry runs aggregate_dicom_metadata, which
        # is defined outside this section - confirm this is the intended function.
        aggregate_dicom_metadata(directory, output_path, project_report_file)
    elif module_number == 6:
        generate_summary(directory, output_path, project_report_file)
    elif module_number == 7:
        rename_dicom_files(directory, output_path, project_report_file)
    elif module_number == 8:
        # generate_DME's third positional parameter is `location`, so the report
        # path must be passed by keyword or it is mistaken for the location.
        generate_DME(directory, output_path, project_report_file=project_report_file)

# Function to execute all modules with option to skip
def execute_all_modules(source_path, output_path, project_report_file=None):
    """
    Offer modules 1-8 in order, running each one the user confirms.

    Args:
    - source_path (str): The root directory containing DICOM files.
    - output_path (str): The path to save the output files.
    - project_report_file (str, optional): The path to the project report file;
      forwarded to every executed module.

    Returns:
    - None
    """
    print("Executing all functions in succession with option to skip each step:")
    for i in range(1, 9):
        choice = input(f"Execute module {i}? (yes/no): ")
        # Tolerate surrounding whitespace and the common "y" shorthand.
        if choice.strip().lower() in ('yes', 'y'):
            # execute_module needs the report file as its fourth argument.
            execute_module(i, source_path, output_path, project_report_file)
        else:
            print(f"Skipping module {i}.")

Main

In [7]:
def main(name=None, project_name=None, log_date=None, output_path=None, source_path=None):
    """
    Main function to execute the DICOM processing tool.

    Any missing or empty argument causes all five values to be requested
    interactively. The output directory is created when it does not exist,
    since the log file and the report file are both placed there.

    Args:
    - name (str): User's name.
    - project_name (str): Name of the project.
    - log_date (str): Date for the log file (YYYY-MM-DD).
    - output_path (str): Output directory path.
    - source_path (str): Path to the source DICOM files.

    Returns:
    - None
    """
    if not all([name, project_name, log_date, output_path, source_path]):
        name, project_name, log_date, output_path, source_path = prompt_user()

    # The log file is opened inside output_path, which fails if the folder is missing.
    if output_path:
        os.makedirs(output_path, exist_ok=True)

    log_file_name = f"{project_name}_log_{log_date}.csv"
    log_file_path = os.path.join(output_path, log_file_name)

    create_log_file(log_file_path)

    project_report_name = f"{project_name}_report_{log_date}.csv"
    project_report_file = os.path.join(output_path, project_report_name)

    print(f"Hello, {name}! Welcome to {project_name} DICOM Processing Tool.")
    print("Choose an option:")
    print("1. Execute individual modules")
    print("2. Execute all functions in succession with option to skip")
    option = _read_menu_number("Enter your choice (1-2): ")

    if option == 1:
        module_number = _read_menu_number("Enter the module number you want to execute (1-8): ")
        if module_number is None:
            # Non-numeric entry: report it the same way as an out-of-range number.
            print("Invalid choice. Please enter a number between 1 and 8.")
            return
        execute_module(module_number, source_path, output_path, project_report_file)
    elif option == 2:
        execute_all_modules(source_path, output_path, project_report_file)
    else:
        print("Invalid choice. Please enter 1 or 2.")


def _read_menu_number(prompt):
    """Prompt for a whole number; return it as int, or None when the entry is not numeric."""
    try:
        return int(input(prompt).strip())
    except ValueError:
        return None

Call Main

In [8]:
# Execute main function
if __name__ == "__main__":
    # Run parameters. These are placeholders: replace each value before running.
    name = "Name here"
    project_name = "Project name here"
    log_date = "YYYY-MM-DD"
    output_path = "Path with forward slashes"
    source_path = "Path with forward slashes"

    main(
        name=name,
        project_name=project_name,
        log_date=log_date,
        output_path=output_path,
        source_path=source_path,
    )

    # To be prompted for these values instead, call main() with no arguments.
  

Hello, ToS! Welcome to UCSF56 DICOM Processing Tool.
Choose an option:
1. Execute individual modules
2. Execute all functions in succession with option to skip
Executing Module 5: Remove PHI


Processing folders: 1048it [29:07,  1.67s/it]


Aggregated transposed DICOM metadata saved to: C:/Users/tosullivan/Documents/Dev/Test_output\transposed_dicom_metadata.csv
