In [13]:
#Enemies eneabled
class AStar3DPlanner:
    def __init__(self, floors, hatch_map, heuristic="manhattan", enemy_trajectories=None,
                 max_battery=2, cf="cf0", beta=0.0, gamma=0.0, epsilon=0.0, iota=0.0, tau=1e-6,
                 wait_cost=0.2, move_cost=1.0, hatch_cost=0.5, max_expansions=100000):
        self.floors = floors
        self.hatch_map = hatch_map
        self.heuristic_type = heuristic
        self.enemy_trajectories = enemy_trajectories or {}
        self.max_battery = max_battery

        # stats / guards
        self.expansions = 0
        self.max_expansions = int(max_expansions)
        self.max_time = None  # optionally set by caller

        # cost-function params
        self.cf = cf
        self.beta = float(beta)
        self.gamma = float(gamma)
        self.epsilon = float(epsilon)
        self.iota = float(iota)
        self.tau = float(tau)

        # movement costs
        self.wait_cost = float(wait_cost)   # small but non-zero
        self.move_cost = float(move_cost)
        self.hatch_cost = float(hatch_cost)

        # search state
        self.visited = set()         # (f,y,x,b,tmod)
        # dominance: (f,y,x,tmod) -> (best_battery, best_g_cost, best_priority)
        self.best_at = {}
        self.came_from = {}
        self.cost_so_far = {}
        self.actions = {}
        self.path = []
        self.goal = None

        # enemy timing (use MAX path length as global cycle)
        paths = [len(p[1]) for p in self.enemy_trajectories.values() if len(p[1]) > 0]
        self.enemy_period = max(paths) if paths else 1

        # precompute enemy occupancy for O(1) checks (cycled to global period)
        self.occ = self._precompute_enemy_occupancy()

        # shaping knobs
        self.recharge_threshold = 0.25
        self.recharge_bias_scale = 0.3  # was 2.0; much gentler lure to chargers

    # Helper methods for keeping track of and avoiding enemies
    def _precompute_enemy_occupancy(self):
        """Map (floor, tmod) -> set of (y,x) occupied by enemies across the global period."""
        occ = {}
        if self.enemy_period <= 0:
            self.enemy_period = 1
        for _, (efloor, path) in self.enemy_trajectories.items():
            if not path:
                continue
            L = len(path)
            for t in range(self.enemy_period):  # cycle each enemy through the global period
                y, x = path[t % L]
                occ.setdefault((efloor, t), set()).add((y, x))
        return occ

    def enemy_in_cell(self, floor, y, x, t):
        tmod = t % self.enemy_period
        return (y, x) in self.occ.get((floor, tmod), set())

    def edge_conflict(self, f0, y0, x0, f1, y1, x1, t):
        """
        True if moving (f0,y0,x0)->(f1,y1,x1) from t->t+1 collides with an enemy
        either by arriving at an occupied target at t+1 or by swapping edges.
        """
        if self.enemy_in_cell(f1, y1, x1, t + 1):  # target occupied at arrival
            return True
        # swap: enemy at target at t and at current at t+1
        if self.enemy_in_cell(f1, y1, x1, t) and self.enemy_in_cell(f0, y0, x0, t + 1):
            return True
        return False

    # --- Heuristics ---
    def heuristic(self, a, b):
        dz, dy, dx = abs(a[0] - b[0]), abs(a[1] - b[1]), abs(a[2] - b[2])
        if self.heuristic_type == "euclidean":
            return math.sqrt(dx**2 + dy**2 + dz**2)
        elif self.heuristic_type == "manhattan":
            return dx + dy + dz
        elif self.heuristic_type == "hybrid":
            return 0.5 * (dx + dy + dz) + 0.5 * math.sqrt(dx**2 + dy**2 + dz**2)
        else:
            return dx + dy + dz

    # --- Grid helpers ---
    def in_bounds(self, floor, y, x):
        return (0 <= floor < len(self.floors)
                and 0 <= y < self.floors[floor].shape[0]
                and 0 <= x < self.floors[floor].shape[1])

    def passable(self, floor, y, x):
        return self.floors[floor][y][x] != "#"

    def is_recharge_station(self, floor, y, x):
        return self.floors[floor][y][x] == "+"

    def get_move_neighbors(self, floor, y, x):
        # 4-neighborhood + optional hatch
        neighbors = []
        for dy, dx in [(-1,0),(1,0),(0,-1),(0,1)]:
            ny, nx = y + dy, x + dx
            if self.in_bounds(floor, ny, nx) and self.passable(floor, ny, nx):
                neighbors.append((floor, ny, nx))
        if (floor, y, x) in self.hatch_map:
            nf, ny, nx = self.hatch_map[(floor, y, x)]
            if self.in_bounds(nf, ny, nx) and self.passable(nf, ny, nx):
                neighbors.append((nf, ny, nx))
        return neighbors

    def should_wait(self, f, y, x, t):
        """Wait if leaving is unsafe and all exits are unsafe next tick."""
        if self.enemy_in_cell(f, y, x, t + 1):
            return True
        for dy, dx in [(-1,0),(1,0),(0,-1),(0,1)]:
            ny, nx = y + dy, x + dx
            if self.in_bounds(f, ny, nx) and self.passable(f, ny, nx):
                if not self.enemy_in_cell(f, ny, nx, t + 1):
                    return False
        return True

    # --- Augmentation terms ---
    def floor_distance(self, n, goal):
        return abs(n[0] - goal[0])

    def enemy_penalty(self, n, t):
        return 1.0 if self.enemy_in_cell(n[0], n[1], n[2], t) else 0.0

    def d_recharge(self, n):
        f, y, x = n
        if not (0 <= f < len(self.floors)):
            return 1e9
        floor = self.floors[f]
        H, W = floor.shape
        best = 1e9
        for yy in range(H):
            for xx in range(W):
                if floor[yy][xx] == "+":
                    d = abs(yy - y) + abs(xx - x)
                    if d < best:
                        best = d
        return best if best < 1e9 else 1e6

    # --- Planning ---
    def plan(self, start, goal):
        self.goal = goal
        self.expansions = 0
        self.visited.clear()
        self.best_at.clear()
        self.came_from.clear()
        self.cost_so_far.clear()
        self.actions.clear()
        self.path = []

        frontier = []
        # priority = (base_f, guidance, g), state
        heapq.heappush(frontier, ((0.0, 0.0, 0.0), (start, 0, self.max_battery)))
        self.came_from[(start, 0, self.max_battery)] = None
        self.cost_so_far[(start, 0, self.max_battery)] = 0.0

        while frontier and self.expansions < self.max_expansions:
            (_, _, _), (pos, t, b) = heapq.heappop(frontier)
            f, y, x = pos
            self.expansions += 1

            if self.max_time is not None and t > self.max_time:
                continue

            if pos == self.goal and not self.enemy_in_cell(f, y, x, t):
                self.reconstruct_path((pos, t, b))
                return True

            tmod = t % self.enemy_period
            visited_key = (f, y, x, b, tmod)
            if visited_key in self.visited:
                continue
            self.visited.add(visited_key)

            enqueued_move = False
            blocked_by_enemy = True

            for nf, ny, nx in self.get_move_neighbors(f, y, x):
                nt = t + 1
                nb = b - 1
                if nb < 0 and not self.is_recharge_station(f, y, x):
                    continue
                if self.is_recharge_station(nf, ny, nx):
                    nb = self.max_battery
                if self.edge_conflict(f, y, x, nf, ny, nx, t):
                    continue

                blocked_by_enemy = False

                is_hatch_move = (f != nf)
                parent_cost = self.cost_so_far[((f, y, x), t, b)]
                step_cost = self.hatch_cost if is_hatch_move else self.move_cost
                new_cost = parent_cost + step_cost  # g'

                # base A* values
                h = self.heuristic((nf, ny, nx), self.goal)
                base_f = new_cost + h

                # guidance ONLY as tiebreaker (no enemy penalty; collisions already blocked)
                n_next = (nf, ny, nx)
                battery_used_proxy = float(self.max_battery - nb)
                add_batt  = self.beta  * battery_used_proxy if self.cf in ("cf1","cf2","cf3","cf4") else 0.0
                add_floor = self.gamma * self.floor_distance(n_next, self.goal) if self.cf in ("cf2","cf3","cf4") else 0.0
                d_re = self.d_recharge(n_next)
                add_rechg = (- self.iota / (d_re + self.tau)) if (self.cf in ("cf4",) and nb <= self.max_battery * self.recharge_threshold) else 0.0
                recharge_bias = (-self.recharge_bias_scale
                                 if (self.is_recharge_station(nf, ny, nx) and not self.enemy_in_cell(nf, ny, nx, nt))
                                 else 0.0)
                guidance = add_batt + add_floor + add_rechg + recharge_bias

                # Early accept with proper path commit
                if (nf, ny, nx) == self.goal and not self.enemy_in_cell(nf, ny, nx, nt):
                    self.cost_so_far[((nf, ny, nx), nt, nb)] = new_cost
                    self.came_from[((nf, ny, nx), nt, nb)] = ((f, y, x), t, b)
                    self.actions[((nf, ny, nx), nt, nb)] = "HATCH" if is_hatch_move else "MOVE"
                    self.reconstruct_path(((nf, ny, nx), nt, nb))
                    return True

                # priority-aware dominance BUT using base_f (not guidance)
                state_key = (nf, ny, nx, nt % self.enemy_period)
                prev = self.best_at.get(state_key)  # (best_batt, best_g, best_base_f)
                candidate = (nb, new_cost, base_f)

                if prev is not None:
                    prev_b, prev_g, prev_f = prev
                    if prev_b >= nb and prev_g <= new_cost and prev_f <= base_f:
                        continue  # dominated on battery, g, and base_f

                if (prev is None or nb > prev[0] or new_cost < prev[1] or base_f < prev[2]):
                    self.best_at[state_key] = (nb, new_cost, base_f)
                    state = ((nf, ny, nx), nt, nb)
                    if state not in self.cost_so_far or new_cost < self.cost_so_far[state]:
                        self.cost_so_far[state] = new_cost
                        # final priority: (base_f, guidance, g) keeps A* order, nudged by guidance
                        heapq.heappush(frontier, ((base_f, guidance, new_cost), state))
                        self.came_from[state] = ((f, y, x), t, b)
                        self.actions[state] = "HATCH" if is_hatch_move else "MOVE"
                        enqueued_move = True

            # WAIT outside neighbor loop
            if not self.enemy_in_cell(f, y, x, t + 1) and (blocked_by_enemy or not enqueued_move):
                nt = t + 1
                nb = self.max_battery if self.is_recharge_station(f, y, x) else b
                parent_cost = self.cost_so_far[((f, y, x), t, b)]
                new_cost = parent_cost + self.wait_cost  # g'

                h = self.heuristic((f, y, x), self.goal)
                base_f = new_cost + h

                battery_used_proxy = float(self.max_battery - nb)
                add_batt  = self.beta  * battery_used_proxy if self.cf in ("cf1","cf2","cf3","cf4") else 0.0
                add_floor = self.gamma * self.floor_distance((f, y, x), self.goal) if self.cf in ("cf2","cf3","cf4") else 0.0
                d_re = self.d_recharge((f, y, x))
                add_rechg = (- self.iota / (d_re + self.tau)) if (self.cf in ("cf4",) and nb <= self.max_battery * self.recharge_threshold) else 0.0
                recharge_bias = (-self.recharge_bias_scale
                                 if (self.is_recharge_station(f, y, x) and not self.enemy_in_cell(f, y, x, nt))
                                 else 0.0)
                guidance = add_batt + add_floor + add_rechg + recharge_bias

                state_key = (f, y, x, nt % self.enemy_period)
                prev = self.best_at.get(state_key)  # (best_batt, best_g, best_base_f)
                if prev is None or nb > prev[0] or new_cost < prev[1] or base_f < prev[2]:
                    self.best_at[state_key] = (nb, new_cost, base_f)
                    state = ((f, y, x), nt, nb)
                    if state not in self.cost_so_far or new_cost < self.cost_so_far[state]:
                        self.cost_so_far[state] = new_cost
                        heapq.heappush(frontier, ((base_f, guidance, new_cost), state))
                        self.came_from[state] = ((f, y, x), t, b)
                        self.actions[state] = "WAIT"

        return False

    def reconstruct_path(self, goal_state):
        current_state = goal_state
        path = []
        while current_state in self.came_from and self.came_from[current_state] is not None:
            pos, t, b = current_state
            action = self.actions.get(current_state, "MOVE")
            path.append((pos, t, b, action))
            current_state = self.came_from[current_state]
        pos, t, b = current_state
        path.append((pos, t, b, "START"))
        path.reverse()
        # trim trailing WAIT at goal if present
        if len(path) >= 2:
            last = path[-1]
            second_last = path[-2]
            if (last[0] == self.goal and second_last[0] == self.goal and last[3] == "WAIT"):
                path.pop()
        self.path = path

    def plan_steps(self):
        return len(self.path) - 1 if self.path else None

In [15]:
df_enemies = run_analysis1(
    "Map2",
    cf_list=("cf0","cf1","cf2","cf3","cf4"),
    heuristic="euclidean",
    weights=dict(beta=0.2, gamma=1.0, epsilon=0.5, iota=1.0, tau=1e-6),
    wait_cost=0.3, move_cost=1.0, hatch_cost=0.5, max_expansions=300000,
    time_horizon_factor=None  # temporarily disable horizon while validating enemies
)
df_enemies

Unnamed: 0,map,heuristic,cf,found,expansions,steps
0,Map2,euclidean,cf0,True,68337,161
1,Map2,euclidean,cf1,True,40284,161
2,Map2,euclidean,cf2,True,40284,161
3,Map2,euclidean,cf3,True,40284,161
4,Map2,euclidean,cf4,True,40284,161
