Converting a List of DICOMs to PNG. This notebook will read DICOM files and convert them to 16-bit PNG. Flipping logic is included such that both left and right breast images will be pointed in the same direction to improve model training.

In [None]:
import pydicom
import os
import numpy as np
import pandas as pd
import png
import pylibjpeg
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt

In [None]:
# Collect the DICOM attributes that drive the flip decision
class DCM_Tags():
    """Holder for the three DICOM attributes consulted when deciding flips.

    Attributes:
        laterality: Value of ``img_dcm.ImageLaterality``.
        view: Value of ``img_dcm.ViewPosition``.
        orientation: Value of ``img_dcm.PatientOrientation``.

    Any attribute whose lookup raises ``AttributeError`` is stored as
    ``np.nan`` instead.
    """

    def __init__(self, img_dcm):
        # getattr with a default absorbs AttributeError, so an absent tag
        # silently falls back to NaN rather than propagating the error.
        self.laterality = getattr(img_dcm, 'ImageLaterality', np.nan)
        self.view = getattr(img_dcm, 'ViewPosition', np.nan)
        self.orientation = getattr(img_dcm, 'PatientOrientation', np.nan)

# Check whether DICOM should be flipped
def check_dcm(imgdcm):
    """Decide whether an image needs a horizontal and/or vertical flip.

    Args:
        imgdcm: Object exposing the attributes read by ``DCM_Tags``
            (``ImageLaterality``, ``ViewPosition``, ``PatientOrientation``).

    Returns:
        Tuple ``(flipHorz, flipVert)`` of booleans.
    """
    # Get DICOM metadata
    tags = DCM_Tags(imgdcm)
    orientation = tags.orientation

    # The orientation is usable only if it has both a row and a column entry.
    # A missing tag is stored as NaN (no len()), so it lands in the fallback.
    # Truthiness of `~pd.isnull(x)` cannot be used for this: bitwise-inverting
    # a Python bool yields -1 or -2, which are both truthy.
    try:
        has_orientation = len(orientation) >= 2
    except TypeError:
        has_orientation = False

    # If image orientation tag is undefined: flip RCC, RML, and RMLO images
    if not has_orientation:
        flipHorz = tags.laterality == 'R' and tags.view in ('CC', 'ML', 'MLO')
        return flipHorz, False

    side_and_column = (tags.laterality, orientation[1])

    # CC view
    if tags.view == 'CC':
        flipVert = side_and_column in (('L', 'L'), ('R', 'R'))
    # MLO or ML views
    elif tags.view in ('MLO', 'ML'):
        flipVert = side_and_column in (
            ('L', 'H'), ('L', 'HL'),
            ('R', 'H'), ('R', 'HR'),
        )
    # Unrecognized view
    else:
        return False, False

    # For every recognized view a row direction of 'P' requests a horizontal flip
    flipHorz = orientation[0] == 'P'
    return flipHorz, flipVert

# Write a pixel array to disk as a greyscale PNG
def save_dcm_image_as_png(image, png_filename, bitdepth=12):
    """Encode ``image`` as a greyscale PNG at ``png_filename``.

    Args:
        image: Array exposing ``shape`` (rows first, columns second) and
            ``tolist()``.
        png_filename: Destination path, opened in binary write mode.
        bitdepth: Bit depth passed to ``png.Writer`` (default 12).
            NOTE(review): presumably pixel values must fit in this many
            bits - confirm against the source images.
    """
    with open(png_filename, 'wb') as out_file:
        n_rows = image.shape[0]
        n_cols = image.shape[1]
        encoder = png.Writer(width=n_cols,
                             height=n_rows,
                             greyscale=True,
                             bitdepth=bitdepth)
        pixel_rows = image.tolist()
        encoder.write(out_file, pixel_rows)

In [None]:
from pathlib import Path

# Folder that is searched recursively for DICOM files
image_path = Path("../images/")

# Collect ".dicom" files first, then ".dcm" files
file_paths = [*image_path.rglob("*.dicom"), *image_path.rglob("*.dcm")]
len(file_paths)

Sanity check: load the first DICOM file, flip it left-to-right, and display it to confirm that the correct images were found.

In [None]:
dcm_list = file_paths
sample_file_name = dcm_list[0]
# Read in sample dicom file
ds = pydicom.dcmread(sample_file_name)
arr = ds.pixel_array

# Height (h) and width (w) of the image; the unpacking also requires a 2-D array
h, w = arr.shape

# Flip the sample left to right for a visual check (the conversion code applies
# its own flipping logic). np.fliplr returns a view, so copy it to obtain an
# independent array; this replaces a per-pixel Python loop with one vectorized call.
new_np_array = np.fliplr(arr).copy()

# Display the flipped image
figure, ax = plt.subplots(1)
ax.imshow(new_np_array, cmap="gray")

In [None]:
import multiprocessing
from functools import partial

def process_single_dcm(dcm_path, save_path):
    """Convert one DICOM file to a PNG stored under ``save_path``.

    The pixel array is mirrored left-to-right when ``check_dcm`` requests a
    horizontal flip; the vertical-flip flag it returns is ignored. The PNG is
    written to ``<save_path>/<parent folder name>/<file stem>.png`` and the
    destination folder is created if needed.

    Args:
        dcm_path: ``pathlib.Path`` of the DICOM file to convert.
        save_path: Root directory for the generated PNG files.
    """
    dcm = pydicom.dcmread(dcm_path)
    img = dcm.pixel_array

    horz, _ = check_dcm(dcm)
    if horz:
        img = np.fliplr(img)

    # Name the output after the file's own folder and its stem. Indexing
    # `parent.parts` by a fixed position depends on how the root path was
    # written, and `name.split(".")[1]` yields the extension for a single-dot
    # filename, which would map every input to the same output file.
    png_path = os.path.join(save_path, dcm_path.parent.name, f"{dcm_path.stem}.png")
    os.makedirs(os.path.dirname(png_path), exist_ok=True)

    # NOTE(review): the PNG is written with save_dcm_image_as_png's default
    # bitdepth - confirm the pixel values of the source images fit in it.
    save_dcm_image_as_png(img, png_path)

def process_dcm_list_multiprocessing(dcm_list, save_path):
    """Run ``process_single_dcm`` over ``dcm_list`` in a process pool.

    Args:
        dcm_list: Sequence of DICOM paths (its length sizes the progress bar).
        save_path: Output root forwarded to every ``process_single_dcm`` call.
    """
    # Bind the output directory so that workers only receive the DICOM path
    worker = partial(process_single_dcm, save_path=save_path)

    # One worker process per CPU reported by the OS
    n_workers = multiprocessing.cpu_count()

    with multiprocessing.Pool(processes=n_workers) as workers:
        results = workers.imap(worker, dcm_list)
        progress = tqdm(results, total=len(dcm_list), desc="Processing DICOM...")
        # Drain the lazy iterator so that every task actually runs
        for _ in progress:
            pass

# Conversion inputs: the PNG output root and the DICOM paths collected earlier
save_path = "../vinidr/images_png/"
dcm_list = file_paths

# Run the parallel DICOM-to-PNG conversion
process_dcm_list_multiprocessing(dcm_list, save_path)

Check that the expected images have been converted.

In [None]:
from pathlib import Path

# Recursively gather every PNG below the conversion output folder
image_path = Path("../vinidr/images_png")
image_paths = [*image_path.rglob("*.png")]

len(image_paths)

Delete the original DICOM files that have already been converted to PNG. Set `dry_run=True` to only list the files that would be deleted without removing anything.

In [None]:
def cleanup_converted_dicoms(dicom_paths, png_paths, dry_run=True):
    """Delete DICOM files for which a converted PNG already exists.

    A DICOM and a PNG are treated as the same image when they share the
    identifier ``"<parent folder name>/<file stem>"``.

    Args:
        dicom_paths: Iterable of ``pathlib.Path`` objects for the DICOM files.
        png_paths: Iterable of ``pathlib.Path`` objects for the PNG files.
        dry_run: When True (default), only print and return the DICOM paths
            that would be deleted. When False, remove them with ``os.remove``.

    Returns:
        In a dry run, the list of DICOM paths that would be deleted;
        otherwise the list of DICOM paths that were actually deleted.
    """
    def extract_identifier(path):
        # Use the immediate folder name and the stem. Indexing `parent.parts`
        # by position depends on the root prefix, and `name.split(".")[1]`
        # yields the extension for a single-dot filename, so DICOM and PNG
        # identifiers built that way can never match.
        return f"{path.parent.name}/{path.stem}"

    png_identifiers = {extract_identifier(path) for path in png_paths}
    files_to_delete = [path for path in dicom_paths
                       if extract_identifier(path) in png_identifiers]

    if dry_run:
        print(f"Dry run: {len(files_to_delete)} DICOM files would be deleted:")
        for file in files_to_delete:
            print(file)
        return files_to_delete

    deleted = []
    for file in files_to_delete:
        try:
            os.remove(file)
        except OSError as e:
            # Best effort: report the failure and continue with the next file
            print(f"Error deleting {file}: {e}")
        else:
            deleted.append(file)
            print(f"Deleted: {file}")

    # Count only successful removals, not every attempted one
    print(f"Total files deleted: {len(deleted)}")
    return deleted

# First, do a dry run to see what would be deleted
# files_to_delete = cleanup_converted_dicoms(file_paths, image_paths, dry_run=True)
# WARNING: with dry_run=False, cleanup_converted_dicoms calls os.remove on every
# DICOM path it matches to a PNG. Run the dry-run line above first and review
# its output before executing this cell.
cleanup_converted_dicoms(file_paths, image_paths, dry_run=False)