In [None]:
%%capture --no-stderr
%pip install -U langgraph langchain langchain-openai
%pip install -qU langchain-tavily
!pip install langchain-groq
!pip install -qU langchain-anthropic
!pip install neo4j
!pip install pyvis
!pip install pycountry
!pip install ipaddress

**Mounting Google Drive**

In [None]:
from google.colab import drive
drive.mount('/content/drive')
%cd drive/MyDrive/LLM4BGP

Mounted at /content/drive
/content/drive/MyDrive/LLM4BGP


**BGPStream**

In [None]:
import os

# Make cargo-installed binaries (e.g. `monocle`) visible to later shell cells.
# Guarded so re-running the cell does not keep appending the same entry.
_CARGO_BIN = "/root/.cargo/bin"
if _CARGO_BIN not in os.environ["PATH"].split(os.pathsep):
    os.environ["PATH"] += os.pathsep + _CARGO_BIN

In [None]:
%%bash
# Rust toolchain + monocle (BGP CLI) via cargo.
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
cargo install monocle
sudo apt-get update
sudo apt-get install -y curl apt-transport-https ssl-cert ca-certificates gnupg lsb-release
# libwandio repository (libbgpstream dependency).
curl -1sLf 'https://dl.cloudsmith.io/public/wand/libwandio/cfg/setup/bash.deb.sh' | sudo -E bash
# CAIDA package repository providing the bgpstream package.
echo "deb https://pkg.caida.org/os/$(lsb_release -si|awk '{print tolower($0)}') $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/caida.list
sudo wget -O /etc/apt/trusted.gpg.d/caida.gpg https://pkg.caida.org/os/ubuntu/keyring.gpg
# -y is required: this cell is non-interactive, so apt-get cannot ask for confirmation.
sudo apt-get update && sudo apt-get install -y bgpstream

In [None]:
!pip install pybgpstream
!pip install prsw
!python3 -m pip install pybgpkit-parser
!python3 -m pip install pybgpkit

In [None]:
from __future__ import annotations
import time
import datetime as dt
import re
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Optional, Set, Tuple
import pybgpstream
import pandas as pd

**Tools That Do Not Require the AS Graph**

In [None]:
def utc_now() -> int:
    """Current UNIX epoch time in whole seconds (UTC), truncated to an int."""
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch)


def window_last_minutes(minutes: int) -> Tuple[int, int]:
    """Return ``(start_ts, end_ts)`` covering the last *minutes* minutes, ending now.

    Both values are epoch seconds, the unit PyBGPStream expects for
    ``from_time`` / ``until_time``.
    """
    end_ts = utc_now()
    start_ts = end_ts - 60 * minutes
    return start_ts, end_ts


def ts_str(ts: Optional[int]) -> str:
    """Format an epoch timestamp as ``YYYY-MM-DD HH:MM:SSZ`` (UTC) for display.

    Args:
        ts: Epoch seconds (int or float), or None.

    Returns:
        The formatted UTC string, or "—" when *ts* is None.
    """
    if ts is None:
        return "—"
    # Timezone-aware conversion; datetime.utcfromtimestamp is deprecated since
    # Python 3.12 and produces the same wall-clock value.
    return dt.datetime.fromtimestamp(ts, tz=dt.timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ")

In [None]:
def origin_asns(as_path: str) -> Set[int]:
    """Return the origin ASN(s) of a space-separated AS_PATH string.

    The origin is taken from the last path token. Every number in that token is
    returned, so an AS_SET tail such as ``{64496,64497}`` yields both ASNs. If
    the last token contains no digits, the last number anywhere in the path is
    used instead.

    Returns an empty set for a falsy or whitespace-only path, or a path with no
    numbers at all.
    """
    asn_re = re.compile(r"\d+")
    if not as_path:
        return set()
    tokens = as_path.split()
    if not tokens:
        # Whitespace-only path: indexing tokens[-1] would raise IndexError.
        return set()
    tail_nums = {int(x) for x in asn_re.findall(tokens[-1])}
    if tail_nums:
        return tail_nums
    # Fallback: last number anywhere in the full path.
    all_nums = asn_re.findall(as_path)
    return {int(all_nums[-1])} if all_nums else set()

In [None]:
DEFAULT_PROJECTS = ["ris", "routeviews"]

def make_stream(
    prefix: str,
    start_ts: int,
    end_ts: int,
    *,
    projects: Optional[List[str]] = None,
    collectors: Optional[List[str]] = None,
    record_type: str = "updates",
    more_specifics: bool = True,
) -> pybgpstream.BGPStream:
    """Build a pybgpstream.BGPStream for a prefix family over ``[start_ts, end_ts]``.

    Filter: ``prefix more <prefix>`` when *more_specifics* is True (the exact
    prefix plus its more-specifics), otherwise ``prefix <prefix>``.

    Project/collector selection:
      * ``projects=None`` → DEFAULT_PROJECTS (RIS + RouteViews).
      * An empty ``projects`` list → no project restriction is passed.
      * ``collectors`` is passed only when non-empty.

    With record_type='updates' and a recent short window this approximates a
    live view of the routing system.
    """
    chosen_projects = DEFAULT_PROJECTS if projects is None else projects
    qualifier = "more " if more_specifics else ""

    stream_kwargs = {
        "from_time": start_ts,
        "until_time": end_ts,
        "record_type": record_type,
        "filter": f"prefix {qualifier}{prefix}",
    }
    if chosen_projects:
        stream_kwargs["projects"] = chosen_projects
    if collectors:
        stream_kwargs["collectors"] = collectors

    return pybgpstream.BGPStream(**stream_kwargs)

In [None]:
def iter_bgp_elements(stream: pybgpstream.BGPStream) -> Iterator[dict]:
    """Yield one normalized dict per announcement ('A') or RIB ('R') element.

    Records whose ``status`` attribute exists and is not "valid" are skipped.
    Withdrawals ('W') and all other element types are dropped.

    Each dict has the keys: project, collector, record_time, elem_time, type,
    peer_asn (int, or None when the element has none), peer_address, prefix
    (from the "prefix" or "announced-prefix" field) and as_path ("" when the
    element has no "as-path" field).
    """
    kept_types = ("A", "R")
    for record in stream.records():
        status = getattr(record, "status", None)
        if status is not None and status != "valid":
            continue
        record_info = {
            "project": record.project,
            "collector": record.collector,
            "record_time": record.time,
        }
        for elem in record:
            elem_type = elem.type
            if elem_type not in kept_types:
                continue
            fields = elem.fields
            peer = elem.peer_asn
            yield {
                **record_info,
                "elem_time": elem.time,
                "type": elem_type,
                "peer_asn": None if peer is None else int(peer),
                "peer_address": elem.peer_address,
                "prefix": fields.get("prefix") or fields.get("announced-prefix"),
                "as_path": fields.get("as-path", ""),
            }

In [None]:
Vantage = Tuple[str, str, Optional[int]]  # (project, collector, peer_asn)

def build_vantage_origin_index(
    elems: Iterable[dict],
) -> Dict[Vantage, Dict[str, Dict[int, Dict[str, Optional[int]]]]]:
    """Index announced origins per vantage and prefix.

    Returns a nested mapping of plain dicts:
        { (project, collector, peer_asn):
            { prefix:
                { origin_asn: {'first': ts, 'last': ts, 'count': n} }
            }
        }

    'first'/'last' are the min/max ``elem_time`` at which the origin was seen.
    'count' is the number of elements naming that origin. Elements with no
    prefix, or whose AS path yields no origin (see origin_asns), are skipped.

    Plain dicts (not nested lambda-based defaultdicts) are returned so that
    lookups by callers cannot silently create empty entries, and so that the
    index can be pickled.
    """
    index: Dict[Vantage, Dict[str, Dict[int, Dict[str, Optional[int]]]]] = {}
    for e in elems:
        pfx = e.get("prefix")
        oset = origin_asns(e.get("as_path", ""))
        if not pfx or not oset:
            continue
        v: Vantage = (e["project"], e["collector"], e["peer_asn"])
        ts = e["elem_time"]
        bucket = index.setdefault(v, {}).setdefault(pfx, {})
        for o in oset:
            meta = bucket.setdefault(o, {"first": None, "last": None, "count": 0})
            meta["first"] = ts if meta["first"] is None else min(meta["first"], ts)
            meta["last"] = ts if meta["last"] is None else max(meta["last"], ts)
            meta["count"] += 1
    return index

In [None]:
def find_conflicts_by_vantage(index) -> Dict[Vantage, Dict[str, Dict[int, dict]]]:
    """Keep only the vantages, and within them the prefixes, that show more than one origin ASN.

    Vantages with no multi-origin prefix are omitted. The per-prefix origin maps
    are the same objects as in *index*, not copies.
    """
    result: Dict[Vantage, Dict[str, Dict[int, dict]]] = {}
    for vantage, prefix_map in index.items():
        multi_origin = {}
        for pfx, origin_map in prefix_map.items():
            if len(origin_map) > 1:
                multi_origin[pfx] = origin_map
        if multi_origin:
            result[vantage] = multi_origin
    return result


def aggregate_family_origins(index) -> Set[int]:
    """Collect every distinct origin ASN in *index*, across all vantages and prefixes."""
    return {
        origin
        for prefix_map in index.values()
        for origin_map in prefix_map.values()
        for origin in origin_map
    }

In [None]:
def summarize_conflicts_table(conflicts: Dict[Vantage, Dict[str, Dict[int, dict]]]):
    """Summarize per-vantage conflicts as a pandas DataFrame (None if pandas is unavailable).

    One row per vantage, with columns: project, collector, peer_asn,
    num_conflicting_prefixes, origins (sorted union of origins over the
    vantage's conflicting prefixes) and prefixes (sorted).

    Rows are sorted by num_conflicting_prefixes descending, then collector
    ascending. Empty input yields an empty frame with the same columns.
    """
    if pd is None:
        return None
    columns = ["project", "collector", "peer_asn", "num_conflicting_prefixes", "origins", "prefixes"]
    records = []
    for (project, collector, peer_asn), prefix_map in conflicts.items():
        origin_union: Set[int] = set().union(*prefix_map.values())
        records.append({
            "project": project,
            "collector": collector,
            "peer_asn": peer_asn,
            "num_conflicting_prefixes": len(prefix_map),
            "origins": sorted(origin_union),
            "prefixes": sorted(prefix_map),
        })
    if not records:
        return pd.DataFrame(columns=columns)  # type: ignore
    table = pd.DataFrame(records)  # type: ignore
    return table.sort_values(["num_conflicting_prefixes", "collector"], ascending=[False, True])

In [None]:
def print_conflicts(conflicts: Dict[Vantage, Dict[str, Dict[int, dict]]], *, start_ts: int, end_ts: int) -> None:
    """Print per-vantage origin conflicts with first/last/count timing per origin.

    *conflicts* has the shape produced by find_conflicts_by_vantage():
    ``{(project, collector, peer_asn): {prefix: {origin_asn: meta}}}``, where
    meta holds 'first', 'last' and 'count'.

    Vantages are printed in (project, collector, peer_asn) order. A None
    peer_asn sorts before numbered peers of the same collector.
    """
    def _vantage_order(vantage):
        project, collector, peer_asn = vantage
        # peer_asn is None when an element carried no peer ASN (iter_bgp_elements);
        # a plain sorted() would raise TypeError comparing None with int.
        return (project, collector, peer_asn is not None, 0 if peer_asn is None else peer_asn)

    print(f"Time window: {ts_str(start_ts)} → {ts_str(end_ts)} (UTC)")
    if not conflicts:
        print("No origin conflicts observed for the given family and window.")
        return
    for vantage in sorted(conflicts.keys(), key=_vantage_order):
        project, collector, peer_asn = vantage
        pmap = conflicts[vantage]
        print(f"\nVantage: {project}/{collector}  peer_asn={peer_asn}")
        for pfx in sorted(pmap.keys()):
            olist = sorted(pmap[pfx].items(), key=lambda kv: kv[0])
            print(f"  {pfx} → origins: {', '.join(str(o) for o, _ in olist)}")
            for o, meta in olist:
                print(
                    f"    AS{o}: first={ts_str(meta['first'])}, last={ts_str(meta['last'])}, count={meta['count']}"
                )

In [None]:
def conflicting_origins_by_vantage(
    prefix: str,
    window_min: int = 10,
    *,
    projects: Optional[List[str]] = None,  # default RIS + RouteViews
    collectors: Optional[List[str]] = None,
    record_type: str = "updates",
    more_specifics: bool = True,
):
    """Scan *prefix* (plus more-specifics when *more_specifics*) over the last *window_min* minutes.

    Pipeline: make_stream → iter_bgp_elements → build_vantage_origin_index,
    then conflicts and the family-wide origin set are derived from that index.

    Returns ``(conflicts, index, (start_ts, end_ts), family_origins)``:
      - conflicts: vantage → {prefix → {origin_asn → meta}}, only prefixes with >1 origin
      - index: the full vantage index, including non-conflicting entries
      - (start_ts, end_ts): the epoch-second window that was queried
      - family_origins: every origin ASN seen anywhere in the family
    """
    window = window_last_minutes(window_min)
    stream = make_stream(
        prefix,
        *window,
        projects=projects,
        collectors=collectors,
        record_type=record_type,
        more_specifics=more_specifics,
    )
    index = build_vantage_origin_index(iter_bgp_elements(stream))
    return (
        find_conflicts_by_vantage(index),
        index,
        window,
        aggregate_family_origins(index),
    )

In [None]:
# ------------ Preset collector lists (broad coverage) ------------
RIS_COLLECTORS = [
    "rrc00","rrc01","rrc03","rrc04","rrc05","rrc06","rrc07",
    "rrc10","rrc11","rrc12","rrc13","rrc14","rrc15","rrc18",
    "rrc19","rrc20","rrc21","rrc22","rrc23","rrc24","rrc25","rrc26",
]
# Each collector listed once (route-views.sfmix previously appeared twice).
RV_COLLECTORS = [
    "route-views2","route-views.eqix","route-views.linx","route-views.chicago",
    "route-views.isc","route-views.kixp","route-views.jinx","route-views.nwax",
    "route-views.sg","route-views.sydney","route-views.saopaulo","route-views.saopaulo2",
    "route-views.sfmix","route-views.flix","route-views.perth","route-views.napafrica",
    "route-views.amsix","route-views.sfmix2",
]


def _utc_now() -> int:
    """Current UNIX epoch seconds (UTC), truncated to an int."""
    now = time.time()
    return int(now)

def _window_last_minutes(minutes: int) -> tuple[int, int]:
    """Return the ``(start, end)`` epoch-second pair covering the last *minutes* minutes."""
    end = _utc_now()
    span_seconds = minutes * 60
    return end - span_seconds, end

def _fmt_ts(ts: int) -> str:
    """Format epoch seconds (int or float) as ``YYYY-MM-DD HH:MM:SSZ`` in UTC.

    Uses a timezone-aware conversion; datetime.utcfromtimestamp is deprecated
    since Python 3.12 and yields the same wall-clock value.
    """
    return dt.datetime.fromtimestamp(ts, tz=dt.timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ")


def print_elements(
    prefix: str,
    minutes: int = 15,
    *,
    projects: Optional[List[str]] = None,     # e.g., ["ris"], ["routeviews"], or both
    collectors: Optional[List[str]] = None,
    include_withdrawals: bool = False,
    more_specifics: bool = True,
    record_type: str = "updates",            # "updates" or "ribs"
) -> int:
    """Print one line per recent BGP element for *prefix* and (by default) its more-specifics.

    The stream is built by make_stream(), so the filter string and the
    project/collector defaults live in one place (projects=None → RIS +
    RouteViews via DEFAULT_PROJECTS).

    Announcements ('A') and RIB entries ('R') are always printed. Withdrawals
    ('W') are printed only when *include_withdrawals* is True; any other element
    type is skipped. Records whose status is set and not "valid" are skipped.

    Returns the number of element lines printed. The header and the
    "no matching elements" notice are not counted.
    """
    start_ts, end_ts = _window_last_minutes(minutes)
    stream = make_stream(
        prefix,
        start_ts,
        end_ts,
        projects=projects,
        collectors=collectors,
        record_type=record_type,
        more_specifics=more_specifics,
    )

    print(f"Time window: {_fmt_ts(start_ts)} → {_fmt_ts(end_ts)} (UTC)")
    print("Project/Collector           PeerASN  Type  ElemTime                Prefix               AS_PATH")
    print("-" * 110)

    count = 0
    for rec in stream.records():
        if getattr(rec, "status", None) not in (None, "valid"):
            continue
        for elem in rec:
            etype = elem.type  # 'A', 'W', 'R'
            if etype == "W" and not include_withdrawals:
                continue
            if etype not in ("A", "R", "W"):
                continue

            f = elem.fields
            # Withdrawals may carry the prefix under a different field name.
            pfx = (
                f.get("prefix")
                or f.get("announced-prefix")
                or f.get("withdrawn-prefix")
                or "?"
            )
            as_path = f.get("as-path", "")

            print(
                f"{rec.project}/{rec.collector:<24} "
                f"{(elem.peer_asn if elem.peer_asn is not None else '?'):>7}  "
                f"{etype:^4}  "
                f"{_fmt_ts(elem.time)}  "
                f"{pfx:<19}  "
                f"{as_path}"
            )
            count += 1

    if count == 0:
        print("No matching elements observed in the selected window.")
    return count


# ------------ Smoke test helpers ------------
# Well-known anycast prefixes that smoke_test() uses as a connectivity check.
# Assumption (not verified here): they are busy enough to show updates in a
# short window.
_NOISY_PREFIX_OPERATORS = (
    ("1.1.1.0/24", "Cloudflare"),
    ("8.8.8.0/24", "Google"),
    ("9.9.9.0/24", "Quad9"),
    ("4.2.2.0/24", "Level3 legacy anycast"),
)
NOISY_PREFIXES = [prefix for prefix, _operator in _NOISY_PREFIX_OPERATORS]

In [None]:
def smoke_test(minutes: int = 15) -> None:
    """Connectivity check: print recent elements (withdrawals included) for NOISY_PREFIXES.

    First runs a RIS pass (RIS_COLLECTORS) over every noisy prefix. Only if that
    pass printed nothing does it repeat the pass against RouteViews
    (RV_COLLECTORS). If both passes are empty, prints a troubleshooting hint.
    """
    printed = 0
    for project in ("ris", "routeviews"):
        if printed:
            break  # RouteViews is only a fallback for an empty RIS pass
        collectors = RIS_COLLECTORS if project == "ris" else RV_COLLECTORS
        for pfx in NOISY_PREFIXES:
            printed += print_elements(
                pfx,
                minutes=minutes,
                projects=[project],
                collectors=collectors,
                include_withdrawals=True,
                record_type="updates",
            )

    if printed == 0:
        print("\nStill nothing — likely causes: very short window, network egress blocked, or broker lag.\n"
              "Try minutes=60 and ensure outbound HTTPS is allowed from Colab.")

In [None]:
# Connectivity check over a 60-minute window (very short windows may return nothing).
smoke_test(60)

Time window: 2025-08-15 13:46:05Z → 2025-08-15 14:46:05Z (UTC)
Project/Collector           PeerASN  Type  ElemTime                Prefix               AS_PATH
--------------------------------------------------------------------------------------------------------------
ris/rrc15                     263237   A    2025-08-15 13:52:37Z  1.1.1.0/24           263237 265409 13335
ris/rrc15                     263237   A    2025-08-15 13:53:07Z  1.1.1.0/24           263237 174 13335
ris/rrc25                     328840   A    2025-08-15 14:01:17Z  1.1.1.0/24           328840 37282 13335
ris/rrc15                     263237   A    2025-08-15 14:02:39Z  1.1.1.0/24           263237 265409 13335
ris/rrc15                     263237   A    2025-08-15 14:03:07Z  1.1.1.0/24           263237 174 13335
ris/rrc15                     263237   A    2025-08-15 14:05:27Z  1.1.1.0/24           263237 262557 13335
ris/rrc15                     263237   A    2025-08-15 14:05:52Z  1.1.1.0/24           263237 1

In [None]:
def moas_conflicts(
    prefix: str,
    window_min: int = 10,
    *,
    projects: Optional[List[str]] = None,
    collectors: Optional[List[str]] = None,
    record_type: str = "updates",
    more_specifics: bool = True,
):
    """Detect MOAS in the *prefix* family by taking the union of origins across all vantages.

    Unlike the per-vantage conflicts from conflicting_origins_by_vantage(), a
    prefix counts as MOAS here when the union of its origins over every vantage
    has more than one ASN, even if each single vantage saw only one origin.

    Returns: (moas_map, per_prefix_vantages, (start_ts, end_ts), index)
      - moas_map: {prefix -> set(origin ASNs)} where len(set) > 1
      - per_prefix_vantages: {prefix -> {(project, collector, peer_asn) -> set(origins)}}
        (nested defaultdicts) for every prefix seen, not only MOAS ones
      - (start_ts, end_ts): time window used
      - index: the full vantage index, for deeper inspection
    """
    _conflicts, index, window, _family = conflicting_origins_by_vantage(
        prefix=prefix,
        window_min=window_min,
        projects=projects,
        collectors=collectors,
        record_type=record_type,
        more_specifics=more_specifics,
    )

    union_by_prefix: Dict[str, Set[int]] = defaultdict(set)
    vantages_by_prefix: Dict[str, Dict[Tuple[str, str, Optional[int]], Set[int]]] = defaultdict(
        lambda: defaultdict(set)
    )
    for vantage, prefix_map in index.items():
        for pfx, origin_map in prefix_map.items():
            if not origin_map:
                continue
            seen = set(origin_map)
            union_by_prefix[pfx] |= seen
            vantages_by_prefix[pfx][vantage] |= seen

    moas_map = {pfx: origins for pfx, origins in union_by_prefix.items() if len(origins) > 1}
    return moas_map, vantages_by_prefix, window, index

In [None]:
def print_moas(
    moas_map: Dict[str, Set[int]],
    per_prefix_vantages: Dict[str, Dict[Tuple[str,str,Optional[int]], Set[int]]],
    index, *,
    start_ts: int,
    end_ts: int,
) -> None:
    """Print MOAS prefixes with a per-vantage breakdown and per-origin timing.

    *moas_map*, *per_prefix_vantages* and *index* are the corresponding outputs
    of moas_conflicts(). *index* supplies the first/last/count meta for each
    origin at each vantage.

    Vantages are listed in (project, collector, peer_asn) order. A None peer_asn
    sorts before numbered peers of the same collector instead of making
    sorted() raise TypeError.
    """
    def _vantage_order(vantage):
        project, collector, peer_asn = vantage
        return (project, collector, peer_asn is not None, 0 if peer_asn is None else peer_asn)

    print(f"Time window: {ts_str(start_ts)} → {ts_str(end_ts)} (UTC)")
    if not moas_map:
        print("No MOAS observed for the selected prefix family and window.")
        return

    for pfx in sorted(moas_map.keys()):
        all_orig = ", ".join(f"AS{o}" for o in sorted(moas_map[pfx]))
        print(f"\n{pfx} — MOAS origins: {all_orig}")
        # Vantage breakdown plus per-origin timing taken from the existing index.
        vmap = per_prefix_vantages.get(pfx, {})
        for vantage in sorted(vmap.keys(), key=_vantage_order):
            project, collector, peer_asn = vantage
            origins_here = ", ".join(f"AS{o}" for o in sorted(vmap[vantage]))
            print(f"  {project}/{collector} peer_asn={peer_asn} → {origins_here}")
            omap = index[vantage][pfx]
            for o in sorted(omap.keys()):
                meta = omap[o]
                print(
                    f"    {o}: first={ts_str(meta['first'])}, last={ts_str(meta['last'])}, count={meta['count']}"
                )

In [None]:
def summarize_moas_table(moas_map: Dict[str, Set[int]], per_prefix_vantages):
    """Summarize MOAS prefixes as a pandas DataFrame (None if pandas is unavailable).

    Columns: prefix, n_origins, origins (sorted), n_vantages, and vantages
    formatted as "project/collector:peer_asn". Rows are sorted by n_origins
    descending, then prefix ascending. Empty input yields an empty frame with
    those columns.
    """
    if pd is None:
        return None
    columns = ["prefix", "n_origins", "origins", "n_vantages", "vantages"]
    records = []
    for pfx, origins in moas_map.items():
        seen_at = list(per_prefix_vantages.get(pfx, {}))
        records.append({
            "prefix": pfx,
            "n_origins": len(origins),
            "origins": sorted(origins),
            "n_vantages": len(seen_at),
            "vantages": [f"{project}/{collector}:{peer}" for (project, collector, peer) in seen_at],
        })
    if not records:
        return pd.DataFrame(columns=columns)  # type: ignore
    return pd.DataFrame(records).sort_values(["n_origins", "prefix"], ascending=[False, True])  # type: ignore


In [None]:
from __future__ import annotations
import time
import datetime as dt
import re
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Optional, Set, Tuple
import pybgpstream
import pandas as pd
from itertools import islice


# This helper used to be defined as `conflicting_origins_by_vantage`. That
# replaced the pipeline function of the same name and then called itself, so
# every call (including those made by moas_conflicts()) recursed until
# RecursionError. It now has its own name and delegates to the real pipeline.
def moas_union_by_prefix(prefix: str,
                         window_min: int = 10,
                         *,
                         projects: Optional[List[str]] = None,
                         collectors: Optional[List[str]] = None,
                         record_type: str = "updates",
                         more_specifics: bool = True):
    """Union origin ASNs per prefix across all vantages and keep only MOAS prefixes.

    Runs conflicting_origins_by_vantage() with the same arguments.

    Returns ``(moas_map, index, (start_ts, end_ts))``, where moas_map is
    ``{prefix: set(origin ASNs)}`` restricted to prefixes with more than one
    origin.
    """
    _conflicts, index, (start_ts, end_ts), _fam = conflicting_origins_by_vantage(
        prefix=prefix,
        window_min=window_min,
        projects=projects,
        collectors=collectors,
        record_type=record_type,
        more_specifics=more_specifics,
    )
    per_prefix_union: Dict[str, Set[int]] = defaultdict(set)
    for pmap in index.values():
        for pfx, omap in pmap.items():
            per_prefix_union[pfx].update(omap.keys())
    moas_map = {pfx: s for pfx, s in per_prefix_union.items() if len(s) > 1}
    return moas_map, index, (start_ts, end_ts)


def moas_conflicts_ribs_then_updates(
    prefix: str,
    *,
    rib_window_min: int = 360,   # 6h — intended to include at least one RIB snapshot
    upd_window_min: int = 60,    # 1h updates fallback
    projects: Optional[List[str]] = None,
    collectors: Optional[List[str]] = None,
    more_specifics: bool = True,
):
    """Find MOAS for a prefix family: RIB snapshots over a wide window first, then updates.

    Phase A scans record_type="ribs" over the last *rib_window_min* minutes and
    returns immediately if any MOAS prefix was found. Otherwise phase B scans
    record_type="updates" over the last *upd_window_min* minutes and returns its
    result, whether or not it found anything.

    NOTE(review): whether 6h always covers a RIB dump depends on each project's
    dump schedule. Confirm for the collectors in use.

    Returns: (moas_map, index, (start_ts, end_ts), source)
      - source ∈ {"ribs", "updates"} is the phase whose result is returned.
    """
    # moas_conflicts() returns (moas_map, per_prefix_vantages, (start_ts, end_ts), index);
    # unpacking it into three names fails with ValueError.
    moas_map, _vantages, (s1, e1), index = moas_conflicts(
        prefix=prefix,
        window_min=rib_window_min,
        projects=projects,
        collectors=collectors,
        record_type="ribs",
        more_specifics=more_specifics,
    )
    if moas_map:
        return moas_map, index, (s1, e1), "ribs"

    # Phase B: updates in a moderate window.
    moas_map, _vantages, (s2, e2), index = moas_conflicts(
        prefix=prefix,
        window_min=upd_window_min,
        projects=projects,
        collectors=collectors,
        record_type="updates",
        more_specifics=more_specifics,
    )
    return moas_map, index, (s2, e2), "updates"


# ---------------------------------------------------------------------------
# 2) Any‑prefix MOAS discovery with limits (0.0.0.0/0 and ::/0)
# ---------------------------------------------------------------------------

def _limited(it: Iterable[dict], max_elems: Optional[int]) -> Iterable[dict]:
    """Cap *it* at *max_elems* items. With ``max_elems=None``, *it* itself is returned unchanged."""
    return it if max_elems is None else islice(it, max_elems)


def find_moas_anyprefix(
    minutes: int = 15,
    *,
    projects: Optional[List[str]] = None,
    collectors: Optional[List[str]] = None,
    ip_versions: Set[str] = frozenset({"v4"}),   # {"v4"}, {"v6"}, or {"v4","v6"}
    record_type: str = "updates",
    more_specifics: bool = True,
    max_elems: Optional[int] = 20000,     # guardrail for notebooks
):
    """Scan a recent window for MOAS on any prefix, not just one family.

    Strategy:
      - For each requested IP version, stream the /0 family (0.0.0.0/0 or ::/0)
        through make_stream → iter_bgp_elements → build_vantage_origin_index.
        Each family's element stream is capped at *max_elems* (None = no cap).
      - Merge the per-family indexes and union origins per prefix across all
        vantages.

    Args:
        ip_versions: A subset of {"v4", "v6"}. A single string such as "v4" is
            also accepted.

    Returns: (moas_map, index, (start_ts, end_ts)) for the scanned set.

    Raises:
        ValueError: If *ip_versions* contains anything other than "v4"/"v6".
    """
    family_for_version = {"v4": "0.0.0.0/0", "v6": "::/0"}
    if isinstance(ip_versions, str):
        # A bare "v4" previously worked by substring coincidence; keep it working explicitly.
        ip_versions = {ip_versions}
    unknown = set(ip_versions).difference(family_for_version)
    if unknown:
        raise ValueError(
            f"unsupported ip_versions {sorted(unknown)}; expected a subset of {{'v4', 'v6'}}"
        )

    from_time, until_time = window_last_minutes(minutes)

    combined_index: Dict[Tuple[str, str, Optional[int]], Dict[str, Dict[int, dict]]] = {}
    for version in ("v4", "v6"):  # fixed order: IPv4 family first
        if version not in ip_versions:
            continue
        stream = make_stream(
            family_for_version[version],
            from_time,
            until_time,
            projects=projects,
            collectors=collectors,
            record_type=record_type,
            more_specifics=more_specifics,
        )
        family_index = build_vantage_origin_index(_limited(iter_bgp_elements(stream), max_elems))
        # Shallow merge: v4 and v6 prefixes never collide, so update() loses nothing.
        for vantage, pmap in family_index.items():
            merged = combined_index.setdefault(vantage, {})
            for pfx, omap in pmap.items():
                merged.setdefault(pfx, {}).update(omap)

    # Union origins per prefix across all vantages.
    per_prefix_union: Dict[str, Set[int]] = {}
    for pmap in combined_index.values():
        for pfx, omap in pmap.items():
            per_prefix_union.setdefault(pfx, set()).update(omap.keys())

    moas_map = {pfx: s for pfx, s in per_prefix_union.items() if len(s) > 1}
    return moas_map, combined_index, (from_time, until_time)


# ---------------------------------------------------------------------------
# 3) Pretty printers (optional)
# ---------------------------------------------------------------------------

def print_moas_map(
    moas_map,
    index,
    *,
    start_ts: int,
    end_ts: int,
    max_prefixes: int = 20,
    max_vantages: Optional[int] = None,
):
    """Print MOAS prefixes (sorted) and the vantages where each was seen.

    Args:
        moas_map: {prefix: set(origin ASNs)} as returned by the MOAS helpers.
        index: Vantage index {(project, collector, peer_asn): {prefix: {origin: meta}}}.
        start_ts, end_ts: Window shown in the header (epoch seconds).
        max_prefixes: Print at most this many prefixes, then a truncation line.
        max_vantages: Print at most this many vantage lines per prefix, followed
            by a "... (N more vantages)" line. None prints all of them.

    Vantage lines follow the iteration order of *index*. Vantages are grouped in
    a single pass over *index* instead of rescanning it once per prefix.
    """
    print(f"Time window: {ts_str(start_ts)} → {ts_str(end_ts)} (UTC)")
    if not moas_map:
        print("No MOAS observed in this scan.")
        return

    selected = sorted(moas_map.keys())[: max(max_prefixes, 0)]
    wanted = set(selected)
    sightings = {pfx: [] for pfx in selected}
    for vantage, pmap in index.items():
        for pfx in wanted.intersection(pmap):
            sightings[pfx].append((vantage, pmap[pfx]))

    for pfx in selected:
        origins = ", ".join(f"AS{o}" for o in sorted(moas_map[pfx]))
        print(f"\n{pfx} — MOAS origins: {origins}")
        rows = sightings[pfx]
        limit = len(rows) if max_vantages is None else max(max_vantages, 0)
        for (project, collector, peer_asn), omap in rows[:limit]:
            vs = ", ".join(f"AS{o}" for o in sorted(omap.keys()))
            print(f"  {project}/{collector} peer_asn={peer_asn} → {vs}")
        if len(rows) > limit:
            print(f"  ... ({len(rows) - limit} more vantages)")

    if len(moas_map) > len(selected):
        print(f"... (truncated after {max_prefixes} prefixes)")

In [None]:
# A) Family-scoped MOAS check for 1.1.1.0/24: RIB snapshots (6h) first, then
#    updates (1h) if the RIB pass finds nothing.
family_collectors = (RIS_COLLECTORS + RV_COLLECTORS) if 'RIS_COLLECTORS' in globals() else None
moas_map, idx, (s, e), src = moas_conflicts_ribs_then_updates(
    prefix="1.1.1.0/24",
    rib_window_min=360,
    upd_window_min=60,
    projects=["ris","routeviews"],
    collectors=family_collectors,
    more_specifics=True,
)
print(f"\nSource used: {src}")
print_moas_map(moas_map, idx, start_ts=s, end_ts=e)

In [None]:
# B) Discover MOAS on any IPv4 prefix over the last hour, using the first four
#    RIS collectors and capping the element stream at 20k (guarded by max_elems).
scan_collectors = RIS_COLLECTORS[:4] if 'RIS_COLLECTORS' in globals() else None
moas_any, idx_any, (s2, e2) = find_moas_anyprefix(
    minutes=60,
    projects=["ris"],
    collectors=scan_collectors,
    ip_versions={"v4"},
    record_type="updates",
    more_specifics=True,
    max_elems=20000,
)
print("\n[ANY‑PREFIX scan]")
print_moas_map(moas_any, idx_any, start_ts=s2, end_ts=e2, max_prefixes=100)


[ANY‑PREFIX scan]
Time window: 2025-08-15 14:52:55Z → 2025-08-15 15:52:55Z (UTC)

177.46.35.0/24 — MOAS origins: AS28135, AS274819
  ris/rrc00 peer_asn=34549 → AS274819
  ris/rrc00 peer_asn=58057 → AS274819
  ris/rrc00 peer_asn=206499 → AS274819
  ris/rrc01 peer_asn=6908 → AS274819
  ris/rrc01 peer_asn=2914 → AS274819
  ris/rrc03 peer_asn=47147 → AS274819
  ris/rrc03 peer_asn=48185 → AS274819
  ris/rrc01 peer_asn=15692 → AS274819
  ris/rrc00 peer_asn=852 → AS274819
  ris/rrc00 peer_asn=15562 → AS28135, AS274819
  ris/rrc00 peer_asn=4608 → AS274819

181.233.80.0/22 — MOAS origins: AS268314, AS271482
  ris/rrc00 peer_asn=204092 → AS268314
  ris/rrc00 peer_asn=34549 → AS268314, AS271482
  ris/rrc00 peer_asn=49432 → AS271482
  ris/rrc00 peer_asn=24482 → AS268314, AS271482
  ris/rrc00 peer_asn=7018 → AS268314
  ris/rrc00 peer_asn=216285 → AS271482
  ris/rrc00 peer_asn=37721 → AS268314, AS271482
  ris/rrc00 peer_asn=48185 → AS271482
  ris/rrc00 peer_asn=29504 → AS268314
  ris/rrc00 peer_asn

**BGPStream Graph Analysis**

In [None]:
import pickle  # not imported anywhere else in the visible notebook

# Relative path: relies on the working directory set by the Drive-mount cell.
# SECURITY: pickle.load can execute arbitrary code; only load files you created or trust.
with open("tools/bgpstream/as_graph.pickle", "rb") as graph_file:
    G = pickle.load(graph_file)

In [None]:
# Node keys in G are strings, so the ASN is quoted.
asn = '1299'
# G.degree(n) for a node that is missing does not raise: NetworkX treats the
# argument as an nbunch and returns a degree view. Fail loudly instead.
if asn not in G:
    raise KeyError(f"AS{asn} not found in the AS graph")
total_degree = G.degree(asn)
print(total_degree)

2528


**AS Graph Tools**

In [None]:
def _largest_component_subgraph(G: nx.Graph) -> nx.Graph:
    """Return the largest connected component of *G* as an *undirected* graph.

    For a directed graph, components are weakly connected (edge direction
    ignored). An empty input graph yields an empty undirected graph instead of
    letting max() raise ValueError.
    """
    if G.number_of_nodes() == 0:
        return G.to_undirected()
    if G.is_directed():
        components = nx.weakly_connected_components(G)
    else:
        components = nx.connected_components(G)
    comp_nodes = max(components, key=len)
    return G.subgraph(comp_nodes).to_undirected()


def _percentile(value: float, sample: Sequence[float]) -> float:
    """Return the inclusive percentile (0-100) of *value* within *sample*.

    This is the share of sample entries that are ``<= value``, times 100.

    Raises:
        ValueError: If *sample* is empty (previously a ZeroDivisionError).
    """
    size = len(sample)
    if size == 0:
        raise ValueError("cannot compute a percentile within an empty sample")
    at_or_below = sum(1 for v in sample if v <= value)
    return 100 * at_or_below / size

In [None]:
def as_degree(G: nx.DiGraph, asn: str | int, *, normalized: bool = True) -> Dict[str, Any]:
    """Return the in-, out- and total degree of **asn**.

    For a directed graph, total = in + out. For an undirected graph all three
    values equal the plain degree.

    Parameters
    ----------
    G : nx.DiGraph or nx.Graph
        AS-relation graph (directed preferred).
    asn : str | int
        Autonomous-System Number; must be a node of *G*.
    normalized : bool, default **True**
        Also report the percentile of this total among all nodes' totals
        (rounded to 2 decimals).

    Raises
    ------
    KeyError
        If **asn** is not in *G*.
    """
    if asn not in G:
        raise KeyError(f"ASN {asn} not found in graph")

    directed = G.is_directed()
    if directed:
        indeg = G.in_degree(asn)
        outdeg = G.out_degree(asn)
        total = indeg + outdeg
    else:
        indeg = outdeg = total = G.degree(asn)

    result: Dict[str, Any] = {"in": indeg, "out": outdeg, "total": total}

    if normalized:
        if directed:
            all_totals = [G.in_degree(n) + G.out_degree(n) for n in G]
        else:
            all_totals = [G.degree(n) for n in G]
        result["percentile"] = round(_percentile(total, all_totals), 2)
    return result

In [None]:
def as_betweenness(
    G: nx.DiGraph,
    asn: str | int,
    *,
    k: Optional[int] = 2000,
    normalized: bool = True,
    seed: Optional[int] = None,
) -> Dict[str, Any]:
    """Betweenness centrality of **asn** and its rank among all nodes.

    Args:
        k: Number of pivot nodes for NetworkX's sampling approximation. None, or
            any k >= number of nodes, computes the exact value, which is costly
            on large graphs.
        normalized: Forwarded to nx.betweenness_centrality.
        seed: Forwarded to nx.betweenness_centrality so the sampled estimate is
            reproducible.

    Returns:
        {"score": float, "rank": int}, where rank = 1 + the number of nodes with
        a strictly higher score. Both come from the same centrality computation.

    Raises:
        KeyError: If **asn** is not in *G*.
        ValueError: If *k* is not None and smaller than 1.
    """
    if asn not in G:
        raise KeyError(asn)
    if k is not None and k < 1:
        raise ValueError(f"k must be a positive integer or None, got {k!r}")

    # Sampling k >= |V| pivots is pointless (and NetworkX raises when k > |V|), so go exact.
    pivots = None if k is None or k >= G.number_of_nodes() else k
    # One computation serves both score and rank. Previously the score came from
    # betweenness_centrality_subset over random sources/targets and was ranked
    # against a separate k-sample run with a different scale, so the rank compared
    # incompatible numbers (and betweenness was computed twice).
    scores = nx.betweenness_centrality(G, k=pivots, normalized=normalized, seed=seed)
    score = scores[asn]
    higher = sum(1 for value in scores.values() if value > score)
    return {"score": score, "rank": higher + 1}

In [None]:
def as_closeness(G: nx.DiGraph, asn: str | int, *, harmonic: bool = False) -> float:
    """Return the closeness centrality of **asn**.

    By default this is NetworkX's standard closeness centrality
    (nx.closeness_centrality), not harmonic closeness. For directed graphs
    NetworkX measures incoming distances to the node.

    Args:
        harmonic: If True, return nx.harmonic_centrality for **asn** instead: the
            sum of reciprocal shortest-path distances, which stays meaningful on
            disconnected graphs. It still runs shortest paths from every node,
            so it is expensive on large graphs.
    """
    if harmonic:
        return nx.harmonic_centrality(G, nbunch=[asn])[asn]
    return nx.closeness_centrality(G, asn)

In [None]:
def as_eigenvector(
    G: nx.DiGraph,
    asn: str | int,
    *,
    variant: str = "pagerank",
    alpha: float = 0.85,
) -> float:
    """Return an eigenvector-family centrality score for **asn**.

    *variant*:
      * 'pagerank'    — nx.pagerank with damping *alpha*.
      * 'eigenvector' — nx.eigenvector_centrality with max_iter=500.
      * 'katz'        — nx.katz_centrality_numpy with a fixed alpha of 0.005.

    *alpha* is used only by 'pagerank'. The full centrality vector is computed
    and then indexed by **asn**.

    Raises ValueError("Unknown variant") for any other *variant*.
    """
    # Lambdas keep evaluation lazy: only the selected variant is computed.
    compute = {
        "pagerank": lambda: nx.pagerank(G, alpha=alpha),
        "eigenvector": lambda: nx.eigenvector_centrality(G, max_iter=500),
        "katz": lambda: nx.katz_centrality_numpy(G, alpha=0.005),
    }.get(variant)
    if compute is None:
        raise ValueError("Unknown variant")
    return compute()[asn]

In [None]:
def as_kcore_index(G: nx.DiGraph, asn: str | int) -> int:
    """Return the k-core index (coreness) of **asn** in the undirected view of *G*.

    nx.core_number rejects graphs that contain self-loops, so they are removed
    from the undirected copy first. *G* itself is not modified.
    """
    undirected = G.to_undirected()
    # list(): avoid mutating the graph while iterating the self-loop generator.
    undirected.remove_edges_from(list(nx.selfloop_edges(undirected)))
    core_num = nx.core_number(undirected)
    return core_num[asn]

In [None]:
def as_local_clustering(G: nx.DiGraph, asn: str | int) -> float:
    """Local clustering coefficient of **asn**, computed on an undirected copy of *G* (edge direction ignored)."""
    undirected = G.to_undirected()
    return nx.clustering(undirected, nodes=asn)

In [None]:
def as_articulation_flag(G: nx.DiGraph, asn: str | int) -> bool:
    """True if **asn** is an articulation point of the undirected view of *G*.

    An articulation point is a node whose removal splits its connected
    component. The generator stops as soon as **asn** is found.
    """
    undirected = G.to_undirected()
    return any(node == asn for node in nx.articulation_points(undirected))

In [None]:
def as_removal_impact(
    G: nx.DiGraph,
    asn: str | int,
    metrics: Sequence[str] | None = None,
) -> Dict[str, Any]:
    """Simulate removal of **asn** and report before/after values for *metrics*.

    Supported metrics: 'components', 'diameter', 'avg_shortest_path'.
    *metrics* may be a sequence of names or a single name; None or empty means
    ('components',).

    Only the requested metrics are computed and returned, each as
    ``{"before": x, "after": y}``:
      * components — number of (weakly) connected components.
      * diameter / avg_shortest_path — measured on the largest (weakly)
        connected component, taken as undirected. 'after' is None if removing
        **asn** leaves an empty graph.

    Raises:
        nx.NetworkXError: If **asn** is not in *G* (from remove_node).
    """
    if isinstance(metrics, str):
        metrics = (metrics,)
    metrics = metrics or ("components",)
    want_components = "components" in metrics
    want_diameter = "diameter" in metrics
    want_aspl = "avg_shortest_path" in metrics

    def _n_components(graph) -> int:
        # Weak components of a digraph equal the connected components of its
        # undirected copy, and counting them this way avoids building the copy.
        if graph.is_directed():
            return nx.number_weakly_connected_components(graph)
        return nx.number_connected_components(graph)

    before: Dict[str, Any] = {}
    if want_components:
        before["components"] = _n_components(G)
    if want_diameter or want_aspl:
        base_lcc = _largest_component_subgraph(G)
        # ASPL is all-pairs shortest paths; only pay for it when requested.
        if want_diameter:
            before["diameter"] = nx.diameter(base_lcc)
        if want_aspl:
            before["avg_shortest_path"] = nx.average_shortest_path_length(base_lcc)

    H = G.copy()
    H.remove_node(asn)

    results: Dict[str, Any] = {}
    if want_components:
        results["components"] = {"before": before["components"], "after": _n_components(H)}
    if want_diameter or want_aspl:
        H_lcc = _largest_component_subgraph(H) if H.number_of_nodes() > 0 else None
        if want_diameter:
            results["diameter"] = {
                "before": before["diameter"],
                "after": None if H_lcc is None else nx.diameter(H_lcc),
            }
        if want_aspl:
            results["avg_shortest_path"] = {
                "before": before["avg_shortest_path"],
                "after": None if H_lcc is None else nx.average_shortest_path_length(H_lcc),
            }
    return results

In [None]:
def as_two_hop_neighbors(G: nx.DiGraph, asn: str | int) -> List[str]:
    """List every node 1 or 2 hops away from **asn**, ignoring edge direction.

    *asn* itself (distance 0) is not included.  Raises ``nx.NodeNotFound``
    if *asn* is not in *G*.
    """
    undirected = G.to_undirected()
    hop_count = nx.single_source_shortest_path_length(undirected, asn, cutoff=2)
    neighbours: List[str] = []
    for node, hops in hop_count.items():
        if hops in (1, 2):
            neighbours.append(node)
    return neighbours

In [None]:
def as_role_classifier(G: nx.DiGraph, asn: str | int) -> str:
    """Heuristic role label for **asn**.

    Returns one of "Tier‑1", "Tier‑2", "Stub", "Content/Enterprise" or
    "Regional ISP".  Rules are checked in this order:

    * Tier‑1: has outgoing edges but no incoming edge ("no upstream").
    * Tier‑2: k‑core index > 20 and more than 1000 nodes within two hops.
    * Stub: total degree < 5 and fewer than 50 nodes within two hops.
    * Content/Enterprise: out‑degree more than twice the in‑degree.
    * Regional ISP: everything else.

    Note: the two‑hop count is a neighbourhood size, not a true customer cone.
    """
    deg = as_degree(G, asn, normalized=False)
    cone_size = len(as_two_hop_neighbors(G, asn))
    core = as_kcore_index(G, asn)

    # No upstream *and* some downstream is characteristic of Tier‑1.  An AS with
    # no edges at all also has in‑degree 0 and must not be labelled Tier‑1.
    if deg["in"] == 0 and deg["out"] > 0:
        return "Tier‑1"
    if core > 20 and cone_size > 1000:
        return "Tier‑2"
    if deg["total"] < 5 and cone_size < 50:
        return "Stub"
    if deg["out"] > deg["in"] * 2:
        return "Content/Enterprise"
    return "Regional ISP"

In [None]:
def graph_density(G: nx.DiGraph) -> float:
    """Return the fraction (0‑1) of all possible edges that are present in *G*.

    Delegates to ``nx.density``, which accounts for direction when *G* is directed.
    """
    density_value = nx.density(G)
    return density_value

In [None]:
def graph_degree_distribution(G: nx.DiGraph) -> Dict[str, Any]:
    """Return a statistical summary of the node degree distribution.

    Degree is what ``G.degree`` reports.  For a directed graph that is
    in‑degree + out‑degree, so a relationship stored as two opposite arcs
    counts twice.

    Keys: min, max, mean, median, stdev (population), gini.  When the graph
    has more than 10 nodes, ``power_law_alpha`` (a crude log‑log estimate) is
    also included.

    Raises ValueError if *G* has no nodes.
    """
    degs = [d for _, d in G.degree()]
    if not degs:
        raise ValueError("degree distribution of an empty graph is undefined")
    summary = {
        "min": min(degs),
        "max": max(degs),
        "mean": statistics.fmean(degs),
        "median": statistics.median(degs),
        "stdev": statistics.pstdev(degs),
        "gini": _gini(degs),
    }
    # Power‑law exponent (alpha) via log‑log regression; only meaningful with enough nodes.
    if len(degs) > 10:
        summary["power_law_alpha"] = _power_law_alpha(degs)
    return summary

In [None]:
def _gini(values: Sequence[int | float]) -> float:
    """Gini coefficient (0 perfect equality, 1 max inequality)."""
    sorted_vals = sorted(values)
    n = len(values)
    cum = 0
    for i, v in enumerate(sorted_vals, 1):
        cum += v * i
    return (2 * cum) / (n * sum(sorted_vals)) - (n + 1) / n

In [None]:
def _power_law_alpha(values: Sequence[int]) -> float:
    """Return crude power‑law exponent using linear regression on CCDF."""
    import math, itertools

    sorted_vals = sorted(v for v in values if v > 0)
    ranks = list(range(1, len(sorted_vals) + 1))
    ccdf = [1 - (r - 1) / len(sorted_vals) for r in ranks]
    xs = [math.log(v) for v in sorted_vals]
    ys = [math.log(c) for c in ccdf]
    # Least‑squares slope
    n = len(xs)
    x_bar = sum(xs) / n
    y_bar = sum(ys) / n
    num = sum((x - x_bar) * (y - y_bar) for x, y in zip(xs, ys))
    den = sum((x - x_bar) ** 2 for x in xs)
    slope = num / den if den else 0.0
    return -slope  # α ≈ -slope of log‑log CCDF

In [None]:
def graph_diameter(G: nx.DiGraph) -> Dict[str, Any]:
    """Return diameter, radius and average shortest path of the giant component.

    The giant component is the largest (weakly) connected component, viewed
    as undirected.  Eccentricities are computed once and shared by
    ``nx.diameter`` and ``nx.radius``, rather than each running its own
    all‑pairs BFS.
    """
    LCC = _largest_component_subgraph(G)
    ecc = nx.eccentricity(LCC)
    return {
        "diameter": nx.diameter(LCC, e=ecc),
        "radius": nx.radius(LCC, e=ecc),
        "avg_shortest_path": nx.average_shortest_path_length(LCC),
    }


def graph_modularity_communities(G: nx.DiGraph) -> Dict[str, Any]:
    """Run greedy‑modularity community detection on the undirected view of *G*.

    Returns ``{"Q": modularity, "communities": {node: community_index}}``.
    Indices follow the order in which ``greedy_modularity_communities``
    returned the communities.
    """
    community_api = nx.algorithms.community
    undirected = G.to_undirected()
    partition = list(community_api.greedy_modularity_communities(undirected))

    membership: Dict[Any, int] = {}
    for index, members in enumerate(partition):
        for node in members:
            membership[node] = index

    modularity_score = community_api.quality.modularity(undirected, partition)
    return {"Q": modularity_score, "communities": membership}


def graph_assortativity(G: nx.DiGraph) -> float:
    """Degree assortativity of the undirected view of *G*.

    Ranges from −1 (hubs attach to low‑degree nodes) to +1 (nodes attach to
    others of similar degree).
    """
    return nx.degree_assortativity_coefficient(G.to_undirected())


def graph_transitivity(G: nx.DiGraph) -> float:
    """Global clustering coefficient (``nx.transitivity``) of the undirected view.

    Ratio of closed triplets to all connected triplets.
    """
    undirected_view = G.to_undirected()
    return nx.transitivity(undirected_view)


def graph_kcore_layers(G: nx.DiGraph) -> List[Dict[str, Any]]:
    """Summarise the k‑core decomposition layer by layer.

    Core numbers are computed on the undirected view of *G*.  For each core
    number ``k`` (ascending), ``size`` is the node count and ``avg_deg`` is
    the mean of ``G.degree`` over those nodes (in + out for a directed graph).
    """
    core_of = nx.core_number(G.to_undirected())
    members_by_k: Dict[int, List[Any]] = defaultdict(list)
    for node, k in core_of.items():
        members_by_k[k].append(node)

    return [
        {
            "k": k,
            "size": len(members_by_k[k]),
            "avg_deg": statistics.fmean([G.degree(n) for n in members_by_k[k]]),
        }
        for k in sorted(members_by_k)
    ]


def graph_connectivity(G: nx.DiGraph) -> Dict[str, int]:
    """Return the exact edge and vertex connectivity of the *undirected* view of *G*.

    Both values come from networkx's flow‑based algorithms.  They can be
    expensive on large graphs.
    """
    UG = G.to_undirected()
    return {
        # nx.edge_connectivity takes no ``approximate`` keyword (passing one raises TypeError).
        "edge_connectivity": nx.edge_connectivity(UG),
        "vertex_connectivity": nx.node_connectivity(UG),
    }


def graph_spectral_gap(G: nx.DiGraph) -> Dict[str, float]:
    """Return Laplacian robustness indicators of the undirected view of *G*.

    With Laplacian eigenvalues sorted ascending (λ₁ = 0 ≤ λ₂ ≤ … ≤ λₙ):

    * ``lambda2``: λ₂, the algebraic connectivity (≈ 0 for a disconnected graph).
    * ``spectral_gap``: λₙ − λ₂.

    Requires *numpy*.  Builds a dense n×n matrix, so memory is O(n²).

    Raises ImportError when numpy is unavailable, and ValueError for graphs
    with fewer than 2 nodes (λ₂ does not exist).
    """
    if np is None:
        raise ImportError("numpy required for spectral gap calculation")
    UG = G.to_undirected()
    if UG.number_of_nodes() < 2:
        raise ValueError("spectral gap needs a graph with at least 2 nodes")
    # laplacian_matrix returns a SciPy sparse array; ``.toarray()`` works for
    # sparse arrays and legacy sparse matrices alike (``.A`` does not).
    L = nx.laplacian_matrix(UG).astype(float).toarray()
    eigs = np.linalg.eigvalsh(L)  # eigenvalues in ascending order
    return {"lambda2": eigs[1], "spectral_gap": eigs[-1] - eigs[1]}


def graph_redundancy_profile(
    G: nx.DiGraph,
    *,
    sample: int = 2000,
    seed: int | None = None,
) -> Dict[int, int]:
    """Histogram of edge‑disjoint path counts between random node pairs.

    Draws ``min(sample, n²)`` random ordered pairs of distinct nodes from the
    undirected view of *G*.  For each pair it records the local edge
    connectivity (number of edge‑disjoint paths) and returns
    ``{connectivity: pair_count}``.

    *seed* makes the sampling reproducible.  With ``None`` the module‑level
    ``random`` state is used, so ``random.seed`` still applies.  Graphs with
    fewer than two nodes have no pairs and yield ``{}``.
    """
    UG = G.to_undirected()
    nodes = list(UG.nodes())
    if len(nodes) < 2:
        return {}
    rng = random.Random(seed) if seed is not None else random
    hist: Counter[int] = Counter()
    for _ in range(min(sample, len(nodes) ** 2)):
        s, t = rng.sample(nodes, 2)
        try:
            k = nx.edge_connectivity(UG, s, t)
        except nx.NetworkXError:
            k = 0  # count failures as disconnected
        hist[k] += 1
    return dict(hist)


def graph_time_compare(
    G_old: nx.DiGraph,
    G_new: nx.DiGraph,
) -> Dict[str, Any]:
    """Diff two topology snapshots.

    Returns unordered lists of edges and nodes present only in *G_new*
    (``*_added``) or only in *G_old* (``*_removed``).  Edges are compared as
    ``(u, v)`` tuples, so direction matters for directed graphs.
    """
    old_edge_set, new_edge_set = set(G_old.edges()), set(G_new.edges())
    old_node_set, new_node_set = set(G_old.nodes()), set(G_new.nodes())
    report: Dict[str, Any] = {}
    report["edges_added"] = list(new_edge_set.difference(old_edge_set))
    report["edges_removed"] = list(old_edge_set.difference(new_edge_set))
    report["nodes_added"] = list(new_node_set.difference(old_node_set))
    report["nodes_removed"] = list(old_node_set.difference(new_node_set))
    return report


In [None]:
def _largest_component_subgraph(G: nx.Graph) -> nx.Graph:
    """Return the largest component of *G* as an undirected graph.

    Directed graphs use weakly connected components; undirected graphs use
    connected components.  Ties go to whichever component networkx yields
    first.  An empty graph raises ValueError (``max`` of nothing).
    """
    find_components = (
        nx.weakly_connected_components if G.is_directed() else nx.connected_components
    )
    biggest = max(find_components(G), key=len)
    return G.subgraph(biggest).to_undirected()


def _percentile(x: float, sample: Sequence[float]) -> float:
    """Inclusive percentile of *x* within *sample* (0–100)."""
    leq = sum(1 for v in sample if v <= x)
    return 100 * leq / len(sample)

In [None]:
def global_resilience_dashboard(G: nx.DiGraph, *, sample: int = 2000) -> Dict[str, Any]:
    """Return a consolidated resilience report with **pure graph metrics**.

    All metrics use the undirected view of *G*:

    • ``edge_connectivity`` / ``vertex_connectivity`` (exact, flow based).
    • ``lambda2`` and ``spectral_gap`` (λₙ − λ₂) of the Laplacian; ``None``
      when numpy is unavailable or the graph has ≤ 2 nodes.
    • ``k_max``: the highest core number (0 for a graph without nodes).
    • ``avg_edge_disjoint_paths``: mean local edge connectivity over up to
      *sample* random node pairs; ``None`` when no pair can be sampled.
    """
    report: Dict[str, Any] = {}
    UG = G.to_undirected()

    # Connectivity.  nx.edge_connectivity has no ``approximate`` parameter.
    report["edge_connectivity"] = nx.edge_connectivity(UG)
    report["vertex_connectivity"] = nx.node_connectivity(UG)

    # Spectral gap (requires numpy).  ``.toarray()`` densifies both SciPy
    # sparse arrays and legacy sparse matrices; ``.A`` exists only on the latter.
    if np is not None and G.number_of_nodes() > 2:
        L = nx.laplacian_matrix(UG).astype(float).toarray()
        eigs = np.linalg.eigvalsh(L)  # ascending order
        report["lambda2"] = eigs[1]
        report["spectral_gap"] = eigs[-1] - eigs[1]
    else:
        report["lambda2"] = report["spectral_gap"] = None

    # k‑core robustness index.
    report["k_max"] = max(nx.core_number(UG).values(), default=0)

    # Average edge‑disjoint paths on a random sample, using the shared sampler.
    # A pair needs two distinct nodes.
    hist = graph_redundancy_profile(G, sample=sample) if UG.number_of_nodes() >= 2 else {}
    total = sum(hist.values())
    report["avg_edge_disjoint_paths"] = (
        sum(k * c for k, c in hist.items()) / total if total else None
    )

    return report

In [None]:
def critical_edge_set(
    G: nx.DiGraph, *, budget: int = 20
) -> List[Tuple[Tuple[str, str], float]]:
    """Return the *budget* most critical edges ranked by **edge‑betweenness**.

    Each item is ``((u, v), betweenness_score)``: the edge tuple paired with
    its score, exactly as in ``nx.edge_betweenness_centrality(G).items()``.
    Items are sorted by score, highest first.  Ties keep networkx's edge order.
    """
    scores = nx.edge_betweenness_centrality(G)
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return ranked[:budget]

In [None]:
def hub_landscape_mapper(G: nx.DiGraph) -> List[Dict[str, Any]]:
    """Classify each AS as a *Global*, *Regional* or *Local* hub, or *Stub/Edge*.

    Degree is ``G.degree`` (in + out for a directed graph).  The degree
    percentile is the share of nodes whose degree is ≤ this node's, times 100.
    Rules:

    * Global Hub: percentile ≥ 99 and eigenvector centrality > 0.01.
    * Regional Hub: percentile ≥ 90.
    * Local Hub: percentile ≥ 70.
    * Stub/Edge: otherwise.

    Percentiles use one sorted degree list plus binary search, O(n log n),
    instead of rescanning every degree for each node, which is O(n²).
    """
    import bisect

    degs = {n: d for n, d in G.degree()}
    sorted_degs = sorted(degs.values())
    total = len(sorted_degs)
    # bisect_right gives the number of degrees <= d.
    percentiles = {
        n: 100 * bisect.bisect_right(sorted_degs, d) / total for n, d in degs.items()
    }
    eig = nx.eigenvector_centrality(G, max_iter=500)

    mapping = []
    for n in G:
        pct = percentiles[n]
        if pct >= 99 and eig[n] > 0.01:
            role = "Global Hub"
        elif pct >= 90:
            role = "Regional Hub"
        elif pct >= 70:
            role = "Local Hub"
        else:
            role = "Stub/Edge"
        mapping.append({
            "asn": n,
            "role": role,
            "degree": degs[n],
            "eigenvector": round(eig[n], 6),
            "degree_percentile": round(pct, 2),
        })
    return mapping

In [None]:
def _sample_longest_pairs(G: nx.Graph, k: int) -> List[Tuple[str, str]]:
    """Return up to *k* distinct node pairs with the largest sampled distances.

    Draws ``5·k`` random pairs from the undirected view of *G*, oversampling
    for a better chance of finding long paths.  Disconnected pairs and repeats
    are dropped; (a, b) and (b, a) count as the same pair.  Pairs are returned
    longest first.  Graphs with fewer than two nodes yield ``[]``.
    """
    UG = G.to_undirected()
    nodes = list(UG)
    if len(nodes) < 2:
        return []
    best: Dict[frozenset, Tuple[int, Tuple[str, str]]] = {}
    for _ in range(k * 5):  # oversample for better chance of long paths
        a, b = random.sample(nodes, 2)
        key = frozenset((a, b))
        if key in best:
            continue  # already measured this unordered pair
        try:
            d = nx.shortest_path_length(UG, a, b)
        except nx.NetworkXNoPath:
            continue
        best[key] = (d, (a, b))
    # Stable sort: equal distances keep first-seen order.
    ranked = sorted(best.values(), key=lambda item: item[0], reverse=True)
    return [pair for _, pair in ranked[:k]]

In [None]:
def edge_upgrade_recommender(G: nx.DiGraph, *, k: int = 5) -> List[Dict[str, Any]]:
    """Suggest up to *k* new edges that most reduce diameter/ASPL (greedy heuristic).

    Candidates are the 3·k longest‑distance pairs from ``_sample_longest_pairs``.
    Each missing edge is added to a copy of the undirected view, and the
    largest component's diameter and average shortest path length are
    recomputed.  Each unordered candidate edge is evaluated at most once.

    Returns dicts ``{"edge", "delta_diameter", "delta_aspl"}`` (old − new;
    larger is better), ranked by Δdiameter, then ΔASPL.
    """
    UG = G.to_undirected()
    base_lcc = _largest_component_subgraph(G)
    base_diam = nx.diameter(base_lcc)
    base_aspl = nx.average_shortest_path_length(base_lcc)

    suggestions: List[Dict[str, Any]] = []
    seen: Set[frozenset] = set()
    for (u, v) in _sample_longest_pairs(G, k * 3):
        key = frozenset((u, v))
        if key in seen or UG.has_edge(u, v):
            continue
        seen.add(key)
        H = UG.copy()
        H.add_edge(u, v)
        try:
            new_lcc = _largest_component_subgraph(H)
            new_diam = nx.diameter(new_lcc)
            new_aspl = nx.average_shortest_path_length(new_lcc)
        except nx.NetworkXError:
            continue
        suggestions.append({
            "edge": (u, v),
            "delta_diameter": base_diam - new_diam,
            "delta_aspl": round(base_aspl - new_aspl, 4),
        })
    # Pick top‑k by ∆‑diameter then ∆‑ASPL
    return sorted(suggestions, key=lambda x: (x["delta_diameter"], x["delta_aspl"]), reverse=True)[:k]

In [None]:
def peering_impact_analyzer(
    G: nx.DiGraph,
    a: str | int,
    b: str | int,
) -> Dict[str, Any]:
    """Evaluate the topological impact of **adding** an undirected edge (a, b).

    Works on the undirected view of *G* and reports:

    * ``delta_diameter``: old − new diameter of the largest component.
    * ``delta_aspl``: old − new average shortest path length (4 dp).
    * ``load_redistribution_score``: sum over all edges of the absolute change
      in edge‑betweenness, with the new edge counted from 0 (6 dp).

    Raises ValueError if ``a == b`` (a self‑loop) or if the edge already exists.
    """
    if a == b:
        raise ValueError("a and b must be different ASes (self-loop).")
    UG = G.to_undirected()
    if UG.has_edge(a, b):
        raise ValueError("Edge already exists – impact is zero.")

    H = UG.copy()
    H.add_edge(a, b)

    # Extract each largest component once and reuse it for both metrics.
    base_lcc = _largest_component_subgraph(UG)
    new_lcc = _largest_component_subgraph(H)
    base_diam = nx.diameter(base_lcc)
    base_aspl = nx.average_shortest_path_length(base_lcc)
    new_diam = nx.diameter(new_lcc)
    new_aspl = nx.average_shortest_path_length(new_lcc)

    betw_before = nx.edge_betweenness_centrality(UG)
    betw_after = nx.edge_betweenness_centrality(H)

    # Sum absolute change as a proxy of load redistribution.  H holds every
    # edge of UG plus (a, b), so iterating H's scores covers all edges.
    load_shift = sum(abs(score - betw_before.get(e, 0)) for e, score in betw_after.items())

    return {
        "delta_diameter": base_diam - new_diam,
        "delta_aspl": round(base_aspl - new_aspl, 4),
        "load_redistribution_score": round(load_shift, 6),
    }


In [None]:
def redundancy_heatmap(G: nx.DiGraph, *, sample: int = 2000) -> Dict[int, int]:
    """Return a histogram of edge‑disjoint path counts between random node pairs.

    Alias of ``graph_redundancy_profile`` (defined in an earlier cell).  The
    name is kept for existing callers, while the sampling loop lives in one
    place.  Result: ``{edge_connectivity: pair_count}``.
    """
    return graph_redundancy_profile(G, sample=sample)

In [None]:
def viral_failure_simulator(
    G: nx.DiGraph,
    *,
    p: float = 0.1,
    runs: int = 50,
    seed: int | None = None,
) -> Dict[str, Any]:
    """Random‑failure percolation: remove each node with probability *p*, *runs* times.

    In every run each node fails independently with probability *p*.  The
    run records the size of the survivors' largest (weakly) connected
    component as a fraction of the ORIGINAL node count (0 if nothing survives).

    Returns ``{"p", "runs", "avg_survival_fraction" (4 dp), "values"}``, where
    ``values`` holds the per‑run fractions.  *seed* makes runs reproducible;
    with ``None`` the module‑level ``random`` state is used.

    Raises ValueError if *p* is outside [0, 1] or *runs* < 1.
    """
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be within [0, 1]")
    if runs < 1:
        raise ValueError("runs must be >= 1")
    rng = random.Random(seed) if seed is not None else random
    total = G.number_of_nodes()
    surviving: List[float] = []
    for _ in range(runs):
        # One draw per node in G's order; survivors induce a subgraph view,
        # which avoids copying the whole graph and deleting nodes one by one.
        alive = [n for n in G if rng.random() >= p]
        H = G.subgraph(alive)
        largest = _largest_component_subgraph(H) if H.number_of_nodes() else None
        surviving.append(0 if largest is None else len(largest) / total)
    return {
        "p": p,
        "runs": runs,
        "avg_survival_fraction": round(sum(surviving) / len(surviving), 4),
        "values": surviving,
    }

In [None]:
def _power_law_alpha(values: Sequence[int | float]) -> float:
    """Rough power‑law exponent via log‑log CCDF slope."""
    vals = [v for v in values if v > 0]
    if len(vals) < 10:
        return float("nan")
    vals.sort()
    ranks = range(1, len(vals) + 1)
    xs = [math.log(v) for v in vals]
    ys = [math.log(1 - (r - 1) / len(vals)) for r in ranks]
    n = len(xs)
    x_bar, y_bar = sum(xs) / n, sum(ys) / n
    num = sum((x - x_bar) * (y - y_bar) for x, y in zip(xs, ys))
    den = sum((x - x_bar) ** 2 for x in xs)
    slope = num / den if den else float("nan")
    return -slope

In [None]:
def degree_tail_watcher(graph_ts: List[Tuple[str, nx.DiGraph]]) -> List[Dict[str, Any]]:
    """Track the power‑law tail exponent α across graph snapshots.

    *graph_ts* is a list of ``(timestamp, graph)`` pairs.  The output keeps
    that order, with one ``{"timestamp", "alpha"}`` dict per snapshot.  Node
    degree is in + out for directed graphs.
    """
    def _degrees(graph):
        if graph.is_directed():
            return [graph.in_degree(node) + graph.out_degree(node) for node in graph]
        return [graph.degree(node) for node in graph]

    return [
        {"timestamp": ts, "alpha": _power_law_alpha(_degrees(G))}
        for ts, G in graph_ts
    ]

In [None]:
def spanner_extractor(
    G: nx.DiGraph,
    *,
    eps: float = 0.2,
) -> nx.Graph:
    """Return a greedy (1+ε)‑spanner of the undirected, unweighted view of *G*.

    Edges are scanned in ``UG.edges()`` order (no sorting; all weights are 1).
    An edge (u, v) is kept unless the spanner built so far already links u
    and v within 1+ε hops.  Self‑loops are dropped.  Spanner distances are
    integers, so the test is one BFS from u truncated at ⌊1+ε⌋ hops, instead
    of a full ``has_path`` search followed by ``shortest_path_length``.

    Note: for ε < 1 every non‑loop edge is kept, because no other path has
    length ≤ 1.

    Raises ValueError if *eps* is not a finite number.
    """
    import math

    if not math.isfinite(eps):
        raise ValueError("eps must be a finite number")
    UG = G.to_undirected()
    S = nx.Graph()
    S.add_nodes_from(UG.nodes())
    max_hops = max(0, math.floor(1 + eps))
    for u, v in UG.edges():
        if u == v:
            continue
        within = nx.single_source_shortest_path_length(S, u, cutoff=max_hops)
        if v not in within:  # no path, or d_S(u, v) > 1 + eps
            S.add_edge(u, v)
    return S

In [None]:
def spectral_partition_planner(G: nx.DiGraph, *, target_clusters: int = 4) -> Dict[str, Any]:
    """Partition *G* into at most *target_clusters* communities by spectral bisection.

    Implements Newman's leading‑eigenvector method on the undirected view.
    NetworkX has no ``leading_eigenvector_communities`` (that API is
    igraph's), so the method is implemented here.  Starting from a single
    community, the algorithm repeatedly bisects the community whose split
    gives the largest positive modularity gain.  Each split uses the sign
    pattern of the leading eigenvector of that community's generalised
    modularity matrix.  It stops at *target_clusters* communities, or when no
    split improves modularity.

    Uses a dense n×n matrix (O(n²) memory, O(m³) per eigen‑decomposition of an
    m‑node community), so it suits sub‑graphs such as ego nets or cones.

    Returns ``{"communities": {node: index}, "modularity": Q}``.  Q is NaN for
    a graph without edges, where modularity is undefined.

    Raises ValueError if *target_clusters* < 1.
    """
    import numpy as np

    if target_clusters < 1:
        raise ValueError("target_clusters must be >= 1")
    UG = G.to_undirected()
    nodes = list(UG)
    if UG.number_of_edges() == 0:
        return {"communities": {n: 0 for n in nodes}, "modularity": float("nan")}

    # Modularity matrix B = A - k kᵀ / 2m (unweighted).
    A = nx.to_numpy_array(UG, nodelist=nodes, weight=None)
    k = A.sum(axis=1)
    two_m = k.sum()
    B = A - np.outer(k, k) / two_m

    def _best_split(idx):
        """Return (gain, positive_part, negative_part) for community *idx*, or None."""
        if len(idx) < 2:
            return None
        Bg = B[np.ix_(idx, idx)]
        # Generalised modularity matrix: subtract row sums from the diagonal.
        Bg = Bg - np.diag(Bg.sum(axis=1))
        _, vecs = np.linalg.eigh(Bg)
        s = np.where(vecs[:, -1] >= 0, 1.0, -1.0)
        pos, neg = idx[s > 0], idx[s < 0]
        if len(pos) == 0 or len(neg) == 0:
            return None
        gain = float(s @ Bg @ s) / (2 * two_m)  # ΔQ = sᵀ B⁽ᵍ⁾ s / 4m
        if gain <= 1e-12:
            return None
        return gain, pos, neg

    root = np.arange(len(nodes))
    groups = [(root, _best_split(root))]
    while len(groups) < target_clusters:
        splittable = [i for i, (_, split) in enumerate(groups) if split is not None]
        if not splittable:
            break
        i = max(splittable, key=lambda j: groups[j][1][0])
        _, (_, pos, neg) = groups[i]
        groups[i:i + 1] = [(pos, _best_split(pos)), (neg, _best_split(neg))]

    communities = [{nodes[j] for j in idx} for idx, _ in groups]
    mapping = {n: i for i, c in enumerate(communities) for n in c}
    Q = nx.algorithms.community.modularity(UG, communities)
    return {"communities": mapping, "modularity": Q}

In [None]:
def congestion_proxy_rank(G: nx.DiGraph, *, top_n: int = 50) -> List[Tuple[Tuple[str, str], float]]:
    """Rank edges by edge‑betweenness centrality, used as a proxy for transit load.

    Returns the *top_n* ``((u, v), score)`` pairs, highest score first.
    """
    scores = nx.edge_betweenness_centrality(G)
    ranked = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]

In [None]:
def k_augmentation_planner(
    G: nx.DiGraph,
    *,
    k: int = 3,
    candidate_set: Iterable[str] | None = None,
    attach_degree: int = 5,
) -> Dict[str, List[Tuple[str, str]]]:
    """Plan how to attach *k* **new nodes** (e.g. IXPs/POPs) to the best‑connected ASes.

    Every new node is linked to the same *attach_degree* highest‑degree
    nodes of the undirected view of *G* (ties keep *G*'s node order).  This
    is a heuristic only: no diameter or path‑length gain is computed here.

    Parameters
    ----------
    k : number of new nodes to plan for.
    candidate_set : names for the new nodes (default ``NEW0 … NEW{k-1}``).
        Only the first *k* names are used.
    attach_degree : number of hubs each new node connects to.

    Returns mapping: new_node → list of edges (new_node ↔ existing hub).
    """
    UG = G.to_undirected()
    names = [f"NEW{i}" for i in range(k)] if candidate_set is None else list(candidate_set)
    hubs = sorted(UG.nodes(), key=UG.degree, reverse=True)[:attach_degree]
    return {new_node: [(new_node, hub) for hub in hubs] for new_node in names[:k]}

In [None]:
def meshedness_index(G: nx.DiGraph) -> float:
    """Meshedness coefficient (E − V + 1) / (2V − 5) of the undirected view of *G*.

    0 for a tree, 1 for a maximally meshed *planar* graph.  Non‑planar graphs
    can exceed 1.  Graphs with ≤ 2 nodes return 0.0.
    """
    node_count = G.number_of_nodes()
    if node_count <= 2:
        return 0.0
    edge_count = G.to_undirected().number_of_edges()
    independent_cycles = edge_count - node_count + 1
    return independent_cycles / (2 * node_count - 5)

In [None]:
def extract_ego_subgraph(
    G: nx.DiGraph,
    asn: str | int,
    *,
    radius: int = 1,
    direction: str = "both",
) -> nx.DiGraph:
    """Return the induced subgraph of all nodes within *radius* hops of *asn*.

    Parameters
    ----------
    direction : {'in', 'out', 'both'}
        • 'in'   – traverse incoming edges only (upstream view).
        • 'out'  – traverse outgoing edges only (downstream view).
        • 'both' – ignore direction (default; always used for undirected *G*).

    Raises ``ValueError`` for an unknown *direction*, and ``nx.NodeNotFound``
    when *asn* is not in *G*.  The membership check runs first because
    networkx would treat a missing string ASN as an iterable of
    one‑character node names.
    """
    if direction not in {"in", "out", "both"}:
        raise ValueError("direction must be 'in', 'out', or 'both'")
    if asn not in G:
        raise nx.NodeNotFound(f"Source {asn} is not in G")

    if direction == "both" or not G.is_directed():
        UG = G.to_undirected()
        nodes = nx.single_source_shortest_path_length(UG, asn, cutoff=radius).keys()
    else:
        step = G.predecessors if direction == "in" else G.successors
        seen: Set[str | int] = {asn}
        frontier: Set[str | int] = {asn}
        for _ in range(radius):
            # Expand only newly reached nodes; already-seen ones add nothing.
            frontier = {nbr for n in frontier for nbr in step(n)} - seen
            if not frontier:
                break
            seen |= frontier
        nodes = seen
    return G.subgraph(nodes).copy()

In [None]:
def extract_customer_cone_subgraph(G: nx.DiGraph, asn: str | int) -> nx.DiGraph:
    """Return *asn* plus every AS reachable downstream via provider→customer edges.

    Depth‑first walk over outgoing edges whose ``relation`` attribute is
    ``"p2c"``.  If *asn* is not in *G* the result is an empty graph.
    Membership is checked before calling ``out_edges`` because networkx
    treats a missing string node as an iterable of one‑character node names,
    which could pull in unrelated ASes (e.g. "13" → "1", "3").
    """
    cone: Set[str | int] = {asn}
    if asn not in G:
        return G.subgraph(cone).copy()
    stack = [asn]
    while stack:
        current = stack.pop()
        for _, child, data in G.out_edges(current, data=True):
            if data.get("relation") == "p2c" and child not in cone:
                cone.add(child)
                stack.append(child)
    return G.subgraph(cone).copy()

In [None]:
def extract_upstream_chain_subgraph(G: nx.DiGraph, asn: str | int, depth: int | None = None) -> nx.DiGraph:
    """Return *asn* and its providers, transitively, up to *depth* hops (None = unlimited).

    Breadth‑first walk over incoming edges whose ``relation`` attribute is
    ``"p2c"``.  If *asn* is not in *G* the result is an empty graph.  The
    membership check stops networkx from treating a missing string node as
    an iterable of one‑character node names.
    """
    chain: Set[str | int] = {asn}
    if asn not in G:
        return G.subgraph(chain).copy()
    current_level = {asn}
    hops = 0
    while current_level and (depth is None or hops < depth):
        next_level: Set[str | int] = set()
        for node in current_level:
            for parent, _, data in G.in_edges(node, data=True):
                if data.get("relation") == "p2c" and parent not in chain:
                    chain.add(parent)
                    next_level.add(parent)
        current_level = next_level
        hops += 1
    return G.subgraph(chain).copy()

In [None]:
def extract_peering_cluster_subgraph(
    G: nx.DiGraph,
    asn: str | int,
    *,
    include_peers_of_peers: bool = False,
) -> nx.DiGraph:
    """Induced subgraph of *asn* and its ``relation == "p2p"`` neighbours.

    Neighbours come from ``G.neighbors`` (successors for a directed graph).
    With *include_peers_of_peers*, the p2p neighbours of those peers are
    added as well.
    """
    def _p2p_neighbours(node):
        return {
            other
            for other in G.neighbors(node)
            if G.get_edge_data(node, other).get("relation") == "p2p"
        }

    members = _p2p_neighbours(asn)
    if include_peers_of_peers:
        members |= set().union(*(_p2p_neighbours(peer) for peer in members))
    members.add(asn)
    return G.subgraph(members).copy()

In [None]:
def induced_subgraph_by_asns(
    G: nx.DiGraph,
    asns: Iterable[str | int],
    *,
    keep_isolated: bool = False,
) -> nx.DiGraph:
    """Return a copy of the subgraph of *G* induced by *asns*.

    ASNs absent from *G* are ignored.  Unless *keep_isolated* is True, nodes
    left with no edge inside the subset are dropped to focus the analysis.
    """
    sub = G.subgraph(asns).copy()
    if keep_isolated:
        return sub
    sub.remove_nodes_from(list(nx.isolates(sub)))
    return sub

In [None]:
def top_betweenness_edge_subgraph(G: nx.DiGraph, top_k: int = 500) -> nx.DiGraph:
    """Build a DiGraph from the *top_k* highest edge‑betweenness edges of *G*.

    Node and edge attributes are copied from *G*.  Nodes not touched by any
    selected edge are left out.
    """
    scores = nx.edge_betweenness_centrality(G)
    chosen = sorted(scores, key=scores.__getitem__, reverse=True)[:top_k]

    H = nx.DiGraph()
    H.add_nodes_from(G.nodes(data=True))
    H.add_edges_from((u, v, G.get_edge_data(u, v)) for u, v in chosen)
    # Drop nodes that ended up without any selected edge.
    H.remove_nodes_from(list(nx.isolates(H)))
    return H

In [None]:
def build_ego_as_graph(
    seed_asns: list[str | int],
    *,
    radius: int = 2,
    from_time: str,
    until_time: str,
    collectors: list[str] | None = None,
    record_type: str = "ribs",
    rel_file: str | None = None,
) -> nx.DiGraph:
    """Stream BGP paths but **retain only edges** within *radius* hops of *seed_asns*.

    Rationale: ideal for engineers who want a quick neighbourhood view around
    their own AS(es) without parsing millions of lines unrelated to them.

    Distances are discovered incrementally.  Each AS keeps the smallest hop
    count to a seed seen so far (seeds are 0).  For an AS path that touches
    a known AS, each hop's distance is estimated through the nearest known
    AS on that path.  An edge is kept only when BOTH endpoints are within
    *radius*.  Paths are processed in stream order, so an AS can be missed if
    the path linking it to the seeds arrives after its own paths.

    Elements without an ``as-path`` field (e.g. withdrawals in update
    streams) are skipped.  Edge ``relation`` comes from
    ``load_caida_relationships``: p2c is added one way; p2p, sibling and
    unknown are added in both directions.
    """
    from itertools import groupby

    collectors = collectors or ["rrc00"]
    rels = load_caida_relationships(rel_file)

    # Best known hop distance to any seed, keyed like AS-path tokens (str).
    dist: Dict[str, int] = {str(a): 0 for a in seed_asns}

    G = nx.DiGraph()
    stream = pybgpstream.BGPStream(
        from_time=from_time,
        until_time=until_time,
        collectors=collectors,
        record_type=record_type,
    )
    for rec in stream.records():
        for elem in rec:
            as_path = elem.fields.get("as-path")
            if not as_path:
                continue
            hops = [k for k, _ in groupby(as_path.split())]
            anchors = [(i, dist[h]) for i, h in enumerate(hops) if h in dist]
            if not anchors:
                continue  # skip paths outside interest zone

            # Distance of every hop to the seeds, routed through the nearest anchor.
            hop_dist = [min(d + abs(i - j) for j, d in anchors) for i in range(len(hops))]
            for h, d in zip(hops, hop_dist):
                if d <= radius and d < dist.get(h, radius + 1):
                    dist[h] = d

            for idx, (u, v) in enumerate(zip(hops[:-1], hops[1:])):
                if hop_dist[idx] > radius or hop_dist[idx + 1] > radius:
                    continue
                rel = rels.get((u, v), "unknown")
                if rel == "p2c":
                    G.add_edge(u, v, relation="p2c")
                elif rel in {"p2p", "sibling"}:
                    G.add_edge(u, v, relation=rel)
                    G.add_edge(v, u, relation=rel)
                else:
                    G.add_edge(u, v, relation="unknown")
                    G.add_edge(v, u, relation="unknown")
    return G

# ---------------------------------------------------------------------------
# 4.2  Provider→customer cone builder  (downstream tree)
# ---------------------------------------------------------------------------

def build_customer_cone_graph(
    root_asns: list[str | int],
    *,
    max_depth: int | None = None,
    rel_file: str | None = None,
) -> nx.DiGraph:
    """Construct a **provider→customer** tree using only CAIDA *as‑rel* file.

    No BGPStream needed – therefore instantaneous.  Engineers often care about
    the potential blast‑radius of their announcements; this tree provides it.

    Expands breadth‑first from *root_asns* (compared as strings) along "p2c"
    relations for at most *max_depth* levels (None = unlimited).  Returns the
    induced p2c subgraph of the cone.  Roots that never appear in a p2c
    relation are absent from the result.
    """
    rels = load_caida_relationships(rel_file)
    # Build one‑directional DiGraph from relations alone
    R = nx.DiGraph()
    for (a, b), rel in rels.items():
        if rel == "p2c":
            R.add_edge(a, b, relation="p2c")
    cone_nodes: Set[str] = set(map(str, root_asns))
    # Expand only roots present in R.  ``R.out_edges("13")`` for a missing node
    # would iterate the characters "1", "3" as node names and pull in their cones.
    frontier: Set[str] = {n for n in cone_nodes if n in R}
    depth = 0
    while frontier and (max_depth is None or depth < max_depth):
        next_frontier: Set[str] = set()
        for p in frontier:
            for c in R.successors(p):
                if c not in cone_nodes:
                    cone_nodes.add(c)
                    next_frontier.add(c)
        frontier = next_frontier
        depth += 1
    return R.subgraph(cone_nodes).copy()

# ---------------------------------------------------------------------------
# 4.3  Fixed‑budget path sampler (fast snapshot)
# ---------------------------------------------------------------------------

def build_sampled_as_graph(
    *,
    path_limit: int = 100000,
    from_time: str,
    until_time: str,
    collectors: list[str] | None = None,
    record_type: str = "updates",
    rel_file: str | None = None,
) -> nx.DiGraph:
    """Stream only the **first *path_limit*** AS paths and build the graph.

    Useful when you want a *quick & dirty* Internet snapshot (<10 seconds) for
    sanity‑checks, regression tests, or teaching demos.

    Elements without an ``as-path`` field (e.g. withdrawals in the default
    "updates" stream) are skipped rather than raising KeyError.  They do not
    count towards *path_limit*.  Edge ``relation`` comes from
    ``load_caida_relationships``: p2c is added one way; p2p, sibling and
    unknown are added in both directions.
    """
    from itertools import groupby, islice

    collectors = collectors or ["rrc00"]
    rels = load_caida_relationships(rel_file)

    G = nx.DiGraph()
    stream = pybgpstream.BGPStream(
        from_time=from_time,
        until_time=until_time,
        collectors=collectors,
        record_type=record_type,
    )
    # Lazily yield AS-path strings; elements without one yield None / "".
    paths = (elem.fields.get("as-path") for rec in stream.records() for elem in rec)
    for as_path in islice((p for p in paths if p), path_limit):
        hops = [k for k, _ in groupby(as_path.split())]
        for u, v in zip(hops[:-1], hops[1:]):
            rel = rels.get((u, v), "unknown")
            if rel == "p2c":
                G.add_edge(u, v, relation="p2c")
            elif rel in {"p2p", "sibling"}:
                G.add_edge(u, v, relation=rel)
                G.add_edge(v, u, relation=rel)
            else:
                G.add_edge(u, v, relation="unknown")
                G.add_edge(v, u, relation="unknown")
    return G

# ---------------------------------------------------------------------------
# 4.4  Two‑collector diff builder (regional dx)
# ---------------------------------------------------------------------------

def build_diff_collectors_graph(
    *,
    collectors_a: list[str],
    collectors_b: list[str],
    from_time: str,
    until_time: str,
    record_type: str = "ribs",
    rel_file: str | None = None,
) -> Tuple[nx.DiGraph, nx.DiGraph, nx.DiGraph]:
    """Build two graphs from **distinct collector sets** and return their *diff*.

    Engineers can spot region‑specific links that appear only in one view.
    Returns ``(G_a, G_b, G_diff)``.  G_diff holds the symmetric difference of
    the two edge sets; each edge keeps its attributes from the graph that
    contains it.
    """
    def _build(collectors: list[str]) -> nx.DiGraph:
        # create_bgpstream_as_rel_graph is indexed with [0]; presumably it
        # returns (graph, ...) – confirm against its definition.
        return create_bgpstream_as_rel_graph(
            from_time, until_time, collectors, record_type, rel_file
        )[0]

    G_a = _build(collectors_a)
    G_b = _build(collectors_b)
    edges_a = set(G_a.edges())
    edges_b = set(G_b.edges())

    diff = nx.DiGraph()
    for u, v in edges_a ^ edges_b:
        owner = G_a if (u, v) in edges_a else G_b
        diff.add_edge(u, v, **owner.get_edge_data(u, v))
    return G_a, G_b, diff

In [None]:
# More tools (TODO):
# Instead of using raw ASN numbers as nodes, create an AS object and use it as
# the node, carrying all of that AS's properties.

**RPKI Views**

In [None]:
from tools.rpki_views.rpkiviews_tools import *
from tools.rpki_views.rpkiviews_aux import *

In [None]:
# Example call: query the RPKI Views helpers for prefix 31.168.36.0/23.
# NOTE(review): rpkiviews_prefixes_asns is presumably provided by the star
# imports from tools.rpki_views above; its exact return value (e.g. the
# ASNs authorised for the prefix) is not visible here – confirm there.
rpkiviews_prefixes_asns('31.168.36.0/23')