# Mapping visual embeddings to textual embeddings :

This notebook implements the paper “DeViSE: A Deep Visual-Semantic Embedding Model”
by Frome et al. (2013).

### Importing the modules :

In [41]:
import os 
import logging
import pandas as pd
import random
import numpy as np
from tqdm import tqdm, tqdm_notebook
import matplotlib.pyplot as plt
import pickle
import itertools
from sklearn.metrics import confusion_matrix, accuracy_score
import seaborn as sns
from typing import List, Tuple, Dict

In [42]:
import torch
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D, BatchNormalization, RandomFlip
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, load_model
from tensorflow.keras import preprocessing
from sklearn.preprocessing import LabelEncoder

In [43]:
class Variables:
    """Central place for the input paths used throughout the notebook."""
    # Root directory of the image dataset (handed to Util and SuperClassBalancer).
    path = '../input/996-imagenet'
    # CSV holding the word embeddings of the labels (handed to ImageGenerator).
    embeddings_path = '../input/996-embeddings/996-embeddings.csv'
    # CSV describing the super classes; same location SuperClassHandler is built from.
    super_class_csv = '../input/super-classes/super_classes.csv'
    # Saved mapping model covering all classes.
    general_mapping_model_path = '../input/mod996/resnet50-996-classes.model'
    # Saved mapping model for the "small animal" super class.
    small_animal_mapper_path = '../input/small-animal/resnet-small-animal.model'

## Building a data generator :
---
This data generator loads the images batch by batch so that the whole dataset never has to fit in RAM. You have to give it the label embeddings (so it can encode the labels), the filenames (the paths of the images), the labels, the `classes_size` (300 in our case, which is the size of the word embeddings), and then the batch size and the image size.

In [44]:
class ImageGenerator(tf.keras.utils.Sequence):
    """
    Keras ``Sequence`` that reads images from disk one batch at a time and
    pairs every image with the word embedding of its label.

    Attributes
    ----------
    embeddings_df :
        DataFrame read from ``embeddings_csv``. The label is expected in a
        column named ``embeddings`` located first; every following column
        is one dimension of the word embedding.
    items :
        The paths of the images (``filenames``)
    labels :
        The labels of the images, aligned with ``items``
    classes_size :
        The size of the word embeddings
    batch_size
    image_size
    shuffle :
        Shuffle or not the sample order at the end of each epoch (default=True)
    """
    def __init__(self, embeddings_csv : str, filenames : List[str], labels : List[str], classes_size : int, batch_size : int, image_size=(224, 224), shuffle=True):
        # Let the Keras base class set up its own state (required by recent Keras versions).
        super().__init__()
        self.embeddings_df = pd.read_csv(embeddings_csv)
        # Build the label -> vector lookup once instead of filtering the
        # DataFrame again for every single sample.
        label_keys = self.embeddings_df['embeddings'].str.lower()
        label_vectors = self.embeddings_df.iloc[:, 1:].to_numpy(dtype=np.float32)
        self._label_vectors = dict(zip(label_keys, label_vectors))
        self.image_size, self.batch_size = image_size, batch_size
        self.items, self.items_size = filenames, len(filenames)
        self.labels = labels
        self.classes_size = classes_size
        self.indexes = np.arange(self.items_size)
        self.shuffle = shuffle
        self.on_epoch_end()

    def encode_label(self, label : str) -> np.ndarray:
        """
        Returns the word embedding of the label given in the parameter.

        The lookup is case-insensitive. The result has shape
        ``(1, embedding_size)``.

        Raises
        ------
        KeyError
            If the label has no row in the embeddings file.
        """
        try:
            vector = self._label_vectors[label.lower()]
        except KeyError:
            raise KeyError(f"No embedding found for label {label!r}") from None
        # Copy so that callers can never modify the cached vector.
        return vector[None, :].copy()

    def load_urls(self, indexes : np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Load and read the images, and encode the labels

        Returns a pair ``(images, labels)`` with one row per entry of
        ``indexes``.
        """
        n_samples = len(indexes)
        images = np.zeros((n_samples, self.image_size[0], self.image_size[1], 3), dtype=np.float32)
        labels = np.zeros((n_samples, self.classes_size), dtype=np.float32)

        urls = [self.items[k] for k in indexes]
        lbls = [self.labels[k] for k in indexes]

        for idx, (img_path, lbl_data) in enumerate(zip(urls, lbls)):
            img = preprocessing.image.load_img(img_path, target_size=self.image_size)
            img_data = preprocessing.image.img_to_array(img)
            img_data = preprocess_input(img_data)
            images[idx, :] = img_data
            labels[idx] = self.encode_label(lbl_data)

        return images, labels

    def on_epoch_end(self):
        """Reset the sample order and reshuffle it when ``shuffle`` is set."""
        self.indexes = np.arange(self.items_size)
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __len__(self):
        # Only full batches are served; a trailing partial batch is dropped.
        return int(np.floor(self.items_size / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(images, label_embeddings)``."""
        indexes = self.indexes[index * self.batch_size:(index+1) * self.batch_size]
        X, y = self.load_urls(indexes)
        return X, y

## Visualization class :
---
This class provides helper functions that plot images with their labels in a grid.

In [45]:
class Ploter:
    """Helpers for drawing a set of image files as a matplotlib grid."""

    @classmethod
    def ceildiv(cls, a, b):
        """Divide ``a`` by ``b`` with floor division, rounding up instead of down."""
        # Flooring the negated value and negating again rounds towards +infinity.
        return -((-a) // b)

    @classmethod
    def plots_from_files(cls, img_paths, figsize=(10,5), rows=1, titles=None, main_title=None):
        """Plots the images in a grid"""
        figure = plt.figure(figsize=figsize)
        if main_title is not None:
            plt.suptitle(main_title, fontsize=10)
        image_count = len(img_paths)
        for position, image_path in enumerate(img_paths, start=1):
            # The column count is whatever is needed to fit every image in `rows` rows.
            axes = figure.add_subplot(rows, Ploter.ceildiv(image_count, rows), position)
            axes.axis('Off')
            if titles is not None:
                axes.set_title(titles[position - 1], fontsize=16)
            # plt.imshow draws on the current axes, i.e. the subplot just added.
            plt.imshow(plt.imread(image_path))

## Dataset builder :
---
This class wraps the data generators: it takes the training, validation and testing sets. An instance of this class is what we feed to the neural network.

In [46]:
class DataBunch():
    """
    Container grouping the generators a learner works with.

    Attributes
    ----------
    classes_size :
        The size of the word embeddings (read-only property)
    train_data, validation_data, test_data :
        The datasets generated by the class ImageGenerator
    """
    def __init__(self, classes_size : int, train_data : ImageGenerator, validation_data=None, test_data=None):
        self.cls_size = classes_size
        self.train_data, self.validation_data, self.test_data = train_data, validation_data, test_data

    def show_bunch(self, get_title, rows=3, figsize=(7, 6), **kwargs):
        """Plot 9 randomly drawn training images, titled with ``get_title(path)``."""
        # Extra keyword arguments are accepted but not used.
        sampled_paths = np.random.choice(self.train_data.items, 9)
        sampled_titles = [get_title(image_path) for image_path in sampled_paths]
        Ploter.plots_from_files(sampled_paths, figsize, rows, sampled_titles)

    @property
    def classes_size(self):
        """Size of the word embeddings given at construction time."""
        return self.cls_size

## Building the mapping model architecture :
---

In [47]:
class Learner():
    """Base learner object providing layer freezing helpers."""
    def __init__(self):
        pass

    @classmethod
    def freeze(cls, model, limit=None):
        """
        Freeze the layers of the model from the first one up to and
        including the layer at index ``limit`` (from left to right).

        ``limit=None`` freezes every layer. Negative values count from the
        end, like list indices (``-1`` is the last layer).
        """
        # Handle negative indices; -1 must be translated too, otherwise
        # `index > -1` is true for the first layer and nothing gets frozen.
        if limit is not None and limit < 0:
            limit += len(model.layers)
        # loop for all valid indices and mark the corresponding layer
        for index, layer in enumerate(model.layers):
            if limit is not None and index > limit:
                break
            layer.trainable = False

    @classmethod
    def unfreeze(cls, model, limit=None):
        """
        Unfreeze the layers of the model from the layer at index ``limit``
        up to the last one (from right to left).

        ``limit=None`` unfreezes every layer. Negative values count from the
        end, like list indices (``-1`` unfreezes only the last layer).
        """
        # Handle negative indices, including -1 (see freeze).
        if limit is not None and limit < 0:
            limit += len(model.layers)
        for index, layer in enumerate(model.layers):
            if limit is not None and index < limit:
                continue
            layer.trainable = True

In [48]:
class ZeroShotLearner(Learner):
    """
    Zero shot learner

    Builds a ResNet50-based network whose output has ``data.classes_size``
    units and predicts labels by looking up the nearest label embedding.

    Attributes
    ----------
    data :
        Object exposing ``classes_size``, ``train_data`` and
        ``validation_data`` (a DataBunch in this notebook)
    loss :
        The loss function to train the neural networks
    metrics :
        List of metrics to observe the performance of the model while training
    """
    def __init__(self, data, loss=tf.keras.losses.CosineSimilarity(axis=1), metrics=['accuracy']):
        self.data = data
        self.model = self.create_model()
        # NOTE(review): some recent Keras optimizers reject the `decay`
        # argument - confirm against the installed TensorFlow version.
        adam = Adam(learning_rate=0.001, epsilon=0.01, decay=0.0001)
        # Keyword arguments: the positional order of `compile` differs between
        # Keras versions. The metrics list is copied so the shared default
        # list is never handed to Keras.
        self.model.compile(
            optimizer=adam,
            loss=loss,
            metrics=list(metrics) if metrics is not None else None,
        )

    def create_model(self):
        """Return ResNet50 (frozen up to its third-to-last layer) with a new regression head."""
        base_model = ResNet50(weights='imagenet')
        Learner.freeze(base_model, -3)

        x = base_model.layers[-3].output          # shape = (bs=None, 7, 7, 2048)
        x = Dropout(rate=0.3)(x)                  # shape = (bs=None, 7, 7, 2048)
        x = GlobalAveragePooling2D()(x)           # shape = (bs=None, 2048)
        x = Dense(1024, activation='relu')(x)     # shape = (bs=None, 1024)
        x = BatchNormalization()(x)
        y = Dense(self.data.classes_size, activation='linear')(x)

        return Model(inputs=base_model.input, outputs=y)

    def fit(self, epochs=10):
        """Train on ``data.train_data`` and return the Keras history object."""
        history = self.model.fit(self.data.train_data, validation_data=self.data.validation_data, epochs=epochs)
        return history

    def _embed_image(self, image_path : str):
        """Load one image and return the model output as a flat torch tensor."""
        img = preprocessing.image.load_img(image_path, target_size=(224, 224))
        img_data = preprocessing.image.img_to_array(img)
        img_data = preprocess_input(img_data)
        vec = self.model.predict(img_data[None])
        return torch.FloatTensor(vec.reshape(-1))

    # "Solver" is a string annotation on purpose: the Solver class is defined
    # in a later cell, and a bare name would raise NameError on a fresh kernel.
    def predict_on_one_sample(self, image_path : str, solver : "Solver") -> str:
        """
        Predict the label of the image given its path and using the solver
        to get the nearest labels. Only the best match is returned.
        """
        return solver.get_nearest_embedding_of(self._embed_image(image_path))[0][0]

    def predict_on_samples(self, image_paths : List[str], solver : "Solver") -> List[str]:
        """
        Predict the label of the images given their path and using the solver
        to get the nearest labels. Returns the best match for each image.
        """
        y_pred = []
        for i in tqdm(range(len(image_paths))):
            y_pred.append(self.predict_on_one_sample(image_paths[i], solver))
        return y_pred

    def save_model(self, model_name : str):
        """Save the Keras model to ``<model_name>.model`` in HDF5 format."""
        self.model.save(model_name+".model", save_format="h5")

## A solver that helps to give predictions

In [49]:
class EmbeddingsLoader:
    """
    Load embeddings from a comma-separated text file.

    Every non-blank line is read as ``key,v1,v2,...``; the result is stored
    in ``self.embeddings`` as ``{key: torch.FloatTensor([v1, v2, ...])}``.

    NOTE(review): no header detection is done - a header line whose value
    columns parse as floats becomes a regular entry. Confirm the input files
    have no such header.
    """

    def __init__(self, filename : str):

        self.file = filename
        self.embeddings = {}

        self._load_file()

    def _load_file(self):
        """
        Fill ``self.embeddings`` from ``self.file``.

        Raises
        ------
        IOError
            If the file cannot be opened or read (original error is chained).
        ValueError
            If a value column cannot be converted to float.
        """
        try:
            with open(self.file, "r") as f:
                for line in f:
                    # Blank lines (e.g. a trailing newline) would otherwise
                    # create a junk entry holding an empty tensor.
                    if not line.strip():
                        continue
                    data = line.split(",")
                    self.embeddings[data[0]] = torch.FloatTensor(list(map(float, data[1:])))

        except IOError as e:
            # Keep the underlying cause so permission errors etc. stay visible.
            raise IOError(f"No file {self.file}") from e

class SimilarityCompute(EmbeddingsLoader):
    """
    Pairwise cosine similarities between the loaded embeddings.

    ``cosine_sim_matrix`` is a list of lists indexed in the iteration order
    of ``self.embeddings``; it is ``None`` until first needed.
    """

    def __init__(self, embeddings):
        super(SimilarityCompute, self).__init__(embeddings)
        # Created lazily; the diagonal is pre-filled with 1.
        self.cosine_sim_matrix = None
        # True once compute_sim() has filled every cell.
        self._matrix_complete = False

    def _ensure_matrix(self):
        """Allocate the similarity matrix (filled with 1) if it does not exist yet."""
        if self.cosine_sim_matrix is None:
            n_tokens = len(self.embeddings)
            self.cosine_sim_matrix = [[1 for j in range(n_tokens)] for i in range(n_tokens)]

    def compute_sim(self):
        """ compute cosine similarity between all vectors """
        if len(self.embeddings) == 0:
            raise Exception("Tags not converted yet !")

        logging.info("Computing cosine similarity, this could take some time...")

        self._ensure_matrix()

        cos = torch.nn.CosineSimilarity(dim=0)
        # self.embeddings is a dict: work on its values (the tensors), not on
        # its keys.
        vectors = list(self.embeddings.values())

        for j, vector in tqdm(enumerate(vectors), total = len(vectors)):
            # The matrix is symmetric, so each unordered pair is computed once.
            for i in range(j + 1, len(vectors)):
                similarity = cos(vector, vectors[i])

                self.cosine_sim_matrix[i][j] = similarity
                self.cosine_sim_matrix[j][i] = similarity

        self._matrix_complete = True

    def export_sim_matrix(self, filename):
        """
        Write the full similarity matrix to ``filename`` as CSV (values
        rounded to 3 decimals), computing it first when necessary.

        Raises
        ------
        OSError
            If the output file cannot be opened.
        """
        if not self._matrix_complete:
            self.compute_sim()

        tags = list(self.embeddings)

        try:
            f = open(filename, "w")
        except OSError as e:
            raise OSError("Could not open file") from e

        with f:
            print("/", ",".join(tags), sep = ",", file = f)

            for j, tag_y in enumerate(tags):
                print(tag_y, ",".join( [str(round(float(self.cosine_sim_matrix[j][i]), 3)) for i in range(len(tags))]), sep = ",", file = f)

    def sim_between(self, token1, token2):
        """
        Return the cosine similarity between the embeddings of two tokens.

        Raises
        ------
        KeyError
            If one of the tokens has no embedding.
        """
        tags = list(self.embeddings)
        for token in (token1, token2):
            if token not in self.embeddings:
                raise KeyError(f"No embedding for token {token!r}")
        index1, index2 = tags.index(token1), tags.index(token2)

        self._ensure_matrix()

        # Before compute_sim() has run the cells only hold the placeholder 1,
        # so the pair has to be computed here.
        if not self._matrix_complete:
            cos = torch.nn.CosineSimilarity(dim=0)
            similarity = cos(self.embeddings[token1], self.embeddings[token2])

            self.cosine_sim_matrix[index1][index2] = similarity
            self.cosine_sim_matrix[index2][index1] = similarity

        return self.cosine_sim_matrix[index1][index2]

class Solver(EmbeddingsLoader):
    """Nearest-neighbour lookup (by cosine similarity) over the loaded embeddings."""

    def __init__(self, embeddings, nb_predictions):
        super().__init__(embeddings)
        # Number of (tag, similarity) pairs returned per query.
        self.nb = nb_predictions

    def get_nearest_embedding_of(self, embedding):
        """
        Return the ``nb`` entries most similar to ``embedding`` as a list of
        ``(tag, similarity)`` tuples, most similar first.
        """
        if self.nb > len(self.embeddings):
            raise Exception("nb too high, not enough token")

        cosine = torch.nn.CosineSimilarity(dim=0)
        scored = [(tag, cosine(embedding, vector)) for tag, vector in self.embeddings.items()]

        # Ascending sort followed by a reversal, then keep the leading entries.
        ranked = sorted(scored, key=lambda pair: pair[1])
        ranked.reverse()
        return ranked[:self.nb]

## Building a super class handler :
---
This class presents different functions that are useful when working with super classes.

In [50]:
class SuperClassHandler:
    """
    Lookup between super classes and their member classes.

    The CSV is read with one row per super class: the first column holds the
    super class name, the remaining columns hold its classes (empty cells
    are ignored).
    """
    def __init__(self, super_class_csv : str):
        self.super_class_df = pd.read_csv(super_class_csv)
        self.super_classes = self.super_class_df.iloc[:, 0].tolist()

    def get_classes(self, super_class : str) -> list:
        """
        Take a super class as parameter and returns its correspondent classes
        (lower-cased, with underscores replaced by spaces).

        An unknown super class prints a message and yields an empty list.
        """
        matching_rows = self.super_class_df[self.super_class_df.iloc[:, 0] == super_class]
        if matching_rows.empty:
            print('Failed to find the super class')
            # An empty list keeps membership tests by callers working.
            return []
        members = matching_rows.to_numpy().tolist()[0][1:]
        return [str(member).replace("_", " ").lower() for member in members if not pd.isnull(member)]

    def get_super_class(self, class_name : str) -> str:
        """return the super class of the class given in the parameter, or None when there is none"""
        normalised = class_name.replace("_", " ").lower()
        for super_class in self.super_classes:
            if normalised in self.get_classes(super_class):
                return super_class
        return None

    def get_super_classes(self) -> List[str]:
        """returns a list containing all super classes"""
        return self.super_classes

## Building a super class balancer :
---
This class balances a given dataset by drawing the same number of samples from each super class.

In [51]:
class SuperClassBalancer:
    """
    Draw the same number of sample indexes for every super class.

    Attributes
    ----------
    util :
        Util instance built on ``path``, used to find the indexes of a label
    super_classes :
        The super classes to balance
    balanced_indexes :
        ``{super_class: [indexes]}`` produced by the last call to ``balance``
        (empty until then)
    """
    def __init__(self, path : str, super_classes : List[str]):
        self.util = Util(path)
        self.super_classes = super_classes
        # Defined up front so the attribute always exists.
        self.balanced_indexes = dict()

    def balance(self, samples_number : int, labels):
        """
        Return indexes equaly distributed between each label.

        ``samples_number`` indexes are drawn for each super class with a
        fixed seed; ``random.sample`` raises ValueError when a super class
        has fewer occurrences than that in ``labels``.
        """
        self.balanced_indexes = dict()
        for super_class in tqdm(self.super_classes):
            self.balanced_indexes[super_class] = random.Random(0).sample(self.util.get_indexes(super_class, labels), samples_number)

        return list(itertools.chain.from_iterable(list(self.balanced_indexes.values())))

    def add_value_label(self, x_list, y_list):
        """allows to write text on a given plot (one value above each bar position)"""
        for i in range(len(x_list)):
            plt.text(i, y_list[i], y_list[i], ha="center", fontweight='bold', fontsize="medium")

    def plot_distribution(self):
        """
        Plot a histogram presenting the number of samples on each class.

        Raises
        ------
        RuntimeError
            If ``balance`` has not been called yet.
        """
        if not self.balanced_indexes:
            raise RuntimeError("Nothing to plot: call balance() first")

        super_class_distribution = {
            super_class: len(self.balanced_indexes[super_class])
            for super_class in self.super_classes
        }

        plt.figure(figsize=(20,8))
        plt.bar(super_class_distribution.keys(), super_class_distribution.values(), align='center', width=0.5, color='g')
        self.add_value_label(list(super_class_distribution.keys()), list(super_class_distribution.values()))

## Building a super class classifier:
---
This class implements a classifier that predicts super classes. It uses the second approach explained in the report.

In [52]:
class SuperClassClassifier:
    """Predict a super class by mapping the top ImageNet prediction of ResNet50 through a handler."""

    def __init__(self, super_class_handler):
        self.model = ResNet50(weights='imagenet')
        # Object whose get_super_class(label) maps a class name to its super class.
        self.super_class_handler = super_class_handler

    def predict(self, image_path):
        """Return the super class of the best ResNet50 prediction for one image."""
        pixels = preprocessing.image.img_to_array(
            preprocessing.image.load_img(image_path, target_size=(224, 224))
        )
        scores = self.model.predict(preprocess_input(pixels)[None])
        decoded = tf.keras.applications.imagenet_utils.decode_predictions(scores)
        # decoded[0][0] is the best entry for the single image; [1] is its class name.
        return self.super_class_handler.get_super_class(decoded[0][0][1])

    def predict_on_samples(self, image_paths):
        """Return the predicted super class for every path, in order."""
        return [self.predict(image_paths[i]) for i in tqdm(range(len(image_paths)))]

## Building a util class :
---
This class presents different functions that are useful in data manipulation, testing parts...

In [53]:
class Util:
    """
    Helpers for walking the image dataset and extracting embeddings.

    The dataset is read as ``<path>/<folder>/data1/<folder>/<class>/...``
    for every top-level ``<folder>`` found under ``path``.

    Attributes
    ----------
    path :
        Root directory of the dataset
    top_level_dirs :
        Entries found directly under ``path`` (listed once, so that every
        method walks them in the same order)
    directory_classes :
        For each top-level entry, the list of class directory names
    """
    def __init__(self, path):
        self.path = path
        self.top_level_dirs = os.listdir(self.path)
        self.directory_classes = [os.listdir(self.get_path(name)) for name in self.top_level_dirs]

    def get_path(self, _type : str):
        """Return ``<path>/<_type>/data1/<_type>`` for the dataset root of this instance."""
        return self.path.rstrip('/') + '/' + _type + '/data1/' + _type

    def _iter_class_dirs(self):
        """Yield ``(class_name, class_directory)`` for every class of every top-level entry."""
        for name, classes in zip(self.top_level_dirs, self.directory_classes):
            for class_name in tqdm(classes):
                yield class_name, os.path.join(self.get_path(name), class_name)

    @staticmethod
    def _iter_files(directory):
        """Yield the path of every file found under ``directory`` (recursively)."""
        for dirname, _, filenames in os.walk(directory):
            for file in filenames:
                yield os.path.join(dirname, file)

    def get_data(self):
        """Return ``(images_path, labels)``: every file path with the name of its class directory."""
        images_path = []
        labels = []
        for class_name, class_dir in self._iter_class_dirs():
            for file_path in self._iter_files(class_dir):
                images_path.append(file_path)
                labels.append(class_name)

        return images_path, labels

    def get_all_classes(self) -> list:
        """Return the lower-cased names of all class directories."""
        classes = list(itertools.chain.from_iterable(list(self.directory_classes)))
        return [x.lower() for x in classes]

    def get_indexes(self, class_name : str, labels : list) -> list:
        """return indexes of class_name in labels (empty list when absent)"""
        return [index for index, label in enumerate(labels) if label == class_name]

    def get_label_from_fname(self, fname):
        """
        Return the path component located two levels below ``data1``.

        ``fname`` is split on '/'; a ValueError is raised when it contains
        no ``data1`` component.
        """
        fname = fname.split('/')

        index = fname.index('data1') + 2
        word = fname[index]
        return word

    def get_mean_visual_embeddings(self, model) -> dict:
        """
        computes the mean of the visual embeddigns of each class in the dataset

        ``model.predict`` is called on every image; the returned dict maps
        each class name to the mean of these outputs.
        """
        features = dict()

        for clse, class_dir in self._iter_class_dirs():
            mean_list = []
            for file_path in self._iter_files(class_dir):
                image = preprocessing.image.load_img(file_path, target_size=(224, 224))
                image = preprocessing.image.img_to_array(image)
                image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
                image = preprocess_input(image)
                mean_list.append(model.predict(image, verbose=0))

            features[clse] = np.mean(mean_list, axis=0)

        return features

    def get_mean_textual_embeddings(self, model) -> dict:
        """
        computes the mean of the textual embeddigns of each class in the dataset

        ``model.predict`` is called on every image; the returned dict maps
        each class name to the mean output as a torch tensor.
        """
        features = dict()

        for clse, class_dir in self._iter_class_dirs():
            tensor_list = []
            for file_path in self._iter_files(class_dir):
                img = preprocessing.image.load_img(file_path, target_size=(224, 224))
                img_data = preprocessing.image.img_to_array(img)
                img_data = preprocess_input(img_data)
                vec = model.predict(img_data[None])
                totest = torch.FloatTensor(vec.reshape(-1))
                tensor_list.append(totest)
            features[clse] = torch.mean(torch.stack(tensor_list), dim=0)

        return features

    def get_visual_embeddings(self, img_path):
        """Return the output of ResNet50's second-to-last layer for one image."""
        # Build the feature extractor once and reuse it on later calls.
        extractor = getattr(self, '_visual_extractor', None)
        if extractor is None:
            model = ResNet50(weights="imagenet", include_top=True)
            extractor = Model(inputs=model.inputs, outputs=model.layers[-2].output)
            self._visual_extractor = extractor

        image = preprocessing.image.load_img(img_path, target_size=(224, 224))
        image = preprocessing.image.img_to_array(image)
        image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
        image = preprocess_input(image)

        return extractor.predict(image, verbose=0)

In [54]:
# Load the precomputed image paths and their labels from pickle files.
# The three objects are indexed with the same positions later in the notebook,
# so they are presumably parallel sequences - verify they have equal length.
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open('../input/utils996/img_paths.pkl', 'rb') as f:
    img_paths = pickle.load(f)

with open('../input/true-labels-super/true_labels_super.pkl', 'rb') as f:    
    true_labels_super = pickle.load(f)
    
with open('../input/utils996/true_labels.pkl', 'rb') as f:    
    true_labels = pickle.load(f)

## Initializing the super class handler :

In [55]:
# Build the shared helpers from the paths declared in Variables, so each
# location is defined in a single place.
utils = Util(Variables.path)
sc_handler = SuperClassHandler(Variables.super_class_csv)
sc_balancer = SuperClassBalancer(Variables.path, sc_handler.get_super_classes())
sc_classifier = SuperClassClassifier(sc_handler)

## Training the mapping model on different super classes :
---
We can choose to train on the full ImageNet dataset or only on the data of a specific super class, as shown below.

##### Training the mapping model on the *outdoor* super class :

In [56]:
# Keep 20 000 images of the 'outdoor' super class (fixed seed for reproducibility).
indexes = utils.get_indexes('outdoor', true_labels_super)
indexes = random.Random(0).sample(indexes, 20000)

outdoor_img_paths = [img_paths[i] for i in indexes]
outdoor_true_labels = [true_labels[i] for i in indexes]

batch_size = 64
class_size = 300

train_data = ImageGenerator(Variables.embeddings_path, outdoor_img_paths, outdoor_true_labels, class_size, batch_size)
# The generator is named train_data; the name used previously was never defined.
print('Training set has %d batches of size %d' % (len(train_data), batch_size))

In [57]:
# Wrap the training generator; no validation or test generator is supplied here.
data = DataBunch(class_size, train_data)

In [58]:
# Display random training images, each titled with the label parsed from its path.
data.show_bunch(utils.get_label_from_fname)

### Train the model:

In [None]:
# Build the mapping model with its default loss/metrics and train it for the
# default number of epochs (10).
learner = ZeroShotLearner(data)
history = learner.fit()

### Saving the model:

In [None]:
# save_model appends ".model" to the given name.
learner.save_model('outdoor_model')

## Testing the general mapping model :
    * Testing set : 20 000 images of shape (224, 224, 3)

In [64]:
# Draw ONE set of positions and apply it to both lists, so every image is
# guaranteed to stay paired with its own label.
assert len(img_paths) == len(true_labels), "img_paths and true_labels must be aligned"
test_indexes = random.Random(0).sample(range(len(img_paths)), 20000)
X = [img_paths[i] for i in test_indexes]
y = [true_labels[i] for i in test_indexes]

In [15]:
# Load the general mapping model and a solver returning the 10 nearest labels,
# using the paths declared in Variables.
mapping_model = load_model(Variables.general_mapping_model_path)
map_solver = Solver(Variables.embeddings_path, 10)

In [68]:
# For every test image: run the mapping model, then keep the tags of the
# nearest label embeddings (lower-cased, nearest first).
predictions = []
for i in tqdm(range(len(X))):
    img = preprocessing.image.load_img(X[i], target_size=(224, 224))
    img_data = preprocessing.image.img_to_array(img)
    img_data = preprocess_input(img_data)

    # img_data[None] adds the batch dimension expected by predict.
    vec = mapping_model.predict(img_data[None])
    totest = torch.FloatTensor(vec.reshape(-1))

    # The comprehension variable `i` has its own scope and does not clobber the loop index.
    predictions.append([i[0].lower() for i in map_solver.get_nearest_embedding_of(totest)])

In [69]:
def top_k_accuracy(y_true, y_pred, k) -> float:
    """
    Fraction of samples whose true label is among the first ``k`` predictions.

    Parameters
    ----------
    y_true :
        Sequence of true labels
    y_pred :
        Sequence (aligned with ``y_true``) of ranked prediction lists, best first
    k :
        Number of leading predictions to consider

    Returns
    -------
    float
        Value in [0, 1]; 0.0 when ``y_true`` is empty.
    """
    n = len(y_true)
    if n == 0:
        # Nothing to score: avoid dividing by zero.
        return 0.0
    true_pred = sum(1 for i in range(n) if y_true[i] in y_pred[i][:k])
    return true_pred/n

In [70]:
# Top-k accuracy for k = 1..10: printed as a fraction, stored as a percentage.
accuracy = []
for i in range(1, 11):
    top_i = top_k_accuracy(y, predictions, i)
    accuracy.append(top_i * 100)
    print(f"top-{i} : {top_i}")

# The x axis must carry the real k values (1..10), not 0..9.
x = np.arange(1, 11)
plt.ylabel("Accuracy")
plt.xlabel("top k")
plt.plot(x, accuracy, 'o')

## Testing the super class classifiers :
---

We start by picking 400 images from each super class. We can do this thanks to the super class balancer.

In [23]:
# 400 sample indexes per super class, concatenated super class by super class.
indexes = sc_balancer.balance(400, true_labels_super)

In [24]:
# Bar chart of the number of samples drawn for each super class.
sc_balancer.plot_distribution()

In [25]:
# Image paths and super-class labels of the balanced sample (same order in both lists).
X_test = [img_paths[i] for i in indexes]
y_test = [true_labels_super[i] for i in indexes]

#### Shuffling the dataset :

In [26]:
# Both lists are shuffled in place with a fresh generator seeded identically;
# since they have the same length they receive the same permutation and stay aligned.
random.Random(0).shuffle(X_test)
random.Random(0).shuffle(y_test)

### Testing the first classifier : (the second approach presented in the report)

In [47]:
# Super class predicted for every test image by the ResNet50-based classifier.
y_pred = sc_classifier.predict_on_samples(X_test)

In [30]:
# Fit the label encoder on the true super classes and convert them to integer codes.
encoder = LabelEncoder()
y_test_enc = encoder.fit_transform(y_test)

In [48]:
# Reuse the encoder fitted on y_test so true and predicted labels share the same codes.
# NOTE(review): get_super_class returns None when no super class matches; the
# encoder was not fitted on None - confirm y_pred cannot contain it.
y_pred_enc = encoder.transform(y_pred)

### Accuracy score :

In [49]:
# Fraction of test images whose predicted super class equals the true one.
print(accuracy_score(y_test_enc, y_pred_enc))

### Plotting the confusion matrix :

In [50]:
# Confusion matrix of the first classifier (y_test_enc is passed as the true labels).
M = confusion_matrix(y_test_enc, y_pred_enc)
# Divide every row by its sum so each row shows proportions instead of counts.
cmn = M.astype('float') / M.sum(axis=1)[:, np.newaxis]

plt.figure(figsize = (16, 7))

# Annotated heatmap with the super class names on both axes.
confusion_matrix_plot = sns.heatmap(cmn, cmap='RdYlGn', fmt='.1%', annot=True, xticklabels=encoder.classes_, yticklabels=encoder.classes_)

### Testing the second classifier : (the first approach presented in the report)

In [27]:
# Load the saved super-class model used by the second classifier.
classifier = load_model('../input/super-class-classifier/resnet-11-super-classes.model')

In [28]:
# Solver over the super-class embeddings that returns only the single nearest tag.
classif_solver = Solver('../input/embeddings-super-class/super_emb.csv', 1)

In [29]:
# For every test image: run the super-class model, then take the tag of the
# nearest super-class embedding as the prediction.
# Note: this rebinds `predictions`, replacing the list built earlier in the notebook.
predictions = []
for i in tqdm(range(len(X_test))):
    img = preprocessing.image.load_img(X_test[i], target_size=(224, 224))
    img_data = preprocessing.image.img_to_array(img)
    img_data = preprocess_input(img_data)

    # img_data[None] adds the batch dimension expected by predict.
    vec = classifier.predict(img_data[None])
    totest = torch.FloatTensor(vec.reshape(-1))

    # [0][0]: first (nearest) result, then its tag.
    pred = classif_solver.get_nearest_embedding_of(totest)[0][0]
    predictions.append(pred)

In [31]:
# Encode the predicted tags with the encoder fitted on y_test.
# NOTE(review): assumes every tag of the super-class embeddings file is a label
# the encoder has seen - confirm.
pred_enc = encoder.transform(predictions)

### Accuracy score :

In [32]:
# Fraction of test images whose predicted super class equals the true one.
print(accuracy_score(y_test_enc, pred_enc))

### Confusion matrix :

In [46]:
# Confusion matrix of the second classifier (y_test_enc is passed as the true labels).
M = confusion_matrix(y_test_enc, pred_enc)
# Divide every row by its sum so each row shows proportions instead of counts.
cmn = M.astype('float') / M.sum(axis=1)[:, np.newaxis]

plt.figure(figsize = (16,7))

# Annotated heatmap with the super class names on both axes.
confusion_matrix_plot = sns.heatmap(cmn, cmap='RdYlGn', fmt='.1%', annot=True, xticklabels=encoder.classes_, yticklabels=encoder.classes_)

## Testing the super class pipeline :

In [16]:
def super_class_test(super_class, classifier, sc_embeddings, mapping_model_path):
    """
    Evaluate the two-stage pipeline on 1000 images of one super class.

    An image is first classified into a super class; only when that
    prediction equals ``super_class`` is the mapping model run and its 10
    nearest labels compared with the true label.

    Parameters
    ----------
    super_class :
        Name of the super class under test
    classifier :
        Object whose ``predict(image_path)`` returns a super class name
    sc_embeddings :
        Path of the embeddings file holding the labels of this super class
    mapping_model_path :
        Path of the saved mapping model of this super class

    Returns
    -------
    list of bool
        One entry per image accepted by the classifier: whether the true
        label is among the nearest labels.
    """
    n_samples = 1000
    mapping_model = load_model(mapping_model_path)

    # Search among the labels of THIS super class (the sc_embeddings argument)
    # rather than relying on a solver defined in another cell.
    sc_solver = Solver(sc_embeddings, 10)
    # A small embeddings file may hold fewer than 10 labels.
    sc_solver.nb = min(sc_solver.nb, len(sc_solver.embeddings))

    indexes = utils.get_indexes(super_class, true_labels_super)

    # One draw of positions keeps every image paired with its own label.
    sampled = random.Random(0).sample(indexes, n_samples)
    X_test = [img_paths[i] for i in sampled]
    y_test = [true_labels[i] for i in sampled]

    predictions = []

    for i in tqdm(range(len(X_test))):
        classification_pred = classifier.predict(X_test[i])
        if classification_pred == super_class:
            img = preprocessing.image.load_img(X_test[i], target_size=(224, 224))
            img_data = preprocessing.image.img_to_array(img)
            img_data = preprocess_input(img_data)

            vec = mapping_model.predict(img_data[None])
            totest = torch.FloatTensor(vec.reshape(-1))

            map_pred = [tag.lower() for tag, _ in sc_solver.get_nearest_embedding_of(totest)]
            predictions.append(y_test[i] in map_pred)

    # Both rates are relative to the full sample, not to the accepted images.
    print("Classification accuracy : ", len(predictions)/n_samples)
    print("Mapping model top-10 accuracy :", sum(predictions)/n_samples)

    return predictions

In [None]:
# Run the pipeline test for the 'small animal' super class.
# Note: this rebinds `predictions` to the list returned by the test.
predictions = super_class_test('small animal', sc_classifier, '../input/different-super-class-embeddings/small_animal_emb.csv', '../input/small-animal/resnet-small-animal.model')

## Hybrid method for classification :
---
#### Combining the two classifiers : 

In [60]:
def hybrid_method_test(super_class, classifiers : list):
    """
    Count how often the two super-class classifiers agree on 1000 images of
    ``super_class``.

    Parameters
    ----------
    super_class :
        Name of the super class the sampled images belong to
    classifiers :
        ``[first, second]`` where ``first`` exposes ``predict(image_path)``
        and ``second`` is either the path of a saved Keras model or an
        already loaded model. The list is left untouched.

    Returns
    -------
    (correct_agreement, wrong_agreement)
        Number of images on which both classifiers agree on ``super_class``,
        and on which they agree on another super class.
    """
    classif_solver = Solver('../input/embeddings-super-class/super_emb.csv', 1)
    indexes = utils.get_indexes(super_class, true_labels_super)

    X = [img_paths[i] for i in indexes]
    X_test = random.Random(0).sample(X, 1000)

    first_classifier = classifiers[0]
    # Load into a local name: writing the model back into `classifiers` would
    # modify the caller's list and break a second call with the same list.
    second_classifier = classifiers[1]
    if isinstance(second_classifier, (str, os.PathLike)):
        second_classifier = load_model(second_classifier)

    correct_agreement = 0
    wrong_agreement = 0

    for i in tqdm(range(len(X_test))):
        img = preprocessing.image.load_img(X_test[i], target_size=(224, 224))
        img_data = preprocessing.image.img_to_array(img)
        img_data = preprocess_input(img_data)

        vec = second_classifier.predict(img_data[None])

        classifier1_pred = first_classifier.predict(X_test[i])
        classifier2_pred = classif_solver.get_nearest_embedding_of(torch.FloatTensor(vec.reshape(-1)))[0][0]

        # Disagreements are not counted in either total.
        if classifier1_pred == classifier2_pred:
            if classifier2_pred == super_class:
                correct_agreement += 1
            else:
                wrong_agreement += 1

    return correct_agreement, wrong_agreement

In [61]:
# Compare both classifiers on images of the 'objects' super class; the second
# classifier is given as the path of its saved model.
correct_agreement, wrong_agreement = hybrid_method_test('objects', [sc_classifier, '../input/super-class-classifier/resnet-11-super-classes.model'])

In [62]:
# Report how often both classifiers agreed on the right / on a wrong super class.
print("------Objects Super Class------")
print(f"Correct agreement : {correct_agreement}")
print(f"Wrong agreement : {wrong_agreement}")