In [1]:
import pygame
import os
import sys

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import numpy as np
from PIL import Image
import random
import matplotlib.pyplot as plt
from matplotlib import animation
from collections import deque

pygame 2.2.0 (SDL 2.32.54, Python 3.11.8)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
print(torch.cuda.is_available())  # 應該為 True
print(torch.version.cuda)         # 應該列出 CUDA 版本
print(torch.backends.cudnn.version())  # cuDNN 版本
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

True
11.8
90100
Using device: cuda


In [3]:
script_dir = os.path.join(os.getcwd(), 'space_ship_game_RL')
if script_dir not in sys.path:
    sys.path.append(script_dir)

from setting import *
from game import Game


In [4]:
# CNN-based DQN Model
class DQN(nn.Module):
    def __init__(self, num_actions):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(64 * 7 * 7, 512)
        self.fc2 = nn.Linear(512, num_actions)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)
# Preprocess frames (grayscale and resize to 84x84)
# 預處理影格：轉為灰階並縮放為 84x84
def preprocess_frame(frame):
    # frame 是 numpy array (H, W, 3)，先轉為 PIL Image
    # Input is a color image (RGB), convert to PIL format for easier processing.
    # 輸入是彩色圖像（RGB），轉成 PIL Image 以方便處理。
    image = Image.fromarray(frame)

    # 轉灰階
    # Convert the image to grayscale to reduce input complexity.
    # 將影像轉為灰階，降低輸入維度與計算量。
    image = image.convert('L')

    # resize 成 84x84
    # Resize the image to a standard 84x84 shape, as per DQN convention.
    # 依照 DQN 的慣例將影像統一縮放至 84x84。
    image = image.resize((84, 84), Image.Resampling.BILINEAR)  # or NEAREST, or LANCZOS

    # 轉回 numpy 並正規化
    # Convert back to NumPy and normalize pixel values to [0, 1].
    # 轉回 NumPy 格式並將像素值標準化到 [0, 1]。
    frame = np.asarray(image, dtype=np.float32) / 255.0

    return frame


def stack_frames(stacked_frames, state, is_new_episode):
    # 預處理目前影格
    frame = preprocess_frame(state)

    if is_new_episode or stacked_frames is None:
        # If it's a new episode or no previous frames, initialize with 4 identical frames
        # 若是新的一集或是尚未初始化，則用目前影格複製 4 次形成初始堆疊
        stacked_frames = deque([frame]*4, maxlen=4)
    else:
        # 否則把新影格加入到堆疊中，自動捨棄最舊的
        stacked_frames.append(frame)

    # Stack the 4 frames along the first dimension: shape becomes (4, 84, 84)
    # 沿著第一維（channel）堆疊成 4 通道輸入：形狀變成 (4, 84, 84)
    stacked_state = np.stack(stacked_frames, axis=0)

    return stacked_state, stacked_frames


In [6]:
class SpaceShipEnv():
    def __init__(self, frame_skip=4, stack_frames=4):
        pygame.init()
        # pygame.font.init() # 如果不在 env 中顯示文字，可以不用初始化

        self.screen = None 
        self.clock = pygame.time.Clock()
        self.fps = FPS

        self.game = Game() # 遊戲邏輯實例

        # 動作空間: 0: 不動, 1: 左, 2: 右, 3: 射擊
        self.action_space = [0, 1, 2, 3] 
        self.action_space_n = len(self.action_space)

        # 幀處理相關
        self.frame_skip = frame_skip
        self.stack_frames = stack_frames
        self.frame_buffer = deque(maxlen=self.stack_frames)

        self.frames_since_last_shot = 0
        self.SHOT_COOLDOWN_FRAMES = 60 # 假設子彈冷卻時間為 60 幀
        
        # 定義中間區域的範圍
        self.CENTER_ZONE_WIDTH_PERCENT = 0.5 # 中間 20% 的區域
        self.CENTER_ZONE_X_START = WIDTH * (0.5 - self.CENTER_ZONE_WIDTH_PERCENT / 2)
        self.CENTER_ZONE_X_END = WIDTH * (0.5 + self.CENTER_ZONE_WIDTH_PERCENT / 2)
        # ## 新增: 勝利條件分數 ##
        self.WIN_SCORE = 10000 

    def step(self, action):
        """
        執行一個動作，並回傳 (下一個狀態, 獎勵, 遊戲是否結束, 當前分數)
        """
        total_reward = 0.0
        done = False

        # --- 狀態追蹤 (State Tracking) ---
        # 在 action 執行前，記錄當前的狀態，以便後續計算獎勵
        prev_health = self.game.player.sprite.health
        prev_score = self.game.score
        prev_is_power = self.game.is_power 
        # --- 執行動作 (Action Execution) ---
        # 根據 frame_skip 的設定，重複執行同一個動作
        for _ in range(self.frame_skip):
            self.game.update(action) 
            
            # ## 優化 ##
            # 除非需要渲染畫面來獲取狀態，否則 draw 是不必要的。
            # 這裡假設 self.game.update() 會更新 self.game.state
            # 如果不是，則保留 self.game.draw(None)
            
            # 檢查遊戲是否在任何一幀結束
            if not self.game.running:
                done = True
                break

        # --- 獎勵計算 (Reward Calculation) ---
        # 在所有幀都執行完畢後，根據「狀態的變化」來計算總獎勵
        
        # 1. 主要獎勵：分數增益 (Score Gain)
        current_score = self.game.score
        current_is_power = self.game.is_power # 獲取最新的 is_power 狀態
        current_health = self.game.player.sprite.health # 獲取最新的健康值
        current_is_hit_rock = self.game.is_hit_rock # 獲取最新的是否擊中石頭狀態

        score_gain = current_score - prev_score
        if score_gain > 0:
            # 將分數的增益直接轉換為獎勵，權重可以調整
            # 這樣 Agent 會自然學會優先攻擊分數較高的石頭 (通常更大、更危險)
            total_reward += score_gain * 0.5  # 權重是超參數，可以實驗調整 🎮
            
            # 根據分數增益給予額外的大石頭獎勵，與 1_2 版本對齊
            if score_gain >= 40: # 超大型石頭 (例如半徑 ~40)
                total_reward += 10.0 
            elif score_gain >= 15: # 大型石頭 (例如半徑 ~18)
                total_reward += 5.0 
            elif score_gain >= 5: # 中型石頭
                total_reward += 3 
            else: # 小型石頭
                total_reward += 2 

        # 2. 主要懲罰：生命值損失 (Health Loss)
        health_diff = current_health - prev_health
        if health_diff < 0: # 掉血懲罰
            total_reward -= abs(health_diff) * 10  # 固定的高額懲罰 🩸 (使用 abs 以便計算正數)
        elif health_diff > 0: # 補血獎勵
            total_reward += health_diff * 1.5 # 補血有獎勵！🩹

        # 3. 行為塑造：射擊成本 (Shooting Cost) 及擊中獎勵
        if action == 3: # 假設 3 是射擊
            if self.frames_since_last_shot < self.SHOT_COOLDOWN_FRAMES:
                total_reward -= 2.5 # 冷卻中射擊懲罰大幅提高！❌
            else:
                self.frames_since_last_shot = 0 # 重置射擊冷卻計時器
                if current_is_hit_rock and score_gain > 0: # 射擊並成功擊中石頭並得分
                    total_reward += 10 # 成功射擊本身有小獎勵
                else: # 亂射或沒擊中
                    total_reward -= 5 # 亂射懲罰再高一點 🎯

        # 4. 行為塑造：生存獎勵 (Survival Reward)
        if not done:
            if action == 0:
                total_reward += 0.5
            else:
                total_reward += 0.55
        # 5. 終局獎勵/懲罰 (Terminal Reward/Penalty)
        if done:
            if current_health <= 0: # 因為死亡導致遊戲結束
                total_reward -= 500.0 # 死亡的懲罰，比單次受傷更重！💀
            if current_score >= self.WIN_SCORE: # 勝利條件達成
                total_reward += 10000.0 # 勝利的喜悅！🎉

        # 吃到寶物獎勵
        if current_is_power and not prev_is_power: 
            total_reward += 500 # 吃到寶物給予獎勵 ✨ (調整為 20 以與 1_2 更接近)

        # 中間區域額外獎勵 (調整為與 1_2 相同邏輯)
        ship_center_x = self.game.player.sprite.rect.centerx 
        if ship_center_x >= self.CENTER_ZONE_X_START and \
           ship_center_x <= self.CENTER_ZONE_X_END: # 不再需要判斷是否有最佳射擊目標和子彈準備好
            total_reward += 0.3 # 鼓勵保持在中心 🎯

        # 新增：基於總分數的額外獎勵 (每次 step 都給予，但權重可以很小)
        if self.game.score > 0:
            reward_for_total_score = self.game.score * 0.005 # 舉例值，需要實驗調整！💡
            total_reward += reward_for_total_score

        # --- 狀態處理 (State Processing) ---
        # 這部分邏輯與您原本的設計相同，非常棒！
        current_raw_frame = self.game.state
        processed_frame = preprocess_frame(current_raw_frame)
        self.frame_buffer.append(processed_frame)
        
        # 確保 buffer 在遊戲一開始就能填滿
        while len(self.frame_buffer) < self.stack_frames:
            self.frame_buffer.append(processed_frame)
            
        stacked_state = np.array(list(self.frame_buffer))

        # info 通常是一個字典，可用來傳遞除錯資訊，這裡我們先簡單回傳分數
        info = {'score': self.game.score}

        return stacked_state, total_reward, done, info

    def reset(self):
        # Reset 函數的邏輯很完美，不需要大改
        self.game = Game()
        self.frame_buffer.clear()
        
        # 填充初始的 frame buffer
        for _ in range(self.stack_frames):
            initial_raw_state = self.game.state 
            processed_frame = preprocess_frame(initial_raw_state) 
            self.frame_buffer.append(processed_frame)
        
        stacked_state = np.array(list(self.frame_buffer))
        return stacked_state

    def render(self):
        # Render 函數的邏輯很完美，不需要修改
        if self.screen is None:
            self.screen = pygame.display.set_mode((WIDTH, HEIGHT))
            pygame.display.set_caption("SpaceShip RL Environment")

        # 讓視窗可以被關閉
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.close()
                sys.exit()

        self.game.draw(self.screen) 
        pygame.display.update()
        self.clock.tick(self.fps)

    def close(self):
        pygame.quit()

In [None]:
num_actions = 4  # Breakout 中的動作數量（例如：無動作、左移、右移、發球）  
# Number of possible actions in Breakout (e.g., NOOP, LEFT, RIGHT, FIRE)

model = DQN(num_actions).to(device)  
# 建立 DQN 模型並放到指定裝置（CPU 或 GPU）  
# Create a DQN model and move it to the specified device (CPU or GPU)

checkpoint = torch.load('checkpointV1_6_2292_1888.pth', map_location=device)
model.load_state_dict(checkpoint['policy_net'])
# 載入訓練好的模型權重（可跨裝置載入）  
# Load trained model weights (supports device mapping for CPU/GPU compatibility)

model.eval()  
# 設定模型為評估模式，關閉 dropout/batchnorm 等訓練特性  
# Set the model to evaluation mode (disables dropout, batchnorm, etc.)


  checkpoint = torch.load('checkpointV1_6_444_1456.pth', map_location=device)


DQN(
  (conv1): Conv2d(4, 32, kernel_size=(8, 8), stride=(4, 4))
  (conv2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
  (conv3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=3136, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=4, bias=True)
)

In [8]:
import torch
import pygame
import numpy as np
import os
import cv2 # 為了可視化狀態，我們可能需要它

# 如果您還有定義 stack_frames 或 preprocess_frame 函數，現在可以將它們刪除或註釋掉，
# 因為 SpaceShipEnv 應已接管這些功能。

# ===================== 模型載入設置 =====================
# 設定 PyTorch 運行設備 (GPU 或 CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 假設這裡的 'model' 變數就是您要載入的 DQN 模型實例
# 我們使用 play_with_trained_model 中的 policy_net 作為參考，統一變數名
# 實例化環境以獲取 action_dim
# env_temp = SpaceShipEnv(frame_skip=2, stack_frames=4) # 臨時環境，用於獲取動作空間
# action_dim = len(env_temp.action_space) 
# env_temp.close() # 關閉臨時環境

policy_net = DQN(4).to(device)

# 載入訓練好的模型檔案
model_path = "checkpointV1_6_2292_1888.pth" # 請確認這個路徑是正確的！
if not os.path.exists(model_path):
    print(f"錯誤：找不到模型檔案：{model_path} ❌ 請確認路徑是否正確！")
    exit() # 如果模型檔案不存在，直接退出

# ⭐⭐⭐ 關鍵修改在這裡：載入整個檢查點字典，然後提取 policy_net 的 state_dict ⭐⭐⭐
print(f"嘗試從 {model_path} 載入模型...")
checkpoint = torch.load(model_path, map_location=device)

# 檢查 checkpoint 字典中是否有 'policy_net' 這個鍵
if 'policy_net' in checkpoint:
    policy_net.load_state_dict(checkpoint['policy_net'])
    print("✅ 成功從 checkpoint 中載入 'policy_net' 的 state_dict。")
else:
    # 如果 checkpoint 本身就是 state_dict (不太可能，但以防萬一)
    try:
        policy_net.load_state_dict(checkpoint)
        print("✅ 成功載入模型 state_dict。 (模型檔案可能直接儲存了 state_dict)")
    except RuntimeError as e:
        print(f"❌ 無法從 {model_path} 載入模型。錯誤訊息：{e}")
        print("請確認 'checkpoint.pth' 檔案儲存的格式是否正確。")
        exit() # 載入失敗則退出

policy_net.eval() # 設置為評估模式 (關閉 Dropout 等)
# =========================================================

# Visualization of trained agent
env = SpaceShipEnv(frame_skip=2, stack_frames=4) # 重新實例化環境，用於遊戲
# 由於 SpaceShipEnv 內部會處理渲染，這裡不需要 env.render() 放在 reset 之前
# 如果 env.render() 在這裡會報錯，請刪除這行或確保它不會觸發顯示初始化
scores = []
All_games = []
for i in range(100):
    state = env.reset() # 獲取初始狀態，假設它已是 (4, 84, 84) 的堆疊幀
    done = False
    frames = [] # 用於儲存每一幀畫面，以便後續可以生成影片

    print("🚀 模型遊戲可視化開始！按 ESC 或關閉視窗結束。🚀")

    while not done:
        # 處理 Pygame 事件 (例如關閉視窗)
        for event in pygame.event.get(): 
            if event.type == pygame.QUIT:
                done = True # 設置為 True 結束當前回合
                print("使用者關閉視窗，可視化提前結束。")
                break # 跳出事件處理迴圈

        if done: # 如果因為關閉視窗而跳出，則立即結束遊戲循環
            break

        # 渲染遊戲畫面。由於 SpaceShipEnv 內部處理了顯示，直接呼叫即可。
        env.render()
        
        with torch.no_grad(): # 在推理時禁用梯度計算，節省記憶體和加速
            # 將 NumPy 狀態轉換為 PyTorch Tensor，並增加一個 batch 維度
            state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            
            # 從模型獲取 Q 值 (這裡使用 policy_net 而不是 model，保持一致性)
            q_values = policy_net(state_tensor)
            print(q_values)
            # 選擇 Q 值最高的動作
            action = torch.argmax(q_values, dim=1).item()

        # 執行動作並獲取下一個狀態、獎勵、是否結束和 info
        next_state, reward, done_episode, info = env.step(action)
        
        # 從 info 字典中獲取分數
        current_score = info.get('score', 0) # 使用 .get() 避免 KeyError，如果 info 中沒有 'score'

        # 更新狀態和 done 標誌
        state = next_state
        done = done_episode

        # 把畫面抓下來（RGB）
        # 這裡假設 Pygame 的主顯示表面是透過 pygame.display.get_surface() 取得的
        surface = pygame.display.get_surface()
        if surface: # 確保表面存在
            frame = pygame.surfarray.array3d(surface)  # shape: (W, H, 3)
            frame = np.transpose(frame, (1, 0, 2))      # Pygame 是 (W, H, C) → imageio/OpenCV 是 (H, W, C)
            frames.append(frame)
        else:
            print("警告：無法獲取 Pygame 顯示表面，可能未正確初始化或已關閉。")
            # 如果沒有表面，跳過幀捕獲

    # 遊戲結束後，關閉環境
    # env.close()
    # pygame.quit() # 確保所有 Pygame 模組都被關閉

    # 打印最終的獎勵和分數
    # 注意：這裡的 reward 是最後一個 step 的獎勵，不是總獎勵
    # 如果您需要總獎勵，需要在循環中累計
    scores.append(current_score) # 最終分數
    All_games.append(frames)

for index, score in enumerate(scores):
    print(f"第{index+1}局遊戲分數: {score}")

# 如果您想將幀儲存為 GIF 或影片，可以在這裡添加額外的程式碼，例如使用 imageio
# import imageio
# output_gif_path = "agent_play.gif"
# imageio.mimsave(output_gif_path, frames, fps=FPS)
# print(f"遊戲過程已儲存為 {output_gif_path}")

嘗試從 checkpointV1_6_2292_1888.pth 載入模型...


  checkpoint = torch.load(model_path, map_location=device)


✅ 成功從 checkpoint 中載入 'policy_net' 的 state_dict。
🚀 模型遊戲可視化開始！按 ESC 或關閉視窗結束。🚀
tensor([[86526.4297, 86781.1953, 87388.5078, 86919.1172]], device='cuda:0')
tensor([[3127700.2500, 3121146.2500, 3113042.5000, 3099755.7500]],
       device='cuda:0')
tensor([[3162237.2500, 3159691.5000, 3159574.5000, 3139476.0000]],
       device='cuda:0')
tensor([[3167918.2500, 3165548.7500, 3169732.5000, 3148322.7500]],
       device='cuda:0')
tensor([[3201772.5000, 3195453.2500, 3203776.7500, 3182957.7500]],
       device='cuda:0')
tensor([[3218542.5000, 3216972.0000, 3217138.2500, 3198414.5000]],
       device='cuda:0')
tensor([[3234300.0000, 3238007.7500, 3232755.7500, 3216976.7500]],
       device='cuda:0')
tensor([[3232981.5000, 3236269.0000, 3231505.5000, 3215694.2500]],
       device='cuda:0')
tensor([[3218741.7500, 3216455.2500, 3217485.7500, 3198867.2500]],
       device='cuda:0')
tensor([[3202536.5000, 3195882.0000, 3204634.0000, 3184483.5000]],
       device='cuda:0')
tensor([[3201789.2500, 319539

In [None]:
print(len(All_games[8]))

399


In [12]:
import imageio

video_path = "space_ship_run_rl.mp4"

imageio.mimsave(video_path, frames, fps=60, quality=9)
print(f"Saved gameplay video to: {video_path}")



Saved gameplay video to: space_ship_run_rl.mp4
