In [1]:
import numpy as np

In [2]:
def MyDBSCAN(data, eps, MinPts):
    # 保存最终的标签结果，其中 -1 代表噪声点，0 代表还未被分类状态
    labels = [0]*len(data)

    # 当前蔟的类别
    C = 0
    
    for P in range(0, len(data)):
    
        # 只对还未被分类的点进行聚类操作
        if not (labels[P] == 0):
           continue
        
        NeighborPts = regionQuery(data, P, eps)
        
        # 当邻近点数小于 MinPts 时，将所有点视为噪声点
        if len(NeighborPts) < MinPts:
            labels[P] = -1
        # 基于点 P 生成蔟
        else: 
           C += 1
           growCluster(data, labels, P, NeighborPts, C, eps, MinPts)
    
    return labels

def regionQuery(data, P, eps):
    """
    找到数据集中所有与点 P 距离小于 eps 的点
    """
    neighbors = []
    
    for Pn in range(0, len(data)):
        # 如果距离小于 eps 把当前点假如邻居节点集合当中
        if np.linalg.norm(data[P] - data[Pn]) < eps:
           neighbors.append(Pn)
    return neighbors

def growCluster(data, labels, P, NeighborPts, C, eps, MinPts):
    """
    基于点 P 以及其邻居节点生成蔟
    """

    labels[P] = C
    
    i = 0
    while i < len(NeighborPts):
        Pn = NeighborPts[i]
        if labels[Pn] == -1:
            # 标签值为 -1 时，其邻居节点数量必定小于 MinPts 所以不需要寻找了
            labels[Pn] = C
        elif labels[Pn] == 0:
            # 不需要考虑 labels 为其他值的节点，为了避免遍历到之前已经赋值过的节点导致的死循环
            labels[Pn] = C
            # 找到 Pn 的所有邻居节点并加入该蔟
            PnNeighborPts = regionQuery(data, Pn, eps)

            if len(PnNeighborPts) >= MinPts:
                NeighborPts = NeighborPts + PnNeighborPts
        i += 1

## 与 sklearn 实现对比

In [3]:
from sklearn.datasets import make_blobs
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler

# Create three gaussian blobs to use as our clustering data.
centers = [[1, 1], [-1, -1], [1, -1]]
X, labels_true = make_blobs(n_samples=750, centers=centers, cluster_std=0.4,
                            random_state=0)

X = StandardScaler().fit_transform(X)

###############################################################################
# Run my DBSCAN implementation.
print('Running my implementation...')
my_labels = MyDBSCAN(X, eps=0.3, MinPts=10)

###############################################################################
# Scikit-learn implementation of DBSCAN
#

print('Runing scikit-learn implementation...')
db = DBSCAN(eps=0.3, min_samples=10).fit(X)
skl_labels = db.labels_

# Scikit learn uses -1 to for NOISE, and starts cluster labeling at 0. I start
# numbering at 1, so increment the skl cluster numbers by 1.
for i in range(0, len(skl_labels)):
    if not skl_labels[i] == -1:
        skl_labels[i] += 1


###############################################################################
# Did we get the same results?

num_disagree = 0

# Go through each label and make sure they match (print the labels if they 
# don't)
for i in range(0, len(skl_labels)):
    if not skl_labels[i] == my_labels[i]:
        print('Scikit learn:', skl_labels[i], 'mine:', my_labels[i])
        num_disagree += 1

if num_disagree == 0:
    print('PASS - All labels match!')
else:
    print('FAIL -', num_disagree, 'labels don\'t match.')       

Running my implementation...
Runing scikit-learn implementation...
PASS - All labels match!
