# Libs

In [None]:
import gurobipy as grb
from gurobipy import *
import numpy as np
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt

# MaxBAND Model

## MaxBAND basic

In [None]:
class MaxBAND_basic():
    """Basic MaxBAND program built with gurobipy.

    The objective is the sum of the two bandwidth variables (one per
    direction), subject to the "Directional Interference" and "Loop Integer"
    constraints assembled in ``build_program``.
    """

    def __init__(self, num_signals):
        # Number of signals on the arterial; it fixes the expected shape of
        # every array passed to build_program / solve.
        self.num_signals = num_signals

    def build_program(self, red_time, queue_clearance_time, travel_time, intranode_offset):
        """Create (but do not optimise) the Gurobi model.

        Parameters
        ----------
        red_time : ndarray, shape (2, num_signals)
            Red time per direction and signal.
        queue_clearance_time : ndarray, shape (2, num_signals)
            Queue clearance time per direction and signal.
        travel_time : ndarray, shape (2, num_signals - 1)
            Travel time per direction and link between adjacent signals.
        intranode_offset : ndarray, shape (num_signals,)
            Offset term of each signal used in the loop-integer constraint.

        NOTE(review): all times are presumably expressed as fractions of the
        cycle (b and w are bounded to [0, 1]) -- confirm with the caller.

        Returns
        -------
        The gurobipy ``Model`` holding variables b, w, m (in that order).

        Raises
        ------
        AssertionError
            If any input does not have the shape listed above.
        """
        count = self.num_signals

        assert red_time.shape == (2, count)
        assert queue_clearance_time.shape == (2, count)
        assert travel_time.shape == (2, count - 1)
        assert intranode_offset.shape == (count,)

        model = Model("MaxBAND_basic")

        # Decision variables.  They must be added in the order b, w, m
        # because solve() slices model.getVars() by position.
        band = model.addVars(['b_{}'.format(j) for j in range(2)],
                             lb=0, ub=1, obj=1, vtype=GRB.CONTINUOUS)
        interference = model.addVars(['w_{}'.format(j) for j in range(2 * count)],
                                     lb=0, ub=1, obj=1, vtype=GRB.CONTINUOUS)
        loop_int = model.addVars(['m_{}'.format(j) for j in range(count)],
                                 lb=1, obj=1, vtype=GRB.INTEGER)
        # Object arrays of gurobipy variables so they can be indexed [dir, signal].
        band = np.array(band.values())
        interference = np.array(interference.values()).reshape((2, count))
        loop_int = np.array(loop_int.values())
        model.update()

        # Objective: total bandwidth over both directions.
        model.setObjective(np.sum(band), GRB.MAXIMIZE)

        # Constraint 1 -- directional interference, one row per
        # (direction, signal) pair in row-major order.
        for direction, idx in np.ndindex(2, count):
            model.addConstr(interference[direction, idx] + band[direction]
                            <= 1 - red_time[direction, idx])

        # Constraint 2 -- loop integer, one row per pair of adjacent signals.
        for here in range(count - 1):
            there = here + 1

            upstream = interference[0, here] + interference[1, here]
            downstream = interference[0, there] + interference[1, there]
            link_travel = travel_time[0, here] + travel_time[1, here]
            offset_shift = intranode_offset[here] - intranode_offset[there]
            lhs = upstream - downstream + link_travel + offset_shift

            red_shift = (0.5 * (red_time[0, there] + red_time[1, there])
                         - 0.5 * (red_time[0, here] + red_time[1, here]))
            clearance = queue_clearance_time[1, here] + queue_clearance_time[0, there]
            rhs = red_shift + clearance + loop_int[here]

            model.addConstr(lhs == rhs)

        return model

    def solve(self, red_time, queue_clearance_time, travel_time, intranode_offset):
        """Build the model, optimise it and return the variable values.

        Arguments are forwarded unchanged to ``build_program``.

        Returns
        -------
        (solution_b, solution_w, solution_m) : tuple of lists
            Values of the 2 ``b`` variables, the ``2 * num_signals`` ``w``
            variables (direction-major) and the remaining ``m`` variables.
        """
        model = self.build_program(red_time, queue_clearance_time,
                                   travel_time, intranode_offset)
        model.optimize()

        # Variables come back in creation order: b, then w, then m.
        values = [var.x for var in model.getVars()]
        b_end = 2
        w_end = b_end + 2 * self.num_signals

        return values[:b_end], values[b_end:w_end], values[w_end:]

## MaxBAND versatile

In [None]:
class MaxBAND_versatile():
    """MaxBAND program with variable cycle, link speeds and left-turn patterns.

    Compared with the basic program this model adds a ``z`` variable bounded
    by ``cycle_range``, per-link travel-time variables ``t`` constrained by
    speed and speed-change limits, and binary ``sig`` variables that switch
    the left-turn terms of the loop-integer constraint.
    """

    def __init__(self, num_signals):
        # Number of signals on the arterial; it fixes the expected shape of
        # every array passed to build_program / solve.
        self.num_signals = num_signals

    def build_program(self, red_time, queue_clearance_time,
                      two_way_vol_ratio, cycle_range, speed_range,
                      speed_change_range, distance, left_turn_time):
        """Create (but do not optimise) the Gurobi model.

        Parameters
        ----------
        red_time : ndarray, shape (2, num_signals)
            Red time per direction and signal.
        queue_clearance_time : ndarray, shape (2, num_signals)
            Queue clearance time per direction and signal.
        two_way_vol_ratio : scalar
            Weight ``k`` of the second direction in the objective
            ``b[0] + k * b[1]`` and in the bandwidth-ratio constraint.
        cycle_range : sequence of two numbers
            Bounds of the ``z`` variable.  The two values may be given in
            either order; the smaller one is used as the lower bound.
        speed_range : ndarray, shape (2, num_signals - 1, 2)
            ``[..., 0]`` / ``[..., 1]`` are the lower / upper speed limits
            per direction and link.
        speed_change_range : ndarray, shape (2, num_signals - 2, 2)
            ``[..., 0]`` / ``[..., 1]`` are the lower / upper limits used in
            the speed-change constraint between consecutive links.
        distance : ndarray, shape (2, num_signals - 1)
            Link length per direction.
        left_turn_time : ndarray, shape (2, num_signals)
            Left-turn time per direction and signal.

        NOTE(review): times are presumably fractions of the cycle and ``z``
        the signal frequency (1 / cycle length) -- confirm with the caller.

        Returns
        -------
        The gurobipy ``Model`` holding variables b, w, m, z, t, sig (in that
        order).

        Raises
        ------
        AssertionError
            If any array input does not have the shape listed above.
        """
        n = self.num_signals

        assert red_time.shape == (2, n)
        assert queue_clearance_time.shape == (2, n)
        assert left_turn_time.shape == (2, n)
        assert speed_range.shape == (2, n - 1, 2)
        assert speed_change_range.shape == (2, n - 2, 2)
        assert distance.shape == (2, n - 1)

        # create model
        maxband = Model("MaxBAND_versatile")

        # Accept the bounds in either order so that lb <= ub always holds.
        z_lower, z_upper = min(cycle_range), max(cycle_range)

        # create variables
        # They must be added in the order b, w, m, z, t, sig because solve()
        # slices model.getVars() by position.  No per-variable objective
        # coefficient is given: setObjective() below defines the objective.
        b = maxband.addVars(['b_' + str(i) for i in range(2)],
                            lb=0, ub=1, vtype=GRB.CONTINUOUS)
        w = maxband.addVars(['w_' + str(i) for i in range(2 * n)],
                            lb=0, ub=1, vtype=GRB.CONTINUOUS)
        m = maxband.addVars(['m_' + str(i) for i in range(n)],
                            lb=1, vtype=GRB.INTEGER)
        z = maxband.addVars(['z_' + str(i) for i in range(1)],
                            lb=z_lower, ub=z_upper, vtype=GRB.CONTINUOUS)
        t = maxband.addVars(['t_' + str(i) for i in range(2 * (n - 1))],
                            lb=0, vtype=GRB.CONTINUOUS)
        sig = maxband.addVars(['sig_' + str(i) for i in range(2 * n)],
                              vtype=GRB.BINARY)
        # Object arrays of gurobipy variables, indexed [direction, signal/link].
        b = np.array(b.values())
        w = np.array(w.values()).reshape((2, n))
        m = np.array(m.values())
        z = np.array(z.values())
        t = np.array(t.values()).reshape((2, n - 1))
        sig = np.array(sig.values()).reshape((2, n))
        maxband.update()

        # build objective
        k = two_way_vol_ratio
        maxband.setObjective(b[0] + k * b[1], GRB.MAXIMIZE)

        r = red_time
        tao = queue_clearance_time
        L = left_turn_time
        d = distance
        e, f = speed_range[:, :, 0], speed_range[:, :, 1]
        g, h = speed_change_range[:, :, 0], speed_change_range[:, :, 1]

        # add constraints
        # No.1: Directional Interference
        for j in range(2):
            for i in range(n):
                maxband.addConstr(w[j, i] + b[j] <= 1 - r[j, i])

        # No.2: Loop Integer
        for i in range(n - 1):
            # left-hand side
            interference = (w[0, i] + w[1, i]) - (w[0, i + 1] + w[1, i + 1])
            travel = t[0, i] + t[1, i]
            left_turn = sig[0, i] * L[0, i] - sig[1, i] * L[1, i]
            left_turn -= sig[0, i + 1] * L[0, i + 1] - sig[1, i + 1] * L[1, i + 1]
            lhs = interference + travel + left_turn - m[i]

            # right-hand side
            # Queue clearance pairs direction 1 at signal i with direction 0
            # at signal i + 1, the same pairing MaxBAND_basic uses.
            rhs = (r[0, i + 1] - r[0, i]) + (tao[1, i] + tao[0, i + 1])

            maxband.addConstr(lhs == rhs)

        # No.3: Weighted inbound and outbound bandwidth
        # (both sides vanish when k == 1, leaving the ratio unconstrained)
        maxband.addConstr((1 - k) * b[1] >= (1 - k) * k * b[0])

        # No.4: Speed Range
        for i in range(n - 1):
            for j in range(2):
                maxband.addConstr(t[j, i] >= (d[j, i] / f[j, i]) * z[0])
                maxband.addConstr(t[j, i] <= (d[j, i] / e[j, i]) * z[0])

        # No.5: Speed Change Range
        for i in range(n - 2):
            for j in range(2):
                change = (d[j, i] / d[j, i + 1]) * t[j, i + 1] - t[j, i]
                maxband.addConstr(change <= (d[j, i] / h[j, i]) * z[0])
                maxband.addConstr(change >= (d[j, i] / g[j, i]) * z[0])

        return maxband

    def solve(self, red_time, queue_clearance_time,
              two_way_vol_ratio, cycle_range, speed_range,
              speed_change_range, distance, left_turn_time):
        """Build the model, optimise it and return the variable values.

        Arguments are forwarded unchanged to ``build_program``.

        Returns
        -------
        (solution_b, solution_w, solution_m, solution_z, solution_t,
        solution_sig) : tuple of lists
            Values of the variables, grouped by family in creation order.
            ``w``, ``t`` and ``sig`` are flattened direction-major.
        """
        maxband = self.build_program(red_time, queue_clearance_time,
                                     two_way_vol_ratio, cycle_range, speed_range,
                                     speed_change_range, distance, left_turn_time)
        maxband.optimize()

        n = self.num_signals
        values = [v.x for v in maxband.getVars()]

        # End offsets of each variable family in creation order.
        b_end = 2
        w_end = b_end + 2 * n
        m_end = w_end + n
        z_end = m_end + 1
        t_end = z_end + 2 * (n - 1)

        solution_b = values[:b_end]
        solution_w = values[b_end:w_end]
        solution_m = values[w_end:m_end]
        solution_z = values[m_end:z_end]
        solution_t = values[z_end:t_end]
        solution_sig = values[t_end:]

        return solution_b, solution_w, solution_m, solution_z, solution_t, solution_sig

## Input and Solve

In [None]:
# Demo inputs for the two MaxBAND classes.  Most arrays are drawn from
# np.random without a fixed seed, so every run produces a different instance.
num_signals = 5
# Per direction (row) and signal (column): values in [0.4, 0.5).
red_time = 0.4 + 0.1 * np.random.rand(2, num_signals)
# Per direction and signal: values in [0, 0.1).
queue_clearance_time = 0.1 * np.random.rand(2, num_signals)
# Per direction and link: values in [0.3, 1.3).  Only MaxBAND_basic uses this.
travel_time = 0.3 + np.random.rand(2, num_signals - 1)
# Per signal: values in [0, 0.05).  Only MaxBAND_basic uses this.
intranode_offset = 0.05 * np.random.rand(num_signals)
# Weight k of b[1] in the versatile objective b[0] + k * b[1].
two_way_vol_ratio = 1
# Bounds for the z variable, given as [upper, lower].
# NOTE(review): presumably 1 / cycle length for cycles of 40 and 150 -- confirm units.
cycle_range = [1/40, 1/150]
# speed_range[..., 0] = 10 (lower limit), speed_range[..., 1] = 20 (upper limit).
speed_range = 10 * np.ones((2, num_signals - 1, 2))
speed_range[:, :, 1] = 20
# speed_change_range[..., 0] = -20 (lower limit), [..., 1] = 20 (upper limit).
speed_change_range = -20 * np.ones((2, num_signals - 2, 2))
speed_change_range[:, :, 1] = 20
# Link lengths in [250, 450); the second direction reuses the first one's values.
distance = 250 + 200 * np.random.rand(2, num_signals - 1)
distance[1, :] = distance[0, :]
# Per direction and signal: values in [0.1, 0.5).
left_turn_time = 0.1 + 0.4 * np.random.rand(2, num_signals)

# build basic MaxBAND model and solve
maxband = MaxBAND_basic(num_signals=num_signals)
b, w, m = maxband.solve(red_time, queue_clearance_time,
                        travel_time, intranode_offset)

# build versatile MaxBAND model and solve
# (rebinds maxband, b, w and m from the basic run above)
maxband = MaxBAND_versatile(num_signals=num_signals)
b, w, m, z, t, sig = maxband.solve(red_time, queue_clearance_time,
                                   two_way_vol_ratio, cycle_range, speed_range,
                                   speed_change_range, distance, left_turn_time)

# MultiBAND Model

In [None]:
class MultiBAND():
    """MultiBAND program: one bandwidth variable per direction and link.

    Unlike the MaxBAND programs, ``b`` has shape ``(2, num_signals - 1)`` and
    each band is weighted individually in the objective by ``band_weights``.
    """

    def __init__(self, num_signals, band_weights):
        # Number of signals on the arterial; it fixes the expected shape of
        # every array passed to build_program / solve.
        self.num_signals = num_signals
        # Objective weight of each band; multiplied element-wise with the
        # (2, num_signals - 1) array of b variables.
        self.alpha = band_weights # v/c ratio

    def build_program(self, red_time, queue_clearance_time,
                      two_way_vol_ratio, cycle_range, speed_range,
                      speed_change_range, distance, left_turn_time):
        """Create (but do not optimise) the Gurobi model.

        Parameters
        ----------
        red_time : ndarray, shape (2, num_signals)
            Red time per direction and signal.
        queue_clearance_time : ndarray, shape (2, num_signals)
            Queue clearance time per direction and signal.
        two_way_vol_ratio : ndarray, shape (num_signals - 1,)
            Per-link ratio ``k[i]`` used in the bandwidth-ratio constraint.
        cycle_range : sequence of two numbers
            Bounds of the ``z`` variable.  The two values may be given in
            either order; the smaller one is used as the lower bound.
        speed_range : ndarray, shape (2, num_signals - 1, 2)
            ``[..., 0]`` / ``[..., 1]`` are the lower / upper speed limits
            per direction and link.
        speed_change_range : ndarray, shape (2, num_signals - 2, 2)
            ``[..., 0]`` / ``[..., 1]`` are the lower / upper limits used in
            the speed-change constraint between consecutive links.
        distance : ndarray, shape (2, num_signals - 1)
            Link length per direction.
        left_turn_time : ndarray, shape (2, num_signals)
            Left-turn time per direction and signal.

        NOTE(review): times are presumably fractions of the cycle and ``z``
        the signal frequency (1 / cycle length) -- confirm with the caller.

        Returns
        -------
        The gurobipy ``Model`` holding variables b, w, m, z, t, sig (in that
        order).

        Raises
        ------
        AssertionError
            If any array input does not have the shape listed above.
        """
        n = self.num_signals

        assert red_time.shape == (2, n)
        assert queue_clearance_time.shape == (2, n)
        assert left_turn_time.shape == (2, n)
        assert speed_range.shape == (2, n - 1, 2)
        assert speed_change_range.shape == (2, n - 2, 2)
        assert distance.shape == (2, n - 1)
        assert two_way_vol_ratio.shape == (n - 1,)

        # create model
        multiband = Model("MultiBAND")

        # Accept the bounds in either order so that lb <= ub always holds.
        z_lower, z_upper = min(cycle_range), max(cycle_range)

        # create variables
        # They must be added in the order b, w, m, z, t, sig because solve()
        # slices model.getVars() by position.  No per-variable objective
        # coefficient is given: setObjective() below defines the objective.
        b = multiband.addVars(['b_' + str(i) for i in range(2 * (n - 1))],
                              lb=0, ub=1, vtype=GRB.CONTINUOUS)
        w = multiband.addVars(['w_' + str(i) for i in range(2 * n)],
                              lb=0, ub=1, vtype=GRB.CONTINUOUS)
        m = multiband.addVars(['m_' + str(i) for i in range(n)],
                              lb=1, vtype=GRB.INTEGER)
        z = multiband.addVars(['z_' + str(i) for i in range(1)],
                              lb=z_lower, ub=z_upper, vtype=GRB.CONTINUOUS)
        t = multiband.addVars(['t_' + str(i) for i in range(2 * (n - 1))],
                              lb=0, vtype=GRB.CONTINUOUS)
        sig = multiband.addVars(['sig_' + str(i) for i in range(2 * n)],
                                vtype=GRB.BINARY)
        # Object arrays of gurobipy variables, indexed [direction, signal/link].
        b = np.array(b.values()).reshape((2, n - 1))
        w = np.array(w.values()).reshape((2, n))
        m = np.array(m.values())
        z = np.array(z.values())
        t = np.array(t.values()).reshape((2, n - 1))
        sig = np.array(sig.values()).reshape((2, n))
        multiband.update()

        # build objective: weighted bandwidth averaged over the links
        obj = np.sum(self.alpha * b) / (n - 1)
        multiband.setObjective(obj, GRB.MAXIMIZE)

        r = red_time
        tao = queue_clearance_time
        L = left_turn_time
        d = distance
        e, f = speed_range[:, :, 0], speed_range[:, :, 1]
        g, h = speed_change_range[:, :, 0], speed_change_range[:, :, 1]
        k = two_way_vol_ratio

        # add constraints
        # No.1: Directional Interference
        # w is kept at least half a band away from both ends of the
        # available interval at both end signals of link i.
        for j in range(2):
            for i in range(n - 1):
                half_band = 1/2 * b[j, i]
                multiband.addConstr(w[j, i] >= half_band)
                multiband.addConstr(w[j, i] <= 1 - r[j, i] - half_band)
                multiband.addConstr(w[j, i + 1] >= half_band)
                multiband.addConstr(w[j, i + 1] <= 1 - r[j, i + 1] - half_band)

        # No.2: Loop Integer
        for i in range(n - 1):
            # left-hand side
            interference = (w[0, i] + w[1, i]) - (w[0, i + 1] + w[1, i + 1])
            travel = t[0, i] + t[1, i]
            left_turn = sig[0, i] * L[0, i] - sig[1, i] * L[1, i]
            left_turn -= sig[0, i + 1] * L[0, i + 1] - sig[1, i + 1] * L[1, i + 1]
            lhs = interference + travel + left_turn - m[i]

            # right-hand side
            # Queue clearance pairs direction 1 at signal i with direction 0
            # at signal i + 1, the same pairing MaxBAND_basic uses.
            rhs = (r[0, i + 1] - r[0, i]) + (tao[1, i] + tao[0, i + 1])

            multiband.addConstr(lhs == rhs)

        # No.3: Weighted inbound and outbound bandwidth
        for i in range(n - 1):
            multiband.addConstr((1 - k[i]) * b[1, i] >= (1 - k[i]) * k[i] * b[0, i])

        # No.4: Speed Range
        for j in range(2):
            for i in range(n - 1):
                multiband.addConstr(t[j, i] >= (d[j, i] / f[j, i]) * z[0])
                multiband.addConstr(t[j, i] <= (d[j, i] / e[j, i]) * z[0])

        # No.5: Speed Change Range
        for j in range(2):
            for i in range(n - 2):
                change = (d[j, i] / d[j, i + 1]) * t[j, i + 1] - t[j, i]
                multiband.addConstr(change <= (d[j, i] / h[j, i]) * z[0])
                multiband.addConstr(change >= (d[j, i] / g[j, i]) * z[0])

        return multiband

    def solve(self, red_time, queue_clearance_time,
              two_way_vol_ratio, cycle_range, speed_range,
              speed_change_range, distance, left_turn_time):
        """Build the model, optimise it and return the variable values.

        Arguments are forwarded unchanged to ``build_program``.

        Returns
        -------
        (solution_b, solution_w, solution_m, solution_z, solution_t,
        solution_sig) : tuple of lists
            Values of the variables, grouped by family in creation order.
            ``b``, ``w``, ``t`` and ``sig`` are flattened direction-major;
            ``solution_b`` has ``2 * (num_signals - 1)`` entries.
        """
        multiband = self.build_program(red_time, queue_clearance_time,
                                       two_way_vol_ratio, cycle_range, speed_range,
                                       speed_change_range, distance, left_turn_time)
        multiband.optimize()

        n = self.num_signals
        values = [v.x for v in multiband.getVars()]

        # End offsets of each variable family in creation order.  There is
        # one b variable per direction and link, i.e. 2 * (n - 1) of them.
        b_end = 2 * (n - 1)
        w_end = b_end + 2 * n
        m_end = w_end + n
        z_end = m_end + 1
        t_end = z_end + 2 * (n - 1)

        solution_b = values[:b_end]
        solution_w = values[b_end:w_end]
        solution_m = values[w_end:m_end]
        solution_z = values[m_end:z_end]
        solution_t = values[z_end:t_end]
        solution_sig = values[t_end:]

        return solution_b, solution_w, solution_m, solution_z, solution_t, solution_sig

In [None]:
# Demo inputs for MultiBAND.  Most arrays are drawn from np.random without a
# fixed seed, so every run produces a different instance.
num_signals = 5
# Per direction (row) and signal (column): values in [0.4, 0.5).
red_time = 0.4 + 0.1 * np.random.rand(2, num_signals)
# Per direction and signal: values in [0, 0.1).
queue_clearance_time = 0.1 * np.random.rand(2, num_signals)
# One ratio k per link (MultiBAND expects an array here, not a scalar).
two_way_vol_ratio = 2 * np.ones((num_signals - 1, ))
# Bounds for the z variable, given as [upper, lower].
# NOTE(review): presumably 1 / cycle length for cycles of 40 and 150 -- confirm units.
cycle_range = [1/40, 1/150]
# speed_range[..., 0] = 10 (lower limit), speed_range[..., 1] = 20 (upper limit).
speed_range = 10 * np.ones((2, num_signals - 1, 2))
speed_range[:, :, 1] = 20
# speed_change_range[..., 0] = -20 (lower limit), [..., 1] = 20 (upper limit).
speed_change_range = -20 * np.ones((2, num_signals - 2, 2))
speed_change_range[:, :, 1] = 20
# Link lengths in [250, 450); the second direction reuses the first one's values.
distance = 250 + 200 * np.random.rand(2, num_signals - 1)
distance[1, :] = distance[0, :]
# Per direction and signal: values in [0.1, 0.5).
left_turn_time = 0.1 + 0.4 * np.random.rand(2, num_signals)

# band weights
# Random ratios in [0.4, 0.7) per direction and link, rescaled so that each
# direction's ratios average to 1, then squared to obtain the weights.
vc_ratio = 0.4 + 0.3 * np.random.rand(2, num_signals - 1)
vc_ratio[0,:] /= np.sum(vc_ratio[0,:])
vc_ratio[1,:] /= np.sum(vc_ratio[1,:])
vc_ratio *= (num_signals - 1)
band_weights = vc_ratio ** 2

# build MultiBAND model and solve
multiband = MultiBAND(num_signals=num_signals, band_weights=band_weights)
b, w, m, z, t, sig = multiband.solve(red_time, queue_clearance_time,
                                   two_way_vol_ratio, cycle_range, speed_range,
                                   speed_change_range, distance, left_turn_time)