# mini-MIAS data preprocessing

# Important libraries

In [12]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import glob
from scipy import ndimage
from sklearn.linear_model import RANSACRegressor

# Preprocessing class with all needed functions

In [13]:
class MammogramPreprocessor:
    def __init__(self, image_dir, metadata_path, output_dir, img_size=(1024, 1024)):
        """
        Initialize the mammogram preprocessor.
        
        Args:
            image_dir: Directory containing the mini-MIAS images
            metadata_path: Path to the CSV file with metadata
            output_dir: Directory to save processed images
            img_size: Target size for the processed images
        """
        self.image_dir = image_dir
        self.metadata_path = metadata_path
        self.output_dir = output_dir
        self.img_size = img_size
        
        # Create output directories if they don't exist
        os.makedirs(os.path.join(output_dir, "processed_images"), exist_ok=True)
        os.makedirs(os.path.join(output_dir, "roi_images"), exist_ok=True)
        
        # Load metadata
        self.metadata = pd.read_csv(metadata_path)
        
        # Create a dictionary for quick lookup of image metadata
        self.image_metadata = {}
        for _, row in self.metadata.iterrows():
            ref_num = row['REFNUM']
            if ref_num not in self.image_metadata:
                self.image_metadata[ref_num] = []
            
            # Only add entries with abnormalities (those with coordinates)
            if not pd.isna(row['X']) and not pd.isna(row['Y']) and not pd.isna(row['RADIUS']):
                self.image_metadata[ref_num].append({
                    'x': int(row['X']),
                    'y': int(row['Y']),
                    'radius': int(row['RADIUS']),
                    'class': row['CLASS'],
                    'severity': row['SEVERITY'] if not pd.isna(row['SEVERITY']) else None,
                    'bg': row['BG']  # Breast density type
                })

    
    # loading an image and grayscaling it if necessary
    def load_image(self, image_path):
        """Load an image and convert to grayscale if needed."""
        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not load image: {image_path}")
        
        return img


    # Noise removal
    # Background removal 
    def remove_background(self, img):
        """Extract breast region mask without altering breast pixel values."""
        
        # Ensure grayscale
        if len(img.shape) == 3:
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        else:
            gray = img.copy()
        
        # Keep a clean version of grayscale to use later
        original_gray = gray.copy()
        
        # Use filtered version only for mask creation
        eq = cv2.equalizeHist(gray)
        blurred = cv2.GaussianBlur(eq, (5, 5), 0)
        
        _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        
        # Invert if necessary
        if np.sum(binary == 255) > np.sum(binary == 0):
            binary = cv2.bitwise_not(binary)
        
        # Morphological cleanup
        kernel = np.ones((15, 15), np.uint8)
        binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
        binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)
        
        # Find contours
        contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        # Create mask from largest contour (breast region)
        mask = np.zeros_like(gray)
        if contours:
            largest_contour = max(contours, key=cv2.contourArea)
            cv2.drawContours(mask, [largest_contour], 0, 255, -1)
            
            # Dilate slightly to cover edges
            mask = cv2.dilate(mask, np.ones((3, 3), np.uint8), iterations=1)
            
            # Apply mask to original grayscale
            result = cv2.bitwise_and(original_gray, original_gray, mask=mask)
            return result, mask
        
        # If no contours found, return the original image and a full mask
        return original_gray, np.ones_like(gray) * 255

    # Pectoral muscle removal
    def remove_pectoral_muscle(self, img, mask=None):
        """
        Remove the pectoral muscle from the mammogram with extremely aggressive removal.
        
        This implementation prioritizes complete removal of the pectoral muscle,
        particularly for images with larger pectoral regions.
        
        Args:
            img: Input mammogram image
            mask: Optional breast region mask
            
        Returns:
            Image with pectoral muscle removed
        """
        if img is None:
            return None
        
        # Create a copy of the image
        result = img.copy()
        height, width = img.shape
        
        # Determine if the breast is on the left or right side
        if mask is None:
            # Create a simple mask if none is provided
            _, simple_mask = cv2.threshold(img, 5, 255, cv2.THRESH_BINARY)
            mask = simple_mask
        
        left_sum = np.sum(mask[:, :mask.shape[1]//2])
        right_sum = np.sum(mask[:, mask.shape[1]//2:])
        is_left_breast = left_sum > right_sum
        
        try:
            # Step 1: Define a very large region of interest for pectoral muscle
            # Use a much larger portion of the image to ensure we capture the entire pectoral muscle
            pectoral_height = int(height * 0.74)  # Increased from 0.6 to 0.7
            pectoral_width = int(width * 0.54)    # Increased from 0.4 to 0.5
            
            # Extract the pectoral muscle region
            if is_left_breast:
                pectoral_region = img[0:pectoral_height, 0:pectoral_width]
                x_offset, y_offset = 0, 0
            else:
                pectoral_region = img[0:pectoral_height, width-pectoral_width:width]
                x_offset, y_offset = width-pectoral_width, 0
            
            # Check if pectoral region is valid
            if pectoral_region.size == 0 or np.max(pectoral_region) == 0:
                return img  # Return original image if pectoral region is invalid
            
            # Step 2: Create a very aggressive triangular mask
            # This is our primary approach - a geometric mask that covers the typical pectoral region
            pectoral_mask = np.zeros_like(pectoral_region)
            
            # Calculate the slope for the triangular mask based on image dimensions
            # This creates an extremely aggressive triangle that covers more of the pectoral region
            if is_left_breast:
                # For left breast, create a triangle from top-left corner
                # Use a much more aggressive slope
                for y in range(pectoral_height):
                    # Quadratic function for more aggressive coverage at the top
                    width_at_y = int(pectoral_width * (1 - (y / pectoral_height)))
                    pectoral_mask[y, 0:width_at_y] = 255
            else:
                # For right breast, create a triangle from top-right corner
                for y in range(pectoral_height):
                    # Quadratic function for more aggressive coverage at the top
                    width_at_y = int(pectoral_width * (1 - (y / pectoral_height)))
                    pectoral_mask[y, pectoral_width-width_at_y:pectoral_width] = 255
            
            # Step 3: Apply morphological operations to ensure complete coverage
            kernel = np.ones((5, 5), np.uint8)  # Much larger kernel for aggressive morphology
            pectoral_mask = cv2.morphologyEx(pectoral_mask, cv2.MORPH_DILATE, kernel)
            
            # Step 4: Create a full-sized mask
            full_mask = np.zeros_like(img)
            full_mask[y_offset:y_offset+pectoral_height, x_offset:x_offset+pectoral_width] = pectoral_mask
            
            # Step 5: Apply a straight line boundary for a clean edge
            # Instead of detecting the edge, we'll create a straight line from top to bottom
            # This ensures a consistent, clean removal
            
            # Define the line parameters based on breast orientation
            if is_left_breast:
                top_x = int(width * 0.36)     # Increase from 0.25 to 0.35 or 0.4
                bottom_x = int(width * 0.08)  # Increase from 0.05 to 0.08
            else:
                top_x = int(width * 0.63)     # Decrease from 0.75 to 0.65 or 0.6
                bottom_x = int(width * 0.91)  # Decrease from 0.95 to 0.92
            
            # Create a refined mask with the straight line
            refined_mask = np.zeros_like(img)
            
            # Draw the line and fill the appropriate side
            for y in range(height):
                # Linear interpolation between top_x and bottom_x
                x_line = int(top_x + (bottom_x - top_x) * (y / height))
                
                if is_left_breast:
                    x_range = range(0, min(x_line, width))
                else:
                    x_range = range(max(0, x_line), width)
                
                for x in x_range:
                    refined_mask[y, x] = 255
            
            # Step 6: Combine the triangular mask with the straight line mask
            # Use the most aggressive of the two approaches
            combined_mask = cv2.bitwise_or(full_mask, refined_mask)
            
            # Invert the mask to keep everything except the pectoral muscle
            final_mask = cv2.bitwise_not(combined_mask)
            
            # Step 7: Apply the mask to the original image
            result = cv2.bitwise_and(img, img, mask=final_mask)
            
            return result
            
        except Exception as e:
            print(f"Error in pectoral muscle removal: {e}")
            # Return the original image if any error occurs
            return img


    

    # Filters applied to the mini-MIAS images
    def enhance_contrast(self, img):
        """
        Enhance image contrast using adaptive CLAHE,
        tuned for low-contrast mammograms (like mini-MIAS).
        """
        if img.dtype != np.uint8:
            img = (img * 255).astype(np.uint8)  # Normalize before CLAHE if needed
    
        # Adaptive CLAHE parameters
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(16, 16))  # Slightly higher clipLimit and tile size
    
        enhanced = clahe.apply(img)
        return enhanced

    def normalize_image(self, img):
        """
        Normalize image to [0, 1] range, ensuring robustness.
        """
        img = img.astype(np.float32)
        
        # Normalize only if max > 0
        max_val = np.max(img)
        if max_val > 0:
            img /= max_val
        return img




    # Extract region of interest
    def extract_roi(self, img, x, y, radius, padding=50):
        """Extract a region of interest around a suspicious area."""
        # Add padding to the radius
        padded_radius = radius + padding
        
        # Calculate the bounding box
        x1 = max(0, x - padded_radius)
        y1 = max(0, y - padded_radius)
        x2 = min(img.shape[1], x + padded_radius)
        y2 = min(img.shape[0], y + padded_radius)
        
        # Extract the ROI
        roi = img[y1:y2, x1:x2]
        
        # Resize the ROI to the target size
        roi_resized = cv2.resize(roi, self.img_size)
        
        return roi_resized
    


    
    # All steps in processing an image (the order is important for efficiency and reliability)
    def process_image(self, image_path, save=True):
        """Process a single mammogram image with pectoral removal first."""
        # Extract the image reference number from the filename
        filename = os.path.basename(image_path)
        ref_num = os.path.splitext(filename)[0]
        
        # Load the image
        original_img = self.load_image(image_path)


        
        # Remove background
        img_no_bg, mask = self.remove_background(original_img)
        
        # Remove pectoral muscle
        img_no_pectoral = self.remove_pectoral_muscle(img_no_bg, mask)

        
        
        # Enhance contrast
        img_enhanced = self.enhance_contrast(img_no_pectoral)
        
        # Normalize the image
        img_normalized = self.normalize_image(img_enhanced)

        
        # Resize the image to the target size -> (224 x 224) -> smaller and better results for CNN approach
        img_resized = cv2.resize(img_enhanced, self.img_size)
        
        # Save the processed image
        if save:
            output_path = os.path.join(self.output_dir, "processed_images", f"{ref_num}_processed.png")
            cv2.imwrite(output_path, img_resized)
            
        # Extract ROIs if abnormalities exist
        rois = []
        if ref_num in self.image_metadata and self.image_metadata[ref_num]:
            for abnormality in self.image_metadata[ref_num]:
                x, y, radius = abnormality['x'], abnormality['y'], abnormality['radius']
                roi = self.extract_roi(img_enhanced, x, y, radius)
                
                if save:
                    roi_path = os.path.join(self.output_dir, "roi_images", 
                                           f"{ref_num}_roi_x{x}_y{y}_r{radius}.png")
                    cv2.imwrite(roi_path, roi)
                
                rois.append({
                    'roi': roi,
                    'metadata': abnormality
                })
        
        return {
            'ref_num': ref_num,
            'processed_image': img_resized,
            'rois': rois,
            'has_abnormality': len(rois) > 0
        }

    
    def process_all_images(self):
        """Process all images in the dataset."""
        image_paths = glob.glob(os.path.join(self.image_dir, "*.png"))
        if not image_paths:
            image_paths = glob.glob(os.path.join(self.image_dir, "*.pgm"))
        
        results = []
        for image_path in tqdm(image_paths, desc="Processing images"):
            try:
                result = self.process_image(image_path)
                results.append(result)
            except Exception as e:
                print(f"Error processing {image_path}: {e}")
        
        return results
    
    def create_dataset(self):
        """Create a dataset for training a CNN with ROI masks included."""
        processed_data = self.process_all_images()
    
        X, y, masks = [], [], []
    
        for case in processed_data:
            # Base processed image
            X.append(case['processed_image'])
            y.append(0 if not case['has_abnormality'] else 1)
    
            # For normal cases → no mask
            if not case['has_abnormality']:
                masks.append(np.zeros_like(case['processed_image'], dtype=np.uint8))
            else:
                # Build a mask with abnormalities
                mask = np.zeros_like(case['processed_image'], dtype=np.uint8)
                for roi_data in case['rois']:
                    x, y_c, r = roi_data['metadata']['x'], roi_data['metadata']['y'], roi_data['metadata']['radius']
                    # Scale coordinates if resized
                    scale_x = self.img_size[0] / case['processed_image'].shape[1]
                    scale_y = self.img_size[1] / case['processed_image'].shape[0]
                    center = (int(x * scale_x), int(y_c * scale_y))
                    radius = int(r * scale_x)  # assuming square pixels
                    cv2.circle(mask, center, radius, 255, -1)
                masks.append(mask)

            # Also add ROI crops as extra training samples
            for roi_data in case['rois']:
                roi = roi_data['roi']
                roi_mask = np.ones_like(roi, dtype=np.uint8) * 255  # ROI fully relevant
                X.append(roi)
                if roi_data['metadata']['severity'] == 'M':
                    y.append(2)  # Malignant
                else:
                    y.append(1)  # Benign
                masks.append(roi_mask)
    
        X, y, masks = np.array(X), np.array(y), np.array(masks)
    
        # Train-val split (keep masks aligned)
        X_train, X_val, y_train, y_val, masks_train, masks_val = train_test_split(
            X, y, masks, test_size=0.2, random_state=42, stratify=y
        )
    
        return X_train, X_val, y_train, y_val, masks_train, masks_val

    


In [14]:
def data_augmentation(X_train, y_train, masks=None):
    """
    Apply data augmentation to images (and masks if provided).
    Masks are binary (0/255).
    """
    X_augmented, y_augmented = [], []
    mask_augmented = [] if masks is not None else None

    for i in range(len(X_train)):
        img, label = X_train[i], y_train[i]
        mask = masks[i] if masks is not None else None

        # Define per-class augmentation factor
        if label == 0:
            aug_factor = 4
        elif label == 2:
            aug_factor = 3
        else:
            aug_factor = 1

        # Keep original
        X_augmented.append(img)
        y_augmented.append(label)
        if mask is not None:
            mask_augmented.append(mask)

        # Apply augmentations
        for _ in range(aug_factor):
            aug_type = np.random.choice(['flip', 'rotate', 'zoom', 'noise', 'brightness'])
            img_2d = img.copy()
            mask_2d = mask.copy() if mask is not None else None

            if aug_type == 'flip':
                aug_img = cv2.flip(img_2d, 1)
                aug_mask = cv2.flip(mask_2d, 1) if mask is not None else None

            elif aug_type == 'rotate':
                angle = np.random.uniform(-15, 15)
                aug_img = ndimage.rotate(img_2d, angle, reshape=False)
                aug_mask = ndimage.rotate(mask_2d, angle, reshape=False) if mask is not None else None

            elif aug_type == 'zoom':
                zoom_factor = np.random.uniform(0.9, 1.1)
                h, w = img_2d.shape[:2]
                new_h, new_w = int(h * zoom_factor), int(w * zoom_factor)
                aug_img = cv2.resize(img_2d, (new_w, new_h))
                aug_mask = cv2.resize(mask_2d, (new_w, new_h), interpolation=cv2.INTER_NEAREST) if mask is not None else None

                # Crop/pad to original size
                if zoom_factor < 1.0:
                    pad_h = (h - new_h) // 2
                    pad_w = (w - new_w) // 2
                    aug_img = cv2.copyMakeBorder(aug_img, pad_h, h - new_h - pad_h,
                                                 pad_w, w - new_w - pad_w,
                                                 cv2.BORDER_CONSTANT, value=0)
                    if mask is not None:
                        aug_mask = cv2.copyMakeBorder(aug_mask, pad_h, h - new_h - pad_h,
                                                     pad_w, w - new_w - pad_w,
                                                     cv2.BORDER_CONSTANT, value=0)
                else:
                    start_h = (new_h - h) // 2
                    start_w = (new_w - w) // 2
                    aug_img = aug_img[start_h:start_h + h, start_w:start_w + w]
                    if mask is not None:
                        aug_mask = aug_mask[start_h:start_h + h, start_w:start_w + w]

            elif aug_type == 'noise':
                noise = np.random.normal(0, np.random.uniform(3, 8), img_2d.shape)
                aug_img = np.clip(img_2d.astype(np.float32) + noise, 0, 255).astype(np.uint8)
                aug_mask = mask_2d

            elif aug_type == 'brightness':
                factor = np.random.uniform(0.9, 1.1)
                aug_img = np.clip(img_2d.astype(np.float32) * factor, 0, 255).astype(np.uint8)
                aug_mask = mask_2d

            # Append
            X_augmented.append(aug_img)
            y_augmented.append(label)
            if mask is not None:
                # Binarize to avoid gray artifacts
                aug_mask = (aug_mask > 127).astype(np.uint8) * 255
                mask_augmented.append(aug_mask)

    if masks is not None:
        return np.array(X_augmented), np.array(y_augmented), np.array(mask_augmented)
    else:
        return np.array(X_augmented), np.array(y_augmented)


In [17]:
# Define paths
image_dir = "C:/Users/DragosTrandafiri/BreastCancer_CNN/data/raw/mini_mias/MIAS" 
metadata_path = "C:/Users/DragosTrandafiri/BreastCancer_CNN/data/raw/mini_mias/mias_info.csv"  
output_dir = "C:/Users/DragosTrandafiri/BreastCancer_CNN/data/processed/mini_mias"

# Create preprocessor
preprocessor = MammogramPreprocessor(image_dir, metadata_path, output_dir)


# Create dataset
X_train, X_val, y_train, y_val, masks_train, masks_val = preprocessor.create_dataset()

# Function to print class distribution
def print_class_distribution(y, dataset_name):
    """Print the distribution of classes in the dataset"""
    unique, counts = np.unique(y, return_counts=True)
    class_names = {0: 'Normal', 1: 'Benign', 2: 'Malignant'}
    
    print(f"\n{dataset_name} Class Distribution:")
    print("-" * 40)
    total = len(y)
    for class_id, count in zip(unique, counts):
        class_name = class_names.get(class_id, f'Class_{class_id}')
        percentage = (count / total) * 100
        print(f"{class_name} (Class {class_id}): {count} samples ({percentage:.1f}%)")
    print(f"Total samples: {total}")
    print("-" * 40)

# Print original distributions
print("="*60)
print("ORIGINAL DATASET (BEFORE AUGMENTATION)")
print("="*60)

print_class_distribution(y_train, "Training Set")
print_class_distribution(y_val, "Validation Set")

# Combine train and val to show overall distribution
y_total = np.concatenate([y_train, y_val])
print_class_distribution(y_total, "Complete Dataset")

# Apply data augmentation
print("\n" + "="*60)
print("APPLYING DATA AUGMENTATION...")
print("="*60)

X_train_aug, y_train_aug, masks_train_aug = data_augmentation(X_train, y_train, masks=masks_train)

# Print augmented distributions
print("\n" + "="*60)
print("AUGMENTED DATASET (AFTER AUGMENTATION)")
print("="*60)

print_class_distribution(y_train_aug, "Augmented Training Set")
print_class_distribution(y_val, "Validation Set (unchanged)")

# Show augmentation effect
print("\n" + "="*60)
print("AUGMENTATION SUMMARY")
print("="*60)

print(f"Original training samples: {len(y_train)}")
print(f"Augmented training samples: {len(y_train_aug)}")
print(f"Augmentation factor: {len(y_train_aug) / len(y_train):.1f}x")

# Show per-class augmentation effect
print("\nPer-class augmentation effect:")
print("-" * 40)
for class_id in [0, 1, 2]:
    original_count = np.sum(y_train == class_id)
    augmented_count = np.sum(y_train_aug == class_id)
    if original_count > 0:
        factor = augmented_count / original_count
        class_names = {0: 'Normal', 1: 'Benign', 2: 'Malignant'}
        class_name = class_names.get(class_id, f'Class_{class_id}')
        print(f"{class_name}: {original_count} → {augmented_count} ({factor:.1f}x)")

print(f"\nDataset shapes:")
print(f"Original training set: {X_train.shape}, {y_train.shape}, masks: {masks_train.shape}")
print(f"Augmented training set: {X_train_aug.shape}, {y_train_aug.shape}, masks: {masks_train_aug.shape}")
print(f"Validation set: {X_val.shape}, {y_val.shape}, masks: {masks_val.shape}")

# Additional statistics
print("\n" + "="*60)
print("ADDITIONAL STATISTICS")
print("="*60)

# Calculate class balance metrics
def calculate_balance_metrics(y):
    unique, counts = np.unique(y, return_counts=True)
    max_count = np.max(counts)
    min_count = np.min(counts)
    balance_ratio = min_count / max_count
    return balance_ratio

original_balance = calculate_balance_metrics(y_train)
augmented_balance = calculate_balance_metrics(y_train_aug)

print(f"Original training set balance ratio: {original_balance:.3f}")
print(f"Augmented training set balance ratio: {augmented_balance:.3f}")
print(f"Balance improvement: {(augmented_balance/original_balance):.2f}x better")

# Show memory usage
def get_memory_usage_mb(array):
    return array.nbytes / (1024 * 1024)

print(f"\nMemory usage:")
print(f"Original training images: {get_memory_usage_mb(X_train):.1f} MB")
print(f"Augmented training images: {get_memory_usage_mb(X_train_aug):.1f} MB")
print(f"Training masks: {get_memory_usage_mb(masks_train_aug):.1f} MB")
print(f"Total training data: {get_memory_usage_mb(X_train_aug) + get_memory_usage_mb(masks_train_aug):.1f} MB")

Processing images: 100%|█████████████████████████████████████████████████████████████| 235/235 [00:12<00:00, 18.86it/s]


ORIGINAL DATASET (BEFORE AUGMENTATION)

Training Set Class Distribution:
----------------------------------------
Normal (Class 0): 124 samples (48.4%)
Benign (Class 1): 104 samples (40.6%)
Malignant (Class 2): 28 samples (10.9%)
Total samples: 256
----------------------------------------

Validation Set Class Distribution:
----------------------------------------
Normal (Class 0): 31 samples (48.4%)
Benign (Class 1): 26 samples (40.6%)
Malignant (Class 2): 7 samples (10.9%)
Total samples: 64
----------------------------------------

Complete Dataset Class Distribution:
----------------------------------------
Normal (Class 0): 155 samples (48.4%)
Benign (Class 1): 130 samples (40.6%)
Malignant (Class 2): 35 samples (10.9%)
Total samples: 320
----------------------------------------

APPLYING DATA AUGMENTATION...

AUGMENTED DATASET (AFTER AUGMENTATION)

Augmented Training Set Class Distribution:
----------------------------------------
Normal (Class 0): 620 samples (66.0%)
Benign (Clas

In [1]:
!pip install opencv-python
!pip install numpy pandas matplotlib scikit-learn tqdm scipy

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip
