In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score

In [2]:
# Read data
def load_data(file_path):
    return pd.read_csv(file_path)


In [3]:
def preprocess_data(df):
    # Drop the "CustomerID" column
    # df = df.dropna(inplace=True)
    df = df.drop(["CustomerID"], axis=1)
       
    # Scale the numerical columns
    scaler = StandardScaler()
    numerical_columns = ["Age", "Annual_Income", "Spending_Score"]
    df[numerical_columns] = scaler.fit_transform(df[numerical_columns])
    
    return df

In [4]:
def perform_hierarchical_clustering(df, n_clusters):
    # Perform hierarchical clustering
    model = AgglomerativeClustering(n_clusters=n_clusters)
    df['cluster2'] = model.fit_predict(df)
    
    return df

In [5]:
df = load_data("segmented_customers.csv")
df.head()


Unnamed: 0,CustomerID,Gender,Age,Annual_Income,Spending_Score,cluster
0,1,1,19,15,39,3
1,2,1,21,15,81,4
2,3,0,20,16,6,3
3,4,0,23,16,77,4
4,5,0,31,17,40,3


In [6]:
df = preprocess_data(df)

In [7]:
df.head()

Unnamed: 0,Gender,Age,Annual_Income,Spending_Score,cluster
0,1,-1.424569,-1.738999,-0.434801,3
1,1,-1.281035,-1.738999,1.195704,4
2,0,-1.352802,-1.70083,-1.715913,3
3,0,-1.137502,-1.70083,1.040418,4
4,0,-0.563369,-1.66266,-0.39598,3


In [8]:
df = perform_hierarchical_clustering(df, n_clusters=5)
print(df)

     Gender       Age  Annual_Income  Spending_Score  cluster  cluster2
0         1 -1.424569      -1.738999       -0.434801        3         0
1         1 -1.281035      -1.738999        1.195704        4         3
2         0 -1.352802      -1.700830       -1.715913        3         0
3         0 -1.137502      -1.700830        1.040418        4         3
4         0 -0.563369      -1.662660       -0.395980        3         0
..      ...       ...            ...             ...      ...       ...
195       0 -0.276302       2.268791        1.118061        1         1
196       0  0.441365       2.497807       -0.861839        0         2
197       1 -0.491602       2.497807        0.923953        1         1
198       1 -0.491602       2.917671       -1.250054        0         2
199       1 -0.635135       2.917671        1.273347        1         1

[200 rows x 6 columns]


In [9]:
# Calculate the silhouette score
score = silhouette_score(df, df['cluster2'])
print(score)

0.5476242034629109


In [10]:
import numpy as np

class AgglomerativeClustering1:
    def __init__(self, n_clusters):
        self.n_clusters = n_clusters
    
    def fit_predict(self, X):
        n_samples, _ = X.shape
        distances = self._calculate_distance_matrix(X)
        cluster_assignments = np.arange(n_samples)
        current_cluster_count = n_samples
        
        while current_cluster_count > self.n_clusters:
            min_distance = np.inf
            min_i, min_j = None, None
            for i in range(n_samples):
                for j in range(i+1, n_samples):
                    if cluster_assignments[i] != cluster_assignments[j] and distances[i, j] < min_distance:
                        min_distance = distances[i, j]
                        min_i, min_j = i, j
            cluster_assignments[cluster_assignments == cluster_assignments[min_j]] = cluster_assignments[min_i]
            current_cluster_count -= 1
        
        return cluster_assignments
    
    def _calculate_distance_matrix(self, X):
        n_samples = X.shape[0]
        distances = np.zeros((n_samples, n_samples))
        for i in range(n_samples):
            for j in range(i+1, n_samples):
                distances[i, j] = np.linalg.norm(X[i] - X[j])
                distances[j, i] = distances[i, j]
        return distances


In [11]:
def perform_hierarchical_clustering_self(df, n_clusters):
    # Reset the index of the dataframe
    df = df.reset_index(drop=True)
    # Perform hierarchical clustering
    model = AgglomerativeClustering1(n_clusters=n_clusters)
    df["cluster3"] = model.fit_predict(df.values)
    return df

In [12]:
df_new = perform_hierarchical_clustering_self(df, n_clusters=5)
df_new = pd.DataFrame(df_new)
df_new.head()

Unnamed: 0,Gender,Age,Annual_Income,Spending_Score,cluster,cluster2,cluster3
0,1,-1.424569,-1.738999,-0.434801,3,0,0
1,1,-1.281035,-1.738999,1.195704,4,3,7
2,0,-1.352802,-1.70083,-1.715913,3,0,0
3,0,-1.137502,-1.70083,1.040418,4,3,7
4,0,-0.563369,-1.66266,-0.39598,3,0,0


In [14]:
#silouette score
silhouette_score(df_new, df_new['cluster3'])

0.7497501015197205

In [101]:
from collections import deque
import numpy as np

class AgglomerativeClustering3:
    def __init__(self, n_clusters, distance_metric='euclidean'):
        self.n_clusters = n_clusters
        self.distance_metric = distance_metric
        
    def fit_predict(self, X):
        n_samples, _ = X.shape
        
        # Check if number of desired clusters is greater than number of samples
        if self.n_clusters > n_samples:
            raise ValueError("Number of desired clusters cannot be greater than number of samples")
        
        # Check if there are NaN values in the input
        if np.isnan(X).any():
            raise ValueError("Input contains NaN values")
        
        # Check if there are identical samples in the input
        unique_rows, unique_indices = np.unique(X, return_index=True, axis=0)
        if unique_rows.shape[0] != X.shape[0]:
            raise ValueError("Input contains identical samples")
        
        distances = self._calculate_distance_matrix(X)
        nearest_neighbors = self._calculate_nearest_neighbors(distances)
        cluster_assignments = np.arange(n_samples)
        current_cluster_count = n_samples
        
        while current_cluster_count > self.n_clusters:
            min_distance = np.inf
            min_i, min_j = None, None
            for i in range(n_samples):
                j = nearest_neighbors[i][0]
                if cluster_assignments[i] != cluster_assignments[j] and distances[i, j] < min_distance:
                    min_distance = distances[i, j]
                    min_i, min_j = i, j
            
            cluster_assignments[cluster_assignments == cluster_assignments[min_j]] = cluster_assignments[min_i]
            current_cluster_count -= 1
            nearest_neighbors[min_i].extend(nearest_neighbors[min_j])
            nearest_neighbors.pop(min_j)
            self._update_nearest_neighbors(nearest_neighbors, min_i, distances)
        
        return cluster_assignments
    
    def _calculate_distance_matrix(self, X):
        n_samples = X.shape[0]
        distances = np.zeros((n_samples, n_samples))
        for i in range(n_samples):
            for j in range(i+1, n_samples):
                distances[i, j] = np.linalg.norm(X[i] - X[j])
                distances[j, i] = distances[i, j]
        return distances

    def _update_nearest_neighbors(self, nearest_neighbors, min_i, distances):
        n_samples = distances.shape[0]
        for j in range(n_samples):
            if min_i == j:
                continue
            if j in nearest_neighbors[min_i]:
                nearest_neighbors[min_i].remove(j)
            if distances[min_i, j] < distances[j, nearest_neighbors[j][0]]:
                nearest_neighbors[j][0] = min_i
            else:
                nearest_neighbors[j].append(min_i)


    def _calculate_nearest_neighbors(self, distances):
        n_samples = distances.shape[0]
        nearest_neighbors = [deque([j for j in range(n_samples) if j != i]) for i in range(n_samples)]
        for i in range(n_samples):
            nearest_neighbors[i] = deque(sorted(nearest_neighbors[i], key=lambda j: distances[i, j]))
        return nearest_neighbors


In [106]:
def perform_hierarchical_clustering_self_deque(df, n_clusters):
    # Reset the index of the dataframe
    df = df.reset_index(drop=True)
    # Perform hierarchical clustering
    model = AgglomerativeClustering1(n_clusters=n_clusters)
    df["cluster4"] = model.fit_predict(df.values)
    return df

In [109]:
df_performance = perform_hierarchical_clustering_self_deque(df, n_clusters=5)
df_performance = pd.DataFrame(df_performance)
df_performance.head()

Unnamed: 0,Gender,Age,Annual_Income,Spending_Score,cluster,cluster2,cluster3,cluster4
0,1,-1.424569,-1.738999,-0.434801,3,0,0,0
1,1,-1.281035,-1.738999,1.195704,4,4,7,7
2,0,-1.352802,-1.70083,-1.715913,3,0,0,0
3,0,-1.137502,-1.70083,1.040418,4,4,7,7
4,0,-0.563369,-1.66266,-0.39598,3,0,0,0


In [111]:
#silouette score
silhouette_score(df, df_performance['cluster4'])

0.7718081363433258

In [112]:
#unit test
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
from sklearn.datasets import make_blobs
import numpy as np

def test_hierarchical_clustering():
    X, y = make_blobs(n_samples=100, centers=5, random_state=0)
    model = AgglomerativeClustering(n_clusters=5)
    model.fit(X)
    score = silhouette_score(X, model.labels_)
    assert score < 0.8, "Silhouette score is too low"
    assert np.unique(model.labels_).shape[0] == 5, "Number of clusters is incorrect"

test_hierarchical_clustering()