In [5]:
import pandas as pd
import numpy as np
import gudhi as gd
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
from sklearn.manifold import MDS  # 多维标度

In [9]:
# 定义二维空腔的持久性阈值。只有持久性大于此值的才被认为是“显著”的洞。
H2_PERSISTENCE_THRESHOLD = 0.00001
WINDOW_SIZE = 31
# 替换原有的 pd.read_excel(...) 代码，改为 pd.read_csv()
df = pd.read_csv(
    r"datasets/ASWSIndexEOD.csv",  # 目标CSV文件路径
    skiprows=[1],  # 跳过第2行（和原代码逻辑一致）
    index_col='TRADE_DT',  # 用“日期”列作为索引
    parse_dates=['TRADE_DT'],  # 读取时直接将“日期”列解析为datetime类型（无需后续再转）
    dtype='float64'  # 数据列按float64类型读取
)

df.index=pd.to_datetime(df.index)
# --- 2. 动画生成函数 ---
# 将动画逻辑封装成一个函数，便于在找到目标窗口后调用

def generate_animation_for_window(points, max_edge_length, start_index, end_index):
    """
    为给定的点云数据生成并保存3D持续同调动画。
    """
    print(f"\n找到目标窗口 [{start_index}, {end_index})。开始生成动画...")
    
    # 构建Rips Complex和计算同调 
    rips_complex = gd.RipsComplex(points=points, max_edge_length=max_edge_length)
    simplex_tree = rips_complex.create_simplex_tree(max_dimension=3)
    simplex_tree.compute_persistence()
    
    # 设置画布
    fig = plt.figure(figsize=(18, 9))
    ax_complex = fig.add_subplot(1, 2, 1, projection='3d')
    ax_barcode = fig.add_subplot(1, 2, 2)

    # 绘制条形码
    persistence_pairs = simplex_tree.persistence()
    gd.plot_persistence_barcode(persistence=persistence_pairs, axes=ax_barcode, legend=True)
    ax_barcode.set_title(f"Barcode for Window [{start_index}, {end_index})")
    epsilon_line = ax_barcode.axvline(x=0, color='k', linestyle='--', linewidth=1.5)

    # 动画参数
    num_frames = 150
    epsilon_values = np.linspace(0, max_edge_length, num_frames)
    filtration = list(simplex_tree.get_filtration())

    # 动画更新函数
    def update(i):
        current_epsilon = epsilon_values[i]
        ax_complex.clear()
        ax_complex.scatter(points[:, 0], points[:, 1], points[:, 2], c='b', marker='o', s=15)
        
        edges, triangles = [], []
        for simplex, filtration_value in filtration:
            if filtration_value > current_epsilon: break
            if len(simplex) == 2: edges.append(points[simplex])
            elif len(simplex) == 3: triangles.append(points[simplex])

        for edge in edges:
            ax_complex.plot(edge[:, 0], edge[:, 1], edge[:, 2], 'k-', linewidth=0.7)
        if triangles:
            poly_collection = Poly3DCollection(triangles, facecolors='r', linewidths=0, alpha=0.2)
            ax_complex.add_collection3d(poly_collection)

        ax_complex.set_title(f"Rips Complex, $\\epsilon = {current_epsilon:.2f}$")
        # 自动确定坐标轴范围
        min_vals, max_vals = points.min(axis=0), points.max(axis=0)
        ax_complex.set_xlim(min_vals[0] - 0.1, max_vals[0] + 0.1)
        ax_complex.set_ylim(min_vals[1] - 0.1, max_vals[1] + 0.1)
        ax_complex.set_zlim(min_vals[2] - 0.1, max_vals[2] + 0.1)

        epsilon_line.set_xdata([current_epsilon, current_epsilon])
        return fig,

    # 创建并保存
    ani = FuncAnimation(fig, update, frames=num_frames, interval=100, blit=False)
    output_filename = f"animation_window_{start_index}_to_{end_index-1}.gif"
    print(f"正在保存动画到 {output_filename}... (这可能需要一些时间)")
    ani.save(output_filename, writer='pillow', fps=10)
    print("动画制作完成！")
    plt.close(fig) # 关闭图形，防止显示

# --- 3. 主循环：滑动窗口并检验空腔 ---

max_start_index = len(df) - WINDOW_SIZE
for i in range(max_start_index + 1):
    start_index = i
    end_index = i + WINDOW_SIZE
    
    print(f"\n--- 正在检验窗口:索引 {start_index} 到 {end_index-1} ---")

    window_df = df.iloc[start_index:end_index]
    corr_matrix = window_df.corr().fillna(0)
    distance_matrix_values = np.sqrt(np.clip(2 * (1 - corr_matrix.values), a_min=0.0, a_max=4.0))

    upper_triangle_indices = np.triu_indices_from(distance_matrix_values, k=1)
    if len(upper_triangle_indices[0]) == 0: continue
    distances_off_diagonal = distance_matrix_values[upper_triangle_indices]
    max_edge_length = np.percentile(distances_off_diagonal, 30)
    points = MDS(n_components=3, dissimilarity='precomputed', random_state=42, normalized_stress=False).fit_transform(distance_matrix_values)
    

    rips_complex_check = gd.RipsComplex(points = points, max_edge_length=max_edge_length)
    simplex_tree_check = rips_complex_check.create_simplex_tree(max_dimension=3)
    simplex_tree_check.compute_persistence()
    
    # 获取所有持久性对，包括无限woshel的
    all_persistence = simplex_tree_check.persistence()
    
    has_significant_hole = False
    # 遍历所有维度的所有特征
    for dim, (birth, death) in all_persistence:
        # 只关心二维特征 (H2)
        if dim == 2:
            # 检查无限特征
            if death == float('inf'):
                print(f"发现无限二维空腔 (诞生于 {birth:.4f})。这是一个显著特征。")
                has_significant_hole = True
                break # 找到一个就够了
            
            # 检查有限但持久性很长的特征
            persistence = death - birth
            if persistence > H2_PERSISTENCE_THRESHOLD:
                print(f"发现有限二维空腔。持久性: {persistence:.4f} > 阈值: {H2_PERSISTENCE_THRESHOLD}")
                has_significant_hole = True
                break # 找到一个就够了

    if not has_significant_hole:
        print(f"*** 成功！窗口 [{start_index}, {end_index-1}] 未发现显著二维空腔。***")
        generate_animation_for_window(points, max_edge_length, start_index, end_index)
        print("\n任务完成，程序已停止。")
        break
    else:
        print(f"窗口 [{start_index}, {end_index-1}] 存在显著空腔。继续下一个窗口...")

else:
    print("\n扫描完所有窗口，未找到任何没有显著二维空腔的区间。")


ValueError: could not convert string to float: '{23CE2CCE-85E1-4AAF-AFDA-2A1C346207E1}'