<a href="https://colab.research.google.com/github/Adamali1985/-5G-Enabled-BSM-Threat-Detection/blob/main/SBOM%20analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Set


# =========================
# I/O HELPERS
# =========================

def load_json(path: str) -> Any:
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"Input not found: {path}")
    with p.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: str, payload: Any) -> None:
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    with p.open("w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)


# =========================
# NORMALIZATION / MATCHING
# =========================

CVE_RE = re.compile(r"^(CVE-\d{4}-\d{4,})$")
GHSA_RE = re.compile(r"^(GHSA-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4})$", re.IGNORECASE)
OSV_RE = re.compile(r"^(OSV-\w+)$", re.IGNORECASE)

def _first_list_or_scalar(val: Any) -> Any:
    if isinstance(val, list):
        return val[0] if val else None
    return val

def _norm_severity(sev: Any) -> Optional[str]:
    if sev is None:
        return None
    if isinstance(sev, str):
        return sev
    if isinstance(sev, list) and sev:
        first = sev[0]
        if isinstance(first, dict):
            return first.get("score") or first.get("severity") or first.get("type") or first.get("value")
        return str(first)
    if isinstance(sev, dict):
        return sev.get("score") or sev.get("severity") or sev.get("type") or sev.get("value")
    return str(sev)

def _looks_like_vuln_id(v: Any) -> Optional[str]:
    if v is None:
        return None
    if isinstance(v, str):
        s = v.strip()
        if CVE_RE.match(s) or GHSA_RE.match(s) or OSV_RE.match(s):
            return s
        # some IDs include prefixes like "CVEID: CVE-2024-1234"
        m = CVE_RE.search(s)
        if m:
            return m.group(1)
        m = GHSA_RE.search(s)
        if m:
            return m.group(1)
    return None

def _pick_vuln_id(node: Dict[str, Any]) -> Optional[str]:
    # Common fields
    for key in ["vulnerabilityid", "vulnerability_id", "id", "cve", "cve_id"]:
        if key in node and (vid := _looks_like_vuln_id(node[key])):
            return vid
    # aliases often contain CVE/GHSA
    aliases = node.get("aliases")
    if isinstance(aliases, list):
        for a in aliases:
            vid = _looks_like_vuln_id(a)
            if vid:
                return vid
    # nested vulnerability dicts
    if "vulnerability" in node and isinstance(node["vulnerability"], dict):
        return _pick_vuln_id(node["vulnerability"])
    return None

def _pick_severity(node: Dict[str, Any]) -> Optional[str]:
    # direct
    for key in ["severity", "cvss", "cvss_score", "score"]:
        if key in node:
            return _norm_severity(node[key])
    # common nests: database_specific.severity, cvss.score
    ds = node.get("database_specific")
    if isinstance(ds, dict):
        sev = ds.get("severity") or ds.get("cvss")
        if sev is not None:
            return _norm_severity(sev)
    cvss = node.get("cvss")
    if isinstance(cvss, dict):
        sev = cvss.get("score") or cvss.get("severity")
        if sev is not None:
            return _norm_severity(sev)
    return None

def _pick_pkg_and_version(node: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """
    Try many shapes for package name + version.
    """
    # 1) OSV affected: {"package":{"name":...}, "versions":[...]}
    if "package" in node and isinstance(node["package"], dict):
        pkg = node["package"].get("name") or node["package"].get("id")
        vers = node.get("versions")
        version = _first_list_or_scalar(vers)
        if pkg:
            return str(pkg), version if version is None or isinstance(version, str) else str(version)

    # 2) Grype-like "artifact": {"name":..., "version":...}
    if "artifact" in node and isinstance(node["artifact"], dict):
        pkg = node["artifact"].get("name")
        version = node["artifact"].get("version")
        if pkg:
            return str(pkg), version

    # 3) Trivy vuln entry
    if "PkgName" in node or "InstalledVersion" in node:
        pkg = node.get("PkgName")
        version = node.get("InstalledVersion")
        if pkg:
            return str(pkg), version

    # 4) Simple shapes
    for k in ["package", "pkg", "name", "module", "component", "library", "target"]:
        if k in node and isinstance(node[k], str):
            # version companions
            for vk in ["version", "installedversion", "pkg_version", "packageversion", "version_installed"]:
                if vk in node:
                    v = node[vk]
                    v = _first_list_or_scalar(v)
                    return str(node[k]), v if isinstance(v, str) or v is None else str(v)
            # no version but package available
            return str(node[k]), None

    # 5) "package" given as dict elsewhere
    if "name" in node and isinstance(node["name"], str):
        return node["name"], node.get("version")

    return None, None


# =========================
# PARSERS (MODULE-SPECIFIC)
# Each returns: List[ Dict[str, Dict[str, Any]] ] (one entry per finding)
# =========================

def parse_cvd(data: Any) -> List[Dict[str, Dict[str, Any]]]:
    out: List[Dict[str, Dict[str, Any]]] = []
    vulns = (data or {}).get("vulnerabilities", []) or []
    for e in vulns:
        pkg, version = _pick_pkg_and_version(e)
        vid = _pick_vuln_id(e)
        sev = _pick_severity(e)
        if pkg:
            out.append({pkg: {"version": version, "vuln_id": vid, "severity": sev}})
    return out

def parse_grype(data: Any) -> List[Dict[str, Dict[str, Any]]]:
    out: List[Dict[str, Dict[str, Any]]] = []
    for m in (data or {}).get("matches", []) or []:
        art = m.get("artifact", {}) or {}
        vul = m.get("vulnerability", {}) or {}
        pkg = art.get("name")
        version = art.get("version")
        vid = _pick_vuln_id(vul) or vul.get("id")
        sev = _pick_severity(vul)
        if pkg:
            out.append({pkg: {"version": version, "vuln_id": vid, "severity": sev}})
    return out

def parse_osv(data: Any) -> List[Dict[str, Dict[str, Any]]]:
    """
    Handles standard OSV schema and common variations.
    """
    out: List[Dict[str, Dict[str, Any]]] = []

    # Standard: {"vulns":[{id, severity, affected:[{package:{name}, versions:[]}, ...]}, ...]}
    if isinstance(data, dict) and isinstance(data.get("vulns"), list):
        for entry in data["vulns"]:
            vid = _pick_vuln_id(entry) or entry.get("id")
            sev = _pick_severity(entry)
            aff = entry.get("affected") or []
            for a in aff:
                if not isinstance(a, dict):
                    continue
                pkg, version = _pick_pkg_and_version(a)
                if pkg:
                    out.append({pkg: {"version": version, "vuln_id": vid, "severity": sev}})

    # Some generators wrap under "results" -> { "vulnerabilities": [...] } with OSV shape inside
    if not out and isinstance(data, dict) and isinstance(data.get("results"), list):
        for res in data["results"]:
            vulns = res.get("vulnerabilities") or res.get("vulns") or []
            for v in vulns:
                vid = _pick_vuln_id(v) or v.get("id")
                sev = _pick_severity(v)
                aff = v.get("affected") or []
                if aff:
                    for a in aff:
                        pkg, version = _pick_pkg_and_version(a)
                        if pkg:
                            out.append({pkg: {"version": version, "vuln_id": vid, "severity": sev}})
                else:
                    # fallback: try to pull pkg/version directly
                    pkg, version = _pick_pkg_and_version(v)
                    if pkg:
                        out.append({pkg: {"version": version, "vuln_id": vid, "severity": sev}})

    return out

def parse_trivy(data: Any) -> List[Dict[str, Dict[str, Any]]]:
    out: List[Dict[str, Dict[str, Any]]] = []
    for res in (data or {}).get("Results", []) or []:
        vulns = res.get("Vulnerabilities") or []
        for v in vulns:
            pkg = v.get("PkgName") or res.get("Target")
            version = v.get("InstalledVersion")
            vid = _pick_vuln_id(v) or v.get("VulnerabilityID") or v.get("ID") or v.get("CVE")
            sev = _pick_severity(v)
            if pkg:
                out.append({pkg: {"version": version, "vuln_id": vid, "severity": sev}})
    return out


# =========================
# GENERIC FALLBACK (TREE WALK)
# Returns entries even when schema is unknown
# =========================

def walk_dicts(obj: Any):
    if isinstance(obj, dict):
        yield obj
        for v in obj.values():
            yield from walk_dicts(v)
    elif isinstance(obj, list):
        for v in obj:
            yield from walk_dicts(v)

def parse_generic(data: Any) -> List[Dict[str, Dict[str, Any]]]:
    out: List[Dict[str, Dict[str, Any]]] = []
    seen: Set[Tuple[str, Optional[str], Optional[str], Optional[str]]] = set()

    for node in walk_dicts(data):
        if not isinstance(node, dict):
            continue

        # Special-case OSV 'affected' nodes
        if "affected" in node and isinstance(node["affected"], list):
            vid = _pick_vuln_id(node) or node.get("id")
            sev = _pick_severity(node)
            for a in node["affected"]:
                if isinstance(a, dict):
                    pkg, version = _pick_pkg_and_version(a)
                    if pkg:
                        key = (pkg, version, vid, sev)
                        if key not in seen:
                            seen.add(key)
                            out.append({pkg: {"version": version, "vuln_id": vid, "severity": sev}})
            continue

        # General attempt from any dict node
        pkg, version = _pick_pkg_and_version(node)
        vid = _pick_vuln_id(node)
        sev = _pick_severity(node)

        if pkg and (vid or version or sev):
            key = (pkg, version, vid, sev)
            if key not in seen:
                seen.add(key)
                out.append({pkg: {"version": version, "vuln_id": vid, "severity": sev}})

    return out


# =========================
# DISPATCH
# =========================

PARSERS = {
    "cvd": parse_cvd,
    "grype": parse_grype,
    "osv": parse_osv,
    "trivy": parse_trivy,
}

def refine_file(input_path: str, output_path: str, module_type: str = "auto", verbose: bool = True) -> List[Dict[str, Dict[str, Any]]]:
    """
    Load -> parse -> (if empty, generic fallback) -> save.
    """
    data = load_json(input_path)

    if module_type == "auto":
        # quick auto-detect by top-level keys
        if isinstance(data, dict):
            if "matches" in data: module_type = "grype"
            elif "vulns" in data: module_type = "osv"
            elif "Results" in data: module_type = "trivy"
            elif "vulnerabilities" in data: module_type = "cvd"
            else: module_type = "generic"
        else:
            module_type = "generic"

    if module_type in PARSERS:
        entries = PARSERS[module_type](data)
    else:
        entries = []

    if not entries:
        # fallback sweep
        if verbose:
            print(f"[warn] {module_type} parser returned 0 entries. Falling back to generic scan...")
        entries = parse_generic(data)

    if verbose:
        print(f"[info] extracted {len(entries)} entries from {input_path}")
        if len(entries) > 0:
            # show a small sample for sanity
            print("[sample]", entries[:min(3, len(entries))])

    save_json(output_path, entries)
    return entries


# =========================
# MAIN (COLAB PATHS)
# =========================

def main():
    base = "/content"

    # ---- BCF file (uploaded to Colab) ----
    bcf_in = f"{base}/trivy_bcf.json"
    bcf_out = f"{base}/trivy_bcf_refined.json"

    # Try the declared type first; fallback will trigger automatically if needed
    refine_file(bcf_in, bcf_out, module_type="osv", verbose=True)

    # ---- ESRP files (add when available) ----
    # esrp_in = f"{base}/esrp_grype_vulns.json"
    # esrp_out = f"{base}/esrp_grype_vulns_refined.json"
    # refine_file(esrp_in, esrp_out, module_type="grype", verbose=True)

    print(f"\n[done] Wrote:\n  - {bcf_out}")
    # print(f"  - {esrp_out}")

if __name__ == "__main__":
    main()


[warn] osv parser returned 0 entries. Falling back to generic scan...
[info] extracted 2 entries from /content/trivy_bcf.json
[sample] [{'Twisted': {'version': '22.8.0', 'vuln_id': None, 'severity': None}}, {'cryptography': {'version': '42.0.7', 'vuln_id': None, 'severity': None}}]

[done] Wrote:
  - /content/trivy_bcf_refined.json
