In [45]:
import os
import numpy as np
import pywt
from PIL import Image
import matplotlib.pyplot as plt

In [46]:
def load_gray_resize(image_path: str, size=(256, 256)) -> np.ndarray:
    """Đọc ảnh, chuyển sang grayscale và resize về kích thước chuẩn"""
    img = Image.open(image_path).convert("L")
    img = img.resize(size, Image.BILINEAR)
    return np.asarray(img, dtype=np.float32)

In [47]:
def dwt2(image_arr: np.ndarray, wavelet="haar", level=2):
    """Thực hiện phân tích Wavelet 2D đa cấp"""
    return pywt.wavedec2(image_arr, wavelet=wavelet, level=level)

In [48]:
def _flatten_concat(arr_list) -> np.ndarray:
    """Làm phẳng và nối các mảng thành vector 1D"""
    return np.concatenate([a.flatten() for a in arr_list], axis=0)

def get_feature_vector(coeffs, mode="LL") -> np.ndarray:
    """
    Trích xuất vector đặc trưng từ các subband đã chọn ở cấp cao nhất.
    - LL: approximation band (cA_n)
    - LH: thường tương ứng với cH_n (chi tiết theo một hướng)
    - HL: thường tương ứng với cV_n
    - HH: tương ứng với cD_n
    Lưu ý: Tên gọi LH/HL có thể khác nhau tùy quy ước; với hashing, tính nhất quán quan trọng hơn tên gọi.
    """
    mode = mode.upper()

    cA = coeffs[0]                 # LL ở cấp cao nhất
    cH, cV, cD = coeffs[1]         # các chi tiết ở cấp cao nhất (vì coeffs[1] là chi tiết cấp cao nhất)

    if mode == "LL":
        return cA.flatten()

    if mode == "LL_LH":
        return _flatten_concat([cA, cH])

    if mode == "LL_HL":
        return _flatten_concat([cA, cV])

    if mode == "LL_LH_HL":
        return _flatten_concat([cA, cH, cV])

    if mode == "ALL":
        return _flatten_concat([cA, cH, cV, cD])

    raise ValueError("subband_mode phải là một trong: LL, LL_LH, LL_HL, LL_LH_HL, ALL")

In [49]:
def quantize_to_bits(vec: np.ndarray, method="median", hash_bits=256, **kwargs) -> np.ndarray:
    """
    Chuyển đổi vector đặc trưng (float) -> mảng bit (0/1) với độ dài cố định.
    Các phương pháp hỗ trợ:
      - median: bit = vec > median(vec)
      - mean:   bit = vec > mean(vec)
      - ternary: sử dụng ngưỡng robust xung quanh median:
                vec < (med - k*mad) -> 0
                vec > (med + k*mad) -> 1
                ngược lại -> gán thành 0 (hoặc 1) theo mid_policy (mặc định 0)
      - uniform_step: lượng tử hóa theo bước delta rồi lấy LSB:
                q = round(vec/delta); bit = q % 2
    """
    method = method.lower()
    v = vec.astype(np.float32)

    if method in ["median", "mean"]:
        thr = np.median(v) if method == "median" else np.mean(v)
        bits = (v > thr).astype(np.uint8)

    elif method == "ternary":
        # ước lượng độ phân tán robust (MAD)
        k = float(kwargs.get("k", 1.0))          # độ nhạy
        mid_policy = int(kwargs.get("mid_policy", 0))  # 0 hoặc 1 cho vùng giữa
        med = np.median(v)
        mad = np.median(np.abs(v - med)) + 1e-12
        low = med - k * mad
        high = med + k * mad

        bits = np.empty_like(v, dtype=np.uint8)
        bits[v < low] = 0
        bits[v > high] = 1
        bits[(v >= low) & (v <= high)] = mid_policy

    elif method == "uniform_step":
        delta = float(kwargs.get("delta", 5.0))  # kích thước bước
        q = np.rint(v / delta).astype(np.int32)
        bits = (q & 1).astype(np.uint8)          # LSB

    else:
        raise ValueError("quant_method phải là một trong: median, mean, ternary, uniform_step")

    # độ dài cố định
    if len(bits) >= hash_bits:
        indices = np.linspace(0, len(bits) - 1, hash_bits, dtype=int)
        return bits[indices].copy()
    out = np.zeros(hash_bits, dtype=np.uint8)
    out[:len(bits)] = bits
    return out

In [50]:
def wavelet_hash_path(
    image_path: str,
    size=(256, 256),
    wavelet="haar",
    level=2,
    subband_mode="LL",
    quant_method="median",
    hash_bits=256,
    quant_kwargs=None
):
    """Tạo wavelet hash từ đường dẫn ảnh"""
    if quant_kwargs is None:
        quant_kwargs = {}

    img = load_gray_resize(image_path, size=size)
    coeffs = dwt2(img, wavelet=wavelet, level=level)
    feat = get_feature_vector(coeffs, mode=subband_mode)
    bits = quantize_to_bits(feat, method=quant_method, hash_bits=hash_bits, **quant_kwargs)
    return bits

def hamming_distance(bits1: np.ndarray, bits2: np.ndarray) -> int:
    """Tính khoảng cách Hamming giữa 2 hash"""
    if len(bits1) != len(bits2):
        raise ValueError("Các hash phải có cùng độ dài bit")
    return int(np.sum(bits1 != bits2))

In [51]:
IMG_EXT = (".jpg", ".jpeg", ".png")

def collect_pairs(root_dir="./images"):
    """Thu thập các cặp ảnh similar/dissimilar từ thư mục"""
    pairs = []
    for label_name, y in [("similar", 1), ("dissimilar", 0)]:
        base = os.path.join(root_dir, label_name)
        if not os.path.isdir(base):
            continue
        for pair_id in sorted(os.listdir(base)):
            pair_dir = os.path.join(base, pair_id)
            if not os.path.isdir(pair_dir):
                continue
            files = sorted([f for f in os.listdir(pair_dir) if f.lower().endswith(IMG_EXT)])
            if len(files) != 2:
                print(f"Bỏ qua {pair_dir}: mong đợi 2 ảnh, tìm thấy {len(files)}")
                continue
            p1 = os.path.join(pair_dir, files[0])
            p2 = os.path.join(pair_dir, files[1])
            pairs.append((p1, p2, y))

    if not pairs:
        raise RuntimeError("Không tìm thấy cặp ảnh nào. Kiểm tra cấu trúc thư mục ./images.")
    return pairs

In [52]:
def evaluate_method(pairs, cfg):
    """Đánh giá một phương pháp hashing trên tập cặp ảnh"""
    dists, labels = [], []
    for p1, p2, y in pairs:
        h1 = wavelet_hash_path(
            p1,
            size=cfg["size"],
            wavelet=cfg["wavelet"],
            level=cfg["level"],
            subband_mode=cfg["subband_mode"],
            quant_method=cfg["quant_method"],
            hash_bits=cfg["hash_bits"],
            quant_kwargs=cfg.get("quant_kwargs", {})
        )
        h2 = wavelet_hash_path(
            p2,
            size=cfg["size"],
            wavelet=cfg["wavelet"],
            level=cfg["level"],
            subband_mode=cfg["subband_mode"],
            quant_method=cfg["quant_method"],
            hash_bits=cfg["hash_bits"],
            quant_kwargs=cfg.get("quant_kwargs", {})
        )
        dists.append(hamming_distance(h1, h2))
        labels.append(y)
    return np.array(dists), np.array(labels)

def metrics_at_threshold(dists, labels, thr):
    """Tính các metric (accuracy, sensitivity, specificity) tại một ngưỡng"""
    pred = (dists <= thr).astype(int)  # <= thr => tương tự
    TP = int(np.sum((pred == 1) & (labels == 1)))
    TN = int(np.sum((pred == 0) & (labels == 0)))
    FP = int(np.sum((pred == 1) & (labels == 0)))
    FN = int(np.sum((pred == 0) & (labels == 1)))

    acc = (TP + TN) / (TP + TN + FP + FN + 1e-12)
    sens = TP / (TP + FN + 1e-12)  # recall
    spec = TN / (TN + FP + 1e-12)

    return acc, sens, spec, (TP, TN, FP, FN)

In [53]:
def make_methods():
    """Tạo danh sách các cấu hình phương pháp để khảo sát"""
    methods = []
    size = (256, 256)
    hash_bits = 256

    # (A) Khảo sát Wavelet (cố định các tham số khác)
    for w in ["haar", "db2", "db4", "sym2"]:
        methods.append({
            "name": f"A-Wavelet:{w} | L2 | LL | median",
            "wavelet": w, "level": 2,
            "subband_mode": "LL",
            "quant_method": "median",
            "hash_bits": hash_bits,
            "size": size
        })

    # (B) Khảo sát Level (chọn wavelet cơ sở trước; có thể thay đổi sau)
    for lvl in [1, 2, 3]:
        methods.append({
            "name": f"B-Level:{lvl} | haar | LL | median",
            "wavelet": "haar", "level": lvl,
            "subband_mode": "LL",
            "quant_method": "median",
            "hash_bits": hash_bits,
            "size": size
        })

    # (C) Khảo sát Quantization (giữ wavelet/level cố định)
    methods.append({
        "name": "C-Quant:median | haar L2 | LL",
        "wavelet": "haar", "level": 2,
        "subband_mode": "LL",
        "quant_method": "median",
        "hash_bits": hash_bits,
        "size": size
    })
    methods.append({
        "name": "C-Quant:mean | haar L2 | LL",
        "wavelet": "haar", "level": 2,
        "subband_mode": "LL",
        "quant_method": "mean",
        "hash_bits": hash_bits,
        "size": size
    })
    methods.append({
        "name": "C-Quant:ternary(k=1.0) | haar L2 | LL",
        "wavelet": "haar", "level": 2,
        "subband_mode": "LL",
        "quant_method": "ternary",
        "quant_kwargs": {"k": 1.0, "mid_policy": 0},
        "hash_bits": hash_bits,
        "size": size
    })
    methods.append({
        "name": "C-Quant:uniform_step(delta=5) | haar L2 | LL",
        "wavelet": "haar", "level": 2,
        "subband_mode": "LL",
        "quant_method": "uniform_step",
        "quant_kwargs": {"delta": 5.0},
        "hash_bits": hash_bits,
        "size": size
    })

    # (D) Khảo sát Subband
    for sb in ["LL", "LL_LH", "LL_HL", "LL_LH_HL", "ALL"]:
        methods.append({
            "name": f"D-Subband:{sb} | haar L2 | median",
            "wavelet": "haar", "level": 2,
            "subband_mode": sb,
            "quant_method": "median",
            "hash_bits": hash_bits,
            "size": size
        })

    return methods

In [54]:
def run_survey(root_dir="./images", methods=None):
    """Chạy khảo sát đánh giá các phương pháp wavelet hashing"""
    pairs = collect_pairs(root_dir)
    print(f"Đã load {len(pairs)} cặp ảnh (similar=1, dissimilar=0)")

    if methods is None:
        methods = make_methods()

    results = []

    for cfg in methods:
        dists, labels = evaluate_method(pairs, cfg)

        # tìm ngưỡng tốt nhất theo accuracy tối đa
        best = {"acc": -1, "thr": None, "sens": None, "spec": None}
        for thr in range(0, int(dists.max()) + 1):
            acc, sens, spec, _ = metrics_at_threshold(dists, labels, thr)
            if acc > best["acc"]:
                best = {"acc": acc, "thr": thr, "sens": sens, "spec": spec}

        results.append({
            "name": cfg["name"],
            "best_thr": best["thr"],
            "acc": best["acc"],
            "sens": best["sens"],
            "spec": best["spec"],
            "dist_min": int(dists.min()),
            "dist_max": int(dists.max()),
            "dist_mean": float(dists.mean()),
        })

    # sắp xếp theo Accuracy
    results_sorted = sorted(results, key=lambda x: x["acc"], reverse=True)

    print("\n==================== KẾT QUẢ TẤT CẢ CÁC PHƯƠNG PHÁP (Tốt nhất trước) ====================")
    print(f"Tổng số phương pháp được đánh giá: {len(results_sorted)}\n")
    
    for idx, r in enumerate(results_sorted, 1):
        print(f"[{idx}/{len(results_sorted)}] {r['name']}")
        print(f"  Ngưỡng tốt nhất: {r['best_thr']}")
        print(f"  Accuracy:   {r['acc']:.3f}")
        print(f"  Sensitivity:{r['sens']:.3f}")
        print(f"  Specificity:{r['spec']:.3f}")
        print(f"  Khoảng dist: {r['dist_min']}..{r['dist_max']} (mean={r['dist_mean']:.2f})")
        print()

    return results_sorted

In [55]:
results = run_survey("./images")

Đã load 5 cặp ảnh (similar=1, dissimilar=0)

Tổng số phương pháp được đánh giá: 16

[1/16] A-Wavelet:haar | L2 | LL | median
  Ngưỡng tốt nhất: 13
  Accuracy:   1.000
  Sensitivity:1.000
  Specificity:1.000
  Khoảng dist: 3..129 (mean=48.00)

[2/16] A-Wavelet:db2 | L2 | LL | median
  Ngưỡng tốt nhất: 9
  Accuracy:   1.000
  Sensitivity:1.000
  Specificity:1.000
  Khoảng dist: 3..141 (mean=50.20)

[3/16] A-Wavelet:db4 | L2 | LL | median
  Ngưỡng tốt nhất: 15
  Accuracy:   1.000
  Sensitivity:1.000
  Specificity:1.000
  Khoảng dist: 1..144 (mean=51.20)

[4/16] A-Wavelet:sym2 | L2 | LL | median
  Ngưỡng tốt nhất: 9
  Accuracy:   1.000
  Sensitivity:1.000
  Specificity:1.000
  Khoảng dist: 3..141 (mean=50.20)

[5/16] B-Level:1 | haar | LL | median
  Ngưỡng tốt nhất: 25
  Accuracy:   1.000
  Sensitivity:1.000
  Specificity:1.000
  Khoảng dist: 5..158 (mean=59.40)

[6/16] B-Level:2 | haar | LL | median
  Ngưỡng tốt nhất: 13
  Accuracy:   1.000
  Sensitivity:1.000
  Specificity:1.000
  Khoảng