# Customizing OpenAI Gym Environments and Implementing Reinforcement Learning Agents with Stable Baselines

### Theme: Car Racing

- Constança
- Daniela Osório, 202208679
- Inês Amorim, 202108108

---

## Imports

In [None]:
%pip install -r requirements.txt

In [None]:
import gymnasium as gym
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
import os
import gc
from eval import *
from custom_cr import CustomCarRacing

---

## 1. Introduction

The CarRacing-v3 environment from Gymnasium (previously Gym) is part of the Box2D environments, and it offers an interesting challenge for training reinforcement learning agents. It's a top-down racing simulation where the track is randomly generated at the start of each episode. The environment offers both continuous and discrete action spaces, making it adaptable to different types of reinforcement learning algorithms.

- **Action Space:**

   - **Continuous:** Three actions: steering, gas, and braking. Steering ranges from -1 (full left) to +1 (full right).
   -  **Discrete:** Five possible actions: do nothing, steer left, steer right, gas, and brake.

- **Observation Space:**

    - The environment provides a 96x96 RGB image of the car and the track, which serves as the state input for the agent.

- **Rewards:**

    - The agent receives a -0.1 penalty for every frame, encouraging efficiency.
    - It earns a positive reward for visiting track tiles: the formula is Reward=1000−0.1×framesReward=1000−0.1×frames, where "frames" is the number of frames taken to complete the lap. The reward for completing a lap depends on how many track tiles are visited.

- Episode Termination:

    - The episode ends either when all track tiles are visited or if the car goes off the track, which incurs a significant penalty (-100 reward).

In [2]:
env = gym.make("CarRacing-v3", continuous=False, render_mode='rgb_array') 
obs, info = env.reset()
#continuous = False to use Discrete space

In [9]:
#check render modes
print(env.metadata["render_modes"])

['human', 'rgb_array', 'state_pixels']


- Checking if everything is okay and working

In [10]:
# Reset the environment and render the first frame
obs, info = env.reset()

# Close the environment
env.close()

print("Environment initialized successfully!")

Environment initialized successfully!


In [11]:
print("Action space:", env.action_space)

Action space: Discrete(5)


In [12]:
print("Action Space:", env.action_space)
print("Observation Space:", env.observation_space)
print("Environment Metadata:", env.metadata)


Action Space: Discrete(5)
Observation Space: Box(0, 255, (96, 96, 3), uint8)
Environment Metadata: {'render_modes': ['human', 'rgb_array', 'state_pixels'], 'render_fps': 50}


In [3]:
obs = env.reset()
for _ in range(10):
    """action = env.action_space.sample()  # Random action
    print(f"Action before step: {action}, Type: {type(action)}")
    obs, reward, done, info = env.step(action)"""
    env.step(env.action_space.sample())

env.close()

---
## 2. Training with DQN

In [1]:
MODELS_DIR = '../../labiacd/models/#isia'

### 2.1. Baseline

In [2]:
env = gym.make("CarRacing-v3", continuous=False)

In [3]:
obs = env.reset()

In [6]:
#create directories
logs_dir = 'DQN_baseline_logs'
logs_path = os.path.join(MODELS_DIR, logs_dir)
os.makedirs(logs_path, exist_ok=True)

#video_dir = os.path.join(logs_path, "videos")
tensorboard_dir = os.path.join(logs_path, "tensorboard")
model_dir = os.path.join(logs_path, "models")
#os.makedirs(video_dir, exist_ok=True)
os.makedirs(tensorboard_dir, exist_ok=True)
os.makedirs(model_dir, exist_ok=True)

In [None]:
# Simple DQN architecture
policy_kwargs = dict(net_arch=[64, 64])  # Simpler architecture with 2 layers of 64 units

# Set up the model with simpler hyperparameters
model = DQN('CnnPolicy', env, policy_kwargs=policy_kwargs, 
            verbose=1, buffer_size=50000, learning_starts=1000, 
            learning_rate=0.0005, batch_size=32, exploration_fraction=0.1,
            exploration_final_eps=0.05)

# Setup evaluation and checkpoint callbacks
eval_callback = EvalCallback(env, best_model_save_path=model_dir,
                             log_path=model_dir, eval_freq=5000, n_eval_episodes=5,
                             deterministic=True, render=False)

checkpoint_callback = CheckpointCallback(save_freq=10000, save_path=model_dir,
                                         name_prefix='dqn_model_checkpoint')

# Start training the model with callbacks for evaluation and checkpoints
model.learn(total_timesteps=1_000_000, callback=[eval_callback, checkpoint_callback])

# Save the final model after training
model.save("dqn_car_racing_model")
env.close()

In [None]:
env.close()
del env
foo = gc.collect()

### 2.2. Environment Modifications

In [None]:
#create directories
logs_dir = 'DQN_env_mod_logs'
logs_path = os.path.join(MODELS_DIR, logs_dir)
os.makedirs(logs_path, exist_ok=True)

tensorboard_dir = os.path.join(logs_path, "tensorboard")
model_dir = os.path.join(logs_path, "models")
os.makedirs(tensorboard_dir, exist_ok=True)
os.makedirs(model_dir, exist_ok=True)

### 2.3. Rewards Modifications

In [None]:
#create directories
logs_dir = 'DQN_rewards_mod_logs'
logs_path = os.path.join(MODELS_DIR, logs_dir)
os.makedirs(logs_path, exist_ok=True)

tensorboard_dir = os.path.join(logs_path, "tensorboard")
model_dir = os.path.join(logs_path, "models")
os.makedirs(tensorboard_dir, exist_ok=True)
os.makedirs(model_dir, exist_ok=True)

### 2.4. Environment and Rewards Modifications

In [None]:
#create directories
logs_dir = 'DQN_env_rewards_mod_logs'
logs_path = os.path.join(MODELS_DIR, logs_dir)
os.makedirs(logs_path, exist_ok=True)

tensorboard_dir = os.path.join(logs_path, "tensorboard")
model_dir = os.path.join(logs_path, "models")
os.makedirs(tensorboard_dir, exist_ok=True)
os.makedirs(model_dir, exist_ok=True)

---

# 3. Evaluation

In [None]:
# Adjust number of episodes based on the environment's characteristics
if hasattr(env, "max_episode_steps"):
    # If the environment has predefined max steps, use a higher number for evaluation
    num_episodes = 50  
else:
    # For simpler environments, use fewer episodes
    num_episodes = 20

In [None]:
model = foo #load best model

In [None]:
obs = env.reset()
episode_rewards = []
for episode in range(num_episodes):
    total_reward = 0
    while True:
        action = model.predict(obs)  # Use trained policy
        obs, reward, done, info = env.step(action)
        total_reward += reward
        record_agent_dynamics(env)  # Record smoothness metrics
        if done:
            break
    episode_rewards.append(total_reward)
    obs = env.reset()

print(f"Average Reward: {np.mean(episode_rewards)}")
