# Deduplicating Text in Common Crawl for LLM Training

In this notebook, we will cover how to run the MinHash deduplication algorithm on HTML documents from the Common Crawl dataset.

The Common Crawl corpus contains petabytes of data, with its oldest entries dating back to 2008, including raw web page data, metadata extracts, and text extracts.

LLMs require massive amounts of data to train on. Early foundation models like GPT-3 and T5 saw improvements in model performance due to deduplication efforts. Deduplication makes it far less likely that the model regurgitates memorized text, which leads to better responses.

*See [Deduplicating Training Data Makes Language Models Better (Lee et al.)](https://aclanthology.org/2022.acl-long.577.pdf)*

### The MinHash Deduplication algorithm

If you google "minhash deduplication" you'll find a variety of sources that can walk you through the algorithm. [Finding Near Duplicates with Jaccard Similarity and MinHash by Nelson Elhage](https://blog.nelhage.com/post/fuzzy-dedup/) is a great place to start. If you are looking for the canonical reference for the MinHash deduplication algorithm, it originates from the seminal paper by Andrei Z. Broder, published in 1997, titled:

```text
"On the resemblance and containment of documents"
Published in: Proceedings of the Compression and Complexity of Sequences 1997 (SEQUENCES '97)
Publisher: IEEE Computer Society
DOI: 10.1109/SEQUEN.1997.666900
```

A video walkthrough of the algorithm is also available through [Mike Mull's presentation on YouTube](https://www.youtube.com/watch?v=KKNPmvELUP4). He even provides a [Jupyter notebook](https://github.com/papers-we-love/san-diego/blob/master/presentations/2016-11-03-resemblance-containment-documents/Broder97.ipynb) detailing the core primitives and how they are calculated in pure Python.
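
To make those core primitives concrete, here is a minimal pure-Python sketch of exact Jaccard similarity and a MinHash estimate of it. The helper names (`jaccard`, `minhash_signature`, `estimate_jaccard`) and the use of salted BLAKE2 hashes in place of random permutations are assumptions made for illustration. This is not the exact construction from Broder's paper or from daft.

```python
import hashlib


def jaccard(a: set, b: set) -> float:
    """Exact Jaccard similarity: |A ∩ B| / |A ∪ B|."""
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)


def _salted_hash(token: str, seed: int) -> int:
    # A salted 64-bit hash stands in for one random permutation of the token universe.
    digest = hashlib.blake2b(
        token.encode("utf-8"), digest_size=8, salt=seed.to_bytes(8, "little")
    ).digest()
    return int.from_bytes(digest, "little")


def minhash_signature(tokens: set, num_hashes: int = 64) -> list:
    """Keep the minimum hash value under each salted hash function (tokens must be non-empty)."""
    return [min(_salted_hash(t, seed) for t in tokens) for seed in range(num_hashes)]


def estimate_jaccard(sig_a: list, sig_b: list) -> float:
    """The fraction of matching signature positions estimates the Jaccard similarity."""
    matches = sum(x == y for x, y in zip(sig_a, sig_b))
    return matches / len(sig_a)


a = {"the", "quick", "brown", "fox"}
b = {"the", "quick", "brown", "dog"}
print(jaccard(a, b))  # 0.6
print(estimate_jaccard(minhash_signature(a), minhash_signature(b)))  # an estimate near 0.6
```

The estimate gets tighter as `num_hashes` grows, which is the trade-off the number of permutations controls later in this notebook.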

---

In this notebook, we will walk through a practical approach to deduplicating text at scale. 


### First we will need to authenticate with AWS to access S3

Common Crawl data is free to access by anyone from anywhere. The data is hosted by Amazon Web Services’ Open Data Sets Sponsorships program on the bucket `s3://commoncrawl/`, located in the US-East-1 (Northern Virginia) AWS Region. The most performant means of accessing Common Crawl is through S3, so you'll need to authenticate with an `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.

Common Crawl data can also be accessed anonymously, without authentication, via its HTTP endpoint, but for the purposes of this walkthrough we are going to stick with S3.
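
For reference, a single object in the bucket can be mapped to an HTTP URL with a small helper. This is a sketch that assumes the public endpoint is `https://data.commoncrawl.org/` and that it mirrors the bucket's keys one to one. The helper name `s3_to_https` is hypothetical, and the `warc.paths.gz` listing file is used only as an illustrative path. Glob patterns like the one used later in this notebook generally cannot be expanded over plain HTTP.

```python
S3_PREFIX = "s3://commoncrawl/"
HTTPS_PREFIX = "https://data.commoncrawl.org/"  # assumed public HTTP endpoint


def s3_to_https(s3_uri: str) -> str:
    """Rewrite a single-object Common Crawl S3 URI as its HTTPS equivalent."""
    if not s3_uri.startswith(S3_PREFIX):
        raise ValueError(f"Not a Common Crawl S3 URI: {s3_uri}")
    return HTTPS_PREFIX + s3_uri[len(S3_PREFIX):]


print(s3_to_https("s3://commoncrawl/crawl-data/CC-MAIN-2018-17/warc.paths.gz"))
# https://data.commoncrawl.org/crawl-data/CC-MAIN-2018-17/warc.paths.gz
```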


In [2]:
import daft
from daft.io import IOConfig, S3Config
import os
from dotenv import load_dotenv

# Make sure to define your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in your environment variables or in a .env file
load_dotenv()

s3_config = S3Config(
    region_name="us-east-1",
    requester_pays=True,
    key_id=os.environ["AWS_ACCESS_KEY_ID"],
    access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    anonymous=False,
)

IO_CONFIG = IOConfig(s3=s3_config)
daft.set_planning_config(default_io_config=IO_CONFIG)

DaftContext(_ctx=<builtins.PyDaftContext object at 0x105c6b3d0>)

## Loading & Preprocessing Common Crawl Documents 

We will be accessing Common Crawl through [WARC files](https://commoncrawl.org/blog/navigating-the-warc-file-format) since daft supports the format natively with `daft.read_warc(uri)`

In [3]:
NUM_ROWS = 500 # We'll limit this demo to a small number of rows for our initial walkthrough
index_col = "block_id" 
content_col = "block"

In [4]:
df_warc = daft.read_warc("s3://commoncrawl/crawl-data/CC-MAIN-2018-17/segments/*/warc/*.warc.gz").limit(NUM_ROWS)
df_warc.collect()


WARC-Record-ID Utf8,WARC-Type Utf8,"WARC-Date Timestamp(Nanoseconds, Some(""Etc/UTC""))",Content-Length Int64,WARC-Identified-Payload-Type Utf8,warc_content Binary,warc_headers Utf8
1fe1885f-4be1-4df7-b0e4-cfa6e6b3cfcc,warcinfo,2018-04-19 13:05:50 UTC,327,,"b""robots: classic\r\nhostname: ip-10""...","{""Content-Type"":""application/warc-fields"",""WARC-Filename"":""CC-MAIN-20180419130550-20180419150550-00000.warc.gz""}"
37508f7d-d935-47e1-8811-da8926e386d7,request,2018-04-19 13:38:17 UTC,234,,"b""GET /msbigclit-pic HTTP/1.0\r\nHos""...","{""Content-Type"":""application/http; msgtype=request"",""WARC-IP-Address"":""193.70.31.4"",""WARC-Target-URI"":""http://00000000sexbom.myseostats.com/msbigclit-pic"",""WARC-Warcinfo-ID"":""<urn:uuid:1fe1885f-4be1-4df7-b0e4-cfa6e6b3cfcc>""}"
4e91800f-82d0-42d2-acdc-d0421f045179,response,2018-04-19 13:38:17 UTC,12512,application/xhtml+xml,"b""HTTP/1.1 200 OK\r\nServer: Apache/""...","{""Content-Type"":""application/http; msgtype=response"",""WARC-Block-Digest"":""sha1:7HNAQCEVWZ5ZQSJJCAIJGOXU5FCI357X"",""WARC-Concurrent-To"":""<urn:uuid:37508f7d-d935-47e1-8811-da8926e386d7>"",""WARC-IP-Address"":""193.70.31.4"",""WARC-Payload-Digest"":""sha1:DTMDPBY234254BEAMQU3SAOV42N262GF"",""WARC-Target-URI"":""http://00000000sexbom.myseostats.com/msbigclit-pic"",""WARC-Warcinfo-ID"":""<urn:uuid:1fe1885f-4be1-4df7-b0e4-cfa6e6b3cfcc>""}"
f3e8768d-d5d8-4916-b87f-24abfd5f1e85,metadata,2018-04-19 13:38:17 UTC,20,,"b""fetchTimeMs: 370\r\n\r\n""","{""Content-Type"":""application/warc-fields"",""WARC-Concurrent-To"":""<urn:uuid:4e91800f-82d0-42d2-acdc-d0421f045179>"",""WARC-Target-URI"":""http://00000000sexbom.myseostats.com/msbigclit-pic"",""WARC-Warcinfo-ID"":""<urn:uuid:1fe1885f-4be1-4df7-b0e4-cfa6e6b3cfcc>""}"
1712b9ab-3a87-4fdd-bc59-16a686e63e5b,request,2018-04-19 13:54:26 UTC,239,,"b""GET /kissssssssssss-cam HTTP/1.0\r""...","{""Content-Type"":""application/http; msgtype=request"",""WARC-IP-Address"":""193.70.31.4"",""WARC-Target-URI"":""http://0000sweetkelly.myseostats.com/kissssssssssss-cam"",""WARC-Warcinfo-ID"":""<urn:uuid:1fe1885f-4be1-4df7-b0e4-cfa6e6b3cfcc>""}"
26bfa20e-2b67-4ab3-ab71-0a85854c696f,response,2018-04-19 13:54:26 UTC,10661,application/xhtml+xml,"b""HTTP/1.1 200 OK\r\nServer: Apache/""...","{""Content-Type"":""application/http; msgtype=response"",""WARC-Block-Digest"":""sha1:F66VYVMWJSB2U2FGTF4BHK27GMDRFUHB"",""WARC-Concurrent-To"":""<urn:uuid:1712b9ab-3a87-4fdd-bc59-16a686e63e5b>"",""WARC-IP-Address"":""193.70.31.4"",""WARC-Payload-Digest"":""sha1:H4RPFIQYELNBFGZ25JXQNY2IZJ534USU"",""WARC-Target-URI"":""http://0000sweetkelly.myseostats.com/kissssssssssss-cam"",""WARC-Warcinfo-ID"":""<urn:uuid:1fe1885f-4be1-4df7-b0e4-cfa6e6b3cfcc>""}"
a5ae8cc0-ccda-4bfc-a68c-1695fd8f6693,metadata,2018-04-19 13:54:26 UTC,20,,"b""fetchTimeMs: 670\r\n\r\n""","{""Content-Type"":""application/warc-fields"",""WARC-Concurrent-To"":""<urn:uuid:26bfa20e-2b67-4ab3-ab71-0a85854c696f>"",""WARC-Target-URI"":""http://0000sweetkelly.myseostats.com/kissssssssssss-cam"",""WARC-Warcinfo-ID"":""<urn:uuid:1fe1885f-4be1-4df7-b0e4-cfa6e6b3cfcc>""}"
fed18bc7-0924-4a33-9fa8-ce2b0281126b,request,2018-04-19 13:51:43 UTC,255,,"b""GET /wp-content/uploads/2016/03/IO""...","{""Content-Type"":""application/http; msgtype=request"",""WARC-IP-Address"":""206.188.193.123"",""WARC-Target-URI"":""http://000bncm.wcomhost.com/wp-content/uploads/2016/03/IOM-461-2015.pdf"",""WARC-Warcinfo-ID"":""<urn:uuid:1fe1885f-4be1-4df7-b0e4-cfa6e6b3cfcc>""}"


In [5]:
# Let's investigate the different types of payloads we have:
df_warc.select("WARC-Identified-Payload-Type").distinct().show()

WARC-Identified-Payload-Type Utf8
application/xhtml+xml
application/atom+xml
application/pdf
text/html
""


### Preprocessing
Since we are primarily concerned with text, we will focus on `text/html` payloads, extracting text content from the HTML body and normalizing the text itself.


In [6]:
from daft import col
from daft.functions import monotonically_increasing_id


# Define a UDF to remove http headers from the payload
@daft.func()
def remove_http_headers(payload: str) -> str:
    if payload is None:
        return ""
    parts = payload.split("\r\n\r\n", 1)
    return parts[1] if len(parts) == 2 else payload

# Filter the dataframe to only include text/html payloads
df_html = df_warc.where(col("WARC-Identified-Payload-Type")== "text/html")

# Separate the HTTP headers from the payloads
df_html = (
    df_html
    .with_column("content_raw", remove_http_headers(col("warc_content").try_decode("utf-8")))
    .where(col("content_raw") != "")
)  
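
To see what the header-stripping step does on a single record, here is the same split logic as a plain Python function applied to a made-up response. The sample payload and the function name `strip_http_headers` are illustrative assumptions.

```python
def strip_http_headers(payload):
    """Return everything after the first blank line, which separates HTTP headers from the body."""
    if payload is None:
        return ""
    parts = payload.split("\r\n\r\n", 1)
    # If there is no blank line, assume the payload has no headers and return it unchanged
    return parts[1] if len(parts) == 2 else payload


raw = "HTTP/1.1 200 OK\r\nServer: Apache\r\n\r\n<html><body>Hello</body></html>"
print(strip_http_headers(raw))  # <html><body>Hello</body></html>
```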

#### Extracting Text from HTML

In [7]:
from selectolax.parser import HTMLParser
import re

# Define a UDF to extract text blocks from HTML content (title, headings, paragraphs, list items, tables, image alt text, meta descriptions, etc.)
@daft.func()
def extract_blocks(html: str) -> list[str]:
    tree = HTMLParser(html)
    for n in tree.css("script,style,noscript"):
        n.decompose()

    blocks = []
    for node in tree.css("""title, article, main, p, h1, h2, h3, h4, h5, h6, li, div, section, img[alt], figcaption, caption, blockquote, table th, table td, pre, code, summary, meta[name="description"], meta[property="og:title"], meta[property="og:description"]"""):
        txt = node.text(separator=" ", strip=True)
        if txt: 
            blocks.append(txt)
    return blocks

@daft.func()
def get_block_idx(blocks: list[str]) -> list[int]:
    return list(range(len(blocks)))

df_text = (
    df_html
    .with_column("blocks", extract_blocks(col("content_raw")))
    .with_column("block_idx", get_block_idx(col("blocks")))
    .explode("blocks", "block_idx")
    .where(col("blocks") != "")
    .where(col("blocks").not_null())
    .with_column(index_col, col("WARC-Record-ID")+ "-" + col("block_idx"))
    .with_column(content_col, col("blocks"))
    .select(
        "WARC-Record-ID",
        index_col,
        content_col,
    )
)
df_text = df_text.collect()
df_text.show(3)

WARC-Record-ID Utf8,block_id Utf8,block Utf8
b9738a42-83c9-4fb1-9e8f-2530a0cbe068,b9738a42-83c9-4fb1-9e8f-2530a0cbe068-0,Kendall Tenney – Page 5 – 10emedia
b9738a42-83c9-4fb1-9e8f-2530a0cbe068,b9738a42-83c9-4fb1-9e8f-2530a0cbe068-1,"Three Reasons Donald Sterling Won’t Make a Media Comeback Like | Kendall Tenney | May 5, 2014 | Uncategorized The question people most often ask me when someone has imploded in the media is whether that person can overcome all the negative attention and restore his or her reputation. And so it has been with L.A. Clippers owner Donald Sterling. The answer in this case, “Not a chance.” Not even our best public relations services could help Sterling now. He committed three huge mistakes ... Three Reasons Donald Sterling Won’t Make a Media Comeback Read More"
b9738a42-83c9-4fb1-9e8f-2530a0cbe068,b9738a42-83c9-4fb1-9e8f-2530a0cbe068-2,"10e Media Job Opportunity Like | Kendall Tenney | Feb 20, 2014 | Uncategorized 10e Media is expanding its dynamic team of professionals and looking for an experienced organizational administrator to help take our agency to the next level. Do you know of someone who might be interested? See below for more information and application instructions. Part-Time Organizational Administrator Job Description 10e Media is a full-service public relations agency based in Las Vegas, ... 10e Media Job Opportunity Read More"


### Text Normalization 

So far we have extracted the text out of each html document into blocks. Now we move to normalize the text blocks to prepare for the MinHash operation. 

*Note: It is recommended to run your preprocessing pipeline separately from your MinHash deduplication workload.*
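
As a rough mental model of what normalization does to a block, the following standard-library sketch applies the same four steps: NFD Unicode normalization, lowercasing, punctuation removal, and whitespace collapsing. It is only an approximation under stated assumptions. It strips ASCII punctuation only, and daft's own `normalize` expression may treat edge cases differently. The function name `normalize_text` is hypothetical.

```python
import re
import string
import unicodedata

# Translation table that deletes ASCII punctuation characters
_PUNCT_TABLE = str.maketrans("", "", string.punctuation)


def normalize_text(text: str) -> str:
    text = unicodedata.normalize("NFD", text)  # decompose accented characters
    text = text.lower()
    text = text.translate(_PUNCT_TABLE)        # remove ASCII punctuation
    return re.sub(r"\s+", " ", text).strip()   # collapse runs of whitespace


print(normalize_text("Hello,   World!"))  # hello world
```

Normalizing before hashing means blocks that differ only in casing, punctuation, or spacing produce identical n-grams and therefore identical MinHash values.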

In [8]:
# Drop unneeded columns and add an integer surrogate index
df_ready = (
    df_text
    .select(index_col, content_col)
    .with_column("nodes", monotonically_increasing_id())
)
# Note: "nodes" maps directly to our "block_id" for this session only

In [None]:
# Normalize text
df_norm = df_ready.with_column("normalized",
    col(content_col).str.normalize(
        remove_punct=True,
        lowercase=True,
        nfd_unicode=True,
        white_space=True
    )
)
df_norm.select("nodes", content_col, "normalized").show(3)

## MinHash
Next, we calculate our MinHash vectors using daft's `minhash` expression.
No need to build shingles ourselves!
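
For context, "shingles" are the overlapping word n-grams that get hashed. A minimal sketch of building them by hand, assuming whitespace tokenization, is shown below. The function name `word_ngrams` and the choice to return the whole text as a single shingle when it is shorter than `n` are illustrative assumptions.

```python
def word_ngrams(text: str, n: int = 5) -> set:
    """Return the set of overlapping n-word shingles in `text`."""
    tokens = text.split()
    if len(tokens) < n:
        # Assumption: short texts become a single shingle (or nothing if empty)
        return {" ".join(tokens)} if tokens else set()
    return {" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)}


shingles = word_ngrams("5 appliances that make life way easier", n=5)
print(len(shingles))  # 3
```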

In [None]:
K = 64 # Number of Permutations
SEED = 42 # Seed for the hash function 
NGRAM_SIZE = 5 # Size of the n-grams

In [None]:
# Calculate the minhash vectors
df_minhash = (
    df_norm
    .with_column("min_hashes", col("normalized").minhash(
        num_hashes = K,
        ngram_size = NGRAM_SIZE,
        seed = SEED, 
        hash_function = 'xxhash'
        )
    )
)
df_minhash.select("nodes", content_col, "min_hashes").show(3)

nodes UInt64,block Utf8,min_hashes FixedSizeList[UInt32; 64]
0,5 appliances that make life way easier,"[2567617806, 583603331, 950903248, 1099293893, 1503146628, 127981495, 934407494, 468550446, 296298175, 233336007, 967725273, 91534179, 1062252632, 1215219737, 104221614, 1018723700, 371354144, 746239416, 1181096810, 767883357, 2162686897, 1489490333, 2316710070, 1113703927, 2041826910, 1103818119, 599526336, 358495183, 2534618608, 1267167239, 3128973137, 2334619808, 203574577, 1420233481, 685191566, 2892430538, 1800834982, 1306607780, 1704029061, 664495915, 1138730815, 107799664, 1771625155, 2210873729, 485712021, 2481057357, 260695272, 427086660, 49996916, 112990061, 1194056068, 2087380852, 676784938, 822011383, 461047225, 2009954461, 107611766, 1791699949, 80088978, 1566677230, 328272590, 670393872, 423145881, 1536692964]"
1,5 appliances that make life way easier,"[2567617806, 583603331, 950903248, 1099293893, 1503146628, 127981495, 934407494, 468550446, 296298175, 233336007, 967725273, 91534179, 1062252632, 1215219737, 104221614, 1018723700, 371354144, 746239416, 1181096810, 767883357, 2162686897, 1489490333, 2316710070, 1113703927, 2041826910, 1103818119, 599526336, 358495183, 2534618608, 1267167239, 3128973137, 2334619808, 203574577, 1420233481, 685191566, 2892430538, 1800834982, 1306607780, 1704029061, 664495915, 1138730815, 107799664, 1771625155, 2210873729, 485712021, 2481057357, 260695272, 427086660, 49996916, 112990061, 1194056068, 2087380852, 676784938, 822011383, 461047225, 2009954461, 107611766, 1791699949, 80088978, 1566677230, 328272590, 670393872, 423145881, 1536692964]"
2,"1. Urban Cultivator ($2,499)","[3319795349, 4261608729, 236267498, 3376212958, 2524395044, 2814896286, 1858548538, 1856848405, 3340268929, 3147475677, 478229479, 2399438427, 1942975941, 2727781382, 3472840287, 2060145216, 3805166111, 1184215723, 1460376922, 215335326, 3040368182, 2767959720, 2217792065, 684172897, 3664918910, 3469216890, 1107646919, 104495794, 3590496955, 52250265, 2987505160, 61918236, 365309380, 1544610557, 3625160524, 1329622720, 2909759363, 3867744087, 683939021, 1770932831, 2090340498, 964654335, 3536194636, 367399241, 875105570, 566390758, 2462754282, 3618807225, 1421329816, 808596251, 2906696088, 2912448202, 2021845245, 800089867, 4115368585, 3994954676, 2434877723, 2686370354, 2996179104, 890313694, 1376323088, 3431834482, 3701337844, 3408666446]"


### LSH Band Generation and Grouping

Next, we will:
1. Use the optimal_param function to determine the best band (b) and row (r) parameters for our LSH bucketing
2. Split each document's minhash vector into `B` bands of `R` rows each
3. Create buckets by hashing each band's signature, grouping similar documents together
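
The effect of `b` and `r` can be explored with the standard LSH collision formula, the same expression that appears in the `area` helpers of `optimal_param`: two documents with Jaccard similarity `s` share at least one band with probability `1 - (1 - s^r)^b`. The sketch below evaluates that curve, and the function name `candidate_probability` is an assumption for illustration.

```python
def candidate_probability(s: float, b: int, r: int) -> float:
    """Probability that two documents with similarity `s` collide in at least one of `b` bands of `r` rows."""
    return 1.0 - (1.0 - s ** r) ** b


# With 8 bands of 8 rows, the curve rises sharply as similarity grows
for s in (0.5, 0.7, 0.9):
    print(f"s={s:.1f} -> P(candidate)={candidate_probability(s, b=8, r=8):.3f}")
```

More bands with fewer rows shift the curve left (more candidates, more false positives), while fewer bands with more rows shift it right.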


In [None]:
from scipy.integrate import quad as integrate

def optimal_param(
    threshold: float,
    num_perm: int,
    false_positive_weight: float = 0.5,
    false_negative_weight: float = 0.5,
):
    """
    Compute the optimal `MinHashLSH` parameter that minimizes the weighted sum
    of probabilities of false positive and false negative, taken from datasketch.

    Parameters
    ----------
    threshold : float
        The threshold for similarity.
    num_perm : int
        The number of permutations.
    false_positive_weight : float
        The weight of false positive.
    false_negative_weight : float
        The weight of false negative.

    Returns
    -------
    Tuple[int, int]
        The optimal `b` and `r` parameters.
        The number of bands, and the number of rows per band respectively.

    Examples
    --------
    >>> optimal_param(0.7, 256)
    (25, 10)
    """

    def false_positive_area(threshold: float, b: int, r: int):
        """Source: `datasketch.lsh`"""

        def area(s):
            return 1 - (1 - s ** float(r)) ** float(b)

        a, _ = integrate(area, 0.0, threshold)
        return a

    def false_negative_area(threshold: float, b: int, r: int):
        """Source: `datasketch.lsh`"""

        def area(s):
            return 1 - (1 - (1 - s ** float(r)) ** float(b))

        a, _ = integrate(area, threshold, 1.0)
        return a

    min_error = float("inf")
    opt = (0, 0)
    for b in range(1, num_perm + 1):
        max_r = int(num_perm / b)
        for r in range(1, max_r + 1):
            fp = false_positive_area(threshold, b, r)
            fn = false_negative_area(threshold, b, r)
            error = fp * false_positive_weight + fn * false_negative_weight
            if error < min_error:
                min_error = error
                opt = (b, r)
    return opt

Try tweaking the threshold value to see what happens

In [None]:
# Choose B bands and R rows per band such that B · R = num_perm.
B, R = optimal_param(0.7, K)
print(B, R, K)
# Verify that B * R = K
assert B * R == K 

8 8 64


In [None]:
# Band Generation
df_bands = (
    df_minhash
    .with_column("bands", col("min_hashes").list.chunk(R))
)
df_bands.select("nodes",content_col, "min_hashes", "bands").show(3)    

nodes UInt64,block Utf8,min_hashes FixedSizeList[UInt32; 64],bands List[FixedSizeList[UInt32; 8]]
0,5 appliances that make life way easier,"[2567617806, 583603331, 950903248, 1099293893, 1503146628, 127981495, 934407494, 468550446, 296298175, 233336007, 967725273, 91534179, 1062252632, 1215219737, 104221614, 1018723700, 371354144, 746239416, 1181096810, 767883357, 2162686897, 1489490333, 2316710070, 1113703927, 2041826910, 1103818119, 599526336, 358495183, 2534618608, 1267167239, 3128973137, 2334619808, 203574577, 1420233481, 685191566, 2892430538, 1800834982, 1306607780, 1704029061, 664495915, 1138730815, 107799664, 1771625155, 2210873729, 485712021, 2481057357, 260695272, 427086660, 49996916, 112990061, 1194056068, 2087380852, 676784938, 822011383, 461047225, 2009954461, 107611766, 1791699949, 80088978, 1566677230, 328272590, 670393872, 423145881, 1536692964]","[[2567617806, 583603331, 950903248, 1099293893, 1503146628, 127981495, 934407494, 468550446], [296298175, 233336007, 967725273, 91534179, 1062252632, 1215219737, 104221614, 1018723700], [371354144, 746239416, 1181096810, 767883357, 2162686897, 1489490333, 2316710070, 1113703927], [2041826910, 1103818119, 599526336, 358495183, 2534618608, 1267167239, 3128973137, 2334619808], [203574577, 1420233481, 685191566, 2892430538, 1800834982, 1306607780, 1704029061, 664495915], [1138730815, 107799664, 1771625155, 2210873729, 485712021, 2481057357, 260695272, 427086660], [49996916, 112990061, 1194056068, 2087380852, 676784938, 822011383, 461047225, 2009954461], [107611766, 1791699949, 80088978, 1566677230, 328272590, 670393872, 423145881, 1536692964]]"
1,5 appliances that make life way easier,"[2567617806, 583603331, 950903248, 1099293893, 1503146628, 127981495, 934407494, 468550446, 296298175, 233336007, 967725273, 91534179, 1062252632, 1215219737, 104221614, 1018723700, 371354144, 746239416, 1181096810, 767883357, 2162686897, 1489490333, 2316710070, 1113703927, 2041826910, 1103818119, 599526336, 358495183, 2534618608, 1267167239, 3128973137, 2334619808, 203574577, 1420233481, 685191566, 2892430538, 1800834982, 1306607780, 1704029061, 664495915, 1138730815, 107799664, 1771625155, 2210873729, 485712021, 2481057357, 260695272, 427086660, 49996916, 112990061, 1194056068, 2087380852, 676784938, 822011383, 461047225, 2009954461, 107611766, 1791699949, 80088978, 1566677230, 328272590, 670393872, 423145881, 1536692964]","[[2567617806, 583603331, 950903248, 1099293893, 1503146628, 127981495, 934407494, 468550446], [296298175, 233336007, 967725273, 91534179, 1062252632, 1215219737, 104221614, 1018723700], [371354144, 746239416, 1181096810, 767883357, 2162686897, 1489490333, 2316710070, 1113703927], [2041826910, 1103818119, 599526336, 358495183, 2534618608, 1267167239, 3128973137, 2334619808], [203574577, 1420233481, 685191566, 2892430538, 1800834982, 1306607780, 1704029061, 664495915], [1138730815, 107799664, 1771625155, 2210873729, 485712021, 2481057357, 260695272, 427086660], [49996916, 112990061, 1194056068, 2087380852, 676784938, 822011383, 461047225, 2009954461], [107611766, 1791699949, 80088978, 1566677230, 328272590, 670393872, 423145881, 1536692964]]"
2,"1. Urban Cultivator ($2,499)","[3319795349, 4261608729, 236267498, 3376212958, 2524395044, 2814896286, 1858548538, 1856848405, 3340268929, 3147475677, 478229479, 2399438427, 1942975941, 2727781382, 3472840287, 2060145216, 3805166111, 1184215723, 1460376922, 215335326, 3040368182, 2767959720, 2217792065, 684172897, 3664918910, 3469216890, 1107646919, 104495794, 3590496955, 52250265, 2987505160, 61918236, 365309380, 1544610557, 3625160524, 1329622720, 2909759363, 3867744087, 683939021, 1770932831, 2090340498, 964654335, 3536194636, 367399241, 875105570, 566390758, 2462754282, 3618807225, 1421329816, 808596251, 2906696088, 2912448202, 2021845245, 800089867, 4115368585, 3994954676, 2434877723, 2686370354, 2996179104, 890313694, 1376323088, 3431834482, 3701337844, 3408666446]","[[3319795349, 4261608729, 236267498, 3376212958, 2524395044, 2814896286, 1858548538, 1856848405], [3340268929, 3147475677, 478229479, 2399438427, 1942975941, 2727781382, 3472840287, 2060145216], [3805166111, 1184215723, 1460376922, 215335326, 3040368182, 2767959720, 2217792065, 684172897], [3664918910, 3469216890, 1107646919, 104495794, 3590496955, 52250265, 2987505160, 61918236], [365309380, 1544610557, 3625160524, 1329622720, 2909759363, 3867744087, 683939021, 1770932831], [2090340498, 964654335, 3536194636, 367399241, 875105570, 566390758, 2462754282, 3618807225], [1421329816, 808596251, 2906696088, 2912448202, 2021845245, 800089867, 4115368585, 3994954676], [2434877723, 2686370354, 2996179104, 890313694, 1376323088, 3431834482, 3701337844, 3408666446]]"


#### Let's pause a moment to break down this last operation

**Previously** we calculated the minhashes for our `normalized` column: each 5-word n-gram was hashed into a 32-bit unsigned integer, and we kept the minimum value under each of `K = 64` hash permutations.

**Then** we took those 64 hashes and chunked them into 8 bands of 8 values each.

**Now** we will explode our bands into new rows, keeping track of each band's position in the signature using `band_idx`.
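
The chunk-then-explode step can be pictured on a single signature in plain Python. This sketch uses a dummy 64-value signature, and the helper name `chunk_bands` is hypothetical.

```python
def chunk_bands(signature: list, rows_per_band: int) -> list:
    """Split a MinHash signature into consecutive bands of `rows_per_band` values."""
    return [
        tuple(signature[i:i + rows_per_band])
        for i in range(0, len(signature), rows_per_band)
    ]


signature = list(range(64))          # stand-in for one document's 64 min-hashes
bands = chunk_bands(signature, rows_per_band=8)

# "Exploding" yields one (band_idx, band) row per band
for band_idx, band in enumerate(bands[:2]):
    print(band_idx, band)
# 0 (0, 1, 2, 3, 4, 5, 6, 7)
# 1 (8, 9, 10, 11, 12, 13, 14, 15)
```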

In [None]:
@daft.func()
def get_band_idx(band: list[int], B: int) -> list[int]:
    return list(range(min(len(band), B)))

df_bands_exploded = (
    df_bands
    .with_column("band_idx", get_band_idx(col("bands"), B)) 
    .explode("bands", "band_idx")
)
df_bands_exploded.select("nodes", "band_idx", "bands",).show(3)

nodes UInt64,band_idx Int64,bands FixedSizeList[UInt32; 8]
0,0,"[2567617806, 583603331, 950903248, 1099293893, 1503146628, 127981495, 934407494, 468550446]"
0,1,"[296298175, 233336007, 967725273, 91534179, 1062252632, 1215219737, 104221614, 1018723700]"
0,2,"[371354144, 746239416, 1181096810, 767883357, 2162686897, 1489490333, 2316710070, 1113703927]"



### Grouping bands 
We then group the bands by their 'signature', which is the combination of the band index and the band itself. If two blocks are near-duplicates, we expect at least one of their signatures to match.
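
The same grouping can be sketched with a dictionary keyed on `(band_idx, band)`. The toy documents below have two bands of two values each. The data and the helper name `group_by_band` are made up for illustration.

```python
from collections import defaultdict


def group_by_band(banded_docs: dict) -> dict:
    """Map each (band_idx, band) signature to the list of node IDs that share it."""
    buckets = defaultdict(list)
    for node, bands in banded_docs.items():
        for band_idx, band in enumerate(bands):
            buckets[(band_idx, tuple(band))].append(node)
    return dict(buckets)


docs = {
    0: [(1, 2), (3, 4)],
    1: [(1, 2), (9, 9)],  # shares band 0 with node 0
    2: [(7, 7), (3, 4)],  # shares band 1 with node 0
}
candidates = {k: v for k, v in group_by_band(docs).items() if len(v) > 1}
print(candidates)  # {(0, (1, 2)): [0, 1], (1, (3, 4)): [0, 2]}
```

Including the band index in the key matters: the same values appearing in different band positions should not count as a match.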

In [None]:
# Grouping Bands
df_grouped = (
    df_bands_exploded
    .groupby(col("band_idx"), col("bands"))
    .agg(col("nodes").agg_list())
)
df_grouped.select("band_idx", "bands", "nodes").show()

band_idx Int64,bands FixedSizeList[UInt32; 8],nodes List[UInt64]
7,"[3151605336, 3885777648, 4137040297, 4147669534, 1184537642, 2005418764, 826576808, 1782230857]",[7298]
7,"[521029646, 3394758789, 1146099738, 2555085614, 572022846, 2882151040, 456793869, 1873974548]",[1835]
4,"[2059358136, 2570064209, 3869411890, 4127264206, 1115703836, 3267346446, 138493978, 1875447171]",[107]
3,"[2016086465, 182620752, 1250693045, 2136436348, 3833791785, 1422415550, 30130678, 1992683765]",[311]
1,"[555368185, 219166509, 815977239, 624119524, 273103732, 41456283, 473073990, 535221729]",[458]
2,"[1120093654, 3886470132, 178876218, 2563426147, 1099605856, 2140060326, 1578718007, 1651150368]","[6760, 6761, 6762]"
7,"[3149077954, 1858338203, 1112607253, 1659969150, 3896895982, 1709459729, 2472813415, 2912350301]",[6159]
3,"[3820727710, 1234204549, 88404221, 2136566294, 1071487122, 3891082603, 2696759455, 2667480840]",[6445]


#### Last Note on LSH Banding
If every band were unique, the `nodes` column would only ever contain a single value, making this groupby an expensive and pointless operation. Since the internet is full of duplicate data, we end up seeing a few candidates.

In [None]:
# Inspecting bands with multiple nodes
df_grouped.where(col("nodes").list.length() > 1).select("band_idx", "bands", "nodes").show()

band_idx Int64,bands FixedSizeList[UInt32; 8],nodes List[UInt64]
5,"[2119825340, 1983862797, 3999424214, 1630576213, 2810772710, 2059305656, 3617450287, 4232518760]","[1488, 1521, 1529]"
3,"[192773728, 2970120711, 2527085763, 574804336, 2262543580, 484445144, 3137378665, 4076722577]","[6796, 6808]"
0,"[3948801037, 2403107767, 3706926718, 3855607459, 2177991364, 2444764633, 3312377656, 4178248276]","[728, 842, 944, 1010, 1112]"
4,"[2077138780, 3392147413, 336241344, 3259882724, 1855596593, 2319263241, 2072174995, 3100410775]","[717, 795, 909, 1077, 1143, 1245, 1353]"
7,"[1621987198, 42062113, 81674044, 1427606334, 700273398, 1233482456, 250879270, 31967733]","[528, 318]"
4,"[622206236, 4218478485, 1450450400, 2653052804, 523652321, 1627103321, 3546007555, 3494230615]","[1485, 1518]"
1,"[3299253456, 2460227983, 3582938177, 3981214691, 3294256065, 136710230, 2352488759, 2332951172]","[4216, 4271]"
0,"[3375819684, 4081690774, 300697521, 3486686152, 2838535013, 1088006325, 1135945366, 3922625459]","[725, 803, 917, 4322, 1085, 1151, 1253, 1325, 1361]"


## Connected Components
Every band whose **nodes** list has more than one entry is now a candidate for consideration. But there is a problem: our nodes are repeated across different band indices!

In order to reduce our candidates into their unique set, we leverage a few tricks from graph theory to isolate the duplicates. Here we get to implement one of the most important algorithms in distributed computing. [*Connected Components in MapReduce and Beyond*](https://dl.acm.org/doi/pdf/10.1145/2670979.2670997) is a seminal paper from 2014 written by researchers at Google.

We’ll follow the paper’s star‑contraction recipe: alternate a Large‑star and Small‑star pass that repeatedly points each node to the smallest ID in its neighborhood. After a few rounds the edge set stabilizes; the “parent” each node points to is its component representative.

Concretely, we’ll collapse band groups into a simple graph:
- Treat each document as a node.
- For every band with multiple nodes, connect each node to the group’s minimum ID (drop self-loops and duplicates).
- This produces an undirected edge list that captures “co-occurred somewhere” linkage.

From there we use star-contraction (Kiveris et al., 2014) to snap clusters together:
- Large-star: for each node, point to the smallest ID in its neighborhood (including itself). Emit edges (v, m(u)) only where v > u.
- Small-star: canonicalize edges so u ≥ v, recompute the same “point to the minimum,” and emit (v, m(u)) for all neighbors.

Repeat Large-star then Small-star until the edge set stops changing. The final “parent” each node points to is its component representative (typically the component’s minimum ID). It’s fast, scalable, and after a handful of rounds, the clusters just fall out!
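
The recipe above can be sketched in plain Python on a small in-memory edge set. This is a minimal single-machine illustration, assuming integer node IDs and edges stored as `(node, neighbor)` tuples. The function names are hypothetical, and it is not the distributed implementation from the paper.

```python
from collections import defaultdict


def large_star(edges: set) -> set:
    """For each node u, link every neighbor v > u to the minimum of u's neighborhood (including u)."""
    neighbors = defaultdict(set)
    for u, v in edges:
        neighbors[u].add(v)
        neighbors[v].add(u)
    out = set()
    for u, nbrs in neighbors.items():
        m = min(nbrs | {u})
        out.update((v, m) for v in nbrs if v > u)
    return out


def small_star(edges: set) -> set:
    """For each node u, link u and its smaller neighbors to the minimum of those neighbors."""
    smaller = defaultdict(set)
    for u, v in edges:
        hi, lo = (u, v) if u > v else (v, u)
        smaller[hi].add(lo)
    out = set()
    for u, nbrs in smaller.items():
        m = min(nbrs)
        out.update((v, m) for v in nbrs | {u} if v != m)
    return out


def connected_components_sketch(edges) -> dict:
    """Alternate large-star and small-star until the edge set stops changing; returns node -> representative."""
    current = {(u, v) for u, v in edges if u != v}
    while True:
        nxt = small_star(large_star(current))
        if nxt == current:
            return dict(current)
        current = nxt


parents = connected_components_sketch({(1, 2), (2, 3), (4, 5)})
print(sorted(parents.items()))  # [(2, 1), (3, 1), (5, 4)]
```

Nodes 2 and 3 end up pointing at 1, and node 5 at 4, so the two components are represented by their minimum IDs. Representatives themselves do not appear as keys.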

In [None]:
# First we must convert our list of nodes into an edge list 
df_edges = (
    df_grouped
    .with_column("left_edge", col("nodes").list.min())
    .explode("nodes")
    .select("left_edge", right_edge=col("nodes"))
    .filter(col("left_edge") != col("right_edge"))
    .distinct()
)
df_edges.show(5)

u UInt64,v UInt64
5978,6099
2390,2952
81,4827
5593,5597
2159,2985


### 1. Canonicalize edges to undirected form

- Direct from larger id -> smaller id
- drop self-loops & duplicates
- Remove nulls

In [None]:
# Re-label edges: left_edge -> u, right_edge -> v
df_labeled = df_edges.with_columns_renamed(
    {"left_edge": "u", "right_edge": "v"}
)

# Canonicalize Directed Edges to Undirected Edges
df_canonicalized = (
    df_labeled.select(
        (col("u") >= col("v")).if_else(col("u"), col("v")).alias("u"),
        (col("u") >= col("v")).if_else(col("v"), col("u")).alias("v"),
    )
    .where(col("u") != col("v"))
    .where(~col("u").is_null())
    .where(~col("v").is_null())
    .distinct()
    .collect()
)

In [None]:
# Prep edges for connected components
df_edges_clean = (
    df_canonicalized
    .where(~col("u").is_null())
    .where(~col("v").is_null())
    .where(col("u") != col("v"))
)


In [None]:
import igraph as ig

df_pd_edges = df_edges_clean.select(col("u").cast(daft.DataType.int64()), col("v").cast(daft.DataType.int64())).to_pandas()

# using igraph
g = ig.Graph.DataFrame(df_pd_edges, directed=False)
strong_components = {frozenset(c) for c in g.connected_components(mode="strong")}
weak_components = {frozenset(c) for c in g.connected_components(mode="weak")}

print(strong_components)
print(weak_components)
assert strong_components == weak_components


In [None]:
import matplotlib.pyplot as plt

# Plot the graph, coloring each vertex by the component it belongs to
components = g.connected_components(mode="weak")

fig, ax = plt.subplots()
ig.plot(
    components,
    target=ax,
    palette=ig.RainbowPalette(),
    vertex_size=7,
    vertex_color=list(map(int, ig.rescale(components.membership, (0, 200), clamp=True))),
    edge_width=0.7
)
plt.show()

### Star Contraction with Daft
Now we will iteratively compress the graph using two alternating phases until convergence:
- Large-star: Every node points to the minimum ID in its neighborhood (including itself). This quickly pulls nodes toward low-ID “hubs.”
- Small-star: Re-orient edges so that u > v (canonicalize) and repeat the contraction over each node's smaller neighbors, which merges local hubs together.
- Repeat large-star then small-star until nothing changes. The “parent” each node ends up pointing to is its component representative.

### 2. Large-star phase
- Group neighbors by u.
- Compute min_neighbor = min(neighbors).
- Use min(u, min_neighbor) as the node’s “parent.”
- Emit edges (v, parent) only for neighbors v > u, which avoids self-loops and duplicates.

In [None]:
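from daft import DataFrame

def large_star_phase(df: DataFrame) -> DataFrame:
    return (
        df
        # large_star_map: emit every edge in both directions so each node sees its full neighborhood
        .select("u", "v")
        .union_all(df.select(col("v").alias("u"), col("u").alias("v")))
        # large_star_reduce
        .groupby("u").agg_list("v")  # Group by u and collect its neighbors v
        .with_column("min_edge", col("v").list.min())  # Smallest neighbor of u
        .with_column("min_edge", (col("u") <= col("min_edge")).if_else(col("u"), col("min_edge")))  # Parent = min(u, smallest neighbor)
        .explode("v")
        .where(col("v") > col("u"))  # Only re-link neighbors larger than u
        .select(col("v").alias("u"), col("min_edge").alias("v"))  # Emit (neighbor, parent)
        .distinct()
        .collect()
    )

a = large_star_phase(df_edges_clean)

In [None]:
# Compare results after 1 large-star iteration.
# Each row of `a` is (node, parent), so grouping by parent recovers the clusters found so far.
daft_components = {frozenset([d["v"], *d["u"]]) for d in a.groupby("v").agg_list("u").to_pylist()}
# igraph also reports isolated vertices, so keep only components with more than one member
igraph_components = {c for c in strong_components if len(c) > 1}
# A single pass is not guaranteed to converge, so report the overlap instead of asserting equality
print(f"{len(daft_components & igraph_components)} of {len(igraph_components)} components match after one pass")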

### 3. Small-star phase
- Re-orient all edges so u > v (canonical), so that each node is grouped with its smaller neighbors.
- Group neighbors by u, compute min_neighbor, connect (u, parent) like above.
- This step merges local minima across previously separate stars.
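
The small-star step can be sketched the same way in pure Python. This is an illustration under assumed names (`small_star`, a hand-made edge set), not the notebook's Daft implementation.

```python
from collections import defaultdict

def small_star(edges):
    """One small-star pass over a set of (u, v) edges."""
    smaller = defaultdict(set)
    for u, v in edges:
        # Key every edge by its larger endpoint.
        smaller[max(u, v)].add(min(u, v))
    out = set()
    for u, nbrs in smaller.items():
        group = nbrs | {u}
        parent = min(group)
        # Attach u and all of its smaller neighbors to the parent, skipping self-loops.
        out.update((v, parent) for v in group if v != parent)
    return out

# Node 3 links the two hubs 1 and 2 (hypothetical data).
print(sorted(small_star({(3, 1), (3, 2), (4, 2)})))
# [(2, 1), (3, 1), (4, 2)]
```

Hub 2 now points at hub 1, which is the merge the last bullet describes. Node 4 still points at 2 and is pulled over by the next large-star pass.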

In [None]:
def small_star_phase(df: DataFrame) -> DataFrame:
    grouped = (
        df
        # small_star_map: orient every edge so that u holds the larger ID
        .select((col("u") > col("v")).if_else(ee(col("u"), col("v")), ee(col("v"), col("u"))).alias("e"))
        .select(col("e")["*"])
        .groupby("u").agg_list("v")
        # small_star_reduce: parent = min(u, min(smaller neighbors))
        .with_column("min_edge", col("v").list.min())
        .with_column("min_edge", (col("u") <= col("min_edge")).if_else(col("u"), col("min_edge")))
    )
    return (
        grouped
        # Emit (neighbor, parent) for every smaller neighbor ...
        .explode("v")
        .select(col("v").alias("u"), col("min_edge").alias("v"))
        # ... and (u, parent) for the node itself, which stands in for the list append
        .union_all(grouped.select(col("u"), col("min_edge").alias("v")))
        .where(col("u") != col("v"))  # drop self-loops
        .distinct()
        .collect()  # Materialize
    )

### 4. Convergence check
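- Compare an order-independent summary of the edges before and after (a sum of row hashes is a cheap fingerprint). The check below hashes only the u column, so treat it as a heuristic rather than an exact equality test.
- If the summaries match, stop; otherwise repeat.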
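
As a rough illustration of the fingerprint idea, the sketch below sums a per-edge hash in pure Python. It assumes that both endpoints are hashed, which differs from the Daft check in this notebook. The helper name `edge_fingerprint` and the sample edges are hypothetical.

```python
import hashlib

def edge_fingerprint(edges):
    """Order-independent fingerprint: sum of per-edge hashes modulo 2**64."""
    total = 0
    for u, v in edges:
        # Hash both endpoints so that re-attaching a node changes the fingerprint.
        digest = hashlib.blake2b(f"{u},{v}".encode(), digest_size=8).digest()
        total = (total + int.from_bytes(digest, "big")) % (1 << 64)
    return total

before = [(2, 1), (3, 1), (4, 2)]
shuffled = [(4, 2), (2, 1), (3, 1)]     # same edges, different row order
reattached = [(2, 1), (3, 1), (4, 1)]   # node 4 now points at node 1

print(edge_fingerprint(before) == edge_fingerprint(shuffled))
print(edge_fingerprint(before) == edge_fingerprint(reattached))
# The u column alone is identical before and after the re-attachment.
print(sorted(u for u, _ in before) == sorted(u for u, _ in reattached))
```

The last comparison shows why a summary built from u alone can report convergence one step early: the edge set changed, but the multiset of u values did not.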

In [None]:
def check_convergence(a: DataFrame, b: DataFrame) -> bool:
    # Sum the per-row hashes of u; the sum does not depend on row order
    a_hash = a.select(col("u").hash().alias("hash")).sum("hash").to_pydict()["hash"][0]
    b_hash = b.select(col("u").hash().alias("hash")).sum("hash").to_pydict()["hash"][0]
    return a_hash == b_hash

### Combining Stages

In [None]:
def connected_components(
    edges: DataFrame,     
    left_id_col="left_edge",
    right_id_col="right_edge",
    output_index_col=index_col,
    output_component_col="__component__",
):
    # Convert column names to u, v
    b = (
        edges.select(col(left_id_col).alias("u"), col(right_id_col).alias("v"))
        .where(~col("u").is_null())
        .where(~col("v").is_null())
        .collect()
    )    
    while True:
        a = large_star_phase(b)
        b = small_star_phase(a)
        if check_convergence(a, b):
            break
    
    # Revert column names and return contracted star edges
    return (
        b
        .select(col("u").alias(output_index_col), col("v").alias(output_component_col))
        .collect()
    )
    

In [None]:

def components(
    df: DataFrame,
    left_id_col: str = "u",
    right_id_col: str = "v",
    output_index_col: str = "u",
    output_component_col: str = "component"
) -> DataFrame:
    b = (
        df.select(col(left_id_col).alias("u"), col(right_id_col).alias("v"))
        .where(~col("u").is_null())
        .where(~col("v").is_null())
        .collect()
    )    
    while True:
        a = (b
             # large_star_map: emit both directions so every node sees all of its neighbors
             .select("u", "v")
             .union_all(b.select(col("v").alias("u"), col("u").alias("v")))

             .groupby("u").agg_list("v")
             # large_star_reduce: parent = min(u, min(neighbors))
             .with_column("min_edge", col("v").list.min())
             .with_column("min_edge", (col("u") <= col("min_edge")).if_else(col("u"), col("min_edge")))

             # emit (neighbor, parent) for every neighbor larger than u
             .explode("v")
             .where(col("v") > col("u"))
             .select(col("v").alias("u"), col("min_edge").alias("v"))
             .where(col("u") != col("v"))
             .distinct()
             .collect()
        )
        grouped = (a
             # small_star_map: orient every edge so that u holds the larger ID
             .select((col("u") > col("v")).if_else(ee(col("u"), col("v")), ee(col("v"), col("u"))).alias("e"))
             .select(col("e")["*"])

             .groupby("u").agg_list("v")
             # small_star_reduce: parent = min(u, min(smaller neighbors))
             .with_column("min_edge", col("v").list.min())
             .with_column("min_edge", (col("u") <= col("min_edge")).if_else(col("u"), col("min_edge")))
        )
        b = (grouped
             # emit (neighbor, parent) for every smaller neighbor, plus (u, parent) for u itself
             .explode("v")
             .select(col("v").alias("u"), col("min_edge").alias("v"))
             .union_all(grouped.select(col("u"), col("min_edge").alias("v")))
             .where(col("u") != col("v"))
             .distinct()
             .collect()
        )
        # check convergence
        a_hash = a.select(col("u").hash().alias("hash")).sum("hash").to_pydict()["hash"][0]
        b_hash = b.select(col("u").hash().alias("hash")).sum("hash").to_pydict()["hash"][0]
        if a_hash == b_hash:
            return (
                b
                .select(col("u").alias(output_index_col), col("v").alias(output_component_col))
                .collect()
            )

In [None]:
assignment = components(
    df_edges,
    left_id_col="left_edge",
    right_id_col="right_edge",
    output_index_col=index_col,
    output_component_col="__component__",
)


In [None]:
# Running the Star Contraction
df_star_edges = connected_components(
    edges=df_edges,
    left_id_col="left_edge",
    right_id_col="right_edge",
    output_index_col=index_col,
    output_component_col="__component__",
)
df_star_edges.show(3)


### 5. Final assignment
- Treat the final v for each u as the component representative.
- Join back to your documents and keep the representative per component.
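
The final assignment boils down to a lookup from each node to its parent. Here is a small pure-Python sketch of that idea. The helper `keep_representatives` and the sample IDs are hypothetical. The Daft join in the next cell expresses the same filter.

```python
def keep_representatives(doc_ids, star_edges):
    """Keep documents that are their own representative or have no duplicates."""
    # After convergence each duplicate appears once as (node, representative).
    parent = dict(star_edges)
    return [d for d in doc_ids if parent.get(d, d) == d]

docs = [1, 2, 3, 4, 5]
star_edges = [(2, 1), (3, 1), (5, 4)]  # components {1, 2, 3} and {4, 5}
print(keep_representatives(docs, star_edges))
# [1, 4]
```

Documents that never appear in an edge have no entry in `parent`, so they are kept, which matches the `is_null()` branch of the Daft filter.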

In [None]:
# Keep one per component (the representative equals the index)
(
    df
    .join(assignment.select(col(index_col), col("__component__")), on=index_col, how="left")
    .filter(col("__component__").is_null() | (col("__component__") == col(index_col)))
    .exclude("__component__")
)