In [None]:
!pip install pyfunctional
!pip install opencv-python==4.5.2.54 

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import cv2 as cv
import matplotlib.pyplot as plt
from scipy.cluster.vq import kmeans, vq
from sklearn.preprocessing import StandardScaler



import tensorflow as tf
# Diagnostic: report how many GPU devices TensorFlow can see.
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
from functional import seq

# Module-level detector instances, created once and reused by get_descriptor()
# for every image instead of being rebuilt per call.
brisk = cv.BRISK_create()
orb = cv.ORB_create()
sift = cv.SIFT_create()
    
def get_descriptor(image_loaded, feature_detector):
    """Run one of the module-level OpenCV detectors on an image.

    Args:
        image_loaded: Image handed unchanged to the selected detector.
        feature_detector: Detector name, compared case-insensitively against
            'sift', 'orb' and 'brisk'.

    Returns:
        A ``(keypoints, descriptor)`` tuple as produced by the detector.

    Raises:
        Exception: If ``feature_detector`` is not one of the supported names.
    """
    detector_name = feature_detector.lower()

    # Reject unknown names up front so the branches below only deal with valid ones.
    if detector_name not in ('sift', 'orb', 'brisk'):
        raise Exception("Feature detector " + feature_detector + " not supported")

    if detector_name == 'orb':
        # ORB is driven in two explicit steps: detect the keypoints, then describe them.
        keypoints, descriptor = orb.compute(image_loaded, orb.detect(image_loaded, None))
    else:
        extractor = sift if detector_name == 'sift' else brisk
        keypoints, descriptor = extractor.detectAndCompute(image_loaded, None)

    return keypoints, descriptor

def find_descriptors(images, labels, feature_detector):
    """Compute keypoint coordinates and descriptors for every image.

    Images for which the detector yields no descriptor are dropped together
    with their label, so the two returned lists stay aligned.

    Args:
        images: Iterable of images, each passed to ``get_descriptor``.
        labels: Labels in the same order as ``images`` (list, array or
            pandas Series).
        feature_detector: Detector name forwarded to ``get_descriptor``.

    Returns:
        A ``(descriptors, targets)`` tuple. ``descriptors`` is a list of
        ``[keypoint_xy_array, descriptor_array_as_float]`` pairs and
        ``targets`` holds the label of each retained image.
    """
    # Materialise the labels so the lookup below is always positional. Indexing a
    # pandas Series with `labels[idx]` is label-based and breaks (KeyError or wrong
    # label) once the Series index is no longer 0..n-1, e.g. after a shuffle split.
    labels = list(labels)

    descriptors = []
    targets = []
    for idx, image in enumerate(images):
        keypoints, descriptor = get_descriptor(image, feature_detector)
        if descriptor is None:
            # Nothing detected in this image: skip it and its label.
            continue
        kps = [keypoint.pt for keypoint in keypoints]
        descriptors.append([np.array(kps), np.array(descriptor).astype(float)])
        targets.append(labels[idx])

    return descriptors, targets

In [None]:
from functional import seq
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.cluster import MiniBatchKMeans

class SpatialPyramid(BaseEstimator, TransformerMixin):
    """Encode local image descriptors as a spatial pyramid of visual-word histograms.

    Every sample is a two-element sequence: element 0 holds the keypoint
    coordinates as ``(x, y)`` rows and element 1 holds the descriptor matrix
    with one row per keypoint. ``fit`` learns a visual vocabulary by clustering
    all descriptors; ``transform`` assigns each descriptor to its nearest
    cluster and concatenates one normalised histogram per pyramid cell.

    Args:
        n_levels: Number of pyramid levels; level ``l`` splits the image into
            ``2**l x 2**l`` cells.
        n_clusters: Size of the visual vocabulary.
    """
    # http://www.micc.unifi.it/bagdanov/pdfs/peronnin_etal_ECCV10.pdf
    def __init__(self, n_levels, n_clusters=800):
        self.n_clusters = n_clusters
        self.n_levels = n_levels
        self.kmeans = None
        # NOTE: this reseeds NumPy's *global* RNG each time an instance is constructed.
        np.random.seed(42)

    def fit(self, desc, y=None):
        """Learn the visual vocabulary from the descriptors of all samples.

        Args:
            desc: Sequence of samples; only element 1 (the descriptor matrix)
                of each sample is used.
            y: Ignored; accepted for scikit-learn API compatibility.

        Returns:
            ``self``.
        """
        self.kmeans = MiniBatchKMeans(
            n_clusters=self.n_clusters,
            verbose=False,
            batch_size=self.n_clusters * 3,
            compute_labels=False,
            reassignment_ratio=10 ** -4,
            random_state=42)

        descriptors = np.vstack([d[1] for d in desc])
        self.kmeans.fit(descriptors)
        return self

    def transform(self, descriptors):
        """Turn each sample into its concatenated pyramid histogram.

        Args:
            descriptors: Sequence of ``[keypoint_xy, descriptor_matrix]`` samples.

        Returns:
            A float32 array with one row per sample and
            ``n_clusters * sum(4**l for l in range(n_levels))`` columns.
        """
        n_blocks = sum(4 ** level for level in range(self.n_levels))
        visual_words = np.empty((len(descriptors), n_blocks * self.n_clusters), dtype=np.float32)
        for row, descriptor in enumerate(descriptors):
            words = self.kmeans.predict(descriptor[1])
            offset = 0  # start column of the next histogram within this row
            for level in range(self.n_levels):
                word_sets = self._descriptor_sets(level, descriptor)
                weight = 1 / 2 ** (self.n_levels - level)  # descriptors at finer resolutions are weighted more
                for inds in word_sets:
                    histogram = np.bincount(words[inds], minlength=self.n_clusters)
                    histogram = self._normalize(histogram) * weight
                    visual_words[row, offset:offset + self.n_clusters] = histogram
                    offset += self.n_clusters
        return visual_words

    def _normalize(self, x, alpha = 0.5):
        """Apply a signed power transform, then scale to unit L2 norm.

        An all-zero input is returned as zeros instead of dividing by zero.
        """
        x = np.sign(x) * np.abs(x) ** alpha
        norm = np.linalg.norm(x, ord=2)
        return np.divide(x, norm, out=np.zeros_like(x), where=norm!=0)

    @staticmethod
    def _descriptor_sets(level, descriptor):
        """Group keypoint indices by the pyramid cell that contains them.

        Args:
            level: Pyramid level; the grid has ``2**level`` cells per side.
            descriptor: Sample whose element 0 holds ``(x, y)`` keypoint
                coordinates.

        Returns:
            A list of ``4**level`` lists of keypoint indices, in row-major
            cell order.
        """
        # IMG_SIZE is a module-level constant that must be defined before this is called.
        cells_per_side = 2 ** level
        block_h = IMG_SIZE / cells_per_side
        block_w = IMG_SIZE / cells_per_side
        last_cell = cells_per_side - 1
        word_sets = [[] for _ in range(cells_per_side ** 2)]
        for idx, kp in enumerate(descriptor[0]):
            # Clamp to the grid: a coordinate equal to (or beyond) IMG_SIZE would otherwise
            # spill into the next row or raise IndexError, and a negative one would wrap around.
            cell_row = min(max(int(np.floor(kp[1] / block_h)), 0), last_cell)
            cell_col = min(max(int(np.floor(kp[0] / block_w)), 0), last_cell)
            word_sets[cell_row * cells_per_side + cell_col].append(idx)
        return word_sets

In [None]:
# Side length in pixels of the square every image is resized to by load_images();
# SpatialPyramid._descriptor_sets() also reads it as the extent of its cell grid.
IMG_SIZE = 128
print(cv.__version__)

def load_images(images, image_dir='../input/imet-2019-fgvc6/train'):
    """Read and resize the PNG file of every image id.

    Args:
        images: Iterable of image ids (file names without the ``.png`` suffix).
        image_dir: Directory containing the PNG files.

    Returns:
        Dict mapping each image id to its image resized to
        ``IMG_SIZE x IMG_SIZE``.

    Raises:
        FileNotFoundError: If a file is missing or cannot be decoded.
    """
    loaded_images = {}
    for image in images:
        image_path = os.path.join(image_dir, image + '.png')
        image_loaded = cv.imread(image_path)
        if image_loaded is None:
            # cv.imread reports a missing/unreadable file by returning None rather than
            # raising; fail here with the path instead of an opaque cv.resize error.
            raise FileNotFoundError("Could not read image: " + image_path)
        loaded_images[image] = cv.resize(image_loaded, (IMG_SIZE, IMG_SIZE), interpolation=cv.INTER_AREA)
    return loaded_images

In [None]:
from collections import Counter
from imgaug import augmenters as iaa

def augment(image):
    """Return a randomly augmented version of ``image``.

    The candidate operations (crop, linear contrast, multiply, horizontal
    flip) are wrapped in ``iaa.SomeOf((0, 4), ...)`` inside an
    ``iaa.Sequential`` created with ``random_order=True``.

    Args:
        image: Image handed to imgaug's ``augment_image``.

    Returns:
        The augmented image.
    """
    candidate_ops = [
        iaa.Crop(percent=(0, 0.25)),
        iaa.LinearContrast((0.8, 1.2)),
        iaa.Multiply((0.9, 1.1), per_channel=0.2),
        iaa.Fliplr(0.5),
    ]
    pipeline = iaa.Sequential([iaa.SomeOf((0, 4), candidate_ops)], random_order=True)
    return pipeline.augment_image(image)

def data_augmentation(df, images, labels, ratio):
    """Oversample rare classes with augmented copies of their images.

    Every class with fewer than ``max_class_count // ratio`` samples is topped
    up to that count. Each added sample is an augmented copy of a randomly
    drawn image of the same class.

    Args:
        df: DataFrame with 'id' and 'attribute_ids' columns describing the
            images available for sampling.
        images: Dict mapping image id to image.
        labels: Class labels (must provide ``tolist()``), one per entry of
            ``images``.
        ratio: Largest tolerated ratio between the most frequent class and any
            other class.

    Returns:
        A ``(new_images, new_labels)`` tuple of lists: the original samples
        followed by the augmented ones.
    """
    new_images = list(images.values())
    new_labels = labels.tolist()
    frequency = Counter(labels)

    if not frequency:
        # Nothing to balance; max() below would raise on an empty counter.
        return new_images, new_labels

    max_occurrence = max(frequency.values())
    min_count = max_occurrence // ratio

    # Guarantee at least a 1:ratio balance between any class and the largest one.
    for key, value in frequency.items():
        deficit = min_count - value
        if deficit <= 0:
            continue
        # Filter once per class rather than once per generated image.
        images_with_class = df[df['attribute_ids'] == int(key)]
        for _ in range(deficit):
            image_id = images_with_class.sample(n=1)['id'].iloc[0]
            new_images.append(augment(images[image_id]))
            new_labels.append(key)

    return new_images, new_labels

In [None]:
from sklearn.model_selection import train_test_split

# load dataset
# Only the 'id' (image file stem) and 'attribute_ids' (class label) columns are used below.
df = pd.read_csv("../input/multiclass/multiclass.csv") 
images = df['id']
labels = df['attribute_ids']

# split dataset 
# Stratified on the label so both parts keep the class proportions; seeded for repeatability.
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.33, random_state=42, stratify=labels)
            
# load and resize images
# Each result is a dict keyed by image id (see load_images).
images_X_train = load_images(X_train)
images_X_test = load_images(X_test)
print('Loaded and resized images')

In [None]:
# data augmentation for low frequency classes
# Restrict the frame to training ids so augmentation can only sample training images.
train_df = df.loc[df['id'].isin(X_train)]
# NOTE(review): this rebinds images_X_train (dict -> list) and y_train (Series -> list),
# so the cell cannot be run twice without re-running the load/split cell first.
images_X_train, y_train = data_augmentation(train_df, images_X_train, y_train, 50)

In [None]:
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.multiclass import OneVsRestClassifier, OneVsOneClassifier

def _build_model_params():
    """Assemble the table of candidate classifiers and their search grids.

    Each entry maps a model name to a dict with the wrapped estimator under
    'model' and its parameter grid under 'params'. Grid keys are addressed
    through the pipeline step names 'classifier' and 'transformer'.
    """
    def balanced_svc():
        return svm.SVC(gamma='auto', class_weight='balanced', verbose=True)

    def svm_grid():
        # Fresh dict per model so the two SVM entries never share a grid object.
        return {
            'classifier__estimator__C': [1,10],
            'classifier__estimator__kernel': ['rbf','linear'],
            'transformer__n_clusters': [200, 400],
        }

    table = {}
    # Insertion order matters: results are later read back by position.
    table['svm1'] = {
        'model': OneVsRestClassifier(balanced_svc()),
        'params': svm_grid(),
    }
    table['svm2'] = {
        'model': OneVsOneClassifier(balanced_svc()),
        'params': svm_grid(),
    }
    table['random_forest'] = {
        'model': OneVsRestClassifier(RandomForestClassifier(class_weight='balanced', verbose=1)),
        'params': {
            'classifier__estimator__n_estimators': [1,5],
            'transformer__n_clusters': [200, 400],
        },
    }
    table['logistic_regression'] = {
        'model': OneVsRestClassifier(
            LogisticRegression(solver='liblinear', multi_class='auto', class_weight='balanced', verbose=1)),
        'params': {
            'classifier__estimator__C': [1,5],
            'transformer__n_clusters': [200, 400],
        },
    }
    return table


model_params = _build_model_params()

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import fbeta_score, confusion_matrix, accuracy_score, recall_score, precision_score, f1_score, classification_report
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.pipeline import Pipeline
from joblib import Memory

# Filled by train_and_test() with one result dict per entry of model_params.
scores = []
# calculate descriptors for images
# NOTE(review): X_train / y_train are rebound here from image ids / labels to descriptor
# samples / filtered labels, so this cell is not safe to run twice in a row.
X_train, y_train = find_descriptors(images_X_train, y_train, 'brisk')
# The pixel data is not needed once the descriptors exist.
del images_X_train

# The test images are still a dict and the test labels a Series; convert both to lists first.
images_X_test = list(images_X_test.values())
y_test = y_test.tolist()
X_test, y_test = find_descriptors(images_X_test, y_test, 'brisk')
del images_X_test

def train_and_test(X_train, X_test, y_train, y_test):
    """Grid-search every model in ``model_params`` and record its test results.

    For each entry a pipeline (SpatialPyramid -> StandardScaler -> classifier)
    is tuned with 5-fold stratified cross-validation, refitted on the full
    training set and evaluated on the test set. One result dict per model is
    appended to the module-level ``scores`` list, in ``model_params`` order.

    Args:
        X_train: Training samples as produced by ``find_descriptors``.
        X_test: Test samples in the same format.
        y_train: Training labels.
        y_test: Test labels.

    Returns:
        None. Results are appended to ``scores``.
    """
    transformer = SpatialPyramid(n_levels=2)
    scaler = StandardScaler(copy=False)
    # Named `folds`, not `cv`, so the cv2 module alias is not shadowed inside this function.
    folds = StratifiedKFold(n_splits=5)
    # One cache handle is enough; it always points at the same directory.
    memory = Memory(location='.cache', verbose=0)
    # The plotting cells label the matrix axes with the unique *test* labels, so the matrix
    # must be built over exactly that label set (not over whatever happened to be predicted).
    test_labels = np.unique(y_test)

    for model_name, mp in model_params.items():
        pipeline = Pipeline(
            memory=memory,
            steps=[('transformer', transformer), ('scaler', scaler), ('classifier', mp['model'])])
        clf = GridSearchCV(pipeline, mp['params'], n_jobs=2, cv=folds, refit=True, verbose=11,
                           return_train_score=True)

        clf.fit(X_train, y_train)
        y_pred = clf.predict(X_test)

        score = {}
        score['Model'] = model_name
        score['y_test'] = y_test
        score['Best Params'] = clf.best_params_
        score['Confusion Matrix'] = confusion_matrix(y_test, y_pred, labels=test_labels)
        score['Classification Report'] = classification_report(y_test, y_pred)
        # NOTE(review): restricting micro-F1 to the predicted labels ignores classes the model
        # never predicts; confirm this is the intended metric.
        score['f1_score'] = f1_score(y_test, y_pred, average='micro', labels=np.unique(y_pred))
        scores.append(score)

# Runs the full grid search for every model; results accumulate in `scores`.
train_and_test(X_train, X_test, y_train, y_test)

In [None]:
import itertools

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Plot a confusion matrix on the current matplotlib figure.

    Args:
        cm: Square confusion matrix (rows = true labels, columns = predicted).
        classes: Tick labels, in the same order as the rows/columns of ``cm``.
        normalize: If True, each row is divided by its sum so cells show the
            fraction of that true class. Rows that sum to zero stay all-zero.
        title: Title of the plot.
        cmap: Matplotlib colormap used for the cells.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Guard against empty rows: a plain division would yield NaN cells and a NaN
        # colour threshold below.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float), where=row_totals != 0)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=90)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells darker than half the maximum get white text so the number stays readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

## SVM Classifier (One-vs-Rest)

In [None]:
# Results are stored in model_params order, so index 0 is the 'svm1' entry.
svm_classifier = scores[0]

# Text summary: chosen hyper-parameters, per-class report and micro-averaged F1.
print('\nBest Params', svm_classifier['Best Params'], sep='\n')
print('\nReport', svm_classifier['Classification Report'], sep='\n')
print('f1_micro: ',svm_classifier['f1_score'])

print('\nConfusion Matrix')
cnf_matrix = svm_classifier['Confusion Matrix']
# Axis tick labels: the distinct labels present in the test set.
class_names = np.unique(np.array(svm_classifier['y_test']))

fig = plt.figure()
fig.set_size_inches(14, 12, forward=True)
plot_confusion_matrix(
    cnf_matrix,
    classes=class_names,
    normalize=True,
    title='Normalized confusion matrix',
)

## SVM Classifier (One-vs-One)

In [None]:
# Results are stored in model_params order, so index 1 is the 'svm2' entry.
svm_classifier = scores[1]

# Text summary: chosen hyper-parameters, per-class report and micro-averaged F1.
print('\nBest Params', svm_classifier['Best Params'], sep='\n')
print('\nReport', svm_classifier['Classification Report'], sep='\n')
print('f1_micro: ',svm_classifier['f1_score'])

print('\nConfusion Matrix')
cnf_matrix = svm_classifier['Confusion Matrix']
# Axis tick labels: the distinct labels present in the test set.
class_names = np.unique(np.array(svm_classifier['y_test']))

fig = plt.figure()
fig.set_size_inches(14, 12, forward=True)
plot_confusion_matrix(
    cnf_matrix,
    classes=class_names,
    normalize=True,
    title='Normalized confusion matrix',
)

## Random Forest

In [None]:
# Results are stored in model_params order, so index 2 is the 'random_forest' entry.
rf_classifier = scores[2]

# Text summary: chosen hyper-parameters, per-class report and micro-averaged F1.
print('\nBest Params', rf_classifier['Best Params'], sep='\n')
print('\nReport', rf_classifier['Classification Report'], sep='\n')
print('f1_micro: ',rf_classifier['f1_score'])

print('\nConfusion Matrix')
cnf_matrix = rf_classifier['Confusion Matrix']
# Axis tick labels: the distinct labels present in the test set.
class_names = np.unique(np.array(rf_classifier['y_test']))

fig = plt.figure()
fig.set_size_inches(20, 20, forward=True)
plot_confusion_matrix(
    cnf_matrix,
    classes=class_names,
    normalize=True,
    title='Normalized confusion matrix',
)

## Logistic Regression

In [None]:
# Results are stored in model_params order, so index 3 is the 'logistic_regression' entry.
lr_classifier = scores[3]

# Text summary: chosen hyper-parameters, per-class report and micro-averaged F1.
print('\nBest Params')
print(lr_classifier['Best Params'])

print('\nReport')
print(lr_classifier['Classification Report'])
print('f1_micro: ',lr_classifier['f1_score'])

print('\nConfusion Matrix')
# Axis tick labels: the distinct labels present in the test set.
class_names = np.array(lr_classifier['y_test'])
class_names = np.unique(class_names)
cnf_matrix = lr_classifier['Confusion Matrix']

fig = plt.figure()
# The height argument was missing (a syntax error); use the same 20 x 20 inch canvas
# as the random-forest report.
fig.set_size_inches(20, 20, forward=True)
plot_confusion_matrix(cnf_matrix, classes=class_names, normalize=True,
                      title='Normalized confusion matrix')