# Symbolic Manifold — Entropy & Curvature (Colab Runner)

Pipeline para calcular **H_rate** e (opcionalmente) **curvatura de Ollivier–Ricci** em grafos semânticos grandes, com segurança de memória e integração para o *source data* da **Fig. 2**.

**Fluxo:** instalar dependências → gravar scripts → carregar grafo (upload ou Drive) → rodar **H_rate + kappa** (amostrado) → gerar `source_fig2_entropy_curvature.csv` → salvar/baixar.


In [ ]:
# @title Verificar RAM
import platform

import psutil

# Total memory reported by psutil, converted from bytes to GiB for display.
total_bytes = psutil.virtual_memory().total
ram_gb = total_bytes / 1024**3
print(f"Python: {platform.python_version()} | RAM: {ram_gb:.1f} GB")

In [ ]:
# @title Instalar dependências (networkx<3.0, numpy, pandas, GraphRicciCurvature, POT)
# Core dependencies. networkx is pinned below 3.0; the script written later in
# this notebook calls nx.read_gpickle (NOTE(review): presumably the reason for
# the pin - confirm before relaxing it).
!pip -q install "networkx>=2.8.8,<3.0" numpy pandas pyyaml matplotlib scipy
# Curvature packages are optional: if this install fails, the echo keeps the
# cell from erroring and the script falls back to _HAS_RICCI = False.
!pip -q install GraphRicciCurvature pot || echo 'Ricci opcional; prossiga mesmo assim'

In [ ]:
#@title Gravar scripts no runtime
from pathlib import Path
pro_src = r'''#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations
"""
compute_entropy_curvature_pro.py (improved)
- Memory-safe H_rate via streaming power iteration.
- Ricci curvature computed ONLY on sampled edges (if library supports compute_ricci_curvature_edges).
- Default curv-sample=500 to reduce memory/time.
"""

import argparse, csv, glob, math, os, random, sys, time, warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import numpy as np

try:
    import networkx as nx
except Exception as e:
    print("[FATAL] networkx required:", e, file=sys.stderr); sys.exit(1)

_HAS_RICCI = False
try:
    from GraphRicciCurvature.OllivierRicci import OllivierRicci
    _HAS_RICCI = True
except Exception:
    _HAS_RICCI = False

def detect_weight_attr(G: nx.DiGraph) -> Optional[str]:
    """Guess which edge attribute holds the edge weight.

    Only the first edge that carries a non-empty attribute dict is inspected.
    Candidate names are tried in a fixed order and the first one whose value
    converts with ``float()`` is returned. Returns ``None`` when no edge has
    attributes or when that first edge has no usable candidate.
    """
    candidates = ("weight", "w", "freq", "frequency", "strength", "p", "prob")
    # The first edge with any attributes decides; later edges are never read.
    first_attrs = next((attrs for _, _, attrs in G.edges(data=True) if attrs), None)
    if first_attrs is None:
        return None
    for name in candidates:
        if name not in first_attrs:
            continue
        try:
            float(first_attrs[name])
        except Exception:
            # Non-numeric value under this name: try the next candidate.
            continue
        return name
    return None

def load_graph(path: str) -> nx.DiGraph:
    """Read a graph file and return it as a directed graph.

    The reader is chosen from the case-insensitive file extension:
    ``.gpickle``/``.pickle`` use ``nx.read_gpickle``, ``.edgelist``/``.txt``
    use ``nx.read_edgelist`` building an ``nx.DiGraph``, and ``.graphml`` as
    well as any other extension is read as GraphML. A graph that is not
    directed after loading is converted with ``nx.DiGraph``.

    NOTE: the pickle branch deserialises the file with pickle; only load
    pickle files from a trusted source.
    """
    lowered = path.lower()
    if lowered.endswith((".gpickle", ".pickle")):
        graph = nx.read_gpickle(path)
    elif lowered.endswith((".edgelist", ".txt")):
        graph = nx.read_edgelist(path, create_using=nx.DiGraph)
    else:
        # Covers ".graphml" and every unrecognised extension.
        graph = nx.read_graphml(path)
    return graph if graph.is_directed() else nx.DiGraph(graph)

def _label_array(labels) -> np.ndarray:
    """Pack node labels into a 1-D object array without NumPy type coercion."""
    out = np.empty(len(labels), dtype=object)
    for i, label in enumerate(labels):
        out[i] = label
    return out


def neighbors_with_probs(G: nx.DiGraph, u, beta: float, wkey: Optional[str]):
    """Return the successors of ``u`` and the transition probabilities to them.

    Each edge weight is read from ``wkey`` (falling back to ``"weight"``, then
    to 1.0), floored at 1e-12, raised to the power ``beta`` and normalised to
    sum to one. A node without successors gets a single self-transition with
    probability 1.

    Args:
        G: graph exposing ``successors(u)`` and ``G[u][v]`` edge-data mappings.
        u: source node.
        beta: exponent applied to the weights before normalisation.
        wkey: name of the weight attribute, or ``None`` to use ``"weight"``.

    Returns:
        ``(nbrs, probs)``: a 1-D object array holding the original node labels
        and a float array of matching probabilities.
    """
    nbrs = list(G.successors(u))
    if not nbrs:
        # Labels go into an object array so that tuple or mixed-type node
        # labels survive unchanged and stay usable as dict keys.
        return _label_array([u]), np.array([1.0])
    ws = np.empty(len(nbrs), dtype=float)
    for i, v in enumerate(nbrs):
        d = G[u][v]
        w = d.get("weight", 1.0) if wkey is None else d.get(wkey, d.get("weight", 1.0))
        try:
            w = float(w)
        except (TypeError, ValueError, OverflowError):
            # Unparseable weight: treat the edge as unweighted.
            w = 1.0
        # Floor keeps zero/negative weights strictly positive before the power.
        ws[i] = max(w, 1e-12)
    ws = np.power(ws, float(beta))
    Z = ws.sum()
    if not np.isfinite(Z) or Z <= 0.0:
        # Degenerate normaliser (NaN/inf/zero): fall back to a uniform row.
        probs = np.full(len(nbrs), 1.0 / len(nbrs))
    else:
        probs = ws / Z
    return _label_array(nbrs), probs

def entropy_rate_stream(G: nx.DiGraph, beta: float, wkey: Optional[str], max_iter: int = 200, tol: float = 1e-10) -> float:
    """Entropy rate of the random walk defined by ``neighbors_with_probs``.

    Computes ``sum_i pi_i * H_i`` where ``H_i`` is the Shannon entropy
    (natural log) of node i's transition row and ``pi`` is obtained by power
    iteration started from the uniform distribution.

    Args:
        G: graph exposing ``nodes()`` plus what ``neighbors_with_probs`` needs.
        beta: weight exponent forwarded to ``neighbors_with_probs``.
        wkey: weight attribute name forwarded to ``neighbors_with_probs``.
        max_iter: maximum number of power-iteration sweeps.
        tol: L1 change between sweeps below which iteration stops.

    Returns:
        The entropy rate as a float; NaN for a graph without nodes.
    """
    nodes = list(G.nodes())
    n = len(nodes)
    if n == 0:
        # No chain to speak of; mirror the NaN used for empty-graph curvature
        # instead of dividing by zero below.
        return float("nan")
    index = {u: i for i, u in enumerate(nodes)}

    # Flatten all transition rows once into (source, target, probability)
    # arrays so node-label lookups are not repeated on every sweep.
    src_parts, dst_parts, prob_parts = [], [], []
    row_entropy = np.zeros(n, dtype=float)
    for i, u in enumerate(nodes):
        nbrs, probs = neighbors_with_probs(G, u, beta, wkey)
        probs = np.asarray(probs, dtype=float)
        dst_parts.append(np.fromiter((index[v] for v in nbrs), dtype=np.intp, count=len(nbrs)))
        src_parts.append(np.full(len(nbrs), i, dtype=np.intp))
        prob_parts.append(probs)
        with np.errstate(divide="ignore", invalid="ignore"):
            # np.where masks the log(0) terms so they contribute 0.
            row_entropy[i] = -np.sum(probs * np.where(probs > 0, np.log(probs), 0.0))
    src = np.concatenate(src_parts)
    dst = np.concatenate(dst_parts)
    prob = np.concatenate(prob_parts)

    pi = np.full(n, 1.0 / n, dtype=float)
    for _ in range(max_iter):
        # One step pi <- pi P: accumulate pi[src] * prob into each target.
        nxt = np.bincount(dst, weights=pi[src] * prob, minlength=n)
        s = nxt.sum()
        if s > 0:
            nxt /= s
        converged = np.linalg.norm(nxt - pi, 1) < tol
        pi = nxt
        if converged:
            break
    return float(np.dot(pi, row_entropy))

def mean_ollivier_ricci(G: nx.DiGraph, undirected: bool, nsample: Optional[int], alpha: float = 0.5, method: str = "OTD", seed: int = 42):
    """Mean Ollivier-Ricci curvature over all, or a random sample of, edges.

    Args:
        G: input graph.
        undirected: when true, curvature is computed on ``G.to_undirected()``.
        nsample: number of edges to sample; ``None``, a non-positive value or
            a value >= the number of usable edges means "use every edge".
        alpha: forwarded to ``OllivierRicci``.
        method: transport method name; recorded verbatim in the returned label.
        seed: seed of the ``random.Random`` used for edge sampling.

    Returns:
        ``(mean_kappa, label)``. ``mean_kappa`` is NaN when the library is not
        importable (``"ricci_unavailable"``), when there is no usable edge
        (``"empty_graph"``), when no finite curvature was produced
        (``"..._empty"``) or when the computation failed (``"..._error"``).
    """
    if not _HAS_RICCI:
        return (float("nan"), "ricci_unavailable")
    H = G.to_undirected() if undirected else G
    # Self-loops are skipped: they have no meaningful edge curvature and
    # NOTE(review): GraphRicciCurvature appears to drop them from its internal
    # copy of the graph, so sampling one would fail later - confirm.
    E = [(u, v) for u, v in H.edges() if u != v]
    if not E:
        return (float("nan"), "empty_graph")
    if nsample is not None and 0 < nsample < len(E):
        E = random.Random(seed).sample(E, nsample)
        note = f"_sample{len(E)}"
    else:
        note = "_full"
    # NOTE(review): the CLI offers "base"/"approximate", which are not method
    # names GraphRicciCurvature is known to accept ("OTD", "ATD", "Sinkhorn",
    # "OTDSinkhornMix"). They are mapped to the exact and the averaged
    # transport method respectively - confirm this matches the intent.
    lib_method = {"base": "OTD", "approximate": "ATD"}.get(method, method)
    try:
        orc = OllivierRicci(H, alpha=alpha, method=lib_method, verbose="ERROR")
        per_edge = None
        if hasattr(orc, "compute_ricci_curvature_edges"):
            # Computes only the sampled edges. The per-edge result is taken
            # from the return value because this call is not relied upon to
            # write the "ricciCurvature" attribute onto orc.G.
            per_edge = orc.compute_ricci_curvature_edges(E)
        else:
            orc.compute_ricci_curvature()          # fallback (all edges)
        vals = []
        for (u, v) in E:
            k = None
            if isinstance(per_edge, dict):
                k = per_edge.get((u, v), per_edge.get((v, u)))
            if k is None and orc.G.has_edge(u, v):
                # Fallback path (or older library): read the edge attribute.
                k = orc.G[u][v].get("ricciCurvature", None)
            if k is None:
                continue
            k = float(k)
            if math.isfinite(k):
                vals.append(k)
        if not vals:
            return (float("nan"), f"ollivier_{method}{note}_empty")
        return (float(np.mean(vals)), f"ollivier_{method}{note}")
    except Exception as e:
        # Best effort: curvature is optional, so report NaN instead of aborting.
        warnings.warn(f"OllivierRicci failed: {e}")
        return (float("nan"), f"ollivier_{method}_error")

def run_once(graph_path: str, beta: float, regime: str, no_curv: bool, curv_undirected: bool, curv_sample: Optional[int], ricci_method: str, seed: int):
    """Load one graph, compute its metrics for one ``beta`` and return a CSV row.

    The graph is loaded with ``load_graph``, the weight attribute is detected
    with ``detect_weight_attr`` and the entropy rate comes from
    ``entropy_rate_stream``. Curvature comes from ``mean_ollivier_ricci``
    (alpha fixed at 0.5) unless ``no_curv`` is set, in which case kappa is NaN
    and the method label is ``"ricci_skipped"``.

    Returns:
        A dict with the keys graph, beta, H_rate, kappa, regime, nnodes,
        nedges, weight_attr, curv_method and elapsed_s. ``elapsed_s`` covers
        loading and all computations, rounded to 3 decimals.
    """
    started = time.time()
    G = load_graph(graph_path)
    wkey = detect_weight_attr(G)
    h_rate = entropy_rate_stream(G, beta=beta, wkey=wkey)
    if not no_curv:
        kappa, method = mean_ollivier_ricci(
            G,
            undirected=curv_undirected,
            nsample=curv_sample,
            alpha=0.5,
            method=ricci_method,
            seed=seed,
        )
    else:
        kappa, method = float("nan"), "ricci_skipped"
    elapsed = time.time() - started
    row = {}
    row["graph"] = Path(graph_path).name
    row["beta"] = float(beta)
    row["H_rate"] = float(h_rate)
    row["kappa"] = kappa
    row["regime"] = regime
    row["nnodes"] = int(G.number_of_nodes())
    row["nedges"] = int(G.number_of_edges())
    row["weight_attr"] = wkey or ""
    row["curv_method"] = method
    row["elapsed_s"] = round(float(elapsed), 3)
    return row

def main():
    """Command-line entry point.

    Expands ``--graph`` as a glob pattern, calls ``run_once`` for every
    matched file and every ``--beta`` value, and writes the rows to the CSV
    given by ``--out``. Exits with status 2 when the pattern matches nothing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--graph", required=True)
    parser.add_argument("--beta", nargs="+", type=float, default=[1.0])
    parser.add_argument("--regime", type=str, default="")
    parser.add_argument("--no-curvature", action="store_true")
    parser.add_argument("--curv-undirected", action="store_true")
    parser.add_argument("--curv-sample", type=int, default=500)  # reduced default
    parser.add_argument("--ricci-method", choices=["OTD","base","approximate"], default="OTD")
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--out", required=True)
    args = parser.parse_args()

    paths = sorted(glob.glob(args.graph))
    if not paths:
        print(f"[FATAL] no files matched {args.graph}", file=sys.stderr)
        sys.exit(2)

    # A non-positive sample size means "use every edge".
    curv_sample = args.curv_sample if args.curv_sample > 0 else None
    rows = [
        run_once(p, b, args.regime, args.no_curvature, args.curv_undirected, curv_sample, args.ricci_method, args.seed)
        for p in paths
        for b in args.beta
    ]

    header = ["graph","beta","H_rate","kappa","regime","nnodes","nedges","weight_attr","curv_method","elapsed_s"]
    with open(args.out, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=header)
        writer.writeheader()
        writer.writerows(rows)
    print(f"Wrote {len(rows)} rows to {args.out}")


if __name__ == "__main__":
    main()
'''\nintegrate_src = r''''''\nPath('/content/compute_entropy_curvature_pro.py').write_text(pro_src, encoding='utf-8')
Path('/content/integrate_entropy_curvature_pro.py').write_text(integrate_src, encoding='utf-8')
print('Scripts gravados em /content')


## Entrada do grafo — escolha uma opção
### Opção A — Upload manual (`.graphml`)


In [ ]:
# @title Upload do arquivo do grafo (.graphml)
from google.colab import files

# The first key of the mapping returned by files.upload() becomes the graph
# path; None when the mapping is empty.
uploaded = files.upload()
GRAPH_PATH = next(iter(uploaded), None)
print("GRAPH_PATH =", GRAPH_PATH)

### Opção B — Montar Google Drive (use se o grafo estiver no Drive)


In [ ]:
# @title Montar Drive e definir caminho do grafo
from google.colab import drive

# Mount Google Drive under /content/drive.
drive.mount("/content/drive")
# Form field for the graph path. Running this cell overwrites any GRAPH_PATH
# set by the upload cell, including with the empty default.
GRAPH_PATH = ""  # @param {type:"string"}
print("GRAPH_PATH =", GRAPH_PATH)

## Rodar cálculo (seguro em memória)
Use **amostragem** e método `approximate` para a Ricci (rápido). Se quiser, aumente `--curv-sample` depois.


In [ ]:
# @title Executar H_rate + kappa (amostrada)
import shlex
import subprocess
import sys

# Explicit check instead of assert: asserts are stripped under -O, and an
# undefined GRAPH_PATH should give the same guidance as an empty one.
if not globals().get("GRAPH_PATH"):
    raise ValueError("Defina GRAPH_PATH via upload (Opção A) ou Drive (Opção B).")

# sys.executable runs the script with the interpreter of this kernel, i.e.
# the one the dependencies were installed for.
cmd = [
    sys.executable,
    "/content/compute_entropy_curvature_pro.py",
    "--graph",
    GRAPH_PATH,
    "--beta",
    "1.0",
    "--regime",
    "integrated",
    "--curv-undirected",
    "--curv-sample",
    "500",
    "--ricci-method",
    "approximate",
    "--out",
    "/content/entropy_curvature_integrated_kappa_sampled.csv",
]
print(">>", " ".join(shlex.quote(c) for c in cmd))
# check=True raises CalledProcessError when the script exits non-zero.
subprocess.run(cmd, check=True)
print("OK: /content/entropy_curvature_integrated_kappa_sampled.csv")

## Integrar para `source_fig2_entropy_curvature.csv`


In [ ]:
# @title Gerar source_fig2_entropy_curvature.csv (β=1.0)
import os
import shlex
import subprocess
import sys

fig2_csv = "/content/source_fig2_entropy_curvature.csv"
# sys.executable runs the script with the interpreter of this kernel.
cmd = [
    sys.executable,
    "/content/integrate_entropy_curvature_pro.py",
    "--in",
    "/content/entropy_curvature_integrated_kappa_sampled.csv",
    "--out",
    fig2_csv,
    "--select-beta",
    "1.0",
]
print(">>", " ".join(shlex.quote(c) for c in cmd))
subprocess.run(cmd, check=True)
# A zero exit status does not prove the CSV was written (an empty script also
# exits 0), so verify the output before reporting success.
if not os.path.exists(fig2_csv):
    raise FileNotFoundError(
        f"{fig2_csv} não foi gerado; verifique o conteúdo de /content/integrate_entropy_curvature_pro.py"
    )
print("OK: /content/source_fig2_entropy_curvature.csv")

## Salvar no Drive e/ou baixar


In [ ]:
# @title Salvar resultados no Drive (ajuste a pasta se quiser)
import os, shutil

OUT_DIR = "/content/drive/MyDrive/symbolic_manifold_outputs"  # @param {type:"string"}
os.makedirs(OUT_DIR, exist_ok=True)
# Track what was actually copied so the final message does not claim success
# when neither result file exists.
saved = []
for f in [
    "/content/entropy_curvature_integrated_kappa_sampled.csv",
    "/content/source_fig2_entropy_curvature.csv",
]:
    if os.path.exists(f):
        shutil.copy(f, OUT_DIR)
        saved.append(f)
    else:
        print("[AVISO] arquivo não encontrado, ignorado:", f)
if saved:
    print("Arquivos salvos em", OUT_DIR)
else:
    print("Nenhum arquivo foi salvo em", OUT_DIR)

In [ ]:
# @title Baixar para sua máquina
from google.colab import files

# Trigger one browser download per result file, in this order.
for result_path in (
    "/content/source_fig2_entropy_curvature.csv",
    "/content/entropy_curvature_integrated_kappa_sampled.csv",
):
    files.download(result_path)