## Prerequisites and Environment Setup

-	CUDA and NVIDIA Drivers: CUDA 12.3 with Driver 545.23.08
-	Ubuntu 22.04
-	[NVIDIA-container-toolkit](https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html) version 1.15.0


### Installation

Clone the NeMo Curator repository and install the required dependencies

In [None]:
%%bash
# Fetch the NeMo Curator sources and move into the checkout
git clone https://github.com/NVIDIA/NeMo-Curator.git
cd NeMo-Curator
# Install NeMo Curator from the local checkout with its "cuda12x" extras,
# adding NVIDIA's package index as an extra index for pip
pip install --extra-index-url https://pypi.nvidia.com ".[cuda12x]"
# Additional Python packages installed for this notebook
pip install datasets
pip install jsonlines

Set up a Dask environment

In [None]:
import nemo_curator
from dask.distributed import Client, LocalCluster

# Launch a local, process-based Dask cluster: 12 workers, each capped at 64GB
# of memory. Tune both numbers to the resources available on your machine.
cluster = LocalCluster(
    n_workers=12,
    processes=True,
    memory_limit='64GB',
)

# Connect a client to the cluster so later Dask work is scheduled on it
client = Client(cluster)


## Data Collection

Each dataset is accessed and downloaded using the Hugging Face Hub, with additional steps required for OSCAR due to its access restrictions. For OSCAR, you need to accept the conditions on the [dataset page](https://huggingface.co/datasets/oscar-corpus/OSCAR-2301) and use a [Hugging Face access token](https://huggingface.co/docs/hub/en/security-tokens) for downloading.

**Download and Convert Datasets to Parquet**

Converting the datasets into Parquet format facilitates efficient handling and processing of large datasets.

In [None]:
import os
from datasets import load_dataset as load_hf_dataset
from datasets import DownloadConfig

# Root directory for every file this notebook writes
data_dir = "./datasets/"
# Download settings shared by the larger datasets (4 parallel download processes)
download_config = DownloadConfig(num_proc=4)

# Create the target directory up front so the Parquet exports below do not
# depend on the writer creating missing parent directories.
os.makedirs(data_dir, exist_ok=True)

# Load and save Vietnamese Wikipedia dataset
ds = load_hf_dataset("wikimedia/wikipedia", "20231101.vi")
ds['train'].to_parquet(os.path.join(data_dir, "wiki_vi_231101.parquet"))

# Load and save Vietnamese news corpus
ds = load_hf_dataset("jetaudio/binhvq_news")
ds['train'].to_parquet(os.path.join(data_dir, "binhvq_news_train.parquet"))

# Load and save OSCAR dataset.
# OSCAR is access-restricted: token=True makes the download use your saved
# Hugging Face access token, so log in and accept the dataset conditions first.
ds = load_hf_dataset("oscar-corpus/OSCAR-2301", language='vi', token=True, download_config=download_config, trust_remote_code=True)
ds['train'].to_parquet(os.path.join(data_dir, 'oscar_vi.parquet'))

# Load and save C4 dataset (only the Vietnamese files of the multilingual split)
ds = load_hf_dataset("allenai/c4", data_files='multilingual/c4-vi.*.json.gz', download_config=download_config, trust_remote_code=True)
ds['train'].to_parquet(os.path.join(data_dir, "c4_vi.parquet"))


**Combine and Standardize Format**

We then combine them into a single dataset, keeping only the 'text' column. 

In [None]:
from datasets import concatenate_datasets


def _load_text_only(parquet_path):
    """Load one Parquet file as a Hugging Face dataset and keep only its 'text' column."""
    split = load_hf_dataset("parquet", data_files={"train": parquet_path})['train']
    extra_columns = [name for name in split.column_names if name != 'text']
    return split.remove_columns(extra_columns)


# Parquet files written by the download step, in the order they are merged
datasets = [
    os.path.join(data_dir, file)
    for file in ["wiki_vi_231101.parquet", "c4_vi.parquet", 'oscar_vi.parquet', "binhvq_news_train.parquet"]
]

# Start from the first file, then append the remaining ones one at a time
ds = _load_text_only(datasets[0])
for parquet_path in datasets[1:]:
    ds = concatenate_datasets([ds, _load_text_only(parquet_path)])


**Shard the Combined Dataset**

The combined dataset is then sharded into smaller chunks. Sharding is performed to distribute the data evenly across multiple workers in the Dask cluster, facilitating efficient parallel processing during the data curation stages.

In [None]:
# Define paths for raw data
raw_data_directory = os.path.join(data_dir, "raw")

# Create the shard directory explicitly, in the same way the heuristic
# filtering step creates its own output directory before writing.
os.makedirs(raw_data_directory, exist_ok=True)

# Shard the dataset: each shard is written as "<shard index>.parquet"
num_shards = 256
for shard_idx in range(num_shards):
    shard = ds.shard(index=shard_idx, num_shards=num_shards)
    shard.to_parquet(os.path.join(raw_data_directory, f"{shard_idx}.parquet"))


## Data Curation flow

### Unicode reformatting

In [None]:
from nemo_curator import Modify
from nemo_curator.modifiers import UnicodeReformatter
from nemo_curator.utils.distributed_utils import read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.datasets import DocumentDataset

# Output location for the Unicode-reformatted documents
unicode_formatted_output_path = os.path.join(data_dir, "formatted")


def load_dataset(input_data_dir, file_type='parquet'):
    """Read every file found under ``input_data_dir`` into a ``DocumentDataset``.

    The files are read with ``read_data`` using the pandas backend and
    ``add_filename=True``.

    Args:
        input_data_dir: Directory whose files are collected and read.
        file_type: File format passed through to ``read_data`` (default 'parquet').

    Returns:
        A ``DocumentDataset`` wrapping the data returned by ``read_data``.
    """
    input_files = list(get_all_files_paths_under(input_data_dir))
    return DocumentDataset(
        read_data(
            input_files,
            file_type=file_type,
            backend="pandas",
            add_filename=True,
        )
    )

# Read the sharded raw Parquet files
raw_data = load_dataset(raw_data_directory, file_type='parquet')

# Wrap the Unicode reformatter in a Modify step and run it over the raw documents
cleaner = Modify(UnicodeReformatter())
cleaned_data = cleaner(raw_data)

# Persist the reformatted documents as Parquet, written per source filename
write_to_disk(
    cleaned_data.df,
    unicode_formatted_output_path,
    write_to_filename=True,
    output_type='parquet',
)


### Adding Custom IDs to Documents

In [None]:
from nemo_curator import AddId

# Input: the Unicode-reformatted documents. Output: the same documents plus an ID field.
add_id_input_data_dir = unicode_formatted_output_path
added_id_output_path = os.path.join(data_dir, "add_id")
add_ID_id_prefix = "VI_"

# Read the reformatted documents back from disk
dataset = load_dataset(add_id_input_data_dir, file_type='parquet')

# Configure the ID step (field 'id', prefix "VI_", numbering from 0) and run it
add_id = AddId(id_field='id', id_prefix=add_ID_id_prefix, start_index=0)
id_dataset = add_id(dataset)

# Store the documents with their new IDs as Parquet, written per source filename
write_to_disk(
    id_dataset.df,
    output_file_dir=added_id_output_path,
    write_to_filename=True,
    output_type='parquet',
)


### Exact deduplication

In [None]:
from nemo_curator.modules import ExactDuplicates

# Define paths: read the documents that already carry IDs, keep the duplicate
# hashes in a cache directory, and write the deduplicated documents separately.
exact_dedup_input_dataset_dir = added_id_output_path
exact_dedup_cache_dir = os.path.join(data_dir, "exact_dedup", "cache")
deduped_output_dir = os.path.join(data_dir, "deduped")
os.makedirs(exact_dedup_cache_dir, exist_ok=True)

# Load the dataset with IDs
input_dataset = load_dataset(exact_dedup_input_dataset_dir, file_type='parquet')

# Hash every document's text and collect the documents whose hash occurs more than once
exact_dups = ExactDuplicates(id_field='id', text_field='text', hash_method='md5', cache_dir=exact_dedup_cache_dir)
duplicates = exact_dups(dataset=input_dataset)

# NOTE(review): with a cache directory some NeMo Curator versions return the
# path of the cached result instead of a dataset -- confirm for your version.
if isinstance(duplicates, str):
    duplicates = DocumentDataset.read_parquet(duplicates, backend='pandas')

# Keep the first document of each duplicate group and mark the others for removal
exact_docs_to_remove = duplicates.df.map_partitions(
    lambda x: x[x._hashes.duplicated(keep="first")]
)
ids_to_remove = exact_docs_to_remove['id'].compute()

# Drop the marked documents and save the deduplicated dataset
result = input_dataset.df[~input_dataset.df['id'].isin(ids_to_remove)]
write_to_disk(result, output_file_dir=deduped_output_dir, output_type='parquet')


### Heuristic Quality Filtering

In [None]:
from nemo_curator.utils.config_utils import build_filter_pipeline
import warnings

# Define paths for input data and output data after heuristic filtering
HF_input_data_dir = deduped_output_dir
HF_output_path = os.path.join(data_dir, 'heuristic_filtering')

# Create a directory for the configuration file if it doesn't exist
os.makedirs('config', exist_ok=True)
# Download the YAML configuration file for heuristic filtering
!wget https://raw.githubusercontent.com/NVIDIA/NeMo-Curator/main/config/heuristic_filter_non-en.yaml -O ./config/heuristic_filter_non-en.yaml

# Specify the path to the configuration file
filter_config_file = './config/heuristic_filter_non-en.yaml'
os.makedirs(HF_output_path, exist_ok=True)

# Load the filters from the YAML configuration file
filter_pipeline = build_filter_pipeline(filter_config_file)

# Load the dataset
dataset = DocumentDataset.read_parquet(HF_input_data_dir, backend='pandas')

# Suppress specific warnings during filtering
with warnings.catch_warnings():
    warnings.simplefilter("ignore", category=UserWarning)
    # Apply the heuristic filters to the dataset
    result_data = filter_pipeline(dataset)
 
    # Save the filtered dataset to disk
    result_data.to_parquet(HF_output_path, write_to_filename=True)


### Classifier-based quality filtering

Classifier-based filtering uses a trained classifier model to sort content as high or low quality, offering a smarter and more flexible way to handle diverse datasets that simple rules might miss.

**Prepare Data for Training Classifier**

To train a quality classifier, we need representative samples of both high-quality and low-quality content. For high-quality data, we use articles from the Vietnamese edition of Wikipedia, which are generally well-structured and reliable. The low-quality samples come from an unfiltered, crawled Vietnamese news corpus.

In [None]:
# Directories holding the training samples for the quality classifier
hq_samples_path = os.path.join(data_dir, 'classifier_filtering/hq_samples')
lq_samples_path = os.path.join(data_dir, 'classifier_filtering/lq_samples')


def _export_text_shards(parquet_file, output_dir, num_shards=8):
    """Write the 'text' column of ``parquet_file`` into ``output_dir`` as Parquet shards.

    Args:
        parquet_file: Parquet file saved by the download step.
        output_dir: Directory that receives "<shard index>.parquet" files; created if missing.
        num_shards: Number of shard files to write.
    """
    os.makedirs(output_dir, exist_ok=True)
    samples = load_hf_dataset("parquet", data_files={"train": parquet_file})['train']
    # The classifier only needs the document text
    samples = samples.remove_columns([col for col in samples.column_names if col != 'text'])
    for shard_idx in range(num_shards):
        shard = samples.shard(index=shard_idx, num_shards=num_shards)
        shard.to_parquet(os.path.join(output_dir, f"{shard_idx}.parquet"))


# High-quality samples: Vietnamese Wikipedia articles saved by the download step
_export_text_shards(os.path.join(data_dir, "wiki_vi_231101.parquet"), hq_samples_path)

# Low-quality samples: the unfiltered Vietnamese news corpus saved by the download step
_export_text_shards(os.path.join(data_dir, "binhvq_news_train.parquet"), lq_samples_path)

**Training classifier**

The classifier is trained using FastText, which offers an efficient and effective method for text classification. 

In [None]:
from nemo_curator.modifiers import FastTextLabelModifier
import fasttext
import random

# Function to create labeled samples
def create_samples(data_path, label, num_samples):
    """Return a random sample of fastText-labeled texts read from ``data_path``.

    The Parquet data is read with the pandas backend, every document is passed
    through ``FastTextLabelModifier(label)``, and a fraction of the rows is
    sampled so that about ``num_samples`` rows are kept.

    Args:
        data_path: Location of the Parquet data to sample from.
        label: Label handed to ``FastTextLabelModifier`` (e.g. "__label__hq").
        num_samples: Target number of samples. When the dataset has fewer rows,
            all rows are used.

    Returns:
        A list with the values of the sampled "text" column, or an empty list
        when the dataset has no rows.
    """
    raw_dataset = DocumentDataset.read_parquet(data_path, backend='pandas')
    label_quality = Modify(FastTextLabelModifier(label))
    labeled_dataset = label_quality(raw_dataset)

    total_rows = len(labeled_dataset.df)
    if total_rows == 0:
        # Nothing to sample; also avoids dividing by zero below
        return []

    # Cap the fraction at 1.0: sampling without replacement cannot return
    # more rows than the dataset holds.
    sample_fraction = min(1.0, num_samples / total_rows)
    labeled_samples = labeled_dataset.df.sample(frac=sample_fraction)
    return labeled_samples["text"].compute().values.tolist()

# Prepare training data: labeled low- and high-quality samples, mixed in random order
low_quality_samples = create_samples(lq_samples_path, "__label__lq", 100000)
high_quality_samples = create_samples(hq_samples_path, "__label__hq", 100000)
train_samples = low_quality_samples + high_quality_samples
random.shuffle(train_samples)

# Save training data to a file.
# fastText reads one labeled example per line, so line breaks inside a document
# are replaced with spaces; otherwise the continuation lines would be separate,
# unlabeled examples. The file is written as UTF-8 explicitly instead of
# relying on the platform's default encoding.
train_file = "./cf_model_fasttext.train"
with open(train_file, "w", encoding="utf-8") as f:
    for sample in train_samples:
        f.write(" ".join(sample.splitlines()) + "\n")

# Train the FastText classifier and store the model for the filtering step
model = fasttext.train_supervised(input=train_file, lr=0.01, dim=100, epoch=5, wordNgrams=2)
model_path = "./cf_model_fasttext_model.bin"
model.save_model(model_path)


**Classify and filter the dataset**

Once trained, the classifier is used to filter the dataset, categorizing documents into high and low quality based on the learned distinctions.

In [None]:
from nemo_curator.filters import FastTextQualityFilter
from nemo_curator import ScoreFilter

# Define paths and load the dataset.
# The documents that passed heuristic filtering are the input here. The second
# argument of read_parquet selects the dataframe backend (not the file type),
# so it is passed by keyword as 'pandas', matching the other cells.
CF_input_data_dir = HF_output_path
CF_output_path = os.path.join(data_dir, 'classifier_filtering/output')
target_dataset = DocumentDataset.read_parquet(CF_input_data_dir, backend='pandas')

# Set up the filtering pipeline: score each document with the trained fastText
# model, store the score in "quality_score", and filter on it
filter_pipeline = ScoreFilter(FastTextQualityFilter(model_path), score_field="quality_score", score_type=float)
filtered_dataset = filter_pipeline(target_dataset)

# Save the filtered dataset
write_to_disk(filtered_dataset.df, output_file_dir=CF_output_path, write_to_filename=True, output_type='parquet')


Close the CPU Dask Cluster

In [None]:
# Close the cluster object attached to the client created during environment setup
client.cluster.close()
# Then shut the client down
client.shutdown()

We have completed the notebook! For other techniques such as Fuzzy Deduplication or PII redaction, you can go to [NeMo Curator example scripts](https://github.com/NVIDIA/NeMo-Curator/tree/main/examples).