In [3]:
import numpy as np
import gymnasium as gym
from gym import spaces
import matplotlib.pyplot as plt
from tqdm import tqdm
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv
import os
import torch

# 환경 정의
class SeaRouteEnv(gym.Env):
    def __init__(self, grid_map_path, grid_size=(270, 236), max_steps=1000):
        super(SeaRouteEnv, self).__init__()
        self.grid_map = np.load(grid_map_path).astype(np.float32)
        self.grid_size = grid_size
        self.max_steps = max_steps
        self.action_space = spaces.Discrete(8)
        self.observation_space = spaces.Box(low=0, high=1, shape=(6,), dtype=np.float32)

    def reset(self, seed=None):
        if seed is not None:
            np.random.seed(seed)
        sea_positions = np.argwhere(self.grid_map == 0)
        self.start = tuple(sea_positions[np.random.randint(len(sea_positions))])
        self.goal = tuple(sea_positions[np.random.randint(len(sea_positions))])
        self.current_pos = self.start
        self.step_count = 0
        self.path = [self.current_pos]
        return self._get_obs(), {}

    def _get_obs(self):
        return np.array([
            self.current_pos[0] / (self.grid_size[0] - 1),
            self.current_pos[1] / (self.grid_size[1] - 1),
            self.start[0] / (self.grid_size[0] - 1),
            self.start[1] / (self.grid_size[1] - 1),
            self.goal[0] / (self.grid_size[0] - 1),
            self.goal[1] / (self.grid_size[1] - 1)
        ], dtype=np.float32)

    def step(self, action):
        moves = [(-1, 0), (1, 0), (0, -1), (0, 1), (-1, 1), (-1, -1), (1, 1), (1, -1)]
        dx, dy = moves[action]
        next_pos = (self.current_pos[0] + dx, self.current_pos[1] + dy)

        reward = -1
        done = False

        if (0 <= next_pos[0] < self.grid_size[0] and 
            0 <= next_pos[1] < self.grid_size[1] and 
            self.grid_map[next_pos[0], next_pos[1]] == 0):
            self.current_pos = next_pos
            self.path.append(self.current_pos)

        self.step_count += 1
        dist_to_goal = np.linalg.norm(np.array(self.current_pos) - np.array(self.goal))

        if self.current_pos == self.goal:
            reward += 100 - self.step_count * 0.1
            done = True
        elif self.step_count >= self.max_steps:
            done = True

        return self._get_obs(), reward, done, False, {}

# 병렬 환경 생성 함수
def make_env(grid_map_path, rank, seed=0):
    def _init():
        env = SeaRouteEnv(grid_map_path)
        env.reset(seed=seed + rank)
        return env
    return _init

# 저장 디렉토리 설정
save_dir = r"C:\baramproject\trained_model\sibal14\episode_debug\ppo_path_visualizations"
if not os.path.exists(save_dir):
    os.makedirs(save_dir)

# 그리드 맵 파일 경로
grid_map_path = "C:/baramproject/sibal/land_sea_grid_cartopy_downsized.npy"

# 병렬 환경 설정 (8개 환경 병렬 실행)
num_envs = 8
env = SubprocVecEnv([make_env(grid_map_path, i) for i in range(num_envs)])

# PPO 모델 생성 (GPU 사용, MLP 정책)
model = PPO("MlpPolicy", env, verbose=1, learning_rate=0.0003, clip_range=0.2, 
            n_steps=2048, batch_size=128, device='cuda')

# 학습 진행
total_episodes = 1000
rewards = []
for episode in tqdm(range(total_episodes), desc="학습 진행률"):
    obs = env.reset()
    done = np.array([False] * num_envs)
    total_reward = 0
    step_count = 0
    while not all(done) and step_count < 1000:
        action, _ = model.predict(obs)
        obs, reward, done, _ = env.step(action)
        total_reward += reward.mean()
        step_count += 1
    rewards.append(total_reward)
    model.learn(total_timesteps=2048 * num_envs)

    if episode % 100 == 0 and episode > 0:
        single_env = SeaRouteEnv(grid_map_path)
        obs, _ = single_env.reset()
        path = [single_env.current_pos]
        done = False
        while not done:
            action, _ = model.predict(obs)
            obs, _, done, _, _ = single_env.step(action)
            path.append(single_env.current_pos)
        plt.figure(figsize=(10, 8))
        plt.imshow(single_env.grid_map, cmap='gray', origin='upper')
        path_rows = [pos[0] for pos in path]
        path_cols = [pos[1] for pos in path]
        plt.plot(path_cols, path_rows, 'r-', linewidth=2)
        plt.plot(single_env.start[1], single_env.start[0], 'go')
        plt.plot(single_env.goal[1], single_env.goal[0], 'bo')
        plt.title(f"PPO Path at Episode {episode}")
        save_path = os.path.join(save_dir, f"episode_{episode}.png")
        plt.savefig(save_path)
        plt.close()

# 학습된 모델 저장
model.save(r"C:\baramproject\trained_model\sibal14\sea_route_ppo_model")

# 학습 리워드 그래프
plt.plot(rewards)
plt.xlabel('에피소드')
plt.ylabel('총 보상')
plt.title('학습 리워드 그래프')
plt.savefig(os.path.join(save_dir, "reward_graph.png"))
plt.close()

# 테스트
single_env = SeaRouteEnv(grid_map_path)
obs, _ = single_env.reset()
done = False
path = [single_env.current_pos]
while not done:
    action, _ = model.predict(obs)
    obs, _, done, _, _ = single_env.step(action)
    path.append(single_env.current_pos)
print("최적 경로 (그리드 좌표):", path)

# 최종 경로 시각화
plt.figure(figsize=(10, 8))
plt.imshow(single_env.grid_map, cmap='gray', origin='upper')
path_rows = [pos[0] for pos in path]
path_cols = [pos[1] for pos in path]
plt.plot(path_cols, path_rows, 'r-', linewidth=2)
plt.plot(single_env.start[1], single_env.start[0], 'go')
plt.plot(single_env.goal[1], single_env.goal[0], 'bo')
plt.title("Final PPO Path")
plt.savefig(os.path.join(save_dir, "final_path.png"))
plt.close()

Using cuda device


AssertionError: The algorithm only supports (<class 'gymnasium.spaces.box.Box'>, <class 'gymnasium.spaces.discrete.Discrete'>, <class 'gymnasium.spaces.multi_discrete.MultiDiscrete'>, <class 'gymnasium.spaces.multi_binary.MultiBinary'>) as action spaces but Discrete(8) was provided