# Exercise 2: Intrusion Detection for Unix Processes

Apply the negative selection algorithm to detect anomalous system call sequences.

**Datasets:** `snd-cert` and `snd-unm` (in `syscalls/`)

**Key differences from Exercise 1:**
- Sequences are variable-length â†’ must chunk into fixed-length substrings
- Labels are in separate `.labels` files (0 = normal, 1 = anomalous)
- Must specify `-alphabet file://<alpha_file>` since the alphabet differs from the training data characters

In [None]:
import subprocess
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from sklearn.metrics import roc_curve, roc_auc_score

In [None]:
BASE_DIR = Path(".")
NEGSEL_JAR = BASE_DIR / "negsel2.jar"
SYSCALLS_DIR = BASE_DIR / "syscalls"

## 1. Data Loading & Exploration

In [None]:
def load_dataset(name: str):
    """Load a syscalls dataset (e.g. 'snd-cert' or 'snd-unm').
    
    Returns:
        train_seqs: list of training sequences
        alpha_file: path to alphabet file
        test_sets: list of (test_seqs, labels) tuples for each .test/.labels pair
    """
    d = SYSCALLS_DIR / name
    
    train_file = d / f"{name}.train"
    alpha_file = d / f"{name}.alpha"
    train_seqs = train_file.read_text().strip().split("\n")
    
    test_sets = []
    for i in [1, 2, 3]:
        test_file = d / f"{name}.{i}.test"
        label_file = d / f"{name}.{i}.labels"
        test_seqs = test_file.read_text().strip().split("\n")
        labels = [int(x) for x in label_file.read_text().strip().split("\n")]
        test_sets.append((test_seqs, labels))
    
    return train_seqs, alpha_file, test_sets

In [None]:
for name in ["snd-cert", "snd-unm"]:
    train, alpha, tests = load_dataset(name)
    print(f"=== {name} ===")
    print(f"  Alphabet: {alpha.read_text().strip()[:60]}...")
    print(f"  Training sequences: {len(train)}")
    train_lens = [len(s) for s in train]
    print(f"  Train seq lengths: min={min(train_lens)}, max={max(train_lens)}, median={int(np.median(train_lens))}")
    for i, (seqs, labels) in enumerate(tests, 1):
        n_anom = sum(labels)
        seq_lens = [len(s) for s in seqs]
        print(f"  Test {i}: {len(seqs)} seqs ({n_anom} anomalous, {len(seqs)-n_anom} normal), "
              f"lengths: min={min(seq_lens)}, max={max(seq_lens)}")
    print()

## 2. Preprocessing: Chunking Sequences

Since sequences are variable-length, we need to split them into fixed-length chunks of size `n` for both training and testing.

**Strategy choices:**
- Overlapping (sliding window) vs. non-overlapping chunks
- What to do with the remainder (discard if shorter than `n`)

In [None]:
def chunk_sequence(seq: str, n: int, overlap: bool = True) -> list[str]:
    """Split a sequence into chunks of length n.
    
    Args:
        seq: input sequence string
        n: chunk length
        overlap: if True, use a sliding window (stride=1); otherwise non-overlapping (stride=n)
    
    Returns:
        list of chunks of exactly length n
    """
    stride = 1 if overlap else n
    chunks = []
    for i in range(0, len(seq) - n + 1, stride):
        chunks.append(seq[i:i+n])
    return chunks

In [None]:
# Quick sanity check
example = "ABCDEFGHIJ"
print("Overlapping (n=4):", chunk_sequence(example, 4, overlap=True))
print("Non-overlapping (n=4):", chunk_sequence(example, 4, overlap=False))

## 3. Preparing Chunked Training Data

Write chunked training sequences to a temporary file that `negsel2.jar` can consume.

In [None]:
import tempfile

def write_chunked_train(train_seqs: list[str], n: int, overlap: bool = True) -> Path:
    """Chunk all training sequences and write to a temp file (one chunk per line)."""
    chunks = []
    for seq in train_seqs:
        chunks.extend(chunk_sequence(seq, n, overlap))
    # Deduplicate to reduce training time
    chunks = list(set(chunks))
    
    tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".train", delete=False, dir=".")
    tmp.write("\n".join(chunks) + "\n")
    tmp.close()
    print(f"Wrote {len(chunks)} unique chunks (n={n}) to {tmp.name}")
    return Path(tmp.name)

## 4. Running Negative Selection

Run `negsel2.jar` on chunked test data and aggregate scores per original sequence.

In [None]:
def run_negsel(train_file: Path, alpha_file: Path, test_chunks: list[str],
               n: int, r: int, use_log: bool = True) -> list[float]:
    """Run negsel2.jar and return the anomaly score for each test chunk."""
    cmd = [
        "java", "-jar", str(NEGSEL_JAR),
        "-alphabet", f"file://{alpha_file}",
        "-self", str(train_file),
        "-n", str(n),
        "-r", str(r),
        "-c",
    ]
    if use_log:
        cmd.append("-l")
    
    input_data = "\n".join(test_chunks) + "\n"
    result = subprocess.run(cmd, input=input_data, capture_output=True, text=True, timeout=300)
    
    if result.returncode != 0:
        raise RuntimeError(f"negsel2 failed: {result.stderr}")
    
    scores = [float(x) for x in result.stdout.strip().split("\n") if x.strip()]
    return scores

In [None]:
def score_sequences(train_file: Path, alpha_file: Path, test_seqs: list[str],
                    n: int, r: int, overlap: bool = True) -> np.ndarray:
    """Compute a composite anomaly score for each variable-length test sequence.
    
    Chunks each sequence, runs negsel on all chunks, then averages chunk scores
    per original sequence.
    """
    # Build flat list of chunks and track which sequence each belongs to
    all_chunks = []
    seq_indices = []  # maps chunk index -> original sequence index
    for i, seq in enumerate(test_seqs):
        chunks = chunk_sequence(seq, n, overlap)
        if not chunks:
            # Sequence shorter than n: use the sequence as-is (padded or skipped)
            chunks = [seq]
        all_chunks.extend(chunks)
        seq_indices.extend([i] * len(chunks))
    
    # Run negsel on all chunks at once
    chunk_scores = run_negsel(train_file, alpha_file, all_chunks, n, r)
    
    # Aggregate: average chunk scores per sequence
    seq_scores = np.zeros(len(test_seqs))
    seq_counts = np.zeros(len(test_seqs))
    for idx, score in zip(seq_indices, chunk_scores):
        seq_scores[idx] += score
        seq_counts[idx] += 1
    seq_scores /= np.maximum(seq_counts, 1)
    
    return seq_scores

## 5. ROC / AUC Analysis

In [None]:
def plot_roc(labels: np.ndarray, scores: np.ndarray, title: str = ""):
    """Plot ROC curve and return AUC."""
    fpr, tpr, _ = roc_curve(labels, scores)
    auc = roc_auc_score(labels, scores)
    
    plt.figure(figsize=(5, 5))
    plt.plot(fpr, tpr, label=f"AUC = {auc:.3f}")
    plt.plot([0, 1], [0, 1], "k--", alpha=0.3)
    plt.xlabel("1 - Specificity (FPR)")
    plt.ylabel("Sensitivity (TPR)")
    plt.title(title)
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.show()
    
    return auc

## 6. Run Experiment

Choose parameters and evaluate on both datasets. Start with the language example parameters (`n=10`, `r=4`) as a baseline, then explore.

In [None]:
# Parameters to explore
N = 10
R = 4
OVERLAP = True

In [None]:
results = {}

for dataset_name in ["snd-cert", "snd-unm"]:
    train_seqs, alpha_file, test_sets = load_dataset(dataset_name)
    
    # Prepare chunked training data
    train_file = write_chunked_train(train_seqs, N, overlap=OVERLAP)
    
    try:
        for test_idx, (test_seqs, labels) in enumerate(test_sets, 1):
            print(f"\nScoring {dataset_name} test {test_idx} (n={N}, r={R})...")
            scores = score_sequences(train_file, alpha_file, test_seqs, N, R, overlap=OVERLAP)
            auc = plot_roc(
                np.array(labels), scores,
                title=f"{dataset_name} test {test_idx} (n={N}, r={R})"
            )
            print(f"  AUC = {auc:.4f}")
            results[(dataset_name, test_idx)] = auc
    finally:
        Path(train_file).unlink()  # clean up temp file

## 7. Parameter Sweep

Systematically vary `n` and `r` to find the best configuration.

In [None]:
# TODO: Sweep over different values of n and r
# For each (n, r) combination, compute AUC on one representative test set
# and record the results for comparison.

n_values = [7, 10, 15]
r_values = [2, 3, 4, 5, 6]

# Example: sweep on snd-unm test 1 (smallest test set, fastest to evaluate)
dataset_name = "snd-unm"
train_seqs, alpha_file, test_sets = load_dataset(dataset_name)
test_seqs, labels = test_sets[0]  # test 1

sweep_results = {}

for n in n_values:
    train_file = write_chunked_train(train_seqs, n, overlap=True)
    try:
        for r in r_values:
            if r >= n:
                continue
            print(f"n={n}, r={r}...", end=" ")
            try:
                scores = score_sequences(train_file, alpha_file, test_seqs, n, r)
                auc = roc_auc_score(labels, scores)
                print(f"AUC={auc:.4f}")
                sweep_results[(n, r)] = auc
            except Exception as e:
                print(f"Error: {e}")
    finally:
        Path(train_file).unlink()

In [None]:
# Visualize parameter sweep results
if sweep_results:
    fig, ax = plt.subplots(figsize=(8, 5))
    for n in n_values:
        rs = [r for (nn, r) in sweep_results if nn == n]
        aucs = [sweep_results[(n, r)] for r in rs]
        if rs:
            ax.plot(rs, aucs, "o-", label=f"n={n}")
    ax.set_xlabel("r")
    ax.set_ylabel("AUC")
    ax.set_title(f"Parameter Sweep: {dataset_name}")
    ax.legend()
    plt.tight_layout()
    plt.show()

## 8. Final Evaluation with Best Parameters

Use the best `(n, r)` found above to evaluate on all test sets of both datasets.

In [None]:
# TODO: Set best parameters from the sweep above
BEST_N = 10
BEST_R = 4

# Re-run on all test sets with the best parameters and produce final ROC plots

## 9. Discussion

**TODO:** Answer the following in the report:

1. What preprocessing choices did you make (chunking strategy, overlap vs non-overlap) and why?
2. How did you aggregate chunk-level scores into sequence-level anomaly scores?
3. What parameters `n` and `r` worked best and why?
4. How well does the classifier perform on each dataset (`snd-cert` vs `snd-unm`)?
5. What would be the biggest challenges in implementing the negative selection algorithm yourself?