# NeMo Curator with PySpark Example

## NeMo Curator Introduction
NeMo Curator is a Python library that consists of a collection of scalable data processing modules for curating natural language processing (NLP) data for training large language models (LLMs). The modules within NeMo Curator enable NLP researchers to mine high-quality text at scale from massive uncurated web corpora.

NeMo Curator includes the following modules to perform data curation:
- Data download and extraction
- Language identification and separation
- Text reformatting and cleaning
- Quality filtering
- Document-level deduplication
- Multilingual downstream-task decontamination
- Distributed data classification
- Personally identifiable information (PII) redaction



## About this notebook


This notebook uses the **Tiny Stories [Dataset](https://huggingface.co/datasets/roneneldan/TinyStories)** as an example to demonstrate how you can run data curation with NeMo Curator, integrate additional data processing you may already have in PySpark, and build an end-to-end curation pipeline.

Steps:
1. Download and extract data using NeMo Curator
2. Clean and process data using NeMo Curator
3. Perform additional processing using PySpark
4. Deduplicate using NeMo Curator

For a full working example of NeMo Curator, please refer to this [tutorial](https://github.com/NVIDIA/NeMo-Curator/blob/main/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb).



## Prerequisites

### System Requirements
This notebook was run with the following hardware and software configuration:

**GPU**: NVIDIA A10 24 GB

**CUDA & NVIDIA Drivers**: CUDA 12.2 with Driver 535.154.05

**OS**: Ubuntu 22.04

### Getting NeMo Framework Training Container
- Get access to the container via https://developer.nvidia.com/nemo-framework
- Set your Docker credentials:
    ```bash
    docker login nvcr.io

    Username: $oauthtoken
    Password: <Your NGC Key>
    ```
- Get the NeMo Framework Training Container:
    ```bash
    docker pull nvcr.io/nvidia/nemo:dev
    ```


## 0. Env Setup

In [None]:
!pip install jsonlines

In [None]:
%env DASK_DATAFRAME__QUERY_PLANNING False
%env CUDA_VISIBLE_DEVICES 0

In [None]:
# Standard library
import argparse
import os
import sys
import time

# Third-party libraries
import cudf
import dask
import dask_cudf
import jsonlines
import numpy as np
import pandas as pd
from dask.distributed import Client, LocalCluster

# NeMo Curator
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import (
    get_client,
    get_num_workers,
    read_data,
    write_to_disk,
)
from nemo_curator.utils.file_utils import get_all_files_paths_under, separate_by_metadata
from nemo_curator.utils.script_utils import ArgumentHelper

In [None]:
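def pre_imports():
    """Import cudf ahead of time; intended to be run on each worker via client.run()."""
    import cudf 

def attach_args(parser=argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)):
    """Add NeMo Curator's distributed-execution arguments to the parser."""
    return ArgumentHelper(parser).add_distributed_args()

def check_jsonl_file(file_dir):
    """Print the first line of the first JSONL file found in file_dir."""
    for file in os.listdir(file_dir):
        if 'jsonl' not in file:
            continue
        with open(os.path.join(file_dir,file), 'r', encoding='utf-8') as f:
            first_line = f.readline()
            print(first_line)
        break

def extract_lines_with_id(file_path,target_list):
    """Yield every record in the JSONL file whose 'id' is in target_list."""
    with jsonlines.open(file_path) as reader:
        for obj in reader:
            if obj.get('id') in target_list:
                yield obj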


In [None]:
CUR_DIR = os.getcwd()
print(CUR_DIR)
DATA_DIR = f"{CUR_DIR}/workspace/"

## 1. Download
In this example, we will download the Tiny Stories validation dataset, which is smaller than 20 MB. The small size of this dataset makes it ideal for demonstrating data curation pipelines on a local machine. NeMo Curator supports several default downloader [implementations](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/download.html). In this example, we will use a custom downloader from this [tutorial](https://github.com/NVIDIA/NeMo-Curator/tree/main/tutorials/tinystories) to download the dataset and convert it to JSONL format.

The resulting JSONL files will contain the following keys:
1. `text`
2. `filename`
3. `id`

Import custom downloader libraries

In [None]:
!wget https://raw.githubusercontent.com/NVIDIA/NeMo-Curator/main/tutorials/tinystories/docbuilder.py
!wget https://raw.githubusercontent.com/NVIDIA/NeMo-Curator/main/tutorials/tinystories/helpers.py
    
from docbuilder import TinyStoriesDownloader, TinyStoriesIterator, TinyStoriesExtractor
from helpers import write_jsonl

Download dataset and convert to JSONL files

In [None]:
TINY_STORIES_URL = "https://huggingface.co/datasets/roneneldan/TinyStories/resolve/main/TinyStories-valid.txt"
JSONL_ROOT_DIR = os.path.join(DATA_DIR, "jsonl")

downloader = TinyStoriesDownloader(DATA_DIR)
tinystories_val_fp = downloader.download(TINY_STORIES_URL)

# Convert to JSONL files.
write_jsonl(tinystories_val_fp, JSONL_ROOT_DIR)

Verify output

In [72]:
check_jsonl_file(JSONL_ROOT_DIR)

{"text": "Spot. Spot saw the shiny car and said, \"Wow, Kitty, your car is so bright and clean!\" Kitty smiled and replied, \"Thank you, Spot. I polish it every day.\" After playing with the car, Kitty and Spot felt thirsty. They found a small pond with clear water. They drank the water and felt very happy. They played together all day and became best friends.", "filename": "TinyStories-valid.txt", "id": "TinyStories-valid.txt-0"}



Start a CPU-based Dask cluster. Please modify `n_workers` and `memory_limit` according to your hardware specification.

In [None]:
cluster = LocalCluster(n_workers=8, processes=True, memory_limit='16GB')
client = Client(cluster)

Next, we will read the JSONL files into NeMo Curator's `DocumentDataset`.

In [74]:
source_dataset = DocumentDataset.read_json(JSONL_ROOT_DIR, add_filename=True)

Reading 3 files


In [75]:
source_dataset.df.head()

Unnamed: 0,filename,id,text
0,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-0,"Spot. Spot saw the shiny car and said, ""Wow, K..."
1,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-1,"Once upon a time, in a big forest, there lived..."
2,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-2,"Once upon a time, in a small yard, there was a..."
3,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-3,"Once upon a time, there was a thoughtful girl ..."
4,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-4,"Once upon a time, there was a kind farmer. He ..."


## 2. Unicode fixing

In this section, we apply `UnicodeReformatter` to the data to ensure consistent and accurate representations of text data for downstream processing such as deduplication.
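
To illustrate why a consistent Unicode representation matters for deduplication, here is a minimal standard-library sketch. It is not how `UnicodeReformatter` is implemented; the sample strings and the helper `md5_of` are assumptions made for illustration. It only shows that two visually identical strings can hash differently until they are normalized.

```python
import hashlib
import unicodedata

# Two renderings of the same word: a precomposed "é" (U+00E9)
# versus "e" followed by a combining acute accent (U+0301).
composed = "caf\u00e9"
decomposed = "cafe\u0301"

def md5_of(text: str) -> str:
    """Return the hex MD5 digest of the UTF-8 encoded text (hypothetical helper)."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()

# Without normalization the code points differ, so the hashes differ too.
raw_match = md5_of(composed) == md5_of(decomposed)

# After NFC normalization both strings consist of the same code points.
nfc_match = md5_of(unicodedata.normalize("NFC", composed)) == md5_of(
    unicodedata.normalize("NFC", decomposed)
)

print(f"hashes match before normalization: {raw_match}")
print(f"hashes match after normalization:  {nfc_match}")
```

A hash-based exact deduplication step would treat the two raw strings as different documents, which is why text is cleaned before hashing.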




In [None]:
from nemo_curator.modules.modify import Modify
from nemo_curator.modifiers import UnicodeReformatter

In [None]:
CLEAN_DATA_DIR = os.path.join(DATA_DIR, "cleaned")

t0 = time.time()

cleaner = Modify(UnicodeReformatter())
cleaned_data = cleaner(source_dataset)

# Write the cleaned_data
cleaned_data.to_json(CLEAN_DATA_DIR, write_to_filename=True)

print(f"Time taken for fixing unicode:{time.time()-t0}")

Verify output

In [None]:
check_jsonl_file(CLEAN_DATA_DIR)

## 3. Additional Processing with Dask/PySpark

We will compute additional info and add it to the dataset.  We will calculate: 
- Word Count: The number of words in each story
- Character Count: The number of characters in each story  
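
As a plain-Python reference for the two metrics above, the following sketch computes them for a single string. It assumes the same whitespace-regex split that the DataFrame cells use; the function names and the sample sentence are hypothetical.

```python
import re

def word_count(text: str) -> int:
    """Count the tokens produced by splitting on runs of whitespace."""
    return len(re.split(r"\s+", text))

def character_count(text: str) -> int:
    """Count characters (code points), including spaces and punctuation."""
    return len(text)

# Hypothetical sample sentence
story = "Once upon a time, there was a kind farmer."
print(word_count(story), character_count(story))

# Leading or trailing whitespace yields empty tokens in this sketch,
# so an unstripped string is counted as having extra "words".
padded = " a b "
print(word_count(padded), word_count(padded.strip()))
```

In this sketch, stripping the text before splitting avoids counting the empty tokens; whether that matters depends on how clean the input text already is.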



**We can compute this using Dask DataFrames**

In [71]:
cleaned_data.df['WordCount'] = cleaned_data.df['text'].str.split(r'\s+').str.len()
cleaned_data.df['CharacterCount'] = cleaned_data.df['text'].str.len()
cleaned_data.df.head()

Unnamed: 0,filename,id,text,WordCount,CharacterCount
0,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-0,"Spot. Spot saw the shiny car and said, ""Wow, K...",64,348
1,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-1,"Once upon a time, in a big forest, there lived...",235,1198
2,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-2,"Once upon a time, in a small yard, there was a...",105,507
3,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-3,"Once upon a time, there was a thoughtful girl ...",154,785
4,TinyStories-valid.txt-0.jsonl,TinyStories-valid.txt-4,"Once upon a time, there was a kind farmer. He ...",113,535


**Alternatively, you can compute the same using PySpark. A Dask DataFrame can be converted to a Spark DataFrame by first materializing it as a pandas DataFrame, for example ```spark.createDataFrame(<dask df>.compute())```. However, for large datasets it is more efficient to first write the output of the Dask DataFrame to Parquet or JSONL and then read it back into a Spark DataFrame.**

Start a local Spark session. Ensure you have enough resources to perform this operation. In real-world deployments, you will likely connect to a remote Spark cluster.

Note: if you do not have PySpark installed, uncomment the cell below to install both Java and PySpark.

You can also accelerate Spark processing using the NVIDIA RAPIDS [Accelerator](https://docs.nvidia.com/spark-rapids/index.html). The Spark session below starts a CPU session.

In [None]:
#!apt update && apt install -y openjdk-11-jdk 
#!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

In [61]:
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("Data Curation") \
    .config("spark.driver.memory", '4g') \
    .config("spark.executor.memory", '4g') \
    .getOrCreate()

Read the JSONL output from NeMo Curator into a Spark DataFrame

In [62]:
df = spark.read.json(CLEAN_DATA_DIR)

In [63]:
df.show(5, truncate=True)

+--------------------+--------------------+--------------------+
|            filename|                  id|                text|
+--------------------+--------------------+--------------------+
|TinyStories-valid...|TinyStories-valid...|Spot. Spot saw th...|
|TinyStories-valid...|TinyStories-valid...|Once upon a time,...|
|TinyStories-valid...|TinyStories-valid...|Once upon a time,...|
|TinyStories-valid...|TinyStories-valid...|Once upon a time,...|
|TinyStories-valid...|TinyStories-valid...|Once upon a time,...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [64]:
from pyspark.sql.functions import size, split, length, expr
# Calculate Word Count
df = df.withColumn("WordCount", size(split(df["text"], r'\s+')))

# Calculate Character Count
df = df.withColumn("CharacterCount", length(df["text"]))

df.show(5, truncate=True)

+--------------------+--------------------+--------------------+---------+--------------+
|            filename|                  id|                text|WordCount|CharacterCount|
+--------------------+--------------------+--------------------+---------+--------------+
|TinyStories-valid...|TinyStories-valid...|Spot. Spot saw th...|       64|           348|
|TinyStories-valid...|TinyStories-valid...|Once upon a time,...|      235|          1198|
|TinyStories-valid...|TinyStories-valid...|Once upon a time,...|      105|           507|
|TinyStories-valid...|TinyStories-valid...|Once upon a time,...|      154|           785|
|TinyStories-valid...|TinyStories-valid...|Once upon a time,...|      113|           535|
+--------------------+--------------------+--------------------+---------+--------------+
only showing top 5 rows



Write dataset to a processed folder

In [None]:
PROCESSED_DIR = os.path.join(DATA_DIR, "processed")
df.write.mode("overwrite").parquet(PROCESSED_DIR)

Shut down the CPU Dask cluster. We will launch a GPU cluster in the next step.

In [65]:
client.cluster.close()
client.shutdown()

## 4. Exact Deduplication

We now continue processing the dataset in NeMo Curator. In exact deduplication, the text of each document is hashed with a hashing algorithm such as MD5. Documents with identical hash values have identical text. We will output the IDs of the duplicated documents so that they can be removed later. This is done with `ExactDuplicates()`, whose arguments include:
- `id_field`: Key in the input file that identifies the document ID.
- `text_field`: Key in the input file that contains the document text.
- `hash_method`: Hashing algorithm used. Default is `md5`.
- `cache_dir`: If specified, the duplicated document IDs are written to `cache_dir`. Otherwise, the IDs are not saved.
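
The idea behind exact deduplication can be sketched in plain Python. This is an illustrative, CPU-only approximation and not the `ExactDuplicates` implementation; the function `find_exact_duplicates` and the sample documents are hypothetical, and only the `id_field`/`text_field` naming is borrowed from the arguments listed above.

```python
import hashlib
from collections import defaultdict

def find_exact_duplicates(docs, id_field="id", text_field="text"):
    """Group document IDs by the MD5 hash of their text.

    Returns a dict mapping each hash to its list of IDs, keeping only
    hashes shared by more than one document.
    """
    groups = defaultdict(list)
    for doc in docs:
        digest = hashlib.md5(doc[text_field].encode("utf-8")).hexdigest()
        groups[digest].append(doc[id_field])
    return {h: ids for h, ids in groups.items() if len(ids) > 1}

# Hypothetical sample documents: doc-0 and doc-2 have identical text.
docs = [
    {"id": "doc-0", "text": "The cat sat."},
    {"id": "doc-1", "text": "The dog ran."},
    {"id": "doc-2", "text": "The cat sat."},
]

duplicates = find_exact_duplicates(docs)
for digest, ids in duplicates.items():
    print(digest, ids)
```

Because only the hash and the ID need to be kept per document, this approach scales to large corpora far better than comparing texts pairwise.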

We will start a GPU cluster to accelerate the deduplication step. Because a GPU-based Dask cluster involves setting several arguments, we will use the `get_client()` wrapper function to quickly set up a single-node cluster.


In [None]:
client = get_client(cluster_type = 'gpu', set_torch_to_use_rmm=False)
print(f"Number of Dask workers: {get_num_workers(client)}")
client.run(pre_imports)

In [None]:
from nemo_curator.modules import ExactDuplicates

In [68]:
LOG_DIR = os.path.join(DATA_DIR, "logs")
EXACT_DEDUP_OUT_DIR = os.path.join(DATA_DIR, "exact_dedup")

!mkdir -p {LOG_DIR}
!mkdir -p {EXACT_DEDUP_OUT_DIR}

# Ignore checksum (.crc) and marker (_SUCCESS) files created by the Spark job
processed_files = [filename for filename in get_all_files_paths_under(PROCESSED_DIR) 
                   if not filename.endswith(('.crc', '_SUCCESS'))]


In [None]:
t0 = time.time()
# Read input dataset from Spark output
input_dataset = DocumentDataset.read_parquet(processed_files, backend='cudf')

# Run exact deduplication on the input
exact_dup = ExactDuplicates(
    logger=LOG_DIR,
    id_field="id",
    text_field="text",
    hash_method="md5",
    cache_dir=EXACT_DEDUP_OUT_DIR # The list of duplicated document IDs is written to cache_dir
)
duplicates = exact_dup(dataset=input_dataset)

print(f"Number of exact duplicated documents: {len(duplicates)}")

print(f"Time taken for exact deduplication: {time.time()-t0}")

**[Optional]** Verify the duplicated IDs in the output. We can group by `_hashes` to get the lists of documents that share the same hash, and use `extract_lines_with_id()` to verify that those documents are indeed exact duplicates. Note that the `id` values might change, so replace `target_list` as necessary.

In [None]:
exact_dedup_res = pd.read_parquet(os.path.join(EXACT_DEDUP_OUT_DIR,"_exact_duplicates.parquet"))
print(f"Number of exact duplicated document:{len(exact_dedup_res)}")
exact_dedup_res.head()
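
The optional verification described above can be sketched without any third-party libraries. The example below is a hedged stand-in: it uses the standard `json` module instead of `jsonlines`, writes a small hypothetical JSONL file to a temporary directory, and checks that the records for a given list of IDs have identical text. The helper `texts_for_ids`, the file name, and the sample records are all assumptions.

```python
import json
import os
import tempfile

def texts_for_ids(jsonl_path, target_ids):
    """Return {id: text} for every JSONL record whose id is in target_ids."""
    wanted = set(target_ids)
    found = {}
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            if record.get("id") in wanted:
                found[record["id"]] = record["text"]
    return found

# Hypothetical records standing in for a cleaned JSONL file.
records = [
    {"id": "doc-0", "text": "The cat sat."},
    {"id": "doc-1", "text": "The dog ran."},
    {"id": "doc-2", "text": "The cat sat."},
]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample.jsonl")
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")

    # IDs that a deduplication step reported as sharing one hash (assumed here).
    target_list = ["doc-0", "doc-2"]
    found = texts_for_ids(path, target_list)

# The IDs are true exact duplicates only if all their texts are identical.
all_identical = len(set(found.values())) == 1
print(f"IDs {sorted(found)} have identical text: {all_identical}")
```

With real output, `target_list` would be taken from one group of IDs that share a `_hashes` value in the deduplication result.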

Close the GPU Dask cluster

In [70]:
client.cluster.close()
client.shutdown()

Stop the Spark session

In [69]:
spark.stop()