In [1]:
from context import *
from stable_baselines3 import PPO,A2C,SAC,TD3,DQN,DDPG
from stable_baselines3.common.save_util import load_from_zip_file
from stable_baselines3.common.monitor import Monitor
import torch as th
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv

from pprint import pprint
import enum

import rlrom.wrappers.stl_wrapper
import stlrom
from rlrom.envs import *
import rlrom.utils
import time
import matplotlib.pyplot as plt

import yaml

class EnvMode(enum.Enum):
    """Selects which flavour of the highway environment ``make_env`` builds."""

    # Plain highway-env environment, no wrapper.
    VANILLA = 0
    # Environment wrapped with the STL wrapper (see the TERM_SLOW branch of make_env).
    TERM_SLOW = 1


pygame 2.6.1 (SDL 2.28.4, Python 3.10.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


2025-05-16 16:06:25.651139: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-05-16 16:06:25.796307: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-05-16 16:06:25.850471: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-05-16 16:06:25.867507: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-05-16 16:06:25.973542: I tensorflow/core/platform/cpu_feature_guar

In [8]:
# Run-level settings used by the cells below.
env_mode = EnvMode.VANILLA
# NOTE(review): this variable is not referenced by the dict below, which
# hard-codes its own "collision_reward" value — confirm which one is intended.
collision_reward = -1
model_name = 'ppo_hw_van_high_col.zip'

# highway-env configuration dictionary.
cfg_hw_env = {
    "observation": {"type": "Kinematics"},
    "action": {"type": "DiscreteMetaAction"},
    "lanes_count": 4,
    "vehicles_count": 50,
    "controlled_vehicles": 1,
    "initial_lane_id": None,
    "duration": 100,  # [s]
    "ego_spacing": 2,
    "vehicles_density": 1,
    # Reward received when colliding with a vehicle.
    "collision_reward": -.4,
    # Reward received when driving on the right-most lanes, linearly mapped
    # to zero for other lanes.
    "right_lane_reward": 0,
    # Reward received when driving at full speed, linearly mapped to zero for
    # lower speeds according to config["reward_speed_range"].
    "high_speed_reward": 1.,
    # Reward received at each lane change action.
    "lane_change_reward": 0,
    "reward_speed_range": [20, 30],
    "normalize_reward": True,
    "offroad_terminal": False,
}



In [None]:
# Gather the run settings into a single dictionary and display it.
# Note that 'stl_mode' holds the string 'None', not the None object.
cfg = {
    'stl_mode': 'None',
    'model_path': './models',
    'model_name': 'ppo_hw_van_high_col.zip',
    'cfg_hw_env': cfg_hw_env,
}
pprint(cfg)

{'stl_mode': 'None', 'model_path': './models', 'model_name': 'ppo_hw_van_high_col.zip', 'cfg_hw_env': {'observation': {'type': 'Kinematics'}, 'action': {'type': 'DiscreteMetaAction'}, 'lanes_count': 4, 'vehicles_count': 50, 'controlled_vehicles': 1, 'initial_lane_id': None, 'duration': 100, 'ego_spacing': 2, 'vehicles_density': 1, 'collision_reward': -0.4, 'right_lane_reward': 0, 'high_speed_reward': 1.0, 'lane_change_reward': 0, 'reward_speed_range': [20, 30], 'normalize_reward': True, 'offroad_terminal': False}}


In [4]:
# Serialise the highway configuration to cfg_hw.yaml in block style,
# preserving the dictionary's key order.
# NOTE(review): `cfg_hw` is not assigned in any visible cell (the config cell
# defines `cfg_hw_env`) — confirm where it comes from before a fresh-kernel run.
with open("cfg_hw.yaml", "w") as yaml_out:
    yaml.dump(
        cfg_hw,
        yaml_out,
        sort_keys=False,
        default_flow_style=False,
    )

In [5]:
# Read the configuration back from cfg_hw.yaml and show what was loaded.
with open("cfg_hw.yaml", "r") as yaml_in:
    cfg_hw2 = yaml.safe_load(yaml_in)

print(cfg_hw2)

{'observation': {'type': 'Kinematics'}, 'action': {'type': 'DiscreteMetaAction'}, 'lanes_count': 4, 'vehicles_count': 50, 'controlled_vehicles': 1, 'initial_lane_id': None, 'duration': 100, 'ego_spacing': 2, 'vehicles_density': 1, 'collision_reward': -2.0, 'right_lane_reward': 0.0, 'high_speed_reward': 1.0, 'lane_change_reward': 0, 'reward_speed_range': [20, 30], 'normalize_reward': True, 'offroad_terminal': False}


In [None]:

def make_env(train=True, env_mode=env_mode, verbose=0):
    """Create a highway-env environment configured with ``cfg_hw``.

    Args:
        train: When true, build ``highway-fast-v0`` without a render mode;
            otherwise build ``highway-v0`` with ``render_mode='human'``.
        env_mode: An ``EnvMode`` member. With ``EnvMode.TERM_SLOW`` the
            environment is wrapped in ``STLWrapper`` using the
            ``cfg_envs['highway-env']`` settings and the terminal formula
            ``'ego_slow_too_long'``. The default is the value the global
            ``env_mode`` had when this cell was executed; re-run this cell
            after changing the global for the new default to take effect.
        verbose: When >= 1, pretty-print the configuration in use: the STL
            wrapper configuration if one was applied, ``cfg_hw`` otherwise.

    Returns:
        The (possibly wrapped) environment.
    """
    if train:
        env = gym.make("highway-fast-v0")
    else:
        env = gym.make("highway-v0", render_mode='human')

    # NOTE(review): `cfg_hw` is not assigned in any visible cell (the config
    # cell defines `cfg_hw_env`) — confirm where it comes from.
    env.unwrapped.configure(cfg_hw)

    # Bound up front so the verbose branch below works in every mode. The
    # previous version only assigned its config inside the TERM_SLOW branch,
    # which raised UnboundLocalError for verbose>=1 in VANILLA mode.
    stl_cfg = None
    if env_mode == EnvMode.TERM_SLOW:
        stl_cfg = cfg_envs['highway-env']
        driver = stlrom.STLDriver()
        driver.parse_string(stl_cfg['specs'])
        env = rlrom.wrappers.stl_wrapper.STLWrapper(
            env,
            driver,
            signals_map=stl_cfg,
            terminal_formulas={'ego_slow_too_long'},
        )

    if verbose >= 1:
        pprint(cfg_hw if stl_cfg is None else stl_cfg)
    return env

# Training

In [None]:
# PPO training setup.
n_cpu = 12        # number of parallel environments passed to make_vec_env
batch_size = 64
neurons = 128     # width of each hidden layer
policy_kwargs = dict(
    #activation_fn=th.nn.ReLU,
    # PPO uses an actor-critic policy whose net_arch dict is keyed by `pi`
    # (policy) and `vf` (value function). The previous `qf` key belongs to
    # off-policy algorithms and is silently ignored by PPO, which left the
    # value branch without any hidden layer.
    net_arch=dict(pi=[neurons, neurons], vf=[neurons, neurons])
)

vec_env = make_vec_env(make_env, n_envs=n_cpu, vec_env_cls=SubprocVecEnv)
model = PPO(
    "MlpPolicy",
    vec_env,
    device='cpu',
    policy_kwargs=policy_kwargs,
    # Per-environment rollout length; with n_cpu = 12 this evaluates to 64,
    # i.e. 64 * 12 = 768 transitions per update, a multiple of batch_size.
    n_steps=batch_size * 12 // n_cpu,
    batch_size=batch_size,
    n_epochs=10,
    learning_rate=5e-4,
    gamma=0.9,
    verbose=1,
    tensorboard_log="./highway_ppo/"
)

In [None]:
# Run PPO training for 200k environment steps, showing a progress bar.
model.learn(total_timesteps=200_000, progress_bar=True)


In [None]:
# Persist the trained model.
# NOTE(review): this file name differs from `model_name`, which is what the
# testing section loads — confirm that the mismatch is intentional.
model.save(path='ppo_model_slow_term')

In [None]:
# Quick sanity check on the rollout buffer: length of its first stored entry
# (presumably one observation per parallel environment — TODO confirm).
first_buffer_entry = model.rollout_buffer.observations[0]
len(first_buffer_entry)

# Testing

In [None]:
# Load the saved PPO model named by `model_name` for evaluation; this rebinds
# `model`, replacing any model trained earlier in the session.
model = PPO.load(path=model_name)

In [None]:

# Build a rendered evaluation environment and roll the loaded policy out for
# at most 100 steps.
env = make_env(train=False, env_mode=env_mode, verbose=0)

try:
    # Evaluation-time overrides applied on top of what make_env configured:
    # un-normalised rewards and different collision / high-speed weights.
    env.unwrapped.configure({
        "observation": {"type": "Kinematics"},
        "action": {"type": "DiscreteMetaAction"},
        "lanes_count": 4,
        "vehicles_count": 50,
        "controlled_vehicles": 1,
        "initial_lane_id": None,
        "duration": 100,  # [s]
        "ego_spacing": 2,
        "vehicles_density": 1,
        # Reward received when colliding with a vehicle.
        "collision_reward": -.1,
        # Reward received when driving on the right-most lanes, linearly
        # mapped to zero for other lanes.
        "right_lane_reward": 0,
        # Reward received when driving at full speed, linearly mapped to zero
        # for lower speeds according to config["reward_speed_range"].
        "high_speed_reward": 2.,
        # Reward received at each lane change action.
        "lane_change_reward": 0.,
        "reward_speed_range": [20, 30],
        "normalize_reward": False,
        "offroad_terminal": False,
        "manual_control": False,
    })

    #obs, info = env.reset(seed=1)
    obs, info = env.reset()
    # Variant for an STL-wrapped environment, kept for reference:
    #env.stl_driver.set_param('v_slow', 0.3)
    #env.stl_driver.set_param('v_fast', 0.35)
    #wobs = env.wrapped_obs
    for _step in range(100):
        #action, _states = model.predict(wobs)
        # predict() is called with its default settings here.
        action, _states = model.predict(obs)
        obs, reward, terminated, truncated, info = env.step(action)
        #wobs= env.wrapped_obs

        if terminated:
            print('Crash')
            break
        if truncated:
            # The episode is over; stepping again without a reset is not
            # valid, so stop quietly.
            break
finally:
    # Always release the render window, even if the rollout raised.
    env.close()


In [None]:
# Plot the recorded signals, one subplot row per line of the layout string.
# NOTE(review): `utils` is not imported explicitly in this notebook (only
# `rlrom.utils` is) — presumably it arrives through one of the star imports.
lay = utils.get_layout_from_string("""
 action
 ego_x_fast
 reward
 """)

width, height = 12, 4
fig, axs = plt.subplots(len(lay), 1, figsize=(width, height))

# plt.subplots returns a single Axes rather than an array when only one row
# is requested, hence the two ways of picking the target axis.
has_several_rows = len(lay) > 1
for row, sig_list in enumerate(lay):
    target_ax = axs[row] if has_several_rows else axs
    for sig in sig_list:
        # NOTE(review): plot_signal is not defined in this notebook; it is
        # presumably provided by the STL wrapper — confirm it exists on `env`
        # when env_mode is VANILLA.
        env.plot_signal(sig, target_ax)
