In [1]:
import numpy as np
import pygame
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import sys
import os
import os
# print(os.listdir("/Users/zhangbaifeng/Documents/交大/課程資料/大三（下）/物件導向程式設計/oop-2025-proj-pycade"))

sys.path.append(os.path.abspath(".."))
from game import Game 


# ---------- Environment -----------
class RLGameEnv:
    def __init__(self, render_mode=False):
        pygame.init()
        self.screen = pygame.display.set_mode((640, 480))
        self.clock = pygame.time.Clock()
        self.render_mode = render_mode
        self.game = Game(self.screen, self.clock, ai_archetype="rl")
        self.action_space = 5  # e.g., 0:stay, 1:up, 2:down, 3:left, 4:right
        self.state_dim = (10,)  # 依據你的觀察空間設計（這只是範例）

    def reset(self):
        self.game.setup_initial_state()
        return self.get_state()

    def step(self, action):
        # --- 將 action 應用到 AI 玩家上 ---
        if self.game.player2_ai and self.game.player2_ai.is_alive:
            self.apply_action(action)

        self.game.update()
        state = self.get_state()
        reward, done = self.compute_reward_done()
        return state, reward, done, {}

    def render(self):
        if self.render_mode:
            self.game.draw()
            pygame.display.flip()

    def get_state(self):
        # ⚠️ 請根據遊戲觀察資訊設計，例如：
        p2 = self.game.player2_ai
        p1 = self.game.player1
        if p2 is None or p1 is None:
            return np.zeros(self.state_dim, dtype=np.float32)
        state = np.array([
            p2.rect.x / 640, p2.rect.y / 480,
            p1.rect.x / 640, p1.rect.y / 480,
            p2.lives, p1.lives,
            p2.score / 100.0, p1.score / 100.0,
            self.game.time_elapsed_seconds / 60.0,
            int(p2.is_alive)
        ], dtype=np.float32)
        return state

    def compute_reward_done(self):
        done = self.game.game_state in ["GAME_OVER", "SCORE_SUBMITTED"]
        reward = 0.0
        if self.game.game_state == "GAME_OVER":
            if self.game.time_up_winner == "AI":
                reward = 1.0
            elif self.game.time_up_winner == "P1":
                reward = -1.0
            elif self.game.time_up_winner == "DRAW":
                reward = 0.5
        return reward, done

    def apply_action(self, action):
        ai = self.game.player2_ai
        if not ai:
            return
        if action == 0:
            pass  # stay
        elif action == 1:
            ai.move(0, -1)
        elif action == 2:
            ai.move(0, 1)
        elif action == 3:
            ai.move(-1, 0)
        elif action == 4:
            ai.move(1, 0)

# ---------- Q Network ----------
class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )

    def forward(self, x):
        return self.model(x)


# ---------- DQN Agent ----------
class DQNAgent:
    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, epsilon=1.0, epsilon_min=0.1, epsilon_decay=0.995):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.q_net = QNetwork(state_dim, action_dim)
        self.target_net = QNetwork(state_dim, action_dim)
        self.target_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
        self.memory = deque(maxlen=10000)
        self.batch_size = 64
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.update_target_steps = 100
        self.step_count = 0
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.q_net.to(self.device)
        self.target_net.to(self.device)

    def select_action(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_dim)
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            q_values = self.q_net(state_tensor)
        return q_values.argmax().item()

    def store(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def train_step(self):
        if len(self.memory) < self.batch_size:
            return
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

        q_values = self.q_net(states).gather(1, actions)
        with torch.no_grad():
            max_next_q = self.target_net(next_states).max(1)[0].unsqueeze(1)
            target_q = rewards + (1 - dones) * self.gamma * max_next_q

        loss = nn.MSELoss()(q_values, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Epsilon decay
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        # 更新 target net
        self.step_count += 1
        if self.step_count % self.update_target_steps == 0:
            self.target_net.load_state_dict(self.q_net.state_dict())

# ---------- Training Loop ----------
def train(env, num_episodes=1000):
    agent = DQNAgent(state_dim=env.state_dim[0], action_dim=env.action_space)
    rewards_log = []

    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.store(state, action, reward, next_state, done)
            agent.train_step()
            state = next_state
            total_reward += reward
            if env.render_mode:
                env.render()

        rewards_log.append(total_reward)
        print(f"Episode {episode+1}/{num_episodes} - Reward: {total_reward:.2f} - Epsilon: {agent.epsilon:.2f}")

        # 每 50 回合存一次模型
        if (episode + 1) % 50 == 0:
            torch.save(agent.q_net.state_dict(), f"dqn_model_episode{episode+1}.pt")

    return rewards_log

if __name__ == "__main__":
    env = RLGameEnv(render_mode=False)
    train(env, num_episodes=500)




pygame 2.6.1 (SDL 2.28.4, Python 3.8.19)
Hello from the pygame community. https://www.pygame.org/contribute.html
[MapManager DEBUG] Final generated map_data in get_randomized_map_layout:
Row 00: WWWWWWWWWWWWWWW
Row 01: W....D.DDDDD..W
Row 02: W.W.WDWDW.W.W.W
Row 03: W...D....DD..DW
Row 04: WDWDW.W.WDW.W.W
Row 05: WD.DD.D..D..DDW
Row 06: W.WDWDW.WDWDWDW
Row 07: W.D...DD......W
Row 08: WDW.W.WDW.W.W.W
Row 09: WD..D...DDD...W
Row 10: WWWWWWWWWWWWWWW
[MapManager DEBUG] map_data loaded in load_map_from_data:
Row 00: WWWWWWWWWWWWWWW
Row 01: W....D.DDDDD..W
Row 02: W.W.WDWDW.W.W.W
Row 03: W...D....DD..DW
Row 04: WDWDW.W.WDW.W.W
Row 05: WD.DD.D..D..DDW
Row 06: W.WDWDW.WDWDWDW
Row 07: W.D...DD......W
Row 08: WDW.W.WDW.W.W.W
Row 09: WD..D...DDD...W
Row 10: WWWWWWWWWWWWWWW
[AI_INIT] AIController for Player ID: 2111109808864 initialized. Initial state: PLANNING_PATH_TO_PLAYER. Debug Mode: True
[AI_RESET] Resetting AI state for Player ID: 2111109808864.
[AI_RESET] Target player initial spawn tile s

  states = torch.FloatTensor(states).to(self.device)


[DEBUG_ATTEMPT_MOVE] AI at (12,9), trying dx=0, dy=-1. IsAlive: True, ActionTimer: 0.029999999999999943
[DEBUG_ATTEMPT_MOVE_FAIL] Reason: Not alive or action_timer > 0. IsAlive: True, ActionTimer: 0.029999999999999943
  [AI_HANDLER] EXECUTING_PATH_CLEARANCE at (12, 9). Target A* node: N(13,9,'.',g=0.0,h=20.0)
    Evaluating A* segment idx 0: N(13,9,'.',g=0.0,h=20.0) from AI tile (12, 9)
      A* segment is EMPTY: N(13,9,'.',g=0.0,h=20.0). Setting sub-path to move.
    Set new movement sub-path: [(12, 9), (13, 9)]
[DEBUG_ATTEMPT_MOVE] AI at (12,9), trying dx=1, dy=0. IsAlive: True, ActionTimer: 0.012999999999999942
[DEBUG_ATTEMPT_MOVE_FAIL] Reason: Not alive or action_timer > 0. IsAlive: True, ActionTimer: 0.012999999999999942
[DEBUG_ATTEMPT_MOVE] AI at (12,9), trying dx=0, dy=1. IsAlive: True, ActionTimer: 0
[DEBUG_MOVE_FAIL] Player at (12, 9) trying to move to (12, 10).
    Blocked by: <class 'sprites.wall.Wall'> sprite.
    Obstacle rect: <rect(384, 320, 32, 32)>, its map coords shou

KeyboardInterrupt: 