In [1]:
import pickle
import numpy
import random
import matplotlib.pyplot as plt
import platform
import cv2
import sys
import os
import tensorflow as tf

# Data loading and augmentation (CIFAR-10)
class Corpus:
    """CIFAR-10 data holder with numpy-based augmentation helpers.

    On construction the python-pickled CIFAR-10 batches are read from
    ``./cifar-10-batches-py`` and the training data is split into a training
    part and a validation part.

    Attributes set by ``__init__``:
        train_images / train_labels: training split, images are float arrays
            of shape (N, 32, 32, 3) in height-width-channel order.
        valid_images / valid_labels: validation split.
        test_images / test_labels: the CIFAR-10 test batch.
        n_train / n_valid / n_test: number of images in each split.

    All augmentation helpers return a new array and never modify the array
    that was passed in, so the stored data sets stay pristine across epochs.
    """

    def __init__(self):
        self.load_cifar10('./cifar-10-batches-py')
        self._split_train_valid(valid_rate=0.9)
        self.n_train = self.train_images.shape[0]
        self.n_valid = self.valid_images.shape[0]
        self.n_test = self.test_images.shape[0]

    def _split_train_valid(self, valid_rate=0.9):
        """Split the loaded training data into train and validation parts.

        NOTE: despite its name, ``valid_rate`` is the fraction of samples that
        is KEPT FOR TRAINING (the first ``valid_rate * N`` samples); the
        remainder becomes the validation set. The name is kept unchanged for
        backward compatibility.
        """
        images, labels = self.train_images, self.train_labels
        thresh = int(images.shape[0] * valid_rate)
        self.train_images, self.train_labels = images[:thresh], labels[:thresh]
        self.valid_images, self.valid_labels = images[thresh:], labels[thresh:]

    @staticmethod
    def _load_batches(filenames):
        """Read CIFAR-10 batch files and return ``(images, labels)`` arrays.

        Each file is a pickled dict with the byte-string keys ``b"data"``
        (rows of 3072 values, channel-major) and ``b"labels"``.
        """
        image_chunks, labels = [], []
        for filename in filenames:
            with open(filename, 'rb') as fo:
                # SECURITY: pickle can execute arbitrary code; only load
                # batch files obtained from a trusted source.
                if sys.version_info[0] >= 3:
                    # The official batches were written by Python 2, so on
                    # Python 3 the keys must be decoded as bytes regardless
                    # of the operating system.
                    batch = pickle.load(fo, encoding='bytes')
                else:
                    batch = pickle.load(fo)
            batch_labels = list(batch[b"labels"])
            count = len(batch_labels)
            data = numpy.asarray(batch[b"data"])[:count]
            # (N, 3072) -> (N, 3, 32, 32) -> (N, 32, 32, 3)
            chunk = data.reshape(count, 3, 32, 32).transpose(0, 2, 3, 1)
            image_chunks.append(chunk.astype(float))
            labels.extend(batch_labels)
        images = numpy.concatenate(image_chunks, axis=0)
        return images, numpy.array(labels, dtype='int')

    def load_cifar10(self, directory):
        """Load the five training batches and the test batch from ``directory``."""
        # training set
        train_files = ['%s/data_batch_%d' % (directory, j) for j in range(1, 6)]
        self.train_images, self.train_labels = self._load_batches(train_files)
        # test set
        test_files = ['%s/test_batch' % (directory)]
        self.test_images, self.test_labels = self._load_batches(test_files)

    def data_augmentation(self, images, mode='train', flip=False,
                          crop=False, crop_shape=(24,24,3), whiten=False,
                          noise=False, noise_mean=0, noise_std=0.01):
        """Apply the selected augmentations, in the order crop, flip, whiten, noise.

        Args:
            images: array of shape (N, H, W, C).
            mode: 'train' for a random crop, 'test' for a deterministic centre
                crop. Only consulted when ``crop`` is true.
            flip: randomly mirror each image horizontally (probability 0.5).
            crop: crop every image to ``crop_shape``.
            crop_shape: (height, width, channels) of the crop; only the first
                two entries are used.
            whiten: standardise every image to zero mean and unit variance.
            noise: add Gaussian noise with ``noise_mean`` / ``noise_std``.

        Returns:
            The augmented images (the input itself when nothing is enabled).

        Raises:
            ValueError: if ``crop`` is true and ``mode`` is not 'train'/'test'.
        """
        # crop
        if crop:
            if mode == 'train':
                images = self._image_crop(images, shape=crop_shape)
            elif mode == 'test':
                images = self._image_crop_test(images, shape=crop_shape)
            else:
                # Previously an unknown mode silently skipped the crop, which
                # only surfaced later as a shape mismatch.
                raise ValueError("mode must be 'train' or 'test', got %r" % (mode,))
        # flip
        if flip:
            images = self._image_flip(images)
        # whitening
        if whiten:
            images = self._image_whitening(images)
        # noise
        if noise:
            images = self._image_noise(images, mean=noise_mean, std=noise_std)

        return images

    def _image_crop(self, images, shape):
        """Crop a random ``shape[0] x shape[1]`` window out of every image."""
        height, width = shape[0], shape[1]
        cropped = []
        for image in images:
            top = numpy.random.randint(image.shape[0] - height + 1)
            left = numpy.random.randint(image.shape[1] - width + 1)
            cropped.append(image[top: top+height, left: left+width, :])

        return numpy.array(cropped)

    def _image_crop_test(self, images, shape):
        """Crop the central ``shape[0] x shape[1]`` window out of every image."""
        height, width = shape[0], shape[1]
        top = int((images.shape[1] - height) / 2)
        left = int((images.shape[2] - width) / 2)
        # numpy.array(...) forces a copy so the result never aliases the input.
        return numpy.array(images[:, top: top+height, left: left+width, :])

    def _image_flip(self, images):
        """Mirror each image left-right with probability 0.5 (returns a copy)."""
        flipped = images.copy()
        chosen = numpy.random.random(images.shape[0]) < 0.5
        # Reversing the width axis is the horizontal flip (same result as
        # cv2.flip(image, 1)) without needing OpenCV.
        flipped[chosen] = flipped[chosen][:, :, ::-1, :]
        return flipped

    def _image_whitening(self, images):
        """Standardise every image to zero mean and unit standard deviation.

        A constant image (standard deviation 0) is mapped to all zeros
        instead of producing NaN/inf.
        """
        images = numpy.asarray(images, dtype=float)
        per_image_axes = tuple(range(1, images.ndim))
        mean = images.mean(axis=per_image_axes, keepdims=True)
        std = images.std(axis=per_image_axes, keepdims=True)
        std[std == 0] = 1.0
        return (images - mean) / std

    def _image_noise(self, images, mean=0, std=0.01):
        """Add independent Gaussian noise N(mean, std) to every pixel value.

        The noise is drawn from ``numpy.random`` (like the crop and flip
        helpers), so ``numpy.random.seed`` controls all augmentations.
        """
        noise = numpy.random.normal(loc=mean, scale=std, size=images.shape)
        return images + noise

  return f(*args, **kwds)


In [2]:
# Convolution layer building block
class ConvLayer:
    """Square-kernel 2-D convolution layer for TensorFlow 1.x graph mode.

    The layer owns its variables (kernel ``W_<name>``, offset ``b_<name>`` and,
    with batch normalisation, scale ``gamma_<name>``) and, when
    ``weight_decay`` is set, registers an L2 penalty in the ``'losses'``
    graph collection.

    Args:
        input_shape: (batch, height, width, channels) of the input; the batch
            entry may be None.
        n_size: kernel height and width.
        n_filter: number of output channels.
        stride: spatial stride (same in both directions).
        activation: 'relu', 'tanh' or 'none'.
        batch_normal: normalise the convolution output with the statistics of
            the current batch. No moving averages are kept, so batch
            statistics are used at evaluation time as well.
        weight_decay: L2 coefficient, or None/0 to disable.
        name: suffix used for the variable names.

    Raises:
        ValueError: if ``activation`` is not one of the supported values.
    """

    _ACTIVATIONS = ('relu', 'tanh', 'none')

    def __init__(self, input_shape, n_size, n_filter, stride=1, activation='relu',
                 batch_normal=False, weight_decay=None, name='conv'):
        if activation not in self._ACTIVATIONS:
            raise ValueError('unsupported activation %r, expected one of %r' % (
                activation, self._ACTIVATIONS))
        # params
        self.input_shape = input_shape
        self.n_filter = n_filter
        self.activation = activation
        self.stride = stride
        self.batch_normal = batch_normal
        self.weight_decay = weight_decay

        # Kernel, He-initialised. The fan-in of a convolution is
        # kernel_h * kernel_w * in_channels; the spatial size of the feature
        # map does not enter (it was used here by mistake before).
        fan_in = n_size * n_size * self.input_shape[3]
        self.weight = tf.Variable(
            initial_value=tf.truncated_normal(
                shape=[n_size, n_size, self.input_shape[3], self.n_filter],
                mean=0.0, stddev=numpy.sqrt(2.0 / fan_in)),
            name='W_%s' % (name))

        # weight decay: L2 penalty collected under 'losses'
        if self.weight_decay:
            penalty = tf.multiply(tf.nn.l2_loss(self.weight), self.weight_decay)
            tf.add_to_collection('losses', penalty)

        # bias (doubles as the batch-norm offset)
        self.bias = tf.Variable(
            initial_value=tf.constant(
                0.0, shape=[self.n_filter]),
            name='b_%s' % (name))

        # batch normalisation parameters
        if self.batch_normal:
            self.epsilon = 1e-5
            self.gamma = tf.Variable(
                initial_value=tf.constant(
                    1.0, shape=[self.n_filter]),
            name='gamma_%s' % (name))

    def _strided_size(self, size):
        """Spatial output size of a SAME-padded convolution: ceil(size / stride)."""
        if size is None:
            return None
        return int(numpy.ceil(float(size) / self.stride))

    def get_output(self, input):
        """Build the layer's ops on ``input`` and return the activated output tensor."""
        # With padding='SAME' the spatial size is ceil(size / stride), which
        # differs from floor division for odd sizes.
        self.output_shape = [self.input_shape[0],
                             self._strided_size(self.input_shape[1]),
                             self._strided_size(self.input_shape[2]),
                             self.n_filter]

        # hidden states
        self.conv = tf.nn.conv2d(
            input=input, filter=self.weight,
            strides=[1, self.stride, self.stride, 1], padding='SAME')

        # batch normalisation
        if self.batch_normal:
            # Per-channel statistics over batch, height and width. The
            # deprecated ``keep_dims`` keyword is omitted; False is the default.
            mean, variance = tf.nn.moments(self.conv, axes=[0, 1, 2])
            self.hidden = tf.nn.batch_normalization(
                self.conv, mean, variance, self.bias, self.gamma, self.epsilon)
        else:
            self.hidden = self.conv + self.bias

        # activation
        if self.activation == 'relu':
            self.output = tf.nn.relu(self.hidden)
        elif self.activation == 'tanh':
            self.output = tf.nn.tanh(self.hidden)
        elif self.activation == 'none':
            self.output = self.hidden
        else:
            # Reached only if the attribute was changed after construction.
            raise ValueError('unsupported activation %r' % (self.activation,))

        return self.output

In [3]:
# Fully connected (dense) layer
class DenseLayer:
    """Fully connected layer for TensorFlow 1.x graph mode.

    The layer owns its variables (``W_<name>``, ``b_<name>`` and, with batch
    normalisation, ``gamma_<name>``) and, when ``weight_decay`` is set,
    registers an L2 penalty in the ``'losses'`` graph collection.

    Args:
        input_shape: (batch, features) of the input; batch may be None.
        hidden_dim: number of output units.
        activation: 'relu', 'tanh', 'softmax' or 'none'.
        dropout: apply dropout to the pre-activation values.
        keep_prob: keep probability (number or tensor); required when
            ``dropout`` is true.
        batch_normal: normalise with the statistics of the current batch
            (no moving averages are kept).
        weight_decay: L2 coefficient, or None/0 to disable.
        name: suffix used for the variable names.

    Raises:
        ValueError: on an unsupported ``activation`` or when ``dropout`` is
            requested without ``keep_prob``.
    """

    _ACTIVATIONS = ('relu', 'tanh', 'softmax', 'none')

    def __init__(self, input_shape, hidden_dim, activation='relu', dropout=False,
                 keep_prob=None, batch_normal=False, weight_decay=None, name='dense'):
        if activation not in self._ACTIVATIONS:
            raise ValueError('unsupported activation %r, expected one of %r' % (
                activation, self._ACTIVATIONS))
        if dropout and keep_prob is None:
            # Fail here rather than with an obscure error inside tf.nn.dropout.
            raise ValueError('keep_prob must be given when dropout is enabled')
        # params
        self.input_shape = input_shape
        self.hidden_dim = hidden_dim
        self.activation = activation
        self.dropout = dropout
        self.keep_prob = keep_prob
        self.batch_normal = batch_normal
        self.weight_decay = weight_decay

        # weight matrix, He-initialised with fan-in = number of input features
        self.weight = tf.Variable(
            initial_value=tf.random_normal(
                shape=[self.input_shape[1], self.hidden_dim],
                mean=0.0, stddev=numpy.sqrt(2.0 / self.input_shape[1])),
            name='W_%s' % (name))

        # weight decay: L2 penalty collected under 'losses'
        if self.weight_decay:
            penalty = tf.multiply(tf.nn.l2_loss(self.weight), self.weight_decay)
            tf.add_to_collection('losses', penalty)

        # bias (doubles as the batch-norm offset)
        self.bias = tf.Variable(
            initial_value=tf.constant(
                0.0, shape=[self.hidden_dim]),
            name='b_%s' % (name))

        # batch normalisation parameters
        if self.batch_normal:
            self.epsilon = 1e-5
            self.gamma = tf.Variable(
                initial_value=tf.constant(
                    1.0, shape=[self.hidden_dim]),
            name='gamma_%s' % (name))

    def get_output(self, input):
        """Build the layer's ops on ``input`` and return the activated output tensor."""
        self.output_shape = [self.input_shape[0], self.hidden_dim]
        # hidden states
        intermediate = tf.matmul(input, self.weight)

        # batch normalisation (per-unit statistics over the batch axis)
        if self.batch_normal:
            mean, variance = tf.nn.moments(intermediate, axes=[0])
            self.hidden = tf.nn.batch_normalization(
                intermediate, mean, variance, self.bias, self.gamma, self.epsilon)
        else:
            self.hidden = intermediate + self.bias

        # dropout (applied before the activation, as in the original design)
        if self.dropout:
            self.hidden = tf.nn.dropout(self.hidden, keep_prob=self.keep_prob)

        # activation
        if self.activation == 'relu':
            self.output = tf.nn.relu(self.hidden)
        elif self.activation == 'tanh':
            self.output = tf.nn.tanh(self.hidden)
        elif self.activation == 'softmax':
            self.output = tf.nn.softmax(self.hidden)
        elif self.activation == 'none':
            self.output = self.hidden
        else:
            # Reached only if the attribute was changed after construction.
            raise ValueError('unsupported activation %r' % (self.activation,))

        return self.output

In [4]:
# Residual network (ResNet)
class ConvNet():
    """Three-stage residual network for small images (TensorFlow 1.x graph mode).

    The network is a 3x3 stem convolution followed by three stages of
    residual blocks with 64, 128 and 256 channels, global average pooling and
    a linear classifier. Each stage holds ``(n_layers - 2) / 6`` blocks of two
    3x3 convolutions; stages 2 and 3 halve the spatial size in their first
    block, whose shortcut is a 2x2 max-pool zero-padded along the channel axis.

    Constructing the object builds the whole graph (placeholders, logits,
    loss, optimizer and accuracy) in the default graph. The total loss sums
    everything in the ``'losses'`` collection of that graph.

    Args:
        n_channel: number of input image channels.
        n_classes: number of target classes.
        image_size: height and width of the (cropped) input images.
        n_layers: nominal depth; determines the number of blocks per stage.
    """

    def __init__(self, n_channel=3, n_classes=10, image_size=24, n_layers=20):
        # hyper-parameters
        self.n_channel = n_channel
        self.n_classes = n_classes
        self.image_size = image_size
        self.n_layers = n_layers

        # inputs
        self.images = tf.placeholder(
            dtype=tf.float32, shape=[None, self.image_size, self.image_size, self.n_channel],
            name='images')
        self.labels = tf.placeholder(
            dtype=tf.int64, shape=[None], name='labels')
        self.keep_prob = tf.placeholder(
            dtype=tf.float32, name='keep_prob')
        self.global_step = tf.Variable(
            0, dtype=tf.int32, name='global_step')

        # network output
        self.logits = self.inference(self.images)

        # Objective: cross-entropy SUMMED over the batch plus the L2
        # penalties the layers registered in the 'losses' collection.
        self.objective = tf.reduce_sum(
            tf.nn.sparse_softmax_cross_entropy_with_logits(
                logits=self.logits, labels=self.labels))
        tf.add_to_collection('losses', self.objective)
        # NOTE: despite its name this is a per-batch sum, not an average.
        self.avg_loss = tf.add_n(tf.get_collection('losses'))
        # optimizer with a step-wise learning-rate schedule
        lr = tf.cond(tf.less(self.global_step, 50000),
                     lambda: tf.constant(0.01),
                     lambda: tf.cond(tf.less(self.global_step, 100000),
                                     lambda: tf.constant(0.005),
                                     lambda: tf.cond(tf.less(self.global_step, 150000),
                                                     lambda: tf.constant(0.0025),
                                                     lambda: tf.constant(0.001))))
        self.optimizer = tf.train.AdamOptimizer(learning_rate=lr).minimize(
            self.avg_loss, global_step=self.global_step)

        # metrics
        correct_prediction = tf.equal(self.labels, tf.argmax(self.logits, 1))
        self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))

    def inference(self, images):
        """Build the residual network on ``images`` and return the logits tensor."""
        n_blocks = int((self.n_layers - 2) / 6)
        full = self.image_size
        # Integer sizes throughout (one entry used to be a float in Python 3).
        half = int(self.image_size / 2)
        quarter = int(self.image_size / 4)

        def conv(size, in_channels, out_channels, stride, activation, name):
            # All convolutions are 3x3 with batch norm and the same weight decay.
            return ConvLayer(
                input_shape=(None, size, size, in_channels),
                n_size=3, n_filter=out_channels, stride=stride, activation=activation,
                batch_normal=True, weight_decay=1e-4, name=name)

        def identity_blocks(size, channels, stage, first_block):
            # Pairs (relu conv, linear conv) for blocks first_block..n_blocks.
            layers = []
            for i in range(first_block, n_blocks + 1):
                layers.append(conv(size, channels, channels, 1, 'relu',
                                   'conv%d_%d' % (stage, 2*i-1)))
                layers.append(conv(size, channels, channels, 1, 'none',
                                   'conv%d_%d' % (stage, 2*i)))
            return layers

        # ---- layers (variable names are unchanged) ----
        stem = conv(full, self.n_channel, 64, 1, 'relu', 'conv0')
        stage1 = identity_blocks(full, 64, 1, 1)
        stage2 = [conv(full, 64, 128, 2, 'relu', 'conv2_1'),
                  conv(half, 128, 128, 1, 'none', 'conv2_2')]
        stage2 += identity_blocks(half, 128, 2, 2)
        # conv3_2 is linear like every other second convolution of a block:
        # the ReLU belongs after the residual addition, not before it.
        stage3 = [conv(half, 128, 256, 2, 'relu', 'conv3_1'),
                  conv(quarter, 256, 256, 1, 'none', 'conv3_2')]
        stage3 += identity_blocks(quarter, 256, 3, 2)

        classifier = DenseLayer(
            input_shape=(None, 256),
            hidden_dim=self.n_classes,
            activation='none', dropout=False, keep_prob=None,
            batch_normal=False, weight_decay=1e-4, name='dense1')

        # ---- data flow ----
        def identity_block(x, first, second):
            branch = second.get_output(input=first.get_output(input=x))
            return tf.nn.relu(x + branch)

        def downsample_block(x, first, second, extra_channels):
            branch = second.get_output(input=first.get_output(input=x))
            # Parameter-free shortcut: halve the resolution with max-pooling
            # and zero-pad the channel axis symmetrically to the new width.
            pooled = tf.nn.max_pool(
                x, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')
            pad = extra_channels // 2
            shortcut = tf.pad(pooled, [[0,0], [0,0], [0,0], [pad, pad]])
            return tf.nn.relu(shortcut + branch)

        hidden = stem.get_output(input=images)

        for i in range(0, n_blocks):
            hidden = identity_block(hidden, stage1[2*i], stage1[2*i+1])

        hidden = downsample_block(hidden, stage2[0], stage2[1], 128 - 64)
        for i in range(1, n_blocks):
            hidden = identity_block(hidden, stage2[2*i], stage2[2*i+1])

        hidden = downsample_block(hidden, stage3[0], stage3[1], 256 - 128)
        for i in range(1, n_blocks):
            hidden = identity_block(hidden, stage3[2*i], stage3[2*i+1])

        # global average pooling (``axis`` replaces deprecated ``reduction_indices``)
        pooled_features = tf.reduce_mean(hidden, axis=[1, 2])
        logits = classifier.get_output(input=pooled_features)

        return logits

    def train(self, dataloader, backup_path, n_epoch=5, batch_size=128):
        """Train the network and write checkpoints to ``backup_path``.

        Args:
            dataloader: object exposing ``train_images``/``train_labels``,
                ``valid_images``/``valid_labels``, ``n_train``, ``n_valid``
                and ``data_augmentation`` (e.g. ``Corpus``).
            backup_path: checkpoint directory; created if it does not exist.
            n_epoch: index of the last epoch. Epochs run from 0 to ``n_epoch``
                inclusive, i.e. ``n_epoch + 1`` passes over the data.
            batch_size: mini-batch size.

        Checkpoints ``model_<epoch>.ckpt`` are written every 100 epochs up to
        epoch 1000, every 1000 epochs up to 10000, and always after the final
        epoch so that a finished run can be evaluated with ``test``.
        """
        crop_shape = (self.image_size, self.image_size, self.n_channel)
        if not os.path.isdir(backup_path):
            os.makedirs(backup_path)

        # session
        gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.2)
        self.sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))
        try:
            # saver
            self.saver = tf.train.Saver(
                var_list=tf.global_variables(), write_version=tf.train.SaverDef.V2,
                max_to_keep=5)
            # initialisation
            self.sess.run(tf.global_variables_initializer())

            # validation data: deterministic centre crop, prepared once
            valid_images = dataloader.data_augmentation(dataloader.valid_images, mode='test',
                flip=False, crop=True, crop_shape=crop_shape, whiten=True, noise=False)
            valid_labels = dataloader.valid_labels

            for epoch in range(0, n_epoch+1):
                # fresh random augmentation of the training data every epoch
                train_images = dataloader.data_augmentation(dataloader.train_images, mode='train',
                    flip=True, crop=True, crop_shape=crop_shape, whiten=True, noise=False)
                train_labels = dataloader.train_labels

                # one pass over the training data
                train_loss = 0.0
                for i in range(0, dataloader.n_train, batch_size):
                    batch_images = train_images[i: i+batch_size]
                    batch_labels = train_labels[i: i+batch_size]
                    [_, batch_loss, iteration] = self.sess.run(
                        fetches=[self.optimizer, self.avg_loss, self.global_step],
                        feed_dict={self.images: batch_images,
                                   self.labels: batch_labels,
                                   self.keep_prob: 0.5})
                    # The fetched loss is already summed over the batch, so it
                    # is accumulated as-is; multiplying by the batch size
                    # would inflate the reported value.
                    train_loss += batch_loss
                train_loss = 1.0 * train_loss / dataloader.n_train

                # validation loss and accuracy after this epoch
                valid_accuracy, valid_loss = 0.0, 0.0
                for i in range(0, dataloader.n_valid, batch_size):
                    batch_images = valid_images[i: i+batch_size]
                    batch_labels = valid_labels[i: i+batch_size]
                    [batch_accuracy, batch_loss] = self.sess.run(
                        fetches=[self.accuracy, self.avg_loss],
                        feed_dict={self.images: batch_images,
                                   self.labels: batch_labels,
                                   self.keep_prob: 1.0})
                    # accuracy is a batch mean -> weight it by the batch size
                    valid_accuracy += batch_accuracy * batch_images.shape[0]
                    valid_loss += batch_loss
                valid_accuracy = 1.0 * valid_accuracy / dataloader.n_valid
                valid_loss = 1.0 * valid_loss / dataloader.n_valid

                print('epoch{%d}, iter[%d], train loss: %.6f, '
                      'valid precision: %.6f, valid loss: %.6f' % (
                    epoch, iteration, train_loss, valid_accuracy, valid_loss))
                sys.stdout.flush()

                # checkpointing
                periodic = (epoch <= 1000 and epoch % 100 == 0) or \
                    (epoch <= 10000 and epoch % 1000 == 0)
                if periodic or epoch == n_epoch:
                    self.saver.save(
                        self.sess, os.path.join(backup_path, 'model_%d.ckpt' % (epoch)))
        finally:
            # release the session (and its GPU memory) even if training fails
            self.sess.close()

    def test(self, dataloader, backup_path, epoch, batch_size=128):
        """Restore ``model_<epoch>.ckpt`` from ``backup_path`` and print test accuracy.

        Raises:
            AssertionError: if the checkpoint index file does not exist.
        """
        crop_shape = (self.image_size, self.image_size, self.n_channel)
        gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.25)
        self.sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))
        try:
            # restore the model
            self.saver = tf.train.Saver(write_version=tf.train.SaverDef.V2)
            model_path = os.path.join(backup_path, 'model_%d.ckpt' % (epoch))
            assert os.path.exists(model_path+'.index'), \
                'checkpoint not found: %s' % (model_path)
            self.saver.restore(self.sess, model_path)
            print('read model from %s' % (model_path))
            # mode='test' selects the deterministic centre crop; the default
            # ('train') would evaluate on randomly cropped images.
            test_images = dataloader.data_augmentation(dataloader.test_images, mode='test',
                flip=False, crop=True, crop_shape=crop_shape, whiten=True, noise=False)
            test_labels = dataloader.test_labels
            n_correct, n_seen = 0.0, 0
            for i in range(0, dataloader.n_test, batch_size):
                batch_images = test_images[i: i+batch_size]
                batch_labels = test_labels[i: i+batch_size]
                [batch_accuracy] = self.sess.run(
                    fetches=[self.accuracy],
                    feed_dict={self.images:batch_images,
                               self.labels:batch_labels,
                               self.keep_prob:1.0})
                # Weight by batch size: the last batch is usually smaller and
                # a plain mean of batch means would over-weight it.
                n_correct += batch_accuracy * batch_images.shape[0]
                n_seen += batch_images.shape[0]
            precision = n_correct / n_seen if n_seen else float('nan')
            print('test precision: %.4f' % (precision))
        finally:
            self.sess.close()

    def debug(self):
        """Run one forward pass on random data and print the shape of the logits."""
        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            [temp] = sess.run(
                fetches=[self.logits],
                feed_dict={
                    self.images: numpy.random.random(
                        size=[128, self.image_size, self.image_size, self.n_channel]),
                    # ``high`` is exclusive, so this covers every class label
                    self.labels: numpy.random.randint(
                        low=0, high=self.n_classes, size=[128,]),
                    self.keep_prob: 1.0})
        print(temp.shape)