# Dataset creation for SFT and continued pretraining

In [1]:
#!pip install gdown --quiet

In [2]:

#!pip install langchain --quiet
#!pip install langchain_nvidia_ai_endpoints --quiet
#!pip install pypdf --quiet

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m


In [4]:
!pip install --extra-index-url https://pypi.nvidia.com nemo-curator[cuda12x] --quiet

[0m

In [3]:
# Sanity check: confirm a GPU is visible before using the GPU-backed tools below
# (cudf.pandas and the nemo-curator[cuda12x] build).
!nvidia-smi

Mon Jun 10 07:59:55 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.161.08             Driver Version: 535.161.08   CUDA Version: 12.3     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |


|   0  NVIDIA A40                     On  | 00000000:25:00.0 Off |                    0 |
|  0%   34C    P8              22W / 300W |      0MiB / 46068MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                                         
+---------------------------------------------------------------------------------------+
| Processes:                                                                            |
|  GPU   GI   CI        PID   Type   Process name                            GPU Memory |
|        ID   ID                                                             Usage      |
|  No running processes found                                                           |
+---------------------------------------------------------------------------------------+


## Imports

In [5]:
# import the relevant libraries
import json
import os

from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langchain.text_splitter import  RecursiveCharacterTextSplitter
from multiprocessing import Pool

In [6]:
%load_ext cudf.pandas
import pandas as pd

In [7]:
from pprint import PrettyPrinter
pprint = PrettyPrinter(indent=4).pprint

# ChatNVIDIA (used below) authenticates through the NVIDIA_API_KEY environment variable.
# Never hardcode the key in the notebook: prompt for it only when it is not already set.
if not os.environ.get("NVIDIA_API_KEY"):
    import getpass
    os.environ["NVIDIA_API_KEY"] = getpass.getpass("NVIDIA API key: ")

## Download the extracted documents

In [8]:
# Show the working directory: the CSV below is downloaded here and read back via a relative path.
!pwd

/


In [9]:
!wget https://zenodo.org/records/10775273/files/Norway%20-%20Diskos%20reports.csv

--2024-06-10 08:08:02--  https://zenodo.org/records/10775273/files/Norway%20-%20Diskos%20reports.csv
Resolving zenodo.org (zenodo.org)... 188.184.103.159, 188.184.98.238, 188.185.79.172, ...
Connecting to zenodo.org (zenodo.org)|188.184.103.159|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5684810410 (5.3G) [text/plain]
Saving to: ‘Norway - Diskos reports.csv’

rway - Diskos repor   0%[                    ]   1.18M   358KB/s    eta 4h 10m ^C


## Extract raw texts

Load the CSV file as a DataFrame (GPU-accelerated through the `cudf.pandas` extension loaded above).

In [10]:
# Directory holding the downloaded CSV; '' makes the path relative to the current working directory.
PATH_PROJ = ''

In [11]:
# Read the report dump; the next cell relies on its 'filename' and 'content' columns.
# (',' is read_csv's default separator, so it is not spelled out.)
df = pd.read_csv(os.path.join(PATH_PROJ, "Norway - Diskos reports.csv"))

In [12]:
RAW_DIR = "/workspace/local_data/raw"
os.makedirs(RAW_DIR, exist_ok=True)

# Write one JSONL file per source filename, one {"text": ...} record per CSV row.
# groupby makes a single pass over the frame instead of re-filtering the whole frame
# once per filename (quadratic in the number of reports).
# Rows with missing content are dropped: json.dumps(NaN) emits a bare `NaN`, which is not
# valid JSON and breaks the JSONL readers used downstream.
for document, rows in df.dropna(subset=["content"]).groupby("filename", sort=False):
    with open(os.path.join(RAW_DIR, f"{document}.jsonl"), "w", encoding="utf-8") as f:
        for raw in rows["content"]:
            f.write(json.dumps({"text": raw}) + "\n")

## Cleaning raw documents with NeMo Curator

In [None]:
# Inspect the command-line options of NeMo Curator's text_cleaning script.
!text_cleaning --help

In [None]:
# Run NeMo Curator's text_cleaning on the raw JSONL shards; the next cell reads its output
# from /workspace/local_data/clean.
!text_cleaning --input-data-dir /workspace/local_data/raw --output-clean-dir /workspace/local_data/clean

In [13]:
import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.filters import WordCountFilter
from nemo_curator.modifiers import UnicodeReformatter

# Load every cleaned shard. add_filename=True keeps track of each record's source file so that
# to_json(write_to_filename=True) below can write records back out per source file.
files = get_all_files_paths_under("/workspace/local_data/clean/")
documents = DocumentDataset.read_json(files, add_filename=True)

# Score each document's "text" with WordCountFilter (min_words=80) and drop those that fail;
# the computed score is kept in a "word_count" field.
filter_step = nc.ScoreFilter(
                WordCountFilter(min_words=80),
                text_field="text",
                score_field="word_count",
            )

filtered_documents = filter_step(documents)

# Apply NeMo Curator's UnicodeReformatter to the surviving text (see its docs for exact fixes).
cleaner = nc.Modify(UnicodeReformatter())
filtered_documents = cleaner(filtered_documents)

# Write the curated records to /workspace/local_data/curator/, one output file per source filename.
filtered_documents.to_json("/workspace/local_data/curator/", write_to_filename=True)

Reading 39217 files


You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('text', 'int64'))



KeyboardInterrupt: 

## Preparing data for continued pretraining
Create the Megatron-format `*.bin` and `*.idx` files

In [None]:
!python /opt/NeMo/scripts/nlp_language_modeling/preprocess_data_for_megatron.py \
    --input /workspace/local_data/curator/* \
    --json-keys text \
    --tokenizer-library sentencepiece \
    --tokenizer-model /workspace/models/Llama-2-7b-chat-hf/tokenizer.model \
    --output-prefix /workspace/local_data/curator/hackathon

In [None]:
!ls -l /workspace/local_data/clean | wc -l

## Preparing data for supervised fine-tuning (SFT)

Split text into overlapping chunks

In [None]:
text_splitter = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=1500)

# Rebuild one text per curated shard (one shard per source report) by joining its records;
# split_text() needs a string, so each document must be materialized, not left as a generator.
documents = []
for filename in get_all_files_paths_under("/workspace/local_data/curator/"):
    if filename.endswith((".bin", ".idx")):
        continue  # binary Megatron files written into this dir by the pretraining-prep cell
    with open(filename, "r", encoding="utf-8") as f:
        records = [json.loads(line)["text"] for line in f if line.strip()]
    if records:
        # A blank line between records gives the splitter a preferred break point.
        documents.append("\n\n".join(records))
document_chunks = [text_splitter.split_text(document) for document in documents]
document_chunks_flat = [chunk for chunks in document_chunks for chunk in chunks]
print(f'{len(document_chunks_flat)} chunks extracted out of {len(documents)} pdf documents')

Define LLM and prompt

In [None]:
# Prompt prepended to every chunk. The model must answer with a JSON object holding
# 'question' and 'answer' keys, or the literal <skip> when no good pair can be made.
# The phrases listed in parentheses are *explicit* references to the source text, so the
# instruction forbids explicit (not implicit) references.
INSTRUCTION_PROMPT = """Given the paragraph after <INPUT_START> tag, create a very good geoscience-related question and answer pair. Your output should be in a .json format containing the following fields: ['question', 'answer']
Restrict the question to the context information provided. The questions should use information from passage, but should not refer to the originating text explicitly (you can not use 'according to', 'based on', and similar).
Respond only with .json output, add no other comments. If generating a good question and answer pair is not possible, output <skip> instead.
<INPUT_START>"""
# Number of chunks sent to the LLM; set a small int (e.g. 10) for a quick trial run.
CHUNKS_TO_PROCESS = None  # None slices to the full list, i.e. all chunks

llm = ChatNVIDIA(
    model="ai-llama3-70b",
    temperature=0.2,
    # NOTE(review): 256 tokens may truncate long answers into invalid JSON — consider raising.
    max_tokens=256
)

Submit batched requests to the LLM

**WARNING: generating the QA pairs takes about 30 minutes.**

In [None]:
qa_pairs = await llm.abatch(['\n'.join([INSTRUCTION_PROMPT, chunk]) for chunk in document_chunks_flat[:CHUNKS_TO_PROCESS]], 
                            config={"max_concurrency": 10})
qa_pairs = [qa_pair.content for qa_pair in qa_pairs if qa_pair.content != "<skip>"]

In [None]:
# Spot-check the first generated pair (guarded: every chunk may have been skipped).
pprint(qa_pairs[0] if qa_pairs else "No QA pairs were generated")

Parse the generated QA pairs into a valid JSONL document

In [None]:
# Output file for the SFT records. The directory is derived from this same path so the
# makedirs() call and the open() call can never point at different locations.
SFT_OUT_PATH = "/workspace/data/out/documents_sft.jsonl"
# The prompt asks for 'question'/'answer' keys; the SFT records use 'input'/'output'.
KEY_RENAMES = {"question": "input", "answer": "output"}

failed_count = 0
warning_count = 0

os.makedirs(os.path.dirname(SFT_OUT_PATH), exist_ok=True)

with open(SFT_OUT_PATH, "w", encoding="utf-8") as f:
    for qa_pair in qa_pairs:
        text = qa_pair.strip()
        if text.startswith("```"):
            # Tolerate a Markdown code fence (```json ... ```) around the JSON payload.
            text = text.strip("`").strip()
            if text.startswith("json"):
                text = text[len("json"):]
        # Checking if json is correct
        try:
            qa_obj = json.loads(text)
        except json.JSONDecodeError:
            print(f'Failed to read {qa_pair} as a valid JSON')
            failed_count += 1
            continue
        # Some responses wrap the object in a list: keep the first element and flag it.
        if isinstance(qa_obj, list):
            print(f'WARNING: {qa_pair}')
            warning_count += 1
            qa_obj = qa_obj[0] if qa_obj else None
        if not isinstance(qa_obj, dict):
            print(f'Failed to read {qa_pair} as a JSON object')
            failed_count += 1
            continue
        # Rename keys on the parsed object. A textual replace would also rewrite matching text
        # inside values and miss variants such as '"question" :'.
        record = {KEY_RENAMES.get(key, key): value for key, value in qa_obj.items()}
        jsonl_line = json.dumps(record, ensure_ascii=False)
        f.write(jsonl_line + "\n")

print('Done')
print(f'Failed\t{failed_count} / {len(qa_pairs)}')
print(f'Warnings\t{warning_count} / {len(qa_pairs)}')

In [None]:
# Show a sample record straight from the SFT file, instead of relying on a loop variable
# leaked from the previous cell (undefined when no pair was parsed).
with open("/workspace/data/out/documents_sft.jsonl", "r", encoding="utf-8") as sft_file:
    pprint(sft_file.readline().rstrip("\n"))

## Split dataset into train / val / test

In [None]:
import os
import json


def read_and_split(fname: str, out_dir: str, test_frac: float = 0.1, val_frac: float = 0.1,
                   seed=None) -> tuple:
    """Split a JSONL file into test / val / train partitions.

    The first ``int(n * test_frac)`` records go to ``data_test.jsonl``, the next
    ``int(n * val_frac)`` to ``data_val.jsonl`` and the rest to ``data_train.jsonl``,
    all inside ``out_dir``. Records keep their file order unless ``seed`` is given, in
    which case they are shuffled deterministically first. Shuffling is advisable when the
    input is written source-by-source, because an ordered split is then not random.

    Args:
        fname: Path of the input JSONL file. Blank lines are ignored.
        out_dir: Existing directory that receives the three output files.
        test_frac: Fraction of records for the test split (count truncated to an int).
        val_frac: Fraction of records for the validation split (count truncated to an int).
        seed: If not None, shuffle records with ``random.Random(seed)`` before splitting.

    Returns:
        ``(n_test, n_val, n_train)`` record counts.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON. No output is written then.
    """
    import random

    with open(fname, 'r', encoding='utf-8') as original_file:
        # Skip blank lines (json.loads rejects them) and make sure every record ends with a
        # newline, so a final unterminated line cannot merge with the next written record.
        lines = [line if line.endswith('\n') else line + '\n'
                 for line in original_file if line.strip()]

    # Validate every record before any output file is created.
    for line in lines:
        json.loads(line)

    if seed is not None:
        random.Random(seed).shuffle(lines)

    total_lines = len(lines)
    test_size = int(total_lines * test_frac)
    val_size = int(total_lines * val_frac)
    train_size = total_lines - test_size - val_size  # the rest goes to the train partition

    print(f'There are {total_lines}--> {test_size}, {val_size}, {train_size}')
    print(f'Iterate over {len(lines)} lines in {fname}')

    partitions = (
        ('data_test.jsonl', lines[:test_size]),
        ('data_val.jsonl', lines[test_size:test_size + val_size]),
        ('data_train.jsonl', lines[test_size + val_size:]),
    )
    for out_name, part in partitions:
        with open(os.path.join(out_dir, out_name), 'w', encoding='utf-8') as out_file:
            out_file.writelines(part)

    return test_size, val_size, train_size

In [None]:
# Split the SFT file written by the QA-parsing cell above (/workspace/data/out/documents_sft.jsonl).
read_and_split('/workspace/data/out/documents_sft.jsonl', '/workspace/data/')

In [None]:
!head /workspace/data/data_train.jsonl