In [None]:
import matplotlib.pylab as plt
import ruptures as rpt
import pandas as pd
from scipy.signal import savgol_filter
from sktime.clustering.k_medoids import TimeSeriesKMedoids
import numpy as np
import plotly.graph_objects as go
import plotly.express as px

In [None]:
# Read the first sheet of the workbook and give the two columns that pandas
# loaded as "Untitled" / "Untitled 1" descriptive names.
df = pd.read_excel(r'D:\data_analysis\data.xlsx', sheet_name='Sheet1').rename(
    columns={"Untitled": "timestamp", "Untitled 1": "deformation quantity"}
)

# Columns that the rest of the notebook analyses and plots.
selected_columns = ['deformation quantity']

# Plot the unprocessed series for a first visual check.
plt.figure(figsize=(16, 10))
plt.plot(df[selected_columns[0]])
# Presumably selected so the Chinese title below renders with proper glyphs.
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']
plt.title("原始数据", y=1, fontsize=20)
plt.show()


In [None]:
# Denoise the analysed column with a Savitzky-Golay filter (window of 5
# samples, polynomial order 3) and plot the result.
#
# The smoothed values are written back to the analysed column because later
# cells read that column. To keep this cell idempotent, the unfiltered values
# are preserved in a side column the first time the cell runs and the filter is
# always applied to that copy; otherwise every re-run would smooth data that
# had already been smoothed.
value_col = selected_columns[0]
raw_col = f"{value_col} (raw)"
if raw_col not in df.columns:
    df[raw_col] = df[value_col]
df[value_col] = savgol_filter(df[raw_col], window_length=5, polyorder=3)

plt.figure(figsize=(16, 10))
plt.plot(df[value_col])
# Presumably selected so the Chinese title below renders with proper glyphs.
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']
plt.title("降噪后的数据", y=1, fontsize=20)
plt.show()


In [None]:
# Run PELT change-point detection on the analysed column.
data = df[selected_columns[0]]
signal = data.values

# Cost model and search settings passed to ruptures.
model = "rbf"
algo = rpt.Pelt(model=model, min_size=90, jump=3)
algo.fit(signal)

# Breakpoints returned for a penalty of 50; reused by the following cells.
my_bkps = algo.predict(pen=50)

# An empty list is passed as the second positional argument, so only the
# detected breakpoints are supplied to the display helper.
fig, ax_arr = rpt.display(signal, [], computed_chg_pts=my_bkps, figsize=(16, 10))
plt.show()


In [None]:
def extract_segments(signal, bkps):
    """Split ``signal`` into consecutive sub-sequences at the given breakpoints.

    Each breakpoint is used as the exclusive end of one segment and as the
    start of the next; the first segment starts at index 0.

    Args:
        signal: Sliceable sequence (e.g. list or 1-D array) to cut.
        bkps: Sliceable sequence of end indices, in the order to apply them.

    Returns:
        A list with one slice of ``signal`` per entry in ``bkps``.
    """
    # Every segment begins where the previous one ended.
    starts = [0, *bkps[:-1]]
    return [signal[lo:hi] for lo, hi in zip(starts, bkps)]
def pad_segments(segments, pad_value=0):
    """Right-pad every segment to the longest length and stack them in an ndarray.

    Args:
        segments: Collection of 1-D sequences of possibly different lengths.
        pad_value: Constant appended to segments shorter than the longest one.

    Returns:
        Array with one row per segment, each row as long as the longest segment.
    """
    target_len = max(map(len, segments))
    rows = []
    for seq in segments:
        missing = target_len - len(seq)
        # Pad only at the end so the original samples keep their positions.
        rows.append(np.pad(seq, (0, missing), constant_values=pad_value))
    return np.array(rows)
def cluster_time_series_segments(pad_segments, n_clusters, random_state=None):
    """Cluster the padded segments with TimeSeriesKMedoids using the DTW metric.

    Args:
        pad_segments: Array of equal-length segments, one per row.
        n_clusters: Number of clusters to form.
        random_state: Optional seed forwarded to the estimator so that the
            cluster assignment can be reproduced between runs. ``None`` keeps
            the estimator's default (unseeded) behaviour.

    Returns:
        The cluster label predicted for each segment.
    """
    kmedoids = TimeSeriesKMedoids(n_clusters=n_clusters, metric="dtw",
                                  random_state=random_state)
    return kmedoids.fit_predict(pad_segments)


# Cut the signal at the detected breakpoints, pad the pieces to a common
# length and cluster them.
segments = extract_segments(signal, my_bkps)
padded_segments = pad_segments(segments)
num_clusters = 3
# A fixed seed keeps labels (and therefore the colours used when plotting the
# segments) stable across Restart & Run All.
labels = cluster_time_series_segments(padded_segments, num_clusters, random_state=42)


In [None]:
# Interactive figure: one line per analysed column, with every detected
# segment shaded in the colour of its cluster.
fig_result = go.Figure()
for name in selected_columns:
    fig_result.add_trace(go.Scatter(x=df.index, y=df[name], name=name))

# Segment boundaries with the series start (0) prepended. A new list is built
# instead of inserting into `my_bkps`: mutating the shared list would add
# another 0 on every re-run of this cell, shifting the segment/label pairing
# and eventually indexing past the end of `labels`.
segment_edges = [0, *my_bkps]
palette = px.colors.qualitative.Dark24
for seg_idx, (seg_start, seg_end) in enumerate(zip(segment_edges[:-1], segment_edges[1:])):
    fig_result.add_vrect(
        x0=seg_start,
        x1=seg_end,
        fillcolor=palette[int(labels[seg_idx])],
        opacity=0.3,
        layer="below",
        line_width=0,
    )
def add_annotation(x_point, y_point, annotation):
    """Add an arrow annotation to the module-level ``fig_result`` figure.

    Args:
        x_point: x coordinate the arrow points at.
        y_point: y coordinate the arrow points at; its string form is also
            appended to the label text.
        annotation: Text prefix placed in front of the ``y_point`` value.
    """
    label_font = dict(family="Courier New, monospace", size=16, color="#ffffff")
    arrow_style = dict(
        showarrow=True,
        arrowhead=2,
        arrowsize=1,
        arrowwidth=2,
        arrowcolor="#636363",
        # Offset of the text box relative to the annotated point.
        ax=20,
        ay=-30,
    )
    box_style = dict(
        align="center",
        bordercolor="#c7c7c7",
        borderwidth=2,
        borderpad=4,
        bgcolor="#ff7f0e",
        opacity=0.5,
    )
    fig_result.add_annotation(
        x=x_point,
        y=y_point,
        text=annotation + str(y_point),
        font=label_font,
        **arrow_style,
        **box_style,
    )
# Mark the largest and smallest value of the analysed column on the figure,
# then render it.
deformation = df[selected_columns[0]]
x_max, y_max = deformation.idxmax(), deformation.max()
x_min, y_min = deformation.idxmin(), deformation.min()

for x_pos, y_val, prefix in ((x_max, y_max, '最大值：'), (x_min, y_min, '最小值：')):
    add_annotation(x_pos, y_val, prefix)

fig_result.show()
