In [1]:
# coding: utf-8
import time
import pickle

import numpy as np
import tensorflow as tf


def load_pickled_data(pickled_file):
    """
    Load the train / test / validation splits from a pickled dictionary.

    The pickle must contain the keys 'train_data', 'train_labels',
    'test_data', 'test_labels', 'valid_data' and 'valid_labels'. The shape
    of every data/labels pair is printed once all six entries are read.

    NOTE(review): pickle.load can execute arbitrary code, so only open
    files that come from a trusted source.

    :param pickled_file: path to the pickle file
    :return: train_data, train_labels, test_data,
             test_labels, valid_data, valid_labels
    """
    wanted_keys = ('train_data', 'train_labels',
                   'test_data', 'test_labels',
                   'valid_data', 'valid_labels')
    with open(pickled_file, 'rb') as handle:
        contents = pickle.load(handle)
        # Pull every entry out before printing so a missing key fails early.
        arrays = tuple(contents[key] for key in wanted_keys)
    # Entries alternate data, labels, data, labels, ...
    for data, labels in zip(arrays[0::2], arrays[1::2]):
        print(data.shape, labels.shape)
    return arrays


def accuracy_func(predicts, labels):
    """
    Digit-wise accuracy over all samples and all digit positions.

    :param predicts: array indexed as (digit position, sample, class score);
                     the predicted class is the argmax over the last axis
    :param labels: array indexed as (sample, digit position) holding the
                   expected class ids
    :return: float value, percentage of digits predicted correctly
    """
    position_count, sample_count = predicts.shape[0], predicts.shape[1]
    # Transpose so predictions line up with labels as (sample, position).
    predicted_classes = np.argmax(predicts, 2).T
    correct = np.sum(predicted_classes == labels)
    return 100.0 * correct / sample_count / position_count


def local_contrast_normalization(input_data, image_shape, threshold=1e-4, radius=7):
    """
    Local Contrast Normalization

    Subtracts a Gaussian-weighted local mean from the input, then divides
    the centered data by a Gaussian-weighted local deviation. The divisor
    is never smaller than the mean deviation nor smaller than `threshold`.

    :param input_data: input data, converted to a float32 tensor
    :param image_shape: image shape; only element 3 (channel count) is read
    :param threshold: lower bound applied to the divisor
    :param radius: side length of the square Gaussian filter
    :return: local contrast normalized input data
    """
    unit_strides = [1, 1, 1, 1]
    # Gaussian filter
    smoothing_kernel = gaussian_initializer((radius, radius, image_shape[3], 1))
    images = tf.convert_to_tensor(input_data, dtype=tf.float32)
    # Subtractive step
    local_mean = tf.nn.conv2d(images, smoothing_kernel, unit_strides, 'SAME')
    centered = tf.sub(images, local_mean)
    # Divisive step
    local_energy = tf.nn.conv2d(tf.square(centered), smoothing_kernel, unit_strides, 'SAME')
    local_deviation = tf.sqrt(local_energy)
    mean_deviation = tf.reduce_mean(local_deviation)
    divisor = tf.maximum(mean_deviation, local_deviation)
    return tf.truediv(centered, tf.maximum(divisor, threshold))


def gaussian_initializer(kernel_shape):
    """
    Build a Gaussian kernel tensor whose entries sum to one.

    :param kernel_shape: kernel shape, indexed as (rows, cols, channels, 1);
                         the center offset is derived from the row count
    :return: float32 tensor of shape kernel_shape
    """
    rows, cols, channels = kernel_shape[0], kernel_shape[1], kernel_shape[2]
    center = np.floor(rows / 2.)
    # The weights depend only on the spatial offset, so compute one plane...
    plane = np.zeros((rows, cols), dtype=float)
    for row in range(rows):
        for col in range(cols):
            plane[row, col] = gaussian(row - center, col - center)
    # ...and copy it into every channel.
    kernel = np.zeros(kernel_shape, dtype=float)
    for channel in range(channels):
        kernel[:, :, channel, 0] = plane
    normalized = kernel / np.sum(kernel)
    return tf.convert_to_tensor(normalized, dtype=tf.float32)


def gaussian(x, y, sigma=3.0):
    """
    Evaluate the 2-D isotropic Gaussian density at offset (x, y).

    :param x: x offset from the center
    :param y: y offset from the center
    :param sigma: standard deviation
    :return: gaussian normalized value
    """
    normalizer = 2 * np.pi * sigma ** 2
    exponent = -(x ** 2 + y ** 2) / (2. * sigma ** 2)
    return 1. / normalizer * np.exp(exponent)


In [4]:
class MultiDigits(object):
    """
    Multi Digits Recognition Model

    A three-layer convolutional network followed by five parallel output
    layers, one per digit position. Training and inference each build
    their own graph; weights travel between them through a checkpoint file.
    """
    def __init__(self, picked_file=None, image_size=32, num_labels=11, num_channels=1,
                 batch_size=64, patch_size=5, depth_1=16, depth_2=32, depth_3=64,
                 hidden_num=64, num_hidden1=64
                 ):
        """
        :param picked_file: optional path of the pickled data set; when given,
                            the train/test/valid splits are loaded immediately
        :param image_size: height and width of the (square) input images
        :param num_labels: number of classes predicted per digit position
        :param num_channels: number of input image channels
        :param batch_size: number of samples per training batch
        :param patch_size: side length of every convolution kernel
        :param depth_1: output depth of the first convolution
        :param depth_2: output depth of the second convolution
        :param depth_3: stored but not used when building the graph
        :param hidden_num: input width of each output layer; it has to equal
                           the flattened size of the third convolution output
        :param num_hidden1: output depth of the third convolution
        """
        # Data attributes always exist so that a missing data set can be
        # reported clearly instead of surfacing as an AttributeError.
        self.train_data, self.train_labels = None, None
        self.test_data, self.test_labels = None, None
        self.valid_data, self.valid_labels = None, None
        if picked_file is not None:
            self.train_data, self.train_labels, self.test_data, \
                self.test_labels, self.valid_data, self.valid_labels = \
                load_pickled_data(picked_file)
        self.train_graph = tf.Graph()
        # Kept for backward compatibility; infer_data builds a fresh graph on every call.
        self.infer_graph = tf.Graph()
        self.image_size = image_size
        self.num_labels = num_labels
        self.num_channels = num_channels
        self.batch_size = batch_size
        self.patch_size = patch_size
        self.depth_1 = depth_1
        self.depth_2 = depth_2
        self.depth_3 = depth_3
        self.hidden_num = hidden_num
        self.num_hidden1 = num_hidden1
        self.shape = [batch_size, image_size, image_size, num_channels]
        self.saver = None
        self.valid_prediction, self.test_prediction = None, None
        # Graph inputs are created by define_graph inside train_graph; creating
        # a placeholder here would only add an unused op to the default graph.
        self.tf_train_dataset = None
        self.tf_train_labels = None
        self.tf_valid_dataset = None
        self.tf_test_dataset = None
        self.loss = None
        self.optimizer = None
        self.train_prediction = None
        self.save_path = None
        self.infer_saver = None
        self.is_inited = False
        self.conv_layer1_weights = None
        self.conv_layer1_biases = None
        self.conv_layer2_weights = None
        self.conv_layer2_biases = None
        self.conv_layer3_weights = None
        self.conv_layer3_biases = None
        self.out_weights_1 = None
        self.out_biases_1 = None
        self.out_weights_2 = None
        self.out_biases_2 = None
        self.out_weights_3 = None
        self.out_biases_3 = None
        self.out_weights_4 = None
        self.out_biases_4 = None
        self.out_weights_5 = None
        self.out_biases_5 = None

    def _create_variables(self):
        """
        Create all model variables in the current default graph.

        The creation order is significant: the output biases receive
        auto-generated variable names, so the training and inference graphs
        must create variables in the same order for a checkpoint to restore.

        :return: (conv_params, out_params), each a list of (weights, biases)
        """
        conv_depths = [(self.num_channels, self.depth_1),
                       (self.depth_1, self.depth_2),
                       (self.depth_2, self.num_hidden1)]
        # Conv Layers
        conv_params = []
        for layer, (in_depth, out_depth) in enumerate(conv_depths, 1):
            weights = tf.get_variable('c_%d_w' % layer,
                                      shape=[self.patch_size, self.patch_size, in_depth, out_depth],
                                      initializer=tf.contrib.layers.xavier_initializer_conv2d())
            biases = tf.Variable(tf.constant(1.0, shape=[out_depth]), name='c_%d_b' % layer)
            conv_params.append((weights, biases))
        # Output Layer
        out_params = []
        for head in range(1, 6):
            weights = tf.get_variable('o_%d' % head, shape=[self.hidden_num, self.num_labels],
                                      initializer=tf.contrib.layers.xavier_initializer())
            # NOTE(review): the name is given to tf.constant, not to tf.Variable,
            # so the variable itself is auto-named. Left unchanged on purpose:
            # renaming would presumably make existing checkpoints unrestorable.
            biases = tf.Variable(tf.constant(1.0, shape=[self.num_labels], name='o_b_%d' % head))
            out_params.append((weights, biases))
        return conv_params, out_params

    @staticmethod
    def _forward(data, keep_prob, d_shape, conv_params, out_params):
        """
        Build the forward pass and return one logits tensor per digit position.

        :param data: input images
        :param keep_prob: dropout keep probability applied after the third convolution
        :param d_shape: shape forwarded to local_contrast_normalization
        :param conv_params: three (weights, biases) pairs for the conv layers
        :param out_params: (weights, biases) pairs, one per output layer
        :return: tuple of logits tensors, in output-layer order
        """
        (conv1_w, conv1_b), (conv2_w, conv2_b), (conv3_w, conv3_b) = conv_params
        # conv layer
        lcn = local_contrast_normalization(data, d_shape)
        conv_1 = tf.nn.conv2d(lcn, conv1_w, [1, 1, 1, 1], 'VALID', name='c_1')
        conv_1 = tf.nn.relu(conv_1 + conv1_b)
        lrn_1 = tf.nn.local_response_normalization(conv_1)
        pool_1 = tf.nn.max_pool(lrn_1, [1, 2, 2, 1], [1, 2, 2, 1], 'SAME', name='p_1')
        conv_2 = tf.nn.conv2d(pool_1, conv2_w, [1, 1, 1, 1], padding='VALID', name='c_2')
        conv_2 = tf.nn.relu(conv_2 + conv2_b)
        lrn_2 = tf.nn.local_response_normalization(conv_2)
        pool_2 = tf.nn.max_pool(lrn_2, [1, 2, 2, 1], [1, 2, 2, 1], 'SAME', name='p_2_')
        conv_3 = tf.nn.conv2d(pool_2, conv3_w, [1, 1, 1, 1], padding='VALID', name='c_3')
        conv_3 = tf.nn.relu(conv_3 + conv3_b)
        conv_3 = tf.nn.dropout(conv_3, keep_prob)
        # Flatten everything except the batch dimension.
        shapes = conv_3.get_shape().as_list()
        hidden = tf.reshape(conv_3, [shapes[0], shapes[1] * shapes[2] * shapes[3]])
        # fc layer
        return tuple(tf.matmul(hidden, weights) + biases for weights, biases in out_params)

    @staticmethod
    def _softmax_stack(logits):
        """Apply softmax to each logits tensor and stack the results along a new first axis."""
        return tf.pack(list(map(tf.nn.softmax, logits)))

    def define_graph(self):
        """
        Build the training graph: inputs, variables, loss, optimizer,
        prediction ops and the checkpoint saver.

        :raises ValueError: if the validation or test data has not been loaded
        """
        if self.valid_data is None or self.test_data is None:
            raise ValueError('validation and test data must be loaded before defining the graph')
        with self.train_graph.as_default():
            # Input Data.
            self.tf_train_dataset = tf.placeholder(tf.float32, shape=self.shape)
            self.tf_train_labels = tf.placeholder(tf.int32, shape=(self.batch_size, 6))
            self.tf_valid_dataset = tf.constant(self.valid_data)
            self.tf_test_dataset = tf.constant(self.test_data)
            # init variables
            conv_params, out_params = self._create_variables()
            (self.conv_layer1_weights, self.conv_layer1_biases), \
                (self.conv_layer2_weights, self.conv_layer2_biases), \
                (self.conv_layer3_weights, self.conv_layer3_biases) = conv_params
            (self.out_weights_1, self.out_biases_1), \
                (self.out_weights_2, self.out_biases_2), \
                (self.out_weights_3, self.out_biases_3), \
                (self.out_weights_4, self.out_biases_4), \
                (self.out_weights_5, self.out_biases_5) = out_params
            # Training computation.
            train_logits = self.__infer(self.tf_train_dataset, 0.95, self.shape)
            # Label column k (1..5) is the target of output layer k; column 0
            # is not used by the loss.
            total_loss = None
            for position, digit_logits in enumerate(train_logits, 1):
                digit_loss = tf.reduce_mean(
                    tf.nn.sparse_softmax_cross_entropy_with_logits(digit_logits,
                                                                   self.tf_train_labels[:, position]))
                total_loss = digit_loss if total_loss is None else total_loss + digit_loss
            self.loss = total_loss
            # Optimizer.
            global_step = tf.Variable(0)
            learning_rate = tf.train.exponential_decay(0.05, global_step, 10000, 0.95)
            self.optimizer = tf.train.AdagradOptimizer(learning_rate).minimize(self.loss, global_step=global_step)
            # Predictions of the training, validation, and test data.
            self.train_prediction = self._softmax_stack(
                self.__infer(self.tf_train_dataset, 1.0, self.shape))
            self.valid_prediction = self._softmax_stack(
                self.__infer(self.tf_valid_dataset, 1.0, self.shape))
            self.test_prediction = self._softmax_stack(
                self.__infer(self.tf_test_dataset, 1.0, self.shape))
            self.saver = tf.train.Saver()

    def train_model(self, save_path=None, save=True, epoch=100000):
        """
        Train the model. Must not be called when the application is deployed.

        Every 1000 steps the mini-batch loss, mini-batch accuracy and
        validation accuracy are recorded and printed; test accuracy is
        printed once training has finished.

        :param save_path: path where the checkpoint data is written
        :param save: whether to save the checkpoint data
        :param epoch: number of training iterations (mini-batch steps)
        :return: (epoch_index, losses, mini_batch_acc, valid_batch_acc), the
                 values recorded every 1000 steps
        :raises RuntimeError: if define_graph() has not been called yet
        :raises ValueError: if save is requested without a save_path
        """
        if self.optimizer is None:
            raise RuntimeError('define_graph() must be called before train_model()')
        # Fail before training starts rather than after the final step.
        if save and save_path is None:
            raise ValueError('save_path is required when save=True')
        epoch_index = []
        losses = []
        mini_batch_acc = []
        valid_batch_acc = []
        epochs = epoch
        start_time = time.time()
        with tf.Session(graph=self.train_graph) as sess:
            tf.global_variables_initializer().run()
            print('Initialized all variables')
            for e in range(epochs):
                # Slide a batch-sized window over the training set, wrapping around.
                offset = (e * self.batch_size) % (self.train_labels.shape[0] - self.batch_size)
                batch_data = self.train_data[offset:(offset + self.batch_size), :, :, :]
                batch_labels = self.train_labels[offset:(offset + self.batch_size), :]
                feed_dict = {self.tf_train_dataset: batch_data, self.tf_train_labels: batch_labels}
                _, l, predictions = sess.run([self.optimizer, self.loss, self.train_prediction], feed_dict=feed_dict)
                if e % 1000 == 0:
                    epoch_index.append(e)
                    mini_acc = accuracy_func(predictions, batch_labels[:, 1:6])
                    mini_batch_acc.append(mini_acc)
                    valid_acc = accuracy_func(self.valid_prediction.eval(), self.valid_labels[:, 1:6])
                    valid_batch_acc.append(valid_acc)
                    losses.append(l)
                    print('Minibatch loss at step %d: %f' % (e, l))
                    print('Minibatch accuracy: %.1f%%' % mini_acc)
                    print('Validation accuracy: %.1f%%' % valid_acc)
            print('Test accuracy: %.1f%%' % accuracy_func(self.test_prediction.eval(), self.test_labels[:, 1:6]))
            if save:
                self.save_path = self.saver.save(sess, save_path)
                print("Model saved in file: %s" % self.save_path)
            end_time = time.time()
            print('train time: %s' % (end_time - start_time))
        return epoch_index, losses, mini_batch_acc, valid_batch_acc

    def infer_data(self, input_data, ckpt_path):
        """
        Predict the digits of a single image using weights restored from a checkpoint.

        A new inference graph and session are built on every call.

        :param input_data: one image with shape
                           (1, image_size, image_size, num_channels)
        :param ckpt_path: path to the ckpt file
        :return: array of predicted class ids, one row per image and one
                 column per digit position
        """
        infer_graph = tf.Graph()
        with infer_graph.as_default():
            # Input Data.
            tf_infer_data = tf.placeholder(
                tf.float32, shape=(1, self.image_size, self.image_size, self.num_channels))
            # Same variables, created in the same order, as in the training graph.
            conv_params, out_params = self._create_variables()
            # Predictions
            infer_predict = self._softmax_stack(
                self._forward(tf_infer_data, 1.0, self.shape, conv_params, out_params))
            prediction = tf.transpose(tf.argmax(infer_predict, 2))
            self.infer_saver = tf.train.Saver()

        with tf.Session(graph=infer_graph) as session:
            self.infer_saver.restore(session, save_path=ckpt_path)
            return session.run(prediction, feed_dict={tf_infer_data: input_data})

    def __infer(self, data, keep_prob, d_shape):
        """Run the forward pass with the variables stored on this instance."""
        conv_params = [(self.conv_layer1_weights, self.conv_layer1_biases),
                       (self.conv_layer2_weights, self.conv_layer2_biases),
                       (self.conv_layer3_weights, self.conv_layer3_biases)]
        out_params = [(self.out_weights_1, self.out_biases_1),
                      (self.out_weights_2, self.out_biases_2),
                      (self.out_weights_3, self.out_biases_3),
                      (self.out_weights_4, self.out_biases_4),
                      (self.out_weights_5, self.out_biases_5)]
        return self._forward(data, keep_prob, d_shape, conv_params, out_params)


In [5]:
# Load the pickled SVHN splits into a new model, then build its training graph.
train_model = MultiDigits(picked_file='SVHN.pickle')
train_model.define_graph()

((230070, 32, 32, 1), (230070, 6))
((13068, 32, 32, 1), (13068, 6))
((5684, 32, 32, 1), (5684, 6))


In [6]:
# Train for 150000 mini-batch steps and write the checkpoint when finished.
(epoch_index, losses, mini_batch_acc, valid_batch_acc) = train_model.train_model(
    epoch=150000, save=True, save_path='ckpt_data/SVHN.ckpt')

Initialized all variables
Minibatch loss at step 0: 17.476204
Minibatch accuracy: 5.3%
Validation accuracy: 43.8%
Minibatch loss at step 1000: 3.540220
Minibatch accuracy: 79.4%
Validation accuracy: 77.3%
Minibatch loss at step 2000: 2.048233
Minibatch accuracy: 87.2%
Validation accuracy: 85.4%
Minibatch loss at step 3000: 1.777309
Minibatch accuracy: 91.2%
Validation accuracy: 87.2%
Minibatch loss at step 4000: 2.447232
Minibatch accuracy: 87.2%
Validation accuracy: 88.9%
Minibatch loss at step 5000: 1.365698
Minibatch accuracy: 90.9%
Validation accuracy: 89.1%
Minibatch loss at step 6000: 1.219130
Minibatch accuracy: 94.1%
Validation accuracy: 89.8%
Minibatch loss at step 7000: 2.277873
Minibatch accuracy: 87.5%
Validation accuracy: 90.0%
Minibatch loss at step 8000: 1.150327
Minibatch accuracy: 94.4%
Validation accuracy: 90.5%
Minibatch loss at step 9000: 1.522681
Minibatch accuracy: 94.4%
Validation accuracy: 90.9%
Minibatch loss at step 10000: 1.192972
Minibatch accuracy: 92.8%
Va

In [7]:
# Hand-picked test samples used to spot-check inference.
indexes = [6, 8, 12, 13, 115, 120, 121]
input_datas = train_model.test_data[indexes, ...]
input_labels = train_model.test_labels[indexes, ...]

In [8]:
# Show the image shape and the raw label row of the first selected sample.
(input_datas[0].shape, input_labels[0])

((32, 32, 1), array([ 3,  1,  8,  3, 10, 10]))

In [54]:
def f(a, i):
    """
    Decode a list of digit classes into the integer it spells.

    Digits are read from index ``i`` up to (not including) the first
    occurrence of the blank marker 10 at or after ``i``. When no blank
    marker follows, every remaining entry is used, so a sequence that
    fills all positions decodes instead of raising.

    :param a: list of digit classes, with 10 marking "no digit"
    :param i: index of the first digit to read
    :return: the decoded integer
    :raises ValueError: if there is no digit between ``i`` and the blank marker
    """
    try:
        end = a.index(10, i)
    except ValueError:
        # No blank marker: the digits run to the end of the list.
        end = len(a)
    return int(''.join(map(str, a[i:end])))

In [55]:
# Restore the trained checkpoint for each sample and print the decoded
# prediction next to the decoded ground-truth label.
train_model.save_path = "ckpt_data/SVHN.ckpt"
predicts = []
print(('predict', 'real'))
for i, input_data in enumerate(input_datas):
    single_image = input_data.reshape((1, 32, 32, 1))
    predicted_digits = train_model.infer_data(single_image, ckpt_path="ckpt_data/SVHN.ckpt")[0].tolist()
    real_digits = input_labels[i].tolist()
    # Predictions start at index 0; labels carry an extra leading column, so start at 1.
    print(f(predicted_digits, 0), f(real_digits, 1))

('predict', 'real')
(183, 183)
(144, 144)
(19, 13)
(25, 25)
(20, 20)
(22, 22)
(9, 9)
