# Duplicate / Near-Duplicate Search with Embeddings, SimHash & MinHash

Notebook này thực hiện pipeline:

1. Load embedding từ file NPZ (ví dụ: `bge-base-en-v1.5_pair_embeddings.npz`).
2. Lấy mẫu một số cặp văn bản (mặc định 1000 cặp).
3. Ghép thành một tập document embedding chung (~2M documents cho M cặp).
4. Xây chỉ mục tìm kiếm tương đồng với các lựa chọn:
   - Exact cosine (không dùng hash).
   - SimHash + LSH trên embedding.
   - MinHash (top-k chiều embedding) + LSH (banding).
5. Đánh giá khả năng tìm lại văn bản trùng/near-duplicate bằng nhãn `is_duplicate` thông qua Recall@K.

Bạn cần sửa đường dẫn `EMBEDDING_FILE` cho phù hợp với vị trí file embedding trong Google Drive.

In [None]:
# Kết nối Google Drive (bỏ qua nếu không chạy trên Colab)
try:
    from google.colab import drive  # type: ignore
    drive.mount('/content/drive')
    print('Đã mount Google Drive.')
except ImportError:
    print('Không phải môi trường Colab, bỏ qua bước mount Drive.')

In [None]:
import os
import math
import numpy as np
from dataclasses import dataclass

# =============================
# Cấu hình thí nghiệm
# =============================

# TODO: Sửa đường dẫn này cho đúng với file embedding của bạn
# Ví dụ trên Colab: '/content/drive/MyDrive/your_folder/bge-base-en-v1.5_pair_embeddings.npz'
EMBEDDING_FILE = '/content/drive/MyDrive/path/to/bge-base-en-v1.5_pair_embeddings.npz'

@dataclass
class ExperimentConfig:
    embedding_file: str
    sample_pairs: int      # số cặp (row) sẽ lấy mẫu
    hash_method: str       # 'none' | 'simhash' | 'minhash'

    # Cho SimHash
    n_bits: int = 64       # số bit simhash

    # Cho MinHash
    n_hashes: int = 128    # số hash function
    topk_dims: int = 20    # số chiều top-k của embedding dùng làm "set"
    n_bands: int = 16      # số band cho LSH (n_hashes phải chia hết cho n_bands)

    top_k: int = 10        # số hàng xóm cần tìm khi search

In [None]:
# =============================
# Hàm load embedding & sampling
# =============================

def load_embeddings_npz(path: str):
    """Load NPZ có 3 mảng: text1_embeddings, text2_embeddings, is_duplicate."""
    data = np.load(path)
    E1 = data['text1_embeddings']  # (N, D)
    E2 = data['text2_embeddings']  # (N, D)
    labels = data['is_duplicate']  # (N,)
    return E1, E2, labels


def sample_pairs(E1: np.ndarray, E2: np.ndarray, labels: np.ndarray, n_pairs: int, seed: int = 42):
    """Lấy ngẫu nhiên n_pairs cặp từ toàn bộ embedding."""
    N = E1.shape[0]
    n_pairs = min(n_pairs, N)
    rng = np.random.default_rng(seed)
    idx = rng.choice(N, size=n_pairs, replace=False)
    return E1[idx], E2[idx], labels[idx]


def build_corpus(E1_sampled: np.ndarray, E2_sampled: np.ndarray, labels_sampled: np.ndarray):
    """Ghép thành tập docs chung và lưu mapping cho từng cặp.

    E1_sampled: (M, D)
    E2_sampled: (M, D)
    labels_sampled: (M,)
    Trả về:
        docs: (2M, D)
        pair_info: list dict với keys: pair_index, doc1_id, doc2_id, label
    """
    M, D = E1_sampled.shape
    docs = np.vstack([E1_sampled, E2_sampled])  # (2M, D)

    pair_info = []
    for i in range(M):
        pair_info.append({
            'pair_index': int(i),
            'doc1_id': int(i),
            'doc2_id': int(i + M),
            'label': int(labels_sampled[i]),
        })
    return docs, pair_info

In [None]:
# =============================
# Hashing: SimHash & MinHash
# =============================

class HashEncoder:
    def fit(self, docs: np.ndarray):
        raise NotImplementedError

    def encode(self, docs: np.ndarray) -> np.ndarray:
        raise NotImplementedError


class SimHashEncoder(HashEncoder):
    """SimHash trên embedding: dùng random hyperplanes để tạo mã nhị phân."""
    def __init__(self, n_bits: int = 64, seed: int = 42):
        self.n_bits = n_bits
        self.seed = seed
        self.random_planes = None  # (n_bits, D)

    def fit(self, docs: np.ndarray):
        N, D = docs.shape
        rng = np.random.default_rng(self.seed)
        self.random_planes = rng.standard_normal(size=(self.n_bits, D))

    def encode(self, docs: np.ndarray) -> np.ndarray:
        projections = docs @ self.random_planes.T  # (N, n_bits)
        bits = (projections >= 0).astype(np.uint8)
        return bits


class MinHashEncoder(HashEncoder):
    """MinHash trên embedding.

    Ý tưởng:
    - Mỗi vector -> set các index top-k theo |giá trị|.
    - Dùng num_hashes hàm hash dạng (a_i * x + b_i) mod P.
    - Signature[j] = min hash_j(s) trên s thuộc set.
    """
    def __init__(self, num_hashes: int = 128, topk_dims: int = 20, seed: int = 42):
        self.num_hashes = num_hashes
        self.topk_dims = topk_dims
        self.seed = seed
        self.prime = None
        self.a = None
        self.b = None
        self.dim = None

    @staticmethod
    def _next_prime(n: int) -> int:
        def is_prime(x: int) -> bool:
            if x < 2:
                return False
            if x % 2 == 0:
                return x == 2
            r = int(math.isqrt(x))
            for i in range(3, r + 1, 2):
                if x % i == 0:
                    return False
            return True

        while not is_prime(n):
            n += 1
        return n

    def _embedding_to_set_topk(self, vec: np.ndarray) -> np.ndarray:
        """vec: (D,) -> mảng index top-k theo |vec|."""
        D = vec.shape[0]
        k = min(self.topk_dims, D)
        if k <= 0:
            return np.array([], dtype=np.int64)
        # top-k theo |giá trị| (không cần sắp xếp toàn bộ)
        idx = np.argpartition(-np.abs(vec), k - 1)[:k]
        return idx.astype(np.int64)

    def fit(self, docs: np.ndarray):
        N, D = docs.shape
        self.dim = D
        # P là số nguyên tố > D
        self.prime = self._next_prime(D + 1)

        rng = np.random.default_rng(self.seed)
        self.a = rng.integers(1, self.prime, size=self.num_hashes, endpoint=False, dtype=np.int64)
        self.b = rng.integers(0, self.prime, size=self.num_hashes, endpoint=False, dtype=np.int64)

    def encode(self, docs: np.ndarray) -> np.ndarray:
        N, D = docs.shape
        assert self.dim == D, 'Docs dimension khác với lúc fit MinHashEncoder'

        sigs = np.full((N, self.num_hashes), fill_value=self.prime + 1, dtype=np.int64)

        for i in range(N):
            idx = self._embedding_to_set_topk(docs[i])
            if idx.size == 0:
                continue
            # hashes: (num_hashes, |set|)
            hashes = (self.a[:, None] * idx[None, :] + self.b[:, None]) % self.prime
            sigs[i, :] = hashes.min(axis=1)

        return sigs

In [None]:
# =============================
# Index cho similarity search
# =============================

class BaseIndex:
    def build(self, docs: np.ndarray, signatures: np.ndarray = None):
        raise NotImplementedError

    def query(self, query_vec: np.ndarray, top_k: int = 10, encoder: HashEncoder = None):
        raise NotImplementedError


class ExactCosineIndex(BaseIndex):
    """Search exact theo cosine similarity (baseline)."""
    def build(self, docs: np.ndarray, signatures: np.ndarray = None):
        self.docs = docs
        norms = np.linalg.norm(docs, axis=1, keepdims=True) + 1e-9
        self.normalized = docs / norms

    def query(self, query_vec: np.ndarray, top_k: int = 10, encoder: HashEncoder = None):
        q = query_vec / (np.linalg.norm(query_vec) + 1e-9)
        scores = self.normalized @ q  # (N,)
        top_idx = np.argpartition(-scores, top_k)[:top_k]
        top_idx = top_idx[np.argsort(-scores[top_idx])]
        return [(int(i), float(scores[i])) for i in top_idx]


class SimHashLSHIndex(BaseIndex):
    """LSH cho SimHash: bucket theo mã nhị phân."""
    def __init__(self):
        self.docs = None
        self.bits = None
        self.hash_table = {}

    @staticmethod
    def bits_to_key(bits_row: np.ndarray) -> str:
        return ''.join('1' if b else '0' for b in bits_row)

    def build(self, docs: np.ndarray, signatures: np.ndarray):
        self.docs = docs
        self.bits = signatures
        self.hash_table = {}
        N = docs.shape[0]
        for doc_id in range(N):
            key = self.bits_to_key(signatures[doc_id])
            self.hash_table.setdefault(key, []).append(doc_id)

    def query(self, query_vec: np.ndarray, top_k: int = 10, encoder: HashEncoder = None):
        assert encoder is not None, 'Cần SimHashEncoder để query'
        q_bits = encoder.encode(query_vec[None, :])[0]
        key = self.bits_to_key(q_bits)
        candidates = self.hash_table.get(key, [])
        if not candidates:
            return []

        docs = self.docs[candidates]
        norms = np.linalg.norm(docs, axis=1, keepdims=True) + 1e-9
        normalized_docs = docs / norms
        q = query_vec / (np.linalg.norm(query_vec) + 1e-9)

        scores = normalized_docs @ q  # (len(candidates),)
        idx = np.argsort(-scores)[:top_k]
        result_ids = [candidates[i] for i in idx]
        return [(int(doc_id), float(scores[i])) for doc_id, i in zip(result_ids, idx)]


class MinHashLSHIndex(BaseIndex):
    """LSH cho MinHash bằng banding."""
    def __init__(self, num_bands: int = 16):
        self.num_bands = num_bands
        self.hash_tables = []
        self.docs = None
        self.signatures = None
        self.rows_per_band = None

    def build(self, docs: np.ndarray, signatures: np.ndarray):
        self.docs = docs
        self.signatures = signatures
        N, num_hashes = signatures.shape
        assert num_hashes % self.num_bands == 0, 'num_hashes phải chia hết cho num_bands'

        self.rows_per_band = num_hashes // self.num_bands
        self.hash_tables = [dict() for _ in range(self.num_bands)]

        for doc_id in range(N):
            sig = signatures[doc_id]
            for b in range(self.num_bands):
                start = b * self.rows_per_band
                end = start + self.rows_per_band
                band_tuple = tuple(sig[start:end].tolist())
                table = self.hash_tables[b]
                table.setdefault(band_tuple, []).append(doc_id)

    def query(self, query_vec: np.ndarray, top_k: int = 10, encoder: HashEncoder = None):
        assert encoder is not None, 'Cần MinHashEncoder để query'
        q_sig = encoder.encode(query_vec[None, :])[0]

        candidates = set()
        for b in range(self.num_bands):
            start = b * self.rows_per_band
            end = start + self.rows_per_band
            band_tuple = tuple(q_sig[start:end].tolist())
            table = self.hash_tables[b]
            if band_tuple in table:
                for doc_id in table[band_tuple]:
                    candidates.add(doc_id)

        candidates = list(candidates)
        if not candidates:
            return []

        docs = self.docs[candidates]
        norms = np.linalg.norm(docs, axis=1, keepdims=True) + 1e-9
        normalized_docs = docs / norms
        q = query_vec / (np.linalg.norm(query_vec) + 1e-9)

        scores = normalized_docs @ q
        idx = np.argsort(-scores)[:top_k]
        result_ids = [candidates[i] for i in idx]
        return [(int(doc_id), float(scores[i])) for doc_id, i in zip(result_ids, idx)]

In [None]:
# =============================
# Evaluation với is_duplicate
# =============================

def evaluate_recall_at_k(index: BaseIndex,
                         docs: np.ndarray,
                         pair_info: list,
                         k: int,
                         encoder: HashEncoder = None):
    """Tính Recall@K: doc2_id của các cặp label=1 có xuất hiện trong top-K khi query doc1_id không."""
    hits = 0
    total = 0

    for p in pair_info:
        if p['label'] != 1:
            continue
        total += 1
        doc1_id = p['doc1_id']
        doc2_id = p['doc2_id']

        query_vec = docs[doc1_id]
        neighbors = index.query(query_vec, top_k=k, encoder=encoder)
        retrieved_ids = [doc_id for doc_id, _ in neighbors]
        if doc2_id in retrieved_ids:
            hits += 1

    recall_at_k = hits / total if total > 0 else 0.0
    return recall_at_k, hits, total

In [None]:
# =============================
# Hàm chạy toàn bộ thí nghiệm
# =============================

def run_experiment(cfg: ExperimentConfig):
    print('Embedding file:', cfg.embedding_file)
    print('Sample pairs:', cfg.sample_pairs)
    print('Hash method:', cfg.hash_method)
    print('-----------------------------')

    # 1) Load & sample
    E1, E2, labels = load_embeddings_npz(cfg.embedding_file)
    print('Tổng số cặp ban đầu:', E1.shape[0])
    E1_s, E2_s, labels_s = sample_pairs(E1, E2, labels, cfg.sample_pairs)
    docs, pair_info = build_corpus(E1_s, E2_s, labels_s)
    print('Số docs trong corpus:', docs.shape[0])
    print('Số cặp sau khi sample:', len(pair_info))

    # 2) Build index
    if cfg.hash_method == 'none':
        encoder = None
        index = ExactCosineIndex()
        index.build(docs)

    elif cfg.hash_method == 'simhash':
        encoder = SimHashEncoder(n_bits=cfg.n_bits)
        encoder.fit(docs)
        signatures = encoder.encode(docs)
        index = SimHashLSHIndex()
        index.build(docs, signatures)

    elif cfg.hash_method == 'minhash':
        assert cfg.n_hashes % cfg.n_bands == 0, 'n_hashes phải chia hết cho n_bands'
        encoder = MinHashEncoder(num_hashes=cfg.n_hashes, topk_dims=cfg.topk_dims)
        encoder.fit(docs)
        signatures = encoder.encode(docs)
        index = MinHashLSHIndex(num_bands=cfg.n_bands)
        index.build(docs, signatures)

    else:
        raise ValueError(f'Unknown hash_method: {cfg.hash_method}')

    # 3) Evaluate Recall@K
    for k in [1, 5, 10]:
        recall_k, hits, total = evaluate_recall_at_k(index, docs, pair_info, k, encoder=encoder)
        print(f'Recall@{k}: {recall_k:.4f}  (hits={hits} / total positives={total})')


# Ví dụ chạy thử: dùng MinHash trên 1000 cặp
cfg = ExperimentConfig(
    embedding_file=EMBEDDING_FILE,
    sample_pairs=1000,
    hash_method='minhash',   # 'none' | 'simhash' | 'minhash'
    n_bits=64,
    n_hashes=128,
    topk_dims=20,
    n_bands=16,
    top_k=10,
)

print('Cấu hình thí nghiệm:', cfg)
run_experiment(cfg)