In [186]:
# Sanity check: import the on-disk EnvironmentSimulator module and print the
# path of the file it was loaded from.
import EnvironmentSimulator
print(EnvironmentSimulator.__file__)


/Users/liuzitang/Desktop/CSC584/Project/Deadzone/src/EnvironmentSimulator.py


In [187]:
import math
import random
from collections import deque
import numpy as np
import os
import torch
import torch.nn as nn
import torch.optim as optim

# ----------------------------
# Environment Simulator
# ----------------------------
class EnvironmentSimulator:
    """Tile-based duel simulator with two actors: a "player" and an "opponent".

    The world is a fixed 15x21 grid (``self.map``) where ``1`` is a wall and
    ``0`` is free floor.  Positions are ``[row, col]`` lists, orientations are
    degrees, and each actor carries a gun id:

    * gun ``1``: hits up to ``dist * 32 <= 300`` for 1 damage,
    * gun ``0``: hits up to ``dist * 32 <= 100`` for 3 damage.

    Angles follow ``atan2(d_row, d_col)``, i.e. 0 degrees points towards
    increasing column and 90 degrees towards increasing row.
    """

    # (row, col) offsets for the eight movement actions handled by perform().
    _MOVE_OFFSETS = {
        1: (-1, 0),
        2: (1, 0),
        3: (0, -1),
        4: (0, 1),
        5: (-1, -1),
        6: (-1, 1),
        7: (1, -1),
        8: (1, 1),
    }

    def __init__(self):
        self.map = [
            [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1],
            [1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1],
            [1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1],
            [1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1],
            [1,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,0,0,1],
            [1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,1,1,0,0,1],
            [1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1],
            [1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1],
            [1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1],
            [1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1],
            [1,0,0,0,0,1,1,1,0,0,1,0,0,0,1,0,0,0,0,0,1],
            [1,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,1],
            [1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1],
            [1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1],
            [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]
        ]
        # reset() randomizes both actors in the order: gun, position,
        # orientation, health (player first, then opponent).
        self.reset()

    def initialize_position(self):
        """Return a random free ``[row, col]`` tile.

        Rows are drawn from 1..13 and columns from 1..18; wall tiles are
        rejected and redrawn.
        """
        while True:
            row, col = random.randint(1, 13), random.randint(1, 18)
            if self.map[row][col] != 1:
                return [row, col]

    def move(self, offset, movePlayer=True):
        """Shift one actor by ``offset`` (d_row, d_col) unless the target tile is a wall.

        ``movePlayer`` selects the player (True) or the opponent (False).
        """
        current = self.player_pos if movePlayer else self.opponent_pos
        nr = current[0] + offset[0]
        nc = current[1] + offset[1]
        if self.map[nr][nc] == 1:
            return  # blocked: the actor stays where it is
        if movePlayer:
            self.player_pos = [nr, nc]
        else:
            self.opponent_pos = [nr, nc]

    def turn(self, turnLeft=True, turnPlayer=True):
        """Rotate one actor by 5 degrees (left = +5, right = -5), wrapped by mapToRange."""
        delta = 5 if turnLeft else -5
        if turnPlayer:
            self.player_orientation = self.mapToRange(self.player_orientation + delta)
        else:
            self.opponent_orientation = self.mapToRange(self.opponent_orientation + delta)

    def mapToRange(self, deg):
        """Wrap an angle in degrees into the half-open interval (-180, 180]."""
        deg %= 360
        if deg > 180:
            deg -= 360
        return deg

    def switchWeapon(self, switchPlayerWeapon=True):
        """Toggle the selected actor's gun between 0 and 1.

        The result is kept as an int so the gun id has the same type as the
        value produced by reset().
        """
        if switchPlayerWeapon:
            self.player_gun = int(not self.player_gun)
        else:
            self.opponent_gun = int(not self.opponent_gun)

    def shoot(self, playerShooting=True):
        """Fire the selected actor's gun at the other actor.

        A shot lands when the target is within 10 degrees of the shooter's
        facing, within the gun's range, and no wall lies on the line between
        the two actors.  On a hit the target's health is reduced (1 for gun 1,
        3 for gun 0).

        Returns:
            True if the shot hit, False otherwise.
        """
        if playerShooting:
            shooter, target = self.player_pos, self.opponent_pos
            facing, gun = self.player_orientation, self.player_gun
        else:
            shooter, target = self.opponent_pos, self.player_pos
            facing, gun = self.opponent_orientation, self.opponent_gun

        d_row = target[0] - shooter[0]
        d_col = target[1] - shooter[1]
        bearing = math.degrees(math.atan2(d_row, d_col))
        # Wrap the difference: bearing 180 vs. facing -180 is the same
        # direction, but the raw subtraction would give 360.
        diff = self.mapToRange(bearing - facing)
        dist = math.hypot(d_row, d_col)

        if gun == 1:
            max_range, damage = 300, 1
        else:
            max_range, damage = 100, 3

        # dist is in tiles; the factor 32 presumably converts tiles to
        # pixels -- TODO confirm against the game this simulates.
        if abs(diff) < 10 and dist * 32 <= max_range and not self.has_obstacle_between():
            if playerShooting:
                self.opponent_health -= damage
            else:
                self.player_health -= damage
            return True
        return False

    def perform(self, action, movePlayer=True):
        """Apply a discrete action (0-11) to the player or the opponent.

        0 = shoot, 1-8 = move (see ``_MOVE_OFFSETS``), 9 = turn left,
        10 = turn right, 11 = switch weapon.  Any other value is ignored.
        """
        if action == 0:
            self.shoot(movePlayer)
        elif action in self._MOVE_OFFSETS:
            self.move(self._MOVE_OFFSETS[action], movePlayer)
        elif action == 9:
            self.turn(True, movePlayer)
        elif action == 10:
            self.turn(False, movePlayer)
        elif action == 11:
            self.switchWeapon(movePlayer)

    def ray_distance(self, angle_deg, max_dist=None):
        """Step a ray from the player's tile and return the distance to the first wall.

        The ray advances one unit per step along ``angle_deg``; the step count
        at which it leaves the map or lands on a wall tile is returned.  If
        nothing is hit within ``max_dist`` steps, ``max_dist`` is returned.
        ``max_dist`` defaults to the larger map dimension.  The ray always
        starts at ``player_pos``.
        """
        if max_dist is None:
            max_dist = max(len(self.map), len(self.map[0]))
        rad = math.radians(angle_deg)
        dx, dy = math.cos(rad), math.sin(rad)
        # x is the column index, y is the row index.
        px, py = self.player_pos[1], self.player_pos[0]
        for d in range(1, max_dist + 1):
            x = int(round(px + dx * d))
            y = int(round(py + dy * d))
            if not (0 <= y < len(self.map) and 0 <= x < len(self.map[0])):
                return d
            if self.map[y][x] == 1:
                return d
        return max_dist

    def bresenham_line(self, x0, y0, x1, y1):
        """Return the grid points on the Bresenham line from (x0, y0) to (x1, y1), inclusive."""
        pts = []
        dx = abs(x1 - x0)
        dy = -abs(y1 - y0)
        sx = 1 if x0 < x1 else -1
        sy = 1 if y0 < y1 else -1
        err = dx + dy
        while True:
            pts.append((x0, y0))
            if x0 == x1 and y0 == y1:
                break
            e2 = 2 * err
            if e2 >= dy:
                err += dy
                x0 += sx
            if e2 <= dx:
                err += dx
                y0 += sy
        return pts

    def has_obstacle_between(self):
        """Return True if a wall tile lies strictly between the player and the opponent."""
        line = self.bresenham_line(
            self.player_pos[1], self.player_pos[0],
            self.opponent_pos[1], self.opponent_pos[0])
        # Skip the two end points: those are the actors' own tiles.
        return any(self.map[y][x] == 1 for x, y in line[1:-1])

    def is_terminal(self):
        """Return True once the player's health is depleted.

        Uses ``<= 0`` because the 3-damage gun takes health from 1 straight
        to -2, skipping exactly zero.
        """
        return self.player_health <= 0

    def reset(self):
        """Re-randomize gun, position and orientation of both actors and restore health to 10."""
        self.player_gun = random.randint(0, 1)
        self.player_pos = self.initialize_position()
        self.player_orientation = random.randint(-180, 180)
        self.player_health = 10
        self.opponent_gun = random.randint(0, 1)
        self.opponent_pos = self.initialize_position()
        self.opponent_orientation = random.randint(-180, 180)
        self.opponent_health = 10

# ----------------------------
# Q-Network & Replay Buffer
# ----------------------------
class QNetwork(nn.Module):
    """Fully connected network: state vector in, one output value per action out.

    Two hidden layers of ``hidden_size`` units with ReLU activations are
    followed by a linear output layer of ``action_size`` units.
    """

    def __init__(self, state_size, action_size, hidden_size=256):
        super().__init__()
        # The attribute names fc1/fc2/fc3 determine the state_dict keys.
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        """Run ``x`` through both ReLU hidden layers and the linear output layer."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = nn.functional.relu(layer(hidden))
        return self.fc3(hidden)

class ReplayBuffer:
    """Bounded FIFO store of (state, action, reward, next_state, done) transitions.

    Once ``capacity`` transitions are held, adding another drops the oldest.
    """

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, s, a, r, ns, d):
        """Append one transition tuple to the buffer."""
        transition = (s, a, r, ns, d)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them as five tensors.

        Returns a tuple ``(states, actions, rewards, next_states, dones)``;
        actions come back as a LongTensor, the rest as FloatTensors.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one array per field.
        states, actions, rewards, next_states, dones = (
            np.array(column) for column in zip(*picked)
        )
        return (
            torch.FloatTensor(states),
            torch.LongTensor(actions),
            torch.FloatTensor(rewards),
            torch.FloatTensor(next_states),
            torch.FloatTensor(dones),
        )

    def __len__(self):
        return len(self.buffer)

# ----------------------------
# DQN Agent with Metrics
# ----------------------------
class DQNAgent:
    """DQN agent that owns a private EnvironmentSimulator and records per-episode metrics.

    The agent keeps an online Q-network, a target network, a replay buffer
    of 10000 transitions and an epsilon-greedy exploration schedule.  The
    ``opponent`` constructor flag selects which actor inside the simulator
    the agent's actions are applied to (``self.player`` is True for the
    player, False for the opponent) and whose health is watched when
    counting hits.

    NOTE(review): ``getState`` always reads the simulator's ``player_*``
    fields, regardless of ``self.player`` -- confirm this is intended for
    agents created with ``opponent=True``.
    """

    def __init__(self, state_size, action_size,
                 lr=1e-3, gamma=0.99,
                 epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995,
                 target_update_freq=10, tau=None, opponent=False):
        """Build networks, optimizer, replay buffer and simulator.

        Args:
            state_size: length of the state vector fed to the Q-network.
            action_size: number of discrete actions.
            lr: Adam learning rate.
            gamma: discount factor used in the TD target.
            epsilon_start, epsilon_end, epsilon_decay: epsilon-greedy
                schedule; epsilon is multiplied by ``epsilon_decay`` after
                every training step and floored at ``epsilon_end``.
            target_update_freq: with ``tau is None``, the target network is
                hard-copied every this many training steps.
            tau: if given, the target network is soft-updated every training
                step with this mixing factor instead.
            opponent: if True the agent drives the simulator's opponent actor.
        """
        self.state_size    = state_size
        self.action_size   = action_size
        self.q_network     = QNetwork(state_size, action_size)
        self.target_network= QNetwork(state_size, action_size)
        self.optimizer     = optim.Adam(self.q_network.parameters(), lr=lr)
        self.replay_buffer = ReplayBuffer(10000)
        self.batch_size    = 64
        self.gamma         = gamma
        self.environment_simulator = EnvironmentSimulator()
        self.player        = not opponent

        # epsilon-greedy
        self.epsilon       = epsilon_start
        self.epsilon_end   = epsilon_end
        self.epsilon_decay = epsilon_decay

        # target update
        self.target_update_freq = target_update_freq
        self.tau           = tau
        self._train_steps  = 0

        # Metrics and previous-step memory start out cleared.
        self.reset_metrics()

        self.update_target()

    def reset_metrics(self):
        """Zero all per-episode counters and forget the previous action/position."""
        self.hit_count        = 0
        self.miss_count       = 0
        self.kill_count       = 0
        self.move_count       = 0
        self.turn_left_count  = 0
        self.turn_right_count = 0
        self.switch_count     = 0
        self.prev_action = None
        self.prev_position = None

    def update_target(self):
        """Hard-copy the online network's weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def select_action(self, state):
        """Pick an action epsilon-greedily: random with probability epsilon, else argmax Q."""
        if random.random() < self.epsilon:
            return random.randint(0, self.action_size-1)
        with torch.no_grad():
            s = torch.FloatTensor(state).unsqueeze(0)
            return self.q_network(s).argmax().item()

    def train_step(self):
        """Run one DQN update on a sampled mini-batch.

        Does nothing until the replay buffer holds at least ``batch_size``
        transitions.  After the gradient step, epsilon is decayed and the
        target network is updated (hard copy every ``target_update_freq``
        steps, or soft update with ``tau`` when it is set).
        """
        if len(self.replay_buffer) < self.batch_size:
            return
        s,a,r,ns,d = self.replay_buffer.sample(self.batch_size)
        # Q(s, a) for the actions that were actually taken.
        q_vals = self.q_network(s).gather(1, a.unsqueeze(1)).squeeze(1)
        with torch.no_grad():
            max_next = self.target_network(ns).max(1)[0]
            # (1 - d) removes the bootstrap term for terminal transitions.
            target_q = r + (1-d)*self.gamma*max_next
        loss = nn.MSELoss()(q_vals, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # epsilon decay
        self.epsilon = max(self.epsilon*self.epsilon_decay, self.epsilon_end)
        # target update
        if self.tau is None:
            self._train_steps += 1
            if self._train_steps % self.target_update_freq == 0:
                self.update_target()
        else:
            for tp,p in zip(self.target_network.parameters(),
                            self.q_network.parameters()):
                tp.data.copy_(self.tau*p.data + (1-self.tau)*tp.data)

    def step(self):
        """Act once in the simulator, update metrics, store the transition and train.

        Returns:
            ``(next_state, reward, done)`` where ``done`` is the simulator's
            ``is_terminal()`` after the action.
        """
        state  = self.getState()
        action = self.select_action(state)

        # Health of the actor this agent shoots at, before the action.
        prev_hp = (self.environment_simulator.opponent_health
                   if self.player else
                   self.environment_simulator.player_health)

        # Perform the action with the actor this agent controls.
        self.environment_simulator.perform(action, movePlayer=self.player)
        next_state = self.getState()

        # Same health value after the action.
        new_hp = (self.environment_simulator.opponent_health
                  if self.player else
                  self.environment_simulator.player_health)

        # metrics: shoot
        if action == 0:
            if new_hp < prev_hp:
                self.hit_count += 1
                if new_hp <= 0:
                    self.kill_count += 1
            else:
                self.miss_count += 1
        # metrics: other
        if action in range(1,9):
            self.move_count += 1
        elif action == 9:
            self.turn_left_count += 1
        elif action == 10:
            self.turn_right_count += 1
        elif action == 11:
            self.switch_count += 1

        reward = self.calculate_reward(state, action, next_state)
        done   = self.environment_simulator.is_terminal()
        self.replay_buffer.add(state, action, reward, next_state, done)
        self.train_step()
        # Remember this step's action and resulting position for the next reward.
        env = self.environment_simulator
        self.prev_action = action
        self.prev_position = env.player_pos[:] if self.player else env.opponent_pos[:]

        return next_state, reward, done

    def calculate_reward(self, state, action, next_state):
        """Shaped reward that encourages exploration and well-aimed attacks.

        Args:
            state, next_state: 11-element state vectors as produced by
                ``getState()`` before and after the action.
            action: the discrete action that was taken (0-11).

        Returns:
            The scalar reward for this transition.
        """

        # -- basic geometry ------------------------------------------------
        env = self.environment_simulator
        H, W = len(env.map), len(env.map[0])
        # state[2] / state[3] are the row / column offsets to the enemy,
        # divided by H and W respectively; convert both back to tiles so
        # distance and bearing are computed in the same units.
        d_row_prev, d_col_prev = state[2] * H, state[3] * W
        dist_prev = math.hypot(d_row_prev, d_col_prev)
        dist_next = math.hypot(next_state[2] * H, next_state[3] * W)

        # Angular offset of the enemy relative to the current facing.  The
        # bearing must use the tile-space offsets: with the normalized values
        # the differing H/W scale factors would skew the angle.
        ori_deg = (env.player_orientation % 360)
        angle_to_enemy = self.angle_between_player_and_opponent((d_row_prev, d_col_prev))
        ang_diff = self.angleDiff(angle_to_enemy, ori_deg)

        # Enemy counts as visible when within +/-30 degrees and unobstructed.
        in_fov  = abs(ang_diff) <= 30
        isClear = not env.has_obstacle_between()
        enemy_visible = in_fov and isClear

        # -- reward initialisation -----------------------------------------
        reward = 0.0

        # -- shooting: blind fire vs. hit / kill ---------------------------
        opp_hp_prev = state[4]
        opp_hp_next = next_state[4]
        hit   = (opp_hp_next < opp_hp_prev)
        kill  = (opp_hp_next <= 0)

        if action == 0:                           # shoot
            if enemy_visible and hit:
                reward += 200.0                   # hit
                if kill:
                    reward += 1000.0              # kill
            elif enemy_visible and not hit:
                reward -= 5.0                     # visible but missed
            else:
                reward -= 2.0                     # firing without seeing the enemy

        # -- movement / exploration ----------------------------------------
        move_actions = range(1, 9)
        turn_left, turn_right = 9, 10

        # Current position of the actor this agent controls.
        curr_pos = env.player_pos if self.player else env.opponent_pos

        # 1) effective movement
        if action in move_actions:
            if self.prev_position and curr_pos != self.prev_position:
                reward += 1.0                     # position really changed
            else:
                reward -= 0.5                     # bumped into a wall / stuck
            # Extra encouragement for closing in on a visible enemy.
            if enemy_visible and dist_next < dist_prev:
                reward += 0.3

        # 2) turning
        if action in (turn_left, turn_right):
            if not enemy_visible:
                # Enemy not visible: turning explores the field of view.
                reward += 0.3
            else:
                # Enemy visible: reward turning towards it.
                if (action == turn_left and ang_diff > 0) or \
                        (action == turn_right and ang_diff < 0):
                    reward += 0.5                # angular error should shrink
                else:
                    reward -= 0.2                # turned the wrong way

        # 3) weapon switch (kept simple: shotgun under 4 tiles, rifle beyond)
        if action == 11:
            use_rifle = bool(state[6])           # gun held before the switch
            close_range = dist_prev < 4.0
            if (close_range and use_rifle) or (not close_range and not use_rifle):
                reward += 1.0                    # switched to the right weapon
            else:
                reward -= 0.5                    # switched to the wrong weapon

        # -- action diversity ----------------------------------------------
        if self.prev_action is not None and action != self.prev_action:
            reward += 0.2

        # -- survival penalty (optional) -----------------------------------
        # A small penalty for losing own HP could be added here; omitted.

        return reward

    def getState(self):
        """Return the 11-element state vector read from the simulator.

        Layout: ``[row_norm, col_norm, d_row, d_col, opp_hp, ply_hp, gun,
        ori, front, left30, right30]`` -- the player's normalized position,
        normalized offset to the opponent, both healths divided by 10, the
        player's gun id, orientation as a fraction of 360, and three ray
        distances (straight ahead and +/-30 degrees) divided by 10.
        """
        env = self.environment_simulator
        H, W = len(env.map), len(env.map[0])
        pr, pc = env.player_pos
        row_norm, col_norm = pr/H, pc/W
        or_, oc = env.opponent_pos
        d_row, d_col = (or_-pr)/H, (oc-pc)/W
        opp_hp, ply_hp = env.opponent_health/10.0, env.player_health/10.0
        gun = float(env.player_gun)
        ori = (env.player_orientation % 360)/360.0

        def ray(a):
            # Ray length capped at 10 steps and scaled into [0, 1].
            return env.ray_distance(a, 10)/10.0

        front  = ray(ori*360)
        left30 = ray((ori*360+30)%360)
        right30= ray((ori*360-30)%360)
        return [row_norm, col_norm, d_row, d_col, opp_hp, ply_hp, gun, ori, front, left30, right30]

    def magnitude(self, v):
        """Return the Euclidean length of the 2-vector ``v``."""
        return math.hypot(v[0], v[1])

    def mapToRange(self, deg):
        """Wrap an angle in degrees into the half-open interval (-180, 180]."""
        deg %= 360
        if deg>180:
            deg -= 360
        return deg

    def angle_between_player_and_opponent(self, rel):
        """Return the bearing in degrees of the offset ``rel = (d_row, d_col)``."""
        return self.mapToRange(math.degrees(math.atan2(rel[0], rel[1])))

    def angleDiff(self, a1, a2):
        """Return ``a1 - a2`` with a single +/-360 correction applied if it leaves [-180, 180]."""
        d = a1 - a2
        if d>180:
            d -= 360
        if d<-180:
            d += 360
        return d

    def save_model(self, filename="dqn_model.pth"):
        """Save the online network's weights to ``models/<filename>``."""
        os.makedirs("models", exist_ok=True)
        torch.save(self.q_network.state_dict(), os.path.join("models", filename))

    def load_model(self, filename="dqn_model.pth"):
        """Load weights from ``models/<filename>`` into the online and target networks."""
        self.q_network.load_state_dict(torch.load(os.path.join("models", filename)))
        self.q_network.eval()
        # Keep the target network consistent with the freshly loaded weights.
        self.update_target()

# ----------------------------
# Training Loop
# ----------------------------
# Training budget: number of episodes and maximum steps per episode.
NUMBER_OF_EPISODES = 2000
NUMBER_OF_STEPS    = 2500

# Both agents use an 11-element state and 12 discrete actions.
player_agent   = DQNAgent(11, 12)
opponent_agent = DQNAgent(11, 12, opponent=True)


def _mirror_player_sim_into_opponent_sim():
    """Copy the player agent's world into the opponent agent's simulator with roles swapped.

    NOTE(review): the copy is one-directional (player simulator ->
    opponent simulator), so changes the opponent agent makes in its own
    simulator are overwritten on the next call -- confirm this is intended.
    """
    es = player_agent.environment_simulator
    osim = opponent_agent.environment_simulator
    osim.player_pos         = es.opponent_pos[:]
    osim.player_health      = es.opponent_health
    osim.player_orientation = es.opponent_orientation
    osim.player_gun         = es.opponent_gun
    osim.opponent_pos       = es.player_pos[:]
    osim.opponent_health    = es.player_health
    osim.opponent_orientation = es.player_orientation
    osim.opponent_gun       = es.player_gun


def reset():
    """Re-randomize the player agent's simulator and mirror it into the opponent's."""
    player_agent.environment_simulator.reset()
    _mirror_player_sim_into_opponent_sim()


for ep in range(NUMBER_OF_EPISODES):
    # Start every episode -- including the first -- from a freshly
    # randomized world that both simulators agree on.
    reset()

    player_agent.reset_metrics()
    opponent_agent.reset_metrics()
    p_total, o_total = 0.0, 0.0
    p_done, o_done = False, False

    for step in range(NUMBER_OF_STEPS):
        p_ns, p_r, p_done = player_agent.step()
        o_ns, o_r, o_done = opponent_agent.step()

        p_total += p_r
        o_total += o_r

        if p_done or o_done:
            break

        # Synchronize the opponent's simulator with the player's.
        _mirror_player_sim_into_opponent_sim()

    # Episode statistics
    print(f"\n=== Episode {ep} Summary ===")
    print(f"Player  Reward: {p_total:.2f}")
    print(f"  Hits: {player_agent.hit_count}, Misses: {player_agent.miss_count}, Kills: {player_agent.kill_count}")
    print(f"  Moves: {player_agent.move_count}, TurnL: {player_agent.turn_left_count}, TurnR: {player_agent.turn_right_count}, Switch: {player_agent.switch_count}")
    print(f"Opponent Reward: {o_total:.2f}")
    print(f"  Hits: {opponent_agent.hit_count}, Misses: {opponent_agent.miss_count}, Kills: {opponent_agent.kill_count}")
    print(f"  Moves: {opponent_agent.move_count}, TurnL: {opponent_agent.turn_left_count}, TurnR: {opponent_agent.turn_right_count}, Switch: {opponent_agent.switch_count}")
    print("===================================\n")

    # Decide the winner and save its weights: a terminal flag decides
    # first, otherwise the higher total reward wins (ties go to the player).
    if p_done:
        winner = opponent_agent
        wname = 'opponent'
    elif o_done:
        winner = player_agent
        wname = 'player'
    else:
        if p_total >= o_total:
            winner = player_agent
            wname = 'player'
        else:
            winner = opponent_agent
            wname = 'opponent'
    winner.save_model(f"{wname}_winner_ep{ep}.pth")
    # Load the winner's weights into both agents (online and target networks).
    state_dict = winner.q_network.state_dict()
    player_agent.q_network.load_state_dict(state_dict)
    opponent_agent.q_network.load_state_dict(state_dict)
    player_agent.update_target()
    opponent_agent.update_target()



=== Episode 0 Summary ===
Player  Reward: 4414.90
  Hits: 11, Misses: 347, Kills: 2
  Moves: 818, TurnL: 94, TurnR: 31, Switch: 80
Opponent Reward: 200.80
  Hits: 1, Misses: 123, Kills: 0
  Moves: 964, TurnL: 78, TurnR: 117, Switch: 98


=== Episode 1 Summary ===
Player  Reward: -483.30
  Hits: 0, Misses: 880, Kills: 0
  Moves: 1132, TurnL: 257, TurnR: 148, Switch: 83
Opponent Reward: -517.90
  Hits: 0, Misses: 4, Kills: 0
  Moves: 2375, TurnL: 12, TurnR: 74, Switch: 35


=== Episode 2 Summary ===
Player  Reward: 4063.10
  Hits: 11, Misses: 501, Kills: 2
  Moves: 1047, TurnL: 68, TurnR: 44, Switch: 93
Opponent Reward: -443.20
  Hits: 0, Misses: 3, Kills: 0
  Moves: 1753, TurnL: 6, TurnR: 2, Switch: 0


=== Episode 3 Summary ===
Player  Reward: 4283.20
  Hits: 11, Misses: 1312, Kills: 2
  Moves: 537, TurnL: 26, TurnR: 85, Switch: 79
Opponent Reward: -485.60
  Hits: 0, Misses: 34, Kills: 0
  Moves: 2012, TurnL: 1, TurnR: 2, Switch: 1


=== Episode 4 Summary ===
Player  Reward: 4201.80
 

KeyboardInterrupt: 