source fastmap-env/bin/activate

In [11]:
import heapq
import math
import numpy as np
from collections import defaultdict

class FastMap:
    def __init__(self, graph, K=2, epsilon=1e-3):
        self.graph = graph
        self.K = K
        self.epsilon = epsilon
        self.nodes = list(graph.keys())
        self.embeddings = {v: [0.0] * K for v in self.nodes}
        self.edge_weights = {
            (u, v): w for u in graph for v, w in graph[u]
        }

    def dijkstra(self, start):
        dist = {v: float('inf') for v in self.nodes}
        dist[start] = 0
        pq = [(0, start)]

        while pq:
            d, u = heapq.heappop(pq)
            if d > dist[u]:
                continue
            for v, w in self.graph[u]:
                if dist[v] > dist[u] + w:
                    dist[v] = dist[u] + w
                    heapq.heappush(pq, (dist[v], v))
        return dist

    def get_farthest_pair(self):
        na = self.nodes[0]
        for _ in range(10):
            dist = self.dijkstra(na)
            nb = max(dist, key=lambda x: dist[x])
            dist = self.dijkstra(nb)
            na = max(dist, key=lambda x: dist[x])
        return na, nb

    def compute_embeddings(self):
        for k in range(self.K):
            na, nb = self.get_farthest_pair()
            dist_a = self.dijkstra(na)
            dist_b = self.dijkstra(nb)
            dab = dist_a[nb]
            if dab < self.epsilon:
                break

            for v in self.nodes:
                da = dist_a[v]
                db = dist_b[v]
                coord = (da + dab - db) / 2
                self.embeddings[v][k] = coord

            # Update weights (L1 projection)
            new_graph = defaultdict(list)
            for u in self.graph:
                for v, w in self.graph[u]:
                    if u < v:
                        diff = abs(self.embeddings[u][k] - self.embeddings[v][k])
                        new_w = max(0.0, self.edge_weights[(u, v)] - diff)
                        new_graph[u].append((v, new_w))
                        new_graph[v].append((u, new_w))
                        self.edge_weights[(u, v)] = new_w
                        self.edge_weights[(v, u)] = new_w
            self.graph = new_graph

        return self.embeddings

In [12]:
def grid_aware_heuristic(u, v, embeddings, width, grid):
    """
    Combines FastMap embedding distance with grid-geometric awareness
    """
    # Standard L1 distance from FastMap embeddings
    fastmap_dist = sum(abs(x - y) for x, y in zip(embeddings[u], embeddings[v]))
    
    # Grid coordinates
    ur, uc = divmod(u, width)
    vr, vc = divmod(v, width)
    
    # Octile distance (accounts for diagonal movement costs)
    dx = abs(uc - vc)
    dy = abs(ur - vr)
    octile_dist = max(dx, dy) + (math.sqrt(2) - 1) * min(dx, dy)
    
    # Use the maximum of both heuristics to ensure admissibility
    # while leveraging FastMap's ability to capture graph structure
    return max(fastmap_dist, octile_dist)

def passable(ch):
    return ch in ".GSW" or ('A' <= ch <= 'Z')

def improved_astar_with_fastmap(graph, start, goal, embeddings, width, grid):
    """
    A* with improved grid-aware heuristic
    """
    open_set = []
    heuristic_val = grid_aware_heuristic(start, goal, embeddings, width, grid)
    heapq.heappush(open_set, (heuristic_val, 0, start))
    
    came_from = {}
    g_score = {v: float('inf') for v in graph}
    g_score[start] = 0

    while open_set:
        _, current_cost, current = heapq.heappop(open_set)

        if current == goal:
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            return path[::-1], g_score[goal]

        # Skip if we've found a better path to this node
        if current_cost > g_score[current]:
            continue

        for neighbor, weight in graph[current]:
            tentative_g = g_score[current] + weight
            if tentative_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                h_score = grid_aware_heuristic(neighbor, goal, embeddings, width, grid)
                f_score = tentative_g + h_score
                heapq.heappush(open_set, (f_score, tentative_g, neighbor))

    return None, float('inf')

In [13]:
# Alternative: Pure octile heuristic (more conservative)
def octile_heuristic(u, v, width):
    """
    Standard octile distance heuristic for 8-connected grids
    """
    ur, uc = divmod(u, width)
    vr, vc = divmod(v, width)
    dx = abs(uc - vc)
    dy = abs(ur - vr)
    return max(dx, dy) + (math.sqrt(2) - 1) * min(dx, dy)

def astar_with_octile(graph, start, goal, width):
    """
    A* with pure octile heuristic for comparison
    """
    open_set = []
    heuristic_val = octile_heuristic(start, goal, width)
    heapq.heappush(open_set, (heuristic_val, 0, start))
    
    came_from = {}
    g_score = {v: float('inf') for v in graph}
    g_score[start] = 0

    while open_set:
        _, current_cost, current = heapq.heappop(open_set)

        if current == goal:
            return g_score[goal]

        if current_cost > g_score[current]:
            continue

        for neighbor, weight in graph[current]:
            tentative_g = g_score[current] + weight
            if tentative_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                h_score = octile_heuristic(neighbor, goal, width)
                f_score = tentative_g + h_score
                heapq.heappush(open_set, (f_score, tentative_g, neighbor))

    return float('inf')

In [16]:
# --- FastMap benchmark smoke-test -------------------------------------
# This cell adds tiny loaders, a graph builder, an A* runner, and
# verifies a few arena.map scenarios against their optimal lengths.

import math, heapq, numpy as np

# ── 1. 8-connected neighbour offsets ──────────────────────────────────
DIRS_8 = [(-1,0),(1,0),(0,-1),(0,1), (-1,-1),(-1,1),(1,-1),(1,1)]

# ── 2. Map & scenario loaders ─────────────────────────────────────────
def load_map(path):
    with open(path) as f:
        header = [next(f).strip() for _ in range(4)]
        h = int(header[1].split()[1])
        w = int(header[2].split()[1])
        grid = [list(next(f).rstrip()) for _ in range(h)]
    return grid, h, w

def cell_cost(ch):
    if ch in ".G":      return 1
    if ch == "S":       return 2
    if 'A' <= ch <= 'Z':return ord(ch) - ord('A') + 1
    return math.inf

def build_graph(grid):
    h, w = len(grid), len(grid[0])
    vid = lambda r,c: r*w + c
    G = {}
    for r,row in enumerate(grid):
        for c,ch in enumerate(row):
            if not passable(ch): continue
            u = vid(r,c)
            G[u] = []
            for dr,dc in DIRS_8:
                nr, nc = r+dr, c+dc
                if not (0<=nr<h and 0<=nc<w):      # out of bounds
                    continue
                if not passable(grid[nr][nc]):     # dest blocked
                    continue
                # --- NEW: block diagonal if either side neighbour is blocked
                if dr and dc:
                    if (not passable(grid[r][nc])   # horizontal side
                        or not passable(grid[nr][c])):  # vertical side
                        continue
                step  = math.sqrt(2) if dr and dc else 1.0
                cost  = step * (cell_cost(ch)+cell_cost(grid[nr][nc]))/2
                G[u].append((vid(nr,nc), cost))
    return G, w


def load_scen(path, limit=None):
    with open(path) as f:
        next(f)  # skip 'version 1'
        for i, line in enumerate(f):
            if limit and i >= limit: break
            _, _, w, _, sx, sy, gx, gy, opt = line.split()
            w = int(w)
            yield int(sy)*w + int(sx), int(gy)*w + int(gx), float(opt)

# ── 3. A* search with FastMap heuristic ───────────────────────────────
def l1(embed, u, v):
    return np.abs(embed[u] - embed[v]).sum()

def astar(G, start, goal, h):
    open_ = [(h(start), 0, start)]
    g_best = {start: 0}
    while open_:
        f, g, u = heapq.heappop(open_)
        if g > g_best[u]:               # stale entry
            continue
        if u == goal:
            return g
        for v, w in G[u]:
            ng = g + w
            if ng < g_best.get(v, math.inf):
                g_best[v] = ng
                heapq.heappush(open_, (ng + h(v), ng, v))


# ── 4. Smoke-test on arena.map ────────────────────
MAP_FILE  = "./data/DAO/arena.map"
SCEN_FILE = "./data/DAO/arena.map.scen"

grid, _, _  = load_map(MAP_FILE)
G, width    = build_graph(grid)

fm   = FastMap(G, K=10)          # your existing implementation
emb = fm.compute_embeddings()

import math

# Your existing setup code here...
grid, _, _ = load_map(MAP_FILE)
G, width = build_graph(grid)
fm = FastMap(G, K=10)
emb = fm.compute_embeddings()

# Convert embeddings format if needed
if isinstance(emb, dict):
    max_id = max(emb.keys())
    tmp = np.zeros((max_id + 1, len(next(iter(emb.values())))))
    for k, vec in emb.items():
        tmp[k] = vec
    emb = tmp

# Define both heuristics for comparison
def old_heuristic(a, b, embeddings):
    """Original L1 FastMap heuristic"""
    return sum(abs(x - y) for x, y in zip(embeddings[a], embeddings[b]))

def new_heuristic(a, b, embeddings, width):
    """Grid-aware FastMap + Octile heuristic"""
    # FastMap L1 distance
    fastmap_dist = sum(abs(x - y) for x, y in zip(embeddings[a], embeddings[b]))
    
    # Grid octile distance
    ar, ac = divmod(a, width)
    br, bc = divmod(b, width)
    dx, dy = abs(ac - bc), abs(ar - br)
    octile_dist = max(dx, dy) + (math.sqrt(2) - 1) * min(dx, dy)
    
    return max(fastmap_dist, octile_dist)

def astar_general(G, start, goal, heuristic_func):
    """General A* that accepts any heuristic function"""
    open_set = [(heuristic_func(start), 0, start)]
    g_best = {start: 0}
    
    while open_set:
        f, g, u = heapq.heappop(open_set)
        if g > g_best[u]:
            continue
        if u == goal:
            return g
        for v, w in G[u]:
            ng = g + w
            if ng < g_best.get(v, math.inf):
                g_best[v] = ng
                heapq.heappush(open_set, (ng + heuristic_func(v), ng, v))
    return math.inf

In [17]:
# Test harness showing both approaches
def test_both_heuristics(MAP_FILE, SCEN_FILE, limit=10):
    """
    Compare FastMap vs Octile heuristics
    """
    import math
    
    # Load map and build graph (assuming your existing functions)
    grid, _, _ = load_map(MAP_FILE)
    G, width = build_graph(grid)
    
    # Compute FastMap embeddings
    fm = FastMap(G, K=10)
    emb = fm.compute_embeddings()
    
    # Convert embeddings to proper format if needed
    if isinstance(emb, dict):
        max_id = max(emb.keys())
        tmp = np.zeros((max_id + 1, len(next(iter(emb.values())))))
        for k, vec in emb.items():
            tmp[k] = vec
        emb = tmp
    
    print("Comparing heuristics:")
    print("Case | FastMap Error | Octile Error | Optimal")
    print("-" * 50)
    
    for i, (s, g, opt) in enumerate(load_scen(SCEN_FILE, limit=limit)):
        # Test with grid-aware FastMap heuristic
        try:
            fastmap_dist = improved_astar_with_fastmap(G, s, g, emb, width, grid)
            if isinstance(fastmap_dist, tuple):
                fastmap_dist = fastmap_dist[1]
        except:
            fastmap_dist = float('inf')
        
        # Test with pure octile heuristic
        octile_dist = astar_with_octile(G, s, g, width)
        
        fm_error = abs(fastmap_dist - opt) if fastmap_dist != float('inf') else float('inf')
        oct_error = abs(octile_dist - opt) if octile_dist != float('inf') else float('inf')
        
        print(f"{i:4d} | {fm_error:11.6f} | {oct_error:10.6f} | {opt:7.3f}")
        
        if i >= limit - 1:
            break

In [18]:

# Test and compare results
print("Case |   Old Error   |   New Error   |  Improvement  | Optimal")
print("-" * 65)

problematic_cases = []
fixed_cases = []

for i, (s, g, opt) in enumerate(load_scen(SCEN_FILE, limit=100)):
    # Test with old heuristic
    old_hfn = lambda u, goal=g: old_heuristic(u, goal, emb)
    old_dist = astar_general(G, s, g, old_hfn)
    old_error = abs(old_dist - opt)
    
    # Test with new heuristic
    new_hfn = lambda u, goal=g: new_heuristic(u, goal, emb, width)
    new_dist = astar_general(G, s, g, new_hfn)
    new_error = abs(new_dist - opt)
    
    improvement = old_error - new_error
    
    # Track problematic cases
    if old_error > 0.5:  # Cases with significant error
        problematic_cases.append((i, old_error, new_error, improvement))
    
    if improvement > 0.1:  # Cases that were significantly improved
        fixed_cases.append((i, old_error, new_error, improvement))
    
    # Print results
    status = ""
    if old_error > 0.5 and new_error < 0.1:
        status = " ← FIXED!"
    elif improvement > 0.1:
        status = " ← Better"
    
    print(f"{i:4d} | {old_error:11.6f} | {new_error:11.6f} | {improvement:+10.6f} | {opt:7.3f}{status}")

print(f"\n--- SUMMARY ---")
print(f"Problematic cases (old error > 0.5): {len(problematic_cases)}")
print(f"Significantly improved cases: {len(fixed_cases)}")
print(f"Average improvement: {np.mean([imp for _, _, _, imp in fixed_cases]) if fixed_cases else 0:.6f}")

if problematic_cases:
    print(f"\nMost problematic cases that should be fixed:")
    for case, old_err, new_err, imp in sorted(problematic_cases, key=lambda x: x[1], reverse=True)[:5]:
        print(f"  Case {case}: {old_err:.6f} → {new_err:.6f} (improvement: {imp:+.6f})")


Case |   Old Error   |   New Error   |  Improvement  | Optimal
-----------------------------------------------------------------
   0 |    0.000000 |    0.000000 |  +0.000000 |   3.000
   1 |    0.000000 |    0.000000 |  +0.000000 |   2.414
   2 |    0.000000 |    0.000000 |  +0.000000 |   2.000
   3 |    0.000000 |    0.000000 |  +0.000000 |   1.414
   4 |    0.000000 |    0.000000 |  +0.000000 |   3.828
   5 |    0.000000 |    0.000000 |  +0.000000 |   2.000
   6 |    0.000000 |    0.000000 |  +0.000000 |   2.414
   7 |    0.000000 |    0.000000 |  +0.000000 |   3.828
   8 |    0.000000 |    0.000000 |  +0.000000 |   3.828
   9 |    0.000000 |    0.000000 |  +0.000000 |   3.414
  10 |    0.000000 |    0.000000 |  +0.000000 |   5.828
  11 |    0.000000 |    0.000000 |  +0.000000 |   4.828
  12 |    0.000000 |    0.000000 |  +0.000000 |   7.657
  13 |    0.000000 |    0.000000 |  +0.000000 |   5.657
  14 |    0.000000 |    0.000000 |  +0.000000 |   4.414
  15 |    0.000000 |    0.00000