# Kernel Methods Challenge

By : Olivier Chance, Sophia Lazraq, Peter Martigny 


Filiation : M2 Data Sciences , Université Paris-Saclay, Ecole Polytechnique

In [41]:
from time import time
import numpy as np
import pandas as pd
import cvxopt
import sys

In [42]:
class Helper:
    
    @staticmethod
    def log_process(title, cursor, finish_cursor, start_time = None):
        percentage = float(cursor + 1)/finish_cursor
        now_time = time()
        time_to_finish = ((now_time - start_time)/percentage) - (now_time - start_time)
        mn, sc = int(time_to_finish//60), int((time_to_finish/60 - time_to_finish//60)*60)
        if start_time:
            sys.stdout.write("\r%s - %.2f%% ----- Temps restant estimé: %d min %d sec -----" %(title, 100*percentage, mn, sc))
            sys.stdout.flush()
        else:
            sys.stdout.write("\r%s - \r%.2f%%" %(title, 100*percentage))
            sys.stdout.flush()


## Feature Extractor : SIFT

In [43]:
class SIFT:

    ###################
    #    CONSTUCTOR   #
    ###################

    def __init__(self, gs = 8, ps = 16, gaussian_thres = 1.0, gaussian_sigma = 0.8, sift_thres = 0.2, \
                 num_angles = 12, num_bins = 5, alpha = 9.0):
        self.num_angles = num_angles
        self.num_bins = num_bins
        self.alpha = alpha
        self.angle_list = np.array(range(num_angles))*2.0*np.pi/num_angles
        self.gs = gs
        self.ps = ps
        self.gaussian_thres = gaussian_thres
        self.gaussian_sigma = gaussian_sigma
        self.sift_thres = sift_thres
        self.weights = self._get_weights(num_bins)

    ###################
    # PUBLIC  METHODS #
    ###################

    def get_params_image(self, image):
        image = image.astype(np.double)
        if image.ndim == 3:
            image = np.mean(image, axis=2)
        H, W = image.shape
        gS = self.gs
        pS = self.ps
        remH = np.mod(H-pS, gS)
        remW = np.mod(W-pS, gS)
        offsetH = remH/2
        offsetW = remW/2
        gridH, gridW = np.meshgrid(range(offsetH, H-pS+1, gS), range(offsetW, W-pS+1, gS))
        gridH = gridH.flatten()
        gridW = gridW.flatten()
        features = self._calculate_sift_grid(image, gridH, gridW)
        features = self._normalize_sift(features)
        positions = np.vstack((gridH / np.double(H), gridW / np.double(W)))
        return features, positions
    
    def get_X(self, data):
        out = []
        start = time()
        finish = len(data)
        for idx, dt in enumerate(data):
            Helper.log_process('SIFT', idx, finish_cursor=finish, start_time = start)
            out.append(self.get_params_image(np.mean(np.double(dt), axis=2))[0][0])
        return np.array(out)

    
    ###################
    # PRIVATE METHODS #
    ###################

    def _get_weights(self, num_bins):
        size_unit = np.array(range(self.ps))
        sph, spw = np.meshgrid(size_unit, size_unit)
        sph.resize(sph.size)
        spw.resize(spw.size)
        bincenter = np.array(range(1, num_bins*2, 2)) / 2.0 / num_bins * self.ps - 0.5
        bincenter_h, bincenter_w = np.meshgrid(bincenter, bincenter)
        bincenter_h.resize((bincenter_h.size, 1))
        bincenter_w.resize((bincenter_w.size, 1))
        dist_ph = abs(sph - bincenter_h)
        dist_pw = abs(spw - bincenter_w)
        weights_h = dist_ph / (self.ps / np.double(num_bins))
        weights_w = dist_pw / (self.ps / np.double(num_bins))
        weights_h = (1-weights_h) * (weights_h <= 1)
        weights_w = (1-weights_w) * (weights_w <= 1)
        return weights_h * weights_w

    def _calculate_sift_grid(self, image, gridH, gridW):
        H, W = image.shape
        Npatches = gridH.size
        features = np.zeros((Npatches, self.num_bins * self.num_bins * self.num_angles))
        gaussian_height, gaussian_width = self._get_gauss_filter(self.gaussian_sigma)
        IH = self._convolution2D(image, gaussian_height)
        IW = self._convolution2D(image, gaussian_width)
        Imag = np.sqrt(IH**2 + IW**2)
        Itheta = np.arctan2(IH,IW)
        Iorient = np.zeros((self.num_angles, H, W))
        for i in range(self.num_angles):
            Iorient[i] = Imag * np.maximum(np.cos(Itheta - self.angle_list[i])**self.alpha, 0)
        for i in range(Npatches):
            currFeature = np.zeros((self.num_angles, self.num_bins**2))
            for j in range(self.num_angles):
                currFeature[j] = np.dot(self.weights,\
                        Iorient[j,gridH[i]:gridH[i]+self.ps, gridW[i]:gridW[i]+self.ps].flatten())
            features[i] = currFeature.flatten()
        return features

    def _normalize_sift(self, features):
        siftlen = np.sqrt(np.sum(features**2, axis=1))
        hcontrast = (siftlen >= self.gaussian_thres)
        siftlen[siftlen < self.gaussian_thres] = self.gaussian_thres
        features /= siftlen.reshape((siftlen.size, 1))
        features[features>self.sift_thres] = self.sift_thres
        features[hcontrast] /= np.sqrt(np.sum(features[hcontrast]**2, axis=1)).\
                reshape((features[hcontrast].shape[0], 1))
        return features


    def _get_gauss_filter(self, sigma):
        gaussian_filter_amp = np.int(2*np.ceil(sigma))
        gaussian_filter = np.array(range(-gaussian_filter_amp, gaussian_filter_amp+1))**2
        gaussian_filter = gaussian_filter[:, np.newaxis] + gaussian_filter
        gaussian_filter = np.exp(- gaussian_filter / (2.0 * sigma**2))
        gaussian_filter /= np.sum(gaussian_filter)
        gaussian_height, gaussian_width = np.gradient(gaussian_filter)
        gaussian_height *= 2.0/np.sum(np.abs(gaussian_height))
        gaussian_width  *= 2.0/np.sum(np.abs(gaussian_width))
        return gaussian_height, gaussian_width
    
    def _convolution2D(self, image, kernel):
        imRows, imCols = image.shape
        kRows, kCols = kernel.shape

        y = np.zeros((imRows,imCols))

        kcenterX = kCols//2
        kcenterY = kRows//2

        for i in range(imRows):
            for j in range(imCols):
                for m in range(kRows):
                    mm = kRows - 1 - m
                    for n in range(kCols):
                        nn = kCols - 1 - n

                        ii = i + (m - kcenterY)
                        jj = j + (n - kcenterX)

                        if ii >= 0 and ii < imRows and jj >= 0 and jj < imCols :
                            y[i][j] += image[ii][jj] * kernel[mm][nn]

        return y

## Selected Kernel : Chi-square Kernel

In [56]:
class Kernel:
    
    ########################
    # Only static  methods #
    ########################
    
    @staticmethod
    def linear():
        def f(X1, X2):
            return X1.dot(X2.T)
        return f
    
    @staticmethod
    def chi2(gamma):
        def f(X1, X2):
            out = np.zeros((X1.shape[0], X2.shape[0]))
            n_X1 = X1.shape[0]
            n_X2 = X2.shape[0]
            n_features = X1.shape[1]

            for i in range(n_X1):
                for j in range(n_X2):
                    p = 0
                    for k in range(n_features):
                        denominateur = (X1[i, k] - X2[j, k])
                        nominateur = (X1[i, k] + X2[j, k])
                        if nominateur != 0:
                            p += denominateur * denominateur / nominateur
                    out[i, j] = -p
            tmp = gamma * out
            return  np.exp(tmp, tmp)        
        return f

## Selected Classifier : SVM multi-calss using QP solver

In [57]:
class SVC:
    
    ###############
    # Constructor #
    ###############
    
    def __init__(self, C=1.0, kernel='linear', gamma=.1, degree=2):
        assert kernel in ['linear', 'chi2'], 'Kernel has to be linear or chi2'
        self.C = C
        self.degree = degree
        self.gamma = gamma
        self.kernel = self._get_kernel(kernel, gamma=gamma, degree=degree)
    
    ##################
    # Public methods #
    ##################
    
    def fit(self, X, y):
        self._X, self._y = X, y
        
        self.labels = np.unique(y)
        self.n_labels = len(self.labels)
        self._K = self.kernel(X, X)
        
        # OneVsAll
        models = {}
        start = time()
        finish = len(self.labels)
        for idx, label in enumerate(self.labels[:3]):
            models[label] = {}
            y_label = np.array([1. if e == label else -1. for e in y])
            w, b, mu_support, idx_support = self._fit_binary(X, y_label)
            
            models[label]['y'] = y_label
            models[label]['w'] = w
            models[label]['b'] = b  
            models[label]['mu_support'] = mu_support
            models[label]['idx_support'] = idx_support
            
            Helper.log_process('SVC Fitting', idx, finish_cursor=finish, start_time = start)
            
        self.models = models
            
    def get_params(self):
        return {
            'X': self._X,
            'y': self._y,
            'K': self._K,
            'n_labels': self.n_labels,
            'labels': self.labels,
            'models': self.models,
            'C': self.C,
            'degree': self.degree,
            'gamma': self.gamma,
            'training_score': self.score(self._X, self._y)
        }
    
    def set_params(self, params):
        self._X = params['X']
        self._y = params['y']
        self._K = params['K']
        self.n_labels = params['n_labels']
        self.labels = params['labels']
        self.models = params['models']
        self.C = params['C']
        self.degree = params['degree']
        self.gamma = params['gamma']
    
    def predict(self, X):
        return self.labels[np.argmax(np.array([self._predict(X, self.models[label]['y'], self.models[label]['idx_support'], self.models[label]['mu_support'], self.models[label]['b']) for label in self.labels]), axis=0)]
        
    def score(self, X, y):
        return np.mean(self.predict(X) == y)
    
    ###################
    # Private methods #
    ###################
        
    def _qp(self, H, e, A, b, C=np.inf, l=1e-8, verbose=True):
        # Gram matrix
        n = H.shape[0]
        H = cvxopt.matrix(H)
        A = cvxopt.matrix(A, (1, n))
        e = cvxopt.matrix(-e)
        b = cvxopt.matrix(0.0)
        if C == np.inf:
            G = cvxopt.matrix(np.diag(np.ones(n) * -1))
            h = cvxopt.matrix(np.zeros(n))
        else:
            G = cvxopt.matrix(np.concatenate([np.diag(np.ones(n) * -1),
                                             np.diag(np.ones(n))], axis=0))
            h = cvxopt.matrix(np.concatenate([np.zeros(n), C * np.ones(n)]))

        # Solve QP problem
        cvxopt.solvers.options['show_progress'] = verbose
        solution = cvxopt.solvers.qp(H, e, G, h, A, b)

        # Lagrange multipliers
        mu = np.ravel(solution['x'])
        return mu
    
    def _predict(self, X, y_model, idx_support, mu_support, b):
        X_support = self._X[idx_support]
        G = self.kernel(X, X_support)
        return G.dot(mu_support * y_model[idx_support]) + b
    
    def _fit_binary(self, X, y):
        mu_support, idx_support = self._svm_solver_non_sep(self._K, y, self.C)
        w = self._get_w(mu_support, idx_support, X, y)
        b = self._compute_b(self._K, y, mu_support, idx_support)
        return w, b, mu_support, idx_support
    
    def _svm_solver_non_sep(self, K, y, C):
        n = y.shape[0]
        y = y.reshape((n, 1))
        H = np.dot(y, y.T)*K
        e = np.ones(n)
        A = y
        b = np.zeros(n)
        mu = self._qp(H, e, A, b, C, l=1e-8, verbose=False)
        idx_support = np.where(np.abs(mu) > 1e-5)[0]
        mu_support = mu[idx_support]
        return mu_support, idx_support
    
    def _get_w(self, mu_support, idx_support, X, y):
        return np.sum((mu_support * y[idx_support])[: , None] * X[idx_support], axis=0)
    
    def _get_kernel(self, kernel, gamma, degree):
        return {
            'linear': Kernel.linear(),
            'chi2': Kernel.chi2(gamma)
        }[kernel]

    def _compute_b(self, K, y, mu_support, idx_support):
        num_support_vector = idx_support.size
        y_support = y[idx_support]
        K_support = K[idx_support][:, idx_support]
        b = [y_support[j] - sum([mu_support[i]*y_support[i]*K_support[i][j] for i in range(num_support_vector)]) for j in range(num_support_vector)]
        return np.mean(b)

In [46]:
class ImageTransformation:
    
    ########################
    # Only static  methods #
    ########################

    @staticmethod
    def flip_image_horizontal(image):
        # Takes an image as input and outputs the same image with a horizontal flip
        result = image.copy()
        for channel in range(3):
            aux = image[:, :, channel]
            for column in range(len(aux)):
                result[:, column, channel] = aux[:, len(aux) - column - 1]
        return result

## Import datasets

In [47]:
X_df = pd.read_csv('./Xtr.csv', header=None)
y_df = pd.read_csv('./Ytr.csv')
X_df = X_df.loc[:,:3071]

X_test = pd.read_csv('./Xte.csv', header=None)
X_test = X_test.loc[:,:3071]

X = X_df.values
y = y_df.Prediction

X_test = X_test.values

## Data preprocessing

In [48]:
# Colored images
red, green, blue = np.hsplit(X, 3)
data = np.array([np.dstack((red[i], blue[i], green[i])).reshape(32, 32, 3) for i in range(len(X))])

red, green, blue = np.hsplit(X_test, 3)
data_test = np.array([np.dstack((red[i], blue[i], green[i])).reshape(32, 32, 3) for i in range(len(X_test))])

# Gray images - Just for visualisation purpose (train set)
X_black_and_white = np.sum(np.hsplit(X, 3), axis=0)/3
data_gray = np.array([X_black_and_white[i].reshape((32, 32)) for i in range(len(X))])

## Data augmentation

In [49]:
# Flipping image from the train set
start = time()
finish = len(data)

augmented_train = []

for row in range(0, finish):
    if row % 50 == 0 or row == finish-1:
        Helper.log_process('Flipping image...', row, finish_cursor=finish, start_time = start)
    augmented_train.append(data[row])
    augmented_train.append(ImageTransformation.flip_image_horizontal(data[row]))

augmented_train=np.array(augmented_train)
    
# Compute augmented labels
start = time()
augmented_labels = []
for row in range(len(data)):
    aux = y[row]
    augmented_labels.append(aux)
    augmented_labels.append(aux)
    
augmented_labels = np.array(augmented_labels)

Flipping image... - 100.00% ----- Temps restant estimé: 0 min 0 sec -----

## Set best parameters

In [50]:
params = { 'gs': 6,
           'chi2_gamma': .6,
           'C': 10.,
           'ps': 31,
           'sift_thres': .3,
           'gaussian_thres': .7,
           'gaussian_sigma': .4,
           'num_angles': 12,
           'num_bins': 5,
           'alpha': 9.0 }

## Build the SIFT Extractor

In [51]:
extractor = SIFT(gs=params['gs'], 
                 ps=params['ps'], 
                 sift_thres=params['sift_thres'], 
                 gaussian_sigma=params['gaussian_sigma'], 
                 gaussian_thres=params['gaussian_thres'],
                 num_angles=params['num_angles'],
                 num_bins=params['num_bins'],
                 alpha=params['alpha'])

## Train the model

In [None]:
with_augmented_data = False ## False to be faster...

In [53]:
if with_augmented_data:
    X_train = extractor.get_X(augmented_train)
else:
    X_train = extractor.get_X(data)

SIFT - 100.00% ----- Temps restant estimé: 0 min 0 sec -----

In [58]:
clf = SVC(kernel='chi2', C=params['C'], gamma=params['chi2_gamma'])

In [None]:
if with_augmented_data:
    clf.fit(X_train, augmented_labels)
else:
    clf.fit(X_train, y)

## Prediction on the test set

In [104]:
X_test = extractor.get_X(data_test)
pred = clf.predict(X_test)
predictions = pd.DataFrame(pred, columns=['Prediction'])
predictions['id'] = np.arange(1, len(predictions)+1)
predictions.to_csv('submission.csv', sep=',', index=False, encoding='utf-8', columns=['id', 'Prediction'])

SIFT - 100.00% ----- Temps restant estimé: 0 min 0 sec -----