In [1]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import time
import random
import pygame

# --- Constants ---
SCREEN_WIDTH = 1200
SCREEN_HEIGHT = 800
FPS = 60

# Colors
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
RED = (220, 50, 50)
GREEN = (50, 220, 50)
BLUE = (50, 50, 220)
GRAY = (150, 150, 150)

In [2]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pygame

# Define colors
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GREEN = (0, 255, 0)
BLUE = (0, 0, 255)  # Color for the trajectory

class CustomEnv(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 60}

    def __init__(self):
        super().__init__()
        self.grid_size = 10
        self.obstacle_num = 8
        self.distance_threshold_goal = 0.3  # Tolerance for goal proximity
        self.distance_threshold_obstacle = 1  # Tolerance for obstacle proximity
        self.steps = 0

        self.pre_distance = 0  # Previous distance to goal, used for reward shaping
        self.current_distance = 0  # Current distance to goal, used for reward shaping
        # Action space (dx, dy)
        self.action_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)
        # Observation space (robot_x, robot_y, goal_x, goal_y)
        self.observation_space = spaces.Box(low=0, high=self.grid_size, shape=(4,), dtype=np.float32)

        # For rendering
        self.window = None
        self.clock = None
        self.cell_size = 50 # Pixels per grid unit
        self.trajectory_points = [] # New: List to store past robot positions

    def _get_obs(self):
        return np.concatenate((self.robot_position, self.goal_position))

    def _get_info(self):
        return {
            "distance_to_goal": np.linalg.norm(self.robot_position - self.goal_position)
        }

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.robot_position = np.random.uniform(0, self.grid_size, size=2)
        self.goal_position = np.random.uniform(0, self.grid_size, size=2)
        self.obstacle_position = [np.random.uniform(0, self.grid_size, size=2) for _ in range(self.obstacle_num)]
        self.steps = 0
        self.trajectory_points = [self.robot_position.copy()] # New: Reset trajectory and add initial position
        self.pre_distance = np.linalg.norm(self.robot_position - self.goal_position)
        return self._get_obs(), self._get_info()

    def _reward(self):
        self.current_distance = np.linalg.norm(self.robot_position - self.goal_position)
        reward = (self.pre_distance - self.current_distance)*50  # Reward shaping based on distance change
        self.pre_distance = self.current_distance
        # penalize
        for obs in self.obstacle_position:
            if np.linalg.norm(self.robot_position - obs) < self.distance_threshold_obstacle:  # If too close to an obstacle
                reward -= 75  # Add a penalty
                # print(f"Collision with obstacle at {obs}, increasing distance penalty.")
                break   
        

        # Check if the robot is close enough to the goal
        if np.linalg.norm(self.robot_position - self.goal_position) < self.distance_threshold_goal:
            reward += 100
        return reward

    def step(self, action):
        stride = 0.5  # Base stride length
        self.robot_position += action * stride  # Scale the action to control speed
        self.robot_position = np.clip(self.robot_position, 0, self.grid_size)

        self.trajectory_points.append(self.robot_position.copy()) # New: Add current position to trajectory

        reward = self._reward()
        # Consider a small tolerance for checking if robot_position is "equal" to goal_position
        terminated = np.linalg.norm(self.robot_position - self.goal_position) < self.distance_threshold_goal
        truncated = False

        info = self._get_info()
        observation = self._get_obs()
        self.steps += 1
        return observation, reward, terminated, truncated, info

    def render(self):
        if self.window is None:
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode(
                (int(self.grid_size * self.cell_size), int(self.grid_size * self.cell_size))
            )
            pygame.display.set_caption("CustomEnv")
        if self.clock is None:
            self.clock = pygame.time.Clock()
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                import sys
                sys.exit() # Exit the program
            elif event.type == pygame.MOUSEBUTTONDOWN:
                if event.button == 1: # Left mouse button click
                    mouse_x, mouse_y = event.pos # Get mouse position in pixels
                    # Convert pixel coordinates to grid coordinates
                    new_goal_x = mouse_x / self.cell_size
                    new_goal_y = mouse_y / self.cell_size
                    self.goal_position = np.array([new_goal_x, new_goal_y], dtype=np.float32)
                    print(f"New goal set at grid position: {self.goal_position}")


        canvas = pygame.Surface((self.grid_size * self.cell_size, self.grid_size * self.cell_size))
        canvas.fill(WHITE)
        virus_image = pygame.image.load("病毒.png").convert_alpha()  # Load an image if needed, but not used here
        robot_image = pygame.transform.scale(virus_image, (int(self.cell_size * 0.8), int(self.cell_size * 0.8)))  # Scale the image
        # New: Draw the trajectory
        if len(self.trajectory_points) > 1:
            scaled_points = []
            for point in self.trajectory_points:
                scaled_points.append((int(point[0] * self.cell_size), int(point[1] * self.cell_size)))
            
            # Draw lines between consecutive points
            pygame.draw.lines(canvas, BLUE, False, scaled_points, 2) # Blue line, not closed, 2 pixels wide
            
            # Optionally, draw small circles at each point to emphasize
            for point_coord in scaled_points:
                pygame.draw.circle(canvas, BLUE, point_coord, 3) # Small blue circles

        # Draw robot
        pygame.draw.circle(
            canvas,
            RED,
            (int(self.robot_position[0] * self.cell_size), int(self.robot_position[1] * self.cell_size)),
            int(self.cell_size * 0.2)
        )
        # Draw obstacles
        for obs in self.obstacle_position:
            # pygame.draw.circle(
            #     canvas,
            #     GRAY,
            #     (int(obs[0] * self.cell_size), int(obs[1] * self.cell_size)),
            #     int(self.cell_size * 0.5)
            # )
            canvas.blit(robot_image, (int(obs[0] * self.cell_size), int(obs[1] * self.cell_size)))
        # Draw goal
        pygame.draw.circle(
            canvas,
            GREEN,
            (int(self.goal_position[0] * self.cell_size), int(self.goal_position[1] * self.cell_size)),
            int(self.cell_size * 0.3)
        )

        self.window.blit(canvas, canvas.get_rect())
        pygame.event.pump()
        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()

# Example Usage
if __name__ == '__main__':
    env = CustomEnv()
    obs, info = env.reset()

    for episode in range(3): # Run a few episodes to see trajectory resets
        print(f"--- Episode {episode + 1} ---")
        obs, info = env.reset() # Reset environment and clear trajectory
        done = False
        total_reward = 0
        step_count = 0

        while not done and step_count < 200: # Limit steps per episode
            action = env.action_space.sample() * 0.5 # Take a random action, scaled down for slower movement
            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            step_count += 1
            env.render() # Display current frame with trajectory

            if terminated or truncated:
                done = True
                print(f"Episode finished after {step_count} steps. Total reward: {total_reward:.2f}")

    env.close()


--- Episode 1 ---
--- Episode 2 ---
--- Episode 3 ---


In [237]:
import gymnasium as gym
import torch
from stable_baselines3 import PPO


policy_kwargs = dict(
    net_arch=[dict(pi=[64, 128], vf=[128, 256])],
    activation_fn=torch.nn.ReLU  # 改为 ReLU，通常更适合稀疏奖励
)

model = PPO("MlpPolicy", env, verbose=1, policy_kwargs=policy_kwargs,tensorboard_log="./ppo_custom_env_tensorboard/")


model.learn(total_timesteps=500000)

vec_env = model.get_env()
obs = vec_env.reset()
# for i in range(1000):
#     action, _states = model.predict(obs, deterministic=True)
#     obs, reward, dones, info = vec_env.step(action)
    # vec_env.render()


env.close()

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


ImportError: Trying to log data to tensorboard but tensorboard is not installed.

In [225]:
model.policy

ActorCriticPolicy(
  (features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (pi_features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (vf_features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (mlp_extractor): MlpExtractor(
    (policy_net): Sequential(
      (0): Linear(in_features=4, out_features=64, bias=True)
      (1): Tanh()
      (2): Linear(in_features=64, out_features=64, bias=True)
      (3): Tanh()
    )
    (value_net): Sequential(
      (0): Linear(in_features=4, out_features=64, bias=True)
      (1): Tanh()
      (2): Linear(in_features=64, out_features=64, bias=True)
      (3): Tanh()
    )
  )
  (action_net): Linear(in_features=64, out_features=2, bias=True)
  (value_net): Linear(in_features=64, out_features=1, bias=True)
)

In [227]:
env = CustomEnv()
obs,_ = env.reset()
print(obs)
for i in range(1000000):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, teminated,_, info = env.step(action)
    print(action,info["distance_to_goal"])

    env.render()
    time.sleep(0.6)  # Control the frame rate
    if teminated:
        env.close()
        break
        print("Resetting environment")



[7.94845784 9.2430564  5.05651407 1.34419754]
[-1. -1.] 7.7758927148843675
[-1. -1.] 7.153579864232719
[-1. -1.] 6.548503816485092
[-1. -1.] 5.965911463899196
[-1. -1.] 5.4130672410055904
New goal set at grid position: [5.96 0.36]
[-1.        -0.9270272] 6.005347975585182
[-0.9417341  -0.96099925] 5.637439804249912
[-0.70692027 -0.841851  ] 5.343399771780247
[-0.5242584 -0.7220329] 5.107855669041954
[-0.38124934 -0.5993341 ] 4.921892781592191
[-0.27338487 -0.48949495] 4.774552089583291
[-0.18730712 -0.40951082] 4.649422826425122
[-0.12369078 -0.34163368] 4.541654786013297
[-0.07939956 -0.2819771 ] 4.449631102906388
[-0.05550933 -0.2384766 ] 4.370592196053318
[-0.04085033 -0.2003997 ] 4.303945723502053
[-0.02986712 -0.16882241] 4.247463090172077
[-0.02216529 -0.1430105 ] 4.199384696228254
[-0.01781329 -0.11984728] 4.1593873304458295
[-0.0153974  -0.09951131] 4.126757375959005
[-0.01326987 -0.08264637] 4.100089806207518
[-0.01181624 -0.06861416] 4.078410801555416
[-0.01105777 -0.05688451

SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
