# 如何用PySpark实现高性能的分布式Balanced K-means tree¶

Balanced K-means tree是一种均衡划分数据集的算法，树的每一个叶节点代表一个簇，簇内数据的相似度大于簇间数据的相似度。与一般K-means不同的是，Balanced K-means tree对每个簇中的数据量（即簇大小）做出了限制，要求每个簇大小不能小于某个用户设定的最小值，并且不能大于该最小值的2倍。此外用户对树的高度也可以做出限制。 通过限定每个簇的大小，Balanced K-means tree很适合用于K近邻搜索算法的空间划分，比如微软最近开源的
ANN库（approximate nearest neighborhood search）就用到了Balanced K-means tree做索引的空间划分。 

Balanced K-means tree的形状如下所示：

![](BKT.png)

## 使用Redis cluster来存储K-means每个簇的数据

Balanced K-means tree会使用K-means算法反复迭代聚类产出新的簇，在每一轮K-means算法完成后，它形成的簇需要存储在合适的地方供下次迭代使用，另外在最终的树构建完成后检索时也需要使用每个簇中的数据。我们
暂时选择了redis来存储每个簇的数据。

由于我们的数据集是numpy array类型的，为了高性能的读取，适合使用redis的string类型来存储，其中string的key由tree id + parent node id + cluster id构成，value就是numpy array转换成的字节序列。 此外需要注意的是，由于redis string value的最大值是512MB，如果numpy array大小超过了512MB，则需要将numpy array分片存储。为了代码的高内聚和低耦合，我们可以将使用redis cluster的代码封装在一个RedisDBWrapper类中，在该类初始化的时候连接redis cluster得到一个可复用的连接池,得到连接池句柄。

In [145]:
import redis
import struct
import numpy as np

class RedisDBWrapper(object):

    #Redis string数据类型的value最大值是500MB
    MAX_CHUNK_SIZE = 500000000
    #在存储数据前，先要计算每个chunk的大小，公式为 RedisDBWrapper.CHUNK_SIZE = int(RedisDBWrapper.MAX_CHUNK_SIZE / data.shape[1])
    CHUNK_SIZE = 100000

    def __init__(self, host, port = 6379):
        pool = redis.ConnectionPool(host = host, port = port, decode_responses = True)
        self.redis = redis.Redis(connection_pool = pool)
        
    def getHandler(self):
        return self.redis

RedisDBWrapper类用于完成读取和存储numpy array数据，对外暴露的接口有2个，其中一个是save_data，另一个是get_data。 save_data用来将指定Key的numpy array数据存储到redis cluster中，它一开始会根据传入的numpy array数据的大小来判断需要分成几个chunk（或分片）来存储，接着在存储head chunk和其余的chunk时会有所区别，因为在第一个chunk中除了存储分片本身的维度外，还需要存储整个numpy array的维度。

In [146]:
#由于redis string value的大小限制，我们需要将一个很大的numpy array拆分成一个个chunk来存储
def __save_chunk_data(self, data, key, index):
    """Store given Numpy array 'a' in Redis under key 'n'"""
    try:
        #每一个numpy array字节序列分片的维度存储在chunk的首部
        h, w = data.shape
        shape = struct.pack('>II', h, w)
        encoded = shape + data.tobytes()

        self.redis.set(key + "_" + str(index), encoded.decode('latin1'))
        return True
    except Exception as e:
        print(e)
        return False

#numpy array第一个数据分片除了存储本分片的维度之外，还需要存储整个numpy array的维度
def __save_head_chunk_data(self, data, key, total_w, total_h, chunk_num):

    try:
        h, w = data.shape
        head = struct.pack('>III', total_w, total_h, chunk_num)
        shape = struct.pack('>II', h, w)
        encoded = head + shape + data.tobytes()

        self.redis.set(key + "_0", encoded.decode('latin1'))
        return True
    except Exception as e:
        print(e)
        return False
   

#将numpy array存储在redis中
def save_data(self, data, key):

    size = 0
    index = 0

    if len(data) <= RedisDBWrapper.CHUNK_SIZE:
        chunk_num = 1
    elif len(data) % RedisDBWrapper.CHUNK_SIZE == 0:
        chunk_num = int(len(data) / RedisDBWrapper.CHUNK_SIZE)
    else:
        chunk_num = int(len(data) / RedisDBWrapper.CHUNK_SIZE) + 1

    while size < len(data):

        offset = min(RedisDBWrapper.CHUNK_SIZE, len(data) - size)

        firstChunk = False
        if size == 0:
            firstChunk = True

        if index == 0:
            ret = self.__save_head_chunk_data(data[size : size + offset], key, data.shape[0], data.shape[1], chunk_num)
        else:
            ret = self.__save_chunk_data(data[size : size + offset], key, index)

        if not ret:
            return False

        size += offset

        index += 1

    return True

RedisDBWrapper.__save_chunk_data = __save_chunk_data
RedisDBWrapper.__save_head_chunk_data = __save_head_chunk_data
RedisDBWrapper.save_data = save_data

get_data则负责将所有chunk从redis cluster读取出来并做完数据拼接之后，能够重新reshape将原来的数据维度恢复出来。

In [147]:
#读取numpy array的其中一个数据分片
def __get_chunk_data(self, key, index):
    """Retrieve Numpy array from Redis key 'n'"""
    try:
        encoded = self.redis.get(key + "_" + str(index))
        if not encoded:
            return None
        encoded = encoded.encode('latin1')

        h, w = struct.unpack('>II', encoded[:8])
        return np.frombuffer(encoded, offset = 8).reshape(h,w)
    except Exception as e:
        print(e)
        return None

#读取numpy array的第一个分片数据，需要将整个numpy array的维度都读取出来
def __get_head_chunk_data(self, key):

    try:
        encoded = self.redis.get(key + "_0")
        if not encoded:
            return 0, 0, 0, None
        encoded = encoded.encode('latin1')

        total_h, total_w, chunk_num = struct.unpack('>III', encoded[:12])
        h, w = struct.unpack('>II', encoded[12 : 20])
        return total_h, total_w, chunk_num, np.frombuffer(encoded, offset = 20).reshape(h,w)
    except Exception as e:
        print(e)
        return 0, 0, 0, None


#从redis中读取出numpy array
def get_data(self, key):

    total_h, total_w, chunk_num, head_chunk_data = self.__get_head_chunk_data(key)
    if total_h == 0:
        return None

    for index in range(chunk_num - 1):
        chunk_data = self.__get_chunk_data(key, index + 1)

        head_chunk_data = np.concatenate((head_chunk_data, chunk_data), axis = 0)

    head_chunk_data.reshape(total_h, total_w)
    return head_chunk_data

RedisDBWrapper.__get_chunk_data = __get_chunk_data
RedisDBWrapper.__get_head_chunk_data = __get_head_chunk_data
RedisDBWrapper.get_data = get_data

## 分布式K-means算法的pyspark定制实现

K-means算法是构成Balanced k-means tree的基础，它的性能决定了构建树的性能。以下代码是我们实现的高性能的分布式K-means算法，目前只是实现了最传统的K-means算法，以后还会加上优化初始点分布的K-means++算法，因为初始点的选择会影响K-means算法的收敛速度。

首先我们可以利用spark的分布式运算特性将数据集分区，每个数据分区可以在不同的spark executor中并行处理，这样就大大加快了整体运算速度。k-means算法需要反复迭代来求出各个新的cluster的中心点，如果不使用分布式的做法，我们直接可以计算所有数据到上次迭代所形成的各个cluster的中心距离，为每一个样本点选出一个最近的中心，从而将样本点划归到最近的cluster中去，然后再重新计算新cluster的中心。但是随着数据集规模的增长，这样全量计算的速度会变得越来越慢。如果使用spark的分布式运算特性，我们就可以将数据分区，分别计算每个分区中的样本点到上次迭代所形成的各个cluster的中心距离， 划归到最近的cluster中，并返回每个数据分区中的各个cluster的数据总和以及样本量。下面FastKmeans类中的assign_to_new_centroids就是在每个spark RDD分区中计算各个cluster的数据总和以及样本量。

其次在spark driver中收集到所有spark RDD分区中的各个cluster的数据总和及样本量之后，就可以归并计算各个cluster的完整数据总和及完整样本量，从而快速计算出新cluster的中心。FastKmeans类中的move_to_new_centroids就是用来执行这个任务的。

K-means算法就这样反复迭代运行，直到中心点不再发生变化为止。在这里我们根据上次迭代的cluster中心和本次计算的cluster中心的距离来判断K-means算法是否收敛，当它们之间的距离小于0.1时即认为收敛。最后我们需要把K-means算法产生的各个cluster的数据存储在redis中，方便以后读取利用。在这里我们还是利用spark的分布式运算特性来将数据集分区，将各个分区中的数据划分到各自所属的cluster，然后按cluster id聚合起来，从而将每一个cluster的数据并行地存储在redis中。 

In [2]:
import random
import time
import numpy as np

class FastKmeans(object):

    iterations = 1000
    converge_threshold = 0.1

K-means算法的初始中心点选择很重要，主要有两方面影响，一是算法的收敛速度，二是初始中心选择不好会算法会产生空的簇，如下图所示：

![](empty_cluster.png)

图中假设有6个点需要聚成3个类，红蓝绿分别代表所选的初始节点，由于蓝色点和红色点的距离太小，到第2轮迭代时，蓝点所在的簇就成了空簇。

针对这种情况，有人提出了K-means++算法，所谓K-means++，就是改进了K-means的初始中心点选择，原始K-means是从样本中按照均匀分布来选择初始中心点，而K-means++则尽量地让初始中心点彼此远离。具体步骤如下：
1. 随机选择一个样本点作为初始中心集的第一个中心
2. 计算每个样本点离初始中心集的最近距离
3. 按照一定概率选择一个样本点放入初始中心集，这个概率和样本点离初始中心集的最近距离正相关
4. 重复步骤2-3，直到初始中心集构建完毕（即选择完了K个中心点）

通过一定的概率让初始中心点彼此远离后，就可以提高收敛速度，并且产生空簇的可能性也大大降低。

以下就是按照这个算法来选择初始中心点的代码：

In [6]:
@staticmethod
def get_closest_distance(iterator, centroids):
    centroids = centroids.value

    data = np.array([x for x in iterator])
    np.random.shuffle(data)

    points = np.array([x[0] for x in data])
    points_idx = np.array([x[1] for x in data])

    if len(points) == 0:
        return []

    distances = np.sqrt(((points - centroids[:, np.newaxis]) ** 2).sum(axis = 2))
    min_distances  = np.min(distances, axis = 0)

    for i, idx in enumerate(points_idx):
        yield (idx, min_distances[i])


@staticmethod
def initialize_centroids_non_uniform(points, k, sc):
    n_centroids = np.zeros(shape = (k, points.shape[1]))

    centroid = points[random.sample(range(points.shape[0]), 1)]
    n_centroids[0] = centroid

    rdd = sc.parallelize(points).zipWithIndex().cache()

    for i in range(k - 1):

        bc_centroids = sc.broadcast(n_centroids)

        data_closest_distances_rdd = rdd.mapPartitions(lambda x: FastKmeans.get_closest_distance(x, bc_centroids)).cache()

        sum_all = data_closest_distances_rdd.map(lambda x: (1, x[1])).reduceByKey(lambda x, y: x + y).collect()[0][1]

        sum_all *= np.random.random()

        distances = 0

        data_closest_distances = data_closest_distances_rdd.collect()
        for p in data_closest_distances:
            distances += p[1] 

            if distances >= sum_all:                              
                n_centroids[i + 1] = points[p[0]]
                break

    return n_centroids

FastKmeans.get_closest_distance = get_closest_distance

FastKmeans.initialize_centroids_non_uniform = get_closest_distance

完成初始中心选择后，就开始聚类，其中assign_to_new_centroids这个类静态函数会计算每个RDD分区中的数据应该分配到哪个新cluster中，返回新的cluster id，cluster中的数据总和以及cluster的大小, 这里计算出的新cluster的信息（比如数据总和和cluster大小）只是由这个RDD分区中数据所建立起来的，可以看作是cluster的局部信息。

In [4]:
@staticmethod
def assign_to_new_centroids(iterator, centroids):
    centroids = centroids.value
    points = np.array([x for x in iterator])
    if len(points) == 0:
        return [(k, np.zeros(shape = (1, centroids.shape[1])), 0) for k in range(centroids.shape[0])]

    distances = np.sqrt(((points - centroids[:, np.newaxis]) ** 2).sum(axis = 2))
    closest =  np.argmin(distances, axis = 0)

    sum = np.array([points[closest == k].sum(axis = 0) for k in range(centroids.shape[0])])
    count = np.array([len(points[closest == k]) for k in range(centroids.shape[0])])

    for k in range(centroids.shape[0]):
        yield (k, sum[k], count[k])
        
FastKmeans.assign_to_new_centroids = assign_to_new_centroids

move_to_new_centroids这个类静态函数会合并所有RDD分区计算出的cluster局部信息, 返回新的cluster id和cluster的完整大小。注意，如果出现空簇就马上返回，此轮聚类作废，需要再次重新选择初始中心点进行新的一轮聚类。

In [5]:
@staticmethod
def move_to_new_centroids(collect_data):

    centors_map = {}
    for cid, sum, size in collect_data:
        if cid not in centors_map:
            centors_map[cid] = ([sum], [size])
        else:
            centors_map[cid][0].append(sum)
            centors_map[cid][1].append(size)

    centroids = []
    cluster_sizes = []

    for k, v in centors_map.items():
        if np.array(v[1]).sum() == 0:
            return None, None
        
        centroids.append(np.array(v[0]).sum(axis = 0) / np.array(v[1]).sum())
        cluster_sizes.append(np.array(v[1]).sum())

    return np.array(centroids), np.array(cluster_sizes)

FastKmeans.move_to_new_centroids = move_to_new_centroids

initialize_centroids这个类静态函数初始化cluster中心，这里暂时使用随机初始化，以后会使用K-means++中的算法来选择最优的cluster中心，使得各个cluster中心的距离尽可能大。

In [151]:
@staticmethod
def initialize_centroids(points, k):
    """returns k centroids from the initial points"""
    centers_id = random.sample(range(points.shape[0]), k)
    return points[centers_id]

FastKmeans.initialize_centroids = initialize_centroids

在K-means聚类完成后，就可以将各个cluster的数据存储到redis集群中，assign_to_final_centroids会将一个RDD分区中的数据和它们所属的cluster id关联起来，而save_cluster_to_redis就负责将一个cluster的数据存储在redis中。

In [152]:
@staticmethod
def assign_to_final_centroids(iterator, centroids):
    points = np.array([x for x in iterator])
    if len(points) == 0:
        return [(k, []) for k in range(centroids.shape[0])]

    distances = np.sqrt(((points - centroids[:, np.newaxis]) ** 2).sum(axis = 2))
    closest =  np.argmin(distances, axis = 0)

    for k in range(centroids.shape[0]):
        yield (k, points[closest == k])

@staticmethod
def save_cluster_to_redis(cid, iterator, shape_w, redis_host, redis_port, cluster_key):
    points = np.zeros(shape = (1, shape_w))
    for x in iterator:
        points = np.concatenate((points, x), axis = 0)

    r = RedisDBWrapper(host = redis_host, port = redis_port)

    ret = r.save_data(points[1:], cluster_key + '_' + str(cid))

    return (cid, points.sum(axis = 0) / (len(points) - 1), len(points) - 1, ret)

FastKmeans.assign_to_final_centroids = assign_to_final_centroids
FastKmeans.save_cluster_to_redis = save_cluster_to_redis

fit这个类静态函数是FastKmeans的用户调用接口，驱动以上的函数采用迭代的方式来完成K-means聚类。

In [7]:
@staticmethod
def center_diff(a, b):
    d = a - b

    return (d * d).sum()

#FastKmeans的调用接口
#参数 data:        numpy array数组
#参数 K:           划分成的cluster数量
#参数 redis_host:  用于数据存储的redis host
#参数 redis_port:  用于数据存储的redis port
#参数 sc:          spark context
#参数 cluster_key: 用来标识此次聚类的唯一key
@staticmethod
def fit(data, k, redis_host = None, redis_port = 6379, sc = None, cluster_key = None):

    if not cluster_key:
        cluster_key = str(int(time.mktime(time.localtime())))

    retris = 0

    while True:
        n_centors = FastKmeans.initialize_centroids_non_uniform(data, k, sc)
        n_centors = n_centors[n_centors[:, 0].argsort()]

        centors = n_centors

        rdd = sc.parallelize(data)

        for i in range(FastKmeans.__iterations):
            #print("round %d ..."%i)

            broadcast_centors = sc.broadcast(n_centors)

            collect_data = rdd.mapPartitions(lambda x: FastKmeans.assign_to_new_centroids(x, broadcast_centors)).collect()

            n_centors, cluster_sizes = FastKmeans.move_to_new_centroids(collect_data)
            if n_centors is None:
                break

            n_centors = n_centors[n_centors[:, 0].argsort()]

            if FastKmeans.center_diff(n_centors, centors) < FastKmeans.__converge_threshold:
                break

            centors = n_centors

        if n_centors is not None:
            break

        retris += 1
        if retris >= FastKmeans.__MAX_RETRY_TIMES:    
            return None

    #收敛后就把数据存储到redis cluster中
    if redis_host:
        result = rdd.mapPartitions(lambda x: FastKmeans.assign_to_final_centroids(x, n_centors)).groupByKey().\
                     map(lambda x: FastKmeans.save_cluster_to_redis(x[0], x[1], n_centors.shape[1], redis_host, redis_port, cluster_key)).collect()
    else:
        result = n_centors

    return result

FastKmeans.center_diff = center_diff
FastKmeans.fit = fit

### 分布式K-means算法的pyspark定制实现的性能

我们测试算法的benchmark 来自于texmex corpus的ANN测试数据集，地址在http://corpus-texmex.irisa.fr/。

这是一个非常古老的数据集，这些向量样本是从一些图像上面抽取出来的SIFT特征点。以下代码用于读取该数据集文件并在内存中转为numpy array:

In [154]:
import numpy as np

def fvecs_read(filename, c_contiguous=True):
    fv = np.fromfile(filename, dtype=np.float32)
    if fv.size == 0:
        return np.zeros((0, 0))
    dim = fv.view(np.int32)[0]
    assert dim > 0
    fv = fv.reshape(-1, 1 + dim)
    if not all(fv.view(np.int32)[:, 0] == dim):
        raise IOError("Non-uniform vector sizes in " + filename)
    fv = fv[:, 1:]
    if c_contiguous:
        fv = fv.copy()
    return fv

In [33]:
data = fvecs_read('sift/sift_learn.fvecs').astype(np.float64)
data.shape

(100000, 128)

下表是我们自己定制的K-means实现和numpy库以及spark mlib中的K-means实现的性能对比：

![](kmeans_perf.png)

可见我们的算法实现性能优于numpy库和spark mlib的k-means实现，尤其比spark mlib的k-means实现快2倍。numpy库的k-means实现性能接近于我们的定制实现，但它是单机版的，如果数据量过大就不适用了。

## 构建Balanced K-means tree

Balanced K-means tree是一棵多叉树，使用K-means算法反复迭代聚类产出新的簇。它的规则如下：

1. 如果一个簇比用户设定的最小值的2倍还要大，则继续划分这个簇，划分的数量为 min(size / max_cluster_size + 1, max_clusters_per_run), 其中size为该簇大小，max_cluster_size为用户指定的最小簇大小的2倍， max_clusters_per_run为用户设定的每次聚类的最大划分数（即用户设定的每次聚类产生的最大簇数量）

2. 如果根据公式min(size / max_cluster_size + 1, max_clusters_per_run)计算出来的划分数量为2，则将该簇分成两个大小相等的子簇

3. 如果一个簇大于用户设定的最小值并且小于该最小值的2倍，则该簇对应树的一个叶子节点

4. 如果在一次聚类产生的簇中，有多个簇小于用户设定的最小值，则把这些小簇做归并，直到归并后的簇大于用户设定的最小值

此外，该Balanced K-means tree的实现还支持使用redis来保存树和加载树，这样在完成一次树的训练构造后，如果不用时就可以将树从内存中卸载，等到下次再使用时从redis中加载。

要实现树这种数据结构，按惯用做法，我们先定义树的节点，树节点的成员有表征该节点的唯一id号，表示该节点所在簇的cluster id，簇中心centroid以及它的父节点和子节点列表：

In [9]:
import redis
import random
import queue
import time
import numpy as np

#代表Balanced K-means tree的节点
class BKTNode(object):

    def __init__(self):
        self.id = str(id(self))
        self.cluster_id = None
        self.children = []
        self.parent = None
        self.centriod = None
        self.leaf = False

get_data是树节点的成员函数，用于读取该节点的数据，如果是叶节点，则只读取它所对应的簇数据，如果是非叶节点，则递归读取它子树的所有簇数据。

In [10]:
#参数 redis_handler:  redis handler
#参数 bkt_key:        该节点所在的树的唯一标识
def get_data(self, redis_handler, bkt_key):
    if not redis_handler:
        return None

    if self.leaf:
        return redis_handler.get_data(bkt_key + '_' + self.parent.id + '_' + str(self.cluster_id))

    merged_data = np.zeros(shape = (1, self.centriod.shape[0]))

    for child in self.children:
        data = child.get_data(redis_handler, bkt_key)
        merged_data = np.concatenate((merged_data, data), axis = 0)

    return merged_data[1:]

BKTNode.get_data = get_data

下面我们看看BKT(Balance K-means tree的缩写）的实现，按照常规做法，树需要有一个根节点root_node，唯一标识该树的bkt_key，此外我们还需要指定树的结构参数，比如树的最大高度max_depth，簇的最小元素数量min_cluster_size, 簇的最大元素数量max_cluster_size, 如果不指定簇的最大元素数量，则默认为最小数量的2倍。

In [11]:
#代表Balanced K-means tree
class BKTree(object):
    pass

#初始化Balanced K-means tree
#参数 max_clusters_per_run: 指定每次K-means聚类的最大簇数量
#参数 max_depth           : 指定树的高度限制
#参数 min_cluster_size    : 指定簇中数据的最小数量，即最小簇大小
#参数 sc                  : spark context
#参数 redis_host          : 指定用于存储簇数据的redis host
#参数 redis_port          : 指定用于存储簇数据的redis port
#参数 max_cluster_size    : 指定簇中数据的最大数量，是个软约束，如果不指定的话就默认是min_cluster_size的2倍
def init(self, max_clusters_per_run, max_depth, min_cluster_size, sc, redis_host, redis_port = 6379, max_cluster_size = 0, balance = True):
    self.__max_clusters_per_run = max_clusters_per_run
    self.__max_depth = max_depth
    self.__min_cluster_size = min_cluster_size
    self.__max_cluster_size = max(self.__min_cluster_size, max_cluster_size)
    self.__sc = sc
    self.__redis_host = redis_host
    self.__redis_port = redis_port
    self.__bkt_key = str(id(self))
    self.__balance = balance
    self.__root_node = BKTNode()

    self.__redis = RedisDBWrapper(redis_host, redis_port)

def get_root(self):
    return self.__root_node

def get_key(self):
    return self.__bkt_key

def get_redis(self):
    return self.__redis.getHandler()

BKTree.init = init
BKTree.get_root = get_root
BKTree.get_key = get_key
BKTree.get_redis = get_redis

dump是树的成员函数，该函数的功能就是把树的完整结构存储在redis中，我们使用树的广度优先遍历算法将每一个树节点的信息保存下来，包括它的id，所代表的簇id和子节点id。

In [12]:
def dump(self):

    epochTime = int(time.mktime(time.localtime()))

    tree_params = self.__bkt_key + '_' + str(self.__max_clusters_per_run) + '_' + str(self.__max_depth) + '_' \
                  + str(self.__min_cluster_size) + '_' + str(self.__max_cluster_size) + '_' \
                  + str(self.__balance) + '_' + self.__root_node.id

    self.get_redis().sadd('my_bkt', str(epochTime) + '_' + tree_params)

    q = queue.Queue()
    q.put(self.__root_node)

    try:
        while not q.empty():
            node = q.get()

            if node.leaf:
                value = '1_' + str(node.cluster_id)
            elif node.cluster_id:
                value = '0_' + str(node.cluster_id)
            else:
                value = '0_0'

            ret = self.get_redis().set(self.__bkt_key + '_' + node.id, value)
            if not ret:
                raise Exception("Failed to dump BKT node to redis")

            for child in node.children:
                ret = self.get_redis().sadd(self.__bkt_key + '_' + node.id + '_children', child.id)
                if not ret:
                    raise Exception("Failed to dump BKT sub tree to redis")

                q.put(child)

        return True 
    except Exception as e:
        print(e)
        return False
    
BKTree.dump = dump

既然dump是负责将树存储在redis中，那么必然有将树从red|is中重新加载进内存的方法，loads就是这样的函数。loads也使用广度优先遍历算法来重新构建树，然后调用update_centroid来重新计算每个非叶节点的簇中心。

In [13]:
def update_centroid(self, node):

    if node.leaf:
        data = self.__redis.get_data(bkt.__bkt_key + '_' + node.parent.id + '_' + str(node.cluster_id))
        total_sum = data.sum(axis = 0)
        total_size = len(data)

        node.centriod = total_sum / total_size

        return (total_sum, total_size)

    cluster_list = []
    for child in node.children:
        cluster_list.append(self.update_centroid(child))

    total_sum = np.zeros(shape = (cluster_list[0][0].shape[0], ))
    total_size = 0

    for sum, size in cluster_list:
        total_sum += sum
        total_size += size

    node.centriod = total_sum / total_size

    return (total_sum, total_size)


@classmethod
def loads(cls, bkt_key, redis_host, redis_port = 6379):

    redis = RedisDBWrapper(redis_host, redis_port)

    trees = redis.getHandler().smembers('my_bkt')
    found = False

    for tree in trees:            
        if bkt_key == tree.split('_')[1]:

            _, bkt_key, max_clusters_per_run, \
            max_depth, min_cluster_size, max_cluster_size, balance, root_id = tree.split('_')

            found = True
            break

    if not found:
        return None

    bkt = BKTree(int(max_clusters_per_run), int(max_depth), int(min_cluster_size), \
                 None, redis_host, redis_port, int(max_cluster_size), bool(balance))

    bkt.__bkt_key = bkt_key
    bkt.__redis = redis
    bkt.__root_node.id = root_id

    try:
        q = queue.Queue()
        q.put(bkt.__root_node)

        while not q.empty():
            node = q.get()

            if not node.leaf:
                children_ids = bkt.get_redis().smembers(bkt.__bkt_key + '_' + node.id + '_children')
            else:
                data = bkt.__redis.get_data(bkt.__bkt_key + '_' + node.parent.id + '_' + str(node.cluster_id))

                node.centriod = data.sum(axis = 0) / len(data)
                children_ids = []


            for child_id in children_ids:
                child_info = bkt.get_redis().get(bkt.__bkt_key + '_' + child_id)

                child_node = BKTNode()
                child_node.id = child_id

                child_node.leaf = True if child_info.split('_')[0] == '1' else False
                child_node.cluster_id = int(child_info.split('_')[1])
                child_node.parent = node

                q.put(child_node)

                node.children.append(child_node)

        bkt.update_centroid(bkt.__root_node)

        return bkt

    except Exception as e:
        print(e)
        return None
    
BKTree.update_centroid = update_centroid
BKTree.loads = loads

构建Balance K-means tree的目的是为了快速检索最相似的样本，给定一个样本，我们需要搜索出与该样本相似度最高的一个簇，再在这个簇里面检索与该样本最相似的K个样本。在这里我们逐个计算样本与树节点所代表的簇的簇中心的距离，选择最近的树节点并沿着该节点的子树继续上述查找过程，直到找到距离最近的叶节点，返回最近叶节点中的数据。类方法get_nearest_leaf_node用于查找与样本最接近的叶节点，get_nearest_cluster用于读取并返回最近叶节点中的数据。

In [160]:
def __get_nearest_cluster(self, point, node, leaf_clusters):

    if node.leaf:
        leaf_clusters.append((node, np.sqrt(((point - node.centriod) ** 2).sum())))

    for child in node.children:
        self.__get_nearest_cluster(point, child, leaf_clusters)

#返回与指定的数据点最接近的簇所对应的节点
#参数 point: 要检索的数据点
def get_nearest_leaf_node(self, point):

    leaf_clusters = []

    for child in self.__root_node.children:
        self.__get_nearest_cluster(point, child, leaf_clusters)

    if len(leaf_clusters) == 0:
        return None

    leaf_clusters = sorted(leaf_clusters, key = lambda x: x[1])

    return leaf_clusters[0][0]

#获取与指定的数据点最接近的簇数据
#参数 point: 要检索的数据点
def get_nearest_cluster(self, point):

    node = self.get_nearest_leaf_node(point)
    if node:
        return node.get_data(self.__redis, self.__bkt_key)

    return None

BKTree.__get_nearest_cluster = __get_nearest_cluster
BKTree.get_nearest_leaf_node = get_nearest_leaf_node
BKTree.get_nearest_cluster = get_nearest_cluster

构建Balance K-means tree主要由build和make_bkt_node方法来完成。build方法是构建树的驱动函数，它首先采用K-means算法将整个数据集分成k个大簇，然后调用make_bkt_node方法来构建子树，最后调用adjust_tree_depth方法来调整树的高度。make_bkt_node是一个重要方法，它负责树节点的构造，如果该节点中的数据量不小于min_cluster_size并且不大于max_cluster_size，则是一个叶节点，否则继续使用K-means算法来划分节点数据递归地构建它的子树。

In [15]:
#构建balanced k-means tree
#参数 data:  numpy array类型的数据集 
def build(self, data):
    size = len(data)
    if size == 0:
        return self.__root_node
    
    max_cluster_size = max(self.__min_cluster_size * 2, self.__max_cluster_size)

    if size <= max_cluster_size:
        cur_node = BKTNode()
        cur_node.parent = self.__root_node
        cur_node.centriod = data.sum(axis = 0) / len(data)
        cur_node.cluster_id = 0

        self.__redis.save_data(data, self.__bkt_key + '_' + self.__root_node.id + "_" + str(cur_node.cluster_id))
        self.__root_node.children.append(cur_node)

        return self.__root_node

    k = min(size / max_cluster_size + 1, self.__max_clusters_per_run)

    clusters = FastKmeans.fit(data, k, self.__redis_host, self.__redis_port, self.__sc, self.__bkt_key + '_' + self.__root_node.id)

    for cid, centriod, _, ret in clusters:
        if not ret:
            print("Failed to build BKT due to redis write error")
            return None

        child_node = self.make_bkt_node(cid, centriod, self.__root_node)
        if not child_node:
            print("Failed to build BKT")
            return None

        self.__root_node.children.append(child_node)

    self.adjust_tree_depth()

    return self.__root_node

#构建balanced k-means tree的节点
#参数 centriod_id： 该节点所对应的簇id
#参数 centriod：    该节点所对应的簇中心
#参数 parent_node： 该节点的父节点
def make_bkt_node(self, cluster_id, centriod, parent_node):

    cur_node = BKTNode()
    cur_node.parent = parent_node
    cur_node.centriod = centriod
    cur_node.cluster_id = cluster_id

    #读取该节点所对应簇的数据
    data = self.__redis.get_data(self.__bkt_key + '_' + parent_node.id + '_' + str(cluster_id))

    size = len(data)
    max_cluster_size = max(self.__min_cluster_size * 2, self.__max_cluster_size)

    #如果该节点所对应簇的数据量小于max_cluster_size，则完成一个叶子节点
    if size <= max_cluster_size:
        cur_node.leaf = True
        return cur_node

    #计算该节点中的数据还可以分成几个簇，如果只能分成2个簇，则做均分处理，否则进行一轮K-means聚类
    k = min(int(size / max_cluster_size + 1), self.__max_clusters_per_run)
    if self.__balance:
        if k == 2:
            clusters = self.half_cut_cluster(data, self.__bkt_key + '_' + cur_node.id)
        else:
            clusters = FastKmeans.fit(data, k, self.__redis_host, self.__redis_port, self.__sc, self.__bkt_key + '_' + cur_node.id)
    else:
        clusters = FastKmeans.fit(data, k, self.__redis_host, self.__redis_port, self.__sc, self.__bkt_key + '_' + cur_node.id)

    if False in [ret for _, _, _, ret in clusters]:
        print("Failed to build BKT due to redis write error")
        return None

    #检查K-means聚类所产生的簇，将小于min_cluster_size的簇归并起来
    if k > 2 and self.__balance:
        clusters = self.merge_small_clusters(clusters, cur_node)

    #对聚类所形成的新簇递归构建子树
    for cid, centriod, _, _ in clusters:
        child_node = self.make_bkt_node(cid, centriod, cur_node)
        if not child_node:
            return None

        cur_node.children.append(child_node)

    return cur_node

BKTree.build = build
BKTree.make_bkt_node = make_bkt_node

在构建树的过程中，需要用到2个辅助方法，一个是half_cut_cluster，另一个是merge_small_clusters。

half_cut_cluster负责将一个簇切成大小相等的2个小簇，如果一个簇的数据量只够切成2个小簇，则使用对半切，因为我们对簇的最小数据量有硬性的需求，如果不对半切而继续使用k-means来分成2个小簇的话，可能会造成一个小簇的数据量永远少于规定的最小数据量，从而算法无法收敛。

merge_small_clusters负责处理K-means聚类产生的小簇，如果多个小簇的数据量都小于规定的最小数据量，则将这些小簇合并起来直到合并后的簇的数据量不小于min_cluster_size并且小大于max_cluster_size

In [162]:
#将数据切分成大小相等的两个簇
def half_cut_cluster(self, data, cluster_key, cluster_ids = ()):

    clusters = []
    if len(cluster_ids) == 0:
        first_id = 0
        second_id = 1
    else:
        first_id, second_id = cluster_ids

    data_len = int(len(data) / 2)

    ret = self.__redis.save_data(data[:data_len], cluster_key + "_" + str(first_id))
    cluster = (first_id, data[:data_len].sum(axis = 0) / data_len, data_len, ret)
    clusters.append(cluster)

    data_len = len(data) - int(len(data) / 2)
    ret = self.__redis.save_data(data[int(len(data) / 2):], cluster_key + "_" + str(second_id))
    cluster = (second_id, data[int(len(data) / 2):].sum(axis = 0) / data_len, data_len, ret)
    clusters.append(cluster)

    return clusters

#检查K-means聚类所产生的簇，将小于min_cluster_size的簇归并起来
def merge_small_clusters(self, clusters, parent_node):

    clusters = sorted(clusters, key = lambda x: x[2])
    to_merges = []

    for i, cluster in enumerate(clusters):
        if cluster[2] < self.__min_cluster_size:
            to_merges.append((cluster[0], cluster[2]))
        else:
            clusters = clusters[i:]
            break

    max_cluster_size = max(self.__min_cluster_size * 2, self.__max_cluster_size)

    if len(to_merges) == 0:
        return clusters

    if len(to_merges) == len(clusters):
        clusters = []
        new_cid = max([cluster[0] for cluster in to_merges]) + 1
    else:
        new_cid = max([cluster[0] for cluster in clusters]) + 1

    i = 0
    new_clusters = []

    while i < len(to_merges):

        merged_data = np.zeros(shape = (1, parent_node.centriod.shape[0]))

        size = 0

        while i < len(to_merges) and size + to_merges[i][1] <= max_cluster_size:

            data = self.__redis.get_data(self.__bkt_key + '_' + parent_node.id + '_' + str(to_merges[i][0]))

            merged_data = np.concatenate((merged_data, data), axis = 0)

            size += to_merges[i][1]

            i += 1

        if size >= self.__min_cluster_size and size <= max_cluster_size:

            merged_data = merged_data[1:]
            ret = self.__redis.save_data(merged_data, self.__bkt_key + '_' + parent_node.id + '_' + str(new_cid))

            new_cluster = (new_cid, merged_data.sum(axis = 0) / len(merged_data), len(merged_data), ret)
            new_clusters.append(new_cluster)

        #如果就剩下最后一个小簇，那么表示在它前面的小簇都合并完了，并且合并后的大小小于max_cluster_size，那么就把这最后一个小簇也合并掉，
        #这样这个簇大小肯定超过max_cluster_size了，可以继续在下次迭代中分裂。
        if i == len(to_merges) - 1:

            data = self.__redis.get_data(self.__bkt_key + '_' + parent_node.id + '_' + str(to_merges[i][0]))

            merged_data = np.concatenate((merged_data, data), axis = 0)
            ret = self.__redis.save_data(merged_data, \
                                         self.__bkt_key + '_' + parent_node.id + '_' + str(new_cid))

            new_cluster = (new_cid, merged_data.sum(axis = 0) / len(merged_data), len(merged_data), ret)
            new_clusters[len(new_clusters) - 1] = new_cluster
            break

        #如果小簇合并后还是小于min_cluster_size，那么就和第一个大簇合并，并且将合并之后的簇再对半切成两个均等的簇，继续在下次迭代中分裂。
        #在这一步中，如果不做对半切，算法可能永远无法收敛
        if size < self.__min_cluster_size and len(clusters) > 0:

            data = self.__redis.get_data(self.__bkt_key + '_' + parent_node.id + '_' + str(clusters[0][0]))
            merged_data = np.concatenate((merged_data, data), axis = 0)

            new_clusters.extend(self.half_cut_cluster(merged_data[1:], self.__bkt_key + '_' + parent_node.id, (new_cid, new_cid + 1)))
            new_cid += 1
            clusters = clusters[1:]
            break

        new_cid += 1

    new_clusters.extend(clusters)

    return new_clusters

BKTree.half_cut_cluster = half_cut_cluster
BKTree.merge_small_clusters = merge_small_clusters

在构建完树结构之后，我们可能还要根据指定的最大树高度来调整树的高度。get_tree_depth用于返回树的高度，采用了深度优先遍历的算法。adjust_tree_depth负责调整树的高度，它会判断当前的树高度，如果大于指定的最大高度，则调用__adjust_tree_depth来调整。__adjust_tree_depth方法会采用递归的深度优先遍历算法，先找到子节点全是叶节点的非叶节点，将它的所有叶节点都挂载到它的父节点上，再将该节点裁剪掉，然后再层层往上调整非叶节点的簇中心点。

In [163]:
#获取指定节点的子树高度
def get_tree_depth(self, node):

    if len(node.children) == 0:
        return 0

    depths = []
    for child in node.children:
        depths.append(self.get_tree_depth(child) + 1)

    return max(depths)

#调整指定节点下的子树高度
def __adjust_tree_depth(self, node):

    if len(node.children) == 0:
        return

    all_leaf_childs = np.array([child.leaf for child in node.children]).all()

    if not all_leaf_childs:
        children = node.children

        for child in children:
            if not child.leaf:
                self.__adjust_tree_depth(child)

        if len(children) != len(node.children):
            merged_data = np.zeros(shape = (1, node.centriod.shape[0]))

            for child in node.children:
                data = self.__redis.get_data(self.__bkt_key + '_' + node.id + '_' + str(child.cluster_id))

                merged_data = np.concatenate((merged_data, data), axis = 0)

            node.centriod = merged_data.sum(axis = 0) / (len(merged_data) - 1)

        return

    parent_cluster_ids = [child.cluster_id for child in node.parent.children]
    new_cluster_id = max(parent_cluster_ids) + 1

    for child in node.children:
        data = self.__redis.get_data(self.__bkt_key + '_' + node.id + '_' + str(child.cluster_id))
        if data is None:
            print(self.__bkt_key + '_' + node.id + '_' + str(child.cluster_id))

        self.__redis.save_data(data, self.__bkt_key + '_' + node.parent.id + '_' + str(new_cluster_id))

        child.cluster_id = new_cluster_id
        child.parent = node.parent

        node.parent.children.append(child)

        new_cluster_id += 1

    node.parent.children.remove(node)

#调整树高度
def adjust_tree_depth(self):

    while self.get_tree_depth(self.__root_node) > self.__max_depth:
        self.__adjust_tree_depth(self.__root_node)
        
BKTree.get_tree_depth = get_tree_depth
BKTree.__adjust_tree_depth = __adjust_tree_depth
BKTree.adjust_tree_depth = adjust_tree_depth

## 测试Balanced K-means tree

在构建完Balanced K-means tree后，我们就可以开始着手测试。测试算法的benchmark同样来自于texmex corpus的ANN测试数据集，测试步骤如下：

启动spark session，这个magic函数是我们为机器学习平台打造的，指定driver和executor的内存各为10GB，在每个work主机上使用一个executor，每个executor进程使用4个CPU核。

In [None]:
#启动spark session
%start_pyspark --driver_memory 10g --executor_memory 10g --executor_instances 1 --executor_cores 4 --driver_maxResultSize 10g

读取数据，这批数据一共有十万个特征点，SIFT的维度是128。

In [45]:
data = fvecs_read('sift/sift_learn.fvecs').astype(np.float64)
data.shape

(100000, 128)

因为我们使用redis来存储数据，而redis的string类型对value长度有最大限制，超过限制就要将数据分片存储，所以需要计算一个分片中所包含的最大数据个数。

In [164]:
#获取合适的redis chunk size
RedisDBWrapper.CHUNK_SIZE = int(RedisDBWrapper.MAX_CHUNK_SIZE / data.shape[1])

初始化Balanced K-means tree, 指定最大聚类簇数量为5，树最大高度为4， 最小簇大小为2500

In [165]:
bkt = BKTree()
bkt.init(max_clusters_per_run = 5, max_depth = 4, min_cluster_size = 2500, sc = sc, redis_host = "10.10.50.32")

基于数据集data来构建Balanced K-means tree，并计算构建时间，可见最终100000个数据花了1分17秒就完成了树的构建，性能还是蛮快的。

In [166]:
%time root = bkt.build(data)

CPU times: user 20.5 s, sys: 1.12 s, total: 21.7 s
Wall time: 1min 17s


接下来我们看看树的检索，给定data[59909]这个样本点，从树中搜索出和它最接近的簇，并查看下该簇中数据的形状。

In [167]:
#获取数据data[59909]的最近邻簇
nearest_cluster = bkt.get_nearest_cluster(data[59909])
nearest_cluster.shape

(3391, 128)

然后我们将树从内存中dump到redis集群，再重新加载进内存，看看前后树是否一致：

In [168]:
bkt.dump()

True

In [169]:
bkt_reload = BKTree.loads(bkt.get_key(), "10.10.50.32")

In [170]:
nearest_cluster2 = bkt_reload.get_nearest_cluster(data[59909])
nearest_cluster2.shape

(3391, 128)

In [171]:
(nearest_cluster == nearest_cluster2).all()

True