# 🎨 CNN-Based Image Watermarking using DWT
## Google Colab - Optimized Version

This notebook trains and evaluates a deep learning watermarking system.

**Features:**
- Invisible watermark embedding
- Robust against 7 attack types
- Automatic evaluation with metrics
- GPU accelerated training

---

## 📦 Step 1: Setup Environment

In [None]:
# Check GPU availability: report the TensorFlow version and whether a GPU
# device is visible to it.
import tensorflow as tf

# An empty device list is falsy, so it doubles as the "GPU present?" flag
gpu_devices = tf.config.list_physical_devices('GPU')

print("TensorFlow version:", tf.__version__)
print("GPU Available:", "✓ YES" if gpu_devices else "✗ NO")
print("\n⚠️ If GPU is not available, go to: Runtime → Change runtime type → GPU")

In [None]:
# Clone repository (if not already cloned)
import os

# Clean up any existing directory first
if os.path.exists('Watermarking-cnn'):
    print("Removing existing directory...")
    !rm -rf Watermarking-cnn

# Clone fresh
print("Cloning repository...")
!git clone https://github.com/Mehulsri07/Watermarking-cnn.git

# Change to project directory
%cd Watermarking-cnn

# Verify we're in the right place
print("\nüìÅ Current directory:", os.getcwd())
print("üìÇ Contents:", os.listdir('.')[:10], '...')

In [None]:
# Install dependencies with NumPy fix
print("Installing dependencies...")
!pip install -q 'numpy<2.0' tensorflow-wavelets tensorflow-addons opencv-python scikit-image 'matplotlib>=3.8.0'
print("‚úì Dependencies installed")

# Verify NumPy version
import numpy as np
print(f"NumPy version: {np.__version__}")

## 📥 Step 2: Download Sample Images

In [None]:
# Download sample images from Lorem Picsum
import urllib.request
import os

def _fetch_with_retry(url, dest, retries):
    """Download url to dest, retrying when the transfer fails.

    Makes at most max(1, retries) attempts; the last OSError is re-raised
    when none of them succeeds.
    """
    attempts = max(1, retries)
    for attempt in range(1, attempts + 1):
        try:
            urllib.request.urlretrieve(url, dest)
            return
        except OSError as err:
            # urllib's URLError/HTTPError and socket errors derive from OSError
            if attempt == attempts:
                raise
            print(f"  Attempt {attempt}/{attempts} failed for {dest} ({err}), retrying...")


def _download_set(count, directory, prefix, first_query, report_every, size, retries):
    """Download count images into directory, printing progress periodically."""
    for i in range(count):
        url = f"https://picsum.photos/{size}/{size}?random={first_query + i}"
        _fetch_with_retry(url, f"{directory}/{prefix}_{i:03d}.jpg", retries)
        if (i + 1) % report_every == 0:
            print(f"  Downloaded {i+1}/{count}")


def download_samples(num_train=80, num_test=20, size=256, retries=3):
    """Download sample images from Lorem Picsum into local folders.

    Images are written to ``train_images/train_NNN.jpg`` and
    ``test_images/test_NNN.jpg`` relative to the current working directory;
    both folders are created when missing.

    Args:
        num_train: Number of training images to download.
        num_test: Number of test images to download.
        size: Width and height in pixels requested from the service.
        retries: Maximum download attempts per image (at least one is made).

    Raises:
        OSError: If an image still cannot be downloaded after all attempts.
    """
    os.makedirs('train_images', exist_ok=True)
    os.makedirs('test_images', exist_ok=True)

    print(f"Downloading {num_train} training images...")
    _download_set(num_train, 'train_images', 'train', 0, 10, size, retries)

    # Test URLs use query values starting at 100, so they do not repeat the
    # training URLs as long as num_train <= 100.
    print(f"\nDownloading {num_test} test images...")
    _download_set(num_test, 'test_images', 'test', 100, 5, size, retries)

    print(f"\n✓ Downloaded {num_train} training + {num_test} test images")

# Fetch the sample set used by the rest of the notebook: 80 training and 20 test images.
download_samples(num_train=80, num_test=20)

## ⚙️ Step 3: Configure Training

In [None]:
# Training configuration - Optimized for better results
# NOTE(review): these constants are only printed in this cell; the training
# command in Step 4 is invoked without arguments, so confirm that
# train_and_evaluate.py actually uses matching values.
EPOCHS = 50          # Number of training epochs
BATCH_SIZE = 10      # Batch size (using GPU memory efficiently)
LEARNING_RATE = 0.001

print("Training Configuration:")
print(f"  Epochs: {EPOCHS}")
print(f"  Batch Size: {BATCH_SIZE}")
print(f"  Learning Rate: {LEARNING_RATE}")
# Image counts mirror the download_samples(num_train=80, num_test=20) call in Step 2
print("  Training Images: 80")
print("  Test Images: 20")
print("\n⏱️ Expected training time: ~15-20 minutes on GPU")

## 🚀 Step 4: Train Model

In [None]:
# Verify setup before training: project directories, downloaded images and
# importability of the project modules. The final verdict only reports
# "Ready to train!" when every check passed.
import os
import sys

REQUIRED_DIRS = ['models', 'attacks', 'data_loaders', 'utils']
IMAGE_EXTENSIONS = ('.jpg', '.png')


def count_images(folder):
    """Return the number of .jpg/.png files directly inside folder (0 if it does not exist)."""
    if not os.path.exists(folder):
        return 0
    return sum(1 for name in os.listdir(folder) if name.endswith(IMAGE_EXTENSIONS))


print("Pre-training checks:")
print(f"Current directory: {os.getcwd()}")

# Check directories
print("\nChecking required directories:")
missing_dirs = []
for dir_name in REQUIRED_DIRS:
    exists = os.path.exists(dir_name)
    print(f"  {'✓' if exists else '✗'} {dir_name}/")
    if not exists:
        missing_dirs.append(dir_name)
        continue
    # Check for __init__.py (reported for information only)
    init_exists = os.path.exists(os.path.join(dir_name, '__init__.py'))
    print(f"    {'✓' if init_exists else '✗'} __init__.py")

# Check images
print("\nChecking images:")
train_count = count_images('train_images')
test_count = count_images('test_images')
print(f"  Training images: {train_count}")
print(f"  Test images: {test_count}")

# Test imports. Exception is caught broadly on purpose: importing a module can
# fail with more than ImportError (e.g. an error raised by one of its own
# dependencies), and this cell should report the problem rather than abort.
print("\nTesting imports:")
failed_imports = []
try:
    from models.wavetf_model import WaveTFModel
    print("  ✓ models.wavetf_model")
except Exception as e:
    failed_imports.append('models.wavetf_model')
    print(f"  ✗ models.wavetf_model: {e}")

try:
    from data_loaders.merged_data_loader import MergedDataLoader
    print("  ✓ data_loaders.merged_data_loader")
except Exception as e:
    failed_imports.append('data_loaders.merged_data_loader')
    print(f"  ✗ data_loaders.merged_data_loader: {e}")

print("\n" + "="*60)
if train_count == 0 or test_count == 0:
    print("⚠️ Warning: No images found! Run Step 2 first.")
elif missing_dirs or failed_imports:
    print("⚠️ Warning: Project files are missing or failed to import. Re-run Step 1.")
else:
    print("✓ Ready to train!")
print("="*60)

In [None]:
# Run training and evaluation via a shell command.
# NOTE(review): no arguments are passed, so the EPOCHS / BATCH_SIZE /
# LEARNING_RATE values shown in Step 3 are not visibly forwarded to the
# script — confirm that train_and_evaluate.py uses matching settings.
!python train_and_evaluate.py

## 📊 Step 5: View Results

In [None]:
# Check if training completed successfully by looking for its output files.
import os

print("Checking training results...\n")

# Label -> path of each artifact this notebook reads in the cells below
checks = {
    'Model weights': 'config_1_baseline/final_model_weights.h5',
    'Evaluation report': 'config_1_baseline/evaluation_results/evaluation_report.json',
    'Summary chart': 'config_1_baseline/evaluation_results/summary_metrics.png',
    'Result images': 'config_1_baseline/evaluation_results/images/'
}

# all_good stays True only if every artifact is present
all_good = True
for name, path in checks.items():
    found = os.path.exists(path)
    print(f"{'✓' if found else '✗'} {name}: {'Found' if found else 'Missing'}")
    all_good = all_good and found

print("\n" + "="*60)
if all_good:
    print("✅ Training completed successfully!")
    print("You can now view the results below.")
else:
    print("⚠️ Training may not have completed.")
    print("Please run the training cell (Step 4) first.")
print("="*60)

In [None]:
# Display summary metrics: show the summary chart image if it exists.
from IPython.display import Image, display
import json
import os

# Check if results exist before trying to render them
results_path = 'config_1_baseline/evaluation_results/summary_metrics.png'
if os.path.exists(results_path):
    print("📊 Performance Summary:")
    display(Image(results_path))
else:
    print("⚠️ Results not found. Make sure training completed successfully.")
    print("Run the training cell above first!")

In [None]:
# Show detailed results: print per-attack metrics from the evaluation report.
import os
import json


def print_attack_report(path):
    """Print PSNR / SSIM / BER for every attack listed in the JSON report at path.

    The report is expected to hold an ``attack_statistics`` mapping of
    attack name -> dict with ``avg_psnr``, ``avg_ssim`` and ``avg_ber``.
    A report without that key prints an empty table instead of failing.

    Returns:
        True if the report file exists and was printed, False otherwise.
    """
    if not os.path.exists(path):
        print("⚠️ Evaluation report not found.")
        print("Make sure training completed successfully!")
        return False

    # JSON is UTF-8 by specification; do not rely on the platform default encoding
    with open(path, 'r', encoding='utf-8') as f:
        report = json.load(f)

    print("\n📋 Detailed Results:\n")
    print("="*80)
    for attack_name, stats in report.get('attack_statistics', {}).items():
        print(f"\n{attack_name}:")
        print(f"  PSNR: {stats['avg_psnr']:.2f} dB")
        print(f"  SSIM: {stats['avg_ssim']:.4f}")
        print(f"  BER:  {stats['avg_ber']:.2f}%")
    print("\n" + "="*80)
    return True


report_path = 'config_1_baseline/evaluation_results/evaluation_report.json'
# Assigned so the boolean is not echoed as the cell's output value
report_found = print_attack_report(report_path)

In [None]:
# Display sample visualizations: show up to three result images.
import glob
import os
from IPython.display import Image, display

images_dir = 'config_1_baseline/evaluation_results/images/'
if os.path.exists(images_dir):
    print("\n🖼️ Sample Visualizations:\n")
    # glob returns matches in arbitrary order; sort so the same three images
    # are shown on every run
    image_files = sorted(glob.glob(images_dir + '*.png'))[:3]

    if image_files:
        for img_file in image_files:
            print(f"\n{os.path.basename(img_file)}:")
            display(Image(img_file, width=800))
    else:
        print("No visualization images found.")
else:
    print("⚠️ Results directory not found.")
    print("Make sure training completed successfully!")

## 💾 Step 6: Download Results

In [None]:
# Create zip file with all results
!zip -r results.zip config_1_baseline/

# Download
from google.colab import files
files.download('results.zip')

print("‚úì Results downloaded!")

## 🧪 Step 7: Test Custom Image (Optional)

In [None]:
# Upload your own image, embed a random watermark with the trained model and
# report quality metrics.
from google.colab import files
import cv2
import numpy as np
from IPython.display import Image as IPImage, display

print("Upload an image to test watermarking:")
uploaded = files.upload()

if uploaded:
    # Only the first uploaded file is processed
    filename = next(iter(uploaded))
    print(f"\n✓ Uploaded: {filename}")

    # Process image: grayscale, 256x256, scaled to [0, 1], trailing channel axis
    img = cv2.imread(filename, cv2.IMREAD_GRAYSCALE)
    if img is None:
        # cv2.imread signals an unreadable/undecodable file by returning None;
        # fail here with a clear message instead of inside cv2.resize
        raise ValueError(f"Could not read '{filename}' as an image")
    img = cv2.resize(img, (256, 256))
    img = img.astype(np.float32) / 255.0
    img = np.expand_dims(img, axis=-1)

    # Generate watermark: 256 random bits stored as float32
    watermark = np.random.randint(0, 2, size=(256,)).astype(np.float32)

    # Load model with the weights written by the training step
    from models.wavetf_model import WaveTFModel
    wavetf_model = WaveTFModel(image_size=(256, 256, 1), watermark_size=(256,))
    model = wavetf_model.get_model()
    model.load_weights('config_1_baseline/final_model_weights.h5')

    # Embed watermark: add a leading batch axis to each input
    img_batch = np.expand_dims(img, axis=0)
    wm_batch = np.expand_dims(watermark, axis=0)
    # presumably attack id 0 means "no attack" — TODO confirm against the model
    attack_batch = np.array([[0]], dtype=np.int32)

    watermarked, extracted = model.predict([img_batch, wm_batch, attack_batch])

    # Save and display. Clip to [0, 1] first: values outside that range would
    # wrap around when cast to uint8 and show up as speckle artifacts.
    output_pixels = np.clip(watermarked[0].squeeze(), 0.0, 1.0) * 255
    cv2.imwrite('watermarked_output.png', output_pixels.astype(np.uint8))

    print("\n📸 Results:")
    display(IPImage('watermarked_output.png'))

    # Calculate metrics between the original and watermarked image, and
    # between the embedded and extracted watermark
    from utils.metrics import calculate_psnr, calculate_ssim, calculate_ber
    psnr = calculate_psnr(img_batch, watermarked)
    ssim = calculate_ssim(img_batch, watermarked)
    ber = calculate_ber(wm_batch, extracted)

    print("\n📊 Metrics:")
    print(f"  PSNR: {psnr:.2f} dB")
    print(f"  SSIM: {ssim:.4f}")
    print(f"  BER:  {ber:.2f}%")

---
## ✅ Done!

Your watermarking model is trained and evaluated. Check the results above!

**Next Steps:**
- Increase EPOCHS for better accuracy
- Add more training images
- Test with your own images
- Download the trained model

**Questions?** Open an issue on GitHub!