In [None]:
import numpy as np
import random
import seaborn as sns
import matplotlib.pyplot as plt

class TrafficSimulation(object):
    
    def __init__(self, road_length, traffic_density, 
                 max_velocity, slowdown_proba, 
                 p_swap_lane = 1, n_lanes=2):
        
        assert type(max_velocity) is int, "Max_velocity must be an integer!"
        self.max_velocity = max_velocity
        
        assert type(road_length) is int, "Road length must be an integer!"
        self.road_length = road_length
        
        assert traffic_density >= 0 and traffic_density <=1, "Traffic density must be a proportion, 0<=x<=1"
        self.traffic_density = traffic_density
        
        assert slowdown_proba >= 0 and slowdown_proba <=1, "slowdown_proba must be a probability, 0<=x<=1"
        self.slowdown_proba = slowdown_proba
        
        assert p_swap_lane >= 0 and p_swap_lane <=1, "p_swap_lane must be a probability, 0<=x<=1"
        self.p_swap_lane = p_swap_lane
        
        assert type(n_lanes)==int, "Number of lanes must be an integer"
        self.n_lanes = n_lanes
        
        #Track number of cars that have passed the boundary
        self.flow = 0
        
        #Track number of timestep the simulation has been run
        self.time = 0
        
        #Number of cells to look back is equal to the maximum velocity
        self.look_back = max_velocity
        
        #Make road
        #Infer number of cars
        self.num_cars = np.round(road_length*traffic_density*n_lanes)
        num_cars = np.round(road_length*n_lanes*traffic_density)
        road_array = -np.ones((n_lanes, road_length), dtype=int)
        
        car_counter = 0
        #Insert the cars with 0 velocity at random positions on the road
        while car_counter < num_cars:
            random_loc = random.randrange(0, n_lanes), random.randrange(0, road_length)
            if road_array[random_loc] == -1:
                road_array[random_loc] = 0
                car_counter += 1
                
        self.road = road_array
        
    def look(self, lane, cell, ahead=True):
        '''
        Takes a current position, defined using lane and cell
        Returns distance to next car in direction ahead
        if ahead=False, returns distance to next car behind
        
        Distance is defined to be 0 if there is a car in same cell in other lane
        '''
        if self.road[lane, cell] != -1:
            return 0
            
        #1 if ahead
        if ahead:
            increment = 1
        else: 
            increment = -1
        
        #Keep looking one cell further out until a car is found
        #or far out enough has been checked for it not to matter
        #% operator accounts for periodic boundary condition
        distance = increment
        while self.road[lane, (cell + distance) % self.road_length] == -1 and abs(distance) < self.max_velocity+2:
            distance += increment
        
        return distance
    
    def check_lane(self, lane, cell, velocity):
        '''
        Checks if car, moved to road[lane, position] can:
        1) Accelerate
        2) Fit without obstructing car behind it
        3) Random draw from uniform distrubution on 0,1 < p_change
        '''
        
        #Look ahead, see if can accelerate
        front_distance = self.look(lane, cell)
        cond1 = front_distance-1 > velocity+1
        
        #Look back, see if there is space
        back_distance = self.look(lane, cell, ahead=False)
        #Back distance is negative, so we convert it to positive by *-1
        cond2 = (-1*back_distance)-1 > self.max_velocity
        
        cond3 = random.random() < self.p_swap_lane
        
        #Returns True iff all conditions are met
        return cond1*cond2*cond3
    
    def update_lanes(self):
        '''
        Updates the lane of each of the cars on the road,
        Starting from the top lane and working downwards
        
        look forward = v+1, lo = l, lo; back = 5 = vmax, and pchange = 1
        '''
        #If there is only one lane on the road, skip this step
        if self.n_lanes == 1:
            return
        
        #Make an empty road to store updates on
        next_state = -np.ones((self.n_lanes, self.road_length), dtype=int)
        for lane in range(self.n_lanes):
            #For every cell in lane
            for cell in range(self.road_length):    
                
                #if cell is not a car, go to next cell
                if self.road[lane, cell] != -1:
                    
                    #Current speed of car
                    current_velocity = self.road[lane, cell]

                    #Get distance to car in front
                    distance = 1  # The number of empty cells between this car and the next plus 1
                    while self.road[lane, ((cell + distance) % self.road_length)] == -1:
                        distance += 1
                    
                    #if gap(i) < l
                    attempt_lane_swap = (distance-1) < (current_velocity+1)
                    
                    #IF not attempting to switch, stay
                    if attempt_lane_swap:                        
                        #If already in the top lane, evaluate switching down                        
                        if lane == 0:
                            if self.check_lane(lane+1, cell, current_velocity):
                                next_state[lane+1, cell] = current_velocity
                            else:
                                next_state[lane, cell] = current_velocity
                            
                        #If already in the bottom lane, evaluate switching up
                        elif lane == self.n_lanes-1:
                            if self.check_lane(lane-1, cell, current_velocity):
                                next_state[lane-1, cell] = current_velocity
                            else:
                                next_state[lane, cell] = current_velocity
                        
                        #If there is a lane both to the left and to the right of current lane
                        else:
                            #Randomly choose whether to evaluate switching up or down first
                            random_lane = random.choice([-1, 1])
                            
                            if self.check_lane(lane+random_lane, cell, current_velocity):
                                next_state[lane+random_lane, cell] = current_velocity
                                
                                
                            #Else, check if it is possible to change to other lane
                            elif self.check_lane(lane+(-1*random_lane), cell, current_velocity):
                                next_state[lane+random_lane, cell] = current_velocity
                                
                            #If no laneswap is possible, stay in current lane
                            else: 
                                next_state[lane, cell] = current_velocity
                                
                    else:
                        next_state[lane, cell] = current_velocity
                else:
                    next_state[lane, cell] = self.road[lane, cell]
                    
        #Update the lane of all cars simultaneously
        self.road = next_state
    
    
    def update_velocities(self):
        ''''
        Updates velocity of each of the cars on the road. The following 3 steps are performed
        for each car. Then the state of each is updated synchronously.
        
        1. If the car is travelling at less than max velocity, increase speed by 1.
        
        2. If the distance, defined as the number of empty cells between two cars plus 1, 
        to the next car in front is smaller than or equal to the current velocity of the car, 
        decrease the velocity to the distance-1.
        
        3. If the current speed is not 0, with some fixed probability (slowdown_proba), 
        decrease the velocity by 1.
        '''
        
        for lane in range(self.n_lanes):
            #For every cell in lane
            for cell in range(self.road_length):
                
                #if cell is not a car, go to next cell
                if self.road[lane, cell] != -1:
                    

                    #Get distance to car in front
                    #distance = self.look(lane, cell
                    distance = 1  # The number of empty cells between this car and the next plus 1
                    while self.road[lane, ((cell + distance) % self.road_length)] == -1:
                        distance += 1

                    #Accelerate car by 1, if not already at max speed
                    if self.road[lane, cell] < self.max_velocity:
                        self.road[lane, cell] += 1

                    #Slow down due to other cars?
                    if self.road[lane, cell] >= distance:
                        self.road[lane, cell] = distance-1

                    #Random slowdown?
                    if random.random() <= self.slowdown_proba and  self.road[lane, cell]> 0:
                        self.road[lane, cell] -= 1
                    
        
    def step(self, track=False):
        '''
        Advances each car along its lane by its velocity
        
        Stores all updates on a copy of the road
        
        If track=True, every run increments self.time by 1 
        and whenever a car passes the periodic boundary, self.flow is incremented by 1
        '''
        if track:
            self.time += 1
        
        #Copy of the road, intialized to be empty
        #Will be filled successively
        next_state = -np.ones((self.n_lanes, self.road_length), dtype=int)
        #For every lane
        for lane in range(self.n_lanes):
            #For every cell in lane
            for cell in range(self.road_length):
                if self.road[lane, cell] != -1:
                    
                    #Check if car passes boundary
                    if track and (cell + self.road[lane, cell] > self.road_length-1):
                        self.flow += 1
                    
                    #next cell = current cell + current velocity, adjusting for periodic boundary
                    next_cell = (cell + self.road[lane, cell]) % self.road_length
                    next_state[lane, next_cell] = self.road[lane, cell]
                    
        self.road = next_state
                
    def display(self):
        '''
        returns a heatmap displaying the state of each cell on the road
        There are 5 possible states in the standard version of the simulation:
        White = -1, no car
        Red = Car w. velocity 0
        Orange = Car w. velocity 1
        Yellow = Car w. velocity 2
        Light Green = Car w. velocity 3
        Green = Car w. velocity 4
        Dark Green = Car w. velocity 5
        
        In general, the scale goes from red to green, with greener being faster.
        '''
        plt.figure(figsize=(10, 2.5+(2.55*self.n_lanes)))
        sns.heatmap(np.where(self.road==-1, np.nan, self.road),
                        square=True, cmap="RdYlGn", vmin=0, vmax=self.max_velocity,
                        cbar=False, xticklabels=False, yticklabels=False,
                        alpha=.75, linewidths=0.75, linecolor="black")
        
    def display_basic(self):
        print(''.join('.' if x == -1 else str(x) for x in self.road[0]))
        
    def run(self, steps, warmup=0, display=False, track=True):
        '''
        Runs warmup laps without visualizing the results to get cars up to speed
        Then visualizes steps of the simulation
        '''
        
        #Warmup: Run warmup times without displaying state
        for step in range(warmup):

            #Update lanes, velocities and step
            self.update_lanes()
            self.update_velocities()
            self.step() #not tracking

        #Run simulation for # steps, displaying state after each
        for step in range(steps):
                
            #Update and step
            self.update_lanes()
            self.update_velocities()
            
            #Display state
            if display:
                self.display()
                
            self.step(track=track)

In [18]:
import numpy as np
import random
import seaborn as sns
import matplotlib.pyplot as plt

class TrafficSimulation(object):
    
    def __init__(self, road_length, traffic_density, 
                 max_velocity, slowdown_proba, 
                 p_swap_lane = 1, n_lanes=2):
        
        assert type(max_velocity) is int, "Max_velocity must be an integer!"
        self.max_velocity = max_velocity
        
        assert type(road_length) is int, "Road length must be an integer!"
        self.road_length = road_length
        
        assert traffic_density >= 0 and traffic_density <=1, "Traffic density must be a proportion, 0<=x<=1"
        self.traffic_density = traffic_density
        
        assert slowdown_proba >= 0 and slowdown_proba <=1, "slowdown_proba must be a probability, 0<=x<=1"
        self.slowdown_proba = slowdown_proba
        
        assert type(n_lanes)==int, "Number of lanes must be an integer"
        self.n_lanes = n_lanes
        
        #Track number of cars that have passed the boundary
        self.flow = 0
        
        #Track number of timestep the simulation has been run
        self.time = 0
        
        #Number of cells to look back is equal to the maximum velocity
        self.look_back = max_velocity
        
        #Make road
        #Infer number of cars
        self.num_cars = np.round(road_length*traffic_density*n_lanes)
        num_cars = np.round(road_length*traffic_density)
        road_array = -np.ones((n_lanes, road_length), dtype=int)
        
        car_counter = 0
        #Insert the cars with 0 velocity at random positions on the road
        while car_counter < num_cars:
            random_loc = random.randrange(0, n_lanes), random.randrange(0, road_length)
            if road_array[random_loc] == -1:
                road_array[random_loc] = 0
                car_counter += 1
                
        self.road = road_array
        
    def look(self, lane, cell, ahead=True):
        '''
        Takes a current position, defined using lane and cell
        Returns distance to next car in direction ahead
        if ahead=False, returns distance to next car behind
        
        Distance is defined to be 0 if there is a car in same cell in other lane
        '''
        #If there is a car in the same cell in the other lane
        if self.road[lane, cell] != -1:
            return 0
            
        #1 if ahead
        if ahead:
            increment = 1
        else: 
            increment = -1
        
        distance = increment
        while self.road[lane, (cell + distance) % self.road_length] == -1 and abs(distance) < self.road_length:
            distance += increment
        
        return distance
        
    def can_accelerate(self, velocity, front_distance):
        '''
        Takes a speed and a distance to the next car in front,
        return a bool indicating if the car can speed up 
        (or in the case it is already at max_velocity, maintain its speed)
        '''
        
        #If already at max speed, see if you can maintain it in lane
        if velocity == self.max_velocity:
            if velocity >= front_distance:
                return False
        
        else: #see if you can speed up
            if velocity+1 >= front_distance:
                return False
        
        return True
    
    def check_lane(self, lane, cell, velocity, current_lane=False):
        '''
        Checks if car, moved to road[lane, position] can:
        1) Accelerate
        2) Fit without obstructing car behind it
        '''
        
        #Look ahead, see if can accelerate
        front_distance = self.look(lane, cell)
        if not self.can_accelerate(velocity, front_distance):
            return False
        
        #Look back, see if there is space
        back_distance = self.look(lane, cell, ahead=False)
        if back_distance < self.max_velocity:
            return False
        
        return True
        
    def update_lanes(self):
        '''
        Updates the lane of each of the cars on the road,
        Starting from the top lane and working downwards
        '''
        if self.n_lanes == 1:
            return
        
        #Make an empty road to store updates on
        next_state = -np.ones((self.n_lanes, self.road_length), dtype=int)
        
        for lane in range(self.n_lanes):
            #For every cell in lane
            for cell in self.road[lane, :]:    
                
                #if cell is not a car, go to next cell
                if self.road[lane, cell] == -1:
                    continue
                
                #Current speed of car
                current_velocity = self.road[lane, cell]
                
                #Get distance to car in front
                distance = self.look(lane, cell)
                
                #Flag for whether the car will look for lane swap
                attempt_lane_swap = not self.can_accelerate(current_velocity, distance)
                    
                #Try switching to lane above, else try lane down
                if attempt_lane_swap:
                    
                    #If not already in the top lane, evaluate switching there
                    if lane-1 > 0:
                        if check_lane(lane-1, cell, current_velocity, current_velocity==self.max_velocity):
                            
                            #Swap possible to swap, swap with p=p_swap_lane
                            if random.random() <= p_swap_lane:
                                next_state[lane-1, cell] = current_velocity  
                            
                            else:
                                next_state[lane, cell] = self.road[lane, cell]
                                
                        else:
                            next_state[lane, cell] = self.road[lane, cell]
                            
                                
                    #If not already in the bottom lane, evaluate switching there
                    #Same logic applies
                    elif lane+1 <= self.n_lanes-1:
                        if self.check_lane(lane+1, cell, current_velocity):
                            
                            #Swap possible to swap, swap with p=p_swap_lane
                            if random.random() <= p_swap_lane:
                                next_state[lane+1, cell] = current_velocity
                            else:
                                next_state[lane, cell] = self.road[lane, cell]
                        
                        else:
                            next_state[lane, cell] = self.road[lane, cell]
                    
                    else:
                        next_state[lane, cell] = self.road[lane, cell]
                else:
                    next_state[lane, cell] = self.road[lane, cell]
        
        #Update the lane of all cars simultaneously
        self.road = next_state
    
    
    def update_velocities(self):
        '''
        Updates velocity of each of the cars on the road
        '''
        
        for lane in range(self.n_lanes):
            #For every cell in lane
            for cell in range(self.road_length):
                
                #if cell is not a car, go to next cell
                if self.road[lane, cell] == -1:
                    continue
                
                #Current speed of car
                current_velocity = self.road[lane, cell]
                
                #Get distance to car in front
                #distance = self.look(lane, cell
                distance = 1  # The number of empty cells between this car and the next plus 1
                while self.road[lane, ((cell + distance) % self.road_length)] == -1:
                    distance += 1
                
                #Accelerate car by 1, if not already at max speed
                if current_velocity < self.max_velocity:
                    self.road[lane, cell] += 1

                #Slow down due to other cars?
                if distance <= current_velocity and current_velocity > 0:
                    self.road[lane, cell] = distance - 1

                #Random slowdown?
                if random.random() <= self.slowdown_proba and current_velocity > 0:
                    self.road[lane, cell] -= 1
                    
                    
    def step(self, track=False):
        '''
        Advances each car along the road by its velocity
        '''
        
        #If we need to track flow and time, increment time
        if track:
            self.time += 1
            
        #Make an empty road to store updates on
        next_state = -np.ones((self.n_lanes, self.road_length), dtype=int)   
        
        for lane in range(self.n_lanes):
            #For every cell in lane
            for cell in range(self.road_length):
                
                #if cell is not a car, go to next cell
                if self.road[lane, cell] == -1:
                    continue
            
                #Find velocity of car
                velocity = self.road[lane, cell]

                #Nothing happens if the care doesn't move
                if velocity == 0:
                    continue

                #If we need to track flow and time car passes boundary, increment flow
                if track is True and cell+velocity > self.road_length:
                    self.flow += 1

                #Next pos, given periodic boundary condition
                next_pos = (cell+velocity)%self.road_length

                #Move car to next location
                next_state[lane, next_pos] = velocity

                #If previous location is not current location, reset previous location
                self.road[lane, cell] = -1
        
        assert next_state.shape == self.road.shape, "Shapes no match"
        #print("Cars:", (self.road_length*self.n_lanes)-self.num_cars)
        #print("next state:", np.count_nonzero(next_state==-1))
        #assert (self.road_length*self.n_lanes)-self.num_cars == np.count_nonzero(next_state==-1), "Cars removed in step: {}".format(self.time)
        self.road = next_state
        
    def step_2(self, track=False):
        next_state = -np.ones((self.n_lanes, self.road_length), dtype=int)
        for lane in range(self.n_lanes):
            #For every cell in lane
            for cell in range(self.road_length):
                if self.road[lane, cell] != -1:
                    next_state[lane, (cell + self.road[lane, cell]) % self.road_length] = self.road[lane, cell]
        self.state = next_state
                
    def display(self):
        plt.figure(figsize=(20, 5))
        sns.heatmap(self.road,
                        square=True,cmap="RdYlGn", vmin=-1, vmax=5, center=-1,
                        cbar=False, xticklabels=False, yticklabels=False,
                        alpha=.7)
        
    def display_basic(self):
        print(''.join('.' if x == -1 else str(x) for x in self.road[0]))

        
    def run(self, steps, warmup=100, display=False, track=True):
        '''
        Runs warmup laps without visualizing the results to get cars up to speed
        Then visualizes steps of the simulation
        '''
        
        #Warmup: Run warmup times without displaying state
        for step in range(warmup):

            #Update lanes, velocities and step
            self.update_lanes()
            self.update_velocities()
            self.step()

        #Run simulation for # steps, displaying state after each
        for step in range(steps):
            #Display state
            if display:
                self.display_basic()

            #Update and step
            #self.update_lanes()
            self.update_velocities()
            self.display_basic()
            self.step_2()

In [19]:
#self, road_length, traffic_density,  max_velocity, slowdown_proba, p_swap_lane = 1, n_lanes=2
my_road = TrafficSimulation(10, 0.1, 5, 0, n_lanes=2)
#print(my_road.road)
my_road.run(20, 0, display=True)

..0.......
..1.......
..1.......
..2.......
..2.......
..3.......
..3.......
..4.......
..4.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......
..5.......


In [20]:
my_road.road

array([[-1, -1,  5, -1, -1, -1, -1, -1, -1, -1],
       [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1]])

In [21]:
my_road.look(1, 1, ahead=False)

-10

In [None]:
Cars don't slow down properly
Cars behind directly behind get removed in step

In [272]:
n_lanes = 2
road_length = 10

next_state = -np.ones((n_lanes, road_length), dtype=int)
for lane in range(n_lanes):
    #For every cell in lane
    for cell in range(road_length):
        if my_road.road[lane, cell] != -1:
            next_state[lane, (cell + my_road.road[lane, cell]) % my_road.road_length] = my_road.road[lane, cell]
my_road.road = next_state
my_road.road

array([[-1, -1, -1,  5, -1, -1, -1, -1,  2, -1],
       [-1, -1, -1, -1,  5, -1, -1, -1, -1, -1]])

In [None]:
    def look(self, lane, cell, ahead=True):
        '''
        Takes a current position, defined using lane and cell
        Returns distance to next car in direction ahead
        if ahead=False, returns distance to next car behind
        
        Distance is defined to be 0 if there is a car in same cell in other lane
        '''
        #If there is a car in the same cell in the other lane
        if self.road[lane, cell] != -1:
            return 0
            
        #1 if ahead
        if ahead:
            distance = 1  # The number of empty cells between this car and the next plus 1
            while self.road[lane, ((cell + distance) % self.road_length)] == -1 or distance < self.road_length:
                distance += 1
                if abs(distance) == 200:
                    print(lane, cell)
                    self.display()
                    assert 1==2, "ahead loop error"

        else: 
            distance = -1
            while self.road[lane, ((cell + distance) % self.road_length)] == -1  or distance < self.road_length:
                distance -= 1
                if abs(distance) == 200:
                    print(lane, cell)
                    self.display()
                    assert 1==2, "back loop error"
        
        return distance