# T1: DBSCAN聚类算法实现

本实验实现DBSCAN（Density-Based Spatial Clustering of Applications with Noise）聚类算法，并比较Manhattan距离和Euclidean距离下的聚类效果。


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import warnings
warnings.filterwarnings('ignore')

# Font setup for CJK text: try the listed sans-serif fonts in order and
# render minus signs with the ASCII hyphen instead of the Unicode minus.
plt.rcParams.update({
    'font.sans-serif': ['SimHei', 'DejaVu Sans', 'Arial Unicode MS'],
    'axes.unicode_minus': False,
})


## 1. 实现DBSCAN算法


In [None]:
class DBSCAN:
    """
    Density-based clustering (DBSCAN) with a selectable distance metric.

    A point is a *core* point when at least ``min_samples`` points (the point
    itself included) lie within distance ``eps`` of it. Clusters are grown
    from core points; a non-core point that lies within ``eps`` of a core
    point joins that core point's cluster as a *border* point. Everything
    else is noise.

    Parameters:
        eps: Neighbourhood radius (a point is a neighbour when its distance
            is ``<= eps``).
        min_samples: Minimum neighbourhood size for a point to be a core point.
        metric: Distance metric, ``'euclidean'`` or ``'manhattan'``. The value
            is validated lazily, when a distance is first computed.

    Attributes:
        labels_: ``None`` before fitting; afterwards an integer array with one
            label per sample. ``-1`` marks noise, clusters are numbered
            ``1, 2, ...`` in order of discovery.
    """

    # Sentinel label values used while fitting.
    _UNVISITED = 0
    _NOISE = -1

    def __init__(self, eps=0.5, min_samples=5, metric='euclidean'):
        self.eps = eps
        self.min_samples = min_samples
        self.metric = metric
        self.labels_ = None

    def _distance(self, p1, p2, axis=None):
        """
        Compute the distance between ``p1`` and ``p2`` under ``self.metric``.

        Parameters:
            p1, p2: Array-likes that broadcast against each other.
            axis: Axis (or axes) to reduce over. ``None`` (default) reduces
                over everything and yields a single scalar distance.

        Returns:
            The distance(s) as float.

        Raises:
            ValueError: If ``self.metric`` is not a supported metric.
        """
        # Work in float so unsigned-integer inputs cannot wrap around on
        # subtraction or overflow when squared.
        diff = np.asarray(p1, dtype=float) - np.asarray(p2, dtype=float)
        if self.metric == 'euclidean':
            return np.sqrt(np.sum(diff ** 2, axis=axis))
        if self.metric == 'manhattan':
            return np.sum(np.abs(diff), axis=axis)
        raise ValueError(f"Unsupported metric: {self.metric}")

    def _get_neighbors(self, point_idx, X):
        """
        Return the indices of all samples within ``eps`` of ``X[point_idx]``.

        The point itself is part of the result (its distance is 0).

        Parameters:
            point_idx: Index of the query sample.
            X: Sample array; the first axis enumerates samples.

        Returns:
            A list of integer indices in ascending order.
        """
        X = np.asarray(X)
        n_samples = len(X)
        if n_samples == 0:
            return []
        # One row per sample, so a single vectorized call yields all distances.
        flat = X.reshape(n_samples, -1)
        dists = self._distance(flat, flat[point_idx], axis=1)
        return np.flatnonzero(dists <= self.eps).tolist()

    def fit(self, X):
        """
        Cluster ``X`` and store the result in ``self.labels_``.

        Parameters:
            X: Samples, array-like of shape (n_samples, n_features).

        Returns:
            self

        Raises:
            ValueError: If ``self.metric`` is not a supported metric (raised
                when the first distance is computed).
        """
        X = np.asarray(X, dtype=float)
        n_samples = len(X)

        # Every label starts as _UNVISITED (0); it ends up as either _NOISE
        # (-1) or a positive cluster id.
        labels = np.zeros(n_samples, dtype=int)
        self.labels_ = labels
        if n_samples == 0:
            return self

        X = X.reshape(n_samples, -1)
        cluster_id = 0

        for point_idx in range(n_samples):
            if labels[point_idx] != self._UNVISITED:
                continue

            neighbors = self._get_neighbors(point_idx, X)
            if len(neighbors) < self.min_samples:
                # Provisionally noise; may be promoted to a border point
                # if a core point discovered later reaches it.
                labels[point_idx] = self._NOISE
                continue

            # point_idx is a core point: open a new cluster and grow it
            # breadth-first through density-reachable points.
            cluster_id += 1
            labels[point_idx] = cluster_id
            seeds = deque(n for n in neighbors if n != point_idx)

            while seeds:
                current = seeds.popleft()

                if labels[current] == self._NOISE:
                    # Already known to be non-core, but reachable from a
                    # core point: it is a border point of this cluster.
                    labels[current] = cluster_id
                    continue
                if labels[current] != self._UNVISITED:
                    # Already assigned to a cluster.
                    continue

                labels[current] = cluster_id
                current_neighbors = self._get_neighbors(current, X)

                # Only core points propagate the cluster further.
                if len(current_neighbors) >= self.min_samples:
                    seeds.extend(
                        n for n in current_neighbors
                        if labels[n] in (self._UNVISITED, self._NOISE)
                    )

        return self

    def fit_predict(self, X):
        """Fit the model on ``X`` and return the resulting ``labels_`` array."""
        self.fit(X)
        return self.labels_


## 2. 测试数据：根据题目描述创建测试点


In [None]:
# Sample points for the exercise, grouped by the cluster each is expected
# to fall into. Rows are (x, y); the trailing comments name the points.

# Expected cluster 1 (lower-left corner): A, B, C, D
points_cluster1 = np.array([[0, 0], [1, 0], [0, 1], [1, 1]])

# Expected cluster 2: E, F, G, H, I, J
points_cluster2 = np.array([[2, 2], [3, 2], [2, 3], [3, 3], [2, 4], [3, 4]])

# Expected cluster 3: P, Q, R, S
points_cluster3 = np.array([[5, 5], [6, 5], [7, 5], [6, 6]])

# Expected noise: N, M, L, K, O, T
noise_points = np.array([[4, 0], [5, 1], [6, 0], [7, 1], [4, 6], [6, 8]])

# Stack the four groups row-wise into a single (n_samples, 2) array.
all_points = np.concatenate(
    (points_cluster1, points_cluster2, points_cluster3, noise_points),
    axis=0,
)

# Point names, in the same row order as all_points (used for annotation).
point_labels = [
    'A', 'B', 'C', 'D',
    'E', 'F', 'G', 'H', 'I', 'J',
    'P', 'Q', 'R', 'S',
    'N', 'M', 'L', 'K', 'O', 'T',
]

print(f"总点数: {len(all_points)}")
print(f"数据形状: {all_points.shape}")


## 3. 使用Euclidean距离进行DBSCAN聚类


In [None]:
# Cluster the sample points with the Euclidean metric (eps=1.1, min_samples=2).
dbscan_euclidean = DBSCAN(metric='euclidean', min_samples=2, eps=1.1)
labels_euclidean = dbscan_euclidean.fit_predict(all_points)

# Summary: number of clusters (noise label -1 excluded) and number of noise points.
print("Euclidean距离下的聚类结果:")
print(f"簇的数量: {len(set(labels_euclidean)) - (1 if -1 in labels_euclidean else 0)}")
print(f"噪声点数量: {np.sum(labels_euclidean == -1)}")
print("\n各点的标签:")
for label, point_label in zip(labels_euclidean, point_labels):
    if label != -1:
        print(f"{point_label}: 簇 {label}")
    else:
        print(f"{point_label}: 噪声点")

# Group the point names by the label each point received.
clusters_euclidean = {}
for i, label in enumerate(labels_euclidean):
    clusters_euclidean.setdefault(label, []).append(point_labels[i])

print("\n簇的组成:")
for cluster_id, points in sorted(clusters_euclidean.items()):
    if cluster_id != -1:
        print(f"簇 {cluster_id}: {points}")
    else:
        print(f"噪声点: {points}")


In [None]:
# Visualise the Euclidean clustering: raw points on the left, result on the right.
plt.figure(figsize=(12, 5))

# Left panel: every point in black, annotated with its name.
plt.subplot(1, 2, 1)
plt.scatter(all_points[:, 0], all_points[:, 1], c='black', s=100, alpha=0.6)
for label, (px, py) in zip(point_labels, all_points):
    plt.annotate(label, (px, py),
                 xytext=(5, 5), textcoords='offset points', fontsize=10)
plt.title('原始数据点', fontsize=14)
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True, alpha=0.3)

# Right panel: one colour per label.
plt.subplot(1, 2, 2)
unique_labels = set(labels_euclidean)
colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))

for k, col in zip(unique_labels, colors):
    # Noise is drawn as larger black crosses; clusters as coloured dots.
    is_noise = (k == -1)
    if is_noise:
        col = 'black'
    marker, size = ('x', 150) if is_noise else ('o', 100)

    class_member_mask = (labels_euclidean == k)
    member_indices = np.where(class_member_mask)[0]
    xy = all_points[class_member_mask]
    plt.scatter(xy[:, 0], xy[:, 1], c=[col], marker=marker, s=size,
                alpha=0.6, label=f'簇 {k}' if k != -1 else '噪声点')

    # Annotate each member with its point name.
    for point_idx, (px, py) in zip(member_indices, xy):
        plt.annotate(point_labels[point_idx], (px, py),
                     xytext=(5, 5), textcoords='offset points', fontsize=9)

plt.title(f'Euclidean距离 DBSCAN聚类结果 (eps=1.1, min_samples=2)', fontsize=14)
plt.xlabel('X')
plt.ylabel('Y')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


## 4. 使用Manhattan距离进行DBSCAN聚类


In [None]:
# Cluster the sample points with the Manhattan metric (eps=1.1, min_samples=2).
dbscan_manhattan = DBSCAN(metric='manhattan', min_samples=2, eps=1.1)
labels_manhattan = dbscan_manhattan.fit_predict(all_points)

# Summary: number of clusters (noise label -1 excluded) and number of noise points.
print("Manhattan距离下的聚类结果:")
print(f"簇的数量: {len(set(labels_manhattan)) - (1 if -1 in labels_manhattan else 0)}")
print(f"噪声点数量: {np.sum(labels_manhattan == -1)}")
print("\n各点的标签:")
for label, point_label in zip(labels_manhattan, point_labels):
    if label != -1:
        print(f"{point_label}: 簇 {label}")
    else:
        print(f"{point_label}: 噪声点")

# Group the point names by the label each point received.
clusters_manhattan = {}
for i, label in enumerate(labels_manhattan):
    clusters_manhattan.setdefault(label, []).append(point_labels[i])

print("\n簇的组成:")
for cluster_id, points in sorted(clusters_manhattan.items()):
    if cluster_id != -1:
        print(f"簇 {cluster_id}: {points}")
    else:
        print(f"噪声点: {points}")


In [None]:
# Visualise the Manhattan clustering: raw points on the left, result on the right.
plt.figure(figsize=(12, 5))

# Left panel: every point in black, annotated with its name.
plt.subplot(1, 2, 1)
plt.scatter(all_points[:, 0], all_points[:, 1], c='black', s=100, alpha=0.6)
for label, (px, py) in zip(point_labels, all_points):
    plt.annotate(label, (px, py),
                 xytext=(5, 5), textcoords='offset points', fontsize=10)
plt.title('原始数据点', fontsize=14)
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True, alpha=0.3)

# Right panel: one colour per label.
plt.subplot(1, 2, 2)
unique_labels = set(labels_manhattan)
colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))

for k, col in zip(unique_labels, colors):
    # Noise is drawn as larger black crosses; clusters as coloured dots.
    is_noise = (k == -1)
    if is_noise:
        col = 'black'
    marker, size = ('x', 150) if is_noise else ('o', 100)

    class_member_mask = (labels_manhattan == k)
    member_indices = np.where(class_member_mask)[0]
    xy = all_points[class_member_mask]
    plt.scatter(xy[:, 0], xy[:, 1], c=[col], marker=marker, s=size,
                alpha=0.6, label=f'簇 {k}' if k != -1 else '噪声点')

    # Annotate each member with its point name.
    for point_idx, (px, py) in zip(member_indices, xy):
        plt.annotate(point_labels[point_idx], (px, py),
                     xytext=(5, 5), textcoords='offset points', fontsize=9)

plt.title(f'Manhattan距离 DBSCAN聚类结果 (eps=1.1, min_samples=2)', fontsize=14)
plt.xlabel('X')
plt.ylabel('Y')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


## 5. 比较两种距离度量的结果


In [None]:
# Draw the two clustering results side by side, one axes per metric.
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

results = [(labels_euclidean, 'Euclidean'), (labels_manhattan, 'Manhattan')]
for ax, (labels, metric_name) in zip(axes, results):
    unique_labels = set(labels)
    colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))

    for k, col in zip(unique_labels, colors):
        # Noise is drawn as larger black crosses; clusters as coloured dots.
        is_noise = (k == -1)
        if is_noise:
            col = 'black'
        marker, size = ('x', 150) if is_noise else ('o', 100)

        class_member_mask = (labels == k)
        member_indices = np.where(class_member_mask)[0]
        xy = all_points[class_member_mask]
        ax.scatter(xy[:, 0], xy[:, 1], c=[col], marker=marker, s=size,
                   alpha=0.6, label=f'簇 {k}' if k != -1 else '噪声点')

        # Annotate each member with its point name.
        for point_idx, (px, py) in zip(member_indices, xy):
            ax.annotate(point_labels[point_idx], (px, py),
                        xytext=(5, 5), textcoords='offset points', fontsize=9)

    ax.set_title(f'{metric_name}距离 DBSCAN聚类结果', fontsize=14)
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Text summary: cluster count (noise excluded) and noise count per metric.
separator = "=" * 60
print(separator)
print("距离度量对比总结")
print(separator)
print(f"\nEuclidean距离:")
print(f"  簇数量: {len(set(labels_euclidean)) - (1 if -1 in labels_euclidean else 0)}")
print(f"  噪声点: {np.sum(labels_euclidean == -1)}")
print(f"\nManhattan距离:")
print(f"  簇数量: {len(set(labels_manhattan)) - (1 if -1 in labels_manhattan else 0)}")
print(f"  噪声点: {np.sum(labels_manhattan == -1)}")


## 6. 验证距离计算

验证题目中提到的关键距离值：


In [None]:
# Helpers for verifying key distances
def euclidean_dist(p1, p2):
    """
    Return the Euclidean (L2) distance between two points.

    Parameters:
        p1, p2: Coordinates as array-likes (NumPy arrays, lists or tuples)
            that broadcast against each other.

    Returns:
        The distance as a float scalar.
    """
    # Compute in float: unsigned-integer arrays would otherwise wrap around
    # on subtraction and overflow when squared.
    diff = np.asarray(p1, dtype=float) - np.asarray(p2, dtype=float)
    return np.sqrt(np.sum(diff ** 2))

def manhattan_dist(p1, p2):
    """
    Return the Manhattan (L1) distance between two points.

    Parameters:
        p1, p2: Coordinates as array-likes (NumPy arrays, lists or tuples)
            that broadcast against each other.

    Returns:
        The distance as a float scalar.
    """
    # Compute in float: unsigned-integer arrays would otherwise wrap around
    # on subtraction (e.g. uint8 0 - 1 == 255).
    diff = np.asarray(p1, dtype=float) - np.asarray(p2, dtype=float)
    return np.sum(np.abs(diff))

# Points of cluster 1, unpacked row by row.
A, B, C, D = points_cluster1

print("聚类1 (A, B, C, D) 的距离验证:")
print(f"Euclidean距离:")
print(f"  A-B: {euclidean_dist(A, B):.4f}")
print(f"  A-C: {euclidean_dist(A, C):.4f}")
print(f"  A-D: {euclidean_dist(A, D):.4f}")
print(f"  B-C: {euclidean_dist(B, C):.4f}")
print(f"  C-D: {euclidean_dist(C, D):.4f}")

print(f"\nManhattan距离:")
print(f"  A-B: {manhattan_dist(A, B):.4f}")
print(f"  A-C: {manhattan_dist(A, C):.4f}")
print(f"  A-D: {manhattan_dist(A, D):.4f}")
print(f"  B-C: {manhattan_dist(B, C):.4f}")
print(f"  C-D: {manhattan_dist(C, D):.4f}")

# Points of cluster 2, unpacked row by row.
E, F, G, H, I, J = points_cluster2

print(f"\n聚类2 (E, F, G, H, I, J) 的距离验证:")
print(f"Euclidean距离:")
print(f"  E-F: {euclidean_dist(E, F):.4f}")
print(f"  E-G: {euclidean_dist(E, G):.4f}")
print(f"  E-I: {euclidean_dist(E, I):.4f}")
print(f"  G-H: {euclidean_dist(G, H):.4f}")
print(f"  I-J: {euclidean_dist(I, J):.4f}")

# Reminder of the neighbourhood threshold used in the clustering runs.
print(f"\neps = 1.1 的阈值线")
print(f"  距离 <= 1.1 的点会被连接")
