# LOTL Detection - Neural Network Training

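This notebook trains a small feed-forward neural network to detect Living-off-the-Land (LOTL) attacks.
Run it on Google Colab for GPU acceleration. Before running, make `dataset.jsonl` and the `feature_extractor` and `data_loader` modules available in the Colab working directory (upload them or mount Google Drive).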


In [None]:
# Install dependencies
!pip install torch sentence-transformers scikit-learn numpy pandas

# Upload dataset.jsonl to Colab
# Use the file uploader or mount Google Drive


In [None]:
import sys
import json
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score
from pathlib import Path

# Import our modules (upload these files to Colab or use from GitHub)
from feature_extractor import ComprehensiveFeatureExtractor
from data_loader import load_dataset, filter_label_agreement, get_labels, split_dataset


## Load and Prepare Data


In [None]:
# Read the raw events from the JSONL dataset file.
events = load_dataset('dataset.jsonl')
print(f"Loaded {len(events)} events")

# Keep only the events on which Claude and the ground truth agree.
# The second return value is not needed in this notebook.
filtered_events, _ = filter_label_agreement(events)
print(f"Kept {len(filtered_events)} events with agreement")

# Labels for the filtered events; they are looked up by position below.
labels = get_labels(filtered_events, use_claude_label=True)

# 80/20 train/test split with a fixed seed.
train_events, test_events, train_indices, test_indices = split_dataset(
    filtered_events,
    test_size=0.2,
    random_seed=42,
)

# Pick the label of every event through the index lists returned by the split.
train_labels = [labels[idx] for idx in train_indices]
test_labels = [labels[idx] for idx in test_indices]

print(
    f"Training: {len(train_events)} events",
    f"Test: {len(test_events)} events",
    sep="\n",
)


## Extract Features


In [None]:
# Initialize feature extractor
feature_extractor = ComprehensiveFeatureExtractor(use_text_embeddings=True)

# Extract one feature mapping (feature name -> value) per event.
print("Extracting training features...")
train_features = [feature_extractor.extract_all_features(event) for event in train_events]

print("Extracting test features...")
test_features = [feature_extractor.extract_all_features(event) for event in test_events]

# Fail early with a clear message instead of an IndexError further down.
if not train_features:
    raise ValueError("No training events available; cannot build the feature matrix")

# Column order = sorted union of every feature name seen in the TRAINING set.
# Using only the first event's keys would silently drop any feature that
# happens to be missing from that one event. Test-only names are deliberately
# ignored so the schema depends on training data alone.
feature_names = sorted({name for features in train_features for name in features})
print(f"Number of features: {len(feature_names)}")

# Convert to numpy arrays; a feature missing from an event is filled with 0.
X_train = np.array([[f.get(name, 0) for name in feature_names] for f in train_features])
X_test = np.array([[f.get(name, 0) for name in feature_names] for f in test_features])

print(f"Training shape: {X_train.shape}")
print(f"Test shape: {X_test.shape}")


## Define Neural Network Model


In [None]:
class LOTLNet(nn.Module):
    """Small fully connected classifier.

    The network is a stack of ``Linear -> ReLU -> Dropout`` groups, one per
    entry of ``hidden_dims``, followed by a final ``Linear`` layer that emits
    one raw logit per class (no softmax is applied here).

    Args:
        input_dim: Number of input features per sample.
        hidden_dims: Width of each hidden layer, in order. Any iterable of
            ints (list or tuple) is accepted; an empty one yields a single
            linear layer.
        dropout: Dropout probability applied after every hidden activation.
        num_classes: Number of output logits. Defaults to 2 (binary
            classification).
    """

    def __init__(self, input_dim, hidden_dims=(128, 64), dropout=0.3, num_classes=2):
        super().__init__()

        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(dropout),
            ])
            prev_dim = hidden_dim

        # Output layer: one logit per class.
        layers.append(nn.Linear(prev_dim, num_classes))

        # Layer order is unchanged, so state_dict keys ("network.<idx>....")
        # stay compatible with previously saved checkpoints.
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw class logits for a batch ``x`` of feature rows."""
        return self.network(x)

# Seed PyTorch's global RNG before the weights are created so that weight
# initialization (and any later draw from the global RNG) is repeatable
# across Restart & Run All. 42 matches the seed used for the dataset split.
# Some GPU kernels may still introduce small nondeterminism.
torch.manual_seed(42)

# Initialize model; the input width is the number of feature columns.
input_dim = X_train.shape[1]
model = LOTLNet(input_dim=input_dim, hidden_dims=[128, 64], dropout=0.3)
print(f"Model initialized with input_dim={input_dim}")
print(model)


## Prepare Data for Training


In [None]:
# Standardize the features. The scaler is fitted on the training matrix only
# and then applied unchanged to both splits.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Map the string labels to integer class ids (fitted on the training labels).
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(train_labels)
y_test_encoded = label_encoder.transform(test_labels)

# Wrap the arrays as tensors: float32 features, int64 class ids.
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test_encoded, dtype=torch.long)

# Pair features with labels.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Mini-batch loaders: training batches are reshuffled each epoch, test
# batches keep their original order.
batch_size = 32
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

print(
    f"Training batches: {len(train_loader)}",
    f"Test batches: {len(test_loader)}",
    sep="\n",
)


## Training


In [None]:
# Setup training
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 50
eval_every = 5  # evaluate (and possibly checkpoint) every N epochs

# Sentinel below any attainable F1 (F1 is in [0, 1]). With 0.0 and a strict
# ">" comparison, a run whose F1 never rises above zero would never write
# best_model.pt and the evaluation cell would fail to load it. If no
# evaluation runs at all (epochs < eval_every) the sentinel is what gets
# reported at the end.
best_f1 = -1.0

# Read the architecture back from the model itself so the checkpoint metadata
# cannot drift from the network that was actually trained.
checkpoint_hidden_dims = [
    layer.out_features for layer in model.network if isinstance(layer, nn.Linear)
][:-1]  # the last Linear is the output layer, not a hidden layer
checkpoint_dropout = next(
    (layer.p for layer in model.network if isinstance(layer, nn.Dropout)), 0.0
)

for epoch in range(epochs):
    # Training
    model.train()
    total_loss = 0.0

    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Evaluation
    # NOTE(review): the checkpoint is selected on the same split that the
    # final evaluation cell reports on, so those final metrics are
    # optimistically biased. A separate validation split would avoid this.
    if (epoch + 1) % eval_every == 0:
        model.eval()
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for batch_X, batch_y in test_loader:
                batch_X = batch_X.to(device)
                outputs = model(batch_X)
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                # batch_y was never moved to the device, so it is still on CPU.
                all_labels.extend(batch_y.numpy())

        # Calculate metrics on the original string labels.
        y_pred = label_encoder.inverse_transform(all_preds)
        y_true = label_encoder.inverse_transform(all_labels)

        acc = accuracy_score(y_true, y_pred)
        prec = precision_score(y_true, y_pred, pos_label='malicious', zero_division=0)
        rec = recall_score(y_true, y_pred, pos_label='malicious', zero_division=0)
        f1 = f1_score(y_true, y_pred, pos_label='malicious', zero_division=0)

        if f1 > best_f1:
            best_f1 = f1
            # Save best model together with everything needed to rebuild the
            # preprocessing pipeline at inference time.
            torch.save({
                'model_state_dict': model.state_dict(),
                'input_dim': input_dim,
                'hidden_dims': checkpoint_hidden_dims,
                'dropout': checkpoint_dropout,
                'scaler': scaler,
                'label_encoder': label_encoder,
                'feature_names': feature_names
            }, 'best_model.pt')

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}, "
              f"Acc: {acc:.4f}, Prec: {prec:.4f}, Rec: {rec:.4f}, F1: {f1:.4f}")

print(f"\nTraining complete! Best F1: {best_f1:.4f}")


In [None]:
# Load best model
# The checkpoint holds pickled scikit-learn objects (scaler, label_encoder)
# next to the weights, so it cannot be read in weights-only mode, which is the
# default from PyTorch 2.6 on. Full unpickling is acceptable here only because
# the file was written by this notebook; never do this with an untrusted file.
# map_location keeps the load working if the checkpoint was saved on a GPU
# and is later read on a CPU-only runtime.
checkpoint = torch.load('best_model.pt', map_location=device, weights_only=False)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

# Evaluate on test set
all_preds = []
all_labels = []
all_probs = []  # per-class softmax probabilities, kept for later inspection

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        outputs = model(batch_X)
        probs = torch.softmax(outputs, dim=1)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(batch_y.numpy())
        all_probs.extend(probs.cpu().numpy())

# Back to the original string labels for reporting.
y_pred = label_encoder.inverse_transform(all_preds)
y_true = label_encoder.inverse_transform(all_labels)

# Compute each metric once; 'malicious' is treated as the positive class.
final_accuracy = accuracy_score(y_true, y_pred)
final_precision = precision_score(y_true, y_pred, pos_label='malicious', zero_division=0)
final_recall = recall_score(y_true, y_pred, pos_label='malicious', zero_division=0)
final_f1 = f1_score(y_true, y_pred, pos_label='malicious', zero_division=0)

# Print metrics
print("Final Test Results:")
print(f"Accuracy: {final_accuracy:.4f}")
print(f"Precision: {final_precision:.4f}")
print(f"Recall: {final_recall:.4f}")
print(f"F1-Score: {final_f1:.4f}")
print("\nClassification Report:")
print(classification_report(y_true, y_pred))


## Download Model


In [None]:
# Download the model file to the local machine.
# google.colab exists only on Colab, so the import is guarded: elsewhere the
# cell reports where the checkpoint is instead of failing a Run All.
try:
    from google.colab import files
except ImportError:
    print("google.colab is not available; best_model.pt was left in the working directory.")
else:
    files.download('best_model.pt')
    print("Model downloaded! Place it in the models/ directory of your project.")
