In [None]:
import numpy as np
import pandas as pd

# Example traffic data
data = pd.DataFrame({
    'Direction': ['North', 'South', 'East', 'West'],
    'Traffic_Volume': [18.36, 24.6, 26.16, 25.04],
    'Average_Speed_kmph': [43.18, 42.98, 21.33, 32.51],
    'Queue_Length_meters': [21.29, 16.79, 14.05, 14.39],
    'Traffic_Density_vehicles_per_meter': [0.79, 1.47, 1.86, 1.77]
})

# PSO parameters
num_particles = 30
num_iterations = 100
c1 = 2.0  # cognitive coefficient
c2 = 2.0  # social coefficient
w = 0.7  # inertia weight

# Define a fitness function (minimize total waiting time, congestion, etc.)
def fitness_function(signal_timings, data):
    # Extract signal timings (Green, Red, Orange times)
    green_times = signal_timings[:4]
    red_times = signal_timings[4:8]
    orange_times = signal_timings[8:]
    
    # Define your traffic flow model based on the traffic data
    total_waiting_time = 0
    for i, direction in enumerate(data['Direction']):
        traffic_density = data['Traffic_Density_vehicles_per_meter'][i]
        traffic_volume = data['Traffic_Volume'][i]
        queue_length = data['Queue_Length_meters'][i]
        
        # Example logic: A simple model for waiting time (this is a placeholder)
        waiting_time = (queue_length / traffic_density) * (1 / green_times[i]) * (traffic_volume / 100)
        total_waiting_time += waiting_time
    
    return total_waiting_time  # Minimize waiting time

# PSO Algorithm
def pso_traffic_signal_optimization():
    # Initialize particles (random initial signal timings for Green, Red, and Orange)
    particles = np.random.rand(num_particles, 12)  # 4 green, 4 red, 4 orange times for 4 directions
    velocities = np.random.rand(num_particles, 12)  # Random velocities
    
    # Initialize personal best positions and fitness
    pbest_positions = particles.copy()
    pbest_fitness = np.array([fitness_function(p, data) for p in particles])
    
    # Initialize global best position and fitness
    gbest_position = pbest_positions[np.argmin(pbest_fitness)]
    gbest_fitness = np.min(pbest_fitness)
    
    # PSO loop
    for iteration in range(num_iterations):
        for i in range(num_particles):
            # Update velocity and position
            velocities[i] = (w * velocities[i] +
                             c1 * np.random.rand() * (pbest_positions[i] - particles[i]) +
                             c2 * np.random.rand() * (gbest_position - particles[i]))
            particles[i] += velocities[i]
            
            # Boundary conditions (ensure signal times are within a reasonable range)
            particles[i] = np.clip(particles[i], 10, 60)  # Example range for signal timings (10s to 60s)
            
            # Evaluate fitness
            fitness = fitness_function(particles[i], data)
            
            # Update personal best
            if fitness < pbest_fitness[i]:
                pbest_fitness[i] = fitness
                pbest_positions[i] = particles[i]
        
        # Update global best
        gbest_position = pbest_positions[np.argmin(pbest_fitness)]
        gbest_fitness = np.min(pbest_fitness)
        
        print(f"Iteration {iteration + 1}, Best Fitness: {gbest_fitness}")
    
    return gbest_position  # Return optimized signal timings

# Run PSO
optimized_signal_timings = pso_traffic_signal_optimization()

# Print the optimized signal timings
print("Optimized Signal Timings:", optimized_signal_timings)


In [None]:
import numpy as np
import pandas as pd
import random

# Example traffic data
data = pd.DataFrame({
    'Direction': ['North', 'South', 'East', 'West'],
    'Traffic_Volume': [18.36, 24.6, 26.16, 25.04],
    'Average_Speed_kmph': [43.18, 42.98, 21.33, 32.51],
    'Queue_Length_meters': [21.29, 16.79, 14.05, 14.39],
    'Traffic_Density_vehicles_per_meter': [0.79, 1.47, 1.86, 1.77]
})

# PSO parameters
num_particles = 40
num_iterations = 150
c1 = 2.5  # cognitive coefficient
c2 = 2.5  # social coefficient
w = 0.6  # inertia weight

# Define a fitness function
def fitness_function(signal_timings, data, total_cycle_time=120):
    green_times = signal_timings[:4]
    orange_time = 3

    # Enforce total-cycle constraints
    if np.sum(green_times) + orange_time * len(green_times) > total_cycle_time:
        return float('inf')  # Invalid configuration

    red_times = calculate_red_times(green_times, total_cycle_time, orange_time)

    # Compute congestion factors with adjusted weights
    congestion_factors = (data['Traffic_Density_vehicles_per_meter'] * 0.4 +
                          data['Queue_Length_meters'] * 0.3 +
                          data['Traffic_Volume'] * 0.3)
    congestion_factors = congestion_factors / congestion_factors.sum()

    # Penalize deviations from congestion-proportional green times
    proportional_green_times = congestion_factors * green_times.sum()
    penalty = np.sum(np.abs(green_times - proportional_green_times))

    # Calculate waiting time based on congestion factors
    total_waiting_time = 0
    for i, direction in enumerate(data['Direction']):
        waiting_time = ((red_times[i] + orange_time) / (green_times[i] + orange_time)) * congestion_factors[i]
        total_waiting_time += waiting_time

    return total_waiting_time + penalty


# Normalize green times based on traffic congestion
def normalize_green_times(green_times, data, total_cycle_time=120, orange_time=3):
    # Compute congestion factors with adjusted weights
    congestion_factors = (data['Traffic_Density_vehicles_per_meter'] * 0.4 +
                          data['Queue_Length_meters'] * 0.3 +
                          data['Traffic_Volume'] * 0.3)
    congestion_factors = congestion_factors / congestion_factors.sum()  # Normalize to sum to 1

    # Allocate green times based on congestion factors
    available_time = total_cycle_time - orange_time * len(data)
    green_times = congestion_factors * available_time

    return green_times




# Calculate red times based on green times
def calculate_red_times(green_times, total_cycle_time=120, orange_time=3):
    red_times = total_cycle_time - (green_times + orange_time)
    return red_times

# PSO Algorithm with congestion-based initialization and validation
def pso_traffic_signal_optimization_refined():
    # Initialize particles with congestion-based green times and random red times
    congestion_factors = (data['Traffic_Density_vehicles_per_meter'] * 0.5 +
                          data['Queue_Length_meters'] * 0.3 +
                          data['Traffic_Volume'] * 0.2)
    congestion_factors = congestion_factors / congestion_factors.sum()
    #particles = np.random.rand(num_particles, 8) * np.array([120, 120, 120, 120, 120, 120, 120, 120])
    particles = np.random.rand(num_particles, 8)
    for i in range(num_particles):
        # Initialize green times proportionally to congestion
        particles[i][:4] = congestion_factors * (120 - 3 * len(data))  # Green times
        particles[i][4:] = calculate_red_times(particles[i][:4])       # Red times

    # Validate red times during initialization
    for i in range(num_particles):
        if np.any(particles[i][4:] < 0):
            raise ValueError(f"Negative red time detected for particle {i}: {particles[i]}")

    # Initialize personal best positions and fitness
    pbest_positions = particles.copy()
    pbest_fitness = np.array([fitness_function(p, data) for p in particles])
    
    # Initialize global best position
    gbest_position = pbest_positions[np.argmin(pbest_fitness)]
    gbest_fitness = np.min(pbest_fitness)
    
    # Start iterations for PSO
    for iteration in range(num_iterations):
        for i in range(num_particles):
            # Update green timings for the particle
            particles[i][:4] = normalize_green_times(particles[i][:4], data)

            # Update red timings based on updated green times
            particles[i][4:] = calculate_red_times(particles[i][:4])

            # Validate red times (optional debug step)
            if np.any(particles[i][4:] < 0):
                raise ValueError(f"Negative red time detected for particle {i}: {particles[i]}")

            # Calculate fitness for the particle
            fitness = fitness_function(particles[i], data)

            # Update personal best
            if fitness < pbest_fitness[i]:
                pbest_fitness[i] = fitness
                pbest_positions[i] = particles[i]
        
        # Update global best
        gbest_position = pbest_positions[np.argmin(pbest_fitness)]
        gbest_fitness = np.min(pbest_fitness)
        
        print(f"Iteration {iteration + 1}, Best Fitness: {gbest_fitness}")
        print(f"Current Best Timings: {gbest_position[:4]}")  # Only show green times for now

    return gbest_position

# Run the PSO optimization
optimized_signal_timings = pso_traffic_signal_optimization_refined()

# Display the optimized green and red signal timings
print("\nOptimized Green and Red Signal Timings:")
for i, direction in enumerate(data['Direction']):
    green_time = optimized_signal_timings[i]
    red_time = optimized_signal_timings[i + 4]
    print(f"{direction}: Green = {green_time:.2f} sec, Red = {red_time:.2f} sec, Orange = 3 sec")


In [None]:
#Final PSO
import numpy as np
import pandas as pd

data = pd.read_csv('../outputs/sample_data.csv')

# PSO parameters
num_particles = 40
num_iterations = 150
c1 = 2.5  # cognitive coefficient
c2 = 2.5  # social coefficient
w = 0.5  # inertia weight

def fitness_function(signal_timings, data, total_cycle_time=120):
    green_times = signal_timings[:4]
    orange_time = 3

    # Enforce total-cycle constraints
    if np.sum(green_times) + orange_time * len(green_times) > total_cycle_time:
        return float('inf')  # Invalid configuration

    red_times = calculate_red_times(green_times, total_cycle_time, orange_time)

    # Compute congestion factors with adjusted weights
    congestion_factors = (data['Traffic_Density_vehicles_per_meter'] * 0.4 +
                          data['Queue_Length_meters'] * 0.3 +
                          data['Traffic_Volume'] * 0.3)
    congestion_factors = congestion_factors / congestion_factors.sum()

    # Penalize deviations from congestion-proportional green times
    proportional_green_times = congestion_factors * green_times.sum()
    penalty = np.sum(np.abs(green_times - proportional_green_times))

    # Calculate waiting time based on congestion factors
    total_waiting_time = 0
    for i, direction in enumerate(data['Direction']):
        waiting_time = ((red_times[i] + orange_time) / (green_times[i] + orange_time)) * congestion_factors[i]
        total_waiting_time += waiting_time

    return total_waiting_time + penalty

def normalize_green_times(green_times, data, total_cycle_time=120, orange_time=3):
    congestion_factors = (data['Traffic_Density_vehicles_per_meter'] * 0.4 +
                          data['Queue_Length_meters'] * 0.3 +
                          data['Traffic_Volume'] * 0.3)
    congestion_factors = congestion_factors / congestion_factors.sum()

    available_time = total_cycle_time - orange_time * len(data)
    green_times = congestion_factors * available_time

    return green_times

def calculate_red_times(green_times, total_cycle_time=120, orange_time=3):
    red_times = total_cycle_time - (green_times + orange_time)
    return red_times

def pso_traffic_signal_optimization_refined():
    congestion_factors = (data['Traffic_Density_vehicles_per_meter'] * 0.5 +
                          data['Queue_Length_meters'] * 0.3 +
                          data['Traffic_Volume'] * 0.2)
    congestion_factors = congestion_factors / congestion_factors.sum()

    particles = np.random.rand(num_particles, 8)
    for i in range(num_particles):
        particles[i][:4] = congestion_factors * (120 - 3 * len(data))
        particles[i][4:] = calculate_red_times(particles[i][:4])

    for i in range(num_particles):
        if np.any(particles[i][4:] < 0):
            raise ValueError(f"Negative red time detected for particle {i}: {particles[i]}")

    pbest_positions = particles.copy()
    pbest_fitness = np.array([fitness_function(p, data) for p in particles])

    gbest_position = pbest_positions[np.argmin(pbest_fitness)]
    gbest_fitness = np.min(pbest_fitness)

    for iteration in range(num_iterations):
        for i in range(num_particles):
            particles[i][:4] = normalize_green_times(particles[i][:4], data)
            particles[i][4:] = calculate_red_times(particles[i][:4])

            if np.any(particles[i][4:] < 0):
                raise ValueError(f"Negative red time detected for particle {i}: {particles[i]}")

            fitness = fitness_function(particles[i], data)

            if fitness < pbest_fitness[i]:
                pbest_fitness[i] = fitness
                pbest_positions[i] = particles[i]

        gbest_position = pbest_positions[np.argmin(pbest_fitness)]
        gbest_fitness = np.min(pbest_fitness)

        print(f"Iteration {iteration + 1}, Best Fitness: {gbest_fitness}")
        print(f"Current Best Timings: {gbest_position[:4]}")

    return gbest_position

optimized_signal_timings = pso_traffic_signal_optimization_refined()

print("\nOptimized Green and Red Signal Timings:")
for i, direction in enumerate(data['Direction']):
    green_time = round(optimized_signal_timings[i])
    red_time = round(optimized_signal_timings[i + 4])
    print(f"{direction}: Green = {green_time:.2f} sec, Red = {red_time:.2f} sec, Orange = 3 sec")


Iteration 1, Best Fitness: 2.5377042532516016
Current Best Timings: [71.3295436  25.24296834  8.15354871  3.27393935]
Iteration 2, Best Fitness: 2.5377042532516016
Current Best Timings: [71.3295436  25.24296834  8.15354871  3.27393935]
Iteration 3, Best Fitness: 2.5377042532516016
Current Best Timings: [71.3295436  25.24296834  8.15354871  3.27393935]
Iteration 4, Best Fitness: 2.5377042532516016
Current Best Timings: [71.3295436  25.24296834  8.15354871  3.27393935]
Iteration 5, Best Fitness: 2.5377042532516016
Current Best Timings: [71.3295436  25.24296834  8.15354871  3.27393935]
Iteration 6, Best Fitness: 2.5377042532516016
Current Best Timings: [71.3295436  25.24296834  8.15354871  3.27393935]
Iteration 7, Best Fitness: 2.5377042532516016
Current Best Timings: [71.3295436  25.24296834  8.15354871  3.27393935]
Iteration 8, Best Fitness: 2.5377042532516016
Current Best Timings: [71.3295436  25.24296834  8.15354871  3.27393935]
Iteration 9, Best Fitness: 2.5377042532516016
Current Be