In [1]:
import pygame
import os
import sys

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import numpy as np
from PIL import Image
import random
import matplotlib.pyplot as plt
from matplotlib import animation
from collections import deque

pygame 2.6.1 (SDL 2.28.4, Python 3.9.15)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Sanity-check the GPU stack that PyTorch can see.
print(torch.cuda.is_available())         # should be True on a CUDA machine
print(torch.version.cuda)                # should list the CUDA version
print(torch.backends.cudnn.version())    # cuDNN version
# Tolerate more than one OpenMP runtime being loaded into the process.
# NOTE(review): this is set after torch/numpy were already imported in the
# first cell; if the duplicate-runtime abort is still seen, move it above the imports.
os.environ.update(KMP_DUPLICATE_LIB_OK="TRUE")

True
12.1
90100


In [3]:
# Make the game sources (expected in ./space_ship_game_RL relative to the
# current working directory) importable, without adding the path twice when
# this cell is re-run.
script_dir = os.path.join(os.getcwd(), 'space_ship_game_RL')
if sys.path.count(script_dir) == 0:
    sys.path.append(script_dir)

from setting import *
from game import Game


In [4]:
class SpaceShipEnv():
    """Gym-style wrapper around ``Game`` used to train / evaluate the DQN agent.

    Observations are ``(stacked_image, structured_state)`` tuples:

    * ``stacked_image``  - the 4 most recent preprocessed frames stacked on axis 0.
    * ``structured_state`` - 14 float32 features: 8 danger-zone flags, normalized
      speed, normalized rock density, power-alignment flag, normalized zone id,
      near-edge flag and the previous step's "player zone was dangerous" flag.

    Per-episode bookkeeping (frame stack and danger flags) is cleared by
    ``reset()`` so that nothing from a finished episode leaks into the next one.
    """

    NUM_ZONES = 8     # number of equal-width vertical bands the screen is split into
    FRAME_STACK = 4   # number of frames in one stacked image observation

    def __init__(self):
        pygame.init()
        pygame.font.init()

        # The window is created lazily by render(); until then the env is headless.
        self.screen = None
        self.clock = pygame.time.Clock()
        self.fps = FPS

        self.game = Game()

        self.action_space = [0, 1, 2, 3]
        self._reset_episode_state()
        self.observation = self.get_state()

    def _reset_episode_state(self):
        """Clear everything that must not survive across episodes."""
        self.stacked_frames = None           # rebuilt by the next get_state() call
        self.last_zone_danger = False        # used by the dodging bonus in step()
        self.last_zone_danger_flag = False   # exposed as a structured-state feature

    def _zone_of(self, x):
        """Map an x coordinate to a zone index clamped to [0, NUM_ZONES - 1]."""
        zone = int(x / (WIDTH / self.NUM_ZONES))
        return min(max(zone, 0), self.NUM_ZONES - 1)

    @staticmethod
    def _is_near_edge(x):
        """True when x lies in the outer 10% of the screen on either side."""
        return x < WIDTH * 0.1 or x > WIDTH * 0.9

    def step(self, action):
        """Advance the game by one tick.

        Args:
            action: action id forwarded unchanged to ``Game.update``.

        Returns:
            ``(next_state, reward, done, info, reward_components)`` where
            ``next_state`` is ``(stacked_image, structured_state)``, ``reward``
            is the sum of ``reward_components`` (a dict of named terms),
            ``done`` is True when the game stopped running or the score reached
            10000, and ``info`` is the current game score.
        """
        self.game.update(action)

        if self.screen is None:
            self.game.draw()
        else:
            self.game.draw(self.screen)
            self.clock.tick(self.fps)  # throttle to the game FPS only when a window is shown

        img_state, structured_state = self.get_state()
        state_info = self.game.get_state_info()

        reward_components = self._compute_reward_components(state_info)
        reward = sum(reward_components.values())

        next_state = (img_state, structured_state)
        done = not self.game.running or self.game.score >= 10000
        info = self.game.score

        return next_state, reward, done, info, reward_components

    def _compute_reward_components(self, state_info):
        """Build the dict of named reward terms for the current tick.

        Every key is present on every step (unused terms are 0.0), so callers
        can log or aggregate the components without checking for missing keys.
        """
        # Per the original comment: list of 0/1 flags per zone, length 8.
        danger_zones = state_info["danger_zones"]
        rock_density = state_info.get("rock_density", 0)
        zone_id = int(state_info["zone_id"])

        player = self.game.player.sprite
        player_speed = abs(player.speedx)
        player_x = player.rect.centerx

        danger_count = sum(danger_zones)
        in_danger = zone_id in [i for i, danger in enumerate(danger_zones) if danger]

        components = {}
        components['base'] = -0.1  # constant per-step penalty

        # Bonus for standing in a zone that is not flagged as dangerous.
        components['safe_zone_bonus'] = 0.0 if in_danger else 0.5

        # Penalty proportional to the number of dangerous zones on screen.
        components['danger_zone_penalty'] = -0.05 * danger_count

        # Reward moving while danger is present, penalize idling when it is not.
        components['movement_bonus'] = 0.2 if (danger_count > 0 and player_speed > 0.5) else 0.0
        components['movement_penalty'] = -0.1 if (danger_count == 0 and player_speed < 0.1) else 0.0

        # Penalize crowded areas.
        components['rock_density_penalty'] = -0.3 if rock_density > 3 else 0.0

        # Penalize hugging the screen edges.
        components['edge_penalty'] = -0.1 if self._is_near_edge(player_x) else 0.0

        # Dodging bonus: the player's zone was dangerous last step and is safe now.
        was_in_danger = getattr(self, 'last_zone_danger', False)
        components['dodging_bonus'] = 2.0 if (was_in_danger and not in_danger) else 0.0
        self.last_zone_danger = in_danger

        # Terms driven by flags the game sets.
        components['hit_rock_bonus'] = 1.0 if self.game.is_hit_rock else 0.0
        components['collision_penalty'] = -1.5 if self.game.is_collided else 0.0
        components['power_bonus'] = 0.7 if self.game.is_power else 0.0

        # Encourage staying horizontally aligned with a nearby falling power-up.
        power_alignment = any(
            abs(power.rect.centerx - player_x) < 40 and
            power.rect.bottom > player.rect.top - 100
            for power in self.game.powers
        )
        components['power_alignment_bonus'] = 0.3 if power_alignment else 0.0

        return components

    def reset(self):
        """Start a new game and return its first observation.

        The frame stack and danger flags are cleared first, so the returned
        stack consists solely of the new game's first frame.
        """
        self.game = Game()
        self._reset_episode_state()

        return self.get_state()

    def render(self):
        """Create the display window on first use; later calls are no-ops."""
        if self.screen is None:
            self.screen = pygame.display.set_mode((WIDTH, HEIGHT))
            pygame.display.set_caption("SpaceShip RL Environment")

    def close(self):
        """Shut pygame down and forget the (now invalid) window surface."""
        pygame.quit()
        self.screen = None

    def get_state(self):
        """Build the current observation.

        Side effects: pushes the current frame onto the frame stack and stores
        the current "player zone is dangerous" flag for the next call, so this
        should be called exactly once per game tick.

        Returns:
            ``(stacked_image, structured_state)``; see the class docstring.
        """
        raw_frame = self.game.state  # per the original comment: shape (WIDTH, HEIGHT, 3)
        frame = preprocess_frame(raw_frame.swapaxes(0, 1).astype(np.uint8))  # axes swapped to (H, W, 3)

        if getattr(self, 'stacked_frames', None) is None:
            # First frame of an episode: fill the whole stack with it.
            self.stacked_frames = deque([frame] * self.FRAME_STACK, maxlen=self.FRAME_STACK)
        else:
            self.stacked_frames.append(frame)

        stacked_image = np.stack(self.stacked_frames, axis=0)  # (4, 84, 84) with the 84x84 preprocessing

        # === Structured features ===
        player = self.game.player.sprite
        player_x = player.rect.centerx
        player_speed = player.speedx
        # Clamped so a player touching the right border cannot index past the last zone.
        player_zone_id = self._zone_of(player_x)

        # --- Danger zones: zones holding a rock that is vertically close to the player ---
        danger_zones = [0] * self.NUM_ZONES
        for rock in self.game.rocks:
            if rock.rect.bottom > player.rect.top - 80:
                # Clamped on both sides: off-screen rocks map to the nearest border
                # zone instead of wrapping around through a negative index.
                danger_zones[self._zone_of(rock.rect.centerx)] = 1

        # --- Number of rocks within 150 px of the player vertically ---
        rock_density = sum(
            1 for rock in self.game.rocks
            if abs(rock.rect.centery - player.rect.centery) < 150
        )

        # --- 1 when a nearby falling power-up is horizontally aligned with the player ---
        power_zone = 0
        for power in self.game.powers:
            if power.rect.bottom > player.rect.top - 100:
                if abs(power.rect.centerx - player_x) < 40:
                    power_zone = 1
                    break

        # --- Edge proximity flag ---
        is_near_edge = 1 if self._is_near_edge(player_x) else 0

        # --- Danger flag of the player's zone as seen by the previous call ---
        last_zone_danger = int(getattr(self, 'last_zone_danger_flag', False))

        # Remember the current danger status for the next call.
        self.last_zone_danger_flag = danger_zones[player_zone_id] == 1

        # --- Final structured vector (8 + 6 = 14 values) ---
        structured_state = np.array(
            danger_zones +                                        # 8
            [player_speed / 10.0,                                 # 1 (scaled speed)
             rock_density / 10.0,                                 # 1 (scaled density)
             power_zone,                                          # 1 (under power drop)
             player_zone_id / float(self.NUM_ZONES - 1),          # 1 (zone id scaled to 0~1)
             is_near_edge,                                        # 1 (edge flag)
             last_zone_danger],                                   # 1 (prior zone danger)
            dtype=np.float32
        )

        return stacked_image, structured_state


In [5]:
# Prefer the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")


Using device: cuda


In [6]:
# CNN-based DQN Model
class DQN(nn.Module):
    """Two-branch Q-network.

    A convolutional branch encodes the stacked frames, a small MLP encodes the
    structured feature vector, and a joint head maps the concatenation of both
    encodings to one Q-value per action.

    Args:
        num_actions: number of Q-values produced per sample.
        structured_dim: length of the structured feature vector (default 14).
    """

    def __init__(self, num_actions, structured_dim=14):
        super().__init__()

        # --- Image branch: (B, 4, 84, 84) -> (B, 512) ---
        self.conv1 = nn.Conv2d(in_channels=4, out_channels=32, kernel_size=8, stride=4)   # -> (32, 20, 20)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)  # -> (64, 9, 9)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)  # -> (64, 7, 7)
        self.flattened_size = 64 * 7 * 7
        self.image_fc = nn.Linear(self.flattened_size, 512)

        # --- Structured branch: (B, structured_dim) -> (B, 64) ---
        struct_width = 64
        self.structured_fc = nn.Sequential(
            nn.Linear(structured_dim, struct_width),
            nn.ReLU(),
            nn.Linear(struct_width, struct_width),
        )

        # --- Joint head: (B, 512 + 64) -> (B, num_actions) ---
        self.combined_fc = nn.Sequential(
            nn.Linear(512 + struct_width, 256),
            nn.ReLU(),
            nn.Linear(256, num_actions),
        )

    def forward(self, x_img, x_struct):
        """Return Q-values of shape (B, num_actions).

        Args:
            x_img: image batch, (B, 4, 84, 84).
            x_struct: structured feature batch, (B, structured_dim).
        """
        features = x_img
        for conv in (self.conv1, self.conv2, self.conv3):
            features = F.relu(conv(features))

        # Collapse (C, H, W) into one axis before the dense layer.
        features = features.view(features.shape[0], -1)
        features = F.relu(self.image_fc(features))          # (B, 512)

        struct_features = self.structured_fc(x_struct)      # (B, 64)

        joint = torch.cat([features, struct_features], dim=1)  # (B, 576)
        return self.combined_fc(joint)


In [7]:
# Preprocess frames (grayscale and resize to 84x84)
# 預處理影格：轉為灰階並縮放為 84x84

def preprocess_frame(frame):
    """Turn one RGB frame into a normalized 84x84 grayscale image.

    Args:
        frame: NumPy array whose last axis has length 3 and whose dtype is uint8
            (expected layout: (H, W, 3)).

    Returns:
        float32 array of shape (84, 84) with values in [0, 1].

    Raises:
        ValueError: if the last axis is not of length 3 or the dtype is not uint8.
    """
    has_three_channels = frame.shape[-1] == 3
    if not (has_three_channels and frame.dtype == np.uint8):
        raise ValueError(f"Invalid frame shape or dtype: {frame.shape}, {frame.dtype}")

    # NumPy -> PIL, then drop colour: a single luminance channel keeps the input small.
    grayscale = Image.fromarray(frame).convert('L')

    # 84x84 is the input size the DQN's convolutional stack is built for.
    # BILINEAR was the author's choice; NEAREST or LANCZOS are alternatives.
    resized = grayscale.resize((84, 84), Image.Resampling.BILINEAR)

    # Back to NumPy, scaling 0..255 pixel values down to 0..1.
    return np.asarray(resized, dtype=np.float32) / 255.0


def stack_frames(stacked_frames, state, is_new_episode):
    """Push a new frame onto a rolling 4-frame stack.

    Args:
        stacked_frames: deque holding the previous frames, or None if there is none yet.
        state: raw frame, passed to ``preprocess_frame``.
        is_new_episode: when truthy, the stack is rebuilt from the new frame alone.

    Returns:
        ``(stacked_state, stacked_frames)``: the frames stacked along axis 0 and
        the deque to pass back in on the next call.
    """
    latest = preprocess_frame(state)

    start_fresh = stacked_frames is None or is_new_episode
    if start_fresh:
        # No usable history: repeat the current frame to fill all four slots.
        stacked_frames = deque((latest for _ in range(4)), maxlen=4)
    else:
        # maxlen=4 makes the deque drop its oldest frame automatically.
        stacked_frames.append(latest)

    # Frames become the leading (channel) axis: (4, 84, 84) for 84x84 frames.
    return np.stack(stacked_frames, axis=0), stacked_frames


In [8]:
num_actions = 4  # Number of discrete actions; matches SpaceShipEnv.action_space ([0, 1, 2, 3]).
# NOTE(review): the earlier comment described Breakout's actions (NOOP, LEFT, RIGHT, FIRE);
# what each action id does in this game is decided by the game code — confirm there.

model = DQN(num_actions, structured_dim=14).to(device)
# Build the DQN and move it to the selected device (CPU or GPU).
# structured_dim=14 matches the 14-value structured vector built by SpaceShipEnv.get_state.

checkpoint = torch.load('checkpoint.pth', map_location=device)
model.load_state_dict(checkpoint['policy_net'])
# Load the trained weights stored under the 'policy_net' key; map_location lets a
# checkpoint saved on one device be loaded on another.
# NOTE(review): torch.load unpickles the file, so only load checkpoints you trust.
# Consider weights_only=True once the checkpoint contents are confirmed to be plain tensors/dicts.

model.eval()
# Switch the model to evaluation mode (this DQN has no dropout/batchnorm layers, so
# this is a convention here). As the cell's last expression it also displays the model.


  checkpoint = torch.load('checkpoint.pth', map_location=device)


DQN(
  (conv1): Conv2d(4, 32, kernel_size=(8, 8), stride=(4, 4))
  (conv2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
  (conv3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
  (image_fc): Linear(in_features=3136, out_features=512, bias=True)
  (structured_fc): Sequential(
    (0): Linear(in_features=14, out_features=64, bias=True)
    (1): ReLU()
    (2): Linear(in_features=64, out_features=64, bias=True)
  )
  (combined_fc): Sequential(
    (0): Linear(in_features=576, out_features=256, bias=True)
    (1): ReLU()
    (2): Linear(in_features=256, out_features=4, bias=True)
  )
)

In [9]:
# Visualization of trained agent
env = SpaceShipEnv()
env.render()
stacked_img, struct = env.reset()  # Already returns (4, 84, 84) and structured vec
state = (stacked_img, struct)

done = False
frames = []

while not done:
    img_tensor = torch.tensor(state[0], dtype=torch.float32, device=device).unsqueeze(0)
    struct_tensor = torch.tensor(state[1], dtype=torch.float32, device=device).unsqueeze(0)
    q_values = model(img_tensor, struct_tensor)
    action = torch.argmax(q_values, dim=1).item()

    next_state, reward, done, score, reward_components = env.step(action)
    state = next_state  # (stacked_img, struct)

    # 把畫面抓下來（RGB）
    surface = pygame.display.get_surface()
    frame = pygame.surfarray.array3d(surface)  # shape: (W, H, 3)
    frame = np.transpose(frame, (1, 0, 2))     # pygame 是 x,y → imageio 是 y,x
    frames.append(frame)

print(f"reward: {reward}, score: {score}")
env.close()

reward: -2.45, score: 558


In [10]:
# Number of frames captured during the rollout (one per env.step call).
print(len(frames))

1741


In [11]:
import imageio

video_path = "space_ship_run_rl.mp4"

# Fail early with a clear message instead of handing the writer an empty list.
if not frames:
    raise RuntimeError("No frames were recorded; run the rollout cell first.")

# One frame was captured per game tick, so writing at the environment's own
# clock rate makes the video play back at the game's real speed.
imageio.mimsave(video_path, frames, fps=env.fps, quality=9)
print(f"Saved gameplay video to: {video_path}")



Saved gameplay video to: space_ship_run_rl.mp4
