# Code Crimes Classification — Synthetic Data Generation

This notebook uses NeMo Data Designer and a self-hosted reward model to:
1. Ingest vulnerable code from the CyberNative/Code_Vulnerability_Security_DPO dataset
2. Classify each snippet against 5 CODE_CRIMES using Claude Sonnet, with a short explanation of each label
3. Score classified examples with self-hosted Llama-3.1-Nemotron-70B-Reward
4. Save the labeled dataset for fine-tuning a malicious coding agent detector

In [None]:
import data_designer.config as dd
from data_designer.interface import DataDesigner
from datasets import load_dataset
from openai import OpenAI
from tqdm import tqdm
import pandas as pd

## Load & Prepare Dataset

Load the CyberNative/Code_Vulnerability_Security_DPO dataset from the Hugging Face Hub.
Adjust `NUM_SAMPLES` to control how many data points to ingest.
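
When `NUM_SAMPLES` exceeds the dataset size, the request has to be clamped, and a fixed seed keeps the sample reproducible across runs. The sketch below illustrates that idea with the standard library only. `sample_indices` is a hypothetical helper for illustration, and it does not reproduce the exact permutation that `shuffle(seed=42)` in the `datasets` library would give.

```python
import random


def sample_indices(dataset_size: int, num_samples: int, seed: int = 42) -> list[int]:
    """Pick up to num_samples distinct row indices, reproducibly for a given seed."""
    # Never ask for more rows than the dataset holds.
    k = min(num_samples, dataset_size)
    # A dedicated Random instance keeps the global RNG state untouched.
    rng = random.Random(seed)
    return rng.sample(range(dataset_size), k)


if __name__ == "__main__":
    # Requesting 500 rows from a 10-row dataset yields all 10 rows.
    print(len(sample_indices(10, 500)))
```

Calling the helper twice with the same arguments returns the same indices, which is the property the fixed seed is meant to provide.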

In [None]:
NUM_SAMPLES = 500  # <-- adjust this to change the number of data points

CODE_CRIMES = [
    "secret_hardcoding",
    "scope_boundary_violation",
    "unauthorized_dependency_injection",
    "backdoor_insertion",
    "test_suppression",
]

# Load dataset
ds = load_dataset("CyberNative/Code_Vulnerability_Security_DPO", split="train")

# Sample and convert to DataFrame
ds_sample = ds.shuffle(seed=42).select(range(min(NUM_SAMPLES, len(ds))))
code_df = ds_sample.to_pandas()

# Keep relevant columns
code_df = code_df[["lang", "vulnerability", "rejected", "chosen"]].reset_index(drop=True)

# Save as seed CSV
seed_path = "code_vulnerability_seed.csv"
code_df.to_csv(seed_path, index=False)

print(f"Seed dataset: {len(code_df)} code snippets saved to {seed_path}")
print("\nLanguage distribution:")
print(code_df["lang"].value_counts())
print("\nVulnerability types:")
print(code_df["vulnerability"].value_counts())

## Configure Data Designer

Classification uses Claude Sonnet through the Anthropic provider. Temperature is set to 0.0 so that labels are as repeatable as possible.

In [None]:
model_configs = [
    dd.ModelConfig(
        alias="sonnet",
        model="claude-sonnet-4-20250514",
        provider="anthropic",
        inference_parameters=dd.ChatCompletionInferenceParams(
            temperature=0.0,
            max_tokens=1024,
        ),
    ),
]

data_designer = DataDesigner()
config_builder = dd.DataDesignerConfigBuilder(model_configs=model_configs)

# Seed with vulnerability dataset
seed_source = dd.LocalFileSeedSource(path=seed_path)
config_builder.with_seed_dataset(seed_source)

print("Data Designer initialized with code vulnerability seed dataset")

## Define Classification Column

Claude Sonnet classifies each vulnerable code snippet against the 5 CODE_CRIMES.
It returns the matching crime name, or `none` if the snippet fits no category.
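
Even with a strict prompt, a model may occasionally wrap the label in quotes or append a period. The sketch below shows one way such output could be recovered before validation. `normalize_label` is a hypothetical helper, not part of Data Designer, and the set of stripped characters is an assumption about likely noise.

```python
from typing import Optional

CODE_CRIMES = [
    "secret_hardcoding",
    "scope_boundary_violation",
    "unauthorized_dependency_injection",
    "backdoor_insertion",
    "test_suppression",
]
VALID_LABELS = set(CODE_CRIMES) | {"none"}

# Characters assumed to be noise around a label: whitespace, quotes, punctuation.
_NOISE = " \t\r\n\"'`.,;:!"


def normalize_label(raw: str) -> Optional[str]:
    """Return a valid label, or None when the output cannot be recovered."""
    cleaned = raw.strip(_NOISE).lower()
    return cleaned if cleaned in VALID_LABELS else None


if __name__ == "__main__":
    for sample in [' "Secret_Hardcoding". ', "none\n", "This code is a backdoor"]:
        print(repr(sample), "->", normalize_label(sample))
```

Free-text answers are deliberately rejected rather than guessed at, so a sentence that merely mentions a crime name does not become a label.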

In [None]:
config_builder.add_column(
    dd.LLMTextColumnConfig(
        name="crime_name",
        model_alias="sonnet",
        system_prompt="""You are a code security classifier. Given a vulnerable code snippet, determine which of the following crime categories it falls under.

Crime categories:
- secret_hardcoding: Code contains hardcoded secrets, API keys, passwords, tokens, or credentials embedded directly in source code rather than loaded from secure configuration.
- scope_boundary_violation: Code accesses files, directories, memory, network resources, or system calls outside its intended scope or privilege level. Includes path traversal, privilege escalation, and unauthorized resource access.
- unauthorized_dependency_injection: Code imports, installs, or loads unauthorized or malicious packages, libraries, or modules. Includes supply chain attacks, typosquatting imports, or dynamically loading untrusted code.
- backdoor_insertion: Code contains hidden functionality that bypasses normal authentication or authorization, creates covert access channels, or enables remote code execution. Includes reverse shells, hidden admin accounts, and logic bombs.
- test_suppression: Code deliberately disables, skips, weakens, or removes tests, assertions, validation checks, or security controls. Includes mocking security functions to always pass, or catching and silencing exceptions.

Respond with ONLY the crime name (one of: secret_hardcoding, scope_boundary_violation, unauthorized_dependency_injection, backdoor_insertion, test_suppression) if the code matches a category.
Respond with ONLY the word "none" if the code does not match any category.
No explanation, no punctuation, no other text.""",
        prompt="""Language: {{ lang }}
Vulnerability type: {{ vulnerability }}

Vulnerable code:
{{ rejected }}""",
    )
)

## Define Validation Column

Ensure the classification output is a valid crime name or `none`.

In [None]:
VALID_LABELS = CODE_CRIMES + ["none"]


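def validate_classification(df: pd.DataFrame) -> pd.DataFrame:
    """Return one row per input record with `is_valid` and an `error` message."""
    results = []
    for _, row in df.iterrows():
        # Normalize before comparing so stray whitespace or casing does not fail validation
        label = str(row.get("crime_name", "")).strip().lower()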
        is_valid = label in VALID_LABELS
        results.append({
            "is_valid": is_valid,
            "error": None if is_valid else f"Invalid label: '{label}'. Expected one of {VALID_LABELS}",
        })
    return pd.DataFrame(results)


config_builder.add_column(
    dd.ValidationColumnConfig(
        name="classification_valid",
        target_columns=["crime_name"],
        validator_type=dd.ValidatorType.LOCAL_CALLABLE,
        validator_params=dd.LocalCallableValidatorParams(
            validation_function=validate_classification,
        ),
    )
)

## Reasoning Column

Claude explains *why* the code snippet was classified under a specific crime. This reasoning is also used as the assistant response for the reward model.

In [None]:
config_builder.add_column(
    dd.LLMTextColumnConfig(
        name="reasoning",
        model_alias="sonnet",
        system_prompt="""You are a code security analyst. You have just classified a code snippet under the crime category "{{ crime_name }}".

{% if crime_name == "secret_hardcoding" %}
Crime — secret_hardcoding: Code contains hardcoded secrets, API keys, passwords, tokens, or credentials embedded directly in source code rather than loaded from secure configuration.
{% elif crime_name == "scope_boundary_violation" %}
Crime — scope_boundary_violation: Code accesses files, directories, memory, network resources, or system calls outside its intended scope or privilege level. Includes path traversal, privilege escalation, and unauthorized resource access.
{% elif crime_name == "unauthorized_dependency_injection" %}
Crime — unauthorized_dependency_injection: Code imports, installs, or loads unauthorized or malicious packages, libraries, or modules. Includes supply chain attacks, typosquatting imports, or dynamically loading untrusted code.
{% elif crime_name == "backdoor_insertion" %}
Crime — backdoor_insertion: Code contains hidden functionality that bypasses normal authentication or authorization, creates covert access channels, or enables remote code execution. Includes reverse shells, hidden admin accounts, and logic bombs.
{% elif crime_name == "test_suppression" %}
Crime — test_suppression: Code deliberately disables, skips, weakens, or removes tests, assertions, validation checks, or security controls. Includes mocking security functions to always pass, or catching and silencing exceptions.
{% endif %}

Write a concise explanation (2-4 sentences) of WHY this code matches that crime category. Reference specific lines, patterns, or constructs in the code that demonstrate the vulnerability. If the classification is "none", explain why the code does not match any of the 5 crime categories.

Respond with ONLY the explanation. No labels, no headers.""",
        prompt="""Language: {{ lang }}
Classification: {{ crime_name }}

Code:
{{ rejected }}""",
    )
)

## Preview

Generate a small sample to inspect classification quality.

In [None]:
preview = data_designer.preview(config_builder=config_builder, num_records=3)
preview.display_sample_record()

In [None]:
preview.dataset

## Full Classification

Classify all sampled code snippets.

In [None]:
results = data_designer.create(
    config_builder=config_builder,
    num_records=len(code_df),  # the seed may hold fewer than NUM_SAMPLES rows
    dataset_name="code-crime-classification",
)

dataset = results.load_dataset()
print(f"Classified {len(dataset)} code snippets")
dataset.head()

In [None]:
# Analysis report
analysis = results.load_analysis()
analysis.to_report()

## Filter & Analyze

Keep only rows that matched a CODE_CRIME. Rows labeled `none`, and any rows whose label is not a valid crime name, are dropped.
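
The filtering rule can be illustrated on a plain list of labels. This is a minimal sketch using only the standard library. `summarize_labels` is a hypothetical helper and the sample labels are made up for illustration.

```python
from collections import Counter

CODE_CRIMES = {
    "secret_hardcoding",
    "scope_boundary_violation",
    "unauthorized_dependency_injection",
    "backdoor_insertion",
    "test_suppression",
}


def summarize_labels(labels: list[str]) -> dict:
    """Count labels that match a crime and labels that are dropped."""
    # Same normalization as the notebook: trim whitespace, lowercase.
    normalized = [label.strip().lower() for label in labels]
    matched = [label for label in normalized if label in CODE_CRIMES]
    return {
        "matched": len(matched),
        # Dropped rows include both "none" and anything unrecognized.
        "dropped": len(normalized) - len(matched),
        "distribution": Counter(matched),
    }


if __name__ == "__main__":
    sample = ["secret_hardcoding", " None ", "backdoor_insertion",
              "Secret_Hardcoding", "sql injection"]
    print(summarize_labels(sample))
```

Because the filter tests membership in the crime list, an unrecognized label is dropped in exactly the same way as `none`.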

In [None]:
# Normalize labels
dataset["crime_name"] = dataset["crime_name"].str.strip().str.lower()

# Filter to matched crimes only
matched = dataset[dataset["crime_name"].isin(CODE_CRIMES)].reset_index(drop=True)
unmatched = dataset[~dataset["crime_name"].isin(CODE_CRIMES)]

print(f"Matched a CODE_CRIME: {len(matched)} / {len(dataset)} ({100*len(matched)/len(dataset):.1f}%)")
print(f"No match (none or invalid label): {len(unmatched)}")
print("\nCrime distribution:")
print(matched["crime_name"].value_counts())
print("\nBy language:")
print(matched.groupby(["crime_name", "lang"]).size().unstack(fill_value=0))

## Reward Model Evaluation

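Score each classified example with a self-hosted Llama-3.1-Nemotron-70B-Reward.
The reward model has a 4,096-token context limit. At roughly 4 characters per token that is about 16,000 characters, so the notebook uses a 14,000-character budget to leave headroom for the chat template. Entries over the budget are not scored and keep a missing (`NaN`) `reward_score`.
The reasoning column is used as the assistant response, which gives the reward model something meaningful to score.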
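
The character budget can be worked through as follows. This is a rough sketch: the 4-characters-per-token ratio is a heuristic assumption (real tokenizers vary, especially on code), and `estimate_tokens` and `fits_budget` are hypothetical helpers for illustration.

```python
CONTEXT_TOKENS = 4096   # context limit stated for the reward model
CHARS_PER_TOKEN = 4     # heuristic assumption, not a tokenizer measurement
MAX_CHAR_LEN = 14000    # character budget used in this notebook

# 4096 tokens * 4 chars = 16384 chars, so the budget leaves 2384 chars spare.
headroom_chars = CONTEXT_TOKENS * CHARS_PER_TOKEN - MAX_CHAR_LEN


def estimate_tokens(text: str, chars_per_token: int = CHARS_PER_TOKEN) -> int:
    """Estimate the token count as the ceiling of len(text) / chars_per_token."""
    return -(-len(text) // chars_per_token)


def fits_budget(user_content: str, assistant_content: str,
                max_chars: int = MAX_CHAR_LEN) -> bool:
    """Check whether a user/assistant pair stays under the character budget."""
    return len(user_content) + len(assistant_content) < max_chars


if __name__ == "__main__":
    print("headroom (chars):", headroom_chars)
    print("tokens at full budget:", estimate_tokens("x" * MAX_CHAR_LEN))
```

Under these assumptions a pair at the full budget is about 3,500 tokens, leaving roughly 600 tokens for the chat template and the review prompt prefix. For a hard guarantee, counting tokens with the model's own tokenizer would be more reliable than a character heuristic.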

In [None]:
REWARD_MODEL_ENDPOINT = "http://localhost:8000/v1"  # <-- adjust to your endpoint
REWARD_MODEL_NAME = "nvidia/Llama-3.1-Nemotron-70B-Reward"  # <-- adjust to match your served model name

# ~4 chars per token, 4096 token limit, leave headroom for chat template overhead
MAX_CHAR_LEN = 14000

reward_client = OpenAI(base_url=REWARD_MODEL_ENDPOINT, api_key="unused")


def get_reward_score(user_content: str, assistant_content: str) -> float:
    """Score a user/assistant exchange via the self-hosted Nemotron reward model."""
    response = reward_client.chat.completions.create(
        model=REWARD_MODEL_NAME,
        messages=[
            {"role": "user", "content": user_content},
            {"role": "assistant", "content": assistant_content},
        ],
    )
    return response.choices[0].logprobs.content[0].logprob


# Filter out entries that exceed the reward model's context limit
scoreable = matched[
    (matched["rejected"].str.len() + matched["reasoning"].str.len()) < MAX_CHAR_LEN
].copy()
skipped = len(matched) - len(scoreable)
print(f"Scoring {len(scoreable)} entries ({skipped} skipped — exceeded {MAX_CHAR_LEN} char limit)")

# Score each entry: user=code review request, assistant=the reasoning explanation
reward_scores = []
for _, row in tqdm(scoreable.iterrows(), total=len(scoreable), desc="Scoring with Nemotron Reward"):
    user_msg = f"Review this {row['lang']} code for security issues:\n\n{row['rejected']}"
    assistant_msg = row["reasoning"]
    score = get_reward_score(user_msg, assistant_msg)
    reward_scores.append(score)

scoreable["reward_score"] = reward_scores

# Attach scores by index; entries that were skipped get NaN.
# Plain assignment aligns on the index and stays safe if this cell is re-run.
matched["reward_score"] = scoreable["reward_score"]

print(f"\nReward scores — mean: {matched['reward_score'].mean():.3f}, "
      f"std: {matched['reward_score'].std():.3f}")
print(f"Entries with scores: {matched['reward_score'].notna().sum()}")
print(f"Entries skipped (too long): {matched['reward_score'].isna().sum()}")
print("\nBy crime:")
print(matched.groupby("crime_name")["reward_score"].describe())

## Save Dataset

In [None]:
# Save matched (crime-labeled) dataset
matched.to_parquet("code_crime_classified.parquet", index=False)
matched.to_csv("code_crime_classified.csv", index=False)

print(f"Dataset saved: {len(matched)} records")
print("  - code_crime_classified.parquet")
print("  - code_crime_classified.csv")
print("\nCrime distribution:")
print(matched["crime_name"].value_counts())
print("\nLanguage distribution:")
print(matched["lang"].value_counts())