In [267]:
import numpy as np
import pandas as pd

In [268]:
from sklearn.datasets import make_blobs

# Synthetic dataset: 100 samples with 5 features drawn around 5 centres;
# the generated blob labels are discarded and the seed is fixed.
X, _ = make_blobs(
    n_samples=100,
    centers=5,
    n_features=5,
    cluster_std=2.5,
    random_state=42,
)
# Wrap the array in a DataFrame whose columns are named col_0 ... col_4.
X = pd.DataFrame(X).add_prefix('col_')

In [332]:
class MyDBSCAN():
    """Hand-written density-based clustering in the style of DBSCAN.

    Conventions used by this implementation:

    * Two points are neighbours when their distance is strictly less
      than ``eps``; a point is never counted as its own neighbour.
    * A point is a core point when it has at least ``min_samples``
      neighbours.
    * ``fit_predict`` labels noise points ``1`` and clusters ``2, 3, ...``
      in increasing order of the row that seeded each cluster.

    Attributes (rebuilt on every ``fit_predict`` call):
        clusters: dict mapping a cluster key to the list of member rows.
            Key ``seed_row + 1`` holds a cluster, key ``0`` holds the noise.
        points_list: dict mapping each row to the list of its neighbours.
        outliers: list of rows currently considered noise.
        visited_points: list of rows that were already processed.
    """

    def __init__(self, eps = 3, min_samples = 3, metric = 'euclidean'):
        """Store the hyper-parameters and create empty bookkeeping state.

        Args:
            eps: neighbourhood radius (exclusive upper bound on distance).
            min_samples: minimum number of neighbours of a core point.
            metric: one of 'euclidean', 'chebyshev', 'manhattan', 'cosine'.
        """
        self.eps = eps
        self.min_samples = min_samples
        self.metric = metric
        self._reset_state()

    def _reset_state(self):
        """Clear everything a previous fit may have left behind."""
        self.clusters = {}
        self.points_list = {}
        self.outliers = []
        self.visited_points = []

    def __repr__(self):
        return f'MyDBSCAN class: eps={self.eps}, min_samples={self.min_samples}'

    def fit_predict(self, X):
        """Cluster the rows of ``X`` and return one label per row.

        Args:
            X: DataFrame or array-like with one sample per row. Rows are
                addressed by position, so any DataFrame index is ignored.

        Returns:
            Float numpy array of length ``len(X)``: ``1`` marks noise,
            values ``2, 3, ...`` mark clusters.
        """
        # Start from a clean slate so that repeated calls on the same
        # instance do not mix results of different fits.
        self._reset_state()

        # Keep the data on the instance: the neighbour search must use the
        # data passed in here, not whatever a global named X contains.
        self._X = np.asarray(X)
        n_points = self._X.shape[0]

        self.idx = np.arange(n_points)

        for i in range(n_points):
            self.find_neighbours(i)

        for i in range(n_points):
            self.start_the_walk(i)

        # Noise is stored under key 0 so that it always sorts first and
        # therefore always receives label 1.
        self.clusters[0] = self.outliers

        y = np.zeros(n_points)
        for k, cluster in enumerate(sorted(self.clusters)):
            for row in self.clusters[cluster]:
                y[row] = k + 1

        return y

    def cosine(self, x1, x2):
        """Return the cosine distance ``1 - cos(x1, x2)``."""
        x1_norm = np.linalg.norm(x1)
        x2_norm = np.linalg.norm(x2)
        return 1 - np.dot(x1, x2)/(x1_norm*x2_norm)

    def get_distance(self, x1, x2): #x1, x2 - numpy arrays or numbers
        """Return the distance between ``x1`` and ``x2`` under ``self.metric``.

        Raises:
            ValueError: if ``self.metric`` is not a supported metric name.
        """
        if self.metric == 'euclidean':
            return np.sqrt(np.sum(np.square(x2-x1)))
        if self.metric == 'chebyshev':
            return np.max(np.abs(x2-x1))
        if self.metric == 'manhattan':
            return np.sum(np.abs(x2-x1))
        if self.metric == 'cosine':
            return self.cosine(x1,x2)
        # Fail loudly instead of returning None, which would only surface
        # later as an obscure comparison error.
        raise ValueError(f'Unsupported metric: {self.metric!r}')

    def find_neighbours(self, index):
        """Record in ``points_list`` every other row closer than ``eps``.

        Relies on the data and ``idx`` prepared by ``fit_predict``.
        """
        target = self._X[index]
        self.points_list[index] = [
            i for i in self.idx[self.idx != index]
            if self.get_distance(self._X[i], target) < self.eps
        ]

    def start_the_walk(self, index):
        """Process an unvisited row: mark it as noise or seed a new cluster."""
        if index in self.visited_points:
            return
        self.visited_points.append(index) #adds the point to the list of visited points
        if len(self.points_list[index]) < self.min_samples:
            # Provisional noise: a later cluster may still claim this row.
            self.outliers.append(index)
        else:
            self.clusters[index + 1] = [index]
            self.neighbours_walk(index, index)

    def neighbours_walk(self, cluster, index):
        """Grow the cluster seeded at row ``cluster`` starting from ``index``.

        The traversal is depth-first in the same order as a recursive walk,
        but uses an explicit stack so that large clusters cannot exceed the
        interpreter recursion limit.
        """
        members = self.clusters[cluster + 1]
        # Each stack entry is a partially consumed iterator over the
        # neighbours of one core point.
        stack = [iter(self.points_list[index])]
        while stack:
            for j in stack[-1]:
                if j in self.outliers:
                    # Former noise reachable from a core point joins the cluster.
                    members.append(j) #add to cluster
                    self.outliers.remove(j) #remove from outliers
                elif j not in self.visited_points:
                    members.append(j) #add to cluster
                    self.visited_points.append(j)
                    if len(self.points_list[j]) >= self.min_samples:
                        # j is a core point: descend into its neighbours
                        # first, then resume the current iterator.
                        stack.append(iter(self.points_list[j]))
                        break
            else:
                # Current neighbour list is exhausted.
                stack.pop()