# Data Curation
This notebook showcases the building blocks of a simple data curation pipeline built with [NeMo Curator](https://github.com/NVIDIA/NeMo-Curator).

## Reading Materials
Before proceeding, we highly recommend looking through the following deep dive blog posts that walk you through building data curation pipelines using NeMo Curator:
- [Curating Custom Datasets for LLM Training with NVIDIA NeMo Curator](https://developer.nvidia.com/blog/curating-custom-datasets-for-llm-training-with-nvidia-nemo-curator/)
- [Curating Custom Datasets for LLM Parameter-Efficient Fine-Tuning with NVIDIA NeMo Curator](https://developer.nvidia.com/blog/curating-custom-datasets-for-llm-parameter-efficient-fine-tuning-with-nvidia-nemo-curator/)

Also, please check out [our tutorials](https://github.com/NVIDIA/NeMo-Curator/tree/main/tutorials) in the repository to learn more about the functionality that NeMo Curator provides.

In this notebook, we build the pipeline around the [Law-StackExchange dataset](https://huggingface.co/datasets/ymoslem/Law-StackExchange), a collection of legal questions and answers scraped from the Stack Exchange website. This notebook is a condensed version of our existing [synthetic data generation tutorial](https://github.com/NVIDIA/NeMo-Curator/tree/main/tutorials/peft-curation-with-sdg). Feel free to go through that tutorial to gain a better understanding of NeMo Curator's facilities.

## Setup and Requirements
The NeMo dependencies are already installed in the container. However, before proceeding you need to install one dependency to follow along. Execute the following cell before getting started.

In [2]:
! pip install ipywidgets

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Collecting ipywidgets
  Downloading ipywidgets-8.1.5-py3-none-any.whl.metadata (2.3 kB)
Collecting widgetsnbextension~=4.0.12 (from ipywidgets)
  Downloading widgetsnbextension-4.0.13-py3-none-any.whl.metadata (1.6 kB)
Collecting jupyterlab-widgets~=3.0.12 (from ipywidgets)
  Downloading jupyterlab_widgets-3.0.13-py3-none-any.whl.metadata (4.1 kB)
Downloading ipywidgets-8.1.5-py3-none-any.whl (139 kB)
Downloading jupyterlab_widgets-3.0.13-py3-none-any.whl (214 kB)
Downloading widgetsnbextension-4.0.13-py3-none-any.whl (2.3 MB)
Installing collect

---
## Getting Started

To get started, let's set up some environment variables, along with the path variables used for storing the curated data and the intermediate temporary files that this notebook needs.

In [3]:
import os
os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"  # Needed for running Curator on the GPU

NOTEBOOK_DIR = os.path.abspath("")
DATA_DIR = os.path.join(NOTEBOOK_DIR, "data")
TEMP_DIR = os.path.join(NOTEBOOK_DIR, ".temp")
os.makedirs(DATA_DIR, exist_ok=True)
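
As a side note on the environment variable set above: Dask reads configuration from variables prefixed with `DASK_`, treating a double underscore as a nesting separator, so the variable needs to be set before Dask is imported. The sketch below only approximates that name mapping for illustration; `dask_env_to_config_key` is a hypothetical helper, not part of Dask.

```python
def dask_env_to_config_key(name: str) -> str:
    """Approximate how a DASK_* environment variable maps to a Dask config key.

    Hypothetical helper for illustration only; Dask performs this mapping itself.
    """
    if not name.startswith("DASK_"):
        raise ValueError("Dask only reads variables prefixed with DASK_")
    # Drop the prefix, lowercase the rest, and treat "__" as a nesting separator.
    return name[len("DASK_"):].lower().replace("__", ".")


config_key = dask_env_to_config_key("DASK_DATAFRAME__QUERY_PLANNING")
print(config_key)
```

Dask treats underscores and hyphens in config keys interchangeably, so this corresponds to the `dataframe.query-planning` setting.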

Let's now import everything we need to build our data curation pipeline. For your convenience, we've provided the document builder implementations that allow you to download the dataset from HuggingFace and convert it into a Pandas `DataFrame`.

We have additionally implemented a score-based filter that allows you to filter the dataset rows using the score values assigned to each question. You can use this implementation as the basis for creating your own filtering/scoring mechanisms using NeMo Curator.

In [23]:
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import WordCountFilter, SymbolsToWordsFilter, RepeatingTopNGramsFilter
from nemo_curator.modifiers import UnicodeReformatter
from nemo_curator.modifiers import DocumentModifier
from nemo_curator.modifiers.pii_modifier import PiiModifier
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir
from nemo_curator import ScoreFilter, Sequential
from nemo_curator.modules.modify import Modify

# Importing helper functions
from helpers.filters import FilterLowScores
from helpers.docbuilder import download_and_convert_dataset
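
The source of the `FilterLowScores` helper is not shown in this notebook. As a rough illustration of the idea, the plain-Python sketch below mimics the score-then-keep pattern that NeMo Curator filters follow (`score_document` followed by `keep_document`). The class name `ScoreThresholdFilter` is hypothetical, and the inclusive threshold is an assumption; the actual helper may differ.

```python
class ScoreThresholdFilter:
    """Hypothetical stand-in for a score-based filter (not the notebook's helper)."""

    def __init__(self, score_threshold: int):
        self._score_threshold = score_threshold

    def score_document(self, value: int) -> bool:
        # Score step: does the row's score reach the threshold? (inclusive: an assumption)
        return value >= self._score_threshold

    def keep_document(self, score: bool) -> bool:
        # Keep step: retain the row only if the score step passed.
        return score


# Made-up rows for illustration.
rows = [
    {"title": "A", "question_score": 23},
    {"title": "B", "question_score": -2},
    {"title": "C", "question_score": 0},
]

flt = ScoreThresholdFilter(score_threshold=0)
kept = [r for r in rows if flt.keep_document(flt.score_document(r["question_score"]))]
print([r["title"] for r in kept])
```

Separating the scoring step from the keep decision makes it possible to store the score as a column and tune the threshold later without recomputing it.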

Before proceeding, let's decide the compute resources we'd like to use for running our data curation pipeline. NeMo Curator uses Dask to orchestrate scalable data processing. As such, it needs to know what resources to use. 

For the purposes of this notebook, the cell below instructs NeMo Curator to run on the GPU with `n_workers = 4`. While most NeMo Curator functionalities can be executed on the CPU, some modules (such as semantic deduplication) can only be executed on the GPU. Please make sure to select the appropriate device, and set `device = "cpu"` if no GPU is available.

Note that you can increase or decrease the number of CPU workers depending on the runtime environment. Keep in mind that each CPU worker gets allocated a fixed amount of the total available system memory (RAM). Thus, if the environment does not have enough memory available, Dask operations might fail.
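
To get a feel for the trade-off, here is a back-of-the-envelope sketch that assumes memory is split evenly across workers. The 64 GiB total is a made-up figure and `memory_per_worker_gib` is a hypothetical helper; the limits Dask actually applies depend on its configuration.

```python
def memory_per_worker_gib(total_gib: float, n_workers: int) -> float:
    """Estimate each worker's share of RAM, assuming an even split (an approximation)."""
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    return total_gib / n_workers


TOTAL_GIB = 64.0  # hypothetical machine size, for illustration only
for n in (8, 4, 2):
    print(f"{n} workers -> ~{memory_per_worker_gib(TOTAL_GIB, n):.1f} GiB each")
```

Halving the number of workers doubles each worker's share, which is why reducing `n_workers` is a common first step when operations fail with out-of-memory errors.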

Once we have decided on the resources to use, we can initialize our Dask cluster and start using NeMo Curator.

In [16]:
device = "gpu"  # It can be either "cpu" or "gpu"
n_workers = 4  # Number of workers to use for Dask. If running out of memory, try reducing this.
client = get_client(device, n_workers=n_workers, set_torch_to_use_rmm=False)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 33477 instead


cuDF Spilling is enabled


2024-10-28 06:57:43,667 - distributed.scheduler - INFO - Retire worker addresses (0,)
2024-10-28 06:57:43,673 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:42632; closing.
2024-10-28 06:57:43,674 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:38093', name: 0, status: closing, memory: 0, processing: 0> (stimulus_id='handle-worker-cleanup-1730098663.6747935')
2024-10-28 06:57:43,675 - distributed.scheduler - INFO - Lost all workers
2024-10-28 06:57:45,112 - distributed.scheduler - INFO - Closing scheduler. Reason: unknown
2024-10-28 06:57:45,113 - distributed.scheduler - INFO - Scheduler closing all comms


---
## The Main Data Curation and Processing Pipeline

We start by downloading and converting the dataset into a suitable format. This is done via the document builders that we have provided for you.

In [18]:
dataset_df = download_and_convert_dataset(DATA_DIR)

Download directory:  /root/ODSC-Hackathon-Repository/data/raw
File '/root/ODSC-Hackathon-Repository/data/raw/law-stackexchange-questions-answers.json' already exists, skipping download.


In [19]:
# Keep only the part of each ID after the last '-' and convert it to an integer.
dataset_df['id'] = dataset_df['id'].apply(lambda x: int(x.split('-')[-1]))

In [20]:
dataset_df.head()

| | filename | id | title | question | question_score | answer | answer_score | tags |
|---|---|---|---|---|---|---|---|---|
| 0 | law-stackexchange-questions-answers.json | 94665 | Why is drunk driving causing accident punished... | When people drink and drive and then cause an ... | 23 | Moral luck You have raised the issue of moral ... | 72 | criminal-law,driving,sentencing |
| 1 | law-stackexchange-questions-answers.json | 94671 | What counts as consideration in contract law? | What counts as consideration in contract law? ... | 0 | See generally Hamer v. Sidway (1891), 124 NY 5... | 1 | consideration,contract-law,legal-terms |
| 2 | law-stackexchange-questions-answers.json | 94683 | Question Concerning Responding to Employer of ... | My high school daughter worked for about a yea... | 1 | Read the terms It’s quite likely that, if you ... | 3 | california,employment,teenager |
| 3 | law-stackexchange-questions-answers.json | 67110 | Can Hawaii secede from the U.S. through legal ... | Can Hawaii secede from the U.S. through legal ... | 2 | Currently, there is no legal means for a state... | 9 | constitutional-law,federalism,united-states |
| 4 | law-stackexchange-questions-answers.json | 94678 | Legality of privately bibby Stockholming to sa... | It seems that the principal impetus of moving ... | 1 | england-and-wales then what stops private citi... | 1 | any-jurisdiction,coast,law-of-the-sea,property... |


In [21]:
raw_dataset = DocumentDataset.from_pandas(dataset_df)

Next, we need to define our data curation pipeline. The pipeline we define here is deliberately simple: it consists of a few text-cleaning steps followed by basic filtering operations.

> NOTE: to use the modules that need a GPU, the dataset has to be converted to the `cudf` backend. Please refer to [this tutorial](https://github.com/NVIDIA/NeMo-Curator/tree/main/tutorials/peft-curation-with-sdg) for an example demonstrating the usage of GPU modules.

In [24]:
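class QuotationUnifier(DocumentModifier):
    """Replaces curly quotation marks with straight ones and strips runs of periods."""

    def modify_document(self, text: str) -> str:
        # Curly single quotes -> straight single quote.
        text = text.replace("‘", "'").replace("’", "'")
        # Curly double quotes -> straight double quote.
        text = text.replace("“", '"').replace("”", '"')
        # Remove ellipses written as three or two consecutive periods.
        text = text.replace("...", '').replace("..", '')
        return text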
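
To see what these replacements do without running the full pipeline, the sketch below applies the same string operations to a made-up sample. `unify_quotes` is a standalone copy of the method body written for illustration, using Unicode escapes for the curly quotes.

```python
def unify_quotes(text: str) -> str:
    """Standalone copy of the replacements used by QuotationUnifier, for illustration."""
    text = text.replace("\u2018", "'").replace("\u2019", "'")  # curly single quotes
    text = text.replace("\u201c", '"').replace("\u201d", '"')  # curly double quotes
    text = text.replace("...", "").replace("..", "")           # runs of periods
    return text


sample = "\u201cIt\u2019s fine,\u201d she said... Really?"
cleaned = unify_quotes(sample)
print(cleaned)
```

Note that a single period is left untouched; only runs of two or three periods are removed.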

In [25]:
def run_curation_pipeline(dataset: DocumentDataset, device: str) -> DocumentDataset:
    print(f"Running curation pipeline on '{device}'...")
    orig_dataset = dataset
    
    def datacleaners(dataset: DocumentDataset) -> DocumentDataset:
        # Normalize quotation marks and Unicode in every text column.
        cleaners = Sequential([
            Modify(QuotationUnifier(), text_field="title"),
            Modify(QuotationUnifier(), text_field="question"),
            Modify(QuotationUnifier(), text_field="answer"),
            Modify(UnicodeReformatter(), text_field="title"),
            Modify(UnicodeReformatter(), text_field="question"),
            Modify(UnicodeReformatter(), text_field="answer"),
        ])
        return cleaners(dataset)

    # Takes the cleaned dataset as its argument rather than reading the outer variable.
    def datafilter(dataset: DocumentDataset) -> DocumentDataset:
        filters = Sequential([
            ScoreFilter(
                WordCountFilter(min_words=50, max_words=1000, lang='en'),
                text_field="question",
                score_type=int,
            ),
            ScoreFilter(
                FilterLowScores(score_threshold=0),
                text_field="question_score",
                score_type=bool,
            ),
            ScoreFilter(
                FilterLowScores(score_threshold=0),
                text_field="answer_score",
                score_type=bool,
            ),

        ])
        return filters(dataset)

    cpu_curation_steps = Sequential(
        [
            datacleaners,
            datafilter
        ]
    )

    # Run the CPU curation steps.
    dataset = cpu_curation_steps(dataset)
    dataset = dataset.persist()
    # Drop the columns that are no longer needed.
    dataset.df = dataset.df.drop(columns=["answer", "answer_score", "question_score"])
    orig_len = len(orig_dataset.df)
    new_len = len(dataset.df)

    print(f"Original dataset length: {orig_len}")
    print(f"New dataset length: {new_len}")

    return dataset
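
The pipeline above relies on stages being chained, with each stage receiving the output of the previous one. The plain-Python sketch below illustrates that pattern on a list of dictionaries. The names `sequential`, `strip_whitespace` and `word_count_filter` are hypothetical stand-ins rather than NeMo Curator APIs, and the inclusive word-count bounds are an assumption.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]
Step = Callable[[List[Row]], List[Row]]


def sequential(steps: List[Step]) -> Step:
    """Chain steps so that each one receives the previous step's output."""
    def run(rows: List[Row]) -> List[Row]:
        for step in steps:
            rows = step(rows)
        return rows
    return run


def strip_whitespace(rows: List[Row]) -> List[Row]:
    # Stand-in for the cleaning stage.
    return [{**r, "question": r["question"].strip()} for r in rows]


def word_count_filter(min_words: int, max_words: int) -> Step:
    # Stand-in for a word-count filter; inclusive bounds are an assumption.
    def run(rows: List[Row]) -> List[Row]:
        return [r for r in rows if min_words <= len(r["question"].split()) <= max_words]
    return run


pipeline = sequential([strip_whitespace, word_count_filter(min_words=3, max_words=5)])

# Made-up rows for illustration.
rows = [
    {"question": "  too short "},
    {"question": "this one has four "},
    {"question": "this question is far too long to keep"},
]
result = pipeline(rows)
print(result)
```

Because every stage works on the value it is handed, a stage that ignores its argument and reads an outer variable instead would silently discard the work of the stages before it.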

Finally, we are ready to run the pipeline and get our final dataset. This may take up to 10 minutes to execute, especially if any GPU functionalities are used.

In [26]:
curated_dataset = run_curation_pipeline(raw_dataset, device)

Running curation pipeline on 'gpu'...


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Original dataset length: 24343
New dataset length: 19164


Next, let's specify the final columns that we would like our dataset to have. Depending on how you plan on consuming this dataset for training, you may decide to introduce other arbitrary columns to help the model learn better.

Also, this is a great place to add system or instruction prompts to every record, in case you intend to use the same instruction prompt for every record.

Let's define a function that formats the dataset, and also adds system prompts.

In [29]:
def format_dataset(dataset: DocumentDataset, filename: str) -> DocumentDataset:
    SYSTEM_PROMPT = "Read the following title and question about a legal issue and assign the most appropriate tag to it. All tags must be in lowercase, ordered lexicographically and separated by commas.\n\n"

    df = dataset.df.compute()
    has_tags = "tags" in df.columns
    df["input"] = SYSTEM_PROMPT + "TITLE:\n" + df["title"] + "\n\n" + "QUESTION:\n" + df["question"]
    df["output"] = df["tags"] if has_tags else ""  # If the dataset doesn't have tags, use an empty string.
    df["filename"] = filename

    df = df.drop(columns=["title", "question"])
    if has_tags:
        df = df.drop(columns=["tags"]) # Drop the tags column if it exists.
    return DocumentDataset.from_pandas(df)
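
For a single record, the transformation above amounts to the following plain-Python sketch. `format_record` is a hypothetical per-row equivalent written for illustration, the prompt is a shortened placeholder, and the question text is made up.

```python
# Shortened placeholder prompt; the notebook uses a longer instruction.
SYSTEM_PROMPT = "Assign the most appropriate tags to this legal question.\n\n"


def format_record(record: dict, filename: str) -> dict:
    """Per-row sketch of the formatting step: build `input`, copy tags into `output`."""
    return {
        "filename": filename,
        "id": record["id"],
        "input": (
            SYSTEM_PROMPT
            + "TITLE:\n" + record["title"] + "\n\n"
            + "QUESTION:\n" + record["question"]
        ),
        # Records without tags get an empty output string.
        "output": record.get("tags", ""),
    }


labelled = format_record(
    {
        "id": 94671,
        "title": "What counts as consideration in contract law?",
        "question": "Placeholder question text.",
        "tags": "consideration,contract-law,legal-terms",
    },
    "train.jsonl",
)
unlabelled = format_record(
    {"id": 1, "title": "Placeholder title", "question": "Placeholder question text."},
    "submission.jsonl",
)
print(labelled["input"])
print(repr(unlabelled["output"]))
```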

We use the function above to format the curated dataset. The same function is applied later to the evaluation (submission) dataset.

In [30]:
formatted_dataset = format_dataset(curated_dataset, "law-stackexchange-curated.jsonl")
print(f"Original dataset columns: {curated_dataset.df.columns}")
print(f"Formatted dataset columns: {formatted_dataset.df.columns}")

Original dataset columns: Index(['filename', 'id', 'title', 'question', 'tags'], dtype='object')
Formatted dataset columns: Index(['filename', 'id', 'input', 'output'], dtype='object')


Once the final dataset is ready, we can write it into a JSONL file that is in the format expected for training with NeMo Framework.

> NOTE: The curated dataset will be written to `data/curated_dataset/law-stackexchange-curated.jsonl`, relative to the notebook directory.

In [31]:
print(f"Curated dataset columns: {formatted_dataset.df.columns}")
result_fp = os.path.join(DATA_DIR, "curated_dataset")
print()
print(f"Saving curated dataset to '{result_fp}'...")
formatted_dataset.to_json(result_fp, write_to_filename=True)

Curated dataset columns: Index(['filename', 'id', 'input', 'output'], dtype='object')

Saving curated dataset to '/root/ODSC-Hackathon-Repository/data/curated_dataset'...


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Writing to disk complete for 1 partitions
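
JSONL stores one JSON object per line. As a minimal sketch of that layout, independent of NeMo Curator, the snippet below writes two made-up records with the same keys as the formatted dataset to a temporary file and reads them back.

```python
import json
import os
import tempfile

# Made-up records for illustration, using the formatted dataset's column names.
records = [
    {"filename": "example.jsonl", "id": 1, "input": "TITLE:\nFirst\n\nQUESTION:\n...", "output": "tag-a,tag-b"},
    {"filename": "example.jsonl", "id": 2, "input": "TITLE:\nSecond\n\nQUESTION:\n...", "output": "tag-c"},
]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "example.jsonl")

    # Write: one JSON object per line.
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")

    # Read back: parse each non-empty line independently.
    with open(path, encoding="utf-8") as f:
        loaded = [json.loads(line) for line in f if line.strip()]

print(f"Round-tripped {len(loaded)} records")
```

Since each line is parsed on its own, the newlines inside `input` are stored as escaped `\n` sequences and do not break the one-record-per-line layout.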


---
## Splitting the Dataset

Before starting the model training procedure, let's split the dataset we've just curated into `training` and `validation` splits with a 95/5 ratio, controlled by `VAL_RATIO`.

In [32]:
from sklearn.model_selection import train_test_split

VAL_RATIO = 0.05

df = formatted_dataset.df.compute()

# Some sanity checks
assert len(df) > 0, "The dataset is empty."
assert VAL_RATIO >= 0 and VAL_RATIO <= 1, "VAL_RATIO must be between 0 and 1."
val_size = int(len(df) * VAL_RATIO)
output_dir = f"{DATA_DIR}/split"
os.makedirs(output_dir, exist_ok=True)

# Split the data into training and validation sets
train_df, val_df = train_test_split(df, test_size=val_size, random_state=42)

print(f"Original size: {len(df)}")
print("After splitting:")
print(f"    Train size: {len(train_df)}")
print(f"    Validation size: {len(val_df)}")

train_df["filename"] = "train.jsonl"
val_df["filename"] = "val.jsonl"

DocumentDataset.from_pandas(train_df).to_json(output_dir, write_to_filename=True)
DocumentDataset.from_pandas(val_df).to_json(output_dir, write_to_filename=True)


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Original size: 19164
After splitting:
    Train size: 18206
    Validation size: 958




Writing to disk complete for 1 partitions
Writing to disk complete for 1 partitions
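
The split sizes reported above follow directly from `int(len(df) * VAL_RATIO)`. The standard-library sketch below reproduces that arithmetic with a seeded shuffle of row indices. `split_indices` is a hypothetical helper, and it does not reproduce the exact row assignment made by scikit-learn's `train_test_split`, only the sizes.

```python
import random
from typing import List, Tuple


def split_indices(n_rows: int, val_ratio: float, seed: int = 42) -> Tuple[List[int], List[int]]:
    """Shuffle row indices with a fixed seed and carve off a validation slice."""
    if not 0 <= val_ratio <= 1:
        raise ValueError("val_ratio must be between 0 and 1")
    val_size = int(n_rows * val_ratio)  # truncates, as in the cell above
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    return indices[val_size:], indices[:val_size]


train_idx, val_idx = split_indices(19164, 0.05)
print(f"Train size: {len(train_idx)}")
print(f"Validation size: {len(val_idx)}")
```

Fixing the seed keeps the split reproducible across runs, so that validation metrics remain comparable between experiments.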


---
## Preparing the Submission Dataset

The submission dataset is a dataset of questions and titles for which every participating team has to predict the tags.
It needs to have the same format as the training datasets so that you can run your model on it and submit your predicted tags.

In [33]:
submission_ds = "data/submission/evaluation-dataset-verified-for-participants.jsonl"
assert os.path.exists(submission_ds), f"The submission dataset does not exist at '{submission_ds}'"
submission_ds = DocumentDataset.read_json(submission_ds)
submission_ds = format_dataset(submission_ds, "submission.jsonl")
print("Writing the formatted submission dataset to disk...")
submission_ds.to_json(output_dir, write_to_filename=True)

Reading 1 files
Writing the formatted submission dataset to disk...
Writing to disk complete for 1 partitions


After running the cell above, the training-ready files are available under `data/split`. When making submissions, run inference with your model on `data/split/submission.jsonl`.

---
## Freeing Memory and Other Resources

Before moving to the next notebook, please execute the following cell to free up all the allocated resources to avoid running into out-of-memory or other issues.

Alternatively, please restart the kernel by navigating to `Kernel > Restart Kernel` (if using Jupyter Notebook), or by clicking the `Restart` button in VS Code.

In [34]:
client.close()
exit(0)