In [None]:
import numpy as np
from scipy import signal
#import matplotlib.pyplot as plt
import pandas as pd
from orangebox import Parser
from collections import Counter
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import torch

ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [None]:
# 字段列表
fields = [
    "loopIteration",
    "time",
    "axisP[0]",
    "axisP[1]",
    "axisP[2]",
    "axisI[0]",
    "axisI[1]",
    "axisI[2]",
    "axisD[0]",
    "axisD[1]",
    "axisF[0]",
    "axisF[1]",
    "axisF[2]",
    "rcCommand[0]",
    "rcCommand[1]",
    "rcCommand[2]",
    "rcCommand[3]",
    "setpoint[0]",      # Roll
    "setpoint[1]",      # Ptich
    "setpoint[2]",      # Yaw
    "setpoint[3]",      # Throttle
    "vbatLatest",
    "amperageLatest",
    "rssi",
    "gyroADC[0]",       # Gyro Roll
    "gyroADC[1]",       # Gyro Pitch
    "gyroADC[2]",       # Gyro Yaw
    "gyroUnfilt[0]",    # Gyro Roll  Unfilted
    "gyroUnfilt[1]",    # Gyro Pitch Unfilted
    "gyroUnfilt[2]",    # Gyro Yaw   Unfilted
    "accSmooth[0]",
    "accSmooth[1]",
    "accSmooth[2]",
    "debug[0]",
    "debug[1]",
    "debug[2]",
    "debug[3]",
    "debug[4]",
    "debug[5]",
    "debug[6]",
    "debug[7]",
    "motor[0]",
    "motor[1]",
    "motor[2]",
    "motor[3]",
    "eRPM[0]",
    "eRPM[1]",
    "eRPM[2]",
    "eRPM[3]",
    "flightModeFlags",
    "stateFlags",
    "failsafePhase",
    "rxSignalReceived",
    "rxFlightChannelsValid",
    "heading[0]",
    "heading[1]",
    "heading[2]",
    "axisSum[0]",
    "axisSum[1]",
    "axisSum[2]",
    "rcCommands[0]",
    "rcCommands[1]",
    "rcCommands[2]",
    "rcCommands[3]",
    "axisError[0]",
    "axisError[1]",
    "axisError[2]"
]

# 通过字典推导式生成对应序号
field_cor_num = {field: idx+1 for idx, field in enumerate(fields)}
print(field_cor_num)

In [None]:
def analyze_blackbox(
    file_path, 
    log_index=1,   # 从1开始算
    throttle_range=(1100, 1900),
    input_field_name='', 
    output_field_name='',
    window_sec=2,
    displayLog = True
):
    # 加载文件
    parser = Parser.load(file_path)
    parser.set_log_index(log_index)
    device = torch.device("cuda")
    # 读取所有帧数据，跳过损坏帧
    data = []
    frame_gen = parser.frames()
    error_counter = Counter()

    while True:
        try:
            frame = next(frame_gen)
            data.append(frame.data)
        except StopIteration:
            break
        except Exception as e:
            error_counter[str(type(e))] += 1
            continue

    print("Skipped frames by error type:", error_counter)

    # 检查是否成功解析到数据
    if not data:
        print("未能成功解析任何帧，终止分析。")
        return

    # 创建 DataFrame
    df = pd.DataFrame(data)

    # 检查必要字段是否存在
    required_fields = ['time', input_field_name, output_field_name, 'setpoint[3]']
    for field in required_fields:
        if field_cor_num.get(field) not in df.columns:
            print(f"缺少必要字段：{field}")
            return

    # 时间戳转换
    df[field_cor_num['time']] = df[field_cor_num['time']] / 1_000_000  # 微秒转秒


    # 分割时间窗口
    windows = []
    current_start = df[field_cor_num['time']].min()
    while current_start <= df[field_cor_num['time']].max():
        current_end = current_start + window_sec
        win_df = df[(df[field_cor_num['time']] >= current_start) & 
                    (df[field_cor_num['time']] < current_end)]
        if not win_df.empty:
            windows.append(win_df)
        current_start = current_end

    # 根据油门筛选窗口
    valid_windows = []
    for win_df in windows:
        throttle = win_df[field_cor_num['setpoint[3]']]
        if throttle_range[0] <= throttle.mean() <= throttle_range[1]:
            print(throttle.mean())
            win_df.sort_values(field_cor_num['time'])
            valid_windows.append(win_df)

    if not valid_windows:
        print("找不到符合油门范围的数据片段！")
        return

    # 估算采样率（待完成）
   

    fs = 4000 #实际的采样率为4000hz
    print(f"Estimated sampling rate: {fs:.2f} Hz") 
    all_magnitudes = []
    all_phases = []
    freqs_pos = None

    for win_df in valid_windows:
        x_np = win_df[field_cor_num[input_field_name]].values
        y_np = win_df[field_cor_num[output_field_name]].values
        if len(x_np) < 1024:
            continue

        # 转为 Tensor 并上传至 GPU
        x = torch.tensor(x_np, dtype=torch.float32, device=device)
        y = torch.tensor(y_np, dtype=torch.float32, device=device)

        n = len(x)
        nperseg = min(1024, n)
        step = nperseg // 2
        win = torch.hann_window(nperseg, device=device)

        segments = []
        Pxx_segments = []
        for start in range(0, n - nperseg + 1, step):
            x_seg = (x[start:start + nperseg] - x[start:start + nperseg].mean()) * win
            y_seg = (y[start:start + nperseg] - y[start:start + nperseg].mean()) * win

            # 标准化
            x_seg = x_seg / x_seg.std()
            y_seg = y_seg / y_seg.std()

            X = torch.fft.fft(x_seg)
            Y = torch.fft.fft(y_seg)

            Pxy = Y * torch.conj(X)
            Pxx = X * torch.conj(X)

            segments.append(Pxy)
            Pxx_segments.append(Pxx)

        Pxy_avg = torch.stack(segments).mean(dim=0)
        Pxx_avg = torch.stack(Pxx_segments).mean(dim=0)

    H = Pxy_avg / Pxx_avg

    # 频率轴（只需提取一次）
    if freqs_pos is None:
        freqs = torch.fft.fftfreq(nperseg, d=1/fs).to(device)
        mask = freqs > 0
        freqs_pos = freqs[mask].cpu().numpy()  # 转回 CPU 以绘图

    mag = 20 * torch.log10(torch.abs(H[mask]))
    mag -= mag.max()  # 归一化
    phase_rad = torch.unwrap(torch.angle(H[mask]))
    phase_deg = torch.rad2deg(phase_rad)

    # 移回 CPU 以绘图
    all_magnitudes.append(mag.cpu().numpy())
    all_phases.append(phase_deg.cpu().numpy())

    # 计算平均增益和相位
    avg_mag = np.mean(all_magnitudes, axis=0)
    avg_ph = np.mean(all_phases, axis=0)

    # 创建子图
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True,
                        vertical_spacing=0.1,
                        subplot_titles=('Gain (dB)', 'Phase (deg)'))

    # 横轴是否对数显示
    if displayLog:
        x_type = 'log'
    else:
        x_type = 'log'
    print(f"对数坐标轴:{displayLog}")
    print(f"Min freq: {freqs_pos.min()}, Max freq: {freqs_pos.max()}")
    # 添加 Gain 曲线
    fig.add_trace(go.Scatter(x=freqs_pos, y=avg_mag, mode='lines',
                            name='Gain (dB)',
                            hovertemplate='Freq: %{x:.2f} Hz<br>Gain: %{y:.2f} dB'),
                row=1, col=1)

    # 添加 Phase 曲线
    fig.add_trace(go.Scatter(x=freqs_pos, y=avg_ph, mode='lines',
                            name='Phase (deg)',
                            hovertemplate='Freq: %{x:.2f} Hz<br>Phase: %{y:.2f} deg'),
                row=2, col=1)

    # 设置坐标轴类型与标题
    fig.update_xaxes(title_text="Frequency (Hz)", type=x_type, row=1, col=1) 
    fig.update_xaxes(title_text="Frequency (Hz)", type=x_type, row=2, col=1)
    fig.update_yaxes(title_text="Gain (dB)", row=1, col=1)
    fig.update_yaxes(title_text="Phase (deg)", row=2, col=1)

    # 添加网格线
    fig.update_xaxes(showgrid=True)
    fig.update_yaxes(showgrid=True)

    # 布局设置
    fig.update_layout(height=600, width=900, title_text="Bode Plot (Interactive)",
                    hovermode="x unified")

    fig.show()


In [None]:
#freq_resp_unit_test
analyze_blackbox(file_path=r"C:\Users\AlexRice\Desktop\FreqResponse\btfl_002.bbl", input_field_name='setpoint[1]', output_field_name='gyroADC[1]')