In [6]:
import cv2
import numpy as np
from sklearn.cluster import KMeans

import os
from typing import Tuple

In [7]:
# path_images = r'C:\Fotos y videos\test bovw'
path_images = r'C:\TRABAJO\Willdom\Sr Machine Learning Engineer\Challenge'
working_resolution = [512,512]
vocabulary_size = 64

# Class definition

In [34]:
class ImageRetriever:
    def __init__(self, working_resolution: list = None, vocabulary_size: int = 64) -> None:
        self.working_resolution = [512, 512] if working_resolution is None else working_resolution
        assert len(self.working_resolution) == 2
        assert isinstance(vocabulary_size, int) and vocabulary_size > 2
        self.vocabulary_size = vocabulary_size

        self.images = None
        self.visual_dictionary = None
        self.descriptors_by_image = None
        self.visual_words = None
        self.word_classifier = None
        self.words_by_image = None
        self.global_descriptor_by_image = None
        self.global_descriptors = None

    def fit(self, folder_path: str):
        # Helper functions
        def load_images_from_folder(folder: str, resolution: list) -> list:
            images = {}
            for filename in os.listdir(folder):
                if filename.split('.')[-1] == 'jpg':
                    path = os.path.join(folder, filename)
                    img = cv2.imread(path,0)
                    img = cv2.resize(img, resolution)
                    if img is not None:
                        images[filename] = img
            return images
        
        def sift_dictionary(images: dict) -> list:
            """Create a dictionary of descriptors with SIFT."""
            sift = cv2.SIFT_create()
            visual_dictionary = []
            descriptors_by_image = {}
            for name, img in images.items():
                kp, des = sift.detectAndCompute(img, None)
                # print(f'image {name} has {len(des)} descriptors')
                visual_dictionary.extend(des)
                descriptors_by_image[name] = des
            return visual_dictionary, descriptors_by_image
        
        def kmeans_visual_vocabulary(k: int, visual_dictionary: list) -> tuple:
            """Create a vocabulary with k-means quantization."""
            kmeans = KMeans(n_init=10, n_clusters=64)
            kmeans.fit(visual_dictionary)
            visual_words = kmeans.cluster_centers_
            word_classifier = lambda x: kmeans.predict(np.array(x, dtype='float')) 
            return visual_words, word_classifier
        
        def get_words_by_image(descriptors_by_image: dict, word_classifier) -> dict:
            """Get the visual words in each image by using kmeans predictor."""
            words_by_image = {}
            for name, descriptors in descriptors_by_image.items():
                words_by_image[name] = word_classifier(descriptors)
            return words_by_image
        
        def get_global_descriptor_by_image(words_by_image: dict, vocabulary_size: int) -> Tuple[dict, np.array]:
            """Create a histogram of visual words for each image"""
            global_descriptor_by_image = {}
            for name, words in words_by_image.items():
                global_descriptor_by_image[name] = np.array(
                    np.histogram(words, range=(-.5,vocabulary_size+.5), bins=vocabulary_size)[0],
                dtype='float32')

            # global_descriptor_by_image to array of descriptors
            global_descriptors = np.zeros((len(self.images), vocabulary_size), 
                                          dtype=global_descriptor_by_image[list(global_descriptor_by_image.keys())[0]].dtype)
            i = 0
            for name, global_descriptor in global_descriptor_by_image.items():
                global_descriptors[i, :] = global_descriptor_by_image[name]
                i += 1

            return global_descriptor_by_image, global_descriptors
        
        # Processing
        print('Loading images...')        
        self.images = load_images_from_folder(folder_path, self.working_resolution)
        print(f'{len(self.images)} images were loaded')

        print('Extracting local descriptors from the images...')   
        self.visual_dictionary, self.descriptors_by_image = sift_dictionary(self.images)
        print(f'The visual dictionary has {len(self.visual_dictionary)} descriptors')

        print('Clustering local descriptors...')
        self.visual_words, self.word_classifier = kmeans_visual_vocabulary(self.vocabulary_size, self.visual_dictionary)
        print(f'The visual vocabulary has {len(self.visual_words)} visual words')

        print('Estracting visual words from the image local descriptors...')
        self.words_by_image = get_words_by_image(self.descriptors_by_image, self.word_classifier)
        print('The visual words of each image are ready.')

        print("Computing a global descriptor for each image...")
        self.global_descriptor_by_image, self.global_descriptors = get_global_descriptor_by_image(self.words_by_image, self.vocabulary_size)
        print("The global descriptors of each image are ready.")

    def retrieve(self, query_image_name: str, result_size: int = 3):
        # Create a matcher
        bf = cv2.FlannBasedMatcher()

        # Match descriptors
        matches = bf.knnMatch(self.global_descriptor_by_image[query_image_name].reshape((1, -1)), 
                              self.global_descriptors, k=result_size)[0]
        matches_idx = [match.trainIdx for match in matches]
        print(matches_idx)
        matches_names = [list(self.images.keys())[i] for i in matches_idx]
        print(matches_names)

        for i in range(result_size):
            name = matches_names[i]
            cv2.imshow(f'Query result: {name} - Distance: {matches[i].distance}', cv2.resize(cv2.imread(name), working_resolution))
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    def display_images(self):
        if self.images is not None:
            cv2.namedWindow('image', cv2.WINDOW_GUI_EXPANDED)
            for filename in self.images.keys():
                cv2.imshow('image', self.images[filename])
                cv2.waitKey(0)
            cv2.destroyAllWindows()
        else:
            print('Images not loaded, use .fit()')
            

# Test

In [35]:
ir = ImageRetriever()

ir.fit(path_images)
# ir.display_images()

Loading images...
28 images were loaded
Extracting local descriptors from the images...
The visual dictionary has 37999 descriptors
Clustering local descriptors...
The visual vocabulary has 64 visual words
Estracting visual words from the image local descriptors...
The visual words of each image are ready.
Computing a global descriptor for each image...
The global descriptors of each image are ready.


In [41]:
ir.retrieve('IMG_20220203_152013349.jpg', result_size=8)


[3, 9, 11, 12, 8, 6, 13, 10]
['IMG_20220203_152013349.jpg', 'IMG_20220214_144907525.jpg', 'IMG_20220216_143433424.jpg', 'IMG_20220216_145949270.jpg', 'IMG_20220214_134233140.jpg', 'IMG_20220214_124603956.jpg', 'IMG_20220216_171549908.jpg', 'IMG_20220215_144642554.jpg']


# Load images

In [190]:
# Function to load images massively
def load_images_from_folder(folder: str, resolution: list) -> list:
    images = {}
    for filename in os.listdir(folder):
        if filename.split('.')[-1] == 'jpg':
            path = os.path.join(folder, filename)
            img = cv2.imread(path,0)
            img = cv2.resize(img, resolution)
            if img is not None:
                images[filename] = img
    return images

In [191]:
images = load_images_from_folder(path_images, working_resolution)
cv2.namedWindow('image', cv2.WINDOW_GUI_EXPANDED)
for filename in images.keys():
    cv2.imshow('image', images[filename])
    cv2.waitKey(0)
cv2.destroyAllWindows()

# Extract global descriptors

## Create a dictionary of descriptors with SIFT

In [192]:
def sift_dictionary(images: dict) -> list:
    sift = cv2.SIFT_create()
    visual_dictionary = []
    descriptors_by_image = {}
    for name, img in images.items():
        kp, des = sift.detectAndCompute(img, None)
        print(f'image {name} has {len(des)} descriptors')
        visual_dictionary.extend(des)
        descriptors_by_image[name] = des
    return visual_dictionary, descriptors_by_image

In [193]:
visual_dictionary, descriptors_by_image = sift_dictionary(images)
print(f'The visual dictionary has {len(visual_dictionary)} descriptors')

image im1.jpg has 2920 descriptors
image im2.jpg has 183 descriptors
image im3.jpg has 504 descriptors
image IMG_20220203_152013349.jpg has 1545 descriptors
image IMG_20220203_155830590.jpg has 986 descriptors
image IMG_20220207_131922403.jpg has 656 descriptors
image IMG_20220214_124603956.jpg has 1594 descriptors
image IMG_20220214_133333313.jpg has 1986 descriptors
image IMG_20220214_134233140.jpg has 1742 descriptors
image IMG_20220214_144907525.jpg has 1572 descriptors
image IMG_20220215_144642554.jpg has 1961 descriptors
image IMG_20220216_143433424.jpg has 1739 descriptors
image IMG_20220216_145949270.jpg has 1467 descriptors
image IMG_20220216_171549908.jpg has 1826 descriptors
image IMG_20220218_153152657.jpg has 1647 descriptors
image IMG_20220330_143611689.jpg has 1531 descriptors
image IMG_20220330_143707804.jpg has 495 descriptors
image IMG_20220330_143722955.jpg has 1988 descriptors
image IMG_20221013_095154171.jpg has 926 descriptors
image IMG_20221013_095209402.jpg has 

## Create a vocabulary with k-means quantization

In [194]:
def kmeans_visual_vocabulary(k: int, visual_dictionary: List) -> tuple:
    kmeans = KMeans(n_init=10, n_clusters=64)
    kmeans.fit(visual_dictionary)
    visual_words = kmeans.cluster_centers_
    word_classifier = lambda x: kmeans.predict(np.array(x, dtype='float')) 
    return visual_words, word_classifier

In [195]:
visual_words, word_classifier = kmeans_visual_vocabulary(vocabulary_size, visual_dictionary)
print(f'The visual vocabulary has {len(visual_words)} visual words')

The visual vocabulary has 64 visual words


## Get the image global descriptors

### Get the visual words in each image by using kmeans predictor

In [196]:
def get_words_by_image(descriptors_by_image: dict, word_classifier) -> dict:
    words_by_image = {}
    for name, descriptors in descriptors_by_image.items():
        words_by_image[name] = word_classifier(descriptors)
    return words_by_image



In [197]:
words_by_image = get_words_by_image(descriptors_by_image, word_classifier)

print(descriptors_by_image[list(descriptors_by_image.keys())[0]].shape)
print(words_by_image[list(descriptors_by_image.keys())[0]].shape)

(2920, 128)
(2920,)


### Create a histogram of visual words for each image

In [198]:
def get_global_descriptor_by_image(words_by_image: dict, vocabulary_size: int) -> dict:
    global_descriptor_by_image = {}
    for name, words in words_by_image.items():
        global_descriptor_by_image[name] = np.array(
            np.histogram(words, range=(-.5,vocabulary_size+.5), bins=vocabulary_size)[0],
        dtype='float32')
    return global_descriptor_by_image


In [199]:
global_descriptor_by_image = get_global_descriptor_by_image(words_by_image, vocabulary_size)

print(global_descriptor_by_image[list(global_descriptor_by_image.keys())[0]])

[ 49.  19.  34.  47.  46.  49.  35.  34.  30.   1.  44.  68.  42.  41.
  41.  54.  25.  56.  79.  24.  62.  25.  20.  93.  35.  74.  93.  34.
 125.  21. 116.  77.  61.  79.  72.  45.  67.  32. 103.  23.  41.  35.
  47.  26.  50.  43.  33.  34.  19.  33.  49.  22.  71.  30.  58.  66.
  11.  44.   9.  39.  23.  46.  16.   0.]


# Query

## Definition of the query image and result size

In [238]:
query_image_name = list(images.keys())[3]
query_image_name = 'IMG_20220203_152013349.jpg'
result_size = 4

win_name = f"Query image: {query_image_name}"
cv2.namedWindow(win_name)
cv2.imshow(win_name, images[query_image_name])
cv2.waitKey(0)
cv2.destroyAllWindows()

## Results


In [239]:
# Create a matcher
bf = cv2.FlannBasedMatcher()

# global_descriptor_by_image to array of descriptors
global_descriptors = np.zeros((len(images), vocabulary_size), 
                              dtype=global_descriptor_by_image[query_image_name].dtype)
i = 0
for name, global_descriptor in global_descriptor_by_image.items():
    global_descriptors[i, :] = global_descriptor_by_image[name]
    i += 1

# Match descriptors
matches = bf.knnMatch(global_descriptor_by_image[query_image_name].reshape((1, -1)), global_descriptors, k=result_size)[0]
matches_idx = [match.trainIdx for match in matches]
print(matches_idx)
matches_names = [list(images.keys())[i] for i in matches_idx]
print(matches_names)


for i in range(result_size):
    name = matches_names[i]
    cv2.imshow(f'Query result: {name} - Distance: {matches[i].distance}', cv2.resize(cv2.imread(name), working_resolution))
cv2.waitKey(0)
cv2.destroyAllWindows()


[3, 9, 8, 11]
['IMG_20220203_152013349.jpg', 'IMG_20220214_144907525.jpg', 'IMG_20220214_134233140.jpg', 'IMG_20220216_143433424.jpg']
