In [1]:
import numpy as np
import scipy as sp
from scipy import optimize
import pandas as pd
import zipfile
import matplotlib.pyplot as plt
from tqdm import tqdm
import cv2 # this is for image feature extraction SIFT and HoG

#### pre-processing :

In [2]:
def to_image(img): # takes an flattened 3072 image and transforms it into 3x32x32 RGB image
    X = np.moveaxis(img.reshape(3, 32, 32), 0, -1)
    return (X - np.min(X)) /(np.max(X) - np.min(X))

def to_gray(img): # takes a RGB image and transforms it into grayscale
    X = np.moveaxis(img.reshape(3, 32, 32), 0, -1)
    gray = 0.299 * X[:,:,0] + 0.587 * X[:,:,1] + 0.114* X[:,:,2]
    return (gray - np.min(gray)) /(np.max(gray) - np.min(gray)) # we normalize them

def visualize(imgs, nb_cols = 10): # code to visualize the images
    N = imgs.shape[0]
    n = N//nb_cols
    fig, axes = plt.subplots(n, nb_cols, figsize=(nb_cols * 1.5, n * 1.5))
    for i in range(n):
        for j in range(nb_cols):
            axes[i, j].imshow(to_image(imgs[i*nb_cols+j, :]))
            axes[i, j].axis('off')
    plt.show()

def flip(x): # x is a flattened version, flip returns the flipped image (the symmetrical along the x_axis)
    # the flipped image should have the same label, we use this for data augmentation
    img = np.moveaxis(x.reshape(3,32,32),0,-1)
    img_flipped = np.zeros(img.shape)
    for i in range(3) :
        img_flipped[:,:,i] = img[:,::-1,i]

    return np.ravel(np.moveaxis(img_flipped, -1, 0)) # we return the flattened version

def augment_data(imgs, y): # we augment a dataset if images with a set of the flipped images
    return np.concatenate([imgs, np.apply_along_axis(flip, axis=1, arr=imgs)]), np.concatenate([y, y])

In [12]:
class StandardScaler: # implementation of StandardScaler
    def __init__(self):
        self.mean_ = None
        self.scale_ = None
    def fit_transform(self, X):
        self.mean_ = np.mean(X, axis=0)
        self.scale_ = np.std(X, axis=0)
        self.scale_[self.scale_ == 0] = 1
        return (X - self.mean_) / self.scale_
    def transform(self, X):
        return (X - self.mean_) / self.scale_

#### kernels :

In [4]:
class RBF:
    def __init__(self, sigma=1.):
        self.sigma = sigma  ## the variance of the kernel
        self.name = 'RBF'
    def kernel(self,X,Y):
        squared_norm = np.expand_dims(np.sum(X**2,axis=1),axis=1) + np.expand_dims(np.sum(Y**2,axis=1),axis=0)-2*np.einsum('ni,mi->nm',X,Y)
        return np.exp(-0.5*squared_norm/self.sigma**2)
    
class RBF_batches: # the RBF kernel but in batches to help reduce the space (memory) complexity
    def __init__(self, sigma=1., batch_size=10):
        self.sigma = sigma  ## the variance of the kernel
        self.batch_size = batch_size
    def kernel(self, X, Y):
        ## Input vectors X and Y of shape Nxd and Mxd
        N = X.shape[0]
        M = Y.shape[0]
        res = np.zeros((N, M))
        X_nbbatches = N//self.batch_size
        Y_nbbatches = M//self.batch_size
        for i in range(X_nbbatches):
            for j in range(Y_nbbatches):
                X_batch = X[i*self.batch_size:(i+1)*self.batch_size, :]
                Y_batch = Y[j*self.batch_size:(j+1)*self.batch_size, :]
                squared_norm = np.expand_dims(np.sum(X_batch**2,axis=1),axis=1) + np.expand_dims(np.sum(Y_batch**2,axis=1),axis=0)-2*np.einsum('ni,mi->nm',X_batch,Y_batch)
                res[i*self.batch_size:(i+1)*self.batch_size, j*self.batch_size:(j+1)*self.batch_size] = np.exp(- 0.5 * squared_norm / (self.sigma**2))
        return res

class Polynomial: # the polynomial kernel includes the linear kernel for d=1
    def __init__(self, d=1.):
        self.d = d ## the variance of the kernel
        self.type = 'poly'

    def kernel(self, X, Y):
        ## Input vectors X and Y of shape Nxd and Mxd
        return (X @ Y.T)**self.d

#### kernel methods :

In [5]:
class KernelPCA: # kernel pca will be used to reduce the dimension of the dataset
    def __init__(self,kernel, r=2):
        self.kernel = kernel          # <---
        self.alpha = None # Matrix of shape N times d representing the d eingenvectors alpha corresp
        self.lmbda = None # Vector of size d representing the top d eingenvalues
        self.support = None # Data points where the features are evaluated
        self.r =r ## Number of principal components
    def compute_PCA(self, X):
        # assigns the vectors
        N = X.shape[0]
        K = self.kernel(X, X)
        J = np.ones((N, N))
        I = np.eye(N)

        G = (I - J/N) @ K @ (I - J/N)
        self.support = X
        eigen_values, eigen_vectors = np.linalg.eig(G)
        idx = np.argsort(eigen_values)
        self.lmbda = eigen_values[idx][-self.r:].real
        self.alpha = eigen_vectors[:, idx][:, -self.r:].real / np.sqrt(N * self.lmbda).reshape(1, self.r)

    def transform(self,x):
        # Input : matrix x of shape N data points times d dimension
        # Output: vector of size N
        N = self.support.shape[0]
        J = np.ones((N, N))
        I = np.eye(N)
        k = self.kernel(x, self.support)
        g = k @ (I - J/N)
        return g @ self.alpha
    
class KLR: # kernel binary logistic regression
    def __init__(self, lbda, max_iters=100):
        self.lbda = lbda
        self.max_iters = max_iters
        self.weights = None
    def fit(self, X, y, K):
        N = len(y)
        def loss(w):
            logistic = np.mean(np.log(1 + np.exp(-y*(K@w)))) + 0.5 * self.lbda * np.dot(w, K @ w)
            return logistic
        def grad_loss(w):
            P = - 1 / (1 + np.exp(y*(K@w)))
            return (1/N) * K @ (P*y) + self.lbda * (K@w)
        res = optimize.minimize(loss, x0=np.ones(N), jac=grad_loss, method='L-BFGS-B', options={'maxiter': self.max_iters})
        self.weights = res.x
    def predict(self, x, K):
        z = 1 / (1 + np.exp(-(K @ self.weights)))
        return np.where(z>0.5, 1, -1)

class KernelSVC: # simple binary classification SVC

    def __init__(self, C, kernel, epsilon = 1e-3):
        self.type = 'non-linear'
        self.C = C
        self.alpha = None
        self.support = None
        self.epsilon = epsilon
        self.kernel = kernel
        self.X_train = None
        self.to_fit = True


    def fit(self, X, y):
        #### You might define here any variable needed for the rest of the code
        N = len(y)
        # Lagrange dual problem
        K = self.kernel(X, X)
        G = np.dot(np.diag(y), np.dot(K, np.diag(y)))
        def loss(alpha):
            return - np.sum(alpha) + 0.5 * np.dot(alpha, np.dot(G, alpha))

        # Partial derivate of Ld on alpha
        def grad_loss(alpha):
            return - np.ones(N) + np.dot(G, alpha)


        # Constraints on alpha of the shape :
        # -  d - C*alpha  = 0
        # -  b - A*alpha >= 0

        fun_eq = lambda alpha: np.dot(y, alpha)
        jac_eq = lambda alpha: y
        fun_ineq = lambda alpha: np.concatenate((np.zeros(N), self.C*np.ones(N)), axis=0) - np.dot(np.concatenate((-np.eye(N), np.eye(N)), axis=0), alpha)
        jac_ineq = lambda alpha: np.concatenate((np.eye(N), -np.eye(N)), axis=0)

        constraints = ({'type': 'eq',  'fun': fun_eq, 'jac': jac_eq},
                       {'type': 'ineq',
                        'fun': fun_ineq ,
                        'jac': jac_ineq})

        optRes = optimize.minimize(fun=lambda alpha: loss(alpha),
                                   x0=np.ones(N),
                                   method='SLSQP',
                                   jac=lambda alpha: grad_loss(alpha),
                                   constraints=constraints)
        self.alpha = optRes.x
        ## Assign the required attributes
        support_indices = np.where((self.alpha+self.epsilon<self.C) * (self.alpha-self.epsilon>0))[0]
        self.support = X[support_indices] #'''------------------- A matrix with each row corresponding to a point that falls on the margin ------------------'''
        if len(support_indices)==0:
            self.b = 0
        else:
            self.b = np.mean(y[support_indices] - np.dot(K, y*self.alpha)[support_indices]) #''' -----------------offset of the classifier------------------ '''
        self.norm_f = np.sqrt(np.dot(y*self.alpha, np.dot(K, y*self.alpha))) # '''------------------------RKHS norm of the function f ------------------------------'''
        self.z = y*self.alpha # we need it to define the separating function
        self.X_train = X # we need it to define the separating function
        self.to_fit = False

    ### Implementation of the separting function $f$
    def separating_function(self, x):
        # Input : matrix x of shape N data points times d dimension
        # Output: vector of size N
        K = self.kernel(x, self.X_train)
        return K @ self.z


    def predict(self, x):
        """ Predict y values in {-1, 1} """
        d = self.separating_function(x)
        return 2 * (d+self.b> 0) - 1

class Kernel_OvO: # One v One classification
    def __init__(self, C, nb_classes, kernel, clf, epsilon = 1e-3):
        self.clf = clf
        self.C = C
        self.epsilon = epsilon
        self.kernel = kernel
        self.nb_classes = nb_classes
        self.support = None
        self.classifiers = {}
        self.indices = {}
        for i in range(nb_classes):
            for j in range(nb_classes):
                if i!=j:
                    if self.clf == 'SVC':
                        self.classifiers[(i,j)] = KernelSVC(C = self.C, kernel = self.kernel, epsilon = self.epsilon)
                    elif self.clf == 'KLR':
                        self.classifiers[(i,j)] = KLR(lbda = self.C, kernel = self.kernel) # C here is just lbda
                    else:
                        print("--> method not recognized")

    def fit(self, X, y):
        # we need to fit every classifier in self.classifiers
        K = self.kernel(X, X)
        self.support = X
        for i in range(self.nb_classes):
            for j in range(self.nb_classes):
                if i!=j:
                    indices_i = np.where(y==i)[0]
                    indices_j = np.where(y==j)[0]
                    self.indices[(i,j)] = np.concatenate((indices_i, indices_j))
                    X_ij = X[self.indices[(i,j)], :]
                    K_ij = K[np.ix_(self.indices[(i,j)], self.indices[(i,j)])]
                    if self.classifiers[(i,j)].to_fit:
                      self.classifiers[(i,j)].fit(X_ij, 2 * (y[self.indices[(i,j)]]==i) - 1)
                    print("--> done")

    def predict(self, x):
        # we need to keep having OvO until we reach the best class
        K = self.kernel(x, self.support)
        classes = np.zeros((x.shape[0], self.nb_classes))
        for i in range(self.nb_classes):
            for j in range(self.nb_classes):
                if i!=j:
                    y = self.classifiers[(i,j)].predict(x)
                    indices_i = np.where(y==1)[0]
                    indices_j = np.where(y==-1)[0]
                    classes[indices_i, i] += 1
                    classes[indices_j, j] += 1
        return np.argmax(classes, axis=1)

class Kernel_OvR: # One v Rest classification

    def __init__(self, C, nb_classes, kernel, clf, epsilon = 1e-3):
        self.clf = clf
        self.C = C
        self.kernel = kernel
        self.epsilon = epsilon
        self.classifiers = {}
        self.nb_classes = nb_classes
        self.support = None
        for i in range(nb_classes):
            if self.clf=='SVC':
                self.classifiers[i] = KernelSVC(self.C, self.epsilon)
            elif self.clf=='KLR':
                self.classifiers[i] = KLR(self.C)
            else:
                print("--> method not recognized")

    def fit(self, X, y):
        # we need to fit every classifier in self.classifiers
        K = self.kernel(X, X)
        self.support = X
        for i in tqdm(range(self.nb_classes)):
            y_i = np.where(y==i, 1, -1)
            self.classifiers[i].fit(X, y_i, K)

    def predict(self, x):
        # we need to keep having OvO until we reach the best class
        K = self.kernel(x, self.support)
        classes = np.zeros((x.shape[0], self.nb_classes))
        for i in range(self.nb_classes):
            classes[:, i] = self.classifiers[i].separating_function(K) + self.classifiers[i].b
        return np.argmax(classes, axis=1)

class KMeans: # Kmeans algorithm which is used in the bag of words representation
    def __init__(self, nb_clusters, niter=100):
        self.nb_clusters = nb_clusters
        self.centers = None
        self.niter = niter
    def fit(self, X):
        N = X.shape[0]
        centers = X[np.random.randint(0, N, self.nb_clusters), :]
        for i in tqdm(range(self.niter)):
            distances = np.sum((X - centers[:, np.newaxis, :])**2, axis=2)
            nearest = np.argmin(distances, axis=0)
            for k in range(self.nb_clusters):
                cluster_data = X[nearest == k]
                if cluster_data.shape[0] > 0:
                    centers[k, :] = np.mean(cluster_data, axis=0)
        self.centers = centers
    def predict(self, x):
        nearest = np.argmin(np.linalg.norm(x - np.expand_dims(self.centers, axis=1), axis=-1), axis=0)
        return nearest

#### feature extraction :

In [6]:
def compute_descriptors_sift(imgs, threshold, sigma): # sift feature extraction
    descriptors = []
    sift = cv2.SIFT_create(contrastThreshold=threshold, sigma=sigma)
    for i in range(imgs.shape[0]):
        gray_image = to_gray(imgs[i, :])
        _, d = sift.detectAndCompute((gray_image * 255).astype(np.uint8), None)
        descriptors.append(d)
    return descriptors

def nearest_to_BoW(nearest, descriptors, nb_clusters):
    BoW = np.zeros((len(descriptors), nb_clusters))
    k = 0
    for i in range(len(descriptors)):
        nearest_i = nearest[k: k+descriptors[i].shape[0]]
        indices, counts = np.unique(nearest_i, return_counts=True)
        BoW[i, indices] += counts
        k += descriptors[i].shape[0]
    return BoW

def compute_HOG(imgs): # HoG feature extraction
    descriptors = []
    hog = cv2.HOGDescriptor(_winSize=(32,32), _blockStride=(4,4), _blockSize=(16,16),
                        _cellSize=(4,4), _nbins=9)
    for i in range(imgs.shape[0]):
        gray_image = to_gray(imgs[i, :])
        features = hog.compute((gray_image * 255).astype(np.uint8))
        descriptors.append(features.tolist())
    return np.array(descriptors)

#### our best model :

In [None]:
# we retrieve the data
Xtr = pd.read_csv('Xtr.csv.zip',header=None, sep=',', usecols=range(3072), compression='zip').dropna().values
Xte = pd.read_csv('Xte.csv.zip',header=None, sep=',', usecols=range(3072), compression='zip').dropna().values
Ytr = pd.read_csv('Ytr.csv',sep=',',usecols=[1]).squeeze().dropna().values
# we augment the data using flips
augmented_Xtr, augmented_Ytr = augment_data(Xtr, Ytr)
# we extract the HoG features
hog_tr = compute_HOG(augmented_Xtr)
hog_te = compute_HOG(Xte)

In [None]:
# we fit the classifier OvO to the HoG features on the augmented data
scaler = StandardScaler()
kernel = RBF(sigma=np.sqrt(hog_tr.shape[1])).kernel
clf = Kernel_OvO(C=1., kernel=kernel, nb_classes=10, clf='SVC')
clf.fit(scaler.fit_transform(hog_tr), augmented_Ytr)

#### submit prediction :

In [None]:
prediction = clf.predict(scaler.transform(hog_te))
Yte = {'Prediction' : prediction}
dataframe = pd.DataFrame(Yte)
dataframe.index += 1
dataframe.to_csv('Yte_pred_hog_SVC_bloc_size_16_augmented_data.csv',index_label='Id')