In [None]:
from transformers import AutoModelForMaskedLM, AutoTokenizer
import torch

# Load the SecureBERT 2.0 masked-language model and its tokenizer.
model_name = "cisco-ai/SecureBERT2.0-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForMaskedLM.from_pretrained(model_name)

# Example sentence with one masked word. The placeholder is taken from the
# tokenizer instead of being hard-coded, so it always agrees with the
# `tokenizer.mask_token_id` lookup below.
text = f"The malware exploits a vulnerability in the {tokenizer.mask_token} system."

# Tokenize the text and run a forward pass; gradients are not needed.
inputs = tokenizer(text, return_tensors="pt")
with torch.no_grad():
    outputs = model(**inputs)

# Sequence position(s) of the mask token: nonzero(as_tuple=True) returns
# (row indices, column indices); element [1] holds the column indices.
mask_token_index = (inputs.input_ids == tokenizer.mask_token_id).nonzero(as_tuple=True)[1]

# Vocabulary scores at the masked position(s) of the first (only) sequence.
mask_token_logits = outputs.logits[0, mask_token_index, :]

# Ids of the 5 highest-scoring vocabulary entries for the first mask.
top_5_tokens = torch.topk(mask_token_logits, 5, dim=1).indices[0].tolist()

print("Top 5 predictions for [MASK]:")
for token in top_5_tokens:
    print(f"→ {tokenizer.decode([token])}")


  from .autonotebook import tqdm as notebook_tqdm


Top 5 predictions for [MASK]:
→  operating
→  Windows
→  Linux
→  file
→  banking


In [None]:
import json
from pathlib import Path
import torch
import csv
from transformers import AutoTokenizer, AutoModelForMaskedLM
from collections import Counter

# --- Rule-based IoT classifier ---
# Ordered (category, keywords) rules. Rows are tried top to bottom and the
# first row with any keyword occurring in the text wins, so earlier rows take
# precedence over later ones.
_IOT_CATEGORY_KEYWORDS = (
    (
        "H (Home)",
        ("router", "tplink", "d-link", "asus", "home", "smart", "tv", "soho", "camera"),
    ),
    (
        "S (SCADA/Industrial)",
        ("plc", "scada", "industrial", "ics", "automotive", "car", "vehicle", "sensor", "medical"),
    ),
    (
        "E (Enterprise)",
        ("server", "enterprise", "network", "switch", "firewall", "keycloak", "vpn", "red hat", "cisco", "juniper"),
    ),
    (
        "M (Mobile)",
        ("android", "ios", "mobile", "tablet", "smartwatch", "phone"),
    ),
    (
        "P (PC/Server)",
        ("windows", "linux", "ubuntu", "intel nuc", "pc", "laptop", "desktop"),
    ),
    (
        "A (Other Non-Home Appliances)",
        ("printer", "copier", "projector", "multimedia", "display"),
    ),
)


def _lowered_text(value):
    """Return *value* lower-cased when it is a string, otherwise ""."""
    return value.lower() if isinstance(value, str) else ""


def _list_of_dicts(value):
    """Return the dict items of *value* when it is a list, otherwise []."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def classify_cve_iot_category(cve_json):
    """Classify a CVE record into an IoT device category by keyword rules.

    The text searched is the lower-cased concatenation of every
    ``containers.cna.descriptions[*].value`` and every
    ``containers.cna.affected[*].vendor`` / ``.product`` of the record.

    Args:
        cve_json: Parsed CVE record. Missing keys, ``null`` values,
            non-string fields and non-dict records are tolerated and simply
            contribute no text.

    Returns:
        One of "H (Home)", "S (SCADA/Industrial)", "E (Enterprise)",
        "M (Mobile)", "P (PC/Server)", "A (Other Non-Home Appliances)",
        or "Unknown" when no keyword matches.
    """
    text_fields = []

    # Walk the record defensively: a malformed file must yield "Unknown"
    # rather than abort a batch run.
    containers = cve_json.get("containers") if isinstance(cve_json, dict) else None
    cna = containers.get("cna") if isinstance(containers, dict) else None
    if isinstance(cna, dict):
        for desc in _list_of_dicts(cna.get("descriptions")):
            text_fields.append(_lowered_text(desc.get("value")))
        for aff in _list_of_dicts(cna.get("affected")):
            text_fields.append(_lowered_text(aff.get("vendor")))
            text_fields.append(_lowered_text(aff.get("product")))

    text = " ".join(text_fields)

    # NOTE(review): matching is by plain substring, so short keywords also hit
    # inside longer words (e.g. "ics" in "graphics", "ios" in "bios", "car" in
    # "card"). Kept as-is to preserve existing classifications.
    for category, keywords in _IOT_CATEGORY_KEYWORDS:
        if any(k in text for k in keywords):
            return category
    return "Unknown"

# --- Label normalization ---
# Verbose rule-based category -> short label. The short names are what the
# rule-based result is reduced to before it is compared with the NLP label.
_RULE_LABEL_PAIRS = (
    ("H (Home)", "Home"),
    ("S (SCADA/Industrial)", "SCADA"),
    ("E (Enterprise)", "Enterprise"),
    ("M (Mobile)", "Mobile"),
    ("P (PC/Server)", "PC"),
    ("A (Other Non-Home Appliances)", "Other"),
    ("Unknown", "Unknown"),
)
rule_map = dict(_RULE_LABEL_PAIRS)

# --- SecureBERT 2.0 setup ---
model_name = "cisco-ai/SecureBERT2.0-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForMaskedLM.from_pretrained(model_name)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()  # inference only

iot_labels = ["Home", "SCADA", "Enterprise", "Mobile", "PC", "Other"]

# Vocabulary ids of each candidate label's sub-tokens. They depend only on the
# tokenizer, so they are computed once here rather than once per file.
# NOTE(review): labels are tokenized without a leading space; if this
# tokenizer encodes mid-sentence words with a space prefix, the ids scored
# here may not be the ones the model predicts at the mask -- confirm.
label_token_ids = {
    label: tokenizer.convert_tokens_to_ids(tokenizer.tokenize(label))
    for label in iot_labels
}


def _first_description(cve_data):
    """Return the first ``containers.cna.descriptions`` value, or "".

    Any structural surprise (missing key, ``null``, wrong type) yields "" so
    that one malformed record cannot abort the whole run.
    """
    try:
        descriptions = cve_data["containers"]["cna"].get("descriptions", [])
        value = descriptions[0].get("value", "") if descriptions else ""
    except (KeyError, IndexError, TypeError, AttributeError):
        return ""
    return value if isinstance(value, str) else ""


# --- Root folder containing JSONs ---
root_folder = Path("~/updatOR/data/dataset_fw").expanduser()
json_files = list(root_folder.rglob("*.json"))
print(f"Found {len(json_files)} JSON files.")

# --- Counters ---
mismatch_counter = 0
rule_counter = Counter()
nlp_counter = Counter()

# --- CSV output ---
output_csv = "iot_classification_results.csv"
with open(output_csv, mode="w", newline="", encoding="utf-8") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(["File", "Rule-based", "NLP-based", "Match"])

    for json_file in json_files:
        try:
            # Read explicitly as UTF-8 instead of the platform default.
            with open(json_file, encoding="utf-8") as f:
                cve_data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSON syntax errors and decoding errors.
            print(f"Error loading {json_file}: {e}")
            continue

        if not isinstance(cve_data, dict):
            # A top-level list/scalar cannot be a record with "containers".
            print(f"Skipping {json_file}: top-level JSON value is not an object")
            continue

        # --- Rule-based classification ---
        rule_class = classify_cve_iot_category(cve_data)
        rule_norm = rule_map.get(rule_class, "Unknown")
        rule_counter[rule_norm] += 1

        # --- Extract description ---
        cve_description = _first_description(cve_data)

        # --- NLP-based classification ---
        # The mask sits at the start of the prompt, so truncating an over-long
        # description from the right never removes it.
        text = f"This vulnerability affects a {tokenizer.mask_token} device. {cve_description}"
        inputs = tokenizer(text, return_tensors="pt", truncation=True).to(device)

        with torch.no_grad():
            outputs = model(**inputs).logits

        mask_index = (inputs.input_ids == tokenizer.mask_token_id).nonzero(as_tuple=True)[1]
        # Use only the first mask: it is the prompt's own placeholder. A
        # description that happens to contain the mask token would otherwise
        # add positions and break the indexing below.
        mask_logits = outputs[0, mask_index[0]]

        # --- Score each label (support multi-token labels) ---
        # NOTE(review): a multi-token label's score is the SUM of its
        # sub-token logits at the single mask position, so labels with
        # different token counts are not on the same scale.
        label_scores = {}
        for label in iot_labels:
            token_ids = label_token_ids[label]
            score = mask_logits[token_ids].sum().item() if token_ids else float('-inf')
            label_scores[label] = score

        ml_class = max(label_scores, key=label_scores.get)
        nlp_counter[ml_class] += 1

        # "Unknown" is not among iot_labels, so a rule-based "Unknown" is
        # always reported as DIFFER.
        match_status = "MATCH" if ml_class == rule_norm else "DIFFER"
        if match_status == "DIFFER":
            mismatch_counter += 1

        print(f"{json_file.name}: Rule={rule_class}, NLP={ml_class}, {match_status}")

        # --- Write to CSV ---
        writer.writerow([json_file.name, rule_class, ml_class, match_status])

# --- Print summary ---
print("\n=== Summary ===")
print(f"Total mismatches: {mismatch_counter}")
print("Rule-based category counts:", dict(rule_counter))
print("NLP-based category counts:", dict(nlp_counter))
print(f"\nAll results saved to {output_csv}")
