In [27]:
### pip install pygame

## Libraries

Here are all the libraries necessary to run the program. Be sure to install pygame with pip before starting to generate the animation (not included in Anaconda)

In [28]:
import numpy as np # for optimized lists
import pygame, sys # to generate the animation
from pygame.locals import * # to generate animation
import matplotlib.pyplot as plt # to genenerate graphs (not yet implemented)
from random import randint,uniform # to generate random numbers
from math import floor # to find VEICLIST ID limits
import pandas as pd # to read "csv" files
import time # to calculate the total run time
import winsound # to beep when the program is over

### Check entry flow data

Here you can check the file that is being used for the entry flow. You need to insert the desired file manually.

In [29]:
flow_file_name = 'calibration_entry_flow'
df = pd.read_csv(flow_file_name+'.csv', sep = ';', encoding = "UTF-8", header = 0)
df.head()

Unnamed: 0,t,Q1,Q2
0,0,2200,2200
1,250,2200,2200
2,500,2200,2200
3,750,2200,2200
4,1000,2200,2200


### START MENU - Scenario inputs

Insert the inputs for the highway you wish to simulate.

In [30]:
# file name?
#file_name = input('File name: ')
file_name = 'sim'

# fload flow data
flow_file_name = 'calibration_entry_flow'
ENTRY_FLOW_TABLE = np.loadtxt(open(flow_file_name+".csv", "rb"), delimiter=";", skiprows=1)

# use standard or custom values?
loop = True
while loop == True:
    # settings = input('Use default settings? (y - yes / n - no) ')
    settings = 'y'

    # standard values
    if settings == 'y':
        R = 1 # number of runs
        L = 300 # number of cells 
        T = 5000 # number of seconds for the simulation (xtime_step for real time)
        K0 = 0.0 # initial density
        vmax = np.array([5,5]) # maximum car speed (equivalent to 135 and 162 km/h)
        p = 0.32 # probability of random stop
        tp = np.array([0.015,0.015]) # truck/total ratio
        n_lanes = 3 # number of n_lanes in each direction
        print('entry flow table:')
        print(ENTRY_FLOW_TABLE)
        superelevation = 1.0 # tendency to go to the righter lanes
        sensor_time_step = 250 # seconds or once every 5 minutes
        sensor_x = np.array([30,70]) # sensor positions
        time_step = 1.2 # in seconds, equivalence to real time
        vred = np.array([5,5])
        speed_reducer = [40,60]
        loop = False

    # custom values
    elif settings == 'n':
        T = int(input('Period of simulation [T] in seconds: '))
        p = float(input('Probability of random stop [p] in range 0-1: '))
        tp = float(input('Percentage of trucks [tp] in range 0-1: '))
        Lm = float(input('Lenght of the simulation [L] in meters: ')) 
        L = round(Lm/7.5,0) #L in cells
        n_lanes = int(input('Number of lanes in each direction: '))
        vmax_kmh = float(input('Maximum car speed in km/h (vmax): '))
        vmax = round(vmax_kmh/(3.6*7.5),0) #vmax in cells/s
        K0_km = float(input('Initial car density [K0] in veic/km (0 for a clear road in t0): '))
        K0 = K0_km*7.5/1000 #K0 in veic/cell
        flowList = np.zeros((2*n_lanes), dtype = int)
        #load_flow_table()
        print('Now, insert the entry flow for all ',2*n_lanes,'lanes, from top to bottom\n')
        lane = 0
        while lane <2*n_lanes: 
            if lane < n_lanes:
                print('Flow in left lane ',lane+1,': ')
            else:
                print('Flow in right lane ',lane+1-n_lanes,': ')
            flowList[lane] = int(input())
            lane += 1 
        loop = False
    
    #in case of error
    else:
        print('Invalid answer! Please use only "y" or "n".')

entry flow table:
[[   0. 2200. 2200.]
 [ 250. 2200. 2200.]
 [ 500. 2200. 2200.]
 [ 750. 2200. 2200.]
 [1000. 2200. 2200.]
 [1250. 2200. 2200.]
 [1500. 2200. 2200.]
 [1750. 2200. 2200.]
 [2000. 2200. 2200.]
 [2250. 2200. 2200.]
 [2500. 2200. 2200.]
 [2750. 2200. 2200.]
 [3000. 2200. 2200.]
 [3250. 2200. 2200.]
 [3500. 2200. 2200.]
 [3750. 2200. 2200.]
 [4000. 2200. 2200.]
 [4250. 2200. 2200.]
 [4500. 2200. 2200.]
 [4750. 2200. 2200.]]


### Driver agressiveness rule

How much the driver desire to change lanes

In [31]:
def set_ag():
    
    #possible agressiveness values: 0,3 - 0,4 - 0,5 - 0,6 - 0,7
    return (randint(3,7))/10

### Sensor

Add detected cars to sensor database

In [32]:
# 0   1    2    3     4     5    6     7    8     9    10    11   12   13    14    15   16    
# t Q1cA Vm1cA Q1tA Vm1tA Q2cA Vm2cA Q2tA Vm2tA Q1cB Vm1cB Q1tB Vm1tB Q2cB Vm2cB Q2tB Vm2tB

# Q = flow
# Vm = mean speed (velocity)
# c = car
# t = truck
# A = sensor A (beginning of the segment - x = 25)
# B = sensor B (ending of the segment - x = L-25)


def vehicle_detected(id,AB):
    
    # check vehicle direction
    side = floor(veicList[id].lane/n_lanes)*4
        
    # check which sensor did the vehicle triggered
    sensor_code = 0
    if AB == 'B':
        sensor_code = 8
    
    # COUNT CAR
    if veicList[id].type == 0:
        sensor[sensor_line][1+side+sensor_code] += 1 # count car
        sensor[sensor_line][2+side+sensor_code] += veicList[id].v # add speed
        
    # COUNT TRUCK
    elif veicList[id].type == 1:
        sensor[sensor_line][3+side+sensor_code] += 1 # count truck
        sensor[sensor_line][4+side+sensor_code] += veicList[id].v # add speed 

### Car-following rules, Nagel-Schrekenberg model

Defining class Veic

In [33]:
class Veic:
    def __init__(self,id,ty,lane,v,x):
        self.id = id #VEIHCLE ID
        self.type = ty #VEIHCLE TYPE (0 = car, 1 = truck)
        self.lane = lane #CURRENT LANE
        self.v = v #SPEED
        self.x = x #REAR BUMPER POSITION
        self.col = (randint(0,255),randint(60,140),randint(0,255)) #SET EACH VEIHCLE WITH A RANDOM COLOR
        self.ag = set_ag()

    #DRIVING RULES
    def update(self):
        
        side = floor(self.lane/n_lanes)
        
        #find front gap
        next_veic = neighbors(self.id)
        
        if next_veic[1] == 'no car':
            gap = vmax[side]
        else:
            gap = veicList[next_veic[1]].x - self.x - 1 - self.type #consider the front in case it's a truck
        
    #STEP 1: acceleration 
    # All cars that have not already reached the maximal velocity vmax acceleration by one unit: v -> v+1         
        speed_limit = vmax[side]-self.type
        if self.x > speed_reducer[0] and self.x < speed_reducer[1]:
            speed_limit = vred[side]
        
        if self.v < speed_limit and gap > self.v+1:
            self.v += 1        
        
    #STEP 2: safety distance
    #If a car has d empty cells in front of it and is its velocity v (after step 1) larger then d, then it reduces the velocity to d: v -> min{d,v}                            
        if self.v > gap:
            self.v = gap
            
    #STEP 3: randomization
    # With probability p, the velocity is reduced by one unit (if v after step 2):v -> v-1
        if self.v > 0:
            if uniform(0,1) < p:
                self.v -= 1
        else:
            self.v = 0
            
        # detection zones
        if self.x <= sensor_x[0] and self.x >= sensor_x[0]-vmax[side]:
            if self.x + self.v > sensor_x[0]:
                vehicle_detected(self.id,'A')
        if self.x <= sensor_x[1] and self.x >= sensor_x[1]-vmax[side]:
            if self.x + self.v > sensor_x[1]:
                vehicle_detected(self.id,'B')

    def advance(self):
    
    #STEP 4: advancing
    #After steps 1-3 the new velocity vn for each car n has been determined forward by vn cells: xn -> xn+vn.
        self.x += self.v
            
    #LANE CHANGE
    def change_lane(self,move):
        self.lane  += move

### Neighborhood check

Find the veihcle's imediate neighbors (left, front and right) through simList. 

In [34]:
def neighbors(id):
    
    # GET CAR DATA
    veic = veicList[id] #veihcle data from veicList
    n = len(simList[veic.lane]) #amount of vehicles inside this lane
    pos = 0 #veihcle's position inside simList
    while simList[veic.lane][pos] != id:
        pos += 1
    
    # FRONT NEIGHBOR
    if pos == (n-1): #if veihcle is leading
        front = 'no car'
    else:
        front = simList[veic.lane][pos+1]
    
    # FIND ROAD SHOULDERS (simulation limits)
    if veic.lane < n_lanes:
        left_shoulder = 0
        right_shoulder = n_lanes-1
    else:
        left_shoulder = n_lanes
        right_shoulder = 2*n_lanes-1
        
    #SEEKER FUNCTIONS
    def check_left():       
        pos_left = 0
        while pos_left < len(simList[veic.lane-1]) and veicList[simList[veic.lane-1][pos_left]].x < veic.x:
            pos_left += 1
            
        if pos_left == len(simList[veic.lane-1]): #the veihcle would lead in the left lane
            left = 'no car'
        else:
            left = simList[veic.lane-1][pos_left]
        return left
    
    def check_right():      
        pos_right = 0
        while pos_right < len(simList[veic.lane+1]) and veicList[simList[veic.lane+1][pos_right]].x < veic.x:
            pos_right += 1
                
        if pos_right == len(simList[veic.lane+1]): #the veihcle would lead in the right lane
            right = 'no car'
        else:
            right = simList[veic.lane+1][pos_right]
        return right
    
    # BOUNDARY CONDITIONS
    # SINGLE LANE
    if n_lanes == 1:
        left = 'null'
        right = 'null'
        
    # TWO LANES    
    elif n_lanes == 2:
        
        # check if RIGHT or LEFT shoulder
        if veic.lane == left_shoulder: 
            
            # search for right neighbor position
            left = 'null'
            right = check_right()
                
        else:

            # search for left neighbor position
            left = check_left()
            right = 'null'
            
    # THREE OR MORE LANES
    else:
        if veic.lane == left_shoulder: 
            
            # search for right neighbor position
            left = 'null'
            right = check_right()
                
        elif veic.lane == right_shoulder:
            
            # search for left neighbor position
            left = check_left()
            right = 'null'
        
        else: #central lane
            
            # search for both neighbor positions
            left = check_left()
            right = check_right()

    return [left,front,right] 

#'no car', if the car would lead the lane (no vehicle ahead of him in that lane)
#'null', if the desired lane is out of bonds
# vehicle ID, if the vehicle exists 

### Gap check

find available manouver space for the veihcle

In [35]:
def gaps(id):
    
    # GET VEIHCLE AND NEIGHBORS DATA FROM VEICLIST
    veic = veicList[id]
    next_veic = neighbors(id)
    side = floor(veic.lane/n_lanes)
    
    # DETERMINE GAPS
    # FRONT GAP
    if next_veic[1] == 'no car':
        front_gap = vmax[side]
    else:
        front_gap = veicList[next_veic[1]].x - veic.x - 1
    
    # LEFT GAP
    if next_veic[0] == 'no car':
        left_gap = vmax[side]
    elif next_veic[0] == 'null':
        left_gap = 0
    else:
        left_gap = veicList[next_veic[0]].x - veic.x
    
    # RIGHT GAP
    if next_veic[2] == 'no car':
        right_gap = vmax[side]
    elif next_veic[2] == 'null':
        right_gap = 0
    else:
        right_gap = veicList[next_veic[2]].x - veic.x

    return [left_gap, front_gap, right_gap]

### Lane Change Rule

check and apply lane change rules

In [36]:
def try_lane_change(id,move):
    
    if veicList[id].x < (L-1): #check if the vehicle is inside the simulation
        
        # GET RELEVANT VEIHCLE INFO
        veic = veicList[id] # desired veihcle
        next_veic_id = neighbors(id) # veihcle neighbors

        # GAP CHECK
        if next_veic_id[1+move] != 'null': # if the desired lane exists  

            # CONDITIONS FOR LANE CHANGE:
            # 1 - the vehicle is blocked by something (safe distance check).
            # 2 - there is room for the changing manouver (gap check).
            # 3 - the next driver in the desired lane is faster then the current next driver (it is appropriate to change lanes)
            # OR the desired lane is free (no car ahead).
            # 4 - the driver is impacient (driver agressivness check)

            # CONDITION 1
            blocked = False
            if next_veic_id[1] != 'no car' and veic.v > veicList[next_veic_id[1]].v+1:
                blocked = True
            
            # CONDITION 4 willingness to change lanes (measure driver agressivness)
            want = False
            if uniform(0,1) < veic.ag:
                want = True

            # CONDITION 3 - Check the neighborhood
            # CASE 1: no current leaders in both lanes
            if next_veic_id[1+move] == 'no car' and next_veic_id[1] == 'no car':
                appropriate = False #no need, the current lane is also free

            # CASE 2: there is a current leader in current lane, but no leader in the desired lane
            elif next_veic_id[1+move] == 'no car' and next_veic_id[1] != 'no car':
                appropriate = True #there is more space to accelerate in the desired lane

            # CASE 3: there is a current leader in the desired lane, but no leader in the current lane
            elif next_veic_id[1+move] != 'no car' and next_veic_id[1] == 'no car':
                appropriate = False #no sense, the current lane is already free and the desired one is occupied

            # CASE 4: both lanes have leaders
            else:
                if veicList[next_veic_id[1+move]].v > veicList[next_veic_id[1]].v:
                    appropriate = True #the leader in the desired lane is faster than the current leader
                else:
                    appropriate = False 

            if blocked == True and appropriate == True and want == True:

                # apply change to simList
                if next_veic_id[1+move] == 'no car': # CASE 2
                    simList[veic.lane].remove(id)
                    simList[veic.lane+move].append(id)

                else: # CASE 4
                    n_desired = len(simList[veic.lane+move])
                    desired_pos = 0 
                    while veicList[simList[veic.lane+move][desired_pos]].x < veic.x:
                        desired_pos += 1

                    new_lane = []
                    pos = 0
                    while pos < n_desired:
                        if pos == desired_pos:
                            new_lane.append(id)
                        new_lane.append(simList[veic.lane+move][pos])
                        pos += 1
                    simList[veic.lane].remove(id)
                    simList[veic.lane+move] = new_lane
                    
                # apply change to veicList
                veicList[id].change_lane(move)

### Find active IDs

Find out which region of veicList is active

In [37]:
def find_limits():
    
    min_id = ID
    max_id = 0
    
    i = 0
    while i < ID: 
        if veicList[i].x < (L-1): #if the veihcle is inside the simulation
            if i < min_id:
                min_id = i
            if i > max_id:
                max_id = i
        i+=1
    
    return (min_id,max_id)

## simList

A list of all IDs of the vehicles currently inside the simulation, ordered "x" and separated by lane.

In [38]:
def sim_list_update():
    
    simList = []
    for i in range(2*n_lanes):
        simList.append([])
        
    limits = find_limits()

    # run through all vehicles in veicList and find the ones currently inside the simulation
    id = limits[0]
    while id < ID: 
        if veicList[id].x < (L-1): #if the veihcle is inside the simulation
            simList[veicList[id].lane].append(id)
        id+=1
            
    # bubble sort algorithm to sort the found IDs by "x" for each lane
    done = False
    while done == False:
        lane = 0
        done = True
        while lane < 2*n_lanes:
            i = 0
            while i < (len(simList[lane])-1):
                if veicList[simList[lane][i]].x > veicList[simList[lane][i+1]].x:
                    change = simList[lane][i]
                    simList[lane][i] = simList[lane][i+1]
                    simList[lane][i+1] = change
                    done = False
                i += 1
            lane += 1
    
    return simList

### Simple display

print current road with vehicle speeds

In [39]:
def simple_display():
    
    #set empty road
    road = np.full((2*n_lanes,L),'_')
    
    #place current vehicles
    simList = sim_list_update()
    i = 0
    while i < ID:
        if veicList[i].x < L-1:
            if veicList[i].lane >= n_lanes:
                road[veicList[i].lane][veicList[i].x] = veicList[i].v
            else:
                road[veicList[i].lane][(L-1)-veicList[i].x] = veicList[i].v
        i += 1
    
    #display results
    print(road,"\n")

### RUN SIMULATION

Start simulation, animate and generate csv file

In [40]:
log = True #generate simulation log file? (this feature may increase the processing time)

# TWO PARAMETERS TEST
#value1 = np.array([0.00,0.10,0.20,0.30,0.40,0.50])
#value2 = np.array([5,15,25,35,45,55,65,75,85,95])
#n_scenarios = len(value1)*len(value2)

# ONE PARAMETER TEST
#value = np.array([0.00,0.10,0.20,0.30,0.40,0.50])
#n_scenarios = len(value)

sensor_file_name = 'jam'
n_scenarios = 1
run_n = 0
current_scenario = 0
processing_time = 0

while run_n < R*n_scenarios:
    
    if run_n-current_scenario*R >= R:
        current_scenario += 1
        
    # TWO PARAMETERS
    #current_v1 = current_scenario - floor(current_scenario/len(value1))*len(value1)
    #current_v2 = floor(current_scenario/len(value1))
        
    #p = value1[current_v1]
    #sensor_x = np.array([value2[current_v2],value2[current_v2]+5])
    #sensor_file_name = 'sensorX_'+str(value2[current_v2])+'_p_'+str(value1[current_v1])+'_'
    
    
    if log == True:
        # FILE HANDLING (SIMULATION LOG)

        # file name
        sim_file = file_name+'.csv' #matrix file

        # debugging
        try:
            sim_hand = open(sim_file,'w')
        except:
            print('File cannot be opened: ',sim_file)
            exit() 

        # preparing .csv    
        sim_hand.write('t,ID,type,lane,pos,speed,ag\n')

    # SIMULATION SETUP

    #print("Simulation setup...\n")
    print("Run number: ",run_n+1)

    # NUMBER OF VEHICLES IN THE BEGINNING OF THE SIMULATION
    ID = 0 #starting ID
    flow_line = 0 # starting entry flow table line
    ENTRY = np.zeros(2*n_lanes) # entry control variable
    sensor_line = 0 # starting sensor table line
    Q1 = ENTRY_FLOW_TABLE[flow_line][1] # initial Q1
    Q2 = ENTRY_FLOW_TABLE[flow_line][2] # initial Q2
    cells = L*2*n_lanes #total number of cells in the simulation
    n0 = round(K0*L*(1-tp[0])) #number of vehicles in each lane ("tp" is the car/truck ratio)
    t0 = round(n0*tp[0]) #number of trucks in each lane
    c0 = n0 - t0 #number of cars in each lane

    #print("the simulation will have a total of",2*n_lanes,"lanes and a lenght of L:",L,"cells")
    #print("there will be inserted: ",c0,"cars and",t0,"trucks in each lane, with an initial density of ",(c0+t0*2)/L,"\n")

    # INITIATE LISTS
    veicList = np.array([]) #veihcle list
    detector = np.zeros([2*n_lanes,2],dtype = 'int32') #detector[lane number][i] with [0] for entrance and [1] for exit
    detector[:,0] = n0 #include in the entrance detectors the number of cars already in the simulation
    posMatrix = []

    # INITIATE SENSOR
    sensor = np.zeros((int(T/sensor_time_step+1),17))
    i = 1
    while i < int(T/sensor_time_step+1):
        sensor[i][0] = int(sensor_time_step + sensor[i-1][0])
        i += 1   

    #PLACE VEHICLES
    #if n0 > 0:
        #fill posMatrix with n0 unique random positions for each lane
        #lane = 0
        #while lane < 2*n_lanes:

            #posMatrix.append([])
            #count = 0
            #while count < n0:
                #position = randint(0,L-1)
                #if position not in posMatrix[lane]: 
                    #posMatrix[lane].append(position)
                    #count += 1
            #lane += 1

        #show PosMatrix
        #print("posMatrix:\n",posMatrix,"\n")

        #car_count = c0
        #truck_count = t0

        #lane = 0
        #while lane < 2*n_lanes:
            #veic = 0
            #while veic < n0:
                #pos = randint(0,len(posMatrix[lane])-1) #sort a random position from posMatrix
                #x = posMatrix[lane][pos]
                #side = floor(lane/n_lanes,0)
                #if x+1 not in posMatrix[lane] and t0 > 0: #x+1 is free and more trucks are needed
                    #veicList = np.append(veicList,Veic(ID,1,lane,randint(0,vmax[side]),x)) #insert a truck
                    #truck_count -= 1
                #else:
                    #veicList = np.append(veicList,Veic(ID,0,lane,randint(0,vmax[side]),x)) #insert a car
                    #car_count -= 1
                #ID += 1
                #posMatrix[lane].remove(x) #remove the occupied position
                #veic += 1
            #lane += 1

    #else:
        #print("the road starts empty!\n") #in case n0 = 0

    #print("detectors\n",detector,"\n")

    simList = sim_list_update()

    #print SimList
    #print("simList:\n",simList,"\n")

    #simple_display()

    #print("All set!")

    # START SIMULATION
    t = 0
    done = 0
    start_time = time.time()
    print('PROGRESS: {',end='')

    while t < T:

        # SET SENSOR LINE
        if sensor_line < len(sensor)-1:
            if t > sensor[sensor_line+1][0]:
                sensor_line +=1

        # LANE CHANGE PHASE
        # changing lanes BEFORE moving foward to avoid collision.

        # define which movement to make
        # LEFT movements in even time-steps and RIGHT movements in odd time-steps
        if t%2 == 0:
            move = -1
        else:
            move = 1

        # allow vehicles to change lanes

        #run through all active vehicles and try to change lanes to desired direction
        limits = find_limits()
        i = limits[0]
        while i < limits[1]:
            try_lane_change(i,move)
            i += 1

        # ENTRY PHASE
        # move to current flow line acording to time-step
        if flow_line < len(ENTRY_FLOW_TABLE)-1:
            if t >= ENTRY_FLOW_TABLE[flow_line+1][0]:
                flow_line += 1

        # set entry flow
        lane = 0
        while lane < 2*n_lanes:

            side = floor(lane/n_lanes)

            try:
                free_dist = veicList[simList[lane][0]].x
            except:
                free_dist = vmax[side]

            Q = ENTRY_FLOW_TABLE[flow_line][side+1]/n_lanes*time_step

            # add entry flow to entry
            ENTRY[lane] += round(Q/3600,3)

            if (ENTRY[lane]-(detector[lane][0]-n0)-lane+n_lanes*side) > 1 and free_dist > 0:

                #roll type of new veihcle
                side = floor(lane/n_lanes)
                ty = 0
                if uniform(0,1) < tp[side] and free_dist > 1:
                    ty = 1

                #add veihcle to position
                veicList = np.append(veicList,Veic(ID,ty,lane,randint(2,vmax[side]),0))
                detector[lane][0] += 1
                ID += 1   
            lane += 1

        # UPDATE PHASE
        # update car speed from Upstream to Downstream 
        lane = 0
        while lane < 2*n_lanes:
            n = len(simList[lane])
            while n > 0:
                veicList[simList[lane][n-1]].update()
                if log == True:
                    sim_hand.write(str(t)+','+str(veicList[simList[lane][n-1]].id)+','+str(veicList[simList[lane][n-1]].type)+','+str(veicList[simList[lane][n-1]].lane)+','+str(veicList[simList[lane][n-1]].x)+','+str(veicList[simList[lane][n-1]].v)+','+str(veicList[simList[lane][n-1]].ag)+'\n')
                n -= 1
            lane += 1
            
        # ADVANCE PHASE
        # advance cars and register progress inside ".csv"
        lane = 0
        while lane < 2*n_lanes:
            n = len(simList[lane])
            while n > 0:
                veicList[simList[lane][n-1]].advance()
                n -= 1
            lane += 1

        # SAVE CURRENT TIME-STEP
        simList = sim_list_update() 

        # PROGRESS BAR
        progress = t/T
        if progress - done > 0.01:
            done += 0.01
            print('|',end='')

        #NEXT TIME STEP
        t += 1

    print('} done!')

    print("Simulation period (T): ",T)
    print("Processed in: %s seconds" % round(time.time() - start_time,2))
    processing_time += time.time() - start_time
    if log == True:
        sim_hand.close()

    # print sensor data

    #sensor_file_name = 'vmax4_'
    sensor_file = sensor_file_name+str(run_n+1-current_scenario*R)+'_sensor.csv' #report file

    try:
        sensor_hand = open(sensor_file,'w')
    except:
        print('File cannot be opened: ',sensor_file)
        exit()    

    sensor_hand.write('t,Q1Ac,VmA1c,Q1At,VmA1t,Q2Ac,Vm2Ac,Q2At,Vm2At,Q1Bc,VmB1c,Q1Bt,VmB1t,Q2Bc,Vm2Bc,Q2Bt,Vm2Bt\n')

    i = 0
    while i < len(sensor):
        line = str(sensor[i][0])
        for j in range(16):
            line += ','+str(int(sensor[i][j+1]))
        sensor_hand.write(line+'\n')
        i+=1

    sensor_hand.close()

    df_sensor = pd.read_csv(sensor_file, sep = ',', encoding = "UTF-8", header = 0)
    df_sensor.head()
    run_n += 1 
    
print("Done!")
print(run_n," scenarios run in a period of:",round(processing_time,2),"seconds.")
winsound.Beep(500, 1000)

Run number:  1
PROGRESS: {|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||} done!
Simulation period (T):  5000
Processed in: 101.9 seconds
Done!
1  scenarios run in a period of: 101.9 seconds.


### Check csv file

display desired ".csv" file

In [41]:
file_name = 'sim'
df = pd.read_csv(file_name+'.csv', sep = ',', encoding = "UTF-8", header = 0)
df.head()

Unnamed: 0,t,ID,type,lane,pos,speed,ag
0,5,0,0,0,0,3,0.3
1,5,1,0,3,0,3,0.5
2,6,0,0,0,3,4,0.3
3,6,1,0,3,3,4,0.5
4,7,0,0,0,7,4,0.3


### SIMULATION REPORT

Reads the ".csv" file to generate Graphs and enable reanimation

In [22]:
TABLE = np.loadtxt(open(file_name+".csv", "rb"), delimiter=",", skiprows=1)

#TABLE: 't','ID','type','lane','pos','speed','ag'
array  = np.amax(TABLE, axis = 0)
print(array[0])
print(array[1])
print(array[3])
print(array[4])
print(array[5])
print(array[6])

4999.0
7307.0
5.0
98.0
5.0
0.7


### ANIMATE

creates an animation for a desired ".csv" file

In [42]:
# FILE HANDLING
#TABLE: 't','ID','type','lane','pos','speed','ag'

# load custom file
file_name = 'sim'

try:
    TABLE = np.loadtxt(open(file_name+".csv", "rb"), delimiter=",", skiprows=1)
except:
    print("Some unexpected error ocurried. Please test the file and try again.")

# EXTRACT SIMULATION INFO
sim_info  = np.amax(TABLE, axis = 0)
ID_max = int(sim_info[1]) # total of processed vehicles

extract = False

if extract == True:
    T = int(sim_info[0])+1 # duration in s
    vmax = int(sim_info[2])
    L = int(sim_info[3])-vmax+1 # simulation lenght
    n_lanes = int(sim_info[4])/2 # number of lanes
    
# ANIMATION SETUP
# set variables
SIZE = 600
CELL = SIZE/L

# set colors
WHITE = (255,255,255)
BLACK = (0,0,0)
YELLOW = (255,225,0)
GREEN = (0,255,0)
GRAY = (200,200,200)

# set animation screen
pygame.init()
pygame.display.init()
pygame.display.set_caption('CellSIM')
screen = pygame.display.set_mode((SIZE,SIZE)) 
icon = pygame.image.load("cell_sim.png")
pygame.display.set_icon(icon)
screen.fill(WHITE)
font = pygame.font.Font('freesansbold.ttf', 18)
clock = pygame.time.Clock()
pygame.display.flip()

# DISPLAY

#display lane (space-time diagram)
display_lane = 0
clear_counter = 0

# set image sizes
field = 8
central_bed = 2 # half the total central bed
shoulder = 3
road = int(CELL*n_lanes+2*shoulder)
display = 2*(field+road+central_bed)
menu_bar = 30
st_diagram = SIZE - display - menu_bar

# define colors for each vehicle

COLOR = []
i = 0
while i < ID_max:
    COLOR.append([])
    COLOR[i].append(randint(0,255))
    COLOR[i].append(randint(60,140))
    COLOR[i].append(randint(0,255))  
    i+=1
    
# DRAW ROAD
grass = pygame.draw.rect(screen,GREEN,(0,0,SIZE,display))

upstream_road = pygame.draw.rect(screen,GRAY,(0,field,SIZE,road))
downstream_road = pygame.draw.rect(screen,GRAY,(0,display/2+central_bed,SIZE,road))

white_line1 = pygame.draw.line(screen,WHITE,(0,field+shoulder-1),(SIZE,field+shoulder-1))
white_line2 = pygame.draw.line(screen,WHITE,(0,field+road-(shoulder-1)-1),(SIZE,field+road-(shoulder-1)-1))
white_line3 = pygame.draw.line(screen,WHITE,(0,display/2+central_bed+shoulder-1),(SIZE,display/2+central_bed+shoulder-1))
white_line3 = pygame.draw.line(screen,WHITE,(0,display/2+central_bed+road-(shoulder-1)-1),(SIZE,display/2+central_bed+road-(shoulder-1)-1))                      

gray_bar = pygame.draw.rect(screen,GRAY,(0,display,SIZE,menu_bar))

pygame.display.update()
    
# ANIMATE
t = 0
i = 0

while t < T:
    for event in pygame.event.get():
        if event.type == QUIT:
            pygame.display.quit()
            pygame.quit()
            sys.exit()
            
    #print("t:",t)

    # set fps
    clock.tick(10)

    # clear road
    upstream_lanes = pygame.draw.rect(screen,BLACK,(0,field+shoulder,SIZE,road-2*shoulder))
    downstream_lanes = pygame.draw.rect(screen,BLACK,(0,display/2+central_bed+shoulder,SIZE,road-2*shoulder))

    gray_bar = pygame.draw.rect(screen,GRAY,(0,display,SIZE,menu_bar))
    clear_space_time = pygame.Surface((SIZE,st_diagram))
    clear_space_time.fill(WHITE)

    # fill in the vehicles for the time-step 
    while TABLE[i][0] == t and i < len(TABLE)-1:

        # EXTRACT VEHICLE DATA
        id = int(TABLE[i][1])
        ty = int(TABLE[i][2])
        lane = int(TABLE[i][3])
        x = int(TABLE[i][4])
        speed = int(TABLE[i][5])
        ag = TABLE[i][6]
        col = (COLOR[id][0],COLOR[id][1],COLOR[id][2])

        # SET VEIHCLE POSITION
        if lane < n_lanes: # upstream flow
            xt = CELL*((L-1)-x) 
            yt = field+shoulder+(lane)*CELL
        else: # downstream flow
            xt = CELL*x 
            yt = display/2+central_bed+shoulder+(lane-n_lanes)*CELL  
                
        # GENERATE VEHICLE ICON
        veic = pygame.Surface((CELL*(1+ty),CELL))
        veic.fill(col)

        # DRAW VEIHCLE
        #live display
        screen.blit(veic,(xt,yt))

        #space-time diagram (only for display_lane)
        if lane == display_lane: 
            xstd = CELL*x
            ystd = (600 - ((t+1)*CELL-clear_counter*st_diagram))
            if ystd < display+menu_bar:
                screen.blit(clear_space_time,(0,display+menu_bar))
                clear_counter += 1
                ystd = (600 - ((t+1)*CELL-clear_counter*st_diagram))
            
            screen.blit(veic,(xstd,ystd))

        # next vehicle
        i += 1

    # SHOW TIME-STEP INFORMATION
    t_font = font.render('t: %s' % (t), True, BLACK)
    t_rect = t_font.get_rect()
    t_rect.topleft = (10, display+8)
    screen.blit(t_font, t_rect)
    pygame.display.update()
            
    # next time-step
    t+=1

while True:
    for event in pygame.event.get():
        if event.type == QUIT:
            pygame.display.quit()
            pygame.quit()
            sys.exit()

SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
# test the sensor building fuction
sensor_test = np.zeros((int(T/sensor_time_step+1),9))

i = 1
while i < int(T/sensor_time_step-1):
    sensor_test[i][0] = sensor_time_step + sensor[i-1][0]
    i += 1 
print(sensor_test)

### Export sensor data

Export sensor data into a "csv" file

In [None]:
# check sensor data

print(sensor)

In [None]:
# print sensor data

sensor_file_name = 'vmax3'

sensor_file = sensor_file_name+'_sensor.csv' #report file

try:
    sensor_hand = open(sensor_file,'w')
except:
    print('File cannot be opened: ',sensor_file)
    exit()    

sensor_hand.write('t,Q1Ac,VmA1c,Q1At,VmA1t,Q2Ac,Vm2Ac,Q2At,Vm2At,Q1Bc,VmB1c,Q1Bt,VmB1t,Q2Bc,Vm2Bc,Q2Bt,Vm2Bt\n')

i = 0
while i < len(sensor):
    line = str(sensor[i][0])
    for j in range(16):
        line += ','+str(sensor[i][j+1])
    sensor_hand.write(line+'\n')
    i+=1

sensor_hand.close()

df_sensor = pd.read_csv(sensor_file, sep = ',', encoding = "UTF-8", header = 0)
df_sensor.head()

In [44]:
flow_file_name = 'jam_entry_flow'
df = pd.read_csv(flow_file_name+'.csv', sep = ';', encoding = "UTF-8", header = 0)
df.head()

Unnamed: 0,t,Q1,Q2
0,0,110,110
1,250,220,220
2,500,330,330
3,750,440,440
4,1000,550,550
