# Light Switching Optimization

This Notebook contains exploratory code related to developing an optimization model which determines the lane each traffic light opens at a given point in time. Each light determines it's individual optimal behavior. The goal is to maximize the number of cars that can pass an intersection over an undefined period of time. Each light can only open one lane at a time. Lights are further constrained by a cooldown, which limits a light to change it's decision after 5 steps from the previous decision.

In [1]:
import pyoptinterface as poi
from pathlib import Path
from pyoptinterface import gurobi
import numpy as np

In [2]:
license_path = Path.joinpath(Path(".secrets/"), Path("gurobi.lic"))

secrets = {}

with open(license_path, "r") as file:
    for line in file:
        line = line.strip()
        if line.startswith("#") or not line:
            continue
        try:
            key, value = line.split("=", 1)
            secrets[key.strip()] = value.strip()
        except ValueError:
            pass

WLSACCESSID = secrets.get("WLSACCESSID")
WLSSECRET = secrets.get("WLSSECRET")
LICENSEID = secrets.get("LICENSEID")


In [3]:
env = gurobi.Env(empty=True)

In [4]:
env.set_raw_parameter("WLSACCESSID", WLSACCESSID)
env.set_raw_parameter("WLSSECRET", WLSSECRET)
env.set_raw_parameter("LICENSEID", LICENSEID)
env.start()

Set parameter WLSAccessID
Set parameter WLSSecret
Set parameter LicenseID to value 2641387
Academic license 2641387 - for non-commercial use only - registered to ca___@cas.dhbw.de


In [6]:
opt_model = gurobi.Model(env)

## Environment

The following section creates the environment to optimize.

In [7]:
connected_lanes = ["intersection_4", "intersection_7", "intersection_1"]
connected_lanes

['intersection_4', 'intersection_7', 'intersection_1']

In [8]:
time = range(0, 11)
time

range(0, 11)

In [9]:
int_4_arrivals = np.array([0, 5, 5, 1, 1, 1, 3, 5, 1, 1, 1])
int_7_arrivals = np.array([0, 1, 1, 10, 10, 10, 7, 1, 10, 10, 10])
int_1_arrivals = np.array([0, 10, 10, 1, 1, 1, 1, 10, 1, 1, 1])

sim_env = np.array([int_4_arrivals, int_7_arrivals, int_1_arrivals]).T

sim_env.shape

(11, 3)

In [10]:
sim_env

array([[ 0,  0,  0],
       [ 5,  1, 10],
       [ 5,  1, 10],
       [ 1, 10,  1],
       [ 1, 10,  1],
       [ 1, 10,  1],
       [ 3,  7,  1],
       [ 5,  1, 10],
       [ 1, 10,  1],
       [ 1, 10,  1],
       [ 1, 10,  1]])

## Decision Variables

At each point in time, a light can make a decision for each connected lane.


$$ x_{lt}\in\{0,1\} $$
$$ with: $$
$$ l\in{L}, \quad L=\{Connected Lanes\} $$
$$ t\in{T}, \quad T=\left[ 0;5 \right]

In [11]:
decisions = opt_model.add_variables(
    time, connected_lanes, domain=poi.VariableDomain.Binary
)

## Constraints

A light can only have one open lane at any point in time. It can also decide not to open any lane.

$$ \sum_{t\in{T}} \sum_{l\in{L}} x_{l,t} \leq{1} $$

In [12]:
for time_step in time:
    opt_model.add_linear_constraint(
        poi.quicksum(decisions[time_step, lane] for lane in connected_lanes), poi.Leq, 1
    )

A light cannot make a new decision after a cooldown of 5 time steps.

$$ \sum_{l\in{L}} \sum_{t\in{T}}^{0\leq{t}\leq{5}} (x_{l,t-1}-x_{l,t})^{2} \leq{1}$$

In [13]:
# for lane in possible_lanes:
# opt_model.add_linear_constraint(poi.quicksum(((lanes[tick-1, lane] - lanes[tick, lane]) * (lanes[tick-1, lane] - lanes[tick, lane])) for tick in time[1:]), poi.Leq, 1.0)

In [14]:
for decision in decisions:
    opt_model.add_quadratic_constraint(
        poi.quicksum(
            (
                (decisions[i, decision[1]] - decisions[(i + 1), decision[1]])
                * (decisions[i, decision[1]] - decisions[(i + 1), decision[1]])
            )
            for i in range(max(0, (decision[0] - 5)), decision[0])
        ),
        poi.Leq,
        1.0,
    )

Check for successful creation of constraint objects:

In [15]:
for decision in decisions:
    print(
        poi.quicksum(
            (
                (decisions[i, decision[1]] - decisions[(i + 1), decision[1]])
                * (decisions[i, decision[1]] - decisions[(i + 1), decision[1]])
            )
            for i in range(max(0, (decision[0] - 5)), decision[0])
        ),
        poi.Leq,
        1.0,
    )

<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1f9930> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1f9f70> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1fa010> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1f9f70> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1fa010> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1f9f70> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1fa010> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1f9f70> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1fa010> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object at 0x10c1f9f70> ConstraintSense.LessEqual 1.0
<pyoptinterface._src.core_ext.ExprBuilder object a

Verify that at each step the model can find the previous decision(s):

In [16]:
for decision in decisions:
    print(f"Current Decision: {decision}")
    [
        print(f"Previous Decision: {decisions[i, decision[1]]}")
        for i in range(max(0, (decision[0] - 5)), decision[0])
    ]
    print(100 * "-")

Current Decision: (0, 'intersection_4')
----------------------------------------------------------------------------------------------------
Current Decision: (0, 'intersection_7')
----------------------------------------------------------------------------------------------------
Current Decision: (0, 'intersection_1')
----------------------------------------------------------------------------------------------------
Current Decision: (1, 'intersection_4')
Previous Decision: <pyoptinterface._src.core_ext.VariableIndex object at 0x10c157cd0>
----------------------------------------------------------------------------------------------------
Current Decision: (1, 'intersection_7')
Previous Decision: <pyoptinterface._src.core_ext.VariableIndex object at 0x10c1578b0>
----------------------------------------------------------------------------------------------------
Current Decision: (1, 'intersection_1')
Previous Decision: <pyoptinterface._src.core_ext.VariableIndex object at 0x10c157a1

## Objective

The goal is to maximize the number of cars that can pass the light.

In [17]:
objective = poi.quicksum(
    poi.quicksum(
        decisions[time_step, lane] * sim_env[time_step][connected_lanes.index(lane)] for lane in connected_lanes
    )
    for time_step in time
)
opt_model.set_objective(objective, poi.ObjectiveSense.Maximize)

## Optimization

In [18]:
opt_model.optimize()

Gurobi Optimizer version 12.0.1 build v12.0.1rc0 (mac64[arm] - Darwin 24.3.0 24D81)

CPU model: Apple M2
Thread count: 8 physical cores, 8 logical processors, using up to 8 threads

Academic license 2641387 - for non-commercial use only - registered to ca___@cas.dhbw.de
Optimize a model with 11 rows, 33 columns and 33 nonzeros
Model fingerprint: 0xcb9de4f8
Model has 33 quadratic constraints
Variable types: 0 continuous, 33 integer (33 binary)
Coefficient statistics:
  Matrix range     [1e+00, 1e+00]
  QMatrix range    [1e+00, 2e+00]
  Objective range  [1e+00, 1e+01]
  Bounds range     [1e+00, 1e+00]
  RHS range        [1e+00, 1e+00]
  QRHS range       [1e+00, 1e+00]
Found heuristic solution: objective -0.0000000
Presolve time: 0.00s
Presolved: 101 rows, 63 columns, 423 nonzeros
Variable types: 0 continuous, 63 integer (63 binary)

Root relaxation: objective 8.800000e+01, 28 iterations, 0.00 seconds (0.00 work units)

    Nodes    |    Current Node    |     Objective Bounds      |     W

## Results

Get the number of total cars, that crossed the intersection:

In [19]:
print(f"Objective Value: {opt_model.get_obj_value()}")

AttributeError: 'Model' object has no attribute 'get_obj_value'

Get the decision for each point in time:

In [20]:
for lane in decisions:
    if opt_model.get_value(decisions[lane]) == 1:
        print(
            f"Time: {lane[0]}; Intersection: {lane[1]}"
        )

Time: 0; Intersection: intersection_1
Time: 1; Intersection: intersection_1
Time: 2; Intersection: intersection_1
Time: 3; Intersection: intersection_7
Time: 4; Intersection: intersection_7
Time: 5; Intersection: intersection_7
Time: 6; Intersection: intersection_7
Time: 7; Intersection: intersection_7
Time: 8; Intersection: intersection_7
Time: 9; Intersection: intersection_7
Time: 10; Intersection: intersection_7


Get a matrix of decisions with the lanes as columns and time as rows:

In [21]:
decision_matrix = [opt_model.get_value(decisions[lane]) for lane in decisions]
decision_matrix_time = [
    decision_matrix[i : i + 3] for i in range(0, len(decision_matrix), 3)
]
decision_matrix_time

[[-0.0, -0.0, 1.0],
 [-0.0, -0.0, 1.0],
 [-0.0, -0.0, 1.0],
 [-0.0, 1.0, 0.0],
 [-0.0, 1.0, 0.0],
 [-0.0, 1.0, -0.0],
 [0.0, 1.0, -0.0],
 [0.0, 1.0, 0.0],
 [-0.0, 1.0, -0.0],
 [-0.0, 1.0, -0.0],
 [-0.0, 1.0, -0.0]]

Get a transposed decision matrix:

In [22]:
decision_matrix_light = list(map(list, zip(*decision_matrix_time)))
decision_matrix_light

[[-0.0, -0.0, -0.0, -0.0, -0.0, -0.0, 0.0, 0.0, -0.0, -0.0, -0.0],
 [-0.0, -0.0, -0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
 [1.0, 1.0, 1.0, 0.0, 0.0, -0.0, -0.0, 0.0, -0.0, -0.0, -0.0]]

For each point in time, get the previous decisions and evaluate if a light can switch:

In [23]:
for light in decision_matrix_light:  # Historie von Entscheidung pro Ampel
    for i in range(len(light)):
        decision_history = (
            [0] * max(0, 5 - i) + light[max(0, i - 5) : i]
        )  # Entscheidungen von t-5 bis t=0; Wenn vorher keine Entscheidung gab, dann führende 0; Zu erkennen an int 0 vs. float 0.0
        block_status = 0  # Bei 0 darf geöffnet werden, bei 1 ist cooldown aktiv, bei größer 1 gabs nen fehler in der entscheidung
        for j in range(1, (len(decision_history) - 1)):
            block_status += (
                decision_history[j - 1] - decision_history[j]
            ) ** 2  # Berechnen von @mxrio Loop (** ist exponent)
        print(decision_history)
        print(block_status)
        if block_status == 0:
            print("Light can switch")
        elif block_status == 1:
            print("Light is cooldown blocked")
        elif block_status > 1:
            print("Forbidden Action")

[0, 0, 0, 0, 0]
0
Light can switch
[0, 0, 0, 0, -0.0]
0
Light can switch
[0, 0, 0, -0.0, -0.0]
0.0
Light can switch
[0, 0, -0.0, -0.0, -0.0]
0.0
Light can switch
[0, -0.0, -0.0, -0.0, -0.0]
0.0
Light can switch
[-0.0, -0.0, -0.0, -0.0, -0.0]
0.0
Light can switch
[-0.0, -0.0, -0.0, -0.0, -0.0]
0.0
Light can switch
[-0.0, -0.0, -0.0, -0.0, 0.0]
0.0
Light can switch
[-0.0, -0.0, -0.0, 0.0, 0.0]
0.0
Light can switch
[-0.0, -0.0, 0.0, 0.0, -0.0]
0.0
Light can switch
[-0.0, 0.0, 0.0, -0.0, -0.0]
0.0
Light can switch
[0, 0, 0, 0, 0]
0
Light can switch
[0, 0, 0, 0, -0.0]
0
Light can switch
[0, 0, 0, -0.0, -0.0]
0.0
Light can switch
[0, 0, -0.0, -0.0, -0.0]
0.0
Light can switch
[0, -0.0, -0.0, -0.0, 1.0]
0.0
Light can switch
[-0.0, -0.0, -0.0, 1.0, 1.0]
1.0
Light is cooldown blocked
[-0.0, -0.0, 1.0, 1.0, 1.0]
1.0
Light is cooldown blocked
[-0.0, 1.0, 1.0, 1.0, 1.0]
1.0
Light is cooldown blocked
[1.0, 1.0, 1.0, 1.0, 1.0]
0.0
Light can switch
[1.0, 1.0, 1.0, 1.0, 1.0]
0.0
Light can switch
[1.0, 