In [None]:
import torch
from torchvision import transforms
from PIL import Image
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
import os
import pydicom


In [None]:


# Channel statistics and target size shared by both pipelines below. They are
# defined once so the evaluation and training pipelines cannot drift apart if
# the values are later tuned (e.g. replaced by mammogram-specific statistics).
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]
TARGET_SIZE = 224  # side length, in pixels, of the square image produced

# Deterministic preprocessing: resize, convert to a tensor, normalize.
image_preprocess = transforms.Compose([
    transforms.Resize((TARGET_SIZE, TARGET_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Randomized preprocessing for training: geometric and photometric jitter
# followed by the same tensor conversion and normalization as above.
augmentation_transforms = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    # The random crop is what brings the image to TARGET_SIZE in this pipeline;
    # there is no separate Resize step.
    transforms.RandomResizedCrop(TARGET_SIZE, scale=(0.8, 1.0)),
    # NOTE(review): dicom_to_pil produces an RGB image by replicating one
    # grayscale channel, so the saturation/hue jitter presumably has little or
    # no visible effect on these inputs -- confirm before relying on it.
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # shift by up to 10% per axis
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Function to convert DICOM to PIL image
def dicom_to_pil(dicom_path):
    """Load a DICOM file and return its pixel data as an 8-bit RGB PIL image.

    The pixel values are min-max rescaled to the range 0-255, truncated to
    ``uint8`` and converted to RGB so the result can be fed to the
    three-channel transform pipelines.

    Args:
        dicom_path: Path of the DICOM file, passed to ``pydicom.dcmread``.

    Returns:
        The image returned by ``Image.fromarray(...).convert('RGB')``.

    Raises:
        Whatever ``pydicom.dcmread`` or pixel decoding raises for an
        unreadable file; callers in this notebook catch and log it.
    """
    dicom_img = pydicom.dcmread(dicom_path)

    # Rescale in floating point. Doing the subtraction in the source integer
    # dtype can overflow (e.g. int16 data spanning negative and positive
    # values), which silently corrupts the image.
    # NOTE(review): assumes a single-frame, single-channel image -- confirm.
    # NOTE(review): PhotometricInterpretation is not inspected, so MONOCHROME1
    # files would presumably come out inverted -- confirm against the dataset.
    pixel_array = dicom_img.pixel_array.astype(np.float64)

    lowest = pixel_array.min()
    value_range = pixel_array.max() - lowest
    if value_range > 0:
        pixel_array = (pixel_array - lowest) / value_range * 255
    else:
        # A constant image has no dynamic range; dividing by zero would yield
        # NaN and an undefined uint8 cast, so emit an all-black image instead.
        pixel_array = np.zeros_like(pixel_array)
    pixel_array = pixel_array.astype('uint8')

    # Replicate the channel to RGB for the three-channel normalization that follows.
    return Image.fromarray(pixel_array).convert('RGB')

# Function to preprocess a DICOM image (with optional augmentation)
def preprocess_dicom_image(dicom_path, augment=False):
    """Turn one DICOM file into a tensor ready for the model.

    Args:
        dicom_path: Path of the DICOM file to load via ``dicom_to_pil``.
        augment: When truthy, run ``augmentation_transforms`` (randomized);
            otherwise run the deterministic ``image_preprocess`` pipeline.

    Returns:
        Whatever the selected transform pipeline returns for the image.
    """
    source_image = dicom_to_pil(dicom_path)
    pipeline = augmentation_transforms if augment else image_preprocess
    return pipeline(source_image)

# Function to iterate over a folder of DICOM files and preprocess each image
def process_dicom_folder(base_dir, augment=False):
    """Preprocess every DICOM file found under ``base_dir`` (recursively).

    Files are matched by a case-insensitive ``.dcm`` extension and visited in
    sorted order, so the returned lists have the same order on every run.
    A file that fails to load or preprocess is reported on stdout and skipped.

    NOTE: this notebook defines ``process_dicom_folder`` again in a later
    cell; once that cell runs, the later definition replaces this one.

    Args:
        base_dir: Root directory to search.
        augment: Forwarded to ``preprocess_dicom_image``.

    Returns:
        A tuple ``(preprocessed_images, image_paths)`` of two equally long
        lists: the preprocessed results and the paths they came from.
    """
    preprocessed_images = []
    image_paths = []

    for root, dirs, files in os.walk(base_dir):
        # Sorting dirs in place fixes the order in which os.walk descends.
        dirs.sort()
        for file_name in sorted(files):
            # Case-insensitive so that e.g. "IMG001.DCM" is not skipped.
            if not file_name.lower().endswith('.dcm'):
                continue
            dicom_file_path = os.path.join(root, file_name)

            try:
                print(f"Processing: {dicom_file_path}")  # Debug
                preprocessed_image = preprocess_dicom_image(dicom_file_path, augment=augment)
            except Exception as e:
                # Best effort: one unreadable file must not abort the whole folder.
                print(f"Error processing {dicom_file_path}: {e}")
            else:
                preprocessed_images.append(preprocessed_image)
                image_paths.append(dicom_file_path)

    return preprocessed_images, image_paths

# Function to save preprocessed images
def save_preprocessed_images(base_dir, output_dir, augment=False):
    """Preprocess every DICOM file under ``base_dir`` and save each as a PNG.

    Each file is run through ``preprocess_dicom_image``; the normalization is
    then undone so the saved PNG is a viewable image. PNGs are written flat
    into ``output_dir`` and named after the source file. If two source files
    in different subfolders share a name, later ones get a ``_1``, ``_2``, ...
    suffix instead of overwriting the first.

    NOTE: a later cell in this notebook defines another
    ``save_preprocessed_images`` with a different signature; once that cell
    runs, it replaces this function.

    Args:
        base_dir: Root directory searched recursively for ``.dcm`` files
            (extension matched case-insensitively).
        output_dir: Directory that receives the PNG files; created if missing.
        augment: Forwarded to ``preprocess_dicom_image``.

    Returns:
        None. Progress and per-file errors are printed to stdout.
    """
    # Same statistics as the Normalize step of both transform pipelines in
    # this notebook, shaped to broadcast over (channels, height, width).
    channel_mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    channel_std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
    to_pil = transforms.ToPILImage()
    used_names = set()  # lower-cased, so case-insensitive filesystems are safe too

    os.makedirs(output_dir, exist_ok=True)
    for root, dirs, files in os.walk(base_dir):
        dirs.sort()  # deterministic traversal order
        for file_name in sorted(files):
            if not file_name.lower().endswith('.dcm'):
                continue
            dicom_file_path = os.path.join(root, file_name)

            try:
                print(f"Processing: {dicom_file_path}")  # Debugging line
                preprocessed_image = preprocess_dicom_image(dicom_file_path, augment=augment)

                # A normalized tensor holds values outside [0, 1]; ToPILImage
                # would scale them by 255 and cast to bytes, producing a
                # corrupted picture. Undo the normalization first.
                displayable = (preprocessed_image * channel_std + channel_mean).clamp(0.0, 1.0)
                preprocessed_image_pil = to_pil(displayable)

                # Pick an output name that has not been used during this call.
                stem = os.path.splitext(file_name)[0]
                output_name = f"{stem}.png"
                duplicate_index = 1
                while output_name.lower() in used_names:
                    output_name = f"{stem}_{duplicate_index}.png"
                    duplicate_index += 1
                used_names.add(output_name.lower())

                output_path = os.path.join(output_dir, output_name)
                preprocessed_image_pil.save(output_path)
                print(f"Saved preprocessed image to: {output_path}")

            except Exception as e:
                # Best effort: report the failure and continue with the next file.
                print(f"Error processing {dicom_file_path}: {e}")



In [None]:
# Base directory containing the DICOM files
base_dir = '/Users/ellaquan/Project/Extracted_DICOM_20'  # Update this path
# Path to save the batched tensor
output_tensor_file = '/Users/ellaquan/Project/batched_tensor.pt'


def save_batched_tensor(batched_tensor, file_path):
    """Write ``batched_tensor`` to ``file_path`` with ``torch.save`` and report it.

    Args:
        batched_tensor: Object to serialize (here, the stacked image batch).
        file_path: Destination path handed to ``torch.save``.
    """
    torch.save(batched_tensor, file_path)
    print(f"Saved batched tensor to: {file_path}")


# process_dicom_folder returns a (tensors, paths) tuple. It must be unpacked:
# binding the whole tuple to one name makes the emptiness check below always
# true and makes torch.stack fail on a tuple of two lists.
preprocessed_images, image_paths = process_dicom_folder(base_dir, augment=True)

# Stack preprocessed images into a batch tensor for model input and save it
if preprocessed_images:
    batched_tensor = torch.stack(preprocessed_images)
    print(f"Batched tensor shape: {batched_tensor.shape}")

    # Save the batched tensor to a file
    save_batched_tensor(batched_tensor, output_tensor_file)

In [None]:
import os
import torch
from torchvision.transforms import ToPILImage

# Input and output locations used by this cell. Both are absolute,
# machine-specific paths and must be edited before running elsewhere.
# base_dir is the root folder that process_dicom_folder searches for .dcm
# files; it repeats the value assigned in the previous cell.
base_dir = '/Users/ellaquan/Project/Extracted_DICOM_20'  # Update this path
output_image_dir = '/Users/ellaquan/Project/Preprocessed_Images'  # Directory to save preprocessed images

# Function to save individual preprocessed images with the same name as input
def save_preprocessed_images(preprocessed_images, image_paths, output_dir,
                             mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Save already-preprocessed image tensors as PNG files.

    Each tensor is written to ``output_dir`` under the base name of its source
    path with a ``.png`` extension. Floating-point tensors are de-normalized
    with ``mean``/``std`` first so the PNG is a viewable image. If several
    source paths share a base name, later ones get a ``_1``, ``_2``, ...
    suffix instead of overwriting the first.

    NOTE: this definition replaces the earlier ``save_preprocessed_images``
    in this notebook, which took ``(base_dir, output_dir, augment)``.

    Args:
        preprocessed_images: Sequence of image tensors; ``None`` or empty
            tensors are skipped with a warning.
        image_paths: Source path for each tensor, index-aligned with
            ``preprocessed_images``.
        output_dir: Directory that receives the PNG files; created if missing.
        mean: Per-channel mean used when the tensors were normalized. The
            default matches the Normalize step of this notebook's pipelines.
            Pass ``None`` (for ``mean`` or ``std``) to save tensors unchanged.
        std: Per-channel standard deviation used when normalizing.

    Returns:
        None. Progress, warnings and save errors are printed to stdout.
    """
    os.makedirs(output_dir, exist_ok=True)
    to_pil = ToPILImage()
    undo_normalize = mean is not None and std is not None
    used_names = set()  # lower-cased, so case-insensitive filesystems are safe too

    for idx, image_tensor in enumerate(preprocessed_images):
        # Skip placeholders and empty tensors
        if image_tensor is None or image_tensor.nelement() == 0:
            print(f"Warning: Empty or invalid image tensor for {image_paths[idx]}")
            continue

        # A normalized tensor holds values outside [0, 1]; ToPILImage would
        # scale them by 255 and cast to bytes, producing a corrupted picture.
        # Integer tensors are left untouched.
        if undo_normalize and image_tensor.is_floating_point():
            channel_mean = torch.as_tensor(
                mean, dtype=image_tensor.dtype, device=image_tensor.device
            ).view(-1, 1, 1)
            channel_std = torch.as_tensor(
                std, dtype=image_tensor.dtype, device=image_tensor.device
            ).view(-1, 1, 1)
            image_tensor = (image_tensor * channel_std + channel_mean).clamp(0.0, 1.0)

        pil_image = to_pil(image_tensor)

        # splitext swaps only the real extension, whatever its case;
        # str.replace would also rewrite ".dcm" in the middle of a name.
        stem = os.path.splitext(os.path.basename(image_paths[idx]))[0]
        output_name = f"{stem}.png"
        duplicate_index = 1
        while output_name.lower() in used_names:
            output_name = f"{stem}_{duplicate_index}.png"
            duplicate_index += 1
        used_names.add(output_name.lower())
        output_path = os.path.join(output_dir, output_name)

        try:
            pil_image.save(output_path)
            print(f"Saved preprocessed image to: {output_path}")
        except Exception as e:
            # Best effort: report the failure and continue with the next image.
            print(f"Error saving image {output_path}: {e}")

# Function to preprocess and return images and their file paths
def process_dicom_folder(base_dir, augment=False):
    """Preprocess every DICOM file found under ``base_dir`` (recursively).

    Files are matched by a case-insensitive ``.dcm`` extension and visited in
    sorted order, so the returned lists have the same order on every run.
    A file that fails to load or preprocess is reported on stdout and skipped.

    NOTE: this re-defines the ``process_dicom_folder`` from an earlier cell of
    this notebook; whichever definition ran last is the one in effect.

    Args:
        base_dir: Root directory to search.
        augment: Forwarded to ``preprocess_dicom_image``.

    Returns:
        A tuple ``(preprocessed_images, image_paths)`` of two equally long
        lists: the preprocessed results and the paths they came from.
    """
    preprocessed_images = []  # preprocessed results, in traversal order
    image_paths = []  # source path of each entry above

    for root, dirs, files in os.walk(base_dir):
        # Sorting dirs in place fixes the order in which os.walk descends.
        dirs.sort()
        for file_name in sorted(files):
            # Case-insensitive so that e.g. "IMG001.DCM" is not skipped.
            if not file_name.lower().endswith('.dcm'):
                continue
            dicom_file_path = os.path.join(root, file_name)

            try:
                print(f"Processing: {dicom_file_path}")  # Debugging line
                preprocessed_image = preprocess_dicom_image(dicom_file_path, augment=augment)
            except Exception as e:
                # Best effort: one unreadable file must not abort the whole folder.
                print(f"Error processing {dicom_file_path}: {e}")
            else:
                preprocessed_images.append(preprocessed_image)
                image_paths.append(dicom_file_path)

    return preprocessed_images, image_paths

# Run the deterministic (non-augmented) pipeline over every DICOM file under base_dir
preprocessed_images, image_paths = process_dicom_folder(base_dir, augment=False)

# Write the results as PNGs named after their source files, or report that
# nothing could be preprocessed
if preprocessed_images:
    save_preprocessed_images(preprocessed_images, image_paths, output_image_dir)
else:
    print("No images were preprocessed.")