# TTP Mapping & Quick Evaluation (Retrieval + LLM)

Short notebook to map detection descriptions to MITRE ATT&CK technique IDs using:
- Sentence-transformer embeddings + nearest neighbor retrieval
- Local LLM (phi-4) to consolidate / re-rank and justify
- Precision / Recall / F1@K evaluation (if ground truth provided)

## Input Files

- `MITRE_TPPs.xlsx` (reference techniques)
- `MDE_SampleDetections.xlsx` (detections to label) — ground truth in column `MiTRE_TTPs` (not sent to the model)

## Flow (Cell Numbers)

1. (2) Install deps (run once or skip if already installed)
2. (3) Config + direct LLM load (always on)
3. (4) Load TTPs & prep text
4. (5) Embed TTPs
5. (6) Build vector index + retrieval helper
6. (7) Load detections (keeps `MiTRE_TTPs` separate)
7. (8) LLM consolidation + labeling pipeline
8. (9) Evaluation (micro F1@K)
9. (10) Faithfulness & relevance (LLM-as-judge)
10. (11) Inspect an example

## Key Params

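- Retrieval size: `k_retrieval` arg in `run_rag_labeling` (Cell 8), passed through to `llm_rag_label`.
- Output size: the labeling step does not cap the number of returned IDs; the evaluation cell scores only the first `K` of them.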
- Evaluation K: variable `K` in evaluation cell (Cell 9).

## Notes

- The ground truth column `MiTRE_TTPs` is never included in the prompt; it's only used for scoring.
- If column names differ, edit the mapping variables after they are auto-detected.
- LLM is always used so we can evaluate the full retrieval + reasoning chain.
- You can save predictions later by dumping the `predictions` list to JSON.
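
Saving predictions, as the last note suggests, could look like the sketch below. The sample record, the helper names `save_predictions` / `load_predictions`, and the output path are placeholders rather than anything defined in the notebook. `default=str` is an assumption to cope with values (for example NumPy scalars read from Excel) that `json` cannot serialize directly.

```python
import json
import os
import tempfile

# Placeholder record shaped like one entry of `predictions` (illustrative values only).
sample_predictions = [
    {
        "DetectionID": 1,
        "PredictedTechniqueIDs": ["T1003.001"],
        "Rationale": "Example rationale",
        "RawResponse": "{\"technique_ids\": [\"T1003.001\"]}",
        "Retrieved": [
            {"TechniqueID": "T1003.001", "TechniqueName": "LSASS Memory",
             "Similarity": 0.71, "Description": "Example description"}
        ],
    }
]

def save_predictions(preds, path):
    """Write the predictions list to a JSON file; non-serializable values become strings."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(preds, fh, indent=2, default=str)

def load_predictions(path):
    """Read predictions back, e.g. to re-run evaluation without calling the LLM again."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)

# Hypothetical output location; any writable path works.
out_path = os.path.join(tempfile.gettempdir(), "predictions.json")
save_predictions(sample_predictions, out_path)
restored = load_predictions(out_path)
```

In the notebook itself, the call would be `save_predictions(predictions, "predictions.json")` after the labeling cell has run.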

In [None]:
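# Install required packages (run once). Comment out if already installed.
import sys, subprocess, importlib

# pip package name -> import name, for packages where the two differ
IMPORT_NAMES = {'scikit-learn': 'sklearn'}


def pip_install(pkgs):
    """Install each package only if its module cannot be imported."""
    for p in pkgs:
        pkg_name = p.split('==')[0]
        mod_name = IMPORT_NAMES.get(pkg_name, pkg_name.replace('-', '_'))
        try:
            importlib.import_module(mod_name)
        except ImportError:
            print('Installing', p)
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', p])


# accelerate is needed because the LLM is loaded with device_map="auto"
pip_install([
    'pandas', 'openpyxl', 'sentence-transformers', 'scikit-learn', 'transformers', 'torch', 'accelerate'
])

print('Core dependencies ensured.')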

Installing openpyxl...
Installing sentence-transformers...


CalledProcessError: Command '['C:\\Users\\emvictor\\AppData\\Local\\Microsoft\\WindowsApps\\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\\python.exe', '-m', 'pip', 'install', 'sentence-transformers']' returned non-zero exit status 1.

In [None]:
# Imports & configuration (clean, LLM mandatory)
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.neighbors import NearestNeighbors
from typing import List, Dict, Any
import os, math, json, torch
from transformers import AutoTokenizer, AutoModelForCausalLM

MODEL_NAME = "microsoft/phi-4"
print(f"Loading LLM: {MODEL_NAME} ... (first load may download weights)")
llm_tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# device_map="auto" already places the weights on the GPU when one is available,
# so no explicit .to("cuda") call is needed afterwards.
llm_model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, device_map="auto")

DEVICE = str(next(llm_model.parameters()).device)
print('LLM device:', DEVICE)


def generate_chat(messages, max_new_tokens: int = 128):
    """Run one chat turn and return only the newly generated text."""
    inputs = llm_tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
    ).to(llm_model.device)
    with torch.no_grad():
        outputs = llm_model.generate(**inputs, max_new_tokens=max_new_tokens)
    # Slice off the prompt tokens so only the model's reply is decoded
    prompt_len = inputs["input_ids"].shape[-1]
    generated_text = llm_tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
    return generated_text.strip()

In [None]:
# Load MITRE TTP reference data
TPP_FILE = 'MITRE_TPPs.xlsx'  # Adjust if needed
try:
    ttp_df = pd.read_excel(TPP_FILE)
except FileNotFoundError:
    raise FileNotFoundError(f'Could not find {TPP_FILE}. Please place it in the working directory.')

print('Columns in TTP file:', list(ttp_df.columns))
# Attempt to auto-detect columns
col_map = {}
def detect(col_names, candidates):
    for c in candidates:
        for cn in col_names:
            if cn.lower() == c.lower():
                return cn
    for c in candidates:
        for cn in col_names:
            if c.lower() in cn.lower():
                return cn
    return None
col_map['id'] = detect(ttp_df.columns, ['TechniqueID','ID','TID','Technique Id'])
col_map['name'] = detect(ttp_df.columns, ['TechniqueName','Name','Technique'])
col_map['desc'] = detect(ttp_df.columns, ['Description','Details','TechniqueDescription'])
print('Detected column mapping:', col_map)
missing = [k for k,v in col_map.items() if v is None]
if missing:
    raise ValueError(f'Could not detect columns for: {missing}. Please edit this cell and set col_map manually.')

# Clean & prep text field for embedding
def prep_text(row):
    return ' | '.join([str(row[col_map['id']]), str(row[col_map['name']]), str(row[col_map['desc']])])
ttp_df['__text'] = ttp_df.apply(prep_text, axis=1)
print('Sample prepared text:', ttp_df['__text'].iloc[0][:200])
print('Total TTP records:', len(ttp_df))
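
The column auto-detection above tries an exact case-insensitive match first and falls back to substring matching. The sketch below repeats `detect` on made-up header rows (not taken from `MITRE_TPPs.xlsx`) to show both passes, including a case where the substring fallback picks the wrong column, which is why the printed mapping is worth checking.

```python
def detect(col_names, candidates):
    """Return the first column matching a candidate: exact (case-insensitive) first, then substring."""
    for c in candidates:
        for cn in col_names:
            if cn.lower() == c.lower():
                return cn
    for c in candidates:
        for cn in col_names:
            if c.lower() in cn.lower():
                return cn
    return None

# Hypothetical header row for illustration
columns = ['Technique ID', 'Technique Name', 'Tactic', 'Description']
id_col = detect(columns, ['TechniqueID', 'ID', 'TID', 'Technique Id'])       # exact match on 'Technique Id'
name_col = detect(columns, ['TechniqueName', 'Name', 'Technique'])          # substring match on 'Name'
desc_col = detect(columns, ['Description', 'Details', 'TechniqueDescription'])

# Pitfall: 'ID' is a substring of 'Valid From' (val-ID), so the fallback returns it
wrong_col = detect(['Valid From', 'Technique Name'], ['TechniqueID', 'ID'])

print(id_col, '|', name_col, '|', desc_col, '|', wrong_col)
```

When the fallback misfires like this, setting `col_map` by hand after the detection step is the simplest fix.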

In [None]:
# Embed TTPs
EMBED_MODEL_NAME = 'sentence-transformers/all-MiniLM-L6-v2'  # small & fast
embed_model = SentenceTransformer(EMBED_MODEL_NAME)
ttp_embeddings = embed_model.encode(ttp_df['__text'].tolist(), batch_size=64, show_progress_bar=True, convert_to_numpy=True, normalize_embeddings=True)
print('Embedding shape:', ttp_embeddings.shape)

In [None]:
# Build NearestNeighbors index (cosine similarity via metric='cosine')
nn_index = NearestNeighbors(n_neighbors=25, metric='cosine')  # keep larger n_neighbors internally
nn_index.fit(ttp_embeddings)
print('Vector index ready.')

def retrieve_ttp(query: str, top_k: int = 5):
    """Return top_k technique rows (with similarity) for a query string."""
    q_emb = embed_model.encode([query], normalize_embeddings=True)
    distances, indices = nn_index.kneighbors(q_emb, n_neighbors=top_k)
    # cosine distance -> similarity = 1 - distance
    sims = 1 - distances[0]
    rows = []
    for sim, idx in zip(sims, indices[0]):
        r = ttp_df.iloc[idx]
        rows.append({
            'TechniqueID': r[col_map['id']],
            'TechniqueName': r[col_map['name']],
            'Similarity': float(sim),
            'Description': r[col_map['desc']]
        })
    return rows

def build_rag_context(candidates):
    """Build a concise context block for the LLM from retrieved techniques.
    Truncates description to keep prompt size manageable."""
    lines = []
    for i, c in enumerate(candidates, start=1):
        desc_snip = str(c['Description'])[:400].replace('\n', ' ')
        lines.append(f"{i}. {c['TechniqueID']} | {c['TechniqueName']} | {desc_snip}")
    return "\n".join(lines)

# Quick smoke test (retrieval only)
_test = retrieve_ttp('credential dumping from lsass memory', top_k=3)
_test[:1]  # show first candidate
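
`retrieve_ttp` converts the cosine distance returned by the index into a similarity with `1 - distance`. A small standard-library sketch of that relationship, using made-up 2-D vectors in place of real embeddings:

```python
import math

def normalize(v):
    """Scale a vector to unit length (the effect of normalize_embeddings=True)."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def cosine_distance(a, b):
    """Cosine distance = 1 - cosine similarity, the quantity metric='cosine' reports."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)

# Toy vectors, made up for illustration
query = normalize([3.0, 4.0])
docs = {
    "same direction": normalize([6.0, 8.0]),
    "orthogonal": normalize([-4.0, 3.0]),
    "opposite": normalize([-3.0, -4.0]),
}

# Same conversion as in retrieve_ttp: similarity = 1 - distance
similarities = {name: 1.0 - cosine_distance(query, vec) for name, vec in docs.items()}
ranked = sorted(similarities, key=similarities.get, reverse=True)
```

The similarity ranges from 1 (same direction) through 0 (orthogonal) to -1 (opposite), so a low or negative `Similarity` in the retrieved rows signals a weak candidate.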

In [None]:
# Load detection dataset (ground truth column MiTRE_TTPs will be excluded from model input)
DETECTIONS_FILE = 'MDE_SampleDetections.xlsx'  # Adjust if needed
GROUND_TRUTH_COLUMN_NAME = 'MiTRE_TTPs'  # Ground truth column (expected labels; used only for scoring)
try:
    det_df = pd.read_excel(DETECTIONS_FILE)
except FileNotFoundError:
    raise FileNotFoundError(f'Could not find {DETECTIONS_FILE}. Place it in working directory.')
print('Columns in detection file:', list(det_df.columns))

# Ensure ground truth column exists (optional)
if GROUND_TRUTH_COLUMN_NAME not in det_df.columns:
    print(f"Warning: Expected ground truth column '{GROUND_TRUTH_COLUMN_NAME}' not found. Evaluation will be skipped unless you set det_map['gt'] manually.")

# Auto-detect ID / Name / Description columns while EXCLUDING the ground truth column
candidate_cols = [c for c in det_df.columns if c != GROUND_TRUTH_COLUMN_NAME]

# ID column: prefer one containing 'id' but not technique; fallback to first non-gt column
id_col = next((c for c in candidate_cols if 'id' in c.lower() and 'technique' not in c.lower()), candidate_cols[0])

# Name column
name_col = next((c for c in candidate_cols if 'name' in c.lower() and c != id_col), (candidate_cols[1] if len(candidate_cols) > 1 else id_col))

# Description column: look for 'desc' or 'detail'; fallback to last non-gt column that's not id/name
possible_desc = [c for c in candidate_cols if c not in {id_col, name_col}]
desc_col = next((c for c in possible_desc if 'desc' in c.lower() or 'detail' in c.lower()), (possible_desc[-1] if possible_desc else name_col))

# Ground truth column mapping
_gt = GROUND_TRUTH_COLUMN_NAME if GROUND_TRUTH_COLUMN_NAME in det_df.columns else None

# Final mapping
det_map = {
    'id': id_col,
    'name': name_col,
    'desc': desc_col,
    'gt': _gt
}

# Safety: assert we are not using ground truth column as description
if det_map['gt'] and det_map['gt'] == det_map['desc']:
    raise ValueError("Ground truth column was mistakenly selected as description. Please adjust mappings manually.")

print('Detected detection column mapping:', det_map)
print('Total detections:', len(det_df))

# Preview (ground truth is shown here for reference only; it is never passed to the model)
preview_cols = [det_map['id'], det_map['name'], det_map['desc']] + ([det_map['gt']] if det_map['gt'] else [])
det_df[preview_cols].head(2)

In [None]:
# LLM-based labeling: retrieval provides candidate context; LLM must return JSON with technique IDs.
import json as _json


def llm_rag_label(detection_text: str, k_retrieval: int = 8, max_new_tokens: int = 220):
    candidates = retrieve_ttp(detection_text, top_k=k_retrieval)
    context_block = build_rag_context(candidates)
    system_prompt = (
        "You are a cybersecurity assistant mapping detection descriptions to MITRE ATT&CK technique IDs. "
        "Use ONLY the provided candidate techniques. If none fit, return an empty list. "
        "Output strict JSON with keys: technique_ids (array of strings), rationale (short string)."
    )
    user_prompt = (
        f"Detection Description:\n{detection_text}\n\n"
        f"Candidate Techniques (retrieved):\n{context_block}\n\n"
        "Return JSON ONLY. Example format: {\"technique_ids\": [\"T1059\"], \"rationale\": \"Short reason\"}"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    reply = generate_chat(messages, max_new_tokens=max_new_tokens)

    # Parse the outermost {...} span of the reply as JSON
    parsed_ids = []
    rationale = ''
    try:
        start = reply.find('{')
        end = reply.rfind('}')
        if start != -1 and end != -1 and end > start:
            obj = _json.loads(reply[start:end+1])
            if isinstance(obj.get('technique_ids'), list):
                parsed_ids = [str(x).strip() for x in obj['technique_ids'] if x]
            rationale = str(obj.get('rationale', ''))
    except Exception as pe:
        rationale = f'Failed to parse JSON: {pe}. Raw: {reply[:160]}'

    # Drop IDs that were not among the retrieved candidates.
    # Compare as strings so IDs read from Excel as non-string values still match.
    candidate_id_set = {str(c['TechniqueID']).strip() for c in candidates}
    parsed_ids = [cid for cid in parsed_ids if cid in candidate_id_set]

    return {
        'predicted_ids': parsed_ids,
        'rationale': rationale,
        'raw_response': reply,
        'candidates': candidates
    }


def run_rag_labeling(k_retrieval: int = 8):
    """Label every detection in det_df; only the description column is sent to the model."""
    outputs = []
    for _, row in det_df.iterrows():
        text = str(row[det_map['desc']])
        result = llm_rag_label(text, k_retrieval=k_retrieval)
        outputs.append({
            'DetectionID': row[det_map['id']],
            'PredictedTechniqueIDs': result['predicted_ids'],
            'Rationale': result.get('rationale'),
            'RawResponse': result.get('raw_response'),
            'Retrieved': result['candidates']
        })
    return outputs


predictions = run_rag_labeling()
predictions[:2]
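
The reply-parsing step in `llm_rag_label` can be tried without loading a model. The sketch below is a simplified stand-in, not the notebook's function: `parse_llm_reply` is a hypothetical helper and the three reply strings are invented to show a clean reply, a reply containing an ID outside the candidate set, and a reply that is not valid JSON.

```python
import json

def parse_llm_reply(reply, allowed_ids):
    """Pull technique IDs out of a model reply and keep only retrieved candidates."""
    start, end = reply.find('{'), reply.rfind('}')
    if start == -1 or end <= start:
        return []  # no JSON object in the reply
    try:
        obj = json.loads(reply[start:end + 1])
    except json.JSONDecodeError:
        return []  # braces present but not valid JSON
    ids = obj.get('technique_ids') if isinstance(obj, dict) else None
    if not isinstance(ids, list):
        return []
    # Discard anything the model returned that retrieval did not offer
    return [str(i) for i in ids if str(i) in allowed_ids]

allowed = {'T1003.001', 'T1059'}
clean = parse_llm_reply('Sure! {"technique_ids": ["T1003.001"], "rationale": "lsass"}', allowed)
invented = parse_llm_reply('{"technique_ids": ["T1003.001", "T9999"], "rationale": "x"}', allowed)
broken = parse_llm_reply('I think it is T1059 {not json}', allowed)
```

Filtering against the candidate set means a correct technique that retrieval missed can never be predicted, so recall is bounded by `k_retrieval`.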

In [None]:
# Core classification metric: F1@K (micro only)
if det_map.get('gt') is None:
    print('No ground truth column detected; skipping F1@K evaluation.')
else:
    import numpy as np, re
    TECH_ID_REGEX = re.compile(r"T[0-9]{4,5}(?:\.[0-9]{3})?")
    def extract_ids(val):
        if pd.isna(val): return []
        text = str(val)
        return list({m.group(0) for m in TECH_ID_REGEX.finditer(text)})
    gt_lists = det_df[det_map['gt']].apply(extract_ids).tolist()
    pred_lists = [p['PredictedTechniqueIDs'] for p in predictions]
    K = 3
    tp = fp = fn = 0
    for gt, pred in zip(gt_lists, pred_lists):
        set_gt = set(gt)
        set_pred = set(pred[:K])
        tp += len(set_gt & set_pred)
        fp += len(set_pred - set_gt)
        fn += len(set_gt - set_pred)
    micro_precision = tp / (tp + fp) if (tp + fp) else 0
    micro_recall = tp / (tp + fn) if (tp + fn) else 0
    micro_f1 = 2*micro_precision*micro_recall / (micro_precision + micro_recall) if (micro_precision + micro_recall) else 0
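    results = {'Precision@K_micro': micro_precision, 'Recall@K_micro': micro_recall, 'F1@K_micro': micro_f1, 'K': K, 'NumDetections': len(predictions)}
    print('Precision@K_micro:', round(micro_precision,4))
    print('Recall@K_micro:', round(micro_recall,4))
    print('F1@K_micro:', round(micro_f1,4))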
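
A worked example of the micro-averaged computation may help when checking the numbers by hand. The sketch wraps the same counting logic in a hypothetical function `micro_prf_at_k`; the three ground-truth and prediction lists are invented.

```python
def micro_prf_at_k(gt_lists, pred_lists, k):
    """Pool TP/FP/FN over all detections, then compute precision, recall and F1 once."""
    tp = fp = fn = 0
    for gt, pred in zip(gt_lists, pred_lists):
        set_gt, set_pred = set(gt), set(pred[:k])
        tp += len(set_gt & set_pred)   # predicted and correct
        fp += len(set_pred - set_gt)   # predicted but not in ground truth
        fn += len(set_gt - set_pred)   # in ground truth but not predicted
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return precision, recall, f1

# Invented data: three detections
gt_lists = [['T1003.001'], ['T1059', 'T1105'], ['T1566']]
pred_lists = [['T1003.001', 'T1055'], ['T1059'], []]

precision, recall, f1 = micro_prf_at_k(gt_lists, pred_lists, k=3)
# Pooled counts: tp=2, fp=1, fn=2 -> precision 2/3, recall 1/2, F1 4/7
```

Because the counts are pooled before dividing, detections with many ground-truth techniques weigh more than detections with one.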

In [None]:
# Faithfulness & Relevance (LLM-as-judge only)


import re, json as _j


TECH_ID_REGEX = re.compile(r"T[0-9]{4,5}(?:\.[0-9]{3})?")


gold_col = det_map.get('gt')


records = []
for pos, (_, row) in enumerate(det_df.iterrows()):
    raw_gold = str(row[gold_col]) if gold_col and not pd.isna(row[gold_col]) else ''
    gt_ids = list({m.group(0) for m in TECH_ID_REGEX.finditer(raw_gold)}) if raw_gold else []
    pred_entry = predictions[pos]  # positional lookup; the DataFrame index labels need not be 0..n-1
    rec = {
        'DetectionID': row[det_map['id']],
        'GroundTruthIDs': gt_ids,
        'PredictedIDs': pred_entry['PredictedTechniqueIDs'],
        'Rationale': pred_entry.get('Rationale') or '',
        'RetrievedContext': '\n'.join([f"{c['TechniqueID']} | {c['TechniqueName']} | {str(c['Description'])[:200]}" for c in pred_entry['Retrieved']])
    }
    records.append(rec)


def judge_pair(rationale: str, context: str, detection_text: str):
    system = ("You are evaluating a security detection explanation. Output JSON with faithfulness and relevance in [0,1]. "
              "Faithfulness: grounded in context. Relevance: addresses the detection text.")
    user = (f"Detection: {detection_text}\n\nContext:\n{context[:1800]}\n\nRationale:\n{rationale}\n\nRespond ONLY as {{'faithfulness': <num>, 'relevance': <num>}}")
    messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
    try:
        reply = generate_chat(messages, max_new_tokens=100)
        s = reply.find('{'); e = reply.rfind('}')
        if s != -1 and e != -1 and e > s:
            obj = _j.loads(reply[s:e+1].replace("'", '"'))
            f = float(obj.get('faithfulness')) if obj.get('faithfulness') is not None else None
            r = float(obj.get('relevance')) if obj.get('relevance') is not None else None
            if f is not None and not (0 <= f <= 1): f = None
            if r is not None and not (0 <= r <= 1): r = None
            return f, r
    except Exception:
        return None, None
    return None, None


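for pos, rec in enumerate(records):
    # Judge relevance against the detection description, not against the rationale itself
    detection_text = str(det_df.iloc[pos][det_map['desc']])
    f, r = judge_pair(rec['Rationale'], rec['RetrievedContext'], detection_text)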
    rec['Faithfulness'] = f
    rec['Relevance'] = r


import pandas as _pd
results_df = _pd.DataFrame(records)
faith_avg = float(results_df['Faithfulness'].mean()) if results_df['Faithfulness'].notna().any() else None
rel_avg = float(results_df['Relevance'].mean()) if results_df['Relevance'].notna().any() else None


evaluation_summary = {
    'F1@K_micro': results.get('F1@K_micro') if 'results' in globals() else None,
    'Faithfulness_avg': faith_avg,
    'Relevance_avg': rel_avg,
    'NumDetections': len(results_df)
}
print('Evaluation Summary (Slim):')
print(evaluation_summary)
results_df.head(3)

In [None]:
# Inspect a single detection with its RAG context & final predictions
idx = 0  # change this index to inspect another detection
sample_pred = predictions[idx]
det_row = det_df.iloc[idx]
print('Detection ID:', det_row.get(det_map['id']))
print('Detection Name:', det_row.get(det_map['name']))
print('Description:', str(det_row.get(det_map['desc']))[:500])
print('Predicted Technique IDs:', sample_pred['PredictedTechniqueIDs'])
print('Mode:', sample_pred.get('Mode', 'retrieval + LLM'))  # run_rag_labeling sets no 'Mode' key, so use a default
print('Rationale:', sample_pred.get('Rationale'))
if det_map.get('gt'):
    print('Ground Truth IDs:', det_row.get(det_map['gt']))
print('\nTop Retrieved Candidates (first 5):')
for c in sample_pred['Retrieved'][:5]:
    print(f" - {c['TechniqueID']} ({c['Similarity']:.3f}): {c['TechniqueName']}")
print('\nRaw LLM Response (truncated):')
raw = sample_pred.get('RawResponse')
if raw:
    print(raw[:400])

## Notes on Direct Model Loading

The causal LLM (phi-4) is loaded directly in the configuration cell, and there is no switch to disable it. To experiment with retrieval only, call `retrieve_ttp` directly and skip the labeling cell (Cell 8) along with the evaluation cells that depend on `predictions`.

### Troubleshooting
- If you encounter out-of-memory errors, try a smaller model or pass `torch_dtype=torch.bfloat16` (if your hardware supports it) when calling `from_pretrained`.
- If you only have CPU available, generation will be slower; consider reducing `max_new_tokens`.
- To log raw prompts and responses for auditing, wrap calls to `generate_chat` and append entries to a list or dataframe.
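
The logging suggestion in the last bullet could be sketched as follows. `make_logged_chat` and `audit_log` are hypothetical names, and `fake_generate_chat` is a stand-in so the example runs without loading a model.

```python
import time

def make_logged_chat(chat_fn, log):
    """Return a wrapper around chat_fn that appends every prompt and reply to `log`."""
    def logged(messages, **kwargs):
        start = time.perf_counter()
        reply = chat_fn(messages, **kwargs)
        log.append({
            "messages": messages,
            "reply": reply,
            "kwargs": kwargs,
            "seconds": round(time.perf_counter() - start, 3),
        })
        return reply
    return logged

# Stand-in for generate_chat (assumption: same call shape, no model involved)
def fake_generate_chat(messages, max_new_tokens=128):
    return f"echo: {messages[-1]['content']}"

audit_log = []
logged_chat = make_logged_chat(fake_generate_chat, audit_log)
reply = logged_chat([{"role": "user", "content": "ping"}], max_new_tokens=16)
```

In the notebook, running `generate_chat = make_logged_chat(generate_chat, audit_log)` once after the configuration cell would route every later call through the log, and `pd.DataFrame(audit_log)` gives a table for review.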

### Quick Example
```python
reply = generate_chat([
    {"role": "user", "content": "List 2 common credential dumping techniques."}
])
print(reply)
```
