# In [2]:  (Jupyter notebook cell prompt)
import math
import random
import sys
import numpy as np
import pygame

import gym
from gym import spaces

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv

#
# 1) The scripted HumanLikeAgent has been replaced by manual keyboard input in the
#    "enjoy" (post-training demo) phase. If you still want to simulate a human
#    automatically, re-introduce that code.
#


class CursorControlEnv(gym.Env):
    """
    A 2D shared-control environment.

      - Exactly ONE randomly chosen goal each episode; the dot starts at the
        centre of the window.
      - The dot moves by a weighted sum of the PPO agent's action and a
        manual/human action.
      - Reward for staying close to the straight path from start to goal, plus
        a bonus for reaching the goal.
      - No obstacles.

    In "training mode" (``manual_action=None`` in ``step``) a random 'human'
    is simulated so the agent can learn a policy. In "manual mode" the caller
    passes the keyboard action so you can see how the policy behaves with
    your input.

    Note: ``gamma`` here is the blending weight of the agent's action, NOT the
    RL discount factor (that one is configured on the PPO model).

    The environment follows the classic gym API: ``reset()`` returns an
    observation and ``step()`` returns ``(obs, reward, done, info)``.
    """

    def __init__(
        self,
        width=800,
        height=600,
        gamma=0.5,              # weight for PPO agent's action in the sum
        max_steps=500,
        goal_distance_threshold=20.0,
        render_mode=False
    ):
        """
        Args:
            width: Playfield width in pixels (also the window width).
            height: Playfield height in pixels (also the window height).
            gamma: Weight of the agent's action in the blend; the manual
                action gets ``1 - gamma``.
            max_steps: Maximum number of ``step`` calls per episode.
            goal_distance_threshold: Distance to the goal (pixels) below which
                the episode counts as solved.
            render_mode: If truthy, open a pygame window for ``render()``.
        """
        super().__init__()
        self.width = width
        self.height = height
        self.gamma = gamma
        self.max_steps = max_steps
        self.goal_distance_threshold = goal_distance_threshold

        # Spaces
        # PPO agent outputs a 2D action in [-1,1].
        # We'll combine it with a "human/manual" action (also in [-1,1]) in step().
        self.action_space = spaces.Box(
            low=np.array([-1.0, -1.0], dtype=np.float32),
            high=np.array([1.0,  1.0], dtype=np.float32),
            shape=(2,),
            dtype=np.float32
        )
        # Observations: [dot_x, dot_y, goal_x, goal_y]
        high = np.array([width, height, width, height], dtype=np.float32)
        self.observation_space = spaces.Box(
            low=np.zeros_like(high, dtype=np.float32),
            high=high,
            shape=(4,),
            dtype=np.float32
        )

        # State (populated by reset())
        self.dot_pos = None
        self.start_pos = None
        self.goal_pos = None
        self.current_step = 0
        self.done_flag = False

        self.max_speed = 3.0  # scale for final movement (pixels per step)
        self.dot_radius = 10.0

        # Rendering
        self.render_mode = render_mode
        if self.render_mode:
            pygame.init()
            self.screen = pygame.display.set_mode((self.width, self.height))
            pygame.display.set_caption("Cursor Control: Manual + Agent")
            self.clock = pygame.time.Clock()
            self.font = pygame.font.SysFont(None, 24)

        # For training mode, we simulate a random human action
        # (If you want a better "human" model, adapt these.)
        self.simulate_human_noise = 0.3

    def reset(self):
        """Start a new episode: dot at the centre, goal uniformly random.

        Returns:
            The initial observation ``[dot_x, dot_y, goal_x, goal_y]``.
        """
        self.done_flag = False
        self.current_step = 0
        self.dot_pos = np.array([self.width/2, self.height/2], dtype=np.float32)
        self.start_pos = self.dot_pos.copy()

        gx = random.uniform(0, self.width)
        gy = random.uniform(0, self.height)
        self.goal_pos = np.array([gx, gy], dtype=np.float32)

        return self._get_obs()

    def step(self, agent_action, manual_action=None):
        """
        Step the environment:
          agent_action: (dx, dy) from PPO in [-1,1].
          manual_action: (dx, dy) from keyboard/human in [-1,1].
                         If None, we simulate a random 'human' for training.

        Both actions are clipped to [-1,1] before use.

        Weighted sum:
          combined = gamma*(agent_action) + (1-gamma)*(manual_action)
        Then scaled by self.max_speed, applied to self.dot_pos (clamped to the
        playfield).

        Reward = -0.01 * distance to the perfect path (start->goal segment),
                 +10 if within threshold of the goal.

        The episode ends when the goal is reached or after ``max_steps`` steps.
        The final step is a regular step (action applied, reward computed); if
        the episode ends only because of the step limit, ``info`` contains
        ``"TimeLimit.truncated": True``.

        Returns:
            (obs, reward, done, info)

        Raises:
            ValueError: if an action does not contain exactly two components.
        """
        if self.done_flag:
            # If called after done, just return the current state
            return self._get_obs(), 0.0, True, {}

        self.current_step += 1

        # If no manual_action is provided, we assume "training mode"
        if manual_action is None:
            manual_action = self._simulate_random_human()

        agent_vec = self._as_unit_box_action(agent_action)
        manual_vec = self._as_unit_box_action(manual_action)

        # Weighted sum, scaled by max_speed
        combined = self.gamma * agent_vec + (1.0 - self.gamma) * manual_vec
        displacement = combined * self.max_speed

        # Move, keeping the dot inside the playfield
        upper_bound = np.array([self.width, self.height], dtype=np.float32)
        self.dot_pos = np.clip(
            self.dot_pos + displacement, 0.0, upper_bound
        ).astype(np.float32)

        # Reward shaping
        dist_line = self._distance_to_line(self.start_pos, self.goal_pos, self.dot_pos)
        reward = -0.01 * float(dist_line)

        dist_to_goal = float(np.linalg.norm(self.dot_pos - self.goal_pos))
        reached_goal = dist_to_goal < self.goal_distance_threshold
        if reached_goal:
            reward += 10.0

        # The step limit is checked AFTER the move so the last action and its
        # reward are not silently discarded.
        timed_out = self.current_step >= self.max_steps
        self.done_flag = reached_goal or timed_out

        info = {}
        if timed_out and not reached_goal:
            # Lets the learner tell a timeout apart from a true terminal state.
            info["TimeLimit.truncated"] = True

        return self._get_obs(), reward, self.done_flag, info

    def render(self, mode="human"):
        """Draw the current state to the pygame window (no-op when headless).

        Also drains the pygame event queue so the window stays responsive;
        a window-close event shuts pygame down and exits the process.
        """
        if not self.render_mode:
            return
        # Standard Pygame event check so the window doesn't freeze
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()

        self.screen.fill((255, 255, 255))

        # Perfect path line
        pygame.draw.line(
            self.screen, (200, 200, 200),
            (int(self.start_pos[0]), int(self.start_pos[1])),
            (int(self.goal_pos[0]), int(self.goal_pos[1])),
            2
        )
        # Goal
        pygame.draw.circle(
            self.screen,
            (255, 0, 0),
            (int(self.goal_pos[0]), int(self.goal_pos[1])),
            8
        )
        # Dot
        pygame.draw.circle(
            self.screen,
            (0, 0, 255),
            (int(self.dot_pos[0]), int(self.dot_pos[1])),
            int(self.dot_radius)
        )

        text = self.font.render(f"Step: {self.current_step}", True, (0,0,0))
        self.screen.blit(text, (10, 10))

        pygame.display.flip()
        self.clock.tick(60)  # cap at 60 frames per second

    def close(self):
        """Shut pygame down if this environment opened a window."""
        if self.render_mode:
            pygame.quit()

    def _get_obs(self):
        """Return the observation ``[dot_x, dot_y, goal_x, goal_y]`` as float32."""
        return np.array([
            self.dot_pos[0],
            self.dot_pos[1],
            self.goal_pos[0],
            self.goal_pos[1]
        ], dtype=np.float32)

    @staticmethod
    def _as_unit_box_action(action):
        """Coerce an action to a float32 vector of two components in [-1,1]."""
        vec = np.asarray(action, dtype=np.float32).reshape(-1)
        if vec.shape[0] != 2:
            raise ValueError(
                f"Expected a 2D action (dx, dy), got {vec.shape[0]} component(s)."
            )
        return np.clip(vec, -1.0, 1.0)

    def _simulate_random_human(self):
        """
        For training: a simple random direction to emulate a 'confused' or random user.
        Could be replaced by a more realistic noisy action toward the goal.

        Returns:
            A float32 array (dx, dy); unit length unless the sampled vector is
            (numerically) zero.
        """
        dx = np.random.uniform(-1, 1)
        dy = np.random.uniform(-1, 1)
        # Optional mild normalization
        mag = math.hypot(dx, dy)
        if mag > 1e-6:
            dx /= mag
            dy /= mag
        # Add small noise
        dx += np.random.normal(0, self.simulate_human_noise)
        dy += np.random.normal(0, self.simulate_human_noise)
        # Re-normalize so each component stays in [-1,1]
        mag = math.hypot(dx, dy)
        if mag > 1e-6:
            dx /= mag
            dy /= mag
        return np.array([dx, dy], dtype=np.float32)

    def _distance_to_line(self, start, end, point):
        """Distance from ``point`` to the SEGMENT ``start``->``end``.

        The projection parameter is clamped to [0, 1], so points beyond either
        endpoint are measured to that endpoint. A degenerate segment
        (``start`` == ``end``) falls back to the distance to ``start``.
        """
        line_len = np.linalg.norm(end - start)
        if line_len < 1e-9:
            return np.linalg.norm(point - start)
        t = np.dot(point - start, end - start) / (line_len**2)
        t = max(0.0, min(1.0, t))
        proj = start + t * (end - start)
        return np.linalg.norm(point - proj)


def run_manual_control(model):
    """
    Let you manually control the environment (via arrow keys).
    The PPO model also outputs an action each step.
    We do a weighted sum of (model_action, your_action).
    Episodes restart automatically; the loop continues until you press ESC
    or close the window.

    Args:
        model: A trained policy exposing ``predict(obs, deterministic=True)``
            that returns ``(action, state)``.
    """
    env = CursorControlEnv(render_mode=True)  # GUI
    try:
        obs = env.reset()
        done = False
        total_reward = 0.0

        print("\nManual control + PPO running!")
        print("Use arrow keys to move. Window will remain open until you close it.")

        while True:  # We'll let you run multiple episodes in a row
            # If episode is done, reset
            if done:
                print(f"Episode finished. Total reward: {total_reward:.2f}")
                obs = env.reset()
                done = False
                total_reward = 0.0

            # 1) Process window events here so that closing the window leaves
            #    the loop cleanly (and the env gets closed) instead of
            #    terminating the process from inside env.render().
            window_closed = any(
                event.type == pygame.QUIT for event in pygame.event.get()
            )
            keys = pygame.key.get_pressed()

            # If you want to exit, press ESC or close the window
            if window_closed or keys[pygame.K_ESCAPE]:
                break

            # Convert arrow keys to a [-1,1] action,
            # e.g. left arrow => (-1, 0), right arrow => (+1, 0)
            user_dx = 0.0
            user_dy = 0.0
            if keys[pygame.K_LEFT]:
                user_dx -= 1.0
            if keys[pygame.K_RIGHT]:
                user_dx += 1.0
            if keys[pygame.K_UP]:
                user_dy -= 1.0  # screen y grows downward
            if keys[pygame.K_DOWN]:
                user_dy += 1.0

            # Normalize so diagonal input is not faster than axis-aligned input
            mag = math.hypot(user_dx, user_dy)
            if mag > 1e-6:
                user_dx /= mag
                user_dy /= mag

            manual_action = np.array([user_dx, user_dy], dtype=np.float32)

            # 2) Get the PPO model's action
            model_action, _states = model.predict(obs, deterministic=True)

            # 3) Step the environment with both
            obs, reward, done, info = env.step(
                agent_action=model_action,
                manual_action=manual_action
            )
            total_reward += reward

            # 4) Render
            env.render()
    finally:
        # Always release the pygame window, even if the loop raised.
        env.close()

    print("Closed manual control window. Exiting...")


def main():
    """
    Run the full demo in two phases.

    Phase 1 trains a PPO model on a headless environment, where the 'human'
    input is simulated randomly. Phase 2 hands the trained model to the
    interactive loop, in which arrow-key input is blended with the model's
    action in real time.
    """

    # Phase 1: headless training
    headless_env = CursorControlEnv(render_mode=False)
    ppo_settings = {
        "verbose": 1,
        "n_steps": 1024,
        "batch_size": 64,
        "learning_rate": 3e-4,
        "gamma": 0.99,  # PPO discount factor (not the env's blending weight)
        "ent_coef": 0.01,
    }
    model = PPO(
        "MlpPolicy",
        DummyVecEnv([lambda: headless_env]),
        **ppo_settings,
    )

    print("Training PPO for 100,000 steps (this may take a while)...")
    model.learn(total_timesteps=100_000)
    print("Training complete!")

    # Phase 2: interactive demo with the trained model
    run_manual_control(model)


# Script entry point: run training and the demo only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()


# --- Notebook cell output (error), kept verbatim ---
#
# TypeError: Descriptors cannot be created directly.
# If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
# If you cannot immediately regenerate your protos, some other possible workarounds are:
#  1. Downgrade the protobuf package to 3.20.x or lower.
#  2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).
#
# More information: https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates