In [1]:
import base64
from flask_cors import CORS
import torch
import torchvision.transforms as transforms
import torchvision.models as models
import torch.nn.functional as F
from facenet_pytorch import MTCNN, InceptionResnetV1
from torchvision.utils import save_image
from flask import Flask, request, jsonify, send_file
import io
from torchvision.utils import save_image
from torchvision import transforms
import google.generativeai as genai
import os
from dotenv import load_dotenv
from PIL import Image as PILImage
from IPython.display import Image as DisplayImage, display



All support for the `google.generativeai` package has ended. It will no longer be receiving 
updates or bug fixes. Please switch to the `google.genai` package as soon as possible.
See README for more details:

https://github.com/google-gemini/deprecated-generative-ai-python/blob/main/README.md

  import google.generativeai as genai


In [6]:
import base64
def image_to_base64(image_bytes: bytes) -> str:
    return base64.b64encode(image_bytes).decode("utf-8")

In [3]:

load_dotenv()  
api_key = os.getenv("GOOGLE_API_KEY")
genai.configure(api_key=api_key)


In [4]:
#HELPER FUNCTIONS
import base64
from flask_cors import CORS
import torch
import torchvision.transforms as transforms
import torchvision.models as models
import torch.nn.functional as F
from PIL import Image
from facenet_pytorch import MTCNN, InceptionResnetV1
from torchvision.utils import save_image
from flask import Flask, request, jsonify, send_file
import io
from torchvision.utils import save_image
from torchvision import transforms
import google.generativeai as genai
import os
from dotenv import load_dotenv

device = 'cuda' if torch.cuda.is_available() else 'cpu'
resnet = models.resnet50(pretrained=True).eval().to(device)

mtcnn = MTCNN(keep_all=False, device=device)

facenet = InceptionResnetV1(pretrained="vggface2").eval().to(device)

face_preprocess = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.ToTensor()
])

with open("models/imagenet_classes.txt") as f:
    idx_to_class = [line.strip() for line in f.readlines()]

preprocess_224 = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])


to_tensor = transforms.ToTensor()
# ImageNet normalization (update if your preprocess_224 uses different values)
IMAGENET_MEAN = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1).to(device)
IMAGENET_STD  = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1).to(device)
def cloak_face_facenet(
    orig_img,
    intensity=0.01,
    method="fgsm",
    targeted=False,
    target_identity_img=None
):
    
    orig_w, orig_h = orig_img.size

    boxes, probs = mtcnn.detect(orig_img)
    if boxes is None:
        return None, {"error": "No face detected"}

    x1, y1, x2, y2 = map(int, boxes[0])
    face_crop = orig_img.crop((x1, y1, x2, y2))

    face_small = face_preprocess(face_crop).unsqueeze(0).to(device)
    face_small.requires_grad = True

    orig_emb = facenet(face_small).detach()

    if targeted:
        target_face = face_preprocess(target_identity_img).unsqueeze(0).to(device)
        target_emb = facenet(target_face).detach()
    else:
        target_emb = None

    def untargeted_loss(adv_emb):
        return -F.cosine_similarity(adv_emb, orig_emb).mean()

    def targeted_loss(adv_emb):
        return F.cosine_similarity(adv_emb, target_emb).mean()

    if method == "fgsm":
        emb = facenet(face_small)
        loss = targeted_loss(emb) if targeted else untargeted_loss(emb)
        loss.backward()

        epsilon = intensity
        adv_small = torch.clamp(face_small + epsilon * face_small.grad.sign(), 0, 1).detach()

    elif method == "pgd":
        adv_small = face_small.clone()
        epsilon = intensity
        alpha = epsilon / 3
        steps = 7

        for _ in range(steps):
            adv_small = adv_small.detach()
            adv_small.requires_grad_(True)

            emb = facenet(adv_small)
            loss = targeted_loss(emb) if targeted else untargeted_loss(emb)
            loss.backward()

            adv_small = adv_small + alpha * adv_small.grad.sign()
            adv_small = torch.min(torch.max(adv_small, face_small - epsilon), face_small + epsilon)
            adv_small = torch.clamp(adv_small, 0, 1)

    delta_small = adv_small - face_small

    face_H = y2 - y1
    face_W = x2 - x1

    delta_big = torch.nn.functional.interpolate(
        delta_small,
        size=(face_H, face_W),
        mode='bilinear',
        align_corners=False
    )

    orig_tensor = to_tensor(orig_img).unsqueeze(0).to(device)
    perturbed = orig_tensor.clone()

    perturbed[:, :, y1:y2, x1:x2] = torch.clamp(
        orig_tensor[:, :, y1:y2, x1:x2] + delta_big,
        0, 1
    )

    adv_face_crop = transforms.ToPILImage()(perturbed[0, :, y1:y2, x1:x2].cpu())
    adv_face_small = face_preprocess(adv_face_crop).unsqueeze(0).to(device)
    adv_emb = facenet(adv_face_small).detach()

    metrics = {}

    orig_sim = float(F.cosine_similarity(orig_emb, orig_emb))
    adv_sim = float(F.cosine_similarity(orig_emb, adv_emb))
    emb_dist = float((orig_emb - adv_emb).norm())

    metrics["cosine_similarity_before"] = orig_sim
    metrics["cosine_similarity_after"] = adv_sim
    metrics["similarity_drop"] = 1.0 - adv_sim
    metrics["embedding_distance_original_vs_adv"] = emb_dist

    embedding_dim = orig_emb.shape[1]

    metrics["normalized_distance"] = emb_dist / embedding_dim
    metrics["adv_vs_orig_norm_ratio"] = emb_dist / (orig_emb.norm().item() + 1e-6)
    metrics["percent_change_in_distance"] = \
        float((emb_dist / (orig_emb.norm().item() + 1e-6)) * 100)

    metrics["embedding_moved_norm"] = emb_dist

    metrics["embedding_movement_per_pixel"] = emb_dist / (face_H * face_W)

    SUCCESS_THRESHOLD = 0.85
    metrics["attack_success"] = adv_sim < SUCCESS_THRESHOLD

    metrics["effective_cloaking_score"] = min(1.0, (1 - adv_sim) * 1.3)

    if targeted:
        tgt_sim_before = float(F.cosine_similarity(orig_emb, target_emb))
        tgt_sim_after = float(F.cosine_similarity(adv_emb, target_emb))

        metrics["target_similarity_before"] = tgt_sim_before
        metrics["target_similarity_after"] = tgt_sim_after
        metrics["push_toward_target"] = tgt_sim_after - tgt_sim_before

        metrics["target_push_strength"] = max(0.0, metrics["push_toward_target"])

    return perturbed, metrics


def pil_to_base64(pil_img, format="PNG"):
    buffer = io.BytesIO()
    pil_img.save(buffer, format=format)
    buffer.seek(0)
    return base64.b64encode(buffer.read()).decode("utf-8")


def base64_to_pil(b64_string):
    img_bytes = base64.b64decode(b64_string)
    return PILImage.open(io.BytesIO(img_bytes)).convert("RGB")


def tensor_to_base64(tensor, format="PNG"):
    buffer = io.BytesIO()
    save_image(tensor, buffer, format=format)
    buffer.seek(0)
    return base64.b64encode(buffer.read()).decode("utf-8")

def face_cloak_from_base64(
    image_b64: str,
    intensity: float = 0.01,
    method: str = "fgsm",
    targeted: bool = False,
    target_image_b64: str | None = None,
):
    """
    Pure function:
    - Takes images as base64
    - Returns base64 cloaked image + metrics
    """

    # Decode input image
    orig_img = base64_to_pil(image_b64)

    target_identity_img = None
    if targeted:
        if target_image_b64 is None:
            return None, {"error": "Targeted attack requires target_image"}
        target_identity_img = base64_to_pil(target_image_b64)

    # ---- CORE PROCESSING (UNCHANGED) ----
    perturbed_tensor, metrics = cloak_face_facenet(
        orig_img,
        intensity=intensity,
        method=method,
        targeted=targeted,
        target_identity_img=target_identity_img
    )

    if perturbed_tensor is None:
        return None, metrics

    # Encode output tensor to base64
    cloaked_b64 = tensor_to_base64(perturbed_tensor)
    print(metrics)
    return cloaked_b64, metrics


def fgsm_highres_cloak(pil_img, target_idx, epsilon=0.01, targeted=False):
    """
    - pil_img: PIL RGB image (original full resolution)
    - target_idx: integer class index
    - epsilon: float (applied in normalized input-space then converted to pixel-space)
    - targeted: bool (if True, push *towards* target; else push *away*)
    Returns:
    - perturbed_orig_px: tensor [1,3,H,W] with pixel values in [0,1] at original resolution
    """
    model = resnet
    model.eval()

    # 1) Prepare model input (resized+normalized) for gradient computation
    x = preprocess_224(pil_img).unsqueeze(0).to(device)   # this is normalized input
    x.requires_grad = True

    # 2) Forward + loss
    out = model(x)
    loss = F.cross_entropy(out, torch.tensor([target_idx], device=device))
    model.zero_grad()
    loss.backward()

    # 3) gradient sign in normalized space
    grad_sign = x.grad.data.sign()  # shape [1,3,224,224] (or whatever preprocess_224 produces)

    # 4) make delta in normalized space (flip sign for targeted)
    if targeted:
        delta_norm = -epsilon * grad_sign
    else:
        delta_norm = epsilon * grad_sign

    # 5) convert delta from normalized-space -> pixel-space
    #    delta_px_small = delta_norm * std (because x_norm = (x_px - mean)/std -> delta_px = delta_norm * std)
    delta_px_small = delta_norm * IMAGENET_STD  # still small spatial size (e.g. 224x224)

    # 6) upsample delta to original image size
    orig_w, orig_h = pil_img.size  # PIL: (width, height)
    delta_px_upsampled = F.interpolate(delta_px_small, size=(orig_h, orig_w), mode="bilinear", align_corners=False)

    # 7) get original image as pixel tensor
    to_tensor = transforms.ToTensor()
    orig_px = to_tensor(pil_img).unsqueeze(0).to(device)  # [1,3,H,W], values in [0,1]

    # 8) apply perturbation and clamp
    perturbed_orig_px = orig_px + delta_px_upsampled
    perturbed_orig_px = torch.clamp(perturbed_orig_px, 0.0, 1.0).detach()

    return perturbed_orig_px  # [1,3,H,W] in original resolution, ready to save


def art_cloak_from_base64(
    image_b64: str,
    intensity: float = 0.01,
    mode: str = "untargeted",
    target_class_name: str | None = None,
):
    """
    Pure function:
    - Input images as base64
    - Output cloaked image as base64 + predictions
    """
    # print(image_b64)
    # Decode input image
    orig_img = base64_to_pil(image_b64)

    # --- BEFORE PREDICTIONS ---
    with torch.no_grad():
        x_before = preprocess_224(orig_img).unsqueeze(0).to(device)
        probs_before = F.softmax(resnet(x_before), dim=1)[0]

    targeted = (mode == "targeted")

    # Resolve target class
    if target_class_name is None:
        target_idx = torch.argmax(probs_before).item()
        target_class_name = idx_to_class[target_idx]
    else:
        try:
            target_idx = idx_to_class.index(target_class_name)
        except ValueError:
            return None, {"error": "Invalid class name"}

    # --- HIGH-RES CLOAKING (UNCHANGED) ---
    perturbed_tensor = fgsm_highres_cloak(
        pil_img=orig_img,
        target_idx=target_idx,
        epsilon=intensity,
        targeted=targeted
    )

    # --- AFTER PREDICTIONS ---
    with torch.no_grad():
        pert_img_pil = transforms.ToPILImage()(
            perturbed_tensor.squeeze(0).cpu()
        )
        x_after = preprocess_224(pert_img_pil).unsqueeze(0).to(device)
        probs_after = F.softmax(resnet(x_after), dim=1)[0]

    top_before = torch.topk(probs_before, 3)
    top_after = torch.topk(probs_after, 3)

    response = {
        "mode": mode,
        "target_class": target_class_name,
        "original_top_predictions": [
            {
                "class": idx_to_class[top_before.indices[i]],
                "prob": float(top_before.values[i])
            }
            for i in range(3)
        ],
        "cloaked_top_predictions": [
            {
                "class": idx_to_class[top_after.indices[i]],
                "prob": float(top_after.values[i])
            }
            for i in range(3)
        ],
    }

    cloaked_b64 = tensor_to_base64(perturbed_tensor)
    return cloaked_b64, response





In [5]:
#TOOLS FOR AGENTS
def training_tool(image_base64: str, prompt: str):
    return {
        "tool": "training",
        "status": "started",
        "job_id": "train_001"
    }


def face_clocking_tool(image_b64: str,
    intensity: float = 0.01,
    method: str = "fgsm",
    targeted: bool = False,
    target_image_b64: str | None = None,
    ):
    return face_cloak_from_base64(image_b64,intensity,method,targeted,target_image_b64)   

def art_clocking_tool(image_b64: str,
    intensity: float = 0.01,
    mode: str = "untargeted",
    target_class_name: str | None = None,):
    return art_cloak_from_base64(image_b64,
    intensity,
    mode,
    target_class_name)
    
    
mtcnn = MTCNN(keep_all=False, device=device)

facenet = InceptionResnetV1(pretrained="vggface2").eval().to(device)

face_preprocess = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.ToTensor()
])

to_tensor = transforms.ToTensor()


In [6]:
def detect_intent(message: str):
    msg = message.lower()

    if "training" in msg:
        return "training"
    if "face" in msg:
        return "face_clocking"
    if "art" in msg:
        return "art_clocking"

    return "chat"


In [10]:
import google.generativeai as genai
import time

SYSTEM_PROMPT = """
You are Mirage AI.

Rules:
- User explicitly tells what action to perform
- Do NOT assume tools
- Explain tool outputs clearly
- If no tool is used, act as a product assistant
- Always respond in valid JSON
"""

# ✅ Switch to Flash (quota-safe)
model = genai.GenerativeModel(
    "models/gemini-2.5-flash",
    generation_config={
        "max_output_tokens": 200,
        "temperature": 0.2
    }
)


def safe_generate(prompt, retries=3):
    for i in range(retries):
        try:
            return model.generate_content(prompt)
        except Exception as e:
            if "ResourceExhausted" in str(e):
                wait = 5
                print(f"⚠️ Gemini rate-limited. Retrying in {wait}s...")
                time.sleep(wait)
            else:
                raise e
    raise RuntimeError("Gemini quota exhausted. Try again later.")

# def safe_generate(prompt, retries=100):
#     for i in range(retries):
#         try:
#             return model.generate_content(prompt)
#         except Exception as e:
#             if "ResourceExhausted" in str(e):
#                 time.sleep(5 * (i + 1))  # exponential backoff
#             else:
#                 raise e
#     raise RuntimeError("Gemini quota exhausted after retries")

def run_agent(user_message: str, image_base64: str | None = None):
    intent = detect_intent(user_message)
    print(intent)
    tool_used = None
    tool_result = None

    if intent != "chat":
        if image_base64 is None:
            return {
                "intent": intent,
                "tool_used": None,
                "result": None,
                "explanation": "Image is required to use this tool.",
                "image_base64": None
            }

        if intent == "training":
            tool_used = "training"
            image_output,tool_result = training_tool(image_base64, user_message)

        elif intent == "face_clocking":
            tool_used = "face_clocking"
            image_output, tool_result = face_clocking_tool(image_base64)
            print(image_base64,tool_result) 
        elif intent == "art_clocking":
            tool_used = "art_clocking"
            image_output , tool_result = art_clocking_tool(image_base64)
            print(image_base64,tool_result)  
        prompt = f"""
{SYSTEM_PROMPT}

User request:
{user_message}

Tool output:
{tool_result}

Explain the tool output clearly in JSON.
"""

    else:
        prompt = f"""
{SYSTEM_PROMPT}

User query:
{user_message}

Respond in JSON.
"""

    response = safe_generate(prompt)

    return {
        "intent": intent,
        "tool_used": tool_used,
        "result": tool_result,
        "explanation": response.text,
        "image_base64": image_base64
    }

In [11]:
from IPython.display import Image, display
import base64

def display_image_from_base64(image_base64):
    if image_base64:
        display(Image(data=base64.b64decode(image_base64)))


In [8]:
#ART CLOCKING AGENT
img="art.jpg"

with open(img, "rb") as f:
    img_b64 = image_to_base64(f.read())
    print(img_b64)
# display_image_from_base64(img_b64)
# test=  run_agent("Clock this art and return the image",img_b64)
# test


/9j/4AAQSkZJRgABAgEASABIAAD/4gxYSUNDX1BST0ZJTEUAAQEAAAxITGlubwIQAABtbnRyUkdCIFhZWiAHzgACAAkABgAxAABhY3NwTVNGVAAAAABJRUMgc1JHQgAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLUhQICAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABFjcHJ0AAABUAAAADNkZXNjAAABhAAAAGx3dHB0AAAB8AAAABRia3B0AAACBAAAABRyWFlaAAACGAAAABRnWFlaAAACLAAAABRiWFlaAAACQAAAABRkbW5kAAACVAAAAHBkbWRkAAACxAAAAIh2dWVkAAADTAAAAIZ2aWV3AAAD1AAAACRsdW1pAAAD+AAAABRtZWFzAAAEDAAAACR0ZWNoAAAEMAAAAAxyVFJDAAAEPAAACAxnVFJDAAAEPAAACAxiVFJDAAAEPAAACAx0ZXh0AAAAAENvcHlyaWdodCAoYykgMTk5OCBIZXdsZXR0LVBhY2thcmQgQ29tcGFueQAAZGVzYwAAAAAAAAASc1JHQiBJRUM2MTk2Ni0yLjEAAAAAAAAAAAAAABJzUkdCIElFQzYxOTY2LTIuMQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWFlaIAAAAAAAAPNRAAEAAAABFsxYWVogAAAAAAAAAAAAAAAAAAAAAFhZWiAAAAAAAABvogAAOPUAAAOQWFlaIAAAAAAAAGKZAAC3hQAAGNpYWVogAAAAAAAAJKAAAA+EAAC2z2Rlc2MAAAAAAAAAFklFQyBodHRwOi8vd3d3LmllYy5jaAAAAAAAAAAAAAAAFklFQyBodHRwOi8vd3d3LmllYy5jaAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABkZXNj

In [None]:
#ART CLOCKING AGENT
face="Vats.jpg"

with open(face, "rb") as f:
    img_b64 = image_to_base64(f.read())
    print(img_b64)
# display_image_from_base64(img_b64)
# test=  run_agent("Clock this face and return the image",img_b64)
# test


: 