In [113]:
import importlib, incidence_graph
import inspect

importlib.reload(incidence_graph)
from incidence_graph import IncidenceGraph

In [115]:
# --- Create graph ---
G = IncidenceGraph(directed=True)

# --- Add layers ---
G.add_layer("genes")
G.add_layer("proteins")

# --- Add nodes to layers ---
G.add_node("G1", layer="genes", type="gene")
G.add_node("G2", layer="genes", type="gene")
G.add_node("P1", layer="proteins", type="protein")
G.add_node("P2", layer="proteins", type="protein")

# --- Add edges within layers ---
e1 = G.add_edge("G1", "G2", layer="genes", weight=1.0, relation="regulates")
e2 = G.add_edge("P1", "P2", layer="proteins", weight=2.0, relation="interacts")

# --- Add cross-layer edge (gene → protein) ---
e3 = G.add_edge("G1", "P1", layer="genes", weight=3.0, relation="encodes", propagate="all")

# --- Print basic info ---
print("Nodes:", G.nodes())
print("Edges:", G.edge_list())
print("Layers:", G.list_layers())
print()

# --- Layer membership ---
print("Genes layer:", G._layers["genes"])
print("Proteins layer:", G._layers["proteins"])
print()

# --- Cross-layer queries ---
print("Edge presence G1-P1 across layers:", G.edge_presence_across_layers("e2"))
print("Conserved edges (>=2 layers):", G.conserved_edges(min_layers=2))
print("Layer-specific edges (genes):", G.layer_specific_edges("genes"))
print("Layer intersection [genes, proteins]:", G.layer_intersection(["genes", "proteins"]))
print("Layer union [genes, proteins]:", G.layer_union(["genes", "proteins"]))
print()

# --- Temporal dynamics example ---
changes = G.temporal_dynamics(["genes", "proteins"], metric="edge_change")
print("Temporal dynamics (edge change from genes → proteins):", changes)

# --- Layer statistics ---
print("Layer statistics:", G.layer_statistics())


Nodes: ['G1', 'G2', 'P1', 'P2']
Edges: [('G1', 'G2', 'edge_0', 1.0), ('P1', 'P2', 'edge_1', 2.0), ('G1', 'P1', 'edge_2', 3.0)]
Layers: ['genes', 'proteins']

Genes layer: {'nodes': {'G1', 'P1', 'G2'}, 'edges': {'edge_2', 'edge_0'}, 'attributes': {}}
Proteins layer: {'nodes': {'G1', 'P1', 'P2'}, 'edges': {'edge_2', 'edge_1'}, 'attributes': {}}

Edge presence G1-P1 across layers: []
Conserved edges (>=2 layers): {'edge_2': 2}
Layer-specific edges (genes): {'edge_0'}
Layer intersection [genes, proteins]: {'nodes': {'G1', 'P1'}, 'edges': {'edge_2'}}
Layer union [genes, proteins]: {'nodes': {'G2', 'G1', 'P1', 'P2'}, 'edges': {'edge_2', 'edge_0', 'edge_1'}}

Temporal dynamics (edge change from genes → proteins): [{'added': 1, 'removed': 1, 'net_change': 0}]
Layer statistics: {'genes': {'nodes': 3, 'edges': 2, 'attributes': {}}, 'proteins': {'nodes': 3, 'edges': 2, 'attributes': {}}}


In [117]:
# --- test_incidence_graph.py ---

def find_edge_ids_by_pair(G: IncidenceGraph, u, v):
    """Return all edge_ids whose (source, target) == (u, v)."""
    return [eid for eid, (s, t, _) in G.edge_definitions.items() if s == u and t == v]

def test_basic_layers_and_edges():
    G = IncidenceGraph(directed=True)

    # layers
    G.add_layer("genes")
    G.add_layer("proteins")
    assert set(G.list_layers()) == {"genes", "proteins"}

    # nodes per layer
    G.add_node("G1", layer="genes", type="gene")
    G.add_node("G2", layer="genes", type="gene")
    G.add_node("P1", layer="proteins", type="protein")
    G.add_node("P2", layer="proteins", type="protein")

    # check global nodes
    assert set(G.nodes()) == {"G1", "G2", "P1", "P2"}

    # edges in-layer
    e_g = G.add_edge("G1", "G2", layer="genes", weight=1.0, relation="regulates")
    e_p = G.add_edge("P1", "P2", layer="proteins", weight=2.0, relation="interacts")

    # cross-layer edge; propagate to all layers containing either endpoint
    e_x = G.add_edge("G1", "P1", layer="genes", weight=3.0, relation="encodes", propagate="all")

    # global integrity
    edges = dict(G.edge_definitions)
    assert set(edges.keys()) == {e_g, e_p, e_x}
    assert edges[e_g] == ("G1", "G2", "regular")
    assert edges[e_p] == ("P1", "P2", "regular")
    assert edges[e_x] == ("G1", "P1", "regular")

    # layer contents
    genes = G.get_layer_info("genes")
    proteins = G.get_layer_info("proteins")

    assert genes["nodes"] == {"G1", "G2", "P1"}               # P1 added by cross-layer propagation "all"
    assert genes["edges"] == {e_g, e_x}

    assert proteins["nodes"] == {"P1", "P2", "G1"}            # G1 added by cross-layer propagation "all"
    assert proteins["edges"] == {e_p, e_x}

    # presence-by-edge_id
    assert set(G.edge_presence_across_layers(e_x)) == {"genes", "proteins"}
    assert set(G.edge_presence_across_layers(e_g)) == {"genes"}
    assert set(G.edge_presence_across_layers(e_p)) == {"proteins"}

    # presence-by (source,target) via helper
    assert set(find_edge_ids_by_pair(G, "G1", "P1")) == {e_x}
    assert set(find_edge_ids_by_pair(G, "G1", "G2")) == {e_g}
    assert set(find_edge_ids_by_pair(G, "P1", "P2")) == {e_p}

    # set ops
    inter = G.layer_intersection(["genes", "proteins"])
    assert inter["edges"] == {e_x}
    assert inter["nodes"] == {"G1", "P1"}

    union = G.layer_union(["genes", "proteins"])
    assert union["edges"] == {e_g, e_p, e_x}
    assert union["nodes"] == {"G1", "G2", "P1", "P2"}

    # conserved/specific
    conserved = G.conserved_edges(min_layers=2)
    assert conserved == {e_x: 2}

    specific_genes = G.layer_specific_edges("genes")
    assert specific_genes == {e_g}

    # temporal diffs (genes -> proteins): +1 (e_p), -1 (e_g), e_x persists
    changes = G.temporal_dynamics(["genes", "proteins"], metric="edge_change")
    assert changes == [{'added': 1, 'removed': 1, 'net_change': 0}]

    # removal semantics
    G.remove_edge(e_x)
    assert e_x not in G.edge_definitions
    assert e_x not in G.get_layer_edges("genes")
    assert e_x not in G.get_layer_edges("proteins")

    G.remove_node("G2")
    assert "G2" not in G.entity_to_idx
    assert "G2" not in G.get_layer_nodes("genes")

    # sanity: lengths consistent
    assert len(G.edges()) == 1  # only e_p remains
    assert len(G.nodes()) == 3  # G1, P1, P2


In [119]:

if __name__ == "__main__":
    test_basic_layers_and_edges()
    print("✅ All tests passed.")

✅ All tests passed.


In [121]:
# stress_test_incidence_graph.py
import random
import time

def stress_test(
    num_nodes=600,
    num_layers=5,
    num_edges=2000,
    hyperedge_size=4,
    seed=42
):
    random.seed(seed)
    t0 = time.time()

    G = IncidenceGraph(directed=True)

    # Layers
    layer_ids = [f"layer_{i}" for i in range(num_layers)]
    for lid in layer_ids:
        G.add_layer(lid)

    # Nodes
    nodes = [f"N{i}" for i in range(num_nodes)]
    for n in nodes:
        # no layer here; edges will place endpoints into layers as needed
        G.add_node(n)

    # Edges
    edge_count = 0
    for i in range(num_edges):
        lid = random.choice(layer_ids)
        etype = random.choice(["directed", "undirected", "self", "hyper"])
        propagate = random.choice(["none", "shared", "all"])
        w = random.random() + 1e-6  # avoid 0

        if etype == "directed":
            u, v = random.sample(nodes, 2)
            eid = f"e{i}"
            G.add_edge(u, v, layer=lid, edge_id=eid, weight=w, propagate=propagate, etype="directed")
            edge_count += 1

        elif etype == "undirected":
            # simulate undirected by adding both directions with distinct IDs
            u, v = random.sample(nodes, 2)
            eid1 = f"e{i}_a"
            eid2 = f"e{i}_b"
            G.add_edge(u, v, layer=lid, edge_id=eid1, weight=w, propagate=propagate, undirected=True)
            G.add_edge(v, u, layer=lid, edge_id=eid2, weight=w, propagate=propagate, undirected=True)
            edge_count += 2

        elif etype == "self":
            u = random.choice(nodes)
            eid = f"e{i}_self"
            G.add_edge(u, u, layer=lid, edge_id=eid, weight=w, propagate=propagate, self_loop=True)
            edge_count += 1

        elif etype == "hyper":
            # simulate hyperedge via clique expansion among k nodes
            hs = random.sample(nodes, hyperedge_size)
            for j in range(len(hs)):
                for k in range(j + 1, len(hs)):
                    u, v = hs[j], hs[k]
                    eid = f"e{i}_h{j}_{k}"
                    G.add_edge(u, v, layer=lid, edge_id=eid, weight=w, propagate=propagate, hyper=True)
                    edge_count += 1

    t_build = time.time()

    # --- Basic integrity ---
    print("Total nodes (global):", len(G.nodes()))
    print("Total edges (global):", len(G.edges()))
    print("Expected edges inserted (simulated):", edge_count)
    print("Layers:", G.list_layers())

    # --- Layer statistics ---
    stats = G.layer_statistics()
    for lid, s in stats.items():
        print(f"[{lid}] nodes={s['nodes']} edges={s['edges']}")

    # --- Intersections / unions (first two layers) ---
    if len(layer_ids) >= 2:
        L12_inter = G.layer_intersection(layer_ids[:2])
        L12_union  = G.layer_union(layer_ids[:2])
        print(f"Intersection({layer_ids[0]},{layer_ids[1]}): nodes={len(L12_inter['nodes'])}, edges={len(L12_inter['edges'])}")
        print(f"Union({layer_ids[0]},{layer_ids[1]}): nodes={len(L12_union['nodes'])}, edges={len(L12_union['edges'])}")

    # --- Conserved edges (>=2 layers) ---
    conserved = G.conserved_edges(min_layers=2)
    print("Conserved edges (≥2 layers):", len(conserved))

    # --- Random edge presence checks by edge_id ---
    eids = list(G.edge_definitions.keys())
    sample = random.sample(eids, min(10, len(eids)))
    for eid in sample:
        layers_with = G.edge_presence_across_layers(eid)
        s, t, _ = G.edge_definitions[eid]
        print(f"edge {eid} ({s}->{t}) in layers: {layers_with}")

    # --- Temporal dynamics (if ≥2 layers) ---
    if len(layer_ids) >= 2:
        td = G.temporal_dynamics(layer_ids[:2], metric="edge_change")
        print(f"Temporal dynamics {layer_ids[0]}→{layer_ids[1]}:", td)

    t_query = time.time()
    print(f"Build time: {t_build - t0:.3f}s | Query time: {t_query - t_build:.3f}s | Total: {t_query - t0:.3f}s")
    print("✅ Stress test completed")

In [123]:
if __name__ == "__main__":
    stress_test()

Total nodes (global): 600
Total edges (global): 5112
Expected edges inserted (simulated): 5112
Layers: ['layer_0', 'layer_1', 'layer_2', 'layer_3', 'layer_4']
[layer_0] nodes=564 edges=2859
[layer_1] nodes=562 edges=2874
[layer_2] nodes=565 edges=2949
[layer_3] nodes=560 edges=2729
[layer_4] nodes=563 edges=2743
Intersection(layer_0,layer_1): nodes=538, edges=2108
Union(layer_0,layer_1): nodes=588, edges=3625
Conserved edges (≥2 layers): 2854
edge e1484_h1_2 (N338->N102) in layers: ['layer_0', 'layer_1', 'layer_2', 'layer_3', 'layer_4']
edge e295_a (N157->N175) in layers: ['layer_4']
edge e374_h1_3 (N397->N261) in layers: ['layer_0']
edge e303_h0_2 (N253->N99) in layers: ['layer_1']
edge e604_b (N413->N58) in layers: ['layer_0', 'layer_1']
edge e288_b (N316->N67) in layers: ['layer_1', 'layer_4']
edge e616_h1_2 (N82->N499) in layers: ['layer_4']
edge e1029_h2_3 (N140->N329) in layers: ['layer_0', 'layer_1', 'layer_2', 'layer_4']
edge e1898_h1_2 (N184->N464) in layers: ['layer_0', 'laye

In [125]:
e_dir = G.add_edge("X","Y", layer="L1", weight=1.0, edge_directed=True)
e_und = G.add_edge("Y","Z", layer="L1", weight=1.0, edge_directed=False)
assert G.is_edge_directed(e_dir) is True
assert G.is_edge_directed(e_und) is False

In [127]:
def test_attr_tables():
    G = IncidenceGraph()

    # layers
    G.add_layer("genes", color="blue", weight=5)
    G.add_layer("proteins", owner="labX")
    assert G.layer_attributes.loc["genes", "color"] == "blue"
    assert G.get_layer_attr("proteins", "owner") == "labX"

    # nodes
    G.add_node("G1", layer="genes", type="gene", species="human")
    G.add_node("G1", layer="genes", symbol="BRCA1")  # upsert, no duplicate row
    assert G.node_attributes.loc["G1", "type"] == "gene"
    assert G.node_attributes.loc["G1", "symbol"] == "BRCA1"
    assert G.node_attributes.index.is_unique

    # edges (note: edge DF stores ONLY user attrs)
    e = G.add_edge("G1","P1", layer="genes", weight=3.0, directed=True, interaction="ppi")
    assert G.get_edge_attr(e, "interaction") == "ppi"
    # structure lives in views, not in attributes DF
    ev = G.edges_view()
    assert ev.loc[e, "source"] == "G1" and ev.loc[e, "target"] == "P1"
    assert ev.loc[e, "global_weight"] == 3.0

    print("✅ attribute tables OK")
test_attr_tables()

✅ attribute tables OK


In [129]:
eid = G.add_edge("A","B", layer="L1", weight=1.0, layer_weight=2.5)
assert G.get_effective_edge_weight(eid) == 1.0
assert G.get_effective_edge_weight(eid, layer="L1") == 2.5

In [133]:
G = IncidenceGraph(directed=True)
G.add_layer("L1")

eid = G.add_edge("A","B", layer="L1", weight=1.0, layer_weight=2.5)
assert G.get_effective_edge_weight(eid) == 1.0           # global
assert G.get_effective_edge_weight(eid, layer="L1") == 2.5  # override

# still can fetch the raw/global even with a layer given
assert G.get_effective_edge_weight(eid) == 1.0
