In [3]:
from transformers import AutoTokenizer, AutoModelForMaskedLM, GPT2Tokenizer, GPT2LMHeadModel, DataCollatorForLanguageModeling, DataCollatorWithPadding, TrainingArguments, Trainer
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import os
import json
import numpy as np
# Restrict this process to the first GPU before any CUDA query is made.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Prefer CUDA when PyTorch can see it; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f'Using device: {device}')

Using device: cuda


In [5]:

# --- Run configuration -------------------------------------------------------
data_dir = "../datasets/ics_attack/"
output_dir = "../model_outputs/ics_attack/llm_finetuned_models/"
# Hub checkpoint ids and the short names used for output folders; the two
# lists are index-aligned and `model_id` selects one entry from both.
models = ["jackaduma/SecRoBERTa", "ehsanaghaei/SecureBERT", "gpt2-xl"]
model_names = ["SecRoBERTa", "SecureBERT", "gpt2-xl"]
model_id = 0
n_epoch = 10
# Short name of the selected checkpoint. Later cells branch on `model_name`,
# so it has to be defined here for a fresh-kernel run to succeed.
model_name = model_names[model_id]
# exist_ok=True makes the cell idempotent and avoids the check-then-create race.
os.makedirs(output_dir+model_name, exist_ok=True)

# load attack and weakness description with ID
with open(data_dir+"doc_id_to_desc.json", encoding="utf-8") as f:
    doc_id_to_desc = json.load(f)
print("Number of Nodes with Description: ",len(doc_id_to_desc))
# text_data holds the description strings in the mapping's iteration order.
text_data = list(doc_id_to_desc.values())

Number of Nodes with Description:  1136


In [None]:
# Initialize the tokenizer and model for the selected checkpoint.
# `model_name` is not assigned by any visible earlier cell, so derive it from
# the configuration here; otherwise this cell raises NameError on a fresh kernel.
model_name = model_names[model_id]
checkpoint = models[model_id]
if "gpt2" in model_name:
    # Causal language model.
    tokenizer = GPT2Tokenizer.from_pretrained(checkpoint)
    model = GPT2LMHeadModel.from_pretrained(checkpoint)
elif "SecRoBERTa" in model_name or "SecureBERT" in model_name:
    # Both security-domain checkpoints are loaded as masked language models.
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForMaskedLM.from_pretrained(checkpoint)
else:
    # Fail loudly instead of leaving `model`/`tokenizer` unbound.
    raise ValueError(f"Unsupported model name: {model_name!r}")
model.to(device)
# Set padding token: fall back to EOS when the tokenizer defines no pad token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
# Sequence length used for chunking/padding: the token count of the longest
# text in the dataset, capped at 512 tokens.
token_counts = [len(tokenizer.encode(sample)) for sample in text_data]
longest_text_tokens = max(token_counts)
max_length = longest_text_tokens if longest_text_tokens < 512 else 512
print("Max # token in the longest text :",max_length)

In [None]:
# Custom dataset with chunking
class CustomDataset(Dataset):
    """Language-modeling dataset that splits each text into fixed-size chunks.

    Every text is tokenized without special tokens and cut into pieces that
    leave room for the special tokens ``prepare_for_model`` adds, so no content
    token is dropped by truncation and special tokens are added exactly once.

    Args:
        tokenizer: Tokenizer exposing ``encode``, ``prepare_for_model`` and
            ``num_special_tokens_to_add`` (the Hugging Face tokenizer API).
        texts: Iterable of raw text strings.
        max_length: Length every returned sequence is padded/truncated to.

    Raises:
        ValueError: If ``max_length`` leaves no room for content tokens.
    """

    def __init__(self, tokenizer, texts, max_length):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.examples = []

        # Reserve space for the special tokens added later in __getitem__.
        n_special = tokenizer.num_special_tokens_to_add(pair=False)
        chunk_size = max_length - n_special
        if chunk_size <= 0:
            raise ValueError(
                f"max_length={max_length} leaves no room for content after "
                f"{n_special} special tokens"
            )

        for text in texts:
            # Encode raw content only; special tokens are added once per chunk.
            token_ids = tokenizer.encode(text, add_special_tokens=False)
            for start in range(0, len(token_ids), chunk_size):
                self.examples.append(token_ids[start:start + chunk_size])

    def __len__(self):
        """Return the number of chunks across all texts."""
        return len(self.examples)

    def __getitem__(self, idx):
        """Return the model inputs for chunk ``idx`` as CPU tensors.

        The result holds whatever ``prepare_for_model`` returns plus
        ``labels``: a copy of ``input_ids`` with padded positions set to -100
        so they are ignored by the loss.
        """
        encoded = self.tokenizer.prepare_for_model(
            self.examples[idx],
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors="pt"
        )
        # Drop a leading batch axis if the tokenizer produced one.
        for key in list(encoded.keys()):
            value = encoded[key]
            if value.dim() > 1:
                value = value.squeeze(0)
            encoded[key] = value

        labels = encoded["input_ids"].clone()
        if "attention_mask" in encoded:
            # Padding must not contribute to the loss when labels are passed
            # through unchanged by the collator.
            labels[encoded["attention_mask"] == 0] = -100
        encoded["labels"] = labels
        # Tensors intentionally stay on the CPU: the training loop / DataLoader
        # is responsible for device placement, and CUDA tensors created here
        # conflict with memory pinning and worker processes.
        return encoded


In [None]:
# Create dataset
dataset = CustomDataset(tokenizer, text_data, max_length)
# Create a data collator based on model type.
# Look the name up from the configuration: `model_name` is not assigned by any
# visible earlier cell, so reading it directly raises NameError on a fresh kernel.
model_name = model_names[model_id]
if "gpt2" in model_name:
    # Causal LM: items already carry `labels`, so only batching/padding is needed.
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
else:
    # Masked LM: the collator masks 15% of tokens and builds the labels itself.
    data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)


In [None]:
# Training arguments and Trainer
run_dir = output_dir+model_names[model_id]
training_args = TrainingArguments(
    per_device_train_batch_size=2,  # Small batch to limit GPU memory use
    gradient_accumulation_steps=4,  # Accumulate gradients over 4 steps (effective batch of 8 per device)
    num_train_epochs=n_epoch,
    learning_rate=1e-4,
    output_dir=run_dir+'/results',
    logging_dir=run_dir+'/logs',
    logging_steps=100,
    load_best_model_at_end=False,
    # No evaluation is configured. The evaluation-strategy keyword is left at
    # its default ("no") rather than passed explicitly, because its name
    # differs between transformers releases (`evaluation_strategy` vs `eval_strategy`).
    remove_unused_columns=False,
    push_to_hub=False,
    save_strategy="no",  # Disable checkpoint saving
    # Mixed precision only when a GPU is present; fp16 is rejected on CPU, and
    # this notebook explicitly supports a CPU fallback.
    fp16=torch.cuda.is_available(),
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
    eval_dataset=None,  # You can specify an evaluation dataset here
    data_collator=data_collator,
)
# Clear cache before training
torch.cuda.empty_cache()
trainer.train()

In [None]:
# Folder that receives the final model and tokenizer for this epoch count.
epoch_dir = output_dir+model_names[model_id]+'/epoch_{}'.format(n_epoch)
# Trainer writes to its configured output_dir; the explicit saves below put a
# loadable model + tokenizer pair side by side in `epoch_dir`.
trainer.save_model()
model.save_pretrained(epoch_dir)
tokenizer.save_pretrained(epoch_dir)

In [None]:
# Ask PyTorch to release cached, currently unused GPU memory now that training and saving are done.
torch.cuda.empty_cache()