
Notebook 2: Dataset Preparation & Safety Training

In this notebook, we will prepare the dataset for safety training and train the model to be more robust against harmful content attacks.


The training data consists of two parts: safety training data, which improves the safety of the model, and
general post-training data, which helps the model retain its accuracy.

The safety training dataset is a curated dataset of safe and unsafe prompts, collected from the following sources:

- Aegis v2
- TODO
- TODO
- TODO

We sample general training data from the [Llama-Nemotron-Post-Training-Dataset](https://huggingface.co/datasets/nvidia/Llama-Nemotron-Post-Training-Dataset).

# Preparation

You need to add `HF_TOKEN` and `WANDB_API_KEY` to the environment variables.



In [None]:
import os
import subprocess
import time
from pathlib import Path
from huggingface_hub import hf_hub_download
import shutil

# Base directory and configuration
BASE_DIR = "/lustre/fsw/portfolios/llmservice/users/ahazare"
LOG_DIR = f"{BASE_DIR}/gtc_paris/logs"
SAFETY_DATASET_NAME = "nvidia/Nemotron-Safety-Training-Dataset"
POST_TRAINING_DATASET_NAME = "nvidia/Llama-Nemotron-Post-Training-Dataset"
MODEL_NAME = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
SAFETY_MODEL_NAME = "/lustre/fsw/portfolios/llmservice/users/ahazare/cache/huggingface/llama-3.1-nemoguard-8b-content-safety"

# Credentials.
# Values already exported in the environment take precedence. The placeholders
# are only a fallback so the keys exist. Previously they unconditionally
# overwrote real tokens. Replace them here, or export the real values before
# starting the kernel.
_CREDENTIAL_PLACEHOLDERS = {
    "HF_TOKEN": "<Add your HF_TOKEN>",
    "WANDB_API_KEY": "<Add your WANDB_API_KEY>",
}
for _name, _placeholder in _CREDENTIAL_PLACEHOLDERS.items():
    os.environ.setdefault(_name, _placeholder)
    if os.environ[_name] == _placeholder:
        print(f"WARNING: {_name} is still the placeholder value; set it before running the steps below.")

In [None]:
# Point temp/cache locations (HF, uv, Triton, Ray, datasets, logs) at shared storage.
# Dict order matches the order in which the directories are created below.
_env_paths = {
    'TMPDIR': f"{BASE_DIR}/tmp",
    'XDG_CACHE_HOME': f"{BASE_DIR}/cache",
    'HF_HOME': f"{BASE_DIR}/cache/huggingface",
    'UV_CACHE_DIR': f"{BASE_DIR}/cache/uv",
    'TRITON_CACHE_DIR': f"{BASE_DIR}/cache/triton",
    'DATASET_CACHE_DIR': f"{BASE_DIR}/dataset_cache",
    'RAY_TMPDIR': "/tmp/ray_ahazare",
    'LOG_DIR': f"{LOG_DIR}",
}
os.environ.update(_env_paths)

# Make sure each configured directory exists before anything writes to it.
for _env_key in _env_paths:
    Path(os.environ[_env_key]).mkdir(parents=True, exist_ok=True)


# Step 1-a: Create Safety Training Data

We download the safety training dataset and the post-training dataset.

TODO: safety training dataset creation should be done in a separate script.

In [None]:
# 1. Download NV Safety Dataset
# The arguments are forwarded verbatim to download_safety_dataset.py (not shown
# here). `--total_samples` presumably caps how many examples are kept — confirm.
safety_download_cmd = ['python3', 'download_safety_dataset.py']
safety_download_cmd += ['--dataset_name', SAFETY_DATASET_NAME]
safety_download_cmd += ['--filename', 'nemotron-safety-sft-training-blend-v1.0.jsonl']
safety_download_cmd += ['--total_samples', '1000']
safety_download_cmd += ['--cache_dir', os.environ['DATASET_CACHE_DIR']]
result = subprocess.run(safety_download_cmd, check=True)


# Step 1-b: Download Post-Training Data

In [None]:
# 2. Download Llama Nemotron Post-training Dataset and keep a random sample of each split.

files = [
    "SFT/math/math_v1.1.jsonl",
    "SFT/code/code_v1.1.jsonl",
    "SFT/chat/chat.jsonl",
    "SFT/science/science.jsonl"
]
SAMPLES_PER_FILE = 1000  # lines (JSONL records) kept per split

LLAMA_NEMO_DIR = f"{os.environ['DATASET_CACHE_DIR']}/Llama-Nemotron-Post-Training-Dataset"
Path(LLAMA_NEMO_DIR).mkdir(parents=True, exist_ok=True)

for file in files:
    print(f"Downloading {file}...")
    downloaded_path = hf_hub_download(
        repo_id=POST_TRAINING_DATASET_NAME,
        filename=file,
        repo_type='dataset',
        cache_dir=os.environ['DATASET_CACHE_DIR']
    )

    filename = Path(file).name
    target_path = f"{LLAMA_NEMO_DIR}/{filename}"

    # Count newline-terminated lines in binary mode. No text decoding is needed,
    # and this matches how `shuf` splits lines.
    with open(downloaded_path, 'rb') as f:
        total_lines = sum(1 for _ in f)

    print(f"Total lines in file: {total_lines}")

    if total_lines > SAMPLES_PER_FILE:
        # `shuf -n` draws a random sample of lines without replacement. The `with`
        # closes our handle on the target file once shuf has finished writing.
        with open(target_path, 'w') as out:
            subprocess.run(['shuf', '-n', str(SAMPLES_PER_FILE), downloaded_path],
                           stdout=out, check=True)
        print(f"Extracted {SAMPLES_PER_FILE} random samples to {target_path}")
    else:
        shutil.copy2(downloaded_path, target_path)
        print(f"File has at most {SAMPLES_PER_FILE} lines, copied all {total_lines} lines")

In [None]:
# 3. Combine the sampled safety and post-training data into train/val SFT splits.
OUTPUT_DIR = f"{os.environ['DATASET_CACHE_DIR']}/sft_data"
# NOTE(review): nv_safety_sampled.jsonl is presumably the file written by
# download_safety_dataset.py in step 1-a — confirm the name matches.
combine_options = {
    '--safety_file': f"{os.environ['DATASET_CACHE_DIR']}/nv_safety_sampled.jsonl",
    '--llama_nemo_dir': LLAMA_NEMO_DIR,
    '--output_dir': OUTPUT_DIR,
    '--val_split': '0.03',
    '--max_tokens': '16384',
    '--max_samples': '5000',
}
subprocess.run(
    ['python3', 'combine_datasets.py',
     *(token for option in combine_options.items() for token in option)],
    check=True,
)


# Step 2: On-policy Data Generation

The key idea of the safety training data is to generate on-policy data: the responses to the prompts from the safety and post-training datasets are generated by the target model itself.
To generate the on-policy data, we first start vLLM servers for the target (policy) model and for the safety model.

In [None]:
# 4. Start vLLM servers
os.environ.update({
    'VLLM_ENGINE_ITERATION_TIMEOUT_S': '36000',
    'VLLM_ALLOW_LONG_MAX_MODEL_LEN': '1',
    'VLLM_HOST': '0.0.0.0',
    'VLLM_TENSOR_PARALLEL_SIZE': '1',
    'POLICY_MODEL_GPUS': '0,1,2,3',
    'SAFETY_MODEL_GPUS': '4,5'
})


def _start_vllm_server(model, port, served_model_name, gpus, log_path, extra_args=()):
    """Launch one vLLM OpenAI-compatible API server as a background process.

    Args:
        model: HF model id or local path passed to ``--model``.
        port: Port the server listens on.
        served_model_name: Name clients must put in the request's ``model`` field.
        gpus: Value of ``CUDA_VISIBLE_DEVICES`` for the server process.
        log_path: File receiving the server's combined stdout/stderr.
        extra_args: Extra CLI flags, inserted right after ``--served-model-name``.

    Returns:
        subprocess.Popen: Handle to the (still running) server process.
    """
    cmd = [
        'python3', '-m', 'vllm.entrypoints.openai.api_server',
        '--model', model,
        '--trust-remote-code',
        '--seed', '1',
        '--host', os.environ['VLLM_HOST'],
        '--port', str(port),
        '--served-model-name', served_model_name,
        *extra_args,
        '--tensor-parallel-size', os.environ['VLLM_TENSOR_PARALLEL_SIZE'],
        '--download-dir', os.environ['HF_HOME'],
    ]
    # Popen gives the child its own copy of the log descriptor. Closing our handle
    # here (instead of leaking it, as before) does not interrupt the server's logging.
    with open(log_path, 'w') as log:
        return subprocess.Popen(cmd,
                                env={**os.environ, 'CUDA_VISIBLE_DEVICES': gpus},
                                stdout=log,
                                stderr=subprocess.STDOUT)


# NOTE(review): with --tensor-parallel-size 1, each server presumably uses only one
# of the GPUs listed in *_MODEL_GPUS. Raise VLLM_TENSOR_PARALLEL_SIZE to use more.
# NOTE(review): newer vLLM releases deprecate --enable-reasoning (only
# --reasoning-parser is needed). Confirm against the installed vLLM version.
print("Starting policy model server...")
policy_server = _start_vllm_server(
    MODEL_NAME, 5000, 'test-model', os.environ['POLICY_MODEL_GPUS'],
    f"{LOG_DIR}/vllm-server-model.log",
    extra_args=('--enable-reasoning', '--reasoning-parser', 'deepseek_r1'),
)

print("Starting safety model server...")
safety_server = _start_vllm_server(
    SAFETY_MODEL_NAME, 6000, 'safety-model', os.environ['SAFETY_MODEL_GPUS'],
    f"{LOG_DIR}/vllm-server-safety.log",
)


To terminate the launched vLLM servers, you can run the following command:

```python
subprocess.run(['pkill', '-f', 'vllm.entrypoints.openai.api_server'])
```

In [None]:
# Cleanup vLLM servers
# subprocess.run(['pkill', '-f', 'vllm.entrypoints.openai.api_server'])

In [None]:
def generate_on_policy_data(input_dataset, output_file, log_file, model_name, safety_model, hf_token, 
                            vllm_host="0.0.0.0", vllm_model_port=5000, vllm_safety_port=6000,
                            batch_size=32, max_tokens=512, temperature=0.7, top_p=0.9,
                            concurrency=None):
    """
    Launch generate_on_policy_data.py in the background to produce on-policy data.

    The script's combined stdout/stderr is written to ``log_file``. The function
    returns immediately; call ``.wait()`` on the result to block until done.

    Args:
        input_dataset (str): Path to input dataset file
        output_file (str): Path to output file
        log_file (str): Path to log file (overwritten)
        model_name (str): Name of the model to use
        safety_model (str): Path to safety model
        hf_token (str): HuggingFace token
        vllm_host (str): vLLM host address
        vllm_model_port (int): Port for model server
        vllm_safety_port (int): Port for safety server
        batch_size (int): Batch size for generation
        max_tokens (int): Maximum tokens to generate
        temperature (float): Temperature for generation
        top_p (float): Top-p sampling parameter
        concurrency (int | None): Forwarded as ``--concurrency`` when given. When
            None the flag is omitted and the script's own default applies.

    Returns:
        subprocess.Popen: Process object for the generation
    """
    import subprocess

    print("Generating responses and safety predictions...")
    print(f"Input dataset: {input_dataset}")
    print(f"Output file: {output_file}")

    # NOTE: the token is passed on the command line, so it is visible to `ps`.
    cmd = [
        'python3', 'generate_on_policy_data.py',
        '--model_name', model_name,
        '--safety_model', safety_model,
        '--huggingface_token', hf_token,
        '--vllm_host', vllm_host,
        '--vllm_model_port', str(vllm_model_port),
        '--vllm_safety_port', str(vllm_safety_port),
        '--input_dataset', input_dataset,
        '--output', output_file,
        '--batch_size', str(batch_size),
        '--max_tokens', str(max_tokens),
        '--temperature', str(temperature),
        '--top_p', str(top_p),
    ]
    if concurrency is not None:
        cmd += ['--concurrency', str(concurrency)]

    # The child inherits its own copy of the log descriptor. Closing ours when the
    # `with` exits does not stop the still-running child from writing to the log.
    with open(log_file, 'w') as log:
        process = subprocess.Popen(cmd, stdout=log, stderr=subprocess.STDOUT)

    return process

# Example usage (credentials live in os.environ; there is no HF_TOKEN variable):
#
# # For training set
# train_process = generate_on_policy_data(
#     input_dataset=f"{OUTPUT_DIR}/train.jsonl",
#     output_file=f"{OUTPUT_DIR}/train_on_policy_data.jsonl",
#     log_file=f"{LOG_DIR}/generation_train.log",
#     model_name=MODEL_NAME,
#     safety_model=SAFETY_MODEL_NAME,
#     hf_token=os.environ['HF_TOKEN']
# )
# train_process.wait()
#
# # For validation set
# val_process = generate_on_policy_data(
#     input_dataset=f"{OUTPUT_DIR}/val.jsonl",
#     output_file=f"{OUTPUT_DIR}/val_on_policy_data.jsonl",
#     log_file=f"{LOG_DIR}/generation_val.log",
#     model_name=MODEL_NAME,
#     safety_model=SAFETY_MODEL_NAME,
#     hf_token=os.environ['HF_TOKEN']
# )
# val_process.wait()

In [None]:
# Stop background on-policy generation jobs. `pkill -f` matches any of your
# processes whose full command line contains the pattern. pkill exits non-zero
# when nothing matches, so the return code is intentionally not checked.
_generation_kill_cmd = ['pkill', '-f', 'generate_on_policy_data.py']
subprocess.run(_generation_kill_cmd)

In [None]:
# Display the SFT data directory (train/val splits and on-policy outputs are written here).
f"{OUTPUT_DIR}"

In [None]:
# 5. Generate on-policy data
# The vLLM servers started in step 4 must have finished loading first
# (check the vllm-server-*.log files in LOG_DIR).

# On-Policy Data Generation parameters
SAFETY_THRESHOLD = 0.8  # NOTE(review): not forwarded to generate_on_policy_data.py below — confirm intent
CONCURRENCY = 16
MAX_ATTEMPTS = 3  # NOTE(review): not forwarded to generate_on_policy_data.py below — confirm intent
BATCH_SIZE = 64
MAX_TOKENS = 512
TEMPERATURE = 0.7
TOP_P = 0.9

print("Generating on-policy data...")
for dataset_type in ['train', 'val']:
    input_dataset = f"{OUTPUT_DIR}/{dataset_type}.jsonl"
    output_file = f"{OUTPUT_DIR}/{dataset_type}_on_policy_data.jsonl"
    log_path = f"{LOG_DIR}/{dataset_type}_on-policy.log"
    # `with` closes the log handle after each run (it was leaked before).
    # check=True surfaces a failed split instead of silently moving on.
    with open(log_path, 'w') as log:
        subprocess.run([
            'python3', 'generate_on_policy_data.py',
            '--model_name', MODEL_NAME,
            '--safety_model', SAFETY_MODEL_NAME,
            '--huggingface_token', os.environ['HF_TOKEN'],
            '--vllm_host', os.environ['VLLM_HOST'],
            '--vllm_model_port', '5000',
            '--vllm_safety_port', '6000',
            '--concurrency', str(CONCURRENCY),
            '--input_dataset', input_dataset,
            '--output', output_file,
            '--batch_size', str(BATCH_SIZE),
            '--max_tokens', str(MAX_TOKENS),
            '--temperature', str(TEMPERATURE),
            '--top_p', str(TOP_P)
        ], stdout=log, stderr=subprocess.STDOUT, check=True)

# Cleanup vLLM servers
# subprocess.run(['pkill', '-f', 'vllm.entrypoints.openai.api_server'])

In [None]:
# List processes whose command line mentions "python" (e.g. the vLLM servers and
# generation jobs started above). NOTE: ps prints full command lines, including
# the --huggingface_token value passed to generate_on_policy_data.py.
!ps -aux | grep python

In [None]:
import asyncio
import aiohttp
import json

async def single_request(session, payload, base_url="http://0.0.0.0:5000/v1/chat/completions"):
    """POST one chat-completion ``payload`` to ``base_url`` and return the decoded JSON body."""
    request_headers = {"Content-Type": "application/json"}
    async with session.post(base_url, json=payload, headers=request_headers) as resp:
        body = await resp.json()
    return body

async def query_server():
    concurrency = 4
    parallel_requests = []

    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600)) as session:
        for i in range(8):
            payload = {
                "model": "test-model",
                "messages": [{"role": "user", "content": f"What is {i} + 1?"}],
                "temperature": 0.6,
                "max_tokens": 32768,
                "top_p": 0.95,
                "n": 1,
            }
            parallel_requests.append(asyncio.create_task(single_request(session, payload)))

            if len(parallel_requests) >= concurrency:
                responses = await asyncio.gather(*parallel_requests)
                parallel_requests = []
                print(len(responses))
                print(responses)

        # handle any remaining tasks
        if parallel_requests:
            responses = await asyncio.gather(*parallel_requests)
            print(len(responses))
            print(responses)

await query_server()

In [None]:
# Run the clients.py helper script. Its contents are not shown in this notebook;
# presumably it is a client-side check against the running vLLM servers — confirm.
!python3 clients.py

In [None]:
import pandas as pd

In [None]:
# Load the on-policy training data from OUTPUT_DIR instead of a cwd-relative path.
# The SFT cell below calls os.chdir, which would break a relative path on re-runs.
df = pd.read_json(f"{OUTPUT_DIR}/train_on_policy_data.jsonl", lines=True)

In [None]:
# Load the combined training split (input to on-policy generation) from OUTPUT_DIR
# rather than a cwd-relative path, which breaks after the os.chdir in the SFT cell.
df_train = pd.read_json(f"{OUTPUT_DIR}/train.jsonl", lines=True)

In [None]:
# (rows, columns) of the combined training split used as generation input.
df_train.shape

In [None]:
# (rows, columns) of the on-policy training data. Compare with df_train.shape to
# see how many input prompts produced an output record.
df.shape

In [None]:
# Inspect the `safety_metrics` field of the first on-policy record (positional access).
df['safety_metrics'].iat[0]

In [None]:
# Display the on-policy training DataFrame.
df

In [None]:
# Re-run the clients.py helper script. Its contents are not shown in this notebook;
# presumably it is a client-side check against the running vLLM servers — confirm.
!python3 clients.py

In [None]:
# 6. Run SFT
print("Running SFT...")
os.chdir(f"{BASE_DIR}/NeMo-RL")
# Results directory for this SFT run; the training log is written here as sft.log.
# NOTE(review): this should match the results/checkpoint dir configured in
# deepseek_sft.yaml — confirm.
MODEL_DIR = f"{BASE_DIR}/NeMo-RL/results/sft_deepseek_8b_trial_step_300"
# open() below would fail with FileNotFoundError on a fresh run if the
# results directory does not exist yet.
Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)

# stderr goes to the same log (as in the vLLM/generation cells), so a failed run
# leaves its traceback in sft.log. `with` closes the log handle afterwards.
with open(f"{MODEL_DIR}/sft.log", 'w') as sft_log:
    subprocess.run(['uv', 'run', 'python', 'examples/run_sft.py',
                    '--config', f"{BASE_DIR}/gtc_paris/deepseek_sft.yaml"
                   ],
                   env={**os.environ, 'TMPDIR': os.environ['RAY_TMPDIR']},
                   stdout=sft_log,
                   stderr=subprocess.STDOUT,
                   check=True)

In [None]:
# 7. Convert checkpoint (DCP -> HuggingFace format)
# NOTE(review): step_200 is hard-coded — point this at the checkpoint step to export.
MODEL_DIR = f"{BASE_DIR}/NeMo-RL/results/sft_deepseek_8b_trial_step_300/step_200"
DCP_CKPT_PATH = f"{MODEL_DIR}/policy/weights/"
CONFIG_PATH = f"{MODEL_DIR}/config.yaml"
HF_CKPT_PATH = f"{MODEL_DIR}/hf_ckpt"

print("Converting checkpoint...")
os.chdir(f"{BASE_DIR}/NeMo-RL")
subprocess.run([
    'uv', 'run', 'examples/convert_dcp_to_hf.py',
    '--config', CONFIG_PATH,
    '--dcp-ckpt-path', DCP_CKPT_PATH,
    '--hf-ckpt-path', HF_CKPT_PATH
], check=True)

# Verify conversion: expect config.json plus at least one weights file. Accept
# sharded and safetensors layouts as well as a single pytorch_model.bin.
hf_ckpt_dir = Path(HF_CKPT_PATH)
weight_files = [*hf_ckpt_dir.glob("pytorch_model*.bin"), *hf_ckpt_dir.glob("*.safetensors")]
if weight_files and (hf_ckpt_dir / "config.json").exists():
    print("Conversion successful!")
    print(f"The HuggingFace model is now available at: {HF_CKPT_PATH}")
else:
    print("Conversion may have failed. Please check the output.")