In [1]:
import numpy as np
import math
import random
import matplotlib.pyplot as plt

## Part 1: Traffic jams on a circular road

In [314]:
class TrafficSimulation:
    '''
    Single-lane cellular-automaton traffic model on a circular road.

    The road is stored in ``traff_state``, a 1-D integer array of length
    ``road_len``. A cell holds ``-1`` when it is empty and otherwise holds the
    velocity (cells travelled per step) of the car occupying it.
    ``fl_count`` counts how many times a car has wrapped around the end of
    the array since the traffic was last initialized.

    Parameters
    ----------
    road_length: int
        number of cells on the road
    traffic_density: float
        expected fraction of cells occupied by a car at initialization
    max_velocity: int
        largest velocity a car may accelerate to
    probability_slow_down: float
        probability that a moving car randomly loses one unit of velocity
        during a velocity update

    '''

    def __init__(self, road_length=100, traffic_density=0.03, max_velocity=5, probability_slow_down=0.5):
        self.road_len = road_length
        self.traff_dens = traffic_density
        self.max_vel = max_velocity
        self.prob_slow = probability_slow_down
        self.fl_count = 0

        self.traff_state = self.initialize_traffic()

    def initialize_traffic(self):
        '''
        Build a fresh road with randomly placed cars and reset the flow counter.

        Returns
        ----------
        traff_state: numpy array of int
            road of ``road_len`` cells; ``-1`` for empty cells, the car's
            initial velocity otherwise

        '''
        self.fl_count = 0
        traff_state = np.repeat(-1, self.road_len)
        # density * length is generally fractional; round it up or down at random
        # so that the expected number of cars matches the requested density
        num_cars = self.probabilistic_round(self.traff_dens * self.road_len)
        car_locs = np.random.choice(traff_state.size, num_cars, replace=False)
        traff_state[car_locs] = self.init_velocity(num_cars)

        return traff_state

    def run_simulation(self, num_iter=15, display=True):
        '''
        Re-initialize the road and simulate ``num_iter`` time steps.

        Parameters
        ----------
        num_iter: int
            number of time steps to simulate
        display: bool
            if True, print the road after each velocity update

        Returns
        ----------
        flow: float
            boundary crossings per time step (0.0 when no step was simulated)

        '''
        self.traff_state = self.initialize_traffic()  # also resets fl_count

        for _ in range(num_iter):
            self.update_velocities()
            if display:
                self.display()
            self.advance_cars()

        if num_iter <= 0:  # nothing was simulated; avoid dividing by zero
            return 0.0
        return self.fl_count / num_iter

    def update_velocities(self):
        '''
        Apply the three velocity rules to every car, in place: accelerate by
        one if there is room, brake to avoid the car ahead, then slow down by
        one with probability ``prob_slow``. Car positions are not changed.
        '''
        car_locs = np.where(self.traff_state != -1)[0]
        num_cars = len(car_locs)
        for i, car_loc in enumerate(car_locs):
            if num_cars > 1:
                # car_locs is sorted, so the car ahead is the next entry,
                # wrapping from the last car back to the first one
                next_car_loc = car_locs[(i + 1) % num_cars]
                dist_to_next_car = (next_car_loc - car_loc) % self.road_len
            else:
                # a lone car sees itself one full lap ahead
                dist_to_next_car = self.road_len

            vel = self.traff_state[car_loc]

            # acceleration
            if vel < self.max_vel and vel + 1 < dist_to_next_car:
                vel += 1

            # slowing: never move onto or past the cell of the car ahead
            if vel >= dist_to_next_car:
                vel = dist_to_next_car - 1

            # random slowing (the random number is drawn for every car)
            if random.random() < self.prob_slow and vel > 0:
                vel -= 1

            self.traff_state[car_loc] = vel

    def advance_cars(self):
        '''
        Move every car forward by its velocity (one time step) and count the
        cars that wrap around the end of the road.
        '''
        car_locs = np.where(self.traff_state != -1)[0]
        for car_loc in car_locs:
            cur_vel = self.traff_state[car_loc]
            next_loc = (car_loc + cur_vel) % self.road_len
            self.update_flow(car_loc, next_loc)
            # clear the old cell first so a stationary car (next_loc == car_loc)
            # is written back correctly
            self.traff_state[car_loc] = -1
            self.traff_state[next_loc] = cur_vel

    def update_flow(self, car_loc, next_loc):
        '''
        Increment ``fl_count`` when a move from ``car_loc`` to ``next_loc``
        wrapped around the end of the road (the new index is smaller).
        '''
        if next_loc < car_loc:
            self.fl_count += 1

    def display(self):
        '''
        Print the road as one line ('·' for an empty cell, the velocity digit
        for a car), followed by a tab and the current traffic density.
        '''
        cur_dens = np.sum(self.traff_state != -1) / self.traff_state.size
        print(''.join('·' if x == -1 else str(x) for x in self.traff_state) + '\t' + str(cur_dens))
        print('\n')  # create space between consecutive states of the road

    @staticmethod
    def probabilistic_round(num):
        '''
        Round ``num`` down or up at random, rounding up with probability equal
        to its fractional part.
        '''
        return int(math.floor(num + random.random()))

    def init_velocity(self, num_cars):
        '''
        Draw ``num_cars`` initial velocities uniformly from 0..``max_vel``.

        The upper bound of ``np.random.randint`` is exclusive, so
        ``max_vel + 1`` is passed to make ``max_vel`` itself reachable (and to
        keep ``max_vel == 0`` from raising).
        '''
        return np.random.randint(0, self.max_vel + 1, size=num_cars)

    def __getitem__(self, index):
        '''Read cell(s) of the road; ``index`` is passed to the state array.'''
        return self.traff_state[index]

    def __setitem__(self, index, value):
        '''Write cell(s) of the road; ``index`` is passed to the state array.'''
        self.traff_state[index] = value

In [315]:
# Seed both random number generators used by the simulation (`random` and
# `np.random`) so this cell prints the same trajectory on every fresh run.
random.seed(0)
np.random.seed(0)

traff_sim = TrafficSimulation(traffic_density=0.2)
traff_sim.run_simulation()

·0·1·······2······4··········3···0·2··4····2·······2··0·0··03···········4·····0·2··0·0··········3···	0.2


·0··2········2········5·········00···3····1··3·······01·1··1···3············1·1···00·1·············1	0.2


00····3········2···········3····01······2··2····4····1·0·1··2·····3··········1·2··01··1·············	0.2


00·······3·······2············1·0·1·······1··2······0·00··2···2······3········1··00·2··1············	0.2


01··········3······3···········00··2·······2···3····0·00····2···2·······4······0·01···1·1···········	0.2


0·2············3······4········01····2·······2····1·0·01······3···3·········1··1·0·1···0·2··········	0.2


0···3·············3·······4····1·2·····2·······3···00·1·1········2···3·······1··01··1··0···2········	0.2


0······3·············4········1·1··3·····2········001··0·2·········2····3·····0·1·2··1·1·····2······	0.2


0·········4··············4·····1·1····4····2······01·0·0···3·········2·····2··0··2··0·0·1······2····	0.2


1·············4··············2··1·1··

0.2

## Part 2: Multi-lane highways

In [319]:
class TrafficSimulationMultilane:
    '''
    Multi-lane traffic simulation built from parallel ``TrafficSimulation`` lanes.

    Each time step updates the velocities in every lane, lets cars change to
    a free neighbouring lane with probability ``prob_switch_lane``, and then
    advances all cars.
    '''

    def __init__(
        self, num_lanes=2, probability_switch_lane=0.2, uniform_init_distribution=False,  # multi-lane properties
        road_length=100, traffic_density=0.03, max_velocity=5, probability_slow_down=0.5  # single-lane properties
    ):
        '''
        Initialization for multi-lane traffic simulation.

        Parameters
        ----------
        num_lanes: int
            number of lanes in the multi-lane traffic system
        probability_switch_lane: float
            probability that a car will switch lanes, given no cars in back or front on other lane
        uniform_init_distribution: bool
            if True every lane is initialized with the same traffic density;
            otherwise the cars are scattered at random over all lanes
        road_length: int
            number of cells in each lane
        traffic_density: float
            expected fraction of occupied cells over the whole road
        max_velocity: int
            largest velocity a car may reach
        probability_slow_down: float
            probability that a moving car randomly slows down by one

        '''

        self.num_lanes = num_lanes
        self.prob_switch_lane = probability_switch_lane
        self.uniform_distr = uniform_init_distribution

        self.road_len = road_length
        self.traff_dens = traffic_density
        self.max_vel = max_velocity
        self.prob_slow = probability_slow_down
        self.fl_count = 0

        # initialize multiple lanes randomly given initial traffic density
        self.roads = self.initialize_roads(self.uniform_distr)

    def _new_lane(self, density):
        '''Create one lane with the shared road parameters and the given density.'''
        return TrafficSimulation(
            road_length=self.road_len,
            traffic_density=density,
            max_velocity=self.max_vel,
            probability_slow_down=self.prob_slow)

    def initialize_roads(self, uniform=False):
        '''
        Initialize cars on road lanes.

        Parameters
        ----------
        uniform: bool
            indicates whether all roads should have the same initial traffic density

        Returns
        ----------
        roads: list of TrafficSimulation objects
            roads with initialized traffic

        '''

        if uniform:
            return [self._new_lane(self.traff_dens) for _ in range(self.num_lanes)]

        # Scatter the cars over all cells of all lanes at once; only the resulting
        # number of cars per lane is kept (each lane then places its own cars).
        total_cells = self.num_lanes * self.road_len
        flattened_road = np.repeat(-1, total_cells)
        num_cars = TrafficSimulation.probabilistic_round(self.traff_dens * total_cells)
        car_locs = np.random.choice(flattened_road.size, num_cars, replace=False)
        flattened_road[car_locs] = 0  # indicate where cars would be

        # resize the flattened road into lanes and initialize with respective densities
        roads = []
        for lane in flattened_road.reshape(self.num_lanes, self.road_len):
            num_cars_in_lane = np.sum(lane == 0)
            roads.append(self._new_lane(num_cars_in_lane / self.road_len))

        return roads

    def run_simulation(self, num_iter=10, display=True):
        '''
        Re-initialize all lanes and simulate ``num_iter`` time steps.

        Parameters
        ----------
        num_iter: int
            number of time steps to simulate
        display: bool
            if True, print all lanes before each step

        Returns
        ----------
        flow: float
            boundary crossings per time step, summed over all lanes
            (0.0 when no step was simulated)

        '''
        # start from fresh lanes, honouring the configured initial distribution
        self.roads = self.initialize_roads(self.uniform_distr)
        self.fl_count = 0

        for _ in range(num_iter):
            if display:
                self.display()
            self.update()

        if num_iter <= 0:  # nothing was simulated; avoid dividing by zero
            return 0.0
        return self.fl_count / num_iter

    def update(self):
        '''
        Advance the whole road by one time step: update velocities, switch
        lanes, move cars, and refresh the total flow counter.
        '''
        # update velocities
        for road in self.roads:
            road.update_velocities()

        # update lanes
        # Car positions are snapshotted for every lane before any switch happens,
        # so a car that has just changed lanes is not offered a second switch
        # during the same step.
        cars_per_lane = [np.where(road.traff_state != -1)[0] for road in self.roads]
        for i, car_locs in enumerate(cars_per_lane):
            for car_loc in car_locs:
                switches = self.get_switch_directions(i, car_loc)
                if len(switches) > 0:  # there are available lane switches
                    switch_dir = np.random.choice(switches, 1)[0]
                    if random.random() < self.prob_switch_lane:
                        self.roads[i + switch_dir][car_loc] = self.roads[i][car_loc]
                        self.roads[i][car_loc] = -1

        # advance cars
        for road in self.roads:
            road.advance_cars()

        # each lane counts its own boundary crossings; expose the total
        self.fl_count = sum(road.fl_count for road in self.roads)

    def get_switch_directions(self, road_idx, car_loc):
        '''
        List the lane offsets (-1 for up, +1 for down) into which the car at
        ``car_loc`` on lane ``road_idx`` could currently switch.
        '''
        switches = []  # get available lane switch directions
        if road_idx > 0 and self.check_lane_free(road_idx, road_idx - 1, car_loc):
            switches.append(-1)  # up
        if road_idx < self.num_lanes - 1 and self.check_lane_free(road_idx, road_idx + 1, car_loc):
            switches.append(1)  # down

        return switches

    def check_lane_free(self, road_from, road_to, insert_pos):
        '''
        Check whether the car at ``insert_pos`` on lane ``road_from`` can move
        sideways into lane ``road_to``.

        The target lane must be empty from ``max_vel`` cells behind the car up
        to ``vel`` cells ahead of it (both inclusive), where ``vel`` is the
        car's current velocity. Indices wrap around the circular road.

        Returns
        ----------
        lane_free: bool

        '''
        vel = self.roads[road_from][insert_pos]
        # using distance instead of gap to calculate freeness of other lane;
        # the modulo handles windows that cross the periodic boundary
        window = np.arange(insert_pos - self.max_vel, insert_pos + vel + 1) % self.road_len

        return bool(np.all(self.roads[road_to][window] == -1))

    def display(self):
        '''
        Print every lane on its own line ('·' for an empty cell, the velocity
        digit for a car), each followed by a tab and that lane's density.
        '''
        for road in self.roads:
            cur_dens = np.sum(road.traff_state != -1) / road.traff_state.size
            print(''.join('·' if x == -1 else str(x) for x in road.traff_state) + '\t' + str(cur_dens))
        print('\n')  # create space between consecutive states of multi-lane road

In [320]:
# Two-lane road at 20% density; a car with a free neighbouring lane switches half of the time.
multlane_sim = TrafficSimulationMultilane(
    num_lanes=2,
    traffic_density=0.2,
    probability_switch_lane=0.5,
)

In [321]:
# Run the default 10 time steps, printing one row per lane before every step.
multlane_sim.run_simulation()

··2················44····22·104···2···20···1··3······2····3···3·203·····4·····2···········41········	0.23
·3···············2·········43····3···4·········4·30·····0·2··0·········1···30·····4········2········	0.17


····2··············0···3···100···3···30·1···1····3······3····30·00···3······4···2·········0··2······	0.22
····3··············2·····0·0····4···3·····5····0·0·1····0···2·1··········2·0·1·········5······3·····	0.18


·······3···········0······300·1·····300··1····2······4·····3·0·100·······4·0···3··2········1···2····	0.23
········4············2····1·1·····2····3·····3··10···2···1··0···2········0·····2············5·····4·	0.17


···········4·······0······00·1·1····000····2····2·········50·0·000········10····1···2········2···2··	0.23
··4·········4··········2···1··2·····2······4··1·0·1····2···20·····2·······1······2··············4···	0.17


···············4·········20·10··1···000·······3···2·······00·0·00·1·······00·····1·····3········3··2	0.23
4·····4··········5··1········2··2·····

In [267]:
# Small scratch array holding the integers 1 through 7.
a = np.arange(1, 8)

In [268]:
# A slice whose start is past its stop is empty: slicing does not wrap around the end of the array.
a[5:1]

array([], dtype=int64)