# C1 - Content Based Image Retrieval
### Team 8 - Week 4

In [2]:
import re, os, glob, math, tqdm, pickle, itertools
import matplotlib.pyplot as plt

import numpy as np
import cv2

import pytesseract
from Levenshtein import distance as levenshtein_distance

import pywt
from skimage.feature import local_binary_pattern
from scipy.fftpack import dctn

import utils

#autoreload modules when code is run
%load_ext autoreload
%autoreload 2

# Path to the OCR executable.
# Set the TESSERACT_CMD environment variable to point pytesseract at a specific
# binary. Without it, the original Windows install location is used only if it
# actually exists; otherwise pytesseract keeps its own default command.
_tesseract_cmd_env = os.environ.get('TESSERACT_CMD')
_tesseract_cmd_legacy = r'C:\Users\Luis\AppData\Local\Programs\Tesseract-OCR\tesseract.exe'
if _tesseract_cmd_env:
    pytesseract.pytesseract.tesseract_cmd = _tesseract_cmd_env
elif os.path.isfile(_tesseract_cmd_legacy):
    pytesseract.pytesseract.tesseract_cmd = _tesseract_cmd_legacy

In [3]:
# Bag of unique author names: one utils.get_text_bbdd() result per annotation
# (.txt) file found in the database folder and in the query folder.
name_bag = {
    utils.get_text_bbdd(text_file)
    for folder in ['BBDD', 'qsd1_w4']
    for text_file in glob.glob(f'data/{folder}/*.txt')
}

In [4]:
class DataLoader():
    def __init__(self, folder_path):
        self.folder_path = folder_path

    # Divide the image into blocks
    def create_blocks_array(self, image, blockNumber):
        # Set number of slices per axis
        axisSlice = int(math.sqrt(blockNumber))

        blocksArray = []
        # Split the image into vertical blocks
        split_h = np.array_split(image, axisSlice, axis = 0)
        
        for i in range(axisSlice):
            for j in range(axisSlice):
                # Split vertical blocks into square blocks
                split_hv = np.array_split(split_h[i], axisSlice, axis = 1)
                blocksArray.append(split_hv[j])
        return blocksArray

    # Compute the histogram of the image
    def create_histogram(self, block, mask, d_hist, bins):
        
        channels = cv2.split(block)
        range_a, range_b = 256, 256

        if d_hist == 1:
            if mask is None:
                # Compute 1D histograms for each channel separately
                hist = [cv2.calcHist([chan], [0], None, [bins], [0, range_a if i == 0 else range_b]) for i,chan in enumerate(channels)]
            else:
                # Compute 1D histograms for each channel separately
                hist = [cv2.calcHist([chan[mask!=0]], [0], None, [bins], [0, range_a if i == 0 else range_b]) for i,chan in enumerate(channels)]

        elif d_hist == 2:
            if mask is None:
                # Compute 2D joint histograms for each pair of channels
                hist = [cv2.calcHist([channels[i], channels[j]], [0, 1], None, [bins, bins], [0, range_a if i == 0 else range_b, 0, range_b])
                            for i in range(len(channels)) for j in range(i+1, len(channels))]
            else:
                # Compute 2D joint histograms for each pair of channels
                hist = [cv2.calcHist([channels[i][mask!=0], channels[j][mask!=0]], [0, 1], None, [bins, bins], [0, range_a if i == 0 else range_b, 0, range_b])
                            for i in range(len(channels)) for j in range(i+1, len(channels))]

        else:
            if mask is None:
                # Compute 3D joint histogram for all three channels
                hist, _ = np.histogramdd([c.flatten() for c in channels], bins=(bins, bins, bins), range=[(0, range_a), (0, range_b), (0, range_b)])
            else:
                # Compute 3D joint histogram for all three channels
                hist, _ = np.histogramdd([c[mask != 0] for c in channels], bins=(bins, bins, bins), range=[(0, range_a), (0, range_b), (0, range_b)])

        return hist
    
    # Compute the color histogram of the image by blocks
    def get_color_features_by_blocks(self, image, level, d_hist, bins, mask_text):

        # Get blocks using multi-level resolution
        blocksArray = []
        for lvl in range(level+1):
            for b in self.create_blocks_array(image, (2**lvl)*(2**lvl)):
                blocksArray.append(b)

        if mask_text is not None:
            blocksMasks = []

            # We create a mask image blocking the bbox of the text
            # That image will be used to compute the histogram of the image without the text
            mask_text_image = np.ones(image.shape[:2], dtype=np.uint8)
            mask_text_image[mask_text[1]:mask_text[3], mask_text[0]:mask_text[2]] = 0

            # It is necessary to create the blocks of the mask image too
            for lvl in range(level+1):
                for b in self.create_blocks_array(mask_text_image, (2**lvl)*(2**lvl)):
                    blocksMasks.append(b)
        else:
            blocksMasks = [None]*len(blocksArray)

        histograms = []
        for block, mask_text_block in zip(blocksArray, blocksMasks):
            # Compute the histogram of the channel and append it to the list
            hist = self.create_histogram(block, mask_text_block, d_hist, bins)
            if isinstance(hist, list):
                for h in hist:
                    histograms.append(h.flatten() / (block.shape[0]*block.shape[1]))
            else:
                histograms.append(hist.flatten()  / (block.shape[0]*block.shape[1]))
            
        # Concatenate all histograms into a single feature vector
        return np.concatenate(histograms)

    def zigzag_scan(self, image):
        rows, cols = image.shape
        solution = [[] for _ in range(rows + cols - 1)]
        
        for i in range(rows):
            for j in range(cols):
                sum_idx = i + j
                if (sum_idx % 2 == 0):
                    # add at beginning if even index
                    solution[sum_idx].insert(0, image[i,j])
                else:
                    # add at the end if odd index
                    solution[sum_idx].append(image[i,j])

        # flatten the result
        result = np.array([num for sublist in solution for num in sublist])
        return result

    # Compute different texture features by blocks
    def get_texture_features_by_blocks(self, image, level, bins, mask_text):
        
        # Get blocks using multi-level resolution
        blocksArray = []
        for lvl in range(level+1):
            for b in self.create_blocks_array(image, (2**lvl)*(2**lvl)):
                blocksArray.append(b)

        if mask_text is not None:
            blocksMasks = []

            # We create a mask image blocking the bbox of the text
            # That image will be used to compute the histogram of the image without the text
            mask_text_image = np.ones(image.shape[:2], dtype=np.uint8)

            # Assign zero to the region corresponding to the text
            mask_text_image[mask_text[1]:mask_text[3], mask_text[0]:mask_text[2]] = 0

            # It is necessary to create the blocks of the mask image too
            for lvl in range(level+1):
                for b in self.create_blocks_array(mask_text_image, (2**lvl)*(2**lvl)):
                    blocksMasks.append(b)
        else:
            blocksMasks = [None]*len(blocksArray)

        histograms = []
        # For each block and its corresponding text mask block, compute texture features
        for block, mask_text_block in zip(blocksArray, blocksMasks):
            
            details = pywt.dwt2(block, 'bior1.3')
            approx, (h, v, d) = details # approx captures bigger details (more smooth than the original img), (h, v, d) capture de horizontal, vertical and diagonal "smaller" details
            
            if mask_text_block is not None:
                # Resize the text mask to the size of the wavelet's resulting images
                new_mask = cv2.resize(mask_text_block, approx.shape[::-1]).astype(bool) 

            # Create an histogram for each wavelet "image" and concatenate all of them
            final_hist = []
            for wt_img in [approx, h, v, d]:
                hist = np.histogram(wt_img if mask_text_block is None else wt_img[new_mask != 0], bins=bins, range=(0, 256))[0]
                final_hist.append(hist.flatten() / (wt_img.shape[0]*wt_img.shape[1]))
            histograms.append(np.concatenate(final_hist))
            
        # Concatenate all histograms into a single feature vector
        return np.concatenate(histograms)

    def get_features_by_keypoints(self, gray, mode, n_features, mask):
        
        if mode == 'sift':
            # SIFT Detector
            sift = cv2.SIFT_create(nfeatures=n_features)
            _, des = sift.detectAndCompute(gray, mask)

        elif mode == 'orb':
            # ORB Detector
            orb = cv2.ORB_create(nfeatures=n_features)
            _, des = orb.detectAndCompute(gray, mask)

        elif mode == 'akaze':
            thres = 0.005
            # AKAZE Detector
            akaze = cv2.AKAZE_create(threshold=thres)
            _, des = akaze.detectAndCompute(gray, mask)

            while des is None or des.shape[0] < n_features:
                if str(thres)[-1] == '1': 
                    thres = thres / 2
                else:
                    thres /= 5
                
                akaze = cv2.AKAZE_create(threshold=thres)
                _, des = akaze.detectAndCompute(gray, mask)

                if thres < 1e-6:
                    break

        return des

    def clean_noise(self, image, k):
        return cv2.medianBlur(image, k)
    
    # Load data, calculate background and text masks (if necessary) and compute features
    def load_data(self, level = 3, d_hist = 1, bins = 8, n_features=2048, keypoint_mode='sift', remove_background=False, remove_text=False, features_mode='color_features'):
        # Get a list of all image file names in the folder
        image_files = sorted(glob.glob(self.folder_path+'/*.jpg'))

        # Initialize an empty list to store the processed images and masks
        processed_features = dict()
        masks, masks_text = [], []

        # Iterate over each image file
        for f in tqdm.tqdm(image_files):
            
            # Get the image id from the file name. Depending on the OS, the path separator is different
            try:
                img_id = int(f.split('\\')[-1].split('.')[0].split('_')[-1])
            except:
                img_id = int(f.split('/')[-1].split('.')[0].split('_')[-1])

            # Load the image in BGR format
            image = cv2.imread(f)

            # Clean noise of the image using median filter
            image = self.clean_noise(image, k=3)

            # Convert the image to grayscale
            image_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

            features_rgb, features_wavelet, features_text, features_keypoints = [], [], [], []

            # Remove background (there can be 2 paintings in the same image)
            if remove_background:
                mask_image, coordinates = utils.get_mask(image_gray)
                n_paintings = len(coordinates)
               
                # Remove the text from each image
                if remove_text:
                    coordinates = sorted(coordinates, key=lambda x: (x[0], x[1]))
                    masks_text_i = [[None]]*n_paintings
                    # coordinates contains the coordinates of the paintings mask in the image
                    # We iterate over each masked painting and get the text mask for each one
                    # We hace to recover the original coordinates for the text mask
                    for i, (x,y,w,h) in enumerate(coordinates):
                        x_text, y_text, x_text_max, y_text_max, text = utils.get_mask_text(image_gray[y:y+h, x:x+w], name_bag=name_bag)
                        features_text.append(text)
                        masks_text_i[i] = [x+x_text, y+y_text, x+x_text_max, y+y_text_max]
                    masks_text.append(masks_text_i)
                else:
                    masks_text.extend([[None] for _ in range(n_paintings)])

            else:
                mask_image = None

                # if there is no background, the mask is the whole image
                coordinates = [[0,0,image.shape[1],image.shape[0]]]
                n_paintings = 1
                if remove_text:
                    x_text, y_text, x_text_max, y_text_max, text = utils.get_mask_text(image_gray, name_bag=name_bag)
                    features_text.append(text)
                    masks_text.append([[x_text, y_text, x_text_max, y_text_max]])
                else:
                    masks_text.append([None])

            masks.append(mask_image)
        
            for i in range(n_paintings):
                x,y,w,h = coordinates[i]

                mask_painting = np.zeros(image.shape[:2], dtype=np.uint8)
                mask_painting[y:y+h, x:x+w] = 255

                relative_mask_text = None
                if masks_text[-1][i] is not None:
                    relative_mask_text = [masks_text[-1][i][0]-x, masks_text[-1][i][1]-y, masks_text[-1][i][2]-x, masks_text[-1][i][3]-y]
                    # Remove the text from the image 
                    mask_painting[masks_text[-1][i][1]:masks_text[-1][i][3], masks_text[-1][i][0]:masks_text[-1][i][2]] = 0

                if features_mode == 'color_features' or features_mode == 'combined':    
                    # Get the features of every masked image
                    f = self.get_color_features_by_blocks(image[y:y+h, x:x+w], level, d_hist, bins, mask_text=relative_mask_text)
                    features_rgb.append(f)

                if features_mode == 'texture_features' or features_mode == 'combined':
                    f_wavelet = self.get_texture_features_by_blocks(image_gray[y:y+h, x:x+w], level, bins, mask_text=relative_mask_text)
                    features_wavelet.append(f_wavelet)

                if features_mode == 'keypoint':
                    f_keypoints = self.get_features_by_keypoints(image_gray, keypoint_mode, n_features, mask_painting)
                    features_keypoints.append(f_keypoints)
            
            # Append the features to the dict
            if features_mode == 'texture_features':
                processed_features[img_id] = features_wavelet
            
            elif features_mode == 'text_features':
                processed_features[img_id] = features_text

            elif features_mode == 'color_features':
                processed_features[img_id] = features_rgb
            
            elif features_mode == 'combined':
                if n_paintings > 1:
                    assert len(features_rgb) == len(features_wavelet) == len(features_text), 'The number of features must be the same for each mode!'
                    processed_features[img_id] = [[features_rgb[i], features_wavelet[i], features_text[i]] for i in range(n_paintings)]
                else:
                    processed_features[img_id] = [[features_rgb, features_wavelet, features_text]]
            
            elif features_mode == 'keypoint':
                processed_features[img_id] = features_keypoints
            
        return processed_features, masks, masks_text
        

In [5]:
# One DataLoader per image folder: the database (BBDD), the week-4 development
# queries (qsd1_w4) and the week-4 test queries (qst1_w4)
data_loader = DataLoader('data/BBDD')
data_loader_qsd1_w4 = DataLoader('data/qsd1_w4')
data_loader_qst1_w4 = DataLoader('data/qst1_w4')

# Ground-truth correspondences of the development queries.
# NOTE: pickle.load executes arbitrary code; only load files you trust.
with open('data/qsd1_w4/gt_corresps.pkl', 'rb') as gt_file:
    gt_w4 = pickle.load(gt_file)

## Validation results

### Tasks 1, 2, 3 and 4


In [30]:
# Number of top results requested from the comparison functions and the K of the
# mAP@K reported by the evaluation cells below
k = 5

In [None]:
# --- SIFT descriptors compared with the L2 norm ---
features_mode = 'keypoint'
keypoint_mode = 'sift'
sim_func = cv2.NORM_L2
n_features = 2048
threshold = 190  # forwarded to utils.compare_keypoints as threshold_matches

# Extraction settings shared by the database and the query images
keypoint_settings = dict(features_mode=features_mode, n_features=n_features, keypoint_mode=keypoint_mode)

# Database images are used as they are; query images get background and text removed
features, _, _ = data_loader.load_data(remove_background=False, **keypoint_settings)
features_q1_w4, masks_query, text_boxes_query = data_loader_qsd1_w4.load_data(remove_background=True, remove_text=True, **keypoint_settings)

# Retrieve the k best database matches per query and score them
result = utils.compare_keypoints(features_q1_w4, features, k, sim_func, threshold_matches=threshold)
mapk_1, mapk_k = (utils.mapk(gt_w4, result, top) for top in (1, k))

print(f'F1 score with threshold {threshold}:', utils.calculate_f1_score(result, gt_w4))
print(f'Sift, L2, {n_features} features = mAP@1: {mapk_1}; mAP@{k}: {mapk_k}')

In [None]:
# --- ORB descriptors compared with the Hamming (NORM_HAMMING2) norm ---
features_mode = 'keypoint'
keypoint_mode = 'orb'
sim_func = cv2.NORM_HAMMING2
n_features = 2048
threshold = 100  # forwarded to utils.compare_keypoints as threshold_matches

# Extraction settings shared by the database and the query images
keypoint_settings = dict(features_mode=features_mode, n_features=n_features, keypoint_mode=keypoint_mode)

# Database images are used as they are; query images get background and text removed
features, _, _ = data_loader.load_data(remove_background=False, **keypoint_settings)
features_q1_w4, _, _ = data_loader_qsd1_w4.load_data(remove_background=True, remove_text=True, **keypoint_settings)

# Retrieve the k best database matches per query and score them
result = utils.compare_keypoints(features_q1_w4, features, k, sim_func, threshold_matches=threshold)
mapk_1, mapk_k = (utils.mapk(gt_w4, result, top) for top in (1, k))

print(f'F1 score with threshold {threshold}:', utils.calculate_f1_score(result, gt_w4))
print(f'Orb, Hamming, {n_features} features = mAP@1: {mapk_1}; mAP@{k}: {mapk_k}')

In [None]:
# --- AKAZE descriptors compared with the Hamming (NORM_HAMMING2) norm ---
features_mode = 'keypoint'
keypoint_mode = 'akaze'
sim_func = cv2.NORM_HAMMING2
n_features = 512
threshold = 40  # forwarded to utils.compare_keypoints as threshold_matches

# Extraction settings shared by the database and the query images
keypoint_settings = dict(features_mode=features_mode, n_features=n_features, keypoint_mode=keypoint_mode)

# Database images are used as they are; query images get background and text removed
features, _, _ = data_loader.load_data(remove_background=False, **keypoint_settings)
features_q1_w4, _, _ = data_loader_qsd1_w4.load_data(remove_background=True, remove_text=True, **keypoint_settings)

# Retrieve the k best database matches per query and score them
result = utils.compare_keypoints(features_q1_w4, features, k, sim_func, threshold_matches=threshold)
mapk_1, mapk_k = (utils.mapk(gt_w4, result, top) for top in (1, k))

print(f'F1 score with threshold {threshold}:', utils.calculate_f1_score(result, gt_w4))
print(f'Akaze, Hamming, {n_features} features = mAP@1: {mapk_1}; mAP@{k}: {mapk_k}')

In [None]:
# --- Combined colour + texture + text descriptors ---
features_mode = 'combined'
d_hist = 2
level = 3
bins = 8
# Values forwarded to utils.compare_images as `param` and `threshold_dist`.
# 3.5 was the best threshold found by the authors.
combine_param = [4, 2, 1]
threshold_dist = 3.5

# Compute features for the database and the query images
features, _, _ = data_loader.load_data(features_mode=features_mode, remove_background=False, level=level, d_hist=d_hist, bins=bins)
features_q1_w4, _, _ = data_loader_qsd1_w4.load_data(features_mode=features_mode, remove_background=True, remove_text=True, level=level, d_hist=d_hist, bins=bins)

# Request k results (not a literal 5) so the retrieval always matches the mAP@k below
result = utils.compare_images(features_q1_w4, features, k, utils.histogram_intersection, combine=True, param=combine_param, threshold_dist=threshold_dist)
mapk_1 = utils.mapk(gt_w4, result, 1)
mapk_k = utils.mapk(gt_w4, result, k)

print(f'F1 score with threshold {threshold_dist}:', utils.calculate_f1_score(result, gt_w4))
print(f'Combined param {combine_param}, histogram_intersection = \tmAP@{1}: {mapk_1}, mAP@{k}: {mapk_k}')