In [3]:
import tensorflow as tf
from functools import reduce
from numpy import unique, array, vectorize
from sklearn.metrics import accuracy_score, f1_score

class CNNClassifier:
    """Small convolutional binary classifier for 28x28 single-channel images.

    The TensorFlow 1.x graph is assembled and a session is opened when the
    object is constructed.  If training data is passed to the constructor the
    model is trained immediately; otherwise ``train_data`` can be assigned
    later and :meth:`train` called explicitly.
    """

    def __init__(self, train_data=None):
        """Build the graph, open a session and optionally train.

        Args:
            train_data: optional ``(data, labels)`` pair of NumPy arrays.
                ``data`` is reshaped to ``(-1, 28, 28, 1)`` and ``labels`` to
                ``(-1, 1)``.  When ``None`` no training is performed.
        """
        # The default ``None`` used to be unpacked unconditionally, which
        # raised TypeError before the "was data supplied?" check was reached.
        if train_data is None:
            self.train_data = None
        else:
            data, labels = train_data
            self.train_data = (
                self._transform_input(data),
                self._transform_labels(labels),
            )

        self.assemble_graph()

        self._open_session()

        if self.train_data is not None:
            self.train()

    def assemble_graph(self, learning_rate=0.02):
        """Create placeholders, the network, the loss and the training op.

        Args:
            learning_rate: step size handed to the gradient-descent optimizer.
        """
        self.input_data = tf.placeholder(
            tf.float32, shape=[None, 28, 28, 1], name="input_data")
        self.output_labels = tf.placeholder(
            tf.float32, shape=[None, 1], name="output_labels")

        conv1 = tf.layers.conv2d(
            inputs=self.input_data,
            filters=3,
            kernel_size=[5, 5],
            padding="same",
            activation=tf.nn.relu)

        pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)

        conv2 = tf.layers.conv2d(
            inputs=pool1,
            filters=3,
            kernel_size=[5, 5],
            padding="same",
            activation=tf.nn.relu)

        pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)

        # Two stride-2 poolings with "same"-padded convolutions shrink
        # 28x28 -> 14x14 -> 7x7; the last convolution has 3 filters.
        pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 3])

        dense = tf.layers.dense(inputs=pool2_flat, units=4, activation=tf.nn.relu)

        logits = tf.layers.dense(inputs=dense, units=1)

        # NOTE: the op is a sigmoid; the tensor name is kept unchanged so that
        # anything looking the tensor up by name keeps working.
        self.output = tf.nn.sigmoid(logits, name="softmax_tensor")

        self.loss = tf.reduce_mean(
            tf.nn.sigmoid_cross_entropy_with_logits(
                labels=self.output_labels, logits=logits))
        optimizer = tf.train.GradientDescentOptimizer(learning_rate)
        self.training = optimizer.minimize(self.loss)

    def train(self, epochs=10, minibatch_size=256):
        """Run minibatch gradient descent over ``self.train_data``.

        Variables are (re-)initialised on every call.  After each epoch the
        loss on the first minibatch only is printed.

        Args:
            epochs: number of full passes over the training data.
            minibatch_size: number of samples per gradient step.

        Raises:
            ValueError: if there is no training data or ``minibatch_size`` is
                not positive.
        """
        minibatches = self._create_minibatches(minibatch_size)
        if not minibatches:
            # Previously surfaced as a bare IndexError on ``minibatches[0]``.
            raise ValueError("cannot train on an empty data set")

        sess = self.sess
        sess.run(tf.global_variables_initializer())

        first_data, first_labels = minibatches[0]
        for e in range(epochs):
            for data, labels in minibatches:
                sess.run(self.training,
                         {self.input_data: data, self.output_labels: labels})

            loss_val = sess.run(
                self.loss,
                {self.input_data: first_data, self.output_labels: first_labels})
            print("Epoch %d, loss: %.2f" % (e, loss_val))

    def predict(self, data):
        """Return hard 0/1 predictions for ``data``.

        Args:
            data: NumPy array reshapeable to ``(-1, 28, 28, 1)``.

        Returns:
            Array of shape ``(n_samples, 1)`` holding ``1`` where the sigmoid
            output is strictly greater than 0.5 and ``0`` elsewhere, in the
            dtype returned by the session.
        """
        data = self._transform_input(data)

        probabilities = self.sess.run(self.output, {self.input_data: data})

        # Vectorised threshold; same values as the former per-element loop.
        return (probabilities > 0.5).astype(probabilities.dtype)

    def close(self):
        """Close the TensorFlow session, if one is open."""
        sess = getattr(self, "sess", None)
        if sess is not None:
            sess.close()
            self.sess = None

    def _create_minibatches(self, minibatch_size):
        """Split ``self.train_data`` into consecutive ``(data, labels)`` chunks.

        Every chunk holds ``minibatch_size`` samples except possibly the last,
        which holds the remainder.  Sample order is preserved.

        Raises:
            ValueError: if ``minibatch_size`` is not positive (the previous
                loop never terminated in that case) or no training data is set.
        """
        if minibatch_size <= 0:
            raise ValueError(
                "minibatch_size must be positive, got %r" % (minibatch_size,))
        if self.train_data is None:
            raise ValueError("no training data has been provided")

        data, labels = self.train_data
        n_samples = len(labels)

        return [
            (data[start:start + minibatch_size],
             labels[start:start + minibatch_size])
            for start in range(0, n_samples, minibatch_size)
        ]

    def _transform_labels(self, labels):
        """Reshape ``labels`` into a column of shape ``(-1, 1)``."""
        return labels.reshape((-1, 1))

    def _transform_input(self, data):
        """Reshape ``data`` to ``(-1, 28, 28, 1)``; values are not rescaled."""
        return data.reshape((-1, 28, 28, 1))

    def _open_session(self):
        """Open the TensorFlow session used for training and prediction."""
        self.sess = tf.Session()





if __name__ == "__main__":

    def mnist_to_binary(train_data, train_label, test_data, test_label):
        """Replace digit labels by their parity (``label % 2``).

        The two image arrays are passed through unchanged; only the two
        label arrays are converted.
        """
        # One element-wise wrapper is enough for both label arrays.
        parity = vectorize(lambda digit: digit % 2)
        train_label, test_label = [
            parity(digits) for digits in (train_label, test_label)
        ]
        return train_data, train_label, test_data, test_label

    # Fetch MNIST through Keras as (train, eval) pairs of (images, labels).
    (train_data, train_labels), (eval_data, eval_labels) = \
        tf.keras.datasets.mnist.load_data()

    train_data, train_labels, test_data, test_labels = mnist_to_binary(
        train_data, train_labels, eval_data, eval_labels)

    # Constructing the classifier with data trains it right away.
    cnn = CNNClassifier((train_data, train_labels))
    print("Testing score f1: {}".format(
        f1_score(test_labels, cnn.predict(test_data))))



Epoch 0, loss: 0.65
Epoch 1, loss: 0.63
Epoch 2, loss: 0.57
Epoch 3, loss: 0.55
Epoch 4, loss: 0.49
Epoch 5, loss: 0.46
Epoch 6, loss: 0.43
Epoch 7, loss: 0.39
Epoch 8, loss: 0.36
Epoch 9, loss: 0.35
Testing score f1: 0.8218875502008032
