# Optimize PI Gain with Bayesian Optimization

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple
from matplotlib import patches
from matplotlib.animation import ArtistAnimation
from IPython import display

## Control Target : Vehicle 
- Longitudinal dynamics : Point Mass Model
- Lateral dynamics : Kinematic Bicycle Model

In [None]:
# this class simulates the simple vehicle dynamics and visualizes the vehicle trajectory.
class Vehicle():
    def __init__(
            self,
            l_f: float = 1.5, # [m]
            l_r: float = 1.0, # [m]
            max_steer_abs: float = 0.523, # [rad]
            max_accel_abs: float = 2.000, # [m/s^2]
            ref_path: np.ndarray = np.array([[-30.0, 0.0], [30.0, 0.0]]),
            delta_t: float = 0.05, # [s]
            visualize: bool = True,
        ) -> None:
        """initialize vehicle environment
        state variables:
            x: x-axis position in the global frame [m]
            y: y-axis position in the global frame [m]
            yaw: orientation in the global frame [rad]
            v: longitudinal velocity [m/s]
        control input:
            steer: front tire angle of the vehicle [rad] (positive in the counterclockwize direction)
            accel: longitudinal acceleration of the vehicle [m/s^2] (positive in the forward direction)
        Note: dynamics of the vehicle is the Kinematic Bicycle Model. 
        """
        # vehicle parameters
        self.l_f = l_f # [m]
        self.l_r = l_r # [m]
        self.wheel_base = l_f + l_r # [m]
        self.max_steer_abs = max_steer_abs # [rad]
        self.max_accel_abs = max_accel_abs # [m/s^2]
        self.delta_t = delta_t #[s]
        self.ref_path = ref_path

        # visualization settings
        self.vehicle_w = 3.00
        self.vehicle_l = 4.00
        self.view_x_lim_min, self.view_x_lim_max = -20.0, 20.0
        self.view_y_lim_min, self.view_y_lim_max = -25.0, 25.0

        # reset environment
        self.visualize_flag = visualize
        self.reset()

    def reset(
            self, 
            init_state: np.ndarray = np.array([0.0, 0.0, 0.0, 0.0]), # [x, y, yaw, v]
        ) -> None:
        """reset environment to initial state"""

        # reset state variables
        self.state = init_state

        # clear animation frames
        self.frames = []

        if self.visualize_flag:
            # prepare figure
            self.fig = plt.figure(figsize=(9,9))
            self.main_ax = plt.subplot2grid((3,4), (0,0), rowspan=3, colspan=3)
            self.minimap_ax = plt.subplot2grid((3,4), (0,3))
            self.steer_ax = plt.subplot2grid((3,4), (1,3))
            self.accel_ax = plt.subplot2grid((3,4), (2,3))

            # graph layout settings
            ## main view
            self.main_ax.set_aspect('equal')
            self.main_ax.set_xlim(self.view_x_lim_min, self.view_x_lim_max)
            self.main_ax.set_ylim(self.view_y_lim_min, self.view_y_lim_max)
            self.main_ax.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
            self.main_ax.tick_params(bottom=False, left=False, right=False, top=False)
            ## mini map
            self.minimap_ax.set_aspect('equal')
            self.minimap_ax.axis('off')
            ## steering angle view
            self.steer_ax.set_title("Steering Angle", fontsize="12")
            self.steer_ax.axis('off')
            ## acceleration view
            self.accel_ax.set_title("Acceleration", fontsize="12")
            self.accel_ax.axis('off')
            
            # apply tight layout
            self.fig.tight_layout()

    def update(
            self, 
            u: np.ndarray, 
            delta_t: float = 0.0, 
            append_frame: bool = True, 
            vehicle_traj: np.ndarray = np.empty(0), # vehicle trajectory
        ) -> None:
        """update state variables"""
        # keep previous states
        x, y, yaw, v = self.state

        # prepare params
        l = self.wheel_base
        l_r = self.l_r
        dt = self.delta_t if delta_t == 0.0 else delta_t

        # limit control inputs
        steer = np.clip(u[0], -self.max_steer_abs, self.max_steer_abs)
        accel = np.clip(u[1], -self.max_accel_abs, self.max_accel_abs)

        """< CORE OF VEHICLE DYNAMICS >"""
        # update state variables
        beta = np.arctan(l_r / l * np.tan(steer))
        new_x = x + v * np.cos(yaw + beta) * dt
        new_y = y + v * np.sin(yaw + beta) * dt
        new_yaw = yaw + v / l * np.sin(beta) * dt
        new_v = v + accel * dt
        self.state = np.array([new_x, new_y, new_yaw, new_v])
        """< CORE OF VEHICLE DYNAMICS >"""

        # record frame
        if append_frame and self.visualize_flag:
            self.append_frame(steer, accel, vehicle_traj)

    def get_state(self) -> np.ndarray:
        """return state variables"""
        return self.state.copy()

    def append_frame(self, steer: float, accel: float, vehicle_traj: np.ndarray) -> list:
        """draw a frame of the animation."""
        # get current states
        x, y, yaw, v = self.state

        ### main view ###
        # draw the vehicle shape
        vw, vl = self.vehicle_w, self.vehicle_l
        vehicle_shape_x = [-0.5*vl, -0.5*vl, +0.5*vl, +0.5*vl, -0.5*vl, -0.5*vl]
        vehicle_shape_y = [0.0, +0.5*vw, +0.5*vw, -0.5*vw, -0.5*vw, 0.0]
        rotated_vehicle_shape_x, rotated_vehicle_shape_y = \
            self._affine_transform(vehicle_shape_x, vehicle_shape_y, yaw, [0, 0]) # make the vehicle be at the center of the figure
        frame = self.main_ax.plot(rotated_vehicle_shape_x, rotated_vehicle_shape_y, color='black', linewidth=2.0, zorder=3)

        # draw wheels
        ww, wl = 0.4, 0.7 #[m]
        wheel_shape_x = np.array([-0.5*wl, -0.5*wl, +0.5*wl, +0.5*wl, -0.5*wl, -0.5*wl])
        wheel_shape_y = np.array([0.0, +0.5*ww, +0.5*ww, -0.5*ww, -0.5*ww, 0.0])

        ## rear-left wheel
        wheel_shape_rl_x, wheel_shape_rl_y = \
            self._affine_transform(wheel_shape_x, wheel_shape_y, 0.0, [-0.3*vl, 0.3*vw])
        wheel_rl_x, wheel_rl_y = \
            self._affine_transform(wheel_shape_rl_x, wheel_shape_rl_y, yaw, [0, 0])
        frame += self.main_ax.fill(wheel_rl_x, wheel_rl_y, color='black', zorder=3)

        ## rear-right wheel
        wheel_shape_rr_x, wheel_shape_rr_y = \
            self._affine_transform(wheel_shape_x, wheel_shape_y, 0.0, [-0.3*vl, -0.3*vw])
        wheel_rr_x, wheel_rr_y = \
            self._affine_transform(wheel_shape_rr_x, wheel_shape_rr_y, yaw, [0, 0])
        frame += self.main_ax.fill(wheel_rr_x, wheel_rr_y, color='black', zorder=3)

        ## front-left wheel
        wheel_shape_fl_x, wheel_shape_fl_y = \
            self._affine_transform(wheel_shape_x, wheel_shape_y, steer, [0.3*vl, 0.3*vw])
        wheel_fl_x, wheel_fl_y = \
            self._affine_transform(wheel_shape_fl_x, wheel_shape_fl_y, yaw, [0, 0])
        frame += self.main_ax.fill(wheel_fl_x, wheel_fl_y, color='black', zorder=3)

        ## front-right wheel
        wheel_shape_fr_x, wheel_shape_fr_y = \
            self._affine_transform(wheel_shape_x, wheel_shape_y, steer, [0.3*vl, -0.3*vw])
        wheel_fr_x, wheel_fr_y = \
            self._affine_transform(wheel_shape_fr_x, wheel_shape_fr_y, yaw, [0, 0])
        frame += self.main_ax.fill(wheel_fr_x, wheel_fr_y, color='black', zorder=3)

        # draw the vehicle center circle
        vehicle_center = patches.Circle([0, 0], radius=vw/20.0, fc='white', ec='black', linewidth=2.0, zorder=4)
        frame += [self.main_ax.add_artist(vehicle_center)]

        # draw the reference path
        ref_path_x = self.ref_path[:, 0] - np.full(self.ref_path.shape[0], x)
        ref_path_y = self.ref_path[:, 1] - np.full(self.ref_path.shape[0], y)
        frame += self.main_ax.plot(ref_path_x, ref_path_y, color='black', linestyle="dashed", linewidth=1.5)

        # draw the information text
        text = "vehicle velocity = {v:>+6.1f} [m/s]".format(pos_e=x, head_e=np.rad2deg(yaw), v=v)
        frame += [self.main_ax.text(0.5, 0.02, text, ha='center', transform=self.main_ax.transAxes, fontsize=14, fontfamily='monospace')]

        # draw vehicle trajectory
        if vehicle_traj.any():
            vehicle_traj_x_offset = np.append(np.ravel(vehicle_traj[:, 0]) - np.full(vehicle_traj.shape[0], x), [0.0])
            vehicle_traj_y_offset = np.append(np.ravel(vehicle_traj[:, 1]) - np.full(vehicle_traj.shape[0], y), [0.0])
            frame += self.main_ax.plot(vehicle_traj_x_offset, vehicle_traj_y_offset, color='purple', linestyle="solid", linewidth=2.0)

        ### mini map view ###
        frame += self.minimap_ax.plot(self.ref_path[:, 0], self.ref_path[:,1], color='black', linestyle='dashed')
        rotated_vehicle_shape_x_minimap, rotated_vehicle_shape_y_minimap = \
            self._affine_transform(vehicle_shape_x, vehicle_shape_y, yaw, [x, y]) # make the vehicle be at the center of the figure
        frame += self.minimap_ax.plot(rotated_vehicle_shape_x_minimap, rotated_vehicle_shape_y_minimap, color='black', linewidth=2.0, zorder=3)
        frame += self.minimap_ax.fill(rotated_vehicle_shape_x_minimap, rotated_vehicle_shape_y_minimap, color='white', zorder=2)
        if vehicle_traj.any():
            frame += self.minimap_ax.plot(vehicle_traj[:, 0], vehicle_traj[:, 1], color='purple', linestyle="solid", linewidth=1.0)

        ### control input view ###
        # steering angle
        MAX_STEER = self.max_steer_abs
        PIE_RATE = 3.0/4.0
        PIE_STARTANGLE = 225 # [deg]
        s_abs = np.abs(steer)
        if steer < 0.0: # when turning right
            steer_pie_obj, _ = self.steer_ax.pie([MAX_STEER*PIE_RATE, s_abs*PIE_RATE, (MAX_STEER-s_abs)*PIE_RATE, 2*MAX_STEER*(1-PIE_RATE)], startangle=PIE_STARTANGLE, counterclock=False, colors=["lightgray", "black", "lightgray", "white"], wedgeprops={'linewidth': 0, "edgecolor":"white", "width":0.4})
        else: # when turning left
            steer_pie_obj, _ = self.steer_ax.pie([(MAX_STEER-s_abs)*PIE_RATE, s_abs*PIE_RATE, MAX_STEER*PIE_RATE, 2*MAX_STEER*(1-PIE_RATE)], startangle=PIE_STARTANGLE, counterclock=False, colors=["lightgray", "black", "lightgray", "white"], wedgeprops={'linewidth': 0, "edgecolor":"white", "width":0.4})
        frame += steer_pie_obj
        frame += [self.steer_ax.text(0, -1, f"{np.rad2deg(steer):+.2f} " + r"$ \rm{[deg]}$", size = 14, horizontalalignment='center', verticalalignment='center', fontfamily='monospace')]

        # acceleration
        MAX_ACCEL = self.max_accel_abs
        PIE_RATE = 3.0/4.0
        PIE_STARTANGLE = 225 # [deg]
        a_abs = np.abs(accel)
        if accel > 0.0:
            accel_pie_obj, _ = self.accel_ax.pie([MAX_ACCEL*PIE_RATE, a_abs*PIE_RATE, (MAX_ACCEL-a_abs)*PIE_RATE, 2*MAX_ACCEL*(1-PIE_RATE)], startangle=PIE_STARTANGLE, counterclock=False, colors=["lightgray", "black", "lightgray", "white"], wedgeprops={'linewidth': 0, "edgecolor":"white", "width":0.4})
        else:
            accel_pie_obj, _ = self.accel_ax.pie([(MAX_ACCEL-a_abs)*PIE_RATE, a_abs*PIE_RATE, MAX_ACCEL*PIE_RATE, 2*MAX_ACCEL*(1-PIE_RATE)], startangle=PIE_STARTANGLE, counterclock=False, colors=["lightgray", "black", "lightgray", "white"], wedgeprops={'linewidth': 0, "edgecolor":"white", "width":0.4})
        frame += accel_pie_obj
        frame += [self.accel_ax.text(0, -1, f"{accel:+.2f} " + r"$ \rm{[m/s^2]}$", size = 14, horizontalalignment='center', verticalalignment='center', fontfamily='monospace')]

        # append frame
        self.frames.append(frame)

    # rotate shape and return location on the x-y plane.
    def _affine_transform(self, xlist: list, ylist: list, angle: float, translation: list=[0.0, 0.0]) -> Tuple[list, list]:
        transformed_x = []
        transformed_y = []
        if len(xlist) != len(ylist):
            print("[ERROR] xlist and ylist must have the same size.")
            raise AttributeError

        for i, xval in enumerate(xlist):
            transformed_x.append((xlist[i])*np.cos(angle)-(ylist[i])*np.sin(angle)+translation[0])
            transformed_y.append((xlist[i])*np.sin(angle)+(ylist[i])*np.cos(angle)+translation[1])
        transformed_x.append(transformed_x[0])
        transformed_y.append(transformed_y[0])
        return transformed_x, transformed_y

    def show_animation(self, interval_ms: int) -> None:
        """show animation of the recorded frames"""
        ani = ArtistAnimation(self.fig, self.frames, interval=interval_ms) # blit=True
        html = display.HTML(ani.to_jshtml())
        display.display(html)
        plt.close()

    def save_animation(self, filename: str, interval: int, movie_writer: str="ffmpeg") -> None:
        """save animation of the recorded frames (ffmpeg required)"""
        ani = ArtistAnimation(self.fig, self.frames, interval=interval)
        ani.save(filename, writer=movie_writer)


## Path-Tracking Controller : PID Controller

## Longitudinal Controller

In [None]:
class PIDLongitudinalController():
    def __init__(
            self,
            p_gain: float = +1.2, # P Gain
            i_gain: float = +0.4, # I Gain
            d_gain: float = +0.1, # D Gain
            target_velocity: float = 3.0, # [m/s]
    ) -> None:
        """initialize pid controller for keeping constant velocity"""
        # pid control parameters
        self.K_p = p_gain
        self.K_i = i_gain
        self.K_d = d_gain
        self.target_vel = target_velocity
        self.pre_e = 0.0 # previous tracking error
        self.integrated_e = 0.0 # integrated tracking error

    def calc_control_input(self, observed_vel: float, delta_t: float) -> None:
        """calculate control input"""

        # calculate tracking error, its integral and derivative
        r = self.target_vel
        y = observed_vel
        e = r - y # tracking error to the traget velocity
        ie = self.integrated_e + (e + self.pre_e) * delta_t / 2.0 # integral of the tracking error
        de = (e - self.pre_e) / delta_t # derivative of the tracking error

        # calculate control input
        acc_cmd = self.K_p * e + self.K_i * ie + self.K_d * de

        # update previous tracking error
        self.pre_e = e
        self.integrated_e = ie

        return acc_cmd


## Lateral Controller

In [None]:
class PIDLateralController():
    def __init__(
            self,
            p_gain: float = +0.8, # P Gain
            i_gain: float = +0.1, # I Gain
            d_gain: float = +0.1, # D Gain
            ref_path: np.ndarray = np.array([[0.0, 0.0, 0.0, 1.0], [10.0, 0.0, 0.0, 1.0]]),
    ) -> None:
        """initialize pid controller for path-tracking"""
        # pid control parameters
        self.K_p = p_gain
        self.K_i = i_gain
        self.K_d = d_gain
        self.prev_tracking_error = 0.0 # previous tracking error
        self.prev_steer_cmd = 0.0 # previous steering command
        self.steer_cmd_change = 0.0 # previous change of steering command
        self.integrated_e = 0.0 # integrated tracking error

        # ref_path info
        self.ref_path = ref_path
        self.prev_waypoints_idx = 0

    def calc_control_input(self, observed_x: np.ndarray, delta_t: float) -> float:
        """calculate control input"""

        # set vehicle state variables from observation
        x = observed_x[0]
        y = observed_x[1]

        # get the waypoint closest to current vehicle position 
        _, ref_x, ref_y, ref_yaw, _ = self._get_nearest_waypoint(x, y, update_prev_idx=True)
        if self.prev_waypoints_idx >= self.ref_path.shape[0]-1:
            # print("[WARNING] Reached the end of the reference path.")
            raise IndexError

        # which side of the reference path is the car on, the right or the left?
        ## algorithm : http://www.hptown.com/ucad/Ufb00009.htm
        x1, y1 = ref_x, ref_y
        x2, y2 = ref_x + 1.0 * np.cos(ref_yaw), ref_y + 1.0*np.sin(ref_yaw)
        vx, vy = x2 - x1, y2 - y1
        wx, wy =  x - x1,  y - y1
        s = vx * wy - vy * wx # s>0 : vehicle is on the left of the path, s<0 : vehicle is on the left of the path,

        # get tracking error, its integral and derivative
        e = -np.sign(s) * np.sqrt((ref_x-x)**2 + (ref_y-y)**2) # tracking error to the reference path
        ie = self.integrated_e + (e + self.prev_tracking_error) * delta_t / 2.0 # integral of the tracking error
        de = (e - self.prev_tracking_error) / delta_t # derivative of the tracking error

        # calculate control input
        steer_cmd = self.K_p * e + self.K_i * ie + self.K_d * de

        # update variables
        self.prev_tracking_error = e
        self.steer_cmd_change = steer_cmd - self.prev_steer_cmd
        self.prev_steer_cmd = steer_cmd
        self.integrated_e = ie

        return steer_cmd

    def _get_nearest_waypoint(self, x: float, y: float, update_prev_idx: bool = False):
        """search the closest waypoint to the vehicle on the reference path"""

        SEARCH_IDX_LEN = 200 # [points] forward search range
        prev_idx = self.prev_waypoints_idx
        dx = [x - ref_x for ref_x in self.ref_path[prev_idx:(prev_idx + SEARCH_IDX_LEN), 0]]
        dy = [y - ref_y for ref_y in self.ref_path[prev_idx:(prev_idx + SEARCH_IDX_LEN), 1]]
        d = [idx ** 2 + idy ** 2 for (idx, idy) in zip(dx, dy)]
        min_d = min(d)
        nearest_idx = d.index(min_d) + prev_idx

        # get reference values of the nearest waypoint
        ref_x = self.ref_path[nearest_idx,0]
        ref_y = self.ref_path[nearest_idx,1]
        ref_yaw = self.ref_path[nearest_idx,2]
        ref_v = self.ref_path[nearest_idx,3]

        # update nearest waypoint index if necessary
        if update_prev_idx:
            self.prev_waypoints_idx = nearest_idx 

        return nearest_idx, ref_x, ref_y, ref_yaw, ref_v

    def get_score_for_evaluation(self) -> float:
        weight_1 = 1.0
        weight_2 = 3.0
        score = weight_1 * np.abs(self.prev_tracking_error) + weight_2 * np.abs(self.steer_cmd_change)
        return score


## Simulation of Path-Tracking
- Longitudinal Control : PID Controller
- Lateral Control : PID Controller

In [None]:
# optimization target: p_gain and i_gain of the lateral controller
# this function executes path-tracking simulation and returns the evaluation score of the episode
def path_tracking_simulation(
    x: list,
    visualize_ref_path: bool = False,
    visualize_animation: bool = False,
    show_info: bool = False,
    save_animation: bool = False,
    animation_filename: str = "pid_pathtracking.mp4"
    ) -> float:
    # simulation settings
    delta_t = 0.05 # [sec]
    sim_steps = 1500 # [steps]
    if show_info:
        print(f"[INFO] delta_t : {delta_t:.2f}[s] , sim_steps : {sim_steps}[steps], total_sim_time : {delta_t*sim_steps:.2f}[s]")

    # load and visualize reference path
    ref_path = np.genfromtxt('./f_shape_path.csv', delimiter=',', skip_header=1)
    if visualize_ref_path:
        plt.title("Reference Path")
        plt.plot(ref_path[:,0], ref_path[:,1])
        plt.show()

    # initialize a vehicle as a control target
    vehicle = Vehicle(
        max_steer_abs=0.785, # [rad]
        max_accel_abs=2.000, # [m/s^2]
        ref_path = ref_path[:, 0:2], # ndarray, size is <num_of_waypoints x 2>
        visualize = visualize_animation
    )
    vehicle.reset(
        init_state = np.array([0.0, 1.0, 0.0, 0.0]), # [x[m], y[m], yaw[rad], v[m/s]]
    )
    vehicle_trajectory = np.array([vehicle.get_state()[:2]])

    # initialize pid controllers for accelration control
    pid_lon_controller = PIDLongitudinalController(
        p_gain = 1.2,
        i_gain = 0.4,
        d_gain = 0.1,
        target_velocity = +5.0 # [m/s]
    )

    # initialize pid controllers for steering control
    pid_lat_controller = PIDLateralController(
        p_gain = x[0], ### optimization target ###
        i_gain = x[1], ### optimization target ###
        d_gain = 0.0,
        ref_path = ref_path, # ndarray, size is <num_of_waypoints x 2>
    )

    # initialize score
    episode_score = 0.0

    # simulation loop
    for i in range(sim_steps):

        # get current state of vehicle
        current_state = vehicle.get_state()

        try:
            # calculate control inputs
            current_velocity = current_state[3]
            accel_input = pid_lon_controller.calc_control_input(observed_vel=current_velocity, delta_t=delta_t)
            steer_input = pid_lat_controller.calc_control_input(observed_x=current_state, delta_t=delta_t)
            episode_score += pid_lat_controller.get_score_for_evaluation() 

        except IndexError as ex:
            # the vehicle has reached the end of the reference path
            # print("[WARNING] IndexError detected. Terminate simulation.")
            break

        # print current state and input force
        if show_info:
            print(f"[INFO] Time: {i*delta_t:>2.2f}[s], x={current_state[0]:>+3.3f}[m], y={current_state[1]:>+3.3f}[m], yaw={current_state[2]:>+3.3f}[rad], v={current_state[3]:>+3.3f}[m/s], steer={steer_input:>+6.2f}[rad], accel={accel_input:>+6.2f}[m/s]")

        # update states of vehicle
        vehicle.update(u=[steer_input, accel_input], delta_t=delta_t, vehicle_traj=vehicle_trajectory) # update vehicle state
        vehicle_trajectory = np.vstack((vehicle_trajectory, vehicle.get_state()[:2])) # record vehicle trajectory

    # show animation
    if visualize_animation:
        vehicle.show_animation(interval_ms=int(delta_t * 1000))

    # save animation as a mp4 file if necessary
    if save_animation:
        vehicle.save_animation(animation_filename, interval=int(delta_t * 1000), movie_writer="ffmpeg") # ffmpeg is required to write mp4 file

    if visualize_ref_path:
        plt.plot(ref_path[:,0], ref_path[:,1]) # reference path
        plt.plot(vehicle_trajectory[:,0], vehicle_trajectory[:,1]) # vehicle trajectory
        plt.show()

    return episode_score

# set sample parameters and try simulation once
p_gain = 2.75
i_gain = 1.36
x = [p_gain, i_gain]
episode_score = path_tracking_simulation(x, visualize_ref_path=True, visualize_animation=True, save_animation=False, show_info=True)
print(f"[INFO] p_gain = {p_gain}, i_gain = {i_gain}")
print(f"[INFO] Total score of the episode: {episode_score:.2f} (lower is better)")

## Define Gaussian Process Regressor

In [None]:
# gaussian process regressor
class GaussianProcessRegression:
    def __init__(self, kernel):
        self.kernel = kernel

    def fit(self, X_test: np.ndarray, X_train: np.ndarray, y_train: np.ndarray):
        # get matrix size
        train_data_size = X_train.shape[0]

        # calculate kernel matrix K (train_data_size x train_data_size)
        K = np.zeros((train_data_size, train_data_size))
        for r in range(train_data_size):
            for c in range(train_data_size):
                K[r, c] = self.kernel(X_train[r], X_train[c])

        # calculate expanded kernel matrix K_big (train_data_size x test_data_size)        
        K_big = np.zeros((train_data_size, test_data_size))
        for r in range(train_data_size):
            for c in range(test_data_size):
                K_big[r, c] = self.kernel(X_train[r], X_test[c])

        # calculate mean and variance
        s = self.kernel(X_test, X_test)
        yy = np.linalg.inv(K) @ y_train
        y_pred = K_big.T @ yy
        y_var = s - np.diag(K_big.T @ np.linalg.inv(K) @ K_big)

        # return prediction and variance as lists
        return y_pred.flatten(), y_var.flatten()

    def optimize(self):
        # TODO: optimize hyperparameters automatically
        pass

# define kernel function
def rbf_kernel(x_i: np.ndarray, x_j: np.ndarray) -> float:
    theta_1 = 1.0
    theta_2 = 0.5
    return theta_1 * np.exp(-1.0 * np.linalg.norm(x_i - x_j, ord=2) ** 2 / theta_2)

# Execute Bayesian Optimization

In [None]:
# settings
min_x1, max_x1 = 0.0, +15.0
min_x2, max_x2 = 0.0, +15.0
test_data_size_x1 = 50
test_data_size_x2 = 100
test_data_size = test_data_size_x1 * test_data_size_x2
initial_train_data_size = 1
num_total_trials = 300

## generate test data
x1_test = np.linspace(min_x1, max_x1, test_data_size_x1)
x2_test = np.linspace(min_x2, max_x2, test_data_size_x2)
x1_test_mesh, x2_test_mesh = np.meshgrid(x1_test, x2_test)
X_test_mesh = np.stack([x1_test_mesh, x2_test_mesh], axis=0)

# define acquisition function (upper confidence bound)
def UCB(mu: np.ndarray, sigma: np.ndarray, num_current_trial) -> np.ndarray:
    # [type 1] set k as constant
    # k = 500.0
    # [type 2] set k as function of n
    n = (num_current_trial+1)
    k = 100.0 * np.sqrt(np.log(n) / n)
    return - mu - k * sigma # lower score is better

# run bayesian optimization
## initialize training and test data
x1_train = np.random.uniform(min_x1, max_x1, initial_train_data_size)
x2_train = np.random.uniform(min_x2, max_x2, initial_train_data_size)
y_train = np.array([path_tracking_simulation([x1_train[train_idx], x2_train[train_idx]]) for train_idx in range(initial_train_data_size)])
X_train = np.stack([x1_train, x2_train], axis=1)

X_test_list = np.stack([x1_test_mesh.flatten(), x2_test_mesh.flatten()], axis=1)
gpr = GaussianProcessRegression(kernel=rbf_kernel)
y_pred_list, y_var_list = gpr.fit(X_test_list, X_train, y_train)
ucb = UCB(y_pred_list, y_var_list, 0)
max_ucb_idx = np.argmax(ucb)

## repeat exploration process for num_total_trials times
# initialize log file
log_txt_file = "bayesian_optimization_log.txt"
with open(log_txt_file, "w") as f:
    f.write("")
# initialize counter
trial = 0
# initialize random sampling flag
random_sampling_flag = False # it is for avoiding the stuck in the exploration
while trial < num_total_trials:
    # increment trial counter
    trial += 1

    # calculate UCB (Upper Confidence Bound)
    ucb = UCB(y_pred_list, y_var_list, trial)
    max_ucb_idx = np.argmax(ucb)

    # get next exploration point which has maximum UCB
    if not random_sampling_flag:
        x_next = X_test_list[max_ucb_idx]
        y_next = path_tracking_simulation(x_next) # try simulation with new parameter (x_next)
    else:
        # randomly sample x_next
        x_next = np.random.uniform([min_x1, min_x2], [max_x1, max_x2], 2)
        y_next = path_tracking_simulation(x_next) # try simulation with new parameter (x_next)
        random_sampling_flag = False

    # check if X_train already has x_next, if not, append x_next to X_train
    if not np.any(np.all(X_train == x_next, axis=1)):
        X_train = np.append(X_train, [x_next], axis=0)
        y_train = np.append(y_train, y_next)
    else:
        print("[WARNING] x_next is already in X_train")
        # try random sampling in the next loop
        random_sampling_flag = True
        trial -= 1
        continue

    # fit GPR and get prediction and variance
    y_pred_list, y_var_list = gpr.fit(X_test_list, X_train, y_train)
    min_pred_idx = np.argmin(y_pred_list)
    info = f"Trial: {trial:04d}, explored x: {x_next}, explored y = f(x) = {y_next:.2f}, current optimal x*: {X_test_list[min_pred_idx]}, current optimal y = f(x*) = {y_pred_list[min_pred_idx]:.2f}, explored current optimal: {X_train[np.argmin(y_train)]}, explored current optimal y = f(x*) = {np.min(y_train):.2f}"
    # print(f"Trial: {trial:02d}, explored x: {x_next}, explored y = f(x) = {y_next:.2f}, current optimal x*: {X_test_list[min_pred_idx]}, current optimal y = f(x*) = {y_pred_list[min_pred_idx]:.2f}, explored current optimal: {X_train[np.argmin(y_train)]}, explored current optimal y = f(x*) = {np.min(y_train):.2f}")
    print(info)
    # add info to the log file
    with open(log_txt_file, "a") as f:
        f.write(info + "\n")

    # save the result as a figure
    if trial % 10 == 0:
        # plot the result [y=f(x)]
        fig = plt.figure(figsize=(7, 6))
        fig.suptitle(f"Trial: {trial:02d}", fontsize=16)
        ax = fig.add_subplot(111, projection='3d', computed_zorder=False)
        # plot explored points
        ax.scatter(X_train[:, 0], X_train[:, 1], y_train.flatten(), c='blue', marker='o', zorder=2, label='explored points')
        y_pred_mesh = y_pred_list.reshape(test_data_size_x2, test_data_size_x1) # confusing order, but correct
        # plot prediction as a surface
        ax.plot_surface(x1_test_mesh, x2_test_mesh, y_pred_mesh, cmap='viridis', alpha=0.6, zorder=1, label='prediction')
        # plot latest exploration point
        ax.scatter(x_next[0], x_next[1], y_next, fc="none", ec="blue", s=150, linewidth=2, zorder=4, label='latest exploration')
        # plot maximum prediction
        ax.scatter(X_test_list[min_pred_idx, 0], X_test_list[min_pred_idx, 1], y_pred_list[min_pred_idx], fc="none", ec="#FF0000", s=150, linewidth=2, zorder=4, label='optimal prediction')
        # plot settings
        ax.set_xlabel('P Gain', fontsize=14)
        ax.set_ylabel('I Gain', fontsize=14)
        ax.view_init(elev=50, azim=-140) # rotate the 3d view
        plt.legend(loc="upper center", ncol=4, bbox_to_anchor=(0.5, 1.08))
        plt.savefig(f"bo_{trial:04d}.png", dpi=300)
        plt.close()

# print optimal solution
min_pred_idx = np.argmin(y_pred_list)
print(f"Optimal solution: x* = {X_test_list[min_pred_idx]}, y = f(x*) = {y_pred_list[min_pred_idx]:.2f}")

# print optimal solution from the exploration
min_train_idx = np.argmin(y_train)
print(f"Optimal solution from the exploration: x* = {X_train[min_train_idx]}, y = f(x*) = {y_train[min_train_idx]:.2f}")