# Traffic Flow 
### A traffic simulation in search of optimized junction designs
By Ravi Mohanlal & Jelle Stoffels

In [1]:
import itertools
import random
import sys

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pygame
from scipy import stats
%matplotlib inline

pygame 1.9.4
Hello from the pygame community. https://www.pygame.org/contribute.html


## Global constants
Lengths are in pixels. Times are in frames.

In [2]:
# --- Vehicles ---------------------------------------------------------------
CAR_WIDTH = 10              # width of every drawn vehicle (pixels)
CAR_LENGTH = (20, 27)       # inclusive (min, max) range passed to random.randint
TRUCK_LENGTH = (42, 50)     # inclusive (min, max) range passed to random.randint
TRUCK_PROBABILITY = 0.1     # chance that a spawned vehicle is a truck
ACCELERATION = 0.02         # default speed change per frame
DECELERATION = -0.02        # speed change per frame while braking for a stop line
MAX_SPEED = 2               # upper speed limit (pixels per frame)
FOLLOWING_TIME = 40         # desired time gap to the car ahead (frames)
BUFFER = 20                 # extra gap subtracted from the space to the car ahead (pixels)
SPAWN_RATE = 30             # default spawn attempts, in cars per minute

# --- Timing -----------------------------------------------------------------
FPS = 60                    # frames per second

# --- Geometry ---------------------------------------------------------------
H = 900                     # the window is H x H pixels; the origin is shifted by H/2
B = 250                     # length of the approach section that ends at the junction
L = 50                      # presumably the road width (lanes are offset by L/2) -- confirm

# --- Colors (RGB) -----------------------------------------------------------
WHITE = (240, 240, 245)
RED1 = (255, 100, 100)
RED2 = (220, 65, 65)
GREY1 = (150, 150, 150)

# --- Input ------------------------------------------------------------------
LEFT = 1                    # value of event.button for a left mouse click

## Paths and sections
Each car follows a certain path. A path is made up of a number of sections, which are mathematically defined lines and curves.

In [3]:
# A general section has a few parameters defining its position, length, orientation and direction.
# It also contains the cars which are driving on it.
class Section():
    """Base class for one piece of road.

    Holds the geometric parameters given to the constructor and the list of
    cars currently driving on the section. Subclasses decide how ``x``, ``y``,
    ``a`` and ``b`` translate into positions and angles.
    """

    def __init__(self, x, y, a, b, length):
        # Reference point of the section.
        self.x, self.y = x, y
        # Orientation/direction parameters; their meaning depends on the subclass.
        self.a, self.b = a, b
        # Distance a car has to travel to get from one end to the other.
        self.length = length
        # Every section gets its own, initially empty, list of cars.
        self.cars = []
        # Flag read by the traffic-light logic; switched off until granted.
        self.priority = False

In [4]:
# Lines and curves are special cases of sections. They contain functions for calculating
# the position and angle at a certain distance along the section.

class Line(Section):
    """Straight section that starts at (x, y) and runs along the direction (a, b)."""

    def position(self, s):
        """Return the (x, y) point reached after travelling ``s`` along the line."""
        px = self.x + self.a*s
        py = self.y + self.b*s
        return (px, py)

    def angle(self, s):
        """Return the heading in radians; it does not depend on ``s``.

        The y component is negated before taking the arctangent, presumably
        because screen y grows downward.
        """
        dy, dx = -self.b, self.a
        return np.arctan2(dy, dx)

class Curve(Section):
    """Circular arc around the centre (x, y).

    ``a`` is the start angle in quarter turns, ``b`` the sign of the rotation,
    ``l`` the arc length in quarter circles and ``r`` the radius.
    """

    def __init__(self, x, y, a, b, l, r):
        # l quarter circles of radius r give an arc length of l * (pi * r / 2).
        arc_length = l*np.pi*r/2
        super().__init__(x, y, a, b, arc_length)
        self.l = l
        self.r = r

    def _polar_angle(self, s):
        """Angle, seen from the centre, of the point ``s`` along the arc."""
        return self.a*np.pi/2 + self.b*s/self.r

    def position(self, s):
        """Return the (x, y) point reached after travelling ``s`` along the arc."""
        theta = self._polar_angle(s)
        px = self.x+self.r*np.cos(theta)
        py = self.y-self.r*np.sin(theta)
        return (px, py)

    def angle(self, s):
        """Return the heading in radians after travelling ``s`` along the arc."""
        quarter_turns = self.a+1
        return quarter_turns*np.pi/2 + self.b*s/self.r

## Cars
This class represents a car. It contains functions for updating its acceleration, speed and traveled distance. The position and angle of the car are determined by its traveled distance and the path that it is on. 

In [5]:
class Car():
    """A vehicle that drives along a fixed path of sections.

    ``path`` is a pair ``(sections, blockades)``: the sections to drive along
    in order, and the sections whose occupants force this car to wait when the
    intersection has no traffic lights. ``lights`` tells the car whether the
    intersection is controlled by traffic lights.
    """

    def __init__(self, length, path, lights):
        # Route and rules.
        self.sections, self.blockades = path
        self.lights = lights
        self.special_priority = False   # set from outside to break a deadlock

        # Kinematic state; a car enters at full speed at the start of section 0.
        self.length = length
        self.i = 0                      # index of the current section in the path
        self.distance = 0               # distance travelled along the current section
        self.speed = MAX_SPEED
        self.acceleration = 0
        self.t = 0                      # number of frames this car has existed

        # Random hue with fixed saturation and lightness.
        self.color = pygame.Color(0)
        self.color.hsla = (random.randint(0,360), 75, 25)

    def section(self):
        """Return the section this car is currently on."""
        return self.sections[self.i]

    def position(self):
        """Return the car's position as given by its current section."""
        current = self.section()
        return current.position(self.distance)

    def angle(self):
        """Return the car's angle as given by its current section."""
        current = self.section()
        return current.angle(self.distance)

    def find_next_car(self):
        """Return the car this car is following, or None if the road ahead is empty."""
        # Walk the current section from the back of the list; the first car
        # found that is further along is the one directly in front.
        same_section = reversed(self.section().cars)
        leader = next((c for c in same_section if c.distance > self.distance), None)
        if leader is not None:
            return leader

        # Otherwise take the rearmost car of the first occupied section ahead.
        for upcoming in self.sections[self.i+1:]:
            if upcoming.cars:
                return upcoming.cars[-1]
        return None

    def update_acceleration(self):
        """Choose this frame's acceleration from stop-line and following rules."""
        next_car = self.find_next_car()
        self.acceleration = ACCELERATION

        # The stop line sits at the end of the second section of every path,
        # so the waiting rules only matter while the car is on that section.
        if self.i == 1:
            if self.lights:
                # Traffic light: wait unless this approach has been given priority.
                must_wait = not self.section().priority
            else:
                # Oncoming traffic with priority, unless this car got special priority.
                must_wait = any(b.cars for b in self.blockades) and not self.special_priority

            if must_wait:
                # Brake once the braking distance is about as long as the
                # distance left to the stop line.
                to_stopline = self.sections[1].length - self.distance - self.length/2
                braking_distance = self.speed**2 / (2 * -DECELERATION)
                margin = to_stopline - braking_distance
                if 0 < margin < 10:
                    self.acceleration = DECELERATION

        if not next_car:
            return

        # Free space to the next car, bumper to bumper, minus the safety buffer.
        space = next_car.distance - self.distance - \
                (self.length + next_car.length)/2 - BUFFER
        leader_section = next_car.section()
        for passed in self.sections[self.i:]:
            if passed == leader_section:
                break
            space += passed.length

        # Keep a constant following time to the next car
        # (think of the two-second rule in driving).
        following = 2*(space - self.speed*FOLLOWING_TIME) / FOLLOWING_TIME**2
        self.acceleration = min(self.acceleration, following)

    def update_speed(self):
        """Add the acceleration to the speed, clip to [0, MAX_SPEED], round to 2 decimals."""
        wanted = self.speed + self.acceleration
        capped = min(MAX_SPEED, wanted)
        self.speed = round(max(0, capped),2)

    def update_distance(self):
        """Advance the car by its current speed."""
        self.distance = self.distance + self.speed

    def update_section(self):
        """Move the car to the next section once it has passed the end of this one.

        Returns True when the car has left the last section of its path and
        None otherwise.
        """
        current = self.section()
        if not self.distance > current.length:
            return None

        # Carry the overshoot over into the next section.
        self.distance -= current.length
        current.cars.remove(self)
        self.i += 1

        if self.i >= len(self.sections):
            return True
        self.section().cars.append(self)
        return None

    def update(self):
        """Advance the car by one frame; returns True when it has finished its path."""
        self.t += 1
        self.update_acceleration()
        self.update_speed()
        self.update_distance()
        return self.update_section()
        

## Intersections
This class represents an intersection. It contains paths and cars among other things. It contains functions for things such as spawning cars and handling traffic lights.

In [6]:
class Intersection():
    """One junction design together with its traffic and running statistics.

    Parameters
    ----------
    background : str or None
        Name of the background image without the '.png' extension. A falsy
        value skips loading, which is how the headless runs use the class.
    sections : list
        Every section of the junction.
    paths : list
        ``(sections, blockades)`` pairs; each spawned car receives one of them.
    triggers : list
        The sections on which cars wait for a traffic light or for priority.
    lights : bool
        Whether the junction is controlled by traffic lights.
    spawn_rate : number
        Spawn attempts in cars per minute.
    """

    def __init__(self, background, sections, paths, triggers, lights=False, spawn_rate=SPAWN_RATE):
        # Always define the attribute so that headless intersections have a
        # well-defined (empty) background instead of a missing attribute.
        self.background = None
        if background:
            self.background = pygame.image.load(background + '.png').convert()
        self.sections = sections
        self.paths = paths
        self.triggers = triggers
        self.car_count = 0              # cars that completed their path
        self.spawn_rate = spawn_rate
        self.cars_per_min = 0
        self.car_time_list = []         # travel time in seconds of every finished car
        self.average_travel_time = 0
        self.timer = 0                  # frame at which a green phase last ended
        self.lights = lights

    def cars(self):
        """Iterate over all the cars in the intersection, section by section."""
        for section in self.sections:
            for car in section.cars:
                yield car

    def spawn_cars(self, t):
        """Possibly spawn one car at the start of a randomly chosen path."""
        # spawn_rate is in cars per minute; convert it to a per-frame probability.
        if random.random() < self.spawn_rate/60/FPS:
            if random.random() < TRUCK_PROBABILITY:
                length = random.randint(*TRUCK_LENGTH)
            else:
                length = random.randint(*CAR_LENGTH)

            path = random.choice(self.paths)
            section0 = path[0][0]

            # Only spawn if the last car that entered has moved far enough away.
            if not section0.cars or section0.cars[-1].distance > 80:
                car = Car(length, path, self.lights)
                section0.cars.append(car)

    def update_global(self, t):
        """Handle the things that are decided for the junction as a whole."""
        if self.lights:
            # A green phase ends as soon as its approach has emptied.
            for s in self.triggers:
                if s.priority and not s.cars:
                    s.priority = False
                    self.timer = t

            # More than FPS frames after the last green phase ended, give
            # priority to a random approach that has cars waiting.
            if t - self.timer > FPS:
                if not any(s.priority for s in self.triggers):
                    occupied = [s for s in self.triggers if s.cars]
                    if occupied:
                        random.choice(occupied).priority = True

        # Without lights: every 60th frame, if no front car on any approach is
        # moving, randomly give one of them priority to break the deadlock.
        else:
            if not t%60 and not any(s.cars and s.cars[0].speed for s in self.triggers):
                cars = [s.cars[0] for s in self.triggers if s.cars]
                if cars:
                    random.choice(cars).special_priority = True

    def update_cars(self, t):
        """Advance every car by one frame and record the cars that finished."""
        # car.update() removes the car from its section's list and may append
        # it to another one. Iterating those lists directly would skip the car
        # behind a departing car and update a car twice when it moves into a
        # section that is visited later, so work on a snapshot instead.
        for car in list(self.cars()):
            if car.update():
                self.car_count += 1
                elapsed_minutes = t/FPS/60
                if elapsed_minutes > 0:
                    self.cars_per_min = self.car_count/elapsed_minutes
                self.car_time_list.append(round(car.t/FPS,3))
                self.average_travel_time = np.mean(self.car_time_list)

    def update(self, t):
        """Advance the whole intersection by one frame."""
        self.spawn_cars(t)
        self.update_global(t)
        self.update_cars(t)

## Graphics
This class contains all the functions for drawing the graphics.

In [7]:
class Graphics():
    """Owns the pygame window and draws cars, statistics, buttons and the menu."""

    def __init__(self):
        # Square window of H by H pixels.
        self.screen = pygame.display.set_mode((H, H))

    def center(self, points):
        """Shift points from a centred coordinate system to window coordinates."""
        offset = H/2
        return np.array(points) + offset

    def draw_car(self, car):
        """Draw a car as a rectangle rotated to the car's angle."""
        p, l, w, a = car.position(), car.length, CAR_WIDTH, car.angle()
        cos_a, sin_a = np.cos(a), np.sin(a)

        # Corners relative to the car's centre, rotated and then moved to p.
        outline = []
        for dx, dy in ((-l/2, w/2), (l/2, w/2), (l/2, -w/2), (-l/2, -w/2)):
            px =  dx*cos_a - dy*sin_a + p[0]
            py = -dx*sin_a - dy*cos_a + p[1]
            outline.append([px, py])

        pygame.draw.polygon(self.screen, car.color, self.center(outline))

    def draw_stats(self, intersection, t):
        """Draw the four lines of statistics in the top-left corner."""
        font = pygame.font.SysFont('Helvetica', 18)
        black = (0, 0, 0)
        lines = (
            'Seconds past: ' + str(round(t/60, 1)),
            'Number of cars: ' + str(intersection.car_count) + ' cars',
            'Cars per minute: ' + str(round(intersection.cars_per_min, 2)) + ' cars',
            'Average travel time: ' + str(round(intersection.average_travel_time, 2))+' sec',
        )

        # One line every 20 pixels, starting at y = 20.
        for row, line in enumerate(lines):
            rendered = font.render(line, False, black)
            self.screen.blit(rendered, (20, 20 + 20*row))

    def draw_buttons(self, button_data, text_data):
        """Draw the buttons and their labels; return a dict of name -> drawn rect."""
        # pygame.draw.rect returns the rectangle it drew, which is used for hit-testing.
        buttons = {name: pygame.draw.rect(self.screen, *spec)
                   for name, spec in button_data.items()}

        # Every text entry is (label, font size, centre x, centre y).
        for label, size, x, y in text_data:
            font = pygame.font.Font('freesansbold.ttf', size)
            rendered = font.render(label, True, WHITE)
            self.screen.blit(rendered, rendered.get_rect(center=(x,y)))

        return buttons

    def draw_sim(self, intersection, button_data, text_data, t):
        """Draw one complete frame of the simulation and return the button rects."""
        self.screen.blit(intersection.background, (0, 0))
        buttons = self.draw_buttons(button_data, text_data)

        for car in intersection.cars():
            self.draw_car(car)
        self.draw_stats(intersection, t)

        pygame.display.flip()
        return buttons

    def draw_menu(self, button_data, text_data):
        """Draw the menu screen and return the button rects."""
        self.screen.fill((100,100,100))
        buttons = self.draw_buttons(button_data, text_data)
        pygame.display.flip()
        return buttons

## Simulation and Menu
The simulation class deals with handling input, updating the intersection and drawing.

In [8]:
class Simulation():
    """Runs an intersection, either interactively in the window or headless."""

    def __init__(self, intersection, clock=None, graphics=None):
        # clock and graphics are only needed by the interactive run().
        self.intersection = intersection
        self.clock = clock
        self.graphics = graphics

        # Button name -> (color, (x, y, width, height)).
        self.buttons = {
            'reset': (RED1, (740, 20, 100, 40)),
            'quit': (RED1, (740, 80, 100, 40)),
        }
        # (label, font size, centre x, centre y) for every button label.
        self.text = (
            ('Reset', 21, 790, 40),
            ('Quit', 21, 790, 100),
        )

    def run(self):
        """Run the interactive simulation until a button is clicked.

        Returns the name of the clicked button. Closing the window quits
        pygame and exits the interpreter.
        """
        # The rectangles returned by the first draw are used for hit-testing.
        buttons = self.graphics.draw_sim(self.intersection, self.buttons, self.text, 0)

        t = 0
        while True:
            self.clock.tick(FPS)
            self.intersection.update(t)
            self.graphics.draw_sim(self.intersection, self.buttons, self.text, t)

            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    sys.exit()
                if event.type != pygame.MOUSEBUTTONDOWN or event.button != LEFT:
                    continue
                for name, rect in buttons.items():
                    if rect.collidepoint(event.pos):
                        return name

            t += 1

    def run2(self, ticks):
        """Run the simulation for ``ticks`` frames without drawing, then quit pygame."""
        for frame in range(ticks):
            self.intersection.update(frame)
        pygame.quit()

The menu class deals mainly with handling input.

In [9]:
class Menu():
    """Start menu in which the user picks a design and starts the simulation."""

    def __init__(self, designs, clock, graphics):
        self.designs = designs
        self.clock = clock
        self.graphics = graphics
        self.i = 0      # index of the currently selected design

        # Button name -> (color, (x, y, width, height)).
        self.buttons = {
            'start':  (RED1, (250,250,400,40)),
            'design': (RED1, (290, 310, 320, 40)),
            'prev':   (RED2, (250, 310, 40, 40)),
            'next':   (RED2, (610, 310, 40, 40)),
            'about':  (RED1, (250, 370, 400, 40)),
            'quit':   (RED1, (250, 430, 400, 40)),
        }

        # (label, font size, centre x, centre y). Entry 3 is a list rather than
        # a tuple because update() rewrites its label in place.
        self.text = (
            ('Traffic Flow', 64, 450, 100),
            ('By Ravi & Jelle', 21, 450, 150),
            ('Start', 21, 450, 270),
            ['', 21, 450, 330],
            ('Prev', 16, 269, 331),
            ('Next', 16, 629, 331),
            ('About', 21, 450, 390),
            ('Quit', 21, 450, 450),
        )

    def update(self):
        """Put the selected design's name on its label, redraw, return the button rects."""
        design_name = self.designs[self.i].__name__
        self.text[3][0] = 'Design: ' + design_name.replace('_',' ')
        return self.graphics.draw_menu(self.buttons, self.text)

    def run(self):
        """Handle input until 'start' is clicked; returns the selected design.

        Closing the window or clicking 'quit' quits pygame and exits the interpreter.
        """
        buttons = self.update()

        while True:
            self.clock.tick(FPS)

            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    sys.exit()

                if event.type != pygame.MOUSEBUTTONDOWN or event.button != LEFT:
                    continue

                for name, rect in buttons.items():
                    if not rect.collidepoint(event.pos):
                        continue
                    if name == 'start':
                        return self.designs[self.i]
                    if name == 'prev':
                        # Stay on the first design when already there.
                        self.i = max(0, self.i - 1)
                        self.update()
                    elif name == 'next':
                        # Stay on the last design when already there.
                        self.i = min(len(self.designs) - 1, self.i + 1)
                        self.update()
                    elif name == 'quit':
                        pygame.quit()
                        sys.exit()

## Intersection implementations
Here follow four different types of intersections: Free crossing, Shark teeth, Traffic lights and Roundabout.

In [10]:
# Basic intersection, which can be used by the first three intersections.
def basic_intersection(blockades=tuple([[]]*12)):
    """Build the four-way junction layout shared by several designs.

    ``blockades`` has one entry per path (12 in total, three per approach);
    each entry lists the indices of the trigger sections that the path has to
    wait for. The default makes no path wait for anything.

    Returns ``(sections, paths, triggers)`` where ``paths`` is a list of
    ``(path sections, blockade sections)`` pairs.
    """
    lane = L/2

    # The seven sections of one approach: five straight pieces and two curves.
    sections = [
        Line(lane, H/2, 0, -1, H/2-L-B),
        Line(lane, L+B, 0, -1, B),
        Line(lane, L, 0, -1, L),
        Line(lane, 0, 0, -1, L),
        Line(lane, -L, 0, -1, H/2-L),
        Curve(L, L, 2, -1, 1, lane),
        Curve(0, 0, 0, 1, 1, lane),
    ]

    # Derive the other three approaches, each from the seven sections added last.
    for _ in range(3):
        previous_arm = sections[-7:]
        for s in previous_arm:
            if isinstance(s, Line):
                rotated = Line(s.y,-s.x,s.b,-s.a,s.length)
            else:
                rotated = Curve(s.y,-s.x,s.a+1,s.b,s.l,s.r)
            sections.append(rotated)

    # Three routes per approach, given as section indices relative to the
    # first approach: straight on, the turn over curve 5 and the turn over curve 6.
    templates = ((0,1,2,3,4), (0,1,5,25), (0,1,2,6,10,11))
    routes = []
    for arm in range(4):
        for template in templates:
            routes.append([sections[(n+arm*7)%28] for n in template])

    # The second section of every approach is where cars wait.
    triggers = [sections[arm*7+1] for arm in range(4)]

    waits_for = [[triggers[n] for n in entry] for entry in blockades]
    paths = list(zip(routes, waits_for))
    return sections, paths, triggers

def Free_crossing():
    """Four-way junction without lights, built on basic_intersection.

    Each row below holds the blockade entries for the three paths of one approach.
    """
    waits_for = (
        [1], [], [1,2],
        [2], [], [2,3],
        [3], [], [3,0],
        [0], [], [0,1],
    )
    return basic_intersection(waits_for)

def Shark_teeth():
    """Four-way junction without lights, built on basic_intersection.

    Each row below holds the blockade entries for the three paths of one
    approach. Approaches 1 and 3 wait for fewer triggers than approaches
    0 and 2, so they presumably form the priority road -- confirm.
    """
    waits_for = (
        [1,3], [3], [1,2,3],
        [],    [],  [3],
        [3,1], [1], [3,0,1],
        [],    [],  [1],
    )
    return basic_intersection(waits_for)

def Traffic_lights():
    """Basic junction with no blockades and a fourth element, True, that switches the lights on."""
    sections, paths, triggers = basic_intersection()
    return (sections, paths, triggers, True)

In [11]:
# The roundabout has a different implementation.
def Roundabout():
    """Build the roundabout design.

    Returns ``(sections, paths, triggers)`` with an empty trigger list;
    ``paths`` is a list of ``(path sections, blockade sections)`` pairs.
    """
    # The seven sections belonging to one approach.
    sections = [
        Line(25, 450, 0, -1, 100),
        Line(25, 350, 0, -1, 200),
        Curve(100, 150, 2, -1, 0.63, 75),
        Curve(0, 0, 3.37, 1, 0.25, 105),
        Curve(0, 0, 3.63, 1, 0.75, 105),
        Curve(150, 100, 1.63, -1, 0.63, 75),
        Line(150, 25, 1, 0, 300),
    ]

    # Derive the other three approaches, each from the seven sections added last.
    for _ in range(3):
        previous_arm = sections[-7:]
        for s in previous_arm:
            if isinstance(s, Line):
                rotated = Line(s.y,-s.x,s.b,-s.a,s.length)
            else:
                rotated = Curve(s.y,-s.x,s.a+1,s.b,s.l,s.r)
            sections.append(rotated)

    # Three routes per approach, as section indices relative to the first
    # approach; the longer templates continue through the sections of later approaches.
    templates = ((0,1,2,3,5,6),(0,1,2,3,4,10,12,13),(0,1,2,3,4,10,11,17,19,20))
    routes = []
    for arm in range(4):
        for template in templates:
            routes.append([sections[(n+arm*7)%28] for n in template])

    # All three routes of an approach wait for the same four sections.
    waits_for = []
    for group in ([18,23,24,25], [25,2,3,4], [4,9,10,11], [11,16,17,18]):
        waits_for.extend([group]*3)

    blockade_sections = [[sections[n] for n in group] for group in waits_for]
    paths = list(zip(routes, blockade_sections))
    return sections, paths, []

In [12]:
# All available designs. Each entry is a function whose return value is
# unpacked into Intersection(...); its __name__ is used for the menu label
# and, in the visualization, as the name of the background image.
designs = [Free_crossing, Shark_teeth, Traffic_lights, Roundabout]

In [13]:
def data_stats(data):
    """Print summary statistics and t-test results for simulation data.

    ``data[0]`` and ``data[1]`` are iterables of records; element ``[1]`` of
    each record is read as a travel time and as a car rate respectively.

    Relies on ``scipy.stats`` being imported as ``stats`` at the top of the
    notebook. Nothing is returned; the results are printed.
    """
    travel_times_A = [value[1] for value in data[0]]
    car_rates_A = [value[1] for value in data[1]]

    # Test: the "B" samples are not measurements but the "A" samples scaled
    # by 1.2, so the t-tests below only exercise the procedure.
    travel_times_B = [value[1]*1.2 for value in data[0]]
    car_rates_B = [value[1]*1.2 for value in data[1]]

    travel_times_T = stats.ttest_ind(travel_times_A, travel_times_B)
    car_rates_T = stats.ttest_ind(car_rates_A, car_rates_B)

    print('Max travel time: ',  np.amax(travel_times_A))
    print('Mean Average travel time: ', np.mean(travel_times_A))
    print('Max car rate: ',  np.amax(car_rates_A))
    print('Mean Average cars per minute: ', np.mean(car_rates_A))
    print('')
    print('Student T test:')
    print('T travel: ', travel_times_T)
    print('T cars: ', car_rates_T)

## Visualization
Run this cell to see a visualization of the simulation:

In [14]:
# Set up pygame, the window caption, the frame clock and the window itself.
pygame.init()
pygame.display.set_caption('Traffic Flow')
clock = pygame.time.Clock()
graphics = Graphics()

# Outer loop: show the menu until a design is chosen. Closing the window or
# clicking 'Quit' in the menu exits through sys.exit().
while True:
    menu = Menu(designs, clock, graphics)
    design = menu.run()
    
    # Inner loop: run the chosen design. Any button other than 'quit'
    # (i.e. 'reset') rebuilds the intersection from scratch; 'quit' goes
    # back to the menu.
    while True:
        # The design's name doubles as the background image name (without '.png').
        intersection = Intersection(design.__name__, *design(), spawn_rate=80)
        simulation = Simulation(intersection, clock, graphics)
        if simulation.run() == 'quit':
            break

SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


## Graphs
Run this cell to generate the graphs. This could take about 20 minutes.

In [None]:
matplotlib.rcParams.update({'font.size': 18})
pygame.init()

# Sweep settings: spawn rates from rate_step up to (but not including)
# max_rate cars per minute, each simulated headless for ticks_per_run frames.
max_rate = 220
rate_step = 5
ticks_per_run = 70000

x = range(rate_step, max_rate, rate_step)
Y0 = []     # (flow values, design name) per design, for the combined plot

for design in designs:
    name = design.__name__.replace('_',' ')
    y0 = []     # flow in cars per minute, one value per spawn rate
    y1 = []     # average travel time in seconds, one value per spawn rate

    for rate in x:
        # Progress on a single, overwritten line. This used to print the
        # repr of a tuple instead of a readable message.
        progress = round(rate/max_rate*100, 1)
        print('Loading {} {}%'.format(name, progress), end="\r")

        # No background image is needed because nothing is drawn.
        intersection = Intersection(None, *design(), spawn_rate=rate)
        simulation = Simulation(intersection)
        simulation.run2(ticks_per_run)
        cpm, att = intersection.cars_per_min, intersection.average_travel_time
        y0.append(cpm)
        y1.append(att)

    Y0.append((y0,name))

    # One figure per design: flow and travel time against busyness.
    plt.figure(figsize=(12, 12))
    plt.plot(x, y0, label='Flow (cars/min)')
    plt.plot(x, y1, label='Travel time (seconds)')
    plt.ylim(0,75)
    plt.xlim(5,115)
    plt.xlabel('Busyness (cars/min)')
    plt.title(name)
    plt.legend(loc=2)
    plt.show()


# Combined figure: the flow of all designs in one plot.
plt.figure(figsize=(12, 12))

for flow, label in Y0:
    plt.plot(x, flow, label=label)

plt.title('Car Flow')
plt.xlabel('Busyness (cars/min)')
plt.ylabel('Car Flow (cars/min)')
plt.xlim(5,115)
plt.legend()
plt.show()