In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.utils import check_array
from collections import Counter

In [2]:
# Load the input image (replace the filename to use a different picture).
image_path = 'giraffe.png'
image = cv2.imread(image_path)

# cv2.imread reports a missing or unreadable file by returning None rather than
# raising, so fail here with a clear message instead of inside cv2.resize.
if image is None:
    raise FileNotFoundError(f"Could not read image file: {image_path!r}")

# Shrink the image so that DBSCAN runs in a reasonable amount of time.
# The resized image is 0.5x the original along each axis; change fx/fy to taste.
image = cv2.resize(image, None, fx=0.5, fy=0.5, interpolation=cv2.INTER_AREA)

# Flatten to one row per pixel with three channel values per row.
height, width, _ = image.shape
pixel_data = image.reshape(-1, 3)

In [3]:
# DBSCAN
# Hyperparameters shared by the clustering cells further down:
eps = 5  # neighbourhood radius, in the same units as the pixel channel values
min_pts = 30  # passed as `min_samples` to the DBSCAN implementations below

# Data to cluster: one row per pixel, as produced by the image-loading cell.
# Alternative synthetic input for quick experiments:
# data_points = np.random.randint(low=0, high=255, size=(1000,3))
data_points = pixel_data
# Display (n_points, n_channels) as the cell output.
data_points.shape

(26800, 3)

In [4]:
def calculate_distance(point1, point2, metric='euclidean'):
    """Return the distance between two points under the chosen metric.

    Args:
        point1: array-like of coordinates.
        point2: array-like of coordinates, same length as ``point1``.
        metric: ``'euclidean'`` (L2) or ``'manhattan'`` (L1).

    Returns:
        The distance as a NumPy float64 scalar.

    Raises:
        ValueError: if ``metric`` is not one of the supported names.
    """
    if metric not in ('euclidean', 'manhattan'):
        raise ValueError(f"Unsupported metric: {metric}")

    # Take the difference in float64. Unsigned integer inputs (e.g. uint8 image
    # pixels) would otherwise wrap around modulo 256 on subtraction and squaring,
    # silently producing wrong distances.
    diff = np.asarray(point1, dtype=np.float64) - np.asarray(point2, dtype=np.float64)

    if metric == 'euclidean':
        return np.sqrt(np.sum(diff ** 2))
    return np.sum(np.abs(diff))

In [5]:
class KDTreeNode:
    """A single k-d tree node: one stored index plus optional child nodes."""

    def __init__(self, index, left=None, right=None):
        # `index` is the value kept at this node; `left` and `right` are the
        # child subtrees, or None where a side is empty.
        self.index, self.left, self.right = index, left, right

class KDTree:
    """Minimal k-d tree over the rows of a 2-D array, supporting radius queries.

    Nodes hold row indices into ``self.data`` rather than copies of the points.
    The splitting axis cycles through the columns with the depth of the node.
    """

    def __init__(self, data, metric='euclidean'):
        """Build the tree over ``data``.

        Args:
            data: array-like of shape (n_points, n_dims).
            metric: metric name forwarded to ``calculate_distance`` on queries.
        """
        # Keep the coordinates as float64. The pruning test in _query_radius
        # subtracts single coordinates; on unsigned integer data (e.g. uint8
        # pixels) that subtraction wraps around and whole subtrees holding valid
        # neighbours get skipped. An array that is already float64 is not copied.
        self.data = np.asarray(data, dtype=np.float64)
        # Assigned before the build so the instance is never half-initialised.
        self.metric = metric
        self.root = self._build_tree(indices=np.arange(len(self.data)), depth=0)

    def _build_tree(self, indices, depth):
        """Recursively build the subtree holding the rows listed in ``indices``.

        Returns the subtree's root ``KDTreeNode``, or None for an empty set.
        """
        if len(indices) == 0:
            return None

        k = self.data.shape[1]
        axis = depth % k
        # Order the rows along the current axis and split at the median: rows
        # before it go left, rows after it go right.
        sorted_indices = indices[np.argsort(self.data[indices, axis])]
        median_index = len(sorted_indices) // 2

        return KDTreeNode(
            index=sorted_indices[median_index],
            left=self._build_tree(sorted_indices[:median_index], depth + 1),
            right=self._build_tree(sorted_indices[median_index + 1:], depth + 1),
        )

    def query_radius(self, index, radius):
        """Return the indices of all rows within ``radius`` of row ``index``.

        The comparison is inclusive (``distance <= radius``), so for a
        non-negative radius the query row itself is part of the result.
        The result is a list in tree-traversal order, not sorted.
        """
        results = []
        point = self.data[index]
        self._query_radius(node=self.root, point=point, radius=radius, depth=0, results=results)
        return results

    def _query_radius(self, node, point, radius, depth, results):
        """Append to ``results`` every row in ``node``'s subtree within ``radius`` of ``point``."""
        if node is None:
            return

        k = self.data.shape[1]
        axis = depth % k
        node_point = self.data[node.index]

        distance = calculate_distance(point, node_point, metric=self.metric)

        if distance <= radius:
            results.append(node.index)

        # Signed offset of the query from the splitting plane. A subtree can only
        # contain matches if the plane lies within `radius` of the query on that
        # side; the tests are inclusive because rows equal to the median along
        # this axis may sit in either subtree.
        diff = point[axis] - node_point[axis]
        if diff <= radius:
            self._query_radius(node=node.left, point=point, radius=radius, depth=depth + 1, results=results)
        if diff >= -radius:
            self._query_radius(node=node.right, point=point, radius=radius, depth=depth + 1, results=results)
    

In [6]:
# Cast the pixel values to float64 before building the tree so that distances and
# coordinate differences are computed in floating point.
# NOTE(review): the loaded image is presumably uint8 — confirm.
pixel_data = pixel_data.astype(np.float64)
# Index every pixel row in the custom KD-tree (default metric: 'euclidean').
kdtree = KDTree(data=pixel_data)

In [None]:
# Sanity-check the radius query on a handful of points only. Looping over every
# pixel would print one (potentially very long) index list per point and flood
# the notebook output.
n_preview = 5
max_shown = 10
for i in range(min(n_preview, len(pixel_data))):
    neighbours = kdtree.query_radius(index=i, radius=eps)
    print(f'Neighbours for point {pixel_data[i]} at index {i}')
    # Show the neighbour count and only the first few indices.
    print(len(neighbours), neighbours[:max_shown])

In [8]:
class KDTreeDBSCAN:
    """DBSCAN clustering whose neighbourhood queries are answered by ``KDTree``.

    After ``fit`` the ``labels`` attribute holds one integer per input row:
    a cluster id counted from 0, or -1 for noise.
    """

    def __init__(self, eps, min_samples, metric='euclidean'):
        """Store the hyperparameters.

        Args:
            eps: neighbourhood radius passed to ``KDTree.query_radius``.
            min_samples: minimum neighbourhood size for a point to be a core
                point. The radius query returns the point itself, so it counts.
            metric: metric name handed to ``KDTree``.
        """
        self.eps = eps
        self.min_samples = min_samples
        self.metric = metric
        self.labels = None

    def fit(self, X):
        """Cluster the rows of ``X`` and store the result in ``self.labels``."""
        n = X.shape[0]
        self.labels = np.full(n, -1)  # every point starts out as noise
        visited = np.zeros(n, dtype=bool)
        cluster_id = 0

        kdtree = KDTree(data=X, metric=self.metric)

        # Deliberately no per-point progress print: tallying all labels on every
        # iteration costs O(n) each time and floods the notebook output.
        for point_idx in range(n):
            if visited[point_idx]:
                continue

            visited[point_idx] = True
            neighbours = kdtree.query_radius(point_idx, self.eps)
            if len(neighbours) < self.min_samples:
                # Not a core point: it keeps the noise label unless a later
                # cluster claims it as a border point.
                continue

            self._expand_cluster(X=X, point_idx=point_idx, neighbours=neighbours,
                                 visited=visited, kdtree=kdtree, cluster_id=cluster_id)
            cluster_id += 1

    def _claim_unlabelled(self, candidates, cluster_id, queue):
        """Label every still-unlabelled candidate with ``cluster_id`` and enqueue it."""
        for idx in candidates:
            if self.labels[idx] == -1:
                self.labels[idx] = cluster_id
                queue.append(idx)

    def _expand_cluster(self, X, point_idx, neighbours, visited, kdtree, cluster_id):
        """Grow cluster ``cluster_id`` outwards from the core point ``point_idx``.

        Every point within ``eps`` of a core point of this cluster is labelled,
        including points visited earlier and left as noise: those become border
        points. Points that already belong to another cluster are left alone.
        """
        self.labels[point_idx] = cluster_id

        # Each point enters the queue at most once, at the moment it is labelled,
        # so the queue never holds more than n entries. A plain list with a read
        # cursor avoids re-copying the queue every time it grows.
        queue = []
        self._claim_unlabelled(neighbours, cluster_id, queue)

        head = 0
        while head < len(queue):
            neighbour_idx = queue[head]
            head += 1

            if visited[neighbour_idx]:
                # Already examined and found not to be a core point: it stays a
                # border point of this cluster and is not expanded.
                continue

            visited[neighbour_idx] = True
            new_neighbours = kdtree.query_radius(neighbour_idx, self.eps)
            if len(new_neighbours) >= self.min_samples:
                self._claim_unlabelled(new_neighbours, cluster_id, queue)

    def fit_predict(self, X):
        """Run ``fit`` on ``X`` and return the resulting label array."""
        self.fit(X)
        return self.labels
            

In [None]:
# Cluster all pixels with the KD-tree-backed DBSCAN, using the Manhattan metric.
dbscan = KDTreeDBSCAN(eps=eps, min_samples=min_pts,metric='manhattan')
dbscan_labels = dbscan.fit_predict(X=pixel_data)

In [None]:
# Number of pixels per label from the KD-tree DBSCAN run (-1 is noise).
Counter(dbscan_labels)

In [None]:
# Reference run: scikit-learn's DBSCAN with the same eps, min_samples and metric,
# so its label counts can be compared with the custom implementation's.
from sklearn.cluster import DBSCAN

sklearn_dbscan = DBSCAN(eps=eps,min_samples=min_pts,metric='manhattan')
Counter(sklearn_dbscan.fit_predict(X=pixel_data))

In [None]:
# Cross-check the custom KD-tree against scikit-learn's on a single query point.
# scikit-learn's class is imported under an alias so it does not shadow the
# notebook's own KDTree.
from sklearn.neighbors import KDTree as sklearnKDTree
X = pixel_data

# Using scikit-learn KDTree for comparison
# (it is queried with the point's coordinates)
sklearn_tree = sklearnKDTree(X)
sklearn_neighbors = sklearn_tree.query_radius([X[0]], eps)

# Custom KDTree from this notebook (queried with the point's row index);
# this rebinds the module-level `kdtree` variable.
kdtree = KDTree(data=X)
custom_neighbors = kdtree.query_radius(0, eps)

# The custom result is a list in tree-traversal order, so compare the two
# results as sets of indices rather than position by position.
print("Scikit-learn neighbors:", sklearn_neighbors)
print("Custom KDTree neighbors:", custom_neighbors)

In [None]:
# Repeats the neighbour comparison for point 0. This cell relies on `X` and
# `sklearnKDTree` having been defined by an earlier cell.
sklearn_tree = sklearnKDTree(X)
sklearn_neighbors = sklearn_tree.query_radius([X[0]], eps)

# Custom KDTree from this notebook (queried by row index)
kdtree = KDTree(data=X)
custom_neighbors = kdtree.query_radius(0, eps)

print("Scikit-learn neighbors:", sklearn_neighbors)
print("Custom KDTree neighbors:", custom_neighbors)

In [None]:
# Check calculate_distance against SciPy's euclidean distance on two pixels.
from scipy.spatial.distance import euclidean

# Explicit float64 copies of the first two rows of X.
point1 = X[0].astype(np.float64)
point2 = X[1].astype(np.float64)

# Verify distance between two points
print("Custom distance:", calculate_distance(point1, point2, metric='euclidean'))
# NOTE: `euclidean` here comes from scipy.spatial.distance (see the import above),
# although the printed label says scikit-learn.
print("Scikit-learn distance:", euclidean(point1, point2))

In [None]:
# Display the two pixel rows used in the distance check.
X[0], X[1]

In [19]:
class BruteForceDBSCAN:
    """DBSCAN clustering that finds neighbours by scanning every point.

    After ``fit`` the ``labels`` attribute holds one integer per input row:
    a cluster id counted from 0, or -1 for noise.
    """

    def __init__(self, eps, min_samples, metric='euclidean'):
        """Store the hyperparameters.

        Args:
            eps: neighbourhood radius (inclusive).
            min_samples: minimum neighbourhood size for a point to be a core
                point. ``region_query`` returns the point itself, so it counts.
            metric: metric name forwarded to ``calculate_distance``.
        """
        self.eps = eps
        self.min_samples = min_samples
        self.metric = metric
        self.labels = None

    def fit(self, X):
        """Cluster the rows of ``X`` and store the result in ``self.labels``."""
        n = X.shape[0]
        self.labels = np.full(n, -1)  # every point starts out as noise
        cluster_id = 0
        visited = np.zeros(n, dtype=bool)

        # Deliberately no per-point progress print: tallying all labels on every
        # iteration costs O(n) each time and floods the notebook output.
        for point_idx in range(n):
            if visited[point_idx]:
                continue

            visited[point_idx] = True
            neighbours = self.region_query(X, point_idx)

            # Strict '<': a neighbourhood of exactly min_samples points makes a
            # core point, matching the '>=' test applied during expansion.
            if len(neighbours) < self.min_samples:
                continue  # stays noise unless a later cluster claims it

            self._expand_cluster(X, point_idx, neighbours, cluster_id, visited)
            cluster_id += 1

    def _claim_unlabelled(self, candidates, cluster_id, queue):
        """Label every still-unlabelled candidate with ``cluster_id`` and enqueue it."""
        for idx in candidates:
            if self.labels[idx] == -1:
                self.labels[idx] = cluster_id
                queue.append(idx)

    def _expand_cluster(self, X, point_idx, neighbours, cluster_id, visited):
        """Grow cluster ``cluster_id`` outwards from the core point ``point_idx``.

        Every point within ``eps`` of a core point of this cluster is labelled,
        including points visited earlier and left as noise (they become border
        points). Points that already belong to another cluster are left alone.
        """
        self.labels[point_idx] = cluster_id

        # Each point enters the queue at most once, at the moment it is labelled,
        # so the queue stays bounded by n and the caller's list is not mutated.
        queue = []
        self._claim_unlabelled(neighbours, cluster_id, queue)

        head = 0
        while head < len(queue):
            neighbour_idx = queue[head]
            head += 1

            if visited[neighbour_idx]:
                # Examined earlier and found not to be a core point: it remains
                # a border point of this cluster and is not expanded.
                continue

            visited[neighbour_idx] = True
            new_neighbours = self.region_query(X, neighbour_idx)
            if len(new_neighbours) >= self.min_samples:
                self._claim_unlabelled(new_neighbours, cluster_id, queue)

    def region_query(self, X, point_idx):
        """Return the indices of all rows of ``X`` within ``eps`` of row ``point_idx``.

        Distances use the metric chosen at construction time. The comparison is
        inclusive, so ``point_idx`` itself is part of the result.
        """
        centre = X[point_idx]
        return [
            idx
            for idx, point in enumerate(X)
            if calculate_distance(centre, point, metric=self.metric) <= self.eps
        ]

    def fit_predict(self, X):
        """Run ``fit`` on ``X`` and return the resulting label array."""
        self.fit(X)
        return self.labels


In [None]:
# Cluster all pixels with the brute-force DBSCAN. No metric is passed, so this
# run uses the default 'euclidean', unlike the KD-tree run, which passes 'manhattan'.
# Every region query scans all points, so this cell is slow on the full image.
brute_dbscan = BruteForceDBSCAN(eps=eps, min_samples=min_pts)
brute_dbscan_labels = brute_dbscan.fit_predict(X=pixel_data)

In [None]:
# Number of pixels per label from the brute-force run (-1 is noise).
Counter(brute_dbscan_labels)

In [None]:
# Re-fit the KD-tree DBSCAN object `dbscan` (created with metric='manhattan') and
# tally its labels, for comparison with the brute-force counts.
Counter(dbscan.fit_predict(X=pixel_data))

In [13]:
class HbridDBSCAN:
    """DBSCAN with a switchable neighbourhood search: KD-tree or brute force.

    The spelling of the class name is kept as-is because existing cells use it.
    After ``fit`` the ``labels`` attribute holds one integer per input row:
    a cluster id counted from 0, or -1 for noise.
    """

    def __init__(self, eps, min_samples, use_kdtree=True, metric='euclidean'):
        """Store the hyperparameters.

        Args:
            eps: neighbourhood radius (inclusive).
            min_samples: minimum neighbourhood size for a point to be a core
                point. Both search back-ends return the point itself, so it counts.
            use_kdtree: if True, query a ``KDTree``; otherwise scan every point.
            metric: metric name used by whichever back-end is selected.
        """
        self.eps = eps
        self.min_samples = min_samples
        self.use_kdtree = use_kdtree
        self.metric = metric
        self.labels = None

    def fit(self, X):
        """Cluster the rows of ``X`` and store the result in ``self.labels``."""
        n = X.shape[0]
        self.labels = np.full(n, -1)  # every point starts out as noise
        visited = np.zeros(n, dtype=bool)
        cluster_id = 0
        # The tree is only built when it will actually be queried.
        kdtree = KDTree(X, metric=self.metric) if self.use_kdtree else None

        for point_idx in range(n):
            if visited[point_idx]:
                continue

            visited[point_idx] = True
            neighbours = self._region_query(X, point_idx, kdtree)

            if len(neighbours) < self.min_samples:
                continue  # stays noise unless a later cluster claims it

            self._expand_cluster(X=X, point_idx=point_idx, neighbours=neighbours,
                                 visited=visited, cluster_id=cluster_id, kdtree=kdtree)
            cluster_id += 1

    def _region_query(self, X, point_idx, kdtree):
        """Return the neighbours of ``point_idx`` using the configured back-end."""
        if self.use_kdtree:
            return kdtree.query_radius(index=point_idx, radius=self.eps)
        return self._brute_region_query(X=X, point_idx=point_idx)

    def _brute_region_query(self, X, point_idx):
        """Return the indices of all rows of ``X`` within ``eps`` of row ``point_idx``.

        Uses the metric chosen at construction time, so the brute-force and
        KD-tree paths apply the same distance.
        """
        centre = X[point_idx]
        return [
            idx
            for idx, point in enumerate(X)
            if calculate_distance(centre, point, metric=self.metric) <= self.eps
        ]

    def _claim_unlabelled(self, candidates, cluster_id, queue):
        """Label every still-unlabelled candidate with ``cluster_id`` and enqueue it."""
        for idx in candidates:
            if self.labels[idx] == -1:
                self.labels[idx] = cluster_id
                queue.append(idx)

    def _expand_cluster(self, X, point_idx, neighbours, cluster_id, visited, kdtree):
        """Grow cluster ``cluster_id`` outwards from the core point ``point_idx``.

        Every point within ``eps`` of a core point of this cluster is labelled,
        including points visited earlier and left as noise (they become border
        points). Points that already belong to another cluster are left alone.
        """
        self.labels[point_idx] = cluster_id

        # Each point enters the queue at most once, at the moment it is labelled.
        # A plain list with a read cursor avoids re-copying the whole queue every
        # time a core point contributes its neighbourhood.
        queue = []
        self._claim_unlabelled(neighbours, cluster_id, queue)

        head = 0
        while head < len(queue):
            neighbour_idx = queue[head]
            head += 1

            if visited[neighbour_idx]:
                # Examined earlier and found not to be a core point: it remains
                # a border point of this cluster and is not expanded.
                continue

            visited[neighbour_idx] = True
            new_neighbours = self._region_query(X, neighbour_idx, kdtree)
            if len(new_neighbours) >= self.min_samples:
                self._claim_unlabelled(new_neighbours, cluster_id, queue)

    def fit_predict(self, X):
        """Run ``fit`` on ``X`` and return the resulting label array."""
        self.fit(X)
        return self.labels


In [18]:
# Run the hybrid DBSCAN on its brute-force path (use_kdtree=False, default
# 'euclidean' metric), restricted to the first 10000 pixel rows.
hdbscan = HbridDBSCAN(eps=eps,min_samples=min_pts,use_kdtree=False)
hdbscan_labels = hdbscan.fit_predict(X=pixel_data[:10000])

In [17]:
# Number of pixels per label from the hybrid DBSCAN run (-1 is noise).
Counter(hdbscan_labels)

Counter({0: 5258, -1: 3918, 3: 701, 2: 50, 1: 42, 4: 31})

In [19]:
# Same tally of `hdbscan_labels` again; this cell duplicates an earlier one.
Counter(hdbscan_labels)

Counter({0: 5258, -1: 3918, 3: 701, 2: 50, 1: 42, 4: 31})