In [30]:
import numpy as np
from gbp.gbp import FactorGraph, VariableNode, Factor

# -----------------------
# SLAM-like base graph
# -----------------------
def make_slam_like_graph(N=100, step_size=25, loop_prob=0.05, loop_radius=50, prior_prop=0.0, rng=None,):
    nodes, edges = [], []
    positions = []
    x, y = 0.0, 0.0
    positions.append((x, y))
    for _ in range(1, int(N)):
        dx, dy = np.random.randn(2)
        norm = np.sqrt(dx**2 + dy**2) + 1e-6
        dx, dy = dx / norm * float(step_size), dy / norm * float(step_size)
        x, y = x + dx, y + dy
        positions.append((x, y))
    for i, (px, py) in enumerate(positions):
        nodes.append({
            "data": {"id": f"{i}", "layer": 0, "dim": 2},
            "position": {"x": float(px), "y": float(py)}
        })
    for i in range(int(N) - 1):
        edges.append({"data": {"source": f"{i}", "target": f"{i+1}"}})
    for i in range(int(N)):
        for j in range(i + 5, int(N)):
            if np.random.rand() < float(loop_prob):
                xi, yi = positions[i]; xj, yj = positions[j]
                if np.hypot(xi-xj, yi-yj) < float(loop_radius):
                    edges.append({"data": {"source": f"{i}", "target": f"{j}"}})

    # 确定强先验的节点集合
    if rng is None:
        rng = np.random.default_rng()

    if prior_prop <= 0.0:
        strong_ids = {0}
    elif prior_prop >= 1.0:
        strong_ids = set(range(N))
    else:
        k = max(1, int(np.floor(prior_prop * N)))
        strong_ids = set(rng.choice(N, size=k, replace=False).tolist())

    # 给 strong prior 节点加 edge
    for i in strong_ids:
        edges.append({
            "data": {"source": f"{i}", "target": "prior"}
        })


    return nodes, edges

# -----------------------
# GBP Graph 构建
# -----------------------
def build_noisy_pose_graph(
    nodes,
    edges,
    prior_sigma: float = 10,
    odom_sigma: float = 10,
    tiny_prior: float = 1e-10,
    rng=None,
):
    
    """
    构造二维 pose-only 因子图（线性，高斯），并注入噪声。
    参数:
      prior_sigma : 强先验的标准差（小=强）
      odom_sigma  : 里程计测量噪声标准差
      prior_prop  : 0.0=仅 anchor；(0,1)=按比例随机选；>=1.0=全体
      tiny_prior  : 所有节点默认加的极小先验，防止奇异
      seed        : 随机种子（可复现）
    """

    fg = FactorGraph(nonlinear_factors=False, eta_damping=0)

    var_nodes = []
    I2 = np.eye(2, dtype=float)
    N = len(nodes)

    # ---- 预生成噪声 ----
    prior_noises = {}
    odom_noises = {}

    if rng is None:
        rng = np.random.default_rng()

    # 为所有边生成噪声
    for e in edges:
        src = e["data"]["source"]; dst = e["data"]["target"]
        # 二元边
        if dst != "prior":
            odom_noises[(int(src[:]), int(dst[:]))] = rng.normal(0.0, odom_sigma, size=2)
        # 一元边（强先验）
        elif dst == "prior":
            prior_noises[int(src[:])] = rng.normal(0.0, prior_sigma, size=2)


    # ---- variable nodes ----
    for i, n in enumerate(nodes):
        v = VariableNode(i, dofs=2)
        v.GT = np.array([n["position"]["x"], n["position"]["y"]], dtype=float)

        # 极小先验
        v.prior.lam = tiny_prior * I2
        v.prior.eta = np.zeros(2, dtype=float)

        var_nodes.append(v)

    fg.var_nodes = var_nodes
    fg.n_var_nodes = len(var_nodes)


    # ---- prior factors ----
    def meas_fn_unary(x, *args):
        return x
    def jac_fn_unary(x, *args):
        return np.eye(2)
    # ---- odometry factors ----
    def meas_fn(xy, *args):
        return xy[2:] - xy[:2]
    def jac_fn(xy, *args):
        return np.array([[-1, 0, 1, 0],
                         [ 0,-1, 0, 1]], dtype=float)
    
    factors = []
    fid = 0

    for e in edges:
        src = e["data"]["source"]; dst = e["data"]["target"]
        if dst != "prior":
            i, j = int(src[:]), int(dst[:])
            vi, vj = var_nodes[i], var_nodes[j]

            meas = (vj.GT - vi.GT) + odom_noises[(i, j)]

            f = Factor(fid, [vi, vj], meas, odom_sigma, meas_fn, jac_fn)
            f.type = "base"
            linpoint = np.r_[vi.GT, vj.GT]
            f.compute_factor(linpoint=linpoint, update_self=True)

            factors.append(f)
            vi.adj_factors.append(f)
            vj.adj_factors.append(f)
            fid += 1

        else:
            i = int(src[:])
            vi = var_nodes[i]
            z = vi.GT + prior_noises[i]

            f = Factor(fid, [vi], z, prior_sigma, meas_fn_unary, jac_fn_unary)
            f.type = "prior"
            f.compute_factor(linpoint=z, update_self=True)

            factors.append(f)
            vi.adj_factors.append(f)
            fid += 1

    fg.factors = factors
    fg.n_factor_nodes = len(factors)
    return fg


# -----------------------
# 主程序：跑 100 次迭代
# -----------------------
if __name__ == "__main__":
    rng = np.random.default_rng(42)  # 固定随机种子

    # 1. 造一个图
    nodes, edges = make_slam_like_graph(N=50, step_size=25, loop_prob=0.1, loop_radius=50, prior_prop=0.1, rng=rng)

    # 2. 构建 GBP 图
    fg = build_noisy_pose_graph(nodes, edges, prior_sigma=10.0, odom_sigma=10.0, rng=rng)

    # 3. 跑迭代
    for it in range(1000):
        fg.synchronous_iteration()
        energy = fg.energy_map(include_priors=True, include_factors=True)
        print(f"Iter {it+1:03d} | Energy = {energy:.6f}")


Iter 001 | Energy = 774.130549
Iter 002 | Energy = 709.981416
Iter 003 | Energy = 556.189571
Iter 004 | Energy = 508.090565
Iter 005 | Energy = 473.331254
Iter 006 | Energy = 457.934329
Iter 007 | Energy = 25.867008
Iter 008 | Energy = 17.956291
Iter 009 | Energy = 15.984974
Iter 010 | Energy = 14.781472
Iter 011 | Energy = 13.736504
Iter 012 | Energy = 13.018778
Iter 013 | Energy = 12.545911
Iter 014 | Energy = 12.357473
Iter 015 | Energy = 12.347305
Iter 016 | Energy = 12.327396
Iter 017 | Energy = 12.319373
Iter 018 | Energy = 12.312751
Iter 019 | Energy = 12.310353
Iter 020 | Energy = 12.309470
Iter 021 | Energy = 12.308735
Iter 022 | Energy = 12.308173
Iter 023 | Energy = 12.307672
Iter 024 | Energy = 12.307270
Iter 025 | Energy = 12.306936
Iter 026 | Energy = 12.306633
Iter 027 | Energy = 12.306382
Iter 028 | Energy = 12.306174
Iter 029 | Energy = 12.305996
Iter 030 | Energy = 12.305842
Iter 031 | Energy = 12.305710
Iter 032 | Energy = 12.305598
Iter 033 | Energy = 12.305501
Iter

In [None]:
# -----------------------
# 初始化 & 边界
# -----------------------
def init_layers(N=100, step_size=25, loop_prob=0.05, loop_radius=50, prior_prop=0.0):
    base_nodes, base_edges = make_slam_like_graph(N, step_size, loop_prob, loop_radius, prior_prop)
    return [{"name": "base", "nodes": base_nodes, "edges": base_edges}]

In [None]:
layers = []


N=50, step=25, prob=0.1, radius=50, prior_prop=0.1
layers = init_layers(N, step, prob, radius, prior_prop)
pair_idx = 0


prior_sigma=10.0, odom_sigma=10.0

# 构建 GBP 图
gbp_graph = build_noisy_pose_graph(layers[0]["nodes"], layers[0]["edges"],
                                    prior_sigma=prior_sigma,
                                    odom_sigma=odom_sigma
                                    )
layers[0]["graph"] = gbp_graph
opts=[{"label":"base","value":"base"}]


if triggered == "add-layer":
    idx = next(i for i,L in enumerate(layers) if L["name"] == current_value)
    layers = layers[:idx+1]
    last = layers[-1]
    kind, k = parse_layer_name(last["name"])
    pair_idx = highest_pair_idx([L["name"] for L in layers])
    if kind == "super":
        abs_layer_idx = k*2
        abs_nodes, abs_edges = copy_to_abs(last["nodes"], last["edges"], abs_layer_idx)
        layers.append({"name":f"abs{k}", "nodes":abs_nodes, "edges":abs_edges})
    else:
        k_next = pair_idx + 1
        super_layer_idx = k_next*2 - 1
        if mode == "grid":
            super_nodes, super_edges, node_map = fuse_to_super_grid(last["nodes"], last["edges"], int(gx or 2), int(gy or 2), super_layer_idx)
        else:
            super_nodes, super_edges, node_map = fuse_to_super_knn(last["nodes"], last["edges"], int(kk or 8), super_layer_idx)
        layers.append({"name":f"super{k_next}", "nodes":super_nodes, "edges":super_edges, "node_map":node_map})
        layers[super_layer_idx]["graph"] = build_super_graph(layers)

        pair_idx = k_next


In [24]:
import numpy as np
from gbp.gbp import FactorGraph, VariableNode, Factor

# -----------------------
# SLAM-like base graph
# -----------------------
def make_slam_like_graph(N=100, step_size=25, loop_prob=0.2, loop_radius=50, rng=None):
    if rng is None:
        rng = np.random.default_rng()

    nodes, edges = [], []
    positions = []
    x, y = 0.0, 0.0
    positions.append((x, y))

    # 随机游走生成轨迹
    for _ in range(1, int(N)):
        dx, dy = rng.normal(size=2)
        norm = np.sqrt(dx**2 + dy**2) + 1e-6
        dx, dy = dx / norm * float(step_size), dy / norm * float(step_size)
        x, y = x + dx, y + dy
        positions.append((x, y))

    # 节点
    for i, (px, py) in enumerate(positions):
        nodes.append({
            "data": {"id": f"b{i}", "layer": 0, "dim": 2},
            "position": {"x": float(px), "y": float(py)}
        })

    # 顺序边
    for i in range(int(N) - 1):
        edges.append({"data": {"source": f"b{i}", "target": f"b{i+1}"}})

    # 随机回环
    for i in range(int(N)):
        for j in range(i + 5, int(N)):
            if rng.random() < float(loop_prob):
                xi, yi = positions[i]; xj, yj = positions[j]
                if np.hypot(xi-xj, yi-yj) < float(loop_radius):
                    edges.append({"data": {"source": f"b{i}", "target": f"b{j}"}})
    return nodes, edges




def build_noisy_pose_graph(
    nodes,
    edges,
    prior_sigma: float = 10,
    odom_sigma: float = 10,
    prior_prop: float = 0.0,
    tiny_prior: float = 1e-10,
    rng=None,
):
    if rng is None:
        rng = np.random.default_rng()

    fg = FactorGraph(nonlinear_factors=False, eta_damping=0)

    var_nodes = []
    I2 = np.eye(2, dtype=float)
    N = len(nodes)

    # ---- 预生成噪声 ----
    prior_noises = {}
    odom_noises = {}

    # 确定强先验的节点集合
    if prior_prop <= 0.0:
        strong_ids = {0}
    elif prior_prop >= 1.0:
        strong_ids = set(range(N))
    else:
        k = max(1, int(np.floor(prior_prop * N)))
        strong_ids = set(rng.choice(N, size=k, replace=False).tolist())

    # 为 strong prior 节点生成噪声
    for i in strong_ids:
        prior_noises[i] = rng.normal(0.0, prior_sigma, size=2)

    # 为所有边生成噪声
    for e in edges:
        src = e["data"]["source"]; dst = e["data"]["target"]
        if src.startswith("b") and dst.startswith("b"):
            odom_noises[(int(src[1:]), int(dst[1:]))] = rng.normal(0.0, odom_sigma, size=2)

    # ---- 变量节点 ----
    for i, n in enumerate(nodes):
        v = VariableNode(i, dofs=2)
        v.GT = np.array([n["position"]["x"], n["position"]["y"]], dtype=float)

        # 极小先验
        v.prior.lam = tiny_prior * I2
        v.prior.eta = np.zeros(2, dtype=float)

        var_nodes.append(v)

    fg.var_nodes = var_nodes
    fg.n_var_nodes = len(var_nodes)

    # ---- prior factors ----
    factors = []
    fid = 0
    for i in strong_ids:
        vi = var_nodes[i]
        lam_strong = I2 / (prior_sigma ** 2)
        z = vi.GT + prior_noises[i]

        def meas_fn_unary(x, *args):
            return x
        def jac_fn_unary(x, *args):
            return np.eye(2)

        f = Factor(fid, [vi], z, prior_sigma, meas_fn_unary, jac_fn_unary)
        f.type = "prior"
        f.compute_factor(linpoint=z, update_self=True)

        factors.append(f)
        vi.adj_factors.append(f)
        fid += 1

    # ---- odometry factors ----
    def meas_fn(xy, *args):
        return xy[2:] - xy[:2]
    def jac_fn(xy, *args):
        return np.array([[-1, 0, 1, 0],
                         [ 0,-1, 0, 1]], dtype=float)

    for e in edges:
        src = e["data"]["source"]; dst = e["data"]["target"]
        if not (src.startswith("b") and dst.startswith("b")):
            continue
        i, j = int(src[1:]), int(dst[1:])
        vi, vj = var_nodes[i], var_nodes[j]

        meas = (vj.GT - vi.GT) + odom_noises[(i, j)]

        f = Factor(fid, [vi, vj], meas, odom_sigma, meas_fn, jac_fn)
        f.type = "base"
        linpoint = np.r_[vi.GT, vj.GT]
        f.compute_factor(linpoint=linpoint, update_self=True)

        factors.append(f)
        vi.adj_factors.append(f)
        vj.adj_factors.append(f)
        fid += 1

    fg.factors = factors
    fg.n_factor_nodes = len(factors)
    return fg



# -----------------------
# 主程序：跑 100 次迭代
# -----------------------
if __name__ == "__main__":
    rng = np.random.default_rng(42)  # 固定随机种子

    # 1. 造一个图
    nodes, edges = make_slam_like_graph(N=50, step_size=25, loop_prob=0.1, loop_radius=50, rng=rng)

    # 2. 构建 GBP 图
    fg = build_noisy_pose_graph(nodes, edges, prior_sigma=10.0, odom_sigma=10.0, prior_prop=0.1, rng=rng)

    # 3. 跑迭代
    for it in range(1000):
        fg.synchronous_iteration()
        energy = fg.energy_map(include_priors=True, include_factors=True)
        print(f"Iter {it+1:03d} | Energy = {energy:.6f}")


Iter 001 | Energy = 736.609303
Iter 002 | Energy = 875.120492
Iter 003 | Energy = 1010.758653
Iter 004 | Energy = 613.047975
Iter 005 | Energy = 198.667459
Iter 006 | Energy = 48.870660
Iter 007 | Energy = 34.180285
Iter 008 | Energy = 26.933797
Iter 009 | Energy = 24.075363
Iter 010 | Energy = 23.071562
Iter 011 | Energy = 22.035149
Iter 012 | Energy = 21.511796
Iter 013 | Energy = 21.224165
Iter 014 | Energy = 21.028798
Iter 015 | Energy = 20.874596
Iter 016 | Energy = 20.773675
Iter 017 | Energy = 20.700691
Iter 018 | Energy = 20.652154
Iter 019 | Energy = 20.611446
Iter 020 | Energy = 20.579389
Iter 021 | Energy = 20.555237
Iter 022 | Energy = 20.535371
Iter 023 | Energy = 20.518726
Iter 024 | Energy = 20.504865
Iter 025 | Energy = 20.493166
Iter 026 | Energy = 20.483322
Iter 027 | Energy = 20.474894
Iter 028 | Energy = 20.467642
Iter 029 | Energy = 20.461428
Iter 030 | Energy = 20.456041
Iter 031 | Energy = 20.451377
Iter 032 | Energy = 20.447349
Iter 033 | Energy = 20.443859
Iter

In [None]:
def build_super_graph(layers, prior_sigma=1.0, odom_sigma=1.0):
    """
    基于 layers[-2] 的 base graph, 和 layers[-1] 的 super 分组，构造 super graph。
    要求: layers[-2]["graph"] 已经是构建好的基图（含 unary/binary 因子）。
    layers[-1]["node_map"]: { base_node_id(str, 如 'b12') -> super_node_id(str) }
    """
    # ---------- 取出 base & super ----------
    base_graph = layers[-2]["graph"]
    super_nodes = layers[-1]["nodes"]
    super_edges = layers[-1]["edges"]
    node_map    = layers[-1]["node_map"]   # 'bN' -> 'sX_...'

    # base: id(int)->VariableNode，方便查 dofs 和 mu
    id2var = {vn.variableID: vn for vn in base_graph.var_nodes}

    # ---------- super_id -> [base_id(int)] ----------
    super_groups = {}
    for b_str, s_id in node_map.items():
        if not b_str.startswith("b"):
            continue
        b_int = int(b_str[1:])
        super_groups.setdefault(s_id, []).append(b_int)

    # ---------- 为每个 super 组建立 (start, dofs) 表 ----------
    # local_idx[sid][bid] = (start, dofs), total_dofs[sid] = sum(dofs)
    local_idx   = {}
    total_dofs  = {}
    for sid, group in super_groups.items():
        off = 0
        local_idx[sid] = {}
        for bid in group:
            d = id2var[bid].dofs
            local_idx[sid][bid] = (off, d)
            off += d
        total_dofs[sid] = off

    # ---------- 创建 super VariableNodes ----------
    from gbp.gbp import FactorGraph, VariableNode, Factor
    fg = FactorGraph(nonlinear_factors=False, eta_damping=0)

    super_var_nodes = {}
    for i, sn in enumerate(super_nodes):
        sid = sn["data"]["id"]
        dofs = total_dofs.get(sid, 0)
        v = VariableNode(i, dofs=dofs)
        super_var_nodes[sid] = v
        fg.var_nodes.append(v)
    fg.n_var_nodes = len(fg.var_nodes)

    # ---------- 工具：拼接某组的 linpoint（用 base belief 均值） ----------
    def make_linpoint_for_group(sid):
        x = np.zeros(total_dofs[sid])
        for bid, (st, d) in local_idx[sid].items():
            mu = getattr(id2var[bid], "mu", None)
            if mu is None or len(mu) != d:
                mu = np.zeros(d)
            x[st:st+d] = mu
        return x

    # ---------- 3) super prior（组内 unary + 组内 binary） ----------
    def make_super_prior_factor(sid, base_factors):
        group = super_groups[sid]
        idx_map = local_idx[sid]
        ncols = total_dofs[sid]

        # 选出：所有变量都在组内的因子（unary 或 binary）
        in_group = []
        for f in base_factors:
            vids = [v.ID for v in f.adj_var_nodes]
            if all(vid in group for vid in vids):
                in_group.append(f)

        def meas_fn_super_prior(x_super, *args):
            resids = []
            for f in in_group:
                vids = [v.ID for v in f.adj_var_nodes]
                # 拼本因子的局部 x
                x_loc_list = []
                for vid in vids:
                    st, d = idx_map[vid]
                    x_loc_list.append(x_super[st:st+d])
                x_loc = np.concatenate(x_loc_list) if x_loc_list else np.zeros(0)
                resids.append(f.meas_fn(x_loc) - f.z)
            return np.concatenate(resids) if resids else np.zeros(0)

        def jac_fn_super_prior(x_super, *args):
            Jrows = []
            for f in in_group:
                vids = [v.ID for v in f.adj_var_nodes]
                # 构造本因子的局部 x，用于（潜在）非线性雅可比
                x_loc_list = []
                dims = []
                for vid in vids:
                    st, d = idx_map[vid]
                    dims.append(d)
                    x_loc_list.append(x_super[st:st+d])
                x_loc = np.concatenate(x_loc_list) if x_loc_list else np.zeros(0)

                Jloc = f.jac_fn(x_loc)
                # 将 Jloc 列块映射回 super 变量的列
                row = np.zeros((Jloc.shape[0], ncols))
                c0 = 0
                for vid, d in zip(vids, dims):
                    st, _ = idx_map[vid]
                    row[:, st:st+d] = Jloc[:, c0:c0+d]
                    c0 += d
                Jrows.append(row)
            return np.vstack(Jrows) if Jrows else np.zeros((0, ncols))

        # z_super：拼各 base 因子的 z
        z_list = [f.z for f in in_group]
        z_super = np.concatenate(z_list) if z_list else np.zeros(0)

        return meas_fn_super_prior, jac_fn_super_prior, z_super

    # 为每个 super node 加 prior 因子
    for sid in super_groups.keys():
        meas_fn, jac_fn, z = make_super_prior_factor(sid, base_graph.factors)
        f = Factor(len(fg.factors), [super_var_nodes[sid]], z, prior_sigma, meas_fn, jac_fn)
        f.type = "super_prior"
        lin0 = make_linpoint_for_group(sid)
        f.compute_factor(linpoint=lin0, update_self=True)
        fg.factors.append(f)
        super_var_nodes[sid].adj_factors.append(f)

    # ---------- 4) super between（只取跨组 binary） ----------
    def make_super_between_factor(sidA, sidB, base_factors):
        groupA, groupB = super_groups[sidA], super_groups[sidB]
        idxA, idxB = local_idx[sidA], local_idx[sidB]
        nA, nB = total_dofs[sidA], total_dofs[sidB]

        cross = []
        for f in base_factors:
            vids = [v.ID for v in f.adj_var_nodes]
            if len(vids) != 2:
                continue
            i, j = vids
            # 恰有一端在 A，一端在 B
            if (i in groupA and j in groupB) or (i in groupB and j in groupA):
                cross.append(f)

        def meas_fn_super_between(xAB, *args):
            xA, xB = xAB[:nA], xAB[nA:]
            resids = []
            for f in cross:
                i, j = [v.ID for v in f.adj_var_nodes]
                if i in groupA:
                    si, di = idxA[i]
                    sj, dj = idxB[j]
                    xi = xA[si:si+di]
                    xj = xB[sj:sj+dj]
                else:
                    si, di = idxB[i]
                    sj, dj = idxA[j]
                    xi = xB[si:si+di]
                    xj = xA[sj:sj+dj]
                x_loc = np.concatenate([xi, xj])
                resids.append(f.meas_fn(x_loc) - f.z)
            return np.concatenate(resids) if resids else np.zeros(0)

        def jac_fn_super_between(xAB, *args):
            xA, xB = xAB[:nA], xAB[nA:]
            Jrows = []
            for f in cross:
                i, j = [v.ID for v in f.adj_var_nodes]
                if i in groupA:
                    si, di = idxA[i]
                    sj, dj = idxB[j]
                    xi = xA[si:si+di]
                    xj = xB[sj:sj+dj]
                    left_start, right_start = si, nA + sj
                else:
                    si, di = idxB[i]
                    sj, dj = idxA[j]
                    xi = xB[si:si+di]
                    xj = xA[sj:sj+dj]
                    left_start, right_start = nA + si, sj
                x_loc = np.concatenate([xi, xj])
                Jloc = f.jac_fn(x_loc)

                row = np.zeros((Jloc.shape[0], nA + nB))
                row[:, left_start:left_start+di]   = Jloc[:, :di]
                row[:, right_start:right_start+dj] = Jloc[:, di:di+dj]
                Jrows.append(row)
            return np.vstack(Jrows) if Jrows else np.zeros((0, nA + nB))

        z_super = np.concatenate([f.z for f in cross]) if cross else np.zeros(0)
        return meas_fn_super_between, jac_fn_super_between, z_super

    for e in super_edges:
        u, v = e["data"]["source"], e["data"]["target"]
        if u == "prior" or v == "prior":
            continue
        if (u not in super_groups) or (v not in super_groups):
            continue
        meas_fn, jac_fn, z = make_super_between_factor(u, v, base_graph.factors)
        f = Factor(len(fg.factors), [super_var_nodes[u], super_var_nodes[v]], z, odom_sigma, meas_fn, jac_fn)
        f.type = "super_between"
        lin0 = np.concatenate([make_linpoint_for_group(u), make_linpoint_for_group(v)])
        f.compute_factor(linpoint=lin0, update_self=True)
        fg.factors.append(f)
        super_var_nodes[u].adj_factors.append(f)
        super_var_nodes[v].adj_factors.append(f)

    fg.n_factor_nodes = len(fg.factors)
    return fg
