In [176]:
import datetime
import numpy as np

from itertools import product

from sklearn import svm
from sklearn.preprocessing import normalize
from sklearn.metrics import zero_one_loss, accuracy_score

from utils import *

In [54]:
data = load_dataset(train_file_path='./dataset/hw2_grading_iris_ver_vir_training.csv',
                   test_file_path='./dataset/hw2_grading_iris_ver_vir_testing.csv')

# data = load_dataset()

In [55]:
train_X = data['train_X']
train_y = data['train_y']
test_X = data['test_X']
test_y = data['test_y']

In [56]:
print(train_X.shape)
print(test_X.shape)

(67, 4)
(33, 4)


In [187]:
def zero_one_loss(y_truth, y_pred, sample_weights=None):
    return np.average(y_pred != y_truth, weights=sample_weights)

In [229]:
import numpy as np

class WeakClassifer:
    def __init__(self, data, sample_weights=None, scorer=zero_one_loss, verbose=False):
        self.X = data['X']
        self.y = data['y']
        self.sample_weights = sample_weights if sample_weights is not None else np.ones_like(self.y).astype(float) / len(self.y)
        self.param = {'feature_index': None, 'split_point': None, 'label': None}
        self.scorer = scorer
        self.verbose = verbose

    def train(self):

        best_error = np.inf
        binary_class_label = list(set(self.y))

        # Iterate over all features
        for feature_index in range(self.X.shape[1]):
            if self.verbose:
                print('-'*100)
                print('[*] Scanning Feature: {} ...'.format(feature_index))

            # Find best split (hypothesis) on data w.r.t the current feature
            x = self.X[:, feature_index]
            split_points = list(set(x))
            
            for split_point in split_points:
                # Choost the hypothesis either classifies one side +1, the other side -1
                # or classifies one side -1, the other size +1, which leads to lowest weighted lost.
                hypothesis1 = np.vectorize(lambda val: binary_class_label[0] if val > split_point else binary_class_label[1])
                hypothesis2 = np.vectorize(lambda val: binary_class_label[1] if val > split_point else binary_class_label[0])

                y_pred1 = hypothesis1(x)
                y_pred2 = hypothesis2(x)
                
                error1 = self.scorer(y_truth=self.y, y_pred=y_pred1, sample_weights=self.sample_weights)
                error2 = self.scorer(y_truth=self.y, y_pred=y_pred2, sample_weights=self.sample_weights)

                if error1 < error2 and error1 < best_error:
                    best_error = error1
                    self.param.update({'feature_index': feature_index,
                                       'split_point': split_point,
                                       'label': binary_class_label[0]})
                    if self.verbose:
                        print('-'*100)
                        print('[*] Error: {}'.format(best_error))
                        print('[*] Update hypothesis: {}'.format(self.param))

                elif error2 < error1 and error2 < best_error:
                    best_error = error2
                    self.param.update({'feature_index': feature_index,
                                       'split_point': split_point,
                                       'label': binary_class_label[1]})
                    if self.verbose:
                        print('-'*100)
                        print('[*] Error: {}'.format(best_error))
                        print('[*] Update hypothesis: {}'.format(self.param))

        if self.verbose:
            print('-'*100)
            print('[*] Best Error: {}'.format(best_error))
            print('[*] Best hypothesis: {}'.format(self.param))
            print('{}'.format(self))
            print('-'*100)

    def hypothesis(self, X):
        x = X[:, self.param['feature_index']]
        the_other_label = list(set(self.y) - set([self.param['label']]))[0]
        return np.vectorize(lambda val: self.param['label'] if val > self.param['split_point'] else the_other_label)(x)

    def __str__(self):
        the_other_label = list(set(self.y) - set([self.param['label']]))[0]

        description = \
        '[*] Label {} if value at {}-th feature is greater than {}. \n' + \
        '[*] Label {} if value at {}-th feature is less or equal to {}.'

        return description.format(self.param['label'],
                                  self.param['feature_index'],
                                  self.param['split_point'],
                                  the_other_label,
                                  self.param['feature_index'],
                                  self.param['split_point'])


In [260]:
class AdaboostClassifer:
    """ Class labels need to be {+1, -1} """
    def __init__(self, data, param={}, scorer=zero_one_loss, verbose=False):

        self.train_X = data['train_X']
        self.train_y = data['train_y']
        self.test_X = data['test_X']
        self.test_y = data['test_y']

        self.base_classifer = param.get('base_classifer', WeakClassifer)
        self.num_classifer = param.get('num_classifer', 5)
        self.scorer = scorer
        self.verbose = verbose

        self.sample_weights = np.ones_like(self.train_y).astype(float) / len(self.train_y)

        self.classifer_weights = np.ones(self.num_classifer) / self.num_classifer
        self.classifers = []

    def train(self, info=''):
        """ Perform adaptive boosting """
        
        if info: 
            print('-'*100)
            print('[*] {}'.format(info))
        
        for t in range(self.num_classifer):
            classifer = self.base_classifer(data={'X': self.train_X, 'y': self.train_y}, sample_weights=self.sample_weights)
            classifer.train()

            y_pred = classifer.hypothesis(X=self.train_X)
            y_truth = self.train_y
            error = self.scorer(y_truth, y_pred, sample_weights=self.sample_weights)

            self.classifer_weights[t] = 0.5 * np.log((1 - error) / error)
            normalization_factor = 2 * np.sqrt(error * (1 - error))
            self.sample_weights = self.sample_weights * np.exp(-self.classifer_weights[t]*y_truth*y_pred) / normalization_factor

            self.classifers.append(classifer)

            if self.verbose:
                print('-'*100)
                print('[*] {}-th classifer weight: {}'.format(t+1, self.classifer_weights[t]))
                print('[*] {}-th classifer error: {}'.format(t+1, error))
                print('[*] {}-th classifer hypothesis: {}'.format(t+1, self.classifers[t].param))
                print('{}'.format(self.classifers[t]))

    def hypothesis(self, X):
        y_pred = np.zeros(len(X))
        for t in range(len(self.classifers)):
            y_pred += self.classifer_weights[t] * self.classifers[t].hypothesis(X)
        return np.sign(y_pred)

In [261]:
clf = AdaboostClassifer(data, param={'num_classifer': 5}, verbose=True)

In [262]:
clf.train()

----------------------------------------------------------------------------------------------------
[*] 1-th classifer weight: 1.53013539735
[*] 1-th classifer error: 0.044776119403
[*] 1-th classifer hypothesis: {'split_point': 4.7999999999999998, 'feature_index': 2, 'label': -1}
[*] Label -1 if value at 2-th feature is greater than 4.8. 
[*] Label 1 if value at 2-th feature is less or equal to 4.8.
----------------------------------------------------------------------------------------------------
[*] 2-th classifer weight: 1.71699360224
[*] 2-th classifer error: 0.03125
[*] 2-th classifer hypothesis: {'split_point': 1.6000000000000001, 'feature_index': 3, 'label': -1}
[*] Label -1 if value at 3-th feature is greater than 1.6. 
[*] Label 1 if value at 3-th feature is less or equal to 1.6.
----------------------------------------------------------------------------------------------------
[*] 3-th classifer weight: 0.744273932616
[*] 3-th classifer error: 0.184139784946
[*] 3-th clas

In [263]:
zero_one_loss(train_y, clf.hypothesis(train_X))

0.014925373134328358

In [264]:
zero_one_loss(test_y, clf.hypothesis(test_X))

0.090909090909090912

In [268]:
def split_K_fold(X, y, K, shuffle=False):
    train_test_folds = []

    for k in range(K):
        k_train_X = np.array([_x for i, _x in enumerate(X) if i % K != k])
        k_train_y = np.array([_y for i, _y in enumerate(y) if i % K != k])
        k_valid_X = np.array([_x for i, _x in enumerate(X) if i % K == k])
        k_valid_y = np.array([_y for i, _y in enumerate(y) if i % K == k])
        
        train_test_folds.append({'train_X': k_train_X,
                                 'train_y': k_train_y,
                                 'test_X': k_valid_X,
                                 'test_y': k_valid_y})
    return train_test_folds

def K_fold_cross_validation(data, model, param, scorer=zero_one_loss, K=5):
    # Unpack data
    train_X = data['train_X']
    train_y = data['train_y']
    test_X = data['test_X']
    test_y = data['test_y']
    
    # Get K folds training, validation data.
    train_test_folds = split_K_fold(X=train_X, y=train_y, K=K)

    # Perform cross-validation.
    cv_error = 0
    for i, data in enumerate(train_test_folds):

        _model = model(data=data, param=param)
        _model.train()

        # Compute training and test error.
        train_error = scorer(y_truth=train_y, y_pred=_model.hypothesis(X=train_X))
        test_error = scorer(y_truth=test_y, y_pred=_model.hypothesis(X=test_X))
        
        print('-'*100)
        print('[*] {}-th fold | Parameter: {}'.format(i+1, param))
        print('[*] {}-th fold | Train error: {} | Validation error: {}'.format(i+1, train_error, test_error))

        cv_error += test_error

    cv_error /= K

    return cv_error

class GridSearchCV:
    def __init__(self, data, model=None, param_grid={}, scorer=zero_one_loss, num_folds=5):
        """ Exhaustive search over specified parameter values for a model. 
            
            @param data: Dictionary of training and test data:
                - data['train_X']: Training data with shape(N, D), where N is the number of examples, D is data dimension.
                - data['train_y']: Training label with shape(N,)
                - data['test_X']: Test data with shape(N, D), where N is the number of examples, D is data dimension.
                - data['test_y']: Test label with shape(N,)
            
            @param `model`: Either classifier or regression class that supports model.hypothesis(X) for evaluation.
            @param `param_grid`: Dictionary with parameters names (string) as keys and lists of parameter settings as values.
            @param `scorer`: Model evaluation function. (Defualt: zero_one_loss)
            @param `num_folds`: For K-fold cross-validation. (Defualt: 5)
        """
        self.data = data
        self.model = model
        self.params = self._span_param_grid(param_grid)
        self.scorer = scorer
        self.num_folds = num_folds
        
        self.cv_results = []
        self.best_param = {}
        self.best_score = 0.0
    
    def train(self):
        # Find the best hyper-parameters by K-fold cross-validation.
        for param in self.params:
            score = K_fold_cross_validation(data=self.data,
                                            model=self.model,
                                            param=param,
                                            scorer=self.scorer,
                                            K=self.num_folds)
            self.cv_results.append({'param': param, 'score': score})
        
        self.cv_results.sort(key=lambda item: item['score'])
        self.best_score = self.cv_results[0]['score']
        self.best_param = self.cv_results[0]['param']
    
    def _span_param_grid(self, param_grid):
        """ Perform Cartesian prodcut on `self.param_grid`
            - Input: param_grid = {'a': [1, 2], 'b': [True, False]}
            - Output: [{'a': 1, 'b': True}, {'a': 1, 'b': False},
                       {'a': 2, 'b': True}, {'a': 2, 'b': False}]
        """
        return list(dict(zip(param_grid, x)) for x in product(*param_grid.values()))

In [269]:
param_grid = {'num_classifer': [1,2,3]}

clf = GridSearchCV(data=data, model=AdaboostClassifer, param_grid=param_grid, num_folds=3)
clf.train()

print('-'*100)
print('[*] Cross validation history:')
for cv_result in clf.cv_results:
    print(' - Parameter: {} | Cross validation error: {}'.format(cv_result['param'], cv_result['score']))
print('-'*100)
print('[*] Best parameter: {}'.format(clf.best_param))
print('[*] Best cross validation error: {}'.format(clf.best_score))
print('[*] Start to train on full training data and evaluate on test data ...')

# Train on full training data and evaluate on test data with the best hyper-parameters.
best_model = AdaboostClassifer(data=data, param=clf.best_param, verbose=True)
best_model.train()

# Compute training and test error.
train_error = zero_one_loss(y_truth=train_y, y_pred=best_model.hypothesis(X=train_X))
test_error = zero_one_loss(y_truth=test_y, y_pred=best_model.hypothesis(X=test_X))

print('-'*100)
print('[*] Datetime: {}'.format(datetime.datetime.now().strftime('%H:%M:%S')))
print('[*] Performance: Train error: {} | Test error: {}'.format(train_error, test_error))
print('-'*100)

----------------------------------------------------------------------------------------------------
[*] 1-th fold | Parameter: {'num_classifer': 1}
[*] 1-th fold | Train error: 0.044776119403 | Validation error: 0.121212121212
----------------------------------------------------------------------------------------------------
[*] 2-th fold | Parameter: {'num_classifer': 1}
[*] 2-th fold | Train error: 0.044776119403 | Validation error: 0.121212121212
----------------------------------------------------------------------------------------------------
[*] 3-th fold | Parameter: {'num_classifer': 1}
[*] 3-th fold | Train error: 0.0597014925373 | Validation error: 0.0909090909091
----------------------------------------------------------------------------------------------------
[*] 1-th fold | Parameter: {'num_classifer': 2}
[*] 1-th fold | Train error: 0.0597014925373 | Validation error: 0.0606060606061
------------------------------------------------------------------------------------

In [272]:
""" Author: Howard (Yu-Chun) Lo """

import numpy as np

class SVM:
    def __init__(self, data, param={}, verbose=False):
        """ Binary SVM Classifer optimized with SMO algorithm

            Implementation Reference:
                - SMO supplement provided by EE6550 course
                - Standford CS 229, Autumn 2009 The Simplified SMO Algorithm
                - Improvment to Platt's SMO Algorithm for SVM Classifer Design, S.S. Keerthi et al
                - A Roadmap to SVM Sequential Minimal Optimization for Classification by Ruben Ramirez-Padron

            @param data: Dictionary of training and test data:
                - data['train_X']: Training data with shape(N, D), where N is the number of examples, D is data dimension.
                - data['train_y']: Training label with shape(N,)
                - data['test_X']: Test data with shape(N, D), where N is the number of examples, D is data dimension.
                - data['test_y']: Test label with shape(N,)

            @param param: Dictionary of hyper-parameters:
                - param['C']: Parameter for penalty term. (Default: 0.1)
                - param['tol']: Tolerance for KKT conditons. (Defualt: 1e-2)
                - param['kernel_type']: Kernel type to be used in SVM. Acceptable kernel type: 'linear', 'poly', 'rbf'. (Default: 'linear')
                - param['poly_degree']: Degree of the polynomial kernel function ('poly'). Ignored by all other kernels. (Default: 3)
                - param['rbf_sigma']: Sigma term in RBF (guassian). Ignored by all other kernels. (Default: 0.5)
                - param['enable_heuristic']: Whether use Platts heuristics to train SVM. (Defualt: False)
                - param['enable_kernel_cache']: Whether precompute kernel results. This can speed up training but need time to initialize when data is large. (Defualt: True)
                - param['max_iteration']: Max iteration for SMO training algorithm to avoid not converging.
        """

        # Upack training and test data
        self.train_X = data['train_X']
        self.train_y = data['train_y']
        self.test_X = data['test_X']
        self.test_y = data['test_y']

        # Unpack hyper-parameters
        self.C = param.get('C', 0.1)
        self.tol = param.get('tol', 1e-2)
        self.kernel_type = param.get('kernel_type', 'linear')
        self.poly_degree = param.get('poly_degree', 3)
        self.rbf_sigma = param.get('rbf_sigma', 0.5)
        self.enable_heuristic = param.get('enable_heuristic', False)
        self.enable_kernel_cache = param.get('enable_kernel_cache', True)
        self.max_iteration = param.get('max_iteration', 20000)
        self.verbose = verbose

        # Set kernel function
        self.kernels = {
            'linear': self._linear_kernel,
            'poly': self._poly_kernel,
            'rbf': self._rbf_kernel
        }
        self.kernel = self.kernels[self.kernel_type]

        # Precompute kernel cache
        if self.enable_kernel_cache:
            print('-'*100)
            print('[*] Enable kernel cache. Precomputing kernel results for all training examples ...')
            self.kernel_cache = self._precompute_kernel_cache()

        # Model parameters
        self.use_w = True if self.kernel_type == 'linear' else False # If linear kernel is used, use weight to perform prediction instead of alphas.
        self.w = np.zeros(self.train_X.shape[1]) # Weight vector: shape(D,)
        self.b = 0.0 # Bias term: scalar
        self.alpha = np.zeros(len(self.train_X)) # Lagrange multipliers

    def train(self, info=''):
        """ Optimize alpha with either simple SMO algorithm or simple SMO combined with Platt's heuristics
            In each iteration, the SMO algorithm solves the Lagrangian dual problem
            which involves only two Lagrangian multipliers.
        """
        if self.enable_heuristic:
            self._heuristic_smo(info)
        else:
            self._simple_smo(info)

    def hypothesis(self, X):
        """ Applying our linear classifier `f(x)` to perform binary classification.
            If f(x) >= 0, y(i) = +1
            Else    <  0, y(i) = -1

            @param `X`: X can be a single example with shape(D,) or multiple examples with shape(N, D)
        """
        return np.sign(self._f(X))

    def _simple_smo(self, info=''):

        num_changed_alphas = 1
        iteration = 0

        while num_changed_alphas > 0:
            num_changed_alphas = 0
            for i in range(len(self.train_X)):
                if self._violate_KKT_conditions(i):
                    j = i
                    while(j == i): j = np.random.randint(0, len(self.train_X))
                    num_changed_alphas += self._update_alpha_pair(i, j)

            if self.verbose and num_changed_alphas == 0:
                if info: print('[*] {}'.format(info))
                print('[*] Converged at iteration {}.'.format(iteration+1))
                print('-'*100)

            iteration += 1
            if self.verbose and (iteration == 1 or iteration % 100 == 0 or iteration == self.max_iteration):
                # Compute training and testing error
                train_error = zero_one_loss(y_truth=self.train_y, y_pred=self.hypothesis(X=self.train_X))
                test_error = zero_one_loss(y_truth=self.test_y, y_pred=self.hypothesis(X=self.test_X))
                print('-'*100)
                if info: print('[*] {}'.format(info))
                print('[*] {} alphas changed.'.format(num_changed_alphas))
                print('[*] Iteration: {} | Train error: {} | Test error: {}'.format(iteration, train_error, test_error))

            if self.verbose and iteration == self.max_iteration:
                print('-'*100)
                print('[*] Max iteration acheived.')
                return

    def _heuristic_smo(self, info=''):

        num_changed_alphas = 0
        examine_all = 1
        iteration = 0

        while num_changed_alphas > 0 or examine_all:
            num_changed_alphas = 0
            if examine_all:
                # Repeated pass iterates over entire examples.
                for i in range(len(self.train_X)):
                    # alpha_i needs update, select alpha_j (!= alpha_i) to jointly optimize the alpha pair
                    if self._violate_KKT_conditions(i):
                        j = i
                        while(j == i): j = np.random.randint(0, len(self.train_X))
                        # Update alpha_i and alpha_j
                        num_changed_alphas += self._update_alpha_pair(i, j)
                
                if self.verbose:
                    print('-'*100)
                    if info: print('[*] {}'.format(info))
                    print('[*] One passes done.')

                if self.verbose and num_changed_alphas == 0:
                    if info: print('[*] {}'.format(info))
                    print('[*] Converged at iteration {}.'.format(iteration+1))
                    print('-'*100)
                elif self.verbose:
                    if info: print('[*] {}'.format(info))
                    print('[*] Go to repeated passes.')
            else:
                # Repeated pass iterates over non-boundary examples.
                I_non_boundary = np.where(np.logical_and(self.alpha > 0, self.alpha < self.C) == True)[0].tolist()
                if len(I_non_boundary):
                    E_list = np.vectorize(self._E)(I_non_boundary)
                    if not max(E_list) - min(E_list) < 1:
                        for i in I_non_boundary:
                            num_changed_alphas += self._examine_example(i)

                if self.verbose and num_changed_alphas == 0:
                    print('-'*100)
                    if info: print('[*] {}'.format(info))
                    print('[*] Repeated passes done. Go back to one pass.')

            if examine_all == 1:
                # One pass done, go to repeated passes.
                examine_all = 0
            elif num_changed_alphas == 0:
                # Repeated pass done, go back to one pass.
                examine_all = 1

            iteration += 1
            if self.verbose and (iteration == 1 or iteration % 100 == 0 or iteration == self.max_iteration):
                # Compute training and testing error
                train_error = zero_one_loss(y_truth=self.train_y, y_pred=self.hypothesis(X=self.train_X))
                test_error = zero_one_loss(y_truth=self.test_y, y_pred=self.hypothesis(X=self.test_X))
                print('-'*100)
                if info: print('[*] {}'.format(info))
                print('[*] {} alphas changed.'.format(num_changed_alphas))
                print('[*] Iteration: {} | Train error: {} | Test error: {}'.format(iteration, train_error, test_error))

            if self.verbose and iteration == self.max_iteration:
                print('-'*100)
                print('[*] Max iteration acheived.')
                return

    def _violate_KKT_conditions(self, i):
        """ Check if an example violates the KKT conditons """
        alpha_i = self.alpha[i]
        R_i = self.train_y[i]*self._E(i)
        return (R_i < -self.tol and alpha_i < self.C) or (R_i > self.tol and alpha_i > 0)

    def _examine_example(self, i):
        """ Implement Platt's heuristics to select a good alpha pair to optimize.
            (First heuristic is not implemented since it makes training slower)
        """
        # Check if alpha_i needs updating (alpha_i violates KKT conditions)
        if self._violate_KKT_conditions(i):

            # Retrieve indexes of non boundary examples
            I_non_boundary = np.where(np.logical_and(self.alpha > 0, self.alpha < self.C) == True)[0].tolist()

            # Iterate over non-boundary items, starting at a random position
            shuffled_I_non_boundary = np.copy(I_non_boundary)
            np.random.shuffle(shuffled_I_non_boundary)
            for j in shuffled_I_non_boundary:
                if self._update_alpha_pair(i, j):
                    return 1

            # Iterate over entire items, starting at a random position
            I = np.arange(len(self.train_X))
            shuffled_I = np.copy(I)
            np.random.shuffle(shuffled_I)
            for j in shuffled_I:
                if self._update_alpha_pair(i, j):
                    return 1
        return 0

    def _update_alpha_pair(self, i, j):
        """ Jointly optimized alpha_i and alpha_j """
        # Not the alpha pair.
        if i == j: return 0

        E_i = self._E(i)
        E_j = self._E(j)

        alpha_i = self.alpha[i]
        alpha_j = self.alpha[j]

        x_i, x_j, y_i, y_j = self.train_X[i], self.train_X[j], self.train_y[i], self.train_y[j]

        if y_i == y_j:
            L = max(0, alpha_i + alpha_j - self.C)
            H = min(self.C, alpha_i + alpha_j)
        else:
            L = max(0, alpha_j - alpha_i)
            H = min(self.C, self.C + alpha_j - alpha_i)

        # This will not make any progress.
        if L == H: return 0

        # Compute eta (second derivative of the Lagrange dual function = -eta)
        if self.enable_kernel_cache:
            eta = self.kernel_cache[i][i] + self.kernel_cache[j][j] - 2*self.kernel_cache[i][j]
        else:
            eta = self.kernel(x_i, x_i) + self.kernel(x_j, x_j) - 2*self.kernel(x_i, x_j)

        # eta > 0 => second derivative(-eta) < 0 => maximum exists.
        if eta <= 0: return 0

        # Compute new alpha_j and clip it inside [L, H]
        alpha_j_new = alpha_j + y_j*(E_i - E_j)/eta
        if alpha_j_new < L: alpha_j_new = L
        if alpha_j_new > H: alpha_j_new = H

        # Compute new alpha_i based on new alpha_j
        alpha_i_new = alpha_i + y_i*y_j*(alpha_j - alpha_j_new)

        # Compute step sizes
        delta_alpha_i = alpha_i_new - alpha_i
        delta_alpha_j = alpha_j_new - alpha_j

        # Update weight vector
        if self.use_w:
            self.w = self.w + delta_alpha_i*y_i*x_i + delta_alpha_j*y_j*x_j

        # Update b
        if self.enable_kernel_cache:
            b_i = self.b - E_i - delta_alpha_i*y_i*self.kernel_cache[i][i] - delta_alpha_j*y_j*self.kernel_cache[i][j]
            b_j = self.b - E_j - delta_alpha_i*y_i*self.kernel_cache[i][j] - delta_alpha_j*y_j*self.kernel_cache[j][j]
        else:
            b_i = self.b - E_i - delta_alpha_i*y_i*self.kernel(x_i, x_i) - delta_alpha_j*y_j*self.kernel(x_i, x_j)
            b_j = self.b - E_j - delta_alpha_i*y_i*self.kernel(x_i, x_j) - delta_alpha_j*y_j*self.kernel(x_j, x_j)
        self.b = (b_i + b_j)/2
        if (alpha_i_new > 0 and alpha_i_new < self.C):
            self.b = b_i
        if (alpha_j_new > 0 and alpha_j_new < self.C):
            self.b = b_j

        # Update the alpha pair
        self.alpha[i] = alpha_i_new
        self.alpha[j] = alpha_j_new

        return 1

    def _f(self, X):
        """ Linear classifier `f(x)`, used when training or making predictions.
            @param `X`: `X` can be a single example with shape(D,) or multiple examples with shape(N, D)
        """
        if self.use_w:
            # Speed up by using computed weight only when linear kernel is used.
            return np.dot(X, self.w) + self.b
        else:
            # If X is single example
            if X.ndim == 1:
                return np.dot(self.alpha*self.train_y, self.kernel(self.train_X, X)) + self.b
            # Multiple examples
            elif X.ndim == 2:
                return np.array([np.dot(self.alpha*self.train_y, self.kernel(self.train_X, _X)) + self.b for _X in X])

    def _E(self, i):
        """ Prediction error: _f(x_i) - y_i, used when training. """
        if self.enable_kernel_cache:
            return np.dot(self.alpha*self.train_y, self.kernel_cache[i]) + self.b - self.train_y[i]
        else:
            return self._f(self.train_X[i]) - self.train_y[i]

    def _precompute_kernel_cache(self):
        """ If self.enable_kernel_cache is True, then precompute kernel results for all training examples.
            This can speed up training but need time to initialize when data is large.
        """
        kernel_cache = np.zeros((len(self.train_X), len(self.train_X)))
        for i, x_i in enumerate(self.train_X):
            for j, x_j in enumerate(self.train_X):
                kernel_cache[i][j] = self.kernel(x_i, x_j)
        return kernel_cache

    def _linear_kernel(self, X, x):
        """ Linear kernel:
            @param `X`: `X` can be a single example with shape(D,) or multiple examples with shape(N, D)
            @param `x`: `x` can only be a single example with shape(D,)
        """
        return np.dot(X, x)

    def _poly_kernel(self, X, x):
        """ Polynomial kernel:
            @param `X`: `X` can be a single example with shape(D,) or multiple examples with shape(N, D)
            @param `x`: `x` can only be a single example with shape(D,)
        """
        return (1 + np.dot(X, x))**self.poly_degree

    def _rbf_kernel(self, X, x):
        """ RBF (guassian) kernel:
            @param `X`: `X` can be a single example with shape(D,) or multiple examples with shape(N, D)
            @param `x`: `x` can only be a single example with shape(D,)
        """
        # If X is single example
        if X.ndim == 1:
            sqrt_norm = np.linalg.norm(X - x)**2
        # Multiple examples
        elif X.ndim == 2:
            sqrt_norm = np.linalg.norm(X - x, axis=1)**2

        return np.exp(-sqrt_norm / (2.0 * (self.rbf_sigma**2)))


In [273]:
param_grid = {
    'C': [0.1, 0.2, 0.3], 
    'kernel_type': ['poly'], 
    'poly_degree': [1, 2, 3]
}

clf = GridSearchCV(data=data, model=SVM, param_grid=param_grid, num_folds=3)
clf.train()

print('-'*100)
print('[*] Cross validation history:')
for cv_result in clf.cv_results:
    print(' - Parameter: {} | Cross validation error: {}'.format(cv_result['param'], cv_result['score']))
print('-'*100)
print('[*] Best parameter: {}'.format(clf.best_param))
print('[*] Best cross validation error: {}'.format(clf.best_score))
print('[*] Start to train on full training data and evaluate on test data ...')

# Train on full training data and evaluate on test data with the best hyper-parameters.
best_model = SVM(data=data, param=clf.best_param)
best_model.train()

# Compute training and test error.
train_error = zero_one_loss(y_truth=train_y, y_pred=best_model.hypothesis(X=train_X))
test_error = zero_one_loss(y_truth=test_y, y_pred=best_model.hypothesis(X=test_X))

print('-'*100)
print('[*] Datetime: {}'.format(datetime.datetime.now().strftime('%H:%M:%S')))
print('[*] Performance: Train error: {} | Test error: {}'.format(train_error, test_error))
print('-'*100)

----------------------------------------------------------------------------------------------------
[*] Enable kernel cache. Precomputing kernel results for all training examples ...
----------------------------------------------------------------------------------------------------
[*] 1-th fold | Parameter: {'kernel_type': 'poly', 'C': 0.1, 'poly_degree': 1}
[*] 1-th fold | Train error: 0.044776119403 | Validation error: 0.0909090909091
----------------------------------------------------------------------------------------------------
[*] Enable kernel cache. Precomputing kernel results for all training examples ...
----------------------------------------------------------------------------------------------------
[*] 2-th fold | Parameter: {'kernel_type': 'poly', 'C': 0.1, 'poly_degree': 1}
[*] 2-th fold | Train error: 0.0298507462687 | Validation error: 0.0909090909091
----------------------------------------------------------------------------------------------------
[*] Enable

In [280]:
np.linspace(0.1, 1.0, 15).round(2).tolist(),

([0.1,
  0.16,
  0.23,
  0.29,
  0.36,
  0.42,
  0.49,
  0.55,
  0.61,
  0.68,
  0.74,
  0.81,
  0.87,
  0.94,
  1.0],)

In [297]:
I_non_boundary = np.where(np.logical_and(best_model.alpha > 0, best_model.alpha < best_model.C) == True)[0].tolist()

In [298]:
for i in I_non_boundary:
    print(train_y[i] - np.dot(best_model.alpha*best_model.train_y, best_model.kernel_cache[i]))

7.64890195662
7.65202804117
7.63977502141
7.64890195662


In [299]:
best_model.b

7.6489019566223559