# 整体配置

In [None]:
import numpy as np
import os
import re
import pandas as pd
from datetime import datetime
from datetime import timedelta
import matplotlib.pyplot as plt
from scipy import interpolate
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from scipy.optimize import minimize
import seaborn as sns
from scipy import stats
from sklearn.metrics import silhouette_score
from scipy.optimize import curve_fit
from sklearn.metrics import r2_score, mean_squared_error
from scipy.signal import savgol_filter
from statsmodels.tsa.seasonal import STL

In [None]:
plt.rcParams['font.sans-serif'] = ['SimHei']  # 设置字体为黑体
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题

# 车辙数据

In [None]:
# 全局配置
RUTPATH = '数据大赛/性能数据/车辙数据'
COLUMNS_NAME = [ '日期', '轮迹带1均值', '轮迹带1标准差', '轮迹带1变异系数', '轮迹带1样本量', 
                   '轮迹带2均值', '轮迹带2标准差', '轮迹带2变异系数', '轮迹带2样本量',
                   '断面均值', '断面标准差', '断面变异系数', '断面样本量']
KEY_WORDS = ['STR2', 'STR8']

In [None]:
# 从文件名中获取日期
def extract_and_convert_date(filename):
    match = re.search(r'(\d{8})\.xlsx$', filename)
    if match:
        date_str = match.group(1)  # 提取匹配到的日期字符串
        date_obj = datetime.strptime(date_str, '%Y%m%d')
        formatted_date = date_obj.strftime('%Y-%m-%d')
        return formatted_date

In [None]:
# 基于key_word提取对应的行数
def get_rut_data_rows(df, key_word):
    result_rows = []
    start_index = None
    for i in range(len(df)):
        if df.iloc[i, 0] == key_word:  # 找到key_word所在行
            start_index = i
            result_rows.append(i)  # 添加这个行序
        elif start_index is not None and pd.isna(df.iloc[i, 0]):  # 开始行下面的空白行
            result_rows.append(i)  # 添加这个行序
        elif start_index is not None and not pd.isna(df.iloc[i, 0]):  # 直到下一次遇到非空行
            break
    return result_rows

In [None]:
# 找到满足“第*次”的列
def get_rut_data_columns(df):
    result_columns = []
    str_list = df.iloc[0, :].values  # 提取表头
    pattern = r'^第.+次$'
    for i in range(len(str_list)):
        if not pd.isna(str_list[i]):
            if bool(re.fullmatch(pattern, str(str_list[i]))):
                result_columns.append(i)
                result_columns.append(i+1)
    return result_columns

In [None]:
# 对target_arr作统计分析
# target_arr是一个具有偶数列的array，所有奇数列都是轮迹带1所有偶数列都是轮迹带2，要分别求均值、标准差和变异系数，还有整个断面的
def summary_target_arr(target_arr, rut_date):
    # 两列两列地拼在一起，例如(7,6)的数组会被改成(21,2)
    rows_num, columns_num = target_arr.shape
    pairs_num = int(columns_num / 2)
    target_arr_reshape = np.zeros((rows_num * pairs_num, 2))
    for i in range(pairs_num):
        row_start = i * rows_num
        row_end = (i + 1) * rows_num
        target_arr_reshape[row_start:row_end, :] = target_arr[:, (i * 2):(i * 2 + 2)]
        
    # 开始整理统计量
    rut_data_tmp = pd.DataFrame(columns = COLUMNS_NAME)
    columns_mean = np.average(target_arr_reshape, axis=0)
    columns_std = np.std(target_arr_reshape, axis=0)
    columns_cv = columns_std / columns_mean
    rut_data_tmp.loc[0, '日期'] = rut_date
    rut_data_tmp.loc[0, '轮迹带1均值'], rut_data_tmp.loc[0, '轮迹带2均值'] = columns_mean
    rut_data_tmp.loc[0, '轮迹带1标准差'], rut_data_tmp.loc[0, '轮迹带2标准差'] = columns_std
    rut_data_tmp.loc[0, '轮迹带1变异系数'], rut_data_tmp.loc[0, '轮迹带2变异系数'] = columns_cv
    rut_data_tmp.loc[0, '轮迹带1样本量'] = rows_num * pairs_num
    rut_data_tmp.loc[0, '轮迹带2样本量'] = rows_num * pairs_num
    rut_data_tmp.loc[0, '断面均值'] = np.mean(target_arr_reshape)
    rut_data_tmp.loc[0, '断面标准差'] = np.std(target_arr_reshape)
    rut_data_tmp.loc[0, '断面变异系数'] = np.std(target_arr_reshape) / np.mean(target_arr_reshape)
    rut_data_tmp.loc[0, '断面样本量'] = rows_num * columns_num
    
    return rut_data_tmp

In [None]:
# 获取车辙数据
def get_rut_data(key_word):
    # keyword分别是STR2和STR8
    rut_data = pd.DataFrame(columns = COLUMNS_NAME)  # 空白df，逐一存放rut_data_tmp
    folder_list = os.listdir(RUTPATH)  # 搜索文件夹中所有文件夹
    for folder in folder_list:
        print(f'正在读取{folder}')
        file_list = os.listdir(str(RUTPATH) + '/' + folder)  # 搜索子文件夹中所有文件
        for file_name in file_list:
            print(f'正在读取{file_name}')
            rut_date = extract_and_convert_date(file_name)  # 从文件名中获取日期
            file_path = str(RUTPATH) + '/' + folder + '/' + file_name  # 完整相对路径
            
            with pd.ExcelFile(file_path) as xls:  # 判断是否有个表叫“内侧车道”
                if '内侧车道' in xls.sheet_names:
                    sheet_name = '内侧车道'
                else:
                    sheet_name = 'Sheet1'
            
            # 提取完整的数据，例如STR2的所有次数的轮迹带1和2的所有桩号数据
            df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
            rut_data_rows = get_rut_data_rows(df, key_word)  # 基于key_word提取对应的行数
            rut_data_columns = get_rut_data_columns(df)  # 找到满足“第*次”的列
            target_arr = df.iloc[rut_data_rows, rut_data_columns].to_numpy(dtype=float)  # 这是完整的array数据
            rut_data_tmp = summary_target_arr(target_arr, rut_date)
            rut_data = pd.concat([rut_data, rut_data_tmp], ignore_index=True)
    return rut_data

In [None]:
# 计算并汇总
key_word = KEY_WORDS[1]
rut_data = get_rut_data(key_word)

In [None]:
# 查看并输出
print(rut_data)
csv_name = key_word + '车辙数据.csv'
rut_data.to_csv(csv_name, index=False)  # index=False 表示不保存索引

# 温度数据

In [None]:
# 全局配置
TEPPATH = '数据大赛/环境数据/路面结构温湿度数据'
KEY_WORDS = ['STR2', 'STR8']
SENSOR_NUMS = [9, 10]

In [None]:
def get_tep_data(key_word_index):
    
    # 列名
    sensor_index_tmp = np.arange(SENSOR_NUMS[key_word_index]) + 1
    sensor_index = np.char.add('W', sensor_index_tmp.astype(str))
    columns_title = np.insert(sensor_index, 0, 'timestamp')
    
    # 读取数据
    tepdata = pd.DataFrame(columns = columns_title)  # 空数组
    folder_list = os.listdir(TEPPATH)
    for folder in folder_list:
        print(f'正在读取{folder}')
        file_list = os.listdir(str(TEPPATH) + '/' + folder)
        for file_name in file_list:
            if bool(re.search(KEY_WORDS[key_word_index], file_name)):  # 找到文件名带有STR2或8的
                print(f'正在读取{file_name}')
                file_path = str(TEPPATH) + '/' + folder + '/' + file_name  # 完整相对路径
                with pd.ExcelFile(file_path) as xls:  # 判断是否有个表叫“汇总”
                    if '汇总' in xls.sheet_names:
                        sheet_name = '汇总'
                        df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
                        df.columns = columns_title
                        df = df.loc[5:, :]
                        tepdata = pd.concat([tepdata, df], ignore_index=True)
                    else:
                        for sheet_name in xls.sheet_names:
                            print(f'正在读取{sheet_name}')
                            df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
                            df.columns = columns_title
                            df = df.loc[3:, :]
                            tepdata = pd.concat([tepdata, df], ignore_index=True)
    return tepdata

In [None]:
def clear_tep_data(tepdata):
    
    # 定义日期时间格式的正则表达式
    # 匹配格式为YYYY-MM-DD HH:MM:SS[.ffffff]
    pattern = r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(\.\d{6})?$'
    
    valid_indices = []
    # 找出不符合格式的行并删除
    for index, row in tepdata.iterrows():
        if re.match(pattern, str(row['timestamp'])):
            valid_indices.append(index)
            # 去掉微秒部分
            if '.' in str(row['timestamp']):
                cleaned_timestamp = str(row['timestamp']).split('.')[0]
                tepdata.loc[index, 'timestamp'] = cleaned_timestamp
    # 删除不符合格式的行
    tepdata = tepdata.loc[valid_indices].reset_index(drop=True)
    return tepdata

In [None]:
# 计算并汇总
key_word_index = 0
tepdata = get_tep_data(key_word_index)

In [None]:
# 清洗
tepdata_clear = clear_tep_data(tepdata)
print(tepdata_clear)
csv_name = KEY_WORDS[key_word_index] + '温度数据.csv'
tepdata_clear.to_csv(csv_name, index=False)  # index=False 表示不保存索引

# 应力数据

In [None]:
# 全局配置
STRESS_PATH = r'E:\BaiduNetdiskDownload\STR2+STR8'
KEY_WORDS = ['STR2', 'STR8']
SENSOR_TYPES = ['SP', 'SB', 'SV', 'PF', 'DM']
SENSOR_TYPES_DESCRIBE = ['沥青应变', '混凝土应变', '竖向应变', '土压力', '多点位移']
SENSOR_NUMS = [[8, 24, 2, 7, 4], [8, 24, 2, 7, 3]]
CHUCK_SIZE = 10000

In [None]:
def get_stress_data(key_word_index, sensor_type_index):
    
    # 列名
    sensor_index_tmp = np.arange(SENSOR_NUMS[key_word_index][sensor_type_index]) + 1
    sensor_index = np.char.add('Y', sensor_index_tmp.astype(str))
    columns_title = np.insert(sensor_index, 0, 'timestamp')
    
    # 数据
    stress_data = pd.DataFrame(columns = columns_title)
    
    # 初始化一个last_second
    last_second = None
    
    # 打开文件夹
    folder_list = os.listdir(STRESS_PATH)  # 20200319、20200320...20200428
    for folder in folder_list:
        folder_path = os.path.join(STRESS_PATH, folder, KEY_WORDS[key_word_index])
        print(f'正在读取{folder_path}')
        if os.path.exists(folder_path):
            file_list = os.listdir(folder_path)  # ../20200319/STR2文件夹的文件列表
        else:
            continue
        
        for file_name in file_list:
            if bool(re.search(SENSOR_TYPES[sensor_type_index], file_name)):  # 找到有传感器类型对应的文件
                print(f'正在读取{file_name}')
                file_path = os.path.join(STRESS_PATH, folder, KEY_WORDS[key_word_index], file_name)
                chunk_iterator = pd.read_csv(file_path, chunksize=CHUCK_SIZE, parse_dates=['DateTime'], 
                                            on_bad_lines='skip')  # 逐块取数据
                
                # 每一块
                for chunk in chunk_iterator:
                    if len(chunk) == 0:
                        break
                    chunk['DateTime'] = chunk['DateTime'].dt.floor('S')  # 消除微秒
                    second_stress_data = chunk.groupby('DateTime').first().reset_index()  # 每一秒取第一行
                    second_stress_data.columns = columns_title
                    if second_stress_data.iloc[0, 0] == last_second and len(second_stress_data) > 0:
                        second_stress_data = second_stress_data.iloc[1:]
                    if len(second_stress_data) > 0:
                        last_second = second_stress_data.iloc[-1, 0]
                    
                    stress_data = pd.concat([stress_data, second_stress_data], ignore_index=True)

    return stress_data

In [None]:
# 汇总
key_word_index = 1
sensor_type_index = 4
stress_data = get_stress_data(key_word_index, sensor_type_index)

In [None]:
# 查看
print(stress_data.head())
print(len(stress_data))
print(f"总内存: {stress_data.memory_usage(deep=True).sum() / (1024 ** 2):.2f} MB")

In [None]:
# 输出
csv_name = KEY_WORDS[key_word_index] + SENSOR_TYPES_DESCRIBE[sensor_type_index] + '数据.csv'
print(csv_name)
stress_data.to_csv(csv_name, index=False)  # index=False 表示不保存索引

# 轴次数据

In [None]:
def get_axle_data(file_path):
    # 生成每一天的轴次数据
    axle_data = pd.DataFrame(columns=['日期', '边缘轴次'])
    axle_df = pd.read_excel(file_path, parse_dates=['开始时间', '结束时间'])
    for i in range(len(axle_df)):
        date_sequence = pd.date_range(start=axle_df.loc[i, '开始时间'], 
                                      end=axle_df.loc[i, '结束时间'], freq='D')  # 生成日期序列（包含头尾）
        date_num = len(date_sequence)
        axle_data_tmp = pd.DataFrame(columns=['日期', '边缘轴次'])
        axle_data_tmp['日期'] = date_sequence
        daily_axle = axle_df.loc[i, '边际轴次'] / axle_df.loc[i, '时长']
        daily_axle_sequence = daily_axle * np.ones(date_num)
        axle_data_tmp['边缘轴次'] = daily_axle_sequence
        axle_data = pd.concat([axle_data, axle_data_tmp], ignore_index=True)
    axle_data['日期'] = pd.to_datetime(axle_data['日期'])
    axle_data['日期'] = axle_data['日期'].dt.floor('D')
    axle_data['累计轴次'] = axle_data['边缘轴次'].cumsum()
    return axle_data

In [None]:
# 整理并查看
file_path = '轴次数据.xlsx'
axle_data = get_axle_data(file_path)
print(axle_data.head())

In [None]:
# 输出
csv_name = '单日轴次数据.csv'
axle_data.to_csv(csv_name, index=False)  # index=False 表示不保存索引

# 二次加载+可视化

### 加载数据

In [None]:
# 轴次数据
axle_data = pd.read_csv('单日轴次数据.csv', parse_dates=['日期'])
axle_data = axle_data.sort_values(by='日期')
print(axle_data.head())

In [None]:
# 车辙数据
STR2_rut_data = pd.read_csv('STR2车辙数据.csv', parse_dates=['日期'])
STR2_rut_data = STR2_rut_data.sort_values(by='日期')
print(STR2_rut_data.head())

# 温度数据
STR2_tep_data = pd.read_csv('STR2温度数据.csv', parse_dates=['timestamp'], low_memory=False)
STR2_tep_data = STR2_tep_data.sort_values(by='timestamp')
print(STR2_tep_data.head())

# 土压力数据
STR2_stress_data = pd.read_csv('STR2土压力数据.csv', parse_dates=['timestamp'])
STR2_stress_data = STR2_stress_data.sort_values(by='timestamp')
print(STR2_stress_data.head())

In [None]:
# 车辙数据
STR8_rut_data = pd.read_csv('STR8车辙数据.csv', parse_dates=['日期'])
STR8_rut_data = STR8_rut_data.sort_values(by='日期')
print(STR8_rut_data.head())

# 温度数据
STR8_tep_data = pd.read_csv('STR8温度数据.csv', parse_dates=['timestamp'], low_memory=False)
STR8_tep_data = STR8_tep_data.sort_values(by='timestamp')
print(STR8_tep_data.head())

# 土压力数据
STR8_stress_data = pd.read_csv('STR8土压力数据.csv', parse_dates=['timestamp'])
STR8_stress_data = STR8_stress_data.sort_values(by='timestamp')
print(STR8_stress_data.head())

In [None]:
# 日期统一
max_date = axle_data.iloc[-1, 0]  # 取轴次的最大日期为期限

# STR2过滤
STR2_rut_data = STR2_rut_data[STR2_rut_data['日期'] <= max_date]
STR2_tep_data = STR2_tep_data[STR2_tep_data['timestamp'] <= max_date]
STR2_stress_data = STR2_stress_data[STR2_stress_data['timestamp'] <= max_date]

# STR8过滤
STR8_rut_data = STR8_rut_data[STR8_rut_data['日期'] <= max_date]
STR8_tep_data = STR8_tep_data[STR8_tep_data['timestamp'] <= max_date]
STR8_stress_data = STR8_stress_data[STR8_stress_data['timestamp'] <= max_date]

### 温度

In [None]:
# 温度格式转换
columns_to_convert = STR2_tep_data.columns[1:]
STR2_tep_data[columns_to_convert] = STR2_tep_data[columns_to_convert].apply(pd.to_numeric, errors='coerce')
columns_to_convert = STR8_tep_data.columns[1:]
STR8_tep_data[columns_to_convert] = STR8_tep_data[columns_to_convert].apply(pd.to_numeric, errors='coerce')

In [None]:
def _calculate_gap_lengths(mask):
    """计算布尔掩码中连续True的片段"""
    # 找到掩码中True值的开始和结束位置
    mask = mask.values
    if not mask.any():
        return []
    
    # 找到变化点
    idx = np.flatnonzero(np.diff(np.r_[False, mask, False]))
    starts = idx[0::2]
    ends = idx[1::2]
    
    # 计算每个片段的长度
    lengths = ends - starts
    
    return list(zip(starts, ends, lengths))

In [None]:
# 温度插值
def interpolate_temperature_data(data, method='cubic_spline', max_gap=10, limit_direction='both'):
    
    df = data.copy()
    
    for column in df.columns:
        
        if column == 'timestamp':
            continue
        
        print(f'正在处理{column}')
    
        # 确保温度列是数值类型
        df[column] = pd.to_numeric(df[column], errors='coerce')

        # 计算空缺值位置
        missing_mask = df[column].isna()
        missing_count = missing_mask.sum()

        print(f"原始数据共有 {len(df)} 行，其中 {missing_count} 行是空缺值")

        # 根据选择的方法进行插值
        if method == 'linear':
            # 线性插值
            df[column] = df[column].interpolate(
                method='linear', 
                limit=max_gap, 
                limit_direction=limit_direction
            )

        elif method == 'quadratic':
            # 二次插值（使用pandas的polynomial方法，阶数为2）
            df[column] = df[column].interpolate(
                method='polynomial', 
                order=2, 
                limit=max_gap, 
                limit_direction=limit_direction
            )

        elif method == 'cubic_spline':
            # 三次样条插值
            # 先进行线性插值处理短空缺
            df[column] = df[column].interpolate(
                method='linear', 
                limit=max_gap//2,  # 先处理较短的空缺
                limit_direction=limit_direction
            )

            # 对剩余的较长空缺使用三次样条插值
            non_missing = df[column].dropna()
            x = non_missing.index.astype(np.int64) // 10**9  # 转换为unix时间（秒）
            y = non_missing.values

            # 创建三次样条函数
            if len(non_missing) >= 4:  # 至少需要4个点来构建三次样条
                # 转换为unix时间（秒）并确保严格递增
                x = non_missing.index.astype(np.int64) // 10**9
                y = non_missing.values

                # 检查是否严格递增
                if np.all(np.diff(x) > 0):
                    # 如果是严格递增的，直接使用
                    spl = interpolate.UnivariateSpline(x, y, s=0)
                else:
                    # 如果不是，创建一个严格递增的索引
                    x_new = np.arange(len(x))
                    spl = interpolate.UnivariateSpline(x_new, y, s=0)
                    # 需要将原始x映射到新索引
                    x_mapping = {orig_x: new_x for orig_x, new_x in zip(x, x_new)}

                # 对剩余的空缺值应用样条插值
                remaining_missing = df[column].isna()
                missing_indices = df.index[remaining_missing]
                missing_x = missing_indices.astype(np.int64) // 10**9

                # 只对连续空缺长度小于max_gap的部分进行插值
                gaps = _calculate_gap_lengths(remaining_missing)
                valid_missing = remaining_missing.copy()

                for start, end, length in gaps:
                    if length > max_gap:
                        valid_missing[start:end] = False

                valid_missing_indices = df.index[valid_missing]
                if len(valid_missing_indices) > 0:
                    valid_missing_x = valid_missing_indices.astype(np.int64) // 10**9
                    df.loc[valid_missing, column] = spl(valid_missing_x)

        # 计算插值后的空缺值数量
        remaining_missing_count = df[column].isna().sum()
        filled_count = missing_count - remaining_missing_count

        print(f"插值完成：填充了 {filled_count} 个空缺值，剩余 {remaining_missing_count} 个空缺值")
    
    return df

In [None]:
# 温度插值
STR2_tep_data_interpolated = interpolate_temperature_data(
    STR2_tep_data, 
    method='linear',  # 可选：'linear', 'quadratic', 'cubic_spline'
    max_gap=2000,             # 允许插值的最大连续空缺值数量
    limit_direction='both'  # 双向插值
)
STR8_tep_data_interpolated = interpolate_temperature_data(
    STR2_tep_data, 
    method='linear',  # 可选：'linear', 'quadratic', 'cubic_spline'
    max_gap=2000,             # 允许插值的最大连续空缺值数量
    limit_direction='both'  # 双向插值
)

In [None]:
# 温度可视化
fig = plt.figure(figsize=(10, 6))
for i in np.arange(9)+1:
    plt.plot(STR2_tep_data['timestamp'], STR2_tep_data[f'W{i}'], label=f'第{i}层')
plt.legend()
plt.xlabel('日期')
plt.ylabel('温度')
fig.savefig('温度变化图.png', dpi=300)
plt.show()

In [None]:
# 温度可视化
plt.figure(figsize=(10, 6))
for i in np.arange(9)+1:
    plt.plot(STR2_tep_data_interpolated['timestamp'], STR2_tep_data_interpolated[f'W{i}'], label=f'第{i}层')
plt.legend()
plt.xlabel('日期')
plt.ylabel('温度')
plt.show()

### 车辙

In [None]:
# 车辙可视化
fig = plt.figure(figsize=(10, 6))
plt.plot(STR2_rut_data['日期'], STR2_rut_data['轮迹带1均值'], color='red', marker='o', label='STR2轮迹带1')
plt.plot(STR2_rut_data['日期'], STR2_rut_data['轮迹带2均值'], color='red', marker='^', linestyle='--', label='STR2轮迹带2')
plt.plot(STR8_rut_data['日期'], STR8_rut_data['轮迹带1均值'], color='blue', marker='o', label='STR8轮迹带1')
plt.plot(STR8_rut_data['日期'], STR8_rut_data['轮迹带2均值'], color='blue', marker='^', linestyle='--', label='STR8轮迹带2')
plt.legend()
plt.xlabel('日期')
plt.ylabel('车辙深度')
fig.savefig('STR2及STR8左右轮迹带车辙.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
def compare_two_datasets(data1, data2, alpha=0.05, plot=True, savefig=False, fig_name=None):
    """
    比较两列数据是否来自同一总体
    
    参数:
    data1, data2: 待比较的两列数据（Series或numpy数组）
    alpha: 显著性水平，默认0.05
    plot: 是否绘制可视化图表
    
    返回:
    dict: 包含各项检验结果的字典
    """
    results = {}
    
    # 1. 正态性检验
    stat_shapiro1, p_shapiro1 = stats.shapiro(data1)
    stat_shapiro2, p_shapiro2 = stats.shapiro(data2)
    results['normality'] = {
        'data1': {'statistic': stat_shapiro1, 'p-value': p_shapiro1},
        'data2': {'statistic': stat_shapiro2, 'p-value': p_shapiro2}
    }
    
    # 2. t检验（如果两列数据都正态）
    if p_shapiro1 > alpha and p_shapiro2 > alpha:
        # 方差齐性检验
        stat_levene, p_levene = stats.levene(data1, data2)
        equal_var = p_levene > alpha
        results['levene'] = {'statistic': stat_levene, 'p-value': p_levene, 'equal_var': equal_var}
        
        # t检验
        stat_t, p_t = stats.ttest_ind(data1, data2, equal_var=equal_var)
        results['t-test'] = {'statistic': stat_t, 'p-value': p_t}
    else:
        results['t-test'] = None
    
    # 3. Mann-Whitney U检验
    stat_mw, p_mw = stats.mannwhitneyu(data1, data2)
    results['mann-whitney'] = {'statistic': stat_mw, 'p-value': p_mw}
    
    # 4. Kolmogorov-Smirnov检验
    stat_ks, p_ks = stats.ks_2samp(data1, data2)
    results['kolmogorov-smirnov'] = {'statistic': stat_ks, 'p-value': p_ks}
    
    # 5. 可视化
    if plot:
        fig = plt.figure(figsize=(16, 10))
        
        # 直方图
        plt.subplot(2, 2, 1)
        sns.histplot(data1, kde=True, label='轮迹带1')
        sns.histplot(data2, kde=True, label='轮迹带2')
        plt.title('数据分布比较')
        plt.xlabel('车辙深度')
        plt.ylabel('计数')
        plt.legend()
        
        # 箱线图
        plt.subplot(2, 2, 2)
        sns.boxplot(data=[data1, data2])
        plt.xticks([0, 1], ['轮迹带1', '轮迹带2'])
        plt.title('箱线图比较')
        
        # Q-Q图
        plt.subplot(2, 2, 3)
        stats.probplot(data1, dist="norm", plot=plt)
        stats.probplot(data2, dist="norm", plot=plt)
        plt.title('Q-Q图比较')
        plt.xlabel('分位数')
        plt.ylabel('车辙深度')
        
        # 散点图（如果数据长度相同）
        if len(data1) == len(data2):
            plt.subplot(2, 2, 4)
            plt.scatter(data1, data2, alpha=0.5)
            plt.xlabel('轮迹带1')
            plt.ylabel('轮迹带2')
            plt.title('数据点对比')
            # 添加y=x参考线
            min_val = min(data1.min(), data2.min())
            max_val = max(data1.max(), data2.max())
            plt.plot([min_val, max_val], [min_val, max_val], 'r--')
        
        plt.tight_layout()
        
        if savefig:
            fig.savefig(fig_name, dpi=300)
        plt.show()
    
    # 打印主要结果
    print("\n===== 假设检验结果 =====")
    print(f"1. 正态性检验 (Shapiro-Wilk):")
    print(f"   轮迹带1: p-value = {p_shapiro1:.4f} ({'正态' if p_shapiro1 > alpha else '非正态'})")
    print(f"   轮迹带2: p-value = {p_shapiro2:.4f} ({'正态' if p_shapiro2 > alpha else '非正态'})")
    
    if results['t-test']:
        print(f"\n2. t检验:")
        print(f"   方差齐性: p-value = {p_levene:.4f} ({'满足' if equal_var else '不满足'})")
        print(f"   t统计量 = {stat_t:.4f}, p-value = {p_t:.4f} ({'相同分布' if p_t > alpha else '不同分布'})")
    
    print(f"\n3. Mann-Whitney U检验:")
    print(f"   U统计量 = {stat_mw:.4f}, p-value = {p_mw:.4f} ({'相同分布' if p_mw > alpha else '不同分布'})")
    
    print(f"\n4. Kolmogorov-Smirnov检验:")
    print(f"   KS统计量 = {stat_ks:.4f}, p-value = {p_ks:.4f} ({'相同分布' if p_ks > alpha else '不同分布'})")
    
    return results

In [None]:
test_result = compare_two_datasets(STR2_rut_data['轮迹带1均值'],
                                   STR8_rut_data['轮迹带1均值'],
                                   alpha=0.05, plot=True, savefig=False, 
                                   fig_name='STR2及STR8右轮迹带分析.png')

### 应力

In [None]:
# 查看
# 对应关系：STR2压力传感器的Y1对应温度传感器的W2...压力传感器的Y7对应温度传感器的W8
# 
print(STR2_stress_data.head())
print(STR2_tep_data.head())

In [None]:
# 应力可视化
fig = plt.figure(figsize=(10, 50))
for i in np.arange(7)+1:
    plt.subplot(7, 1, i)
    plt.hist(STR2_stress_data[f'Y{i}'], bins=100, density=True, label=f'第{i}层')
    plt.xlabel('应力')
    plt.ylabel('密度')
plt.show()

In [None]:
# 应力可视化
plt.figure(figsize=(10, 6))
for i in np.arange(7)+1:
    plt.plot(STR2_stress_data['timestamp'], STR2_stress_data[f'Y{i}'], label=f'第{i}层')
# plt.plot(STR2_stress_data['timestamp'], STR2_stress_data['S1'], label='第1层')
plt.legend()
plt.xlabel('日期')
plt.ylabel('应力')
plt.show()

In [None]:
# 应力可视化
# 抽一段来看
# 计算时间范围
sample_start_time = pd.Timestamp('2020-03-21 10:10:00')
sample_end_time = pd.Timestamp('2020-03-21 10:20:00')

sample_stress = STR2_stress_data[(STR2_stress_data['timestamp'] >= sample_start_time) & (STR2_stress_data['timestamp'] <= sample_end_time)]
sample_stress.head()

In [None]:
# 应力可视化
plt.figure(figsize=(10, 6))
for i in np.arange(7)+1:
    plt.plot(sample_stress['timestamp'], sample_stress[f'Y{i}'], label=f'第{i}层')
plt.legend()
plt.xlabel('日期')
plt.ylabel('应力')
plt.show()

In [None]:
def sensor_clustering(df, n_clusters=2, random_state=42, plot=True, get_score=False):
    """
    对传感器数据进行KMeans聚类，区分正常状态和响应状态
    
    参数:
    df (pd.DataFrame): 包含传感器数据的DataFrame，第一列为日期，其余列为传感器数据(S1~Sn)
    n_clusters (int): 聚类数量，默认为2
    random_state (int): 随机种子，保证结果可复现
    plot (bool): 是否绘制聚类结果可视化图
    
    返回:
    dict: 包含聚类结果的字典，包括：
        - cluster_centers: 聚类中心
        - labels: 每个样本的聚类标签
        - df_clustered: 包含聚类标签的原始DataFrame
        - inertia: 聚类的惯性（越小表示聚类效果越好）
    """
    # 提取特征列（跳过第一列日期）
    feature_columns = df.columns[1:]
    X = df[feature_columns].values
    
    # 数据标准化（KMeans对尺度敏感）
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # 执行KMeans聚类
    kmeans = KMeans(n_clusters=n_clusters, random_state=random_state)
    labels = kmeans.fit_predict(X_scaled)
    inertia = kmeans.inertia_
    
    # 评价聚类效果
    if get_score:
        score = silhouette_score(X, labels)
        print(f"轮廓系数：{score:.2f}")
    
    # 获取聚类中心并反标准化
    cluster_centers = scaler.inverse_transform(kmeans.cluster_centers_)
    
    # 将聚类结果添加到原始DataFrame
    df_clustered = df.copy()
    df_clustered['Cluster'] = labels
    
    # 为聚类标签排序（假设较小的值对应A状态，较大的值对应B状态）
    # 计算每个聚类中心的平均传感器值
    cluster_avg = np.mean(cluster_centers, axis=1)
    # 获取排序后的索引
    sorted_indices = np.argsort(cluster_avg)
    # 创建映射字典
    label_mapping = {sorted_indices[i]: i for i in range(n_clusters)}
    # 重新映射标签
    df_clustered['Cluster'] = df_clustered['Cluster'].map(label_mapping)
    
    # 更新聚类中心顺序
    cluster_centers = cluster_centers[sorted_indices]
    
    # 绘制聚类结果
    if plot:
        plot_clustering_results(df_clustered, feature_columns, cluster_centers)
    
    return {
        'cluster_centers': cluster_centers,
        'labels': df_clustered['Cluster'].values,
        'df_clustered': df_clustered,
        'inertia': inertia
    }

In [None]:
def plot_clustering_results(df, feature_columns, cluster_centers):
    """绘制聚类结果可视化图"""
    n_features = len(feature_columns)
    
    # 创建子图
    fig, axes = plt.subplots(n_features, 1, figsize=(12, 4 * n_features))
    if n_features == 1:
        axes = [axes]  # 确保axes是列表
    
    # 为每个传感器绘制聚类结果
    for i, col in enumerate(feature_columns):
        ax = axes[i]
        
        # 绘制原始数据
        ax.plot(df.index, df[col], 'b-', alpha=0.5, label='原始数据')
        
        # 为不同聚类绘制不同颜色
        for cluster in df['Cluster'].unique():
            cluster_data = df[df['Cluster'] == cluster]
            ax.scatter(cluster_data.index, cluster_data[col], 
                       label=f'聚类 {cluster} (中心: {cluster_centers[cluster][i]:.2f})',
                       alpha=0.6, s=30)
        
        ax.set_title(f'{col} 的聚类结果')
        ax.set_xlabel('日期')
        ax.set_ylabel('传感器值')
        ax.legend()
        ax.grid(True)
    
    plt.tight_layout()
    plt.show()
    
    # 绘制聚类中心热图
    plt.figure(figsize=(10, 4))
    centers_df = pd.DataFrame(cluster_centers, columns=feature_columns, 
                              index=[f'聚类 {i}' for i in range(len(cluster_centers))])
    sns.heatmap(centers_df, annot=True, cmap='coolwarm', fmt='.2f')
    plt.title('聚类中心值')
    plt.show()

In [None]:
def plot_sensor_pairplot(df, cluster_column='Cluster', figsize=(20, 20)):
    """
    绘制传感器数据的散点图矩阵，展示各传感器之间的关系
    
    参数:
    df (pd.DataFrame): 包含传感器数据的DataFrame，第一列为日期，其余列为传感器数据(S1~Sn)
    cluster_column (str): 聚类标签列名，用于为散点着色
    figsize (tuple): 图形大小，单位为英寸
    
    返回:
    plt.Figure: 生成的图形对象
    """
    # 提取传感器列（跳过第一列日期）
    sensor_columns = df.columns[1:-1]
    
    # 创建图形和子图网格
    n = len(sensor_columns)
    fig, axes = plt.subplots(n, n, figsize=figsize)
    
    # 确保axes是二维数组（即使n=1）
    axes = np.array(axes).reshape(n, n)
    
    # 设置中文字体
    plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
    
    # 遍历每个子图
    for i, row_col in enumerate(sensor_columns):
        for j, col_col in enumerate(sensor_columns):
            ax = axes[i, j]
            
            # 如果是对角线位置，绘制直方图
            if i == j:
                for cluster in df[cluster_column].unique():
                    sns.histplot(
                        df[df[cluster_column] == cluster][row_col],
                        ax=ax,
                        kde=True,
                        label=f'聚类 {cluster}',
                        alpha=0.6
                    )
                ax.set_title(f'{row_col} 分布')
                ax.legend()
            
            # 非对角线位置，绘制散点图
            else:
                sns.scatterplot(
                    x=col_col,
                    y=row_col,
                    hue=cluster_column,
                    data=df,
                    ax=ax,
                    alpha=0.6,
                    palette='Set1'
                )
                ax.set_title(f'{row_col} vs {col_col}')
            
            # 简化坐标轴标签
            if i < n - 1:
                ax.set_xlabel('')
            if j > 0:
                ax.set_ylabel('')
    
    # 调整布局
    plt.tight_layout()
    return fig

In [None]:
def merge_tep_and_stress(tep_df, stress_df, sensor_name):
    """
    tep_df和stress_df都只有两列
    对于tep_df的每一行，找到这段时间范围内最大的应力.
    sensor_name是'Y1'这样的
    """
    start_date = min(stress_df['timestamp'])
    end_date = max(stress_df['timestamp'])
    tep_df = tep_df[(tep_df['timestamp'] >= start_date) & (tep_df['timestamp'] <= end_date)]
    tep_df = tep_df.reset_index(drop=True)
    tep_df.loc[:, sensor_name] = None
    
    time_delta = timedelta(minutes=10)
    for i in range(len(tep_df)):
        start_time = tep_df.iloc[i, 0]
        end_time = start_time + time_delta
        stress_df_tmp = stress_df[(stress_df['timestamp'] >= start_time) & (stress_df['timestamp'] <= end_time)]
        if len(stress_df_tmp) > 0:
            max_stress = max(stress_df_tmp.loc[:, sensor_name])
            tep_df.loc[i, sensor_name] = max_stress
    
    return tep_df

In [None]:
def select_nearest_to_mean(df, temp_col='W2', value_col='Y1', n_bins=5, m=3):
    """
    将温度列分组，每组选取最接近均值的m条数据
    
    参数:
    df: 输入DataFrame
    temp_col: 温度列名
    value_col: 传感器数值列名
    n_bins: 温度分组数
    m: 每组选取的数据条数
    
    返回:
    筛选后的DataFrame
    """
    # 步骤1: 创建温度分箱
    df['temp_bin'] = pd.cut(df[temp_col], bins=n_bins)
    
    # 步骤2: 计算每个箱内的均值
    bin_means = df.groupby('temp_bin')[value_col].mean().reset_index()
    bin_means.columns = ['temp_bin', 'bin_mean']
    
    # 步骤3: 将均值合并回原DataFrame
    df = pd.merge(df, bin_means, on='temp_bin', how='left')
    
    # 步骤4: 计算每个值与均值的绝对差
    df['abs_diff'] = (df[value_col] - df['bin_mean']).abs()
    
    # 步骤5: 对每个箱内的数据按绝对差排序，并选择前m条
    selected = df.sort_values(['temp_bin', 'abs_diff']).groupby('temp_bin').head(m)
    
    # 步骤6: 删除临时列并返回结果
    return selected.drop(['temp_bin', 'bin_mean', 'abs_diff'], axis=1)

In [None]:
def fit_nonlinear_model(df, x_col='tep', y_col='stress', plot=True):
    """
    拟合非线性模型，并评价拟合效果
    
    参数:
    df: DataFrame，包含tep和stress两列
    x_col, y_col: 自变量和因变量的列名
    plot: 是否绘制拟合结果和残差图
    
    返回:
    dict: 包含拟合参数和评价指标的字典
    """
    # 提取数据
    x_data = df[x_col].values
    y_data = df[y_col].values
    
    # 定义新模型
    def model(tep, a, b):
        return a * (b ** tep)
    
    # 初始参数猜测（关键！影响拟合成功率）
    p0 = [np.mean(y_data), 0.9]  # 初始猜测值，需根据数据调整
    
    # 拟合模型（增加参数边界约束）
    try:
        popt, pcov = curve_fit(
            model, 
            x_data, 
            y_data, 
            p0=p0,
            bounds=([0, 0], [np.inf, np.inf])
        )
        a_fit, b_fit = popt
        
        # 计算拟合值
        y_fit = model(x_data, *popt)
        
        # 评价指标
        r2 = r2_score(y_data, y_fit)
        rmse = np.sqrt(mean_squared_error(y_data, y_fit))
        
        # 残差分析
        residuals = y_data - y_fit
        
        # 打印结果
        print(f"拟合参数: a = {a_fit:.4f}, b = {b_fit:.4f}")
        print(f"评价指标: R² = {r2:.4f}, RMSE = {rmse:.4f}")
        
        # 绘图
        if plot:
            fig = plt.figure(figsize=(10, 10))
            plt.scatter(x_data, y_data, label='原始数据', alpha=0.6)
            plt.plot(x_data, y_fit, 'r-', 
                     label=f'拟合曲线: {y_col} = {a_fit:.4f} * {b_fit:.4f}^{x_col}', 
                     linewidth=2)
            plt.xlabel(x_col)
            plt.ylabel(y_col)
            plt.title('非线性拟合结果')
            plt.legend()
            fig.savefig('图片/应力对应关系')
            plt.grid(True)
            plt.show()
        
        return {
            'params': {'a': a_fit, 'b': b_fit},
            'metrics': {'r2': r2, 'rmse': rmse},
            'residuals': residuals
        }
    
    except Exception as e:
        print(f"拟合失败: {e}")
        print("提示: 尝试调整初始参数p0或检查数据范围")
        return None

In [None]:
# 开始聚类
kmeans_result = sensor_clustering(STR2_stress_data, n_clusters=2, random_state=42, plot=False, get_score=False)
STR2_stress_clustered = kmeans_result['df_clustered']
STR2_stress_responsive = STR2_stress_clustered[STR2_stress_clustered['Cluster'] == 1]  # 只保留响应值
# fig =  plot_sensor_pairplot(STR2_stress_responsive)
# fig.savefig('STR2传感器聚类结果', dpi=300, bbox_inches='tight')

In [None]:
# 开始融合
merge_df = merge_tep_and_stress(STR2_tep_data[['timestamp', 'W2']], 
                                STR2_stress_responsive[['timestamp', 'Y1']], 'Y1')
merge_df = merge_df.dropna(subset=['Y1'])
print(len(merge_df))
print(merge_df.head())
clear_merge_df = select_nearest_to_mean(merge_df, n_bins=120, m=2)
# merge_df.to_csv('111.csv', index=False)  # index=False 表示不保存索引

In [None]:
# 开始拟合
fit_results = fit_nonlinear_model(clear_merge_df, x_col='W2', y_col='Y1', plot=True)

# 车辙预估模型

In [None]:
def preprocess_data(N, T, RD, window_length=5, polyorder=2):
    """数据预处理：处理NaN、负数和零值，并对RD进行平滑化处理"""
    # 转换为numpy数组
    N = np.array(N, dtype=np.float64)
    T = np.array(T, dtype=np.float64)
    RD = np.array(RD, dtype=np.float64)
    
    # 处理NaN（用前值填充）
    N = pd.Series(N).fillna(method='ffill').values
    T = pd.Series(T).fillna(method='ffill').values
    RD = pd.Series(RD).fillna(method='ffill').values
    
    # 处理非正数轴次（替换为极小值）
    N[N <= 0] = 1e-6
    
    # 处理负数温度（平移至正数区间）
    T_min = np.min(T)
    if T_min < 0:
        T = T - T_min + 1e-6
    
    # 处理非正数车辙深度（取绝对值，需谨慎评估物理意义）
    RD = np.abs(RD)
    
    # 对RD进行平滑化处理（使用Savitzky-Golay滤波器）
    # 仅在数据点足够多时进行平滑处理
    if len(RD) > window_length:
        RD_smooth = savgol_filter(RD, window_length=window_length, polyorder=polyorder)
        
        # 确保平滑后的值不小于零
        RD_smooth = np.maximum(RD_smooth, 1e-10)
        
        # 可视化平滑效果（可选，取消注释以查看）
        plt.figure(figsize=(10, 6))
        plt.plot(RD, 'b-', label='原始RD')
        plt.plot(RD_smooth, 'r--', label='平滑后RD')
        plt.legend()
        plt.title('车辙深度平滑处理效果')
        plt.show()
        
        RD = RD_smooth
    
    return N, T, RD

In [None]:
def linearized_objective(beta, N, T, RD):
    """对数线性化模型的目标函数"""
    beta0, n, t, dt = beta
    months = len(N)
    log_rd_pred = np.zeros(months)
    
    for i in range(months):
        
        if i == 0:
            log_rd_pred[i] = beta0 + n * np.log(N[i]) + t * np.log((T[i] + dt))
            
        else:
            
            # 计算当前月的累计等效轴次
            nt = np.exp((log_rd_pred[i-1] - beta0 - t * np.log(T[i] + dt)) / n) + N[i]

            # 计算对数预测值
            log_rd_pred[i] = beta0 + n * np.log(nt) + t * np.log(T[i] + dt)
    
    # 计算对数空间的均方误差
    mse_log = mean_squared_error(np.log(RD), log_rd_pred)
    return mse_log

In [None]:
def train_linearized_model(N, T, RD, initial_guess):
    """使用对数线性化方法训练模型"""
    
    # 定义参数约束
    # beta0无约束，n和t必须为正数
    bounds = [(None, None), (1e-10, None), (1e-10, None), (None, None)]
    
    # 使用L-BFGS-B算法进行优化
    result = minimize(
        linearized_objective,
        initial_guess,
        args=(N, T, RD),
        method='L-BFGS-B',
        bounds=bounds,
        options={'maxiter': 1000, 'disp': True}
    )
    
    # 返回最优参数
    beta0_opt, n_opt, t_opt, dt_opt = result.x
    A_opt = np.exp(beta0_opt)  # 转换回原始参数A
    
    return A_opt, n_opt, t_opt, dt_opt, result

In [None]:
def predict_rd_linearized(params, N, T):
    """使用对数线性化模型预测车辙深度"""
    A, n, t, dt = params
    months = len(N)
    rd_pred = np.zeros(months)
    
    for i in range(months):
        if i == 0:
            # 第一个月直接计算
            rd_pred[i] = A * (N[i] ** n) * ((T[i] + dt) ** t)
        else:
            # 计算前一个月的等效轴次
            equivalent_axles_prev = (rd_pred[i-1] / (A * ((T[i-1] + dt) ** t))) ** (1/n)
            # 计算累计等效轴次
            total_equivalent_axles = equivalent_axles_prev + N[i]
            # 计算当前月的车辙深度
            rd_pred[i] = A * (total_equivalent_axles ** n) * ((T[i] + dt) ** t)
    
    return rd_pred

In [None]:
def evaluate_model(params, N, T, RD_actual):
    """评估模型性能"""
    rd_pred = predict_rd_linearized(params, N, T)
    
    # 计算评估指标
    mse = mean_squared_error(RD_actual, rd_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(RD_actual, rd_pred)
    
    # 计算平均绝对百分比误差(MAPE)
    mape = np.mean(np.abs((RD_actual - rd_pred) / RD_actual)) * 100
    
    return {
        'mse': mse,
        'rmse': rmse,
        'r2': r2,
        'mape': mape
    }

### 数据准备

In [None]:
def prepare_data(axle_data, rut_data, tep_data, layer_name):
    # 将车辙数据、轴次数据和温度数据同步到同一个表格中
    # layer_name指W1,W2这些
    # STR2车辙深度主要由第一层决定
    # STR8车辙深度主要由第前三层决定，分别是0.8，0.2，0.2
    rd_data = pd.DataFrame(columns=['日期', '断面均值', '周期内轴次', layer_name])
    rd_data['日期'] = rut_data['日期']
    rd_data['断面均值'] = rut_data['断面均值']
    
    # 计算周期内轴次和平均温度
    start_date = axle_data.iloc[0, 0]
    for i in range(len(rd_data)):
        end_date = rd_data.iloc[i, 0]
        axle_data_tmp = axle_data[(axle_data['日期'] >= start_date) & (axle_data['日期'] <= end_date)]
        tep_data_tmp = tep_data[(tep_data['timestamp'] >= start_date) & (tep_data['timestamp'] <= end_date)]
        rd_data.loc[i, '周期内轴次'] = axle_data_tmp['边缘轴次'].sum()
        rd_data.loc[i, layer_name] = tep_data_tmp[layer_name].mean()
        start_date = end_date + timedelta(days=1)
    
    return rd_data

In [None]:
layer_name = 'W1'
rd_data = prepare_data(axle_data, STR2_rut_data, STR2_tep_data, layer_name)
N = np.array(rd_data['周期内轴次'])
T = np.array(rd_data[layer_name])
RD_actual = np.array(rd_data['断面均值'])
initial_guess = [-25, 0.5, 5, 0]
print(rd_data)
print(len(rd_data))

In [None]:
months = len(rd_data)

# 数据预处理
N, T, RD_actual = preprocess_data(N, T, RD_actual)

# 训练模型
A_opt, n_opt, t_opt, dt_opt, opt_result = train_linearized_model(N, T, RD_actual, initial_guess)
optimal_params = (A_opt, n_opt, t_opt, dt_opt)

print(f"\n优化后的参数: A = {A_opt:.6f}, n = {n_opt:.6f}, t = {t_opt:.6f}, dt = {dt_opt:.6f}")

# 评估模型
metrics = evaluate_model(optimal_params, N, T, RD_actual)
print("\n模型评估结果:")
print(f"MSE: {metrics['mse']:.6f}")
print(f"RMSE: {metrics['rmse']:.6f}")
print(f"R²: {metrics['r2']:.4f}")
print(f"MAPE: {metrics['mape']:.2f}%")

# 使用训练好的模型进行预测
RD_pred = predict_rd_linearized(optimal_params, N, T)

# 可视化结果
plt.figure(figsize=(14, 10))

# 绘制预测值与实际值对比
plt.subplot(2, 1, 1)
plt.plot(range(1, months + 1), RD_actual, 'b-', label='实际车辙深度')
plt.plot(range(1, months + 1), RD_pred, 'r--', label='预测车辙深度')
plt.xlabel('月份')
plt.ylabel('车辙深度 (mm)')
plt.title('车辙深度预测模型结果')
plt.legend()
plt.grid(True)

# 绘制残差图
plt.subplot(2, 1, 2)
residuals = RD_actual - RD_pred
plt.scatter(range(1, months + 1), residuals, color='gray')
plt.axhline(y=0, color='r', linestyle='-')
plt.xlabel('月份')
plt.ylabel('残差 (mm)')
plt.title('残差分析')
plt.grid(True)

plt.tight_layout()
plt.show()

# 输出预测与实际值的对比表格
results_df = pd.DataFrame({
    '日期': rd_data['日期'],
    '温度 (°C)': T.round(1),
    '轴次': N.round(1),
    '实际车辙深度': RD_actual.round(4),
    '预测车辙深度': RD_pred.round(4),
    '误差': (RD_actual - RD_pred).round(4),
    '误差百分比 (%)': (residuals / RD_actual * 100).round(2)
})

# print("\n预测结果与实际值对比:")
# print(results_df.to_string(index=False))
# results_df.to_csv('pred_result.csv', index=False)

### STL分解

In [None]:
def prepare_sparse_data(df, freq='QS'):
    """
    处理稀疏时间序列数据（每月1-4条记录）
    
    参数:
    df (pd.DataFrame): 包含时间戳和数值的DataFrame
    freq (str): 重采样频率，建议使用'QS'（季度）或'6MS'（半年）
    
    返回:
    pd.Series: 处理后的连续时间序列
    """
    # 设置时间戳为索引
    df = df.set_index(df.columns[0])
    
    # 按指定频率重采样
    resampled = df.resample(freq).mean()
    
    # 使用三次样条插值（更适合稀疏数据）
    # 注意：样条插值要求至少有4个非NaN值
    if resampled.count().iloc[0] >= 4:
        interpolated = resampled.interpolate(method='spline', order=3)
    else:
        # 如果数据点太少，退化为线性插值
        print("警告：数据点过少，使用线性插值而非样条插值")
        interpolated = resampled.interpolate(method='time')
    
    return interpolated.iloc[:, 0]

In [None]:
def perform_stl_decomposition(series, period=12):
    """执行STL分解（与之前相同）"""
    stl = STL(series, period=period)
    result = stl.fit()
    
    return pd.DataFrame({
        '观测值': series,
        '趋势': result.trend,
        '季节': result.seasonal,
        '残差': result.resid
    })

In [None]:
# 可视化函数（与之前相同）
def plot_decomposition(decomposition_results):
    fig, axes = plt.subplots(4, 1, figsize=(12, 15), sharex=True)
    for i, col in enumerate(decomposition_results.columns):
        axes[i].plot(decomposition_results[col], marker='o')
        axes[i].set_title(col)
    plt.tight_layout()
    return fig

In [None]:
def stl_main(df, freq='QS', period=12):
    """
    处理稀疏时间序列并执行STL分解
    
    参数:
    df (pd.DataFrame): 输入数据
    freq (str): 重采样频率，建议'QS'（季度）
    period (int): 分解周期，建议12（4年周期）
    
    返回:
    tuple: 分解结果和图表
    """
    # 准备稀疏数据
    prepared_series = prepare_sparse_data(df, freq=freq)
    
    # 执行STL分解
    decomposition = perform_stl_decomposition(prepared_series, period=period)
    
    # 可视化
    fig = plot_decomposition(decomposition)
    
    return decomposition, fig

In [None]:
# 开始STL分解
stl_data = results_df[['日期', '预测车辙深度']]
decomposition, fig = stl_main(stl_data, freq='QS', period=12)
# fig.savefig('STR8_STL分解.png', dpi=300)
plt.show()

In [None]:
def reconstruct_original_timestamps(original_df, decomposition_results, decay, freq='QS'):
    """
    根据STL分解结果，重构原始时间戳对应的值
    
    参数:
    original_df (pd.DataFrame): 原始数据，包含时间戳和观测值
    decomposition_results (pd.DataFrame): STL分解结果
    freq (str): 分解时使用的频率
    
    返回:
    pd.DataFrame: 包含原始时间戳和重构值的DataFrame
    """
    # 提取分解后的成分
    trend = decomposition_results['趋势']
    seasonal = decomposition_results['季节']
    residual = decomposition_results['残差']
    
    # 获取原始时间戳并转换为Index对象
    original_timestamps = pd.Index(original_df.iloc[:, 0])  # 转换为Index对象
    
    # 将分解结果重采样到原始时间戳
    # 1. 首先将分解结果转换为DataFrame
    components = pd.DataFrame({
        '趋势': trend,
        '季节': seasonal,
        '残差': residual
    })
    
    # 2. 将各成分重采样到原始时间戳
    reconstructed = pd.DataFrame(index=original_timestamps)
    
    for component in ['趋势', '季节', '残差']:
        # 使用时间插值将规则序列转换到原始时间戳
        reconstructed[component] = components[component].reindex(
            original_timestamps.union(components.index)
        ).interpolate(method='time').reindex(original_timestamps)
    
    # 2.5 压缩
    decay_arr = np.exp(-1 * decay * np.arange(len(reconstructed)))
    
    # 3. 合成预测值
    reconstructed['合成值'] = reconstructed['趋势'] + decay_arr * reconstructed['季节']
    
    # 4. 添加原始观测值以便比较
    reconstructed['原始观测值'] = original_df.iloc[:, 1].values  # 假设第二列是观测值
    
    # 5. 计算误差
    reconstructed['误差'] = reconstructed['合成值'] - reconstructed['原始观测值']
    reconstructed['相对误差(%)'] = (reconstructed['误差'] / reconstructed['原始观测值']) * 100
    
    return reconstructed

In [None]:
def plot_reconstruction(reconstructed_df):
    """绘制重构结果与原始数据的对比图"""
    plt.figure(figsize=(12, 8))
    
    # 绘制原始观测值和合成值
    plt.subplot(2, 1, 1)
    plt.plot(reconstructed_df.index, reconstructed_df['原始观测值'], 'o-', label='原始观测值')
    plt.plot(reconstructed_df.index, reconstructed_df['合成值'], 's-', label='合成值')
    plt.legend()
    plt.title('原始观测值 vs 合成值')
    plt.grid(True)
    
    # 绘制误差
    plt.subplot(2, 1, 2)
    plt.bar(reconstructed_df.index, reconstructed_df['误差'], alpha=0.5)
    plt.axhline(y=0, color='r', linestyle='-')
    plt.title('重构误差')
    plt.grid(True)
    
    plt.tight_layout()
    return plt

In [None]:
def reconstruction_main(original_df, decomposition, decay, freq='QS'):
    """
    主函数：重构原始时间戳的值并可视化结果
    
    参数:
    original_df (pd.DataFrame): 原始数据
    decomposition (pd.DataFrame): STL分解结果
    freq (str): 重采样频率
    
    返回:
    tuple: 重构结果和图表
    """
    # 重构原始时间戳的值
    reconstructed = reconstruct_original_timestamps(original_df, decomposition, decay, freq=freq)
    
    # 可视化结果
    plot = plot_reconstruction(reconstructed)
    
    # 计算评估指标
    mse = ((reconstructed['误差']) ** 2).mean()
    mae = abs(reconstructed['误差']).mean()
    mape = abs(reconstructed['相对误差(%)']).mean()
    
    print(f"重构评估指标:")
    print(f"均方误差 (MSE): {mse:.4f}")
    print(f"平均绝对误差 (MAE): {mae:.4f}")
    print(f"平均绝对百分比误差 (MAPE): {mape:.2f}%")
    
    return reconstructed, plot

In [None]:
# 开始重建
reconstructed, plot = reconstruction_main(stl_data, decomposition, decay=0.075, freq='QS')
reconstructed['真实值'] = np.array(results_df['实际车辙深度'])
plt.scatter(reconstructed.index, reconstructed['真实值'], c='r')
plt.plot(reconstructed.index, reconstructed['合成值'], c='b', linewidth=3)
plt.show()
r2 = r2_score(reconstructed['真实值'], reconstructed['合成值'])
print(r2)
print(reconstructed.head())
reconstructed.to_csv('结果数据/STR8结果.csv', index=True)

### 画图

In [None]:
# 加载数据
STR2_result = pd.read_csv('结果数据/STR2结果.csv', parse_dates=['日期'])
STR8_result = pd.read_csv('结果数据/STR8结果.csv', parse_dates=['日期'])
print(STR2_result.head())
print(STR8_result.head())

In [None]:
fig = plt.figure(figsize=(15, 9))
plt.scatter(STR2_result['日期'], STR2_result['真实值'], marker='^', c='b', label='STR2真实车辙')
plt.plot(STR2_result['日期'], STR2_result['合成值'], linewidth=3, c='b', label='STR2预测车辙')
plt.plot(STR2_result['日期'], STR2_result['原始观测值'], linewidth=1, linestyle='--', c='b', label='STR2预测车辙（未重新合成）')
plt.scatter(STR8_result['日期'], STR8_result['真实值'], marker='^', c='g', label='STR8真实车辙')
plt.plot(STR2_result['日期'], STR8_result['合成值'], linewidth=3, c='g', label='STR8预测车辙')
plt.plot(STR8_result['日期'], STR8_result['原始观测值'], linewidth=1, linestyle='--', c='g', label='STR8预测车辙（未重新合成）')
plt.legend()
plt.xlabel('日期')
plt.ylabel('车辙深度')
plt.title('车辙长期累积模型')
plt.grid(True)
# fig.savefig('图片/预测数据.png', dpi=300)
plt.show()

In [None]:
fig = plt.figure(figsize=(15, 7))
max_value1 = np.max([np.max(STR2_result['真实值']), np.max(STR2_result['合成值'])])
max_value2 = np.max([np.max(STR8_result['真实值']), np.max(STR8_result['合成值'])])
plt.subplot(1,2,1)
plt.scatter(STR2_result['真实值'], STR2_result['合成值'], marker='o', c='b')
plt.plot([10, max_value1], [10, max_value1], c='r', linestyle='--')
plt.title('STR2预测结果')
plt.subplot(1,2,2)
plt.scatter(STR8_result['真实值'], STR8_result['合成值'], marker='o', c='b')
plt.plot([10, max_value2], [10, max_value2], c='r', linestyle='--')
plt.title('STR8预测结果')
# fig.savefig('图片/预测数据2.png', dpi=300)
plt.show()