In [1]:
import pybullet as p
import pybullet_data
import time
import numpy as np
import gymnasium as gym
from gymnasium import spaces

from stable_baselines3 import PPO # Or A2C, DQN for discrete actions
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.vec_env import DummyVecEnv
import time
from stable_baselines3.common.callbacks import ProgressBarCallback

pybullet build time: Jan 29 2025 23:16:28


In [2]:
class MazeCarEnv(gym.Env):
    metadata = {'render_modes': ['human'], "render_fps": 500}

    def __init__(self, render_mode=None):
        super().__init__()

        # --- Define Action Space ---
        # Hier exemplarisch: 2 diskrete Aktionen (links/rechts drehen + vorr√ºcken)
        self.action_space = spaces.Discrete(3)  # 0: links, 1: rechts
        
        # --- Define Observation Space ---
        # Beobachtung: [Auto_x, Auto_y, Auto_yaw, Ziel_x, Ziel_y]
        low = np.array([-6, -6, -np.pi, -6, -6], dtype=np.float32)
        high = np.array([6, 6, np.pi, 6, 6], dtype=np.float32)
        self.observation_space = spaces.Box(low, high, dtype=np.float32)

        # --- PyBullet Setup ---
        self.render_mode = render_mode
        self.client = p.connect(p.DIRECT if render_mode is None else p.GUI)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.setGravity(0, 0, -9.81, physicsClientId=self.client)

        # Lade eine Plane als Boden
        self.planeId = p.loadURDF("plane.urdf", physicsClientId=self.client)

        # Maze als URDF laden (dieses sollte z. B. als "maze.urdf" im Arbeitsverzeichnis liegen)
        # Die Option useFixedBase=True sorgt daf√ºr, dass das Maze statisch bleibt.
        self.mazeId = p.loadURDF("urdf/maze.urdf",
                                 basePosition=[0, 0, 0],
                                 useFixedBase=True,
                                 physicsClientId=self.client)

        # Ziele definieren (2 verschiedene, beispielhaft)
        # Passe sie an die tats√§chlichen G√§nge/Ausg√§nge in deinem Maze an.
        self.goal_area_1 = np.array([4.5, 4.5])   # oberer Ausgang
        self.goal_area_2 = np.array([4.5, -4.5])  # unterer Ausgang
        self.goal_radius = 0.5
        self.target_goal_pos = None  # Wird in reset() gesetzt
        self.correct_goal_index = self.np_random.integers(0, 2  )  # Zuf√§llige Wahl des Ziels

        # Falls im GUI-Modus: Visualisiere die Ziele als farbige Kugeln
        if p.getConnectionInfo()['connectionMethod'] == p.GUI:
            goal_visual_shape_1 = p.createVisualShape(
                p.GEOM_SPHERE,
                radius=self.goal_radius,
                rgbaColor=[0, 1, 0, 0.5]  # halbdurchsichtig gr√ºn
            )
            goal_visual_shape_2 = p.createVisualShape(
                p.GEOM_SPHERE,
                radius=self.goal_radius,
                rgbaColor=[1, 0, 0, 0.5]  # halbdurchsichtig rot
            )
            p.createMultiBody(baseVisualShapeIndex=goal_visual_shape_1,
                              basePosition=[self.goal_area_1[0], self.goal_area_1[1], 0.1])
            p.createMultiBody(baseVisualShapeIndex=goal_visual_shape_2,
                              basePosition=[self.goal_area_2[0], self.goal_area_2[1], 0.1])

        # Roboter (Auto) laden
        # Passe ggf. den Pfad an, wenn das URDF woanders liegt
        self.start_pos = [-4.5, 0.0, 0.1]  # Start nahe der linken Seite des Maze
        self.start_orn = p.getQuaternionFromEuler([0, 0, 0])
        self.carId = p.loadURDF("urdf/simple_two_wheel_car.urdf",
                                self.start_pos, self.start_orn,
                                physicsClientId=self.client)
        
        # IDs der R√§der (m√ºssen zu deinem Roboter-URDF passen)
        self.left_wheel_joint_index = 1
        self.right_wheel_joint_index = 0

        self.step_counter = 0
        self.max_steps_per_episode = 1000  # z. B. mehr Schritte erlauben

        self.action_repeat = 50

    def _get_obs(self):
        pos, orn_quat = p.getBasePositionAndOrientation(self.carId, physicsClientId=self.client)
        euler = p.getEulerFromQuaternion(orn_quat)
        yaw = euler[2]
        return np.array([pos[0], pos[1], yaw,
                         self.target_goal_pos[0],
                         self.target_goal_pos[1]], dtype=np.float32)

    def _get_info(self):
        car_pos, _ = p.getBasePositionAndOrientation(self.carId, physicsClientId=self.client)
        dist_goal1 = np.linalg.norm(np.array(car_pos[:2]) - self.goal_area_1)
        dist_goal2 = np.linalg.norm(np.array(car_pos[:2]) - self.goal_area_2)
        return {
            "distance_goal1": dist_goal1,
            "distance_goal2": dist_goal2,
            "target_goal_index": self.correct_goal_index
        }

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.step_counter = 0

        # Auto zur√ºcksetzen
        # Zuf√§llige Startposition leicht variieren (z.‚ÄØB. ¬±0.3 in X/Y)
        start_x = self.start_pos[0] + self.np_random.uniform(-0.3, 0.3)
        start_y = self.start_pos[1] + self.np_random.uniform(-0.3, 0.3)
        p.resetBasePositionAndOrientation(self.carId, [start_x, start_y, self.start_pos[2]], self.start_orn, physicsClientId=self.client)
        p.resetBaseVelocity(self.carId,
                            linearVelocity=[0, 0, 0],
                            angularVelocity=[0, 0, 0],
                            physicsClientId=self.client)

        # Zuf√§llige Wahl des "richtigen" Ziels (oder fest vorgegeben)
        # self.correct_goal_index = self.np_random.integers(0, 2)
        self.correct_goal_index = 0  # zum Testen immer Goal 1
        if self.correct_goal_index == 0:
            self.target_goal_pos = self.goal_area_1
        else:
            self.target_goal_pos = self.goal_area_2

        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()
            # time.sleep(0.02)
        car_pos, _ = p.getBasePositionAndOrientation(self.carId, physicsClientId=self.client)
        self.prev_dist_to_goal = np.linalg.norm(np.array(car_pos[:2]) - self.target_goal_pos)

        return observation, info

    def step(self, action):

        if action == 0:
            left_vel, right_vel = 10.0, 10.0
        elif action == 1:
            left_vel, right_vel = 2.0, 10.0
        else:  # action == 2
            left_vel, right_vel = 10.0, 2.0

        # Setze die Motoren der beiden R√§der
        p.setJointMotorControl2(
            bodyUniqueId=self.carId,
            jointIndex=self.left_wheel_joint_index,
            controlMode=p.VELOCITY_CONTROL,
            targetVelocity=left_vel,
            force=20.0
        )
        p.setJointMotorControl2(
            bodyUniqueId=self.carId,
            jointIndex=self.right_wheel_joint_index,
            controlMode=p.VELOCITY_CONTROL,
            targetVelocity=right_vel,
            force=20.0
        )


        # print(f"Action: {action}, Velocities: L={left_vel}, R={right_vel}, Step={self.step_counter}")
        # car_pos, _ = p.getBasePositionAndOrientation(self.carId, physicsClientId=self.client)
        # print(f"Position: {car_pos}")


        # Jetzt die Simulation mehrmals updaten,
        # damit der Roboter tats√§chlich f√§hrt und gegen W√§nde kollidieren kann
        for _ in range(self.action_repeat):
            p.stepSimulation()
            # if self.render_mode == "human":
            #     time.sleep(0.001) 
            #     print("Step Simulation")
        
        self.step_counter += 1



        # Beobachtung + Reward + Done bestimmen
        observation = self._get_obs()
        info = self._get_info()

        # # Check goal conditions
        car_pos, _ = p.getBasePositionAndOrientation(self.carId, physicsClientId=self.client)
        in_goal_1 = np.linalg.norm(car_pos[:2] - self.goal_area_1) < self.goal_radius
        in_goal_2 = np.linalg.norm(car_pos[:2] - self.goal_area_2) < self.goal_radius

        terminated = False
        # Neue Distanz zum Ziel
        # curr_dist_to_goal = np.linalg.norm(np.array(car_pos[:2]) - self.target_goal_pos)
        # reward = self.prev_dist_to_goal - curr_dist_to_goal  # Belohnung f√ºr Ann√§herung

        # Kleine Zeitstrafe, damit das Auto schneller f√§hrt
        reward = 0.0
        reward -= 0.01

        # Belohnung aktualisieren
        # self.prev_dist_to_goal = curr_dist_to_goal


        if in_goal_1:
            if self.correct_goal_index == 0:
                reward = 50.0
                terminated = True
            else:
                reward = -10.0
                terminated = True
        elif in_goal_2:
            if self.correct_goal_index == 1:
                reward = 50.0
                terminated = True
            else:
                reward = -10.0
                terminated = True

        # Nach dem Schritt: Pr√ºfe auf Kollision
        # Pr√ºfe auf Kontakt mit dem Maze
        contacts = p.getContactPoints(bodyA=self.carId, bodyB=self.mazeId, physicsClientId=self.client)
        if len(contacts) > 0:
            reward -= 1.0  # Strafe f√ºr Kollision mit Maze-Wand
            terminated = True 


        truncated = (self.step_counter >= self.max_steps_per_episode)

        if self.render_mode == "human":
            self._render_frame()

        return observation, reward, terminated, truncated, info

    def render(self):
        # Bei PyBullet im GUI-Modus passiert das Rendering automatisch
        pass

    def _render_frame(self):
        # Debug-Anzeigen, falls erw√ºnscht
        pass

    def close(self):
        p.disconnect(physicsClientId=self.client)


In [None]:
env = MazeCarEnv(render_mode=None) 
# env = MazeCarEnv(render_mode="human")

try:
    check_env(env)
    print("Environment check passed!")
except Exception as e:
    print(f"Environment check failed: {e}")
    env.close()
    exit()

model = PPO("MlpPolicy", 
            env, 
            # verbose=1, 
            tensorboard_log="./ppo_mazecar_tensorboard/",
            batch_size=256, # after n_steps the data is split into batches
            learning_rate=0.0003,
            n_steps=1024,   # number of steps collected before a training
            device="cuda"
)


Environment check passed!


  return torch._C._cuda_getDeviceCount() > 0


In [4]:
# from stable_baselines3.common.callbacks import BaseCallback
# import time

# class EvalCallback(BaseCallback):
#     def __init__(self, eval_env, eval_freq=1000, n_eval_episodes=1, verbose=1):
#         super().__init__(verbose)
#         self.eval_env = eval_env
#         self.eval_freq = eval_freq
#         self.n_eval_episodes = n_eval_episodes
#         self.best_mean_reward = -float("inf")
    
#     def _manual_eval(self):
#         episode_rewards = []
#         for _ in range(self.n_eval_episodes):
#             obs, info = self.eval_env.reset()
#             done = False
#             truncated = False
#             total_reward = 0.0
#             while not (done or truncated):
#                 action, _states = self.model.predict(obs, deterministic=True)
#                 obs, reward, done, truncated, info = self.eval_env.step(action)
#                 total_reward += reward
#                 time.sleep(1. / 120.0)  # Slows simulation so GUI can show movement
#             episode_rewards.append(total_reward)
#         mean = sum(episode_rewards) / len(episode_rewards)
#         std = (sum((r - mean) ** 2 for r in episode_rewards) / len(episode_rewards)) ** 0.5
#         return mean, std

#     def _on_step(self):
#         if self.n_calls % self.eval_freq == 0:
#             print(f"\nüöÄ Evaluating at step {self.n_calls}")
#             mean_reward, std_reward = self._manual_eval()
#             print(f"‚úÖ Step {self.n_calls}: Mean reward = {mean_reward:.2f} ¬± {std_reward:.2f}")

#             if mean_reward > self.best_mean_reward:
#                 self.best_mean_reward = mean_reward
#                 self.model.save(f"best_model_step_{self.n_calls}")
#         return True





In [5]:
# eval_env = MazeCarEnv(render_mode='human')
# eval_freq = 1000 
# eval_callback = EvalCallback(eval_env, eval_freq, verbose=1)

model.learn(total_timesteps=500000, progress_bar=True)

model.save("ppo_mazecar_model_2")
env.close()
# check_env.close()

Output()

In [17]:
# --- To evaluate ---
eval_env = MazeCarEnv(render_mode="human") # Render during evaluation
model = PPO.load("ppo_mazecar_model_2", env=eval_env)

obs, info = eval_env.reset()
for _ in range(500): # Max steps for evaluation
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = eval_env.step(action)
    time.sleep(1./120.0)  # Adjust sleep time for rendering speed
    if terminated or truncated:
        print(f"Evaluation episode finished. Reached target goal: {info.get('target_goal_index')}, Reward: {reward}")
        obs, info = eval_env.reset() # Reset for next evaluation episode

eval_env.close()

startThreads creating 1 threads.
starting thread 0
started thread 0 
argc=2
argv[0] = --unused
argv[1] = --start_demo_name=Physics Server
ExampleBrowserThreadFunc started
X11 functions dynamically loaded using dlopen/dlsym OK!
X11 functions dynamically loaded using dlopen/dlsym OK!
Creating context
Created GL 3.3 context
Direct GLX rendering context obtained
Making context current
GL_VENDOR=Intel
GL_RENDERER=Mesa Intel(R) Graphics (RPL-P)
GL_VERSION=4.6 (Core Profile) Mesa 23.2.1-1ubuntu3.1~22.04.3
GL_SHADING_LANGUAGE_VERSION=4.60
pthread_getconcurrency()=0
Version = 4.6 (Core Profile) Mesa 23.2.1-1ubuntu3.1~22.04.3
Vendor = Intel
Renderer = Mesa Intel(R) Graphics (RPL-P)
b3Printf: Selected demo: Physics Server
startThreads creating 1 threads.
starting thread 0
started thread 0 
MotionThreadFunc thread started
ven = Intel
Workaround for some crash in the Intel OpenGL driver on Linux/Ubuntu
ven = Intel
Workaround for some crash in the Intel OpenGL driver on Linux/Ubuntu
Evaluation episo

In [7]:
env.close()

check_env.close()

eval_env.close()




# car_pos, _ = p.getBasePositionAndOrientation(eval_env.carId, physicsClientId=eval_env.client)
# p.resetDebugVisualizerCamera(cameraDistance=3, cameraYaw=50, cameraPitch=-35, cameraTargetPosition=car_pos)


error: Not connected to physics server.

In [None]:
# ~/masterthesis/mt_start$   tensorboard --logdir=./ppo_mazecar_tensorboard/

In [None]:
import torch
print(torch.cuda.is_available())  # Should return True if GPU is available
print(torch.cuda.get_device_name(0))  # Prints the name of the GPU

True
NVIDIA GeForce RTX 4060 Laptop GPU
