# Image Processing with Labels

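This notebook overlays SVG label graphics (e.g. "Ovary", "Prostate") onto patient images, choosing the label for each image from the metadata in an Excel file, and saves the results as compressed PNG files.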

In [1]:
import os
import pandas as pd
from PIL import Image
import io
import cairosvg
import logging
import numpy as np
from tqdm.notebook import tqdm  # Better progress tracking in notebooks
from concurrent.futures import ThreadPoolExecutor  # Using threads instead of processes
import warnings
warnings.filterwarnings('ignore')

# Show INFO-level (and above) log records in the notebook output.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Set up paths
# NOTE(review): `path` is a machine-specific absolute path; it must be edited
# before this notebook can run on another computer.
path = "C:/Users/janni/OneDrive/Dokumente/PostDoc/Projects/Patho Prompt Injection/Second_Dataset"
# Excel workbook that is later loaded with pd.read_excel.
input_file = os.path.join(path, "Patients_2_Metadata_long.xlsx")
# Base output directory; process_images_batch writes into a sibling folder
# named "<output_folder>_a<alpha>" rather than into this folder itself.
output_folder = os.path.join(path, "output_images")
os.makedirs(output_folder, exist_ok=True)

In [2]:
def create_label_dict(path):
    """Map every known label name to the SVG file expected inside ``path``.

    Each expected file is checked for existence and a one-line summary is
    printed: either the labels whose SVG is missing, or a confirmation that
    all were found. The returned dict always contains every label, whether
    or not its file exists.

    Args:
        path: Directory that should contain ``<label>.svg`` for each label.

    Returns:
        dict: label name -> full path of its SVG file.
    """
    label_names = ("Ovary", "Prostate")
    label_dict = {name: os.path.join(path, f"{name}.svg") for name in label_names}

    # Collect the labels whose SVG cannot be found on disk.
    missing_files = []
    for label, file_path in label_dict.items():
        if not os.path.exists(file_path):
            missing_files.append(label)

    if not missing_files:
        print("All SVG files found successfully.")
    else:
        print(f"Warning: The following SVG files are missing: {', '.join(missing_files)}")

    return label_dict

# Create the label dictionary
# (label name -> SVG path; the SVG files are looked up directly inside `path`)
label_dict = create_label_dict(path)

All SVG files found successfully.


In [3]:
def find_whitest_area(image, grid_size=(3, 3)):
    """Find the whitest area in the image using a grid-based approach.

    The image is divided into ``grid_size`` equally sized cells (remainder
    pixels at the right/bottom edge are ignored) and the cell with the
    highest mean brightness is selected.

    Args:
        image: A PIL image or any array-like of shape (H, W) or (H, W, C).
            When there are exactly four channels, the last one is treated
            as alpha and ignored.
        grid_size: ``(rows, cols)`` of the grid laid over the image.

    Returns:
        tuple: ``(x, y, cell_width, cell_height)`` where ``(x, y)`` is the
        top-left pixel of the brightest cell. Ties are resolved in favour of
        the first cell in row-major order; an all-black image yields cell (0, 0).
    """
    # np.asarray accepts PIL images, nested lists and ndarrays alike
    # (and is a no-op for an ndarray).
    pixels = np.asarray(image)

    if pixels.ndim == 2:
        # Single-channel (grayscale) input: the pixel value is the whiteness.
        whiteness = pixels.astype(float)
    else:
        # If image is RGBA, convert to RGB by removing alpha channel
        if pixels.shape[2] == 4:
            pixels = pixels[:, :, :3]
        # Calculate the "whiteness" of each pixel (higher value = whiter)
        whiteness = np.mean(pixels, axis=2)

    height, width = whiteness.shape
    rows, cols = grid_size
    cell_height, cell_width = height // rows, width // cols

    max_whiteness = 0
    best_cell = (0, 0)

    for i in range(rows):
        for j in range(cols):
            cell = whiteness[i * cell_height:(i + 1) * cell_height,
                             j * cell_width:(j + 1) * cell_width]
            # Images smaller than the grid produce empty cells; their mean
            # would be NaN, so they are skipped.
            if cell.size == 0:
                continue
            cell_whiteness = np.mean(cell)
            if cell_whiteness > max_whiteness:
                max_whiteness = cell_whiteness
                best_cell = (i, j)

    # Calculate the position for the label within the whitest cell
    x = best_cell[1] * cell_width
    y = best_cell[0] * cell_height

    return x, y, cell_width, cell_height

In [6]:



def compress_image(image, max_size_bytes=3*1024*1024):
    """Return a PNG-encoded copy of ``image`` that fits in ``max_size_bytes``.

    PNG is lossless, so the only way to reduce the encoded size is to reduce
    the pixel dimensions. The image is encoded once; if that is too large it
    is downscaled (always from the original, to avoid compounding resampling
    blur) and re-encoded until it fits or has shrunk to 1x1 pixels.

    Args:
        image: PIL image to encode.
        max_size_bytes: Upper bound for the encoded PNG size in bytes.

    Returns:
        A PIL image opened from the in-memory PNG data. If even a 1x1 image
        exceeds the limit, that 1x1 image is returned (best effort).
    """
    def _encode(img):
        # `quality` is not a PNG save option, so only `optimize` is passed.
        buf = io.BytesIO()
        img.save(buf, format='PNG', optimize=True)
        return buf

    buffer = _encode(image)
    size = buffer.tell()

    width, height = image.width, image.height
    scale = 1.0
    while size > max_size_bytes and (width > 1 or height > 1):
        # Encoded size is assumed roughly proportional to pixel count, so the
        # linear scale factor is the square root of the byte ratio.
        ratio = (max(max_size_bytes, 1) / size) ** 0.5
        if scale < 1.0:
            # The estimate was too optimistic on the previous pass; force a
            # shrink of at least 10% so the loop makes real progress.
            ratio = min(ratio, 0.9)
        scale *= ratio

        # Never let a dimension collapse to 0, which resize() rejects.
        width = max(1, int(image.width * scale))
        height = max(1, int(image.height * scale))

        buffer = _encode(image.resize((width, height), Image.LANCZOS))
        size = buffer.tell()

    buffer.seek(0)
    return Image.open(buffer)

def process_single_image(args):
    """Process a single image with all parameters packed in args.

    Loads ``<base_path>/<Study_ID>.png``, optionally overlays the label SVG
    selected by the row, compresses the result and saves it as
    ``<output_folder>/<Study_ID>_<Label_Type>.png``.

    Args:
        args: Tuple ``(row, label_dict, base_path, output_folder, params)``.
            row: Mapping providing 'Study_ID' and 'Label_Type', plus
                'True_Prompt' / 'False_Prompt' when a label is requested.
                Label_Type 'none' means no overlay, 'true' selects
                True_Prompt, anything else selects False_Prompt.
            label_dict: Label name -> SVG file path.
            base_path: Directory containing the source PNG files.
            output_folder: Directory the result is written to.
            params: Dict with 'label_width_pct', 'label_height_pct',
                'label_x_pct', 'label_y_pct', 'use_whitespace' and,
                optionally, 'alpha' (label opacity in percent, default 100).

    Returns:
        ``(True, study_id)`` on success, or ``(False, error_message)`` if any
        exception occurred; exceptions are never propagated to the caller.
    """
    row, label_dict, base_path, output_folder, params = args

    image_path = os.path.join(base_path, f"{row['Study_ID']}.png")
    output_path = os.path.join(output_folder, f"{row['Study_ID']}_{row['Label_Type']}.png")

    try:
        # convert() returns an independent image, so the file handle can be
        # released straight away.
        with Image.open(image_path) as source:
            base_image = source.convert('RGBA')

        new_image = base_image

        if row['Label_Type'] != 'none':
            prompt = row['True_Prompt'] if row['Label_Type'] == 'true' else row['False_Prompt']
            svg_path = label_dict.get(prompt)

            if svg_path and os.path.exists(svg_path):
                label_width = max(1, int(base_image.width * params['label_width_pct']))
                label_height = max(1, int(base_image.height * params['label_height_pct']))

                png_data = cairosvg.svg2png(url=svg_path, output_width=label_width, output_height=label_height)
                label_image = Image.open(io.BytesIO(png_data)).convert('RGBA')

                # Apply alpha to label: scale the alpha channel through a
                # lookup table instead of rebuilding every pixel in Python.
                alpha_pct = params.get('alpha', 100)
                if alpha_pct != 100:
                    scaled_alpha = label_image.getchannel('A').point(
                        lambda a: max(0, min(255, int(a * alpha_pct / 100)))
                    )
                    label_image.putalpha(scaled_alpha)

                if params['use_whitespace']:
                    label_x, label_y, _, _ = find_whitest_area(base_image)
                else:
                    label_x = int(base_image.width * params['label_x_pct'])
                    label_y = int(base_image.height * params['label_y_pct'])

                # Keep the label inside the image; the lower bound matters
                # when the label is larger than the image itself.
                label_x = max(0, min(label_x, base_image.width - label_width))
                label_y = max(0, min(label_y, base_image.height - label_height))

                # alpha_composite blends the label over the base while keeping
                # the base opaque. paste() with a mask would also blend the
                # alpha band and leave semi-transparent pixels in the output.
                new_image = base_image.copy()
                new_image.alpha_composite(label_image, (label_x, label_y))
            else:
                # Make the fallback visible: the file name will still carry
                # the label type although no label was drawn.
                logger.warning(
                    f"No label SVG found for prompt {prompt!r} "
                    f"(Study_ID {row['Study_ID']}); saving image without label."
                )

        # Compress and save
        compressed_image = compress_image(new_image)
        compressed_image.save(output_path, 'PNG')

        return True, row['Study_ID']
    except Exception as e:
        return False, f"Error processing {row['Study_ID']}: {str(e)}"
        
def process_images_batch(df, label_dict, base_path, output_folder, 
                        batch_size=4, limit=None, **kwargs):
    """Process every row of ``df`` concurrently using a ThreadPoolExecutor.

    Results are written to ``<output_folder>_a<alpha>``, which is created if
    necessary. Failures of individual images are logged as warnings and do
    not stop the run.

    Args:
        df: DataFrame with one row per output image (see process_single_image
            for the columns that are read).
        label_dict: Label name -> SVG file path.
        base_path: Directory containing the source PNG files.
        output_folder: Base output directory; the alpha suffix is appended.
        batch_size: Number of worker threads (values below 1 are treated as 1).
        limit: If truthy, only the first ``limit`` rows are processed;
            ``None`` (or 0) processes all rows.
        **kwargs: Optional overrides: 'label_width_pct' (default 0.2),
            'label_height_pct' (0.1), 'label_x_pct' (0.05),
            'label_y_pct' (0.05), 'use_whitespace' (False), 'alpha' (100).

    Returns:
        None.
    """
    if limit:
        df = df.head(limit)

    # Create alpha-specific output folder
    alpha_value = kwargs.get('alpha', 100)
    output_folder = f"{output_folder}_a{alpha_value}"
    os.makedirs(output_folder, exist_ok=True)

    # Prepare parameters
    params = {
        'label_width_pct': kwargs.get('label_width_pct', 0.2),
        'label_height_pct': kwargs.get('label_height_pct', 0.1),
        'label_x_pct': kwargs.get('label_x_pct', 0.05),
        'label_y_pct': kwargs.get('label_y_pct', 0.05),
        'use_whitespace': kwargs.get('use_whitespace', False),
        'alpha': alpha_value
    }

    # Prepare arguments for each image
    args_list = [(row, label_dict, base_path, output_folder, params)
                 for _, row in df.iterrows()]

    # One pool for the whole run: at most `batch_size` images are in flight
    # at a time, without creating a new pool (and a barrier) for every batch.
    max_workers = max(1, int(batch_size))
    failures = 0

    with tqdm(total=len(args_list), desc="Processing images") as pbar:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # map() yields results in input order as they become available.
            for success, message in executor.map(process_single_image, args_list):
                if not success:
                    failures += 1
                    logger.warning(message)
                pbar.update(1)

    # Report what actually happened instead of counting failures as processed.
    total = len(args_list)
    logger.info(
        f"Image processing complete. Processed {total - failures} of {total} images "
        f"({failures} failed)."
    )






In [14]:
# Full run: every row of the metadata workbook, fully opaque labels.
# With alpha=100 the images are written to "<output_folder>_a100".
df = pd.read_excel(input_file)
# label_x_pct=0.99 would place the label past the right edge; process_single_image
# caps x at (image width - label width), so the label ends up right-aligned.
process_images_batch(df, label_dict, path, output_folder, 
                    batch_size=12,  # Adjust based on your system
                    limit=None,    # Set to None to process all images
                    label_width_pct=0.3, 
                    label_height_pct=0.2,
                    label_x_pct=0.99, 
                    label_y_pct=0.05,
                    use_whitespace=False,
                    alpha=100)


Processing images:   0%|          | 0/90 [00:00<?, ?it/s]

INFO:__main__:Image processing complete. Processed 90 images.


In [12]:
# Trial run: only the first 6 rows, labels at 50% opacity.
# With alpha=50 the images are written to "<output_folder>_a50".
df = pd.read_excel(input_file)
# Same label geometry as the full run; x is capped so the label is right-aligned.
process_images_batch(df, label_dict, path, output_folder, 
                    batch_size=12,  # Adjust based on your system
                    limit=6,    # Set to None to process all images
                    label_width_pct=0.3, 
                    label_height_pct=0.2,
                    label_x_pct=0.99, 
                    label_y_pct=0.05,
                    use_whitespace=False,
                    alpha=50)


Processing images:   0%|          | 0/6 [00:00<?, ?it/s]

INFO:__main__:Image processing complete. Processed 6 images.
