In [26]:
import gym
from gym import spaces
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque


# Hyperparameters
BATCH_SIZE = 64  # transitions drawn from the replay buffer per gradient step
GAMMA = 0.99  # discount factor applied to the bootstrapped next-state value
EPSILON = 0.2  # starting exploration rate; decayed per episode in the training loop
TARGET_UPDATE = 20  # number of episodes between target-network weight syncs
MEMORY_SIZE = 10000  # replay buffer capacity, in transitions
LEARNING_RATE  = 7e-5  # step size handed to the Adam optimizer



# Experience replay buffer
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling.

    Each stored item is a ``(state, action, reward, next_state, done)`` tuple.
    Once ``capacity`` items are held, appending a new one evicts the oldest.
    """

    def __init__(self, capacity):
        # deque(maxlen=...) performs the eviction of old entries for us.
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple ``(states, actions, rewards, next_states, dones)``.
            ``states`` and ``next_states`` are stacked into NumPy arrays; the
            other three fields are plain tuples.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        transitions = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one tuple per field.
        states, actions, rewards, next_states, dones = zip(*transitions)
        stacked_states = np.array(states)
        stacked_next_states = np.array(next_states)
        return stacked_states, actions, rewards, stacked_next_states, dones

# Environment definition
class IntersectionEnv(gym.Env):
    """Toy stand-in for an intersection-driving task.

    Observations are random uint8 noise of shape (4, 84, 84); they carry no
    information about the episode. Rewards depend only on the chosen action,
    the step counter and a random "collision" draw.

    The class follows the classic Gym API: ``reset()`` returns only the
    observation and ``step()`` returns ``(obs, reward, done, info)``.
    """

    # Base reward per action id: 0 forward, 1 turn left, 2 turn right,
    # 3 brake, 4 wait. Any other action falls back to _DEFAULT_REWARD.
    _ACTION_REWARDS = {0: 1, 1: 2, 2: 2, 3: 0.5, 4: -0.1}
    _DEFAULT_REWARD = -0.1

    _OBS_SHAPE = (4, 84, 84)
    _SURVIVAL_BONUS_AFTER = 25  # steps after which the survival bonus applies
    _SURVIVAL_BONUS = 5
    _COLLISION_PROB = 0.1  # per-step chance of a simulated collision
    _COLLISION_REWARD = -10
    _MAX_STEPS = 50  # the episode ends once the step counter exceeds this

    def __init__(self):
        super().__init__()
        # Observation space: a stack of 4 images of 84x84 pixels.
        self.observation_space = spaces.Box(
            low=0, high=255, shape=self._OBS_SHAPE, dtype=np.uint8
        )
        # Action space: 5 discrete actions (forward, left, right, brake, wait).
        self.action_space = spaces.Discrete(5)
        # Initialize the episode state.
        self.reset()

    def reset(self):
        """Start a new episode and return the initial all-zero observation."""
        self.state = np.zeros(self._OBS_SHAPE, dtype=np.uint8)
        self.done = False
        self.steps = 0
        return self.state

    def step(self, action):
        """Advance one step.

        Args:
            action: integer action id (see ``_ACTION_REWARDS``).

        Returns:
            ``(observation, reward, done, info)`` where ``info`` is an empty
            dict.
        """
        self.steps += 1
        reward = self._ACTION_REWARDS.get(action, self._DEFAULT_REWARD)

        # Survival bonus for lasting past the threshold.
        if self.steps > self._SURVIVAL_BONUS_AFTER:
            reward += self._SURVIVAL_BONUS

        if random.random() < self._COLLISION_PROB:
            # A simulated collision overrides the reward and ends the episode.
            reward = self._COLLISION_REWARD
            self.done = True
        elif self.steps > self._MAX_STEPS:
            # Time limit reached: end the episode; the reward is left as is.
            self.done = True

        # New random frame. randint's upper bound is exclusive, so 256 is
        # needed for the full 0..255 range declared by observation_space.
        self.state = np.random.randint(0, 256, self._OBS_SHAPE, dtype=np.uint8)
        return self.state, reward, self.done, {}

    def render(self, mode='human'):
        pass  # Optional: display frames with OpenCV or Pygame.

    def close(self):
        pass

# DRQN model definition
class DRQN(nn.Module):
    """Convolutional encoder -> LSTM -> MLP head giving one Q-value per action.

    Args:
        input_shape: ``(channels, height, width)`` of a single observation.
            The channel count and the flattened encoder size are derived from
            it; ``(4, 84, 84)`` yields the 64 * 7 * 7 = 3136 LSTM inputs.
        n_actions: number of discrete actions (size of the output layer).
    """

    def __init__(self, input_shape, n_actions):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        # Probe the encoder with a dummy frame to learn its flattened output
        # size instead of hard-coding it for 84x84 inputs.
        with torch.no_grad():
            probe = self.conv(torch.zeros(1, *input_shape))
        conv_features = probe.flatten(1).size(1)

        self.lstm = nn.LSTM(conv_features, 512, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )

    def forward(self, x, hidden=None):
        """Compute Q-values for a run of frames.

        Args:
            x: tensor of shape ``(N, C, H, W)`` holding raw pixel values; it
                is divided by 255 before encoding.
            hidden: ``(h, c)`` LSTM state, each of shape ``(1, 1, 512)``, or
                ``None`` to start from a zero state.

        Returns:
            ``(q_values, hidden)`` where ``q_values`` has shape
            ``(N, n_actions)`` and ``hidden`` is the updated LSTM state.

        Note:
            The N encoded frames are passed to the LSTM as ONE sequence of
            length N with batch size 1 (``unsqueeze(0)`` on a batch-first
            LSTM), so the leading dimension of ``x`` acts as the time axis.
        """
        x = x / 255.0  # scale pixel values to [0, 1]
        features = self.conv(x).flatten(1)
        lstm_out, hidden = self.lstm(features.unsqueeze(0), hidden)
        q_values = self.fc(lstm_out.squeeze(0))
        return q_values, hidden

# Environment and model initialization
env = IntersectionEnv()
n_actions = env.action_space.n
input_shape = (4, 84, 84)
policy_net = DRQN(input_shape, n_actions)  # network being optimized
target_net = DRQN(input_shape, n_actions)  # periodically synced copy used for bootstrap targets
target_net.load_state_dict(policy_net.state_dict())  # start both networks from identical weights
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)
memory = ReplayBuffer(MEMORY_SIZE)

best_reward = float('-inf')  # best episode return so far; gates checkpoint saving

# Training loop
loss_fn = nn.MSELoss()  # built once instead of on every gradient step

for episode in range(800):
    state = env.reset()
    hidden = (torch.zeros(1, 1, 512), torch.zeros(1, 1, 512))  # fresh LSTM state per episode
    done = False
    total_reward = 0
    epsilon = max(0.01, EPSILON * (0.995 ** episode))  # per-episode epsilon decay, floored at 0.01

    while not done:
        # Run the network on every step, exploratory ones included, so the
        # recurrent state sees each observation and never goes stale.
        with torch.no_grad():
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            step_q_values, hidden = policy_net(state_tensor, hidden)

        # Epsilon-greedy action selection.
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = step_q_values.max(1)[1].item()

        # Act and store the transition in the replay buffer.
        next_state, reward, done, _ = env.step(action)
        memory.push(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

        # One gradient step once enough transitions are available.
        if len(memory) > BATCH_SIZE:
            states, actions, rewards, next_states, dones = memory.sample(BATCH_SIZE)

            states = torch.tensor(states, dtype=torch.float32)
            actions = torch.tensor(actions, dtype=torch.int64).unsqueeze(1)
            rewards = torch.tensor(rewards, dtype=torch.float32)
            next_states = torch.tensor(next_states, dtype=torch.float32)
            dones = torch.tensor(dones, dtype=torch.float32)

            # Sampled transitions are unrelated to the episode in progress,
            # so the replay passes start from a zero recurrent state rather
            # than inheriting the rollout's hidden state.
            # NOTE(review): DRQN.forward feeds the whole batch to the LSTM as
            # a single sequence, so these randomly drawn transitions are still
            # processed as if they were consecutive time steps.
            replay_hidden = (torch.zeros(1, 1, 512), torch.zeros(1, 1, 512))

            batch_q, _ = policy_net(states, replay_hidden)
            batch_q = batch_q.gather(1, actions).squeeze(1)

            # Targets need no gradient; skip building the autograd graph.
            with torch.no_grad():
                next_q_values, _ = target_net(next_states, replay_hidden)
                expected_q_values = rewards + GAMMA * next_q_values.max(1)[0] * (1 - dones)

            loss = loss_fn(batch_q, expected_q_values)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Keep the weights from the best-scoring episode so far.
    if total_reward > best_reward:
        best_reward = total_reward
        torch.save(policy_net.state_dict(), 'DRQN2.pth')

    # Periodically sync the target network with the policy network.
    if episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

    print(f"Episode: {episode}, Total Reward: {total_reward}, Epsilon: {epsilon:.3f}")

env.close()


Episode: 0, Total Reward: -3.0, Epsilon: 0.200
Episode: 1, Total Reward: -8, Epsilon: 0.199
Episode: 2, Total Reward: -10, Epsilon: 0.198
Episode: 3, Total Reward: 3.5, Epsilon: 0.197
Episode: 4, Total Reward: -7, Epsilon: 0.196
Episode: 5, Total Reward: 10.0, Epsilon: 0.195
Episode: 6, Total Reward: -4, Epsilon: 0.194
Episode: 7, Total Reward: -1.5, Epsilon: 0.193
Episode: 8, Total Reward: -6, Epsilon: 0.192
Episode: 9, Total Reward: 1.9000000000000004, Epsilon: 0.191
Episode: 10, Total Reward: 11.5, Epsilon: 0.190
Episode: 11, Total Reward: -10, Epsilon: 0.189
Episode: 12, Total Reward: -8, Epsilon: 0.188
Episode: 13, Total Reward: 3.5, Epsilon: 0.187
Episode: 14, Total Reward: 10, Epsilon: 0.186
Episode: 15, Total Reward: 1, Epsilon: 0.186
Episode: 16, Total Reward: 17, Epsilon: 0.185
Episode: 17, Total Reward: -6.1, Epsilon: 0.184
Episode: 18, Total Reward: 0.0, Epsilon: 0.183
Episode: 19, Total Reward: -2, Epsilon: 0.182
Episode: 20, Total Reward: 10, Epsilon: 0.181
Episode: 21, T

In [33]:
import gymnasium as gym
import torch
import numpy as np
import torch.nn.functional as F
import highway_env

# Evaluation settings
MODEL_PATH = 'DRQN2.pth'  # checkpoint file to load

# Create the highway-env intersection environment with on-screen rendering.
# NOTE(review): no observation config is passed here; confirm the environment's
# default observation is the (4, 84, 84) image stack the network expects. The
# evaluation loop substitutes an all-zero input for any non-3-D observation.
env = gym.make('intersection-v0', render_mode='human')

# Model definition
class DRQN(torch.nn.Module):
    """Convolutional encoder -> LSTM -> MLP head giving one Q-value per action.

    The layer layout determines the parameter names in the state dict, so it
    must stay compatible with the checkpoint that is loaded into it.

    Args:
        input_shape: ``(channels, height, width)`` of a single observation.
            The channel count and the flattened encoder size are derived from
            it; ``(4, 84, 84)`` yields the 64 * 7 * 7 = 3136 LSTM inputs.
        n_actions: number of discrete actions (size of the output layer).
    """

    def __init__(self, input_shape, n_actions):
        super().__init__()
        self.conv = torch.nn.Sequential(
            torch.nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            torch.nn.ReLU(),
            torch.nn.Conv2d(32, 64, kernel_size=4, stride=2),
            torch.nn.ReLU(),
            torch.nn.Conv2d(64, 64, kernel_size=3, stride=1),
            torch.nn.ReLU()
        )
        # Probe the encoder with a dummy frame to learn its flattened output
        # size instead of hard-coding it for 84x84 inputs.
        with torch.no_grad():
            probe = self.conv(torch.zeros(1, *input_shape))
        conv_features = probe.flatten(1).size(1)

        self.lstm = torch.nn.LSTM(conv_features, 512, batch_first=True)
        self.fc = torch.nn.Sequential(
            torch.nn.Linear(512, 512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, n_actions)
        )

    def forward(self, x, hidden=None):
        """Compute Q-values for a run of frames.

        Args:
            x: tensor of shape ``(N, C, H, W)`` holding raw pixel values; it
                is divided by 255 before encoding.
            hidden: ``(h, c)`` LSTM state, each of shape ``(1, 1, 512)``, or
                ``None`` to start from a zero state.

        Returns:
            ``(q_values, hidden)`` where ``q_values`` has shape
            ``(N, n_actions)`` and ``hidden`` is the updated LSTM state.

        Note:
            The N encoded frames are passed to the LSTM as ONE sequence of
            length N with batch size 1 (``unsqueeze(0)`` on a batch-first
            LSTM), so the leading dimension of ``x`` acts as the time axis.
        """
        x = x / 255.0  # scale pixel values to [0, 1]
        features = self.conv(x).flatten(1)
        lstm_out, hidden = self.lstm(features.unsqueeze(0), hidden)
        q_values = self.fc(lstm_out.squeeze(0))
        return q_values, hidden

# Load the trained model
input_shape = (4, 84, 84)
n_actions = 5  # must match the output size of the network that produced the checkpoint
policy_net = DRQN(input_shape, n_actions)
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source (consider weights_only=True if the installed torch supports it).
policy_net.load_state_dict(torch.load(MODEL_PATH))
policy_net.eval()  # switch the module to inference mode

# Evaluate the model (run at least 3 episodes)
def _prepare_observation(obs):
    """Convert an environment observation to a (1, 4, 84, 84) float tensor.

    * Non-image observations (anything that is not 3-D, or that has an empty
      axis) are replaced by zeros, with a warning, because the convolutional
      network cannot consume them.
    * Channel-last images (H, W, C) are moved to channel-first; stacks that
      are already channel-first are left untouched.
    * The channel axis is repeated and/or cropped to exactly 4 channels.
    * Frames are bilinearly resized to 84x84 when they have another size.
    """
    arr = np.asarray(obs, dtype=np.float32)

    if arr.ndim != 3 or 0 in arr.shape:
        import warnings

        warnings.warn(
            f"Observation with shape {arr.shape} is not an image; "
            "feeding an all-zero input to the network.",
            RuntimeWarning,
            stacklevel=2,
        )
        return torch.zeros(1, 4, 84, 84)

    # Heuristic: a small trailing axis next to a large leading axis means the
    # image is stored as (H, W, C).
    if arr.shape[-1] <= 4 < arr.shape[0]:
        arr = np.transpose(arr, (2, 0, 1))

    channels = arr.shape[0]
    if channels < 4:
        # Ceil-divide so the repeated stack always reaches 4 channels.
        arr = np.repeat(arr, -(-4 // channels), axis=0)
    arr = arr[:4]

    tensor = torch.from_numpy(np.ascontiguousarray(arr)).unsqueeze(0)
    if tuple(tensor.shape[-2:]) != (84, 84):
        tensor = F.interpolate(
            tensor, size=(84, 84), mode="bilinear", align_corners=False
        )
    return tensor


episodes = 10
for episode in range(episodes):
    state, _ = env.reset()
    hidden = (torch.zeros(1, 1, 512), torch.zeros(1, 1, 512))  # fresh LSTM state per episode
    done = False
    total_reward = 0

    while not done:
        state_tensor = _prepare_observation(state)
        print(f"State shape: {state_tensor.shape}")  # debug output of the input shape

        # Greedy action from the network; no gradients are needed at test time.
        with torch.no_grad():
            q_values, hidden = policy_net(state_tensor, hidden)
        action = q_values.max(1)[1].item()

        # Gymnasium reports termination and truncation separately; the episode
        # is over when either flag is set.
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

    print(f"Episode: {episode + 1}, Total Reward: {total_reward}")

env.close()


  logger.deprecation(


State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
Episode: 1, Total Reward: 9.0
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
Episode: 2, Total Reward: 9.0
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.Size([1, 4, 84, 84])
State shape: torch.S