# CVE to CWE Linker
This notebook provides a tool to link a CVE ID to its associated CWEs and display their names and descriptions using the local CVE and CWE databases.

In [1]:
import os
import json
import re
import math
import shutil
import subprocess
from pathlib import Path

import numpy as np
import scipy.sparse as sp
import xml.etree.ElementTree as ET

# Project-relative paths (assumes you run the notebook from the repo root)
PROJECT_ROOT = Path.cwd()
CWE_XML_PATH = PROJECT_ROOT / "data" / "cwec_v4.19.xml"
CVE_BASE_DIR = PROJECT_ROOT / "data" / "cvelistV5-main" / "cves"

print(f"Project root: {PROJECT_ROOT}")
print(f"CWE Database: {CWE_XML_PATH}")
print(f"CVE Database Directory: {CVE_BASE_DIR}")

Project root: /home/dnfy/Desktop/Fortiss
CWE Database: /home/dnfy/Desktop/Fortiss/data/cwec_v4.19.xml
CVE Database Directory: /home/dnfy/Desktop/Fortiss/data/cvelistV5-main/cves


## 1. Parse CWE Database
We parse the CWE XML file to create a mapping from CWE ID to its name and description.
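
As a minimal sketch of the namespace handling this step relies on, the snippet below parses a tiny inline catalog instead of the real file. The sample XML, its shortened weakness names, and the helper `parse_sample` are illustrative assumptions; the real catalog has many more fields.

```python
import xml.etree.ElementTree as ET

# Hypothetical two-entry catalog with simplified names (not the real CWE file).
SAMPLE_XML = """<Weakness_Catalog xmlns="http://cwe.mitre.org/cwe-7">
  <Weaknesses>
    <Weakness ID="79" Name="Cross-site Scripting">
      <Description>Input is not neutralized before being placed in a web page.</Description>
    </Weakness>
    <Weakness ID="89" Name="SQL Injection">
      <Description>Input is not neutralized before being used in an SQL command.</Description>
    </Weakness>
  </Weaknesses>
</Weakness_Catalog>
"""


def parse_sample(xml_text: str) -> dict:
    root = ET.fromstring(xml_text)
    # A namespaced tag looks like "{uri}Weakness_Catalog"; recover the URI from it.
    ns = {"cwe": root.tag.split("}")[0].strip("{")} if "}" in root.tag else {}
    prefix = "cwe:" if ns else ""
    out = {}
    for w in root.findall(f".//{prefix}Weakness", ns):
        desc = w.find(f"{prefix}Description", ns)
        out[f"CWE-{w.get('ID')}"] = {
            "name": w.get("Name"),
            "description": (desc.text or "").strip() if desc is not None else "",
        }
    return out


sample_map = parse_sample(SAMPLE_XML)
print(sample_map["CWE-89"]["name"])
```

Deriving the namespace from the root tag, rather than hard-coding it, should keep the lookup working if a later catalog release changes the URI.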

In [None]:
def parse_cwe_database(xml_path: Path):
    """Parse CWE XML into:
    - cwe_map: CWE-<id> -> {name, description, extended_description}
    - cwe_corpus: list[{id, name, description, text}] for retrieval

    PHASE 1 IMPROVEMENT: Documents are enriched with the extended description
    and common consequences (scope and impact) for a larger retrieval surface.
    """
    cwe_map = {}
    cwe_corpus = []

    xml_path = Path(xml_path)
    tree = ET.parse(xml_path)
    root = tree.getroot()

    # Extract namespace if present
    ns = {"cwe": root.tag.split("}")[0].strip("{")} if "}" in root.tag else {}
    xpath = ".//cwe:Weakness" if ns else ".//Weakness"

    for weakness in root.findall(xpath, ns):
        wid = weakness.get("ID")
        wname = weakness.get("Name")

        # Primary description
        desc_elem = weakness.find("cwe:Description", ns) if ns else weakness.find("Description")
        description = (desc_elem.text or "").strip() if desc_elem is not None else ""
        if not description:
            description = "No description available."

        # Extended description (often more detailed)
        ext_desc_elem = weakness.find("cwe:Extended_Description", ns) if ns else weakness.find("Extended_Description")
        extended_description = ""
        if ext_desc_elem is not None:
            # Extended descriptions mix plain text with nested markup (e.g. <p>, <ul>/<li>);
            # itertext() walks every level, so text inside nested tags is not dropped.
            extended_description = " ".join(
                t.strip() for t in ext_desc_elem.itertext() if t.strip()
            )

        # Common consequences (technical impact)
        consequences = []
        cons_elem = weakness.find("cwe:Common_Consequences", ns) if ns else weakness.find("Common_Consequences")
        if cons_elem is not None:
            for cons in cons_elem.findall("cwe:Consequence" if ns else "Consequence", ns):
                scope_elem = cons.find("cwe:Scope" if ns else "Scope", ns)
                impact_elem = cons.find("cwe:Impact" if ns else "Impact", ns)
                if scope_elem is not None and scope_elem.text:
                    consequences.append(scope_elem.text.strip())
                if impact_elem is not None and impact_elem.text:
                    consequences.append(impact_elem.text.strip())

        cwe_id = f"CWE-{wid}"
        cwe_map[cwe_id] = {
            "name": wname,
            "description": description,
            "extended_description": extended_description
        }

        # Enriched retrieval text: combine all fields for better matching
        text_parts = [f"{cwe_id}: {wname}", description]
        if extended_description:
            text_parts.append(extended_description)
        if consequences:
            # dict.fromkeys dedupes while keeping first-seen order, so the text is reproducible
            text_parts.append("Consequences: " + ", ".join(dict.fromkeys(consequences)))

        text = ". ".join(text_parts)
        cwe_corpus.append({
            "id": cwe_id,
            "name": wname,
            "description": description,
            "extended_description": extended_description,
            "text": text
        })

    print(f"Successfully parsed {len(cwe_map)} CWEs from {xml_path.name} (enriched with extended fields).")
    return cwe_map, cwe_corpus

cwe_map, cwe_corpus = parse_cwe_database(CWE_XML_PATH)

Successfully parsed 969 CWEs from cwec_v4.19.xml.


## 1b. PHASE 1: Enhanced Hybrid RAG System

**Major improvements implemented:**
1. **True Hybrid Retrieval**: BM25 (lexical) + dense embeddings (semantic)
2. **Query Expansion**: Rule-based security vocabulary expansion (applied to the BM25 query only)
3. **RRF**: Reciprocal Rank Fusion combines the BM25 and dense rankings by rank position, not by raw score
4. **Parallel Retrieval**: Each backend returns its top 20 candidates, and the fused list is cut to the final top-k
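
To make the fusion step concrete, here is a small worked example of Reciprocal Rank Fusion, where each document scores the sum of `1 / (k + rank)` over the lists it appears in. The two rankings are made up for illustration, and `rrf` is a simplified stand-in that takes bare IDs rather than `(index, score)` pairs.

```python
def rrf(rankings, k=60):
    """Fuse ranked lists of doc ids; each list is ordered best-first."""
    fused = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            fused[doc_id] = fused.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(fused.items(), key=lambda item: -item[1])


# Illustrative rankings only; they are not output from this notebook.
bm25_ranking = ["CWE-89", "CWE-564", "CWE-943"]
dense_ranking = ["CWE-943", "CWE-89", "CWE-20"]

fused = rrf([bm25_ranking, dense_ranking])
for doc_id, score in fused:
    print(f"{doc_id}: {score:.5f}")
```

CWE-89 comes out on top because it ranks first and second (1/61 + 1/62), narrowly ahead of CWE-943, which ranks third and first (1/63 + 1/61). Documents found by only one retriever fall behind both.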

**Retriever backends:**
- **BM25**: Lexical matching (great for exact security terms like "SQL injection")
- **Dense (SBERT)**: Semantic similarity (optional, auto-upgrades if installed)
- **Hybrid**: Combines both with RRF for best of both worlds
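
The lexical backend can be illustrated with a dependency-free sketch of BM25 scoring. This uses a textbook formula with a non-negative IDF variant; the `rank-bm25` implementation may differ in details such as IDF smoothing. The function `bm25_scores`, the toy documents, and the default parameters are assumptions for illustration.

```python
import math
from collections import Counter


def bm25_scores(query_tokens, docs_tokens, k1=1.5, b=0.75):
    """Score every tokenized document against the query (textbook BM25)."""
    n_docs = len(docs_tokens)
    avg_len = sum(len(d) for d in docs_tokens) / n_docs
    # Document frequency: number of documents containing each term.
    df = Counter(term for doc in docs_tokens for term in set(doc))
    scores = []
    for doc in docs_tokens:
        tf = Counter(doc)
        score = 0.0
        for term in query_tokens:
            if term not in tf:
                continue
            # Rare terms get a higher weight; the +1 keeps the IDF non-negative.
            idf = math.log((n_docs - df[term] + 0.5) / (df[term] + 0.5) + 1.0)
            # Term frequency saturates with k1 and is normalized by document length via b.
            norm = tf[term] + k1 * (1 - b + b * len(doc) / avg_len)
            score += idf * tf[term] * (k1 + 1) / norm
        scores.append(score)
    return scores


toy_docs = [
    "improper neutralization of special elements used in an sql command".split(),
    "improper restriction of operations within the bounds of a memory buffer".split(),
    "improper authentication".split(),
]
toy_scores = bm25_scores("sql injection in login form".split(), toy_docs)
best = max(range(len(toy_docs)), key=toy_scores.__getitem__)
print(best, [round(s, 3) for s in toy_scores])
```

Only the first document shares tokens with the query, so it is the only one with a non-zero score. The same property explains why BM25 alone misses CVE descriptions that paraphrase a weakness without using its vocabulary.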

In [None]:
# ===========================
# PHASE 1: Query Expansion
# ===========================

# Security-specific vocabulary for query expansion (bridges CVE→CWE vocabulary gap)
SECURITY_EXPANSIONS = {
    # Memory corruption
    "buffer overflow": ["out of bounds write", "memory corruption", "buffer error", "heap overflow", "stack overflow"],
    "heap overflow": ["buffer overflow", "memory corruption", "out of bounds write"],
    "stack overflow": ["buffer overflow", "memory corruption", "out of bounds write"],
    "use after free": ["memory corruption", "dangling pointer", "temporal memory safety"],
    "double free": ["memory corruption", "heap corruption"],
    "null pointer": ["null dereference", "null pointer dereference"],
    
    # Injection attacks
    "sql injection": ["query injection", "improper neutralization", "code injection", "command injection"],
    "command injection": ["os command injection", "code injection", "improper neutralization"],
    "xss": ["cross site scripting", "script injection", "improper neutralization"],
    "cross site scripting": ["xss", "script injection", "improper neutralization"],
    "ldap injection": ["query injection", "improper neutralization"],
    "xpath injection": ["query injection", "improper neutralization"],
    
    # Authentication & Authorization
    "authentication bypass": ["improper authentication", "missing authentication", "broken authentication"],
    "privilege escalation": ["improper privilege management", "improper authorization", "vertical privilege escalation"],
    "authorization": ["access control", "improper authorization", "privilege management"],
    "access control": ["authorization", "improper access control", "missing access control"],
    
    # Cryptographic issues
    "weak encryption": ["inadequate encryption strength", "weak cryptography", "insecure cryptographic algorithm"],
    "weak hash": ["weak cryptographic hash", "inadequate encryption strength"],
    "hard coded": ["hardcoded credentials", "embedded credentials", "plaintext storage"],
    "plaintext": ["cleartext storage", "cleartext transmission", "missing encryption"],
    
    # Path/file issues
    "path traversal": ["directory traversal", "path injection", "improper limitation of pathname"],
    "directory traversal": ["path traversal", "path injection"],
    "file upload": ["unrestricted file upload", "improper validation of file"],
    
    # Input validation
    "integer overflow": ["numeric overflow", "wrap around", "integer wraparound"],
    "format string": ["format string vulnerability", "uncontrolled format string"],
    "race condition": ["time of check time of use", "toctou", "concurrent execution"],
    
    # Web-specific
    "csrf": ["cross site request forgery", "session riding"],
    "cross site request forgery": ["csrf", "session riding"],
    "open redirect": ["url redirection", "improper url redirect"],
    "ssrf": ["server side request forgery", "improper url handling"],
    
    # Misc
    "denial of service": ["resource exhaustion", "uncontrolled resource consumption", "dos"],
    "dos": ["denial of service", "resource exhaustion"],
    "information disclosure": ["information exposure", "sensitive information exposure"],
    "deserialization": ["unsafe deserialization", "untrusted deserialization"],
}


def expand_query(query_text: str, max_expansions: int = 3) -> str:
    """
    Expand query with security-specific synonyms and related terms.
    Returns: original + expanded terms (deduped)
    """
    # Normalize first so hyphenated forms ("use-after-free") match the triggers, and pad
    # with spaces so short triggers ("dos") only match whole words, not "kudos".
    query_norm = f" {_normalize_text(query_text)} "
    expanded_terms = []

    for trigger, expansions in SECURITY_EXPANSIONS.items():
        if f" {trigger} " in query_norm:
            # Add top N expansions for each matched trigger
            expanded_terms.extend(expansions[:max_expansions])

    if expanded_terms:
        # Deduplicate and combine
        expanded_terms = list(dict.fromkeys(expanded_terms))  # preserve order, remove dupes
        return query_text + " " + " ".join(expanded_terms)

    return query_text


# ===========================
# PHASE 1: BM25 Retriever
# ===========================

def _normalize_text(s: str) -> str:
    s = (s or "").lower()
    s = re.sub(r"[^a-z0-9]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s


def _tokenize(s: str):
    s = _normalize_text(s)
    return [t for t in s.split(" ") if t]


# Build BM25 index
_cwe_texts = [c["text"] for c in cwe_corpus]
_tokenized_corpus = [_tokenize(text) for text in _cwe_texts]

try:
    from rank_bm25 import BM25Okapi
    _bm25 = BM25Okapi(_tokenized_corpus)
    BM25_AVAILABLE = True
    print(f"BM25 index built: {len(_tokenized_corpus)} documents")
except ImportError:
    BM25_AVAILABLE = False
    print("rank-bm25 not installed; BM25 retrieval disabled (install: pip install rank-bm25)")


def retrieve_bm25(query_text: str, top_k: int = 20) -> list[tuple[int, float]]:
    """BM25 retrieval: returns [(doc_idx, score), ...]"""
    if not BM25_AVAILABLE:
        return []
    
    query_tokens = _tokenize(query_text)
    if not query_tokens:
        return []
    
    scores = _bm25.get_scores(query_tokens)
    top_idx = np.argsort(-scores)[:top_k]
    return [(int(i), float(scores[i])) for i in top_idx if scores[i] > 0]


# ===========================
# PHASE 1: Dense Retriever (SBERT)
# ===========================

DENSE_AVAILABLE = False
_sbert_model = None
_cwe_emb = None

try:
    from sentence_transformers import SentenceTransformer
    _sbert_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
    _cwe_emb = _sbert_model.encode(_cwe_texts, normalize_embeddings=True, show_progress_bar=True)
    DENSE_AVAILABLE = True
    print(f"Dense embeddings (SBERT) built: {_cwe_emb.shape}")
except Exception as exc:
    # Covers a missing package as well as a failed model download or load
    print(f"SBERT not available ({exc}); install sentence-transformers for semantic retrieval")


def retrieve_dense(query_text: str, top_k: int = 20) -> list[tuple[int, float]]:
    """Dense retrieval: returns [(doc_idx, score), ...]"""
    if not DENSE_AVAILABLE:
        return []
    
    q = _sbert_model.encode([query_text], normalize_embeddings=True)[0]
    sims = _cwe_emb @ q
    top_idx = np.argsort(-sims)[:top_k]
    return [(int(i), float(sims[i])) for i in top_idx if sims[i] > 0]


# ===========================
# PHASE 1: Reciprocal Rank Fusion (RRF)
# ===========================

def reciprocal_rank_fusion(
    rankings: list[list[tuple[int, float]]],
    k: int = 60
) -> list[tuple[int, float]]:
    """
    Combine multiple ranked lists using RRF.
    
    Args:
        rankings: List of [(doc_id, score), ...] from different retrievers
        k: RRF constant (typically 60)
    
    Returns:
        Fused ranking [(doc_id, fused_score), ...]
    """
    rrf_scores = {}
    
    for ranking in rankings:
        for rank, (doc_id, _) in enumerate(ranking, start=1):
            if doc_id not in rrf_scores:
                rrf_scores[doc_id] = 0.0
            rrf_scores[doc_id] += 1.0 / (k + rank)
    
    # Sort by fused score
    fused = sorted(rrf_scores.items(), key=lambda x: -x[1])
    return fused


# ===========================
# PHASE 1: Hybrid Retrieval (main entry point)
# ===========================

def retrieve_cwe_candidates(query_text: str, top_k: int = 5, use_hybrid: bool = True):
    """
    PHASE 1 Hybrid Retrieval:
    1. Expand query with security vocabulary
    2. Retrieve with BM25 (lexical) + Dense (semantic)
    3. Fuse with RRF
    4. Return top-k
    
    Falls back gracefully if libraries are missing.
    """
    # Step 1: Query expansion
    expanded_query = expand_query(query_text)
    
    # Step 2: Retrieve from available backends
    rankings = []
    
    if use_hybrid:
        # BM25 retrieval (lexical)
        if BM25_AVAILABLE:
            bm25_results = retrieve_bm25(expanded_query, top_k=20)
            if bm25_results:
                rankings.append(bm25_results)
        
        # Dense retrieval (semantic) - use original query for embeddings
        if DENSE_AVAILABLE:
            dense_results = retrieve_dense(query_text, top_k=20)
            if dense_results:
                rankings.append(dense_results)
    
    # Step 3: Fusion
    if len(rankings) >= 2:
        # True hybrid: fuse BM25 + Dense with RRF
        fused = reciprocal_rank_fusion(rankings, k=60)
        top_indices = fused[:top_k]
        backend = "hybrid-rrf"
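    elif len(rankings) == 1:
        # Only one backend returned results (the other is missing or found nothing)
        top_indices = rankings[0][:top_k]
        backend = "bm25" if (BM25_AVAILABLE and bm25_results) else "dense"
    else:
        # Nothing retrieved: no backend installed, use_hybrid=False, or an empty query
        return []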
    
    # Step 4: Format results
    results = []
    for idx, score in top_indices:
        c = cwe_corpus[idx]
        results.append({
            "cwe_id": c["id"],
            "score": float(score),
            "name": c["name"],
            "description": c["description"],
        })
    
    return results


# Report what's available
backends = []
if BM25_AVAILABLE:
    backends.append("BM25")
if DENSE_AVAILABLE:
    backends.append("Dense(SBERT)")

if len(backends) >= 2:
    print(f"✓ Hybrid retrieval enabled: {' + '.join(backends)} with RRF fusion")
elif len(backends) == 1:
    print(f"⚠ Single retriever mode: {backends[0]} (install missing libraries for hybrid)")
else:
    print("✗ No retrievers available (install rank-bm25 and/or sentence-transformers)")

TF-IDF index built: X=(969, 3681), vocab=3681 tokens
Retriever backend: tfidf


## 2. Link CVE to CWE
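Functions to locate a CVE's JSON record in the local cvelistV5 checkout, extract its explicit CWE IDs and English description, and retrieve CWE candidates from that description.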
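
As a quick illustration of the cvelistV5 directory layout, the sketch below derives the relative path for a few IDs without touching the filesystem. The helper `cve_relative_path` is hypothetical and only mirrors the bucketing rule (last three digits replaced by `xxx`).

```python
import re
from pathlib import PurePosixPath


def cve_relative_path(cve_id: str):
    """Map CVE-YYYY-NNNN to <year>/<bucket>/<id>.json, or None if malformed."""
    m = re.fullmatch(r"CVE-(\d{4})-(\d{4,})", cve_id.upper())
    if not m:
        return None
    year, number = m.groups()
    # Bucket = sequence number with its last three digits replaced by "xxx"
    bucket = number[:-3] + "xxx"
    return PurePosixPath(year) / bucket / f"CVE-{year}-{number}.json"


for cid in ["CVE-2024-0001", "CVE-2021-44228", "cve-2023-1234567"]:
    print(cid, "->", cve_relative_path(cid))
```

Running this prints `2024/0xxx/CVE-2024-0001.json`, `2021/44xxx/CVE-2021-44228.json`, and `2023/1234xxx/CVE-2023-1234567.json` for the three inputs.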

In [4]:
def normalize_cve_id(cve_input: str):
    m = re.search(r"(CVE-\d{4}-\d+)", (cve_input or "").upper())
    return m.group(1) if m else None


def get_cve_path(cve_id: str) -> Path | None:
    """Construct the CVE JSON path for cvelistV5 layout: <year>/<prefix>xxx/CVE-YYYY-NNNN.json"""
    cve_id = (cve_id or "").upper()
    match = re.match(r"CVE-(\d{4})-(\d+)$", cve_id)
    if not match:
        return None

    year = match.group(1)
    number = match.group(2)

    # Directory is the number with last 3 digits replaced by 'xxx'
    # 0001 -> 0xxx, 1234 -> 1xxx, 12345 -> 12xxx
    if len(number) < 4:
        dir_name = "0xxx"
    else:
        dir_name = number[:-3] + "xxx"

    return CVE_BASE_DIR / year / dir_name / f"{cve_id}.json"


def read_cve_record(cve_input: str):
    """Return (cve_id, data_dict) or (cve_id, error_str)."""
    cve_id = normalize_cve_id(cve_input)
    if not cve_id:
        return None, "Invalid CVE ID or link."

    cve_path = get_cve_path(cve_id)
    if not cve_path:
        return cve_id, f"Could not map CVE to path: {cve_id}"

    if not cve_path.exists():
        return cve_id, f"CVE file not found at {cve_path}"

    with cve_path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    return cve_id, data


def extract_cves_explicit_cwes(cve_data: dict):
    """Extract explicit CWE IDs from CVE JSON (containers.cna.problemTypes.*.descriptions[].cweId)."""
    cwe_ids = []
    problem_types = cve_data.get("containers", {}).get("cna", {}).get("problemTypes", [])
    for pt in problem_types:
        for desc in pt.get("descriptions", []):
            cwe_id = desc.get("cweId")
            if cwe_id and isinstance(cwe_id, str) and cwe_id.startswith("CWE-"):
                cwe_ids.append(cwe_id)
    return sorted(set(cwe_ids))


def extract_cve_description(cve_data: dict) -> str:
    """Best-effort: get the English description from CVE V5."""
    descs = cve_data.get("containers", {}).get("cna", {}).get("descriptions", [])
    for d in descs:
        if d.get("lang") == "en" and d.get("value"):
            return str(d.get("value")).strip()

    # fallback: any description
    for d in descs:
        if d.get("value"):
            return str(d.get("value")).strip()

    return ""


def retrieve_cwes_for_cve(cve_data: dict, top_k: int = 5):
    """Hybrid step: report explicit CWEs when the record has them, and always retrieve candidates from the description when one exists."""
    explicit = extract_cves_explicit_cwes(cve_data)
    desc = extract_cve_description(cve_data)

    if explicit:
        return {
            "mode": "explicit",
            "cve_description": desc,
            "explicit_cwes": explicit,
            "retrieved": retrieve_cwe_candidates(desc, top_k=top_k) if desc else [],
        }

    if not desc:
        return {"mode": "none", "cve_description": "", "explicit_cwes": [], "retrieved": []}

    return {
        "mode": "rag",
        "cve_description": desc,
        "explicit_cwes": [],
        "retrieved": retrieve_cwe_candidates(desc, top_k=top_k),
    }

## 2b. Reasoner Step (Optional): Local LLM

If you have a local LLM runner (e.g., **Ollama**) you can let it choose the best CWE among the retrieved candidates.

Prompt template:

"Given this vulnerability description and these 5 potential weakness definitions, which one fits best? If none fit well, look at the parents of these CWEs."

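Small instruction-tuned models do not always return clean JSON, so the reply is worth validating before use. The sketch below is one possible approach, assuming the reply contains a single JSON object with a `best_cwe` key as the prompt requests; `parse_reasoner_response` is a hypothetical helper and not part of the notebook.

```python
import json
import re


def parse_reasoner_response(raw: str):
    """Return the JSON object embedded in raw model output, or None if unusable."""
    if not raw:
        return None
    # Models sometimes surround the JSON with prose; take the outermost braces.
    match = re.search(r"\{.*\}", raw, flags=re.DOTALL)
    if not match:
        return None
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return None
    if not isinstance(parsed, dict) or "best_cwe" not in parsed:
        return None
    return parsed


# Illustrative replies, not real model output.
chatty = 'Sure, here is my answer:\n{"best_cwe": "CWE-287", "confidence": 0.7, "rationale": "example"}\nHope this helps.'
truncated = '{\n  "best_cwe": "CWE-287",\n  "confidence": 0.7,\n  "rationale": "The vulnerab'

print(parse_reasoner_response(chatty))
print(parse_reasoner_response(truncated))
```

The first reply parses into a dictionary despite the surrounding prose. The truncated reply yields `None`, which a caller could treat as a signal to retry or fall back to the retrieved ranking.
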
In [5]:
def build_reasoner_prompt(cve_description: str, candidates: list[dict], top_k: int = 5) -> str:
    """Build a single prompt for an instruction-tuned LLM (e.g., mistral:7b-instruct)."""
    lines = []
    lines.append("You are a security analyst.")
    lines.append("Return ONLY valid JSON. Do not wrap it in markdown.")
    lines.append("")
    lines.append("VULNERABILITY DESCRIPTION:")
    lines.append(cve_description.strip() if cve_description else "(missing)")
    lines.append("")
    lines.append(f"TOP {top_k} RETRIEVED CWE DEFINITIONS:")

    for i, c in enumerate(candidates[:top_k], start=1):
        lines.append("")
        lines.append(f"{i}. {c['cwe_id']} — {c.get('name','')}")
        lines.append(f"Definition: {c.get('description','')}")

    lines.append("")
    lines.append(
        "Task: Given the vulnerability description and the candidate CWE definitions, choose the SINGLE best CWE. "
        "If none fit well, output best_cwe as 'NONE'. "
        "If NONE, suggest which parent(s) to check and why (in the rationale).\n\n"
        "Output schema (JSON): {\"best_cwe\": <CWE-XXX or NONE>, \"confidence\": <0..1>, \"rationale\": <string>}"
    )
    return "\n".join(lines)


def ollama_available() -> bool:
    return shutil.which("ollama") is not None


def run_ollama_reasoner(prompt: str, model: str = "mistral:7b-instruct", timeout_s: int = 180):
    """Runs ollama if installed. Returns stdout text, or None if ollama is missing, fails, or times out."""
    if not ollama_available():
        return None

    # Non-interactive: pass prompt via stdin
    try:
        proc = subprocess.run(
            ["ollama", "run", model],
            input=prompt.encode("utf-8"),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout_s,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run raises instead of returning when the timeout elapses
        print(f"ollama timed out after {timeout_s}s")
        return None
    if proc.returncode != 0:
        print(proc.stderr.decode("utf-8", errors="ignore"))
        return None

    return proc.stdout.decode("utf-8", errors="ignore").strip()

## 3. Query Tool
Input a CVE link or ID to see the results.

In [6]:
def pretty_print_cwe(cwe_id: str):
    info = cwe_map.get(cwe_id)
    if not info:
        print(f"- {cwe_id}: (not found in CWE catalog)")
        return
    print(f"- {cwe_id}: {info['name']}")
    print(f"  Description: {info['description']}")


def lookup_cve_hybrid(
    cve_input: str,
    top_k: int = 5,
    run_llm: bool = False,
    ollama_model: str = "mistral:7b-instruct",
):
    """Hybrid CVE->CWE:
    - If CVE record has explicit CWE(s), show them.
    - Always retrieve top-k candidates from CVE description (helps vague / zero-shot cases).
    - Optionally run a local LLM reasoner (Ollama) on the retrieved candidates.
    """

    cve_id, data_or_err = read_cve_record(cve_input)
    # Fall back to the raw input so an unparseable ID does not print as "None"
    print(f"### Hybrid results for {cve_id or cve_input} ###\n")

    if isinstance(data_or_err, str):
        print(f"Error: {data_or_err}")
        return

    cve_data = data_or_err
    desc = extract_cve_description(cve_data)

    if desc:
        print("CVE description:")
        print(desc)
        print("")

    out = retrieve_cwes_for_cve(cve_data, top_k=top_k)

    if out["explicit_cwes"]:
        print("Explicit CWEs in CVE record:")
        for cwe_id in out["explicit_cwes"]:
            pretty_print_cwe(cwe_id)
        print("")

    if out["retrieved"]:
        # Scores are RRF, BM25, or cosine values depending on which backends are active
        print(f"Retriever top-{top_k} CWE candidates (retrieval score, backend-dependent):")
        for c in out["retrieved"]:
            print(f"- {c['cwe_id']} (score={c['score']:.4f}) — {c.get('name','')}")
        print("")

        prompt = build_reasoner_prompt(desc, out["retrieved"], top_k=top_k)

        if run_llm:
            resp = run_ollama_reasoner(prompt, model=ollama_model)
            if resp is None:
                print("LLM reasoner not available (ollama missing or failed). Showing prompt instead:\n")
                print(prompt)
            else:
                print("LLM reasoner output:\n")
                print(resp)
        else:
            print("Reasoner prompt (copy/paste into your local LLM):\n")
            print(prompt)
    else:
        print("No candidates retrieved (missing description or empty query).")


# Example usage (link or raw CVE ID both work)
# If you installed Ollama + mistral:7b-instruct, set run_llm=True.
lookup_cve_hybrid(
    "https://www.cve.org/CVERecord?id=CVE-2024-0001",
    top_k=5,
    run_llm=True,
    ollama_model="mistral:7b-instruct",
)

### Hybrid results for CVE-2024-0001 ###

CVE description:
A condition exists in FlashArray Purity whereby a local account intended for initial array configuration remains active potentially allowing a malicious actor to gain elevated privileges.

Explicit CWEs in CVE record:
- CWE-1188: Initialization of a Resource with an Insecure Default
  Description: The product initializes or sets a resource with a default that is intended to be changed by the product's installer, administrator, or maintainer, but the default is not secure.

Retriever top-5 CWE candidates (cosine similarity):
- CWE-496 (score=0.1729) — Public Data Assigned to Private Array-Typed Field
- CWE-489 (score=0.1692) — Active Debug Code
- CWE-648 (score=0.1597) — Incorrect Use of Privileged APIs
- CWE-582 (score=0.1594) — Array Declared Public, Final, and Static
- CWE-129 (score=0.1524) — Improper Validation of Array Index

LLM reasoner output:

{
  "best_cwe": "CWE-648",
  "confidence": 0.9,
  "rationale": "The vulnerab