<img src="./images/DLI_Header.png" style="width: 400px;">


# 2. Advanced Data Processing


In this notebook, we will use NeMo Curator to perform several crucial data cleaning steps: language detection and filtering, topic classification, and deduplication.

This notebook is structured as follows:
- First, we will explore language detection and filtering to separate our multilingual dataset by language.
- Next, we will dive into topic classification to categorize the datasets into relevant themes.
- Finally, we will explore document deduplication, covering both exact and fuzzy methods.


**[2.1 Language Separation](#2.1-Language-Separation)<br>**
**[2.2 Domain Classification](#2.2-Domain-Classification)<br>**
**[2.3 Document Deduplication](#2.3-Deduplication)<br>**




## CUDA Environment Setup (One-Time)

This cell symlinks the CUDA runtime headers into the `cuda_nvcc` include directory so that CuPy can find all CUDA headers in one location; if symlinking fails, it copies them instead.  
**Run this once** per environment; it is safe to rerun.


In [1]:
import os
import sys
import subprocess
from pathlib import Path

# Setup CUDA paths for CuPy/cuDF compatibility
venv = Path(sys.prefix) / "lib" / "python3.12" / "site-packages" / "nvidia"

cuda_nvcc = venv / "cuda_nvcc"
cuda_nvcc_include = venv / "cuda_nvcc" / "include"
cuda_runtime_include = venv / "cuda_runtime" / "include"

# Create include directory if it doesn't exist
cuda_nvcc_include.mkdir(parents=True, exist_ok=True)

# Create symlinks to make all CUDA headers accessible in one location
print("Setting up CUDA headers...")
result = subprocess.run(
    f"ln -sf {cuda_runtime_include}/* {cuda_nvcc_include}/",
    shell=True, capture_output=True, text=True
)

if result.returncode == 0:
    print(f"✅ CUDA headers symlinked: {cuda_runtime_include} -> {cuda_nvcc_include}")
else:
    print(f"⚠️  Symlink failed, copying headers instead...")
    import shutil
    for header in cuda_runtime_include.glob("*.h"):
        shutil.copy2(header, cuda_nvcc_include / header.name)
    print(f"✅ Copied {len(list(cuda_runtime_include.glob('*.h')))} header files")

# Verify cuda_fp16.h is accessible
if (cuda_nvcc_include / "cuda_fp16.h").exists():
    print(f"✅ cuda_fp16.h accessible at: {cuda_nvcc_include}/cuda_fp16.h")
else:
    print(f"❌ ERROR: cuda_fp16.h not found!")

# Set environment variables
os.environ["CUDA_PATH"] = str(cuda_nvcc)
os.environ["CUDA_HOME"] = str(cuda_nvcc)
print(f"✅ CUDA environment ready")


Setting up CUDA headers...
✅ CUDA headers symlinked: /home/aibeceles/inferencing/.venv/lib/python3.12/site-packages/nvidia/cuda_runtime/include -> /home/aibeceles/inferencing/.venv/lib/python3.12/site-packages/nvidia/cuda_nvcc/include
✅ cuda_fp16.h accessible at: /home/aibeceles/inferencing/.venv/lib/python3.12/site-packages/nvidia/cuda_nvcc/include/cuda_fp16.h
✅ CUDA environment ready


***************
### Environment Setup



In [2]:
import warnings

# Ignore any warning
warnings.filterwarnings("ignore")

In [None]:
# GPU cluster note
# - This notebook uses a single-GPU Dask cluster (good default for 8GB GPUs)
# - We enable cuDF spilling to reduce OOM risk
# Run the next cell to start the cluster.

In [None]:
import os
import sys
from pathlib import Path
from nemo_curator.utils.distributed_utils import get_client, get_num_workers

# Enable CUDF spilling to prevent OOM errors on 8GB GPU
os.environ["CUDF_SPILL"] = "1"

def pre_imports():
    """Set CUDA environment and import cudf on each worker"""
    import os
    import sys
    from pathlib import Path
    
    # Set CUDA paths on worker (same as main process)
    venv = Path(sys.prefix) / "lib" / "python3.12" / "site-packages" / "nvidia"
    cuda_nvcc = venv / "cuda_nvcc"
    
    os.environ["CUDA_PATH"] = str(cuda_nvcc)
    os.environ["CUDA_HOME"] = str(cuda_nvcc)
    
    # Import cudf with CUDA environment set
    import cudf
    print(f"✓ Worker ready - CUDA_PATH={os.environ.get('CUDA_PATH')}")

# Initialize GPU cluster with memory-safe settings
client = get_client(
    cluster_type="gpu", 
    set_torch_to_use_rmm=False  # Keep PyTorch on its own allocator (not RMM) for stability
)

print(f"✓ GPU cluster ready!")
print(f"Number of dask workers: {get_num_workers(client)}")
print(f"📊 Dashboard: {client.dashboard_link}")

# Set up workers with CUDA environment
client.run(pre_imports)

## GPU Cluster Configuration

The GPU Dask cluster is started in the previous cell with settings tuned for an 8GB GPU:

- **1 GPU worker** to avoid fragmentation
- **cuDF spilling enabled**
- **PyTorch not routed through RMM** (`set_torch_to_use_rmm=False`) for stability

💡 Monitor GPU usage with `watch -n 1 nvidia-smi`.

In [None]:
# (intentionally blank)

Let's load the multilingual dataset.

In [None]:
from nemo_curator.datasets import DocumentDataset

multilingual_data_path = "./original_data"
multilingual_dataset = DocumentDataset.read_json(
    multilingual_data_path, add_filename=True
)

In [None]:
# check the data
multilingual_dataset.head()

## 2.1 Language Separation

In this section, we will use the fastText [language identification model](https://fasttext.cc/docs/en/language-identification.html), which can recognize 176 languages.


Let's first create the output folders for the language-separated data:


In [None]:
import os

language_base_output_path = "./curated/04_language_separation"
language_separated_output_path = os.path.join(language_base_output_path, "language")

# Create directories (with parents as needed)
os.makedirs(language_base_output_path, exist_ok=True)
os.makedirs(language_separated_output_path, exist_ok=True)


In [None]:
language_separated_output_path

Next, let's download the fastText model (if it is not already present) and create the filter that uses it.

In [None]:
# Download fasttext language classification model
import os
import urllib.request

model_path = "lid.176.bin"
if not os.path.exists(model_path):
    print("Downloading FastText language model (131 MB)...")
    url = "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin"
    urllib.request.urlretrieve(url, model_path)
    print(f"✓ Model downloaded to {model_path}")
else:
    print(f"✓ FastText model already exists at {model_path}")

In [None]:
from nemo_curator import ScoreFilter
from nemo_curator.filters import FastTextLangId

lang_filter = FastTextLangId("lid.176.bin")
language_field = "language"
language_id_pipeline = ScoreFilter(
    lang_filter, score_field=language_field, score_type="object"
)

Now, let's apply the language detection filter on our multilingual dataset. 

In [None]:
# Apply language separation to our multilingual dataset
filtered_dataset = language_id_pipeline(multilingual_dataset)

Let's check the detected language for each sample. 

Notice the new `language` field in the output: it holds the classification score together with the detected language code (e.g. `FR`, `EN`, `ES`).

In [None]:
# check the detected language per item
filtered_dataset.head(3)
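To make the `language` field concrete, here is a small stand-alone sketch. It assumes the filter receives fastText's top-1 prediction as a label such as `__label__fr` plus a probability, stores a `[score, CODE]` pair (the separation cell below reads the code with `score[1]`), and keeps documents whose score clears a minimum threshold. The helper names and the `0.3` threshold are illustrative assumptions, not NeMo Curator's code.

```python
def to_score_and_code(labels, probs):
    """Turn a fastText-style top-1 prediction into a [score, LANGUAGE_CODE] pair."""
    label = labels[0]                                # e.g. "__label__fr"
    code = label.removeprefix("__label__").upper()   # -> "FR"
    return [float(probs[0]), code]


def keep_document(score_and_code, min_score=0.3):
    """Keep a document only if the language-ID confidence is high enough (hypothetical threshold)."""
    return score_and_code[0] >= min_score


# Hypothetical predictions for three documents
predictions = [
    (("__label__fr",), (0.98,)),
    (("__label__es",), (0.91,)),
    (("__label__en",), (0.12,)),  # low confidence, dropped
]
scored = [to_score_and_code(labels, probs) for labels, probs in predictions]
kept_codes = [code for score, code in scored if keep_document([score, code])]
print(scored)      # [[0.98, 'FR'], [0.91, 'ES'], [0.12, 'EN']]
print(kept_codes)  # ['FR', 'ES']
```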

Let's separate documents by their language label and save each language separately. This creates a sub-folder for each language under the output path.


In [None]:
# Save separated languages and get stats
from nemo_curator.utils.file_utils import separate_by_metadata

# Keep only the language code (index 1 of the [score, code] pair)
filtered_dataset.df[language_field] = filtered_dataset.df[language_field].apply(
    lambda score: score[1], meta=(language_field, "object")
)
language_stats = separate_by_metadata(
    filtered_dataset.df, language_separated_output_path, metadata_field=language_field
).compute()
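Conceptually, `separate_by_metadata` groups rows by the value of the metadata field, writes each group to its own sub-folder, and returns per-value counts. The plain-Python sketch below mimics that idea for a few in-memory records. `separate_by_field` is a hypothetical helper, and the fixed file name `file.jsonl` is an assumption chosen to match the paths inspected in this notebook; the real function works on Dask DataFrames and its file naming may differ.

```python
import json
import os
import tempfile
from collections import Counter


def separate_by_field(records, output_dir, field="language"):
    """Append each record to <output_dir>/<record[field]>/file.jsonl and return per-value counts."""
    counts = Counter()
    for record in records:
        value = record[field]
        subdir = os.path.join(output_dir, value)
        os.makedirs(subdir, exist_ok=True)
        with open(os.path.join(subdir, "file.jsonl"), "a", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
        counts[value] += 1
    return dict(counts)


records = [
    {"text": "Il adore les chats.", "language": "FR"},
    {"text": "Me gusta ir de compras.", "language": "ES"},
    {"text": "Bonjour tout le monde.", "language": "FR"},
]
with tempfile.TemporaryDirectory() as out_dir:
    stats = separate_by_field(records, out_dir)
    folders = sorted(os.listdir(out_dir))
print(stats)    # {'FR': 2, 'ES': 1}
print(folders)  # ['ES', 'FR']
```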

In [None]:
# check the language distribution stats
print(f"Number of documents: {len(multilingual_dataset)}")
print(f"Number of filtered documents: {len(filtered_dataset)}")

print("Language separation stats:", language_stats)

We can inspect the output JSONL file for each language.

In [None]:
# check first element for French
! head -n 1 {language_separated_output_path}/FR/file.jsonl | jq

In [None]:
# check first element for Spanish
! head -n 1 {language_separated_output_path}/ES/file.jsonl | jq

## 2.2 Domain Classification

NeMo Curator supports various text classification models for annotating data, which is useful for data cleaning and data blending. See the documentation on [distributed data classification](https://github.com/NVIDIA/NeMo-Curator/blob/main/tutorials/distributed_data_classification/README.md).


Each classifier is available on the Hugging Face Hub. When run with NeMo Curator, the classifiers are accelerated with the RAPIDS [CrossFit](https://github.com/rapidsai/crossfit) library.


In this section, we will experiment with the `MultilingualDomainClassifier`, which supports 52 languages and assigns each document one of 26 domain classes:

`Arts_and_Entertainment`, `Autos_and_Vehicles`, `Adult`, `Beauty_and_Fitness`, `Books_and_Literature`, `Business_and_Industrial`, `Computers_and_Electronics`, `Finance`, `Food_and_Drink`, `Games`, `Health`, `Hobbies_and_Leisure`, `Home_and_Garden`, `Internet_and_Telecom`, `Jobs_and_Education`, `Law_and_Government`, `News`, `Online_Communities`, `People_and_Society`, `Pets_and_Animals`, `Real_Estate`, `Science`, `Sensitive_Subjects`, `Shopping`, `Sports`, `Travel_and_Transportation`

The model is a transformer encoder built on mDeBERTa-v3-base (the multilingual DeBERTa V3 base model) and is available on the Hugging Face Hub. Learn more in the [MultilingualDomainClassifier model card](https://huggingface.co/nvidia/multilingual-domain-classifier).
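As a rough illustration of how a single `domain_pred` label can come out of such a classifier, the sketch below applies a softmax to one vector of per-class scores (logits) and picks the highest-probability class. The logits are made up, and the label order simply follows the list above; the real model's internal label order is an assumption here.

```python
import math

# Label order copied from the list above (the model's own ordering may differ)
DOMAINS = [
    "Arts_and_Entertainment", "Autos_and_Vehicles", "Adult", "Beauty_and_Fitness",
    "Books_and_Literature", "Business_and_Industrial", "Computers_and_Electronics",
    "Finance", "Food_and_Drink", "Games", "Health", "Hobbies_and_Leisure",
    "Home_and_Garden", "Internet_and_Telecom", "Jobs_and_Education",
    "Law_and_Government", "News", "Online_Communities", "People_and_Society",
    "Pets_and_Animals", "Real_Estate", "Science", "Sensitive_Subjects",
    "Shopping", "Sports", "Travel_and_Transportation",
]


def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    m = max(logits)                      # subtract the max for numerical stability
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def predict_domain(logits, labels=DOMAINS):
    """Return (label, probability) for the highest-scoring class."""
    probs = softmax(logits)
    best = max(range(len(probs)), key=probs.__getitem__)
    return labels[best], probs[best]


# Hypothetical logits: "Finance" scores highest, every other class scores 0
logits = [0.0] * len(DOMAINS)
logits[DOMAINS.index("Finance")] = 5.0
label, prob = predict_domain(logits)
print(label, round(prob, 3))  # Finance 0.856
```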


Let's set the output folder for domain classification.

In [None]:
import cudf
import dask_cudf
from nemo_curator.classifiers import MultilingualDomainClassifier

domain_output_path = "./curated/05_domain_classification"

# Create directory (with parents if needed)
os.makedirs(domain_output_path, exist_ok=True)

In [None]:
# (not needed anymore)
# CUDA headers are handled by the "CUDA Environment Setup" cell at the top of the notebook.

In [None]:
# (not needed anymore)
# Keeping this cell empty avoids confusing CuPy debug settings.

First, let's apply the Multilingual Domain Classifier to a small toy dataset covering several languages and topics.

In [None]:
# Create sample DataFrame
text = [
    # French
    "Il adore les chats.",
    # English
    "Investing in index funds is a popular strategy for long-term financial growth.",
    # Spanish
    "Ir de compras en el centro comercial es una excelente manera de encontrar ofertas y descubrir nuevas tiendas.",
    # Polish
    "Dzięki wykorzystaniu analizy danych programy treningowe dla sportowców stały się bardziej wyrafinowane.",
    # Arabic
    ".تقدم التطورات الحديثة في العلاج الجيني أملاً جديدًا لعلاج الاضطرابات الوراثية",
]
df = cudf.DataFrame({"text": text})

toy_dataset = DocumentDataset(dask_cudf.from_cudf(df, npartitions=1))

We can define the `MultilingualDomainClassifier` as follows.

On its first run, it will download the DeBERTa model from the Hugging Face Hub (~500MB).

**Memory settings for an 8GB GPU** (as used in the next cell):
- `batch_size=8` (very small batches to avoid OOM)
- `max_chars=512` (shorter inputs use less memory)
- `autocast=True` (mixed precision)
- `max_mem_gb=4` (conservative memory limit)

In [None]:
# Create the classifier with memory-safe settings for an 8GB GPU
#
# IMPORTANT: CrossFit fits a GPU "memory estimate curve" the first time it sees a model.
# On 8GB cards this can OOM during cleanup due to a PyTorch issue.
# We avoid that by pre-seeding CrossFit's cache (`mem_model.pkl`) with a small regression model.

import os
from pathlib import Path

import joblib
import numpy as np
from sklearn.linear_model import LinearRegression
from transformers import AutoConfig

from crossfit.dataset.home import CF_HOME
from nemo_curator.classifiers import MultilingualDomainClassifier

# Reduce allocator fragmentation risk
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True,max_split_size_mb:128")

# Pre-seed CrossFit memory model cache so it doesn't run GPU calibration
base_model = "microsoft/mdeberta-v3-base"
model_cfg = AutoConfig.from_pretrained(base_model)
cache_dir = Path(CF_HOME) / "memory" / model_cfg._name_or_path
mem_model_path = cache_dir / "mem_model.pkl"
cache_dir.mkdir(parents=True, exist_ok=True)

# Delete old cache to force regeneration with conservative values
if mem_model_path.exists():
    mem_model_path.unlink()
    print("✓ Removed old CrossFit memory cache")

# Features: [batch_size, seq_len, seq_len^2] -> predicted memory (MB)
# Use conservative values for 8GB GPU
X = np.array(
    [
        [1, 1, 1],
        [8, 128, 128**2],
        [16, 256, 256**2],
    ],
    dtype=np.float32,
)
y = np.array([256, 1024, 2048], dtype=np.float32)  # MB
mem_model = LinearRegression().fit(X, y)
joblib.dump(mem_model, mem_model_path)
print(f"✓ Wrote CrossFit memory cache: {mem_model_path}")

# Clear GPU memory before starting
import gc
import torch
gc.collect()
torch.cuda.empty_cache()
print("✓ GPU memory cleared")

# Now create the classifier with VERY conservative settings for 8GB GPU
domain_classifier = MultilingualDomainClassifier(
    batch_size=8,       # Very small batch size to avoid OOM
    max_chars=512,      # Shorter sequences use less memory
    autocast=True,      # Use mixed precision
    max_mem_gb=4,       # Conservative memory limit
)
print("✓ Domain classifier created with batch_size=8, max_chars=512")
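The comments in the cell above explain that CrossFit fits a GPU memory-estimate curve and that the classifier is capped with `max_mem_gb`. As a simplified sketch of why such a curve matters (not CrossFit's actual algorithm), the code below assumes a linear memory model over the same three features `[batch_size, seq_len, seq_len^2]`, with made-up coefficients, and searches for the largest batch size whose predicted memory fits a 4 GB budget. Both functions are hypothetical.

```python
def predicted_mem_mb(batch_size, seq_len, coef=(100.0, 1.0, 0.01), intercept=256.0):
    """Linear memory model over [batch_size, seq_len, seq_len**2] (hypothetical coefficients)."""
    a, b, c = coef
    return intercept + a * batch_size + b * seq_len + c * seq_len**2


def largest_batch_within_budget(seq_len, budget_mb, max_batch=1024):
    """Largest batch size whose predicted memory stays within budget_mb (0 if none fits)."""
    best = 0
    for batch_size in range(1, max_batch + 1):
        if predicted_mem_mb(batch_size, seq_len) > budget_mb:
            break  # predictions grow with batch_size, so we can stop early
        best = batch_size
    return best


print(largest_batch_within_budget(seq_len=128, budget_mb=4 * 1024))  # 35
```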

Now, let's run the classifier on our multilingual, multi-topic toy samples.

In [None]:
%%time
result_domain = domain_classifier(dataset=toy_dataset)

Check the outputs. Notice the new field `domain_pred`. Example of expected outputs: 
```
Il adore les chats.	                                Pets_and_Animals
Investing in index funds is a popular strategy...	Finance
Ir de compras en el centro comercial es una ex...	Shopping
Dzięki wykorzystaniu analizy danych programy t...	Sports
.تقدم التطورات الحديثة في العلاج الجيني أملاً ...	        Health
...
```

In [None]:
# check the results
result_domain.head()

Now, let's use the `MultilingualDomainClassifier` to process our previously filtered multilingual corpus (French and Spanish).

In [None]:
# load the filtered data
from nemo_curator.datasets import DocumentDataset

multilingual_data_path = "./curated/01_clean_and_unify"
multilingual_dataset = DocumentDataset.read_json(multilingual_data_path, backend="cudf")

# Domain classification
multilingual_result_domain = domain_classifier(dataset=multilingual_dataset)

Let's check the output. You should see an additional field, `domain_pred`:
```
text                                            		domain_pred
Dragon Ball: Le 20e film de la sage sortira le...		Arts_and_Entertainment
Cours D'histoire Des États Européens: Depuis L...		Books_and_Literature
Se realizó una jornada de promoción del buentr...		People_and_Society
...
```

Execute the following cell to review the topic predictions:


In [None]:
# check the domain classification
multilingual_result_domain.head()

Let's now save the output.

In [None]:
# save the domain predictions for the multilingual corpus
multilingual_result_domain.to_json(domain_output_path)

We can check the saved outputs by executing the next cell:

In [None]:
! head -n 1 {domain_output_path}/0.part | jq

## 2.3 Deduplication

Document-level deduplication reduces the number of duplicate and near-duplicate documents in a dataset. This is crucial for dataset cleaning: it reduces redundancy and ensures that models are trained on diverse, unique data.

In this section, we will explore both exact and fuzzy deduplication. Both are supported in NeMo Curator and accelerated with [RAPIDS](https://rapids.ai/) libraries.


Remember, we created our multilingual (Spanish and French) dataset by duplicating each sample once, so it contains duplicates to find.
Before running deduplication, we need to ensure that each document in the dataset has a unique ID. We can use NeMo Curator's `AddId` module to accomplish this.

In [None]:
# Create the output folder and load the language-separated datasets
from nemo_curator import AddId

data_dir = "curated/06_add_id"
added_id_output_path = os.path.join(data_dir, "add_id/cleaned")
!mkdir -p {data_dir}

dataset_fr = DocumentDataset.read_json(
    os.path.join(language_separated_output_path, "FR/"), add_filename=True
)
dataset_es = DocumentDataset.read_json(
    os.path.join(language_separated_output_path, "ES/"), add_filename=True
)

### 2.3.1 Add Unique ID

Let's start by adding a unique ID to each of our language-separated datasets (Spanish and French).

The output ID format is `<prefix>-<id>`, where `prefix` is specified by the user and `id` is a uniquely generated, zero-padded number.


Example of expected output:
```
text	                                         		id
Dragon Ball: Le 20e film de la sage sortira le...		FR_data-0000000000
Cours D'histoire Des États Européens: Depuis L...		FR_data-0000000001
...
```
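The ID format shown in the example output can be reproduced in a few lines of Python. This single-process sketch is only an illustration: `add_ids` is a hypothetical helper, `AddId` itself assigns IDs across Dask partitions, and the ten-digit zero padding is inferred from the example output above.

```python
def add_ids(records, id_prefix, start_index=0, id_field="id"):
    """Return copies of the records with IDs of the form '<prefix>-<zero-padded index>'."""
    return [
        {**record, id_field: f"{id_prefix}-{start_index + i:010d}"}
        for i, record in enumerate(records)
    ]


docs = [{"text": "Dragon Ball: ..."}, {"text": "Cours D'histoire ..."}]
with_ids = add_ids(docs, id_prefix="FR_data")
print([d["id"] for d in with_ids])  # ['FR_data-0000000000', 'FR_data-0000000001']
```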

Execute the following cell to apply `AddId` to the French corpus, with the prefix set to `FR_data`:

In [None]:
%%time
# Define user's prefix
FR_add_ID_id_prefix = "FR_data"

add_id = AddId(id_field="id", id_prefix=FR_add_ID_id_prefix, start_index=0)
id_dataset_fr = add_id(dataset_fr)

Let's check the outputs. Notice the new field `id`.

In [None]:
# check outputs
id_dataset_fr.head(3)

We can save the outputs in their designated folder.

In [None]:
id_dataset_fr.to_json(os.path.join(added_id_output_path, "FR/"), write_to_filename=True)

#### Exercise: Add Unique IDs to the Spanish Data
Replace each `# Your code here` placeholder. If you get stuck, refer to the solution below.

In [None]:
ES_add_ID_id_prefix = # Your code here

add_id = AddId(id_field="id", id_prefix=ES_add_ID_id_prefix, start_index=0)
id_dataset_es = # Your code here

# save to relevant folder
id_dataset_es.to_json(os.path.join(added_id_output_path, "ES/"), write_to_filename=True)

In [None]:
# solution
ES_add_ID_id_prefix = "ES_data"

add_id = AddId(id_field="id", id_prefix=ES_add_ID_id_prefix, start_index=0)
id_dataset_es = add_id(dataset_es)

# save to relevant folder
id_dataset_es.to_json(os.path.join(added_id_output_path, "ES/"), write_to_filename=True)

### 2.3.2 Exact Deduplication

Exact deduplication consists of identifying and removing documents that are exactly identical within a dataset. This process helps eliminate redundant data, prevents models from overfitting on repeated examples, and ensures that training and test sets do not contain the same samples, which could otherwise lead to misleading evaluation metrics.

In [NeMo Curator](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/gpudeduplication.html), exact deduplication works by hashing each document and keeping only one document per hash. It can run on both GPU ([cuDF](https://docs.rapids.ai/api/cudf)) and CPU ([pandas](https://pandas.pydata.org/)) backends.
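The hash-and-keep-one idea can be sketched in plain Python with `hashlib.md5`, the same hash method used below (`hash_method="md5"`). This is an in-memory illustration under the assumption that documents are dicts with `id` and `text` fields; `exact_duplicates_to_remove` is a hypothetical helper, not NeMo Curator's GPU implementation.

```python
import hashlib
from collections import defaultdict


def exact_duplicates_to_remove(docs, id_field="id", text_field="text"):
    """Return IDs of documents whose MD5 text hash was already seen (keep the first of each group)."""
    groups = defaultdict(list)
    for doc in docs:
        digest = hashlib.md5(doc[text_field].encode("utf-8")).hexdigest()
        groups[digest].append(doc[id_field])
    return [doc_id for ids in groups.values() for doc_id in ids[1:]]


docs = [
    {"id": "ES_data-0000000000", "text": "Hola mundo"},
    {"id": "ES_data-0000000001", "text": "Adiós"},
    {"id": "ES_data-0000000002", "text": "Hola mundo"},
]
to_remove = exact_duplicates_to_remove(docs)
remove_set = set(to_remove)
deduped = [d for d in docs if d["id"] not in remove_set]
print(to_remove)     # ['ES_data-0000000002']
print(len(deduped))  # 2
```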


Let's create the folders for exact deduplication. We will save the output results in the `data` subfolder, temporary files in `cache`, and logs in `log`.


In [None]:
data_dir_es = "curated/07_Deduplicate/exact/ES"

exact_dedup_log_dir_es = os.path.join(data_dir_es, "log")
exact_dedup_cache_dir_es = os.path.join(data_dir_es, "cache")
exact_dedup_output_dir_es = os.path.join(data_dir_es, "data")

# Create all required directories
os.makedirs(exact_dedup_log_dir_es, exist_ok=True)
os.makedirs(exact_dedup_cache_dir_es, exist_ok=True)
os.makedirs(exact_dedup_output_dir_es, exist_ok=True)

Before running exact deduplication in NeMo Curator, each document (sample) in the dataset must have a unique ID. We already added these IDs in the `"id"` field in the previous step.

We will run exact deduplication on the GPU using the cuDF backend.

In [None]:
id_field = "id"
input_dataset_es = DocumentDataset.read_json(
    os.path.join(added_id_output_path, "ES/"), backend="cudf", add_filename=True
)

Execute the next cell to run the exact deduplication on the Spanish dataset. This should take about 10 seconds to process.

We can use `perform_removal=True` to remove the duplicates directly. For this exercise, however, we will first identify the duplicates and only then remove them.

In [None]:
%%time
from nemo_curator.modules import ExactDuplicates

# run exact deduplication
exact_dup_es = ExactDuplicates(
    logger=exact_dedup_log_dir_es,
    id_field="id",
    text_field="text",
    hash_method="md5",
    cache_dir=exact_dedup_cache_dir_es,
)
duplicates_es = exact_dup_es(dataset=input_dataset_es)
exact_docs_to_remove_es = duplicates_es.df.map_partitions(
    lambda x: x[x._hashes.duplicated(keep="first")]
)

Check how many duplicate documents will be removed:

In [None]:
print(f"Number of documents in the original data:{len(input_dataset_es)}")
print(f"Number of documents to be removed:{len(exact_docs_to_remove_es)}")

Check some duplicate documents: 

Example of output: 
```
     id                  _hashes
18   ES_data-0000000146 2f610eed57653fbe68328fbaf3274c2a
20   ES_data-0000000148  e473009ec2e1a246de93fea08488ca4c
21   ES_data-0000000149  066347c8a96bc73056a9f172e4d9710

```

In [None]:
exact_docs_to_remove_es.head(3)

Now, apply the deduplication removal and save the results to the output data folder.

In [None]:
result_es = input_dataset_es.df[
    ~input_dataset_es.df[id_field].isin(exact_docs_to_remove_es[id_field].compute())
]
DocumentDataset(result_es).to_json(exact_dedup_output_dir_es, write_to_filename=True)

Check saved output file.

In [None]:
! head -n 1 {exact_dedup_output_dir_es}/file.jsonl | jq

#### Exercise: Run Exact Deduplication on the French Data

Run the same exact deduplication for the French data. 

Let's first create the relevant folders and set the dataset and id field.

In [None]:
data_dir_fr = "curated/07_Deduplicate/exact/FR"

exact_dedup_log_dir_fr = os.path.join(data_dir_fr, "log")
exact_dedup_cache_dir_fr = os.path.join(data_dir_fr, "cache")
exact_dedup_output_dir_fr = os.path.join(data_dir_fr, "data")
!mkdir -p {exact_dedup_log_dir_fr}
!mkdir -p {exact_dedup_cache_dir_fr}
!mkdir -p {exact_dedup_output_dir_fr}

id_field = "id"
input_dataset_fr = DocumentDataset.read_json(
    os.path.join(added_id_output_path, "FR/"), backend="cudf", add_filename=True
)

Run the deduplication. Replace each `# Your code here` placeholder. If you get stuck, refer to the solution below.

In [None]:
# run exact deduplicate
exact_dup_fr = # Your code here
duplicates_fr = # Your code here
exact_docs_to_remove_fr = # Your code here

In [None]:
# solution
# run exact deduplication
exact_dup_fr = ExactDuplicates(
    logger=exact_dedup_log_dir_fr,
    id_field="id",
    text_field="text",
    hash_method="md5",
    cache_dir=exact_dedup_cache_dir_fr,
)

duplicates_fr = exact_dup_fr(dataset=input_dataset_fr)
exact_docs_to_remove_fr = duplicates_fr.df.map_partitions(
    lambda x: x[x._hashes.duplicated(keep="first")]
)

Check how many duplicate documents will be removed:

In [None]:
print(f"Number of documents in the original data:{len(input_dataset_fr)}")
print(f"Number of documents to be removed:{len(exact_docs_to_remove_fr)}")

Now, apply the deduplication removal and save the results to the output data folder.

In [None]:
result_fr = input_dataset_fr.df[
    ~input_dataset_fr.df[id_field].isin(exact_docs_to_remove_fr[id_field].compute())
]
DocumentDataset(result_fr).to_json(exact_dedup_output_dir_fr, write_to_filename=True)

In [None]:
client.cluster.close()
client.shutdown()

In [None]:
import IPython

IPython.Application.instance().kernel.do_shutdown(True)  # automatically restarts kernel

### 2.3.3 Fuzzy Deduplication

Fuzzy deduplication removes near-duplicate documents at the document level, based on the Jaccard similarity between documents.

This approach can be broken down into the following stages:
- **Stage 1 - MinHash + LSH:** The first step generates MinHash signatures for the documents. NeMo Curator currently supports character-based n-grams for MinHashing. Locality Sensitive Hashing (LSH) is then applied to the signatures to identify candidate duplicates.
- **Stage 2 - LSH Buckets to Graph edgelist:** LSH buckets are directly converted to edges for the connected components computation.
- **Stage 3 - Connected Components:** Since LSH is an approximate method, near-duplicate documents may end up in different buckets, with some documents shared between buckets. A GPU-accelerated connected components algorithm finds all connected components in the graph whose edges link documents in the same bucket. The output of this step is a list of document IDs and the groups they belong to.

All documents within the same group are considered near duplicates, and results can then be used to remove them from the corpus.
For more information, refer to the Deduplication documentation of [NeMo Curator](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/gpudeduplication.html).
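Stage 1 can be illustrated with a small pure-Python sketch: build character n-gram shingles, compute a MinHash signature with one seeded hash function per position (the seed is passed as BLAKE2b's salt), split the signature into bands ("buckets"), and treat documents that agree on every value of any band as candidate pairs. This is a simplified illustration, not NeMo Curator's GPU implementation; all function names are hypothetical, and 5-character n-grams are used because the toy strings are short (the notebook's config uses 24).

```python
import hashlib
from itertools import combinations


def char_ngrams(text, n=5):
    """Character n-gram shingles (the whole string if it is shorter than n)."""
    if len(text) < n:
        return {text}
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def jaccard(a, b):
    """Exact Jaccard similarity of two sets."""
    return len(a & b) / len(a | b)


def minhash_signature(shingles, num_perm):
    """Minimum hash value per seeded hash function; the seed is encoded in blake2b's salt."""
    signature = []
    for seed in range(num_perm):
        salt = seed.to_bytes(16, "little")
        signature.append(min(
            int.from_bytes(hashlib.blake2b(s.encode("utf-8"), digest_size=8, salt=salt).digest(), "little")
            for s in shingles
        ))
    return signature


def lsh_candidate_pairs(signatures, num_buckets, hashes_per_bucket):
    """Documents whose signatures agree on every value of at least one band become candidate pairs."""
    buckets = {}
    for doc_id, sig in signatures.items():
        for b in range(num_buckets):
            band = tuple(sig[b * hashes_per_bucket:(b + 1) * hashes_per_bucket])
            buckets.setdefault((b, band), []).append(doc_id)
    pairs = set()
    for members in buckets.values():
        pairs.update(combinations(sorted(members), 2))
    return pairs


docs = {
    "d1": "Il adore les chats et les chiens.",
    "d2": "Il adore les chats et les chiens.",  # exact copy of d1
    "d3": "Investing in index funds is popular.",
}
shingles = {k: char_ngrams(v) for k, v in docs.items()}
sigs = {k: minhash_signature(v, num_perm=20) for k, v in shingles.items()}
pairs = lsh_candidate_pairs(sigs, num_buckets=5, hashes_per_bucket=4)
print(pairs)                                   # {('d1', 'd2')}
print(jaccard(shingles["d1"], shingles["d3"]))  # 0.0
```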


Apart from exact duplicates, there are no near-duplicates in our example datasets. However, to demonstrate the process, let's run fuzzy deduplication on the French dataset and go through the steps involved.

Let's first create the output folder.

In [3]:
import os

fuzzy_dedup_log_dir_fr = "curated/07_Deduplicate/fuzzy_wrapper/FR"
os.makedirs(fuzzy_dedup_log_dir_fr, exist_ok=True)

data_dir = "curated/06_add_id"
added_id_output_path = os.path.join(data_dir, "add_id/cleaned")
os.makedirs(added_id_output_path, exist_ok=True)  # Creates "curated/06_add_id/add_id/cleaned"

## Restart GPU Cluster for Fuzzy Deduplication

Let's start a fresh Dask GPU cluster optimized for fuzzy deduplication. Make sure the previous cluster is stopped.

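**Configuration:**
- **1 GPU worker** (suited to a single 8GB GPU)
- **Device memory limit: 6GB** (above this threshold, dask-cuda spills worker data from GPU to host memory, leaving headroom on an 8GB card)
- **RMM pool size: 6GB**
- **cuDF spilling enabled** for handling large graphs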

In [4]:
import os
from dask.distributed import Client
from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from

cudf = gpu_only_import("cudf")
dask_cudf = gpu_only_import("dask_cudf")
LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")

# Create GPU cluster optimized for 8GB GPU memory
cluster = LocalCUDACluster(
    n_workers=1,                    # Single worker for 8GB GPU
    device_memory_limit="6GB",      # Leave 2GB headroom
    rmm_pool_size="6GB",            # RMM memory pool
    protocol="tcp",                 # Use TCP for stability
)
client = Client(cluster)

print(f"✓ Fuzzy dedup GPU cluster ready!")
print(f"Workers: {len(cluster.workers)}")
print(f"📊 Dashboard: {client.dashboard_link}")

✓ Fuzzy dedup GPU cluster ready!
Workers: 1
📊 Dashboard: http://127.0.0.1:8787/status


In [5]:
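# Enable cuDF spilling (GPU -> host memory) in this client process.
# Note: already-running worker processes do not see this change; for the workers
# to spill, CUDF_SPILL=1 must be set before the cluster is created.
os.environ["CUDF_SPILL"] = "1"
print("✓ cuDF spilling enabled - large operations can spill to host memory if needed")

✓ cuDF spilling enabled - large operations can spill to host memory if needed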


We will use the `FuzzyDuplicates` method from NeMo Curator to run the fuzzy deduplication process on the French dataset. This will allow us to identify and handle any near-duplicates based on similarity scores.

You should see the three stages logged during the process.

In [6]:
fuzzy_dedup_log_dir_fr = "curated/07_Deduplicate/fuzzy_wrapper/FR"

data_dir = "curated/06_add_id"
added_id_output_path = os.path.join(data_dir, "add_id/cleaned")
input_fr = os.path.join(added_id_output_path, "FR/file.jsonl")

In [7]:
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset

# Memory-optimized configuration for 8GB GPU
config = FuzzyDuplicatesConfig(
    cache_dir=fuzzy_dedup_log_dir_fr,  # must be cleared between runs
    id_field="id",
    text_field="text",
    seed=42,
    char_ngrams=24,
    num_buckets=20,
    hashes_per_bucket=13,
    use_64_bit_hash=False,
    buckets_per_shuffle=1,              # Reduced from 2 for memory efficiency
    false_positive_check=False,
)

print("Initializing fuzzy deduplication...")
# Initialize the deduplication object
FuzzyDups = FuzzyDuplicates(config=config, logger="./")

# Load the dataset
print("Loading French dataset...")
dataset_fr = DocumentDataset.read_json(
    input_files=input_fr,
    backend="cudf",  # FuzzyDuplicates only supports datasets with the cuDF backend.
)

# Run Fuzzy Duplicate detection
print("Running fuzzy deduplication (this may take a few minutes)...")
duplicate_docs = FuzzyDups(dataset_fr)
print("✓ Fuzzy deduplication complete!")

Initializing fuzzy deduplication...
Loading French dataset...
Reading 1 files with blocksize='1gb' / files_per_partition=None
Running fuzzy deduplication (this may take a few minutes)...
Stage 1: Starting Minhash + LSH computation
Stage 1: Minhash + LSH complete!
Stage 2: Starting LSH Buckets to Graph Edgelist
Stage 2: Starting LSH Buckets to Graph Edgelist Complete!
Stage 3: Connected Components across buckets
Stage 3: Connected Components across buckets complete!
Reading 1 files with blocksize=None / files_per_partition=1
✓ Fuzzy deduplication complete!
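The `num_buckets=20` and `hashes_per_bucket=13` settings in the config above control how likely a pair of documents is to become a candidate. Under the standard banding analysis (assuming independent hash functions, each MinHash value matches with probability equal to the Jaccard similarity `s`), one bucket matches with probability `s**r` and a pair is missed only if all `b` buckets miss. The sketch below evaluates that textbook formula; it is not taken from NeMo Curator's code. With these settings the curve rises steeply around `(1/20)**(1/13)`, roughly 0.79.

```python
def candidate_probability(similarity, num_buckets=20, hashes_per_bucket=13):
    """Probability that a pair with the given Jaccard similarity shares at least one LSH bucket.

    A single bucket (band of r hashes) matches with probability s**r,
    so the pair is missed only if all b buckets miss: 1 - (1 - s**r)**b.
    """
    return 1.0 - (1.0 - similarity ** hashes_per_bucket) ** num_buckets


for s in (0.5, 0.7, 0.8, 0.9, 0.95):
    print(f"Jaccard {s:.2f} -> candidate probability {candidate_probability(s):.3f}")
```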


The result from the connected components stage is a list of document IDs and the group they belong to. All documents in the same group are considered near duplicates. 

```
id	                group
FR_data-0000000062	46
FR_data-0000000013	47
FR_data-0000000104	160
FR_data-0000000185	161
FR_data-0000000155	65
...
```
Let's check the outputs. Notice the `group` field.

In [8]:
duplicate_docs.head(3)

   group                  id
0     68  FR_data-0000000074
1     71  FR_data-0000000028
2    177  FR_data-0000000036


These groups can then be used to remove the near duplicates from the corpus.

Let's run that by executing the next cell.

In [9]:
docs_to_remove = duplicate_docs.df.map_partitions(
    lambda x: x[x.group.duplicated(keep="first")]
)
result = dataset_fr.df[~dataset_fr.df["id"].isin(docs_to_remove["id"].compute())]
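The previous cell keeps the first document of each group and drops the rest. Both Stage 3 (turning candidate edges into groups) and this keep-first rule can be sketched in plain Python with a union-find structure. The function names and toy IDs are hypothetical; NeMo Curator computes connected components on the GPU, and the Dask cell above applies `duplicated(keep="first")` per partition.

```python
def connected_components(doc_ids, edges):
    """Label each document with a group ID using union-find over candidate-duplicate edges."""
    parent = {d: d for d in doc_ids}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps the trees shallow
            x = parent[x]
        return x

    for a, b in edges:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a
    roots = {}
    # Number the groups 0, 1, 2, ... in order of first appearance
    return {d: roots.setdefault(find(d), len(roots)) for d in doc_ids}


def ids_to_remove(groups):
    """Keep the first document seen in each group and mark the rest for removal."""
    seen, remove = set(), []
    for doc_id, group in groups.items():
        if group in seen:
            remove.append(doc_id)
        else:
            seen.add(group)
    return remove


doc_ids = ["FR_a", "FR_b", "FR_c", "FR_d"]
edges = [("FR_a", "FR_b"), ("FR_b", "FR_c")]  # a~b and b~c, so a, b, c form one group
groups = connected_components(doc_ids, edges)
print(groups)                 # {'FR_a': 0, 'FR_b': 0, 'FR_c': 0, 'FR_d': 1}
print(ids_to_remove(groups))  # ['FR_b', 'FR_c']
```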

Check how many detected documents have duplicates:

In [10]:
print(f"Number of documents in the original data : {len(dataset_fr)}")
print(f"Number of documents to be removed : {len(docs_to_remove)}")

Number of documents in the original data : 194
Number of documents to be removed : 97


#### [Optional] Explore Decontamination for Downstream Tasks

Large Language Models are typically evaluated based on their performance on downstream tasks using unseen test data. However, when working with extensive datasets, there is a risk of test data leaking into the model's training set. 

To mitigate this, NeMo Curator provides a task decontamination module that removes from the training set any document sections that also appear in downstream tasks.

You can explore this in more detail in the [task decontamination](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/taskdecontamination.html) section of the NeMo Curator documentation.
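As a simplified illustration of the idea only, the sketch below flags training documents that share at least one word n-gram with downstream task text. It only reports contaminated IDs; the helper names and the choice of `n=8` are assumptions, and NeMo Curator's actual matching and removal rules are described in the linked documentation.

```python
def word_ngrams(text, n):
    """Set of lower-cased word n-grams in a text."""
    words = text.lower().split()
    return {tuple(words[i:i + n]) for i in range(len(words) - n + 1)}


def contaminated_ids(train_docs, task_texts, n=8):
    """IDs of training documents sharing at least one word n-gram with any task text."""
    task_ngrams = set()
    for text in task_texts:
        task_ngrams |= word_ngrams(text, n)
    return [doc_id for doc_id, text in train_docs.items() if word_ngrams(text, n) & task_ngrams]


train_docs = {
    "doc1": "the quick brown fox jumps over the lazy dog near the river bank",
    "doc2": "investing in index funds is a popular strategy for long term growth",
}
task_texts = ["question: the quick brown fox jumps over the lazy dog near what?"]
print(contaminated_ids(train_docs, task_texts, n=8))  # ['doc1']
```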

---
<h2 style="color:green;">Congratulations!</h2>


In this notebook, you have used NeMo Curator to apply several data cleaning steps, including language detection and filtering, topic classification and document deduplication. These steps help ensure that the dataset is clean, diverse, and free from redundant data, improving the quality of the data used for training and evaluation.

Before moving on to the next notebook, make sure to stop the Dask cluster by restarting the kernel. Please run the next cell.

In [11]:
import IPython

IPython.Application.instance().kernel.do_shutdown(True)  # automatically restarts kernel

{'status': 'ok', 'restart': True}

Move to the next notebook to explore synthetic data generation with NeMo Curator. This will allow us to learn how to create artificial data for various tasks, enhancing the diversity and richness of our dataset.

Let's move to the [synthetic_data_generation](03_synthetic_data_generation.ipynb).

