In [None]:
#!pip install numpy scikit-learn matplotlib pandas
#from google.colab import files

# Upload files
#uploaded = files.upload()

# The uploaded files will be saved in the current working directory
# You can access them using their filenames



: 

# Clustering From Scratch
Applying K-Means, DBSCAN, and HDBSCAN to the provided dataset using only NumPy, Pandas, and Matplotlib.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

data = pd.read_csv('clustering.csv')
X = data[['x', 'y']].to_numpy()
data.head()


In [None]:
def pairwise_distances(X):
    diff = X[:, None, :] - X[None, :, :]
    return np.sqrt(np.sum(diff * diff, axis=2))

def plot_clusters(X, labels, title):
    plt.figure(figsize=(6, 5))
    unique_labels = np.unique(labels)
    colors = plt.cm.tab10(np.linspace(0, 1, len(unique_labels)))
    for color, lbl in zip(colors, unique_labels):
        mask = labels == lbl
        if lbl == -1:
            plt.scatter(X[mask, 0], X[mask, 1], s=10, c=[color], marker='x', label='noise')
        else:
            plt.scatter(X[mask, 0], X[mask, 1], s=10, c=[color], label=f'cluster {lbl}')
    plt.title(title)
    plt.legend(loc='best', fontsize=8)
    plt.show()

## K-Means from Scratch
Simple iterative centroid-based clustering.

In [None]:
class KMeansScratch:
    def __init__(self, k, max_iters=100, random_state=0):
        self.k = k
        self.max_iters = max_iters
        self.random_state = random_state
        self.centroids = None

    def initialize_centroids(self, X):
        rng = np.random.default_rng(self.random_state)
        indices = rng.choice(len(X), size=self.k, replace=False)
        self.centroids = X[indices].copy()

    def assign_clusters(self, X):
        distances = np.linalg.norm(X[:, None, :] - self.centroids[None, :, :], axis=2)
        return np.argmin(distances, axis=1)

    def update_centroids(self, X, labels):
        new_centroids = []
        for idx in range(self.k):
            points = X[labels == idx]
            if len(points) == 0:
                new_centroids.append(self.centroids[idx])
            else:
                new_centroids.append(points.mean(axis=0))
        self.centroids = np.vstack(new_centroids)

    def fit_predict(self, X):
        self.initialize_centroids(X)
        for _ in range(self.max_iters):
            labels = self.assign_clusters(X)
            old_centroids = self.centroids.copy()
            self.update_centroids(X, labels)
            if np.allclose(old_centroids, self.centroids):
                break
        return labels

kmeans = KMeansScratch(k=4, max_iters=50, random_state=42)
kmeans_labels = kmeans.fit_predict(X)
plot_clusters(X, kmeans_labels, 'K-Means Clusters')

## DBSCAN from Scratch
Density-based clustering that marks low-density points as noise.

In [None]:
class DBSCANScratch:
    def __init__(self, eps, min_samples):
        self.eps = eps
        self.min_samples = min_samples

    def region_query(self, X, idx):
        distances = np.linalg.norm(X - X[idx], axis=1)
        return np.where(distances <= self.eps)[0]

    def expand_cluster(self, X, labels, idx, cluster_id):
        seeds = list(self.region_query(X, idx))
        if len(seeds) < self.min_samples:
            labels[idx] = -1
            return False
        labels[idx] = cluster_id
        i = 0
        while i < len(seeds):
            point = seeds[i]
            if labels[point] == -1:
                labels[point] = cluster_id
            if labels[point] != 0:
                i += 1
                continue
            labels[point] = cluster_id
            neighbors = self.region_query(X, point)
            if len(neighbors) >= self.min_samples:
                seeds.extend([n for n in neighbors if n not in seeds])
            i += 1
        return True

    def fit_predict(self, X):
        labels = np.zeros(len(X), dtype=int)
        cluster_id = 0
        for idx in range(len(X)):
            if labels[idx] != 0:
                continue
            if self.expand_cluster(X, labels, idx, cluster_id + 1):
                cluster_id += 1
        labels[labels == 0] = -1
        return labels  # keep noise at -1, clusters start at 1; adjust to start at 0 below

dbscan = DBSCANScratch(eps=25.0, min_samples=5)
dbscan_labels = dbscan.fit_predict(X)
dbscan_labels = np.where(dbscan_labels == -1, -1, dbscan_labels - 1)
plot_clusters(X, dbscan_labels, 'DBSCAN Clusters')

## HDBSCAN from Scratch (simplified)
Hierarchy over density via mutual reachability distances and a stability-based cluster selection.

In [None]:

def core_distances(X, min_samples):
    """Return core distance for each point: distance to its (min_samples)-th nearest neighbor.
    X: (n, d) array, min_samples: int
    """
    dists = pairwise_distances(X)                    # full pairwise distance matrix (n,n)
    sorted_dists = np.sort(dists, axis=1)            # sort distances for each point
    # index min_samples-1 to keep same convention as earlier (includes self at index 0)
    return sorted_dists[:, min_samples - 1]          # shape (n,)

def mutual_reachability_distances(X, min_samples):
    """Compute mutual reachability distance matrix:
    mreach(i,j) = max( pairwise(i,j), core(i), core(j) )
    """
    core = core_distances(X, min_samples)            # core distance per point, shape (n,)
    pairwise = pairwise_distances(X)                 # pairwise distances, shape (n,n)
    # broadcast core to (n,n) and take elementwise maximum
    return np.maximum(pairwise, np.maximum(core[:, None], core[None, :]))

def prim_mst(mreach):
    """Prim's algorithm on a dense symmetric matrix to return MST edges as (u,v,weight).
    mreach: (n,n) mutual reachability matrix
    """
    n = mreach.shape[0]
    selected = np.zeros(n, dtype=bool)
    selected[0] = True                                # start from node 0
    min_dist = mreach[0].copy()                       # best distance from selected set
    min_edge = np.zeros(n, dtype=int)                 # keeps nearest selected neighbor for each node
    edges = []
    for _ in range(n - 1):
        min_dist[selected] = np.inf                   # ignore already selected nodes
        v = np.argmin(min_dist)                       # next node to add
        u = min_edge[v]                               # its nearest selected neighbor
        w = min_dist[v]                               # edge weight
        edges.append((u, v, w))
        selected[v] = True
        # update distances/edges using newly selected vertex v
        for j in range(n):
            if not selected[j] and mreach[v, j] < min_dist[j]:
                min_dist[j] = mreach[v, j]
                min_edge[j] = v
    return edges                                     # list length n-1

def build_cluster_tree(n_points, edges):
    """Build a hierarchical merge tree from MST edges sorted by weight.
    Returns root id, children dict, and stability dict for nodes.
    Node ids < n_points are original points, >= n_points are internal merge nodes.
    """
    edges_sorted = sorted(edges, key=lambda x: x[2])  # sort by increasing weight
    next_id = n_points
    parent = {i: i for i in range(n_points)}         # union-find like parent mapping
    size = {i: 1 for i in range(n_points)}           # cluster size (# original points)
    birth = {i: 0.0 for i in range(n_points)}        # birth lambda for leaf: 0.0
    death = {i: None for i in range(n_points)}       # death lambda when merged
    children = {i: [] for i in range(n_points)}      # child list for each node

    def find(x):
        # path-compress find for current parent mapping
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for u, v, w in edges_sorted:
        ru, rv = find(u), find(v)
        if ru == rv:
            continue
        lambda_val = 1.0 / w if w > 0 else np.inf     # convert distance -> 'lambda' (density)
        # create new internal node representing merge(ru,rv)
        parent[ru] = next_id
        parent[rv] = next_id
        parent[next_id] = next_id
        children[next_id] = [ru, rv]
        birth[next_id] = lambda_val
        death[ru] = lambda_val
        death[rv] = lambda_val
        size[next_id] = size[ru] + size[rv]
        # zero-out sizes of merged children to avoid double-counting later
        size[ru] = 0
        size[rv] = 0
        next_id += 1

    # find root (representative of node 0)
    root = find(0)
    # set death of root to 0 (it persists down to zero density)
    death[root] = 0.0

    # compute stability for nodes that have a death value
    stability = {}
    for cid in list(birth.keys()) + [i for i in range(n_points, next_id)]:
        b = birth.get(cid, 0.0)
        d = death.get(cid, None)
        s = max(size.get(cid, 0), 0)
        if d is not None:
            # stability = (death - birth) * size (how long cluster persists weighted by size)
            stability[cid] = (d - b) * max(s, 1)
    return root, children, stability

def select_clusters(root, children, stability):
    """Select stable clusters from the hierarchical tree using a simple greedy rule:
    pick a node if its stability >= sum(stabilities of selected children), else use children.
    Returns list of selected node ids.
    """
    def recurse(node):
        # leaf node -> nothing to select at internal level
        if node not in children or len(children[node]) == 0:
            return []
        selected_children = []
        for child in children[node]:
            selected_children.extend(recurse(child))
        # compute scores
        child_score = sum(stability.get(c, 0.0) for c in selected_children)
        self_score = stability.get(node, 0.0)
        if self_score >= child_score and node in stability:
            return [node]
        return selected_children
    return recurse(root)

def label_points(n_points, selected, children):
    """Assign final cluster labels to original points given selected cluster nodes.
    Points not covered by any selected node remain noise (-1).
    """
    labels = np.full(n_points, -1, dtype=int)
    cluster_id = 0

    def assign(node, cid):
        # if node is an original point, label it
        if node < n_points:
            labels[node] = cid
            return
        # otherwise, recurse into children
        for child in children.get(node, []):
            assign(child, cid)

    for node in selected:
        assign(node, cluster_id)
        cluster_id += 1
    return labels

def hdbscan_scratch(X, min_samples=5):
    """Top-level simplified HDBSCAN: compute mutual reachability, MST, build tree,
    select clusters by stability, and label points.
    """
    mreach = mutual_reachability_distances(X, min_samples)   # (n,n)
    edges = prim_mst(mreach)                                 # MST edges (n-1)
    root, children, stability = build_cluster_tree(len(X), edges)
    selected = select_clusters(root, children, stability)
    labels = label_points(len(X), selected, children)
    return labels

# run simplified HDBSCAN (adjust min_samples as needed)
hdbscan_labels = hdbscan_scratch(X, min_samples=3)
plot_clusters(X, hdbscan_labels, 'HDBSCAN Clusters')


## Quick Comparison
Basic summary stats for the three runs.

In [None]:
def cluster_summary(labels):
    counts = pd.Series(labels).value_counts().sort_index()
    n_clusters = (counts.index != -1).sum()
    n_noise = counts.get(-1, 0)
    return n_clusters, n_noise, counts.to_dict()

k_summary = cluster_summary(kmeans_labels)
d_summary = cluster_summary(dbscan_labels)
h_summary = cluster_summary(hdbscan_labels)

print('K-Means clusters:', k_summary[0])
print('DBSCAN clusters:', d_summary[0], 'noise:', d_summary[1])
print('HDBSCAN clusters:', h_summary[0], 'noise:', h_summary[1])

## Metrics
Simple internal metrics (silhouette, Davies-Bouldin, Calinski-Harabasz). Noise points (-1) are ignored for the metrics that require cluster membership.

In [None]:
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score

def score_all(X, labels):
    mask = labels >= 0
    Xc = X[mask]
    lc = labels[mask]
    unique = np.unique(lc)
    if len(Xc) == 0 or len(unique) < 2:
        return {
            'silhouette': np.nan,
            'davies_bouldin': np.nan,
            'calinski_harabasz': np.nan
        }
    silhouette = silhouette_score(Xc, lc, sample_size=min(len(Xc), 4000), random_state=0) if len(Xc) > 50 else silhouette_score(Xc, lc)
    db = davies_bouldin_score(Xc, lc)
    ch = calinski_harabasz_score(Xc, lc)
    return {'silhouette': float(silhouette), 'davies_bouldin': float(db), 'calinski_harabasz': float(ch)}

metrics = {
    'kmeans': score_all(X, kmeans_labels),
    'dbscan': score_all(X, dbscan_labels),
    'hdbscan': score_all(X, hdbscan_labels)
}
metrics