The code belows performs basic MinHash near-deduplication without any locality sensitive hashing (lsh). It simply computes minhash signatures for each document, and finds pairs of documents with high overlap of minhashes in their signatures, removing one of them.


## Similarity and Minhash Overview
### Similarity
For documents A and B, we compute their similarity as (the number of n-grams that are in intersection between A and B) / (the number of n-grams that are in union between A and B). This is sometimes called the Jaccard similarity.

This might not be always good (can you tell us why?), so instead minhash is used to approximate the Jaccard similarity.

### Minhash
To approximate the Jaccard similarity using Minhash, we first obtain minimal hash value of all n-grams in a document -> minhash_i(A).
It's easy to see that P(minhash_i(A) == minhash_i(B)) is equal to the Jaccard similarity between A and B. To approximate this probability, we therfore use multiple independent hash functions and check how many of them match between A and B.

In [None]:
!pip install xxhash numpy

In [None]:
from dataclasses import dataclass
from pathlib import Path

import numpy as np
from xxhash import xxh64_intdigest


@dataclass
class Document:
    id: int
    text: str

def ngrams(sequence: list, n: int):
    """
    Generate n-grams from a sequence of items
    Example:
        ngrams("Hi how are you?".split(), 3) -> [('Hi', 'how', 'are'), ('how', 'are', 'you')]
    """
    if len(sequence) < n:
        return []
    
    return [tuple(sequence[i:i+n]) for i in range(len(sequence) - n + 1)]

def get_signatures(shingles: np.ndarray) -> np.ndarray:
    """
    Get signatures (minhash of n-grams) from a string of text

    Args:
        shingles: numpy array of shingles: dtype = uint64, shape = (k, n_grams)

    Returns:
        numpy array of signatures: dtype = uint64, shape = (k, n_grams)
    """
    return np.min(shingles, axis=1)

def get_shingles(text: str, n_grams: int, k: int) -> np.ndarray:
    """
    Get kxn shingles (hashed n-grams) from a string of text

    Args:
        text: input text
        n_grams: n-grams size to use
        k: number of hash functions to use

    Returns:
        numpy array of shingles: dtype = uint64, shape = (k, n_grams)
    """
    text_ngrams = ngrams(text.split(), n_grams)
    ngrams_hashes = np.array([
        [
            # for each hash function(seed) compute the hash of each text_ngram
            xxh64_intdigest(" ".join(text_ngram), seed)
            for text_ngram in text_ngrams
        ] for seed in range(k)
    ], dtype=np.uint64)
    return ngrams_hashes



def dedup(data: list[Document], n_grams: int, k: int, jaccard_threshold: float = 0.8):
    """
        Takes a list of documents and near-deduplicates them using minhash with `n_grams`-grams, `k` hashes per document and a minimum jaccard similarity of `jaccard_threshold` to remove documents
    :param data: list of documents
    :param n_grams: size of n-grams to consider. for example 3 will consider contiguous 3 word-grams
    :param k: number of hash functions to use
    :param jaccard_threshold: minimum threshold to consider 2 documents as duplicates and remove one of them
    :return: a subset of `data` without near-duplicates
    """
    # First stage: create a signatures (k minhashes of n-grams) for each document
    signatures: list[tuple[int, np.ndarray]] = []
    for doc in data:
        shingles = get_shingles(doc.text, n_grams, k)
        if shingles.size != 0:
            signatures.append((doc.id, get_signatures(shingles)))
    

    # Second stage: compute the jaccard similarity between all signature pairs
    # When duplicates are found, always keep only the one with the smallest index
    to_remove_ids = set()
    for i, (doc_id_i, sig_i) in enumerate(signatures):
        for doc_id_j, sig_j in signatures[i+1:]:
            jaccard_similarity = np.sum(sig_i == sig_j) / len(sig_i)
            if jaccard_similarity > jaccard_threshold:
                to_remove_ids.add(doc_id_j)

    # Third stage: remove the documents that are marked for removal
    kept_docs = []
    for doc in data:
        if doc.id not in to_remove_ids:
            kept_docs.append(doc)

    return kept_docs


In [None]:
def load_document(path: Path) -> str:
    with open(path, "r") as f:
        return f.read()

documents = [
    Document(i, load_document(path)) for i, path in enumerate(Path("documents").glob("*.txt"))
]

print(f"Loaded {len(documents)} documents")

In [None]:
for doc in documents:
    print(f"Document {doc.id}: {doc.text}")
    print("-------")


In [None]:
deduplicated_documents = dedup(data=documents, n_grams=5, k=10, jaccard_threshold=0.7)
print(f"Kept: {len(deduplicated_documents)}/{len(documents)} documents")


Check the list of deduplicated documents. Anything that jumps out at you?

In [None]:
print("Kept documents:")
for doc in deduplicated_documents:
    print(f"Document {doc.id}: {doc.text}")
    print("-------")
