In [8]:
import pydicom
from PIL import Image
import numpy as np
import os

In [22]:
from PIL import Image

def load_and_mask_image(dicom_path, mask_path):
    """Zero out every DICOM pixel that lies outside a 0/1 mask image.

    Args:
        dicom_path: Path of the DICOM file whose ``pixel_array`` is masked.
        mask_path: Path of the mask image. It is converted to 8-bit
            grayscale and is then expected to have a minimum of 0 and a
            maximum of 1 (no thresholding or resizing is performed here).

    Returns:
        An array holding the DICOM pixel value wherever the mask is nonzero
        and 0 everywhere else.

    Raises:
        AssertionError: If the mask and DICOM shapes differ, if the mask's
            min/max are not 0/1, or if the masked result is entirely zero.
    """
    # Load DICOM image
    dicom_image = pydicom.dcmread(dicom_path).pixel_array

    # Load the mask as grayscale; the context manager closes the file handle
    # as soon as the pixel data has been copied into the numpy array.
    with Image.open(mask_path) as mask_file:
        mask = np.array(mask_file.convert('L'))

    # The mask must already line up with the DICOM image and be strictly 0/1.
    assert mask.shape == dicom_image.shape, "Mask and DICOM image dimensions do not match"
    assert mask.max() == 1, "Max Values are not 1"
    assert mask.min() == 0, "Min Values are not zero"

    # Apply the mask: keep pixels where the mask is set, zero the rest
    prostate_roi = np.where(mask == 0, 0, dicom_image)

    # An all-zero result means no image content survived under the mask.
    assert np.any(prostate_roi != 0), "Masked image contains no nonzero values"
    return prostate_roi


In [18]:
def crop_to_bbox(image, mask):
    """Crop ``image`` to the tightest box around the nonzero entries of ``mask``.

    Only the first two axes of ``mask`` are used to locate the box. When the
    mask has no nonzero entries the image is returned untouched.
    """
    nonzero_idx = np.nonzero(mask)
    if not (nonzero_idx[0].size and nonzero_idx[1].size):
        # Nothing to bound: hand back the original image.
        return image

    row_idx, col_idx = nonzero_idx[0], nonzero_idx[1]
    top, bottom = row_idx.min(), row_idx.max()
    left, right = col_idx.min(), col_idx.max()
    # Upper bounds are inclusive, hence the +1 on each slice end.
    return image[top:bottom + 1, left:right + 1]

In [27]:
import os
import pydicom

def save_image(image_array, original_dicom_path, output_path,
               original_root='/Users/arjunmoorthy/24Spr_AMoorthy_healthai/data/new_train_data'):
    """Save ``image_array`` as the pixel data of a copy of a DICOM file.

    The copy is written under ``output_path`` at the same relative location
    the source file has under ``original_root``; the source file's header is
    reused unchanged.

    Args:
        image_array: Processed pixels (anything exposing ``tobytes()``), or
            ``None`` to skip saving.
        original_dicom_path: Path of the source DICOM file.
        output_path: Root directory of the output tree.
        original_root: Root directory of the input tree that
            ``original_dicom_path`` lives under.

    Raises:
        ValueError: If ``original_dicom_path`` is not inside
            ``original_root``, or if the derived output path is the source
            file itself. A plain string replacement would silently leave the
            path unchanged in that situation and overwrite the original DICOM.
    """
    # Print the initial paths for debugging
    print(f"Original DICOM Path: {original_dicom_path}")
    print(f"Output Path: {output_path}")

    # Bail out before touching the disk when there is nothing to write.
    if image_array is None:
        print("No image array to process; returning without saving.")
        return

    # Re-root the file: keep its location relative to the input tree.
    relative_path = os.path.relpath(original_dicom_path, original_root)
    if relative_path == os.pardir or relative_path.startswith(os.pardir + os.sep):
        raise ValueError(
            f"{original_dicom_path} is not inside {original_root}; "
            "cannot derive an output location for it."
        )
    new_dicom_path = os.path.join(output_path, relative_path)
    if os.path.abspath(new_dicom_path) == os.path.abspath(original_dicom_path):
        raise ValueError(
            f"Refusing to overwrite the source DICOM at {original_dicom_path}"
        )
    print(f"New DICOM Path (after replacement): {new_dicom_path}")

    # Load the original DICOM file to keep its header/meta information
    ds = pydicom.dcmread(original_dicom_path)

    # Update the pixel data with the processed image.
    # NOTE(review): raw bytes are written as-is; this assumes image_array has
    # the same shape and dtype as the stored pixels and that the file is not
    # compressed - confirm for the datasets in use.
    ds.PixelData = image_array.tobytes()

    # Ensure the directory exists where the new DICOM will be saved
    directory = os.path.dirname(new_dicom_path)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
        print(f"Created directory: {directory}")
    else:
        print(f"Directory already exists: {directory}")

    # Save the new DICOM file
    ds.save_as(new_dicom_path)
    print(f"Saved processed DICOM to {new_dicom_path}")

# Example call to the function
# This is where you would pass the specific paths when using this function in your code
# save_image(processed_image, '/path/to/original/dicom.dcm', '/path/to/output/directory')


In [5]:
# Disabled earlier draft of process_directory, kept inside a bare string
# literal so none of it executes. In this draft a failing
# load_and_mask_image call is skipped by a bare `except:` without any message,
# and the final "Skipping ..." message uses mask_path, which is only assigned
# inside the `if mask_file in mask_files` branch.
""" def process_directory(root_directory, mask_root_directory, base_output_dir):
    for status in os.listdir(root_directory):
        if status == '.DS_Store':  # Skip .DS_Store files at status level
            continue
        status_path = os.path.join(root_directory, status)
        mask_status_path = os.path.join(mask_root_directory, status)

        # Check if the mask status path exists
        if not os.path.isdir(mask_status_path):
            print(f"Missing directory: {mask_status_path}, skipping...")
            continue

        for patient in os.listdir(status_path):
            if patient == '.DS_Store':  # Skip .DS_Store files at patient level
                continue
            patient_path = os.path.join(status_path, patient)
            mask_patient_path = os.path.join(mask_status_path, patient)

            # Check if the mask patient path exists
            if not os.path.isdir(mask_patient_path):
                print(f"Missing directory: {mask_patient_path}, skipping...")
                continue

            for tract in os.listdir(patient_path):
                if tract == '.DS_Store':  # Skip .DS_Store files at tract level
                    continue
                tract_path = os.path.join(patient_path, tract)
                mask_tract_path = os.path.join(mask_patient_path, tract)

                # Check if the mask tract path exists
                if not os.path.isdir(mask_tract_path):
                    print(f"Missing directory: {mask_tract_path}, skipping...")
                    continue

                dicom_files = [f for f in os.listdir(tract_path) if f.endswith('.dcm')]
                mask_files = [f for f in os.listdir(mask_tract_path) if f.endswith('.png')]

                for dicom_file in dicom_files:
                    mask_file = dicom_file.replace('.dcm', '.png')
                    if mask_file in mask_files:
                        dicom_path = os.path.join(tract_path, dicom_file)
                        mask_path = os.path.join(mask_tract_path, mask_file)
                        try: 
                            prostate_roi = load_and_mask_image(dicom_path, mask_path)
                        except: 
                            continue
                        # cropped_image = crop_to_bbox(prostate_roi, prostate_roi)
                        save_image(prostate_roi, dicom_path, base_output_dir)
                    else:
                        print(f"Skipping {dicom_file}, no corresponding mask file found at {mask_path}.")
 """

In [20]:
def _iter_paired_subdirs(data_dir, mask_dir):
    """Yield ``(data_subdir, mask_subdir)`` for entries of ``data_dir`` mirrored in ``mask_dir``.

    ``.DS_Store`` entries are ignored. An entry without a matching directory
    under ``mask_dir`` is reported and skipped; an entry that is not itself a
    directory is skipped so that ``os.listdir`` is never called on a file.
    """
    for name in sorted(os.listdir(data_dir)):
        if name == '.DS_Store':
            continue
        mask_subdir = os.path.join(mask_dir, name)
        if not os.path.isdir(mask_subdir):
            print(f"Missing directory: {mask_subdir}, skipping...")
            continue
        data_subdir = os.path.join(data_dir, name)
        if not os.path.isdir(data_subdir):
            continue
        yield data_subdir, mask_subdir


def process_directory(root_directory, mask_root_directory, base_output_dir):
    """Mask every DICOM under ``root_directory`` and save the result.

    Both trees are walked three levels deep (status / patient / tract). For
    each ``*.dcm`` file in a tract directory, the ``*.png`` with the same stem
    in the mirrored mask directory is applied with ``load_and_mask_image`` and
    the result is written with ``save_image``.

    Args:
        root_directory: Root of the DICOM tree.
        mask_root_directory: Root of the mask tree, mirroring the DICOM tree.
        base_output_dir: Output root handed to ``save_image``.

    Files without a mask are reported and skipped. Any exception raised while
    loading or saving one file is printed and processing continues with the
    next file.
    """
    for status_path, mask_status_path in _iter_paired_subdirs(root_directory, mask_root_directory):
        for patient_path, mask_patient_path in _iter_paired_subdirs(status_path, mask_status_path):
            for tract_path, mask_tract_path in _iter_paired_subdirs(patient_path, mask_patient_path):
                dicom_files = sorted(f for f in os.listdir(tract_path) if f.endswith('.dcm'))
                mask_files = {f for f in os.listdir(mask_tract_path) if f.endswith('.png')}

                for dicom_file in dicom_files:
                    # Swap only the extension; str.replace would also rewrite
                    # a '.dcm' that appears in the middle of the file name.
                    mask_file = os.path.splitext(dicom_file)[0] + '.png'
                    mask_path = os.path.join(mask_tract_path, mask_file)
                    if mask_file not in mask_files:
                        print(f"Skipping {dicom_file}, no corresponding mask file found at {mask_path}.")
                        continue

                    dicom_path = os.path.join(tract_path, dicom_file)
                    try:
                        prostate_roi = load_and_mask_image(dicom_path, mask_path)
                        save_image(prostate_roi, dicom_path, base_output_dir)
                    except Exception as e:
                        # Best effort: one bad file must not abort the batch.
                        print(f"Error processing {dicom_file}: {str(e)}")

In [6]:
from pathlib import Path
import os

# Sanity check: is the mask directory reachable from the current working directory?
path = Path('../../data/new_train_data_segs')
print(path)
print("The path exists." if path.exists() else "The path does not exist.")


../../data/new_train_data_segs
The path exists.


In [28]:
# Input DICOM tree, mask tree (where the masks are stored) and the new root
# directory for the segmented images - all siblings inside the same data folder.
root_directory, mask_root_directory, output_directory = (
    f'/Users/arjunmoorthy/24Spr_AMoorthy_healthai/data/{folder}'
    for folder in ('new_train_data', 'new_train_data_segs', 'applied_segs_final')
)
process_directory(
    root_directory=root_directory,
    mask_root_directory=mask_root_directory,
    base_output_dir=output_directory,
)


Error processing 1-35.dcm: There are nonzero values in image
Error processing 1-36.dcm: There are nonzero values in image
Error processing 1-37.dcm: There are nonzero values in image
Error processing 1-38.dcm: There are nonzero values in image
Missing directory: /Users/arjunmoorthy/24Spr_AMoorthy_healthai/data/new_train_data_segs/non_cancer/Prostate-MRI-US-Biopsy-1135/biopsy_9130_non_cancer, skipping...
Original DICOM Path: /Users/arjunmoorthy/24Spr_AMoorthy_healthai/data/new_train_data/non_cancer/Prostate-MRI-US-Biopsy-0645/biopsy_5600_non_cancer/1-42.dcm
Output Path: /Users/arjunmoorthy/24Spr_AMoorthy_healthai/data/applied_segs_final
New DICOM Path (after replacement): /Users/arjunmoorthy/24Spr_AMoorthy_healthai/data/applied_segs_final/non_cancer/Prostate-MRI-US-Biopsy-0645/biopsy_5600_non_cancer/1-42.dcm
Created directory: /Users/arjunmoorthy/24Spr_AMoorthy_healthai/data/applied_segs_final/non_cancer/Prostate-MRI-US-Biopsy-0645/biopsy_5600_non_cancer
Saved processed DICOM to /Users/

In [None]:
import pydicom
from PIL import Image
import numpy as np
import os

# Load DICOM image
# NOTE(review): both paths below look like placeholders - point them at real
# files before running this cell.
dicom_path = 'path_to_dicom_image.dcm'
dicom_image = pydicom.dcmread(dicom_path).pixel_array

# Load segmentation mask
mask_path = 'path_to_mask_image.png'
mask_image = np.array(Image.open(mask_path).convert('L'))  # Convert to grayscale
# Binary mask (1 for prostate, 0 elsewhere), kept in the DICOM pixel dtype so
# the product below is not upcast to the platform integer type.
mask_image = (mask_image > 128).astype(dicom_image.dtype)

# Apply the mask to the DICOM image
prostate_roi = dicom_image * mask_image

from PIL import Image

def load_and_mask_image(dicom_path, mask_path):
    """Zero out every DICOM pixel that lies outside a thresholded mask image.

    Args:
        dicom_path: Path of the DICOM file whose ``pixel_array`` is masked.
        mask_path: Path of the mask image. It is converted to 8-bit
            grayscale, resized (nearest neighbour) to the DICOM image size
            and thresholded: values above 128 keep the pixel, all others
            zero it.

    Returns:
        The masked pixel array, in the same dtype as the DICOM pixel array.
    """
    # Load DICOM image
    dicom_image = pydicom.dcmread(dicom_path).pixel_array

    # Load the mask as grayscale and resize it to the DICOM dimensions.
    # PIL sizes are (width, height), hence the reversed numpy shape. The
    # context manager closes the file once the pixels have been copied out.
    with Image.open(mask_path) as mask_file:
        resized_mask = mask_file.convert('L').resize(dicom_image.shape[::-1], Image.NEAREST)
        mask_values = np.array(resized_mask)

    # Threshold to a 0/1 mask in the DICOM dtype. A plain ``int`` mask would
    # upcast the product, and the bytes later written back into the DICOM
    # would no longer match the pixel format declared in its header.
    binary_mask = (mask_values > 128).astype(dicom_image.dtype)

    # Apply the mask
    prostate_roi = dicom_image * binary_mask

    return prostate_roi

def crop_to_bbox(image, mask):
    """Return the part of ``image`` inside the bounding box of ``mask``'s nonzero entries.

    The box is computed from the first two axes of ``mask``. If the mask has
    no nonzero entries, ``image`` itself is returned.
    """
    hits = np.where(mask)
    if hits[0].size == 0 or hits[1].size == 0:
        return image  # Empty mask: nothing to crop to

    # Inclusive index ranges along rows and columns.
    row_span = slice(np.min(hits[0]), np.max(hits[0]) + 1)
    col_span = slice(np.min(hits[1]), np.max(hits[1]) + 1)
    return image[row_span, col_span]

def save_image(image_array, original_dicom_path, output_path,
               original_root='/Users/arjunmoorthy/Desktop/Research_Capstone/ImageData/new_train_data'):
    """Save a 2-D ``image_array`` as the pixel data of a copy of a DICOM file.

    The copy is written under ``output_path`` at the same relative location
    the source file has under ``original_root``. The source header is reused,
    with ``Rows``/``Columns`` updated to the shape of ``image_array``.

    Args:
        image_array: Processed 2-D pixel array, or ``None`` to skip saving.
        original_dicom_path: Path of the source DICOM file.
        output_path: Root directory of the output tree.
        original_root: Root directory of the input tree that
            ``original_dicom_path`` lives under.

    Raises:
        ValueError: If ``original_dicom_path`` is not inside
            ``original_root``, or if the derived output path is the source
            file itself. A plain string replacement would silently leave the
            path unchanged in that situation and overwrite the original DICOM.
    """
    if image_array is None:
        print("No image array to process; returning without saving.")
        return

    # Re-root the file: keep its location relative to the input tree.
    relative_path = os.path.relpath(original_dicom_path, original_root)
    if relative_path == os.pardir or relative_path.startswith(os.pardir + os.sep):
        raise ValueError(
            f"{original_dicom_path} is not inside {original_root}; "
            "cannot derive an output location for it."
        )
    new_dicom_path = os.path.join(output_path, relative_path)
    if os.path.abspath(new_dicom_path) == os.path.abspath(original_dicom_path):
        raise ValueError(
            f"Refusing to overwrite the source DICOM at {original_dicom_path}"
        )

    # Load the original DICOM file to keep its header/meta information
    ds = pydicom.dcmread(original_dicom_path)

    # Write the pixels in the dtype the file already stores, so the byte
    # count matches the header even when the caller hands over a wider array.
    # NOTE(review): assumes the values fit the stored dtype (true when they
    # derive from the original pixels) and that the file is uncompressed.
    stored_dtype = ds.pixel_array.dtype
    ds.PixelData = image_array.astype(stored_dtype, copy=False).tobytes()

    # Adjust the data element values related to the image dimensions
    ds.Rows, ds.Columns = image_array.shape

    # Ensure the directory exists where the new DICOM will be saved
    os.makedirs(os.path.dirname(new_dicom_path), exist_ok=True)

    # Save the new DICOM file
    ds.save_as(new_dicom_path)
    print(f"Saved processed DICOM to {new_dicom_path}")

def process_directory(root_directory, mask_root_directory, base_output_dir):
    """Mask, crop and save every DICOM under ``root_directory``.

    Both trees are walked three levels deep (status / patient / tract). For
    each ``*.dcm`` file in a tract directory, the ``*.png`` with the same stem
    in the mirrored mask directory is applied with ``load_and_mask_image``,
    the result is cropped with ``crop_to_bbox`` (using the masked image as
    its own mask) and written with ``save_image``.

    Args:
        root_directory: Root of the DICOM tree.
        mask_root_directory: Root of the mask tree, mirroring the DICOM tree.
        base_output_dir: Output root handed to ``save_image``.

    Directories missing from the mask tree and DICOM files without a mask are
    reported and skipped. Exceptions raised while processing a file propagate
    to the caller.
    """
    for status in os.listdir(root_directory):
        if status == '.DS_Store':  # Skip .DS_Store files at status level
            continue
        status_path = os.path.join(root_directory, status)
        mask_status_path = os.path.join(mask_root_directory, status)

        # Check if the mask status path exists
        if not os.path.isdir(mask_status_path):
            print(f"Missing directory: {mask_status_path}, skipping...")
            continue

        for patient in os.listdir(status_path):
            if patient == '.DS_Store':  # Skip .DS_Store files at patient level
                continue
            patient_path = os.path.join(status_path, patient)
            mask_patient_path = os.path.join(mask_status_path, patient)

            # Check if the mask patient path exists
            if not os.path.isdir(mask_patient_path):
                print(f"Missing directory: {mask_patient_path}, skipping...")
                continue

            for tract in os.listdir(patient_path):
                if tract == '.DS_Store':  # Skip .DS_Store files at tract level
                    continue
                tract_path = os.path.join(patient_path, tract)
                mask_tract_path = os.path.join(mask_patient_path, tract)

                # Check if the mask tract path exists
                if not os.path.isdir(mask_tract_path):
                    print(f"Missing directory: {mask_tract_path}, skipping...")
                    continue

                dicom_files = [f for f in os.listdir(tract_path) if f.endswith('.dcm')]
                mask_files = {f for f in os.listdir(mask_tract_path) if f.endswith('.png')}

                for dicom_file in dicom_files:
                    # Swap only the extension; str.replace would also rewrite
                    # a '.dcm' that appears in the middle of the file name.
                    mask_file = os.path.splitext(dicom_file)[0] + '.png'
                    # Built before the membership test so the skip message
                    # below always reports the path of *this* file's mask.
                    mask_path = os.path.join(mask_tract_path, mask_file)
                    if mask_file not in mask_files:
                        print(f"Skipping {dicom_file}, no corresponding mask file found at {mask_path}.")
                        continue

                    dicom_path = os.path.join(tract_path, dicom_file)
                    prostate_roi = load_and_mask_image(dicom_path, mask_path)
                    cropped_image = crop_to_bbox(prostate_roi, prostate_roi)
                    save_image(cropped_image, dicom_path, base_output_dir)

# Input DICOM tree, mask tree (where the masks are stored) and the new root
# directory for the segmented images - all siblings inside ImageData.
root_directory, mask_root_directory, output_directory = (
    '/Users/arjunmoorthy/Desktop/Research_Capstone/ImageData/' + folder
    for folder in ('new_train_data', 'new_train_data_segs', 'applied_segs_final')
)
process_directory(
    root_directory=root_directory,
    mask_root_directory=mask_root_directory,
    base_output_dir=output_directory,
)

In [None]:
import pydicom
import matplotlib.pyplot as plt

def display_dicom_image(dicom_path):
    """Show the pixel data of one DICOM file as a grayscale matplotlib image.

    Args:
        dicom_path: Path of the DICOM file to read and display.
    """
    # Read the file and pull out its decoded pixel array in one step.
    pixels = pydicom.dcmread(dicom_path).pixel_array

    # Grayscale rendering with a fixed title and no axis ticks or labels.
    plt.imshow(pixels, cmap='gray')
    plt.title('DICOM Image Display')
    plt.axis('off')
    plt.show()

# DICOM file to preview: one slice under the applied_segs_final tree.
dicom_path = (
    '/Users/arjunmoorthy/24Spr_AMoorthy_healthai/data/applied_segs_final'
    '/non_cancer/Prostate-MRI-US-Biopsy-0001/biopsy_1_non_cancer/1-17.dcm'
)

# Render it.
display_dicom_image(dicom_path=dicom_path)
