In [1]:
import pandas as pd
import torch
from keybert import KeyBERT
from transformers import AutoModel, AutoTokenizer
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm


model_path = "D:\\Italy\\sbert\\SBert models\\AC\\checkpoint_dataset5"
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModel.from_pretrained(model_path, trust_remote_code=True)


device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)


kw_model = KeyBERT(model=model)


df = pd.read_csv("D:\\Italy\\final.csv")


def extract_keywords(text):
    if isinstance(text, str):
        
        with torch.no_grad():
            keywords = kw_model.extract_keywords(text, keyphrase_ngram_range=(1, 1), stop_words=None, top_n=5)
        return [(kw[0], 0) for kw in keywords]  
    return []


def process_data(df):
    results = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        for res in tqdm(executor.map(extract_keywords, df["text"]), total=len(df)):
            results.append(res)
    return results

df["keywords"] = process_data(df)


df.to_csv("output2.csv", index=False)

  from .autonotebook import tqdm as notebook_tqdm





100%|██████████| 137521/137521 [2:09:01<00:00, 17.77it/s]  


In [2]:
import pandas as pd
import ast

label_to_words = {


"L":[
  "easy", "simple", "straightforward", "trivial", "basic", "minimal", "effortless", "direct", "accessible", "plain", "smooth", "fluent", "breezy", "elementary", "naive", "obvious", "open", "free", "universal", "userfriendly",
  "standard", "typical", "default", "common", "routine", "mundane", "ordinary", "frequent", "regular", "normal", "feasible", "quick", "fast", "handy", "convenient", "immediate", "automatic", "autopilot", "manageable", "uncomplicated",
  "relaxed", "mild", "gentle", "lenient", "lowlevel", "slight", "generic", "trivial2", "direct2", "prebuilt", "arbitrary", "execution", "execute", "privilege", "privileges", "elevation", "bypass", "aslr", "reparse", "mount",
  "kernel", "heap", "corruption", "crafted", "spoofing", "silverlight", "denial", "disclosure", "xss", "sandbox", "bypassing", "spoof", "malicious", "overflow", "injection", "xxe", "dos", "service", "rce", "escalation",
  "memory", "clickjacking", "csrf", "bruteforce", "cleartext", "logfiles", "plaintext", "sessionfixation", "incompleteblacklist", "weakpermissions", "hardcoded", "typeconfusion", "fileupload", "directorytraversal", "dirtraversal", "sqli", "dllhijacking", "overread", "underflow", "telnet",
  "replayattack", "missingnonce", "unauthenticated", "spoofed", "offbyone", "catastrophicbacktracking", "starttls", "sniffing", "exposed", "worldreadable", "anonymousroot", "unrestricted", "insecurepermissions", "lackacl", "lack2fa", "possessionaccount", "insecure", "twofactorauth", "nolimit", "lackratelimiting",
  "unquoted", "predictableseed", "timingsidechannel", "downgrade", "birthdayattack", "weakcrypt", "noencryption", "nolocalencryption", "unquotedsearchpath", "unspecifiedvectors", "insecurehash", "localuser", "nossl", "ssrf", "repeatable", "unconditional", "automated", "reliable", "predictable", "instant",
  "simplified", "scriptable", "plugandplay", "nospecialrequirements", "nointeraction", "readytouse", "basicsteps", "minimalprerequisites", "publicknowledge", "reproducible", "pathtraversal", "openredirect", "brokenaccesscontrol", "insecuredeserialization", "prototypepollution", "publiclyavailable", "nocondition", "widelyavailable", "lowcomplexity", "straightuse",
  "standardconfig", "outofthebox", "zeroeffort", "onestep", "effortlessaccess", "builtin", "defaultenabled", "noauthentication", "minimalinteraction", "publicexploit", "readilyavailable", "unrestrictedaccess", "immediatelyusable", "noauthenticationneeded", "instantexecution", "withoutauthentication", "unauthenticatedaccess", "conditional", "requiresauthentication", "dependency",
  "restrictedscenario", "limitedaccess", "nondefaultconfiguration", "precondition", "complexsetup", "requiresinteraction", "customizedenvironment", "initialsetup", "reconnaissance", "tailoredattack", "advancedknowledge", "rarecondition", "unusualscenario", "specificrequirements", "preexistingconditions", "authenticateduser", "privaterequirements", "easyaccess", "directaccess", "nostepsrequired",
  "automatedexploit", "readyexploit", "widelyused", "widespread", "nopreconditions", "commonexploit", "universallyaccessible",
],



    "H": [
  "advanced", "complex", "complicated", "specialized", "intricate", "elaborate", "sophisticated", "difficult", "challenging", "formidable", "arcane", "esoteric", "obscure", "nuanced", "labyrinthine", "puzzling", "baffling", "perplexing", "cryptic", "elusive",
  "enigma", "mystifying", "convoluted", "multifaceted", "multilayered", "intricacy", "thorough", "deep", "intense", "skillful", "expert", "rigorous", "highlevel", "byzantine", "tangled", "tricky", "delicate", "knotty", "mazelike", "arduous",
  "onerous", "stealthy", "stealth", "subtle", "partial", "chain", "escalated", "privileged", "specialized2", "envronment", "physically", "proximate", "neglected", "race", "environment", "timing", "maninthemiddle", "mitm", "downgrade", "bypassaslr",
  "bypass", "heap", "heapgrooming", "aslr", "mitigation", "intrusive", "preparation", "preconditions", "conditional", "limitedopportunity", "infrequent", "rarelyexploitable", "specificrequirements", "configurationdependent", "knowledgeintensive", "rarecondition", "specificcondition", "crafteddata", "customscenario", "nonstandard",
  "dependency", "effort", "conditionalexecution", "reconnaissance", "uncertainty", "intermittent", "unreliable", "prerequisite", "escalation", "restrictedscenario", "rop", "chaining", "multiphase", "unusualconfig", "fips", "protocoldowngrade", "customized", "specificscenario", "rareconditions", "customconfig",
  "chainattack", "rarelyoccur", "specificknowledge", "specialrequirements", "manualinteraction", "indepthknowledge", "specialcircumstances", "targeted", "specialinteraction", "precisecontrol", "coordinated", "multiattack", "tailored", "userinteraction", "limitedscenario", "highskill", "manualadjustment", "condition", "requiresdata", "specificdata",
  "specificinput", "craftedinput", "engine", "specificengine", "enginecorruption", "requiresenvironmentsetup", "specificenvironment", "complexinteraction", "specificinteraction", "interactionrequired", "dependentcondition", "preexistingcondition", "tailoreddata", "enginebased", "customdata", "custominput", "inputdependent", "datadependent", "customizedscenario", "initialsetup",
  "requiresauthentication", "nondefaultconfiguration", "multistepattack", "requirespreparation", "requiresreconnaissance", "advancedsetup", "intermediatesteps",
]

}



def safe_eval(value):
    try:
        if isinstance(value, str):
            return ast.literal_eval(value)
        elif isinstance(value, list):
            return value
        else:
            print(f"Value is not a valid format: {value}")
            return []
    except Exception as e:
        print(f"Error evaluating value: {value}, Error: {e}")
        return []

def update_keywords(row):
    label = row['ac']
    target_words = label_to_words.get(label, [])

    keywords_list = safe_eval(row['keywords'])  

    updated_keywords = []
    for word, weight in keywords_list:
        if word in target_words:
            updated_keywords.append((word, 10))
        else:
            updated_keywords.append((word, weight))

    return updated_keywords


df['keywords'] = df['keywords'].fillna("[]")

df['updated_keywords'] = df.apply(update_keywords, axis=1)


print(df.head())

              ID                                               text  \
0  CVE-2016-0002  The Microsoft (1) VBScript 5.7 and 5.8 and (2)...   
1  CVE-2016-0003  Microsoft Edge allows remote attackers to exec...   
2  CVE-2016-0005  Microsoft Internet Explorer 9 through 11 allow...   
3  CVE-2016-0006  The sandbox implementation in Microsoft Window...   
4  CVE-2016-0007  The sandbox implementation in Microsoft Window...   

                                   vectorString av ac pr ui  s  c  i  a  \
0  CVSS:3.0/AV:N/AC:H/PR:N/UI:R/S:U/C:H/I:H/A:H  N  H  N  R  U  H  H  H   
1  CVSS:3.0/AV:N/AC:L/PR:N/UI:R/S:C/C:H/I:H/A:H  N  L  N  R  C  H  H  H   
2  CVSS:3.0/AV:N/AC:L/PR:N/UI:R/S:U/C:N/I:L/A:N  N  L  N  R  U  N  L  N   
3  CVSS:3.0/AV:L/AC:L/PR:L/UI:R/S:U/C:H/I:H/A:H  L  L  L  R  U  H  H  H   
4  CVSS:3.0/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H  L  L  N  R  U  H  H  H   

   score baseSeverity                                           keywords  \
0    7.5         HIGH  [(microsoft, 0), (jscri

In [3]:
df.to_csv("D:\\Italy\\sbert\\SBert models\\AC\\W_keybert_output2.csv", index=False)

In [None]:
import os
import pandas as pd
import ast
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from transformers import AutoTokenizer, AutoModel, TrainingArguments, Trainer
import numpy as np
from sklearn.metrics import accuracy_score, balanced_accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

os.environ["WANDB_DISABLED"] = "true"




unique_labels = sorted(df['ac'].unique())
label2id = {label: i for i, label in enumerate(unique_labels)}
id2label = {i: label for label, i in label2id.items()}
df['label_id'] = df['ac'].map(label2id)


train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['ac'], random_state=42)



def build_inputs_and_weights(keywords, tokenizer, max_length=64):
   
    tokens = []
    weights = []
    for word, weight in keywords:
        
        word_tokens = tokenizer.tokenize(word)
        tokens.extend(word_tokens)
        weights.extend([weight] * len(word_tokens))

    
    cls_token = tokenizer.cls_token if tokenizer.cls_token is not None else "[CLS]"
    sep_token = tokenizer.sep_token if tokenizer.sep_token is not None else "[SEP]"
    tokens = [cls_token] + tokens + [sep_token]
    weights = [1] + weights + [1]

    input_ids = tokenizer.convert_tokens_to_ids(tokens)
    attention_mask = [1] * len(input_ids)

    
    if len(input_ids) < max_length:
        pad_length = max_length - len(input_ids)
        input_ids = input_ids + [tokenizer.pad_token_id] * pad_length
        attention_mask = attention_mask + [0] * pad_length
        weights = weights + [1] * pad_length
    else:
        input_ids = input_ids[:max_length]
        attention_mask = attention_mask[:max_length]
        weights = weights[:max_length]

    return {
        "input_ids": torch.tensor(input_ids, dtype=torch.long),
        "attention_mask": torch.tensor(attention_mask, dtype=torch.long),
        "token_weights": torch.tensor(weights, dtype=torch.float)
    }



class HybridDataset(Dataset):
    def __init__(self, df, tokenizer, max_length_text=256, max_length_sym=64):
        
        self.df = df.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length_text = max_length_text
        self.max_length_sym = max_length_sym

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        
        text = row['text']
        encoding_text = self.tokenizer(text, padding='max_length', truncation=True,
                                       max_length=self.max_length_text, return_tensors='pt')
        encoding_text = {k: v.squeeze(0) for k, v in encoding_text.items()}

        
        updated_keywords = row['updated_keywords']
        if isinstance(updated_keywords, str):
            updated_keywords = ast.literal_eval(updated_keywords)
        encoding_sym = build_inputs_and_weights(updated_keywords, self.tokenizer, max_length=self.max_length_sym)

        label = torch.tensor(row['label_id'], dtype=torch.long)

        return {
            "input_ids_text": encoding_text['input_ids'],
            "attention_mask_text": encoding_text['attention_mask'],
            "input_ids_sym": encoding_sym['input_ids'],
            "attention_mask_sym": encoding_sym['attention_mask'],
            "token_weights_sym": encoding_sym['token_weights'],
            "labels": label
        }



class HybridModel(nn.Module):
    def __init__(self, base_model, num_labels):
        
        super().__init__()
        self.base_model = base_model  
        
        hidden_size = base_model.config.hidden_size

      
        self.gate_layer = nn.Linear(hidden_size * 2, hidden_size)
        self.sigmoid = nn.Sigmoid()

        
        self.classifier = nn.Linear(hidden_size, num_labels)

    def forward(self,
                input_ids_text=None,
                attention_mask_text=None,
                input_ids_sym=None,
                attention_mask_sym=None,
                token_weights_sym=None,
                labels=None):
        
        outputs_text = self.base_model(input_ids=input_ids_text,
                                       attention_mask=attention_mask_text,
                                       output_hidden_states=True,
                                       return_dict=True)
        
        text_repr = outputs_text.hidden_states[-1][:, 0, :]  # (batch, hidden_size)

        
        embedding = self.base_model.get_input_embeddings()
        embeddings_sym = embedding(input_ids_sym)  # (batch, seq_len_sym, hidden_size)
        
        weighted_embeddings_sym = embeddings_sym * token_weights_sym.unsqueeze(-1)
        masked_embeddings = weighted_embeddings_sym * attention_mask_sym.unsqueeze(-1)
        sum_embeddings_sym = masked_embeddings.sum(dim=1)  
        valid_tokens = attention_mask_sym.sum(dim=1, keepdim=True) + 1e-8
        symbolic_repr = sum_embeddings_sym / valid_tokens  

      
        combined_input = torch.cat([text_repr, symbolic_repr], dim=1)  # (batch, 2*hidden_size)
        gate = self.sigmoid(self.gate_layer(combined_input))  # (batch, hidden_size)
        combined_repr = gate * symbolic_repr + (1 - gate) * text_repr

        
        logits = self.classifier(combined_repr)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits, labels)

        return {"loss": loss, "logits": logits}


model_path = "D:\\Italy\\sbert\\SBert models\\AC\\checkpoint_dataset5"


tokenizer = AutoTokenizer.from_pretrained(model_path)


base_model = AutoModel.from_pretrained(model_path, trust_remote_code=True)


num_labels = len(unique_labels)


hybrid_model = HybridModel(base_model, num_labels)


device = "cuda" if torch.cuda.is_available() else "cpu"
hybrid_model.to(device)


train_dataset = HybridDataset(train_df, tokenizer, max_length_text=256, max_length_sym=64)
val_dataset   = HybridDataset(val_df, tokenizer, max_length_text=256, max_length_sym=64)



def compute_metrics(eval_pred):
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=1)
    acc = accuracy_score(labels, preds)
    bal_acc = balanced_accuracy_score(labels, preds)
    f1 = f1_score(labels, preds, average='weighted')
    prec = precision_score(labels, preds, average='weighted')
    rec = recall_score(labels, preds, average='weighted')
    return {
        "accuracy (validation)": acc,
        "weighted_accuracy": bal_acc,
        "f1": f1,
        "precision": prec,
        "recall": rec
    }



training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=10,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    eval_strategy="epoch",   
    save_strategy="epoch",
    report_to=[] )

trainer = Trainer(
    model=hybrid_model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset, 
    compute_metrics=compute_metrics
)



trainer.train()



def predict_label(description, updated_keywords_str):
 
    encoding_text = tokenizer(description, return_tensors='pt', truncation=True, padding='max_length', max_length=256)
    encoding_text = {k: v.to(device) for k, v in encoding_text.items()}


    updated_keywords = ast.literal_eval(updated_keywords_str) if isinstance(updated_keywords_str, str) else updated_keywords_str
    encoding_sym = build_inputs_and_weights(updated_keywords, tokenizer, max_length=64)
    encoding_sym = {k: v.unsqueeze(0).to(device) for k, v in encoding_sym.items()}

    outputs = hybrid_model(
        input_ids_text=encoding_text['input_ids'],
        attention_mask_text=encoding_text['attention_mask'],
        input_ids_sym=encoding_sym['input_ids'],
        attention_mask_sym=encoding_sym['attention_mask'],
        token_weights_sym=encoding_sym['token_weights']
    )
    logits = outputs["logits"]
    predicted_class_id = logits.argmax(dim=1).item()
    return id2label[predicted_class_id]


