In [1]:
import numpy as np
from scipy.optimize import minimize, brute, differential_evolution, NonlinearConstraint, BFGS
import sys
from functools import reduce
from typing import Tuple, List 
np.set_printoptions(threshold=sys.maxsize)

##### Simulation and Optimization Code

In [2]:
# Global hyperparameters
E1, E2, E3 = 1, 2, 3
Ndim = 15   # 1-7 rho for segment 1-6, with 0, 7 dummy segment 0, 7 for rho, 8-13 v for segment 1-7, 14 w_r
T, L, Lambda, D_r = 1 / 360, 1, 2, 1500
Tau, Mu, C_r, Rho_m, Alpha, K, A, V_f, Rho_c = 19, 60, 2000, 120, 0.1, 40, 1.867, 120, 33 + E1 / 3

In [3]:
# Utility method (softmax min)
def soft_min(xs: List, beta: float=50.0):
    nx = np.array(xs)
    m = np.min(nx)
    return m - (1.0 / beta) * np.log(np.sum(np.exp(-beta * (nx - m))))

def closest(arr: np.ndarray, q: float):
    idx = np.searchsorted(arr, q)
    if idx == 0: return arr[0]
    if idx == len(arr): return arr[-1]
    return arr[idx-1] if abs(q - arr[idx-1]) <= abs(arr[idx] - q) else arr[idx]

In [4]:
# Wrap up sophisticated functions in the simulation as well as optimization to produce less bugs
def _observation(states: np.ndarray, i: int):
    return T * states[i, -1] + T * L * Lambda * np.sum(states[i, 1:7])

def _speed_dynamics(states: np.ndarray, control_inputs: np.ndarray, i: int, j: int):
    prv_desired_speed = V_f * np.exp(-np.pow(-soft_min([-states[i-1, j], 0]) / Rho_c, A) / A)
    prv_V = soft_min([(1 + Alpha) * control_inputs[i-1, 0].item(), prv_desired_speed]) if j == 2 or j == 3 else prv_desired_speed
    return states[i-1, j+7] + T / Tau * (prv_V - states[i-1, j+7]) + T / L * states[i-1, j+7] * ((states[i-1, j+6] - states[i-1, j+7]) if j > 1 else 0) - (Mu * T * (states[i-1, j+1] - states[i-1, j])) / (Tau * L * (states[i-1, j] + K))

def _init_density(states: np.ndarray, i: int):
    v_0, cur_q_0 = states[i, 8], (3000 + 50 * E2) if i < 60 else (1000 + 50 * E2)
    return cur_q_0 / (Lambda * v_0)

def _density_dynamics(states: np.ndarray, i: int, j: int):
    return states[i-1, j] + T / L * states[i-1, j-1] * (states[i-1, j-1+7] if j > 1 else states[i-1, 8]) - T / L * states[i-1, j] * states[i-1, j+7]

def _ramp_density(states: np.ndarray, control_inputs: np.ndarray, i: int):
    return soft_min([control_inputs[i-1, 1].item() * C_r, D_r + states[i-1, -1].item() / T, C_r * (Rho_m - states[i-1, 5].item()) / (Rho_m - Rho_c)])

def _add_ramp_up(states: np.ndarray, control_inputs: np.ndarray, i: int):
    return T / (Lambda * L) * _ramp_density(states, control_inputs, i)

def _queue_length_dynamics(states: np.ndarray, control_inputs: np.ndarray, i: int):
    return max(0, states[i-1, -1] + T * (D_r - _ramp_density(states, control_inputs, i)))

In [5]:
def dynamics(control_inputs: np.ndarray, num_step: int=120):
    states, outputs = np.zeros(shape=(num_step+1, Ndim)), np.zeros(shape=num_step+1)
    # Time boundary conditions
    states[0, 0] = 25
    for i in range(1, 8): states[0, i], states[0, i+7] = 25, 80
    states[0, -1] = 0
    # Time Evolution
    for i in range(1, num_step+1):
        # evaluate speed
        for j in range(1, 7):
            states[i, j+7] = _speed_dynamics(states, control_inputs, i, j)
        # evaluate density
        states[i, 0] = _init_density(states, i)
        for j in range(1, 7):
            states[i, j] = _density_dynamics(states, i, j)
            if j == 5:
                states[i, j] += _add_ramp_up(states, control_inputs, i)
        # evaluate queue size
        states[i, 7] = states[i, 6]
        states[i, -1] = _queue_length_dynamics(states, control_inputs, i)
        # evaluate output
        outputs[i] = _observation(states, i)
    return states, outputs

In [6]:
def unpack(num_step: int, opt_vars: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    controls, states = opt_vars[:2*num_step], opt_vars[2*num_step:]
    return controls.reshape((-1, 2)), states.reshape((-1, Ndim))

def pack(controls: np.ndarray, states: np.ndarray) -> np.ndarray:
    return np.append(controls.reshape(-1), states.reshape(-1))

def create_control(vsl_: np.ndarray, r_: np.ndarray) -> np.ndarray:
    vsl, r = np.repeat(vsl_, 6), np.repeat(r_, 6)
    return np.column_stack((vsl, r))

def objectives(states: np.ndarray, num_step=120):
    return reduce(lambda x, y: x + y, map(lambda idx: _observation(states, idx), range(1, num_step+1)))

In [7]:
def simulate(vsl_: np.array=None, r_: np.array=None, num_step: int=120, ):
    control_inputs: np.ndarray = create_control(vsl_, r_)
    return dynamics(control_inputs, num_step=num_step)

In [20]:
def optimize(num_step: int=120, init_vsl_: np.array=None, init_r_: np.array=None, max_queue: bool=False, discrete: bool=False):
    num_minute = int(num_step / 6)

    def objective(opt_vars: np.ndarray):
        assert not np.isnan(opt_vars).any() and not np.isinf(opt_vars).any()
        controls, states = unpack(num_step, opt_vars)
        return reduce(lambda x, y: x + y, map(lambda idx: _observation(states, idx), range(1, num_step+1)))
    
    def dynamics_constraint(opt_vars: np.ndarray):
        """
        Express the nonlinear equality constraints x[k+1]=f(x[k],u[k]) => f(x[k],u[k])-x[k+1]=0
        """
        controls, states = unpack(num_step, opt_vars) 
        results = np.zeros(shape=num_step*Ndim) 
        # Initialization Contraints
        results[0] = states[0, 0] - 25
        for i in range(1, 8):
            results[i] = states[i, 0] - 25
            if i != 7: results[i+7] = states[0, i+7] - 80
        results[14] = states[0, -1]
    
        for i in range(0, num_step):
            # Speed contraints: same with speed dynamics
            for j in range(1, 7):
                results[i*Ndim+j+7] = _speed_dynamics(states, controls, i, j) - states[i, j+7]
            # Initial density constraints: v_0(k) = v_1(k), q_0(k) = lambda * rho_0(k) * v_0(k)
            results[i*Ndim] = _init_density(states, i) - states[i, 0]
            # Density constraints
            for j in range(1, 7):
                results[i*Ndim+j] = _density_dynamics(states, i, j) - states[i, j]
                if j == 5:
                    results[i*Ndim+j] += _add_ramp_up(states, controls, i)
            results[i*Ndim+7] = states[i, 7] - states[i, 6]
            results[(i+1)*Ndim-1] = states[i, -1] - _queue_length_dynamics(states, controls, i)
        return results

    def discrete_dynamics(controls: np.ndarray):
        vsl_, r_ = controls[:num_minute], controls[num_minute:]
        disc_vsl_, disc_r_ = np.zeros_like(vsl_), np.zeros_like(r_)
        for i in range(num_minute): disc_vsl_[i] = closest(np.array([60, 80, 100, 120]), vsl_[i])
        for i in range(num_minute): disc_r_[i] = closest(np.array([0.2, 0.4, 0.6, 0.8]), r_[i])
        discretized: np.ndarray = create_control(disc_vsl_, disc_r_)
        states, outputs = dynamics(discretized)
        return objectives(states)

    def continous_dynamics(controls: np.ndarray):
        #print(controls[0:5])
        vsl_, r_ = controls[:num_minute], controls[num_minute:]
        print("=================")
        print(vsl_)
        print(r_)
        control_inputs: np.ndarray = create_control(vsl_, r_)
        states, _ = dynamics(control_inputs, num_step=num_step)
        return objectives(states)

    def queue_length_constrain(controls: np.ndarray) -> np.ndarray:
        vsl_, r_ = controls[:num_minute], controls[num_minute:]
        control_inputs: np.ndarray = create_control(vsl_, r_)
        states, _ = dynamics(control_inputs, num_step=num_step)
        return states[:,-1]
                
    result = None
    if not discrete:
        # init_opt_vars = pack(init_controls, init_states) # define initial values for optimization
        # nonlinear_contraints = NonlinearConstraint(dynamics_constraint, lb=np.zeros(shape=num_step*Ndim), ub=np.zeros(shape=num_step*Ndim), jac='3-point', hess=BFGS())     # define nonlinear contraints
        # bounds = [(0, 1), (60, 120)] * num_step + ([(0, Rho_c)] * 8 + [(0, 200)] * 6 + [(0, (np.inf if not max_queue else 23 + E1 / 6))])  * (num_step + 1)
        # assert len(init_opt_vars) == len(bounds)
        # result = minimize(objective, init_opt_vars, method='trust-constr', jac='3-point', hess=BFGS(), bounds=bounds, constraints=[nonlinear_contraints], options={'verbose': 2, "gtol": 1e-5, "xtol": 1e-5, "barrier_tol": 1e-7, "sparse_jacobian": True, "factorization_method": "AugmentedSystem", "maxiter": 100})
        bounds = [(60, 120)] * num_minute + [(0, 1)] * num_minute
        init_opt_vars = np.append(init_vsl_, init_r_)
        if not max_queue: result = minimize(continous_dynamics, init_opt_vars, method='trust-constr', jac='3-point', bounds=bounds, options={'verbose': 2, "gtol": 1e-5, "xtol": 1e-5, "barrier_tol": 1e-7, "sparse_jacobian": True, "factorization_method": "AugmentedSystem", "maxiter": 100})
        else: 
            constraint = NonlinearConstraint(queue_length_constrain, lb=np.zeros(shape=num_step+1), ub=np.array([23+E1/6]*(num_step+1)))
            result = minimize(continous_dynamics, init_opt_vars, constraints=constraint, method='trust-constr', jac='3-point', hess=BFGS(), bounds=bounds, options={'verbose': 2, "gtol": 1e-5, "xtol": 1e-5, "barrier_tol": 1e-7, "sparse_jacobian": True, "factorization_method": "AugmentedSystem", "maxiter": 100})
    else: 
        bounds = [(60, 130)] * num_minute + [(0.2, 1)] * num_minute
        result = differential_evolution(discrete_dynamics, bounds=tuple(bounds), disp=True)
    print(f"Success: {result.success}, message: {result.message}")
    optimal_opt_vars = result.x
    optimal_controls, optimal_states = unpack(num_step, optimal_opt_vars)
    return optimal_controls, optimal_states

##### Task Code for the problems

In [9]:
def task1(num_control: int):
    vsl_, r_ = np.random.rand(num_control) * 60 + 60, np.random.rand(num_control)
    states, outputs = simulate(vsl_, r_)
    print(objectives(states))

In [10]:
def task3():
    optimize(120, np.array([60] * 20), np.array([0] * 20))
    optimize(120, np.array([120] * 20), np.array([1] * 20))

In [11]:
def task6():
    vsl_, r_ = np.random.rand(20) * 60 + 60, np.random.rand(20)
    optimize(120, vsl_, r_, max_queue=True)

In [12]:
def task7():
    optimize(120, np.array([120] * 20), np.array([1] * 20), discrete=True)

In [13]:
task1(20)

104.57575627991963
