In [1]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
@author: Xiao Jin
In this part we write the hw5
"""
import numpy as np
import tensorflow as tf
from plot import visual, plot

# Hyper-parameters shared by the cells below.
step_size = 0.001       # learning rate handed to the optimizer in train()
training_epochs = 40    # number of passes over the training set in train()
batch_size = 500        # samples per training mini-batch

# CIFAR-10 images (32x32x3) together with their class labels.
(training_data, training_label), (testing_data, testing_label) = (
    tf.keras.datasets.cifar10.load_data()
)
# Sample counts; used as the shuffle buffer size and as the test batch size.
N_train, N_test = len(training_data), len(testing_data)

In [2]:
def _scale_and_one_hot(x, y):
    """Preprocess one batch: divide pixels by 255 and one-hot encode the labels.

    :param x: batch of images (cast to float32 before scaling)
    :param y: batch of integer class labels
    :return: (scaled images, one-hot labels reshaped to (batch, 10))
    """
    images = tf.divide(tf.cast(x, tf.float32), 255.0)
    # The reshape flattens whatever extra axis one_hot produces into (batch, 10);
    # presumably labels arrive as a (batch, 1) column - TODO confirm.
    labels = tf.reshape(tf.one_hot(y, 10), (-1, 10))
    return images, labels


# Shuffle individual samples *before* batching. Shuffling after .batch() only
# permutes the order of whole batches, so every batch would contain the same
# samples in every epoch.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((training_data, training_label))
    .shuffle(buffer_size=N_train, seed=0)
    .batch(batch_size)
    .map(_scale_and_one_hot)
)

# The test split is kept as one single batch of N_test samples (no shuffling).
test_dataset = (
    tf.data.Dataset.from_tensor_slices((testing_data, testing_label))
    .batch(N_test)
    .map(_scale_and_one_hot)
)


In [3]:
class neural_netowrk(tf.keras.Model):
    """Small CNN: conv -> pool -> conv -> pool -> conv -> flatten -> dense(10).

    The dense layer has no activation, so the model returns raw scores (logits).
    """

    def __init__(self, seed=1):
        """
        :param seed: value passed to tf.random.set_seed
        """
        super(neural_netowrk, self).__init__()
        # use random seed to make the initialization repeat
        # NOTE: tf.random.set_seed sets TensorFlow's *global* seed, so this
        # also affects any random op executed after the constructor runs.
        tf.random.set_seed(seed)
        # define convolutional layers
        self.c1 = tf.keras.layers.Conv2D(32, kernel_size=5, activation='relu', name='c1')
        self.m1 = tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=2, name='m1')
        self.c2 = tf.keras.layers.Conv2D(32, kernel_size=5, activation='relu', name='c2')
        self.m2 = tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=2, name='m2')
        self.c3 = tf.keras.layers.Conv2D(64, kernel_size=3, activation='relu', name='c3')
        # Built once here instead of creating a fresh Flatten layer on every forward pass.
        self.flatten = tf.keras.layers.Flatten()
        self.fc1 = tf.keras.layers.Dense(10)

    def forward(self, input):
        '''
        here we define the forward function
        :param input: the input data
        :return: output tensor (one row of 10 scores per sample)
        '''
        # For each layer, a bias will also be initialized and add to the output after matrix multiply.
        x = self.c1(input)
        x = self.m1(x)
        x = self.c2(x)
        x = self.m2(x)
        x = self.c3(x)
        x = self.flatten(x)
        output = self.fc1(x)
        return output

    def call(self, inputs):
        '''
        Keras entry point; delegates to forward() so that model(x) and the
        built-in Keras loops work in addition to explicit model.forward(x).
        :param inputs: the input data
        :return: output tensor
        '''
        return self.forward(inputs)

In [4]:
class input_block(tf.keras.Model):
    def __init__(self, seed=1):
        super(input_block, self).__init__()
        # use random seed to make the initialization repeat
        tf.random.set_seed(seed)
        # define convolutional layers
        self.c1 = tf.keras.layers.Conv2D(64, kernel_size=7, strides=2, padding='VALID', activation = None, name='c1')
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.relu = tf.keras.activations.relu
        self.maxpool = tf.keras.layers.MaxPool2D(3, strides=2, padding='VALID')

    def forward(self, input):
        # batch_size x frame x 224 x 224 x 3
        x = self.c1(input)
        # batch_size x frame x 112 x 112 x 64
        x = self.bn1(x)
        x = self.relu(x)
        output = self.maxpool(x)
        # batch_size x frame x 56 x 56 x 64
        return output

In [5]:
class basic_block(tf.keras.Model):
    def __init__(self, planes, stride=1, seed=1):
        super(basic_block, self).__init__()
        # use random seed to make the initialization repeat
        tf.random.set_seed(seed)
        # define convolutional layers
        self.conv1 = tf.keras.layers.Conv2D(planes, kernel_size=3, strides=stride, padding='SAME', activation = None)
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.conv2 = tf.keras.layers.Conv2D(planes, kernel_size=3, padding='SAME', activation=None)
        self.bn2 = tf.keras.layers.BatchNormalization()
        self.relu = tf.keras.activations.relu
        self.stride = stride
        if self.stride != 1:
            self.short_cut = tf.keras.layers.Conv2D(planes, kernel_size=1, strides=stride, padding='SAME',
                                                    activation = None)
            self.bn_short = tf.keras.layers.BatchNormalization()

    def forward(self, input):
        # batch_size x frame x H x W x planes
        x_1 = self.conv1(input)
        x_1 = self.bn1(x_1)
        x_1 = self.relu(x_1)
        x_1 = self.conv2(x_1)
        x_1 = self.bn2(x_1)
        if self.stride != 1:
            # shot cut
            x_2 = self.short_cut(input)
            x_2 = self.bn_short(x_2)
            # batch_size x frame x H/2 x W/2 x planes
            output = self.relu(x_1 + x_2)
        else:
            output = self.relu(x_1 + input)
            # batch_size x frame x H x W x planes
        return output

In [6]:
class ResNet18(tf.keras.Model):
    def __init__(self):
        super(ResNet18, self).__init__()
        # use random seed to make the initialization repeat
        # CNN 1-2
        self.input_part = input_block()
        # CNN 3-6
        self.block_11 = basic_block(planes=64, seed=1)
        self.block_12 = basic_block(planes=64, seed=2)
        # CNN 6-10
        self.block_21 = basic_block(planes=128, stride=2, seed=1)
        self.block_22 = basic_block(planes=128, seed=2)
        # CNN 10-14
        self.block_31 = basic_block(planes=256, stride=2, seed=1)
        self.block_32 = basic_block(planes=256, seed=2)
        # CNN 14-18
        self.block_41 = basic_block(planes=512, stride=2, seed=1)
        self.block_42 = basic_block(planes=512, seed=2)
        # Avg pooling
        self.Avg = tf.keras.layers.AvgPool2D(7, padding='VALID')
        self.p1 = tf.keras.layers.Dense(10, name='p1')
        self.relu = tf.keras.activations.relu
        self.dropout = tf.keras.layers.Dropout(0.5)


    def forward(self, input):
        '''
        here we define the forward function
        :param input: the input data
        :return: output tensor
        '''
        # For each layer, a bias will also be initialized and add to the output after matrix multiply.
        # reshape
        # image interpolation
        x = tf.image.resize(input, [224, 224])
        # batch size x 224 x 224 x 3
        x = self.input_part.forward(x)
        # batch size x 56 x 56 x 64
        x = self.block_11.forward(x)
        x = self.block_12.forward(x)
        # batch size x 56 x 56 x 64
        x = self.block_21.forward(x)
        x = self.block_22.forward(x)
        # batch size x 28 x 28 x 128
        x = self.block_31.forward(x)
        x = self.block_32.forward(x)
        # batch size x 14 x 14 x 256
        x = self.block_41.forward(x)
        x = self.block_42.forward(x)
        # batch size x 7 x 7 x 512
        x = self.Avg(x)
        # batch size x 1 x 1 x 512
        x = tf.reshape(x, [-1, 512])
        output = self.p1(x)
        
        return output

In [7]:
def compute_accuracy(predict, labels):
    '''
    Mean top-1 accuracy of a batch.
    :param predict: score matrix, one row per sample and one column per class (10 classes)
    :param labels: ground truth in the same layout (one-hot rows, as built by the dataset cells)
    :return: scalar tensor with the average accuracy over the batch
    '''
    # Turn the scores into a hard one-hot decision per sample.
    top_class = tf.math.argmax(predict, 1)
    hard_predict = tf.one_hot(top_class, 10, dtype=tf.float32)
    # Per sample, the overlap between the hard decision and the label row.
    hits_per_sample = tf.reduce_sum(tf.math.multiply(labels, hard_predict), axis=1)
    return tf.reduce_mean(hits_per_sample)

In [8]:
def evaluate(model, data):
    '''
    Classification error of a model over a whole dataset.

    Every batch yielded by `data` is used; counts are accumulated across
    batches, so the result does not depend on how the dataset is batched.
    (Previously only the first batch was evaluated.)

    :param model: object exposing forward(x) that returns one score row per sample
    :param data: iterable of (images, one-hot labels) batches
    :return: (average error as a numpy scalar,
              list with one scalar tensor per class holding that class' error;
              an entry is NaN if the class never occurs in `data`)
    :raises ValueError: if `data` yields no batch
    '''
    correct_per_class = None
    labels_per_class = None
    num_samples = 0
    for batch_x, batch_y in data:
        output = model.forward(batch_x)
        num_classes = int(batch_y.shape[1])
        predict = tf.one_hot(tf.math.argmax(output, 1), num_classes, dtype=tf.float32)
        # Column sums give, per class, the number of correct hits / of labels in this batch.
        batch_correct = tf.reduce_sum(tf.math.multiply(predict, batch_y), axis=0)
        batch_labels = tf.reduce_sum(batch_y, axis=0)
        if correct_per_class is None:
            correct_per_class, labels_per_class = batch_correct, batch_labels
        else:
            correct_per_class = correct_per_class + batch_correct
            labels_per_class = labels_per_class + batch_labels
        num_samples += int(batch_y.shape[0])
    if correct_per_class is None:
        raise ValueError('evaluate() needs a dataset that yields at least one batch')
    avg_error = 1 - tf.reduce_sum(correct_per_class) / num_samples
    # classification error for each class
    error_for_each_class = [
        1 - correct_per_class[each_class] / labels_per_class[each_class]
        for each_class in range(int(correct_per_class.shape[0]))
    ]
    return avg_error.numpy(), error_for_each_class

In [9]:
def compute_loss(predict, labels):
    '''
    Mean softmax cross-entropy of a batch.
    :param predict: unnormalised scores (logits), one row per sample
    :param labels: target distribution per sample, same shape as predict
    :return: scalar tensor with the loss averaged over the batch
    '''
    per_sample = tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=predict)
    return tf.reduce_mean(per_sample)

In [10]:
def check(model, train_dataset, test_dataset):
    # we compute the training loss & accuracy / testing loss & accuracy
    '''
    Both datasets are consumed batch by batch and every batch is used. The
    per-batch metrics are averaged weighted by batch size, so a smaller final
    batch does not skew the result and a multi-batch test set is evaluated
    completely (previously only its first batch was used).

    :param model: the neural network (must expose forward(x))
    :param train_dataset: iterable of (images, one-hot labels) training batches
    :param test_dataset: iterable of (images, one-hot labels) testing batches
    :return: training loss; training accuracy; testing loss; testing accuracy
             (numpy scalars)
    :raises ValueError: if either dataset yields no samples
    '''
    def _dataset_metrics(dataset, split_name):
        # Sample-weighted mean loss and accuracy over all batches of one dataset.
        loss_sum = 0.0
        acc_sum = 0.0
        num_samples = 0
        for x, y in dataset:
            output = model.forward(x)
            n = int(x.shape[0])
            loss_sum = loss_sum + compute_loss(output, y) * n
            acc_sum = acc_sum + compute_accuracy(output, y) * n
            num_samples += n
        if num_samples == 0:
            raise ValueError('check() received an empty ' + split_name + ' dataset')
        return (loss_sum / num_samples).numpy(), (acc_sum / num_samples).numpy()

    train_loss, train_acc = _dataset_metrics(train_dataset, 'training')
    test_loss, test_acc = _dataset_metrics(test_dataset, 'testing')
    return train_loss, train_acc, test_loss, test_acc

In [11]:
def train(train_data, test_data, model=None, epochs=None, weights_path='model_2_weights.h5'):
    '''
    Train a model with Adam, report metrics after every epoch, plot the
    learning curves and save the weights.

    :param train_data: iterable of (images, one-hot labels) training batches
    :param test_data: iterable of (images, one-hot labels) testing batches
    :param model: network to train; must expose forward(x), trainable_variables
        and save_weights. Defaults to a freshly constructed ResNet18().
    :param epochs: number of passes over train_data; defaults to the
        module-level training_epochs
    :param weights_path: file the trained weights are written to
    :return: None
    '''
    model_2 = ResNet18() if model is None else model
    num_epochs = training_epochs if epochs is None else epochs
    optimizer = tf.keras.optimizers.Adam(learning_rate=step_size)
    training_loss = []
    training_acc = []
    testing_loss = []
    testing_acc = []
    for epoch in range(1, num_epochs + 1):
        for x, y in train_data:
            with tf.GradientTape() as tape:
                # NOTE(review): forward() is called without a training flag, so
                # layers such as BatchNormalization run in their default mode
                # here - confirm that this is intended for the optimisation step.
                predict = model_2.forward(x)
                loss = compute_loss(predict, y)
            # Read the variables after the forward pass: Keras layers create
            # their weights lazily on first use.
            variables = model_2.trainable_variables
            gradients = tape.gradient(loss, variables)
            optimizer.apply_gradients(zip(gradients, variables))
        # Metrics are recomputed over both full datasets once per epoch.
        train_loss, train_acc, test_loss, test_acc = check(model_2, train_data, test_data)
        training_loss.append(train_loss)
        training_acc.append(train_acc)
        testing_loss.append(test_loss)
        testing_acc.append(test_acc)
        print('Epoch:', epoch, 'training loss:', training_loss[-1], 'training accuracy', training_acc[-1],
              'testing loss', testing_loss[-1], 'testing accuracy', testing_acc[-1])
    # plot
    plot(training_loss, training_acc, testing_loss, testing_acc)
    # save model
    model_2.save_weights(weights_path)

In [12]:
# Run the training loop on the CIFAR-10 pipelines defined in the dataset cell.
train(train_dataset, test_dataset)



KeyboardInterrupt: 