# Oxford5k + MPEG7 Image Retrieval:

Clean versions of all the functions used to run image retrieval, and to compute its metrics, on either the MPEG7 or the Oxford5k dataset.

The functions that run them are at the bottom of the page, along with instructions for the input variables.

## Imports

In [None]:
import numpy as np 
import cv2
import os
import matplotlib.pyplot as plt
from PIL import Image
from IPython.display import clear_output
import time
import pandas
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.cluster import MiniBatchKMeans, KMeans
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.model_selection import train_test_split

## Loading in Oxford Test data

In [None]:
def load_test_data(images_path, gt_path):
    """Load the Oxford query images listed in the ``*query.txt`` ground-truth files.

    Parameters
    ----------
    images_path : str
        Directory containing the ``.jpg`` images.
    gt_path : str
        Directory containing the ground-truth ``.txt`` files.

    Returns
    -------
    tuple of lists
        ``(test_gray_images, test_colour_images, test_names, test_images_path)``:
        the grayscale images, the images as read by ``cv2.imread``, the label
        taken from each query filename, and the first token of each query file.

    Raises
    ------
    FileNotFoundError
        If a query image cannot be read from ``images_path``.
    """
    test_images_path = []
    test_names = []
    test_gray_images = []
    test_colour_images = []

    for filename in sorted(os.listdir(gt_path)):
        if not filename.endswith("query.txt"):
            continue

        # The label is everything before the trailing "<number>_query" tokens,
        # e.g. "all_souls_1_query.txt" -> "all_souls". Joining the leading
        # tokens covers labels with any number of words.
        tokens = filename.split(".")[0].split("_")
        test_names.append("_".join(tokens[:-2]))

        # The first token on the first line identifies the query image.
        with open(os.path.join(gt_path, filename), "r") as f:
            first_line = f.readline()
        test_images_path.append(first_line.split(" ")[0])

    for path in test_images_path:
        # The first five characters are dropped before building the file name
        # (load_train_data treats "oxc1_" as the prefix of these entries).
        image_file = os.path.join(images_path, path[5:]) + ".jpg"
        image = cv2.imread(image_file)
        if image is None:
            # cv2.imread signals failure by returning None; fail with a useful message.
            raise FileNotFoundError("Could not read query image: {}".format(image_file))
        test_gray_images.append(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY))
        test_colour_images.append(image)

    print("Loaded in {} Images!".format(len(test_gray_images)))

    return test_gray_images, test_colour_images, test_names, test_images_path

## Loading in Oxford Training data

In [None]:
def load_train_data(images_path, gt_path, test_img_paths):
    """Load the Oxford database images listed in the ``*good.txt`` / ``*ok.txt`` files.

    Each image is loaded once, labelled after the first ground-truth file (in
    sorted order) that lists it. Images whose ``"oxc1_" + name`` appears in
    ``test_img_paths`` are left out.

    Parameters
    ----------
    images_path : str
        Directory containing the ``.jpg`` images.
    gt_path : str
        Directory containing the ground-truth ``.txt`` files.
    test_img_paths : iterable of str
        Query image identifiers, as returned by ``load_test_data``.

    Returns
    -------
    tuple of lists
        ``(train_gray_images, train_colour_images, train_names)``.

    Raises
    ------
    FileNotFoundError
        If a listed image cannot be read from ``images_path``.
    """
    train_names = []
    train_images_path = []
    train_gray_images = []
    train_colour_images = []

    # Sets give O(1) membership tests instead of scanning lists for every line.
    query_images = set(test_img_paths)
    seen_images = set()

    for filename in sorted(os.listdir(gt_path)):
        if not filename.endswith(("good.txt", "ok.txt")):
            continue

        # The label is everything before the trailing "<number>_<quality>" tokens.
        tokens = filename.split(".")[0].split("_")
        name = "_".join(tokens[:-2])

        with open(os.path.join(gt_path, filename), "r") as f:
            for raw_line in f:
                # strip() rather than [:-1]: the last line may lack a newline,
                # and slicing would then cut off part of the image name.
                image_name = raw_line.strip()
                if not image_name or image_name in seen_images:
                    continue
                if "oxc1_" + image_name in query_images:
                    continue
                seen_images.add(image_name)
                train_names.append(name)
                train_images_path.append(image_name)

    for path in train_images_path:
        image_file = os.path.join(images_path, path) + ".jpg"
        image = cv2.imread(image_file)
        if image is None:
            # cv2.imread signals failure by returning None; fail with a useful message.
            raise FileNotFoundError("Could not read training image: {}".format(image_file))
        train_gray_images.append(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY))
        train_colour_images.append(image)

    print("Loaded in {} Images!".format(len(train_gray_images)))

    return train_gray_images, train_colour_images, train_names

## Loading in MPEG7 function

In [None]:
def load_MPEG7_data(folder, image_type, gray=False):
    """Load every image in ``folder`` whose name ends with ``image_type``.

    ``.gif`` files are read through ``cv2.VideoCapture`` (first frame only) and
    resized to 32x32; other types are read with ``cv2.imread`` at their
    original size. Files that cannot be read are skipped.

    Parameters
    ----------
    folder : str
        Directory to scan.
    image_type : str
        Filename suffix to load, e.g. ``".gif"``.
    gray : bool, optional
        When True the loaded image is kept as is; when False (default) it is
        converted with ``cv2.COLOR_BGR2GRAY``.

    Returns
    -------
    tuple of lists
        ``(images, filenames)`` where ``filenames`` holds the label derived
        from each file name.
    """
    images = []
    filenames = []

    for filename in sorted(os.listdir(folder)):
        if not filename.endswith(image_type):
            continue

        path = os.path.join(folder, filename)
        if image_type == ".gif":
            capture = cv2.VideoCapture(path)
            try:
                ret, frame = capture.read()
            finally:
                # Release the handle even if reading raises.
                capture.release()
            if not ret or frame is None:
                continue
            image = Image.fromarray(frame)
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
            image = image.resize((32, 32), Image.LANCZOS)
            image = np.array(image)
        else:
            image = cv2.imread(path)
            if image is None:
                # Check before cvtColor, which raises on a None input.
                continue

        if gray:
            gray_image = image
        else:
            gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        images.append(gray_image)

        # Label: the first two "_"-separated tokens when there are three or
        # more, the first token when there are two, otherwise the text before
        # the first "-".
        parts = filename.split('_')
        if len(parts) >= 3:
            fname = parts[0] + "_" + parts[1]
        elif len(parts) == 2:
            fname = parts[0]
        else:
            fname = filename.split('-')[0]
        filenames.append(fname)

    print(len(images), "Images loaded successfully!")
    return images, filenames

## Metrics computation

In [None]:
def _show_ranked_images(query_image, query_name, ranked_images, ranked_names, border_size, frame_query):
    """Plot the query and its retrieved images in one row, framing matches green and misses red."""
    n_results = len(ranked_images)
    pictures = [query_image] + list(ranked_images)

    fig, axes = plt.subplots(1, n_results + 1, figsize=(200/n_results, 200/n_results))
    for i, (image, ax) in enumerate(zip(pictures, axes.ravel())):
        image = np.asarray(image)
        # COLOR_BGR2RGB rejects single-channel input, so grayscale images are
        # expanded to three channels instead.
        if image.ndim == 2:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
        else:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # resize image if border size is 10 or more:
        if border_size >= 10:
            image = cv2.resize(image, (250, 400), interpolation=cv2.INTER_CUBIC)

        if i == 0:
            title = "Query: {}".format(query_name)
            if frame_query:
                image = border(image, (0, 255, 0), border_size)
        else:
            title = ranked_names[i-1]
            color = (0, 255, 0) if title == query_name else (255, 0, 0)
            image = border(image, color, border_size)

        ax.imshow(image, cmap="gray")
        ax.set_title(title)
        ax.axis("off")
    plt.show()


def image_retrieval_k(train_data, test_data, train_names, test_names, train_images_as_array, test_images_as_array, k=20, view_option=0, image_size=(32,32), border_size=20):
    """Rank the training set for every query and compute retrieval metrics.

    Training items are ranked by Euclidean distance to the query. A ranked
    item counts as correct when its name equals the query's name.

    Parameters
    ----------
    train_data, test_data : sequence of feature vectors
        One vector per training item / query.
    train_names, test_names : sequence
        Labels aligned with ``train_data`` / ``test_data``.
    train_images_as_array, test_images_as_array : sequence of images
        Images aligned with the data, used only by view options 2 and 3.
    k : int
        Cut-off for precision@k and number of results displayed.
    view_option : int
        0 prints progress only; 1 displays the query and top-k feature vectors
        reshaped to ``image_size`` as one merged image, plus metrics;
        2 plots the labelled top-k images plus metrics; 3 plots images only.
    image_size : tuple
        Shape used to turn a feature vector back into an image (view option 1).
    border_size : int
        Frame width passed to ``border`` (view options 2 and 3).

    Returns
    -------
    tuple of lists
        ``(avg_precisions, avg_recalls, precisionsatk)``, one entry per query.
        Average precision is the mean precision at the ranks of the correct
        items. Average recall is the mean recall over every evaluated cut-off
        (1 up to the rank of the last correct item, or up to ``k + 2`` when all
        correct items rank before ``k``). A query with no correct item in the
        training set scores 0.0 for both instead of dividing by zero.
    """
    avg_precisions = []
    avg_recalls = []
    precisionsatk = []
    n_queries = len(test_data)

    for idx, query in enumerate(test_data):
        query_name = test_names[idx]

        # Rank the training items by Euclidean distance to the query.
        query = np.asarray(query).reshape((1, -1))
        D = euclidean_distances(train_data, query).reshape(-1)
        index = np.argsort(D)
        n_train = len(index)

        # hits[r] is True when the item at rank r has the query's label.
        hits = np.array([train_names[i] == query_name for i in index], dtype=bool)
        hit_ranks = np.flatnonzero(hits)
        correct_count = len(hit_ranks)
        # Running number of correct items, so each cut-off is O(1), not a rescan.
        cum_hits = np.cumsum(hits)

        # Evaluate up to the last correct item, or past k when they all come earlier.
        last_correct_image_idx = int(hit_ranks[-1]) if correct_count else 0
        if k > last_correct_image_idx:
            last_correct_image_idx = k+1

        # Cut-offs 1..last_correct_image_idx+1; a cut-off beyond the training
        # set can only ever return the whole set.
        cutoffs = np.arange(1, last_correct_image_idx+2)
        returned = np.minimum(cutoffs, n_train)
        TP = cum_hits[returned-1]
        all_precisions = TP / returned

        if correct_count:
            all_recalls = TP / correct_count
            # Each correct item contributes exactly once, at its own rank.
            avg_precisions.append(np.average(all_precisions[hit_ranks]))
            avg_recalls.append(np.average(all_recalls))
        else:
            avg_precisions.append(0.0)
            avg_recalls.append(0.0)
        precisionsatk.append(float(all_precisions[k-1]))

        if view_option == 1:
            # Imported here because `display` is only a builtin inside notebooks.
            from IPython.display import display

            # Merge the query and its top-k feature vectors into one image strip.
            tiles = [query.reshape(image_size)]
            for ind in index[:k]:
                tiles.append(train_data[ind].reshape(image_size))
            output = np.array(tiles)*255
            output = output.transpose(1, 0, 2)
            output = output.reshape((image_size[0], -1))
            display(Image.fromarray(output))
            print("Label: {}".format(query_name))
            print("Average Precision for query {}: ".format(idx), avg_precisions[-1])
            print("Precision@k for query {}: ".format(idx), precisionsatk[-1])
            print("\n")
        elif view_option == 0:
            print("Percentage Complete: {}".format(round(((idx+1)/n_queries)*100, 2)), end="\r")
        elif view_option == 2 or view_option == 3:
            top_ranks = index[:k]
            _show_ranked_images(
                test_images_as_array[idx],
                query_name,
                [train_images_as_array[i] for i in top_ranks],
                [train_names[i] for i in top_ranks],
                border_size,
                frame_query=(view_option == 2),
            )
            if view_option == 2:
                print("Label: {}".format(query_name))
                print("Average Precision for query {}: ".format(idx), avg_precisions[-1])
                print("Precision@k for query {}: ".format(idx), precisionsatk[-1])
                print("\n")
    return avg_precisions, avg_recalls, precisionsatk

## Save metrics data to csv

In [None]:
def save_data_to_csv(_precisionsatk, _AP, _k, _dataset_name):
    """Write the per-query metrics to ``<_dataset_name>-metrics_k=<_k>.csv``.

    The file goes to the current working directory and has one row per query
    with the columns ``Precision@k`` and ``Average Precision``. The pandas
    display limits for rows and columns are also set as a side effect.
    """
    columns = {'Precision@k': _precisionsatk, 'Average Precision': _AP}
    table = pandas.DataFrame(data=columns)
    pandas.set_option("display.max_rows", 500, "display.max_columns", 4)
    target = '{}-metrics_k={}.csv'.format(_dataset_name, _k)
    table.to_csv(target)

## Solving pixel values for oxford

In [None]:
def find_pixel_values(test_gray_images, train_gray_images, size=(100, 100)):
    """Turn grayscale images into flat pixel vectors of a common length.

    Parameters
    ----------
    test_gray_images, train_gray_images : iterable of 2-D arrays
        Grayscale images of any size.
    size : tuple of int, optional
        Size every image is resized to before flattening. The default
        ``(100, 100)`` gives vectors of 10000 values.

    Returns
    -------
    tuple of lists
        ``(test_pixels, train_pixels)``, each a list of 1-D arrays with
        ``size[0] * size[1]`` elements.
    """
    vector_length = size[0] * size[1]

    def _to_pixel_vector(image):
        # Resize through PIL, then flatten to one row of pixels.
        resized = Image.fromarray(image).resize(size)
        return np.array(resized).reshape((vector_length,))

    test_pixels = [_to_pixel_vector(image) for image in test_gray_images]
    train_pixels = [_to_pixel_vector(image) for image in train_gray_images]

    return test_pixels, train_pixels

## Solving pixel values for mpeg

In [None]:
def mpeg_find_pixel_values(images):
    """Flatten each image into a 1024-element row and return the rows as one array.

    Every image must contain exactly 1024 values (e.g. 32x32), otherwise the
    reshape raises.
    """
    rows = [np.asarray(picture).reshape((1024,)) for picture in images]
    return np.array(rows)

## Putting a border around an image

In [None]:
def border(img, color, border_size):
    """Return a copy of ``img`` surrounded by a solid frame.

    Parameters
    ----------
    img : array
        Image of shape ``(h, w, 3)``, or ``(h, w)`` for grayscale, which is
        repeated across three channels.
    color : tuple of 3 ints
        Frame colour, in the same channel order as ``img``.
    border_size : int
        Frame width in pixels on every side.

    Returns
    -------
    numpy.ndarray
        ``uint8`` array of shape ``(h + 2*border_size, w + 2*border_size, 3)``.
    """
    img = np.asarray(img)
    if img.ndim == 2:
        img = np.stack((img,) * 3, axis=-1)

    # get dimensions
    h, w = img.shape[0:2]

    # Fill the whole base with the frame colour so the frame is exactly
    # border_size wide on every side, whatever border_size is.
    base = np.empty((h+(border_size*2), w+(border_size*2), 3), dtype=np.uint8)
    base[...] = color

    # put original image into base
    base[border_size:h+border_size, border_size:w+border_size] = img

    return base

## Solving SIFT + BoW

In [None]:
def SIFT(images):
    """Run SIFT on every image and collect the keypoints and descriptors.

    Parameters
    ----------
    images : sequence of images
        Images passed one by one to ``detectAndCompute``.

    Returns
    -------
    tuple of lists
        ``(keypoints_per_image, descriptor_per_image)``, aligned with
        ``images``. Entries are stored exactly as ``detectAndCompute`` returns
        them.
    """
    sift = cv2.SIFT_create()

    keypoints_per_image = []
    descriptor_per_image = []

    total = len(images)
    for count, image in enumerate(images, start=1):
        keypoints, descriptor = sift.detectAndCompute(image, None)

        keypoints_per_image.append(keypoints)
        descriptor_per_image.append(descriptor)

        # The precision belongs to round(), not to format().
        print("Percentage Completed: {}%".format(round((count/total)*100, 2)), end="\r")

    print("")
    return keypoints_per_image, descriptor_per_image

def stack_descriptors(descriptors):
    """Stack the per-image descriptor arrays into a single 2-D array.

    Entries that convert to a zero-dimensional array (such as ``None`` for an
    image without descriptors) are skipped.

    Parameters
    ----------
    descriptors : iterable
        One descriptor array (or ``None``) per image.

    Returns
    -------
    numpy.ndarray
        All descriptor rows stacked vertically, in input order.

    Raises
    ------
    ValueError
        If no entry contains descriptors.
    """
    stack = []

    for desc in descriptors:
        tmp = np.array(desc)
        # An empty shape tuple means a scalar/None entry, i.e. no descriptors.
        if tmp.shape:
            stack.append(tmp)

    if not stack:
        raise ValueError("No descriptors to stack: every entry was empty")

    # np.vstack needs a real sequence; current NumPy rejects a generator.
    all_descriptors = np.vstack(stack)

    return all_descriptors

def cluster(data, n_clusters=100, cluster_type="minibatch"):
    """Fit a k-means model to ``data`` and print how long the fit took.

    Parameters
    ----------
    data : array-like
        Samples to cluster, one row each.
    n_clusters : int, optional
        Number of clusters.
    cluster_type : str, optional
        ``"minibatch"`` for ``MiniBatchKMeans`` or ``"kmeans"`` for ``KMeans``.

    Returns
    -------
    The value returned by the estimator's ``fit`` call.

    Raises
    ------
    ValueError
        If ``cluster_type`` is not one of the two supported values.
    """
    # Validate first: otherwise an unknown type only fails at the return
    # statement, with an unrelated UnboundLocalError.
    if cluster_type == "minibatch":
        model = MiniBatchKMeans(n_clusters=n_clusters)
    elif cluster_type == "kmeans":
        model = KMeans(n_clusters=n_clusters)
    else:
        raise ValueError("Unknown cluster_type! Try: 'minibatch' or 'kmeans'")

    start = time.time()
    y_cluster = model.fit(data)
    end = time.time()

    print("Time Elapsed: {} min".format(round((end - start)/60, 2)))
    return y_cluster

def solve_BoW(descriptors, y_cluster, n_clusters):
    """Build tf-idf weighted bag-of-words histograms, one row per image.

    Parameters
    ----------
    descriptors : sequence
        One descriptor array per image, or ``None`` for an image without any.
    y_cluster : fitted clustering model
        Must provide ``predict``, mapping descriptor rows to word indices in
        ``0 .. n_clusters - 1``.
    n_clusters : int
        Vocabulary size, i.e. the length of each histogram.

    Returns
    -------
    numpy.ndarray
        Array with one tf-idf weighted histogram per image. Bin ``w`` counts
        word ``w``. An image without descriptors gets an all-zero histogram.

    Raises
    ------
    IndexError
        If a predicted word index is not below ``n_clusters``.
    """
    total = len(descriptors)

    # (1/2) Assign every descriptor to its visual word, one predict() call per image.
    image_words = []
    for count, desc in enumerate(descriptors, start=1):
        if desc is not None and len(desc) > 0:
            image_words.append(np.asarray(y_cluster.predict(np.asarray(desc))))
        else:
            # No descriptors means no words at all for this image.
            image_words.append(np.empty(0, dtype=int))
        print("(1/2) Percentage Completed: {}%".format(round((count/total)*100, 2)), end="\r")

    print("")

    # (2/2) Count the words of each image; word w goes to bin w.
    image_histograms = []
    for count, words in enumerate(image_words, start=1):
        hist = np.bincount(words, minlength=n_clusters).astype(float)
        if hist.size != n_clusters:
            raise IndexError("Predicted word index is out of range for n_clusters={}".format(n_clusters))
        image_histograms.append(hist)
        print("(2/2) Percentage Completed: {}%".format(round((count/len(image_words))*100, 2)), end="\r")

    print("")
    # Transforming data using tf-idf:
    # NOTE(review): the idf weights are refitted on every call, so train and
    # test histograms get different weights when built by separate calls.
    transformer = TfidfTransformer(smooth_idf=False)
    weighted_image_histograms = transformer.fit_transform(image_histograms).toarray()

    return weighted_image_histograms

# Oxford 5k dataset func

In [None]:
def Oxford5k_Image_Retrieval(images_path, gt_path, pixelorsift="sift", savedata=0, n_clusters=100, k=10, view_option=2):
    """Run the full Oxford5k retrieval experiment and print its mAP.

    Parameters
    ----------
    images_path : str
        Directory with the Oxford building images.
    gt_path : str
        Directory with the ground-truth files.
    pixelorsift : str
        ``"pixel"`` to compare raw pixel vectors, ``"sift"`` to compare SIFT
        bag-of-visual-words histograms. Any other value only prints a hint.
    savedata : int
        When 1, the per-query metrics are written to a csv file.
    n_clusters : int
        Vocabulary size for the ``"sift"`` mode.
    k : int
        Number of retrieved images and cut-off for precision@k.
    view_option : int
        Display mode forwarded to ``image_retrieval_k``.
    """
    # Reject unknown modes before doing any work.
    if pixelorsift not in ("pixel", "sift"):
        print("3rd input must be either: \"pixel\" or \"sift\"")
        return

    # Both modes start from the same query and database images.
    print("Loading Images...")
    query_gray, query_colour, test_names, query_paths = load_test_data(images_path, gt_path)
    db_gray, db_colour, train_names = load_train_data(images_path, gt_path, query_paths)

    if pixelorsift == "pixel":
        print("\nComputing Pixel Values...")
        query_features, db_features = find_pixel_values(query_gray, db_gray)
        metrics_banner = "\nComputing Metics..."
        csv_label = "Oxford5k_pixelvalues"
    else:
        print("\nComputing test SIFT features...")
        _, query_desc = SIFT(query_gray)
        print("\nComputing train SIFT features...")
        _, db_desc = SIFT(db_gray)
        pooled_db_desc = stack_descriptors(db_desc)

        print("\nClustering Descriptors...")
        vocabulary = cluster(pooled_db_desc, n_clusters)

        print("\nComputing test BoVW...")
        query_features = solve_BoW(query_desc, vocabulary, n_clusters)
        print("\nComputing train BoVW...")
        db_features = solve_BoW(db_desc, vocabulary, n_clusters)
        metrics_banner = "\nComputing Metrics..."
        csv_label = "Oxford5k_BoVW_{}".format(n_clusters)

    # Rank, score and report.
    print(metrics_banner)
    AP, AR, precisionsatk = image_retrieval_k(db_features, query_features, train_names, test_names, db_colour, query_colour, k, view_option, border_size=20)

    print("\nmAP =", np.average(AP))

    if savedata == 1:
        save_data_to_csv(precisionsatk, AP, k, csv_label)
        print("\nData saved to csv")

# MPEG7 dataset func

In [None]:
def MPEG_Image_Retrieval(images_folder, pixelorsift="sift", savedata=0, n_clusters=100, k=10, view_option=2):
    """Run the full MPEG7 retrieval experiment and print its mAP.

    The ``.gif`` images are split 80/20 into a database and a query set.

    Parameters
    ----------
    images_folder : str
        Directory with the MPEG7 images.
    pixelorsift : str
        ``"pixel"`` to compare raw pixel vectors, ``"sift"`` to compare SIFT
        bag-of-visual-words histograms. Any other value only prints a hint.
    savedata : int
        When 1, the per-query metrics are written to a csv file.
    n_clusters : int
        Vocabulary size for the ``"sift"`` mode.
    k : int
        Number of retrieved images and cut-off for precision@k.
    view_option : int
        Display mode forwarded to ``image_retrieval_k``.
    """
    # Reject unknown modes before doing any work.
    if pixelorsift not in ("pixel", "sift"):
        print("2nd input must be either: \"pixel\" or \"sift\"")
        return

    print("Loading data...")
    mpeg_images, filenames = load_MPEG7_data(images_folder, ".gif", False)

    if pixelorsift == "pixel":
        print("\nComputing Pixel Values...")
        pixel_values = mpeg_find_pixel_values(mpeg_images)

        # Features, labels and images are split together so they stay aligned.
        split = train_test_split(pixel_values, filenames, mpeg_images, test_size=0.2, random_state=43)
        db_features, query_features, db_names, query_names, db_images, query_images = split

        map_banner = "\nmAP ="
        csv_label = "MPEG7_pixelvalues"
    else:
        # The images are passed twice: once as SIFT input, once for display.
        split = train_test_split(mpeg_images, filenames, mpeg_images, test_size=0.2, random_state=42)
        db_input, query_input, db_names, query_names, db_images, query_images = split

        print("\nComputing test SIFT features...")
        _, query_desc = SIFT(query_input)
        print("\nComputing train SIFT features...")
        _, db_desc = SIFT(db_input)
        pooled_db_desc = stack_descriptors(db_desc)

        print("\n\nCalculating For {} number of words!".format(n_clusters))
        print("\nClustering Descriptors...")
        vocabulary = cluster(pooled_db_desc, n_clusters)

        print("\nComputing test BoVW...")
        query_features = solve_BoW(query_desc, vocabulary, n_clusters)
        print("\nComputing train BoVW...")
        db_features = solve_BoW(db_desc, vocabulary, n_clusters)

        map_banner = "\n\nmAP ="
        csv_label = "MPEG7_BoVW_{}".format(n_clusters)

    # Rank, score and report.
    print("\nComputing Metics...")
    AP, AR, precisionsatk = image_retrieval_k(db_features, query_features, db_names, query_names, db_images, query_images, k, view_option, border_size=5)

    print(map_banner, np.average(AP))

    if savedata == 1:
        save_data_to_csv(precisionsatk, AP, k, csv_label)
        print("\nData saved to csv")

# Computation:

## Variable descriptions:

images_folder -> path to directory with all MPEG7 images

images_path -> path to directory with all oxford building images

gt_path -> path to directory with all ground truth files

pixelorsift -> Choose either "pixel" or "sift", runs that code

savedata -> Saves metrics to csv

n_clusters -> number of words for SIFT

k -> number of returned images (also k images checked in precision at k)

view_option:
 - 0 -> returns only mAP
 - 1 -> returns merged images and metrics (only for a database of same-sized images; doesn't work with SIFT)
 - 2 -> returns images and metrics (coloured, any size, labelled)
 - 3 -> returns images only

In [None]:
# MPEG Code:
# Settings for one MPEG7 run; edit the values below and execute the cell.
# The path is machine-specific and must point at the local MPEG7 image folder.
images_folder = r"C:\Users\Sean\Desktop\University Yr2\project\Image-Retrival (clean)\MPEG7\MPEG7"
pixelorsift = "pixel"   # "pixel" or "sift"
savedata = 0            # 1 writes the metrics to a csv file
n_clusters = 1000       # vocabulary size, only used in "sift" mode
k = 10                  # number of retrieved images / precision@k cut-off
view_option = 3         # 3 = show the retrieved images only

MPEG_Image_Retrieval(images_folder, pixelorsift, savedata, n_clusters, k, view_option)

In [None]:
# Oxford Code:
# Settings for one Oxford5k run; edit the values below and execute the cell.
# Both paths are machine-specific: the image folder and the ground-truth folder.
images_path = r"C:\Users\Sean\Desktop\University Yr2\project\Image-Retrival (clean)\SIFT\Oxford building images"
gt_path = r"C:\Users\Sean\Desktop\University Yr2\project\Image-Retrival (clean)\SIFT\Ground Truth files"
pixelorsift = "sift"    # "pixel" or "sift"
savedata = 0            # 1 writes the metrics to a csv file
n_clusters = 100        # vocabulary size, only used in "sift" mode
k = 10                  # number of retrieved images / precision@k cut-off
view_option = 3         # 3 = show the retrieved images only

Oxford5k_Image_Retrieval(images_path, gt_path, pixelorsift, savedata, n_clusters, k, view_option)

## Visualize from your own BoVW NPY files:

In [None]:
# Machine-specific locations of the Oxford images and ground-truth files.
images_path = r"C:\Users\Sean\Desktop\University Yr2\project\Image-Retrival (clean)\SIFT\Oxford building images"
gt_path = r"C:\Users\Sean\Desktop\University Yr2\project\Image-Retrival (clean)\SIFT\Ground Truth files"

# Load previously saved BoVW arrays from disk instead of recomputing them.
# NOTE(review): presumably the rows must be in the same order as the images
# returned by load_train_data / load_test_data below; confirm before trusting the metrics.
train_bovw = np.load("SIFT/NPY files for BoVW/bovw files for 100000 Words/BoW_Train.npy")
test_bovw = np.load("SIFT/NPY files for BoVW/bovw files for 100000 Words/BoW_Test.npy")

# The images and labels are still loaded so the results can be scored and displayed.
test_gray_images, test_colour_images, test_names, test_imgs_path = load_test_data(images_path, gt_path)
train_gray_images, train_colour_images, train_names = load_train_data(images_path, gt_path, test_imgs_path)

k = 10            # number of retrieved images / precision@k cut-off
view_option = 2   # 2 = show labelled images and metrics

In [None]:
# Rank and score the loaded BoVW arrays; image_size and border_size keep their defaults.
AP, AR, precisionsatk = image_retrieval_k(train_bovw, test_bovw, train_names, test_names, train_colour_images, test_colour_images, k, view_option)