In [1]:
# Step 1: Import Libraries
import json
import os

import joblib
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments

In [2]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


In [3]:
def load_json_data(file_paths):
    """
    Load Q&A records from JSON files into a single DataFrame.

    Each file is parsed and then walked recursively. Any dict that carries all
    three keys 'question', 'intent' and 'type' becomes one row (only those three
    keys are kept) and is not searched any further. Every other dict or list is
    searched for nested records; scalar values are ignored.

    A file that is missing, is not valid JSON, or fails for any other reason is
    reported with a printed message and skipped, so one bad file does not abort
    the whole load.

    Args:
        file_paths: Iterable of paths to the JSON files to read (UTF-8).

    Returns:
        pandas.DataFrame with the columns 'question', 'intent' and 'type', in
        that order. The columns are present even when no record was found, so
        callers can index them without first checking for an empty result.
    """
    columns = ['question', 'intent', 'type']
    records = []

    def extract_qa_items(obj):
        """Collect every Q&A record reachable from a nested dict/list structure."""
        if isinstance(obj, dict):
            if all(key in obj for key in columns):
                # A complete Q&A item: keep just the three fields of interest.
                records.append({key: obj[key] for key in columns})
            else:
                for value in obj.values():
                    extract_qa_items(value)
        elif isinstance(obj, list):
            for item in obj:
                extract_qa_items(item)

    for file_path in file_paths:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Start the recursive extraction from the root of the parsed JSON.
            extract_qa_items(data)
        except FileNotFoundError:
            print(f"Error: {file_path} not found.")
        except json.JSONDecodeError:
            print(f"Error: Could not decode JSON from {file_path}.")
        except Exception as e:
            print(f"An unexpected error occurred while processing {file_path}: {e}")

    # Passing `columns` keeps the schema stable even when `records` is empty.
    return pd.DataFrame(records, columns=columns)

# Directory holding the Q&A JSON files. Change this one constant to point the
# notebook at a different copy of the data.
DATA_DIR = r'D:\OWASP_BERT\QA_Pairs\Enhanced_QA'

# One file per category, named A01_2021.json through A10_2021.json.
file_paths = [os.path.join(DATA_DIR, f'A{n:02d}_2021.json') for n in range(1, 11)]

# Load data into a DataFrame
print("Loading JSON files...")
df = load_json_data(file_paths)

# Print debug info
print("\nDataFrame Info:")
# info() writes its report itself and returns None, so it must not be wrapped
# in print() (that would add a stray "None" line to the output).
df.info()
print("\nDataFrame Head:")
print(df.head())

# Only report label distributions when something was actually loaded.
if df.empty:
    print("\nNo Q&A records were loaded; check the file paths and the JSON structure.")
else:
    if 'intent' in df.columns:
        print("\nUnique Intents:")
        print(df['intent'].value_counts())
    else:
        print("\nNo 'intent' column found in the DataFrame.")
        print("Available columns:", df.columns.tolist())

    if 'type' in df.columns:
        print("\nUnique Types:")
        print(df['type'].value_counts())
    else:
        print("\nNo 'type' column found in the DataFrame.")

Loading JSON files...

DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3775 entries, 0 to 3774
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   question  3775 non-null   object
 1   intent    3775 non-null   object
 2   type      3775 non-null   object
dtypes: object(3)
memory usage: 88.6+ KB
None

DataFrame Head:
                                            question  \
0  What does 'Broken Access Control' refer to in ...   
1  What does 'Broken Access Control' refer to in ...   
2  What does 'Broken Access Control' refer to in ...   
3  What does 'Broken Access Control' refer to in ...   
4  What does 'Broken Access Control' refer to in ...   

                         intent                 type  
0  define_broken_access_control  basic_understanding  
1  define_broken_access_control  basic_understanding  
2  define_broken_access_control  basic_understanding  
3  define_broken_access_control  basic_unders

In [4]:
# Step 4: Prepare Labels for Classification & Handle Single-Occurrence Intents
# The stratified split in the next step needs at least two rows per intent, so
# every intent that occurs exactly once has its row appended a second time.
# NOTE(review): the appended rows are exact copies, so they add no new
# information for the model; they only make the split possible.
intent_counts = df['intent'].value_counts()
single_occurrence_intents = intent_counts.loc[intent_counts.eq(1)].index

print(f"\nFound {len(single_occurrence_intents)} intents with only one occurrence.")

# Select the rows of those rare intents and append them to the original frame.
rows_to_duplicate = df.loc[df['intent'].isin(single_occurrence_intents)]
df_processed = pd.concat([df, rows_to_duplicate], ignore_index=True)

print(f"DataFrame shape after duplicating single-occurrence intents: {df_processed.shape}")
print("\nUnique Intents (after duplication):")
print(df_processed['intent'].value_counts())  # every count should now be >= 2

# Encode the intent strings as integer class IDs (all intents are kept).
label_encoder = LabelEncoder()
df_processed['intent_id'] = label_encoder.fit_transform(df_processed['intent'])

# Lookup tables used at inference time:
#   id_to_intent   - class ID -> intent string (position in label_encoder.classes_)
#   intent_to_type - intent string -> type string (if an intent appears with
#                    several types, the last row wins)
id_to_intent = dict(enumerate(label_encoder.classes_))
intent_to_type = dict(zip(df_processed['intent'], df_processed['type']))

print(f"\nNumber of unique intents (all included, after duplication): {len(label_encoder.classes_)}")
print(f"Example mapping (ID to Intent): {list(id_to_intent.items())[:5]}")



Found 2979 intents with only one occurrence.
DataFrame shape after duplicating single-occurrence intents: (6754, 3)

Unique Intents (after duplication):
intent
basic_understanding                   60
vulnerability_identification          60
prevention_methods                    53
references                            50
define_broken_access_control          50
                                      ..
explain_idor_technical                 2
explain_missing_api_access_control     2
explain_least_privilege_principle      2
explain_rate_limiting_mechanism        2
explain_force_browsing                 2
Name: count, Length: 3162, dtype: int64

Number of unique intents (all included, after duplication): 3162
Example mapping (ID to Intent): [(0, 'CI_CD_for_component_management'), (1, 'SBOM_role_in_vulnerability_management'), (2, 'absence_of_abuse_case_modeling'), (3, 'absence_of_business_logic_validation'), (4, 'absence_of_logging_monitoring')]


In [5]:
# Step 5: Split Data into Training and Validation Sets
# Stratifying on 'intent' requires the validation side to hold at least as many
# rows as there are distinct intents. Many intents have only two rows after the
# duplication step, so half of the data is assigned to validation.
# NOTE(review): rows duplicated in Step 4 are identical, so a duplicated
# question presumably lands on both sides of this split; validation loss may
# then not reflect generalisation to unseen questions - confirm.
train_df, val_df = train_test_split(
    df_processed,
    test_size=0.5,
    random_state=42,
    stratify=df_processed['intent'],
)

print(f"\nTraining set size: {len(train_df)}")
print(f"Validation set size: {len(val_df)}")


Training set size: 3377
Validation set size: 3377


In [6]:
# Step 6: Tokenization and Custom Dataset
try:
    # Load the tokenizer for the same 'roberta-base' checkpoint that is fine-tuned below.
    tokenizer = AutoTokenizer.from_pretrained('roberta-base')
    print(f"Tokenizer loaded successfully. Max length: {tokenizer.model_max_length}")
except Exception as e:
    print(f"Error loading tokenizer: {e}")
    print("This might be due to network issues, missing required libraries, or corrupted cache.")
    print("Please ensure your environment has the necessary dependencies for RoBERTa-base.")
    print("If issue persists, try clearing your Hugging Face cache (usually at ~/.cache/huggingface).")
    # Re-raise rather than calling exit(): in a notebook exit() asks the kernel
    # to shut down, which discards the traceback and all loaded state. Raising
    # still stops "Run All" here, because every later step needs the tokenizer.
    raise


class IntentDataset(Dataset):
    """
    Torch dataset pairing tokenized questions with integer intent labels.

    All questions are tokenized once, in the constructor, with truncation and
    padding enabled; `__getitem__` only converts the stored values for one
    example into tensors.

    Args:
        questions: Question texts, as a pandas Series or any other sequence.
        intent_ids: Integer class IDs aligned one-to-one with `questions`
            (pandas Series or any other sequence).
        tokenizer: Callable invoked as
            `tokenizer(list_of_texts, truncation=True, padding=True, max_length=...)`
            that returns a mapping from feature name to a per-example sequence.
        max_length: Maximum token length passed to the tokenizer. Defaults to
            128, the value this class previously hard-coded.

    Raises:
        ValueError: If `questions` and `intent_ids` differ in length.
    """

    def __init__(self, questions, intent_ids, tokenizer, max_length=128):
        texts = self._to_list(questions)
        self.intent_ids = self._to_list(intent_ids)
        # Fail early: a mismatch would otherwise only surface later as an
        # IndexError (or silently dropped examples) during training.
        if len(texts) != len(self.intent_ids):
            raise ValueError(
                f"questions and intent_ids must have the same length "
                f"(got {len(texts)} and {len(self.intent_ids)})"
            )
        self.encodings = tokenizer(texts, truncation=True, padding=True, max_length=max_length)

    @staticmethod
    def _to_list(values):
        """Return `values` as a plain list (uses `.tolist()` when available, e.g. for a Series)."""
        return values.tolist() if hasattr(values, 'tolist') else list(values)

    def __getitem__(self, idx):
        """Return the features of example `idx` as tensors, plus its label under 'labels'."""
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.intent_ids[idx])
        return item

    def __len__(self):
        """Number of examples (one per label)."""
        return len(self.intent_ids)

# Build one dataset per split (training first, then validation); each one
# tokenizes its questions up front.
train_dataset, val_dataset = (
    IntentDataset(split_df['question'], split_df['intent_id'], tokenizer)
    for split_df in (train_df, val_df)
)

# Show a single encoded example as a sanity check.
print("\nExample from training dataset (first item):")
print(train_dataset[0])

Tokenizer loaded successfully. Max length: 512

Example from training dataset (first item):
{'input_ids': tensor([    0,  2264,  8418,   109,   230,  9112,    29,   215,    25,   230,
         9112,    12,  1549,     8,   230,  9112,    12,   401,  1225,   694,
           77, 18999,   573,  3834, 43163, 27975,  6732,   116,     2,     1,
            1,     1,     1,     1,     1,     1,     1,     1,     1]), 'attention_mask': tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), 'labels': tensor(417)}


In [8]:
# Step 7: Fine-tuning the Model
# RoBERTa-base encoder with a newly initialised classification head that has one
# output per intent known to the label encoder.
# NOTE(review): in the recorded run the loss stays close to ln(number of
# intents), i.e. roughly chance level - the label granularity / amount of data
# per intent probably needs revisiting before this model is useful.
model = AutoModelForSequenceClassification.from_pretrained(
    'roberta-base',
    num_labels=len(label_encoder.classes_) # Number of unique intents (all included, after duplication)
).to(device)

# Define Training Arguments
training_args = TrainingArguments(
    output_dir='./results_intent_classifier',
    num_train_epochs=5,
    per_device_train_batch_size=4, # Keep batch size small for MX 550
    per_device_eval_batch_size=4,
    warmup_steps=100,
    weight_decay=0.01,
    logging_dir='./logs_intent_classifier',
    logging_steps=50,
    # Evaluate and checkpoint once per epoch so the best checkpoint
    # (lowest eval_loss) can be restored when training finishes.
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    fp16=torch.cuda.is_available(),
    # Two accumulation steps x batch size 4 = effective batch of 8 per device.
    gradient_accumulation_steps=2, # Crucial for MX 550 with this model size
)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    # `processing_class` replaces the deprecated `tokenizer` argument.
    processing_class=tokenizer,
)

print("\nStarting intent classification model training...")
trainer.train()
print("\nIntent classification model training complete.")

# Save the fine-tuned model and tokenizer
model_save_path = "./fine_tuned_intent_classifier"
trainer.save_model(model_save_path)
tokenizer.save_pretrained(model_save_path)

# Save the label encoder and the intent_to_type mapping next to the model so
# inference can translate predicted class IDs back to intent and type names.
joblib.dump(label_encoder, os.path.join(model_save_path, 'label_encoder.joblib'))
# Explicit encoding so the file does not depend on the platform default.
with open(os.path.join(model_save_path, 'intent_to_type.json'), 'w', encoding='utf-8') as f:
    json.dump(intent_to_type, f)

print(f"\nFine-tuned intent classifier model, tokenizer, label_encoder, and intent_to_type mapping saved to: {model_save_path}")

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = Trainer(



Starting intent classification model training...


Epoch,Training Loss,Validation Loss
1,8.0827,8.064827
2,8.0705,8.077924
3,8.0529,7.993279
4,8.02,7.97841
5,7.9714,7.953343



Intent classification model training complete.

Fine-tuned intent classifier model, tokenizer, label_encoder, and intent_to_type mapping saved to: ./fine_tuned_intent_classifier


In [9]:
# Step 8: Test the Fine-tuned Model (Inference Example)
print("\n--- Testing Intent Classifier ---")
# Reload every artifact from disk so this step exercises exactly what was
# saved, not the objects still held in memory from training.
loaded_tokenizer = AutoTokenizer.from_pretrained(model_save_path)
loaded_model = AutoModelForSequenceClassification.from_pretrained(model_save_path).to(device)
# joblib.load unpickles the file, which can execute arbitrary code: only load
# an encoder that this notebook (or another trusted source) wrote.
loaded_label_encoder = joblib.load(os.path.join(model_save_path, 'label_encoder.joblib'))
# Explicit encoding so reading does not depend on the platform default.
with open(os.path.join(model_save_path, 'intent_to_type.json'), 'r', encoding='utf-8') as f:
    loaded_intent_to_type = json.load(f)

def classify_intent(text, model, tokenizer, label_encoder, intent_to_type, device):
    """
    Predict the intent of a single question and look up its type.

    Args:
        text: The question to classify.
        model: Sequence-classification model; called with the tokenized inputs
            and expected to return an object exposing `.logits`.
        tokenizer: Tokenizer used to encode `text` (truncated to 128 tokens).
        label_encoder: Encoder whose `inverse_transform` maps class IDs back to
            intent strings.
        intent_to_type: Mapping from intent string to type string.
        device: Device the encoded inputs are moved to before the forward pass.

    Returns:
        Tuple `(predicted_intent, predicted_type)`; the type is "Unknown Type"
        when the intent is missing from `intent_to_type`.
    """
    encoded = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        padding=True,
        max_length=128,
    )
    encoded = encoded.to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        logits = model(**encoded).logits

    # Highest-scoring class of the (single) input row.
    class_id = logits.argmax(dim=1).item()
    intent = label_encoder.inverse_transform([class_id])[0]
    return intent, intent_to_type.get(intent, "Unknown Type")

# Sample questions for a quick smoke test of the reloaded classifier.
test_questions = [
    "What does 'Broken Access Control' refer to in the context of secure web applications?",
    "How to prevent Cross-Site Scripting?",
    "Explain the concept of IDOR.",
    "Why should applications implement secure fallback mechanisms for cryptographic failures?",
    "How does using strong cryptographic techniques to protect access tokens help?",
    "What are common weak encryption algorithms?",
    "Describe the impact of Broken Access Control on a system."
]

# Classify each question and print the prediction next to it.
for q in test_questions:
    intent, q_type = classify_intent(
        q,
        loaded_model,
        loaded_tokenizer,
        loaded_label_encoder,
        loaded_intent_to_type,
        device,
    )
    print(
        f"\nQuestion: \"{q}\"",
        f"  Predicted Intent: {intent}",
        f"  Inferred Type: {q_type}",
        sep="\n",
    )

print("\nStage 1: Intent classification training and testing complete with RoBERTa-base (all intents included, duplicated rares).")


--- Testing Intent Classifier ---

Question: "What does 'Broken Access Control' refer to in the context of secure web applications?"
  Predicted Intent: vulnerability_identification
  Inferred Type: vulnerability_identification

Question: "How to prevent Cross-Site Scripting?"
  Predicted Intent: vulnerability_identification
  Inferred Type: vulnerability_identification

Question: "Explain the concept of IDOR."
  Predicted Intent: vulnerability_identification
  Inferred Type: vulnerability_identification

Question: "Why should applications implement secure fallback mechanisms for cryptographic failures?"
  Predicted Intent: vulnerability_identification
  Inferred Type: vulnerability_identification

Question: "How does using strong cryptographic techniques to protect access tokens help?"
  Predicted Intent: vulnerability_identification
  Inferred Type: vulnerability_identification

Question: "What are common weak encryption algorithms?"
  Predicted Intent: vulnerability_identification
