In [1]:
# miny problem: speed limit prevent followers to follow the front vehicle closely

In [2]:
import sys
import os

import pandas as pd

os.environ['SUMO_HOME'] = 'C:\\Program Files (x86)\\Eclipse\\Sumo\\'
if 'SUMO_HOME' in os.environ:
    tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
    sys.path.append(tools)
else:
    sys.exit("please declare environment variable 'SUMO_HOME'")
from sumolib import checkBinary
import traci
import traci.constants as tc

# from helper import make_complete_speed_profile, import_HighD_speed_profile

In [3]:
sumoBinary = checkBinary('sumo-gui')
sumoCmd = [sumoBinary, "-c", "singlelink.sumocfg.xml"]

In [4]:
traci.close()

FatalTraCIError: Not connected.

In [None]:
def get_veh_info(veh_id):
    # get pos,speed,and acc of the veh_id
    longitudinal_pos= traci.vehicle.getPosition(veh_id)[0]
    speed = traci.vehicle.getSpeed(veh_id)
    acc = traci.vehicle.getAcceleration(veh_id)
    return longitudinal_pos, speed, acc

In [None]:
def get_speed_free_driving(veh_id, max_speed):
    # predict the speed of next timestep by free driving
    ego_pos,ego_speed,ego_acc = get_veh_info(veh_id)
    if ego_speed<max_speed:
        acc = 2
    else:
        acc = -2
    ego_next_speed = ego_speed + acc
    return ego_next_speed

In [None]:
def get_speed_cacc(veh_id):
    # predict the speed of next timestep by CACC mode
    ego_pos,ego_speed,ego_acc = get_veh_info(veh_id)
    leader_veh_id,_ = traci.vehicle.getLeader(veh_id,dist=50)
    leader_pos,leader_speed,leader_acc = get_veh_info(leader_veh_id)
    leader_length = traci.vehicle.getLength(leader_veh_id)
    distance_to_leader_veh = leader_pos-ego_pos
    net_distance_to_lead_vehicle = distance_to_leader_veh - leader_length
    # configure the parameters of CACC
    coef_acc = 1.0
    coef_speed = 0.58
    coef_distance = 0.1
    standstill = 4.0
    headway_for_CACC = 1.0
    reference_distance = max(standstill, headway_for_CACC * ego_speed)
    acc = coef_acc*leader_acc + coef_speed*(leader_speed-ego_speed) + coef_distance*(net_distance_to_lead_vehicle-reference_distance)
    ego_next_speed = ego_speed + acc
    return ego_next_speed


In [5]:
save_simulation_data = True
output_path = 'simulation_data_cacc100.csv'
data = []
# speed_profile_leader = import_HighD._speed_profile(10000)
traci.start([sumoBinary, "-c", "singlelink100.sumocfg.xml","--collision.action","none","--device.emissions.probability","1.0","--tripinfo-output","emission_output"])
# traci.start(sumoCmd)
step = 0

# subscribe all vehicles
while True:
    traci.simulationStep()
    step += 1
    if '199' in traci.simulation.getDepartedIDList():
        for i in range(200):
            traci.vehicle.subscribe(str(i), varIDs=(tc.VAR_DISTANCE, tc.VAR_SPEED, tc.VAR_ACCELERATION))
        traci.simulationStep()
        step += 1
        break
        
# start to implement control logic        
while traci.simulation.getMinExpectedNumber() > 0:
    traci.simulationStep()
    veh_ids = traci.vehicle.getIDList()
    for veh_id in veh_ids:
        # for CACC 
        if (((int(veh_id)-1)%3==0)and(int(veh_id)<100)):
#         # select every 2 out of 3 consecutive vehicles to execute CACC
#        if veh_id=='1':
            traci.vehicle.setColor(veh_id,(155,255,0))
            #traci.vehicle.subscribe(str(veh_id), varIDs=(tc.VAR_DISTANCE, tc.VAR_SPEED, tc.VAR_ACCELERATION))
            leader_info = traci.vehicle.getLeader(veh_id,dist=50) 
            if not leader_info:
                # if there is no vehicle ahead, do free-driving
                # ego_next_speed = get_speed_free_driving(veh_id,50)
                control = None
            else:
                # if there is a vehicle ahead 
                leader_veh_id,_ = leader_info
                ego_next_speed = get_speed_cacc(veh_id)
                traci.vehicle.setSpeed(veh_id,ego_next_speed)
        # for the lead vehicle
        if veh_id=='0':
            traci.vehicle.setSpeed(veh_id,speed_profile_leader[step])
        if veh_id=='100':
            traci.vehicle.setSpeed(veh_id,speed_profile_leader[step])
    # save to simulation data to file        
    if save_simulation_data and '0' in veh_ids:
        state = traci.vehicle.getAllSubscriptionResults()
        positions = [state[str(veh_id)][tc.VAR_DISTANCE] for veh_id in range(200)]
        speeds = [state[str(veh_id)][tc.VAR_SPEED] for veh_id in range(200)]
        accelerations = [state[str(veh_id)][tc.VAR_ACCELERATION] for veh_id in range(200)]
        fuel = [traci.vehicle.getFuelConsumption(str(veh_id)) for veh_id in range(200)]
        co2 = [traci.vehicle.getCO2Emission(str(veh_id)) for veh_id in range(200)]
        co = [traci.vehicle.getCOEmission(str(veh_id)) for veh_id in range(200)]
        hc = [traci.vehicle.getHCEmission(str(veh_id)) for veh_id in range(200)]
        pmx = [traci.vehicle.getPMxEmission(str(veh_id)) for veh_id in range(200)]
        nox = [traci.vehicle.getNOxEmission(str(veh_id)) for veh_id in range(200)]
        step_data = positions + speeds + accelerations +fuel + co2 + co + hc + pmx + nox
        data.append(step_data)
    step += 1



if traci.simulation.getMinExpectedNumber() <= 0 and save_simulation_data:
    position_name = ['pos_veh_{}'.format(veh_id) for veh_id in range(200)]
    speed_name = ['spd_veh_{}'.format(veh_id) for veh_id in range(200)]
    acceleration_name = ['acc_veh_{}'.format(veh_id) for veh_id in range(200)]
    fuel_name = ['fuel_veh_{}'.format(veh_id) for veh_id in range(200)]
    co2_name = ['co2_veh_{}'.format(veh_id) for veh_id in range(200)]
    co_name = ['co_veh_{}'.format(veh_id) for veh_id in range(200)]
    hc_name = ['hc_veh_{}'.format(veh_id) for veh_id in range(200)]
    pmx_name = ['pmx_veh_{}'.format(veh_id) for veh_id in range(200)]
    nox_name = ['nox_veh_{}'.format(veh_id) for veh_id in range(200)]
    column_name = position_name + speed_name + acceleration_name + fuel_name + co2_name + co_name + hc_name + pmx_name + nox_name
    df = pd.DataFrame(data,columns=column_name)
    df.to_csv(output_path)
traci.close()
sys.stdout.flush()

 Retrying in 1 seconds




 Retrying in 1 seconds




 Retrying in 1 seconds




 Retrying in 1 seconds




 Retrying in 1 seconds




KeyboardInterrupt: 

In [None]:
os.system('sumo-gui -c singlelink.sumocfg.xml')

In [None]:
traci.start([sumoBinary, "-c", "singlelink100.sumocfg.xml","--collision.action","none","--device.emissions.probability","1.0","--tripinfo-output","emission_output"],label='1')

In [None]:
traci.getLabel()

In [None]:
a = traci.getConnection(label="1")
a

In [None]:
traci.close()

In [None]:
def check_mandatory_lc(veh_id):
    # When the target vehicle is close to the merge point (with certain distance threshold) and it is on the closed lane, 
    # it would gain a motivation to do lane change before the merge point.
    is_mandatory_lc = False
    distance_threshold = 50 # check other people' choice
    
    return is_mandatory_lc

def check_discrete_lc():
    # When the target vehicle is been slowed down to a level he could not tolerante and he observed a better utility in ajascent lane,
    # he would gain a motivation to do a lane change as long as such utility gap exists.
    is_discrete_lc = False
    return is_discrete_lc
def check_lc_incentive():
    has_incentive = False
    if check_mandatory_lc() or check_discrete_lc():
        has_incentive = True
    return has_incentive