<a href="https://colab.research.google.com/github/Kolawole-a2/Kola_Projects/blob/main/MiTre_%26_Defend_Detailed_Merged_Table.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# PATCH CELL: fix "unhashable list" by stringifying list columns before drop_duplicates

import os, re, io, zipfile
import pandas as pd
import requests
from google.colab import files

# Input: ATT&CK technique table created earlier; read below with pd.read_csv.
ATTACK_CSV = "/content/attack_techniques.csv"
# Output: pairwise crosswalk, one row per (technique, D3FEND entry) pair.
PAIRWISE_CSV = "/content/attack_d3fend_merged.csv"
# Output: aggregated master table, one row per technique.
MASTER_CSV = "/content/attack_defense_master.csv"
# Output: archive bundling the three CSV files above for download.
ZIP_PATH = "/content/mitre_attack_d3fend_tables.zip"

# D3FEND full-mappings endpoints; the loader tries the CSV URL first and
# falls back to the JSON URL if the CSV path fails.
D3FEND_JSON_URL  = "https://d3fend.mitre.org/api/ontology/inference/d3fend-full-mappings.json"
D3FEND_CSV_URL   = "https://d3fend.mitre.org/api/ontology/inference/d3fend-full-mappings.csv"

def fetch_json(url: str):
    """GET *url* and return its body decoded as JSON.

    The request uses a 60-second timeout and a fixed User-Agent header.
    An error HTTP status is turned into an exception by
    ``raise_for_status()`` before the body is decoded.
    """
    response = requests.get(
        url,
        headers={"User-Agent": "colab-mitre-builder"},
        timeout=60,
    )
    response.raise_for_status()
    payload = response.json()
    return payload

def fetch_text(url: str) -> str:
    """GET *url* and return the response body as text.

    The request uses a 60-second timeout and a fixed User-Agent header.
    An error HTTP status is turned into an exception by
    ``raise_for_status()`` before the text is returned.
    """
    response = requests.get(
        url,
        headers={"User-Agent": "colab-mitre-builder"},
        timeout=60,
    )
    response.raise_for_status()
    body = response.text
    return body

def extract_tids(text) -> list:
    """Return every technique-ID-shaped token found in *text*, in order.

    A token is ``T`` followed by four digits, optionally followed by a dot
    and three digits (e.g. ``T1059`` or ``T1059.001``). *text* is converted
    with ``str()`` first; a missing value (``None``/NaN) yields ``[]``.
    Repeated IDs are returned as many times as they occur.
    """
    if pd.isna(text):
        return []
    tid_pattern = r"(T\d{4}(?:\.\d{3})?)"
    haystack = str(text)
    return [hit.group(1) for hit in re.finditer(tid_pattern, haystack)]

def infer_cat(name: str) -> str:
    """Guess a category from keywords contained in *name*.

    The lowercase form of *name* is searched for each keyword in a fixed
    priority order; the first keyword found is returned capitalized.
    Returns "" when *name* is empty/None or contains no keyword.
    """
    lowered = name.lower() if name else ""
    keywords = (
        "harden",
        "isolate",
        "detect",
        "deceive",
        "evict",
        "neutralize",
        "recover",
        "resilience",
        "respond",
    )
    # Priority follows tuple order, not position inside the name.
    return next((kw.capitalize() for kw in keywords if kw in lowered), "")

# Column layout of the mapping table; fixed so an empty result still has a schema.
_D3FEND_COLUMNS = [
    "attack_technique_id",
    "d3fend_name",
    "d3fend_category",
    "relation",
    "d3fend_raw",
]


def _cell_text(value) -> str:
    """Return *value* as text, mapping a missing cell (NaN/None) to ""."""
    # str(float("nan")) is the truthy string "nan", which would otherwise be
    # stored as a category/relation and block the infer_cat() fallback.
    return "" if pd.isna(value) else str(value)


def _looks_like_d3fend(text: str) -> bool:
    """Return True if *text* contains "d3f:" or (case-insensitively) "d3fend"."""
    return "d3f:" in text or "d3fend" in text.lower()


def _d3fend_label(text: str) -> str:
    """Return the part of *text* after its last ":" (all of *text* if none)."""
    return text.rsplit(":", 1)[-1]


def _mapping_rows(tids, d3_name: str, d3_cat: str, relation: str) -> list:
    """Build one output record per technique ID in *tids* (sorted for stable order)."""
    return [
        {
            "attack_technique_id": tid,
            "d3fend_name": d3_name,
            "d3fend_category": d3_cat,
            "relation": relation,
            "d3fend_raw": d3_name,
        }
        for tid in sorted(tids)
    ]


def _d3fend_rows_from_csv(raw: pd.DataFrame) -> list:
    """Flatten the CSV export into mapping records without assuming column names."""
    # Candidate columns are located once by substring match on the header.
    category_cols = [
        c for c in raw.columns
        if "category" in str(c).lower() or "tactic" in str(c).lower()
    ]
    relation_cols = [
        c for c in raw.columns
        if "relation" in str(c).lower() or "predicate" in str(c).lower()
    ]

    rows = []
    for _, record in raw.iterrows():
        cells = record.to_list()
        tids = set()
        for cell in cells:
            tids.update(extract_tids(cell))
        if not tids:
            continue  # row mentions no ATT&CK technique ID

        # D3FEND name (best-effort): first cell that looks like a D3FEND reference.
        texts = [_cell_text(cell) for cell in cells]
        d3_name = next((_d3fend_label(t) for t in texts if _looks_like_d3fend(t)), "")

        # Category / relation (best-effort): first candidate column with a value.
        d3_cat = next((t for t in (_cell_text(record[c]) for c in category_cols) if t), "")
        relation = next((t for t in (_cell_text(record[c]) for c in relation_cols) if t), "")
        if not d3_cat:
            d3_cat = infer_cat(d3_name)

        rows.extend(_mapping_rows(tids, d3_name, d3_cat, relation))
    return rows


def _d3fend_rows_from_bindings(data) -> list:
    """Flatten the JSON export into mapping records.

    Reads a list of binding dicts from ``data["results"]["bindings"]`` or
    ``data["bindings"]``; each binding is read through its ``s``/``p``/``o``
    (or ``subject``/``predicate``/``object``) entries.
    """
    bindings = data.get("results", {}).get("bindings", []) or data.get("bindings", []) or []

    def label(node):
        return node.get("label") or node.get("value") or ""

    rows = []
    for binding in bindings:
        subj = label(binding.get("s", {})) or label(binding.get("subject", {}))
        pred = label(binding.get("p", {})) or label(binding.get("predicate", {}))
        obj = label(binding.get("o", {})) or label(binding.get("object", {}))
        tids = set(extract_tids(" ".join([subj, pred, obj])))
        if not tids:
            continue
        d3_name = next((_d3fend_label(t) for t in (subj, obj) if _looks_like_d3fend(t)), "")
        rows.extend(_mapping_rows(tids, d3_name, infer_cat(d3_name), pred or ""))
    return rows


def load_d3fend_schema_agnostic() -> pd.DataFrame:
    """Download the D3FEND full mappings and return them as a flat table.

    The CSV endpoint is tried first. If fetching or parsing it fails for any
    reason, a ``RuntimeWarning`` describing the failure is emitted and the
    JSON endpoint is used instead (deliberate best-effort fallback).

    Returns:
        DataFrame with the columns ``attack_technique_id``, ``d3fend_name``,
        ``d3fend_category``, ``relation`` and ``d3fend_raw``, one row per
        (technique ID, D3FEND entry) combination, exact duplicates removed.
        The columns are present even when no mapping row was found.

    Raises:
        Whatever ``fetch_json`` or the JSON parsing raises if the fallback
        path fails as well.
    """
    import warnings  # local import: only needed to report the CSV fallback

    try:
        raw = pd.read_csv(io.StringIO(fetch_text(D3FEND_CSV_URL)))
        rows = _d3fend_rows_from_csv(raw)
    except Exception as exc:  # any CSV problem triggers the JSON fallback
        warnings.warn(
            f"D3FEND CSV mappings could not be used ({exc!r}); falling back to JSON.",
            RuntimeWarning,
            stacklevel=2,
        )
        rows = _d3fend_rows_from_bindings(fetch_json(D3FEND_JSON_URL))
    # Passing columns= keeps the schema when rows is empty, so callers can
    # index "attack_technique_id" without a KeyError.
    return pd.DataFrame(rows, columns=_D3FEND_COLUMNS).drop_duplicates()

# 1) Load ATT&CK techniques already created
# ATTACK_CSV must already exist on disk; the merge further down joins on
# its "technique_id" column.
df_attack = pd.read_csv(ATTACK_CSV)

# 2) Load D3FEND mappings
# Downloads the mappings over HTTP (CSV endpoint first, JSON as fallback).
df_d3 = load_d3fend_schema_agnostic()
# Normalize technique IDs to canonical T####(.###)
def norm_tid(x):
    """Return the first technique ID (``T####`` or ``T####.###``) found in *x*.

    *x* is converted with ``str()`` before searching. Returns ``None`` when
    *x* is missing (None/NaN) or contains no such ID.
    """
    if pd.isna(x):
        return None
    found = re.search(r"(T\d{4}(?:\.\d{3})?)", str(x))
    if found is None:
        return None
    return found.group(1)
# Re-parse every mapped ID; values norm_tid cannot parse come back as None ...
df_d3["attack_technique_id"] = df_d3["attack_technique_id"].map(norm_tid)
# ... and are removed here, followed by rows that are exact repeats.
df_d3 = df_d3.dropna(subset=["attack_technique_id"])
df_d3 = df_d3.drop_duplicates()

# 3) Left join: every ATT&CK technique row is kept, whether or not it has a mapping
merged_left = pd.merge(
    df_attack,
    df_d3,
    how="left",
    left_on="technique_id",
    right_on="attack_technique_id",
)

# 4) Stringify list-like columns BEFORE any drop_duplicates
def stringify(v):
    """Make a cell value hashable for ``drop_duplicates``.

    A ``list`` becomes its items joined with " | " (each item passed through
    ``str``); a missing value (None/NaN) becomes ""; anything else is
    returned unchanged.
    """
    if not isinstance(v, list):
        if pd.isna(v):
            return ""
        return v
    return " | ".join(str(item) for item in v)

# Apply stringify to the multi-valued columns that are actually present
for col in ["tactics","platforms"]:
    if col not in merged_left.columns:
        continue
    merged_left[col] = merged_left[col].apply(stringify)

# 5) Pairwise crosswalk: keep only technique rows that received a D3FEND match
has_mapping = merged_left["d3fend_name"].notna()
pairwise = merged_left[has_mapping].copy()

pairwise_cols = [
    # ATT&CK side
    "attack_technique_id",
    "technique_name",
    "is_subtechnique",
    "parent_technique_id",
    "tactics",
    "platforms",
    # D3FEND side
    "d3fend_name",
    "d3fend_category",
    "relation",
    "d3fend_raw",
]

# Any expected column missing from the input is added as an empty-string column
for c in pairwise_cols:
    if c in pairwise.columns:
        continue
    pairwise[c] = ""

pairwise = pairwise[pairwise_cols]
pairwise = pairwise.drop_duplicates()
pairwise = pairwise.reset_index(drop=True)
pairwise.to_csv(PAIRWISE_CSV, index=False)

# 6) Aggregated master (one row per technique, incl. unmapped)
def _join_distinct(values):
    """Join the distinct, non-blank entries of *values* with " | ", sorted."""
    kept = set([v for v in values if str(v).strip()])
    return " | ".join(sorted(kept))


if pairwise.empty:
    # No mappings at all: keep the expected columns so the merge below still works
    agg = pd.DataFrame(columns=["attack_technique_id","d3fend_names","d3fend_categories","d3fend_count"])
else:
    by_technique = pairwise.groupby("attack_technique_id", dropna=True)
    agg = by_technique.agg({
        "d3fend_name": _join_distinct,
        "d3fend_category": _join_distinct,
    })
    agg = agg.rename(columns={"d3fend_name":"d3fend_names","d3fend_category":"d3fend_categories"})
    agg = agg.reset_index()
    # Number of distinct D3FEND names per technique
    counts = (
        pairwise.groupby("attack_technique_id")["d3fend_name"]
        .nunique()
        .reset_index(name="d3fend_count")
    )
    agg = agg.merge(counts, on="attack_technique_id", how="left")

master = pd.merge(
    df_attack,
    agg,
    how="left",
    left_on="technique_id",
    right_on="attack_technique_id",
)
# Techniques absent from agg have NaN counts; treat them as zero mappings
mapping_counts = master["d3fend_count"].fillna(0).astype(int)
master["has_d3fend_mapping"] = mapping_counts > 0
if "attack_technique_id" in master.columns:
    # Join key duplicated from technique_id; not needed in the output
    master = master.drop(columns=["attack_technique_id"])
# Mapped techniques first, then by technique ID
master = master.sort_values(
    by=["has_d3fend_mapping","technique_id","is_subtechnique"],
    ascending=[False, True, True],
)

master.to_csv(MASTER_CSV, index=False)

# 7) Bundle the three CSV files (stored under their base names) and download the archive
with zipfile.ZipFile(ZIP_PATH, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
    for csv_path in (ATTACK_CSV, PAIRWISE_CSV, MASTER_CSV):
        zf.write(csv_path, arcname=os.path.basename(csv_path))

print(f"Saved:\n- {ATTACK_CSV}\n- {PAIRWISE_CSV} ({len(pairwise)} rows)\n- {MASTER_CSV} ({len(master)} rows)")
files.download(ZIP_PATH)


Saved:
- /content/attack_techniques.csv
- /content/attack_d3fend_merged.csv (1448 rows)
- /content/attack_defense_master.csv (679 rows)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>