# In [282]:
import numpy as np
import distance_measures as measures
from typing import Callable

class DBSCAN:
    """
    Density-Based Spatial Clustering of Applications with Noise (DBSCAN).

    Conventions used by this implementation:
        * Two points are neighbours when ``dist_meas`` returns a value
          strictly smaller than ``eps``.
        * A point is expanded when at least ``min_pts`` of its neighbours are
          still unassigned and not yet visited during the current expansion.
        * Label 0 means noise/unassigned; clusters are numbered from 1.

    NOTE(review): counting only unassigned, unvisited neighbours makes the
    density test depend on traversal order, which differs from the textbook
    definition (all points inside the eps-ball count). Left unchanged here so
    existing labelings stay the same - confirm whether this is intended.

    Parameters:
        min_pts (int): Minimum number of neighbouring points required to grow a cluster from a point.
        dist_meas (Callable): Distance measure function taking two points and returning a number.
        eps (float): Distance below which two samples are considered to be in the same neighborhood.
    """

    def __init__(
        self,
        min_pts: int = 3,
        dist_meas: Callable[[np.ndarray, np.ndarray], float] = measures.Euclidean_distance,
        eps: float = 0.5
    ) -> None:
        self.min_pts: int = min_pts
        self.dist_meas: Callable[[np.ndarray, np.ndarray], float] = dist_meas
        self.eps: float = eps

    def _unvisited_neighbours(self, X: np.ndarray, P: np.ndarray) -> tuple:
        """
        Finds the points around P that may still join the cluster being built.

        Args:
            X (np.ndarray): Dataset of shape (n_samples, n_features).
            P (np.ndarray): The point whose neighbourhood is inspected.

        Returns:
            tuple: ``(ids, is_distinct)`` where ``ids`` holds the row indices of
            points that are unassigned, not yet visited and closer than ``eps``
            to P, and ``is_distinct[k]`` is True when ``ids[k]`` lies at a
            distance greater than zero from P.
        """
        available: np.ndarray = (self.cluster == 0) & (~self.czy_nalezy_do_klastra)
        distances: np.ndarray = np.apply_along_axis(lambda x: self.dist_meas(x, P), 1, X)
        ids: np.ndarray = np.where(available & (distances < self.eps))[0]
        return ids, distances[ids] > 0

    def form_cluster(
        self,
        X: np.ndarray,
        P: np.ndarray
    ) -> np.ndarray:
        """
        Forms a cluster around point P using density reachability.

        Reads ``self.cluster`` and reads/updates ``self.czy_nalezy_do_klastra``
        (the mask of points visited in the current expansion), so both must be
        initialised before the call, as ``fit`` does.

        The traversal is depth-first in ascending index order, driven by an
        explicit stack instead of recursion so that large clusters cannot
        exceed Python's recursion limit.

        Args:
            X (np.ndarray): Dataset of shape (n_samples, n_features).
            P (np.ndarray): The point around which to form the cluster.

        Returns:
            np.ndarray: Boolean array indicating cluster membership (True if in cluster, False otherwise).
                All False when P has fewer than ``min_pts`` eligible neighbours.
        """
        visited: np.ndarray = self.czy_nalezy_do_klastra

        seed_ids, seed_distinct = self._unvisited_neighbours(X, P)
        if seed_ids.size < self.min_pts:
            # Not enough neighbors, return all False (noise)
            return np.zeros(X.shape[0], dtype=bool)

        # Each stack entry iterates over the neighbour snapshot of one expanded
        # point; the snapshot is taken when the point is expanded and is not
        # refreshed afterwards.
        pending = [zip(seed_ids, seed_distinct)]
        while pending:
            step = next(pending[-1], None)
            if step is None:
                # Every neighbour of this point has been handled.
                pending.pop()
                continue

            idx, is_distinct = step
            visited[idx] = True

            # Points at zero distance are marked as members but not expanded.
            if not is_distinct:
                continue

            ids, distinct = self._unvisited_neighbours(X, X[idx, :])
            if ids.size >= self.min_pts:
                pending.append(zip(ids, distinct))

        # Every point reached during the expansion is recorded in the shared
        # mask, so a copy of it is the complete membership of the cluster.
        return visited.copy()

    def fit(self, X: np.ndarray) -> np.ndarray:
        """
        Fits the DBSCAN clustering algorithm to the data.

        Samples are tried as seeds in index order; every sample that is still
        unassigned starts a fresh expansion.

        Args:
            X (np.ndarray): Dataset of shape (n_samples, n_features).

        Returns:
            np.ndarray: Array of cluster assignments for each sample (0 means noise).
        """
        n: int = X.shape[0]
        # int64 labels: an 8-bit label array cannot represent ids above 127.
        self.cluster: np.ndarray = np.zeros(n, dtype=np.int64)  # 0 means noise/unassigned
        cluster_id: int = 1

        for i in range(n):
            if self.cluster[i] != 0:
                continue

            # "czy_nalezy_do_klastra" = "does it belong to the cluster": mask
            # of points visited while expanding the current seed.
            self.czy_nalezy_do_klastra: np.ndarray = np.zeros(n, dtype=bool)
            self.czy_nalezy_do_klastra[i] = True

            members: np.ndarray = self.form_cluster(X, X[i, :])

            if np.sum(members) >= self.min_pts:
                self.cluster[members] = cluster_id
                cluster_id += 1

        return self.cluster

# In [283]:
# # Example usage
# n: int = 50
# p: int = 2

# df: np.ndarray = np.random.uniform(0, 2, size=(n, p))
# df_std: np.ndarray = np.apply_along_axis(lambda x: (x - np.mean(x)) / np.std(x), axis=0, arr=df)

# DBSCAN_inst: DBSCAN = DBSCAN(min_pts=2, eps=0.6)
# wynik: np.ndarray = DBSCAN_inst.fit(df_std)

# import seaborn as sns
# import matplotlib.pyplot as plt

# sns.scatterplot(x=df_std[:, 0], y=df_std[:, 1], hue=wynik, palette="rainbow")
# plt.title("DBSCAN Clustering Result")
# plt.show()