In [1]:
# # Notebook variables
# # ---------------------
# # should the tests be run?
# run_simulations = True

# # should the test data be saved?
# save_simulation_data = True

# # should the stored test data be loaded?
# load_simulation_data = True

In [2]:
# import matplotlib
import math
import os
import random as rng
import sys
import time
from concurrent.futures import ProcessPoolExecutor

import cv2 as cv
import matplotlib.pyplot as plt
import numpy as np
# make AntClus dir known
import sys
sys.path.append("../AntClust")
from AntClust import AntClust
from distance_classes import (
    opencv_image_flann_similarity,
    opencv_image_orb_similarity,
    opencv_descriptor_flann_similarity,
    opencv_orb_similarity,
    precomputed_similarity_matrix
)
from matplotlib import cm
from matplotlib.ticker import LinearLocator
from rules import labroche_rules
from sklearn.cluster import DBSCAN, HDBSCAN, OPTICS
from sklearn.metrics import adjusted_rand_score, rand_score

# data functions

In [3]:
def data_cluster_images_static(
    data_folder, num_clusters, num_images_per_cluster, seed=3
):
    """
    Build a labelled image data set with a fixed number of images per cluster.

    Every sub-folder of ``data_folder`` is treated as one cluster. The folders
    are shuffled with ``seed``, the first ``num_clusters`` are kept, and up to
    ``num_images_per_cluster`` randomly chosen images are taken from each.

    Args:
        data_folder: directory that contains one sub-folder per cluster.
        num_clusters: number of sub-folders (clusters) to use.
        num_images_per_cluster: maximum number of images taken per cluster;
            a folder holding fewer images contributes all of them.
        seed: seed for the module-level random generator (reproducibility).

    Returns:
        (cluster_image, image_data, cluster_labels) where
        cluster_image is a list of "<folder>/<file>" strings,
        image_data is a list of one-element lists holding the grayscale image,
        cluster_labels is a list of integer labels, one per image.

    Raises:
        OSError: if an image file cannot be read by OpenCV.
    """
    # one folder per cluster; drop the Jupyter checkpoint folder wherever it
    # ends up in the sort order (it is not guaranteed to be the first entry)
    car_dir_names = [
        name for name in sorted(os.listdir(data_folder)) if name != ".ipynb_checkpoints"
    ]

    # make a random shuffle of the folders and keep the first num_clusters
    rng.seed(seed)
    cars_to_take = rng.sample(car_dir_names, len(car_dir_names))[:num_clusters]

    cluster_image: list = []
    cluster_labels: list = []

    for label, car_folder in enumerate(cars_to_take):
        # take images and shuffle them
        imgs = sorted(os.listdir(os.path.join(data_folder, car_folder)))
        imgs = rng.sample(imgs, len(imgs))
        chosen = imgs[:num_images_per_cluster]

        # prefix every image with its folder so it can be located later
        cluster_image.extend(str(car_folder) + "/" + image for image in chosen)
        # label only the images that were really taken, so labels and images
        # stay aligned even if the folder holds fewer images than requested
        cluster_labels.extend([label] * len(chosen))

    # read the images as opencv images from disk,
    # each wrapped in its own list as data tuple
    image_data = []
    for image_file in cluster_image:
        image_path = os.path.join(data_folder, image_file)
        image = cv.imread(image_path, cv.IMREAD_GRAYSCALE)
        if image is None:
            # cv.imread signals failure by returning None instead of raising
            raise OSError(f"could not read image: {image_path}")
        image_data.append([image])

    return cluster_image, image_data, cluster_labels


def data_cluster_images_dynamic(
    data_folder,
    num_clusters,
    num_images_per_cluster_min,
    num_images_per_cluster_max,
    seed=3,
):
    """
    Build a labelled image data set with a random number of images per cluster.

    Every sub-folder of ``data_folder`` is treated as one cluster. The folders
    are shuffled with ``seed``, the first ``num_clusters`` are kept, and for
    each one a random amount of images in the inclusive range
    [num_images_per_cluster_min, num_images_per_cluster_max] is taken.

    Args:
        data_folder: directory that contains one sub-folder per cluster.
        num_clusters: number of sub-folders (clusters) to use.
        num_images_per_cluster_min: lower bound of images per cluster.
        num_images_per_cluster_max: upper bound of images per cluster;
            a folder holding fewer images contributes all of them.
        seed: seed for the module-level random generator (reproducibility).

    Returns:
        (cluster_image, image_data, cluster_labels) where
        cluster_image is a list of "<folder>/<file>" strings,
        image_data is a list of one-element lists holding the grayscale image,
        cluster_labels is a list of integer labels, one per image.

    Raises:
        OSError: if an image file cannot be read by OpenCV.
    """
    # one folder per cluster; drop the Jupyter checkpoint folder wherever it
    # ends up in the sort order (it is not guaranteed to be the first entry)
    car_dir_names = [
        name for name in sorted(os.listdir(data_folder)) if name != ".ipynb_checkpoints"
    ]

    # make a random shuffle of the folders and keep the first num_clusters
    rng.seed(seed)
    cars_to_take = rng.sample(car_dir_names, len(car_dir_names))[:num_clusters]

    cluster_image: list = []
    cluster_labels: list = []

    for label, car_folder in enumerate(cars_to_take):
        # take images and shuffle them
        imgs = sorted(os.listdir(os.path.join(data_folder, car_folder)))
        imgs = rng.sample(imgs, len(imgs))

        # draw the cluster size after the shuffle (keeps the random sequence
        # of the original implementation)
        num_images = rng.randint(num_images_per_cluster_min, num_images_per_cluster_max)
        chosen = imgs[:num_images]

        # prefix every image with its folder so it can be located later
        cluster_image.extend(str(car_folder) + "/" + image for image in chosen)
        # label only the images that were really taken, so labels and images
        # stay aligned even if the folder holds fewer images than drawn
        cluster_labels.extend([label] * len(chosen))

    # read the images as opencv images from disk
    # and put them into their own list as data tuple
    image_data = []
    for image_file in cluster_image:
        image_path = os.path.join(data_folder, image_file)
        image = cv.imread(image_path, cv.IMREAD_GRAYSCALE)
        if image is None:
            # cv.imread signals failure by returning None instead of raising
            raise OSError(f"could not read image: {image_path}")
        image_data.append([image])

    return cluster_image, image_data, cluster_labels


def compute_orb_image_features(images, image_resize_size):
    """
    Compute ORB descriptors for every image.

    Args:
        images: iterable of data tuples whose first element is the image.
        image_resize_size: size handed to cv.resize before feature extraction.

    Returns:
        A list with one single-element list per input image, holding the
        descriptors returned by ORB's detectAndCompute for that image.
    """
    # one ORB detector is shared by all images
    detector = cv.ORB_create()

    def describe(entry):
        # resize first so every image is described at the same resolution;
        # the key points are not needed, only the descriptors are kept
        resized = cv.resize(entry[0], image_resize_size)
        _, descriptor = detector.detectAndCompute(resized, None)
        return [descriptor]

    return [describe(entry) for entry in images]

# Cluster Performance Tests

In [7]:
# Benchmark: compare AntClust, DBSCAN, HDBSCAN and OPTICS on ORB image
# features for a growing number of clusters, scoring each run with the
# adjusted Rand index (ARI) against the folder-derived labels.
#
# test variables
# --------------
clusters_min = 2
clusters_max = 30
test_every_n = 2
# cluster counts to test: clusters_min, clusters_min + test_every_n, ..., clusters_max
num_clusters = np.arange(clusters_min, clusters_max + 1, test_every_n)
data_folder = "data"
# images taken from every cluster folder
values_per_cluster = 18
seed = 9
# handed to cv.resize inside compute_orb_image_features
image_resize_size = (150, 172)

# AntClust standard params
ant_clust_params = {'alpha': 500, 'betta': 0.9, 'shrink': 0.2, 'removal': 0.3}
dbscan_params = {"eps": 0.33, "min_samples": 2}

# result arrays: one ARI score per tested cluster count
ari_antclust = []
ari_dbscan = []
ari_hdbscan = []
ari_optics = []

# test loop
t_0 = time.time()
for num_clust in num_clusters:
    print(f"measure performance on {num_clust} clusters")
    # Data generation
    # ----------------
    image_names, images, labels = data_cluster_images_static(
        data_folder, num_clust, values_per_cluster, seed
    )
    # from here on `images` holds ORB descriptors, not pixel data
    images = compute_orb_image_features(images, image_resize_size)
    data = np.array(images, dtype=list)
    labels = np.array(labels)

    # distance matrix for sklearn
    # (the nested loop evaluates every ordered pair, including i == n)
    orb_sim = opencv_orb_similarity()
    distance_matrix = []
    for i in range(len(data)):
        t_l = []
        for n in range(len(data)):
            t_l.append(orb_sim.similarity(data[i][0], data[n][0]))
        distance_matrix.append(t_l)
    # sklearn needs it in the way that 0 means a==b
    # ant clust needs it in the way 1 means a==b
    # NOTE(review): `distance_matrix` holds the raw similarity() values and
    # `data_distance_matrix` is 1 - similarity; the latter is also what is
    # passed to AntClust below, which seems to contradict the line above —
    # confirm against precomputed_similarity_matrix.
    data_distance_matrix = 1 - np.array(distance_matrix)

    # ------------------------------------
    # clustering with different algorithms
    # ------------------------------------
    # AntClust
    # ----------
    # similarity function
    f_sim = [precomputed_similarity_matrix()]
    # rules
    rules = labroche_rules()
    # AntClust
    ant_clust = AntClust(
        f_sim,
        rules,
        alpha_ant_meeting_iterations=ant_clust_params["alpha"],
        betta_template_init_meetings=ant_clust_params["betta"],
        nest_shrink_prop=ant_clust_params["shrink"],
        nest_removal_prop=ant_clust_params["removal"],
        print_status=False,
    )
    # find clusters (every matrix row is wrapped as a one-element data tuple)
    ant_clust.fit([[i] for i in data_distance_matrix])
    # get the clustering result
    y_pred = ant_clust.get_clusters()
    # calculate the ari score
    ari_score = adjusted_rand_score(labels, y_pred)
    # append score
    ari_antclust.append(ari_score)

    # DBSCAN
    # ----------
    dbscan = DBSCAN(
        eps=dbscan_params["eps"],
        min_samples=dbscan_params["min_samples"],
        metric="precomputed",
    )

    # Fit the model to the data
    dbscan.fit(data_distance_matrix)

    # Get the cluster labels for each data point
    # (per sklearn docs noise points get label -1; the ARI below treats
    # them like one additional cluster)
    y_pred = dbscan.labels_
    # calculate the ari score
    ari_score = adjusted_rand_score(labels, y_pred)
    # append score
    ari_dbscan.append(ari_score)

    # HDBSCAN
    # ----------
    # default hyper-parameters, only the metric is set
    hdbscan = HDBSCAN(metric="precomputed")

    # Fit the model to the data
    hdbscan.fit(data_distance_matrix)

    # Get the cluster labels for each data point
    y_pred = hdbscan.labels_
    # calculate the ari score
    ari_score = adjusted_rand_score(labels, y_pred)
    # append score
    ari_hdbscan.append(ari_score)

    # OPTICS
    # ----------
    # default hyper-parameters, only the metric is set
    optics = OPTICS(metric="precomputed")

    # Fit the model to the data
    optics.fit(data_distance_matrix)

    # Get the cluster labels for each data point
    y_pred = optics.labels_
    # calculate the ari score
    ari_score = adjusted_rand_score(labels, y_pred)
    # append score
    ari_optics.append(ari_score)
# wall-clock time of the whole benchmark loop
print(f"testing took {time.time() - t_0} seconds")

measure performance on 2 clusters
measure performance on 4 clusters
measure performance on 6 clusters
measure performance on 8 clusters
measure performance on 10 clusters
measure performance on 12 clusters
measure performance on 14 clusters
measure performance on 16 clusters
measure performance on 18 clusters
measure performance on 20 clusters
measure performance on 22 clusters
measure performance on 24 clusters
measure performance on 26 clusters
measure performance on 28 clusters
measure performance on 30 clusters
testing took 1083.333025932312 seconds


In [8]:
# Show the ARI series of every algorithm, one list per line
# (order: AntClust, DBSCAN, HDBSCAN, OPTICS).
print(ari_antclust, ari_dbscan, ari_hdbscan, ari_optics, sep="\n")

[0.35610612924100954, 0.5635759252725213, 0.6769081571929206, 0.7577188882812111, 0.6392491088718132, 0.6592643929957883, 0.6811088451146746, 0.6635066743157172, 0.6418196702140101, 0.6580274648430187, 0.6030222029190946, 0.5730978909016121, 0.5834164097963406, 0.5017854815587124, 0.5638017432112977]
[0.753633787366304, 0.7290871734663927, 0.741189192265555, 0.7306183296628833, 0.7214452446980428, 0.3379660600321533, 0.225671510253856, 0.20376583574899268, 0.17077394519743636, 0.20746875920288518, 0.15106708617020048, 0.1068482019577566, 0.08270926549569964, 0.05869973573641392, 0.06191467990423928]
[0.8564397046759639, 0.7589267383648395, 0.5526232385781852, 0.7129780201561271, 0.645396570259579, 0.48741134454083845, 0.4538531680073583, 0.4770498270178605, 0.4102312836286759, 0.3981154175560774, 0.3559824314691303, 0.4089762910895465, 0.3757083892502588, 0.3539337353650682, 0.3865623728353755]
[0.4468804238604184, 0.6251977631228559, 0.2850013806260512, 0.44954925869818957, 0.39000046

In [9]:
# Mean ARI over all tested cluster counts, one value per line
# (order: AntClust, DBSCAN, HDBSCAN, OPTICS).
print(
    *(
        np.mean(ari_series)
        for ari_series in (ari_antclust, ari_dbscan, ari_hdbscan, ari_optics)
    ),
    sep="\n",
)

0.6081605989819828
0.35219058714392065
0.5089459021863256
0.25312592839712394
