## Utils Module

In [1]:
# %load ../utils/utils.py
import numpy as np
import time


class DataLoader(object):
    @staticmethod
    def load_data_label(path: str):
        """
        this is for input file with (coordinate_x, coordinate_y, ... , label) in each line
        """
        with open(path, 'r') as f:
            data = []
            label = []
            for l in f.readlines():
                source = l.strip().split()
                data.append([float(val) for val in source[:-1]])
                label.append(int(source[-1]))
            return np.array(data), np.array(label)

    @staticmethod
    def load_data(path: str):
        """
        this is for input file with (coordinate_x, coordinate_y, ...) in each line
        """
        with open(path, 'r') as f:
            data = []
            for l in f.readlines():
                source = l.strip().split()
                data.append([float(val) for val in source])
            return np.array(data)


class Evaluation(object):
    @classmethod
    def silhouette_coefficient(cls, dbscan_obj):
        def a(pid, tags, dist_matrix):
            mask = tags == tags[pid]
            avg_dist = np.sum(dist_matrix[pid] * mask, axis=0) / np.sum(mask)
            return avg_dist

        def b(pid, tags, dist_matrix):
            avg_dists = []
            for label in range(1,
                               max(tags) + 1):  # cluster label starts from 1
                if label == tags[pid]:
                    continue
                mask = tags == label
                avg_dists.append(
                    np.sum(dist_matrix[pid] * mask, axis=0) / np.sum(mask))
            return min(avg_dists)

        # preparation
        # if sum(dbscan_obj.tags) == -dbscan_obj.num_p:
        if sum(dbscan_obj.tags) < 0:
            raise Exception('There are no tags in target dbscan method.')
        if not hasattr(dbscan_obj, 'dist_m'):
            # by default, we try to use matrix dbscan to tune parameters
            # BUG: If use basic dbscan has no _get_distance_matrix() attribute function
            dbscan_obj._get_distance_matrix()
        tags = np.array(dbscan_obj.tags)

        # TODO: this method still can be optimised by matrix computation
        res = 0
        for pid in range(dbscan_obj.num_p):
            tmp_a = a(pid, tags, dbscan_obj.dist_m)
            tmp_b = b(pid, tags, dbscan_obj.dist_m)
            res += (tmp_b - tmp_a) / max(tmp_b, tmp_a)
        res /= dbscan_obj.num_p

        print(
            f'eps: {dbscan_obj.eps} min points: {dbscan_obj.min_pts} silhouette coefficient: {res}'
        )
        return res


def timeit(func):
    def wrapper(*args, **wargs):
        start = time.time()
        res = func(*args, **wargs)
        end = time.time()
        print(f'{func.__name__} time cost: {(end-start)*1000}ms')
        return res

    return wrapper


## Local DBSCAN Module

In [2]:
# Status
UNKNOWN = -1
NOISE = -2

class DBSCAN(object):
    """
    Base Class of DBSCAN, please do NOT instantiate this Class
    """

    def __init__(self, dataset):
        """
        DBSCAN Classes should be instantiate with data point set
        """
        self.m, _ = (dataset, None)     # placeholder _ for future implementation of labels
        self.num_p = self.m.shape[0]
        self.tags = [UNKNOWN] * self.num_p

    def _get_dist(self, a, b, fast_mode: bool = False) -> float:
        """
        for float comparison, set all distance value precision to 5
        :param: a: int; index of given point in data matrix
        :param: b: same as a
        :param: fast_mode: bool -> if True, ignore sqrt() opration for distance
        """
        if fast_mode:
            result = np.power(self.m[b] - self.m[a], 2).sum()
        else:
            result = np.sqrt(np.power(self.m[b] - self.m[a], 2).sum())
        return round(result, 5)

    def _get_neighbours(self, p: int, eps: float, fast_mode=False) -> list:
        """
        return neighbours index of given point p in source data matrix
        :param: p: int; index of given point in data matrix
        :param: eps: float; the value of radius of density area
        """
        pass

    def _clustering(self, p, eps, min_pts, cluster_id, fast_mode=False):
        """
        tag given point p and all of its neighbours and sub-neighbours with the same cluster id
        :param: m: np.matrix; N * 2 matrix recoding all nodes' coordinates
        :param: eps: float; the value of radius of density area
        :param: min_pts: int; least neighbours should be in a density area
        :param: cluster_id: int; current id of cluster
        """
        pass
    
    def _find_core_pts(self, eps, min_pts):
        self.is_core = [0] * self.num_p
        for i in range(self.num_p):
            if len(self._get_neighbours(i, eps, min_pts)) > min_pts:
                self.is_core[i] = 1
        return self.is_core
        

    def predict(self, eps, min_pts, fast_mode=False) -> list:
        """
        return list of labels as the sequence in data matrix
        :param: m: np.matrix; N * 2 matrix recoding all nodes' coordinates
        :param: eps: float; the value of radius of density area
        :param: min_pts: int; least neighbours should be in a density area
        """
        self.eps = eps
        self.min_pts = min_pts

        cluster_id = 1
        for p_id in range(self.num_p):
            if self.tags[p_id] != UNKNOWN:
                continue
            if self._clustering(p_id, eps, min_pts, cluster_id, fast_mode):
                cluster_id += 1
        return np.array(self.tags)


class NaiveDBSCAN(DBSCAN):

    def __init__(self, dataset):
        super(NaiveDBSCAN, self).__init__(dataset)

    def _get_neighbours(self, p: int, eps: float, fast_mode=False) -> list:

        ngbs = []
        for idx in range(len(self.m)):
            if self._get_dist(p, idx, fast_mode) < eps:
                ngbs.append(idx)
        return ngbs

    def _clustering(self, p, eps, min_pts, cluster_id, fast_mode=False) -> bool:

        neighbours = self._get_neighbours(p, eps, fast_mode)
        if len(neighbours) < min_pts:
            self.tags[p] = NOISE
            return False
        else:
            self.tags[p] = cluster_id
            for idx in neighbours:
                self.tags[idx] = cluster_id
            while len(neighbours) > 0:
                sub_neighbours = self._get_neighbours(neighbours[0], eps, fast_mode)
                if len(sub_neighbours) >= min_pts:
                    for sub_n in sub_neighbours:
                        if self.tags[sub_n] < 0:
                            self.tags[sub_n] = cluster_id
                            if self.tags[sub_n] == UNKNOWN:
                                neighbours.append(sub_n)
                neighbours = neighbours[1:]
        return True
    

class MatrixDBSCAN(DBSCAN):

    def __init__(self, dataset):
        super(MatrixDBSCAN, self).__init__(dataset)
        self._get_distance_matrix()     # self.dist_m will be created
        del self.m

    def _get_distance_matrix(self):
        """
        Only once calculation will be on each point-pairs
        results will be stored in self.dist_m
        """

        self.dist_m = np.zeros((self.num_p, self.num_p))
        for p_id in range(self.num_p):
            for q_id in range(p_id, self.num_p):
                dist = self._get_dist(p_id, q_id)
                self.dist_m[q_id, p_id] = dist
                self.dist_m[p_id, q_id] = dist

    def _get_neighbours(self, p: int, eps: float, fast_mode=False) -> list:
        return np.nonzero(self.dist_m[p] < eps)[0]

    def _clustering(self, p, eps, min_pts, cluster_id, fast_mode=False) -> bool:
        """
        TODO: There should be some optimizations for this part, current code is too ugly
        """

        neighbours = self._get_neighbours(p, eps, fast_mode)
        if len(neighbours) < min_pts:
            self.tags[p] = NOISE
            return False
        else:
            self.tags[p] = cluster_id
            for idx in neighbours:
                self.tags[idx] = cluster_id
            while len(neighbours) > 0:
                sub_neighbours = self._get_neighbours(neighbours[0], eps, fast_mode)
                if len(sub_neighbours) >= min_pts:
                    for sub_n in sub_neighbours:
                        if self.tags[sub_n] < 0:
                            self.tags[sub_n] = cluster_id
                            if self.tags[sub_n] == UNKNOWN:
                                neighbours.append(sub_n)
                neighbours = neighbours[1:]
        return True

## Parallel Implementation

In [3]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark import RDD

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [8]:
def parse_line(l):
    s = l.split()
    return (float(s[0]), float(s[1]))
def load_data_label(path):
    pts = sc.textFile(path).map(parse_line)
    return pts.collect()

In [5]:
def _bounds_coordinates(bin_bounds):
#     coordinates = [bounds[0]]

    lower_cdnts = [[low] for low in  bin_bounds[0][:-1]]
    upper_cdnts = [[high] for high in bin_bounds[0][1:]]
    
    # super stupid implementation, optimization needed
    for bound in bin_bounds[1:]:
        lower_tmp = []
        upper_tmp = []
        
        for bc in bound[:-1]:
            lower_tmp.extend([lc + [bc] for lc in lower_cdnts])
        lower_cdnts = lower_tmp
        
        for bc in bound[1:]:
            upper_tmp.extend([uc + [bc] for uc in upper_cdnts])
        upper_cdnts = upper_tmp
        
    return np.array(lower_cdnts), np.array(upper_cdnts)

def partition(dataset, n_partitions, eps):
    """
    n_partitions: tuple with shape correspoding to dataset dimension
    """
    partition_dim = n_partitions
    n_partitions = np.prod(n_partitions)
    # cut bins
    bounds = np.concatenate(([np.min(dataset, axis=0)], [np.max(dataset, axis=0)]), axis=0) # 2 * D
    bounds_dim = bounds.T # D * 2, 
    
    bin_bounds = []
    for i in range(len(partition_dim)):
        dim_bins = np.linspace(*bounds_dim[i], partition_dim[i]+1, endpoint=True)
        bin_bounds.append(dim_bins)
    
    lower_bounds, upper_bounds = _bounds_coordinates(bin_bounds)
    lower_bounds -= eps
    upper_bounds += eps
    
    print(lower_bounds)
    print(upper_bounds)
    
    # scatter points into bins with eps
    indexed_data = []
    for id_pts in range(len(dataset)):     # index of point in dataset
        for id_ptt in range(n_partitions):
            if not (dataset[id_pts] > lower_bounds[id_ptt]).all():
                continue
            if not (dataset[id_pts] < upper_bounds[id_ptt]).all():
                continue
            indexed_data.append([id_ptt, id_pts])
            
    res = sc.parallelize(indexed_data).groupByKey().map(lambda x: [x[0], list(x[1])])
    return res

def local_dbscan(partioned_rdd):

    dataset = np.array([b_dataset.value[idp] for idp in partioned_rdd])
    dbscan_obj = MatrixDBSCAN(dataset)
    dbscan_obj.predict(b_eps.value, b_min_pts.value)
    is_core_list = dbscan_obj._find_core_pts(b_eps.value, b_min_pts.value)
    
    return list(zip(zip(partioned_rdd, is_core_list), dbscan_obj.tags))

def merge(local_tags, dataset):
    global_tags = [UNKNOWN] * len(dataset)
    is_tagged = [0] * len(dataset)
    last_max_label = 0
    for local in local_tags:
        np_local = np.array(local[-1])
        np_local[:, -1] += last_max_label
        last_max_label = np.max(np_local[:, -1])
        
        # check and merge overlapped points
        tagged_indices = np.nonzero(is_tagged)[0]
        for tmp_i in range(len(np_local)):
            # should do tag check
            (p_id, is_core), label = np_local[tmp_i]
            if p_id in tagged_indices and is_core==1:
                np_local[-1][np_local[-1]==label] = global_tags[p_id]
        
        # update global tags
        for (p_id, is_core), label in np_local:
            if is_tagged[p_id]==1:
                continue
            global_tags[p_id] = label
            is_tagged[p_id] = 1
    return global_tags
            

### Parallel Test

In [9]:
test_file = '../../s1.txt'
dataset = load_data_label(test_file)
n_partitions = 4
eps = 0.8
min_pts = 10

In [18]:
from rtree import index
from collections import deque
import math

In [52]:
# dataset

In [11]:
b_dataset = sc.broadcast(dataset)
b_eps = sc.broadcast(eps)
b_min_pts = sc.broadcast(min_pts)
n_partitions = (3, 2)

partitioned_rdd = partition(dataset, n_partitions, eps)
local_tags = partitioned_rdd.mapValues(lambda x: local_dbscan(x)).collect()
result_tags = merge(local_tags, dataset)

[[ 19834.2         51120.2       ]
 [333872.86666667  51120.2       ]
 [647911.53333333  51120.2       ]
 [ 19834.2        510937.7       ]
 [333872.86666667 510937.7       ]
 [647911.53333333 510937.7       ]]
[[333874.46666667 510939.3       ]
 [647913.13333333 510939.3       ]
 [961951.8        510939.3       ]
 [333874.46666667 970756.8       ]
 [647913.13333333 970756.8       ]
 [961951.8        970756.8       ]]


In [12]:
for i in partitioned_rdd.collect():
    print(i[0], len(i[1]))

0 580
1 1092
2 749
3 807
4 823
5 949


In [129]:
dbscan_obj = MatrixDBSCAN(np.array(dataset))
dbscan_obj.tags = result_tags
dbscan_obj.eps = eps
dbscan_obj.min_pts = min_pts

Evaluation.silhouette_coefficient(dbscan_obj)



eps: 0.8 min points: 10 silhouette coefficient: 0.5871332307631024


0.5871332307631024

## RTree based Partition

In [130]:
from rtree import index
from collections import deque
import math

In [13]:
def cost_base_partition(rtree, maxCost, eps):
    mbr = rtree.bounds
    partition_list = []
    queue = deque()
    queue.append(mbr)
    while len(queue):
        br = queue.popleft()
        nPoints = rtree.count(br)
        if _get_cost(br, nPoints)> maxCost:
            (subbr1, subbr2) = _cost_base_split(rtree, br, eps)
            queue.append(subbr1)
            queue.append(subbr2)
        else:
            partition_list.append(br)
    return partition_list

def _get_cost(bounds, nPoints, fanout = 2):
    h = math.log( (nPoints+1) /fanout, fanout) + 1
    DA  = h + math.sqrt(nPoints)*2/(math.sqrt(fanout) - 1) + nPoints/(fanout - 1) + 1
    return DA * nPoints


def _cost_base_split(rtree, bounds, eps):
    (xmin, ymin, xmax, ymax) = bounds
    #vertical split  
    ymin_diff = float('inf')    
    ysplit = ymin + (ymax - ymin)/2
    ybest_split = ((xmin, ymin, xmax, ysplit), (xmin, ysplit, xmax, ymax)) 
    while( ysplit + eps * 2<= ymax):      
        lowerbr = (xmin, ymin, xmax, ysplit)
        lowercost = _get_cost(lowerbr, rtree.count(lowerbr))
        
        upperbr = (xmin, ysplit, xmax, ymax)
        uppercost = _get_cost(upperbr, rtree.count(upperbr))
        costdiff = abs(uppercost - lowercost)
        if costdiff < ymin_diff:
            ymin_diff = costdiff
            ybest_split = (lowerbr, upperbr)
            if uppercost < lowercost:
                ysplit = ymin + (ysplit - ymin)/2
            else:
                ysplit = ysplit + (ymax - ysplit)/2
        else:
            break
    
    #horizontal split
    xmin_diff = float('inf')    
    xsplit = xmin + (xmax - xmin)/2
    xbest_split = ((xmin, ymin, xsplit, ysplit), (xsplit, ymin, xmax, ymax))
    while( xsplit + eps * 2<= xmax):   
        lowerbr = (xmin, ymin, xsplit, ymax)
        lowercost = _get_cost(lowerbr, rtree.count(lowerbr))
        
        upperbr = (xsplit, ymin, xmax, ymax)
        uppercost = _get_cost(upperbr, rtree.count(upperbr))
        costdiff = abs(uppercost - lowercost)
        if costdiff < xmin_diff:
            xmin_diff = costdiff
            xbest_split = (lowerbr, upperbr)
            if uppercost < lowercost:
                xsplit = xmin + (xsplit - xmin)/2
            else:
                xsplit = xsplit + (xmax - xsplit)/2        
        else:
            break
    
    #compare ysplit and xsplit
    if xmin_diff < ymin_diff:
        return xbest_split
    else:
        return ybest_split

In [14]:
def reduced_boundary_partition(rtree, maxPoints, eps):
    mbr = rtree.bounds
    partition_list = []
    queue = deque()
    queue.append(mbr)
    while len(queue):
        br = queue.popleft()
        nPoints = rtree.count(br)
        if nPoints > maxPoints:
            (br1, br2) = _reduced_boundary_split(rtree, br, eps)
            queue.append(br1)
            queue.append(br2)
        else:
            partition_list.append(br)
    return partition_list
def _reduced_boundary_split(rtree, br, eps):
    (xmin, ymin, xmax, ymax) = br
    
    # vertical splitline candidates
    ymin_score = float('inf')
    ysplit = ymin + (ymax - ymin)/2
    ybest_split = ((xmin, ymin, xmax, ysplit), (xmin, ysplit, xmax, ymax)) 
    while(ysplit + eps*2 <= ymax):
        br1 = (xmin,ymin, xmax, ysplit)
        br2 = (xmin, ysplit, xmax, ymax)
        point_diff = abs(rtree.count(br1) - rtree.count(br2))
        score = point_diff * rtree.count((xmin, ysplit-eps, xmax, ysplit+eps))
        if score < ymin_score:
            ymin_score = score
            ybest_split = (br1, br2)
            if rtree.count(br1) > rtree.count(br2):
                ysplit = ymin + (ysplit - ymin)/2
            else:
                ysplit = ysplit + (ymax - ysplit)/2
        else:
            break
        
    # horizontal splitline candidates
    xsplit = xmin + eps * 2
    xmin_score = float('inf')
    xbest_split = ((xmin, ymin, xsplit, ymax), (xsplit, ymin, xmax, ymax)) 
    while( xsplit + eps * 2<= xmax):
        br1 = (xmin , ymin, xsplit, ymax)
        br2 = (xsplit, ymin, xmax, ymax)
        point_diff = abs(rtree.count(br1) - rtree.count(br2))
        score = point_diff * rtree.count((xmin - eps, ymin, xmin + eps, ymax))
        if score < xmin_score:
            xmin_score = score
            xbest_split = (br1, br2)
            if rtree.count(br1) > rtree.count(br2):
                xsplit = xmin + (xsplit - xmin)/2
            else:
                xsplit = xsplit + (xmax - xsplit)/2
        else:
            break

    if xmin_score < ymin_score:
        return xbest_split
    else:
        return ybest_split

In [44]:
#construct rtree index

def construct_rtree_index(dataset):
    p = index.Property()
    rtree_idx = index.Index(properties=p)
    count = 0
    for coordinate in dataset:       
        rtree_idx.insert(count,(*coordinate, *coordinate))
        count+=1        
    return rtree_idx

In [55]:
def rtree_partition(dataset):
    idx = construct_rtree_index(dataset)
    # #split test
    cost_based = cost_base_partition(idx, 1000000, 50)
    reduced_boundary = reduced_boundary_partition(idx, 500, 50)

    partitioned = cost_based

    indexed_data = []
    id_ptt = 0
    for boundary in partitioned:    
        (left, bot, right, top) = boundary
        for id_pts in idx.intersection((left-eps, bot-eps, right+eps, top+eps )):
            indexed_data.append([id_ptt, id_pts])
        id_ptt+=1

    res = sc.parallelize(indexed_data).groupByKey().map(lambda x: [x[0], list(x[1])])
    return res