# Sample Call Graph Analysis

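This notebook demonstrates how to load the Ghidra-exported call graphs for three
Windows system libraries (`ntdll`, `kernel32`, `user32`), merge them into a single graph,
and inspect high-level statistics. If any of those three exports is missing, other
`*.callgraph.json` files found under `data/interim/call_graphs` are used in their place.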

In [1]:
from pathlib import Path
from collections import Counter

from call_graph_win11.analysis.graph_loader import merge_call_graphs

# Directory that holds the exported call graph JSON files. Every path below is
# derived from it so the location is spelled out in one place only.
CALL_GRAPH_DIR = Path("data/interim/call_graphs")

# Preferred inputs: the three System32 libraries this notebook is about.
preferred_paths = [
    CALL_GRAPH_DIR / "System32" / "ntdll.dll.callgraph.json",
    CALL_GRAPH_DIR / "System32" / "kernel32.dll.callgraph.json",
    CALL_GRAPH_DIR / "System32" / "user32.dll.callgraph.json",
]
# How many graphs get merged. Tied to the preferred list so the threshold,
# the top-up limit and the final slice cannot drift apart.
GRAPH_COUNT = len(preferred_paths)

existing = [p for p in preferred_paths if p.exists()]
if len(existing) < GRAPH_COUNT:
    # Some preferred exports are missing: top up with any other export found
    # under the directory (sorted for a deterministic pick), skipping files
    # that are already selected.
    for cand in sorted(CALL_GRAPH_DIR.rglob("*.callgraph.json")):
        if len(existing) >= GRAPH_COUNT:
            break
        if cand not in existing:
            existing.append(cand)
if not existing:
    raise FileNotFoundError("No call graph JSON files found under data/interim/call_graphs.")

# NOTE(review): merge_call_graphs is a project helper; the calls below assume it
# returns a networkx-style graph (number_of_nodes / number_of_edges / nodes(data=True)).
merged_graph = merge_call_graphs(existing[:GRAPH_COUNT])
node_count, edge_count = merged_graph.number_of_nodes(), merged_graph.number_of_edges()

# Count nodes per originating program; nodes without a "program" attribute are
# grouped under "unknown".
program_counts = Counter(data.get("program", "unknown") for _, data in merged_graph.nodes(data=True))
node_count, edge_count, dict(program_counts)


(11699, 28066, {'ntdll.dll': 4640, 'kernel32.dll': 3257, 'user32.dll': 3802})

In [2]:
# The ten nodes with the highest degree in the merged graph, highest first.
top_nodes = sorted(merged_graph.degree, key=lambda item: item[1], reverse=True)[:10]
for node, degree in top_nodes:
    data = merged_graph.nodes[node]
    program = data.get("program", "unknown")
    # Fall back to the node identifier itself when no naming attribute is set,
    # matching the label rule used when exporting the same nodes to CSV.
    label = data.get("name") or data.get("qualified_name") or data.get("address") or node
    # `!s` converts to str before the width spec is applied, so a non-string
    # label or program (e.g. a numeric address or None) cannot raise here.
    print(f"{program!s:12s} | {label!s:<40s} -> degree {degree}")


ntdll.dll    | __security_check_cookie                  -> degree 582
ntdll.dll    | memset                                   -> degree 389
ntdll.dll    | RtlFreeHeap                              -> degree 361
ntdll.dll    | RtlReleaseSRWLockExclusive               -> degree 317
user32.dll   | __security_check_cookie                  -> degree 302
ntdll.dll    | RtlAllocateHeap                          -> degree 301
ntdll.dll    | RtlCopyMemory                            -> degree 298
user32.dll   | _guard_dispatch_icall$thunk$10345483385596137414 -> degree 296
kernel32.dll | __security_check_cookie                  -> degree 261
ntdll.dll    | RtlAcquireSRWLockExclusive               -> degree 247


In [3]:
from pathlib import Path
from IPython.display import Image, Markdown

figure_path = Path("data/interim/figures/samples_combined.png")

# Jupyter only auto-renders the value of a cell's last top-level expression.
# A bare Image(...) / Markdown(...) inside an if/else branch is evaluated and
# thrown away, so bind the chosen object and end the cell with it instead.
if figure_path.exists():
    figure_output = Image(filename=str(figure_path))
else:
    figure_output = Markdown("_Visualization not generated yet. Run callgraph-aggregate --visualize to create the PNG._")

figure_output


In [4]:
from pathlib import Path
import csv

# Export the top-degree nodes to a CSV file under docs/analytics.
output_dir = Path("docs/analytics")
output_dir.mkdir(parents=True, exist_ok=True)
csv_path = output_dir / "sample_call_graph_top_nodes.csv"

fieldnames = ["program", "label", "degree", "node"]


def _top_node_row(node_id, node_degree):
    """Build the CSV record for one (node, degree) pair from ``top_nodes``.

    The label is the first truthy value among the node's ``name``,
    ``qualified_name`` and ``address`` attributes, falling back to the node
    identifier itself.
    """
    attrs = merged_graph.nodes[node_id]
    display_label = (
        attrs.get("name")
        or attrs.get("qualified_name")
        or attrs.get("address")
        or node_id
    )
    return {
        "program": attrs.get("program", "unknown"),
        "label": display_label,
        "degree": node_degree,
        "node": node_id,
    }


# newline="" hands line-ending control to the csv module.
with csv_path.open("w", newline="", encoding="utf-8") as handle:
    writer = csv.DictWriter(handle, fieldnames=fieldnames)
    writer.writeheader()
    # Rows are produced lazily, one per entry of top_nodes, in ranking order.
    writer.writerows(
        _top_node_row(node_id, node_degree) for node_id, node_degree in top_nodes
    )

csv_path


WindowsPath('docs/analytics/sample_call_graph_top_nodes.csv')