In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import numba as nb
import scipy as sc
import math
import time
from scipy.integrate import cumtrapz
from numba import jit, njit, prange

# LOAD PARAMETER

In [None]:
# Steady State Response
param_ssr = np.load('../model/ssr.npy')[-1]

# Dynamics
param_dynamics = np.load('../model/sys_id.npy')[-1]

# Generate Trajectory

## Step & ramp function

In [None]:
def step(tt):
    out = np.zeros_like(tt)
    out[tt >= 0] = 1
    return out

def ramp(tt):
    out = np.array(tt)
    out[tt < 0] = 0
    return out

def jitter(gain, omega, tt, t0, tf):
    out = np.array(tt)
    
    out = gain * np.sin(omega*(tt-t0))
    out[tt-t0 < 0] = 0
    out[tt-tf > 0] = 0
    return out

## Continuous acceleration

In [None]:
t0 = np.arange(3, 288, 0.02)
a0 = ramp(t0-3) - ramp(t0-4.5) - ramp(t0-8) + ramp(t0-9.5) \
    - 0.25*ramp(t0-27) + 0.25*ramp(t0-30) + 0.25*ramp(t0-32) - 0.25*ramp(t0-35) \
    + 0.5*ramp(t0-40) - 1.*ramp(t0-44) + 0.5*ramp(t0-48) \
    - 1*ramp(t0-60) + 2*ramp(t0 - 62) - 1*ramp(t0-64) \
    - 0.1*ramp(t0-79) + 0.4*ramp(t0-85) - 0.3*ramp(t0-87) \
    + 0.35*ramp(t0-95) - 0.7*ramp(t0-98) + 0.35*ramp(t0-101) \
    - 0.5*ramp(t0-101) + 1*ramp(t0-102.5) - 0.5*ramp(t0-104) \
    + 0.35*ramp(t0-104) - 0.7*ramp(t0-107) + 0.35*ramp(t0-110) \
    - 0.15*ramp(t0-110) + 0.3*ramp(t0-114) - 0.15*ramp(t0-118) \
    + jitter(0.25, np.pi / 2.0, t0, 132, 152) \
    + 2.*ramp(t0-160) - 2.*ramp(t0-161) - 2.*ramp(t0-163) + 2.*ramp(t0-164) \
    - 2.*ramp(t0 - 180) + 2*ramp(t0-181) + 2 *ramp(t0-183) - 2*ramp(t0-184) \
    + 2.0 * ramp(t0-210) - 2.0*ramp(t0-210.2) - 2.0*ramp(t0-216) + 2.0*ramp(t0-216.4)\
    + 2.0 * ramp(t0-218.4) - 2.0*ramp(t0-218.8)  - 2.0*ramp(t0 - 230) + 2.0*ramp(t0-230.2) \
    - 1.5*ramp(t0-240) + 1.5*ramp(t0-241) + 1.5*ramp(t0-243) - 1.5*ramp(t0-244) #\
    #+ 5.0*step(t0-255) - 5.0*step(t0-256)
t0 = np.arange(0, 285, 0.02)
v0 = cumtrapz(a0, t0, initial=0.) + 1.

fig, ax1 = plt.subplots()
ax1.set_xlabel('Time (s)')
ax1.plot(t0, v0, color='tab:blue', linewidth=2.0, label='Speed')
ax1.set_ylabel('Speed 'r'$(m/s)$', color='tab:blue')

ax2 = ax1.twinx()
ax2.plot(t0, a0, color='black', linestyle='--', linewidth=1.5, label='Acceleration')
ax2.set_ylabel('Acceleration '+r'$(m/s^2)$', color='black')
ax2.set_ylim(ax2.get_ylim()[0], 3 * ax2.get_ylim()[1])

fig.legend()
plt.title('Reference Trajectory')
# fig.savefig('images/trajectory', dpi=600)
plt.show()

In [None]:
# Untuk ngitung overshoot
idx = np.array([[9.5, 27.], [35., 40.], [48., 60.], [64., 79.], [87., 95.], [118., 132.], [164., 180.], [184., 210.], [230.2, 240.], [244., t0[-1]+3.]]) -3
direction = np.array([1, 0, 1, 0, 0, 1, 1, 0, 1, 0])
reg = np.ones(t0.shape[0])
for i in range(t0.shape[0]):
    for j in range(idx.shape[0]):
        if idx[j,0] <= t0[i] and t0[i] <= idx[j,1]:
            reg[i] = v0[i]

In [None]:
plt.plot(t0, v0)
plt.plot(t0, reg, linestyle='--')
#direction = np.array([1, 0, 1, 0, 0, 0, 1, 0, 1, 0, 1])

# MAKE FUNCTION

## Generate Population

In [None]:
def generate_population(num, dim, rng):
    """
    Generate population:
        Input:
            num: number of population (integer)
            dim: number of parameters (integer)
            rng: range number used in initialization (list or numpy array)
        Output:
            pop: initial position of the population (numpy array)
    """
    pop = np.zeros((num,dim))
    for i in range(dim):
        lim = rng[i]
        pop[:, i] = np.random.uniform(lim[0], lim[1], size=num)
    return pop

## Forward Propagation

In [None]:
@njit
def delayed_control_signal(i, u, u_list, td):
    if i < td:
        ut = 0.0
    else:
        if td == 0:
            ut = u
        else:
            ut = u_list[i-td]
    return ut
_ = delayed_control_signal(1, 0.1, np.array([0.1, 0.2]), 0)

In [None]:
@njit
def clip(a, a_min, a_max):
    if a > a_max:
        return a_max
    elif a < a_min:
        return a_min
    else:
        return a
    
_ = clip(2.0, -1.0, 1.0)

In [None]:
# Steady state response parameters
beta1, beta2, beta3 = param_ssr

# System parameters
a1, a2, a3, b1, b2, b3, b4, c1, c2, c3, c4, td11, td12, td13, td21, td22, td23 = param_dynamics
td11 = int(np.around(td11))
td12 = int(np.around(td12))
td13 = int(np.around(td13))
td21 = int(np.around(td21))
td22 = int(np.around(td22))
td23 = int(np.around(td23))

In [None]:
sat_min = -1.
sat_max = 1.

@njit
def forward_propagation(t, v, param):    
    kp, ki, kd = param

    dt = np.mean(t[1:] - t[:-1])
    
    ki = ki * dt
    kd = kd / dt    
    
    e_sum = 0.0
    e_last = 0.0
    e_int_state = 0 # 0 --> No Saturation || 1 --> Saturation (+) || -1 --> Saturation (-1)
    is_start = True
    
    u1_list = np.empty(t.shape)
    u2_list = np.empty(t.shape)
    out = np.empty(t.shape)
    y = 0.0
    for i in range(t.shape[0]):     
        # LONGITUDINAL CONTROLLER
        sp = clip(v[i], 0.0, np.Inf)
        sr = beta1 * (1 - np.exp(beta2*sp)) + beta3
        sr = clip(sr, 0., sat_max) * 0.5
        
        err = sp - y
        if e_int_state == 0:
            e_sum += err
        elif e_int_state == 1:
            if err < 0:
                e_sum += err
        elif e_int_state == -1:
            if err > 0:
                e_sum += err
        
        if is_start:
            temp = sr + kp * err + ki * e_sum + 0.
            is_start = False
        else:
            temp = sr + kp * err + ki * e_sum + kd * (err - e_last)
        
        e_last = err

        if temp > sat_max: # Saturation (+)
            temp = sat_max
            e_int_state = 1
        elif temp < sat_min: # Saturation (-)
            temp = sat_min
            e_int_state = -1
        else: # Not saturated
            e_int_state = 0
        
        u1 = clip(temp, 0.0, sat_max)
        u2 = clip(-temp, 0.0, -sat_min)
        
        # DYNAMICS     
        u11t = delayed_control_signal(i, u1, u1_list, td11)
        u12t = delayed_control_signal(i, u1, u1_list, td12)
        u13t = delayed_control_signal(i, u1, u1_list, td13)
        u21t = delayed_control_signal(i, u2, u2_list, td21)
        u22t = delayed_control_signal(i, u2, u2_list, td22)
        u23t = delayed_control_signal(i, u2, u2_list, td23)
        
        temp = 0.
        if y != 0.:
            temp = a1
                    
        y_dot = temp + a2 * y + a3 * y**2 \
                + b1 * u11t + b2 * np.exp(b3 * y + b4 * u12t) * u13t  \
                + c1 * u21t + c2 * np.exp(c3 * y + c4 * u22t) * u23t
        
        y += y_dot * dt
        if y < 0.0:
            y = 0.0

        u1_list[i] = u1
        u2_list[i] = u2
        out[i] = y

    return out, u1_list, u2_list
_ = forward_propagation(np.arange(10, dtype=float), np.ones(10), np.array([0.1, 0.1, 0.1]))

In [None]:
%timeit forward_propagation(t0, v0, np.array([0.2, 0.1550, 0.1]))

## Constraint

In [None]:
@njit
def admissible(param):
    kp, ki, kd = param
    if kp < 0. or ki < 0. or kd < 0.:
        return False
    else:
        return True
n_dim = 3
_ = admissible(np.random.randn(n_dim))

## Cost

In [None]:
@njit
def gradient(a, t):
    dt = np.mean(t[1:]-t[:-1])
    
#     forward = np.zeros_like(a)
#     forward[:-1] = a[1:]
#     forward[-1] = forward[-2]
#     backward = np.zeros_like(a)
#     backward[1:] = a[:-1]
#     backward[0] = backward[1]    
#     out = (forward - backward) / 2 / dt
    
    out = np.zeros_like(a)
    out[1:-1] = (a[2:] - a[:-2]) / 2 / dt
    out[0] = out[1]
    out[-1] = out[-2]
    return out
_ = gradient(v0, t0)

In [None]:
idx = np.array([[9.5, 27.], [35., 40.], [48., 60.], [64., 79.], [87., 95.], [118., 132.], [164., 180.], [184., 210.], [230.2, 240.], [244., t0[-1]+3.]]) -3
direction = np.array([1, 0, 1, 0, 0, 1, 1, 0, 1, 0])

@njit
def max_os_sim(mv):
    out = 0.
    for i in range(mv.shape[0]):
        for j in range(idx.shape[0]):
            if idx[j,0] <= t0[i] and t0[i] <= idx[j,1]:
                if direction[j] > 0.5:
                    temp = mv[i] - v0[i]
                else:
                    temp = v0[i] - mv[i]
                temp = temp / v0[i] * 100
                temp = clip(temp, 0.0, np.Inf)
                if temp > out:
                    out = temp
    return out
_ = max_os_sim(np.zeros(v0.shape[0]))

In [None]:
@njit
def cost(t, v, param, lamda):
    mv, cs1, cs2 = forward_propagation(t, v, param)
    
    error = v - mv
    
    # ma = gradient(mv, t)
    # mj = gradient(ma, t)
    
    # mj = gradient(error, t)
    # mj = gradient(mj, t)
    
    mj = gradient(cs1, t)
    
    max_os = max_os_sim(mv)
    if max_os > lamda[1]: # max_os %
        return np.Inf
    # loss = np.sum(np.abs(error)) + lamda[0] * np.sum(np.abs(mj))
    # loss = np.sum(error**2) + lamda[0] * np.sum(mj**2)
    loss = np.sum(error**2) + lamda[0] * np.sum(np.abs(mj))
    M = t.shape[0]
    return loss / M
_ = cost(np.arange(10, dtype=float), np.ones(10), np.ones(3), np.array([0.001, 0.001]))

In [None]:
@njit
def mean_squared_error(t, v, param):
    mv, _, _ = forward_propagation(t, v, param)
    error = v - mv
    cost = np.mean(error**2)
    return cost
_ = mean_squared_error(np.arange(10, dtype=float), np.ones(10), np.array([0.1, 0.1, 0.1]))

@njit
def mean_absolute_error(t, v, param):
    mv, _, _ = forward_propagation(t, v, param)
    error = v - mv
    out = np.mean(np.abs(error))
    return out
_ = mean_absolute_error(np.arange(10, dtype=float), np.ones(10), np.array([0.1, 0.1, 0.1]))

@njit
def max_absolute_error(t, v, param):
    mv, _, _ = forward_propagation(t, v, param)
    error = v - mv
    return np.max(np.abs(error))
_ = max_absolute_error(np.arange(10, dtype=float), np.ones(10), np.array([0.1, 0.1, 0.1]))

@njit
def mean_absolute_jerk(t, v, param):
    mv, _, _ = forward_propagation(t, v, param)
    
    ma = gradient(mv, t)
    mj = gradient(ma, t)
    return np.mean(np.abs(mj))
_ = mean_absolute_jerk(np.arange(10, dtype=float), np.ones(10), np.array([0.1, 0.1, 0.1]))

@njit
def mean_squared_jerk(t, v, param):
    mv, _, _ = forward_propagation(t, v, param)
    
    ma = gradient(mv, t)
    mj = gradient(ma, t)
    return np.mean(mj**2)
_ = mean_squared_jerk(np.arange(10, dtype=float), np.ones(10), np.array([0.1, 0.1, 0.1]))

@njit
def max_percent_overshoot(t, v, param):
    mv, _, _ = forward_propagation(t, v, param)
    return max_os_sim(mv)
_ = max_percent_overshoot(np.arange(10, dtype=float), np.ones(10), np.array([0.1, 0.1, 0.1]))

@njit
def mean_absolute_u_dot(t, v, param):
    mv, cs1, cs2 = forward_propagation(t, v, param)
    
    cs1_dot = gradient(cs1, t)
    cs2_dot = gradient(cs2, t)
    return np.mean(np.abs(cs1_dot)+np.abs(cs2_dot))
_ = mean_absolute_u_dot(np.arange(10, dtype=float), np.ones(10), np.array([0.1, 0.1, 0.1]))

@njit
def mean_squared_u_dot(t, v, param):
    mv, cs1, cs2 = forward_propagation(t, v, param)
    
    cs1_dot = gradient(cs1, t)
    cs2_dot = gradient(cs2, t)
    return np.mean(np.abs(cs1_dot)**2+np.abs(cs2_dot)**2)
_ = mean_squared_u_dot(np.arange(10, dtype=float), np.ones(10), np.array([0.1, 0.1, 0.1]))

In [None]:
@njit
def calculate_total_cost(param, lamda):
    if admissible(param):
        return cost(t0, v0, param, lamda)
    return np.Inf
_ = calculate_total_cost(np.array([0.1, 0.1, 0.1]), np.array([0.001, 0.001]))

In [None]:
@njit(parallel=True)
def population_cost(population, lamda):
    length = population.shape[0]
    losses = np.zeros(length)
    for ii in prange(length):
        losses[ii] = calculate_total_cost(population[ii], lamda)
    return losses
_ = population_cost(np.array([[0.1, 0.1, 0.1], [0.1, 0.1, 0.1]]), np.array([0.001, 0.001]))

## PSO

In [None]:
@njit
def pso(population, pop_velocity, global_, global_loss, best_pos_local,
        best_local_loss, w_max, w_min, c1, c2, iteration, max_iter, lamda):
    num = population.shape[0]
    dim = population.shape[1]
    
    # Initial conditions
    ppos_vector = np.copy(population)
    pvel_vector = np.copy(pop_velocity)
    gbest_pos = np.copy(global_)
    gfit_value = global_loss + 0.0 # to avoid shallow copy
    pbest_pos = np.copy(best_pos_local)
    pfit_value = np.copy(best_local_loss)

    # Additional variables for catching the next global best
    next_gbest_pos = np.copy(gbest_pos)
    next_gfit_value = gfit_value + 0.0 # to avoid shallow copy
    
    # Calculate w based on w_max, w_min, current number of iteration,
    # and the maximum iteration
    w = w_max - (w_max - w_min) * iteration / max_iter
    
    for i in range(num):        
        # Get random numbers for r1 and r2
        r1 = np.random.uniform(0., 1.)
        r2 = np.random.uniform(0., 1.)
        
        # Update the velocity and position vector
        d1 = pbest_pos[i] - ppos_vector[i]
        d2 = gbest_pos - ppos_vector[i]
        pvel_vector[i] = w*pvel_vector[i] + c1*r1*d1+ c2*r2*d2
        ppos_vector[i] = pvel_vector[i] + ppos_vector[i]
        
        # Calculate the cost function or fit value
        cost_func = calculate_total_cost(ppos_vector[i], lamda)
        
        # Update the next local best for i-th particle if applicable
        if(pfit_value[i] > cost_func):
            pfit_value[i] = cost_func + 0.0 # to avoid shallow copy
            pbest_pos[i] = np.copy(ppos_vector[i])
                
        # Update the next global best if applicable
        if(next_gfit_value > cost_func):
            next_gfit_value = cost_func  + 0.0 # to avoid shallow copy
            next_gbest_pos = np.copy(ppos_vector[i])
        
    return ppos_vector, pvel_vector, pbest_pos, pfit_value, next_gbest_pos, next_gfit_value

xx1 = np.ones((2, n_dim))
xx2 = np.ones(2)
xx3 = np.random.randn(n_dim)
_ = pso(xx1, xx1, xx3, 100.0, xx1*0.5, xx2, 0.9, 0.4, 0.7, 0.8, 0, 20, np.array([0., np.Inf]))
# _ = pso(xx1, xx1, xx2, xx3, 100.0, 0.8, 1.5, 0.1, np.array([0., np.Inf]))

# SIMULATION (OPTIMIZATION)

In [None]:
num = 50
n_sim = 20
n_itr = 5000

r_kp = [0.0, 1.0]
r_ki = [0.0, 1.0]
r_kd = [0.0, 1.0]
rng = [r_kp, r_ki, r_kd]
dim = len(rng)

w_max = 0.9
w_min = 0.4
c1 = 0.7
c2 = 0.8

In [None]:
lamda = np.array([0.0, np.Inf])

param_history = np.zeros((n_sim, dim))
loss_history = np.ones(n_sim) * np.Inf

the_best_param_history = np.zeros((n_itr, dim))
the_best_loss_history = np.zeros(n_itr)

for j in range(n_sim):
    print(f'Optimization: {j+1} ------------------------------------------')
    
    print('Initializing ...')
    while True:
        try:
            population = generate_population(num, dim, rng)
            rng_vel = [[-max([abs(rng[i][0]), abs(rng[i][1])]), max([abs(rng[i][0]), abs(rng[i][1])])] for i in range(len(rng))]
            pop_velocity = generate_population(num, dim, rng_vel)
            global_ = None
            global_loss_ = np.Inf
            local_best_pos = np.copy(population)

            loss_population = population_cost(population, lamda)
            loss_population[np.isnan(loss_population)] = np.Inf
            min_idx = np.argmin(loss_population)
            min_loss = loss_population[min_idx]
            if global_loss_ > min_loss:
                global_loss_ = min_loss
                global_ = np.copy(population[min_idx, :])

            global_history = np.empty((n_itr, dim))
            global_history[0] = global_
            global_loss_history = np.empty(n_itr)
            global_loss_history[0] = global_loss_
            
            # Biasanya di sini suka gagal, kalau inisialisasi population awal semuanya menyelisihi constraint
            population, pop_velocity, local_best_pos, loss_population, global_, global_loss_ = pso(
                population, pop_velocity, global_, global_loss_, local_best_pos, loss_population,
                w_max, w_min, c1, c2, i, n_itr, lamda
            )
            break
        except:
            print('Re-Initializing ...')
            
    print('Continue ...')
    for i in range(1, n_itr):
        # Flower Pollination Algorithm
        population, pop_velocity, local_best_pos, loss_population, global_, global_loss_ = pso(
            population, pop_velocity, global_, global_loss_, local_best_pos, loss_population,
            w_max, w_min, c1, c2, i, n_itr, lamda
        )
        
        if (i-1) % 500 == 0:
            print('simulation: {} || iteration: {} || global_loss: {:.5f}'.format(j+1, i, global_loss_))

        global_history[i] = global_
        global_loss_history[i] = global_loss_

    if np.min(loss_history) > global_loss_history[-1]:
        the_best_loss_history = np.copy(global_loss_history)
        the_best_param_history = np.copy(global_history)
        
    param_history[j] = np.copy(global_history[-1])
    loss_history[j] = np.copy(global_loss_history[-1])
    
    print('simulation: {} || the best loss: {:.10f}'.format(j, the_best_loss_history[-1]))

In [None]:
# Save the simulation
np.save('result/param_history.npy', param_history)
np.save('result/loss_history.npy', loss_history)
np.save('result/the_best_loss_history.npy', the_best_loss_history)
np.save('result/the_best_param_history.npy', the_best_param_history)

f = open("result/sim.cfg", "w+")
f.writelines('num: {} # The number of particles\n'.format(num))
f.writelines('n_sim: {} # The number of simulation loop\n'.format(n_sim))
f.writelines('n_itr: {} # The number of iteration for each simulation\n'.format(n_itr))
f.writelines('\n# Lambda value\n')
f.writelines('lambda0: {}'.format(lamda[0]))
f.writelines('lambda1: {}'.format(lamda[1]))
f.writelines('\n# The boundary of the initialization value\n')
f.writelines('r_kp: {}\n'.format(r_kp))
f.writelines('r_ki: {}\n'.format(r_ki))
f.writelines('r_kd: {}\n'.format(r_kd))
f.writelines('\n# The PSO hyperparameters\n')
f.writelines('w: {}\n'.format(w))
f.writelines('c1: {}\n'.format(c1))
f.writelines('c2: {}\n'.format(c2))
f.close()

In [None]:
print('Lambda')
print(lamda)
print('Parameters')
print(global_)
print('Total loss: {}'.format(global_loss_))
print('MAE: {}'.format(mean_absolute_error(t0, v0, global_)))
print('MAJ: {}'.format(mean_absolute_jerk(t0, v0, global_)))
print('MSJ: {}'.format(mean_squared_jerk(t0, v0, global_)))
print('MAUD: {}'.format(mean_absolute_u_dot(t0, v0, global_)))
print('maximum %OS: {}'.format(max_percent_overshoot(t0, v0, global_)))

In [None]:
print('MSUD: {}'.format(mean_squared_u_dot(t0, v0, np.array([0.56458294, 2.2533995,  0.07817718]))))

In [None]:
mv, cs1, cs2 = forward_propagation(t0, v0, global_)
plt.plot(v0)
plt.plot(mv)

In [None]:
plt.plot(cs1)

In [None]:
eo = 200
el = 500
plt.plot(v0[eo:el])
plt.plot(mv[eo:el])