In [1]:
import torch
import pandas as pd
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F

# Hardcoded MITRE ATT&CK Mapping for CVEs
# Keys are CVE identifiers; values are "<technique ID> - <label>" strings that
# extract_mitre_ttp() returns verbatim.
# NOTE(review): some labels do not appear to match the official ATT&CK names
# for their IDs (T1210 is published as "Exploitation of Remote Services",
# T1072 as "Software Deployment Tools"; "Remote Services" is T1021). Confirm
# the intended techniques before relying on these strings.
MITRE_MAPPING = {
    "CVE-2021-26855": "T1190 - Exploit Public-Facing Application",
    "CVE-2021-26857": "T1210 - Remote Code Execution",
    "CVE-2021-26858": "T1072 - Remote Services",
    "CVE-2021-27065": "T1203 - Exploitation for Client Execution",
}

def extract_mitre_ttp(text):
    """Return the mapped ATT&CK label for the first known CVE found in *text*.

    The CVE identifiers in ``MITRE_MAPPING`` are searched for as substrings,
    ignoring letter case, so ``cve-2021-26855`` matches just like
    ``CVE-2021-26855``. When several mapped CVEs occur in the text, the one
    that comes first in ``MITRE_MAPPING`` (insertion order) wins, regardless
    of where it appears in the text.

    Args:
        text: The string to scan.

    Returns:
        The mapped "<technique ID> - <label>" string, or
        "No MITRE ATT&CK mapping found" when no mapped CVE is present.
    """
    # Fold once up front so every key comparison is case-insensitive.
    folded_text = text.casefold()
    for cve, ttp in MITRE_MAPPING.items():
        if cve.casefold() in folded_text:
            return ttp
    return "No MITRE ATT&CK mapping found"

# Read the train and test splits from CSV files (paths are relative to the
# current working directory).
train_path, test_path = "df_train.csv", "df_test.csv"
df_train = pd.read_csv(train_path)
df_test = pd.read_csv(test_path)

# Define dataset class
class ThreatIntelDataset(Dataset):
    """Map-style dataset pairing each text with its label.

    Tokenization happens on access: every ``__getitem__`` call passes one text
    to the tokenizer, requesting padding/truncation to ``max_length`` and
    PyTorch tensors as output.
    """

    def __init__(self, texts, labels, tokenizer, max_length=256):
        # texts and labels are indexed with the same position in __getitem__.
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        # The dataset size follows the number of texts.
        return len(self.texts)

    def __getitem__(self, idx):
        """Return a dict with 'input_ids', 'attention_mask' and 'label'."""
        sample_text, sample_label = self.texts[idx], self.labels[idx]
        encoded = self.tokenizer(
            sample_text,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        )
        # Drop the leading dimension of size one that the tokenizer output
        # carries for a single text.
        item = {
            field: encoded[field].squeeze(0)
            for field in ('input_ids', 'attention_mask')
        }
        item['label'] = torch.tensor(sample_label, dtype=torch.long)
        return item

# Tokenizer shared by the datasets below and by the evaluation step
TOKENIZER = BertTokenizer.from_pretrained("bert-base-uncased")

# Wrap the 'text' and 'label' columns of each split in a dataset
dataset_train = ThreatIntelDataset(
    texts=df_train['text'].tolist(),
    labels=df_train['label'].tolist(),
    tokenizer=TOKENIZER,
)
dataset_test = ThreatIntelDataset(
    texts=df_test['text'].tolist(),
    labels=df_test['label'].tolist(),
    tokenizer=TOKENIZER,
)

# Only the training loader shuffles; the test loader keeps file order
dataloader_train = DataLoader(dataset=dataset_train, batch_size=8, shuffle=True)
dataloader_test = DataLoader(dataset=dataset_test, batch_size=8, shuffle=False)

# Load the pretrained BERT checkpoint with a two-class classification head
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=2,
)

# Use the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)

# AdamW over all model parameters
optimizer = torch.optim.AdamW(params=model.parameters(), lr=5e-5)

# Training loop: a fixed number of passes over the training dataloader
EPOCHS = 3
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    for batch in dataloader_train:
        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Move the batch tensors to the same device as the model
        input_ids, attention_mask, labels = (
            batch[field].to(device)
            for field in ('input_ids', 'attention_mask', 'label')
        )

        # Passing labels makes the model return the loss with its outputs
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        total_loss += loss.item()

        loss.backward()
        optimizer.step()

    # Report the mean per-batch loss for this epoch
    mean_loss = total_loss / len(dataloader_train)
    print(f"Epoch {epoch + 1}, Loss: {mean_loss}")

print("Training Complete!")

# Evaluation: classify each test text one at a time and attach its CVE mapping
model.eval()
test_texts = df_test['text'].tolist()
predictions = []

for text in test_texts:
    encoding = TOKENIZER(
        text,
        padding=True,
        truncation=True,
        max_length=256,
        return_tensors="pt",
    )

    # No gradients are needed for inference
    with torch.no_grad():
        outputs = model(
            encoding['input_ids'].to(device),
            attention_mask=encoding['attention_mask'].to(device),
        )
        probs = F.softmax(outputs.logits, dim=-1)

    # Index of the most probable class for this single-text batch
    prediction = probs.argmax(dim=1).item()
    predictions.append((text, prediction, extract_mitre_ttp(text)))

# Print sample results (first five predictions)
for sample_text, predicted_class, mapped_ttp in predictions[:5]:
    print(f"Text: {sample_text}\nThreat Intelligence: {bool(predicted_class)}\nMapped MITRE ATT&CK TTP: {mapped_ttp}\n")


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch 1, Loss: 0.7068876028060913
Epoch 2, Loss: 0.6634136885404587
Epoch 3, Loss: 0.5801873505115509
Training Complete!
Text: RT Not out of the woods yet @dennisu24025937 
 #cybersecurity  #cyberthreats  #ransomware  #cyberinsurance  #socialengineering https://lnkd.in/d_vCpm3(https://t.co/m9ieXDlqPK)
Threat Intelligence: False
Mapped MITRE ATT&CK TTP: No MITRE ATT&CK mapping found

Text: RT Hackers Are Targeting Microsoft Exchange Servers With Ransomware!!
It did not take long since last week to do that! 
#DFIR #TrufflepigForensics #DigitalForensics #CyberSecurity #ITForensics #MemoryForensics #Microsoft #attacks #Exchange #Exploit #Ransomware https://twitter.com/phillip_misner/status/1370197696280027136(https://t.co/wM4FmUpPZ9)
Threat Intelligence: True
Mapped MITRE ATT&CK TTP: No MITRE ATT&CK mapping found

Text: RT New B-Side episode: @israel_barak an expert on cyber-warfare and CISO @cybereason on the recent Microsoft Exchange hack that hit thousands of organizations worldwide: Wh