# Adversarial Robustness Evaluation for Text-to-3D Retrieval

This notebook evaluates the robustness of two retrieval models:

1. **Unsupervised embedding-based retrieval** (e.g. MiniLM / BERT + FAISS)
2. **Supervised contrastive multimodal model** (CLIP-style fine-tuning)

We use an adversarial test set of query–asset pairs with controlled perturbations
(lexical, syntactic, semantic distractions) and compute:

- Retrieval@k (R@1, R@5, R@10)
- Mean Reciprocal Rank (MRR)
- Cosine similarity drops between original vs adversarial queries
- Robustness Ratio: (R@k_adv / R@k_orig)
- Significance tests comparing the two methods


In [None]:
import os
from pathlib import Path
import pandas as pd
import numpy as np
from typing import List, Dict, Callable, Tuple

from tqdm import tqdm

# For significance tests
from scipy.stats import ttest_rel, wilcoxon

# For basic plotting (optional; comment out if you prefer not to plot)
import matplotlib.pyplot as plt

# Make numpy printouts nicer: show 4 decimal places and suppress
# scientific notation for small values.
np.set_printoptions(precision=4, suppress=True)

In [1]:
# The notebook is expected to sit one directory below the project root;
# change this line if your layout differs.
PROJECT_ROOT = Path.cwd().parent

# Input locations
DATA_PROCESSED_DIR = PROJECT_ROOT.joinpath("data", "processed")
ADVERSARIAL_TEST_PATH = DATA_PROCESSED_DIR.joinpath("adversarial_test_set.csv")

# Human-readable model labels, used for consistent logging
MODEL_NAMES = dict(
    unsupervised="Embedding-based (unsupervised)",
    contrastive="Contrastive multimodal (supervised)",
)

# Retrieval depth: R@1, R@5 and R@10 are all computed within this range
TOP_K_EVAL = 10

# Column names expected in the adversarial test CSV
COL_ASSET_ID = "asset_id"
COL_ORIG_QUERY = "original_query"
COL_ADV_QUERY = "adv_query"
# e.g. lexical / syntactic / semantic_distraction
COL_PERTURB_TYPE = "perturbation_type"

# Optional, when several adversarial queries share one original:
# COL_GROUP_ID = "group_id"  # or something similar; not strictly necessary


NameError: name 'Path' is not defined

In [None]:
# Fail early with an explicit exception rather than `assert`, which is
# stripped when Python runs with -O.
if not ADVERSARIAL_TEST_PATH.exists():
    raise FileNotFoundError(f"Adversarial file not found: {ADVERSARIAL_TEST_PATH}")

df_adv = pd.read_csv(ADVERSARIAL_TEST_PATH)

# The evaluation loop indexes these columns directly, so report a missing
# column here instead of failing on the first row of a long run.
# (COL_PERTURB_TYPE is optional: the evaluation falls back to "unknown".)
_missing_cols = [
    col
    for col in (COL_ASSET_ID, COL_ORIG_QUERY, COL_ADV_QUERY)
    if col not in df_adv.columns
]
if _missing_cols:
    raise ValueError(
        f"Adversarial file {ADVERSARIAL_TEST_PATH} is missing required columns: {_missing_cols}"
    )

print("Loaded adversarial test set with shape:", df_adv.shape)
df_adv.head()


In [None]:
def retrieve_unsupervised(query: str, top_k: int = 100) -> List[str]:
    """
    Placeholder for retrieval with the embedding-based unsupervised model.

    TODO: replace the body with a real implementation. It is expected to
    return asset_ids (str or int) ranked best-first, with at most ``top_k``
    entries.

    Until implemented, every call raises NotImplementedError.
    """
    # Sketch of one possible implementation:
    #   query_emb = unsupervised_text_encoder.encode(query)
    #   scores, indices = faiss_index.search(query_emb, top_k)
    #   asset_ids = [id_lookup[i] for i in indices[0]]
    #   return asset_ids
    message = "Implement retrieve_unsupervised()"
    raise NotImplementedError(message)


def retrieve_contrastive(query: str, top_k: int = 100) -> List[str]:
    """
    Placeholder for retrieval with the contrastive multimodal model.

    TODO: replace the body with a real implementation. It is expected to
    return asset_ids (str or int) ranked best-first, with at most ``top_k``
    entries.

    Until implemented, every call raises NotImplementedError.
    """
    # Sketch of one possible implementation:
    #   query_emb = contrastive_text_encoder.encode(query)
    #   scores, indices = contrastive_index.search(query_emb, top_k)
    #   asset_ids = [id_lookup[i] for i in indices[0]]
    #   return asset_ids
    message = "Implement retrieve_contrastive()"
    raise NotImplementedError(message)


In [None]:
def encode_unsupervised_text(query: str) -> np.ndarray:
    """
    Placeholder for the unsupervised text encoder.

    TODO: replace the body with a real implementation that returns the
    query embedding as a 1D numpy array.

    Until implemented, every call raises NotImplementedError.
    """
    # Sketch of one possible implementation:
    #   emb = unsupervised_text_encoder.encode(query)
    #   return np.array(emb, dtype=float)
    message = "Implement encode_unsupervised_text()"
    raise NotImplementedError(message)


def encode_contrastive_text(query: str) -> np.ndarray:
    """
    Placeholder for the contrastive text encoder.

    TODO: replace the body with a real implementation that returns the
    query embedding as a 1D numpy array.

    Until implemented, every call raises NotImplementedError.
    """
    # Sketch of one possible implementation:
    #   emb = contrastive_text_encoder.encode(query)
    #   return np.array(emb, dtype=float)
    message = "Implement encode_contrastive_text()"
    raise NotImplementedError(message)


In [None]:
def reciprocal_rank(ranked_ids: List[str], target_id: str) -> float:
    """
    Return 1 / rank of ``target_id`` in ``ranked_ids`` (rank is 1-based),
    or 0.0 when the target does not appear in the list.
    """
    try:
        zero_based_pos = ranked_ids.index(target_id)
    except ValueError:
        # Target was not retrieved at all.
        return 0.0
    return 1.0 / (zero_based_pos + 1)


def retrieval_at_k(ranked_ids: List[str], target_id: str, k: int) -> int:
    """
    Hit indicator for R@k: 1 when ``target_id`` is among the first ``k``
    entries of ``ranked_ids``, otherwise 0.
    """
    top_hits = ranked_ids[:k]
    return 1 if target_id in top_hits else 0


def cosine_similarity(a: np.ndarray, b: np.ndarray, eps: float = 1e-8) -> float:
    """
    Cosine similarity of two vectors.

    ``eps`` is added to the product of the norms so that a zero-norm input
    yields 0.0 instead of a division by zero.
    """
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    similarity = np.dot(a, b) / (norm_product + eps)
    return float(similarity)


def compute_robustness_ratio(r_at_k_orig: np.ndarray, r_at_k_adv: np.ndarray) -> float:
    """
    Robustness ratio RR = R@k_adv / R@k_orig at the aggregate level.

    The ratio is taken between the two means, i.e.
    mean(R@k_adv) / mean(R@k_orig). When mean(R@k_orig) is exactly 0 the
    ratio is undefined and 0.0 is returned instead.
    """
    baseline, attacked = r_at_k_orig.mean(), r_at_k_adv.mean()
    return float(attacked / baseline) if baseline != 0 else 0.0


In [None]:
def _ranking_metrics(ranked_ids: List[str], target_id: str, suffix: str) -> Dict[str, float]:
    """Score one ranked list: R@1, R@5, R@10 and reciprocal rank, keyed ``<metric>_<suffix>``."""
    return {
        f"R1_{suffix}": retrieval_at_k(ranked_ids, target_id, 1),
        f"R5_{suffix}": retrieval_at_k(ranked_ids, target_id, 5),
        f"R10_{suffix}": retrieval_at_k(ranked_ids, target_id, 10),
        f"MRR_{suffix}": reciprocal_rank(ranked_ids, target_id),
    }


def _summarize_scope(sub_df: pd.DataFrame, model_name: str, scope: str) -> Dict[str, object]:
    """Aggregate per-query metrics of ``sub_df`` into one summary row."""
    row: Dict[str, object] = {"model": model_name, "scope": scope}
    for suffix in ("orig", "adv"):
        for metric in ("R1", "R5", "R10", "MRR"):
            col = f"{metric}_{suffix}"
            row[col] = sub_df[col].mean()
    for metric in ("R1", "R5", "R10"):
        row[f"RR_{metric}"] = compute_robustness_ratio(
            sub_df[f"{metric}_orig"].values,
            sub_df[f"{metric}_adv"].values,
        )
    row["cos_orig_adv_mean"] = sub_df["cos_orig_adv"].mean()
    return row


def evaluate_model(
    df: pd.DataFrame,
    retrieve_fn: Callable[[str, int], List[str]],
    encode_fn: Callable[[str], np.ndarray],
    model_name: str,
    top_k_eval: int = 10,
) -> Dict[str, pd.DataFrame]:
    """
    Evaluate one retrieval model on original vs. adversarial queries.

    For every row of ``df`` the original and the adversarial query are both
    sent through ``retrieve_fn`` and scored against the row's asset id
    (R@1, R@5, R@10, reciprocal rank). Both queries are also embedded with
    ``encode_fn`` and their cosine similarity is recorded.

    Args:
        df: Test set with columns COL_ASSET_ID, COL_ORIG_QUERY and
            COL_ADV_QUERY. COL_PERTURB_TYPE is optional; a missing column or
            a missing (NaN) value is recorded as "unknown".
        retrieve_fn: Called as ``retrieve_fn(query, top_k_eval)``; must
            return a ranked sequence of asset ids (compared as strings).
        encode_fn: Maps a query string to its embedding vector.
        model_name: Label written to the "model" column of the summary.
        top_k_eval: Depth requested from ``retrieve_fn``. All metrics are
            computed on the returned list as-is, so R@5 / R@10 are presumably
            only meaningful when this is >= 10 (depends on ``retrieve_fn``
            honouring the limit).

    Returns:
      results: dict with keys "per_query" and "summary"
        - "per_query": one row per input row with the metrics above.
        - "summary": one "overall" row plus one "perturb=<type>" row per
          perturbation type, holding metric means and robustness ratios.
        For an empty ``df`` both frames are empty but carry their columns.
    """
    per_query_columns = [
        "asset_id", "orig_query", "adv_query", "perturbation_type",
        "R1_orig", "R5_orig", "R10_orig", "MRR_orig",
        "R1_adv", "R5_adv", "R10_adv", "MRR_adv",
        "cos_orig_adv",
    ]
    summary_columns = [
        "model", "scope",
        "R1_orig", "R5_orig", "R10_orig", "MRR_orig",
        "R1_adv", "R5_adv", "R10_adv", "MRR_adv",
        "RR_R1", "RR_R5", "RR_R10",
        "cos_orig_adv_mean",
    ]

    records = []

    print(f"Evaluating model: {model_name}")
    for _, row in tqdm(df.iterrows(), total=len(df)):
        asset_id = str(row[COL_ASSET_ID])
        q_orig = str(row[COL_ORIG_QUERY])
        q_adv = str(row[COL_ADV_QUERY])

        # An empty CSV cell arrives as NaN. groupby() drops NaN keys, which
        # would silently exclude the row from every per-type summary, so it
        # is mapped to the same "unknown" label used for an absent column.
        perturb_type = row.get(COL_PERTURB_TYPE, "unknown")
        if pd.isna(perturb_type):
            perturb_type = "unknown"

        # --- Retrieval (ids normalised to str so they compare with asset_id) ---
        ranked_orig = [str(x) for x in retrieve_fn(q_orig, top_k_eval)]
        ranked_adv = [str(x) for x in retrieve_fn(q_adv, top_k_eval)]

        # --- Embedding metric: similarity between original and adversarial query ---
        cos_orig_adv = cosine_similarity(encode_fn(q_orig), encode_fn(q_adv))

        records.append({
            "asset_id": asset_id,
            "orig_query": q_orig,
            "adv_query": q_adv,
            "perturbation_type": perturb_type,
            **_ranking_metrics(ranked_orig, asset_id, "orig"),
            **_ranking_metrics(ranked_adv, asset_id, "adv"),
            "cos_orig_adv": cos_orig_adv,
        })

    # Explicit columns keep the frame well-formed even when there are no rows.
    per_query_df = pd.DataFrame(records, columns=per_query_columns)

    if per_query_df.empty:
        # Nothing to aggregate; means over zero rows would only produce NaNs.
        return {
            "per_query": per_query_df,
            "summary": pd.DataFrame(columns=summary_columns),
        }

    # Overall row first, then one row per perturbation type.
    summary_rows = [_summarize_scope(per_query_df, model_name, "overall")]
    for p_type, sub in per_query_df.groupby("perturbation_type"):
        summary_rows.append(_summarize_scope(sub, model_name, f"perturb={p_type}"))

    summary_df = pd.DataFrame(summary_rows, columns=summary_columns)

    return {
        "per_query": per_query_df,
        "summary": summary_df,
    }


In [None]:
# Run the same evaluation for both models; keys match MODEL_NAMES.
_model_backends = (
    ("unsupervised", retrieve_unsupervised, encode_unsupervised_text),
    ("contrastive", retrieve_contrastive, encode_contrastive_text),
)

results = {}
for _model_key, _retrieve, _encode in _model_backends:
    # Assign per model so a failure in the second run keeps the first result.
    results[_model_key] = evaluate_model(
        df=df_adv,
        retrieve_fn=_retrieve,
        encode_fn=_encode,
        model_name=MODEL_NAMES[_model_key],
        top_k_eval=TOP_K_EVAL,
    )


In [None]:
# Show the aggregated metrics table of each model in turn.
for _heading, _model_key in (("Unsupervised", "unsupervised"), ("Contrastive", "contrastive")):
    print(f"=== Summary: {_heading} ===")
    display(results[_model_key]["summary"])


In [None]:
def _paired_test_stats(x: np.ndarray, y: np.ndarray) -> Tuple[float, float, float, float]:
    """Return (t, p_t, W, p_W) for paired samples; all NaN when no pair differs."""
    diffs = np.asarray(x, dtype=float) - np.asarray(y, dtype=float)
    if not np.any(diffs):
        # Every paired difference is zero (or there are no pairs). The t
        # statistic is 0/0 and Wilcoxon with zero_method="wilcox" has no
        # observations left after discarding zeros, so report "undefined"
        # instead of letting scipy raise or warn.
        nan = float("nan")
        return nan, nan, nan, nan

    t_stat, p_t = ttest_rel(x, y)
    w_stat, p_w = wilcoxon(x, y, zero_method="wilcox", alternative="two-sided")
    return float(t_stat), float(p_t), float(w_stat), float(p_w)


def paired_significance_tests(
    df_unsup: pd.DataFrame,
    df_contrastive: pd.DataFrame,
    metric_col_orig: str,
    metric_col_adv: str,
    label: str,
):
    """
    Paired t-tests / Wilcoxon over per-query metrics between models.
    Assumes rows are aligned (same queries in same order).

    The tests are run twice: once on ``metric_col_orig`` (original queries)
    and once on ``metric_col_adv`` (adversarial queries). Results are
    printed; nothing is returned. When the two models score identically on
    every query the statistics are printed as ``nan``.

    Args:
        df_unsup: Per-query metrics of the unsupervised model.
        df_contrastive: Per-query metrics of the contrastive model.
        metric_col_orig: Column holding the metric on original queries.
        metric_col_adv: Column holding the metric on adversarial queries.
        label: Heading printed above the results.

    Raises:
        ValueError: If the two frames have a different number of rows.
    """
    # Explicit check instead of `assert`, which is stripped under -O.
    if len(df_unsup) != len(df_contrastive):
        raise ValueError("Mismatched number of rows between models")

    # Pull all columns up front so a bad column name fails before any output.
    comparisons = [
        ("Original", metric_col_orig,
         df_unsup[metric_col_orig].values, df_contrastive[metric_col_orig].values),
        ("Adversarial", metric_col_adv,
         df_unsup[metric_col_adv].values, df_contrastive[metric_col_adv].values),
    ]

    print(f"\n=== {label} ===")

    for heading, col, x, y in comparisons:
        t_stat, p_t, w_stat, p_w = _paired_test_stats(x, y)
        print(f"{heading} ({col}):")
        print(f"  Paired t-test: t={t_stat:.4f}, p={p_t:.4e}")
        print(f"  Wilcoxon:      W={w_stat:.4f}, p={p_w:.4e}")


# Put both per-query tables into the same row order (sorted by asset id,
# original query, adversarial query) so the paired tests compare like with like.
order_cols = ["asset_id", "orig_query", "adv_query"]
df_u, df_c = (
    results[_model_key]["per_query"].sort_values(order_cols).reset_index(drop=True)
    for _model_key in ("unsupervised", "contrastive")
)

# Significance tests for R@10 and MRR
for _metric, _label in (("R10", "R@10"), ("MRR", "MRR")):
    paired_significance_tests(df_u, df_c, f"{_metric}_orig", f"{_metric}_adv", label=_label)


In [None]:
def bar_plot_summary(summary_df, title: str):
    """
    Draw a grouped bar chart of R@1 and R@10 (original vs. adversarial)
    using only the rows of ``summary_df`` whose scope is "overall".
    """
    overall_rows = summary_df.loc[summary_df["scope"] == "overall"]

    positions = np.arange(len(overall_rows))
    bar_width = 0.15

    fig, ax = plt.subplots(figsize=(10, 5))

    # (column, legend label, horizontal offset in units of bar_width)
    bar_specs = (
        ("R1_orig", "R@1 orig", -1.5),
        ("R1_adv", "R@1 adv", -0.5),
        ("R10_orig", "R@10 orig", 0.5),
        ("R10_adv", "R@10 adv", 1.5),
    )
    for column, legend_label, offset in bar_specs:
        ax.bar(
            positions + offset * bar_width,
            overall_rows[column],
            bar_width,
            label=legend_label,
        )

    ax.set_xticks(positions)
    ax.set_xticklabels(overall_rows["model"], rotation=10)
    ax.set_ylabel("Score")
    ax.set_title(title)
    ax.legend()
    plt.tight_layout()
    plt.show()

# One chart per model, unsupervised first.
for _model_key, _chart_title in (
    ("unsupervised", "Unsupervised Retrieval Performance"),
    ("contrastive", "Contrastive Retrieval Performance"),
):
    bar_plot_summary(results[_model_key]["summary"], _chart_title)
