In [2]:
import gymnasium as gym
import numpy as np
from collections import defaultdict
from gymnasium.envs.registration import register
import minigrid


  from pkg_resources import resource_stream, resource_exists


In [3]:


# ==========================================================
# 0) Register your custom env (assumes enemy_doorkey_env.py exists)
# ==========================================================
# Make the enemy variant available to gym.make() under its own id.
# The entry point string is "<module>:<class>", so enemy_doorkey_env.py must be
# importable from the notebook's working directory / sys.path.
register(
    id="MiniGrid-DoorKey-6x6-Enemy-v0",
    entry_point="enemy_doorkey_env:DoorKeyWithEnemyEnv",
    kwargs=dict(size=6),  # stored with the registration and forwarded to the env on make()
)

# ==========================================================
# 1) Useful actions only (DoorKey)
# ==========================================================
# Action ids the agent is allowed to pick: left, right, forward, pickup, toggle.
USEFUL_ACTIONS = [0, 1, 2, 3, 5]


def sample_useful_action():
    """Return one action id drawn uniformly at random from USEFUL_ACTIONS.

    Uses NumPy's global random state and converts the result to a plain int.
    """
    picked = np.random.choice(USEFUL_ACTIONS)
    return int(picked)

# ==========================================================
# 2) State encoder (no enemy_pos -> smaller table)
# ==========================================================
def get_door_open(u):
    """Return 1 if the first door found in the grid is open, otherwise 0.

    The grid is scanned row by row (y outer, x inner) and the scan stops at the
    first cell whose object has type "door". Returns 0 when no door is found.

    Args:
        u: object exposing ``width``, ``height`` and ``grid.get(x, y)``.
    """
    # Lazy generator: cells are fetched one at a time, in the same order as a
    # nested y/x loop, and no further cell is read once a door is found.
    cells = (
        u.grid.get(col, row)
        for row in range(u.height)
        for col in range(u.width)
    )
    first_door = next(
        (cell for cell in cells if cell is not None and cell.type == "door"),
        None,
    )
    if first_door is None:
        return 0
    return 1 if first_door.is_open else 0

def get_state(env):
    """Build the hashable state key used to index the Q-table.

    Args:
        env: environment whose ``unwrapped`` attribute exposes ``agent_pos``,
            ``agent_dir``, ``carrying`` and whatever ``get_door_open`` reads.

    Returns:
        Tuple ``(x, y, direction, has_key, door_open)`` where ``has_key`` and
        ``door_open`` are 0/1 flags.
    """
    base = env.unwrapped
    col, row = base.agent_pos
    heading = int(base.agent_dir)

    # The key flag is set only when something is carried and its type is "key".
    held = base.carrying
    if held is not None and getattr(held, "type", None) == "key":
        key_flag = 1
    else:
        key_flag = 0

    door_flag = get_door_open(base)
    return (col, row, heading, key_flag, door_flag)

# ==========================================================
# 3) Distance-to-goal shaping helpers (aligned with success)
# ==========================================================
def find_goal_pos(u):
    """Locate the first goal cell in the grid.

    Scans row by row (y outer, x inner) and returns the ``(x, y)`` coordinates
    of the first cell whose object has type "goal", or ``None`` if there is none.

    Args:
        u: object exposing ``width``, ``height`` and ``grid.get(x, y)``.
    """
    for row in range(u.height):
        for col in range(u.width):
            cell = u.grid.get(col, row)
            if cell is None:
                continue  # empty cell
            if cell.type == "goal":
                return (col, row)
    return None

def manhattan(a, b):
    """Return the Manhattan (L1) distance between two 2-D points.

    Only the first two components of ``a`` and ``b`` are read.
    """
    dx = a[0] - b[0]
    dy = a[1] - b[1]
    return abs(dx) + abs(dy)

# ==========================================================
# 4) Q-learning
# ==========================================================
def make_Q(n_actions):
    """Create an empty Q-table.

    Returns a ``defaultdict`` whose missing keys are filled on first access
    with a fresh float32 zero vector of length ``n_actions``.
    """
    def _zero_row():
        return np.zeros(n_actions, dtype=np.float32)

    return defaultdict(_zero_row)

def epsilon_greedy(Q, s, eps):
    """Pick an action for state ``s`` with an epsilon-greedy rule.

    With probability ``eps`` a random useful action is sampled; otherwise the
    useful action with the highest Q-value is returned. Ties go to the action
    listed first in USEFUL_ACTIONS.

    Args:
        Q: mapping from state to a per-action value array.
        s: state key.
        eps: exploration probability.

    Returns:
        The chosen action id as a plain int.
    """
    roll = np.random.rand()
    if not roll < eps:
        # Exploit: restrict the argmax to the useful actions only.
        action_values = Q[s]
        greedy = max(USEFUL_ACTIONS, key=lambda act: action_values[act])
        return int(greedy)
    return sample_useful_action()

def train_q(
    env,
    Q,
    episodes=6000,
    max_steps=500,
    alpha=0.15,
    gamma=0.99,
    eps_start=1.0,
    eps_end=0.15,
    eps_decay=0.99985,
    living_penalty=-0.001,
    dist_coef=0.02,
):
    """Train a tabular Q-function with epsilon-greedy Q-learning and reward shaping.

    ``Q`` is updated in place, so a table trained on one environment can be
    passed in again to continue training on another.

    Per step the learning signal is::

        shaped = max(env_reward, -0.2)
                 + dist_coef * (distance_before - distance_after)
                 + living_penalty

    where the distances are Manhattan distances from the agent to the goal
    cell found right after ``env.reset()``.

    Args:
        env: environment with ``reset()``, ``step(action)`` returning
            ``(obs, reward, terminated, truncated, info)``, and an
            ``unwrapped`` attribute exposing ``agent_pos``.
        Q: mapping from state key to a per-action value array (see ``make_Q``).
        episodes: number of episodes to run.
        max_steps: hard cap on steps per episode inside this loop.
        alpha: learning rate.
        gamma: discount factor.
        eps_start: initial exploration probability.
        eps_end: lower bound for the exploration probability.
        eps_decay: multiplicative decay applied to epsilon once per episode.
        living_penalty: constant added to every step's shaped reward.
        dist_coef: weight of the distance-to-goal shaping term.

    Returns:
        Tuple ``(rewards, success, eps)``: the per-episode sum of shaped
        rewards, a per-episode 0/1 flag that is 1 when the last (clipped)
        environment reward of the episode was positive, and the final epsilon.

    Raises:
        RuntimeError: if no goal cell is found after a reset.
    """
    rewards, success = [], []
    eps = eps_start

    for ep in range(episodes):
        env.reset()
        s = get_state(env)

        goal = find_goal_pos(env.unwrapped)
        if goal is None:
            raise RuntimeError("Goal not found in grid. Check environment generation.")

        total_shaped = 0.0
        last_env_r = 0.0

        # Stability trackers. They are reset every episode, so the progress
        # line printed below reflects only the episode that triggered it.
        td_abs_sum = 0.0
        td_max = 0.0
        q_abs_sum = 0.0
        q_max = 0.0
        updates = 0

        for _ in range(max_steps):
            prev_dist = manhattan(env.unwrapped.agent_pos, goal)

            a = epsilon_greedy(Q, s, eps)
            _, r, terminated, truncated, _ = env.step(a)

            r = max(float(r), -0.2)   # clip negative rewards
            last_env_r = r

            s2 = get_state(env)
            done = terminated or truncated

            new_dist = manhattan(env.unwrapped.agent_pos, goal)

            # reward shaping
            shaped = r
            shaped += dist_coef * (prev_dist - new_dist)
            shaped += living_penalty

            # Q-learning update (max over useful actions).
            # Only a true terminal state has zero future value. A truncation
            # (time limit) merely stops the rollout; the state key carries no
            # time information, so we still bootstrap from Q[s2]. Zeroing it
            # would drag the values of ordinary states toward 0.
            if terminated:
                next_best = 0.0
            else:
                next_best = float(max(Q[s2][aa] for aa in USEFUL_ACTIONS))
            td_target = shaped + gamma * next_best
            td_err = td_target - Q[s][a]

            Q[s][a] += alpha * td_err

            # Diagnostics: TD-error magnitude and the scale of the updated row.
            # The row statistics cover every entry of Q[s], not only the
            # useful actions.
            td_abs = abs(td_err)
            td_abs_sum += td_abs
            td_max = max(td_max, td_abs)

            q_vals = Q[s]
            q_abs_sum += np.mean(np.abs(q_vals))
            q_max = max(q_max, np.max(q_vals))
            updates += 1

            total_shaped += shaped
            s = s2

            if done:
                break

        eps = max(eps_end, eps * eps_decay)
        rewards.append(total_shaped)
        success.append(1 if last_env_r > 0 else 0)

        if (ep + 1) % 500 == 0:
            print(
                f"Episode {ep+1}/{episodes} | eps={eps:.3f} | "
                f"avg_reward(last500)={np.mean(rewards[-500:]):.3f} | "
                f"success_rate(last500)={np.mean(success[-500:]):.2%} | "
                f"TDabs(avg/max)={td_abs_sum/max(1,updates):.4f}/{td_max:.4f} | "
                f"Q(abs/max)={q_abs_sum/max(1,updates):.3f}/{q_max:.3f}"
            )

    return rewards, success, eps


In [4]:
# ==========================================================
# 5) Curriculum: Phase 1 (DoorKey) -> Phase 2 (DoorKey+Enemy)
# ==========================================================
# Phase 1 environment: plain DoorKey, created without a render mode.
env_easy = gym.make("MiniGrid-DoorKey-6x6-v0")

# One Q-table is shared by both curriculum phases; it has one column per
# action in the environment's action space.
Q = make_Q(env_easy.action_space.n)

print("\n=== Phase 1: train on MiniGrid-DoorKey-6x6-v0 ===")

# NOTE(review): eps_decay=0.99985 applied once per episode for 6000 episodes
# leaves eps at roughly 0.41, so eps_end=0.10 is never reached -- confirm the
# schedule is intended.
rewards1, success1, eps_after = train_q(
    env=env_easy,
    Q=Q,
    episodes=6000,
    max_steps=500,
    # learning
    alpha=0.18,
    gamma=0.99,
    # exploration schedule
    eps_start=1.0,
    eps_end=0.10,
    eps_decay=0.99985,
    # reward shaping
    living_penalty=-0.002,
    dist_coef=0.015,
)


=== Phase 1: train on MiniGrid-DoorKey-6x6-v0 ===
Episode 500/6000 | eps=0.928 | avg_reward(last500)=-0.444 | success_rate(last500)=34.60% | TDabs(avg/max)=0.0128/0.2089 | Q(abs/max)=0.315/0.571
Episode 1000/6000 | eps=0.861 | avg_reward(last500)=-0.248 | success_rate(last500)=54.00% | TDabs(avg/max)=0.0047/0.5234 | Q(abs/max)=0.362/0.539
Episode 1500/6000 | eps=0.799 | avg_reward(last500)=-0.036 | success_rate(last500)=70.60% | TDabs(avg/max)=0.0272/0.3436 | Q(abs/max)=0.359/0.654
Episode 2000/6000 | eps=0.741 | avg_reward(last500)=-0.023 | success_rate(last500)=70.00% | TDabs(avg/max)=0.0102/0.0926 | Q(abs/max)=0.387/0.698
Episode 2500/6000 | eps=0.687 | avg_reward(last500)=0.016 | success_rate(last500)=73.80% | TDabs(avg/max)=0.0081/0.2728 | Q(abs/max)=0.380/0.620
Episode 3000/6000 | eps=0.638 | avg_reward(last500)=0.073 | success_rate(last500)=75.60% | TDabs(avg/max)=0.0081/0.2599 | Q(abs/max)=0.402/0.611
Episode 3500/6000 | eps=0.592 | avg_reward(last500)=0.047 | success_rate(las

In [5]:
# Phase 2 environment: DoorKey with an enemy, created without a render mode.
env_hard = gym.make("MiniGrid-DoorKey-6x6-Enemy-v0")

print("\n=== Phase 2: continue on MiniGrid-DoorKey-6x6-Enemy-v0 ===")

# Training continues on the Q-table learned in Phase 1 (same `Q` object).
# NOTE(review): exploration restarts at eps_start=1.0 and the `eps_after`
# value returned by Phase 1 is not used here -- confirm that re-exploring from
# scratch is intended. With eps_decay=0.99992 over 8000 episodes eps ends near
# 0.53, above eps_end=0.25.
rewards2, success2, _ = train_q(
    env=env_hard,
    Q=Q,
    episodes=8000,
    max_steps=500,
    # learning
    alpha=0.08,
    gamma=0.99,
    # exploration schedule
    eps_start=1.0,
    eps_end=0.25,
    eps_decay=0.99992,
    # reward shaping
    living_penalty=-0.002,
    dist_coef=0.025,
)

print("\nDone.")



=== Phase 2: continue on MiniGrid-DoorKey-6x6-Enemy-v0 ===
Episode 500/8000 | eps=0.961 | avg_reward(last500)=-0.469 | success_rate(last500)=2.60% | TDabs(avg/max)=0.0073/0.4219 | Q(abs/max)=0.298/0.496
Episode 1000/8000 | eps=0.923 | avg_reward(last500)=-0.422 | success_rate(last500)=4.20% | TDabs(avg/max)=0.0074/0.1802 | Q(abs/max)=0.287/0.551
Episode 1500/8000 | eps=0.887 | avg_reward(last500)=-0.417 | success_rate(last500)=4.20% | TDabs(avg/max)=0.0106/0.6446 | Q(abs/max)=0.328/0.563
Episode 2000/8000 | eps=0.852 | avg_reward(last500)=-0.397 | success_rate(last500)=7.00% | TDabs(avg/max)=0.0516/0.6200 | Q(abs/max)=0.290/0.434
Episode 2500/8000 | eps=0.819 | avg_reward(last500)=-0.397 | success_rate(last500)=7.20% | TDabs(avg/max)=0.0068/0.4382 | Q(abs/max)=0.303/0.474
Episode 3000/8000 | eps=0.787 | avg_reward(last500)=-0.357 | success_rate(last500)=5.60% | TDabs(avg/max)=0.0302/0.6935 | Q(abs/max)=0.308/0.632
Episode 3500/8000 | eps=0.756 | avg_reward(last500)=-0.314 | success_ra