In [1]:
import sys
sys.path.append("F:/AI/ACC_ENV")


In [2]:
from my_highway_env.envs.car_following_env_by_config_dynamical import CarFollowingEnv

In [22]:
dataset_path_train = "D:/Dataset/data/CarFollowing/benchmark/" + "NGSIM_I_80" + "_" + "train" + "_data.npy"
# dataset_path_val = "/ai/syf/ACC/Dataset/ProcessedData/NGSIM_I_80_val_data_C.npy"
obs_type = 'BaselineAccObservation'
PF = 10

env_config_NGSIM_train_1 = {
    'observation': {"type": obs_type, 'noise': False},
    'action': {"type": "ContinuousAction", "acceleration_range": (-4, 2),"jerk_max":60,'lateral':False,"dynamical": False},
    # 'dataset_path': "D:/Dataset/data/CarFollowing/benchmark/" + "NGSIM_I_80" + "_" + "train" + "_data.npy",
    'dataset_path': dataset_path_train,
    'simulation_frequency':10,
    'policy_frequency': PF,
	'vehicle': {"tau": 0, "delay": 0},
}

env_config_NGSIM_train_2 = {
    'observation': {"type": obs_type, 'noise': False},
    'action': {"type": "ContinuousAction", "acceleration_range": (-4, 2),"jerk_max":60,'lateral':False,"dynamical": False},
     # 'dataset_path': "D:/Dataset/data/CarFollowing/benchmark/" + "NGSIM_I_80" + "_" + "train" + "_data.npy",
    'dataset_path': dataset_path_train,
    'simulation_frequency':10,
    'policy_frequency': PF,
	'vehicle': {"tau": 0.2, "delay": 0},
}

env_config_NGSIM_train_3 = {
    'observation': {"type": obs_type, 'noise': False},
    'action': {"type": "ContinuousAction", "acceleration_range": (-4, 2),"jerk_max":60,'lateral':False,"dynamical": False},
     # 'dataset_path': "D:/Dataset/data/CarFollowing/benchmark/" + "NGSIM_I_80" + "_" + "train" + "_data.npy",
    'dataset_path': dataset_path_train,
    'simulation_frequency':10,
    'policy_frequency': PF,
	'vehicle': {"tau": 0.4, "delay": 0},
}

In [23]:
env = CarFollowingEnv(env_config_NGSIM_train_1)

In [24]:
env.reset()

array([41.410255 , 24.624468 , 21.746164 , -2.8783035, -0.6748754],
      dtype=float32)

In [65]:
__credits__ = ["Carlos Luis"]

from os import path
from typing import Optional

import numpy as np
import copy
import gym
import torch

DEFAULT_X = np.pi
DEFAULT_Y = 1.0

'''
    model-based environment 构建注意事项
        1. ensemble models: 随机选择
        2. device设置统一
        3. space范围clip
        4. reward 返回值设置
        5. terminal函数设定
        
    此环境(MB_PendulumEnv)通过sb3的env_checker
    训练时需要观察models的更新。
'''


class MB_CarFollowing(gym.Env):

    def __init__(self, real_env, models, device):
        
		
        # This will throw a warning in tests/envs/test_envs in utils/env_checker.py as the space is not symmetric
        #   or normalised as max_torque == 2 by default. Ignoring the issue here as the default settings are too old
        #   to update to follow the gymnasium api
        self.action_space = copy.deepcopy(real_env.action_space)
        self.observation_space = copy.deepcopy(real_env.observation_space)
        '''
        	env models
        '''
        self.models = models
        self.device = device
        self.real_env = real_env
        
        self.TTC_threshold = real_env.TTC_threshold
        self.PF = real_env.config['policy_frequency']
        self.acc_th = real_env.acc_th
        self.acc_max = real_env.acc_max

    def step(self, u):
        
        self.cur_model = self.models[np.random.randint(0, len(self.models))]
        last_obs = self.obs  # th := theta
        
        reward = self._get_reward(self.obs, u)
        
        next_obs = self._get_obs_by_model(last_obs, action=u)
        self.obs = next_obs
        self.time_step_cnt += 1
        terminal = self._is_terminal(next_obs)

        # truncation=False as the time limit is handled by the `TimeLimit` wrapper added during `make`
        return next_obs, reward, terminal, {}

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        obs = self.real_env.reset()
        self.last_acceleration = obs[4]
        self.time_step_cnt = 0
        self.obs = obs
        return self.real_env.reset()

    def _get_obs(self):
        theta, thetadot = self.state
        return np.array([np.cos(theta), np.sin(theta), thetadot], dtype=np.float32)
    
    def _get_obs_by_model(self, obs, action):
        obs_tensor = torch.tensor(obs, dtype=torch.float32)
        action_tensor = torch.tensor(action, dtype=torch.float32)
        input_d = torch.cat((obs_tensor, action_tensor))
        input_d = input_d.to(self.device)
        output_tensor = self.cur_model.forward(input_d)
        output_np = output_tensor.detach().cpu().numpy()
        
        obs = self.obs + output_np
        
        # Clip the output to the observation space range
        obs_clipped = np.clip(obs, self.observation_space.low, self.observation_space.high)
        return obs_clipped
        
    def _get_reward(self, obs, action):
        front_space, ego_speed, front_speed, related_speed, ego_acc = obs
        
        acceleration = ego_acc
        jerk = (acceleration - self.last_acceleration) * self.PF
        self.last_acceleration = acceleration
        self.jerk = jerk
        if related_speed == 0:
            related_speed = 0.0001
        
        self.TTC = -front_space/related_speed

        if self.TTC >= 0 and self.TTC <= self.TTC_threshold:
            fTTC = np.log(self.TTC/self.TTC_threshold)
        else:
            fTTC = 0

        fSafety = fTTC

        mu = 0.422618
        sigma = 0.43659
        hdw = front_space/ego_speed
        if hdw <= 0:
            fHdw = -1
        else:
            fHdw = (np.exp(-(np.log(hdw) - mu) ** 2 / (2 * sigma ** 2)
                           ) / (hdw * sigma * np.sqrt(2 * np.pi)))
        
        fEffic = fHdw
        
        acc_abs = abs(acceleration)
        jerk_abs = abs(jerk)
        acc_abs = min(acc_abs,4)
        jerk_abs = min(jerk_abs,60)
        if acc_abs > self.acc_th:
            fAcc = - ((acc_abs - self.acc_th)/(self.acc_max - self.acc_th))**2
        else:
            fAcc = 0
        
        # fJerk = -(jerk_abs/60)**2
        fJerk = -((jerk**2)/ (self.PF * 6)**2)
        fJerk = float(fJerk)
        fAcc = float(fAcc)
        fComf = fJerk
        
        self.fJerk = float(fJerk)
        self.fAcc = float(fAcc)
        
        if front_space < 0:
            fCrash = -100
        else:
            fCrash = 0
            
        totalReward = fSafety + fComf + fEffic + fCrash

        self.total_reward = float(totalReward)
        self.total_risk = float(fSafety)
        self.front_risk = float(fSafety)
        self.fRisk = float(fSafety)
        # self.front_left_collision_risk = (f_left_weight, f_left_risk)
        # self.rear_left_collision_risk = (r_left_weight, r_left_risk)
        # self.front_right_collision_risk = (f_right_weight, f_right_risk)
        # self.rear_right_collision_risk = (r_right_weight, r_right_risk)
        self.fComf = float(fComf)
        self.fEffic = float(fEffic)

        return float(totalReward)
        
    
    def _is_terminal(self, obs):
        if obs[0] <= 0 or self.time_step_cnt > 100:
            return False
        else:
            return True

    def close(self):
        if self.screen is not None:
            import pygame

            pygame.display.quit()
            pygame.quit()
            self.isopen = False
            
    def _recover_theta(self,obs1, obs2):
        theta = np.arctan2(obs2, obs1)
        return theta


def angle_normalize(x):
    return ((x + np.pi) % (2 * np.pi)) - np.pi

In [66]:
from PredictionModel import PredictionModel
model  =  PredictionModel(obs_size= env.observation_space.shape[0], action_size=env.action_space.shape[0])
model.cuda()

PredictionModel(
  (net): Sequential(
    (0): Linear(in_features=6, out_features=256, bias=True)
    (1): ReLU()
    (2): Linear(in_features=256, out_features=256, bias=True)
    (3): ReLU()
    (4): Linear(in_features=256, out_features=5, bias=True)
  )
)

In [67]:
mb_cf_env = MB_CarFollowing(real_env=env, models= [model], device='cuda')

In [68]:
mb_cf_env.reset()

array([14.161553 , 10.247312 ,  6.309777 , -3.9375348, -0.6313247],
      dtype=float32)

In [69]:
mb_cf_env.step(mb_cf_env.action_space.sample())

(array([79.97539  , 27.115202 , 16.910925 , -6.69385  ,  4.0711794],
       dtype=float32),
 0.03913663325667282,
 True,
 {})

In [70]:
from stable_baselines3.common.env_checker import check_env

In [71]:
check_env(mb_cf_env)

AssertionError: Your environment must inherit from the gymnasium.Env class cf. https://gymnasium.farama.org/api/env/

In [72]:
check_env(env)

AssertionError: Your environment must inherit from the gymnasium.Env class cf. https://gymnasium.farama.org/api/env/

In [73]:
import gym
import gymnasium as gymnasium
import gymnasium.spaces as spaces
import numpy as np

class GymToGymnasiumWrapper(gymnasium.Env):
    def __init__(self, gym_env):
        super(GymToGymnasiumWrapper, self).__init__()
        self.gym_env = gym_env
        # 将 gym 的 space 转换为 gymnasium 的 space
        self.observation_space = self._convert_space(gym_env.observation_space)
        self.action_space = self._convert_space(gym_env.action_space)

    def reset(self, *, seed=None, options=None):
        if seed is not None:
            self.gym_env.seed(seed)
        observation = self.gym_env.reset()
        return observation, {}

    def step(self, action):
        observation, reward, done, info = self.gym_env.step(action)
        return observation, reward, done, False, info

    def render(self, mode="human"):
        return self.gym_env.render(mode=mode)

    def close(self):
        return self.gym_env.close()

    def seed(self, seed=None):
        return self.gym_env.seed(seed)

    def _convert_space(self, space):
        """将 gym 空间转换为 gymnasium 空间"""
        if isinstance(space, gym.spaces.Box):
            return spaces.Box(low=space.low, high=space.high, shape=space.shape, dtype=space.dtype)
        elif isinstance(space, gym.spaces.Discrete):
            return spaces.Discrete(n=space.n)
        elif isinstance(space, gym.spaces.MultiDiscrete):
            return spaces.MultiDiscrete(nvec=space.nvec)
        elif isinstance(space, gym.spaces.MultiBinary):
            return spaces.MultiBinary(n=space.n)
        elif isinstance(space, gym.spaces.Tuple):
            return spaces.Tuple([self._convert_space(s) for s in space.spaces])
        elif isinstance(space, gym.spaces.Dict):
            return spaces.Dict({key: self._convert_space(s) for key, s in space.spaces.items()})
        else:
            raise NotImplementedError(f"空间类型 {type(space)} 还未实现转换")

# 示例用法
gym_env = gym.make('CartPole-v1')  # Gym 环境
gymnasium_env = GymToGymnasiumWrapper(gym_env)

# 现在可以正常使用 stable-baselines3 的 check_env 进行环境检查
from stable_baselines3.common.env_checker import check_env
check_env(gymnasium_env)


In [74]:
env_ga = GymToGymnasiumWrapper(env)

In [75]:
check_env(env_ga)

In [76]:
mb_env_ga = GymToGymnasiumWrapper(mb_cf_env)
check_env(mb_env_ga)