In [47]:
import os
import json
import numpy as np
import trimesh
from planner_schema import validate_plan, PLAN_DEFAULT
from llm_planner import llm_make_plan_local

In [48]:
def robust_floor_z(all_vertices:np.ndarray,percentile:float = 1.0) -> float:
    z = all_vertices[:, 2]
    return float(np.percentile(z, percentile))

In [49]:
def aabb_from_mesh_world(mesh: trimesh.Trimesh, transform: np.ndarray) -> dict:
    bmin, bmax = mesh.bounds
    corners = np.array([
        [bmin[0], bmin[1], bmin[2], 1.0],
        [bmin[0], bmin[1], bmax[2], 1.0],
        [bmin[0], bmax[1], bmin[2], 1.0],
        [bmin[0], bmax[1], bmax[2], 1.0],
        [bmax[0], bmin[1], bmin[2], 1.0],
        [bmax[0], bmin[1], bmax[2], 1.0],
        [bmax[0], bmax[1], bmin[2], 1.0],
        [bmax[0], bmax[1], bmax[2], 1.0],
    ], dtype=np.float64)

    world = (transform @ corners.T).T[:, :3]
    wmin = world.min(axis=0)
    wmax = world.max(axis=0)

    center = (wmin + wmax) / 2.0
    size = (wmax - wmin)

    return {
        "aabb_min": wmin.tolist(),
        "aabb_max": wmax.tolist(),
        "center": center.tolist(),
        "size": size.tolist(),
    }

In [50]:
def build_occupancy_grid(objects, room_bounds, floor_z, cell_size=0.25, floor_eps=0.12):
    room_min, room_max = room_bounds
    xmin, ymin, _ = room_min
    xmax, ymax, _ = room_max

    width = xmax - xmin
    height = ymax - ymin

    nx = int(np.ceil(width / cell_size))
    ny = int(np.ceil(height / cell_size))

    if nx <= 0 or ny <= 0:
        return {
            "cell_size": float(cell_size),
            "origin_xy": [float(xmin), float(ymin)],
            "shape": [0, 0],
            "data_rle": []
        }

    grid = np.zeros((ny, nx), dtype=np.uint8)

    def xy_to_ij(x, y):
        j = int((x - xmin) / cell_size)
        i = int((y - ymin) / cell_size)
        return i, j

    for obj in objects:
        aabb_min = np.array(obj["aabb_min"], dtype=float)
        aabb_max = np.array(obj["aabb_max"], dtype=float)

        # only obstacles near the floor matter for occupancy
        if aabb_min[2] > floor_z + floor_eps:
            continue

        x0, y0 = aabb_min[0], aabb_min[1]
        x1, y1 = aabb_max[0], aabb_max[1]

        i0, j0 = xy_to_ij(x0, y0)
        i1, j1 = xy_to_ij(x1, y1)

        # clamp
        i0 = max(0, min(ny - 1, i0))
        i1 = max(0, min(ny - 1, i1))
        j0 = max(0, min(nx - 1, j0))
        j1 = max(0, min(nx - 1, j1))

        r0, r1 = sorted([i0, i1])
        c0, c1 = sorted([j0, j1])

        grid[r0:r1 + 1, c0:c1 + 1] = 1

    return {
        "cell_size": float(cell_size),
        "origin_xy": [float(xmin), float(ymin)],
        "shape": [int(ny), int(nx)],
        "data_rle": rle_encode(grid)
    }

In [51]:
def xy_to_ij(x, y):
    j = int((x - xmin) / cell_size)
    i = int((y - ymin) / cell_size)
    return i, j

    for obj in objects:
        aabb_min = np.array(obj["aabb_min"], dtype=np.float64)
        aabb_max = np.array(obj["aabb_max"], dtype=np.float64)

        if aabb_min[2] > floor_z + 0.15:
            continue
        if (aabb_max[2] - floor_z) > z_filter_height:
            pass

        x0, y0 = aabb_min[0], aabb_min[1]
        x1, y1 = aabb_max[0], aabb_max[1]

        i0, j0 = xy_to_ij(x0, y0)
        i1, j1 = xy_to_ij(x1, y1)

        i0 = max(0, min(ny - 1, i0))
        i1 = max(0, min(ny - 1, i1))
        j0 = max(0, min(nx - 1, j0))
        j1 = max(0, min(nx - 1, j1))

        grid[min(i0, i1):max(i0, i1) + 1, min(j0, j1):max(j0, j1) + 1] = 1

    return {
        "cell_size": float(cell_size),
        "origin_xy": [float(xmin), float(ymin)],
        "shape": [int(ny), int(nx)],
        "data_rle": rle_encode(grid)  
    }

In [52]:
def rle_encode(grid: np.ndarray):
    flat = grid.reshape(-1)
    if flat.size == 0:
        return []
    out = []
    prev = int(flat[0])
    count = 1
    for v in flat[1:]:
        v = int(v)
        if v == prev:
            count += 1
        else:
            out.append([prev, count])
            prev = v
            count = 1
    out.append([prev, count])
    return out

In [53]:
import numpy as np

def tag_objects_phase15(objects, room_min, room_max, floor_z,
                        floor_eps=0.12,
                        struct_area_ratio=0.45,
                        struct_span_ratio=0.85,
                        obstacle_min_area=0.04,
                        clutter_max_area=0.01,
                        max_obstacle_height=3.0):
    
    room_w = float(room_max[0] - room_min[0])
    room_d = float(room_max[1] - room_min[1])
    room_area = max(room_w * room_d, 1e-6)

    for obj in objects:
        size = np.array(obj["size"], dtype=float)  # [sx, sy, sz]
        aabb_min = np.array(obj["aabb_min"], dtype=float)
        aabb_max = np.array(obj["aabb_max"], dtype=float)

        sx, sy, sz = size
        footprint = float(max(sx, 0) * max(sy, 0))
        area_ratio = float(footprint / room_area)

        span_x = float(sx / max(room_w, 1e-6))
        span_y = float(sy / max(room_d, 1e-6))
        span_max = max(span_x, span_y)

        touches_floor = bool(aabb_min[2] <= (floor_z + floor_eps))
        floating = bool(aabb_min[2] > (floor_z + 0.30))  # above floor by 30cm+

        # Store debug features
        obj["features"] = {
            "footprint_m2": footprint,
            "room_area_ratio": area_ratio,
            "span_max": span_max,
            "touches_floor": touches_floor,
            "floating": floating,
            "height_m": float(sz),
        }

        # STRUCTURE: room shell / big planes / huge meshes
        if area_ratio >= struct_area_ratio or span_max >= struct_span_ratio or sz >= max_obstacle_height:
            obj["tag"] = "structure"
            continue

        # CLUTTER: tiny items or floating decor
        if footprint <= clutter_max_area or floating:
            obj["tag"] = "clutter"
            continue

        # OBSTACLE: meaningful footprint + near floor
        height = sz
        room_height = room_max[2] - floor_z
        is_vertical_structure = height > 0.6 * room_height   # wall reaches most of ceiling

        if is_vertical_structure:
            obj["tag"] = "structure"
        elif touches_floor and footprint >= obstacle_min_area:
            obj["tag"] = "obstacle"
        else:
            obj["tag"] = "clutter"

    return objects

In [54]:
def aabb2d_from_obj(obj):
    mn = np.array(obj["aabb_min"], dtype=float)
    mx = np.array(obj["aabb_max"], dtype=float)
    return mn[0], mn[1], mx[0], mx[1]


In [55]:
def inflate_aabb2d(aabb, margin):
    x0, y0, x1, y1 = aabb
    return (x0 - margin, y0 - margin, x1 + margin, y1 + margin)

In [56]:
def point_in_aabb2d(x, y, aabb):
    x0, y0, x1, y1 = aabb
    return (x0 <= x <= x1) and (y0 <= y <= y1)

In [57]:
def generate_candidates(room_min, room_max, floor_z, obstacles,
                        step=0.25, clearance=0.35, boundary_margin=0.10,
                        max_points=None):
    """
    Generate collision-free candidate points on the floor.

    step: grid spacing (meters)
    clearance: expands obstacle footprints to keep buffer space (meters)
    boundary_margin: keeps candidates away from room borders (meters)
    max_points: optionally cap number of candidates for speed
    """
    xmin, ymin, _ = room_min
    xmax, ymax, _ = room_max

    # Shrink usable room slightly so we don't place on boundary
    xmin_u = xmin + boundary_margin
    ymin_u = ymin + boundary_margin
    xmax_u = xmax - boundary_margin
    ymax_u = ymax - boundary_margin

    # Precompute inflated obstacle rectangles in XY
    inflated_rects = []
    for o in obstacles:
        rect = aabb2d_from_obj(o)
        inflated_rects.append(inflate_aabb2d(rect, clearance))

    xs = np.arange(xmin_u, xmax_u, step)
    ys = np.arange(ymin_u, ymax_u, step)

    valid = []
    blocked_count = 0

    z = float(floor_z)
    for y in ys:
        for x in xs:
            blocked = False
            for rect in inflated_rects:
                if point_in_aabb2d(x, y, rect):
                    blocked = True
                    break

            if not blocked:
                valid.append([float(x), float(y), z])
                if max_points is not None and len(valid) >= max_points:
                    break
            else:
                blocked_count += 1

        if max_points is not None and len(valid) >= max_points:
            break

    meta = {
        "step": float(step),
        "clearance": float(clearance),
        "boundary_margin": float(boundary_margin),
        "num_valid": int(len(valid)),
        "num_blocked": int(blocked_count),
        "num_obstacles": int(len(obstacles)),
    }
    return valid, meta

In [58]:
import matplotlib.pyplot as plt

def plot_topdown(room_min, room_max, obstacles, candidates, title="Top-down candidates"):
    xmin, ymin, _ = room_min
    xmax, ymax, _ = room_max

    plt.figure(figsize=(10, 3))
    plt.xlim(xmin, xmax)
    plt.ylim(ymin, ymax)
    plt.gca().set_aspect('equal', adjustable='box')

    # plot obstacles as rectangles
    for o in obstacles:
        x0, y0, x1, y1 = aabb2d_from_obj(o)
        w = x1 - x0
        h = y1 - y0
        rect = plt.Rectangle((x0, y0), w, h, fill=False)
        plt.gca().add_patch(rect)

    # plot candidates as points
    if len(candidates) > 0:
        pts = np.array(candidates)
        plt.scatter(pts[:, 0], pts[:, 1], s=10)

    plt.title(title)
    plt.xlabel("X")
    plt.ylabel("Y")
    plt.grid(True)
    plt.show()

In [59]:
def point_to_rect_distance(x,y,rect):
    x0,y0,x1,y1 = rect
    cx = min(max(x,x0),x1)
    cy = min(max(y,y0),y1)
    return float(np.hypot(x-cx,y-cy))

In [60]:
def score_candidates_by_clearance(candidates, obstacles, clearance=0.0):
    rects = [] #this is the obstacles rects
    for o in obstacles:
        r = aabb2d_from_obj(o)
        if clearance > 0:
            r = inflate_aabb2d(r, clearance)
        rects.append(r)

    scored = []
    for pt in candidates:
        x, y, _ = pt
        if rects:
            dmin = min(point_to_rect_distance(x, y, r) for r in rects)
        else:
            dmin = 1e9
        scored.append({
            "point": pt,
            "min_dist_to_obstacles": dmin,
            "score": dmin
        })

    scored.sort(key=lambda d: d["score"], reverse=True)
    return scored

In [61]:
def obstacle_footprint_area(o):
    sx, sy, _ = o["size"]
    return float(sx * sy)

In [62]:
def distance_to_room_wall(x, y, room_min, room_max):
    """
    Distance from point to nearest room boundary (axis-aligned bounds).
    """
    xmin, ymin, _ = room_min
    xmax, ymax, _ = room_max
    return float(min(x - xmin, xmax - x, y - ymin, ymax - y))

In [63]:
def min_distance_to_obstacles(x, y, obstacles):
    """
    Minimum Euclidean distance from point to any obstacle rectangle (0 if inside).
    """
    if not obstacles:
        return 1e9
    rects = [aabb2d_from_obj(o) for o in obstacles]
    return min(point_to_rect_distance(x, y, r) for r in rects)

In [64]:
def distance_to_target(x, y, target_xy):
    tx, ty = target_xy
    return float(np.hypot(x - tx, y - ty))

In [65]:
def score_candidates_weighted(candidates, obstacles, room_min, room_max, target_xy,
                              weights=None,
                              wall_pref="neutral",
                              eps=1e-9):
    """
    weights: dict with keys:
      - near_target (higher is better)
      - max_clearance (higher is better)
      - near_wall (higher is better if wall_pref='near', lower is better if wall_pref='far')

    wall_pref:
      - 'near'     -> prefer closer to wall
      - 'far'      -> prefer farther from wall (more central)
      - 'neutral'  -> don't use wall term much
    """

    if weights is None:
        weights = {"near_target": 0.6, "max_clearance": 0.3, "near_wall": 0.1}

    # compute raw metrics for each candidate
    rows = []
    for pt in candidates:
        x, y, z = pt
        d_clear = min_distance_to_obstacles(x, y, obstacles)     # bigger = better
        d_wall  = distance_to_room_wall(x, y, room_min, room_max)# depends on preference
        d_tgt   = distance_to_target(x, y, target_xy) if target_xy else 0.0

        rows.append({
            "point": pt,
            "d_clear": float(d_clear),
            "d_wall": float(d_wall),
            "d_target": float(d_tgt),
        })

    # normalize to [0,1] so weights behave nicely
    def norm(values, higher_is_better=True):
        v = np.array(values, dtype=float)
        vmin, vmax = float(v.min()), float(v.max())
        if abs(vmax - vmin) < eps:
            out = np.ones_like(v) * 0.5
        else:
            out = (v - vmin) / (vmax - vmin)
        if not higher_is_better:
            out = 1.0 - out
        return out

    clear_norm = norm([r["d_clear"] for r in rows], higher_is_better=True)
    # for wall term:
    # - wall_pref='near' => smaller d_wall better
    # - wall_pref='far'  => bigger d_wall better
    # - neutral => treat like far but weight small
    if wall_pref == "near":
        wall_norm = norm([r["d_wall"] for r in rows], higher_is_better=False)
    else:
        wall_norm = norm([r["d_wall"] for r in rows], higher_is_better=True)

    # near_target: smaller distance is better
    tgt_norm = norm([r["d_target"] for r in rows], higher_is_better=False) if target_xy else np.zeros(len(rows))

    # compute final score
    for i, r in enumerate(rows):
        r["components"] = {
            "near_target": float(tgt_norm[i]),
            "max_clearance": float(clear_norm[i]),
            "near_wall": float(wall_norm[i]),
        }
        r["weights"] = weights
        r["score"] = (
            weights.get("near_target", 0.0) * r["components"]["near_target"] +
            weights.get("max_clearance", 0.0) * r["components"]["max_clearance"] +
            weights.get("near_wall", 0.0) * r["components"]["near_wall"]
        )

    rows.sort(key=lambda d: d["score"], reverse=True)
    return rows

In [66]:
import math

def object_margin(width, depth, clearance):
    # safe radius approximation using half-diagonal
    half_diag = 0.5 * math.sqrt(width**2 + depth**2)
    return half_diag + clearance

In [67]:
def is_wall_like(o,
                 height_thresh=2.0,# tall vertically
                 footprint_ratio=4.0,# elongated footprint
                 min_long_side=1.5):
    """
    Detect walls using vertical height + footprint elongation.
    """
    sx, sy, sz = map(float, o["size"])

    long_side = max(sx, sy)
    short_side = min(sx, sy)

    tall = sz >= height_thresh
    elongated = (long_side / max(short_side, 1e-6)) >= footprint_ratio
    long_enough = long_side >= min_long_side

    return bool(tall and elongated and long_enough)

In [68]:
def build_named_targets(obstacles):
    """
    Create named targets from obstacle node names using keywords.
    Returns dict: { 'tv': (x,y), 'shelf': (x,y), 'window': (x,y), ... }
    """
    targets = {}
    for o in obstacles:
        name = o["node_name"].lower()
        cx, cy, _ = o["center"]

        if "shelf" in name and "shelf" not in targets:
            targets["shelf"] = (float(cx), float(cy))
        if "tv" in name and "tv" not in targets:
            targets["tv"] = (float(cx), float(cy))
        if "window" in name and "window" not in targets:
            targets["window"] = (float(cx), float(cy))

    return targets

In [69]:
def pick_target_from_prompt(user_text, targets):
    t = user_text.lower()
    for key in targets.keys():
        if key in t:
            return targets[key], key
    return None, None

In [70]:
def make_explanation(best, user_prompt, weights, wall_pref, target_name=None):
    comp = best["components"]
    msg = []
    msg.append(f"Prompt intent: '{user_prompt}'")
    msg.append(f"Decision weights: near_target={weights.get('near_target')}, max_clearance={weights.get('max_clearance')}, near_wall={weights.get('near_wall')} (wall_pref={wall_pref})")
    msg.append(f"Chosen point: {best['point']}")
    msg.append(f"Clearance score={comp['max_clearance']:.2f} (d_clear={best['d_clear']:.2f}m)")
    msg.append(f"Wall score={comp['near_wall']:.2f} (d_wall={best['d_wall']:.2f}m)")
    if target_name:
        msg.append(f"Target: {target_name} | near_target score={comp['near_target']:.2f} (d_target={best['d_target']:.2f}m)")
    else:
        msg.append("No target object detected in this scene; decision optimized mainly for free space and wall preference.")
    return "\n".join(msg)

In [72]:
def normalize_weights(w):
    s = sum(max(0.0, float(v)) for v in w.values())
    if s <= 0: 
        return PLAN_DEFAULT["weights"].copy()
    return {k: float(v)/s for k, v in w.items()}

def validate_plan(plan, available_targets):
    out = PLAN_DEFAULT.copy()
    out.update({k: plan.get(k, out[k]) for k in out.keys()})

    # object dims fallback
    if "object_dims" in plan:
        out["object_dims"] = {
            "width": float(plan["object_dims"].get("width", out["object_dims"]["width"])),
            "depth": float(plan["object_dims"].get("depth", out["object_dims"]["depth"]))
        }

    # target
    tname = plan.get("target", {}).get("name", None)
    out["target"] = {"name": tname if tname in available_targets else None}

    # weights normalize
    out["weights"] = normalize_weights(plan.get("weights", out["weights"]))

    # constraints
    c = plan.get("constraints", {})
    out["constraints"] = {
        "min_clearance": float(c.get("min_clearance", out["constraints"]["min_clearance"])),
        "boundary_margin": float(c.get("boundary_margin", out["constraints"]["boundary_margin"]))
    }

    wall_pref = plan.get("wall_pref", out["wall_pref"])
    out["wall_pref"] = wall_pref if wall_pref in ["near","far","neutral"] else "neutral"
    return out

In [73]:
def run_pipeline(glb_path: str, out_path: str, user_prompt: str):
    """
    Full pipeline:
    Scene -> Targets -> LLM Plan -> Candidate Generation -> Scoring -> Best placement
    """

    # -------------------- 1) Load + parse scene --------------------
    scene = trimesh.load(glb_path, force="scene")
    if not isinstance(scene, trimesh.Scene):
        raise ValueError("Expected a GLB/GLTF scene. Got a single mesh instead.")

    objects = []
    all_verts = []

    for geom_name, mesh in scene.geometry.items():
        nodes = scene.graph.geometry_nodes.get(geom_name, [])

        for node_name in nodes:
            transform, _ = scene.graph.get(node_name)

            # sample vertices (speed)
            v = mesh.vertices
            if v.shape[0] > 20000:
                idx = np.random.choice(v.shape[0], 20000, replace=False)
                v = v[idx]

            v_h = np.hstack([v, np.ones((v.shape[0], 1))])
            v_world = (transform @ v_h.T).T[:, :3]
            all_verts.append(v_world)

            obj = {"node_name": node_name, "geometry_name": geom_name}
            obj.update(aabb_from_mesh_world(mesh, transform))
            objects.append(obj)

    if not all_verts:
        raise RuntimeError("No geometry extracted from scene.")

    all_verts = np.vstack(all_verts)
    room_min = all_verts.min(axis=0)
    room_max = all_verts.max(axis=0)
    floor_z = robust_floor_z(all_verts, percentile=1.0)

    # -------------------- 2) Tagging --------------------
    objects = tag_objects_phase15(objects, room_min, room_max, floor_z)
    obstacles = [o for o in objects if o.get("tag") == "obstacle"]

    occ = build_occupancy_grid(obstacles, (room_min, room_max), floor_z, cell_size=0.25)
    targets = build_named_targets(obstacles)

    # -------------------- Scene summary --------------------
    scene_graph = {
        "schema_version": "1.0",
        "source_file": os.path.basename(glb_path),
        "room_bounds": {"min": room_min.tolist(), "max": room_max.tolist()},
        "floor_z": float(floor_z),
        "available_targets": list(targets.keys()),
        "num_obstacles": len(obstacles),
    }

    # -------------------- 3) LLM Plan --------------------
    context = {
        "room_size_m": {
            "width": float(room_max[0] - room_min[0]),
            "depth": float(room_max[1] - room_min[1]),
            "height": float(room_max[2] - floor_z),
        },
        "num_obstacles": len(obstacles),
        "available_targets": list(targets.keys()),
        "supported_objects": ["chair", "table", "sofa", "plant"],
    }

    try:
        raw_plan = llm_make_plan_local(user_prompt, context)
        plan = validate_plan(raw_plan, available_targets=context["available_targets"])
    except Exception as e:
        print("⚠️ LLM failed, using default plan:", e)
        raw_plan = {}
        plan = PLAN_DEFAULT

    # -------------------- 4) Candidate Generation --------------------
    w = plan["object_dims"]["width"]
    d = plan["object_dims"]["depth"]
    clearance = plan["constraints"]["min_clearance"]
    boundary_margin = plan["constraints"]["boundary_margin"]

    total_margin = object_margin(w, d, clearance=clearance)

    candidates, cand_meta = generate_candidates(
        room_min=room_min,
        room_max=room_max,
        floor_z=floor_z,
        obstacles=obstacles,
        step=0.25,
        clearance=total_margin,
        boundary_margin=boundary_margin
    )

    # -------------------- 5) Scoring --------------------
    target_name = plan["target"]["name"]
    target_xy = targets.get(target_name) if target_name else None

    scored = score_candidates_weighted(
        candidates=candidates,
        obstacles=obstacles,
        room_min=room_min,
        room_max=room_max,
        target_xy=target_xy,
        weights=plan["weights"],
        wall_pref=plan["wall_pref"]
    )

    best = scored[0]

    # -------------------- 6) Save outputs --------------------
    os.makedirs("outputs", exist_ok=True)

    with open("outputs/scene_graph.json", "w") as f:
        json.dump(scene_graph, f, indent=2)

    with open("outputs/plan.json", "w") as f:
        json.dump(plan, f, indent=2)

    with open("outputs/best_placement.json", "w") as f:
        json.dump(best, f, indent=2)

    with open("outputs/top10_placements.json", "w") as f:
        json.dump(scored[:10], f, indent=2)

    # -------------------- Debug prints --------------------
    print("\n===== LLM PLANNING RESULT =====")
    print("Prompt:", user_prompt)
    print("Target:", target_name)
    print("Weights:", plan["weights"])
    print("Best point:", best["point"])
    print("Score breakdown:", best["components"])

    return best, plan, scene_graph, candidates

In [74]:
best, plan, scene_graph, candidates = run_pipeline(
    "living_room.glb",
    "outputs/scene_graph.json",
    "Place a chair near the wall but keep walking space"
)

Llama.generate: 254 prefix-match hit, remaining 1 prompt tokens to eval
llama_perf_context_print:        load time =   10694.17 ms
llama_perf_context_print: prompt eval time =       0.00 ms /     1 tokens (    0.00 ms per token,      inf tokens per second)
llama_perf_context_print:        eval time =   25296.70 ms /   143 runs   (  176.90 ms per token,     5.65 tokens per second)
llama_perf_context_print:       total time =   25438.36 ms /   144 tokens
llama_perf_context_print:    graphs reused =        137


⚠️ LLM failed, using default plan: 'NoneType' object has no attribute 'get'

===== LLM PLANNING RESULT =====
Prompt: Place a chair near the wall but keep walking space
Target: None
Weights: {'near_target': 0.2, 'max_clearance': 0.7, 'near_wall': 0.1}
Best point: [7.027899910272897, -0.5975192089530289, -1.734517626987245]
Score breakdown: {'near_target': 0.0, 'max_clearance': 1.0, 'near_wall': 0.0}
