In [11]:
import random
import xml.etree.ElementTree as ET
import traci

# Code with initial timings is set by us

In [145]:
class Particle:
    def __init__(self, initial_position):
        # Use provided initial position as traffic light timings
        self.position = initial_position.copy()
        self.velocity = [random.uniform(-1, 1) for _ in range(len(initial_position))]  # Random initial velocity
        self.best_position = self.position.copy()  # Initialize personal best position
        self.best_fitness = float('inf')  # Initialize personal best fitness to infinity

def initialize_particles(num_particles, initial_timings):
    particles = [Particle(initial_timings) for _ in range(num_particles)]
    return particles

def evaluate_fitness(particle, tripinfo_file):
    # Parse trip info XML file
    tree = ET.parse(tripinfo_file)
    root = tree.getroot()

    total_waiting_time = 0

    # Iterate over tripinfo elements
    for tripinfo in root.findall('tripinfo'):
      # only consider the vehicle with id 10
      if tripinfo.get('id') == "10":
        # Extract relevant attributes
        waiting_time = float(tripinfo.get('waitingTime'))
        # Accumulate waiting time at red lights
        total_waiting_time += waiting_time
    
    return total_waiting_time

def decode(position):
    # Convert particle's position (traffic light timings) into format usable by SUMO simulation
    return position

def update_velocity(particle, global_best_position, w, c1, c2):
    for i in range(len(particle.velocity)):
        r1 = random.random()
        r2 = random.random()
        cognitive_component = c1 * r1 * (particle.best_position[i] - particle.position[i])
        social_component = c2 * r2 * (global_best_position[i] - particle.position[i])
        particle.velocity[i] = w * particle.velocity[i] + cognitive_component + social_component


def update_position(particle):
    for i in range(len(particle.position)):
        # Update traffic light timings based on velocity
        particle.position[i] += particle.velocity[i]

# PSO Parameters picked at random
num_particles = 50
max_iterations = 100
w = 0.5  # Inertia weight
c1 = 1.5  # Cognitive parameter
c2 = 1.5  # Social parameter
num_phases = 4  # Number of traffic light phases


initial_timings = [10,4,10,4]  # Adjust these values according to your requirements

# Initialize swarm with custom initial traffic light timings
particles = initialize_particles(num_particles, initial_timings)

# Initialize swarm

# Initialize global best position and fitness
global_best_position = particles[0].position.copy()
global_best_fitness = float('inf')

# Path to trip info XML file
tripinfo_file = '../sumo_files/tripinfo.xml'

for iteration in range(max_iterations):
    for particle in particles:
        # Evaluate fitness
        fitness = evaluate_fitness(particle, tripinfo_file)
        
        # Update personal best
        if fitness < particle.best_fitness:
            particle.best_position = particle.position.copy()
            particle.best_fitness = fitness
        
        # Update global best
        if fitness < global_best_fitness:
            global_best_position = particle.position.copy()
            global_best_fitness = fitness

    for particle in particles:
        # Update velocity
        update_velocity(particle, global_best_position, w, c1, c2)

        # Update position
        update_position(particle)

# Output best traffic light timings
best_timings = decode(global_best_position)
print("Best traffic light timings: ", best_timings)
print("Best fitness: ", global_best_fitness)


Best traffic light timings:  [10, 4, 10, 4]
Best fitness:  26.0


# This is how we set phase durations in network.net.xml file

In [79]:
tree = ET.parse("../sumo_files/network.net.xml")
root = tree.getroot()

for tl in root.findall('tlLogic'):
  # set phase duration for each traffic light according to the best timings
  for i, phase in enumerate(tl.findall('phase')):
    phase.set('duration', str(best_timings[i]))

# write the changes to the file
tree.write("../sumo_files/network.net.xml")

# Run simulation with best traffic light timings
traci.start(['sumo-gui', '-c', '../sumo_files/config.sumo.cfg', '--tripinfo-output', '../sumo_files/tripinfo.xml', '--start', '--quit-on-end', '--scale', '1'])
traci.close()


# Code with random initial timings

In [126]:
class Particle:
    def __init__(self, num_phases):
        # Can change the traffic light timings to be outside of (0-30) range if needed
        self.position = [random.uniform(0, 30) for _ in range(num_phases)]  # Random initial position - traffic light timings
        self.velocity = [random.uniform(-1, 1) for _ in range(num_phases)]  # Random initial velocity
        self.best_position = self.position.copy()  # Initialize personal best position
        self.best_fitness = float('inf')  # Initialize personal best fitness to infinity

def initialize_particles(num_particles, num_phases):
    particles = [Particle(num_phases) for _ in range(num_particles)]
    return particles

def evaluate_fitness(particle, tripinfo_file):
    # Parse trip info XML file
    tree = ET.parse(tripinfo_file)
    root = tree.getroot()

    total_waiting_time = 0

    # Iterate over tripinfo elements
    for tripinfo in root.findall('tripinfo'):
      # only consider the vehicle with id 10
      if tripinfo.get('id') == "10":
        # Extract relevant attributes
        waiting_time = float(tripinfo.get('waitingTime'))
        # Accumulate waiting time at red lights
        total_waiting_time += waiting_time
    
    return total_waiting_time

def decode(position):
    # Convert particle's position (traffic light timings) into format usable by SUMO simulation
    return position

def update_velocity(particle, global_best_position, w, c1, c2):
    for i in range(len(particle.velocity)):
        r1 = random.random()
        r2 = random.random()
        cognitive_component = c1 * r1 * (particle.best_position[i] - particle.position[i])
        social_component = c2 * r2 * (global_best_position[i] - particle.position[i])
        particle.velocity[i] = w * particle.velocity[i] + cognitive_component + social_component


def update_position(particle):
    for i in range(len(particle.position)):
        # Update traffic light timings based on velocity
        particle.position[i] += particle.velocity[i]
        # Ensure that traffic light timings are within the desired range (0 to 30 seconds)
        particle.position[i] = max(0, min(particle.position[i], 30))

# PSO Parameters picked at random
num_particles = 50
max_iterations = 100
w = 0.5  # Inertia weight
c1 = 1.5  # Cognitive parameter
c2 = 1.5  # Social parameter
num_phases = 4  # Number of traffic light phases

# Initialize swarm
particles = initialize_particles(num_particles, num_phases)

# Initialize global best position and fitness
global_best_position = particles[0].position.copy()
global_best_fitness = float('inf')

# Path to trip info XML file
tripinfo_file = '../sumo_files/tripinfo.xml'

for iteration in range(max_iterations):
    for particle in particles:
        # Evaluate fitness
        fitness = evaluate_fitness(particle, tripinfo_file)
        
        # Update personal best
        if fitness < particle.best_fitness:
            particle.best_position = particle.position.copy()
            particle.best_fitness = fitness
        
        # Update global best
        if fitness < global_best_fitness:
            global_best_position = particle.position.copy()
            global_best_fitness = fitness

    for particle in particles:
        # Update velocity
        update_velocity(particle, global_best_position, w, c1, c2)

        # Update position
        update_position(particle)

# Output best traffic light timings
best_timings = decode(global_best_position)
print("Best traffic light timings: ", best_timings)
print("Best fitness: ", global_best_fitness)

Best traffic light timings:  [21.125739593004294, 5.21527446838289, 20.144798726477678, 22.31692631576774]
Best fitness:  26.0


# Initial code that I had that "simulates" traffic

In [146]:
import numpy as np

# Define the objective function to minimize (e.g., total waiting time, number of stops)
def objective_function(signal_timings):
    # Simulate traffic flow and calculate the objective value based on signal timings
    # Your simulation logic goes here
    objective_value = simulate_traffic(signal_timings)
    return objective_value

def simulate_traffic(signal_timings):
    # Ensure signal_timings is a list
    if not isinstance(signal_timings, list):
        raise ValueError("signal_timings must be a list of signal timings")
    
    # Ensure all elements of signal_timings are lists with at least two elements
    for phase_timing in signal_timings:
        if not isinstance(phase_timing, list) or len(phase_timing) < 2:
            raise ValueError("Each signal phase timing must be a list with at least two elements: green time and yellow time")
    
    # Example: Simulate traffic flow and calculate the objective value based on signal timings
    # For demonstration purposes, let's assume a simple metric such as total waiting time
    total_waiting_time = 0
    
    # Iterate over each signal phase
    for phase_timing in signal_timings:
        # Example: Calculate waiting time for each phase based on green time, yellow time, etc.
        # You should replace this with a more realistic calculation based on traffic flow models
        green_time = phase_timing[0]  # Green time for the current phase
        yellow_time = phase_timing[1]  # Yellow time for the current phase
        
        # Example: Calculate waiting time based on traffic flow model (e.g., traffic queues, vehicle arrivals)
        # For demonstration purposes, let's assume a random waiting time between 0 and 100 seconds per phase
        waiting_time = np.random.randint(0, 100)
        total_waiting_time += waiting_time
    
    # Calculate the overall objective value based on the total waiting time
    objective_value = total_waiting_time
    return objective_value

# Particle Swarm Optimization algorithm
def particle_swarm_optimization(objective_function, num_dimensions, num_particles, max_iter):
    # Initialize particles' positions and velocities
    particles_position = np.random.rand(num_particles, num_dimensions)  # Random initialization
    particles_velocity = np.random.rand(num_particles, num_dimensions)  # Random initialization

    # Initialize personal best positions and values for each particle
    particles_best_position = particles_position.copy()
    particles_best_value = np.zeros(num_particles)
    for i in range(num_particles):
        particles_position_list = [list(particles_position[i])]  # Convert to list of lists
        particles_best_value[i] = objective_function(particles_position_list)

    # Initialize global best position and value using the first particle's values
    global_best_value = particles_best_value[0]
    global_best_position = particles_best_position[0]

    # PSO parameters
    inertia_weight = 0.5
    cognitive_weight = 1.5
    social_weight = 1.5

    # Iterate through optimization process
    for _ in range(max_iter):
        # Update particles' velocities and positions
        for i in range(num_particles):
            # Update velocity
            r1 = np.random.rand(num_dimensions)
            r2 = np.random.rand(num_dimensions)
            cognitive_component = cognitive_weight * r1 * (particles_best_position[i] - particles_position[i])
            social_component = social_weight * r2 * (global_best_position - particles_position[i])
            particles_velocity[i] = inertia_weight * particles_velocity[i] + cognitive_component + social_component

            # Update position
            particles_position[i] += particles_velocity[i]

            # Ensure particle positions are within bounds (e.g., non-negative)
            particles_position[i] = np.clip(particles_position[i], 0, None)

            # Update personal best position and value for particle i
            current_value = objective_function([list(particles_position[i])])  # Convert to list of lists
            if current_value < particles_best_value[i]:
                particles_best_position[i] = particles_position[i]
                particles_best_value[i] = current_value

                # Update global best position and value if needed
                if current_value < global_best_value:
                    global_best_value = current_value
                    global_best_position = particles_position[i]

    return global_best_position, global_best_value

# Example usage
if __name__ == "__main__":
    # Example parameters
    num_dimensions = 4  # Number of dimensions (e.g., signal timings for different phases)
    num_particles = 50  # Number of particles in the swarm
    max_iter = 100  # Maximum number of iterations

    # Run PSO optimization
    best_solution, best_value = particle_swarm_optimization(objective_function, num_dimensions, num_particles, max_iter)

    print("Best solution (signal timings):", best_solution)
    print("Best objective value:", best_value)

Best solution (signal timings): [1.17963752 0.81207884 0.21056422 0.39366757]
Best objective value: 0
