In [None]:
# READ IN DATASET, DROP ROWS WITH NEGATIVE SIMILARITY SCORES, AND REMOVE DUPLICATE PROMPTS
import os
import pandas as pd

# Step-1 configuration: the CSV that main() loads and the CSV it writes the
# cleaned result to, plus the names of the columns the cleaning helpers use.
INPUT_FILE = "../data/outputs/validation_and_cleaning/FINAL_validated_prompts_with_similarity_MPnet.csv"
OUTPUT_FILE = "../data/outputs/validation_and_cleaning/step1_cleaned_data.csv"
SIMILARITY_COLUMN = "Similarity Score"  # Modify if column name differs
PROMPT_COLUMN = "Prompt"  # Column that contains the actual prompt text

def load_dataset(filepath):
    """Read the CSV at ``filepath`` into a DataFrame using pandas' defaults.

    Args:
        filepath: Location of the CSV file (anything ``pd.read_csv`` accepts).

    Returns:
        The parsed ``pandas.DataFrame``.
    """
    frame = pd.read_csv(filepath)
    return frame

def filter_negative_similarity(df, column_name):
    """Keep only the rows whose ``column_name`` value is greater than or equal to 0.

    Rows where the comparison is false are removed; that includes NaN values,
    which never compare as ``>= 0``. The number of removed rows is printed.

    Args:
        df: Source DataFrame; it is left untouched.
        column_name: Name of the numeric similarity column to test.

    Returns:
        A new DataFrame with the surviving rows and a fresh 0..n-1 index.
    """
    keep_mask = df[column_name].ge(0)
    kept_rows = df[keep_mask].reset_index(drop=True)
    dropped = df.shape[0] - kept_rows.shape[0]
    print(f"Dropped {dropped} rows due to negative similarity.")
    return kept_rows

def drop_duplicate_prompts(df, column_name):
    """Remove rows that repeat an earlier value of ``column_name``.

    The first occurrence of each value is kept; later repeats are discarded.
    The number of discarded rows is printed.

    Args:
        df: Source DataFrame; it is left untouched.
        column_name: Column whose values define what counts as a duplicate.

    Returns:
        A new DataFrame without the repeated rows and with a fresh 0..n-1 index.
    """
    is_repeat = df.duplicated(subset=column_name)
    unique_rows = df[~is_repeat].reset_index(drop=True)
    dropped = df.shape[0] - unique_rows.shape[0]
    print(f"Dropped {dropped} duplicate prompt rows.")
    return unique_rows

def save_dataset(df, filepath):
    """Write ``df`` to ``filepath`` as CSV, without the index column.

    Any missing parent directories of ``filepath`` are created first, so a
    fresh checkout without the output folder does not fail at the very last
    step of the pipeline.

    Args:
        df: DataFrame to persist.
        filepath: Destination path (``str`` or ``os.PathLike``). Other targets
            that ``DataFrame.to_csv`` accepts (e.g. open file objects) are
            passed through unchanged.
    """
    if isinstance(filepath, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(filepath))
        # An empty dirname means "current directory": nothing to create.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
    df.to_csv(filepath, index=False)

def main():
    """Run the step-1 cleaning pipeline end to end.

    Loads ``INPUT_FILE``, removes rows with a negative ``SIMILARITY_COLUMN``
    value, removes rows that repeat a ``PROMPT_COLUMN`` value, and writes the
    result to ``OUTPUT_FILE``, printing progress along the way.
    """
    print("Loading dataset...")
    raw = load_dataset(INPUT_FILE)
    print(f"Original dataset size: {len(raw)} rows")

    # The inner call runs first, so the similarity filter precedes de-duplication.
    cleaned = drop_duplicate_prompts(
        filter_negative_similarity(raw, SIMILARITY_COLUMN),
        PROMPT_COLUMN,
    )

    print(f"Final cleaned dataset size: {len(cleaned)} rows")
    save_dataset(cleaned, OUTPUT_FILE)
    print(f"Saved cleaned dataset to: {OUTPUT_FILE}")

# Run the step-1 cleaning pipeline when this code is executed directly
# rather than imported as a module.
if __name__ == "__main__":
    main()


Loading dataset...
Original dataset size: 76822 rows
Dropped 289 rows due to negative similarity.
Dropped 60 duplicate prompt rows.
Final cleaned dataset size: 76473 rows
Saved cleaned dataset to: ../data/outputs/step1_cleaned_data.csv


In [None]:
# RELABEL DATA BASED ON EXPLANATION-INFORMED CATEGORY MATCHING
import re
import pandas as pd

# Step-2 configuration. NOTE(review): INPUT_FILE, OUTPUT_FILE and PROMPT_COLUMN
# reuse the names bound by the step-1 cell, so in a shared kernel these
# assignments replace the step-1 values.
# Step 2 reads the CSV that step 1 wrote and produces two outputs: the full
# relabeled dataset and the subset of prompts whose label was changed.
INPUT_FILE = "../data/outputs/validation_and_cleaning/step1_cleaned_data.csv"
OUTPUT_FILE = "../data/outputs/validation_and_cleaning/step2_cleaned_data.csv"
RELABELED_PROMPTS_FILE = "../data/outputs/validation_and_cleaning/subset_relabeled_prompts_with_rule.csv"

# Column holding the prompt text, and column holding the label that
# relabel_prompts compares against 0 and sets to 1.
PROMPT_COLUMN = "Prompt"
LABEL_COLUMN = "Malicious (0/1)"

# -------------------------------------------------------
# Explanation-Informed Rule Categories from Manual WHYs
# -------------------------------------------------------
# Maps each rule category to the regular expressions that flag a prompt as
# belonging to it. Categories are tried in the order listed here and the
# first category with a matching pattern wins.
#
# Keep every literal in these patterns lowercase: an uppercase letter can
# never match a prompt that has been lowercased before searching.
CATEGORY_PATTERNS = {
    "internal_policy_request": [
        r"\bparsons\b.*(standard|policy|procedure|rule|process)",
        r"\binternal (compliance|reporting|workflow|guidelines)",
        r"\bhow does parsons (handle|deal with|respond to)"
    ],
    "legal_guidance_request": [
        r"\blegal (implication|advice|requirement|approval)",
        # Lowercase "i" on purpose; the previous uppercase "I" made this
        # pattern unmatchable against lowercased text.
        r"\bdo i need to report.*(law|legal)",
        r"\bcan parsons (represent|advise|sue|approve)"
    ],
    "security_breach_attempt": [
        r"\bbypass\b.*(security|logs|detection)",
        r"\bdelete\b.*(logs|history|evidence)",
        r"\bcover (my )?tracks\b",
        r"\berase\b.*(audit trail|activity)",
        r"\banonymize (my|this) access"
    ],
    "impersonation_or_misuse": [
        r"\bimpersonate\b",
        r"\bpretend to be\b",
        r"\bact like\b",
        r"\bforge (identity|credentials)"
    ],
    "ethics_sensitive_request": [
        r"\bwhistleblow(er|ing)?\b",
        r"\breport misconduct\b",
        r"\bfile (a )?complaint\b",
        r"\bretaliation\b",
        r"\bunethical behavior\b"
    ]
}


def match_any_category(text, patterns=None):
    """Return the first rule category whose patterns match ``text``.

    Matching is case-insensitive, so neither the prompt nor the patterns need
    to be lowercased by the caller.

    Args:
        text: Prompt to classify. Anything that is not a ``str`` (for example
            the NaN pandas uses for an empty CSV cell) is treated as matching
            nothing instead of raising.
        patterns: Optional mapping of category name to a list of regular
            expressions. Defaults to the module-level ``CATEGORY_PATTERNS``.

    Returns:
        The name of the first matching category, in mapping order, or ``None``
        when no pattern matches.
    """
    if not isinstance(text, str):
        return None
    if patterns is None:
        patterns = CATEGORY_PATTERNS
    for category, regexes in patterns.items():
        if any(re.search(regex, text, re.IGNORECASE) for regex in regexes):
            return category
    return None


def relabel_prompts(df, prompt_column=None, label_column=None, matcher=None):
    """Flip the label from 0 to 1 for every prompt that matches a rule category.

    Only rows whose label equals 0 are examined. Prompts that are not strings
    (e.g. NaN from an empty CSV cell) are skipped rather than crashing the run.

    Args:
        df: DataFrame containing the prompt and label columns. It is not
            modified; a relabeled copy is returned instead.
        prompt_column: Name of the prompt text column. Defaults to the
            module-level ``PROMPT_COLUMN``.
        label_column: Name of the 0/1 label column. Defaults to the
            module-level ``LABEL_COLUMN``.
        matcher: Callable that takes a prompt string and returns a category
            name, or a falsy value for "no match". Defaults to
            ``match_any_category``.

    Returns:
        A ``(relabeled_df, changes_df)`` tuple. ``changes_df`` has one row per
        relabeled prompt with the prompt column and a ``"Rule Matched"``
        column; both columns are present even when nothing was relabeled.

    Raises:
        KeyError: If the prompt or label column is missing from ``df``. The
            message lists the columns that are available.
    """
    # Resolve defaults lazily so explicit arguments never touch the globals.
    prompt_column = PROMPT_COLUMN if prompt_column is None else prompt_column
    label_column = LABEL_COLUMN if label_column is None else label_column
    matcher = match_any_category if matcher is None else matcher

    missing = [name for name in (prompt_column, label_column) if name not in df.columns]
    if missing:
        raise KeyError(
            f"Missing required column(s) {missing}; available columns: {list(df.columns)}"
        )

    relabeled = df.copy()
    is_benign = (relabeled[label_column] == 0).tolist()
    prompts = relabeled[prompt_column].tolist()

    hit_positions = []
    change_records = []
    for position, (benign, prompt) in enumerate(zip(is_benign, prompts)):
        if not benign or not isinstance(prompt, str):
            continue
        category = matcher(prompt)
        if category:
            hit_positions.append(position)
            change_records.append({prompt_column: prompt, "Rule Matched": category})

    if hit_positions:
        # Positional assignment stays correct even if the index has repeated labels.
        label_position = relabeled.columns.get_loc(label_column)
        relabeled.iloc[hit_positions, label_position] = 1

    # Explicit columns keep the header stable when no prompt was relabeled.
    changes = pd.DataFrame(change_records, columns=[prompt_column, "Rule Matched"])
    return relabeled, changes


def main():
    """Run the step-2 relabeling pipeline end to end.

    Reads ``INPUT_FILE``, relabels prompts via ``relabel_prompts``, then writes
    the full dataset to ``OUTPUT_FILE`` and the relabeled subset to
    ``RELABELED_PROMPTS_FILE``, printing progress along the way.
    """
    print("Reading input dataset...")
    dataset = pd.read_csv(INPUT_FILE)
    print(f"Initial dataset size: {len(dataset)} rows")

    result = relabel_prompts(dataset)
    dataset, relabeled_subset = result

    print(f"Number of prompts re-labeled from 0 to 1: {len(relabeled_subset)}")
    # Persist both artefacts before announcing either of them.
    for frame, destination in ((dataset, OUTPUT_FILE), (relabeled_subset, RELABELED_PROMPTS_FILE)):
        frame.to_csv(destination, index=False)

    print(f"Saved relabeled dataset to: {OUTPUT_FILE}")
    print(f"Saved re-labeled prompts to: {RELABELED_PROMPTS_FILE}")

# Run the step-2 relabeling pipeline when this code is executed directly
# rather than imported as a module.
if __name__ == "__main__":
    main()


Reading input dataset...
Initial dataset size: 76473 rows


KeyError: 'Malicious 0/1'