# Watermark Robustness Testing Pipeline

## Welcome!

This notebook allows you to test how well watermarks survive various image transformations. You don't need to be a technical expert to use this tool.

### What Does This Notebook Do?

1. **Loads watermarked images** - Brings in images that already have watermarks embedded
2. **Applies transformations** - Modifies the images (resize, crop, blur, etc.)
3. **Tests watermark detection** - Checks if the watermark can still be found after transformation
4. **Generates reports** - Creates CSV files with the results for analysis

### Supported Watermarking Methods

- **Stable Signature** - Watermarks embedded in latent diffusion models
- **Watermark Anything** - General-purpose watermarking system
- **TrustMark** - Robust watermarking with quality-factor tuning

### How to Use This Notebook

**Please read the instructions in each section before running the code.** Do not simply click "Run All" - some sections require configuration first.

💡 **Tip**: Look for cells marked with ⚙️ (configuration) or ▶️ (action required).

---
## ⚙️ Section 1: Configuration Settings

### Important: Please Update These Settings

Before running any other cells, you must configure the settings below to match your environment.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# CONFIGURATION: Choose Your Watermarking Method
# ═══════════════════════════════════════════════════════════════════════════
# 
# Please select ONE watermarking method to test by uncommenting the appropriate line:
#
# Options:
#   - "Stable_Signature"      : For images watermarked with Stable Signature
#   - "Watermark_Anything"    : For images watermarked with Watermark Anything
#   - "TrustMark"             : For images watermarked with TrustMark
#
# Note: This notebook processes one method at a time. To test multiple methods,
#       run the notebook separately for each one.

WATERMARK_METHOD = "Watermark_Anything"  # ← Change this to your method
# WATERMARK_METHOD = "TrustMark"
# WATERMARK_METHOD = "Stable_Signature"

print(f"✓ Selected watermarking method: {WATERMARK_METHOD}")

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# CONFIGURATION: Directory Paths
# ═══════════════════════════════════════════════════════════════════════════
#
# These paths tell the notebook where to find files and save results.
# 
# If you're using Azure AI:
#   - Change 'YourName' to your actual username
#   - The default paths should work once you've cloned the repository
#
# If you're running locally:
#   - Modify these paths to match your local directory structure

import os

# ─── Azure AI Settings ───
USER_NAME = 'YourName'  # ← CHANGE THIS to your Azure AI username
AZURE_ROOT = '/home/azureuser/cloudfiles/code/Users/'
IS_AZURE = os.path.exists(AZURE_ROOT)  # Auto-detect if running on Azure

# ─── Path Configuration ───
if IS_AZURE:
    ROOT_DIR = os.path.join(AZURE_ROOT, USER_NAME)
    print(f"✓ Running on Azure AI with root directory: {ROOT_DIR}")
else:
    # Local development - use current working directory
    ROOT_DIR = os.getcwd()
    print(f"✓ Running locally with root directory: {ROOT_DIR}")

# Verify the directory exists
if not os.path.exists(ROOT_DIR):
    print(f"⚠️  WARNING: Root directory does not exist: {ROOT_DIR}")
    print(f"   Please update USER_NAME or ROOT_DIR paths above.")
else:
    print(f"✓ Root directory confirmed")

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# CONFIGURATION: Method-Specific Paths
# ═══════════════════════════════════════════════════════════════════════════
#
# Each watermarking method requires different files and directories.
# These paths are automatically set based on your chosen method.
# You may need to adjust these if your repository structure differs.

from pathlib import Path

# ─── Repository Structure ───
REPO_DIR = Path(ROOT_DIR) / "ost-embedding-research"
WATERMARK_MODELS_DIR = REPO_DIR / "watermark_models"

# ─── Method-Specific Configuration ───
if WATERMARK_METHOD == "Watermark_Anything":
    METHOD_DIR = WATERMARK_MODELS_DIR / "watermark-anything"
    RAW_IMAGES_PATH = METHOD_DIR / "output" / "imgs_w"
    CHECKPOINT_PATH = METHOD_DIR / "models" / "watermark_anything_model.pth"
    OUTPUT_DIR = REPO_DIR / "outputs" / "watermark_anything"
    
elif WATERMARK_METHOD == "TrustMark":
    METHOD_DIR = WATERMARK_MODELS_DIR / "trustmark"
    RAW_IMAGES_PATH = METHOD_DIR / "watermarked_images"
    CHECKPOINT_PATH = None  # TrustMark uses built-in models
    OUTPUT_DIR = REPO_DIR / "outputs" / "trustmark"
    
elif WATERMARK_METHOD == "Stable_Signature":
    METHOD_DIR = WATERMARK_MODELS_DIR / "stable-signature"
    RAW_IMAGES_PATH = METHOD_DIR / "watermarked_images"
    CHECKPOINT_PATH = METHOD_DIR / "models" / "dec_48b_whit.torchscript.pt"
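    OUTPUT_DIR = REPO_DIR / "outputs" / "stable_signature"

else:
    # Fail early: without this branch a typo in WATERMARK_METHOD only surfaces
    # later as a NameError on METHOD_DIR.
    raise ValueError(f"Unknown WATERMARK_METHOD: {WATERMARK_METHOD!r}")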

# ─── Create Output Directories ───
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
TRANSFORMED_IMAGES_DIR = OUTPUT_DIR / "transformed_images"
RESULTS_DIR = OUTPUT_DIR / "results"
TRANSFORMED_IMAGES_DIR.mkdir(exist_ok=True)
RESULTS_DIR.mkdir(exist_ok=True)

print(f"✓ Configuration for {WATERMARK_METHOD}:")
print(f"  - Method directory: {METHOD_DIR}")
print(f"  - Raw images: {RAW_IMAGES_PATH}")
print(f"  - Output directory: {OUTPUT_DIR}")
print(f"  - Checkpoint: {CHECKPOINT_PATH if CHECKPOINT_PATH else 'Built-in model'}")
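
Before moving on, it can help to confirm that the configured inputs actually exist. The sketch below is one possible check and is not part of the repository: `check_inputs` and its messages are hypothetical, and it assumes the same image extensions the pipeline scans for later.

```python
from pathlib import Path
from typing import List, Optional

IMAGE_SUFFIXES = {".png", ".jpg", ".jpeg"}  # same extensions the pipeline scans for


def check_inputs(raw_images_path: Path, checkpoint_path: Optional[Path]) -> List[str]:
    """Return a list of human-readable problems; an empty list means all good."""
    problems = []
    raw_images_path = Path(raw_images_path)
    if not raw_images_path.is_dir():
        problems.append(f"Image directory not found: {raw_images_path}")
    elif not any(p.suffix.lower() in IMAGE_SUFFIXES for p in raw_images_path.iterdir()):
        problems.append(f"No PNG/JPEG images in: {raw_images_path}")
    # A checkpoint of None means the method ships its own weights (e.g. TrustMark)
    if checkpoint_path is not None and not Path(checkpoint_path).is_file():
        problems.append(f"Checkpoint not found: {checkpoint_path}")
    return problems
```

In this notebook it could be called as `check_inputs(RAW_IMAGES_PATH, CHECKPOINT_PATH)`, printing each returned message before continuing.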

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# CONFIGURATION: Processing Options
# ═══════════════════════════════════════════════════════════════════════════
#
# Control how many images to process and which transformations to apply.
# Start with small numbers for testing, then increase for full analysis.

# ─── Number of Images to Process ───
# Set to None to process all images, or specify a number for testing
MAX_IMAGES_TO_PROCESS = 10  # ← Start with 10 images for testing
# MAX_IMAGES_TO_PROCESS = None  # ← Uncomment to process all images

# ─── Transformations to Apply ───
# Enable or disable specific transformation types
APPLY_RESIZE = True          # Resize images to different dimensions
APPLY_CROP = True            # Crop images (centre crop with various percentages)
APPLY_ROTATION = True        # Rotate images by various angles
APPLY_BLUR = True            # Apply Gaussian blur with different strengths
APPLY_COMPRESSION = True     # JPEG compression at different quality levels
APPLY_NOISE = True           # Add Gaussian noise
APPLY_COLOUR_JITTER = True   # Adjust brightness, contrast, saturation

# ─── Transformation Parameters ───
RESIZE_DIMENSIONS = [(256, 256), (512, 512), (1024, 1024)]
CROP_PERCENTAGES = [0.99, 0.90, 0.80, 0.70, 0.60, 0.50]
ROTATION_ANGLES = [5, 15, 30, 45, 90, 180]
BLUR_KERNEL_SIZES = [3, 11, 21, 51]
COMPRESSION_QUALITIES = [95, 85, 75, 50, 25]

print("✓ Processing configuration set:")
print(f"  - Maximum images: {MAX_IMAGES_TO_PROCESS if MAX_IMAGES_TO_PROCESS else 'All'}")
print(f"  - Transformations enabled: ", end="")
enabled = [name for name, is_on in [
    ("Resize", APPLY_RESIZE),
    ("Crop", APPLY_CROP),
    ("Rotation", APPLY_ROTATION),
    ("Blur", APPLY_BLUR),
    ("Compression", APPLY_COMPRESSION),
    ("Noise", APPLY_NOISE),
    ("Colour Jitter", APPLY_COLOUR_JITTER)
] if is_on]
print(", ".join(enabled))
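
To judge how long a run will take, it is useful to know how many files each input image produces. The following is a small worked example, assuming every transformation type is enabled and using the parameter lists from the cell above; `outputs_per_image` is a hypothetical helper, not part of the pipeline.

```python
def outputs_per_image(resize_dims, crop_fractions, angles, kernels, qualities,
                      noise=True, colour_jitter=True):
    """Number of transformed files written for each input image."""
    count = (len(resize_dims) + len(crop_fractions) + len(angles)
             + len(kernels) + len(qualities))
    # Noise and colour jitter are applied once each with default settings
    count += int(noise) + int(colour_jitter)
    return count


per_image = outputs_per_image(
    [(256, 256), (512, 512), (1024, 1024)],
    [0.99, 0.90, 0.80, 0.70, 0.60, 0.50],
    [5, 15, 30, 45, 90, 180],
    [3, 11, 21, 51],
    [95, 85, 75, 50, 25],
)
print(per_image)       # 26 files per input image
print(per_image * 10)  # 260 files for the 10-image test run
```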

---
## 📦 Section 2: Install Dependencies

This section installs the required Python packages. These packages enable watermark detection and image processing.

**You only need to run this once per environment.** If you've already installed the packages, you can skip this section.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# Install Required Packages
# ═══════════════════════════════════════════════════════════════════════════
#
# This cell checks for required packages and installs them if missing.
# The installation may take a few minutes on first run.

import importlib.util
import subprocess
import sys

def install_if_missing(package_name, import_name=None):
    """Install a package if it's not already available."""
    if import_name is None:
        import_name = package_name
    
    if importlib.util.find_spec(import_name) is None:
        print(f"📥 Installing {package_name}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package_name])
        print(f"✓ {package_name} installed successfully")
    else:
        print(f"✓ {package_name} already installed")

# Core dependencies
print("Checking and installing dependencies...\n")
install_if_missing("torch")
install_if_missing("torchvision")
install_if_missing("Pillow", "PIL")
install_if_missing("numpy")
install_if_missing("opencv-python", "cv2")
install_if_missing("scikit-image", "skimage")
install_if_missing("pandas")
install_if_missing("matplotlib")
install_if_missing("tqdm")

# Method-specific dependencies
if WATERMARK_METHOD == "Watermark_Anything":
    install_if_missing("timm")
elif WATERMARK_METHOD == "TrustMark":
    install_if_missing("trustmark")
elif WATERMARK_METHOD == "Stable_Signature":
    install_if_missing("omegaconf")
    install_if_missing("einops")

print("\n✓ All dependencies are installed and ready!")
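
Because results can depend on library versions, you may want to record what is installed alongside your CSV files. This is a minimal sketch using only the standard library; `installed_versions` is a hypothetical helper, and the distribution names passed to it are just examples.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(distributions: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in distributions:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(["torch", "torchvision", "Pillow", "numpy"]).items():
    print(f"{name}: {version or 'not installed'}")
```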

---
## 🔧 Section 3: Import Libraries and Utilities

This section imports all the necessary Python libraries and custom functions for the pipeline.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# Import Required Libraries
# ═══════════════════════════════════════════════════════════════════════════

import os
import sys
import csv
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Image processing
import numpy as np
from PIL import Image
import cv2

# PyTorch
import torch
import torchvision.transforms as transforms
from torchvision.transforms import functional as TF

# Utilities
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt

# Add repository directories to path
sys.path.insert(0, str(REPO_DIR))
sys.path.insert(0, str(METHOD_DIR))

print("✓ Libraries imported successfully")
print(f"✓ Using PyTorch version: {torch.__version__}")
print(f"✓ CUDA available: {torch.cuda.is_available()}")

---
## 🔍 Section 4: Load Watermark Detection Model

This section loads the appropriate watermark detection model based on your chosen method.

Each watermarking method has its own detection system:
- **Stable Signature**: Uses a pre-trained decoder model
- **Watermark Anything**: Loads a checkpoint and uses message prediction
- **TrustMark**: Uses the built-in TrustMark library

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# Load Watermark Detection Model
# ═══════════════════════════════════════════════════════════════════════════

if WATERMARK_METHOD == "Watermark_Anything":
    print("Loading Watermark Anything model...")
    
    # Import method-specific modules
    from watermark_anything.data.metrics import msg_predict_inference
    from notebooks.inference import load_model_from_checkpoint
    
    # Load the model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = load_model_from_checkpoint(str(CHECKPOINT_PATH), device=device)
    model.eval()
    
    def detect_watermark(image_path: str) -> np.ndarray:
        """
        Detect and return the watermark bits from an image.
        
        Args:
            image_path: Path to the watermarked image
            
        Returns:
            Array of detected watermark bits (0s and 1s)
        """
        image = Image.open(image_path).convert('RGB')
        image_tensor = TF.to_tensor(image).unsqueeze(0).to(device)
        
        with torch.no_grad():
            detected_bits = msg_predict_inference(model, image_tensor)
        
        return detected_bits.cpu().numpy()
    
    print("✓ Watermark Anything model loaded successfully")

elif WATERMARK_METHOD == "TrustMark":
    print("Loading TrustMark model...")
    
    from trustmark import TrustMark
    
    # Initialise TrustMark with quality model
    trustmark_model = TrustMark(verbose=False, model_type='Q')
    
    def detect_watermark(image_path: str) -> np.ndarray:
        """
        Detect and return the watermark bits from an image using TrustMark.
        
        Args:
            image_path: Path to the watermarked image
            
        Returns:
            Array of detected watermark bits (0s and 1s)
        """
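        image = Image.open(image_path).convert('RGB')
        # decode() returns a tuple whose first element is the recovered payload.
        # Star-unpacking tolerates versions that return extra fields (for example
        # a presence flag and a schema). Assumption: MODE='binary' yields the
        # payload as 0/1 symbols rather than decoded text.
        payload, *_ = trustmark_model.decode(image, MODE='binary')
        return np.array([int(bit) for bit in (payload or '')], dtype=int)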
    
    print("✓ TrustMark model loaded successfully")

elif WATERMARK_METHOD == "Stable_Signature":
    print("Loading Stable Signature model...")
    
    # Load the TorchScript decoder; map_location lets a checkpoint saved on GPU
    # load on CPU-only machines.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    decoder = torch.jit.load(str(CHECKPOINT_PATH), map_location=device).to(device)
    decoder.eval()
    
    def detect_watermark(image_path: str) -> np.ndarray:
        """
        Detect and return the watermark bits from an image using Stable Signature.
        
        Args:
            image_path: Path to the watermarked image
            
        Returns:
            Array of detected watermark bits (0s and 1s)
        """
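        image = Image.open(image_path).convert('RGB')
        image_tensor = TF.to_tensor(image)
        # The Stable Signature reference decoding applies ImageNet normalisation
        # before the extractor; skipping it degrades the decoded bits.
        image_tensor = TF.normalize(
            image_tensor, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
        ).unsqueeze(0).to(device)
        
        with torch.no_grad():
            detected_bits = decoder(image_tensor)
        
        return (detected_bits.cpu().numpy() > 0).astype(int).flatten()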
    
    print("✓ Stable Signature model loaded successfully")

print(f"\n✓ Detection function ready: detect_watermark()")

---
## 🖼️ Section 5: Image Transformation Functions

This section defines functions to apply various transformations to images. Each transformation tests how well the watermark survives different types of image manipulation.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# Image Transformation Functions
# ═══════════════════════════════════════════════════════════════════════════

class ImageTransformations:
    """Collection of image transformation functions for watermark robustness testing."""
    
    @staticmethod
    def resize(image: Image.Image, width: int, height: int) -> Image.Image:
        """Resize image to specified dimensions."""
        return image.resize((width, height), Image.Resampling.LANCZOS)
    
    @staticmethod
    def centre_crop(image: Image.Image, percentage: float) -> Image.Image:
        """
        Crop image from the centre, keeping the given fraction of each side.
        
        Args:
            image: Input image
            percentage: Fraction of the width and height to keep (0.0 to 1.0);
                the retained area is therefore percentage ** 2
        """
        width, height = image.size
        new_width = int(width * percentage)
        new_height = int(height * percentage)
        
        left = (width - new_width) // 2
        top = (height - new_height) // 2
        right = left + new_width
        bottom = top + new_height
        
        return image.crop((left, top, right, bottom))
    
    @staticmethod
    def rotate(image: Image.Image, angle: float) -> Image.Image:
        """Rotate image by specified angle (degrees)."""
        return image.rotate(angle, expand=True, fillcolor=(255, 255, 255))
    
    @staticmethod
    def gaussian_blur(image: Image.Image, kernel_size: int) -> Image.Image:
        """
        Apply Gaussian blur to image.
        
        Args:
            image: Input image
            kernel_size: Size of the Gaussian kernel (must be odd)
        """
        # Convert PIL to numpy for OpenCV
        img_array = np.array(image)
        blurred = cv2.GaussianBlur(img_array, (kernel_size, kernel_size), 0)
        return Image.fromarray(blurred)
    
    @staticmethod
    def jpeg_compression(image: Image.Image, quality: int) -> Image.Image:
        """
        Apply JPEG compression at specified quality level.
        
        Args:
            image: Input image
            quality: JPEG quality (1-100, higher is better quality)
        """
        from io import BytesIO
        buffer = BytesIO()
        image.save(buffer, format='JPEG', quality=quality)
        buffer.seek(0)
        return Image.open(buffer)
    
    @staticmethod
    def add_noise(image: Image.Image, noise_level: float = 0.02) -> Image.Image:
        """
        Add Gaussian noise to image.
        
        Args:
            image: Input image
            noise_level: Standard deviation of noise (0.0 to 1.0)
        """
        img_array = np.array(image).astype(np.float32) / 255.0
        noise = np.random.normal(0, noise_level, img_array.shape)
        noisy_img = np.clip(img_array + noise, 0, 1)
        # Round before casting: plain truncation biases pixel values downwards
        return Image.fromarray(np.round(noisy_img * 255).astype(np.uint8))
    
    @staticmethod
    def colour_jitter(image: Image.Image, brightness: float = 0.2, 
                     contrast: float = 0.2, saturation: float = 0.2) -> Image.Image:
        """
        Randomly change brightness, contrast, and saturation.
        
        Args:
            image: Input image
            brightness: Brightness jitter factor (0.0 to 1.0)
            contrast: Contrast jitter factor (0.0 to 1.0)
            saturation: Saturation jitter factor (0.0 to 1.0)
        """
        jitter = transforms.ColorJitter(
            brightness=brightness,
            contrast=contrast,
            saturation=saturation
        )
        return jitter(image)

# Create instance for easy access
transform = ImageTransformations()

print("✓ Image transformation functions loaded")
print("  Available transformations:")
print("  - resize()")
print("  - centre_crop()")
print("  - rotate()")
print("  - gaussian_blur()")
print("  - jpeg_compression()")
print("  - add_noise()")
print("  - colour_jitter()")
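
The crop parameter scales each side, so the surviving area shrinks with its square. The sketch below reproduces the box arithmetic of `centre_crop()` in plain Python to make that visible; `centre_crop_box` and `retained_area` are hypothetical helpers written for this illustration.

```python
def centre_crop_box(width: int, height: int, fraction: float):
    """Return the (left, top, right, bottom) box for a centre crop."""
    new_width = int(width * fraction)
    new_height = int(height * fraction)
    left = (width - new_width) // 2
    top = (height - new_height) // 2
    return left, top, left + new_width, top + new_height


def retained_area(width: int, height: int, fraction: float) -> float:
    """Fraction of the original pixels that survive the crop."""
    left, top, right, bottom = centre_crop_box(width, height, fraction)
    return (right - left) * (bottom - top) / (width * height)


print(centre_crop_box(512, 512, 0.5))  # (128, 128, 384, 384)
print(retained_area(512, 512, 0.5))    # 0.25
```

A setting of 0.50 therefore removes three quarters of the pixels, which is worth keeping in mind when comparing crop results with other transformations.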

---
## 🎯 Section 6: Apply Transformations to Images

### ▶️ Action Required

This section applies all enabled transformations to your watermarked images. 

**Important Notes:**
- This process may take considerable time depending on the number of images and transformations
- We recommend testing with a small number of images first (set `MAX_IMAGES_TO_PROCESS = 10` above)
- Results are saved to separate subdirectories for each transformation type
- You can monitor progress with the progress bars displayed during processing

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# Apply All Transformations
# ═══════════════════════════════════════════════════════════════════════════

def apply_transformations_to_dataset():
    """
    Apply all enabled transformations to the dataset of watermarked images.
    Creates subdirectories for each transformation type and saves results.
    """
    
    # Get list of images to process
    image_files = sorted([f for f in os.listdir(RAW_IMAGES_PATH) 
                         if f.lower().endswith(('.png', '.jpg', '.jpeg'))])
    
    if MAX_IMAGES_TO_PROCESS:
        image_files = image_files[:MAX_IMAGES_TO_PROCESS]
    
    print(f"Processing {len(image_files)} images...\n")
    
    # Counter for total transformations
    transformation_count = 0
    
    # Process each image
    for image_name in tqdm(image_files, desc="Processing images"):
        image_path = os.path.join(RAW_IMAGES_PATH, image_name)
        base_name = Path(image_name).stem
        
        try:
            image = Image.open(image_path).convert('RGB')
            
            # ─── Resize Transformations ───
            if APPLY_RESIZE:
                resize_dir = TRANSFORMED_IMAGES_DIR / "resize"
                resize_dir.mkdir(exist_ok=True)
                for width, height in RESIZE_DIMENSIONS:
                    resized = transform.resize(image, width, height)
                    output_path = resize_dir / f"{base_name}_resize_{width}x{height}.png"
                    resized.save(output_path)
                    transformation_count += 1
            
            # ─── Centre Crop Transformations ───
            if APPLY_CROP:
                crop_dir = TRANSFORMED_IMAGES_DIR / "crop"
                crop_dir.mkdir(exist_ok=True)
                for percentage in CROP_PERCENTAGES:
                    cropped = transform.centre_crop(image, percentage)
                    output_path = crop_dir / f"{base_name}_crop_{int(percentage*100)}pct.png"
                    cropped.save(output_path)
                    transformation_count += 1
            
            # ─── Rotation Transformations ───
            if APPLY_ROTATION:
                rotation_dir = TRANSFORMED_IMAGES_DIR / "rotation"
                rotation_dir.mkdir(exist_ok=True)
                for angle in ROTATION_ANGLES:
                    rotated = transform.rotate(image, angle)
                    output_path = rotation_dir / f"{base_name}_rotate_{angle}deg.png"
                    rotated.save(output_path)
                    transformation_count += 1
            
            # ─── Blur Transformations ───
            if APPLY_BLUR:
                blur_dir = TRANSFORMED_IMAGES_DIR / "blur"
                blur_dir.mkdir(exist_ok=True)
                for kernel_size in BLUR_KERNEL_SIZES:
                    blurred = transform.gaussian_blur(image, kernel_size)
                    output_path = blur_dir / f"{base_name}_blur_k{kernel_size}.png"
                    blurred.save(output_path)
                    transformation_count += 1
            
            # ─── Compression Transformations ───
            if APPLY_COMPRESSION:
                compression_dir = TRANSFORMED_IMAGES_DIR / "compression"
                compression_dir.mkdir(exist_ok=True)
                for quality in COMPRESSION_QUALITIES:
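                    compressed = transform.jpeg_compression(image, quality)
                    # Save losslessly: writing a .jpg here would re-encode at Pillow's
                    # default quality and override the level under test.
                    output_path = compression_dir / f"{base_name}_jpeg_q{quality}.png"
                    compressed.save(output_path)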
                    transformation_count += 1
            
            # ─── Noise Transformations ───
            if APPLY_NOISE:
                noise_dir = TRANSFORMED_IMAGES_DIR / "noise"
                noise_dir.mkdir(exist_ok=True)
                noisy = transform.add_noise(image)
                output_path = noise_dir / f"{base_name}_noise.png"
                noisy.save(output_path)
                transformation_count += 1
            
            # ─── Colour Jitter Transformations ───
            if APPLY_COLOUR_JITTER:
                jitter_dir = TRANSFORMED_IMAGES_DIR / "colour_jitter"
                jitter_dir.mkdir(exist_ok=True)
                jittered = transform.colour_jitter(image)
                output_path = jitter_dir / f"{base_name}_jitter.png"
                jittered.save(output_path)
                transformation_count += 1
                
        except Exception as e:
            print(f"\n⚠️  Error processing {image_name}: {str(e)}")
            continue
    
    print(f"\n✓ Transformation complete!")
    print(f"  - Processed {len(image_files)} images")
    print(f"  - Created {transformation_count} transformed images")
    print(f"  - Results saved to: {TRANSFORMED_IMAGES_DIR}")

# Run the transformation process
apply_transformations_to_dataset()

---
## 📊 Section 7: Calculate Image Quality Metrics

This section defines functions to calculate various image quality metrics. These metrics help quantify how much the transformations have altered the images.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# Image Quality Metrics
# ═══════════════════════════════════════════════════════════════════════════

from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr

class ImageMetrics:
    """Calculate various image quality and similarity metrics."""
    
    @staticmethod
    def calculate_psnr(original: np.ndarray, modified: np.ndarray) -> float:
        """
        Calculate Peak Signal-to-Noise Ratio (PSNR).
        Higher values indicate better quality (less distortion).
        """
        return psnr(original, modified)
    
    @staticmethod
    def calculate_ssim(original: np.ndarray, modified: np.ndarray) -> float:
        """
        Calculate Structural Similarity Index (SSIM).
        Values range from -1 to 1, where 1 indicates identical images.
        """
        # Ensure images have the same dimensions
        if original.shape != modified.shape:
            return 0.0
        
        return ssim(original, modified, channel_axis=2)
    
    @staticmethod
    def calculate_mse(original: np.ndarray, modified: np.ndarray) -> float:
        """
        Calculate Mean Squared Error (MSE).
        Lower values indicate greater similarity.
        """
        return np.mean((original.astype(float) - modified.astype(float)) ** 2)
    
    @staticmethod
    def calculate_all_metrics(original_path: str, modified_path: str) -> Dict[str, float]:
        """
        Calculate all quality metrics between two images.
        
        Args:
            original_path: Path to original image
            modified_path: Path to modified image
            
        Returns:
            Dictionary containing all calculated metrics
        """
        try:
            original = np.array(Image.open(original_path).convert('RGB'))
            modified = np.array(Image.open(modified_path).convert('RGB'))
            
            # Resize modified to match original if needed
            if original.shape != modified.shape:
                modified_img = Image.fromarray(modified).resize(
                    (original.shape[1], original.shape[0]), 
                    Image.Resampling.LANCZOS
                )
                modified = np.array(modified_img)
            
            return {
                'psnr': ImageMetrics.calculate_psnr(original, modified),
                'ssim': ImageMetrics.calculate_ssim(original, modified),
                'mse': ImageMetrics.calculate_mse(original, modified)
            }
        except Exception as e:
            return {
                'psnr': 0.0,
                'ssim': 0.0,
                'mse': float('inf')
            }

metrics_calculator = ImageMetrics()

print("✓ Image quality metrics loaded")
print("  Available metrics:")
print("  - PSNR (Peak Signal-to-Noise Ratio)")
print("  - SSIM (Structural Similarity Index)")
print("  - MSE (Mean Squared Error)")
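
PSNR is derived from MSE by the standard definition PSNR = 10 · log10(MAX² / MSE), where MAX is the largest possible pixel value (255 for 8-bit images). The following standard-library sketch works through one small case; `mse` and `psnr_from_mse` are illustrative stand-ins and not the scikit-image functions used above.

```python
import math
from typing import Sequence


def mse(original: Sequence[int], modified: Sequence[int]) -> float:
    """Mean squared error between two equally sized pixel sequences."""
    if len(original) != len(modified):
        raise ValueError("inputs must have the same length")
    return sum((a - b) ** 2 for a, b in zip(original, modified)) / len(original)


def psnr_from_mse(error: float, data_range: float = 255.0) -> float:
    """PSNR in dB; infinite when the images are identical."""
    if error == 0:
        return math.inf
    return 10.0 * math.log10(data_range ** 2 / error)


pixels = [10, 20, 30, 40]
shifted = [p + 5 for p in pixels]      # every pixel off by 5
print(mse(pixels, shifted))            # 25.0
print(round(psnr_from_mse(25.0), 2))   # 34.15
```

Note the infinite value for identical images: any transformation that leaves an image unchanged will produce it.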

---
## 🔬 Section 8: Test Watermark Detection

### ▶️ Action Required

This section tests watermark detection on all transformed images and generates a comprehensive results file.

**What This Does:**
1. Scans all transformed images in the output directory
2. Attempts to detect watermarks in each image
3. Calculates image quality metrics compared to the original
4. Saves all results to a CSV file for analysis

**Note**: This may take significant time for large datasets.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# Watermark Detection and Results Generation
# ═══════════════════════════════════════════════════════════════════════════

def test_watermark_robustness():
    """
    Test watermark detection on all transformed images and generate results.
    """
    
    results = []
    
    # Process each transformation type
    transformation_dirs = [d for d in TRANSFORMED_IMAGES_DIR.iterdir() if d.is_dir()]
    
    print(f"Testing watermark detection on {len(transformation_dirs)} transformation types...\n")
    
    for transform_dir in tqdm(transformation_dirs, desc="Processing transformation types"):
        transform_type = transform_dir.name
        
        # Get all images in this transformation directory
        image_files = sorted([f for f in transform_dir.iterdir() 
                            if f.suffix.lower() in ['.png', '.jpg', '.jpeg']])
        
        for image_path in tqdm(image_files, desc=f"  {transform_type}", leave=False):
            try:
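                # Recover the original name by cutting at the suffix that Section 6
                # appends for this transformation type. Splitting on the first
                # underscore would truncate names such as "img_001".
                suffix_markers = {
                    'resize': '_resize_', 'crop': '_crop_', 'rotation': '_rotate_',
                    'blur': '_blur_', 'compression': '_jpeg_',
                    'noise': '_noise', 'colour_jitter': '_jitter',
                }
                marker = suffix_markers.get(transform_type, '_')
                head, found, _ = image_path.stem.rpartition(marker)
                original_name = head if found else image_path.stem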
                
                # Find corresponding original image
                original_image = None
                for ext in ['.png', '.jpg', '.jpeg']:
                    potential_path = RAW_IMAGES_PATH / f"{original_name}{ext}"
                    if potential_path.exists():
                        original_image = potential_path
                        break
                
                if not original_image:
                    print(f"\n⚠️  Could not find original image for {image_path.name}")
                    continue
                
                # Detect watermark
                detected_bits = detect_watermark(str(image_path))
                
                # Calculate quality metrics
                metrics = metrics_calculator.calculate_all_metrics(
                    str(original_image), 
                    str(image_path)
                )
                
                # Store results. Note: watermark_detected only records that the
                # decoder returned a non-empty bit string; it does not compare the
                # bits against the embedded message.
                bits = np.asarray(detected_bits).astype(int).ravel()
                result = {
                    'original_image': original_image.name,
                    'transformed_image': image_path.name,
                    'transformation_type': transform_type,
                    'watermark_detected': bits.size > 0,
                    'detected_bits': ''.join(map(str, bits.tolist())),
                    'num_bits_detected': int(bits.size),
                    'psnr': metrics['psnr'],
                    'ssim': metrics['ssim'],
                    'mse': metrics['mse']
                }
                results.append(result)
                
            except Exception as e:
                print(f"\n⚠️  Error processing {image_path.name}: {str(e)}")
                continue
    
    # Save results to CSV
    results_file = RESULTS_DIR / f"watermark_detection_results_{WATERMARK_METHOD}.csv"
    
    if results:
        df = pd.DataFrame(results)
        df.to_csv(results_file, index=False)
        
        print(f"\n✓ Testing complete!")
        print(f"  - Tested {len(results)} images")
        print(f"  - Results saved to: {results_file}")
        
        # Print summary statistics
        print(f"\n📊 Summary Statistics:")
        print(f"  - Total images processed: {len(results)}")
        print(f"  - Watermarks detected: {sum(r['watermark_detected'] for r in results)}")
        print(f"  - Detection rate: {sum(r['watermark_detected'] for r in results) / len(results) * 100:.1f}%")
        print(f"\n  By transformation type:")
        
        for transform_type in df['transformation_type'].unique():
            subset = df[df['transformation_type'] == transform_type]
            detection_rate = subset['watermark_detected'].sum() / len(subset) * 100
            print(f"    {transform_type:20s}: {detection_rate:5.1f}% detected ({subset['watermark_detected'].sum()}/{len(subset)})")
        
        return df
    else:
        print("\n⚠️  No results generated. Please check your configuration and try again.")
        return None

# Run the watermark detection tests
results_df = test_watermark_robustness()
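
The `watermark_detected` column above only records whether the decoder returned any bits. A stricter test, common for multi-bit watermarks, compares the decoded bits with the message that was embedded and declares a detection when enough bits match. The sketch below assumes you know that message; `expected_bits`, `bit_accuracy` and `false_positive_rate` are hypothetical names that do not exist in the repository.

```python
from math import comb
from typing import Sequence


def bit_accuracy(detected: Sequence[int], expected: Sequence[int]) -> float:
    """Fraction of positions where the decoded bit equals the embedded bit."""
    if len(detected) != len(expected) or not expected:
        raise ValueError("bit strings must be non-empty and equally long")
    matches = sum(int(d) == int(e) for d, e in zip(detected, expected))
    return matches / len(expected)


def false_positive_rate(n_bits: int, min_matches: int) -> float:
    """Chance that n_bits fair coin flips agree with the message in at least
    min_matches positions, i.e. the binomial upper tail with p = 0.5."""
    tail = sum(comb(n_bits, k) for k in range(min_matches, n_bits + 1))
    return tail / 2 ** n_bits


expected_bits = [1, 0, 1, 1, 0, 0, 1, 0]   # hypothetical embedded message
decoded_bits = [1, 0, 1, 0, 0, 0, 1, 0]    # one bit flipped by a transformation
print(bit_accuracy(decoded_bits, expected_bits))  # 0.875
print(false_positive_rate(8, 7))                  # 0.03515625
```

Choosing `min_matches` from a target false-positive rate gives a threshold that unwatermarked images are unlikely to pass by chance.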

---
## 📈 Section 9: Visualise Results (Optional)

This section creates visualisations of the test results to help understand watermark robustness across different transformations.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════
# Results Visualisation
# ═══════════════════════════════════════════════════════════════════════════

if results_df is not None:
    
    # Create a 2x2 grid of summary plots
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    
    # 1. Detection Rate by Transformation Type
    detection_by_type = results_df.groupby('transformation_type')['watermark_detected'].agg(['sum', 'count'])
    detection_by_type['rate'] = detection_by_type['sum'] / detection_by_type['count'] * 100
    
    axes[0, 0].bar(detection_by_type.index, detection_by_type['rate'])
    axes[0, 0].set_xlabel('Transformation Type')
    axes[0, 0].set_ylabel('Detection Rate (%)')
    axes[0, 0].set_title('Watermark Detection Rate by Transformation')
    axes[0, 0].tick_params(axis='x', rotation=45)
    axes[0, 0].grid(axis='y', alpha=0.3)
    
    # 2. PSNR Distribution
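    # Identical images give infinite PSNR, which a histogram cannot bin
    finite_psnr = results_df['psnr'][np.isfinite(results_df['psnr'])]
    axes[0, 1].hist(finite_psnr, bins=30, edgecolor='black')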
    axes[0, 1].set_xlabel('PSNR (dB)')
    axes[0, 1].set_ylabel('Frequency')
    axes[0, 1].set_title('Distribution of PSNR Values')
    axes[0, 1].grid(axis='y', alpha=0.3)
    
    # 3. SSIM Distribution
    axes[1, 0].hist(results_df['ssim'], bins=30, edgecolor='black')
    axes[1, 0].set_xlabel('SSIM')
    axes[1, 0].set_ylabel('Frequency')
    axes[1, 0].set_title('Distribution of SSIM Values')
    axes[1, 0].grid(axis='y', alpha=0.3)
    
    # 4. Detection Rate vs Image Quality
    is_detected = results_df['watermark_detected'].astype(bool)
    detected = results_df[is_detected]
    not_detected = results_df[~is_detected]
    
    axes[1, 1].scatter(detected['ssim'], detected['psnr'], alpha=0.5, label='Detected', color='green')
    axes[1, 1].scatter(not_detected['ssim'], not_detected['psnr'], alpha=0.5, label='Not Detected', color='red')
    axes[1, 1].set_xlabel('SSIM')
    axes[1, 1].set_ylabel('PSNR (dB)')
    axes[1, 1].set_title('Watermark Detection vs Image Quality')
    axes[1, 1].legend()
    axes[1, 1].grid(alpha=0.3)
    
    plt.tight_layout()
    
    # Save figure
    visualisation_path = RESULTS_DIR / f"results_visualisation_{WATERMARK_METHOD}.png"
    plt.savefig(visualisation_path, dpi=300, bbox_inches='tight')
    print(f"✓ Visualisation saved to: {visualisation_path}")
    
    plt.show()
else:
    print("⚠️  No results available to visualise. Please run the watermark detection tests first.")
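
The per-type summary hides how robustness changes with transformation strength. One way to recover that view is to group results by the parameter encoded in each transformed filename. The sketch below works on plain dictionaries shaped like the CSV rows; `level_from_name` and `rate_by_level` are hypothetical helpers, and the filename pattern is assumed to follow the naming used in Section 6.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# Trailing "<kind>_<parameter>" part of a transformed filename stem,
# e.g. "img_001_blur_k21" -> ("blur", "k21"). Assumes Section 6 naming.
_LEVEL = re.compile(r"_(resize|crop|rotate|blur|jpeg)_([^_]+)$")


def level_from_name(filename: str) -> Tuple[str, str]:
    """Return (kind, parameter); files without a parameter get an empty one."""
    stem = filename.rsplit(".", 1)[0]
    match = _LEVEL.search(stem)
    if match:
        return match.group(1), match.group(2)
    return stem.rsplit("_", 1)[-1], ""   # e.g. "noise" or "jitter"


def rate_by_level(rows: Iterable[dict]) -> Dict[Tuple[str, str], float]:
    """Detection rate (0..1) for every (kind, parameter) pair."""
    hits, totals = defaultdict(int), defaultdict(int)
    for row in rows:
        key = level_from_name(row["transformed_image"])
        totals[key] += 1
        hits[key] += bool(row["watermark_detected"])
    return {key: hits[key] / totals[key] for key in totals}


rows = [
    {"transformed_image": "img_001_blur_k3.png", "watermark_detected": True},
    {"transformed_image": "img_002_blur_k3.png", "watermark_detected": True},
    {"transformed_image": "img_001_blur_k51.png", "watermark_detected": False},
    {"transformed_image": "img_001_noise.png", "watermark_detected": True},
]
print(rate_by_level(rows))
```

With the notebook's DataFrame, the same function could be fed `results_df.to_dict("records")`.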

---
## ✅ Pipeline Complete!

### What You've Accomplished

1. ✓ Configured the pipeline for your chosen watermarking method
2. ✓ Applied various transformations to your watermarked images
3. ✓ Tested watermark detection robustness
4. ✓ Generated comprehensive results and visualisations

### Next Steps

- **Review the results CSV** in the `outputs/{method}/results/` directory
- **Analyse the visualisations** to understand which transformations affect watermark detection most
- **Test different watermarking methods** by changing `WATERMARK_METHOD` and running the notebook again
- **Adjust transformation parameters** in Section 1 to test different scenarios

### Need Help?

- Refer to the repository README for detailed documentation
- Check the configuration sections if you encounter path errors
- Review the transformation parameters if you need different test conditions

Thank you for using the Watermark Robustness Testing Pipeline!