In [2]:
# 基础DBSCAN实现
import time
import warnings
from collections import deque
from pathlib import Path

import numpy as np
import pandas as pd
warnings.filterwarnings('ignore')

print("=" * 50)
print("基础DBSCAN算法实现")
print("=" * 50)

基础DBSCAN算法实现


In [3]:
class BasicDBSCAN:
    """基础版本的DBSCAN实现"""
    
    def __init__(self, eps=0.5, min_samples=5):
        self.eps = eps
        self.min_samples = min_samples
        self.labels_ = None
        
    def _euclidean_distance(self, a, b):
        """计算欧氏距离"""
        return np.sqrt(np.sum((a - b) ** 2))
    
    def _region_query(self, X, point_idx):
        """查找邻域内的点"""
        neighbors = []
        point = X[point_idx]
        
        for i in range(len(X)):
            if self._euclidean_distance(point, X[i]) < self.eps:
                neighbors.append(i)
                
        return neighbors
    
    def fit_predict(self, X):
        """执行DBSCAN聚类"""
        n_samples = len(X)
        self.labels_ = np.full(n_samples, -1)  # -1表示噪声点
        cluster_id = 0
        
        for i in range(n_samples):
            if self.labels_[i] != -1:
                continue  # 已经处理过
                
            # 查找邻域
            neighbors = self._region_query(X, i)
            
            if len(neighbors) < self.min_samples:
                self.labels_[i] = -1  # 标记为噪声
                continue
                
            # 发现新簇
            self.labels_[i] = cluster_id
            seed_set = deque(neighbors)
            seed_set.remove(i)  # 移除当前点
            
            # 扩展簇
            while seed_set:
                j = seed_set.popleft()
                
                if self.labels_[j] == -1:
                    self.labels_[j] = cluster_id
                    
                if self.labels_[j] != -1:
                    continue
                    
                self.labels_[j] = cluster_id
                
                # 查找j的邻域
                j_neighbors = self._region_query(X, j)
                
                if len(j_neighbors) >= self.min_samples:
                    for n in j_neighbors:
                        if self.labels_[n] == -1:
                            seed_set.append(n)
                            
            cluster_id += 1
            
        return self.labels_

In [4]:
# 测试代码
print("\n1. 加载测试数据...")
try:
    df = pd.read_csv("../data/processed/data_1000.csv")
    X = df[['LAT_scaled', 'LON_scaled']].values
    print(f"  加载 {len(X)} 条数据")
except:
    # 创建测试数据
    print("  创建模拟数据...")
    from sklearn.datasets import make_blobs
    X, _ = make_blobs(n_samples=1000, centers=3, random_state=42)
    print(f"  创建 {len(X)} 条模拟数据")

print("\n2. 运行基础DBSCAN...")


1. 加载测试数据...
  加载 1000 条数据

2. 运行基础DBSCAN...


In [5]:
dbscan = BasicDBSCAN(eps=0.3, min_samples=5)

start_time = time.time()
labels = dbscan.fit_predict(X)
end_time = time.time()

print(f"  运行时间: {end_time - start_time:.2f} 秒")
print(f"  聚类数量: {len(set(labels[labels != -1]))}")
print(f"  噪声点数量: {sum(labels == -1)}")
print(f"  噪声点比例: {sum(labels == -1) / len(labels):.2%}")

# 保存结果
np.save("../results/metrics/basic_labels.npy", labels)
print("\n 基础DBSCAN实现完成！结果已保存")

  运行时间: 0.04 秒
  聚类数量: 5
  噪声点数量: 0
  噪声点比例: 0.00%

 基础DBSCAN实现完成！结果已保存
