In [None]:
max_iter = 300
convergence_iter = 15
random_state = 5
damping = 0.5 # range [0.5, 1.0)
preferences = None # array-like of shape (n_samples,)


In [None]:
import numpy as np

In [None]:
def negative_squared_distance(x: np.ndarray, y: np.ndarray) -> float:
    return - np.power(np.linalg.norm(x - y), 2)

In [None]:
def calculate_similarity_matrix(X: np.ndarray) -> np.ndarray:
    return np.array([
        [
            negative_squared_distance(x, y) for x in X
        ]
        for y in X
    ])

In [None]:
def calculate_responsibility_matrix(similarity_matrix: np.ndarray, availability_matrix: np.ndarray) -> np.ndarray:
    l = len(similarity_matrix)
    new_responsibility_matrix = np.zeros((l, l))

    for i in range(l):
        for j in range(l):
            m = max([availability_matrix[i][k] + similarity_matrix[i][k] for k in range(l) if k != j])
            new_responsibility_matrix[i][j] = similarity_matrix[i][j] - m

    return new_responsibility_matrix

In [None]:
def calculate_availability_matrix(responsibility_matrix: np.ndarray) -> np.ndarray:
    l = len(responsibility_matrix)
    new_availability_matrix = np.zeros((l, l))

    for i in range(l):
        for j in range(l):
            if i != j:
                m = sum([max(0, responsibility_matrix[k][j]) for k in range(l) if k != i and k != j])
                new_availability_matrix[i][j] = min(0, responsibility_matrix[j][j] + m)
            else:
                m = sum([max(0, responsibility_matrix[k][j]) for k in range(l) if k != j])
                new_availability_matrix[i][i] = m
    return new_availability_matrix

In [None]:
X = np.array([[1, 2], [1, 4], [1, 0], [4, 2], [4, 4], [4, 0]])

sim_matrix = calculate_similarity_matrix(X)

preferences = np.median(sim_matrix)
np.fill_diagonal(sim_matrix, preferences)

responsiliblity_matrix = np.zeros((len(X), len(X)))
availability_matrix = np.zeros((len(X), len(X)))

exemplars = np.array([])

convergence_iter_counter = 0

for _ in range(max_iter):
    new_responsiliblity_matrix = calculate_responsibility_matrix(sim_matrix, availability_matrix)
    
    new_availability_matrix = calculate_availability_matrix(responsiliblity_matrix) # calculate with new responsibility matrix?

    responsiliblity_matrix = damping * responsiliblity_matrix + (1 - damping) * new_responsiliblity_matrix
    availability_matrix = damping * availability_matrix + (1 - damping) * new_availability_matrix

    s = responsiliblity_matrix + availability_matrix

    new_exemplars = np.where(np.diag(s) > 0)[0]

    if np.array_equal(exemplars, new_exemplars):
        convergence_iter_counter += 1
    else:
        convergence_iter_counter = 0
        exemplars = new_exemplars

    if convergence_iter_counter == convergence_iter:
        break

exemplar_labels = {exemplar: label for label, exemplar in enumerate(exemplars)}

labels = np.full(len(X), -1)

for i in range(len(X)):
    if i in exemplar_labels:
        labels[i] = exemplar_labels[i]
    else:
        nearest_exemplar = exemplars[np.argmax(sim_matrix[i, exemplars])]
        labels[i] = exemplar_labels[nearest_exemplar]

labels

In [1]:
import numpy as np
from mlalgos.cluster import AffinityPropagation

clustering = AffinityPropagation(random_state=5)

X = np.array([[1, 2], [1, 4], [1, 0], [4, 2], [4, 4], [4, 0]])

clustering.fit(X)

In [2]:
clustering.cluster_centers

array([[1, 2],
       [4, 2]])

In [3]:
clustering.labels

array([0, 0, 0, 1, 1, 1])

In [4]:
clustering.predict([[0, 0], [4, 4]])

array([0, 1])