In [1]:
# Load Vissim
# Documents here
# C:\Users\Public\Documents\PTV Vision\PTV Vissim 11\Examples Training
# Signal Control files here
# C:\Program Files\PTV Vision\PTV Vissim 11\API\SignalControl_DLLs

# for loading client
import win32com.client as com
import os
# standard libraries 
import numpy as np
import math
import time
from IPython import display
# For Q-function
from collections import defaultdict
# For saving
import datetime
import dill
import pickle

'''
This loads up a Vissim instance 
    -- Required: inpx file , layx file
    -- warning: quite flakey loading but once loaded okay
'''
def Load_Vissim(End_of_simulation = 10000,\
                Quick_Mode=1,\
                Path_to_network = 'C:\\Users\\Public\\Documents\\PTV Vision\\PTV Vissim 11\\Examples Training\\COM\\Basic Commands\\',\
                inpx_Filename = 'COM Basic Commands.inpx',\
                layx_Filename = 'COM Basic Commands.layx'\
               ):
    Vissim = None
    # Load Vissim
    Vissim = com.gencache.EnsureDispatch("Vissim.Vissim")
    # Load file
    inpx_Filename                = os.path.join(Path_to_network, inpx_Filename)
    flag_read_additionally  = False # you can read network(elements) additionally, in this case set "flag_read_additionally" to true
    Vissim.LoadNet(inpx_Filename, flag_read_additionally)
    # Load a Layout:
    layx_Filename = os.path.join(Path_to_network, layx_Filename)
    Vissim.LoadLayout(layx_Filename)
    # Configure non-GUI for training
    Vissim.Simulation.SetAttValue('UseMaxSimSpeed', True)
    Vissim.Simulation.AttValue('UseAllCores')
    Vissim.Graphics.CurrentNetworkWindow.SetAttValue("QuickMode",Quick_Mode)
    # Set a long simulation time
    Vissim.Simulation.SetAttValue('SimPeriod', End_of_simulation)
    return Vissim

# '''
# Set up Vissim and the parameters for optimizing
# (Worth commenting out after set up)
# if ERROR like ''has no attribute 'CLSIDToClassMap' ''
# DELETE folders:
# C:\Users\nwalton\AppData\Local\Temp\gen_py
# C:\Users\nwalton\AppData\Local\Temp\VISSIM
# Then Restart PC...
# '''

# We attempt 5 times to load vissim

Attempts = 5 
for _ in range(Attempts):
    try:
        Vissim \
        = \
        Load_Vissim(
        Path_to_network = 'C:\\Users\\nwalton\\OneDrive - The Alan Turing Institute\\Documents\\MLforFlowOptimisation\\Vissim\\Single_Cross_Straight',\
        inpx_Filename = 'Single_Cross_Straight.inpx',\
        layx_Filename = 'Single_Cross_Straight.layx'\
        )
        print("Success")
        break
    except:
        print("Fail")

Success


In [2]:
# Get simulation parameters
'''
Set up the parameters of the optimization
'''

'''
makes a dictionary for all signals and thier positions
'''
def Get_Signal_Positions(Signal_Groups):
    Signal_Positions = dict()
    for SG in Signal_Groups:
        for SH in SG.SigHeads:
            Lane = SH.AttValue('Lane')
            Position = SH.AttValue('Pos')
            Signal_Positions[Lane] = Position
    return Signal_Positions

# Lanes for detection
Lane_List = ['3-1','1-1','7-1','5-1']

# which lane signals can be green at the same time
actions = [(1,0,1,0),\
            (0,1,0,1)]
# Define the Q-function
# Q_fn = Q_function(actions)

# round the state space
rounding = 1.
sim_steps = 100 # number of simulation steps before update
# set the load to be light
number_of_inputs = len(Vissim.Net.VehicleInputs)
new_volume = 400
for key in range(1,number_of_inputs+1):
    Vissim.Net.VehicleInputs.ItemByKey(key).SetAttValue('Volume(1)', new_volume)
    
# get the list of signal controllers
Signal_Controller = Vissim.Net.SignalControllers.GetAll()[0]
Signal_Groups = Signal_Controller.SGs.GetAll()
Signal_Positions = Get_Signal_Positions(Signal_Groups)

# These are states and rewards which are global variables 
# Assigned None for now

Q_Size = None # Queue sizes at junctions
delays = dict() # Total delay and change in delay for each vehicle

In [3]:
# Get state and reward info
'''
This code gets the average delay from 1000 steps under MaxWeight
'''
def Get_Q_Size(Lane_List=None, rounding=None):
    # Loads globals if variables not specfied
    if Lane_List is None :
        Lane_List = globals()['Lane_List']
    if rounding is None :
        rounding = globals()['rounding']
        
    # initialize with zero queues
    Qsum = 0
    Q_sizes = dict.fromkeys(Lane_List)
    for key in Q_sizes.keys():
        Q_sizes[key]=0

    # initialize with zero numbers of non-waiting cars
    nonQsum = 0
    nonQ_sizes = dict.fromkeys(Lane_List)
    for key in nonQ_sizes.keys():
        nonQ_sizes[key]=0

    # get all Q lengths    
    All_Vehicles = Vissim.Net.Vehicles.GetAll() 
    for Veh in All_Vehicles:
        lane = Veh.AttValue('Lane')
        if lane in Lane_List : 
            if Veh.AttValue('InQueue') == 1 :
                Q_sizes[lane] += 1
            else : 
                nonQ_sizes[lane] += 1

    state = []

    for lane in Lane_List :
        state.append(math.ceil(Q_sizes[lane] / rounding))
        
    return tuple(state)

'''
state is now the closest vehicle to the junction
reward is now the total delay
'''


def Get_First_Vehicle(Lane_List=None, rounding=None):
    # Loads globals if variables not specfied
    if Lane_List is None :
        Lane_List = globals()['Lane_List']
    if rounding is None :
        rounding = globals()['rounding']
    
    All_Vehicles = Vissim.Net.Vehicles.GetAll()
    
    lane_state = dict()

    for cnt_Veh in range(len(All_Vehicles)):
        veh_position = All_Vehicles[cnt_Veh].AttValue('Pos')
        veh_lane = All_Vehicles[cnt_Veh].AttValue('Lane')

        if veh_lane in Signal_Positions.keys():

            rel_position = rounding * math.ceil((Signal_Positions[veh_lane] - veh_position) / rounding) 

            if  rel_position >= 0 :

                if veh_lane in lane_state.keys():    
                    if rel_position < lane_state[veh_lane]:
                        lane_state[veh_lane] = rel_position
                else :
                    lane_state[veh_lane] = rel_position
    
    state = []
    for lane in Lane_List:    
        if lane in lane_state.keys():
            state.append(lane_state[lane])
        else:
            state.append(np.nan)
    return tuple(state)


'''
Gets the delays of all vehicles in the network:
    -- dictionary keys are vehicle numbers
    -- 1st entry is delay
    -- 2nd entry is change in delay
'''
state = None

def Delay_Dictionary(Current_Dict=None):
    # make sure current state is defined
    if Current_Dict is None:
        try :
            Current_Dict = globals()['delays']
        except NameError:
            Current_Dict = dict()

    Delay_Dict= dict()
    All_Vehicles = Vissim.Net.Vehicles.GetAll() # get all vehicles in the network at the actual simulation second
    for cnt_Veh in range(len(All_Vehicles)):
        veh_number      = All_Vehicles[cnt_Veh].AttValue('No')
        delay           = All_Vehicles[cnt_Veh].AttValue('DelayTm')  

        if veh_number in Current_Dict.keys():
            old_delay = Current_Dict[veh_number][0]
            Delay_Dict[veh_number] = [delay,delay-old_delay]
        else :
            Delay_Dict[veh_number] = [delay,0.]
    return Delay_Dict

'''
state is now the closest vehicle to the junction
reward is now the total delay
'''

def Get_Delay(delays=None):
    # Use global as default
    if delays is None:
        delays = globals()['delays']
        
    total_delay = 0
    for key, val in delays.items():
        total_delay += val[1]
    return -total_delay

def Get_Total_Queue(Q_Size=None):
    # Use global as default
    if Q_Size is None:
        Q_Size = globals()['Q_Size']
        
    return -sum(Q_Size)

In [4]:
# Actions

# Simple RED and GREEN Actions
def Do_Action_Easy(action=None,Signal_Groups=None):
    # Set global as default
    if action is None:
        action = globals()['action']
    if Signal_Groups is None:
        Signal_Groups = globals()['Signal_Groups']
    
    for i, sg in enumerate(Signal_Groups):
        if action[i] == 1:
            new_state = "GREEN"
        else :
            new_state = "RED"
        sg.SetAttValue("SigState", new_state)

# GREEN/AMBER/RED/REDAMBER Actions
def Do_Action_RGA(action=None,Signal_Groups=None):
# Consist of 4 steps: 
# Greens go Amber
# Ambers go Red
# Reds go RedAmber
# RedAmbers go Green

    # Set global as default
    if action is None:
        action = globals()['action']
    if Signal_Groups is None:
        Signal_Groups = globals()['Signal_Groups']

    # Initial Parameters
    Sim_Period = Vissim.Simulation.AttValue('SimPeriod') #End of Simulation
    Amber_Time = 4. #One second of Amber
    Red_Time = 1.
    RedAmber_Time = 1.

    # If current_state = 'GREEN' and next_state = 'RED'
    # Then go AMBER
    for i, sg in enumerate(Signal_Groups):
        current_state = sg.AttValue("SigState")
        if current_state == "GREEN" and action[i] == 0 :
            sg.SetAttValue("SigState", "AMBER")

    # Simulate 4 seconds for Amber
    Sim_Time = Vissim.Simulation.AttValue('SimSec')
    Amber_Break = min(Sim_Time+Amber_Time,Sim_Period)
    Vissim.Simulation.SetAttValue('SimBreakAt', Amber_Break)
    Vissim.Simulation.RunContinuous()

    # Set the AMBER lights red
    for i, sg in enumerate(Signal_Groups):
        current_state = sg.AttValue("SigState")
        if current_state == "AMBER":
            sg.SetAttValue("SigState", "RED")

    # Simulate 1 second for Red
    Sim_Time = Vissim.Simulation.AttValue('SimSec')
    Red_Break = min(Sim_Time+Red_Time,Sim_Period)
    Vissim.Simulation.SetAttValue('SimBreakAt', Red_Break)
    Vissim.Simulation.RunContinuous()

    # If current state "RED" and next_state = "GREEN"
    # Then go RedAmber
    for i, sg in enumerate(Signal_Groups):
        current_state = sg.AttValue("SigState")
        if current_state == "RED" and action[i] == 1 :
            sg.SetAttValue("SigState", "REDAMBER")

    # Simulate 1 second for RedAmber
    Sim_Time = Vissim.Simulation.AttValue('SimSec')
    RedAmber_Break = min(Sim_Time+RedAmber_Time,Sim_Period)
    Vissim.Simulation.SetAttValue('SimBreakAt', RedAmber_Break)
    Vissim.Simulation.RunContinuous()
    
    # Finally set all RedAmbers to Green
    for i, sg in enumerate(Signal_Groups):
        current_state = sg.AttValue("SigState")
        if current_state == "REDAMBER":
            sg.SetAttValue("SigState", "GREEN")

In [5]:
def Weighted_MaxWeight(state,actions,weights):
    opt_val = 0
    for action in actions : 
        val = np.dot(action,np.multiply(state,weights))
        if val >= opt_val :
            opt_val = val
            opt_act = action
    return opt_act, opt_val

def Weight_Update(weights, state, action, reward, next_state, lr=0.001, discount_factor = 0.8):
    Gradient = np.multiply(state,action)
    _, state_value = Weighted_MaxWeight(state,actions,weights)
    _, next_state_value = Weighted_MaxWeight(next_state,actions,weights)
    Temporal_Difference = reward + discount_factor * next_state_value
    new_weights = weights + lr * ( state_value - Temporal_Difference ) *  Gradient
    # apply Relu
    new_weights = new_weights * (new_weights > 0)    
    # rescale/project weights (couple be softmax instead of this and relu)
    new_weights = ( new_weights * len(new_weights) ) / sum(new_weights)
    return new_weights
    

In [13]:
# weight training
Get_State = Get_Q_Size
Get_Reward = Get_Delay
Do_Action = Do_Action_RGA
sim_steps = 1
sim_length = 1000

# initialize weights
weights = np.ones(len(Lane_List))

Vissim.Graphics.CurrentNetworkWindow.SetAttValue("QuickMode",True)
Vissim.Simulation.SetAttValue('UseMaxSimSpeed', True)
Vissim.Simulation.AttValue('UseAllCores')
delays = dict()
rewards = []
Queues = []

for _ in range(sim_length):
    if Vissim.Simulation.AttValue('SimSec') == 0.0 :
        for _ in range(sim_steps):
            Vissim.Simulation.RunSingleStep()
    Q_Size = Get_Q_Size()
    delays = Delay_Dictionary()
    state = Get_State()
    action, _ = Weighted_MaxWeight(state,actions,weights)
    Do_Action()
    for _ in range(sim_steps):              # Take a few simulation steps
        Vissim.Simulation.RunSingleStep()
    reward = Get_Reward()              # Get the reward
    next_state = Get_State()
    
    weights = Weight_Update(weights, state, action, reward, next_state)
    
    rewards.append(reward)
    Queues.append(Get_Total_Queue())
    print(state, next_state, reward)
    print(weights)
    print(np.mean(rewards))
    display.clear_output(wait=True)

KeyboardInterrupt: 

In [None]:
weights

In [14]:
# Controllers / Learners
'''
MaxWeight
'''
def MaxWeight(state,actions):
    opt_val = 0
    for action in actions : 
        val = np.dot(action,state)
        if val >= opt_val :
            opt_val = val
            opt_act = action
    return opt_act

In [15]:
# Evaluate MaxWeight implementation
# Defined above: MaxWeight(state=Q_Size,actions=actions) 

Get_State = Get_Q_Size
Get_Reward = Get_Delay
Do_Action = Do_Action_RGA
sim_steps = 1
sim_length = 100

Vissim.Graphics.CurrentNetworkWindow.SetAttValue("QuickMode",True)
Vissim.Simulation.SetAttValue('UseMaxSimSpeed', True)
Vissim.Simulation.AttValue('UseAllCores')
delays = dict()
rewards = []
Queues = []

for _ in range(sim_length):
    if Vissim.Simulation.AttValue('SimSec') == 0.0 :
        for _ in range(sim_steps):
            Vissim.Simulation.RunSingleStep()
    Q_Size = Get_Q_Size()
    delays = Delay_Dictionary()
    state = Get_State()
    action = MaxWeight(Q_Size,actions)
    Do_Action()
    for _ in range(sim_steps):              # Take a few simulation steps
        Vissim.Simulation.RunSingleStep()
    reward = Get_Reward()              # Get the reward
    rewards.append(reward)
    Queues.append(Get_Total_Queue())
    print(np.mean(rewards),np.mean(Queues))
    display.clear_output(wait=True)

-34.09761402085531 -2.98


In [6]:
# weighted_MaxWeight implementation
# Defined above: MaxWeight(state=Q_Size,actions=actions) 

Get_State = Get_Q_Size
Get_Reward = Get_Delay
Do_Action = Do_Action_RGA
sim_steps = 1
sim_length = 100
weights = [0.35571327, 1.53468494, 0.22172345, 1.88787834]

Vissim.Graphics.CurrentNetworkWindow.SetAttValue("QuickMode",True)
Vissim.Simulation.SetAttValue('UseMaxSimSpeed', True)
Vissim.Simulation.AttValue('UseAllCores')
delays = dict()
rewards = []
Queues = []

for _ in range(sim_length):
    if Vissim.Simulation.AttValue('SimSec') == 0.0 :
        for _ in range(sim_steps):
            Vissim.Simulation.RunSingleStep()
    Q_Size = Get_Q_Size()
    delays = Delay_Dictionary()
    state = Get_State()
    action, _ = Weighted_MaxWeight(state,actions,weights)
    Do_Action()
    for _ in range(sim_steps):              # Take a few simulation steps
        Vissim.Simulation.RunSingleStep()
    reward = Get_Reward()              # Get the reward
    rewards.append(reward)
    Queues.append(Get_Total_Queue())
    print(np.mean(rewards),np.mean(Queues))
    display.clear_output(wait=True)

-41.41584738664899 -4.11


# Debugging

In [61]:
weights, state, action, reward, next_state

(array([1., 1., 1., 1., 1.]),
 (1, 0, 0, 0, 0),
 (1, 1, 0, 0, 1),
 -12.724097979719463,
 (1, 0, 0, 0, 0))

In [72]:
lr=0.01
discount_factor = 0.8
Gradient = np.multiply(state,action)
_, state_value = Weighted_MaxWeight(state,actions,weights)
_, next_state_value = Weighted_MaxWeight(next_state,actions,weights)
Temporal_Difference = reward + discount_factor * next_state_value
new_weights = weights + lr * ( state_value - Temporal_Difference ) *  Gradient
# apply Relu
new_weights = new_weights * (new_weights > 0)   

In [74]:
state_value, next_state_value, reward, new_weights

(1.0,
 1.0,
 -12.724097979719463,
 array([1.12924098, 1.        , 1.        , 1.        , 1.        ]))

In [70]:
Gradient

array([0, 0, 0, 0, 0])

In [69]:
 ( state_value - Temporal_Difference )

12.924097979719463

In [60]:
Weight_Update(weights, state, action, reward, next_state)

array([1., 1., 1., 1., 1.])