In [16]:
from db_stat import DBStatManager  ,DBSystemManager
from datetime import datetime
from time_util import TimeUtil # DB 시간에 동기화 시키도록 만들어짐 
import math
import json
import matplotlib.pyplot as plt
import numpy as np 
from math import sqrt

from ekf_imp import EKF_basic, EKF_ver1, EKF_ver2


ekf_basic = EKF_basic()
ekf_ver1 = EKF_ver1()
ekf_ver2 = EKF_ver2()

db_stat_manager = DBStatManager('/home/netai/Omniverse/dt_server/UWB_EKF/config.json')  
db_space_manager = DBSystemManager('/home/netai/Omniverse/dt_server/UWB_EKF/config.json')

Database connection successfully established.
Database connection successfully established system Manager.


In [17]:
def convert_db_bounds_to_dict(bounds):
    if bounds:
        return {
            'x_min': bounds[0],
            'x_max': bounds[1],
            'y_min': bounds[2],
            'y_max': bounds[3]
        }
    else:
        return None
    

# 이동경로 벗어난 데이터 필터링
def check_space_bound_out(data, space_bound, max_speed=5):
    """
    Remove movements that are outside the space bounds.
    :param movements: List of tuples, each containing (orientation_z, uwb_x, uwb_y, timestamp)
    :param space_bound: Dictionary with 'x_min', 'x_max', 'y_min', 'y_max'
    :return: Filtered movements and removed movements
    """
    filtered = []
    removed = []
    for movement in data:
        # 올바른 인덱스를 사용하여 x, y 값을 추출합니다.
        orientation_z = movement[0]
        x = movement[1]
        y = movement[2]
        timestamp = movement[3]

        if space_bound['x_min'] <= x <= space_bound['x_max'] and space_bound['y_min'] <= y <= space_bound['y_max']:
            filtered.append(movement)
        else:
            removed.append(movement)


    #print(f"Removed {len(removed)} movements outside the space bounds.")
    return filtered, removed


def calculate_errors(movements, start_x, start_y, end_x, end_y, total_seconds):
    """ Calculate errors between theoretical and actual positions over time. 
    
    [(0.99595577, 18.7, -21.5, 1714992788.946, 1.6556671, 0.02457608)"""
    errors = []
    initial_timestamp = movements[0][3]
    for movement in movements:
        _, x, y,  timestamp, vx, vy  = movement
        current_timestamp = timestamp
            #elapsed_time = (current_timestamp - initial_timestamp).total_seconds()
        elapsed_time = timestamp - initial_timestamp
        tx, ty = calculate_theoretical_position(start_x, start_y, end_x, end_y, total_seconds, elapsed_time)
        distance_error = sqrt((tx - x) ** 2 + (ty - y) ** 2) * 100  # Convert to cm
        errors.append((current_timestamp, distance_error))
    return errors

def calculate_errors_ekf(movements, start_x, start_y, end_x, end_y, total_seconds):
    """
    Calculate errors between theoretical and actual positions over time.
    Each movement contains: orientation_z, uwb_x, uwb_y, timestamp (in float, seconds since epoch)
    [1.000936587001754, 18.689811599889207, -21.509029285714288, 1714992789.011]
[0.31836727466887216, 18.657138926895836, -21.5123205, 0.49723884275117824, -0.00476929180079249, 1714992789.011]
    
    """
    errors = []
    #initial_timestamp = datetime.fromtimestamp(movements[0][3])  # 첫 번째 움직임의 timestamp 변환
    length = len(movements[0])
    if length == 4:
        initial_timestamp = movements[0][3]
    elif length == 6:
        initial_timestamp = movements[0][5]

    for movement in movements:

        if length == 4:
            _, x, y, timestamp = movement
        elif length == 6:
            _, x, y,  vx, vy, timestamp  = movement
        else:
            continue  # Skip if the number of elements is neither 4 nor 6

        current_timestamp = timestamp
            #elapsed_time = (current_timestamp - initial_timestamp).total_seconds()
        elapsed_time = timestamp - initial_timestamp
        tx, ty = calculate_theoretical_position(start_x, start_y, end_x, end_y, total_seconds, elapsed_time)
        distance_error = sqrt((tx - x) ** 2 + (ty - y) ** 2) * 100  # Convert to cm
        errors.append((current_timestamp, distance_error))


    return errors


def calculate_theoretical_position(start_x, start_y, end_x, end_y, total_seconds, elapsed_time):
    """ Calculate the theoretical position at a given elapsed time. """
    dx = end_x - start_x
    dy = end_y - start_y
    proportion = elapsed_time / total_seconds
    theoretical_x = start_x + dx * proportion
    theoretical_y = start_y + dy * proportion
    return theoretical_x, theoretical_y


# 시간대별로 발생한 오차 크기 그려주는 함수
def plot_errors_timeseries(errors, title = 'Distance Errors Over Time'):
    """ Plot the errors over time. """
    times = [error[0] for error in errors]
    values = [error[1] for error in errors]
    plt.figure(figsize=(10, 5))
    plt.plot(times, values, marker='o', linestyle='-', color='r')
    plt.title(title)
    plt.xlabel('Time')
    plt.ylabel('Error (cm)')
    plt.grid(True)
    plt.show()
    plt.close()


# 실제 공간에서 이동한 경로와 측정한 경로 그려주는 함수
def plot_path(start_x, start_y, end_x, end_y, movements, anno=True, title='Actual and Measured Paths'):
    """ Plot the actual and measured paths. """
    actual_x = [start_x, end_x]
    actual_y = [start_y, end_y]
    measured_x = [movement[1] for movement in movements]
    measured_y = [movement[2] for movement in movements]
    plt.figure(figsize=(10, 5))
    plt.plot(actual_x, actual_y, marker='o', linestyle='-', color='b', label='Actual Path')
    
    plt.plot(measured_x, measured_y, marker='o', linestyle='-', color='r', label='Measured Path')

    if anno:
        plt.annotate('Start', (start_x, start_y), textcoords="offset points", xytext=(-10, 10),
                    ha='center', arrowprops=dict(facecolor='blue', shrink=0.05))
        plt.annotate('End', (end_x, end_y), textcoords="offset points", xytext=(10, -10),
                    ha='center', arrowprops=dict(facecolor='blue', shrink=0.05))

        plt.annotate('Measured Start', (measured_x[0], measured_y[0]), textcoords="offset points", xytext=(-30, 10),
                    ha='center', arrowprops=dict(facecolor='green', shrink=0.05))
        plt.annotate('Measured End', (measured_x[-1], measured_y[-1]), textcoords="offset points", xytext=(30, -10),
                    ha='center', arrowprops=dict(facecolor='red', shrink=0.05))

    # Calculate bounds for the plot
    all_x = actual_x + measured_x
    all_y = actual_y + measured_y
    padding = 1  # Padding to be added to the plot limits
    plt.xlim(min(all_x) - padding, max(all_x) + padding)
    plt.ylim(min(all_y) - padding, max(all_y) + padding)
    plt.title(title)
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.legend()
    plt.grid(False)
    plt.show()
    plt.close()

In [18]:
import pandas as pd

def save_data_to_csv(original_data, ekf_data, filename="./ekf_results.csv"):
    # 데이터 프레임 생성
    df_original = pd.DataFrame(original_data, columns=['orientation_z', 'uwb_x', 'uwb_y', 'stamp'])
    df_ekf = pd.DataFrame(ekf_data, columns=['estimated_orientation_z', 'estimated_uwb_x', 'estimated_uwb_y', 'stamp'])

    # 원본 데이터와 EKF 데이터 병합
    df_combined = pd.merge(df_original, df_ekf, on='stamp', suffixes=('_orig', '_ekf'))

    # CSV 파일로 저장
    df_combined.to_csv(filename, index=False)
    print(f"Data saved to {filename}")

def real_data_basic(data, P_k = None):
    
    if len(data) < 2:
        return None

        # 초기 상태 추정치: 첫 번째 측정값을 기반으로 설정
    initial_orientation_z = data[0][0]
    initial_uwb_x = data[0][1]
    initial_uwb_y = data[0][2]
    state_estimate_k_minus_1 = np.array([initial_orientation_z, initial_uwb_x, initial_uwb_y])

    control_vector_k_minus_1 = np.array([data[1][1]-initial_uwb_x,data[1][2]-initial_uwb_y])

    if P_k is None:
        P_k_minus_1 = 0.1 * np.eye(3)
    else :
        P_k_minus_1 = P_k
    
    ekf_results = []

  
    for i in range(1, len(data)):
                # 현재 행의 stamp와 이전 행의 stamp 차이를 계산하여 dk 설정
        if i > 0:
            dk = data[i][3] - data[i - 1][3]
                    #print(f"Time interval (dk) at row {i}: {dk}")

                    # Sensor observation vector: 여기서는 예시로 orientation_z, uwb_x, uwb_y를 사용
            dx = data[i][1] - data[i - 1][1]
            dy = data[i][2] - data[i - 1][2]
            distance = np.sqrt(dx**2 + dy**2)
            v = distance / dk
            yaw_rate = (data[i][0] - data[i-1][0]) / dk

            control_vector_k_minus_1 = np.array([v, yaw_rate])
            obs_vector_z_k = np.array([data[i][0], data[i][1], data[i][2]])
                    
                    # EKF 실행
            optimal_state_estimate_k, covariance_estimate_k, z_k_observation_vector = ekf_basic.ekf(
                        obs_vector_z_k,
                        state_estimate_k_minus_1,
                        control_vector_k_minus_1,
                        P_k_minus_1,
                        dk
            )
            ekf_results.append([*optimal_state_estimate_k, data[i][3]])

            # 상태 추정치와 공분산 행렬 업데이트
            state_estimate_k_minus_1 = optimal_state_estimate_k
            P_k_minus_1 = covariance_estimate_k

            #save_data_to_csv(data, ekf_results)
    return ekf_results, P_k_minus_1
                    

In [19]:
def real_data_basic_up(data, P_k = None, state_estimate_k = None, max_speed = 5):
    
    if not data or len(data) < 2:
        print("Not enough data to process.")
        return None

    control_vector_k_minus_1 = np.array([0.0,0,0])

    # 상태 벡터와 공분산 초기화

    if state_estimate_k is None:
        state_estimate_k_minus_1 = np.zeros(5)
    else:
        state_estimate_k_minus_1 = state_estimate_k

    if P_k is None:
        P_k_minus_1 = np.eye(5)
    else:
        P_k_minus_1 = P_k
    
    ekf_results = []

    initial_orientation_z = data[0][0]
    initial_uwb_x = data[0][1]
    initial_uwb_y = data[0][2]
    initial_ax = data[0][4]
    initial_ay = data[0][5]
    state_estimate_k_minus_1 = np.array([initial_orientation_z, initial_uwb_x, initial_uwb_y, initial_ax, initial_ay])
    initial_vx = 0.0
    initial_vy = 0.0
    

    for i in range(1, len(data)):
                
        # 현재 행의 stamp와 이전 행의 stamp 차이를 계산하여 dk 설정
        dk = data[i][3] - data[i - 1][3]
        
        current_vx = initial_vx + 0.5 * (data[i][4]+data[i-1][4]) * dk # 선형 가속도 ax 로 계산한 vx
        current_vy = initial_vy + 0.5 * (data[i][5]+data[i-1][5]) * dk # 선형 가속도 ay 계산한 vy
        dx = data[i][1] - data[i - 1][1] # UWb 데이터로 계산한 x 변화량
        dy = data[i][2] - data[i - 1][2] # UWb 데이터로 계산한 y 변화량
        distance = np.sqrt(dx**2 + dy**2) # UWb 데이터로 계산한 거리
        v = distance / dk # UWb 데이터로 계산한 속도
        uwb_x = data[i][1]
        uwb_y = data[i][2]
        #adjusted_vx = 0.5 * v * np.cos(np.arctan2(dy, dx)) + 0.5 * current_vx
        #adjusted_vy = 0.5 * v * np.sin(np.arctan2(dy, dx)) + 0.5 * current_vy
        # 속도가 최대 속도를 초과하는 경우


        #print(f"velocity: {v}")
        #print(f"current vx: {current_vx}, current vy: {current_vy}")
        #print(f"adjusted_vx: {adjusted_vx}, adjusted_vy: {adjusted_vy}")

        yaw_rate = (data[i][0] - data[i - 1][0]) / dk
        control_vector_k_minus_1 = np.array([v,  yaw_rate])
        #control_vector_k_minus_1 = np.array([current_vx, current_vy,  yaw_rate])
                    #print(f"Time interval (dk) at row {i}: {dk}")

        # Sensor observation vector: 여기서는 예시로 orientation_z, uwb_x, uwb_y, vx, vy 를 사용
        #obs_vector_z_k = np.array([yaw_rate, data[i][1], data[i][2], data[i][4], data[i][5]])
        obs_vector_z_k = np.array([yaw_rate, uwb_x, uwb_y, current_vx, current_vy])
                    
                    # EKF 실행
        optimal_state_estimate_k, covariance_estimate_k, z_k_observation_vector = ekf_ver1.ekf(
                        obs_vector_z_k,
                        state_estimate_k_minus_1,
                        control_vector_k_minus_1,
                        P_k_minus_1,
                        dk
            )
        ekf_results.append([*optimal_state_estimate_k, data[i][3]])

                    # 상태 추정치와 공분산 행렬 업데이트
        state_estimate_k_minus_1 = optimal_state_estimate_k
        P_k_minus_1 = covariance_estimate_k

                    # 결과 출력
                    #print(f"State Estimate at k={i}: {state_estimate_k_minus_1}")
                    #print(f"Covariance Estimate at k={i}: {P_k_minus_1}")

            #save_data_to_csv(data, ekf_results)
        # 속도도 갱신
        initial_vx = current_vx
        initial_vy = current_vy
        #print(initial_vx, initial_vy)
    return ekf_results, P_k_minus_1, state_estimate_k_minus_1
     
#real_data()

In [20]:
def real_data_basic_up2(data, P_k = None, state_estimate_k = None, max_speed = 5):
    
    if not data or len(data) < 2:
        print("Not enough data to process.")
        return None

    control_vector_k_minus_1 = np.array([4.5,0.0,0,0])

    # 상태 벡터와 공분산 초기화

    if state_estimate_k is None:
        state_estimate_k_minus_1 = np.zeros(5)
    else:
        state_estimate_k_minus_1 = state_estimate_k

    if P_k is None:
        P_k_minus_1 = np.eye(5)
    else:
        P_k_minus_1 = P_k
    
    ekf_results = []

    initial_orientation_z = data[0][0]
    initial_uwb_x = data[0][1]
    initial_uwb_y = data[0][2]
    initial_ax = data[0][4]
    initial_ay = data[0][5]
    state_estimate_k_minus_1 = np.array([initial_orientation_z, initial_uwb_x, initial_uwb_y, initial_ax, initial_ay])
    initial_vx = 0.0
    initial_vy = 0.0
    

    for i in range(1, len(data)):
                
        # 현재 행의 stamp와 이전 행의 stamp 차이를 계산하여 dk 설정
        dk = data[i][3] - data[i - 1][3]
        
        current_vx = initial_vx + 0.5 * (data[i][4]+data[i-1][4]) * dk # 선형 가속도 ax 로 계산한 vx
        current_vy = initial_vy + 0.5 * (data[i][5]+data[i-1][5]) * dk # 선형 가속도 ay 계산한 vy
        dx = data[i][1] - data[i - 1][1] # UWb 데이터로 계산한 x 변화량
        dy = data[i][2] - data[i - 1][2] # UWb 데이터로 계산한 y 변화량

        uwb_x = data[i][1]
        uwb_y = data[i][2]
        distance_x = np.sqrt(dx**2) / dk
        distance_y = np.sqrt(dy**2) / dk

        yaw_rate = (data[i][0] - data[i - 1][0]) / dk
        control_vector_k_minus_1 = np.array([distance_x, distance_y,  yaw_rate])
        #control_vector_k_minus_1 = np.array([current_vx, current_vy,  yaw_rate])
                    #print(f"Time interval (dk) at row {i}: {dk}")

        # Sensor observation vector: 여기서는 예시로 orientation_z, uwb_x, uwb_y, vx, vy 를 사용
        #obs_vector_z_k = np.array([yaw_rate, data[i][1], data[i][2], data[i][4], data[i][5]])
        obs_vector_z_k = np.array([yaw_rate, uwb_x, uwb_y, current_vx, current_vy])
                    
                    # EKF 실행
        optimal_state_estimate_k, covariance_estimate_k, z_k_observation_vector = ekf_ver2.ekf(
                        obs_vector_z_k,
                        state_estimate_k_minus_1,
                        control_vector_k_minus_1,
                        P_k_minus_1,
                        dk
            )
        ekf_results.append([*optimal_state_estimate_k, data[i][3]])

                    # 상태 추정치와 공분산 행렬 업데이트
        state_estimate_k_minus_1 = optimal_state_estimate_k
        P_k_minus_1 = covariance_estimate_k

                    # 결과 출력
                    #print(f"State Estimate at k={i}: {state_estimate_k_minus_1}")
                    #print(f"Covariance Estimate at k={i}: {P_k_minus_1}")

            #save_data_to_csv(data, ekf_results)
        # 속도도 갱신
        initial_vx = current_vx
        initial_vy = current_vy
        #print(initial_vx, initial_vy)
    return ekf_results, P_k_minus_1, state_estimate_k_minus_1
     
#real_data()

In [21]:
space_bound = convert_db_bounds_to_dict(db_space_manager.get_space_bounds())

print(space_bound)

{'x_min': 0.0, 'x_max': 28.0, 'y_min': -27.0, 'y_max': 0.0}


In [23]:
# 데이터 추출 로직

time = TimeUtil()

start = datetime(2024, 5, 30, 19, 0, 0) # 10:00:00
end = datetime(2024, 5, 30, 21, 34, 0) # 12:10:00

 # 팔요시 실제 측정한 시간으로 변경
actual_seconds = 16

start, end = time.calculate_time_bounds(start,end)

print(f"시작 시간: {start}", f"종료 시간: {end}")
movements = db_stat_manager.get_lines_data(start, end)

P_k = None
P_k_ekf = None
P_k2_ekf = None
state_estimate_k_minus_1_ekf = None
state_estimate_k_minus_2_ekf = None
if len(movements) > 0 :
    
    for move in movements:
        #print(f"line_id: {move[0]}")
        # 튜플의 경우, 각 필드에 대한 정확한 인덱스를 사용
        start_timestamp = move[1]  # 예시로 0번 인덱스가 시작 시간이라 가정
        end_timestamp = move[2]    # 예시로 1번 인덱스가 종료 시간이라 가정


        data = db_stat_manager.get_ekf_data_rosavg(start_timestamp, end_timestamp)
        #print(data)
        filtered_movements, outlier = check_space_bound_out(data, space_bound)
        actual_seconds = (end_timestamp - start_timestamp).total_seconds()
        #print(f"Actual seconds: {actual_seconds}")  

        errors = calculate_errors(filtered_movements, move[4], move[5], move[6], move[7], actual_seconds)
        #print(filtered_movements)

        average_error = sum(error[1] for error in errors) / len(errors)
        print(f"일반 UWB 평균 오차: {average_error:.2f} cm")
        #plot_errors_timeseries(errors, 'normal KF')
        #plot_path(move[4], move[5], move[6], move[7], filtered_movements, True, 'normal KF')

        ekf_results , P_k =real_data_basic(filtered_movements, P_k)
        #print(ekf_results)
        
        errors = calculate_errors_ekf(ekf_results, move[4], move[5], move[6], move[7], actual_seconds)
        average_error = sum(error[1] for error in errors) / len(errors)
        print(f"EKF UWB 평균 오차: {average_error:.2f} cm")
        #plot_errors_timeseries(errors, 'EKF + Yaw')
        #plot_path(move[4], move[5], move[6], move[7], ekf_results, True, 'EKF + Yaw')

        ekf_results , P_k_ekf, state_estimate_k_minus_1_ekf = real_data_basic_up(filtered_movements, P_k_ekf, state_estimate_k_minus_1_ekf)
        #print(ekf_results)
        errors = calculate_errors_ekf(ekf_results, move[4], move[5], move[6], move[7], actual_seconds)
        average_error = sum(error[1] for error in errors) / len(errors)
        print(f"가속도 추가한 EKF UWB 평균 오차: {average_error:.2f} cm")
        #plot_errors_timeseries(errors, 'EKF + Yaw + Acceleration')
        #plot_path(move[4], move[5], move[6], move[7], ekf_results, True, 'EKF + Yaw + Acceleration')

        ekf_results , P_k2_ekf, state_estimate_k_minus_2_ekf = real_data_basic_up2(filtered_movements, P_k2_ekf, state_estimate_k_minus_2_ekf)
        #print(ekf_results)
        errors = calculate_errors_ekf(ekf_results, move[4], move[5], move[6], move[7], actual_seconds)
        average_error = sum(error[1] for error in errors) / len(errors)
        print(f"가속도 추가한 EKF UWB ver2 평균 오차: {average_error:.2f} cm")
        


시작 시간: 2024-05-30 10:00:00+00:00 종료 시간: 2024-05-30 12:34:00+00:00
일반 UWB 평균 오차: 74.25 cm
EKF UWB 평균 오차: 157.06 cm
가속도 추가한 EKF UWB 평균 오차: 176.31 cm
가속도 추가한 EKF UWB ver2 평균 오차: 112.16 cm
일반 UWB 평균 오차: 86.63 cm
EKF UWB 평균 오차: 100.22 cm
가속도 추가한 EKF UWB 평균 오차: 106.27 cm
가속도 추가한 EKF UWB ver2 평균 오차: 105.12 cm
일반 UWB 평균 오차: 95.23 cm
EKF UWB 평균 오차: 135.73 cm
가속도 추가한 EKF UWB 평균 오차: 142.06 cm
가속도 추가한 EKF UWB ver2 평균 오차: 110.95 cm
일반 UWB 평균 오차: 96.36 cm
EKF UWB 평균 오차: 116.12 cm
가속도 추가한 EKF UWB 평균 오차: 122.37 cm
가속도 추가한 EKF UWB ver2 평균 오차: 122.37 cm
일반 UWB 평균 오차: 105.42 cm
EKF UWB 평균 오차: 94.95 cm
가속도 추가한 EKF UWB 평균 오차: 101.41 cm
가속도 추가한 EKF UWB ver2 평균 오차: 103.34 cm
일반 UWB 평균 오차: 441.45 cm
EKF UWB 평균 오차: 434.73 cm
가속도 추가한 EKF UWB 평균 오차: 435.03 cm
가속도 추가한 EKF UWB ver2 평균 오차: 434.50 cm
