Feature: DBScan: fix bugs, refactor
KirovVerst committed Jun 16, 2018
1 parent b53f5b6 commit c772d2e
Showing 5 changed files with 89 additions and 18 deletions.
43 changes: 32 additions & 11 deletions qparallel/clustering/dbscan.py
@@ -1,17 +1,19 @@
 __author__ = 'Azat Abubakirov'
 
 import numpy as np
 
+import os
 from pathos.multiprocessing import ProcessPool as Pool
 
 from qparallel.clustering.base import Model
 from qparallel.helpers import (
     get_available_cpu_count
 )
 
+import matplotlib.pyplot as plt
 
 
 class Point:
-    def __init__(self, x, y, label=0, index=None):
+    def __init__(self, x, y, label=-1, index=None):
         self.x = x
         self.y = y
         self.label = label
@@ -40,10 +42,10 @@ def __init__(self, eps=0.1, min_points=10, *args, **kwargs):
     def pre_process(self, data):
         """
-        :param data:
-        :return:
+        :param data: [(x1, y1), (x2, y2), ...]
+        :return: [Point(x=x1, y=y1), Point(x=x2, y=y2)]
         """
-        return list(sorted(map(lambda xy: Point(*xy), data)))
+        return list(sorted(map(lambda coordinates: Point(*coordinates), data)))
 
     def _get_neighbors(self, point, points):
         return list(filter(lambda other: point.is_close(other, self.eps), points))
@@ -55,8 +57,6 @@ def _grow_cluster(self, point, points, neighbors, label):
             neighbor = neighbors[i]
             if neighbor.label == -1:
                 neighbor.label = label
-            elif neighbor.label == 0:
-                neighbor.label = label
             next_neighbors = self._get_neighbors(neighbor, points)
 
             if len(next_neighbors) >= self.min_points:
@@ -70,9 +70,10 @@ def _clustering(self, points):
         :param points: sorted list of Point objects
         :return:
         """
-        label = 0
+        np.random.seed(os.getpid())
+        label = np.random.randint(-100000, 100000, size=1)[0]
         for point in points:
-            if not point.label != 0:
+            if point.label != -1:
                 continue
 
             neighbors = self._get_neighbors(point, points)
@@ -129,9 +130,9 @@ def merge_clusters(self, clusters):
 
         return merged_clusters
 
-    def fit(self, data, cpu_count=-1, *args, **kwargs):
+    def fit(self, data, cpu_count=-1):
         """
-        :param data: list of 2D-points. [[1,2], [3,4], ...]
+        :param data: list of coordinates. [[1,2], [3,4], ...]
         :param cpu_count:
         :param args:
         :param kwargs:
@@ -146,3 +147,23 @@ def fit(self, data, cpu_count=-1, *args, **kwargs):
 
         merged_clusters = self.merge_clusters(list(clustered_chunks))
         return merged_clusters
+
+    def plot(self, marked_points):
+        """
+        :param marked_points: [Point(x=x1, y=y1, label=1), Point(x=x2, y=y2, label=2)]
+        :return:
+        """
+        data = dict()
+
+        for point in marked_points:
+            if point.label in data:
+                data[point.label][0].append(point.x)
+                data[point.label][1].append(point.y)
+            else:
+                data[point.label] = [[point.x], [point.y]]
+
+        for label in data:
+            plt.scatter(x=data[label][0], y=data[label][1], c=np.random.rand(3, ))
+
+        plt.show()
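
For orientation, a minimal usage sketch of the refactored class (not part of the commit; the import path and signatures follow the hunks above, and the shape of fit()'s return value is an assumption):

    # Sketch only: assumes fit() accepts raw coordinate pairs and returns the
    # merged clusters, as the docstrings in this diff suggest.
    from qparallel.clustering.dbscan import DBScan

    data = [(0.0, 0.0), (0.05, 0.05), (0.1, 0.0), (5.0, 5.0), (5.05, 5.0)]
    model = DBScan(eps=0.3, min_points=2)
    clusters = model.fit(data, cpu_count=2)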
20 changes: 20 additions & 0 deletions qparallel/experiments/clustering.py
@@ -0,0 +1,20 @@
+__author__ = 'Azat Abubakirov'
+
+from qparallel.experiments.logger import AbstractEvaluator
+from qparallel.clustering.dbscan import DBScan
+
+
+class DBScanEvaluator(DBScan, AbstractEvaluator):
+    algorithm_name = 'db_scan'
+
+    def _execute(self, **configuration):
+        data = configuration['data']
+        cpu_count = configuration['cpu_count']
+
+        super(DBScanEvaluator, self)._execute(cpu_count=cpu_count, data_size=len(data))
+
+        return self._estimate_execution_time(
+            super(DBScanEvaluator, self),
+            'fit', 'total_time',
+            data=data, cpu_count=cpu_count
+        )
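
The evaluator is presumably driven through AbstractEvaluator.run(), which creates the record before _execute() fills it; a sketch under that assumption (run() and its return value are not shown in full in this diff):

    # Sketch only: assumes run() instantiates the record and forwards the
    # keyword configuration to _execute(), as logger.py below suggests.
    from qparallel.experiments.clustering import DBScanEvaluator

    evaluator = DBScanEvaluator(eps=0.3, min_points=10)
    evaluator.run(data=[(0, 0), (0.1, 0.1), (5, 5)], cpu_count=1)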
11 changes: 7 additions & 4 deletions qparallel/experiments/logger.py
@@ -21,9 +21,12 @@ def to_list(self):
 
 class AbstractEvaluator:
     record_class = Record
+    algorithm_name = None
 
-    def _execute(self, **kwargs):
-        raise NotImplementedError
+    def _execute(self, cpu_count, data_size, **kwargs):
+        self.record.algorithm_name = self.algorithm_name
+        self.record.cpu_count = cpu_count
+        self.record.data_size = data_size
 
     def run(self, **configuration):
         self.record = self.record_class()
@@ -39,7 +42,7 @@ def _estimate_execution_time(self, obj, function_name, record_field, **arguments):
 
 
 class Logger:
-    def __init__(self, csv_file_path, field_names, iterations, configurations, evaluators):
+    def __init__(self, csv_file_path, iterations, configurations, evaluators, field_names=None):
         """
         :param csv_file_path:
@@ -48,10 +51,10 @@ def __init__(self, csv_file_path, field_names, iterations, configurations, evaluators):
         :param configurations: [{'data': [1,2,3], 'cpu_count': 1, ...}]
         """
         self.path = csv_file_path
-        self.field_names = field_names
         self.iterations = iterations
         self.configurations = configurations
         self.evaluators = evaluators
+        self.field_names = field_names if field_names else Record.field_names()
 
     def init_csv_file(self):
         with open(self.path, 'w') as fp:
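
With this change the column names can be omitted at the call site; a minimal sketch, assuming Record.field_names() returns the default CSV header as the new fallback implies:

    # Sketch only: field_names now falls back to Record.field_names() when omitted.
    from qparallel.experiments.logger import Logger

    logger = Logger(
        csv_file_path='results.csv',
        iterations=3,
        configurations=[{'data': [(0, 0), (1, 1)], 'cpu_count': 2}],
        evaluators=[],
    )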
4 changes: 1 addition & 3 deletions qparallel/experiments/sorting.py
@@ -41,9 +41,7 @@ def _execute(self, **configuration):
         array = configuration['array']
         cpu_count = configuration['cpu_count']
 
-        self.record.algorithm_name = self.algorithm_name
-        self.record.cpu_count = cpu_count
-        self.record.data_size = len(array)
+        super(AbstractSortingEvaluator, self)._execute(cpu_count=cpu_count, data_size=len(array))
 
         return self._estimate_execution_time(self, 'sort', 'total_time', array=array, cpu_count=cpu_count)
29 changes: 29 additions & 0 deletions scripts/clustering_experiments.py
@@ -0,0 +1,29 @@
+__author__ = 'Azat Abubakirov'
+
+import os
+
+from sklearn.datasets.samples_generator import make_blobs
+from qparallel.experiments.clustering import DBScanEvaluator
+
+from qparallel.experiments.logger import Logger
+from scripts.config import RESULTS_DIR_PATH
+
+if __name__ == '__main__':
+    centers = [[1, 1], [-1, -1], [1, -1], [-1, 1]]
+    X, labels_true = make_blobs(n_samples=1000, centers=centers, cluster_std=0.4, random_state=0)
+
+    configurations = [{'data': X, 'cpu_count': cpu_count} for cpu_count in [1, 2, 4]]
+
+    iterations = 3
+
+    evaluators = [
+        DBScanEvaluator(eps=0.3, min_points=10)
+    ]
+
+    logger = Logger(
+        csv_file_path=os.path.join(RESULTS_DIR_PATH, 'clustering.csv'),
+        evaluators=evaluators,
+        configurations=configurations,
+        iterations=iterations
+    )
+    logger.start()
