In [1]:
# Cybersecurity Named Entity Recognition with Transformers
# Fine-tuning a pre-trained model on MITRE ATT&CK dataset

# Install required packages
!pip install transformers datasets torch seqeval accelerate




In [2]:
import torch
import numpy as np
import pandas as pd
from transformers import (
    AutoTokenizer, AutoModelForTokenClassification,
    TrainingArguments, Trainer, DataCollatorForTokenClassification,
    pipeline
)
from datasets import Dataset, DatasetDict
from seqeval.metrics import classification_report, f1_score
import json
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


In [3]:
# Upload the MITRE.zip file directly
import zipfile
import os

# Upload the MITRE.zip file
from google.colab import files
uploaded = files.upload()

# Extract the dataset
with zipfile.ZipFile('MITRE.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/mitre_data')

# List files in the extracted directory
print("Files in dataset:")
for root, dirs, files in os.walk('/content/mitre_data'):
    for file in files:
        print(os.path.join(root, file))

Saving MITRE.zip to MITRE (1).zip
Files in dataset:
/content/mitre_data/MITRE/test.txt
/content/mitre_data/MITRE/train.txt
/content/mitre_data/MITRE/valid.txt


In [4]:
def parse_iob_file(file_path):
    """
    Parse IOB formatted file and return sentences with their labels
    """
    sentences = []
    words = []
    labels = []

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()

            if not line:  # Empty line indicates sentence boundary
                if words:  # If we have collected words for a sentence
                    sentences.append({
                        'words': words,
                        'labels': labels
                    })
                    words = []
                    labels = []
                continue

            # Split the line (format: word [tab] label or word [space] label)
            if '\t' in line:
                parts = line.split('\t')
            else:
                parts = line.split()

            if len(parts) >= 2:
                word = parts[0]
                label = parts[-1]  # Assume label is the last column

                words.append(word)
                labels.append(label)

    # Add the last sentence if file doesn't end with empty line
    if words:
        sentences.append({
            'words': words,
            'labels': labels
        })

    return sentences

# Load and parse the dataset
train_file = '/content/mitre_data/MITRE/train.txt'
test_file = '/content/mitre_data/MITRE/test.txt'

train_sentences = parse_iob_file(train_file)
test_sentences = parse_iob_file(test_file)

print(f"Training sentences: {len(train_sentences)}")
print(f"Test sentences: {len(test_sentences)}")

# Display sample sentences
print("\nSample training sentence:")
print(train_sentences[0])

Training sentences: 2811
Test sentences: 748

Sample training sentence:
{'words': ['Super', 'Mario', 'Run', 'Malware', '#', '2', '–', 'DroidJack', 'RAT', 'Gamers', 'love', 'Mario', 'and', 'Pokemon', ',', 'but', 'so', 'do', 'malware', 'authors', '.'], 'labels': ['B-Malware', 'I-Malware', 'I-Malware', 'I-Malware', 'O', 'O', 'O', 'B-Malware', 'I-Malware', 'O', 'O', 'B-System', 'O', 'B-System', 'O', 'O', 'O', 'O', 'O', 'O', 'O']}


In [5]:
# Analyze the label distribution
def analyze_labels(sentences):
    label_counts = defaultdict(int)
    for sentence in sentences:
        for label in sentence['labels']:
            label_counts[label] += 1

    return label_counts

train_label_counts = analyze_labels(train_sentences)
test_label_counts = analyze_labels(test_sentences)

print("Training set label distribution:")
for label, count in sorted(train_label_counts.items()):
    print(f"  {label}: {count}")

print("\nTest set label distribution:")
for label, count in sorted(test_label_counts.items()):
    print(f"  {label}: {count}")

# Get all unique labels
all_labels = sorted(set(list(train_label_counts.keys()) + list(test_label_counts.keys())))
label2id = {label: i for i, label in enumerate(all_labels)}
id2label = {i: label for label, i in label2id.items()}

print(f"\nUnique labels: {all_labels}")
print(f"Label mapping: {label2id}")

Training set label distribution:
  B-Indicator: 1021
  B-Malware: 703
  B-Organization: 284
  B-System: 837
  B-Vulnerability: 48
  I-Indicator: 1247
  I-Malware: 195
  I-Organization: 116
  I-System: 484
  I-Vulnerability: 42
  O: 63214

Test set label distribution:
  B-Indicator: 261
  B-Malware: 242
  B-Organization: 131
  B-System: 248
  B-Vulnerability: 10
  I-Indicator: 322
  I-Malware: 45
  I-Organization: 62
  I-System: 121
  I-Vulnerability: 3
  O: 17825

Unique labels: ['B-Indicator', 'B-Malware', 'B-Organization', 'B-System', 'B-Vulnerability', 'I-Indicator', 'I-Malware', 'I-Organization', 'I-System', 'I-Vulnerability', 'O']
Label mapping: {'B-Indicator': 0, 'B-Malware': 1, 'B-Organization': 2, 'B-System': 3, 'B-Vulnerability': 4, 'I-Indicator': 5, 'I-Malware': 6, 'I-Organization': 7, 'I-System': 8, 'I-Vulnerability': 9, 'O': 10}


In [6]:
# Choose a pre-trained model
MODEL_NAME = "microsoft/deberta-v3-small"

# Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

def tokenize_and_align_labels(examples):
    """
    Tokenize the sentences and align the labels with the tokenized inputs
    """
    tokenized_inputs = tokenizer(
        examples["words"],
        truncation=True,
        padding=False,  # We'll pad later in the data collator
        is_split_into_words=True,
        max_length=512,
        return_offsets_mapping=True
    )

    labels = []
    for i, label in enumerate(examples["labels"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []

        for word_idx in word_ids:
            # Special tokens get a label of -100
            if word_idx is None:
                label_ids.append(-100)
            # Only label the first token of a given word
            elif word_idx != previous_word_idx:
                label_ids.append(label2id[label[word_idx]])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx

        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

# Convert to Hugging Face dataset format
def sentences_to_dataset(sentences):
    words = [sentence['words'] for sentence in sentences]
    labels = [sentence['labels'] for sentence in sentences]

    return Dataset.from_dict({
        'words': words,
        'labels': labels
    })

train_dataset = sentences_to_dataset(train_sentences)
test_dataset = sentences_to_dataset(test_sentences)

# Tokenize datasets
tokenized_train = train_dataset.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=train_dataset.column_names
)

tokenized_test = test_dataset.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=test_dataset.column_names
)

print("Tokenized dataset sample:")
print(tokenized_train[0])

Map:   0%|          | 0/2811 [00:00<?, ? examples/s]

Map:   0%|          | 0/748 [00:00<?, ? examples/s]

Tokenized dataset sample:
{'labels': [-100, 1, 6, 6, 6, 10, 10, 10, 1, -100, 6, 10, 10, 3, 10, 3, 10, 10, 10, 10, 10, 10, 10, -100], 'input_ids': [1, 2479, 9379, 5537, 50663, 953, 392, 377, 45383, 24164, 55316, 56293, 472, 9379, 263, 15918, 366, 304, 324, 333, 10582, 3482, 323, 2], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 'offset_mapping': [[0, 0], [0, 5], [0, 5], [0, 3], [0, 7], [0, 1], [0, 1], [0, 1], [0, 5], [5, 9], [0, 3], [0, 6], [0, 4], [0, 5], [0, 3], [0, 7], [0, 1], [0, 3], [0, 2], [0, 2], [0, 7], [0, 7], [0, 1], [0, 0]]}


In [11]:
import os
os.environ["WANDB_DISABLED"] = "true"

# Initialize the model
model = AutoModelForTokenClassification.from_pretrained(
    MODEL_NAME,
    num_labels=len(all_labels),
    id2label=id2label,
    label2id=label2id
)

# Data collator for dynamic padding
data_collator = DataCollatorForTokenClassification(
    tokenizer=tokenizer,
    padding=True
)

# Training arguments
training_args = TrainingArguments(
    output_dir="./mitre-ner-model",
    learning_rate=4e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=1,
    weight_decay=0.01,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    push_to_hub=False,
    logging_dir='./logs',
    logging_steps=500,
    report_to=[]  # Changed from None to [] to explicitly disable all reports
)

# Compute metrics function for evaluation
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Remove ignored index (special tokens)
    true_predictions = [
        [id2label[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [id2label[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    return {
        "f1": f1_score(true_labels, true_predictions),
    }

# Initialize trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_test,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics
)

Some weights of DebertaV2ForTokenClassification were not initialized from the model checkpoint at microsoft/deberta-v3-small and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [9]:
import os
os.environ["WANDB_DISABLED"] = "true"

In [12]:
# Start training
print("Starting training...")
train_result = trainer.train()

# Save the model
trainer.save_model("./mitre-ner-model-final")
tokenizer.save_pretrained("./mitre-ner-model-final")

# Print training summary
print("Training completed!")
metrics = train_result.metrics
print(f"Training metrics: {metrics}")

The tokenizer has new PAD/BOS/EOS tokens that differ from the model config and generation config. The model config and generation config were aligned accordingly, being updated with the tokenizer's values. Updated tokens: {'eos_token_id': 2, 'bos_token_id': 1}.


Starting training...


Epoch,Training Loss,Validation Loss,F1
1,No log,0.128902,0.592671


Training completed!
Training metrics: {'train_runtime': 2129.9952, 'train_samples_per_second': 1.32, 'train_steps_per_second': 0.083, 'total_flos': 68521014052770.0, 'train_loss': 0.17820982499556107, 'epoch': 1.0}


In [13]:
# Evaluate the model
print("Final evaluation:")
results = trainer.evaluate()
print(results)

# Detailed classification report
def get_detailed_predictions(dataset):
    """Get detailed predictions for classification report"""
    predictions, labels, _ = trainer.predict(dataset)
    predictions = np.argmax(predictions, axis=2)

    # Remove ignored index (special tokens)
    true_predictions = [
        [id2label[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [id2label[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    return true_predictions, true_labels

true_predictions, true_labels = get_detailed_predictions(tokenized_test)

print("\nDetailed Classification Report:")
print(classification_report(true_labels, true_predictions, digits=4))

Final evaluation:


{'eval_loss': 0.12890173494815826, 'eval_f1': 0.592671269251195, 'eval_runtime': 125.8345, 'eval_samples_per_second': 5.944, 'eval_steps_per_second': 0.374, 'epoch': 1.0}

Detailed Classification Report:
               precision    recall  f1-score   support

    Indicator     0.6918    0.7608    0.7247       301
      Malware     0.5439    0.7686    0.6370       242
 Organization     0.6667    0.0597    0.1096       134
       System     0.5153    0.5422    0.5284       249
Vulnerability     0.0000    0.0000    0.0000        10

    micro avg     0.5892    0.5962    0.5927       936
    macro avg     0.4835    0.4263    0.3999       936
 weighted avg     0.5956    0.5962    0.5540       936



In [14]:
# Create a NER pipeline for inference
ner_pipeline = pipeline(
    "token-classification",
    model=model,
    tokenizer=tokenizer,
    aggregation_strategy="simple",
    device=0 if torch.cuda.is_available() else -1
)

# Test the model on some cybersecurity text samples
test_texts = [
    "The attacker used mimikatz to dump credentials from lsass.exe memory",
    "APT29 deployed Cobalt Strike beacon for command and control",
    "Malware established persistence via scheduled task and registry run keys",
    "The threat actor exploited CVE-2021-44228 in the log4j library"
]

def predict_entities(text):
    """Predict named entities in text"""
    try:
        entities = ner_pipeline(text)
        return entities
    except Exception as e:
        print(f"Error in prediction: {e}")
        return []

# Test the pipeline
print("Testing the trained model:")
for i, text in enumerate(test_texts):
    print(f"\nText {i+1}: {text}")
    entities = predict_entities(text)
    if entities:
        for entity in entities:
            print(f"  {entity['entity_group']}: {entity['word']} (confidence: {entity['score']:.3f})")
    else:
        print("  No entities detected")

Device set to use cpu
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Testing the trained model:

Text 1: The attacker used mimikatz to dump credentials from lsass.exe memory
  Malware: mi (confidence: 0.369)
  Malware: mikatz (confidence: 0.319)
  Indicator: l (confidence: 0.894)
  Indicator: s (confidence: 0.671)
  Indicator: as (confidence: 0.730)
  Indicator: s (confidence: 0.668)
  Indicator: . (confidence: 0.491)
  Indicator: exe (confidence: 0.582)

Text 2: APT29 deployed Cobalt Strike beacon for command and control
  Malware: APT29 (confidence: 0.541)
  Malware: Cobalt Strike (confidence: 0.470)

Text 3: Malware established persistence via scheduled task and registry run keys
  No entities detected

Text 4: The threat actor exploited CVE-2021-44228 in the log4j library
  Indicator: CVE (confidence: 0.255)
  Indicator: 228 (confidence: 0.342)
  Indicator: log (confidence: 0.553)
  Indicator: j (confidence: 0.353)


In [15]:
# Save label mapping for future use
label_mapping = {
    'id2label': id2label,
    'label2id': label2id
}

with open('./mitre-ner-model-final/label_mapping.json', 'w') as f:
    json.dump(label_mapping, f, indent=2)

print("Model and artifacts saved successfully!")
print(f"Label mapping: {json.dumps(label_mapping, indent=2)}")

# Create a summary of the project
print("\n" + "="*50)
print("PROJECT SUMMARY")
print("="*50)
print(f"Model: {MODEL_NAME}")
print(f"Training samples: {len(train_sentences)}")
print(f"Test samples: {len(test_sentences)}")
print(f"Number of labels: {len(all_labels)}")
print(f"Labels: {', '.join(all_labels)}")
print(f"Final F1 Score: {results.get('eval_f1', 'N/A')}")
print("Model saved to: ./mitre-ner-model-final")
print("="*50)

Model and artifacts saved successfully!
Label mapping: {
  "id2label": {
    "0": "B-Indicator",
    "1": "B-Malware",
    "2": "B-Organization",
    "3": "B-System",
    "4": "B-Vulnerability",
    "5": "I-Indicator",
    "6": "I-Malware",
    "7": "I-Organization",
    "8": "I-System",
    "9": "I-Vulnerability",
    "10": "O"
  },
  "label2id": {
    "B-Indicator": 0,
    "B-Malware": 1,
    "B-Organization": 2,
    "B-System": 3,
    "B-Vulnerability": 4,
    "I-Indicator": 5,
    "I-Malware": 6,
    "I-Organization": 7,
    "I-System": 8,
    "I-Vulnerability": 9,
    "O": 10
  }
}

PROJECT SUMMARY
Model: microsoft/deberta-v3-small
Training samples: 2811
Test samples: 748
Number of labels: 11
Labels: B-Indicator, B-Malware, B-Organization, B-System, B-Vulnerability, I-Indicator, I-Malware, I-Organization, I-System, I-Vulnerability, O
Final F1 Score: 0.592671269251195
Model saved to: ./mitre-ner-model-final
