In [None]:
import os
import pydicom
import random
import time
import csv
import string
import re
import pandas as pd
from pydicom.tag import Tag
from pydicom.filereader import dcmread
from datetime import datetime
from pydicom.filewriter import dcmwrite
import csv
import time
import dotenv
dotenv.load_dotenv("../config/.env")

True

In [None]:
### This notebook contains work done for DICOM image tag de-identification, primarily using the pydicom library.

In [None]:
def find_folder(path, target):
    """
    Search the directory tree under `path` for a sub-folder called `target`.

    The entire tree is walked. When several folders carry the requested name,
    the one visited last by os.walk is the one reported. The outcome is printed
    as well as returned.

    Parameters:
    path: Directory at which the search starts.
    target (str): Exact folder name to look for.

    Returns:
    The joined path of the last matching folder, or None when nothing matched.
    """
    matches = [
        os.path.join(parent, target)
        for parent, subfolders, _ in os.walk(path)
        if target in subfolders
    ]
    result = matches[-1] if matches else None

    if not result:
        print('Folder not found.')
    else:
        print(f'Folder found at: {result}')

    return result

# Root directory to search, read from the DIRECTORY environment variable
# (os.getenv returns None when the variable is not set).
DIRECTORY = os.getenv("DIRECTORY")
# Example usage of find_folder, left disabled:
# target_folder = 'E'
# result = find_folder(DIRECTORY, target_folder)



In [None]:
def remove_identifiers(dicom_file):
    """
    Delete a fixed set of patient/study identifying elements from a dataset, in place.

    Parameters:
    dicom_file: The dataset to modify. It must support `keyword in dataset`
                and `del dataset[keyword]`.

    Returns:
    None
    """
    identifying_keywords = (
        'PatientID',
        'PatientName',
        'PatientSex',
        'PatientBirthDate',
        'PatientAge',
        'StudyID',
        'SeriesDate',
        'SeriesTime',
        'PatientWeight',
    )

    for keyword in identifying_keywords:
        # Elements that are not present are simply skipped.
        if keyword not in dicom_file:
            continue
        del dicom_file[keyword]

In [None]:
def replace_identifiers_with_pseudonyms(dicom_file):

    """
    Replaces sensitive identifiers in a DICOM dataset with placeholder values, in place.

    Parameters:
    dicom_file: The DICOM dataset object to be processed.

    The function performs the following steps:
    1. Sets PatientName, Manufacturer and InstitutionAddress to "ERADICATED".
    2. Iterates over every element returned by dicom_file.keys():
       - Elements whose name is listed in keywords_to_save ('Acquisition Date',
         'Acquisition Time') are left untouched.
       - Elements whose name contains any keyword from the removal list get a
         replacement value chosen by VR (Value Representation):
         - String types (LO, SH, PN, LT, ST, UT, DA, TM, DT, CS, UI) -> "ERADICATED".
         - Integer types (IS, SL, SS, UL, US) -> 0.
         - Decimal types (DS, FD, FL) -> 0.0.
         - Byte types (OB, OW, UN) -> bytes("ERADICATED", 'utf-8').
         - Any other VR is left unchanged and a message is printed once.

    Keyword matching is a case-sensitive substring test on the element name, so
    e.g. 'Time' also matches names such as 'Echo Time'.

    NOTE(review): only the elements exposed by dicom_file.keys() are visited;
    elements nested inside sequences are presumably not reached - confirm.
    NOTE(review): "ERADICATED" is not a well-formed DA/TM/DT/UI value; depending
    on the pydicom version and its validation settings the assignment may warn
    or raise - confirm against the installed version.

    Returns:
    None
    """

    placeholder = "ERADICATED"

    dicom_file.PatientName = placeholder
    dicom_file.Manufacturer = placeholder
    dicom_file.InstitutionAddress = placeholder

    keywords = ["Private", 'private', 'ID', 'Branch of Service', 'Date', 'Time', 'date', 'time', 'Name', 'name',
            'Physician', 'Address', 'address', 'Admitting Diagnoses Description', 'UID', 'Unknown'] ## to remove

    keywords_to_save = ['Acquisition Date', 'Acquisition Time']

    string_vrs = ["LO", "SH", "PN", "LT", "ST", "UT", "DA", "TM", "DT", "CS", "UI"]
    integer_vrs = ["IS", "SL", "SS", "UL", "US"]
    decimal_vrs = ["DS", "FD", "FL"]
    byte_vrs = ["OB", "OW", "UN"]

    # Snapshot the keys so the iteration is independent of the writes below.
    for tag in list(dicom_file.keys()):
        elem = dicom_file[tag]
        elem_name = str(elem.name)

        # The save list is compared against the element name. The previous code
        # compared it against the removal keyword, which never matched, so the
        # acquisition date/time were overwritten despite being listed here.
        if elem_name in keywords_to_save:
            continue

        # One matching keyword is enough; the element is rewritten only once.
        if not any(keyword in elem_name for keyword in keywords):
            continue

        if elem.VR in string_vrs:
            dicom_file[tag].value = placeholder
        elif elem.VR in integer_vrs:
            dicom_file[tag].value = 0
        elif elem.VR in decimal_vrs:
            dicom_file[tag].value = 0.0
        elif elem.VR in byte_vrs:
            dicom_file[tag].value = bytes(placeholder, 'utf-8')
        else:
            print(f"The tag {tag} has VR {elem.VR}, which is not handled in this code.")

In [None]:
def convert_to_digits(num_str, num):
    """
    Render a value as text, left-padded with zeros to at least `num` characters.

    Parameters:
    num_str: Value to render; it is passed through str() first, so non-string
             input (including None) is accepted.
    num (int): Minimum width of the result.

    Returns:
    str: The zero-padded text (see str.zfill for sign handling).
    """
    as_text = str(num_str)
    padded = as_text.zfill(num)
    return padded

def get_folders_in_path(path):
    """
    Split a path into its components, ordered from the outermost to the innermost.

    A root or drive prefix (the part os.path.split can no longer reduce) is kept
    as the first component. Trailing separators are ignored, so 'a/b/' yields
    ['a', 'b']; previously such a path came back as the single unsplit
    component 'a/b'.

    Parameters:
    path (str): The path to split.

    Returns:
    list: The path components; an empty list for an empty path.
    """
    folders = []
    while True:
        head, tail = os.path.split(path)

        if tail:
            folders.append(tail)
        elif head == path:
            # os.path.split made no progress: `path` is a root/drive or empty.
            if head:
                folders.append(head)
            break
        # Otherwise the tail was empty only because of a trailing separator;
        # keep splitting the head.
        path = head

    folders.reverse()
    return folders

In [None]:
def get_attribute_value(input_file, attr):
    """
    Read a DICOM file and return the value of the element whose name equals `attr`.

    Every element returned by the dataset's keys() is inspected; if more than
    one element carries the requested name, the value of the last one wins.

    Parameters:
    input_file: Path of the DICOM file, passed to pydicom.dcmread.
    attr (str): Element name to look for, compared with str(element.name),
                e.g. 'Acquisition Date'.

    Returns:
    The matching element's value, or None when no element has that name.
    """
    dataset = pydicom.dcmread(input_file)
    found_value = None

    for element_tag in dataset.keys():
        element = dataset[element_tag]
        if str(element.name) == attr:
            found_value = dataset[element_tag].value

    return found_value

In [None]:
def image_date_check(date_str, earliest=datetime(2019, 1, 1)):
    """
    Tell whether a date in YYYYMMDD form falls on or after a cutoff.

    Parameters:
    date_str (str): Date text in "%Y%m%d" format, e.g. '20190315'.
    earliest (datetime): Inclusive cutoff. Defaults to 1 January 2019, the value
                         that used to be hard-coded in the function body.

    Returns:
    bool: True when the parsed date is >= `earliest`.

    Raises:
    ValueError: If `date_str` does not match the "%Y%m%d" format (this includes
                the empty string).
    """
    date_format = "%Y%m%d"
    parsed = datetime.strptime(date_str, date_format)
    return parsed >= earliest

In [None]:
def update_csv(data, filename):
    """
    Append a single row to a CSV file.

    Parameters:
    data: Iterable of field values forming the row.
    filename: Path of the CSV file; it is opened in append mode, so it is
              created when it does not exist yet.

    Returns:
    None
    """
    # newline='' lets the csv module control line endings itself.
    with open(filename, mode='a', newline='') as handle:
        csv.writer(handle).writerow(data)

In [None]:
from statistics import mean
def count_files_in_directory(directory):
    """
    Count files per imaging modality for each first-level folder of `directory`.

    The tree is walked with os.walk. Every folder directly below `directory`
    opens a new slot in each modality's list, and the files found anywhere
    beneath that folder are added to the slot of the modality its path points to.
    Classification looks only at the part of the path below `directory`:
      - a folder name starting with 'MR'               -> MR
      - a folder name starting with 'CT'               -> CT
      - a folder name starting with 'XR', 'XA' or 'DX' -> XR
      - 'US' anywhere in the relative path             -> US
      - 'SR' or 'PR' anywhere in the relative path     -> ignored
      - everything else                                -> Other
    Slot 0 collects files lying directly in `directory`. Slots equal to 0 are
    dropped before the averages are printed and the lists returned.

    Fixes compared with the previous version:
      - `"\\XR" or '\\XA' or '\\DX' in root` was always true, so the US and
        Other branches could never run; each prefix is now tested on its own.
      - the SR/PR exclusion used `or` where `and` was needed and so excluded
        nothing.
      - first-level folders are detected from the relative path instead of by
        comparing separator counts, which depended on whether `directory`
        ended with a separator.
      - os.sep replaces the hard-coded backslash literals.

    Parameters:
    directory (str): Root directory to walk.

    Returns:
    tuple: (xr_count, mr_count, us_count, ct_count, other_count), each a list of
           the non-zero per-folder file counts.
    """
    labels = ('XR', 'MR', 'US', 'CT', 'Other')
    per_modality = {label: [0] for label in labels}

    folder_count = 0
    for root, dirs, files in os.walk(directory):
        relative = os.path.relpath(root, directory)

        # A first-level folder is any walked directory exactly one level below the root.
        if relative != os.curdir and os.sep not in relative:
            folder_count += 1
            for slots in per_modality.values():
                slots.append(0)

        # Prefix a separator so that "folder name starts with ..." can be written
        # as a plain substring test, also for first-level folders.
        anchored = os.sep + relative

        if os.sep + 'MR' in anchored:
            label = 'MR'
        elif os.sep + 'CT' in anchored:
            label = 'CT'
        elif any(os.sep + prefix in anchored for prefix in ('XR', 'XA', 'DX')):
            label = 'XR'
        elif 'US' in anchored:
            label = 'US'
        elif 'SR' in anchored or 'PR' in anchored:
            # Structured reports / presentation states are not counted at all.
            continue
        else:
            label = 'Other'

        per_modality[label][folder_count] += len(files)

    non_zero = {
        label: [count for count in slots if count != 0]
        for label, slots in per_modality.items()
    }

    for label in labels:
        if len(non_zero[label]) != 0:
            print(f'{label} avg:', mean(non_zero[label]))

    return non_zero['XR'], non_zero['MR'], non_zero['US'], non_zero['CT'], non_zero['Other']
    
# The directory to walk is read from the YOUR_DIRECTORY_PATH environment variable
# (os.getenv returns None when the variable is not set).
YOUR_DIRECTORY_PATH = os.getenv("YOUR_DIRECTORY_PATH")
# Runs at cell execution time; the per-modality lists are kept as notebook-level
# variables and are used by the statistics cell below.
xr_count, mr_count, us_count, ct_count, other_count = count_files_in_directory(YOUR_DIRECTORY_PATH)



In [None]:
from statistics import median

# Print median / mean / max / min of the per-folder file counts for MR, XR and CT.
# count_files_in_directory drops zero entries, so a modality without any files
# arrives here as an empty list; median/mean/max/min would all raise on it, so
# that case is reported instead of crashing the cell.
for position, (label, counts) in enumerate((('MR', mr_count), ('XR', xr_count), ('CT', ct_count))):
    if position:
        print()  # blank line between modality sections
    print(f'{label} Stats:')
    if not counts:
        print('No files found.')
        continue
    print('Median:', median(counts))
    print('Mean:', mean(counts))
    print('Max:', max(counts))
    print('Min:', min(counts))

In [None]:
def batch_process_directory(input_directory, output_directory, skip_files):

    """
    Processes DICOM files in a directory, anonymizes them, and saves the results to a new directory.

    Parameters:
    input_directory (str): The path to the directory containing the input DICOM files.
    output_directory (str): The path to the directory where the processed files will be saved.
    skip_files (bool): Whether to skip files that have already been processed.

    Returns:
    pd.DataFrame: A DataFrame containing information about the processed files.

    Environment variables read:
    - SAMPLED_DATA_FOLDER_NAMES: CSV with a 'FolderName' column listing the
      folders (directly below input_directory) to process.
    - IMAGES_PROCESSED: CSV log of processed files; one row is appended per
      file written, and it is read back when skip_files is True.

    The function performs the following steps:
    1. Reads the list of folders to keep.
    2. If skip_files is True, reads the processed-files log (rows with missing
       values are dropped and the log is rewritten), collects the file names to
       skip and restores the MRN -> person ID mapping; new person IDs continue
       after the highest ID found. Otherwise numbering starts after 711.
    3. Walks each kept folder and, for every '.dcm' file not in the skip list:
       - Reads acquisition date/time, series number, instance number, modality,
         accession number and study description.
       - Takes the first path component below input_directory as the MRN and
         assigns it a person ID (reusing the ID when the MRN was seen before).
       - Builds the new file name from person ID, date, time, modality, series
         and instance numbers.
       - Anonymizes the dataset and writes it to <output_directory>/<person ID>/.
       - Appends a row describing the file to the processed-files log.
    4. Builds, prints and returns a DataFrame with one row per processed file.

    Fixes compared with the previous version:
    - Each kept folder is walked on its own. Previously the whole
      input_directory was walked once per kept folder, so the keep-list had no
      effect and every file was reprocessed repeatedly.
    - Restored person IDs are integers. Previously they were strings, so the
      "highest ID" was a lexicographic maximum and incrementing it raised
      TypeError.
    - The log row records the person ID assigned to the file's MRN rather than
      the running counter, which was wrong for any MRN seen earlier.
    - The path below input_directory is computed with os.path.relpath and the
      output folder with os.path.join, so neither directory argument has to end
      with a path separator.

    Note:
    - The function assumes the existence of helper functions such as get_attribute_value, image_date_check, convert_to_digits, get_folders_in_path, remove_identifiers, replace_identifiers_with_pseudonyms, and update_csv.
    - The function also assumes the existence of the dcmread and dcmwrite functions for reading and writing DICOM files.
    - NOTE(review): a missing or too-old acquisition date `break`s out of the
      current directory's file list (skipping its remaining files), while a
      missing acquisition time only skips that one file - confirm this
      asymmetry is intended.
    - NOTE(review): already-processed files are recognised by bare file name,
      so equally named files in different folders are also skipped - confirm.
    """

    # Person IDs are handed out starting at this value + 1.
    person_id_base = 711

    output_columns = ['Old Name', 'New Name', 'MRN', 'Accession_NBR', 'Modality', 'Modality Subtype',
                      'Study Date', 'Study Time', 'Series Number', 'Instance Number']
    records = []

    IMAGES_PROCESSED = os.getenv("IMAGES_PROCESSED")
    SAMPLED_DATA_FOLDER_NAMES = os.getenv("SAMPLED_DATA_FOLDER_NAMES")

    sampled_df = pd.read_csv(SAMPLED_DATA_FOLDER_NAMES)
    folders_to_keep = [str(folder) for folder in sampled_df['FolderName'].tolist()]

    if skip_files:
        files_to_skip_df = pd.read_csv(IMAGES_PROCESSED)
        files_to_skip_df = files_to_skip_df.dropna()
        files_to_skip_df.to_csv(IMAGES_PROCESSED, index=False)

        files_to_skip_df.columns = ['Filename', 'PersonID','New_Filename', 'MRN', 'AccessionNBR',
                                    'Modality', 'Modality_Subtype', 'Date', 'Time', 'Series_Num', 'Instance_Num']
        files_to_skip = set(files_to_skip_df['Filename'])

        # MRN (as text) -> person ID (as int) for everything processed so far.
        person_ids = {
            str(int(logged_mrn)): int(logged_id)
            for logged_id, logged_mrn in files_to_skip_df[['PersonID', 'MRN']].values.tolist()
        }
        person_id = max(person_ids.values(), default=person_id_base)

    else:
        person_id = person_id_base
        person_ids = {}
        files_to_skip = set()

    for folder in folders_to_keep:
        folder_path = os.path.join(input_directory, folder)

        if not os.path.isdir(folder_path):
            continue

        for root, dirs, files in os.walk(folder_path):

            for filename in files:
                if not filename.endswith(".dcm") or filename in files_to_skip:
                    continue

                input_file = os.path.join(root, filename)

                study_date = get_attribute_value(input_file,'Acquisition Date')

                if type(study_date) != str:
                    break

                if not image_date_check(study_date):
                    break

                study_time = get_attribute_value(input_file,'Acquisition Time')

                if type(study_time) != str:
                    continue
                # Drops any fractional seconds from the time text.
                study_time = str(int(float(study_time)))

                series_num = convert_to_digits(get_attribute_value(input_file,'Series Number'),2)
                instance_num = convert_to_digits(get_attribute_value(input_file,'Instance Number'),3)
                mod = get_attribute_value(input_file,'Modality')
                acc_num = get_attribute_value(input_file,'Accession Number')
                study_desc = get_attribute_value(input_file,'Study Description')

                # The first folder below input_directory identifies the patient.
                path_folders = get_folders_in_path(os.path.relpath(input_file, input_directory))
                mrn = path_folders[0]

                if mrn not in person_ids:
                    person_id += 1
                    person_ids[mrn] = person_id
                assigned_id = person_ids[mrn]

                new_file_name = '{}_{}_{}_{}_{}_{}'.format(assigned_id,study_date,study_time,mod,
                                                           series_num,instance_num) + os.path.splitext(filename)[1]

                output_folder = os.path.join(output_directory, str(assigned_id))
                os.makedirs(output_folder, exist_ok=True)
                output_file = os.path.join(output_folder, new_file_name)

                dicom_data = dcmread(input_file)
                remove_identifiers(dicom_data)
                replace_identifiers_with_pseudonyms(dicom_data)
                dcmwrite(output_file, dicom_data)

                ## Update CSV ##
                new_data = [filename, assigned_id, new_file_name, mrn, acc_num, mod, study_desc, study_date, study_time,
                            series_num, instance_num]
                update_csv(new_data, IMAGES_PROCESSED)

                records.append({
                    'Old Name': filename,
                    'New Name': new_file_name,
                    'MRN': mrn,
                    'Accession_NBR': acc_num,
                    'Modality': mod,
                    'Modality Subtype': study_desc,
                    'Study Date': study_date,
                    'Study Time': study_time,
                    'Series Number': series_num,
                    'Instance Number': instance_num,
                })

    # Passing `columns` keeps the column set and order even when nothing was processed.
    df = pd.DataFrame(records, columns=output_columns)

    print(f"Processed files in {input_directory} and saved to {output_directory}")
    print(df)
    return df


In [None]:
# Source and destination directories come from the environment
# (os.getenv returns None when a variable is not set).
INPUT_DIRECTORY = os.getenv("INPUT_DIRECTORY")
OUTPUT_DIRECTORY = os.getenv("OUTPUT_DIRECTORY")
# skip_files=True makes the run read the processed-files log and skip files listed in it.
df = batch_process_directory(INPUT_DIRECTORY,OUTPUT_DIRECTORY, skip_files = True)

In [None]:
def update_csv(data, filename):
    """
    Append one row of values to the CSV file at `filename`.

    NOTE: this repeats the update_csv definition from an earlier cell of this
    notebook with the same behaviour; running this cell simply rebinds the name.

    Parameters:
    data: Iterable of field values for the new row.
    filename: Path of the CSV file, opened in append mode.

    Returns:
    None
    """
    out = open(filename, 'a', newline='')
    with out:
        row_writer = csv.writer(out)
        row_writer.writerow(data)


In [None]:
def get_patientid_list(input_directory, sampled_csv_path=''):
    """
    Print how many distinct patient folders contain a sampled accession number.

    Every file path under `input_directory` is split into components with
    get_folders_in_path. When component [2] of the full path is one of the
    sampled accession numbers, component [1] is recorded as a patient ID and
    the walk does not descend further below that directory.

    NOTE(review): the indices [1] and [2] are positions in the full file path,
    not positions below `input_directory`, so they presumably expect a specific
    directory depth - confirm against the actual data layout.

    Parameters:
    input_directory (str): Root directory to walk.
    sampled_csv_path (str): CSV file with an 'ACCESSIONNBR' column. The default
        is the empty string that used to be hard-coded in the body, for which
        pd.read_csv raises; pass the real path.

    Returns:
    None. The number of distinct patient IDs is printed.
    """
    sampled_df = pd.read_csv(sampled_csv_path)

    # Accession numbers to keep, from the 'ACCESSIONNBR' column. They are
    # converted to text because pandas parses numeric columns as numbers, which
    # would never compare equal to a folder name.
    accessions_to_keep = {str(value) for value in sampled_df['ACCESSIONNBR'].tolist()}

    patient_ids_to_keep = set()

    for root, dirs, files in os.walk(input_directory):
        for file in files:
            input_file = os.path.join(root, file)
            folder_path = get_folders_in_path(input_file)

            # Paths with fewer than three components cannot carry an accession folder.
            if len(folder_path) > 2 and folder_path[2] in accessions_to_keep:
                patient_ids_to_keep.add(folder_path[1])
                # Prune the walk: nothing below a matched directory is visited.
                dirs[:] = []
    print(len(patient_ids_to_keep))
                
# Runs at cell execution time against INPUT_DIRECTORY, which is assigned in an earlier cell.
get_patientid_list(INPUT_DIRECTORY)