In [1]:
import numpy as np
from sklearn.cluster import DBSCAN
from DataMatrix import generate_data_matrix
from scipy.stats import mode
from sklearn.metrics import (
    precision_score,
    recall_score,
    f1_score,
)

import warnings

warnings.filterwarnings("ignore")

In [2]:
X_train1, y_train1, X_test1, y_test1 = generate_data_matrix(method="mean")
# X_train2, y_train2, X_test2, y_test2 = generate_data_matrix(method="flatten")

X1 = X_test1
y1 = y_test1
# X2 = X_test2
# y2 = y_test2

number_of_clusters = 0

print("X_test shape: ", X_test1.shape)
print("y_train shape: ", y_train1.shape)
print("X_test shape: ", X_test1.shape)
print("y_test shape: ", y_test1.shape)

X_test shape:  (1824, 45)
y_train shape:  (7296,)
X_test shape:  (1824, 45)
y_test shape:  (1824,)


### DBSCAN Algorithm


In [11]:
def get_DBSCAN(X, min_samples, eps):
    # Initialize labels
    labels = np.zeros(X.shape[0])

    # C is the cluster counter
    C = 0
    for i in range(X.shape[0]):

        # Skip if already labeled
        if labels[i] != 0:
            continue

        # Find neighbors within eps
        neighbors = np.where(np.linalg.norm(X - X[i], axis=1) <= eps)[0]

        # Mark as noise initially
        labels[i] = -1

        # If point is a core point, assign a cluster label
        if len(neighbors) >= min_samples:
            C += 1  # New cluster
            labels[i] = C  # Assign cluster label to core point
            S = list(neighbors)  # Set of points to potentially add to the current cluster

            j = 0
            while j < len(S):
                point = S[j]
                neighbors_j = np.where(np.linalg.norm(X - X[point], axis=1) <= eps)[0]

                # If the point is a core point, add its neighbors to S
                if len(neighbors_j) >= min_samples:
                    S += list(set(neighbors_j) - set(S))
                    labels[point] = C  # Assign cluster label to core point
                elif labels[point] == 0:
                    labels[point] = C  # Assign cluster label to border point

                j += 1
    # print("Number of clusters: ", C)
    return labels

#### Labels Mapping Function with Ground Truth Labels with Majority Voting


In [4]:
def map_labels(y_true, y_pred):
    # Get the unique labels in y_pred
    unique_labels = np.unique(y_pred)

    # Create a dictionary to map the labels
    label_map = {}

    for label in unique_labels:
        if label != -1:  # Skip noise points
            # Find the indices of points in this cluster
            indices = np.where(y_pred == label)[0]

            # Get the actual labels of these points
            actual_labels = y_true[indices]

            # Find the most common actual label
            majority_label = mode(actual_labels).mode

            # If majority_label is an array, take the first element
            if isinstance(majority_label, np.ndarray):
                majority_label = majority_label[0]

            # Map the predicted label to the majority label
            label_map[label] = majority_label

    # Map the labels in y_pred
    y_pred = np.array([label_map.get(label, -1) for label in y_pred])

    return y_pred

#### Entropy Calculation Function


In [5]:
def entropy_score(y_true, y_pred):
    totalEntropy = 0
    y_pred = y_pred.astype(int)
    number_of_clusters = np.max(y_pred) + 1
    for i in range(number_of_clusters):
        entropy = 0
        # Getting the label indices of the points in the cluster
        labels = np.where(y_pred == i)

        # Getting the actual labels of the points in the cluster
        labels = y_true[labels]

        # Getting the counts of each label in each cluster
        labels, counts = np.unique(labels, return_counts=True)

        entropy = -np.sum(counts / np.sum(counts) * np.log2(counts / np.sum(counts)))

        totalEntropy += entropy

    # Dividing by the number of clusters to get the average conditional entropy
    totalEntropy /= number_of_clusters
    return totalEntropy

### Hyperparamters


In [6]:
n_samples = 2
eps = 1.8

### Implemented DBSCAN


#### Method 1 (Mean)


In [13]:
y_pred = get_DBSCAN(X1, n_samples, eps)
y_pred_mapped = map_labels(y1, y_pred)

# Compute the accuracy
precision = precision_score(y1, y_pred_mapped, average="weighted")
recall = recall_score(y1, y_pred_mapped, average="weighted")
f1 = f1_score(y1, y_pred_mapped, average="weighted")
entropy = entropy_score(y1, y_pred_mapped)

print("Implemented DBSCAN Evaluation Metrics:")
print("Precision: {:.3f}%".format(precision * 100))
print("Recall:    {:.3f}%".format(recall * 100))
print("F1-Score:  {:.3f}%".format(f1 * 100))
print("Entropy:   {:.3f}".format(entropy))

Implemented DBSCAN Evaluation Metrics:
Precision: 92.150%
Recall:    70.998%
F1-Score:  76.615%
Entropy:   0.107


#### Method 2 (Flatten)


In [8]:
# y_pred = get_DBSCAN(X2, n_samples2, eps2)
# y_pred_mapped = map_labels(y2, y_pred)

# # Compute the accuracy
# precision = precision_score(y2, y_pred_mapped, average="weighted")
# recall = recall_score(y2, y_pred_mapped, average="weighted")
# f1 = f1_score(y2, y_pred_mapped, average="weighted")
# entropy = entropy_score(y2, y_pred_mapped)

# print("Implemented DBSCAN Evaluation Metrics:")
# print("Precision: {:.3f}%".format(precision * 100))
# print("Recall:    {:.3f}%".format(recall * 100))
# print("F1-Score:  {:.3f}%".format(f1 * 100))
# print("Entropy:   {:.3f}".format(entropy))

### DBSCAN in Scikit-Learn


#### Method 1 (Mean)


In [14]:
dbscan1 = DBSCAN(eps=eps, min_samples=n_samples)

# Fit the model to the data
dbscan1.fit(X1)

# Print the cluster labels for each data point
y_pred_sklearn = dbscan1.labels_
num_clusters = len(np.unique(y_pred_sklearn[y_pred_sklearn != -1]))
print("Number of clusters: ", num_clusters)
y_pred_sklearn_mapped = map_labels(y1, y_pred_sklearn)

precision = precision_score(y1, y_pred_sklearn_mapped, average="weighted")
recall = recall_score(y1, y_pred_sklearn_mapped, average="weighted")
f1 = f1_score(y1, y_pred_sklearn_mapped, average="weighted")
entropy = entropy_score(y1, y_pred_sklearn_mapped)

print("Sklearn DBSCAN Evaluation Metrics:")
print("Precision: {:.3f}%".format(precision * 100))
print("Recall:    {:.3f}%".format(recall * 100))
print("F1-Score:  {:.3f}%".format(f1 * 100))
print("Entropy:   {:.3f}".format(entropy))

Number of clusters:  189
Sklearn DBSCAN Evaluation Metrics:
Precision: 92.150%
Recall:    70.998%
F1-Score:  76.615%
Entropy:   0.107


#### Method 2 (Flatten)


In [10]:
# dbscan2 = DBSCAN(eps=eps2, min_samples=n_samples2)

# # Fit the model to the data
# dbscan2.fit(X2)

# # Print the cluster labels for each data point
# y_pred_sklearn = dbscan2.labels_
# num_clusters = len(np.unique(y_pred_sklearn[y_pred_sklearn != -1]))
# print("Number of clusters: ", num_clusters)
# y_pred_sklearn_mapped = map_labels(y2, y_pred_sklearn)

# precision = precision_score(y2, y_pred_sklearn_mapped, average="weighted")
# recall = recall_score(y2, y_pred_sklearn_mapped, average="weighted")
# f1 = f1_score(y2, y_pred_sklearn_mapped, average="weighted")
# entropy = entropy_score(y2, y_pred_sklearn_mapped)

# print("Sklearn DBSCAN Evaluation Metrics:")
# print("Precision: {:.3f}%".format(precision * 100))
# print("Recall:    {:.3f}%".format(recall * 100))
# print("F1-Score:  {:.3f}%".format(f1 * 100))
# print("Entropy:   {:.3f}".format(entropy))