In [0]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

__version__ = "0.1.5"
__author__ = "Abien Fred Agarap"

import numpy as np
import os
import sys
import tensorflow as tf
import time


class SVM:
    """Implementation of L2-Support Vector Machine using TensorFlow"""

    def __init__(self, alpha, batch_size, svm_c, num_classes, num_features):
        """Initialize the SVM class
        Parameter
        ---------
        alpha : float
          The learning rate for the SVM model.
        batch_size : int
          Number of batches to use for training and testing.
        svm_c : float
          The SVM penalty parameter.
        num_classes : int
          Number of classes in a dataset.
        num_features : int
          Number of features in a dataset.
        """
        self.alpha = alpha
        self.batch_size = batch_size
        self.svm_c = svm_c
        self.num_classes = num_classes
        self.num_features = num_features

        def __graph__():
            """Building the inference graph"""

            with tf.name_scope("input"):
                # [BATCH_SIZE, NUM_FEATURES]
                x_input = tf.placeholder(
                    dtype=tf.float32, shape=[None, self.num_features], name="x_input"
                )

                # [BATCH_SIZE]
                y_input = tf.placeholder(dtype=tf.uint8, shape=[None], name="y_input")

                # [BATCH_SIZE, NUM_CLASSES]
                y_onehot = tf.one_hot(
                    indices=y_input,
                    depth=self.num_classes,
                    on_value=1,
                    off_value=-1,
                    name="y_onehot",
                )

            learning_rate = tf.placeholder(dtype=tf.float32, name="learning_rate")

            with tf.name_scope("training_ops"):
                with tf.name_scope("weights"):
                    weight = tf.get_variable(
                        name="weights",
                        initializer=tf.random_normal(
                            [self.num_features, self.num_classes], stddev=0.01
                        ),
                    )
                    self.variable_summaries(weight)
                with tf.name_scope("biases"):
                    bias = tf.get_variable(
                        name="biases",
                        initializer=tf.constant([0.1], shape=[self.num_classes]),
                    )
                    self.variable_summaries(bias)
                with tf.name_scope("Wx_plus_b"):
                    output = tf.matmul(x_input, weight) + bias
                    tf.summary.histogram("pre-activations", output)

            with tf.name_scope("svm"):
                regularization = tf.reduce_mean(tf.square(weight))
                hinge_loss = tf.reduce_mean(
                    tf.square(
                        tf.maximum(
                            tf.zeros([self.batch_size, self.num_classes]),
                            1 - tf.cast(y_onehot, tf.float32) * output,
                        )
                    )
                )
                with tf.name_scope("loss"):
                    loss = regularization + self.svm_c * hinge_loss
            tf.summary.scalar("loss", loss)

            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(
                loss
            )

            with tf.name_scope("accuracy"):
                predicted_class = tf.sign(output)
                predicted_class = tf.identity(predicted_class, name="prediction")
                with tf.name_scope("correct_prediction"):
                    correct = tf.equal(
                        tf.argmax(predicted_class, 1), tf.argmax(y_onehot, 1)
                    )
                with tf.name_scope("accuracy"):
                    accuracy = tf.reduce_mean(tf.cast(correct, "float"))
            tf.summary.scalar("accuracy", accuracy)

            merged = tf.summary.merge_all()

            self.x_input = x_input
            self.y_input = y_input
            self.y_onehot = y_onehot
            self.learning_rate = learning_rate
            self.loss = loss
            self.optimizer = optimizer
            self.output = output
            self.predicted_class = predicted_class
            self.accuracy = accuracy
            self.merged = merged

        sys.stdout.write("\n<log> Building graph...")
        __graph__()
        sys.stdout.write("</log>\n")

    def train(
        self,
        epochs,
        log_path,
        train_data,
        train_size,
        validation_data,
        validation_size,
        result_path,
    ):
        """Trains the SVM model
        Parameter
        ---------
        epochs : int
          The number of passes through the entire dataset.
        log_path : str
          The directory where to save the TensorBoard logs.
        train_data : numpy.ndarray
          The numpy.ndarray to be used as the training dataset.
        train_size : int
          The number of data in `train_data`.
        validation_data : numpy.ndarray
          The numpy.ndarray to be used as the validation dataset.
        validation_size : int
          The number of data in `validation_data`.
        result_path : str
          The path where to save the NPY files consisting of the actual and predicted labels.
        """

        # initialize the variables
        init_op = tf.group(
            tf.global_variables_initializer(), tf.local_variables_initializer()
        )

        # get the current time and date
        timestamp = str(time.asctime())

        # event files to contain the TensorBoard log
        train_writer = tf.summary.FileWriter(
            log_path + timestamp + "-training", graph=tf.get_default_graph()
        )
        test_writer = tf.summary.FileWriter(
            os.path.join(log_path, timestamp + "-testing"), graph=tf.get_default_graph()
        )
        with tf.Session() as sess:
            sess.run(init_op)

            try:
                for step in range(epochs * train_size // self.batch_size):
                    offset = (step * self.batch_size) % train_size
                    batch_train_data = train_data[0][
                        offset : (offset + self.batch_size)
                    ]
                    batch_train_labels = train_data[1][
                        offset : (offset + self.batch_size)
                    ]

                    feed_dict = {
                        self.x_input: batch_train_data,
                        self.y_input: batch_train_labels,
                        self.learning_rate: self.alpha,
                    }

                    summary, _, step_loss = sess.run(
                        [self.merged, self.optimizer, self.loss], feed_dict=feed_dict
                    )

                    if step % 100 == 0:
                        train_accuracy = sess.run(self.accuracy, feed_dict=feed_dict)
                        print(
                            "step[{}] train -- loss : {}, accuracy : {}".format(
                                step, step_loss, train_accuracy
                            )
                        )
                        train_writer.add_summary(summary=summary, global_step=step)
            except KeyboardInterrupt:
                print("Training interrupted at step {}".format(step))
                os._exit(1)
            finally:
                print("EOF -- training done at step {}".format(step))

                for step in range(epochs * validation_size // self.batch_size):

                    feed_dict = {
                        self.x_input: validation_data[0][: self.batch_size],
                        self.y_input: validation_data[1][: self.batch_size],
                    }

                    (
                        validation_summary,
                        predictions,
                        actual,
                        validation_loss,
                        validation_accuracy,
                    ) = sess.run(
                        [
                            self.merged,
                            self.predicted_class,
                            self.y_onehot,
                            self.loss,
                            self.accuracy,
                        ],
                        feed_dict=feed_dict,
                    )

                    if step % 100 == 0 and step > 0:
                        print(
                            "step [{}] validation -- loss : {}, accuracy : {}".format(
                                step, validation_loss, validation_accuracy
                            )
                        )
                        test_writer.add_summary(
                            summary=validation_summary, global_step=step
                        )

                    self.save_labels(
                        predictions=predictions,
                        actual=actual,
                        result_path=result_path,
                        step=step,
                        phase="testing",
                    )

                print("EOF -- testing done at step {}".format(step))

    @staticmethod
    def variable_summaries(var):
        with tf.name_scope("summaries"):
            mean = tf.reduce_mean(var)
            tf.summary.scalar("mean", mean)
            with tf.name_scope("stddev"):
                stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
            tf.summary.scalar("stddev", stddev)
            tf.summary.scalar("max", tf.reduce_max(var))
            tf.summary.scalar("min", tf.reduce_min(var))
            tf.summary.histogram("histogram", var)

    @staticmethod
    def save_labels(predictions, actual, result_path, step, phase):
        """Saves the actual and predicted labels to a NPY file
        Parameter
        ---------
        predictions : numpy.ndarray
          The NumPy array containing the predicted labels.
        actual : numpy.ndarray
          The NumPy array containing the actual labels.
        result_path : str
          The path where to save the concatenated actual and predicted labels.
        step : int
          The time step for the NumPy arrays.
        phase : str
          The phase for which the predictions is, i.e. training/validation/testing.
        """

        if not os.path.exists(path=result_path):
            os.mkdir(result_path)

        # Concatenate the predicted and actual labels
        labels = np.concatenate((predictions, actual), axis=1)

        # save the labels array to NPY file
        np.save(
            file=os.path.join(result_path, "{}-svm-{}.npy".format(phase, step)),
            arr=labels,
        )

In [6]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
from sklearn.datasets import load_digits
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
import tensorflow as tf


BATCH_SIZE = 32
EPOCHS = 500
NUM_CLASSES = 2


class SVM(tf.keras.Model):
    def __init__(self, **kwargs):
        super(SVM, self).__init__()
        self.weights_var = tf.Variable(
            tf.random.normal(
                stddev=1e-2, shape=[kwargs["num_features"], kwargs["num_classes"]]
            )
        )
        self.biases_var = tf.Variable(
            tf.random.normal(stddev=1e-2, shape=[kwargs["num_classes"]])
        )

    def call(self, features):
        output = tf.matmul(features, self.weights_var) + self.biases_var
        return output


def loss_fn(model, features, labels):
    output = model(features)
    regularization = tf.reduce_mean(model.weights_var)
    squared_hinge_loss = tf.reduce_mean(
        tf.square(
            tf.maximum(
                tf.zeros([BATCH_SIZE, NUM_CLASSES]),
                (1.0 - tf.cast(labels, tf.float32) * output),
            )
        )
    )
    loss = regularization + 5e-1 * squared_hinge_loss
    return loss


def train_step(model, opt, loss, features, labels):
    with tf.GradientTape() as tape:
        train_loss = loss(model, features, labels)
    gradients = tape.gradient(train_loss, [model.weights_var, model.biases_var])
    opt.apply_gradients(zip(gradients, [model.weights_var, model.biases_var]))
    return train_loss


def train(model, opt, loss, dataset, epochs=EPOCHS):
    for epoch in range(epochs):
        epoch_loss = []
        epoch_accuracy = []
        for batch_features, batch_labels in dataset:
            train_loss = train_step(model, opt, loss, batch_features, batch_labels)
            predictions = model(batch_features)
            predictions = tf.sign(predictions)
            predictions = predictions.numpy().reshape(-1, NUM_CLASSES)
            accuracy = tf.metrics.Accuracy()
            accuracy(tf.argmax(predictions, 1), tf.argmax(batch_labels, 1))
            epoch_loss.append(train_loss)
            epoch_accuracy.append(accuracy.result().numpy())
        epoch_loss = tf.reduce_mean(epoch_loss)
        epoch_accuracy = tf.reduce_mean(epoch_accuracy)
        if epoch != 0 and (epoch + 1) % 100 == 0:
            print(
                "epoch {}/{} : mean loss = {}, mean accuracy = {}".format(
                    epoch + 1, epochs, epoch_loss, epoch_accuracy
                )
            )


features, labels = load_breast_cancer().data, load_breast_cancer().target
x_train, x_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.10, stratify=labels
)

x_train = x_train.astype(np.float32) / 255.0
x_test = x_test.astype(np.float32) / 255.0

y_train = tf.keras.utils.to_categorical(y_train)
y_test = tf.keras.utils.to_categorical(y_test)

y_train[y_train == 0] = -1
y_test[y_test == 0] = -1

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.prefetch(BATCH_SIZE * 4)
train_dataset = train_dataset.shuffle(BATCH_SIZE * 4)
train_dataset = train_dataset.batch(BATCH_SIZE, True)

model = SVM(num_features=30, num_classes=NUM_CLASSES)
# optimizer = tf.optimizers.Adam(learning_rate=1e-1, decay=1e-6)
optimizer = tf.optimizers.SGD(
    learning_rate=1e-1, momentum=9e-1, decay=1e-6, nesterov=True
)
model(x_train[:10])
print(model.summary())
train(model, optimizer, loss_fn, train_dataset)
model.trainable = False
print(model.summary())
predictions = model(x_test)
predictions = tf.sign(predictions)
predictions = predictions.numpy().reshape(-1, NUM_CLASSES)
accuracy = tf.metrics.Accuracy()
test_accuracy = accuracy(tf.argmax(predictions, 1), tf.argmax(y_test, 1))
print("test accuracy : {}".format(test_accuracy.numpy()))

Model: "svm_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
Total params: 62
Trainable params: 62
Non-trainable params: 0
_________________________________________________________________
None
epoch 100/500 : mean loss = -23.13077163696289, mean accuracy = 0.880859375
epoch 200/500 : mean loss = -46.044715881347656, mean accuracy = 0.849609375
epoch 300/500 : mean loss = -68.56888580322266, mean accuracy = 0.8359375
epoch 400/500 : mean loss = -90.74737548828125, mean accuracy = 0.830078125
epoch 500/500 : mean loss = -112.84288024902344, mean accuracy = 0.84375
Model: "svm_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
Total params: 62
Trainable params: 0
Non-trainable params: 62
_________________________________________________________________
None
test accuracy : 0.9649122953414917


In [7]:
x_train.shape

(512, 30)

In [8]:
x_test.shape

(57, 30)