# Init

In [1]:
import pygame
import numpy as np
import gymnasium as gym

pygame 2.6.1 (SDL 2.28.4, Python 3.12.6)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
def human_render_loop(agent, env, transpose=False, print_step=False):
    obs, info = env.reset()

    clock = pygame.time.Clock()
    fps = env.metadata["render_fps"]

    pygame.display.init()
    size = env.render().shape[:2]
    if transpose:
        size = size[::-1]

    display = pygame.display.set_mode(size)

    term = trunc = quit = False
    step = 0
    total_reward = 0

    if print_step:
        print("initial state")
        print("\tobservation", obs)
        print("\tinfo", info)

    while not (term or trunc or quit):
        quit = pygame.key.get_pressed()[pygame.K_q]

        action = agent(obs)
        obs, reward, term, trunc, info = env.step(action)

        pygame.event.pump()

        step += 1
        total_reward += reward

        if print_step:
            print("step", step)
            print("\taction", action)
            print("\tobservation", obs)
            print("\treward", reward)
            print("\tterminated", term)
            print("\ttruncated", trunc)
            print("\tinfo", info)

        frame = env.render()
        if transpose:
            frame = frame.transpose((1, 0, 2))

        pygame.surfarray.blit_array(display, frame)
        pygame.display.flip()
        clock.tick(fps)

    if print_step:
        print("Total reward", total_reward)

    pygame.display.quit()

# Rocket Landing Game

In [3]:
# pygame.init()

WIDTH, HEIGHT = 1200, 900
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)

gravity = 0.5
mass = 1

# screen = pygame.display.set_mode((WIDTH, HEIGHT))
# pygame.display.set_caption("Rocket Landing Game")
# clock = pygame.time.Clock()

class Rocket:
    def __init__(self, state):
        # state: ['x', 'y', 'vx', 'vy', 'theta', 'w']
        # Cmd: ['Thrust', 'delta']

        self.image = pygame.image.load("rocket.png")
        # self.width, self.height = self.image.get_size()
        self.width = 20  
        self.height = 60

        # State Init
        self.state = state

        # Cmd Init
        self.thruster_angle = 0
        self.thrust = 0

        # Thrust position from direct kinematic
        self.thrust_x = self.state[0] + np.sin(np.radians(self.state[4])) * self.height / 2
        self.thrust_y = self.state[1] + np.cos(np.radians(self.state[4])) * -self.height / 2

        # Kinematic params
        self.mass = mass
        self.g = -gravity
        self.I = 50 #1/12 * self.mass * (self.width**2 + self.height**2)

        self.thrust_on = False

    def update_state(self, cmd, dt):
        x, y, vx, vy, theta, w = self.state   
        T, delta = np.array(cmd)

        self.thruster_angle = theta - delta
        self.thrust = T

        # Forces applied by thrust
        Fx = np.sin(np.radians(self.thruster_angle)) * T
        Fy = np.cos(np.radians(self.thruster_angle)) * T

        # Acceleration from thrust
        ax = Fx / self.mass
        ay = -Fy / self.mass - self.g

        ## Update linear states
        # Update speed
        vx += ax * dt * 30 
        vy += ay * dt * 30

        # Update position
        x += vx * dt * 30
        y += vy * dt * 30

        # Angular acceleration due to thrust
        self.thrust_x = (x + np.sin(np.radians(theta)) * -self.height / 2)
        self.thrust_y = (y + np.cos(np.radians(theta)) * self.height / 2)

        lever_arm_x = self.thrust_x - x
        lever_arm_y = self.thrust_y - y

        alpha = (lever_arm_x * Fy + lever_arm_y * Fx) / self.I

        ## Update angular states
        # Update speed
        w += alpha * dt * 30

        # Update angle
        theta += w * dt * 30

        self.state = np.array([x, y, vx, vy, theta, w])

    def get_state(self):
        return self.state

    def draw(self, screen):
        x, y, _, _, theta, _ = self.state

        rotated_image = pygame.transform.rotate(self.image, -theta)
        new_rect = rotated_image.get_rect(center=(x, y))
        screen.blit(rotated_image, new_rect.topleft)

        thrust_length = self.thrust * 50
        thrust_x = self.thrust_x - thrust_length * np.sin(np.radians(self.thruster_angle))
        thrust_y = self.thrust_y + thrust_length * np.cos(np.radians(self.thruster_angle))

        pygame.draw.line(screen, (255, 0, 0), (self.thrust_x, self.thrust_y), (thrust_x, thrust_y), 3)

    def check_collision(self, screen, target):
        _, y, _, _, _, _ = self.state

        # Floor
        floor_y = target[0] + self.height + 80
        pygame.draw.rect(screen, (139, 69, 19), (0, floor_y, WIDTH, HEIGHT - y))  # Brown-colored floor

        tower_width = 10
        tower_height = self.height + 40
        tower_x = target[0] + self.width + tower_width
        tower_y = floor_y - tower_height
        pygame.draw.rect(screen, (150, 150, 150), (tower_x, tower_y, tower_width, tower_height))

        # Ground collision
        # if y + self.height / 2  >= floor_y:
        #     self.state[1] = floor_y - self.height / 2
            
        #     if np.isclose(self.state, target, atol=5).all():
        #         return "win"  
        #     else:
        #         return "lose"  
        
        return "continue"

# Gym Environment 

Les espaces d'observations et d'action sont établie selon la physique du jeu (selon le nombre de pixel et le nombre de FPS)

In [4]:
class RocketLandingEnv(gym.Env):
    metadata = {"render_modes": ["rgb_array"], "render_fps": 30}

    def __init__(self, cible = [WIDTH // 2, HEIGHT - HEIGHT // 4, 0.0, 0.0, 0.0, 0.0], world_size=(WIDTH, HEIGHT), render_mode=None):
        super(RocketLandingEnv, self).__init__()

        self.world_size = self.HEIGHT, self.WIDTH = world_size

        # State Init
        self.state = np.array([0, 0, 0, 0, 0, 0])
        self.cible = cible

        self.rocket = Rocket(self.state)

        # Cmd Init
        self.cmd = np.zeros(2)
        self.dt = 1 / self.metadata["render_fps"]
        self.t = 0

        # State and action spaces
        self.observation_space = gym.spaces.Box(
            low= np.array([0, 0, -20, -20, -180, -10]),
            high= np.array([self.WIDTH, self.HEIGHT, 20, 20, 180, 10]), 
            shape=(6,),
            dtype=np.float32)
        self.action_space = gym.spaces.Box(low=np.array([gravity * mass - 0.7, -15]), high=np.array([gravity * mass + 0.7, 15]), shape=(2,), dtype=np.float32)

        self.game_over = "continue"
        self.init_offset = 100 # Pixels
        self.render_mode = render_mode

    def _dynamic_step(self, action):
        self.t += self.dt
        self.cmd = action
        
        self.rocket.update_state(self.cmd, self.dt)

        self.state = self.rocket.get_state()
        
    def _g(self):
        u = self.cmd
        dstate = self.cible - self._get_observation()
        
        Q = np.diag([1., 1., 0.1, 0.1, 5., 0.1]) # x, y, vx, vy, theta, w
        R = np.diag([0.001, 0.001]) # T, delta

        cout = (np.dot(dstate.T, np.dot(Q , dstate)) + np.dot(u.T, np.dot(R ,u))) * self.dt

        if (np.linalg.norm(dstate) < 2.5):
            cout = 0.0
            
        cout += 100 * (not self.observation_space.contains(self.state))

        return cout 

    def _get_observation(self):
        return (self.state).astype(np.float32)

    def _should_truncate(self):
        return not self.observation_space.contains(self._get_observation())

    def _should_terminate(self):

        dx = self.cible - self._get_observation()
        return (np.linalg.norm(dx) < 2.5)
    
    def _get_info(self):
        return {
            "t": self.t,
            "distance": np.linalg.norm(self.cible - self.state)
        }

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        self.t = 0

        self.state = np.array([np.random.uniform(WIDTH // 2 - self.init_offset, WIDTH // 2 + self.init_offset), HEIGHT // 4, 0, 0, 0, 0])
        #self.state = np.array([WIDTH // 2 , HEIGHT // 4, 0, 0, 0, 0])
        self.rocket = Rocket(self.state)
        observation = self._get_observation()

        self.cmd = np.zeros(2)

        info = self._get_info()

        return observation, info

    def step(self, action):
        self._dynamic_step(action)
        observation = self._get_observation()
        terminate = self._should_terminate()
        truncate = self._should_truncate()

        reward = -self._g()
        info = self._get_info()
        
        return observation, reward, terminate, truncate, info

    def render(self):
        if self.render_mode == "rgb_array":

            size = pygame.Vector2(self.world_size)
            surface = pygame.Surface(size, flags=pygame.SRCALPHA)

            self.game_over = self.rocket.check_collision(surface, self.cible)

            self.rocket.draw(surface)
        
            return pygame.surfarray.pixels3d(surface)

In [5]:
pygame.quit()

In [6]:
gym.register(id="RocketLandingEnv-v0", entry_point="__main__:RocketLandingEnv")

In [17]:
env = gym.make("RocketLandingEnv-v0", render_mode="rgb_array", max_episode_steps=1000)

def human_agent(_obs):
    keys = pygame.key.get_pressed()
    pygame.event.pump()

    delta_input = 0
    trust_input = 0

    if keys[pygame.K_UP]:
        if not env.rocket.thrust_on:
            env.rocket.thrust_on = True
    else:
        env.rocket.thrust_on = False

    if env.rocket.thrust_on:
        trust_input = gravity * mass + 0.2
    else:
        trust_input = 0

    if keys[pygame.K_LEFT]:
        delta_input = -15
    if keys[pygame.K_RIGHT]:
        delta_input = 15

    action = np.array([trust_input, delta_input])

    return action

human_render_loop(human_agent, env, print_step=False)

# PPO Agent Training

## Wrappers

In [8]:
class NormalizeObservation(gym.ObservationWrapper):

    def __init__(self, env):
        super().__init__(env)
        self.observation_space = gym.spaces.Box(-1, 1, shape=env.observation_space.shape, dtype=env.observation_space.dtype)

    def observation(self, obs):
        low = self.env.observation_space.low
        high = self.env.observation_space.high
        return -1 + 2 / (high - low) * (high - obs)

In [9]:
class NormalizeAction(gym.ActionWrapper):

    def __init__(self, env):
        super().__init__(env)
        self.action_space = gym.spaces.Box(-1, 1, shape=env.action_space.shape, dtype=env.action_space.dtype)

    def action(self, act):
        low = self.env.action_space.low
        high = self.env.action_space.high
        return low + (high-low) / 2 * (1 - act)

In [10]:
class ScaleReward(gym.RewardWrapper):

    def __init__(self, env, factor):
        super().__init__(env)
        self.factor = factor

    def reward(self, reward):
        return reward * self.factor

## Training

In [11]:
env = gym.make("RocketLandingEnv-v0", render_mode="rgb_array", max_episode_steps=1000)
env = NormalizeObservation(env)
env = NormalizeAction(env)
env = ScaleReward(env, factor=1/10000)

In [13]:
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback

eval_callback = EvalCallback(
    env,
    best_model_save_path="best_model",
    log_path="logs",
    eval_freq=10000,
    deterministic=True,
    render=False
)

model = PPO("MlpPolicy", env, verbose=1, n_epochs=100, tensorboard_log="ppo_rocket_tensorboard")
model.learn(total_timesteps=500_000, callback=eval_callback)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ppo_rocket_tensorboard\PPO_19


---------------------------------
| rollout/           |          |
|    ep_len_mean     | 107      |
|    ep_rew_mean     | -85.6    |
| time/              |          |
|    fps             | 853      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------


KeyboardInterrupt: 

# Test the Agent

In [None]:
model = PPO.load("Best_Model/Rocket_RL_Rand300.zip")
def agent(obs):
    return model.predict(obs, deterministic=True)[0]

human_render_loop(agent, env, print_step=False)

## Train from a model
Changer l'environnement pour augmenter le décallage par rapport à la cible et réentrainer en partant du model précédent

In [109]:
model = PPO.load("Best_Model/Rocket_RL_Rand300.zip", env=env)


model.learn(total_timesteps=500_000, callback=eval_callback)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ppo_rocket_tensorboard\PPO_24


---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | -25.7    |
| time/              |          |
|    fps             | 2452     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------


KeyboardInterrupt: 