# Import Packages

In [2]:
from pprint import pprint
import sys
from ipywidgets import widgets, Layout
import matplotlib.pyplot as plt
import matplotlib as mpl
from IPython.display import display
%matplotlib widget
import pandas as pd
import numpy as np
import os
import warnings
warnings.filterwarnings('ignore')
import time
import holoviews as hv
from holoviews import opts
from holoviews.operation.datashader import datashade, shade, dynspread, spread
from holoviews.operation.datashader import rasterize, ResamplingOperation # 이미지로 변환
from holoviews.operation import decimate # 표시할 데이터 한정
hv.extension('bokeh')
from bokeh.io import show
from scipy.signal import butter, filtfilt, find_peaks, peak_prominences
from scipy import signal as sig
from scipy.interpolate import interp1d
import re
import logging
logging.getLogger("param.main").setLevel(logging.CRITICAL)
from PIL import Image
import io
from sklearn.preprocessing import MinMaxScaler

import math
from banf.data_processing.preprocessing import ForMeasurement

  from holoviews.operation.datashader import rasterize, ResamplingOperation # 이미지로 변환


In [27]:
def createFolder(directory):
    try:
        if not os.path.exists(directory):
            os.makedirs(directory)
    except OSError:
        print ('Error: Creating directory. ' +  directory)

def bandpassFiltering(
    df: pd.DataFrame,     # Input DataFrame
    col: list,            # List of column names to apply the filter on
    lowcut: float,        # Low frequency cutoff
    highcut: float,       # High frequency cutoff
    fs: int,              # Sampling frequency
    order=5,              # Order of the Butterworth filter (default is 5)
    inplace: bool = False # Modify the DataFrame in-place or create new filtered columns (default is False)
):
    nyq = 0.5 * fs  # Calculate Nyquist frequency
    low = lowcut / nyq
    high = highcut / nyq

    for i in col:
        # Select appropriate filter type based on low and high cutoff frequencies
        if low == 0:
            b, a = butter(order, high, btype="low")
        elif high == 0:
            b, a = butter(order, low, btype="high")
        else:
            # Compute filter coefficients for a bandpass filter
            b, a = butter(order, [low, high], btype="band")
        
        # Apply the filter to each specified column in the DataFrame
        if not inplace:
            # Create new columns with filtered data
            df[i + "_filtered"] = filtfilt(b, a, df[i].to_numpy())
        else:
            # Modify original columns with filtered data
            df[i] = filtfilt(b, a, df[i].to_numpy())

    return df

In [4]:
"""
    road_type, sensor_type, speed_type
    주행 시험 시 주행한 도로, 센서, 속도에 대해 파일 경로에서 구분되는 값
    Amazon S3 Bucket에서 데이터를 불러올 때 시험 조건을 구분하기 위해 사용됨 (불러오는 방식은 해당 셀 하단 using_file_list 참조)
    화성 1차 주행 시험 데이터 사용 (2023-04-04)
    
    road_friction_dict, sensor_location_dict
    road_type, sensor_type에 대한 실제 도로 명칭 및 센서 부착 바퀴를 매핑하는 dictionary
"""

road_type = ["road_1"]
# road_type = ["road_1","road_2","road_3","road_4"]
sensor_type = ["TS0032"]
# sensor_type = ["TS0032", "TS0036"]
acc_type = ['Acceleration_X', 'Acceleration_Y', 'Acceleration_Z']
# speed_type = [10]
speed_type = [10, 30, 50, 70, 90, 110]

road_friction_dict = {"road_1":"μ 0.1", "road_2":"μ 0.2", "road_3":"μ 0.4", "road_4":"μ 0.7"}
sensor_location_dict = {"TS0032":"Front Right", "TS0036":"Front Left"}

fs = 1000
nyq = 0.5 * fs  # Calculate Nyquist frequency
order = 5
high = 250 / nyq
low_freq_range = 150

# max_freq_idx = int(nperseg / 16)

fm = ForMeasurement()
dir_list, file_list = fm.lookupS3Objects()

# 파일 명에 포함되는 단어를 기준으로 파일을 불러옴
using_file_list = [files for files in file_list if ("20230404" in files) and (road_type[0] in files) and (sensor_type[0] in files) and ("1000Hz" in files) and (int(re.sub(r'[^0-9]', '', files.split("/")[-2])) == speed_type[0])]
df_list = fm.importS3Objects(using_file_list)

In [None]:
"""
    무선 전력(P33V_MON)을 사용하여 타이어 내부 센서가 위치하는 각도를 계산하고, 그 각도를 토대로 Contact Patch 신호가 존재할 것으로 예상되는 각도 범위를 구함
    
    1. 각 시험에서 수집된 P33V_MON을 최소값 0, 최대값 1로 MinMax Scaling (P33V_MON 피크 검출 시 전압 크기에 대한 dependency 없애기 위함)
    2. 0~1 사이에 분포하는 P33V_MON의 피크 검출 (피크는 적어도 0.2 이상이어야 함, Peak Threshold = 0.2)
    3. 검출된 피크를 사용하여 차량에 부착된 프로파일러의 위치 대비 타이어 내부 센서가 위치하는 각도에 대해 초기 피크를 0도, 후기 피크를 360도로 가정하고 그 사이의 샘플 수만큼 나누어 한 샘플이 측정되는 순간 센서가 위치하는 각도를 계산
    4. Contact Patch 신호가 포함되어있을 것이라 예상되는 각도 범위를 지정하고 데이터를 토대로 검증 (현재 스타렉스 차량 기준 Front Tire 각도 범위 : 0°~150°, Rear Tire 각도 범위 : 120°~ 270°로 설정)
"""

scaler = MinMaxScaler()

for speed in speed_type: # 단일 시험로, 바퀴에서 수집된 센서에 대해 속도에 따른 적용
    using_file_list = [files for files in file_list if ("20230404" in files) and (road_type[0] in files) and (sensor_type[0] in files) and ("1000Hz" in files) and (int(re.sub(r'[^0-9]', '', files.split("/")[-2])) == speed)]
    df = fm.importS3Objects(using_file_list)[0]
    
    # (1) P33V_MON MinMax Scaling
    p33v = df['P33V_MON'].to_numpy()
    p33v = scaler.fit_transform(p33v.reshape(-1, 1)).reshape(-1)
    
    acc_z = df['Acceleration_Z'].to_numpy()
    acc_z = scaler.fit_transform(acc_z.reshape(-1, 1)).reshape(-1)
    
    # (2) P33V_MON Peak Detection
    peaks, _ = find_peaks(p33v, height=0.2)
    
    # (3) P33V_MON Peak Detection을 통해 얻은 각도를 토대로 타이어 내부 센서가 위치하는 각도 계산
    angular_degrees = 360 / np.diff(peaks)
    
    peaks = peaks[:-1]
    
    # (4) Contact Patch Range Angle 지정 및 표시
    # Steer Tire
    start = np.int64((0 / angular_degrees) + peaks)
    end = np.int64((150 / angular_degrees) + peaks)
    
    # Rear Tire
    # start = np.int64((120 / angular_degrees) + peaks)
    # end = np.int64((270 / angular_degrees) + peaks)
    
    peak = hv.Scatter((peaks, p33v[peaks]), 'Time(s)', 'Acceleration(g)', label='find_peaks')
    overlay = hv.Curve(p33v, label="P33V").options(color='blue') * hv.Curve(acc_z, label="Acceleration_Z").options(color='red')
    overlay *= peak
    
    for s, e in zip(start, end):
        overlay *= hv.VSpan(s, e).options(color='gray')
    
    overlay.opts(
        opts.Overlay(title=f"Contact Patch Range Angle, Speed :{speed} km/h, Sensor Location :{sensor_location_dict[sensor_type[0]]}, Road type :{road_friction_dict[road_type[0]]}"),
        opts.Curve(xlabel='time (samples)', ylabel='acceleration (g)', height=400, responsive=True, default_tools=['xpan,ypan,xbox_zoom,ybox_zoom,undo,redo,reset,save'], active_tools=['xbox_zoom'], toolbar='below', show_grid=True, show_legend=True),
        opts.Scatter(color='r', marker='x', size=10),
    )
    show(hv.render(overlay))

In [6]:
"""
    무선 전력 피크에 의해 구해진 Contact Patch 예상 각도 범위에 대해 다음과 같은 주파수 도메인 분석 알고리즘을 적용하여 Contact Patch Extraction
    
    1. 가속도 Z 필터링 (250Hz LPF, 5th Butterworth)
    2. P33V_MON 피크에 의해 계산된 Contact Patch 예상 각도 범위만큼의 신호 sample 길이를 n으로 나눈 값, 혹은 특정 고정된 크기 N을 STFT의 Window 길이로 지정 (param 1. Window Length)
    3. STFT의 Window 길이 – 1, 혹은 Window 길이의 절반 만큼 Overlap 길이로 설정 (param 2. Overlap Length)
    4. 위 조건대로 구분되는 Window에 해당하는 신호에 FFT 적용 (scipy.signal.periodogram은 주파수 도메인 변환 시 그 크기 값을 PSD로 계산함)
    5. 4번의 결과에 대해 저주파수 영역(0~Nhz)에 해당하는 PSD sum을 전체 주파수 범위(0~500Hz)에 해당하는 PSD sum으로 나눈 비율 계산 (param 3. Low Freq Range)
    (여기서 저주파 범위에 해당하는 N은 (현재 주행 속도(km/h) / 2) + 80 으로 설정) (가속도계 Z 신호 기준 Contact Patch에 해당하는 신호의 주파수 범위는 속도의 증가에 따라 커짐)
    6. 5번에 의해 계산된 비율이 특정 임계값 N보다 큰 경우, 해당 Window 구간에 Contact Patch가 포함된 것으로 간주 (param 4. Ratio Threshold)
    (여기서 임계값 B는 0.5로 설정)
    (해당 임계값을 크게 설정할 경우, Contact Patch가 포함된 신호인 Window의 수가 줄어들거나, Contact Patch인 경우에도 잡히지 않을 수 있으며, 임계값을 작게 설정할 경우 상대적으로 Contact Patch가 포함되었다고 판단되는 Window 수가 증가하지만 노이즈 구간이 많이 포함될 수 있음)
"""

scaler = MinMaxScaler()

for speed in speed_type:
    using_file_list = [files for files in file_list if ("20230404" in files) and (road_type[0] in files) and (sensor_type[0] in files) and ("1000Hz" in files) and (int(re.sub(r'[^0-9]', '', files.split("/")[-2])) == speed)]
    df = fm.importS3Objects(using_file_list)[0]
    
    ### Contact Patch Range Angle 범위 계산 ###
    p33v = df['P33V_MON'].to_numpy()
    p33v = scaler.fit_transform(p33v.reshape(-1, 1)).reshape(-1)
    
    peaks, _ = find_peaks(p33v, height=0.2)

    angular_degrees = 360 / np.diff(peaks)
    
    peaks = peaks[:-1]
    
    # Steer Tire
    start = np.int64((0 / angular_degrees) + peaks)
    end = np.int64((150 / angular_degrees) + peaks)
    
    # Rear Tire
    # start = np.int64((120 / angular_degrees) + peaks)
    # end = np.int64((270 / angular_degrees) + peaks)
    ### Contact Patch Range Angle 범위 계산 ###

    signal = df["Acceleration_Z"]

    # (1) 가속도 Z 필터링 (250Hz LPF, 5th Butterworth)
    b, a = butter(order, high, btype="low")
    signal = filtfilt(b, a, signal)
    
    low_freq_range = speed / 2 + 80 # param 3
    ratio_threshold = 0.5 # param 4
    
    overlay = hv.Curve(None)

    for acc in acc_type:
        overlay *= hv.Curve(df[acc], label=acc)
        
    st, et = [], []
    cp_cnt = 0

    for s, e in zip(start, end):
        # (2) P33V_MON 피크에 의해 계산된 Contact Patch 예상 각도 범위만큼의 신호 sample 길이를 n으로 나눈 값, 혹은 특정 고정된 크기 N을 STFT의 Window 길이로 지정 (param 1. Window Length)
        window_length = int((e-s) / 3) # param 1
        overlap_length = int(window_length / 2) # param2
        
        # (3) STFT의 Window 길이 – 1, 혹은 Window 길이의 절반 만큼 Overlap 길이로 설정 (param 2. Overlap Length)
        for seg in range(s, e-window_length, overlap_length):
            # (4) 위 조건대로 구분되는 Window에 해당하는 신호에 FFT 적용 (scipy.signal.periodogram은 주파수 도메인 변환 시 그 크기 값을 PSD로 계산함)
            (f ,S) = sig.periodogram(signal[seg:seg+window_length], fs=fs, window='hann')
            
            # (5, 6) 4번의 결과에 대해 저주파수 영역(0~Nhz)에 해당하는 PSD sum을 전체 주파수 범위(0~500Hz)에 해당하는 PSD sum으로 나눈 비율 계산 및 임계 비율(ratio_threshold)보다 큰 경우 Contact Patch로 판단 (param 3. Low Freq Range, param 4. Ratio Threshold)
            if np.sum(S[np.where(f<=low_freq_range)]) / np.sum(S) > ratio_threshold:
                if st == []:
                    st.append(seg)
                    et.append(seg+window_length)
                    cp_cnt += 1
                else:
                    if (seg) - et[-1] <= 0:
                        et[-1] = seg+window_length
                    else:
                        st.append(seg)
                        et.append(seg+window_length)
                        cp_cnt += 1
    
    for s, e in zip(start, end):
        overlay *= hv.VSpan(s, e).options(color='gray', alpha=0.3)
        
    for s, e in zip(st, et):
        overlay *= hv.VSpan(s, e).options(color='blue', alpha=0.3)

    overlay.opts(
        opts.Overlay(title=str(speed) + "kph / " + road_friction_dict[road_type[0]] + " / " + sensor_location_dict[sensor_type[0]] + " / " + str(ratio_threshold) + " < " + str(low_freq_range) + "Hz / mean samples length in range angle : " + str(np.mean(end - start))),
        opts.Curve(xlabel='time (samples)', ylabel='acceleration (g)', height=400, width=800, default_tools=['xpan,ypan,xbox_zoom,ybox_zoom,undo,redo,reset,save'], active_tools=['xbox_zoom'], toolbar='below', show_grid=True, show_legend=True),
    )
    show(hv.render(overlay))
    print(cp_cnt) # Contact Patch의 수

In [8]:
"""
    Low Frequency Range에 대한 적정 설정 기준 탐색을 위함
    speed / 2 + 80은 분석자가 주관적으로 근거 없이 결정한 값
    
    저주파 볼록 신호의 끝에 대한 객관적인 기준을 찾기 위함
"""

scaler = MinMaxScaler()

overlay = hv.Curve(None)
nperseg = 64

acc_type = ['Acceleration_Z']

for speed in speed_type:
    using_file_list = [files for files in file_list if ("20230404" in files) and (road_type[0] in files) and (sensor_type[0] in files) and ("1000Hz" in files) and (int(re.sub(r'[^0-9]', '', files.split("/")[-2])) == speed)]
    df = fm.importS3Objects(using_file_list)[0]

    psd_arr = np.zeros(501)
    
    for acc in acc_type:
        signal = df[acc]
        welch_result = sig.welch(signal, fs, nperseg=nperseg)
        itp = interp1d(welch_result[0], welch_result[1], kind='quadratic')

        psd_arr = psd_arr + itp(np.linspace(welch_result[0].min(), welch_result[0].max(), 501))
        
    psd_arr = scaler.fit_transform(psd_arr.reshape(-1, 1)).reshape(-1) # 크기가 아니라 저주파 볼록 신호의 끝을 보기 위함이므로 0~1사이 MinMax Scaling
        
    overlay *= hv.Curve((np.linspace(welch_result[0].min(), welch_result[0].max(), 501), psd_arr), label=f"{speed}km/h")
        
overlay.opts(
    opts.Overlay(title=f"{acc}, {road_friction_dict[road_type[0]]}, {sensor_location_dict[sensor_type[0]]}"),
    opts.Curve(xlabel='frequency (Hz)', ylabel='scaled value', height=400, responsive=True, default_tools=['xpan,ypan,xbox_zoom,ybox_zoom,undo,redo,reset,save'], active_tools=['xbox_zoom'], toolbar='below', show_grid=True, show_legend=True),
)
show(hv.render(overlay))

In [25]:
"""
    Ratio Threshold에 대한 적정 설정 기준 탐색을 위함
    0.5 ~ 0.95까지 0.05 단위로 테스트하고, 그에 따른 Contact Patch의 개수를 시각화
    (그려지는 데이터가 많아 일부 데이터만을 사용해서 시각화 테스트)
"""

scaler = MinMaxScaler()

speed_type = [10, 30, 50, 70, 90, 110]
# speed_type = [10]
ratio_threshold = [i/100 for i in range(5, 100, 5)] # param 4
# ratio_threshold = [i/100 for i in range(5, 100, 100)] # param 4
cp_cnt_each_r_t = [] # Ratio Threshold 설정에 따른 각각 속도에서 수집된 데이터의 Contact Patch 개수를 저장할 2차원 리스트

for speed in speed_type:
    cp_cnt_list = []
    for r_t in ratio_threshold:
        using_file_list = [files for files in file_list if ("20230404" in files) and (road_type[0] in files) and (sensor_type[0] in files) and ("1000Hz" in files) and (int(re.sub(r'[^0-9]', '', files.split("/")[-2])) == speed)]
        df = fm.importS3Objects(using_file_list)[0]
        
        ### Contact Patch Range Angle 범위 계산 ###
        p33v = df['P33V_MON'].to_numpy()
        p33v = scaler.fit_transform(p33v.reshape(-1, 1)).reshape(-1)
        
        peaks, _ = find_peaks(p33v, height=0.2)

        angular_degrees = 360 / np.diff(peaks)
        
        peaks = peaks[:-1]
        
        # Steer Tire
        start = np.int64((0 / angular_degrees) + peaks)
        end = np.int64((150 / angular_degrees) + peaks)
        
        # Rear Tire
        # start = np.int64((120 / angular_degrees) + peaks)
        # end = np.int64((270 / angular_degrees) + peaks)
        ### Contact Patch Range Angle 범위 계산 ###

        signal = df["Acceleration_Z"]

        # (1) 가속도 Z 필터링 (250Hz LPF, 5th Butterworth)
        b, a = butter(order, high, btype="low")
        signal = filtfilt(b, a, signal)
        
        low_freq_range = speed / 2 + 80 # param 3
            
        st, et = [], []
        cp_cnt = 0

        for s, e in zip(start, end):
            # (2) P33V_MON 피크에 의해 계산된 Contact Patch 예상 각도 범위만큼의 신호 sample 길이를 n으로 나눈 값, 혹은 특정 고정된 크기 N을 STFT의 Window 길이로 지정 (param 1. Window Length)
            window_length = int((e-s) / 3) # param 1
            overlap_length = int(window_length / 2) # param2
            
            # (3) STFT의 Window 길이 – 1, 혹은 Window 길이의 절반 만큼 Overlap 길이로 설정 (param 2. Overlap Length)
            for seg in range(s, e-window_length, overlap_length):
                # (4) 위 조건대로 구분되는 Window에 해당하는 신호에 FFT 적용 (scipy.signal.periodogram은 주파수 도메인 변환 시 그 크기 값을 PSD로 계산함)
                (f ,S) = sig.periodogram(signal[seg:seg+window_length], fs=fs, window='hann')
                
                # (5, 6) 4번의 결과에 대해 저주파수 영역(0~Nhz)에 해당하는 PSD sum을 전체 주파수 범위(0~500Hz)에 해당하는 PSD sum으로 나눈 비율 계산 및 임계 비율(ratio_threshold)보다 큰 경우 Contact Patch로 판단 (param 3. Low Freq Range, param 4. Ratio Threshold)
                if np.sum(S[np.where(f<=low_freq_range)]) / np.sum(S) > r_t:
                    if st == []:
                        st.append(seg)
                        et.append(seg+window_length)
                        cp_cnt += 1
                    else:
                        if (seg) - et[-1] <= 0:
                            et[-1] = seg+window_length
                        else:
                            st.append(seg)
                            et.append(seg+window_length)
                            cp_cnt += 1
        cp_cnt_list.append(cp_cnt)
        
    cp_cnt_each_r_t.append(cp_cnt_list)
    
overlay = hv.Curve(None)

for i in range(len(speed_type)):
    overlay *= hv.Curve((ratio_threshold, cp_cnt_each_r_t[i]), label=str(speed_type[i]) + "kph")
    overlay *= hv.Scatter((ratio_threshold, cp_cnt_each_r_t[i]), label=str(speed_type[i]) + "kph")

overlay.opts(
    opts.Overlay(title="contact patch count sum result per ration threshold"),
    opts.Curve(xlabel='ratio threshold', ylabel='contact patch count sum', height=400, width=800, default_tools=['xpan,ypan,xbox_zoom,ybox_zoom,undo,redo,reset,save'], active_tools=['xbox_zoom'], toolbar='below', show_grid=True, show_legend=True),
    opts.Scatter(size=20),
)
show(hv.render(overlay))