In [None]:


# %% [markdown]
# #  CloakID — Phase 1: Image Immunization
#
# This notebook implements a **dual-layer adversarial defense** that injects
# imperceptible noise into personal images, making them resistant to
# manipulation by Latent Diffusion Models and Deepfake algorithms.

# %% — Cell 1: Setup & Installs
# ============================================================================
import subprocess, sys
import os

def install(pkg):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])

# Install dependencies if not already present
print(" Installing dependencies...")
pkgs = ["diffusers", "transformers", "accelerate", "gradio", "lpips", "scikit-image"]
for p in pkgs:
    install(p)

print(" All dependencies installed.")

# %% — Cell 2: Imports & Model Loading
# ============================================================================
import torch
import torch.nn.functional as F
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as compare_ssim
from diffusers import AutoencoderKL
from transformers import CLIPModel, CLIPProcessor
import gradio as gr
import warnings, time

warnings.filterwarnings("ignore")

# ── Device Setup ─────────────────────────────────────────────────────────────
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Use float16 on GPU for memory efficiency, float32 on CPU
DTYPE  = torch.float16 if DEVICE.type == "cuda" else torch.float32

print(f"  Device : {DEVICE}  |  Dtype : {DTYPE}")

# ── Layer 1 Model: VAE (The Eye) ─────────────────────────────────────────────
print(" Loading VAE (stabilityai/sd-vae-ft-mse) …")
vae = AutoencoderKL.from_pretrained(
    "stabilityai/sd-vae-ft-mse",
    torch_dtype=DTYPE,
).to(DEVICE)
vae.eval()
vae.requires_grad_(False) # Freeze model
print("  VAE loaded...")

# ── Layer 2 Model: CLIP (The Brain) ──────────────────────────────────────────
print(" Loading CLIP (openai/clip-vit-large-patch14) …")
clip_model = CLIPModel.from_pretrained(
    "openai/clip-vit-large-patch14",
    torch_dtype=DTYPE,
).to(DEVICE)
clip_model.eval()
clip_model.requires_grad_(False) # Freeze model

clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")
print("  CLIP loaded.")
print(" All models ready.\n")


# %% — Cell 3: Core Utility Functions
# ============================================================================

ATTACK_RES = 512  # Resolution used for internal gradient calculation

# ── Image ↔ Tensor Helpers ──────────────────────────────────────────────────

def pil_to_tensor(pil_img: Image.Image, size: int = ATTACK_RES) -> torch.Tensor:
    """Convert PIL image → float32 tensor [1,3,H,W] in [0,1], resized for attack."""
    img = pil_img.convert("RGB").resize((size, size), Image.LANCZOS)
    arr = np.array(img).astype(np.float32) / 255.0
    tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0)
    # We always compute gradients in float32 for stability, even if models are fp16
    return tensor.to(DEVICE, dtype=torch.float32)


def tensor_to_pil(tensor: torch.Tensor) -> Image.Image:
    """Convert tensor [1,3,H,W] in [0,1] → PIL image."""
    arr = tensor.squeeze(0).clamp(0, 1).detach().cpu().permute(1, 2, 0).numpy()
    return Image.fromarray((arr * 255).astype(np.uint8))


def apply_perturbation_fullres(
    original_pil: Image.Image,
    delta_lowres: torch.Tensor,
) -> Image.Image:
    """
    CRITICAL FUNCTION: Solves the blurriness issue.
    1. Takes the noise pattern (delta) calculated at 512x512.
    2. Upscales ONLY the noise to the original image dimensions.
    3. Adds noise to the high-res original image.
    """
    orig_w, orig_h = original_pil.size

    # Upscale the noise (delta) to match original image
    # We use bilinear interpolation for the noise pattern
    delta_fullres = F.interpolate(
        delta_lowres,
        size=(orig_h, orig_w),
        mode="bilinear",
        align_corners=False,
    )

    # Convert original full-res image to tensor
    orig_tensor = (
        torch.from_numpy(np.array(original_pil.convert("RGB")).astype(np.float32) / 255.0)
        .permute(2, 0, 1)
        .unsqueeze(0)
        .to(DEVICE, dtype=torch.float32)
    )

    # Apply noise and clamp to valid range [0, 1]
    protected = (orig_tensor + delta_fullres).clamp(0, 1)
    
    # Return as PIL Image
    return tensor_to_pil(protected)


# ── Quality Metrics ─────────────────────────────────────────────────────────

def compute_ssim(img_a: Image.Image, img_b: Image.Image) -> float:
    """Compute SSIM (Visual Similarity) score."""
    # Resize for comparison speed if image is huge
    size = min(img_a.size[0], 1024), min(img_a.size[1], 1024)
    a = np.array(img_a.convert("RGB").resize(size, Image.LANCZOS))
    b = np.array(img_b.convert("RGB").resize(size, Image.LANCZOS))
    return compare_ssim(a, b, channel_axis=2, data_range=255)


# %% — Cell 4: Dual-Layer PGD Attack Engine
# ============================================================================

def compute_vae_loss(vae_model, perturbed: torch.Tensor):
    """
    Layer 1 Target: "Gray Death"
    Force the VAE to generate Zero Latents (Gray/Static).
    """
    # Normalize to [-1, 1] for VAE
    perturbed_scaled = perturbed * 2.0 - 1.0

    # Get Latents (Cast to model DTYPE e.g. float16)
    pert_latent = vae_model.encode(perturbed_scaled.to(DTYPE)).latent_dist.mean

    # Target: All Zeros (Gray Image)
    target_latent = torch.zeros_like(pert_latent)
    
    # Loss: Minimize distance to Gray Target
    loss = F.mse_loss(pert_latent, target_latent)
    return loss


def compute_clip_loss(model, processor, perturbed: torch.Tensor):
    """
    Layer 2 Target: "Semantic Void"
    Push the image embedding towards a random, non-sensical vector.
    """
    # CLIP Normalization constants
    clip_mean = torch.tensor([0.48145466, 0.4578275, 0.40821073], device=DEVICE).view(1, 3, 1, 1)
    clip_std  = torch.tensor([0.26862954, 0.26130258, 0.27577711], device=DEVICE).view(1, 3, 1, 1)

    # Resize to 224x224 for CLIP input requirements
    img_clip = F.interpolate(perturbed, size=(224, 224), mode="bilinear", align_corners=False)
    img_clip = (img_clip - clip_mean) / clip_std

    # Get features
    image_emb = model.get_image_features(pixel_values=img_clip.to(DTYPE))
    image_emb = image_emb / image_emb.norm(dim=-1, keepdim=True)

    # -------------------------------------------------------------------------
    # FIX: Initialize Generator on the same device as the model
    # -------------------------------------------------------------------------
    if DEVICE.type == "cuda":
        gen = torch.Generator(device=DEVICE)
    else:
        gen = torch.Generator(device="cpu")
    gen.manual_seed(999) # Fixed seed for consistency
    
    target_emb = torch.randn(1, image_emb.shape[-1], generator=gen, device=DEVICE).to(DTYPE)
    target_emb = target_emb / target_emb.norm(dim=-1, keepdim=True)

    # Loss: Minimize distance to the Random Target
    loss = F.mse_loss(image_emb, target_emb)
    return loss


@torch.enable_grad()
def pgd_attack(
    original_pil: Image.Image,
    steps: int       = 50,
    epsilon: float   = 0.04,
    vae_weight: float  = 1.0,
    clip_weight: float = 0.5,
    progress_callback  = None,
):
    """
    The Core CloakID Algorithm using Projected Gradient Descent (PGD).
    """
    # Setup inputs
    x_orig = pil_to_tensor(original_pil) # [1,3,512,512]
    x_orig.requires_grad_(False)

    # Initialize noise (delta)
    delta = torch.zeros_like(x_orig, requires_grad=True, device=DEVICE)

    # Optimization Step Size
    alpha = epsilon / (steps * 0.4) 

    print(f"\n Starting CloakID Attack | Eps: {epsilon} | Steps: {steps}")

    for step in range(steps):
        # Create Adversarial Image
        x_adv = (x_orig + delta).clamp(0, 1)

        # 1. Calculate Losses
        l_vae = compute_vae_loss(vae, x_adv)
        l_clip = compute_clip_loss(clip_model, clip_processor, x_adv)

        # 2. Combined Loss
        # We want to MINIMIZE the distance to our targets
        total_loss = (vae_weight * l_vae) + (clip_weight * l_clip)

        # 3. Calculate Gradient
        total_loss.backward()

        # 4. PGD Update (Minimize Loss)
        with torch.no_grad():
            grad = delta.grad.detach()
            # Move noise towards the target
            delta.data = delta.data - alpha * torch.sign(grad)
            
            # Constraint A: Epsilon (Invisibility)
            delta.data = delta.data.clamp(-epsilon, epsilon)
            
            # Constraint B: Valid Pixel Range
            delta.data = (x_orig + delta.data).clamp(0, 1) - x_orig

        delta.grad.zero_()

        # Progress Update
        if progress_callback and step % 5 == 0:
            progress_callback((step + 1) / steps, desc=f"Optimizing... Loss: {total_loss.item():.4f}")

    # ── Final Output Generation ─────────────────────────────────────────
    # We take the best noise pattern and apply it to the FULL RES image
    protected_pil = apply_perturbation_fullres(original_pil, delta.detach())
    
    # Calculate Metrics
    ssim_val = compute_ssim(original_pil, protected_pil)
    
    print(f" Attack Complete. SSIM: {ssim_val:.4f}")
    
    metrics = {
        "ssim": ssim_val,
        "epsilon": epsilon,
        "steps": steps
    }

    return protected_pil, metrics


# %% — Cell 5: Gradio User Interface
# ============================================================================

OUTPUT_DIR = "/kaggle/working"
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

def immunize(
    image: Image.Image,
    intensity: float,
    steps: int,
    progress=gr.Progress(track_tqdm=True),
):
    if image is None:
        raise gr.Error("Please upload an image first.")

    # Execute Attack
    # Weights optimized for general defense (Layer 1=1.0, Layer 2=0.5)
    protected_pil, metrics = pgd_attack(
        original_pil = image,
        steps        = int(steps),
        epsilon      = intensity,
        vae_weight   = 1.0,
        clip_weight  = 0.5,
        progress_callback = progress,
    )

    # Save as Lossless PNG
    save_path = os.path.join(OUTPUT_DIR, "cloakid_protected.png")
    protected_pil.save(save_path, format="PNG", compress_level=1)

    # Create Status Report
    status = (
        f"###  Immunization Successful\n"
        f"**Visual Fidelity (SSIM):** {metrics['ssim']:.4f} "
        f"{' (Good)' if metrics['ssim'] >= 0.90 else ' (Visible Noise)'}\n\n"
        f"The image has been inoculated against Latent Diffusion models. "
        f"Download the PNG below."
    )

    return protected_pil, status, save_path


# ── UI Layout ───────────────────────────────────────────────────────────────

with gr.Blocks(title="CloakID Phase 1", theme=gr.themes.Soft()) as demo:
    gr.Markdown("#  CloakID: Image Immunization (Phase 1)")
    gr.Markdown("### Adversarial Defense Against Multimodal Diffusion Models")
    gr.Markdown("Upload your photo to apply the Dual-Layer Adversarial Defense (VAE + CLIP).")

    with gr.Row():
        with gr.Column():
            input_image = gr.Image(label="Original Image", type="pil", height=450)
            
            with gr.Group():
                gr.Markdown("### Protection Settings")
                intensity_slider = gr.Slider(0.01, 0.08, value=0.04, step=0.01, label="Shield Strength (Epsilon)")
                steps_slider = gr.Slider(10, 100, value=50, step=10, label="Optimization Steps")
            
            run_btn = gr.Button(" Apply Immunization", variant="primary", size="lg")

        with gr.Column():
            output_image = gr.Image(label="Protected Result", type="pil", height=450)
            status_md = gr.Markdown()
            download_file = gr.File(label=" Download Lossless PNG")

    run_btn.click(
        fn=immunize,
        inputs=[input_image, intensity_slider, steps_slider],
        outputs=[output_image, status_md, download_file],
    )

# %% — Cell 6: Launch
# ============================================================================
print(" Launching CloakID Interface...")
demo.queue().launch(share=True, debug=True)

 Installing dependencies...
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 68.6/68.6 kB 2.1 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 444.8/444.8 kB 11.3 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.0/2.0 MB 54.0 MB/s eta 0:00:00


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-adk 1.22.1 requires google-cloud-bigquery-storage>=2.0.0, which is not installed.
langchain-core 0.3.79 requires packaging<26.0.0,>=23.2.0, but you have packaging 26.0rc2 which is incompatible.
fastai 2.8.4 requires fastcore<1.9,>=1.8.0, but you have fastcore 1.11.3 which is incompatible.


   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 53.8/53.8 kB 2.3 MB/s eta 0:00:00
 All dependencies installed.


2026-02-13 09:15:50.939175: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1770974151.110706      55 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1770974151.160737      55 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1770974151.594099      55 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1770974151.594142      55 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1770974151.594144      55 computation_placer.cc:177] computation placer alr

  Device : cuda  |  Dtype : torch.float16
 Loading VAE (stabilityai/sd-vae-ft-mse) …


config.json:   0%|          | 0.00/547 [00:00<?, ?B/s]

diffusion_pytorch_model.safetensors:   0%|          | 0.00/335M [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


  VAE loaded...
 Loading CLIP (openai/clip-vit-large-patch14) …


config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/1.71G [00:00<?, ?B/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/905 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]

  CLIP loaded.
 All models ready.

 Launching CloakID Interface...
* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: https://6017015508a7e01bb6.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://6017015508a7e01bb6.gradio.live


