# DBSCAN聚类算法性能分析

本笔记本用于分析串行和并行DBSCAN算法的性能差异，包括执行时间、内存使用和可扩展性分析。

## 1. 环境设置

In [None]:
import sys
import os
from pathlib import Path

# 添加项目根目录到Python路径
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

# 导入所需库
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time
import json
import warnings
import multiprocessing as mp
from datetime import datetime

warnings.filterwarnings('ignore')

# Apply the seaborn style and palette FIRST. sns.set_style() rewrites a set of
# matplotlib rcParams that includes 'font.sans-serif', so configuring the font
# before this call would silently discard the CJK-capable font chosen below.
sns.set_style("whitegrid")
sns.set_palette("husl")

# Font able to render the Chinese chart labels, with a fallback, and a
# minus sign that renders correctly with that font
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# Report the runtime environment the benchmarks are executed in
print("Python版本:", sys.version)
print("NumPy版本:", np.__version__)
print("Pandas版本:", pd.__version__)
print("CPU核心数:", mp.cpu_count())

## 2. 导入项目模块

In [None]:
# 导入项目模块
from src.clustering.dbscan_sequential import DBSCANSequential
from src.clustering.dbscan_parallel import DBSCANParallel
from src.clustering.utils import compute_distance_matrix, build_spatial_index

from src.profiling.time_profiler import TimeProfiler, profile_function
from src.profiling.memory_profiler import MemoryProfiler, track_memory_usage
from src.profiling.performance_analyzer import PerformanceAnalyzer, compare_implementations

from src.visualization.plot_performance import PerformanceVisualizer

## 3. 生成测试数据

In [None]:
def generate_test_datasets(sizes=None, seed=42):
    """Generate synthetic 2-D point sets of several sizes.

    Each dataset holds three Gaussian clusters (standard deviation 0.01)
    around fixed centres, plus uniformly distributed noise points
    amounting to 10% of the requested size.

    Args:
        sizes: Iterable of requested clustered-point counts. Defaults to
            ``[100, 500, 1000, 2000, 5000, 10000, 20000]``.
        seed: Seed for the private random generator created for every
            dataset. The default of 42 reproduces the data the notebook
            has always generated. ``None`` gives non-reproducible data.

    Returns:
        dict mapping ``"dataset_<size>"`` to a dict with the keys
        ``"points"`` (array of shape ``(size + n_noise, 2)``), ``"size"``
        (the actual number of rows, noise included), ``"n_clusters"``
        (always 3) and ``"n_noise"``.
    """
    if sizes is None:
        sizes = [100, 500, 1000, 2000, 5000, 10000, 20000]

    # Three fixed cluster centres
    cluster_centers = np.array([
        [39.9, 116.3],  # city centre
        [40.0, 116.5],  # northern area
        [39.8, 116.4]   # southern area
    ])

    datasets = {}

    print("生成测试数据集...")

    for size in sizes:
        # A private generator yields the same stream as seeding the global
        # one, but leaves np.random's global state untouched for callers.
        rng = np.random.RandomState(seed)

        # Random split of `size` across the clusters; the last cluster
        # absorbs the remainder lost to integer truncation.
        cluster_sizes = (rng.dirichlet([1, 1, 1]) * size).astype(int)
        cluster_sizes[-1] = size - cluster_sizes[:-1].sum()

        points = []
        for center, n_points in zip(cluster_centers, cluster_sizes):
            if n_points > 0:
                points.append(rng.randn(n_points, 2) * 0.01 + center)

        # Uniform background noise: 10% of the requested size
        n_noise = int(size * 0.1)
        points.append(rng.uniform(
            low=[39.7, 116.2],
            high=[40.1, 116.7],
            size=(n_noise, 2)
        ))

        # Merge and shuffle rows so cluster membership is not encoded in order
        all_points = np.vstack(points)
        rng.shuffle(all_points)

        datasets[f"dataset_{size}"] = {
            "points": all_points,
            "size": len(all_points),
            "n_clusters": 3,
            "n_noise": n_noise
        }

        print(f"  数据集 {size}: {len(all_points)} 个点 (3个聚类 + {n_noise}个噪声点)")

    return datasets

# Generate the test datasets used by every benchmark cell below
test_datasets = generate_test_datasets()

In [None]:
# Scatter-plot the generated datasets on a 2x4 grid of panels
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

# Only the first seven datasets are drawn, one per panel
datasets_to_show = list(test_datasets.items())[:7]
for panel, (name, data) in zip(axes, datasets_to_show):
    coords = data["points"]

    # Column 1 goes on the x axis (labelled longitude), column 0 on y (latitude)
    panel.scatter(coords[:, 1], coords[:, 0],
                  s=20, alpha=0.6, edgecolors='none')

    panel.set_xlabel('经度')
    panel.set_ylabel('纬度')
    panel.set_title(f'{data["size"]} 个点')
    panel.grid(True, alpha=0.3)

# Blank out the panels beyond the number of datasets
for spare_panel in axes[len(test_datasets):]:
    spare_panel.axis('off')

plt.suptitle('测试数据集可视化', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

## 4. 串行DBSCAN基准测试

In [None]:
def benchmark_sequential_dbscan(datasets, eps=0.01, min_samples=5):
    """Time and memory-profile the sequential DBSCAN on every dataset.

    Args:
        datasets: Mapping of dataset name to a dict providing ``"points"``
            (passed to ``DBSCANSequential.fit``) and ``"size"``.
        eps: Neighbourhood radius forwarded to ``DBSCANSequential``.
        min_samples: Minimum neighbour count forwarded to
            ``DBSCANSequential``.

    Returns:
        dict mapping each dataset name to a dict with the keys ``size``,
        ``execution_time`` (seconds), ``memory_usage_mb`` (difference
        between the snapshots taken after and before the fit),
        ``peak_memory_mb``, ``n_clusters``, ``n_noise`` and
        ``n_core_points``. Empty when ``datasets`` is empty.
    """
    print("=" * 60)
    print("串行DBSCAN基准测试")
    print("=" * 60)

    results = {}

    for name, data in datasets.items():
        points = data["points"]
        size = data["size"]

        print(f"\n测试数据集: {name} ({size} 个点)")

        dbscan = DBSCANSequential(
            eps=eps,
            min_samples=min_samples,
            metric='euclidean'
        )

        memory_profiler = MemoryProfiler(track_detailed=False)

        memory_profiler.start()
        try:
            memory_before = memory_profiler.take_snapshot("开始前")

            # perf_counter is monotonic and high-resolution; time.time() can
            # jump with system clock adjustments and skew short measurements.
            start_time = time.perf_counter()
            dbscan.fit(points)
            execution_time = time.perf_counter() - start_time

            memory_after = memory_profiler.take_snapshot("结束后")
            memory_usage = memory_after.memory_usage_mb - memory_before.memory_usage_mb
        finally:
            # Always stop the profiler, even when the fit raises
            memory_profiler.stop()

        stats = dbscan.get_cluster_stats()

        print(f"  执行时间: {execution_time:.4f} 秒")
        print(f"  内存使用: {memory_usage:.2f} MB")
        print(f"  聚类数量: {stats['n_clusters']}")
        print(f"  噪声点数量: {stats['n_noise']}")

        results[name] = {
            "size": size,
            "execution_time": execution_time,
            "memory_usage_mb": memory_usage,
            "peak_memory_mb": memory_after.peak_memory_mb,
            "n_clusters": stats["n_clusters"],
            "n_noise": stats["n_noise"],
            "n_core_points": stats["n_core_points"]
        }

    return results

# Run the sequential benchmark over all generated datasets
sequential_results = benchmark_sequential_dbscan(test_datasets)

In [None]:
# Scalability analysis of the sequential implementation
if sequential_results:
    # One row per dataset, ordered by point count
    seq_df = pd.DataFrame.from_dict(sequential_results, orient='index')
    seq_df = seq_df.sort_values('size')

    print("串行DBSCAN性能分析:")
    print("-" * 40)
    display(seq_df)

    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    time_ax, memory_ax, complexity_ax = axes

    # Panels 1 and 2 differ only in column, line style and labels
    simple_panels = (
        (time_ax, 'execution_time', 'bo-', '执行时间 (秒)', '执行时间 vs 数据大小'),
        (memory_ax, 'memory_usage_mb', 'ro-', '内存使用 (MB)', '内存使用 vs 数据大小'),
    )
    for panel, column, line_style, y_label, panel_title in simple_panels:
        panel.plot(seq_df['size'], seq_df[column], line_style,
                   linewidth=2, markersize=8)
        panel.set_xlabel('数据点数量')
        panel.set_ylabel(y_label)
        panel.set_title(panel_title)
        panel.grid(True, alpha=0.3)
        panel.set_xscale('log')
        panel.set_yscale('log')

    # Panel 3: measured times against O(n) and O(n²) reference curves, both
    # anchored at the smallest dataset's measurement
    sizes = seq_df['size'].values
    times = seq_df['execution_time'].values

    linear_reference = times[0] * sizes / sizes[0]
    quadratic_reference = times[0] * (sizes / sizes[0]) ** 2

    complexity_ax.loglog(sizes, times, 'ko-', linewidth=2, markersize=8, label='实际时间')
    complexity_ax.loglog(sizes, linear_reference, 'r--', alpha=0.7, label='O(n)')
    complexity_ax.loglog(sizes, quadratic_reference, 'g--', alpha=0.7, label='O(n²)')

    complexity_ax.set_xlabel('数据点数量 (log)')
    complexity_ax.set_ylabel('执行时间 (log)')
    complexity_ax.set_title('时间复杂度分析')
    complexity_ax.grid(True, alpha=0.3, which='both')
    complexity_ax.legend()

    plt.suptitle('串行DBSCAN可扩展性分析', fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()

    print("\n时间复杂度估计:")
    print("-" * 40)

    # Growth exponent estimated from the two largest datasets:
    # t ~ n^k  =>  k = log(t2 / t1) / log(n2 / n1)
    if len(sizes) >= 2:
        smaller_n, larger_n = sizes[-2], sizes[-1]
        smaller_t, larger_t = times[-2], times[-1]

        exponent = np.log(larger_t / smaller_t) / np.log(larger_n / smaller_n)
        print(f"复杂度指数: {exponent:.3f}")

        if exponent < 1.2:
            verdict = "推断复杂度: 接近 O(n)"
        elif exponent < 1.8:
            verdict = f"推断复杂度: O(n^{exponent:.2f})，接近 O(n log n)"
        else:
            verdict = f"推断复杂度: O(n^{exponent:.2f})，接近 O(n²)"
        print(verdict)

## 5. 并行DBSCAN性能测试

In [None]:
def benchmark_parallel_dbscan(datasets, n_jobs_list=None,
                              eps=0.01, min_samples=5):
    """Benchmark the parallel DBSCAN on one dataset for several worker counts.

    The dataset named ``"dataset_5000"`` is used when present. Otherwise
    the fourth dataset is used, or the last one when fewer than four exist.

    Args:
        datasets: Mapping of dataset name to a dict providing ``"points"``
            and ``"size"``.
        n_jobs_list: Worker counts to benchmark. Defaults to
            ``[1, 2, 4, 8]``.
        eps: Neighbourhood radius forwarded to ``DBSCANParallel``.
        min_samples: Minimum neighbour count forwarded to
            ``DBSCANParallel``.

    Returns:
        Tuple ``(results, test_dataset_name)``. ``results`` maps each
        worker count to a dict with the keys ``n_jobs``,
        ``execution_time`` (seconds), ``memory_usage_mb``,
        ``peak_memory_mb``, ``n_clusters``, ``n_noise`` and
        ``n_core_points``.

    Raises:
        IndexError: If ``datasets`` is empty.
    """
    # None sentinel instead of a mutable list default shared across calls
    if n_jobs_list is None:
        n_jobs_list = [1, 2, 4, 8]

    print("=" * 60)
    print("并行DBSCAN基准测试")
    print("=" * 60)

    results = {}

    # Prefer the medium-sized dataset; otherwise fall back to the fourth
    # entry, clamped so that short dataset collections do not index past the end.
    test_dataset_name = "dataset_5000"
    if test_dataset_name not in datasets:
        available = list(datasets)
        test_dataset_name = available[min(3, len(available) - 1)]

    data = datasets[test_dataset_name]
    points = data["points"]
    size = data["size"]

    print(f"测试数据集: {test_dataset_name} ({size} 个点)")
    print(f"测试不同的工作进程数: {n_jobs_list}")

    for n_jobs in n_jobs_list:
        print(f"\n工作进程数: {n_jobs}")

        dbscan = DBSCANParallel(
            eps=eps,
            min_samples=min_samples,
            metric='euclidean',
            n_jobs=n_jobs,
            chunk_size=1000
        )

        memory_profiler = MemoryProfiler(track_detailed=False)

        memory_profiler.start()
        try:
            memory_before = memory_profiler.take_snapshot("开始前")

            # perf_counter is monotonic and high-resolution; time.time() can
            # jump with system clock adjustments and skew short measurements.
            start_time = time.perf_counter()
            dbscan.fit(points)
            execution_time = time.perf_counter() - start_time

            memory_after = memory_profiler.take_snapshot("结束后")
            memory_usage = memory_after.memory_usage_mb - memory_before.memory_usage_mb
        finally:
            # Always stop the profiler, even when the fit raises
            memory_profiler.stop()

        stats = dbscan.get_cluster_stats()

        print(f"  执行时间: {execution_time:.4f} 秒")
        print(f"  内存使用: {memory_usage:.2f} MB")
        print(f"  聚类数量: {stats['n_clusters']}")
        print(f"  噪声点数量: {stats['n_noise']}")

        results[n_jobs] = {
            "n_jobs": n_jobs,
            "execution_time": execution_time,
            "memory_usage_mb": memory_usage,
            "peak_memory_mb": memory_after.peak_memory_mb,
            "n_clusters": stats["n_clusters"],
            "n_noise": stats["n_noise"],
            "n_core_points": stats["n_core_points"]
        }

    return results, test_dataset_name

# Run the parallel benchmark; also remember which dataset it was run on
parallel_results, test_dataset_name = benchmark_parallel_dbscan(test_datasets)

In [None]:
# Parallel efficiency analysis
if parallel_results:
    # One row per worker count, ordered by worker count
    par_df = pd.DataFrame.from_dict(parallel_results, orient='index')
    par_df = par_df.sort_values('n_jobs')

    print("并行DBSCAN性能分析:")
    print("-" * 40)
    display(par_df)

    # The sequential run on the same dataset is the speedup baseline
    if test_dataset_name in sequential_results:
        serial_time = sequential_results[test_dataset_name]["execution_time"]
        print(f"\n串行版本执行时间: {serial_time:.4f} 秒")

        # speedup = T_serial / T_parallel ; efficiency = speedup / workers
        par_df['speedup'] = serial_time / par_df['execution_time']
        par_df['efficiency'] = par_df['speedup'] / par_df['n_jobs']

        print("\n并行性能指标:")
        print("-" * 40)
        display(par_df[['n_jobs', 'execution_time', 'speedup', 'efficiency']])

        fig, axes = plt.subplots(1, 3, figsize=(15, 5))

        # 1. Execution time per worker count, with the sequential time as a line
        ax1 = axes[0]
        x_pos = range(len(par_df))
        ax1.bar(x_pos, par_df['execution_time'],
               color='skyblue', alpha=0.7)
        ax1.axhline(y=serial_time, color='red', linestyle='--',
                   linewidth=2, label=f'串行时间: {serial_time:.3f}s')
        ax1.set_xlabel('工作进程数')
        ax1.set_ylabel('执行时间 (秒)')
        ax1.set_title('执行时间对比')
        ax1.set_xticks(x_pos)
        ax1.set_xticklabels(par_df['n_jobs'])
        ax1.legend()

        # Value labels above the bars
        for i, v in enumerate(par_df['execution_time']):
            ax1.text(i, v * 1.02, f'{v:.3f}',
                    ha='center', va='bottom', fontsize=9)

        # 2. Speedup, with the ideal (linear) speedup overlaid
        ax2 = axes[1]
        ax2.bar(x_pos, par_df['speedup'],
               color='lightgreen', alpha=0.7)
        ax2.plot(x_pos, par_df['n_jobs'], 'ro-',
                linewidth=2, markersize=8, label='理想加速比')
        ax2.set_xlabel('工作进程数')
        ax2.set_ylabel('加速比')
        ax2.set_title('并行加速比')
        ax2.set_xticks(x_pos)
        ax2.set_xticklabels(par_df['n_jobs'])
        ax2.legend()

        for i, v in enumerate(par_df['speedup']):
            ax2.text(i, v * 1.02, f'{v:.2f}x',
                    ha='center', va='bottom', fontsize=9)

        # 3. Parallel efficiency, with the ideal 100% line
        ax3 = axes[2]
        ax3.bar(x_pos, par_df['efficiency'],
               color='gold', alpha=0.7)
        ax3.axhline(y=1.0, color='red', linestyle='--',
                   linewidth=2, label='理想效率 (100%)')
        ax3.set_xlabel('工作进程数')
        ax3.set_ylabel('并行效率')
        ax3.set_title('并行效率')
        ax3.set_xticks(x_pos)
        ax3.set_xticklabels(par_df['n_jobs'])
        ax3.set_ylim(0, 1.2)
        ax3.legend()

        for i, v in enumerate(par_df['efficiency']):
            ax3.text(i, v * 1.02, f'{v:.1%}',
                    ha='center', va='bottom', fontsize=9)

        plt.suptitle(f'并行DBSCAN性能分析 ({test_dataset_name})',
                    fontsize=14, fontweight='bold')
        plt.tight_layout()
        plt.show()

        print("\n并行性能分析:")
        print("-" * 40)

        # Best configuration = highest speedup
        best_idx = par_df['speedup'].idxmax()
        best_config = par_df.loc[best_idx]

        # The row is upcast to float, so cast back to print "4" rather than "4.0"
        print(f"最优配置: {int(best_config['n_jobs'])} 个工作进程")
        print(f"最大加速比: {best_config['speedup']:.2f}x")
        print(f"并行效率: {best_config['efficiency']:.1%}")

        def amdahl_speedup(n, serial_fraction):
            """Amdahl's law: speedup on n workers for a given serial fraction.

            The independent variable comes first because curve_fit calls
            the model as f(xdata, *params).
            """
            return 1 / (serial_fraction + (1 - serial_fraction) / n)

        # Least-squares estimate of the serial fraction
        from scipy.optimize import curve_fit
        try:
            n_jobs_array = par_df['n_jobs'].values
            speedup_array = par_df['speedup'].values

            # The serial fraction is constrained to [0, 1]
            popt, _ = curve_fit(amdahl_speedup, n_jobs_array, speedup_array,
                               p0=[0.5], bounds=(0, 1))
            serial_fraction = popt[0]

            print(f"\nAmdahl定律分析:")
            print(f"  估计的串行部分比例: {serial_fraction:.3f}")
            # Limit of the model as n -> infinity; a fitted fraction of 0
            # yields inf because the value is a NumPy float.
            print(f"  最大理论加速比 (无限核心): {1/serial_fraction:.2f}x")

            # Fitted curve against measured and ideal speedup
            fig, ax = plt.subplots(figsize=(8, 6))

            n_range = np.linspace(1, max(n_jobs_array) * 2, 100)
            theoretical_speedup = amdahl_speedup(n_range, serial_fraction)

            ax.plot(n_range, theoretical_speedup, 'r-',
                   linewidth=2, label='Amdahl定律拟合')
            ax.scatter(n_jobs_array, speedup_array, s=100,
                      color='blue', alpha=0.7, label='实际数据')
            ax.plot(n_jobs_array, n_jobs_array, 'g--',
                   linewidth=2, alpha=0.5, label='理想加速比')

            ax.set_xlabel('工作进程数')
            ax.set_ylabel('加速比')
            ax.set_title('Amdahl定律分析')
            ax.grid(True, alpha=0.3)
            ax.legend()

            plt.tight_layout()
            plt.show()

        except Exception as e:
            # Best effort: a failed fit or plot must not abort the notebook
            print(f"Amdahl定律分析失败: {e}")

## 6. 综合性能比较

In [None]:
def run_comprehensive_comparison(datasets, eps=0.01, min_samples=5):
    """Compare the sequential DBSCAN with 2-, 4- and 8-worker parallel runs.

    The dataset named ``"dataset_5000"`` is used when present. Otherwise
    the fourth dataset is used, or the last one when fewer than four exist.

    Args:
        datasets: Mapping of dataset name to a dict providing ``"points"``.
        eps: Neighbourhood radius used by every implementation.
        min_samples: Minimum neighbour count used by every implementation.

    Returns:
        Tuple ``(comparison_results, implementations)``: the value returned
        by ``compare_implementations`` and the dict of name -> callable
        that was benchmarked.

    Raises:
        IndexError: If ``datasets`` is empty.
    """
    print("=" * 60)
    print("综合性能比较")
    print("=" * 60)

    # Prefer the medium-sized dataset; otherwise fall back to the fourth
    # entry, clamped so that short dataset collections do not index past the end.
    test_dataset_name = "dataset_5000"
    if test_dataset_name not in datasets:
        available = list(datasets)
        test_dataset_name = available[min(3, len(available) - 1)]

    points = datasets[test_dataset_name]["points"]

    print(f"使用数据集: {test_dataset_name} ({len(points)} 个点)")

    # NOTE(review): this instance is not used below; it is kept in case its
    # construction prepares the output directory - confirm and remove if not.
    analyzer = PerformanceAnalyzer(output_dir="../results")

    # Each entry builds a fresh clusterer and fits it on the given points
    implementations = {
        'DBSCAN_Sequential': lambda data: DBSCANSequential(
            eps=eps, min_samples=min_samples, metric='euclidean'
        ).fit(data),

        'DBSCAN_Parallel_2jobs': lambda data: DBSCANParallel(
            eps=eps, min_samples=min_samples, metric='euclidean', n_jobs=2
        ).fit(data),

        'DBSCAN_Parallel_4jobs': lambda data: DBSCANParallel(
            eps=eps, min_samples=min_samples, metric='euclidean', n_jobs=4
        ).fit(data),

        'DBSCAN_Parallel_8jobs': lambda data: DBSCANParallel(
            eps=eps, min_samples=min_samples, metric='euclidean', n_jobs=8
        ).fit(data),
    }

    print("\n运行性能比较...")
    comparison_results = compare_implementations(
        implementations=implementations,
        test_data=points,
        test_name="dbscan_comprehensive",
        output_dir="../results"
    )

    print("\n比较结果:")
    print("-" * 40)
    display(comparison_results)

    return comparison_results, implementations

# Run the comprehensive comparison on the generated datasets
comparison_results, implementations = run_comprehensive_comparison(test_datasets)

In [None]:
# Visualise and summarise the comprehensive comparison
if 'comparison_results' in locals() and not comparison_results.empty:
    visualizer = PerformanceVisualizer()

    # Per-implementation metrics in the shape passed to the visualizer;
    # accuracy falls back to 1.0 when the row has no such field
    perf_data = {
        row['implementation']: {
            'execution_time': row['execution_time'],
            'memory_usage_mb': row['memory_usage_mb'],
            'peak_memory_mb': row['peak_memory_mb'],
            'accuracy': row.get('accuracy', 1.0)
        }
        for _, row in comparison_results.iterrows()
    }

    fig = visualizer.plot_execution_time_comparison(
        perf_data,
        title="DBSCAN算法综合性能比较",
        log_scale=True
    )

    print("\n性能提升分析:")
    print("-" * 40)

    if 'time_speedup' in comparison_results.columns:
        # Rows with the highest and lowest reported speedup
        speedups = comparison_results['time_speedup']
        fastest_row = comparison_results.loc[speedups.idxmax()]
        slowest_row = comparison_results.loc[speedups.idxmin()]

        print(f"最快实现: {fastest_row['implementation']}")
        print(f"  执行时间: {fastest_row['execution_time']:.4f} 秒")
        print(f"  速度提升: {fastest_row['time_speedup']:.2f}x (相对于最快基准)")
        print()

        print(f"最慢实现: {slowest_row['implementation']}")
        print(f"  执行时间: {slowest_row['execution_time']:.4f} 秒")
        print(f"  速度提升: {slowest_row['time_speedup']:.2f}x (相对于最快基准)")
        print()

        # Ratio of the slowest to the fastest execution time
        time_ratio = slowest_row['execution_time'] / fastest_row['execution_time']
        print(f"最快 vs 最慢: 加速比 {time_ratio:.2f}x")

        # Rows with the lowest and highest memory usage
        memory_column = comparison_results['memory_usage_mb']
        leanest_row = comparison_results.loc[memory_column.idxmin()]
        heaviest_row = comparison_results.loc[memory_column.idxmax()]

        print(f"\n内存使用比较:")
        print(f"  最省内存: {leanest_row['implementation']} ({leanest_row['memory_usage_mb']:.2f} MB)")
        print(f"  最耗内存: {heaviest_row['implementation']} ({heaviest_row['memory_usage_mb']:.2f} MB)")
        print(f"  内存比: {heaviest_row['memory_usage_mb'] / leanest_row['memory_usage_mb']:.2f}x")

## 7. 性能瓶颈分析

In [None]:
def analyze_performance_bottlenecks():
    """Profile the sequential DBSCAN on ``test_datasets["dataset_1000"]``.

    Runs the fit once under ``profile_function`` for timing and once inside
    a detailed ``MemoryProfiler`` for memory, printing a report for both.

    Returns:
        Tuple ``(time_analysis, memory_profiler)``: the analysis produced
        by the ``profile_function`` wrapper and the memory profiler used.
    """
    banner = "=" * 60
    rule = "-" * 40

    print(banner)
    print("性能瓶颈分析")
    print(banner)

    # A small dataset keeps the detailed profiling affordable
    sample_points = test_datasets["dataset_1000"]["points"]

    print("\n分析串行DBSCAN性能瓶颈:")
    print(rule)

    clusterer = DBSCANSequential(eps=0.01, min_samples=5)

    @profile_function(detailed=True)
    def run_sequential():
        return clusterer.fit(sample_points)

    # The wrapped call yields (fit result, timing analysis); only the
    # analysis is used here
    _, time_analysis = run_sequential()

    print(f"执行时间: {time_analysis['execution_time']:.4f} 秒")

    if 'top_functions' in time_analysis:
        print("\n最耗时的函数:")
        for rank, entry in enumerate(time_analysis['top_functions'][:5], start=1):
            print(f"  {rank}. {entry['function']}: {entry['cumtime']:.4f} 秒")

    print("\n内存瓶颈分析:")
    print(rule)

    memory_profiler = MemoryProfiler(track_detailed=True)

    # Snapshots taken immediately before and after a second fit
    with memory_profiler:
        snapshot_before = memory_profiler.take_snapshot("开始前")
        clusterer.fit(sample_points)
        snapshot_after = memory_profiler.take_snapshot("结束后")

    used_mb = snapshot_after.memory_usage_mb - snapshot_before.memory_usage_mb
    print(f"内存使用: {used_mb:.2f} MB")
    print(f"峰值内存: {snapshot_after.peak_memory_mb:.2f} MB")

    # Report at most the first three bottlenecks above the 1 MB threshold
    hotspots = memory_profiler.find_memory_bottlenecks(threshold_mb=1.0)
    if hotspots:
        print("\n内存瓶颈点:")
        for rank, hotspot in enumerate(hotspots[:3], start=1):
            print(f"  {rank}. {hotspot['consumer']}: {hotspot['size_mb']:.2f} MB")

    # Missing pattern fields are reported as 0
    usage_patterns = memory_profiler.analyze_memory_patterns()
    if usage_patterns:
        print("\n内存使用模式:")
        print(f"  总内存增长: {usage_patterns.get('total_memory_growth_mb', 0):.2f} MB")
        print(f"  内存增长率: {usage_patterns.get('memory_growth_rate_mb_per_sec', 0):.2f} MB/秒")
        print(f"  内存峰值数: {usage_patterns.get('n_memory_peaks', 0)}")

    return time_analysis, memory_profiler

# Run the bottleneck analysis; both results feed the cells below
time_analysis, memory_profiler = analyze_performance_bottlenecks()

In [None]:
# Turn the profiling results into a printed list of optimisation suggestions
if 'time_analysis' in locals() and 'memory_profiler' in locals():
    print("\n" + "=" * 60)
    print("优化建议")
    print("=" * 60)

    suggestions = []

    # Advice keyed by a substring of the (lower-cased) function name; every
    # matching keyword contributes its lines, in this order
    keyword_advice = (
        ('distance', [
            "  → 使用KDTree/BallTree加速距离计算",
            "  → 考虑使用向量化计算",
            "  → 缓存距离计算结果",
        ]),
        ('neighbor', [
            "  → 使用空间索引加速邻居搜索",
            "  → 预计算距离矩阵",
            "  → 并行化邻居查询",
        ]),
        ('loop', [
            "  → 使用向量化操作代替Python循环",
            "  → 考虑使用Numba进行JIT编译",
        ]),
    )

    # Timing-based suggestions: look at the three most expensive functions
    if 'top_functions' in time_analysis:
        for func_info in time_analysis['top_functions'][:3]:
            func_name = func_info['function']
            share = (func_info['cumtime'] / time_analysis['execution_time']) * 100

            # Only functions above 20% of the run time count as bottlenecks
            if not share > 20:
                continue

            suggestions.append(f"函数 '{func_name}' 占用 {share:.1f}% 的执行时间，是主要瓶颈")

            lowered_name = func_name.lower()
            for keyword, advice_lines in keyword_advice:
                if keyword in lowered_name:
                    suggestions += advice_lines

    # Memory-based suggestions: bottlenecks above the 5 MB threshold
    bottlenecks = memory_profiler.find_memory_bottlenecks(threshold_mb=5.0)
    if bottlenecks:
        suggestions.append(f"发现 {len(bottlenecks)} 个内存瓶颈点")

        for bottleneck in bottlenecks[:2]:
            suggestions += [
                f"  → {bottleneck['consumer']} 占用 {bottleneck['size_mb']:.2f} MB",
                "    - 考虑使用更紧凑的数据类型",
                "    - 及时释放不再使用的对象",
                "    - 使用内存映射文件处理大数据",
            ]

    # General advice, always included
    suggestions += [
        "通用优化建议:",
        "1. 使用numpy数组代替Python列表存储数值数据",
        "2. 使用适当的数据类型（如float32代替float64）",
        "3. 对于大规模数据，采用分批处理策略",
        "4. 利用多核CPU进行并行计算",
        "5. 使用空间索引（如KDTree）加速空间查询",
        "6. 缓存重复的计算结果",
        "7. 使用生成器处理流式数据，减少内存占用"
    ]

    # Parallel advice; the recommended worker count leaves two cores free
    # but never drops below one
    suggestions += [
        "\n并行优化建议:",
        f"1. 系统有 {mp.cpu_count()} 个CPU核心，建议使用 {max(1, mp.cpu_count() - 2)} 个工作进程",
        "2. 使用共享内存减少进程间数据复制",
        "3. 实现动态负载均衡，避免工作进程空闲",
        "4. 考虑任务粒度，避免任务过小导致通信开销过大",
        "5. 使用异步I/O重叠计算和通信"
    ]

    for line in suggestions:
        print(line)

## 8. 结论与建议

In [None]:
print("=" * 60)
print("性能分析结论")
print("=" * 60)

conclusions = []
# DataFrames built in THIS cell, saved at the end. Tracking them explicitly
# avoids saving a stale or undefined frame from an earlier cell.
frames_to_save = {}

# Sequential performance
if 'sequential_results' in locals() and sequential_results:
    seq_df = pd.DataFrame.from_dict(sequential_results, orient='index')
    frames_to_save["sequential_results"] = seq_df
    avg_time = seq_df['execution_time'].mean()
    avg_memory = seq_df['memory_usage_mb'].mean()

    conclusions.append("串行DBSCAN性能:")
    conclusions.append(f"  • 平均执行时间: {avg_time:.4f} 秒")
    conclusions.append(f"  • 平均内存使用: {avg_memory:.2f} MB")

    # Report the complexity actually measured (growth exponent between the
    # two largest datasets) instead of a hard-coded statement. The line is
    # omitted when the exponent cannot be computed.
    ordered = seq_df.sort_values('size')
    if len(ordered) >= 2:
        n1, n2 = ordered['size'].iloc[-2:]
        t1, t2 = ordered['execution_time'].iloc[-2:]
        if n1 > 0 and n2 > n1 and t1 > 0 and t2 > 0:
            exponent = np.log(t2 / t1) / np.log(n2 / n1)
            if exponent < 1.2:
                closest = "O(n)"
            elif exponent < 1.8:
                closest = "O(n log n)"
            else:
                closest = "O(n²)"
            conclusions.append(
                f"  • 时间复杂度: O(n^{exponent:.2f})，接近 {closest}（基于实际测量）"
            )

# Parallel performance
if 'parallel_results' in locals() and parallel_results:
    par_df = pd.DataFrame.from_dict(parallel_results, orient='index')
    frames_to_save["parallel_results"] = par_df

    # Rebuilding the frame drops any derived columns, so recompute speedup
    # and efficiency here from the sequential baseline when one exists.
    baseline = None
    if 'sequential_results' in locals() and 'test_dataset_name' in locals():
        baseline = sequential_results.get(test_dataset_name)
    if baseline is not None:
        par_df['speedup'] = baseline['execution_time'] / par_df['execution_time']
        par_df['efficiency'] = par_df['speedup'] / par_df['n_jobs']

    conclusions.append("\n并行DBSCAN性能:")
    if 'speedup' in par_df.columns:
        conclusions.append(f"  • 最大加速比: {par_df['speedup'].max():.2f}x")
        conclusions.append(f"  • 最大并行效率: {par_df['efficiency'].max():.1%}")
    # The highest speedup is the lowest execution time, so the best worker
    # count can be reported even without a sequential baseline.
    best_n_jobs = par_df.loc[par_df['execution_time'].idxmin(), 'n_jobs']
    conclusions.append(f"  • 最优工作进程数: {best_n_jobs}")

# Comprehensive comparison
if 'comparison_results' in locals() and not comparison_results.empty:
    fastest_impl = comparison_results.loc[comparison_results['execution_time'].idxmin()]
    slowest_impl = comparison_results.loc[comparison_results['execution_time'].idxmax()]

    speedup_ratio = slowest_impl['execution_time'] / fastest_impl['execution_time']

    conclusions.append("\n算法实现比较:")
    conclusions.append(f"  • 最快实现: {fastest_impl['implementation']}")
    conclusions.append(f"  • 最慢实现: {slowest_impl['implementation']}")
    conclusions.append(f"  • 性能差异: {speedup_ratio:.2f}x")

# Bottleneck analysis
if 'time_analysis' in locals() and 'memory_profiler' in locals():
    conclusions.append("\n主要性能瓶颈:")

    # Guard against an empty function list before taking the first entry
    if 'top_functions' in time_analysis and time_analysis['top_functions']:
        main_bottleneck = time_analysis['top_functions'][0]
        time_percent = (main_bottleneck['cumtime'] / time_analysis['execution_time']) * 100
        conclusions.append(f"  • 时间瓶颈: {main_bottleneck['function']} ({time_percent:.1f}%)")

    bottlenecks = memory_profiler.find_memory_bottlenecks(threshold_mb=5.0)
    if bottlenecks:
        main_memory_bottleneck = bottlenecks[0]
        conclusions.append(f"  • 内存瓶颈: {main_memory_bottleneck['consumer']} ({main_memory_bottleneck['size_mb']:.2f} MB)")

# Key optimisation advice; the worker count leaves two cores free, minimum one
conclusions.append("\n关键优化建议:")
conclusions.append("  1. 使用空间索引加速邻居搜索")
conclusions.append("  2. 利用多核CPU进行并行计算")
conclusions.append(f"  3. 建议使用 {max(1, mp.cpu_count() - 2)} 个工作进程")
conclusions.append("  4. 使用向量化操作代替Python循环")
conclusions.append("  5. 及时释放大对象，优化内存使用")

for conclusion in conclusions:
    print(conclusion)

# Persist the results
print("\n" + "=" * 60)
print("结果保存")
print("=" * 60)

results_dir = "../results/notebooks"
os.makedirs(results_dir, exist_ok=True)

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

if 'comparison_results' in locals():
    frames_to_save["comparison_results"] = comparison_results

# One timestamped CSV per available result table
for label, frame in frames_to_save.items():
    frame.to_csv(f"{results_dir}/{label}_{timestamp}.csv")

# Explicit UTF-8: the conclusions contain Chinese text and "•", which the
# platform default encoding cannot always represent.
with open(f"{results_dir}/conclusions_{timestamp}.txt", "w", encoding="utf-8") as f:
    f.writelines(conclusion + "\n" for conclusion in conclusions)

print(f"分析结果已保存到: {results_dir}")