In [1]:
# Install required packages
# pip install transformers torch datasets pandas numpy tqdm scikit-learn

import torch
import numpy as np
import pandas as pd
import ast
from tqdm import tqdm
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import os

# Configuration
class Config:
    """Static settings shared by the data, training and inference code.

    All values are class attributes, so they can be read either from the
    class itself or from an instance.
    """

    # Reproducibility: used for the torch/numpy seeds and the dataset splits
    SEED = 42

    # Pretrained checkpoint to start from, and where the fine-tuned copy goes
    MODEL_NAME = 'bert-base-uncased'
    OUTPUT_DIR = './bert_threat_model/'

    # Tokenization and optimization hyper-parameters
    MAX_LEN = 256  # raised to fit the length of threat-intelligence text
    BATCH_SIZE = 16
    EPOCHS = 1
    LEARNING_RATE = 2e-5

    # Entity labels whose presence marks a sample as malicious
    MALICIOUS_LABELS = {
        'ransomware', 'exploit', 'C&C', 'Infrastructure',
        'threat-actor', 'malware', 'attack-pattern',
    }

# Single Config instance read by the rest of the script
config = Config()

# Seed NumPy and PyTorch from the same configured value
np.random.seed(config.SEED)
torch.manual_seed(config.SEED)

# Load and preprocess TSV dataset
def load_threat_data(file_path, malicious_labels=None):
    """Read a tab-separated file and derive one binary label per row.

    The file must have a ``text`` column and an ``entities`` column. Each
    ``entities`` cell is parsed with ``ast.literal_eval`` and is expected to
    be an iterable of mappings that carry a ``'label'`` key. A row is
    labelled 1 when at least one of its entity labels is in
    ``malicious_labels`` and 0 otherwise.

    Args:
        file_path: Path (or anything ``pandas.read_csv`` accepts) of the TSV.
        malicious_labels: Iterable of entity labels that count as malicious.
            Defaults to ``config.MALICIOUS_LABELS``.

    Returns:
        A ``(texts, labels)`` tuple of two equally long lists: the raw values
        of the ``text`` column and the derived 0/1 labels.

    Raises:
        KeyError: If the ``text`` or ``entities`` column is missing. A missing
            column is a schema problem, so it is reported instead of silently
            labelling every row benign.
    """
    if malicious_labels is None:
        malicious_labels = config.MALICIOUS_LABELS
    malicious_labels = frozenset(malicious_labels)

    df = pd.read_csv(file_path, sep='\t')

    texts = list(df['text'])
    labels = []

    for raw_entities in df['entities']:
        try:
            entities = ast.literal_eval(raw_entities)
            entity_labels = {e['label'] for e in entities}
        except (ValueError, SyntaxError, TypeError, KeyError, RecursionError):
            # Best effort per row: an empty (NaN), unparsable or mis-shaped
            # cell falls back to benign rather than aborting the whole load.
            label = 0
        else:
            label = 1 if entity_labels & malicious_labels else 0

        labels.append(label)

    return texts, labels

# Load dataset: `texts` and `labels` are parallel lists returned by load_threat_data
texts, labels = load_threat_data('Cyber-Threat-Intelligence-Custom-Data.tsv')  # Replace with your TSV path

# Split dataset in two steps, both seeded with config.SEED:
#   1. hold out 20% of all rows as the test set;
#   2. hold out 10% of the remaining rows as the validation set.
# Overall this gives roughly 72% train / 8% validation / 20% test.
# NOTE(review): no `stratify=` argument is passed, so class proportions are
# not guaranteed to match across the splits — consider stratifying if the
# labels turn out to be imbalanced.
train_texts, test_texts, train_labels, test_labels = train_test_split(
    texts, labels, test_size=0.2, random_state=config.SEED
)
# The second split rebinds train_texts / train_labels to the reduced training set
train_texts, val_texts, train_labels, val_labels = train_test_split(
    train_texts, train_labels, test_size=0.1, random_state=config.SEED
)

# Dataset class
class ThreatDataset(Dataset):
    """Map-style dataset that tokenizes one text per lookup.

    Holds parallel sequences of texts and labels plus the tokenizer and the
    fixed sequence length; tokenization happens in ``__getitem__``.
    """

    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        """Number of samples, taken from the texts sequence."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``label`` for sample ``idx``.

        The text is coerced to ``str`` before tokenization. The tokenizer is
        asked for padded/truncated PyTorch tensors of ``max_len`` tokens, and
        its two outputs are flattened to one dimension.
        """
        sample_text = str(self.texts[idx])
        sample_label = self.labels[idx]

        tokenizer_options = dict(
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        encoded = self.tokenizer.encode_plus(sample_text, **tokenizer_options)

        item = {
            field: encoded[field].flatten()
            for field in ('input_ids', 'attention_mask')
        }
        item['label'] = torch.tensor(sample_label, dtype=torch.long)
        return item

# Initialize model and tokenizer from the checkpoint named in config.MODEL_NAME.
# num_labels=2 matches the 0/1 labels produced by load_threat_data.
tokenizer = BertTokenizer.from_pretrained(config.MODEL_NAME)
model = BertForSequenceClassification.from_pretrained(config.MODEL_NAME, num_labels=2)

# Create data loaders
def create_data_loader(texts, labels, tokenizer, max_len, batch_size, shuffle=False):
    """Wrap texts and labels in a ThreatDataset and return a DataLoader over it.

    ``batch_size`` and ``shuffle`` are forwarded to the DataLoader; shuffling
    is off unless requested.
    """
    return DataLoader(
        ThreatDataset(texts, labels, tokenizer, max_len),
        batch_size=batch_size,
        shuffle=shuffle,
    )

# One loader per split; only the training loader shuffles its samples
train_loader = create_data_loader(train_texts, train_labels, tokenizer, config.MAX_LEN, config.BATCH_SIZE, shuffle=True)
val_loader = create_data_loader(val_texts, val_labels, tokenizer, config.MAX_LEN, config.BATCH_SIZE)
test_loader = create_data_loader(test_texts, test_labels, tokenizer, config.MAX_LEN, config.BATCH_SIZE)

# Training setup
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
# NOTE(review): this AdamW is the one imported from `transformers`, which
# upstream has deprecated in favour of torch.optim.AdamW — confirm it still
# exists in the installed transformers version.
optimizer = AdamW(model.parameters(), lr=config.LEARNING_RATE, correct_bias=False)
# The scheduler is stepped once per batch, so the schedule spans
# (batches per epoch) x (number of epochs) steps
total_steps = len(train_loader) * config.EPOCHS
# Linear schedule with no warmup steps
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps
)

# Training loop with progress tracking.
# Losses and accuracies are kept as plain Python numbers (not 0-dim tensors),
# so `history` and `best_accuracy` never hold device tensors and can be
# plotted or serialized directly.
best_accuracy = 0.0
history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}

for epoch in range(config.EPOCHS):
    print(f'\nEpoch {epoch + 1}/{config.EPOCHS}')
    print('-' * 50)

    # Training phase
    model.train()
    running_loss = 0.0
    correct_predictions = 0

    progress_bar = tqdm(train_loader, desc='Training', leave=False)
    for batch in progress_bar:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        # Named batch_labels so the dataset-wide `labels` list defined at
        # load time is not overwritten by a per-batch tensor.
        batch_labels = batch['label'].to(device)

        outputs = model(input_ids, attention_mask=attention_mask, labels=batch_labels)
        loss = outputs.loss
        _, preds = torch.max(outputs.logits, dim=1)

        # Read the scalar loss once and reuse it for the total and the progress bar
        batch_loss = loss.item()
        running_loss += batch_loss
        correct_predictions += (preds == batch_labels).sum().item()

        loss.backward()
        # Clip the global gradient norm to 1.0 before the optimizer step
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        progress_bar.set_postfix({'loss': batch_loss})

    # Loss is the mean over batches; accuracy is the fraction of correct samples
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct_predictions / len(train_loader.dataset)
    history['train_loss'].append(epoch_loss)
    history['train_acc'].append(epoch_acc)

    # Validation phase (no gradient tracking, no parameter updates)
    model.eval()
    val_loss = 0.0
    val_correct_predictions = 0

    with torch.no_grad():
        for batch in tqdm(val_loader, desc='Validation', leave=False):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            batch_labels = batch['label'].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=batch_labels)
            _, preds = torch.max(outputs.logits, dim=1)

            val_correct_predictions += (preds == batch_labels).sum().item()
            val_loss += outputs.loss.item()

    val_epoch_loss = val_loss / len(val_loader)
    val_epoch_acc = val_correct_predictions / len(val_loader.dataset)
    history['val_loss'].append(val_epoch_loss)
    history['val_acc'].append(val_epoch_acc)

    print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
    print(f'Val Loss: {val_epoch_loss:.4f} Acc: {val_epoch_acc:.4f}')

    # Save best model: checkpoint only when validation accuracy improves
    if val_epoch_acc > best_accuracy:
        best_accuracy = val_epoch_acc
        # exist_ok avoids the separate existence check before creating the directory
        os.makedirs(config.OUTPUT_DIR, exist_ok=True)
        model.save_pretrained(config.OUTPUT_DIR)
        tokenizer.save_pretrained(config.OUTPUT_DIR)
        print(f'New best model saved with accuracy {best_accuracy:.4f}')

# Final evaluation on test set
def evaluate_model(model, data_loader):
    """Score ``model`` on every batch of ``data_loader`` and return summary metrics.

    The model is switched to eval mode and left in that mode. Inputs are
    moved to the device the model's parameters live on, so the function does
    not depend on a module-level ``device`` variable.

    Args:
        model: Classifier called as ``model(input_ids, attention_mask=...)``
            whose output exposes ``.logits``.
        data_loader: Iterable of batches with ``'input_ids'``,
            ``'attention_mask'`` and ``'label'`` tensors.

    Returns:
        Dict with ``'accuracy'``, ``'precision'``, ``'recall'`` and ``'f1'``;
        the last three are computed with ``average='binary'``.
    """
    model.eval()
    model_device = next(model.parameters()).device
    predictions = []
    true_labels = []

    with torch.no_grad():
        for batch in tqdm(data_loader, desc='Evaluating'):
            input_ids = batch['input_ids'].to(model_device)
            attention_mask = batch['attention_mask'].to(model_device)

            outputs = model(input_ids, attention_mask=attention_mask)
            _, preds = torch.max(outputs.logits, dim=1)

            predictions.extend(preds.cpu().tolist())
            # Labels are only compared on the host, so they are never sent
            # to the device and copied back.
            true_labels.extend(batch['label'].tolist())

    accuracy = accuracy_score(true_labels, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(
        true_labels, predictions, average='binary'
    )
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1
    }

# Evaluate the model on the held-out test split and report the four metrics,
# one per line, to four decimal places
test_metrics = evaluate_model(model, test_loader)
print(
    '\nTest Set Metrics:',
    f"Accuracy: {test_metrics['accuracy']:.4f}",
    f"Precision: {test_metrics['precision']:.4f}",
    f"Recall: {test_metrics['recall']:.4f}",
    f"F1 Score: {test_metrics['f1']:.4f}",
    sep='\n',
)

# Inference function using saved model
class ThreatAnalyzer:
    """Classify single texts as malicious or benign with a saved BERT model.

    The tokenizer and model are loaded from ``model_path`` and the model is
    moved to the GPU when one is available. The sequence length and the set
    of malicious entity labels are taken from the module-level ``config``.
    """

    # Errors that mean the entities string could not be turned into a set of
    # labels: unparsable text, a non-iterable value, or items without 'label'.
    _ENTITY_PARSE_ERRORS = (ValueError, SyntaxError, TypeError, KeyError, RecursionError)

    def __init__(self, model_path):
        """Load tokenizer and model from ``model_path`` and pick the device."""
        self.tokenizer = BertTokenizer.from_pretrained(model_path)
        self.model = BertForSequenceClassification.from_pretrained(model_path)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = self.model.to(self.device)
        self.max_len = config.MAX_LEN
        self.malicious_labels = config.MALICIOUS_LABELS

    def analyze(self, text, entities):
        """Predict a class for ``text`` and compare it with the entity-derived label.

        Args:
            text: The text to classify.
            entities: String holding a Python literal: an iterable of
                mappings that each carry a ``'label'`` key.

        Returns:
            Dict with ``'text'``, ``'prediction'`` and ``'ground_truth'``
            (each ``'malicious'`` or ``'benign'``), ``'confidence'`` (the
            largest softmax probability) and ``'entities'`` (the set of
            parsed entity labels, empty when ``entities`` cannot be parsed).
        """
        # Preprocess entities. On a parse failure fall back to an empty set,
        # so the result is benign ground truth with no entities instead of an
        # unbound variable at the return statement.
        try:
            parsed_entities = ast.literal_eval(entities)
            entity_labels = {e['label'] for e in parsed_entities}
        except self._ENTITY_PARSE_ERRORS:
            entity_labels = set()
        ground_truth = 1 if entity_labels & self.malicious_labels else 0

        # Model prediction
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        input_ids = encoding['input_ids'].to(self.device)
        attention_mask = encoding['attention_mask'].to(self.device)

        with torch.no_grad():
            logits = self.model(input_ids, attention_mask=attention_mask).logits
            _, prediction = torch.max(logits, dim=1)
            confidence = torch.softmax(logits, dim=1)[0].max().item()

        return {
            'text': text,
            'prediction': 'malicious' if prediction.item() == 1 else 'benign',
            'confidence': confidence,
            'ground_truth': 'malicious' if ground_truth == 1 else 'benign',
            'entities': entity_labels
        }

# Example usage
if __name__ == '__main__':
    # Two hand-written samples: one carrying a malicious entity label, one without
    test_data = [
        {
            'text': "A new variant of Ryuk ransomware targeting healthcare systems was detected",
            'entities': "[{'label': 'ransomware'}, {'label': 'sector'}]"
        },
        {
            'text': "Normal system update process completed successfully",
            'entities': "[{'label': 'software'}]"
        }
    ]

    # Load the checkpoint saved to config.OUTPUT_DIR
    analyzer = ThreatAnalyzer(config.OUTPUT_DIR)

    print("\nThreat Analysis Results:")
    for example in test_data:
        # The dict keys match analyze()'s parameter names, so unpack them directly
        result = analyzer.analyze(**example)
        print(
            f"\nText: {result['text']}",
            f"Prediction: {result['prediction'].upper()} (Confidence: {result['confidence']:.2f})",
            f"Ground Truth: {result['ground_truth'].upper()}",
            f"Detected Entities: {result['entities']}",
            sep='\n',
        )

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



Epoch 1/1
--------------------------------------------------




Train Loss: 0.4567 Acc: 0.8333
Val Loss: 0.3689 Acc: 0.8947
New best model saved with accuracy 0.8947


Evaluating: 100%|██████████| 6/6 [01:21<00:00, 13.65s/it]



Test Set Metrics:
Accuracy: 0.7708
Precision: 0.7708
Recall: 1.0000
F1 Score: 0.8706

Threat Analysis Results:

Text: A new variant of Ryuk ransomware targeting healthcare systems was detected
Prediction: MALICIOUS (Confidence: 0.91)
Ground Truth: MALICIOUS
Detected Entities: {'ransomware', 'sector'}

Text: Normal system update process completed successfully
Prediction: MALICIOUS (Confidence: 0.90)
Ground Truth: BENIGN
Detected Entities: {'software'}
