# DBSCAN FROM SCRATCH

In [335]:
import numpy as np
import math

# 1) DISTANCE EUCLIDIENNE

In [336]:
# Distance entre 2 points
# On va chercher à comparer cette distance à epsilon
# Epsilon (eps) est le rayon par rapport à un point qui sera le centre d'un cercle

In [337]:
def distance_euclidienne(a,b):
    return math.sqrt(np.power(a-b,2))

In [338]:
# Test de la fonction

distance_euclidienne(2,5)

3.0

# 2) RETOURNER UN BOOLEEN EN COMPARANT LA DISTANCE EUCLIDIENNE AVEC EPSILON

Ce booléen (True ou False) va nous permettre de réaliser une condition.<p>
Si c'est True : la distance entre le point sélectionné et un autre point est inférieure à eps.<p>
Donc le point sélectionné (point central) sera un core point (POINT INTERIEUR).

In [339]:
# Fonction qui permet de voir si la distance entre 2 points est inférieure à eps
# Si c'est égal à True : la distance entre le point sélectionné et un autre point est inférieure à eps.

def eps_neighborhood(a,b,eps):
    # eps : distance maximale entre 2 pts pour qu'ils puissent être dans la même région
    return distance_euclidienne(a,b) < eps

In [340]:
eps_neighborhood(2,5,4)

True

In [341]:
# Distance euclidienne = 3 < epsilon = 4 -> Donc c'est True

In [342]:
# AUTRE EXEMPLE AVEC UNE MATRICE

In [343]:
m = np.matrix('1 1.2 3.7 32.9 0.8 5.7 3.9 6 8.9 5')
m.shape

(1, 10)

In [344]:
for i in m:
    result = eps_neighborhood(m[0,0],m[0,2],2)
    print(result)

False


In [345]:
# La distance entre la 1ère valeur de m et la 3ème est supérieure à 2 donc False

# 3) RECHERCHE DES POINTS QUI SONT A UNE DISTANCE DE EPSILON OU MOINS : donc dans le voisinage du point central

In [346]:
# Va retourner tous les points de m qui sont à une distance inférieure à epsilon de point_id

In [347]:
def get_neighbors(m, point_id, eps):
    
    # m : matrice 2D par exemple
    
    number_of_points = m.shape[1]
    neighbors = []
    # On va stocker dans cette liste vide les indexes des points qui seront considérés comme voisin du point central
    # Pour qu'un point soit considéré comme tel, la distance euclidienne (entre chaque point du cercle et le point centre) doit être inférieure à epsilon
    
    # Boucle sur le nombre de points de m
    for i in range(0, number_of_points):
        # On cherche à voir si la distance entre 2 points est à une distance inférieure à epsilon
        # Si c'est le cas, on l'ajoute à la liste vide neighbors
        if eps_neighborhood(m[:,point_id], m[:,i], eps):
            neighbors.append(i)
            #Retourne une liste d'index d'échantillons voisins
    return neighbors

In [348]:
m = np.matrix('1 1.2 3.7 32.9 0.8 5.7 3.9 6 8.9 5')
eps = 2

get_neighbors(m, [0], eps)

[0, 1, 4]

Ici on a testé la fonction get_neighbors() :
    - Pour données : m
    - point_id : la première valeur (index 0, "[0]")
    - eps : 2
    
La distance euclidienne entre ce point d'index 0 (de valeur 1) a été calculée puis comparée à eps.

Toutes les distances inférieures à eps sont True donc les index ont été ajoutés à la liste vide neighbors.

# 4) FONCTION ETENDRE CLUSTER

Objectif : 

In [349]:
# On initialise 2 valeurs: 

# NOISE est vide au départ
NOISE = None

# On va réaliser une condition afin de classifier tout
UNCLASSIFIED = False

In [383]:
# On cherche à étendre le cluster jusqu'à avoir tous les index qui sont dans le cluster correspondant

def expand_cluster(m, classifications, point_id, cluster_id, eps, min_samples):
    
    neighbors = get_neighbors(m, point_id, eps)
    # neighbors va retourner tous les points de m qui sont à une distance inférieure à epsilon de point_id
    
    # Si le nombre de points est inférieur au minimum de points requis (min_samples) -> ce point est du bruit (NOISE)
    if len(neighbors) < min_samples:
        classifications[point_id] = NOISE
        #Les points considérés comme bruit auront la valeur None dans la liste finale
        #print("None value :")
        #print(neighbors)
        #print(classifications)
        #return False
    
    # Sinon
    else:   
        current_point = neighbors[0]
        results = get_neighbors(m, current_point, eps)
        
        # print(results)
        # Ce print donne : 
        # [0, 1, 3]
        # [2, 5]
            
        if len(results) >= min_samples:
            # On parcourt toute la liste de results
            for i in range(0, len(results)):
                result_point = results[i]
                if classifications[result_point] == UNCLASSIFIED or classifications[result_point] == NOISE:
                    if classifications[result_point] == UNCLASSIFIED:
                        x = neighbors.append(result_point)
                        # print(x)
                    y = classifications[result_point] = cluster_id
                    # print(y)
                        
                    #print(classifications)
                        
                    # Ce print donne:
                    # [1, False, False, False, False, False, False]
                    # [1, 1, False, False, False, False, False]
                    # [1, 1, False, 1, False, False, False]
                    # [1, 1, 2, 1, False, False, False]
                    # [1, 1, 2, 1, False, 2, False]                
                        
        neighbors = neighbors[1:]
        #print(neighbors)
    return True

In [384]:
# min_samples : nombre minimum de points pour faire un cluster

def dbscan(m, eps, min_samples):
    cluster_id = 1
    n_points = m.shape[1]
    
    #On considère au départ tous les points comme non classifiés (False) (UNCLASSIFIED = False)
    classifications = [UNCLASSIFIED] * n_points
    # classifications sera à la fin la liste des valeurs de cluster (1,2,None etc...)
    # print(classifications)
    # print(classifications) donnera [False, False, False, False, False, False, False]
    
    # On parcourt tous les points qui ont la longueur n_points
    for point_id in range(0, n_points):
        point = m[:,point_id]
        #print(point)
        
        # Tant que c'est faux (si des points ne sont pas classifiés), on cherche à classifier
        if classifications[point_id] == UNCLASSIFIED:
            if expand_cluster(m, classifications, point_id, cluster_id, eps, min_samples):
                # Si on a atteint la frontière on ajoute un autre cluster (donc +1 à la valeur du cluster de départ)
                cluster_id = cluster_id + 1
    return classifications

In [385]:
m = np.matrix('1 1.2 3.7 0.8 5.7 3.9 8.9')
eps = 0.5
min_samples = 2

In [386]:
dbscan(m, eps, min_samples)

[1, 1, 2, 1, None, 2, None]

In [361]:
# On voit bien que la valeur "3.7" et "3.9" appartiennent au même cluster
# Les autres points sont dans la classe 1 car proche de 1
# "0.8" et "8.9" sont trop loin donc None (bruit)

In [393]:
data = np.matrix('100 10 4 800 6 3.9 9')

dbscan(data, 3, 2)

[None, 2, 3, None, 3, 3, 2]