In [1]:
import os
import numpy as np
import pandas as pd
import plotly_express as px      # 交互更好
import plotly.graph_objects as go
import matplotlib.pyplot as plt  # 不需要交互
import seaborn as sns            # 绘制残差图
import dfUtils
import scipy.signal as signal
from sklearn.linear_model import Lasso
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from customizedLogger import logger as logging
from const import insole_mapping
from const import forceplate_mapping
from matplotlib.font_manager import FontProperties
from matplotlib import rcParams
config = {
    "font.family": 'Times New Roman',
    "font.size": 30,
}
rcParams.update(config)

# 鞋垫标定：
    * 主要是标定传感器输出的力的大小
# 鞋垫标定步骤：
    1.穿着鞋垫在力板上行走，利用力板数据对鞋垫输出进行校准
    2.得到含有校准的鞋垫输出数据、躯干加速度数据以及关节角度数据，作为训练数据
    3.模型的输入数据就是IMU数据，输出数据就是鞋垫数据，后面可能不需要鞋垫也能够得到较为准确的足底压力估计值
    4.结合关节角度等几何信息，可能还可以根据垂直地面反力估计水平分量，得到更为准确的足底压力
# 利用力板数据对鞋垫输出进行校准：
    1.将每个步态的力板数据进行插值，插值到100Hz
    2.生成用于标定的数据，N行7列，前六列表示6行传感器在N个时刻的读数，最后一列表示N个时刻的测力板读数
    2.每一行的总和对应一个k，利用采集到的力板数据对k和b的值进行优化使用线性回归模型对每一行的k进行优化

In [None]:
# options
read_flag = False              # 读取鞋垫和力板数据，并对其进行校正
generate_flag = False          # 生成标定数据集，需要与read_flag同时开启
calibration_flag = True        # 进行标定，只需要读取生成的数据集
visualization_flag = False     # 可视化数据

# parameters
myWeight = 62                  # kg
insoleSamplingRate = 50        # Hz
forcePlateSamplingRate = 1200  # Hz
medfiltKernelSize = 13         # kernel size should be odd
gaussianFilterSigma = 9

In [None]:
if read_flag:

    ## * 读取鞋垫数据，包括原始数据和剪裁后的数据
    insoleDataClip = r"D:\code\Exoskeleton\data\20241014_calibration\clipped\insoleR10ms_clipped.xlsx"
    # insoleDatadf = pd.read_csv(insoleData, sep=',')
    insoleDataClipdf = pd.read_excel(insoleDataClip)
    # insoleDatadf = insoleDatadf.reset_index()
    insoleDataClipdf = insoleDataClipdf.reset_index()
    # insoleDataProcesseddf = insoleDatadf.copy()
    insoleDataClipProcesseddf = insoleDataClipdf.copy()

    ## * 读取力板数据，包括原始数据和剪裁后的数据
    forcePlateDataClip = r"D:\code\Exoskeleton\data\20241014_calibration\clipped\forceplateR10ms_clipped.xlsx"
    # forcePlateDatadf = pd.read_excel(forcePlateData)
    forcePlateDataClipdf = pd.read_excel(forcePlateDataClip)
    # forcePlateDatadf = forcePlateDatadf.reset_index()
    forcePlateDataClipdf = forcePlateDataClipdf.reset_index()
    # forcePlateDataProcesseddf = forcePlateDatadf.copy()
    forcePlateDataClipProcesseddf = forcePlateDataClipdf.copy()

    print("insole dataframe columns: ", insoleDataClipdf.columns) ## ? 如果源文件中含有index列最好删除掉重新生成
    print("forceplate dataframe columns: ", forcePlateDataClipdf.columns)

    ## * 对数据做截断处理
    samplingRate = 1200
    cutoffFreq = 20
    nyquist = 0.5 * samplingRate
    normalCutoff = cutoffFreq / nyquist
    b, a = signal.butter(4, normalCutoff, btype='low', analog=False)

    insoleDataClipdf['Sum'] = np.where(insoleDataClipdf['Sum'] < 3300, 0, insoleDataClipdf['Sum'])
    ## * 标定左鞋垫
    # forcePlateDataClipdf['FZ1_filtered'] = signal.filtfilt(b, a, forcePlateDataClipdf['FZ1'])
    # forcePlateDataClipdf['FZ1_zeroed'] = np.where(forcePlateDataClipdf['FZ1_filtered'] < 20, 0, forcePlateDataClipdf['FZ1_filtered'])
    ## * 标定右鞋垫
    forcePlateDataClipdf['FZ2_filtered'] = signal.filtfilt(b, a, forcePlateDataClipdf['FZ2'])
    forcePlateDataClipdf['FZ2_zeroed'] = np.where(forcePlateDataClipdf['FZ2_filtered'] < 30, 0, forcePlateDataClipdf['FZ2_filtered'])

    if visualization_flag:
        fig = go.Figure()

        # for i in ['I0', 'I1', 'I2', 'I3', 'I4', 'I5', 'Sum']:
        #     fig.add_trace(go.Scatter(
        #         x=insoleDataClipdf['index'].values,
        #         y=insoleDataClipdf[i].values,
        #         mode='lines',
        #         name=insole_mapping[i],
        #     ))

        for i in ['Sum']:
            fig.add_trace(go.Scatter(
                x=insoleDataClipdf['index'].values,
                y=insoleDataClipdf[i].values,
                mode='lines',
                name=insole_mapping[i],
            ))
        
        fig.add_trace(go.Scatter(
            x=forcePlateDataClipdf['index'].values,
            y=forcePlateDataClipdf['FZ2'].values,
            mode='lines',
            name=forceplate_mapping['FZ2'],
        ))

        fig.add_trace(go.Scatter(
            x=forcePlateDataClipdf['index'].values,
            y=forcePlateDataClipdf['FZ2_zeroed'].values,
            mode='lines',
            name="FZ2_zeroed",
        ))

        fig.show()

In [None]:
if generate_flag:

    ## * 从数据中提取步态周期
    ## * 标定左脚鞋垫
    # forceplateCycles = dfUtils.extractGaitCycles(forcePlateDataClipdf, 'FZ1_zeroed')
    ## * 标定右脚鞋垫
    forceplateCycles = dfUtils.extractGaitCycles(forcePlateDataClipdf, 'FZ2_zeroed')
    # forceplateCycles = dfUtils.extractGaitCycles(forcePlateDataClipdf, 'FZ2')
    insoleCycles = dfUtils.extractGaitCycles(insoleDataClipdf, 'Sum')
    
    # print(forceplateCycles[0])
    # print(insoleCycles[0])

    logging.info(f"Extract {len(forceplateCycles)} gait cycles of forceplate")
    logging.info(f"Extract {len(insoleCycles)} gait cycles of insole")

    '''
    如果二者步态周期数量不相等可能是存在以下原因：
        * 力板数据为经过滤波处理，存在较大毛刺
        * 鞋垫数据在摆动相下的力不为零，需要做进一步的截断处理
        * 可以通过可视化数据对原因进行分析
    '''

    if len(forceplateCycles) == len(insoleCycles):
        ## * 将鞋垫和力板每个步态周期的数据都插值成固定长度的数据并合并
        numberOfPoints = 100
        combinedCycles = []
        for i, (forcePlatecycle, insoleCycle) in enumerate(zip(forceplateCycles, insoleCycles)):
            if forcePlatecycle.empty or insoleCycle.empty:
                logging.error(f"Empty cycle detected for cycle {i+1} ...")
                break
            ## * 左脚鞋垫
            # forcePlateInterp = dfUtils.interpolatePerCycle(forcePlatecycle['FZ1_zeroed'], numberOfPoints)
            ## * 右脚鞋垫
            forcePlateInterp = dfUtils.interpolatePerCycle(forcePlatecycle['FZ2_zeroed'], numberOfPoints)
            insoleI0Interp = dfUtils.interpolatePerCycle(insoleCycle['I0'], numberOfPoints)
            insoleI1Interp = dfUtils.interpolatePerCycle(insoleCycle['I1'], numberOfPoints)
            insoleI2Interp = dfUtils.interpolatePerCycle(insoleCycle['I2'], numberOfPoints)
            insoleI3Interp = dfUtils.interpolatePerCycle(insoleCycle['I3'], numberOfPoints)
            insoleI4Interp = dfUtils.interpolatePerCycle(insoleCycle['I4'], numberOfPoints)
            insoleI5Interp = dfUtils.interpolatePerCycle(insoleCycle['I5'], numberOfPoints)
            insoleI6Interp = dfUtils.interpolatePerCycle(insoleCycle['I6'], numberOfPoints)
            insoleI7Interp = dfUtils.interpolatePerCycle(insoleCycle['I7'], numberOfPoints)
            insoleI8Interp = dfUtils.interpolatePerCycle(insoleCycle['I8'], numberOfPoints)
            insoleI9Interp = dfUtils.interpolatePerCycle(insoleCycle['I9'], numberOfPoints)
            insoleI10Interp = dfUtils.interpolatePerCycle(insoleCycle['I10'], numberOfPoints)
            insoleI11Interp = dfUtils.interpolatePerCycle(insoleCycle['I11'], numberOfPoints)
            insoleI12Interp = dfUtils.interpolatePerCycle(insoleCycle['I12'], numberOfPoints)
            insoleI13Interp = dfUtils.interpolatePerCycle(insoleCycle['I13'], numberOfPoints)
            insoleI14Interp = dfUtils.interpolatePerCycle(insoleCycle['I14'], numberOfPoints)
            insoleI15Interp = dfUtils.interpolatePerCycle(insoleCycle['I15'], numberOfPoints)
            insoleI16Interp = dfUtils.interpolatePerCycle(insoleCycle['I16'], numberOfPoints)
            insoleI17Interp = dfUtils.interpolatePerCycle(insoleCycle['I17'], numberOfPoints)
            combinedDf = pd.DataFrame({
                'I0': insoleI0Interp,
                'I1': insoleI1Interp,
                'I2': insoleI2Interp,
                'I3': insoleI3Interp,
                'I4': insoleI4Interp,
                'I5': insoleI5Interp,
                'I6': insoleI6Interp,
                'I7': insoleI7Interp,
                'I8': insoleI8Interp,
                'I9': insoleI9Interp,
                'I10': insoleI10Interp,
                'I11': insoleI11Interp,
                'I12': insoleI12Interp,
                'I13': insoleI13Interp,
                'I14': insoleI14Interp,
                'I15': insoleI15Interp,
                'I16': insoleI16Interp,
                'I17': insoleI17Interp,
                'forceplate': forcePlateInterp,
                'cycleNumber': i+1,
            })
            combinedCycles.append(combinedDf)
        if combinedCycles:
            allCyclesCombined = pd.concat(combinedCycles, ignore_index=True)
            logging.info(f"Number of Data: {len(allCyclesCombined)}")
        else:
            logging.error("No valid gait cycles were found for concatenation.")

        ## * 生成校准数据集
        ## * 左脚鞋垫
        # calibrationFile = r"D:\code\Exoskeleton\data\20241014_calibration\leftInsoleCalibration.xlsx"
        ## * 右脚鞋垫
        calibrationFile = r"D:\code\Exoskeleton\data\20241014_calibration\rightInsoleCalibration.xlsx"

        if os.path.isfile(calibrationFile):
            raise ValueError(f"{calibrationFile} is not empty!") ## 防止覆盖
        else:
            allCyclesCombined.to_excel(calibrationFile, index=False)
    else:
        raise ValueError("Length of forceplateCycles and insoleCycles are not equal!")

In [None]:
if calibration_flag:
    # calibrationDataFile = r"D:\code\Exoskeleton\data\20241014_calibration\leftInsoleCalibration.xlsx"
    calibrationDataFile = r"D:\code\Exoskeleton\data\20241014_calibration\rightInsoleCalibration.xlsx"

    ## * 根据力板数据对鞋垫数据进行校准
    data = pd.read_excel(calibrationDataFile)
    print(f"Columns of calibration file: {data.columns}")
    ### * 分割特征和目标变量
    regressionX = data.iloc[:, :18]
    regressionY = data.iloc[:, 18]
    ### * 分割数据集
    regressionX_train, regressionX_test, regressionY_train, regressionY_test = train_test_split(regressionX, regressionY, test_size=0.2, random_state=42)
    ### * 创建线性回归模型
    # calibrationModel = LinearRegression()
    calibrationModel = Lasso(alpha=0.0, positive=True) # alpha=0.0表示无正则化，仅强制非负
    ### * 训练模型
    calibrationModel.fit(regressionX_train, regressionY_train)
    ### * 预测
    predictionY = calibrationModel.predict(regressionX_test)
    ### * 评估模型
    mse = mean_squared_error(regressionY_test, predictionY)
    r2 = r2_score(regressionY_test, predictionY)
    logging.info(f"Root Mean Squared Error: {np.sqrt(mse)}")
    logging.info(f"R^2 Score: {r2}")
    ### * 获取校准系数
    coefficients = calibrationModel.coef_
    print(f"Coefficients: {coefficients}")
    ### * 可视化结果
    #### * 可视化实际值和预测值
    plt.figure(figsize=(10, 6))
    plt.plot(regressionY_test.values, label='Actual', marker='o')
    plt.plot(predictionY, label='Predicted', marker='x')
    plt.xlabel('Sample Index')
    plt.ylabel('Force Output (N)')
    plt.title('Actual vs Predicted Force Output')
    plt.legend()
    plt.show()
    #### * 绘制残差图
    residuals = regressionY_test - predictionY
    plt.figure(figsize=(10, 6))
    sns.histplot(residuals, kde=True) # kde=True设置是否显示核密度曲线
    plt.xlabel('Residual')
    plt.title('Residuals Distribution')
    plt.show()