## SVM Implementation with TensowFlow
 - Reference: This work is based on material provided in the book: TensorFlow Machine Learning Cookbook by Nick McClure

In [1]:
import tensorflow as tf
from tensorflow.python.framework import ops
from sklearn.datasets import (load_breast_cancer, load_iris)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

### Define helper logic

In [8]:
class ProblemProvider(object):
    def __init__(self):
        self._problem_store = {}
        self._X_train, self._X_test, self._y_train, self._y_test = None, None, None, None
        self._scaled_X_train, self._scaled_X_test = None, None
        self.setProblemData()
            
    def setCancerData(self):
        cancer_data = load_breast_cancer()
        X = cancer_data.data
        y = cancer_data.target
        y = np.array([-1 if y == 0 else 1 for y in y])
        y = np.reshape(y, (y.shape[0], 1))
        self._X_train, self._X_test, self._y_train, self._y_test = train_test_split(X, y, test_size=0.33, random_state=42)
        self.rescaleData()
        
    def setIrisData(self):
        iris_data = load_iris()
        X = np.array([[x[0], x[3]] for x in iris_data.data])
        y = np.array([1 if y == 0 else -1 for y in iris_data.target])
        #y = np.reshape(y, (y.shape[0], 1))
        self._X_train, self._X_test, self._y_train, self._y_test = train_test_split(X, y, test_size=0.20, random_state=42)
        self.rescaleData()
        
    def setProblemData(self):
        data_sets = {'Cancer_Data': self.setCancerData, 'Iris_Data': self.setIrisData}
        for data_set in data_sets:
            data_sets[data_set]()
            self._problem_store[data_set] = {
                                    'Scaler': self._scaler,
                                    'X_train_scaled':  self._X_train_scaled,
                                    "X_test_scaled": self._X_test_scaled,
                                    "y_train": self._y_train,
                                    "y_test": self._y_test
                                        }

    def rescaleData(self):
        self._scaler = StandardScaler()
        self._X_train_scaled = self._scaler.fit_transform(self._X_train)
        self._X_test_scaled = self._scaler.fit_transform(self._X_test)
    

In [9]:
problem_provider = ProblemProvider()
cancer_problem_prov = problem_provider._problem_store['Iris_Data']
scaler = cancer_problem_prov['Scaler']
X_train = cancer_problem_prov['X_train_scaled']
X_test = cancer_problem_prov['X_test_scaled']
y_train = cancer_problem_prov['y_train']
y_test = cancer_problem_prov['y_test']


In [10]:
X_train[:5]

array([[-1.47393679, -1.30948358],
       [-0.13307079, -1.04292204],
       [ 1.08589829,  0.28988568],
       [-1.23014297, -1.30948358],
       [-1.7177306 , -1.30948358]])

### Specify the SVM using Tensorflow constructs

In [11]:
class SvmUsingTensorflow(object):
    def __init__(self, X_train, y_train):
        # Define the Tensorflow session
        ops.reset_default_graph()
        self._sess = tf.Session()
        
        # Define number of iteration to start reporting updates
        self._reported_iteration = 100
        
        # Define the training feature dataset
        self._X_train = X_train
        
        # Define the training target vector
        self._y_train = y_train
        
        # Define the number of training samples/features
        self._n_samples = self._X_train.shape[0]
        self._n_features = float(self._X_train.shape[1])
        
        # Declare batch size
        self._batch_size = 50
        
        # Declare the Guassain Kernel 'gamma' parameter
        self._gamma_param = -25.
        
        # Define learning rate
        self._learn_rate = 0.01

        # Initialize placeholders
        self._x_data = tf.placeholder(shape=[None, self._n_features], dtype=tf.float32)
        self._y_target = tf.placeholder(shape=[None, 1], dtype=tf.float32)
        self._prediction_grid = tf.placeholder(shape=[None, self._n_features], dtype=tf.float32)
        
        # Create variables for svm
        self._b = tf.Variable(tf.random_normal(shape=[1, self._batch_size]))
        
        # Define the feature/prediction kernelss
        self._feature_kernel = self.computeGuassainFeatureKernel()
        self._predict_kernel = self.computeGuassainPredictionKernel()
        
        # Define the loss function
        self._loss = self.computeLoss()
        
        # Define the model Accuracy
        self._accuracy = self.computeAccuracy()
        
        # Define the Trainer
        self._train_step = self.computeTrainer()
        
        # Initialize variables
        init = tf.global_variables_initializer()
        self._sess.run(init)
        
    
    def computeLinearFeatureKernel(self):
        sq_dists = tf.matmul(self._x_data, tf.transpose(self._x_data))
        return sq_dists    
    
    def computeGuassainFeatureKernel(self):
        gamma = tf.constant(self._gamma_param)
        sq_dists = tf.multiply(self._n_features, tf.matmul(self._x_data, tf.transpose(self._x_data)))
        return tf.exp(tf.multiply(gamma, tf.abs(sq_dists)))
    
    def computeLinearPredictionKernel(self):
        sq_dists = tf.matmul(self._x_data, tf.transpose(self._prediction_grid))
        return sq_dists    
    
    def computeGuassainPredictionKernel(self):
        gamma = tf.constant(self._gamma_param)
        rA = tf.reshape(tf.reduce_sum(tf.square(self._x_data), 1), [-1, 1])
        rB = tf.reshape(tf.reduce_sum(tf.square(self._prediction_grid), 1), [-1, 1])
        pred_sq_dist = tf.add(tf.subtract(rA, tf.multiply(
            self._n_features, tf.matmul(self._x_data, tf.transpose(self._prediction_grid)))), tf.transpose(rB))
        pred_kernel = tf.exp(tf.multiply(gamma, tf.abs(pred_sq_dist)))
        return pred_kernel
        
    def computeLoss(self):        
        first_term = tf.reduce_sum(self._b)
        b_vec_cross = tf.matmul(tf.transpose(self._b), self._b)
        y_target_cross = tf.matmul(self._y_target, tf.transpose(self._y_target))
        second_term = tf.reduce_sum(tf.multiply(self._feature_kernel, tf.multiply(b_vec_cross, y_target_cross)))
        loss = tf.negative(tf.subtract(first_term, second_term))
        return loss
    
    def computeAccuracy(self):
        prediction_output = tf.matmul(tf.multiply(tf.transpose(self._y_target), self._b), self._predict_kernel)
        prediction = tf.sign(prediction_output - tf.reduce_mean(prediction_output))
        accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.squeeze(prediction), tf.squeeze(self._y_target)), tf.float32))
        return accuracy
    
    def computeTrainer(self):
        optimizer = tf.train.GradientDescentOptimizer(self._learn_rate)
        train_step = optimizer.minimize(self._loss)
        return train_step
        
    def updateStatus(self, i, loss_i, accuracy_i):
        if (i + 1) % self._reported_iteration == 0:
            print('Step: #{0}\tLoss: {1:.4f}\tAccuracy: {2:.4f}'.format(i + 1, loss_i, accuracy_i))            
        
    def runMainLoop(self, epochs=300):
        loss_vec = []
        batch_accuracy = []
        for i in range(epochs):
            rand_index = np.random.choice(len(self._X_train), size=self._batch_size)
            rand_x = self._X_train[rand_index]
            rand_y = np.transpose([self._y_train[rand_index]])
            self._sess.run(self._train_step, feed_dict={self._x_data: rand_x, self._y_target: rand_y})
            loss_i = self._sess.run(self._loss, feed_dict={self._x_data: rand_x, self._y_target: rand_y})
            loss_vec.append(loss_i)
            accuracy_i = self._sess.run(self._accuracy, feed_dict={self._x_data: rand_x,
                                                     self._y_target: rand_y,
                                                     self._prediction_grid: rand_x})
            batch_accuracy.append(accuracy_i)
            self.updateStatus(i, loss_i, accuracy_i)

        
    
        
        

In [13]:
def runIrisClassifyProblem():
    problem_provider = ProblemProvider()
    iris_problem_prov = problem_provider._problem_store['Cancer_Data']
    scaler = cancer_problem_prov['Scaler']
    X_train = cancer_problem_prov['X_train_scaled']
    X_test = cancer_problem_prov['X_test_scaled']
    y_train = cancer_problem_prov['y_train']
    y_test = cancer_problem_prov['y_test']
#     iris_data = load_iris()
#     X_train = np.array([[x[0], x[3]] for x in iris_data.data])
#     y_train = np.array([1 if y == 0 else -1 for y in iris_data.target])
#     print("y_train.shape:{}".format(y_train.shape))
    svm_model = SvmUsingTensorflow(X_train=X_train, y_train=y_train)
    svm_model.runMainLoop(1000)

runIrisClassifyProblem()

Step: #100	Loss: -6.2352	Accuracy: 0.8400
Step: #200	Loss: -16.4429	Accuracy: 0.9400
Step: #300	Loss: -19.8027	Accuracy: 0.9000
Step: #400	Loss: -33.8937	Accuracy: 0.9800
Step: #500	Loss: -39.7540	Accuracy: 0.9800
Step: #600	Loss: -34.6916	Accuracy: 0.9600
Step: #700	Loss: -16.9913	Accuracy: 0.9400
Step: #800	Loss: -51.1725	Accuracy: 0.9200
Step: #900	Loss: -58.3918	Accuracy: 0.8800
Step: #1000	Loss: -27.4812	Accuracy: 0.7800
