In [None]:
from pathlib import Path
import re
from collections import defaultdict
import csv
import sys
import math
from typing import List, Tuple, Optional

# Dataset locations and output file. Adjust these paths when running outside Kaggle.
ROOT = "/kaggle/input/recodai-luc-scientific-image-forgery-detection"
DIR_TRAIN_AUTH = Path(f"{ROOT}/train_images/authentic")
DIR_TRAIN_FORG = Path(f"{ROOT}/train_images/forged")
DIR_TEST       = Path(f"{ROOT}/test_images")  # not used by the pairing code; kept for compatibility
OUTPUT_CSV     = Path("/kaggle/working/pairs.csv")

# Method selection: forged images are first paired by normalized file name; the
# optional stages below are then tried, in order, on whatever is still unmatched.
USE_PHASH = True           # Fall back to perceptual-hash (pHash) matching for forged images not matched by name
PHASH_THRESHOLD = 10       # Maximum Hamming distance accepted as a pHash match (lower = more similar)

USE_SSIM = False           # (Optional) Try SSIM on forged images still unmatched after the pHash stage
SSIM_THRESHOLD = 0.70      # Minimum SSIM score accepted as a match (higher = more similar)
SSIM_MAX_SIDE = 512        # Longest image side, in pixels, after downscaling for SSIM (for speed)

# For quick debugging on a large dataset, cap the number of forged images processed (None = no limit)
DEBUG_LIMIT_FORGED = None   # e.g. 200

# ------------------- Dependencies -------------------
# Import the third-party packages, installing them on the fly if the import fails.
# NOTE: the `!pip` lines are IPython shell escapes, so this section only works when
# run inside a notebook (e.g. Kaggle/Jupyter), not as a plain Python script.
try:
    import imagehash  # type: ignore
    from PIL import Image  # type: ignore
except Exception:
    !pip -q install imagehash
    import imagehash
    from PIL import Image

# OpenCV and scikit-image are needed (and therefore imported) only when SSIM matching is enabled.
if USE_SSIM:
    try:
        import cv2  # type: ignore
        from skimage.metrics import structural_similarity as ssim  # type: ignore
    except Exception:
        !pip -q install scikit-image opencv-python-headless
        import cv2
        from skimage.metrics import structural_similarity as ssim
# ------------------- Utilities -------------------
# File suffixes (lower-case, with leading dot) that are treated as images.
IMG_EXTS = {".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp"}

def list_images(root: Path) -> List[Path]:
    """Return every image file found recursively under *root*, sorted by path.

    A path qualifies when its suffix, compared case-insensitively, is in
    ``IMG_EXTS`` and it is a regular file. Directories whose names merely
    look like images (e.g. ``scans.png/``) are skipped so that callers never
    try to open them as pictures.

    Args:
        root: Directory to search.

    Returns:
        The matching paths in ascending order.
    """
    return sorted(
        p
        for p in root.rglob("*")
        # Cheap suffix test first; is_file() needs a stat() call.
        if p.suffix.lower() in IMG_EXTS and p.is_file()
    )

def normalize_name(p: Path) -> str:
    """Build the lookup key used to pair authentic and forged files by name.

    The file stem is lower-cased and hyphens/spaces become underscores. Then,
    in order, a trailing "authentic"-style tag is removed, a trailing
    "forged"-style tag is removed, and finally every run of two or more
    underscores as well as any trailing underscores are deleted.

    Args:
        p: Path whose stem should be normalized.

    Returns:
        The normalized key (may be empty if the whole stem was a tag).
    """
    key = p.stem.lower().translate(str.maketrans("- ", "__"))
    # The patterns are applied in exactly this order; each match is deleted.
    for pattern in (
        r"(authentic|auth|original|orig|clean|real|gt)$",
        r"(forg(ed)?|fake|tampered|edit(ed)?|manipulated?)$",
        r"(__+|_+$)",
    ):
        key = re.sub(pattern, "", key)
    return key

def safe_open_image(path: Path) -> Optional[Image.Image]:
    """Open *path* and return it converted to RGB, or ``None`` on failure.

    Args:
        path: Image file to read.

    Returns:
        A new in-memory RGB image, or ``None`` if the file could not be
        opened or converted for any reason.
    """
    try:
        # Image.open() is lazy and keeps the underlying file open. The context
        # manager closes it deterministically once convert() has produced an
        # independent in-memory copy, instead of relying on garbage collection.
        with Image.open(path) as im:
            return im.convert("RGB")
    except Exception:
        # Deliberate best-effort: unreadable or corrupt files are reported as
        # None so that callers can simply skip them.
        return None

def phash_value(path: Path):
    """Compute the perceptual hash (pHash) of the image stored at *path*.

    Args:
        path: Image file to hash.

    Returns:
        The value returned by ``imagehash.phash``, or ``None`` when the image
        cannot be opened or hashing raises.
    """
    image = safe_open_image(path)
    digest = None
    if image is not None:
        try:
            digest = imagehash.phash(image)
        except Exception:
            # Best-effort: a failed hash is treated like an unreadable image.
            digest = None
    return digest

def hamming_distance(h1, h2) -> int:
    """Return the absolute value of ``h1 - h2``.

    For imagehash hash objects the subtraction is expected to yield the
    Hamming distance between the two hashes (see the imagehash docs).
    """
    difference = h1 - h2
    return abs(difference)

def load_gray_resized_cv2(path: Path, max_side=512):
    """Read an image as grayscale and shrink it so its longer side fits *max_side*.

    Images that already fit are returned at their original size; images are
    never enlarged.

    Args:
        path: Image file to read.
        max_side: Upper bound, in pixels, for the longer image side.

    Returns:
        The (possibly downscaled) grayscale image array, or ``None`` when
        ``cv2.imread`` cannot read the file.
    """
    # cv2 is imported at module level only when USE_SSIM is enabled; importing
    # it here keeps this helper usable regardless of that flag.
    import cv2

    img = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
    if img is None:
        return None
    h, w = img.shape[:2]
    scale = min(1.0, max_side / max(h, w))
    if scale < 1.0:
        # Clamp to at least one pixel: for very elongated images the shorter
        # side would otherwise truncate to 0 and make cv2.resize fail.
        new_w = max(1, int(w * scale))
        new_h = max(1, int(h * scale))
        img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
    return img

def best_match_by_phash(gf: Path, auth_hashes: List[Tuple[Path, object]]) -> Tuple[Optional[Path], Optional[int]]:
    """Find the authentic image whose pHash is closest to that of *gf*.

    Args:
        gf: Forged image to match.
        auth_hashes: ``(path, hash)`` pairs for the authentic images; entries
            whose hash is ``None`` are ignored.

    Returns:
        ``(best_path, best_distance)`` for the smallest distance (the earliest
        entry wins ties), or ``(None, None)`` when *gf* cannot be hashed or no
        usable authentic hash exists.
    """
    forged_hash = phash_value(gf)
    if forged_hash is None:
        return (None, None)

    scored = [
        (hamming_distance(forged_hash, auth_hash), auth_path)
        for auth_path, auth_hash in auth_hashes
        if auth_hash is not None
    ]
    if not scored:
        return (None, None)

    # min() keeps the first of several equally small distances.
    best_d, best_p = min(scored, key=lambda entry: entry[0])
    return (best_p, best_d)

def best_match_by_ssim(gf: Path, auth_imgs: List[Tuple[Path, 'np.ndarray']], max_side=512) -> Tuple[Optional[Path], Optional[float]]:
    """Find the authentic image with the highest SSIM score against *gf*.

    Args:
        gf: Forged image to match.
        auth_imgs: ``(path, grayscale_image)`` pairs for the authentic images;
            entries whose image is ``None`` are ignored.
        max_side: Longest side used when loading *gf*.

    Returns:
        ``(best_path, best_score)`` for the highest score (the earliest entry
        wins ties), or ``(None, None)`` when *gf* cannot be loaded or no
        candidate could be scored.
    """
    import numpy as np  # retained from the original; numpy is not referenced at runtime here

    forged = load_gray_resized_cv2(gf, max_side=max_side)
    if forged is None:
        return (None, None)

    forged_h, forged_w = forged.shape[:2]
    winner, top_score = None, None
    for candidate_path, candidate in auth_imgs:
        if candidate is None:
            continue
        cand_h, cand_w = candidate.shape[:2]
        same_size = (cand_h, cand_w) == (forged_h, forged_w)
        # Crude alignment: stretch the candidate to the forged image's size.
        aligned = candidate if same_size else cv2.resize(
            candidate, (forged_w, forged_h), interpolation=cv2.INTER_AREA
        )
        try:
            score = ssim(forged, aligned)
        except Exception:
            # Candidates that cannot be scored are skipped.
            continue
        if top_score is None or score > top_score:
            winner, top_score = candidate_path, score
    return (winner, top_score)

# ------------------- Main Flow -------------------
def main():
    """Pair each forged training image with an authentic one and save the pairs.

    Stages, each applied only to the forged images the previous one left
    unmatched:

    1. normalized file-name match (score ``0.0``),
    2. pHash nearest neighbour within ``PHASH_THRESHOLD`` (if ``USE_PHASH``),
    3. SSIM best match of at least ``SSIM_THRESHOLD`` (if ``USE_SSIM``).

    The resulting rows ``auth_path, forg_path, method, score`` are written to
    ``OUTPUT_CSV`` and progress is printed along the way.
    """

    def _or_none(func, *args, **kwargs):
        # Call func and turn any Exception into a None result.
        try:
            return func(*args, **kwargs)
        except Exception:
            return None

    # 1) Collect the image files
    auth_files = list_images(DIR_TRAIN_AUTH)
    forg_files = list_images(DIR_TRAIN_FORG)
    if DEBUG_LIMIT_FORGED is not None:
        forg_files = forg_files[:DEBUG_LIMIT_FORGED]

    print(f"#auth = {len(auth_files)}, #forg = {len(forg_files)}")

    # 2) Pair by normalized file name
    auth_map = defaultdict(list)
    for auth_path in auth_files:
        auth_map[normalize_name(auth_path)].append(auth_path)

    pairs: List[Tuple[Path, Path, str, float]] = []
    unmatched_forg: List[Path] = []

    for forged_path in forg_files:
        candidates = auth_map.get(normalize_name(forged_path))
        if not candidates:
            unmatched_forg.append(forged_path)
            continue
        # Several authentic files may share a key; the first one is used.
        pairs.append((candidates[0], forged_path, "name", 0.0))

    print(f"Name-matched pairs: {len(pairs)}")
    print(f"Unmatched forged by name: {len(unmatched_forg)}")

    # 3) pHash matching for what the name stage left over
    still_unmatched: List[Path] = unmatched_forg
    if USE_PHASH and still_unmatched:
        print("Computing pHash for authentic images...")
        auth_hashes = [
            (auth_path, _or_none(phash_value, auth_path))
            for auth_path in auth_files
        ]

        print("Matching unmatched forged by pHash...")
        pairs_before = len(pairs)
        next_unmatched = []
        for forged_path in still_unmatched:
            match, dist = best_match_by_phash(forged_path, auth_hashes)
            accepted = match is not None and dist is not None and dist <= PHASH_THRESHOLD
            if accepted:
                pairs.append((match, forged_path, "phash", float(dist)))
            else:
                next_unmatched.append(forged_path)
        newly_paired = len(pairs) - pairs_before
        still_unmatched = next_unmatched
        print(f"pHash new pairs (<= {PHASH_THRESHOLD}): {newly_paired}")
        print(f"Remaining unmatched after pHash: {len(still_unmatched)}")

    # 4) Optional SSIM matching
    if USE_SSIM and still_unmatched:
        print("Preloading grayscale resized authentic images for SSIM...")
        auth_imgs = [
            (auth_path, _or_none(load_gray_resized_cv2, auth_path, max_side=SSIM_MAX_SIDE))
            for auth_path in auth_files
        ]

        print("Matching unmatched forged by SSIM...")
        pairs_before = len(pairs)
        next_unmatched = []
        for forged_path in still_unmatched:
            try:
                match, sc = best_match_by_ssim(forged_path, auth_imgs, max_side=SSIM_MAX_SIDE)
            except Exception:
                match, sc = (None, None)
            # Kept in positive form so that a NaN score is rejected.
            accepted = match is not None and sc is not None and sc >= SSIM_THRESHOLD
            if accepted:
                pairs.append((match, forged_path, "ssim", float(sc)))
            else:
                next_unmatched.append(forged_path)
        newly_paired = len(pairs) - pairs_before
        still_unmatched = next_unmatched
        print(f"SSIM new pairs (>= {SSIM_THRESHOLD}): {newly_paired}")
        print(f"Remaining unmatched after SSIM: {len(still_unmatched)}")

    # 5) Write the CSV
    OUTPUT_CSV.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["auth_path", "forg_path", "method", "score"])
        writer.writerows(
            [str(auth_path), str(forged_path), method, score]
            for auth_path, forged_path, method, score in pairs
        )

    print(f"\nSaved pairs: {len(pairs)} -> {OUTPUT_CSV}")
    if still_unmatched:
        print("Sample unmatched forged (up to 10):")
        for leftover in still_unmatched[:10]:
            print(" -", leftover)

# Run the pairing pipeline only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from pathlib import Path

# CSV with the columns auth_path, forg_path, method and score to visualise.
PAIRS_CSV = "/kaggle/working/pairs.csv"
IMG_SIZE = 256  # every image is resized to IMG_SIZE x IMG_SIZE pixels before display

def _open_rgb(path):
    """Load *path* as an RGB image resized to ``IMG_SIZE`` x ``IMG_SIZE``.

    The resize uses bilinear resampling and does not preserve the aspect
    ratio. Any error raised while opening or decoding the file propagates to
    the caller.
    """
    # Image.open() is lazy and keeps the file open; the context manager closes
    # it deterministically once the converted, resized copy exists in memory.
    with Image.open(path) as src:
        return src.convert("RGB").resize((IMG_SIZE, IMG_SIZE), Image.BILINEAR)

def _abs_diff(a_img, f_img):
    """Return the per-pixel absolute difference ``|f_img - a_img|`` as an image.

    Both inputs are widened to int16 before subtracting so that the
    difference of two 8-bit values cannot wrap around.
    """
    auth_px = np.asarray(a_img).astype(np.int16)
    forged_px = np.asarray(f_img).astype(np.int16)
    delta = np.absolute(forged_px - auth_px)
    return Image.fromarray(delta.astype(np.uint8))

def show_paired_samples(pairs_csv=PAIRS_CSV, n=6, seed=42):
    """Plot a random sample of authentic/forged pairs next to their difference.

    Each sampled row of the CSV becomes one figure row with three panels:
    the authentic image, the forged image and their absolute difference,
    the latter titled with the pairing method and score.

    Args:
        pairs_csv: CSV file with at least ``auth_path`` and ``forg_path``
            columns; ``method`` and ``score`` are used when present.
        n: Maximum number of pairs to show.
        seed: Seed for the row selection.

    Returns:
        None. Prints a message and returns early when there is nothing to
        show; rows whose images cannot be read are reported and left blank.
    """
    df = pd.read_csv(pairs_csv)
    if len(df) == 0:
        print("pairs.csv is empty.")
        return
    if n <= 0:
        # choice()/subplots() would raise on a non-positive sample size.
        print("Nothing to show: n must be positive.")
        return

    # A local, seeded legacy generator selects the same rows as
    # np.random.seed(seed) followed by np.random.choice(...), but leaves the
    # global NumPy random state untouched.
    rng = np.random.RandomState(seed)
    idx = rng.choice(len(df), size=min(n, len(df)), replace=False)
    rows = df.iloc[idx].reset_index(drop=True)

    # squeeze=False keeps `axes` two-dimensional even for a single row.
    fig, axes = plt.subplots(len(rows), 3, figsize=(12, 4*len(rows)), squeeze=False)

    for r, row in rows.iterrows():
        a_path, f_path = row["auth_path"], row["forg_path"]
        method, score = row.get("method", "name"), row.get("score", 0.0)

        try:
            a_img = _open_rgb(a_path)
            f_img = _open_rgb(f_path)
        except Exception as e:
            print(f"⚠️ Skipping row {r} due to read error: {e}")
            # Hide the panels reserved for this row so that no empty axes
            # frames are left in the figure.
            for ax in axes[r]:
                ax.axis("off")
            continue

        d_img = _abs_diff(a_img, f_img)

        panels = (
            (a_img, f"Authentic\n{Path(a_path).name}"),
            (f_img, f"Forged\n{Path(f_path).name}"),
            # |F-A| with pairing info
            (d_img, f"|F - A|   (method={method}, score={score})"),
        )
        for ax, (img, title) in zip(axes[r], panels):
            ax.imshow(img)
            ax.set_title(title, fontsize=10)
            ax.axis("off")

    plt.tight_layout()
    plt.show()

# Preview up to six randomly chosen pairs (fixed seed for a reproducible selection):
show_paired_samples(PAIRS_CSV, n=6, seed=42)
