# Using opencv2 to cluster images

In [None]:
import glob
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.cluster import OPTICS, cluster_optics_dbscan

Change `X` and `Y` in the glob path to the location of your images to run this notebook.

In [None]:
image_paths = glob.glob('media/submissions/course_X/assignment_Y/img/*-page-3-*.png')

### Draw matches between two images

In [None]:

if len(image_paths) < 2:
    raise ValueError('Not enough images in this folder. Fix the path to the images.')
random_image_paths = np.random.choice(image_paths, 2, replace=False)
img1 = cv2.imread(random_image_paths[0])
img2 = cv2.imread(random_image_paths[1])

In [None]:
# do image alignment with ORB
orb = cv2.ORB_create()
kp1, des1 = orb.detectAndCompute(img1, None)
kp2, des2 = orb.detectAndCompute(img2, None)

bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

matches = bf.match(des1, des2)
matches = sorted(matches, key = lambda x:x.distance)

# draw the matches
img_f = cv2.drawMatches(img1, kp1, img2, kp2, matches[:100], None, flags=2)


plt.imshow(img_f)

### Measure similarity from an image

`MAX_FEATURES` and `GOOD_MATCH_PERCENT` appear to be the main parameters to tune for clustering.
With `MAX_FEATURES = 5000`, `GOOD_MATCH_PERCENT = 0.15`, clusering works amazingly well
but it takes a long time to run. With these set to `2000` and `0.15`,
it runs much faster, and the clustering is still pretty good.
The other important parameters to tune are:
- the OPTICS `min_samples` and 
- the function to calculate the distance between two images. Currently, it's just the weighed number of aligned good matches.


In [None]:
MAX_FEATURES = 2000
GOOD_MATCH_PERCENT = 0.15

In [None]:
from typing import List, Tuple

def ORB_features(img: np.ndarray, ORB_obj: cv2.ORB) -> Tuple[List[cv2.KeyPoint], np.ndarray]:
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    keypoints, descriptors = ORB_obj.detectAndCompute(img, None)
    return keypoints, descriptors

def get_all_features(imgs: dict) -> tuple:
    
    all_keypoints: List[List[cv2.KeyPoint]] = []
    all_descriptors: List[np.ndarray] = []
    for path, img in imgs.items():
        orb: cv2.ORB = cv2.ORB_create(MAX_FEATURES)
        keypoints: List[cv2.KeyPoint]
        descriptors: np.ndarray
        keypoints, descriptors = ORB_features(img, orb)
        all_keypoints.append(keypoints)
        all_descriptors.append(descriptors)

    return all_keypoints, all_descriptors
    
def compare_images(keypoints1: List[cv2.KeyPoint], keypoints2: List[cv2.KeyPoint],
                   descriptors1: np.ndarray, descriptors2: np.ndarray) -> Tuple[np.ndarray, float]:
  # Match features.
  matcher = cv2.DescriptorMatcher_create(cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING)
  matches = matcher.match(descriptors1, descriptors2, None)

  # Sort matches by score
  matches = sorted(matches, key=lambda x: x.distance)

  # Remove not so good matches
  numGoodMatches = int(len(matches) * GOOD_MATCH_PERCENT)
  matches = matches[:numGoodMatches]

  # Extract location of good matches
  points1 = np.zeros((len(matches), 2), dtype=np.float32)
  points2 = np.zeros((len(matches), 2), dtype=np.float32)

  for i, match in enumerate(matches):
    points1[i, :] = keypoints1[match.queryIdx].pt
    points2[i, :] = keypoints2[match.trainIdx].pt

  # Find affine transform
  h, mask = cv2.estimateAffinePartial2D(points1, points2)
  # calculate the weighted sum of the inliers,
  # where the weights are the distances between the hamming distances
  inv_distances = np.array([1/match.distance if match.distance > 0 else 1 for match in matches])
  weighted_sum = np.sum(mask * inv_distances)

  return h, weighted_sum

In [None]:
random_index_of_image = np.random.randint(0, len(image_paths))

path1 = image_paths[random_index_of_image]
img1 = cv2.imread(path1)
cv2_images = { path: cv2.imread(path) for path in image_paths }

diffs = {}
print("starting comparison")
for path in image_paths:
  if os.path.basename(path) != os.path.basename(path1):
    img2 = cv2_images[path]
    orb = cv2.ORB_create(MAX_FEATURES)
    keypoints1, descriptors1 = ORB_features(img1, orb)
    orb = cv2.ORB_create(MAX_FEATURES)
    keypoints2, descriptors2 = ORB_features(img2, orb)
    h, metric = compare_images(keypoints1, keypoints2, descriptors1, descriptors2)
    

    batch_id = os.path.basename(path).split('-')[3]
    submission_id = os.path.basename(path).split('-')[1]
    print("Estimated metric with batch {} and submission {} : \n".format(batch_id, submission_id), metric)
    diffs[path] = {
      'batch_id': batch_id, 
      'submission_id': submission_id, 
      'metric': metric}

df = pd.DataFrame.from_dict(diffs, orient='index')
df = df.reset_index()
df = df.rename(columns={'index': 'path'})
df

In [None]:
# plot the histogram of the silimarity metrics
# color the bars based on the batch id
sns.set_style("whitegrid")
sns.set(rc={'figure.figsize':(8,6)})
sns.histplot(data=df, x="metric", hue="batch_id", multiple="stack", bins=100)
# make the x-axis logarithmic

### Cluster images based on their features

In [None]:
def distance_metric(x, y, keypoint_vectors, descriptor_vectors):
    i, j = int(x[0]), int(y[0])     # extract indices
    return 1 / compare_images(
        keypoint_vectors[i], keypoint_vectors[j],
        descriptor_vectors[i], descriptor_vectors[j]
    )[1]

cv2_images = { path: cv2.imread(path) for path in image_paths }
# first calculate all the feature vectors
print("calculating feature vectors")
keypoint_vectors, descriptor_vectors = get_all_features(cv2_images)

print("calculating distance matrix")
# create a distance matrix
dist_matrix = np.zeros((len(image_paths), len(image_paths)))
for i in range(len(image_paths)):
    print("comparing {} and 0\t\t".format(i), end='\r')
    for j in range(i+1, len(image_paths)):
        print("comparing {} and {}".format(i, j), end='\r')
        dist_matrix[i, j] = distance_metric([i], [j], keypoint_vectors, descriptor_vectors)
        dist_matrix[j, i] = dist_matrix[i, j]

In [None]:
sns.set_style("whitegrid")
sns.set(rc={'figure.figsize':(8,6)})
df = pd.DataFrame(dist_matrix)
# add the batch id to the dataframe
df['batch_id'] = [os.path.basename(path).split('-')[3] for path in image_paths]
# add the path to the dataframe
idx = np.random.randint(0, len(image_paths))
df['path'] = image_paths
sns.histplot(
    data=df, 
    x=idx, 
    hue="batch_id", 
    multiple="stack",
    bins=100)

# title the plot
plt.title(f"Histogram of distances from {os.path.basename(image_paths[idx])}")

In [None]:

print("clustering images")
# cluster the images
clustering = OPTICS(
    metric="precomputed",
    min_samples=10, 
    metric_params={
        'keypoint_vectors': keypoint_vectors, 
        'descriptor_vectors': descriptor_vectors},
).fit(dist_matrix)


In [None]:
clustering.labels_

In [None]:

labels_200 = cluster_optics_dbscan(
    reachability=clustering.reachability_,
    core_distances=clustering.core_distances_,
    ordering=clustering.ordering_,
    eps=1)

space = np.arange(len(dist_matrix))
reachability = clustering.reachability_[clustering.ordering_]
labels = clustering.labels_[clustering.ordering_]

fig, ax1 = plt.subplots()
plt.title('Automatic Clustering')


# Reachability plot
colors = ["g.", "r.", "b.", "y.", "c."]
for klass, color in zip(range(0, 5), colors):
    Xk = space[labels == klass]
    Rk = reachability[labels == klass]
    ax1.plot(Xk, Rk, color, alpha=0.3)
ax1.plot(space[labels == -1], reachability[labels == -1], "k.", alpha=0.3)
ax1.plot(space, np.full_like(space, 0, dtype=float), "k", alpha=0.5)
ax1.set_ylabel("Reachability (epsilon distance)")
ax1.set_title("Reachability Plot");