In [116]:
import gymnasium as gym
from gymnasium import spaces
from gymnasium.utils import seeding
import numpy as np
import pygame


class BinpickingPathFindingEnv(gym.Env):
    def __init__(
        self,
        screen_width=800,
        screen_height=600,
        table_width=600,
        table_height=400,
        step_size=15,
        time_limit=1000,
    ):
        super(BinpickingPathFindingEnv, self).__init__()

        self.screen_width = screen_width
        self.screen_height = screen_height
        self.table_width = table_width
        self.table_height = table_height
        self.step_size = step_size
        self.time_limit = time_limit

        self.table_top_left = (
            (self.screen_width - self.table_width) // 2,
            (self.screen_height - self.table_height) // 2,
        )
        self.table_bottom_right = (
            self.table_top_left[0] + self.table_width,
            self.table_top_left[1] + self.table_height,
        )

        # Observation space: object position and all goals
        self.observation_space = spaces.Box(
            low=0, high=max(screen_width, screen_height), shape=(2 + 4 * 2,), dtype=np.float32
        )

        # Action space: 120 discrete angles (3 degrees apart)
        self.action_space = spaces.Discrete(120)

        self.colors = {
            "background": (30, 30, 30),
            "table": (50, 50, 150),
            "object": (0, 255, 0),
            "goal": (255, 0, 0),
        }

        self.goal_size = 50
        self.object_radius = 15
        self.screen = None
        self.clock = None

        self.reset()

    def reset(self, seed=None, options=None):
        if seed is not None:
            self.seed(seed)

        self.object = np.random.rand(2) * [
            self.table_width - 2 * self.object_radius,
            self.table_height - 2 * self.object_radius,
        ]
        self.object += [
            self.table_top_left[0] + self.object_radius,
            self.table_top_left[1] + self.object_radius,
        ]
        self.object = self.object.astype(np.float32)

        top_goal_x = self.table_top_left[0] + self.goal_size // 1.1
        bottom_goal_x = self.table_bottom_right[0] - self.goal_size // 1.1
        top_goal_y = self.table_top_left[1] - self.goal_size // 2
        bottom_goal_y = self.table_bottom_right[1] + self.goal_size // 2

        self.goals = np.array([
            [top_goal_x, top_goal_y],
            [bottom_goal_x, top_goal_y],
            [top_goal_x, bottom_goal_y],
            [bottom_goal_x, bottom_goal_y],
        ]).astype(np.float32)

        distances = np.linalg.norm(self.goals - self.object, axis=1)
        self.closest_goal_index = np.argmin(distances)

        self.time_steps = 0
        return self.get_observation(), {}

    def step(self, action):
        self.time_steps += 1

        # Convert discrete action to angle θ in radians
        theta = action * (2 * np.pi / self.action_space.n)
        # directions = {
        #     0: np.array([1, 0]),    # Right
        #     1: np.array([0, 1]),    # Down
        #     2: np.array([-1, 0]),   # Left
        #     3: np.array([0, -1]),   # Up
        #     4: np.array([1, 1]),    # Down-right
        #     5: np.array([-1, 1]),   # Down-left
        #     6: np.array([-1, -1]),  # Up-left
        #     7: np.array([1, -1]),   # Up-right
        # }
        # move = directions[action] * self.step_size
        # Move the object
        move = np.array([np.cos(theta), np.sin(theta)]) * self.step_size
        new_pos = self.object + move

        reward = 0
        target = self.goals[self.closest_goal_index]
        prev_distance_to_goal = np.linalg.norm(self.object - target)

        if (
            new_pos[0] < self.table_top_left[0]
            or new_pos[0] > self.table_bottom_right[0]
            or new_pos[1] < self.table_top_left[1]
            or new_pos[1] > self.table_bottom_right[1]
        ):
            reward += -10
            done = True
        else:
            self.object = new_pos
            reward += 1

            distance_to_goal = np.linalg.norm(self.object - target)
            distance_change = prev_distance_to_goal - distance_to_goal

            if distance_change > 0:
                reward += 0.5 * distance_change
            elif distance_change < 0:
                reward -= 0.5 * abs(distance_change)

            done = False
            if distance_to_goal < self.goal_size // 2:
                reward += 50
                done = True

        reward += max(0, (self.time_limit - self.time_steps) / self.time_limit)
        truncated = False
        return self.get_observation(), reward, done, truncated, {}

    def render(self, mode="human"):
        if self.screen is None:
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
            pygame.display.set_caption("Rectangular Table Pathfinding")
            self.clock = pygame.time.Clock()

        self.screen.fill(self.colors["background"])

        pygame.draw.rect(
            self.screen,
            self.colors["table"],
            pygame.Rect(
                self.table_top_left[0], self.table_top_left[1], self.table_width, self.table_height
            ),
        )

        for i, goal in enumerate(self.goals):
            color = self.colors["goal"] if i == self.closest_goal_index else (100, 100, 100)
            pygame.draw.rect(
                self.screen, 
                color, 
                pygame.Rect(goal[0] - self.goal_size // 2, goal[1] - self.goal_size // 2, self.goal_size, self.goal_size)
            )

        pygame.draw.circle(self.screen, self.colors["object"], self.object.astype(int), self.object_radius)

        pygame.display.flip()
        self.clock.tick(30)

    def close(self):
        if self.screen is not None:
            pygame.quit()

    def get_observation(self):
        return np.concatenate([self.object.flatten(), self.goals.flatten()]).astype(np.float32)

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return seed


In [117]:
env = BinpickingPathFindingEnv()

In [None]:
env.action_space.

120

In [210]:
env.get_observation()

array([338.96487, 169.07323, 145.     ,  75.     , 655.     ,  75.     ,
       145.     , 525.     , 655.     , 525.     ], dtype=float32)

In [None]:
from stable_baselines3 import PPO, TD3
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import BaseCallback

# Initialize the environment
env = BinpickingPathFindingEnv()





In [113]:
# Check the environment
check_env(env, warn=True)



In [114]:
# Initialize PPO model
model = TD3("MlpPolicy", env, verbose=1)

# Initialize the callback

# Train the model
model.learn(total_timesteps=1000000)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 51.2     |
|    ep_rew_mean     | 377      |
| time/              |          |
|    episodes        | 4        |
|    fps             | 320      |
|    time_elapsed    | 0        |
|    total_timesteps | 205      |
| train/             |          |
|    actor_loss      | -18.9    |
|    critic_loss     | 29.7     |
|    learning_rate   | 0.001    |
|    n_updates       | 104      |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 42.5     |
|    ep_rew_mean     | 317      |
| time/              |          |
|    episodes        | 8        |
|    fps             | 316      |
|    time_elapsed    | 1        |
|    total_timesteps | 340      |
| train/             |          |
|    actor_loss      | -22      |
|    critic_loss     

KeyboardInterrupt: 