# End-to-end Fuzzy Deduplication

A GPU-accelerated implementation of MinHash-LSH based fuzzy deduplication. For more information about fuzzy deduplication in NeMo Curator, refer to the [Deduplication](https://docs.nvidia.com/nemo/curator/latest/curate-text/process-data/deduplication/index.html) section of the documentation.

This tutorial shows how to run fuzzy deduplication on text data by executing two end-to-end workflows.
Together, the two workflows roughly cover the following steps:

1. Read original dataset
2. Compute MinHash signatures of these documents
3. Perform LSH - Group MinHashes into bands/buckets and shuffle these bands/buckets so that documents in the same bucket are in the same batch/file.
4. Convert the LSH outputs (bucket_id -> doc_id mapping) into an edge list in preparation for connected components.
5. Compute connected components across all potential duplicates found via LSH.
6. Generate list of duplicate documents by randomly selecting 1 document to keep from each group/component and dropping the rest.
7. Remove duplicates based on the generated duplicate list.

Users can also run these steps independently; this is covered in the step-by-step tutorial in the same directory as this tutorial.
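To make steps 2 and 3 concrete, here is a minimal CPU-only sketch in plain Python. It is not the GPU implementation used by the workflow: the hash function (BLAKE2b salted with a seed), the helper names, and the small parameters (32 hashes, 4 hashes per band, 5-character n-grams) are all assumptions chosen for illustration.

```python
import hashlib
from collections import defaultdict
from itertools import combinations


def char_ngrams(text: str, n: int) -> set[str]:
    """Return the set of character n-grams (shingles) of a document."""
    if len(text) < n:
        return {text}
    return {text[i : i + n] for i in range(len(text) - n + 1)}


def minhash_signature(text: str, num_hashes: int, n: int) -> list[int]:
    """For each seeded hash function, keep the minimum hash over all shingles."""
    shingles = char_ngrams(text, n)
    signature = []
    for seed in range(num_hashes):
        salt = seed.to_bytes(8, "little")  # the seed selects a different hash function
        signature.append(
            min(
                int.from_bytes(hashlib.blake2b(s.encode(), digest_size=8, salt=salt).digest(), "little")
                for s in shingles
            )
        )
    return signature


def lsh_buckets(signatures: dict[int, list[int]], hashes_per_band: int) -> dict[tuple, list[int]]:
    """Bucket documents whose signatures agree on every hash within a band."""
    buckets = defaultdict(list)
    for doc_id, sig in signatures.items():
        for band, start in enumerate(range(0, len(sig), hashes_per_band)):
            buckets[(band, tuple(sig[start : start + hashes_per_band]))].append(doc_id)
    # Only buckets holding two or more documents yield candidate duplicates
    return {key: ids for key, ids in buckets.items() if len(ids) > 1}


docs = {
    0: "Sara and Ben are friends. They like to play together in the park.",
    1: "Sara and Ben are friends. They like to play together in the park!",
    2: "A completely different story about a cat and a red ball.",
}
signatures = {doc_id: minhash_signature(text, num_hashes=32, n=5) for doc_id, text in docs.items()}
candidate_buckets = lsh_buckets(signatures, hashes_per_band=4)
candidate_pairs = {
    pair for ids in candidate_buckets.values() for pair in combinations(sorted(ids), 2)
}
print(candidate_pairs)
```

Documents 0 and 1 differ by a single character, so they share almost all shingles and are very likely to collide in at least one band, while document 2 shares almost nothing with them.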

In [1]:
import os

import fsspec

# Limit Curator log verbosity via Loguru
os.environ["LOGURU_LEVEL"] = "INFO"

import pandas as pd

input_dataset_path = "./input"  # Path to input dataset
fuzzy_output_dir = "./fuzzy_outputs"  # Path to store all fuzzy outputs including cache & deduped dataset

fuzzy_cache_path = os.path.join(
    fuzzy_output_dir, "cache"
)  # Path to store fuzzy deduplication intermediates (minhash, lsh etc.)
deduplicated_output_path = os.path.join(fuzzy_output_dir, "fuzzy_deduped_dataset")

input_filetype = (
    "parquet"  # this can be either of jsonl or parquet (you'll need to change how input data is generated)
)
output_filetype = "parquet"  # this can be either of jsonl or parquet

storage_options = None  # Optional additional cloud I/O args to pass into Pandas/cuDF during I/O operations.
fs, _ = fsspec.url_to_fs(fuzzy_cache_path, **(storage_options or {}))

### Downloading and saving a sample dataset

We download the [TinyStories](https://huggingface.co/datasets/roneneldan/TinyStories) dataset and save it to the `input_dataset_path` specified above. This step can be skipped when running on a different dataset that is already present in `input_dataset_path`.

In [2]:
from nemo_curator.utils.file_utils import get_all_file_paths_under

if len(get_all_file_paths_under(input_dataset_path, storage_options=storage_options)) == 0:
    import os
    import uuid

    from datasets import load_dataset

    input_df = load_dataset("roneneldan/TinyStories", split="train").to_pandas()
    num_rows_per_file = 10_000

    os.makedirs(input_dataset_path, exist_ok=True)

    for i, start_idx in enumerate(range(0, len(input_df), num_rows_per_file)):
        if i % 50 == 0:
            print(f"Processing file {i}")
        end_idx = min(len(input_df), start_idx + num_rows_per_file)
        subset_df = input_df.iloc[start_idx:end_idx].copy()
        subset_df["id"] = [str(uuid.uuid4()) for _ in range(len(subset_df))]
        subset_df.to_parquet(
            os.path.join(input_dataset_path, f"part_{i}.parquet"), index=False, storage_options=storage_options
        )

    print(f"Created {i + 1} files")

## Running as a Single Stage (End-to-End)

See the [API Reference](https://docs.nvidia.com/nemo/curator/latest/apidocs/stages/stages.deduplication.fuzzy.workflow.html#api) for more information about the `FuzzyDeduplicationWorkflow` class.

### General Notes
#### ID Generation
1. The Fuzzy Deduplication Workflow doesn't utilize any existing IDs in the input dataset and instead generates IDs on the fly using an ID Generator actor.
2. The ID Generator gives each row a unique increasing integer ID, based on the order files are read.
3. This avoids expensive ID->Integer encoding for the underlying connected components algorithm which only supports integer IDs.
4. When we find duplicates, we save these integer IDs in sorted files with multiple row groups.
5. We also save a `fuzzy_id_generator.json` which maintains a mapping of input file partitions to ID ranges for that batch.
6. During removal, reading the same file groups yields the same integer IDs. Using the min/max ID values of each group, we can find all corresponding duplicates in that range, which makes the process faster.
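The range lookup described in the last point can be sketched as follows. The mapping layout, group names, and helper are hypothetical and do not reflect the actual contents of `fuzzy_id_generator.json`; the sketch only shows why contiguous ID ranges plus sorted duplicate IDs make the lookup cheap.

```python
from bisect import bisect_left, bisect_right

# Hypothetical mapping of file groups to the inclusive ID range assigned during identification
id_ranges = {
    "group_0": (0, 9_999),
    "group_1": (10_000, 19_999),
}

# Sorted IDs flagged for removal (a stand-in for the sorted duplicate ID files)
sorted_duplicate_ids = [577, 591, 9_998, 10_004, 15_000]


def duplicates_in_range(sorted_ids: list[int], min_id: int, max_id: int) -> list[int]:
    """Binary-search the sorted removal list for IDs within [min_id, max_id]."""
    lo = bisect_left(sorted_ids, min_id)
    hi = bisect_right(sorted_ids, max_id)
    return sorted_ids[lo:hi]


for group, (min_id, max_id) in id_ranges.items():
    print(group, duplicates_in_range(sorted_duplicate_ids, min_id, max_id))
```

Each file group only needs the slice of the removal list that falls inside its own ID range, rather than the full list.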

#### Performance Considerations
1. LSH - Configuring `bands_per_iteration` controls how many bands to process simultaneously in a single shuffle. Higher values can lead to faster performance but might increase memory pressure.
2. A low `input_blocksize` may not saturate the GPUs, while a high `input_blocksize` can lead to OOM errors during MinHash and excessive object store usage during removal. It's recommended to keep it at 1-1.5GiB and to reduce it if you run into OOMs during MinHash.
3. The removal step can be memory intensive, and it's recommended to set a higher fraction of object store memory for removal (if the machine has enough RAM). The `RayDataExecutor` showed better results during duplicate removal.
4. The removal workflow is CPU-only and can be run on machines that don't have GPUs.

#### Hyperparameter Considerations
1. The current defaults for fuzzy deduplication (260 hashes, 13 hashes per band, i.e. 20 bands) approximate finding documents with a Jaccard similarity of 0.8. To select the number of bands/hashes, it's recommended to analyze the S-curve and the tolerable threshold for false positives (and negatives). More information about LSH can be found in section `3.4.2` [here](http://infolab.stanford.edu/~ullman/mmds/ch3n.pdf).
2. The `char_ngrams` value of 24 is chosen so that each n-gram corresponds to roughly 5 words.
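The S-curve mentioned above can be evaluated directly. The sketch below uses the standard banding formula from the referenced LSH chapter: with `b` bands of `r` hashes each, two documents with Jaccard similarity `s` become a candidate pair with probability `1 - (1 - s^r)^b`, and the curve rises most steeply near `(1/b)^(1/r)`. The function and variable names are illustrative only.

```python
def candidate_probability(similarity: float, num_bands: int, hashes_per_band: int) -> float:
    """Probability that two documents share at least one LSH bucket."""
    return 1.0 - (1.0 - similarity**hashes_per_band) ** num_bands


num_hashes = 260
hashes_per_band = 13
num_bands = num_hashes // hashes_per_band  # 20 bands

# Approximate similarity at which the S-curve rises most steeply
approx_threshold = (1 / num_bands) ** (1 / hashes_per_band)
print(f"Approximate threshold: {approx_threshold:.3f}")

for s in (0.6, 0.7, 0.8, 0.9):
    p = candidate_probability(s, num_bands, hashes_per_band)
    print(f"similarity={s:.1f} -> P(candidate pair)={p:.3f}")
```

Increasing the number of hashes per band shifts the curve to the right (fewer false positives, more false negatives), while increasing the number of bands shifts it to the left.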


In [3]:
from nemo_curator.stages.deduplication.fuzzy import FuzzyDeduplicationWorkflow
from nemo_curator.stages.deduplication.id_generator import CURATOR_DEDUP_ID_STR
from nemo_curator.stages.text.deduplication import TextDuplicatesRemovalWorkflow

# All workflows support passing in different kwargs and storage_options for the read, cache and output datasets
# We use a common one here for simplicity
io_kwargs = {"storage_options": storage_options} if storage_options is not None else None

identification_workflow = FuzzyDeduplicationWorkflow(
    cache_path=fuzzy_cache_path,
    output_path=fuzzy_output_dir,
    input_path=input_dataset_path,
    input_filetype=input_filetype,
    input_blocksize="1GiB",
    text_field="text",
    seed=42,
    char_ngrams=24,
    minhashes_per_band=13,
    bands_per_iteration=10,
    read_kwargs=io_kwargs,
    cache_kwargs=io_kwargs,
    write_kwargs=io_kwargs,
)

removal_workflow = TextDuplicatesRemovalWorkflow(
    input_path=input_dataset_path,  # Must be identical to the path used during identification
    ids_to_remove_path=os.path.join(fuzzy_output_dir, "FuzzyDuplicateIds"),
    output_path=deduplicated_output_path,
    input_filetype=input_filetype,
    input_blocksize="1GiB",  # This must be identical to the blocksize used during identification
    ids_to_remove_duplicate_id_field=CURATOR_DEDUP_ID_STR,
    id_generator_path=os.path.join(fuzzy_output_dir, "fuzzy_id_generator.json"),
    output_filetype="parquet",
    input_kwargs=io_kwargs,  # read_kwargs for input dataset
    ids_to_remove_read_kwargs=io_kwargs,  # read_kwargs for removal_id's generated by Fuzzy workflow
    id_generator_storage_options=storage_options,
    output_kwargs=io_kwargs,
)

In [4]:
from nemo_curator.backends.experimental.ray_data import RayDataExecutor
from nemo_curator.core.client import RayClient

client = RayClient(num_cpus=64, num_gpus=2)  # change as needed
client.start()

_ = identification_workflow.run()
_ = removal_workflow.run(executor=RayDataExecutor())

client.stop()

2025-12-09 03:10:36.389 | INFO     | nemo_curator.core.utils:init_cluster:135 - Ray start command: ray start --head --node-ip-address 127.0.1.1 --port 6380 --metrics-export-port 8081 --dashboard-host 127.0.0.1 --dashboard-port 8267 --ray-client-server-port 20000 --temp-dir /tmp/ray --disable-usage-stats --num-gpus 2 --num-cpus 64 --block
2025-12-09 03:10:36,390	INFO worker.py:1691 -- Using address 127.0.1.1:6380 set in the environment variable RAY_ADDRESS
2025-12-09 03:10:36,395	INFO worker.py:1832 -- Connecting to existing Ray cluster at address: 127.0.1.1:6380...
[2025-12-09 03:10:44,056 W 87202 87202] global_state_accessor.cc:505: Some processes that the driver needs to connect to have not registered with GCS, so retrying. Have you run 'ray start' on this node?
[2025-12-09 03:10:45,058 W 87202 87202] global_state_accessor.cc:505: Some processes that the driver needs to connect to have not registered with GCS, so retrying. Have you 

2025-12-09 03:10:39,174	INFO usage_lib.py:447 -- Usage stats collection is disabled.
2025-12-09 03:10:39,175	INFO scripts.py:914 -- Local node IP: 127.0.1.1
2025-12-09 03:10:47,028	SUCC scripts.py:950 -- --------------------
2025-12-09 03:10:47,028	SUCC scripts.py:951 -- Ray runtime started.
2025-12-09 03:10:47,028	SUCC scripts.py:952 -- --------------------
2025-12-09 03:10:47,028	INFO scripts.py:954 -- Next steps
2025-12-09 03:10:47,028	INFO scripts.py:957 -- To add another node to this Ray cluster, run
2025-12-09 03:10:47,028	INFO scripts.py:960 --   ray start --address='127.0.1.1:6380'
2025-12-09 03:10:47,029	INFO scripts.py:969 -- To connect to this Ray cluster:
2025-12-09 03:10:47,029	INFO scripts.py:971 -- import ray
2025-12-09 03:10:47,029	INFO scripts.py:972 -- ray.init(_node_ip_address='127.0.1.1')
2025-12-09 03:10:47,029	INFO scripts.py:984 -- To su

2025-12-09 03:10:51.085 | INFO     | nemo_curator.pipeline.pipeline:build:70 - Planning pipeline: minhash_pipeline
2025-12-09 03:10:51,086	INFO worker.py:1691 -- Using address 127.0.1.1:6380 set in the environment variable RAY_ADDRESS
2025-12-09 03:10:51,091	INFO worker.py:1832 -- Connecting to existing Ray cluster at address: 127.0.1.1:6380...
2025-12-09 03:10:51,097	INFO worker.py:2003 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8267
2025-12-09 03:10:51.115 | INFO     | nemo_curator.backends.experimental.utils:execute_setup_on_node:120 - Executing setup on node f0b30e122c7ad6b4e779347d12e6eb559cf781bebc1691f36d7a2e56 for 2 stages
2025-12-09 03:10:57.161 | INFO     | nemo_curator.backends.experimental.ray_actor_pool.executor:execute:87 - Setup on node complete for all stages. Starting Ray Act

Running 0: 0.00 row [00:00, ? row/s]

- MapBatches(FilePartitioningStageTask) 1: 0.00 row [00:00, ? row/s]

- StreamingRepartition 2: 0.00 row [00:00, ? row/s]

- MapBatches(ParquetReaderStageActor) 3: 0.00 row [00:00, ? row/s]

- MapBatches(TextDuplicatesRemovalStageTask)->MapBatches(ParquetWriterTask) 4: 0.00 row [00:00, ? row/s]

(MapBatches(FilePartitioningStageTask) pid=99439) 2025-12-09 03:13:09.821 | INFO     | nemo_curator.stages.file_partitioning:process:95 - Found 212 files
(MapBatches(FilePartitioningStageTask) pid=99439) 2025-12-09 03:13:09.822 | INFO     | nemo_curator.stages.file_partitioning:process:132 - Created 1 file groups from 212 files
2025-12-09 03:13:39,195	INFO streaming_executor.py:298 -- ✔️  Dataset dataset_5_0 execution finished in 29.69 seconds
2025-12-09 03:13:39,203	INFO util.py:257 -- Exiting prefetcher's background thread
2025-12-09 03:13:39.205 | INFO     | nemo_curator.backends.experimental.ray_data.executor:execute:97 - Pipeline completed. Final results: 1 tasks
2025-12-09 03:13:39,296	INFO worker.py:1691 -- Using address 127.0.1.1:6380 set in the environment variable RAY_ADDRESS
2025-12-09 03:13:39,300	INFO worker.py:1832 -- Connecting to existing Ray cluster at address: 127.0.1.1:6380...
2025-12-09 03:13:39,3

### Looking at Intermediate Results and Output

#### MinHash Results
1. `_curator_dedup_id` - The IDs assigned to this dataset on the fly during the initial read.
2. `_minhash_signature` - MinHash Signature

#### LSH Results
1. `_bucket_id` - The bucket/band identifier
2. `_curator_dedup_id` - List of all document IDs that belong to that bucket

#### Buckets To Edges Result
1. `_curator_dedup_id_x`, `_curator_dedup_id_y` - The two endpoints of an edge in a graph. Each row links two documents that are potential duplicates.
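One simple way to turn buckets into edges is to link consecutive document IDs within each bucket, which is sufficient for connected components because every member of a bucket ends up in the same component. Whether `BucketsToEdgesStage` builds its edges exactly this way is an assumption; the bucket IDs and helper below are illustrative.

```python
def buckets_to_edges(buckets: dict[str, list[int]]) -> list[tuple[int, int]]:
    """Link consecutive (sorted) document IDs in each bucket and drop repeated edges."""
    edges = set()
    for doc_ids in buckets.values():
        ordered = sorted(doc_ids)
        for left, right in zip(ordered, ordered[1:]):
            edges.add((left, right))
    return sorted(edges)


# Hypothetical LSH output: bucket ID -> document IDs in that bucket
buckets = {
    "b0_aaaa": [1178375, 2079489],
    "b1_bbbb": [2079489, 1178375],  # the same pair found again in another band
    "b0_cccc": [8290, 567169, 600000],
}

edges = buckets_to_edges(buckets)
print(edges)
```

A bucket with `n` documents contributes `n - 1` edges instead of all `n * (n - 1) / 2` pairs, which keeps the edge list small for large buckets.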

In [5]:
minhash_path = os.path.join(fuzzy_cache_path, "MinHashStage")
display(pd.read_parquet(fs.unstrip_protocol(fs.find(minhash_path)[0]), storage_options=storage_options).head())

lsh_path = os.path.join(fuzzy_cache_path, "LSHStage")
display(pd.read_parquet(fs.unstrip_protocol(fs.find(lsh_path)[0]), storage_options=storage_options).head())

b2e_path = os.path.join(fuzzy_cache_path, "BucketsToEdgesStage")
display(pd.read_parquet(fs.unstrip_protocol(fs.find(b2e_path)[0]), storage_options=storage_options).head())

| | _curator_dedup_id | _minhash_signature |
|---|---|---|
| 0 | 0 | [11644717, 429172, 6014805, 86354, 2387151, 49... |
| 1 | 1 | [2103321, 653305, 2941429, 5780991, 6977799, 7... |
| 2 | 2 | [1891498, 3797631, 2961751, 50078, 21382505, 5... |
| 3 | 3 | [1286357, 4060996, 1376561, 3044837, 7369355, ... |
| 4 | 4 | [6272013, 12535265, 819579, 5975720, 25677928,... |


| | _bucket_id | _curator_dedup_id |
|---|---|---|
| 0 | b0_000055fd7daae1e46223e8b7e06bf2e0 | [1178375, 2079489] |
| 1 | b0_00006a5f30f7b2c96588bfc1bfb5321a | [365218, 1933514] |
| 2 | b0_00006f316d5bd251bd83702e3f1e017f | [161590, 771961] |
| 3 | b0_0000d90e9e4140a7ac31e6b227a62f62 | [8290, 567169] |
| 4 | b0_0000f975e5bcda25838df43b0d37737f | [965853, 1334885] |


| | _curator_dedup_id_x | _curator_dedup_id_y |
|---|---|---|
| 0 | 1178375 | 2079489 |
| 1 | 365218 | 1933514 |
| 2 | 161590 | 771961 |
| 3 | 8290 | 567169 |
| 4 | 965853 | 1334885 |


#### Connected Components Result

1. `_curator_dedup_id` - The document IDs
2. `_duplicate_group_id` - The ID of the group that the document belongs to. Documents with the same duplicate group ID are duplicates.
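As a rough illustration of how edges become group IDs, the sketch below labels connected components with a small union-find structure. It is a CPU stand-in, not the GPU algorithm used by the workflow, and the labels it produces (the smallest document ID in each component) are an arbitrary choice that need not match the `_duplicate_group_id` values shown in this tutorial.

```python
def connected_components(edges: list[tuple[int, int]]) -> dict[int, int]:
    """Map every document ID to a label shared by all documents in its component."""
    parent: dict[int, int] = {}

    def find(x: int) -> int:
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps the trees shallow
            x = parent[x]
        return x

    for a, b in edges:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            # Attach the larger root to the smaller one so the label is the minimum ID
            parent[max(root_a, root_b)] = min(root_a, root_b)

    return {node: find(node) for node in list(parent)}


edges = [(576, 187440), (578, 187442), (187442, 900000)]
doc_to_group = connected_components(edges)
print(doc_to_group)
```

Documents 578, 187442, and 900000 end up in one group even though 578 and 900000 never shared a bucket directly, which is why duplicates are resolved on the graph rather than per bucket.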

In [6]:
cc_path = os.path.join(fuzzy_cache_path, "ConnectedComponentsStage")
cc_df = pd.read_parquet(cc_path, storage_options=storage_options)  # works with pandas since the input here is small
display(cc_df)
grouped_cc_df = cc_df.groupby("_duplicate_group_id")._curator_dedup_id.agg(list)
display(grouped_cc_df)
duplicate_cluster_sizes = cc_df._duplicate_group_id.value_counts()
display(duplicate_cluster_sizes)

| | _curator_dedup_id | _duplicate_group_id |
|---|---|---|
| 0 | 576 | 0 |
| 1 | 577 | 482274 |
| 2 | 578 | 2 |
| 3 | 579 | 161180 |
| 4 | 581 | 161181 |
| ... | ... | ... |
| 640509 | 2119669 | 640509 |
| 640510 | 2119670 | 480105 |
| 640511 | 2119671 | 480106 |
| 640512 | 2119673 | 480107 |


_duplicate_group_id
0              [576, 187440]
2              [578, 187442]
6              [584, 187448]
8              [586, 187450]
11             [591, 187455]
                 ...        
640505    [1942710, 2119661]
640506    [1942713, 2119664]
640507    [1942715, 2119666]
640509    [1942718, 2119669]
640513    [1942724, 2119675]
Name: _curator_dedup_id, Length: 320043, dtype: object

_duplicate_group_id
476285    230
534130      3
78584       3
88204       3
558429      3
         ... 
106810      2
623214      2
106808      2
623212      2
636876      2
Name: count, Length: 320043, dtype: int64

Based on the distribution above, we can see that there is one cluster/group of 230 documents that are all duplicates of each other, followed by many smaller clusters of 2 or 3 duplicate documents.

#### FuzzyDuplicateIds Results (List of duplicate docs to remove)
1. `_curator_dedup_id` - ID of docs in the removal list
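The removal list follows from the group assignments: one document per group is kept and the rest are listed for removal. The sketch below shows that idea with a hypothetical helper and a seeded random choice; how the workflow actually picks the document to keep is not shown here.

```python
import random
from collections import defaultdict


def ids_to_remove(doc_to_group: dict[int, int], seed: int = 42) -> list[int]:
    """Keep one randomly chosen document per group and return the others, sorted."""
    groups = defaultdict(list)
    for doc_id, group_id in doc_to_group.items():
        groups[group_id].append(doc_id)

    rng = random.Random(seed)
    removal = []
    for members in groups.values():
        keep = rng.choice(members)
        removal.extend(doc_id for doc_id in members if doc_id != keep)
    return sorted(removal)


# Hypothetical connected components result: document ID -> duplicate group ID
doc_to_group = {576: 0, 187440: 0, 578: 2, 187442: 2, 900000: 2}
removal_list = ids_to_remove(doc_to_group)
kept = set(doc_to_group) - set(removal_list)
print(f"Removing {len(removal_list)} documents, keeping {sorted(kept)}")
```

For a group of `n` documents, exactly `n - 1` IDs land in the removal list, so the total number of removed documents equals the number of grouped documents minus the number of groups.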

In [7]:
duplicate_ids_path = os.path.join(fuzzy_output_dir, "FuzzyDuplicateIds")
duplicates_df = pd.read_parquet(duplicate_ids_path, storage_options=storage_options)
display(duplicates_df.head())

print(f"Number of duplicate documents found for removal: {len(duplicates_df)}")

| | _curator_dedup_id |
|---|---|
| 0 | 577 |
| 1 | 591 |
| 2 | 593 |
| 3 | 597 |
| 4 | 598 |


Number of duplicate documents found for removal: 320471


#### Checking that the duplicate IDs list contains all but one document per group

In [8]:
# As an example let's look at the group with the largest number of duplicates
largest_duplicate_cluster = grouped_cc_df.loc[duplicate_cluster_sizes.index[0]]

# number of docs in the removal list from this group
docs_to_remove_in_group = duplicates_df._curator_dedup_id.isin(largest_duplicate_cluster).sum()

print(f"Number of documents in the duplicate group: {len(largest_duplicate_cluster)}")
print(f"Number of documents in the removal list from the same group: {docs_to_remove_in_group}")
assert docs_to_remove_in_group == (len(largest_duplicate_cluster) - 1)  # noqa: S101

Number of documents in the duplicate group: 230
Number of documents in the removal list from the same group: 229


#### Advanced: Looking at examples of duplicate documents

1. This analysis involves re-reading the input data with the same ID mapping that was used during duplicate identification.
2. The input data is then merged with the connected components results on the `_curator_dedup_id` column to associate each document with the duplicate group it belongs to, which can be used for further analysis.

**NOTE**: This analysis approach is intended as an example for smaller datasets and only works when the connected components dataframe is small and fits comfortably in memory. It is not recommended for larger datasets.

In [9]:
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.stages.text.io.reader import ParquetReader
from nemo_curator.tasks.document import DocumentBatch


class CustomMergeStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    """
    Warning: This should not be attempted with large connected components results.
    A small stage that merges the input data (using the id's generated) with the connected components result.
    Works because CC results are small enough to fit per batch.
    """

    resources = Resources(cpus=1.0)

    def process(self, batch: DocumentBatch) -> DocumentBatch:
        df = batch.to_pandas().merge(cc_df, how="inner", on=[CURATOR_DEDUP_ID_STR])
        return DocumentBatch(
            task_id=batch.task_id, dataset_name=batch.dataset_name, data=df, _stage_perf=batch._stage_perf
        )


pipeline = Pipeline(
    name="Explore duplicates",
    stages=[
        ParquetReader(file_paths=input_dataset_path, blocksize="1GiB", _assign_ids=True, read_kwargs=io_kwargs),
        CustomMergeStage(),
    ],
)

In [None]:
from nemo_curator.core.client import RayClient
from nemo_curator.stages.deduplication.id_generator import create_id_generator_actor, kill_id_generator_actor

os.environ["RAY_ADDRESS"] = ""  # reset the ray address from the previous GPU cluster
client = RayClient(num_cpus=8)  # change as needed
client.start()

create_id_generator_actor(
    filepath=os.path.join(fuzzy_output_dir, "fuzzy_id_generator.json"), storage_options=storage_options
)
merged_results = pipeline.run()
merged_df = pd.concat([batch.to_pandas() for batch in merged_results]).sort_values("_duplicate_group_id")
kill_id_generator_actor()

In [13]:
display(merged_df[merged_df._curator_dedup_id.isin(largest_duplicate_cluster)])

| | text | id | _curator_dedup_id | _duplicate_group_id |
|---|---|---|---|---|
| 442448 | | c5b4b736-7c1d-4811-b1fe-1245e408ef55 | 1470911 | 476285 |
| 300453 | | 9a6a840f-9d2c-4691-b95c-c3eee0183011 | 985385 | 476285 |
| 192037 | | 13caa084-5173-4708-bffa-63504097a2bb | 630289 | 476285 |
| 442447 | | e11a03ae-eacf-4f2b-bbb8-326ec98225c9 | 1470910 | 476285 |
| 192049 | | d6d8106b-d5f8-4392-b4b7-06a8042fb4af | 630301 | 476285 |
| ... | ... | ... | ... | ... |
| 515976 | | aff0af8a-b08b-4bc5-b520-63ba12803182 | 1720912 | 476285 |
| 373157 | | 0a593871-cb80-4cf1-bafb-72df442adf5a | 1225085 | 476285 |
| 330555 | | 00748418-8f47-4115-9a6c-2193528bbaf9 | 1083885 | 476285 |
| 118054 | | 1edd5a53-dac5-425f-af45-618bba15d8b6 | 397188 | 476285 |


The largest cluster/group of duplicates in this dataset seems to be all documents with empty/no text.

Let's look at the second largest cluster of documents.

In [14]:
duplicates = merged_df[merged_df._curator_dedup_id.isin(grouped_cc_df.loc[duplicate_cluster_sizes.index[1]])]
display(duplicates)

print(f"\nDocument1\n----------\n{duplicates.iloc[0].text}")
print(f"\nDocument2\n----------\n{duplicates.iloc[1].text}")

| | text | id | _curator_dedup_id | _duplicate_group_id |
|---|---|---|---|---|
| 227663 | Sara and Ben are friends. They like to play to... | 44f5c27d-ea91-48c0-946d-e4eaca60114e | 745466 | 534130 |
| 182026 | Sara and Ben are friends. They like to play to... | 479c928d-4033-47d6-9abe-d2ac9df008a1 | 600335 | 534130 |
| 371194 | Sara and Ben are friends. They like to play to... | a8ec3d5a-8693-4985-8254-71b6fbc0393f | 1218218 | 534130 |



Document1
----------
Sara and Ben are friends. They like to play together. One day, they go to the park with their moms. There are many things to play with in the park. There are swings, slides, seesaws and sand.

Sara and Ben run to the swings. They take turns to push each other. They go high and low, high and low. They laugh and shout. "Wee!" Sara says. "This is fun!" Ben says.

Then they see a big box near the slide. It is open. They are curious. They go to the box and look inside. There are many toys in the box. There are balls, dolls, cars, books and puzzles. They are jolly. They clap their hands. "Wow!" Sara says. "Look at all these toys!" Ben says.

They take out some toys and play with them. They roll the balls, dress the dolls, drive the cars, read the books and make the puzzles. They share and help each other. They are happy. They smile and hug.

Their moms come to the box and see them playing. They are proud. They smile and hug too. "You are good friends," Sara's mom says. 

In [15]:
client.stop()

2025-12-09 03:16:50.913 | INFO     | nemo_curator.core.client:stop:181 - NeMo Curator has stopped the Ray cluster it started by killing the Ray GCS process. It is advised to wait for a few seconds before running any Ray commands to ensure Ray can cleanup other processes.If you are seeing any Ray commands like `ray status` failing, please ensure /tmp/ray/ray_current_cluster has correct information.


### Conclusion
We found and removed ~320,000 duplicate documents from a dataset of ~2.1 million rows.