In [1]:
import math  # Matematiksel işlemler için kullanılan Python'un yerleşik modülü
import math  # Aynı modül iki kez içe aktarılmış, ikinci satır gereksiz ve kaldırılabilir
import numpy as np  # Sayısal işlemler için kullanılan güçlü bir kütüphane (NumPy)
import matplotlib.pyplot as plt  # Grafikler çizmek için kullanılan matplotlib kütüphanesinin pyplot modülü
from typing import Tuple  # Tür ipuçları vermek için kullanılan Python'un typing modülünden Tuple sınıfı
from matplotlib import patches  # Matplotlib'te şekiller (dikdörtgen, çember vb.) oluşturmak için kullanılan modül
from matplotlib.animation import ArtistAnimation  # Matplotlib kullanarak animasyonlar oluşturmak için kullanılan modül
from IPython import display  # Jupyter Notebook ortamında çıktıları düzgün göstermek için kullanılan IPython modülü


In [None]:
class Vehicle():
    def __init__(
            self,
            wheel_base: float = 2.5,  # Araç dingil mesafesi [m]
            vehicle_width = 3.0,  # Araç genişliği [m]
            vehicle_length = 4.0,  # Araç uzunluğu [m]
            max_steer_abs: float = 0.523,  # Maksimum direksiyon açısı [rad]
            max_accel_abs: float = 2.000,  # Maksimum ivmelenme [m/s^2]
            ref_path: np.ndarray = np.array([[-100.0, 0.0], [100.0, 0.0]]),  # Referans yol noktaları
            obstacle_circles: np.ndarray = np.array([[-2.0, 1.0, 1.0], [2.0, -1.0, 1.0]]),  # Engel pozisyonları ve yarıçapları
            delta_t: float = 0.05,  # Güncelleme süresi adımı [s]
            visualize: bool = True,  # Görselleştirme flag'i
        ) -> None:
        """Araç ortamını başlatır."""
        # Araç parametreleri
        self.wheel_base = wheel_base  # Dingil mesafesi
        self.vehicle_w = vehicle_width  # Araç genişliği
        self.vehicle_l = vehicle_length  # Araç uzunluğu
        self.max_steer_abs = max_steer_abs  # Maksimum direksiyon açısı
        self.max_accel_abs = max_accel_abs  # Maksimum ivmelenme
        self.delta_t = delta_t  # Zaman adımı
        self.ref_path = ref_path  # Referans yol

        # Engel parametreleri
        self.obstacle_circles = obstacle_circles  # Engel listesi

        # Görselleştirme ayarları
        self.view_x_lim_min, self.view_x_lim_max = -20.0, 20.0
        self.view_y_lim_min, self.view_y_lim_max = -25.0, 25.0
        self.minimap_view_x_lim_min, self.minimap_view_x_lim_max = -40.0, 40.0
        self.minimap_view_y_lim_min, self.minimap_view_y_lim_max = -10.0, 40.0

        # Ortamı sıfırla
        self.visualize_flag = visualize
        self.reset()

    def reset(
            self, 
            init_state: np.ndarray = np.array([0.0, 0.0, 0.0, 0.0]),  # Başlangıç durumu [x, y, yaw, v]
        ) -> None:
        """Ortami başlangıç durumuna sıfırlar."""
        
        # Durum değişkenlerini sıfırla
        self.state = init_state

        # Animasyon karelerini temizle
        self.frames = []

        if self.visualize_flag:
            # Şekil ve alt grafiklerin hazırlanması
            self.fig = plt.figure(figsize=(9,9))
            self.main_ax = plt.subplot2grid((3,4), (0,0), rowspan=3, colspan=3)
            self.minimap_ax = plt.subplot2grid((3,4), (0,3))
            self.steer_ax = plt.subplot2grid((3,4), (1,3))
            self.accel_ax = plt.subplot2grid((3,4), (2,3))

            # Grafik ayarları
            self.main_ax.set_aspect('equal')
            self.main_ax.set_xlim(self.view_x_lim_min, self.view_x_lim_max)
            self.main_ax.set_ylim(self.view_y_lim_min, self.view_y_lim_max)
            self.main_ax.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
            self.main_ax.tick_params(bottom=False, left=False, right=False, top=False)

            self.minimap_ax.set_aspect('equal')
            self.minimap_ax.axis('off')
            self.minimap_ax.set_xlim(self.minimap_view_x_lim_min, self.minimap_view_x_lim_max)
            self.minimap_ax.set_ylim(self.minimap_view_y_lim_min, self.minimap_view_y_lim_max)

            self.steer_ax.set_title("Steering Angle", fontsize="12")
            self.steer_ax.axis('off')

            self.accel_ax.set_title("Acceleration", fontsize="12")
            self.accel_ax.axis('off')

            self.fig.tight_layout()

    def update(
            self, 
            u: np.ndarray,  # Kontrol girdisi [direksiyon açısı, ivmelenme]
            delta_t: float = 0.0,  # Zaman adımı
            append_frame: bool = True,  # Kare kaydedilsin mi?
            optimal_traj: np.ndarray = np.empty(0),  # MPPI'den gelen tahmini en iyi yol
            sampled_traj_list: np.ndarray = np.empty(0),  # MPPI tarafından örneklenen yollar
        ) -> None:
        """Araç durum değişkenlerini günceller."""

        # Önceki durumu kaydet
        x, y, yaw, v = self.state

        # Parametreleri hazırla
        l = self.wheel_base
        dt = self.delta_t if delta_t == 0.0 else delta_t

        # Kontrol girdilerini sınırla
        steer = np.clip(u[0], -self.max_steer_abs, self.max_steer_abs)
        accel = np.clip(u[1], -self.max_accel_abs, self.max_accel_abs)

        # Durum değişkenlerini güncelle
        new_x = x + v * np.cos(yaw) * dt
        new_y = y + v * np.sin(yaw) * dt
        new_yaw = yaw + v / l * np.tan(steer) * dt
        new_v = v + accel * dt
        self.state = np.array([new_x, new_y, new_yaw, new_v])

        # Kare kaydetme
        if append_frame:
            self.append_frame(steer, accel, optimal_traj, sampled_traj_list)

    def get_state(self) -> np.ndarray:
        """Mevcut durumu döndürür."""
        return self.state.copy()

    def show_animation(self, interval_ms: int) -> None:
        """Kayıtlı kareleri göstererek animasyonu oynatır."""
        ani = ArtistAnimation(self.fig, self.frames, interval=interval_ms)  
        html = display.HTML(ani.to_jshtml())
        display.display(html)
        plt.close()

    def save_animation(self, filename: str, interval: int, movie_writer: str="ffmpeg") -> None:
        """Kayıtlı karelerden bir animasyon dosyası oluşturur ve kaydeder."""
        ani = ArtistAnimation(self.fig, self.frames, interval=interval)
        ani.save(filename, writer=movie_writer)

## [Test Simülasyonu] ##
sim_step = 100  # Simülasyon adım sayısı
delta_t = 0.05  # Zaman adımı [s]
ref_path = np.genfromtxt('../data/ovalpath.csv', delimiter=',', skip_header=1)  # Referans yol dosyasını oku

# Araç nesnesini oluştur
vehicle = Vehicle(ref_path=ref_path[:, 0:2])

# Simülasyonu çalıştır
for i in range(sim_step):
    vehicle.update(u=[0.6 * np.sin(i/3.0), 2.2 * np.sin(i/10.0)], delta_t=delta_t)  
    # u: Kontrol girdisi (direksiyon açısı ve ivmelenme)

# Animasyonu göster
vehicle.show_animation(interval_ms=delta_t*1000)

# Animasyonu kaydet
vehicle.save_animation("vehicle.mp4", interval=int(delta_t * 1000), movie_writer="ffmpeg")  
# Not: ffmpeg yüklenmiş olmalıdır.


In [None]:
class MPPIControllerForPathTracking():
    def __init__(
            self,
            delta_t: float = 0.05,  # Zaman adımı [s]
            wheel_base: float = 2.5,  # Araç dingil mesafesi [m]
            vehicle_width: float = 3.0,  # Araç genişliği [m]
            vehicle_length: float = 4.0,  # Araç uzunluğu [m]
            max_steer_abs: float = 0.523,  # Maksimum direksiyon açısı [rad]
            max_accel_abs: float = 2.000,  # Maksimum ivmelenme [m/s^2]
            ref_path: np.ndarray = np.array([[0.0, 0.0, 0.0, 1.0], [10.0, 0.0, 0.0, 1.0]]),  # Referans yol noktaları
            horizon_step_T: int = 30,  # MPPI planlama süresi adımı
            number_of_samples_K: int = 1000,  # Örneklenen yörünge sayısı
            param_exploration: float = 0.0,  # Keşif oranı (MPPI parametresi)
            param_lambda: float = 50.0,  # Ağırlıklandırma katsayısı (MPPI parametresi)
            param_alpha: float = 1.0,  # Alpha parametresi (MPPI parametresi)
            sigma: np.ndarray = np.array([[0.5, 0.0], [0.0, 0.1]]),  # Gürültü matrisi (MPPI'de kullanılan)
            stage_cost_weight: np.ndarray = np.array([50.0, 50.0, 1.0, 20.0]),  # Aşama maliyeti ağırlıkları [x, y, yaw, v]
            terminal_cost_weight: np.ndarray = np.array([50.0, 50.0, 1.0, 20.0]),  # Terminal maliyet ağırlıkları [x, y, yaw, v]
            visualize_optimal_traj = True,  # Eğer True ise, optimal yörünge görselleştirilir
            visualze_sampled_trajs = False,  # Eğer True ise, örneklenen yörüngeler görselleştirilir
            obstacle_circles: np.ndarray = np.array([[-2.0, 1.0, 1.0], [2.0, -1.0, 1.0]]),  # Engel bilgileri [obs_x, obs_y, obs_radius]
            collision_safety_margin_rate: float = 1.2,  # Çarpışma güvenlik marjı oranı
    ) -> None:
        """MPPI tabanlı yol takibi için kontrolcüyü başlatır."""
        # MPPI parametreleri
        self.dim_x = 4  # Durum vektörünün boyutu (x, y, yaw, hız)
        self.dim_u = 2  # Kontrol girdisi boyutu (direksiyon, ivme)
        self.T = horizon_step_T  # Planlama ufku
        self.K = number_of_samples_K  # Örneklenen yörünge sayısı
        self.param_exploration = param_exploration  # MPPI keşif parametresi
        self.param_lambda = param_lambda  # MPPI lambda parametresi
        self.param_alpha = param_alpha  # MPPI alpha parametresi
        self.param_gamma = self.param_lambda * (1.0 - self.param_alpha)  # MPPI gamma parametresi
        self.Sigma = sigma  # Gürültü matrisi
        self.stage_cost_weight = stage_cost_weight  # Aşama maliyet ağırlıkları
        self.terminal_cost_weight = terminal_cost_weight  # Terminal maliyet ağırlıkları
        self.visualize_optimal_traj = visualize_optimal_traj  # Optimal yörünge görselleştirme seçeneği
        self.visualze_sampled_trajs = visualze_sampled_trajs  # Örneklenen yörüngeleri görselleştirme seçeneği

        # Araç parametreleri
        self.delta_t = delta_t  # Zaman adımı [s]
        self.wheel_base = wheel_base  # Dingil mesafesi [m]
        self.vehicle_w = vehicle_width  # Araç genişliği [m]
        self.vehicle_l = vehicle_length  # Araç uzunluğu [m]
        self.max_steer_abs = max_steer_abs  # Maksimum direksiyon açısı [rad]
        self.max_accel_abs = max_accel_abs  # Maksimum ivmelenme [m/s^2]
        self.ref_path = ref_path  # Referans yolu

        # Engel parametreleri
        self.obstacle_circles = obstacle_circles  # Engel konumları
        self.collision_safety_margin_rate = collision_safety_margin_rate  # Çarpışma güvenlik marjı

        # MPPI değişkenleri
        self.u_prev = np.zeros((self.T, self.dim_u))  # Önceki kontrol girişleri

        # Referans yol bilgisi
        self.prev_waypoints_idx = 0  # Önceki en yakın yol noktası

    def calc_control_input(self, observed_x: np.ndarray) -> Tuple[float, np.ndarray]:
        """Optimal kontrol girdisini hesaplar."""
        # Önceki kontrol girişlerini yükle
        u = self.u_prev

        # Başlangıç durumunu gözlemlenen veri ile ayarla
        x0 = observed_x

        # Mevcut aracın en yakın olduğu yol noktasını bul
        self._get_nearest_waypoint(x0[0], x0[1], update_prev_idx=True)
        if self.prev_waypoints_idx >= self.ref_path.shape[0] - 1:
            print("[ERROR] Reached the end of the reference path.")
            raise IndexError

        # Aşama maliyeti listesi için tampon oluştur
        S = np.zeros((self.K))

        # Gürültü örnekle
        epsilon = self._calc_epsilon(self.Sigma, self.K, self.T, self.dim_u)  # Gürültü matrisi

        # Örneklenen kontrol girdileri için tampon
        v = np.zeros((self.K, self.T, self.dim_u))

        # Örnekleme döngüsü
        for k in range(self.K):
            # Başlangıç durumunu ayarla
            x = x0

            # Zaman adımı döngüsü
            for t in range(1, self.T + 1):
                # Gürültü ile kontrol girdisi hesapla
                if k < (1.0 - self.param_exploration) * self.K:
                    v[k, t - 1] = u[t - 1] + epsilon[k, t - 1]  # Keşif yerine mevcut kontrolü kullan
                else:
                    v[k, t - 1] = epsilon[k, t - 1]  # Keşif için tamamen rastgele girdi üret

                # Durum değişkenlerini güncelle
                x = self._F(x, self._g(v[k, t - 1]))

                # Aşama maliyetini hesapla
                S[k] += self._c(x) + self.param_gamma * u[t - 1].T @ np.linalg.inv(self.Sigma) @ v[k, t - 1]

            # Terminal maliyeti ekle
            S[k] += self._phi(x)

        # Ağırlıkları hesapla (Bilgi teorisine dayalı ağırlıklar)
        w = self._compute_weights(S)

        # Gürültü ağırlıklı ortalama al
        w_epsilon = np.zeros((self.T, self.dim_u))
        for t in range(0, self.T):
            for k in range(self.K):
                w_epsilon[t] += w[k] * epsilon[k, t]

        # Kontrol girişlerini yumuşatmak için hareketli ortalama filtresi uygula
        w_epsilon = self._moving_average_filter(xx=w_epsilon, window_size=10)

        # Yeni kontrol giriş dizisini hesapla
        u += w_epsilon

        # Optimal yörüngeyi hesapla
        optimal_traj = np.zeros((self.T, self.dim_x))
        if self.visualize_optimal_traj:
            x = x0
            for t in range(0, self.T):
                x = self._F(x, self._g(u[t]))
                optimal_traj[t] = x

        # İlk kontrol girişini döndür
        return u[0], u, optimal_traj


In [None]:
# simulation settings
delta_t = 0.05 # [sec]
sim_steps = 150 # [steps]
print(f"[INFO] delta_t : {delta_t:.2f}[s] , sim_steps : {sim_steps}[steps], total_sim_time : {delta_t*sim_steps:.2f}[s]")

# obstacle params
OBSTACLE_CIRCLES = np.array([
    [+ 8.0, +5.0, 4.0], # pos_x, pos_y, radius [m] in the global frame
    [+18.0, -5.0, 4.0], # pos_x, pos_y, radius [m] in the global frame
])

# load and visualize reference path
ref_path = np.genfromtxt('../data/ovalpath.csv', delimiter=',', skip_header=1)
plt.title("Reference Path")
plt.plot(ref_path[:,0], ref_path[:,1])
plt.show()

# initialize a vehicle as a control target
vehicle = Vehicle(
    wheel_base=2.5,
    max_steer_abs=0.523, # [rad]
    max_accel_abs=2.000, # [m/s^2]
    ref_path = ref_path[:, 0:2], # ndarray, size is <num_of_waypoints x 2>
    obstacle_circles = OBSTACLE_CIRCLES, # [obs_x, obs_y, obs_radius]
)
vehicle.reset(
    init_state = np.array([0.0, 0.0, 0.0, 0.0]), # [x[m], y[m], yaw[rad], v[m/s]]
)

# initialize a mppi controller for the vehicle
mppi = MPPIControllerForPathTracking(
    delta_t = delta_t*2.0, # [s]
    wheel_base = 2.5, # [m]
    max_steer_abs = 0.523, # [rad]
    max_accel_abs = 2.000, # [m/s^2]
    ref_path = ref_path, # ndarray, size is <num_of_waypoints x 2>
    horizon_step_T = 20, # [steps]
    number_of_samples_K = 500, # [samples]
    param_exploration = 0.05,
    param_lambda = 100.0,
    param_alpha = 0.98,
    sigma = np.array([[0.075, 0.0], [0.0, 2.0]]),
    stage_cost_weight = np.array([50.0, 50.0, 1.0, 20.0]), # weight for [x, y, yaw, v]
    terminal_cost_weight = np.array([50.0, 50.0, 1.0, 20.0]), # weight for [x, y, yaw, v]
    visualze_sampled_trajs = True, # if True, sampled trajectories are visualized
    obstacle_circles = OBSTACLE_CIRCLES, # [obs_x, obs_y, obs_radius]
    collision_safety_margin_rate = 1.2, # safety margin for collision check
)

# simulation loop
for i in range(sim_steps):

    # get current state of vehicle
    current_state = vehicle.get_state()

    try:
        # calculate input force with MPPI
        optimal_input, optimal_input_sequence, optimal_traj, sampled_traj_list = mppi.calc_control_input(
            observed_x = current_state
        )
    except IndexError as e:
        # the vehicle has reached the end of the reference path
        print("[ERROR] IndexError detected. Terminate simulation.")
        break

    # print current state and input force
    print(f"Time: {i*delta_t:>2.2f}[s], x={current_state[0]:>+3.3f}[m], y={current_state[1]:>+3.3f}[m], yaw={current_state[2]:>+3.3f}[rad], v={current_state[3]:>+3.3f}[m/s], steer={optimal_input[0]:>+6.2f}[rad], accel={optimal_input[1]:>+6.2f}[m/s]")

    # update states of vehicle
    vehicle.update(u=optimal_input, delta_t=delta_t, optimal_traj=optimal_traj[:, 0:2], sampled_traj_list=sampled_traj_list[:, :, 0:2])

# show animation
vehicle.show_animation(interval_ms=int(delta_t * 1000))
# save animation
vehicle.save_animation("mppi_pathtracking_obav_demo.mp4", interval=int(delta_t * 1000), movie_writer="ffmpeg") # ffmpeg is required to write mp4 file