## Processing High-Quality Hindi Data: Data Curation with NVIDIA NeMo Curator


Open-source [large language models (LLMs)](https://www.nvidia.com/en-us/glossary/large-language-models/) excel in English but struggle with other languages, especially South Asian languages. This is primarily due to a lack of training data in these languages, limited understanding of local cultures, and insufficient tokens to capture unique linguistic structures and expressions. To fully meet customer needs, enterprises in non-English-speaking countries must go beyond generic models and customize them to capture the nuances of their local languages, ensuring a seamless and impactful customer experience.

In this tutorial, we will use NeMo Curator to process high-quality Hindi data. We will guide you through the data curation pipeline used and share sample code for each stage.


## Table of Contents
- **1. [Prerequisites and Environment setups](#prerequisites-and-environment-setups)**
- **2. [Data Collecting](#data-collecting)**
- **3. [Data Curation flow](#data-curation-flow)**
    - a. [Unicode reformatting](#unicode-reformatting)
    - b. [Adding Custom IDs to Documents](#adding-custom-ids-to-documents)
    - c. [Exact deduplication](#exact-deduplication)
    - d. [Heuristic Quality Filtering](#heuristic-quality-filtering)
    - e. [Classifier-based Quality Filtering](#classifier-based-quality-filtering)


## Prerequisites and Environment setups


Install NeMo Curator by following the instructions to install the CPU and CUDA-accelerated modules in the README file of the [NeMo Curator repository](https://github.com/NVIDIA/NeMo-Curator/tree/main).


Next, install these additional packages:


In [None]:
!pip install datasets
!pip install jsonlines

To proceed with data processing, we need to set up a Dask environment. Dask is a flexible, open-source library that enables parallel and distributed computing in Python, allowing us to scale computations across multiple cores or even clusters. By distributing tasks, Dask makes the data handling process significantly faster and more efficient.


**Note:** This notebook was run on a single DGX A100 GPU, with a 128-core CPU and 2TB of RAM to handle the dataset size. Depending on your dataset and computing resources, you may need to adjust the Dask worker configuration below accordingly.
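As a rough aid for adjusting the worker configuration, the sketch below derives a worker count and per-worker memory limit from the machine's resources. The helper `plan_workers`, the 64 GB per-worker default, and the 20% memory reserve are illustrative assumptions, not NeMo Curator or Dask defaults.

```python
def plan_workers(
    total_memory_gb: float,
    cpu_cores: int,
    memory_per_worker_gb: float = 64.0,  # assumed per-worker budget
    reserve_fraction: float = 0.2,       # assumed headroom for the OS and scheduler
) -> tuple[int, str]:
    """Return (n_workers, memory_limit) that fit inside the given resources."""
    usable_gb = total_memory_gb * (1.0 - reserve_fraction)
    workers_by_memory = int(usable_gb // memory_per_worker_gb)
    # Never exceed the number of cores, and always keep at least one worker.
    n_workers = max(1, min(cpu_cores, workers_by_memory))
    return n_workers, f"{int(memory_per_worker_gb)}GB"


n_workers, memory_limit = plan_workers(total_memory_gb=256, cpu_cores=32)
print(n_workers, memory_limit)  # 3 64GB
```

The two returned values can be passed as `n_workers` and `memory_limit` to `LocalCluster`.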


In [1]:
from dask.distributed import Client, LocalCluster

# Start a Dask cluster with 12 workers, each limited to 80GB of memory.
# You might need to adjust these numbers according to your computing resources.
cluster = LocalCluster(n_workers=12, processes=True, memory_limit="80GB")
client = Client(cluster)


Perhaps you already have a cluster running?
Hosting the HTTP server on port 35615 instead


## Data Collecting


Each dataset is accessed and downloaded using the Hugging Face Hub. For OSCAR (the Hindi subset dataset, version 23.01, an aggregation of web-crawled data), you need to accept the conditions on the [dataset page](https://huggingface.co/datasets/oscar-corpus/OSCAR-2301) and use a [Hugging Face access token](https://huggingface.co/docs/hub/en/security-tokens) for downloading.


**Download and Convert Datasets to Parquet**

Converting the datasets to Parquet format makes large datasets more efficient to store and process.


In [2]:
!pip install huggingface_hub


In [3]:
!huggingface-cli login --token <hf_token>

The token has not been saved to the git credentials helper. Pass `add_to_git_credential=True` in this function directly or `--add-to-git-credential` if using via `hf`CLI if you want to set the git credential as well.
Token is valid (permission: read).
The token `nv` has been saved to /root/.cache/huggingface/stored_tokens
Your token has been saved to /root/.cache/huggingface/token
Login successful.
The current active token is: `nv`


In [4]:
import os

from datasets import DownloadConfig
from datasets import load_dataset as load_hf_dataset

data_dir = "./datasets/"
download_config = DownloadConfig(num_proc=8)
# Define paths for raw data
raw_data_directory = os.path.join(data_dir, "raw")
# Load and save Hindi Wikipedia dataset
# In this experiment, we'll focus exclusively on the Wikipedia dataset to have a faster runtime and streamline the process.
ds = load_hf_dataset("wikimedia/wikipedia", "20231101.hi")
ds["train"].to_parquet(os.path.join(data_dir, "wiki_hi_231101.parquet"))

# # Load and save Sangraha Hindi corpus (AI4Bharat's large-scale Hindi dataset)
# # This is a high-quality Hindi dataset with 34.5 billion tokens
# ds = load_hf_dataset("ai4bharat/sangraha", data_dir="verified/hin", split="train[:100000]")
# ds.to_parquet(os.path.join(data_dir, "sangraha_hindi.parquet"))

# Load and save OSCAR Hindi dataset
# ds = load_hf_dataset(
#     "oscar-corpus/OSCAR-2301",
#     language="hi",
#     token=True,  # Requires HuggingFace token
#     download_config=download_config,
#     trust_remote_code=True,
#     split="train[:50000]"  # Taking a subset for demo
# )
# ds.to_parquet(os.path.join(data_dir, "oscar_hi.parquet"))

# Load and save C4 multilingual Hindi dataset
# ds = load_hf_dataset(
#     "allenai/c4",
#     data_files="multilingual/c4-hi.*.json.gz",
#     download_config=download_config,
#     trust_remote_code=True,
#     split="train[:30000]"  # Taking a subset for demo
# )
# ds.to_parquet(os.path.join(data_dir, "c4_hi.parquet"))

# Load and save Hindi news dataset from IndicNLP suite
# Using a Hindi news corpus for diverse content
# ds = load_hf_dataset("ai4bharat/IndicNLPNews", "hi", split="train[:25000]")
# ds.to_parquet(os.path.join(data_dir, "hindi_news.parquet"))




Data already exists


Creating parquet from Arrow format: 100%|██████████| 164/164 [00:01<00:00, 91.35ba/s] 


672817362

**Combine and Standardize Format**

The downloaded Parquet files are then combined into a single dataset, keeping only the `text` column.


In [5]:
ds

DatasetDict({
    train: Dataset({
        features: ['id', 'url', 'title', 'text'],
        num_rows: 163093
    })
})

In [7]:
from datasets import concatenate_datasets
if not os.path.exists(raw_data_directory):
    # Combine datasets and standardize format
    datasets=["datasets/wiki_hi_231101.parquet"]
    # datasets = [
    #     os.path.join(data_dir, file)
    #     for file in ["wiki_hi_231101.parquet", "c4_hi.parquet", "oscar_hi.parquet", "sangraha_hindi.parquet", "hindi_news.parquet"]
    # ]
    
    data_files = {"train": datasets[0]}
    ds = load_hf_dataset("parquet", data_files=data_files)
    ds = ds["train"].remove_columns([col for col in ds["train"].column_names if col != "text"])
    
    for d in datasets[1:]:
        ds_ = load_hf_dataset("parquet", data_files={"train": d})
        ds_ = ds_["train"].remove_columns([col for col in ds_["train"].column_names if col != "text"])
        ds = concatenate_datasets([ds, ds_])
else:
    print('Data already standardized')

Data already standardized


**Shard the Combined Dataset**

The combined dataset is then sharded into smaller chunks. Sharding is performed to distribute the data evenly across multiple workers in the Dask cluster, facilitating efficient parallel processing during the data curation stages.
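To make the idea of even sharding concrete, here is a minimal pure-Python sketch that splits row indices into contiguous, near-equal shards. The helper `contiguous_shards` is hypothetical, and `Dataset.shard` may assign rows to shards differently depending on the `datasets` version; only the row count (163,093) and shard count (256) come from this notebook.

```python
def contiguous_shards(num_rows: int, num_shards: int) -> list[range]:
    """Split row indices 0..num_rows-1 into contiguous shards of near-equal size."""
    base, extra = divmod(num_rows, num_shards)
    shards, start = [], 0
    for idx in range(num_shards):
        # The first `extra` shards receive one additional row each.
        size = base + (1 if idx < extra else 0)
        shards.append(range(start, start + size))
        start += size
    return shards


shards = contiguous_shards(num_rows=163093, num_shards=256)
sizes = [len(s) for s in shards]
print(min(sizes), max(sizes), sum(sizes))  # 637 638 163093
```

Because shard sizes differ by at most one row, each Dask worker receives a comparable amount of work.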


In [8]:

if not os.path.exists(raw_data_directory):
    # Create the output directory, then shard the dataset
    os.makedirs(raw_data_directory, exist_ok=True)
    num_shards = 256
    for shard_idx in range(num_shards):
        shard = ds.shard(index=shard_idx, num_shards=num_shards)
        shard.to_parquet(os.path.join(raw_data_directory, f"{shard_idx}.parquet"))
else:
    print('Data already sharded')

Data already sharded


## Data Curation flow


### Unicode reformatting


Unicode reformatting is an essential preprocessing step to ensure that text data is standardized and free of encoding errors, which are common in web-crawled datasets. This is particularly important for Hindi text, which uses the Devanagari script.
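To illustrate why standardization matters for Devanagari, the following sketch uses Python's standard `unicodedata` module to show two encodings of the same letter that compare as different strings until they are normalized. This is only an illustration of one kind of inconsistency; it is not the `UnicodeReformatter` implementation, which may apply different repairs.

```python
import unicodedata

# Two ways to encode the same visible letter क़ (qa):
precomposed = "\u0958"     # single code point, DEVANAGARI LETTER QA
combined = "\u0915\u093c"  # क followed by the nukta sign


def nfc(text: str) -> str:
    """Normalize text to Unicode Normalization Form C."""
    return unicodedata.normalize("NFC", text)


print(precomposed == combined)            # False
print(nfc(precomposed) == nfc(combined))  # True
```

Without a step like this, exact deduplication would treat visually identical documents as distinct.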


In [9]:
from nemo_curator import Modify
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modifiers import UnicodeReformatter
from nemo_curator.utils.distributed_utils import read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under

# Define paths for Unicode formatted data
unicode_formatted_output_path = os.path.join(data_dir, "formatted")


# Load the raw data
def load_dataset(input_data_dir: str, file_type: str = "parquet") -> DocumentDataset:
    files = list(get_all_files_paths_under(input_data_dir))
    # print(files)
    raw_data = read_data(files, file_type=file_type, backend="pandas", add_filename=True)
    return DocumentDataset(raw_data)

if not os.path.exists(unicode_formatted_output_path):
    raw_data = load_dataset(raw_data_directory, file_type="parquet")
    print("Running reformatter now")
    # Initialize the Unicode reformatter
    cleaner = Modify(UnicodeReformatter())
    
    # Apply Unicode reformatting
    cleaned_data = cleaner(raw_data)
    print("Finished reformatting and saving to disk now")
    # Save the cleaned data to disk
    write_to_disk(cleaned_data.df, unicode_formatted_output_path, write_to_filename=True, output_type="parquet")


### Adding Custom IDs to Documents


Before proceeding with further curation steps, it is advisable to add a unique ID to each document. These IDs keep every document uniquely identifiable, which makes it possible to track duplicate or low-quality documents throughout the curation process.

NeMo Curator offers an `AddId` class, which allows users to insert custom IDs into documents using a specified prefix format, such as `<prefix>_<id>`.
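The sketch below shows what prefix-based, zero-padded IDs can look like. The helper `make_ids`, the underscore separator, and the 10-digit padding are assumptions for illustration; the exact string format produced by `AddId` may differ.

```python
def make_ids(
    prefix: str,
    count: int,
    start_index: int = 0,
    separator: str = "_",  # assumed separator
    width: int = 10,       # assumed zero-padding width
) -> list[str]:
    """Build sequential, zero-padded document IDs that share a common prefix."""
    return [
        f"{prefix}{separator}{index:0{width}d}"
        for index in range(start_index, start_index + count)
    ]


ids = make_ids("HI", count=3)
print(ids)  # ['HI_0000000000', 'HI_0000000001', 'HI_0000000002']
```

Zero-padding keeps IDs the same length, so they sort lexicographically in the same order as their numeric index.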


In [10]:
from nemo_curator import AddId

# Define paths for input data and output with added IDs
add_id_input_data_dir = unicode_formatted_output_path
added_id_output_path = os.path.join(data_dir, "add_id")
add_id_prefix = "HI_"  

# Load the formatted dataset
dataset = DocumentDataset.read_parquet(add_id_input_data_dir)

# Initialize the AddId class with a specified prefix and start index
add_id = AddId(id_field="id", id_prefix=add_id_prefix, start_index=0)

# Apply the ID addition to the dataset
id_dataset = add_id(dataset)
# print(id_dataset.df)
# Save the dataset with added IDs to disk
write_to_disk(id_dataset.df, output_path=added_id_output_path, write_to_filename=False, output_type="parquet")


Reading 256 files with blocksize='1gb' / files_per_partition=None




Writing to disk complete for 1 partition(s)


### Exact deduplication


Exact deduplication removes identical duplicates from the dataset. By eliminating exact duplicates, we ensure that each data point contributes uniquely to the training process, enhancing the diversity and overall quality of the dataset.
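The idea can be sketched in plain Python: hash each document's text, group document IDs by hash, and keep one document per group. This is a minimal CPU illustration with made-up sample documents, not the GPU-based `ExactDuplicates` implementation used below.

```python
import hashlib
from collections import defaultdict

# Made-up sample documents keyed by ID.
docs = {
    "HI_0": "नमस्ते दुनिया",
    "HI_1": "भारत एक देश है",
    "HI_2": "नमस्ते दुनिया",
    "HI_3": "नमस्ते दुनिया",
}

# Group document IDs by the MD5 hash of their text.
groups = defaultdict(list)
for doc_id, text in docs.items():
    digest = hashlib.md5(text.encode("utf-8")).hexdigest()
    groups[digest].append(doc_id)

# Every member of a group with more than one document is a duplicate candidate.
in_duplicate_groups = [d for ids in groups.values() if len(ids) > 1 for d in ids]
# Keep the first document of each group and remove the rest.
to_remove = [d for ids in groups.values() for d in ids[1:]]
kept = [d for d in docs if d not in to_remove]

print(in_duplicate_groups)  # ['HI_0', 'HI_2', 'HI_3']
print(to_remove)            # ['HI_2', 'HI_3']
print(kept)                 # ['HI_0', 'HI_1']
```

Note the difference between the number of documents that belong to a duplicate group (3) and the number actually removed (2): one copy per group is retained.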


In this stage, we'll leverage GPU acceleration by utilizing a Dask CUDA cluster. Since the current cluster is CPU-based, we need to shut it down and start a new one with GPU support.

To close the existing cluster:


In [11]:
client.cluster.close()
client.shutdown()


Then, to initialize the GPU Dask cluster:


In [12]:
from nemo_curator.utils.distributed_utils import get_client


def pre_imports() -> None:
    import cudf  # noqa: F401


client = get_client(cluster_type="gpu", set_torch_to_use_rmm=False)
client.run(pre_imports)


Perhaps you already have a cluster running?
Hosting the HTTP server on port 42025 instead


cuDF Spilling is enabled


{'tcp://127.0.0.1:36139': None}

**Below is the implementation for exact deduplication:**


Imports and directory preparation:


In [13]:
import os

from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates

# Define input and output paths
exact_dedup_input_dataset_dir = added_id_output_path
exact_dedup_base_output_path = os.path.join(data_dir, "exact_dedup")
exact_dedup_log_dir = os.path.join(exact_dedup_base_output_path, "log")
exact_dedup_output_dir = os.path.join(exact_dedup_base_output_path, "data")
deduped_output_dir = os.path.join(data_dir, "remove_duplicate")

# Create directories for logs and output
!mkdir -p {exact_dedup_log_dir}
!mkdir -p {exact_dedup_output_dir}
!mkdir -p {deduped_output_dir}


Set parameters and load dataset:


In [14]:
# Parameters for ExactDuplicates
exact_dedup_dataset_id_field = "id"
exact_dedup_dataset_text_field = "text"

# Load the input dataset
input_dataset = DocumentDataset.read_parquet(exact_dedup_input_dataset_dir, backend="cudf")


Reading 1 files with blocksize='1gb' / files_per_partition=None




Initialize and run deduplication:


In [15]:
# Initialize and run exact deduplication
exact_dup = ExactDuplicates(
    logger=exact_dedup_log_dir,
    id_field=exact_dedup_dataset_id_field,
    text_field=exact_dedup_dataset_text_field,
    hash_method="md5",
    cache_dir=exact_dedup_output_dir,
)
duplicates = exact_dup(dataset=input_dataset)

print(f"Number of exact duplicate files: {len(duplicates)}")
print(f"Total remaining documents: {len(input_dataset) - len(duplicates)}")



Reading 1 files with blocksize=None / files_per_partition=1
Number of exact duplicate files: 5039
Total remaining documents: 163093




Remove duplicates and save final dataset:


In [None]:
# Load the dataset and exact duplicates to identify and remove duplicate IDs
input_dataset = DocumentDataset.read_parquet(added_id_output_path, backend="cudf")
exact_duplicates = DocumentDataset.read_parquet(
    os.path.join(exact_dedup_output_dir, "_exact_duplicates.parquet"),
    backend="cudf",
)

# Extract list of duplicate document IDs
exact_docs_to_remove = exact_duplicates.df.map_partitions(
    lambda x: x[x._hashes.duplicated(keep="first")],  # noqa: SLF001
)

# Remove duplicated documents from the input dataset
result = input_dataset.df[
    ~input_dataset.df[exact_dedup_dataset_id_field].isin(exact_docs_to_remove[exact_dedup_dataset_id_field].compute())
]

# Save the final deduplicated dataset
write_to_disk(result, output_path=deduped_output_dir, write_to_filename=False, output_type="parquet")


Close the GPU Dask cluster:


In [19]:
client.cluster.close()
client.shutdown()


### Heuristic Quality Filtering


Heuristic quality filtering is designed to enhance the quality of the dataset by removing low-quality content based on predefined heuristics. This approach involves applying a series of filters to the dataset to eliminate undesirable data characteristics such as excessive special characters, overly short or long texts, or other criteria that could negatively impact model performance.

We use a YAML file to define the heuristic filters. The configuration can be found [here](https://raw.githubusercontent.com/NVIDIA/NeMo-Curator/main/config/heuristic_filter_non-en.yaml). This file lists the filtering criteria and settings used to build a filter pipeline. You can customize the filters or change thresholds based on your needs for Hindi text processing. The `filter_pipeline` helper reads the YAML settings and applies each filter to the dataset step by step.
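To show what a heuristic filter does conceptually, here is a small standalone sketch with two checks: a minimum word count and a maximum symbol ratio. The function names and thresholds are hypothetical and do not reproduce the contents of the YAML configuration. One Hindi-specific detail: Devanagari vowel signs are Unicode combining marks, so the sketch counts the mark categories as text rather than as symbols.

```python
import unicodedata


def word_count(text: str) -> int:
    """Count whitespace-separated words."""
    return len(text.split())


def symbol_ratio(text: str) -> float:
    """Fraction of non-space characters that are not letters, marks, or digits."""
    chars = [ch for ch in text if not ch.isspace()]
    if not chars:
        return 1.0
    # Unicode categories starting with L, M, or N cover letters, combining
    # marks (such as Devanagari vowel signs), and numbers.
    symbols = sum(1 for ch in chars if unicodedata.category(ch)[0] not in "LMN")
    return symbols / len(chars)


def passes_filters(text: str, min_words: int = 3, max_symbol_ratio: float = 0.3) -> bool:
    """Apply both checks; the thresholds are illustrative assumptions."""
    return word_count(text) >= min_words and symbol_ratio(text) <= max_symbol_ratio


documents = ["यह एक छोटा वाक्य है", "@@@ ### $$$ !!!", "ठीक"]
kept = [doc for doc in documents if passes_filters(doc)]
print(kept)  # ['यह एक छोटा वाक्य है']
```

The second document is rejected for its symbol ratio and the third for its length; a real pipeline chains many such checks.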


Recreate a CPU Dask cluster:


In [20]:
# Start a Dask cluster with 12 workers, each limited at 64GB of memory.
# You might need to adjust these numbers according to your computing resources

cluster = LocalCluster(n_workers=12, processes=True, memory_limit="64GB")
client = Client(cluster)


Perhaps you already have a cluster running?
Hosting the HTTP server on port 32993 instead


In [21]:
deduped_output_dir

'./datasets/remove_duplicate'

In [22]:
import warnings

from nemo_curator.utils.config_utils import build_filter_pipeline
from nemo_curator import ScoreFilter
from nemo_curator.filters.heuristic_filter import WordCountFilter


HF_input_data_dir = deduped_output_dir
HF_output_path = os.path.join(data_dir, "heuristic_filtering")

# Create a directory for the configuration file if it doesn't exist
os.makedirs("config", exist_ok=True)
# Download the YAML configuration file for heuristic filtering (non-English version suitable for Hindi)
# !wget https://raw.githubusercontent.com/NVIDIA-NeMo/Curator/refs/tags/v0.7.0/config/heuristic_filter_non-en.yaml -O ./config/heuristic_filter_non-en.yaml

# Specify the path to the configuration file
filter_config_file = "./config/heuristic_filter_non-en.yaml"
os.makedirs(HF_output_path, exist_ok=True)
# Load dataset
print(f"Loading dataset from {HF_input_data_dir}...")
dataset = DocumentDataset.read_parquet(
    HF_input_data_dir,
    backend="pandas",
    add_filename=False
)

print(f"Loaded {len(dataset)} documents")

# Pre-filter very short documents FIRST, before applying the YAML filters

print("\nPre-filtering: Removing documents with 0 words...")

# Use NeMo Curator's own WordCountFilter. Note that min_words=10, so documents
# with fewer than 10 words are removed, not only empty ones.
min_word_filter = ScoreFilter(
    WordCountFilter(min_words=10, max_words=1000000000, lang="en"),
    text_field="text",
)

# Apply the filter. Dask evaluates lazily, so the len() call below triggers the computation.
dataset = min_word_filter(dataset)

print(f"After removing 0-word docs: {len(dataset)} documents")



Loading dataset from ./datasets/remove_duplicate...
Reading 1 files with blocksize='1gb' / files_per_partition=None
Loaded 158516 documents

Pre-filtering: Removing documents with 0 words...


You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('text', 'int64'))



After removing 0-word docs: 155825 documents


In [23]:
# Load the filters from the YAML configuration file
filter_pipeline = build_filter_pipeline(filter_config_file)

# Load the dataset
# dataset = DocumentDataset.read_parquet(HF_input_data_dir, backend="pandas")
print(f"Loaded {len(dataset)} documents")

# Suppress specific warnings during filtering
with warnings.catch_warnings():
    warnings.simplefilter("ignore", category=UserWarning)
    # Apply the heuristic filters to the dataset
    result_data = filter_pipeline(dataset)
    print(f"After filtering: {len(result_data)} documents")
    # Save the filtered dataset to disk
    result_data.to_parquet(HF_output_path, write_to_filename=False)

Loaded 155825 documents
After filtering: 3182 documents
Writing to disk complete for 1 partition(s)


### Classifier-based Quality Filtering


Classifier-based filtering uses a trained classifier model to sort content as high or low quality, offering a smarter and more flexible way to handle diverse datasets that simple rules might miss.


**Prepare Data for Training Classifier**

To train a quality classifier, we need representative samples of both high-quality and low-quality content. For high-quality data, we use articles from Wikipedia's Hindi edition, which are generally well-structured and reliable. The low-quality samples come from an unfiltered, web-crawled Hindi corpus (OSCAR in the code below).


In [24]:
import os

from datasets import load_dataset as load_hf_dataset

In [25]:
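# Paths for high-quality and low-quality sample data
hq_samples_path = os.path.join(data_dir, "classifier_filtering/train_samples/hq")
lq_samples_path = os.path.join(data_dir, "classifier_filtering/train_samples/lq")
# Create the output directories before writing shards
os.makedirs(hq_samples_path, exist_ok=True)
os.makedirs(lq_samples_path, exist_ok=True)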

# Load and shard the high-quality dataset (Hindi Wikipedia)
ds = load_hf_dataset("wikimedia/wikipedia", "20231101.hi")
num_shards = 8
for shard_idx in range(num_shards):
    shard = ds["train"].shard(index=shard_idx, num_shards=num_shards)
    shard.to_parquet(os.path.join(hq_samples_path, f"{shard_idx}.parquet"))

# Load and shard the low-quality dataset: a subset of OSCAR Hindi,
# which may contain lower-quality web-crawled content
ds = load_hf_dataset(
    "oscar-corpus/OSCAR-2301", 
    language="hi", 
    token=True,
    split="train[:100000]"
)
num_shards = 32
for shard_idx in range(num_shards):
    shard = ds.shard(index=shard_idx, num_shards=num_shards)
    shard.to_parquet(os.path.join(lq_samples_path, f"{shard_idx}.parquet"))


Creating parquet from Arrow format: 100%|██████████| 21/21 [00:00<00:00, 58.52ba/s]
Creating parquet from Arrow format: 100%|██████████| 21/21 [00:00<00:00, 158.39ba/s]
Creating parquet from Arrow format: 100%|██████████| 21/21 [00:00<00:00, 59.78ba/s]
Creating parquet from Arrow format: 100%|██████████| 21/21 [00:00<00:00, 144.03ba/s]
Creating parquet from Arrow format: 100%|██████████| 21/21 [00:00<00:00, 110.35ba/s]
Creating parquet from Arrow format: 100%|██████████| 21/21 [00:00<00:00, 134.45ba/s]
Creating parquet from Arrow format: 100%|██████████| 21/21 [00:00<00:00, 113.06ba/s]
Creating parquet from Arrow format: 100%|██████████| 21/21 [00:00<00:00, 99.70ba/s] 


DatasetNotFoundError: Dataset 'oscar-corpus/OSCAR-2301' is a gated dataset on the Hub. Visit the dataset page at https://huggingface.co/datasets/oscar-corpus/OSCAR-2301 to ask for access.

**Training Classifier**

The classifier is trained using FastText, which offers an efficient and effective method for text classification.
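FastText's supervised mode reads one training example per line, with the label (prefixed by `__label__`) followed by the text. The sketch below builds such lines; the helper `to_fasttext_line` is hypothetical, and collapsing whitespace so that each document stays on a single line is our assumption about a sensible preprocessing step, not something the notebook's cells do.

```python
def to_fasttext_line(label: str, text: str) -> str:
    """Format one training example as '<label> <text>' on a single line."""
    # Collapse newlines and repeated spaces so the document cannot spill
    # onto a second, unlabeled line in the training file.
    single_line = " ".join(text.split())
    return f"{label} {single_line}"


line = to_fasttext_line("__label__hq", "पहली पंक्ति\nदूसरी पंक्ति")
print(line)  # __label__hq पहली पंक्ति दूसरी पंक्ति
```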


In [None]:
from nemo_curator import Modify
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import write_to_disk


In [None]:
import random

import fasttext

from nemo_curator.modifiers import FastTextLabelModifier


# Function to create labeled samples
def create_samples(data_path: str, label: str, num_samples: int) -> list[str]:
    raw_dataset = DocumentDataset.read_parquet(data_path, backend="pandas")
    label_quality = Modify(FastTextLabelModifier(label))
    labeled_dataset = label_quality(raw_dataset)
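    # Cap the fraction at 1.0 so that requesting more samples than are available does not fail
    frac = min(1.0, num_samples / len(labeled_dataset.df))
    labeled_samples = labeled_dataset.df.sample(frac=frac)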

    return labeled_samples["text"].compute().values.tolist()


# Prepare training data
low_quality_samples = create_samples(lq_samples_path, "__label__lq", 100000)
high_quality_samples = create_samples(hq_samples_path, "__label__hq", 100000)
train_samples = low_quality_samples + high_quality_samples
random.shuffle(train_samples)

# Save training data to a file
train_file = "./cf_model_fasttext_hindi.train"
with open(train_file, "w", encoding="utf-8") as f:
    for sample in train_samples:
        f.write(sample + "\n")

# Train the FastText classifier
model = fasttext.train_supervised(input=train_file, lr=0.01, dim=100, epoch=5, wordNgrams=2)
model_path = "./cf_model_fasttext_hindi_model.bin"
model.save_model(model_path)


**Classify and Filter the Dataset**

Once trained, the classifier is used to filter the dataset, categorizing documents into high and low quality based on the learned distinctions.
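As a simplified picture of score-based filtering, the sketch below converts a two-class prediction into a high-quality score and keeps documents above a fixed threshold. The predictions are made-up values, and the fixed threshold is an assumption for illustration; `FastTextQualityFilter` may use a different (for example, randomized) rule to decide which documents to keep.

```python
def hq_score(predicted_label: str, probability: float, hq_label: str = "__label__hq") -> float:
    """Turn a (label, probability) prediction into a high-quality score in [0, 1]."""
    # For a two-class model, a confident low-quality prediction implies a low score.
    return probability if predicted_label == hq_label else 1.0 - probability


# Made-up (label, probability) predictions for three documents.
predictions = [("__label__hq", 0.9), ("__label__lq", 0.8), ("__label__lq", 0.55)]
threshold = 0.5  # assumed cutoff

kept_indices = [
    index
    for index, (label, probability) in enumerate(predictions)
    if hq_score(label, probability) >= threshold
]
print(kept_indices)  # [0]
```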


In [None]:
from nemo_curator import ScoreFilter
from nemo_curator.filters import FastTextQualityFilter

# Define paths and load the dataset
CF_input_data_dir = HF_output_path
CF_output_path = os.path.join(data_dir, "classifier_filtering/output")
target_dataset = DocumentDataset.read_parquet(CF_input_data_dir, backend="pandas")

# Set up the filtering pipeline
filter_pipeline = ScoreFilter(FastTextQualityFilter(model_path), score_field="quality_score", score_type=float)
filtered_dataset = filter_pipeline(target_dataset)

# Save the filtered dataset
write_to_disk(filtered_dataset.df, output_path=CF_output_path, write_to_filename=False, output_type="parquet")


Close the CPU Dask cluster:


In [None]:
client.cluster.close()
client.shutdown()


## Summary

This notebook demonstrates how to process and curate high-quality Hindi data using NVIDIA NeMo Curator.

- **Datasets** (only Hindi Wikipedia is downloaded by default; the other downloads are commented out in the download cell):
  - Hindi Wikipedia (wikimedia/wikipedia, "20231101.hi")
  - Sangraha Hindi corpus (ai4bharat/sangraha)
  - OSCAR Hindi dataset
  - C4 multilingual Hindi
  - Hindi news from IndicNLP suite

**Complete Pipeline Steps Covered:**
- ✅ Data collection from multiple Hindi sources
- ✅ Unicode reformatting (important for Devanagari script)
- ✅ Document ID assignment
- ✅ Exact deduplication (GPU-accelerated)
- ✅ Heuristic quality filtering
- ✅ Classifier-based quality filtering

**Hindi-Specific Datasets Referenced:**
1. **Hindi Wikipedia**: High-quality encyclopedic content
2. **Sangraha Corpus**: AI4Bharat's large-scale cleaned Hindi dataset (34.5B tokens)
3. **OSCAR Hindi**: Web-crawled Hindi content
4. **C4 Multilingual Hindi**: Cleaned Common Crawl data
5. **IndicNLP News**: Hindi news articles for diversity

**What This Notebook Provides:**
- Complete end-to-end Hindi data curation pipeline
- GPU-accelerated deduplication using cuDF
- Quality assessment using both heuristic and ML-based approaches
- Hindi-specific considerations for Devanagari script processing
- Ready-to-use configuration for Hindi language processing

For other techniques, such as fuzzy deduplication or PII redaction, see the [NeMo Curator example scripts](https://github.com/NVIDIA/NeMo-Curator/tree/main/examples).
