In [None]:
# TensorFlow is pinned to 1.15 because the notebook uses TF1 graph-mode APIs
# (tf.placeholder, tf.Session, tf.train.MomentumOptimizer).
!pip install tensorflow==1.15

# Mount Google Drive

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive; DATA_PATH (defined in the import cell)
# points into this mount, so this must run before any data is loaded.
drive.mount("/content/drive")

# Import Libraries

In [None]:
import os
import random
import numpy as np
import tensorflow as tf
import _pickle as cPickle
import matplotlib.pyplot as plt

# Directory (on the mounted Drive) that holds the pickled CIFAR-10 batch files.
DATA_PATH = '/content/drive/MyDrive/4-1/컴퓨터비전/cifar-10-python/cifar-10-batches-py/'

# Seed the Python, TensorFlow (graph-level) and NumPy RNGs with the same value.
for seed_fn in (random.seed, tf.set_random_seed, np.random.seed):
    seed_fn(123)

In [None]:
# Load a pickled batch file
def unpickle(file):
    """Deserialize one pickled file located under ``DATA_PATH``.

    Args:
        file: File name, joined onto ``DATA_PATH``.

    Returns:
        The object stored in the pickle. Callers in this notebook index it
        with the ``'data'`` and ``'labels'`` keys.

    Note:
        Unpickling can execute arbitrary code; only load trusted files.
    """
    batch_path = os.path.join(DATA_PATH, file)
    with open(batch_path, 'rb') as handle:
        return cPickle.load(handle, encoding='latin-1')


# One-hot encoding
def one_hot(labels, vals=10):
    """Turn a sequence of integer class ids into a one-hot float matrix.

    Args:
        labels: Sequence (list or integer array) of class indices.
        vals: Number of classes, i.e. the width of each encoded row.

    Returns:
        A ``(len(labels), vals)`` float array with a single 1 per row.
    """
    # Row k of the identity matrix is exactly the one-hot vector for class k.
    identity = np.eye(vals)
    return identity[labels]


# Visualize CIFAR data
def display_cifar(images, size):
    """Show a ``size`` x ``size`` mosaic of images picked at random.

    Each tile is drawn independently with ``np.random.choice``, so the same
    image may appear more than once.

    Args:
        images: Indexable collection of equally sized image arrays.
        size: Number of tiles per row and per column.
    """
    count = len(images)
    plt.figure()
    plt.gca().set_axis_off()

    # Build the mosaic one row at a time: tiles side by side, rows stacked.
    rows = []
    for _ in range(size):
        tiles = [images[np.random.choice(count)] for _ in range(size)]
        rows.append(np.hstack(tiles))
    mosaic = np.vstack(rows)

    plt.imshow(mosaic)
    plt.show()
    

# CIFAR data loader class
class CifarLoader(object):
    """Loads pickled CIFAR batch files and serves sequential mini-batches.

    After :meth:`load`:
        images: float array of shape (N, 32, 32, 3), pixel values divided by 255.
        labels: one-hot array of shape (N, 10).
    """

    def __init__(self, source_files):
        """Remember the batch file names; nothing is read until ``load()``."""
        self._source = source_files
        self._i = 0  # index of the first sample of the next batch
        self.images = None
        self.labels = None

    # Load the data
    def load(self):
        """Read every source file and populate ``images`` and ``labels``.

        Returns:
            ``self``, so construction and loading can be chained.
        """
        data = [unpickle(f) for f in self._source]
        images = np.vstack([d['data'] for d in data])
        n = len(images)
        # Rows are flat channel-first pixels: reshape to (N, 3, 32, 32), move
        # channels last, then scale to floats by dividing by 255.
        self.images = images.reshape(n, 3, 32, 32).transpose(0, 2, 3, 1).astype(float)/255
        self.labels = one_hot(np.hstack([d['labels'] for d in data]), 10)

        return self

    # Next batch
    def next_batch(self, batch_size):
        """Return the next ``batch_size`` samples, wrapping around at the end.

        The batch always contains exactly ``batch_size`` samples: when the
        read position runs past the last sample, the batch continues from the
        start of the data, so no samples are skipped and no short batch is
        produced at the wrap-around point.

        Args:
            batch_size: Number of samples to return.

        Returns:
            Tuple ``(x, y)`` of images and their one-hot labels.

        Raises:
            RuntimeError: If ``load()`` has not been called.
            ValueError: If the loaded dataset is empty.
        """
        if self.images is None or self.labels is None:
            raise RuntimeError('CifarLoader.load() must be called before next_batch()')

        total = len(self.images)
        if total == 0:
            raise ValueError('cannot draw a batch from an empty dataset')

        start = self._i
        stop = start + batch_size
        if stop <= total:
            # Common case: a contiguous slice.
            x = self.images[start:stop]
            y = self.labels[start:stop]
        else:
            # Wrap-around: continue from the beginning instead of truncating.
            wrapped = np.arange(start, stop) % total
            x = self.images[wrapped]
            y = self.labels[wrapped]

        self._i = stop % total
        return x, y
    

# CIFAR data manager class
class CifarDataManager(object):
    """Holds the loaded training and test splits as ``train`` and ``test``."""

    def __init__(self):
        # Training data is spread over data_batch_1 .. data_batch_5.
        train_files = list(map('data_batch_{}'.format, range(1, 6)))
        test_files = ['test_batch']

        train_loader = CifarLoader(train_files)
        test_loader = CifarLoader(test_files)

        # load() returns the loader itself once its arrays are populated.
        self.train = train_loader.load()
        self.test = test_loader.load()

In [None]:
# Load both splits once; later cells read them through `cifar`.
cifar = CifarDataManager()

# Report how many samples each split holds.
report = [
    ('number of train images : {}', cifar.train.images),
    ('number of train labels : {}', cifar.train.labels),
    ('number of test images : {}', cifar.test.images),
    ('number of test labels : {}', cifar.test.labels),
]
for template, items in report:
    print(template.format(len(items)))

# Optional visual sanity check:
#images = cifar.train.images
#display_cifar(images, 5)

# Baseline

In [None]:
# Baseline CNN graph: two 5x5 conv + 2x2 max-pool stages, a 1024-unit dense
# layer with dropout, and a 10-way linear output layer.
# NOTE: statements are kept in this exact order on purpose; in TF1 the random
# initial values depend on the order in which ops are added to the graph.
init_weight = tf.initializers.truncated_normal(stddev=0.1)
init_bias = tf.initializers.constant(0.1)

# Inputs fed at run time: image batch, one-hot labels, and the dropout rate.
x = tf.placeholder(tf.float32, shape=[None, 32, 32, 3])
y = tf.placeholder(tf.float32, shape=[None, 10])
rate = tf.placeholder(tf.float32)

# Conv stage 1: 5x5 kernels, 3 -> 32 channels, then 2x2 max-pool with stride 2.
w1 = tf.Variable(init_weight(shape=[5,5,3,32], dtype=tf.float32))
b1 = tf.Variable(init_bias(shape=[32]), dtype=tf.float32)
conv1 = tf.nn.relu(tf.nn.conv2d(x, w1, strides=[1,1,1,1], padding='SAME')+b1)
conv1_pool = tf.nn.max_pool(conv1, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')

# Conv stage 2: 5x5 kernels, 32 -> 64 channels, then 2x2 max-pool with stride 2.
w2 = tf.Variable(init_weight(shape=[5,5,32,64], dtype=tf.float32))
b2 = tf.Variable(init_bias(shape=[64]), dtype=tf.float32)
conv2 = tf.nn.relu(tf.nn.conv2d(conv1_pool, w2, strides=[1,1,1,1], padding='SAME')+b2)
conv2_pool = tf.nn.max_pool(conv2, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')

# Flatten: two stride-2 pools take 32x32 down to 8x8, with 64 channels.
conv2_flat = tf.reshape(conv2_pool, [-1, 8*8*64])

# Fully connected layer with 1024 units, followed by dropout.
w3 = tf.Variable(init_weight(shape=[8*8*64, 1024], dtype=tf.float32))
b3 = tf.Variable(init_bias(shape=[1024]), dtype=tf.float32)
full1 = tf.nn.relu(tf.matmul(conv2_flat, w3)+b3)
full1_drop = tf.nn.dropout(full1, rate=rate) # `rate` is the drop rate, not the keep probability

# Output layer: 10 class scores (no activation; softmax is applied in the loss).
w4 = tf.Variable(init_weight(shape=[1024, 10], dtype=tf.float32))
b4 = tf.Variable(init_bias(shape=[10]), dtype=tf.float32)
full2 = tf.matmul(full1_drop, w4)+b4

logits = full2

# Mean softmax cross-entropy over the batch.
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(labels=y, logits=logits))
# Alternative optimizer that was tried:
#train = tf.train.AdamOptimizer(0.001).minimize(loss)
# Nesterov momentum SGD: learning rate 0.005, momentum 0.9.
train = tf.train.MomentumOptimizer(0.005, 0.9, use_nesterov=True).minimize(loss)


# Fraction of samples whose arg-max prediction matches the arg-max label.
correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(logits, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

In [None]:
# Split the test set into 10 chunks of 1000 samples so evaluation runs in
# small feeds rather than one large batch.
x_test = cifar.test.images.reshape((10, 1000, 32, 32, 3))
y_test = cifar.test.labels.reshape((10, 1000, 10))

# Per-epoch test accuracy (in percent) and test loss, appended to by test().
acc_data, loss_data = [], []

def test(sess):
    """Evaluate the current graph on the 10 test chunks and record the result.

    Runs ``accuracy`` and ``loss`` on each chunk of ``x_test``/``y_test`` with
    the dropout rate set to 0.0, averages across chunks, appends the mean
    accuracy (as a percentage) to ``acc_data`` and the mean loss to
    ``loss_data``, and prints both.

    Args:
        sess: Open ``tf.Session`` holding the model variables.
    """
    results = [
        sess.run([accuracy, loss],
                 feed_dict={x: x_test[k], y: y_test[k], rate: 0.0})
        for k in range(10)
    ]
    chunk_acc = [pair[0] for pair in results]
    chunk_loss = [pair[1] for pair in results]

    acc = np.mean(chunk_acc)
    los = np.mean(chunk_loss)
    acc_data.append(acc * 100)
    loss_data.append(los)

    print('test accuracy: {:.4}%, test loss: {}'.format(acc*100, los))


# Training configuration. TRAIN_SIZE is taken from the loaded data instead of
# being hard-coded, so STEPS stays correct if the training set changes.
TRAIN_SIZE = len(cifar.train.images)
BATCH_SIZE = 128
STEPS = TRAIN_SIZE // BATCH_SIZE
EPOCH = 50

# Train the baseline and evaluate on the test set after every epoch.
# The `with` block closes the session on exit, so no explicit close is needed.
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver(max_to_keep=10)
    for i in range(EPOCH):
        for j in range(STEPS):
            batch = cifar.train.next_batch(BATCH_SIZE)
            # rate=0.5 is the dropout rate used while training.
            _, loss_, acc = sess.run([train, loss, accuracy], feed_dict={x: batch[0], y: batch[1], rate: 0.5})

            if j%10 == 0:
                print('epoch: {}, steps: {}, train-loss: {}, train-accuracy: {}'.format(i+1, j+1, loss_, acc))
        # Checkpointing is disabled for the baseline; uncomment to enable.
        # if (i+1) % 10 == 0:
        #     saver.save(sess, DATA_PATH + f"baseline/epoch{i+1}.ckpt")
        test(sess)

# Plot the per-epoch test accuracy and test loss collected by test().
epochs = np.arange(1, EPOCH + 1)
fig, ax = plt.subplots(2, constrained_layout=True)
ax[0].set_title('Baseline: test accuracy per epoch')
ax[0].plot(epochs, acc_data)
ax[0].set_xlabel('Epoch')
ax[0].set_ylabel('Accuracy')
ax[1].set_title('Baseline: test loss per epoch')
ax[1].set_xlabel('Epoch')
ax[1].set_ylabel('Loss')
ax[1].plot(epochs, loss_data)
plt.show()

# 최종 Model

In [None]:
# Final model graph: six 3x3 conv layers (a 2x2 max-pool after every second
# one), a 1024-unit dense layer with dropout, and a 10-way output layer.

# Start from an empty default graph. Without this, the variables of any graph
# built earlier in the session (e.g. the baseline) stay in the default graph
# and are picked up by tf.global_variables_initializer() and tf.train.Saver(),
# and re-running this cell would keep adding duplicate variables.
tf.reset_default_graph()
# The graph-level seed belongs to the graph, so it must be set again after a reset.
tf.set_random_seed(123)

init_weight = tf.initializers.truncated_normal(stddev=0.1)
init_bias = tf.initializers.constant(0.1)

# Inputs fed at run time: image batch, one-hot labels, and the dropout rate.
x = tf.placeholder(tf.float32, shape=[None, 32, 32, 3])
y = tf.placeholder(tf.float32, shape=[None, 10])
rate = tf.placeholder(tf.float32)

# Layer 1: 3x3 conv, 3 -> 32 channels
w1 = tf.Variable(init_weight(shape=[3,3,3,32], dtype=tf.float32))
b1 = tf.Variable(init_bias(shape=[32]), dtype=tf.float32)
conv1 = tf.nn.relu(tf.nn.conv2d(x, w1, strides=[1,1,1,1], padding='SAME')+b1)

# Layer 2: 3x3 conv, 32 -> 64 channels, then 2x2 max-pool (32x32 -> 16x16)
w2 = tf.Variable(init_weight(shape=[3,3,32,64], dtype=tf.float32))
b2 = tf.Variable(init_bias(shape=[64]), dtype=tf.float32)
conv2 = tf.nn.relu(tf.nn.conv2d(conv1, w2, strides=[1,1,1,1], padding='SAME')+b2)
conv2_pool = tf.nn.max_pool(conv2, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')

# Layer 3: 3x3 conv, 64 -> 128 channels
w3 = tf.Variable(init_weight(shape=[3,3,64,128], dtype=tf.float32))
b3 = tf.Variable(init_bias(shape=[128]), dtype=tf.float32)
conv3 = tf.nn.relu(tf.nn.conv2d(conv2_pool, w3, strides=[1,1,1,1], padding='SAME')+b3)

# Layer 4: 3x3 conv, 128 -> 128 channels, then 2x2 max-pool (16x16 -> 8x8)
w4 = tf.Variable(init_weight(shape=[3,3,128,128], dtype=tf.float32))
b4 = tf.Variable(init_bias(shape=[128]), dtype=tf.float32)
conv4 = tf.nn.relu(tf.nn.conv2d(conv3, w4, strides=[1,1,1,1], padding='SAME')+b4)
conv4_pool = tf.nn.max_pool(conv4, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')

# Layer 5: 3x3 conv, 128 -> 256 channels
w5 = tf.Variable(init_weight(shape=[3,3,128,256], dtype=tf.float32))
b5 = tf.Variable(init_bias(shape=[256]), dtype=tf.float32)
conv5 = tf.nn.relu(tf.nn.conv2d(conv4_pool, w5, strides=[1,1,1,1], padding='SAME')+b5)

# Layer 6: 3x3 conv, 256 -> 256 channels, then 2x2 max-pool (8x8 -> 4x4)
w6 = tf.Variable(init_weight(shape=[3,3, 256,256], dtype=tf.float32))
b6 = tf.Variable(init_bias(shape=[256]), dtype=tf.float32)
conv6 = tf.nn.relu(tf.nn.conv2d(conv5, w6, strides=[1,1,1,1], padding='SAME')+b6)
conv6_pool = tf.nn.max_pool(conv6, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')

# Layer 7: flatten the 4x4x256 feature map
conv7_flat = tf.reshape(conv6_pool, [-1, 4*4*256])

# Layer 8: fully connected, 4*4*256 -> 1024, followed by dropout
w8 = tf.Variable(init_weight(shape=[4*4*256, 1024], dtype=tf.float32))
b8 = tf.Variable(init_bias(shape=[1024]), dtype=tf.float32)
full8 = tf.nn.relu(tf.matmul(conv7_flat, w8)+b8)
full8_drop = tf.nn.dropout(full8, rate=rate) # `rate` is the drop rate, not the keep probability

# Layer 9: output layer, 1024 -> 10 class scores (softmax is applied in the loss)
w9 = tf.Variable(init_weight(shape=[1024, 10], dtype=tf.float32))
b9 = tf.Variable(init_bias(shape=[10]), dtype=tf.float32)
full9 = tf.matmul(full8_drop, w9)+b9

logits = full9

# Mean softmax cross-entropy over the batch.
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(labels=y, logits=logits))
# Alternative optimizer that was tried:
#train = tf.train.AdamOptimizer(0.001).minimize(loss)
# Nesterov momentum SGD: learning rate 0.001, momentum 0.9.
train = tf.train.MomentumOptimizer(0.001, 0.9, use_nesterov=True).minimize(loss)


# Fraction of samples whose arg-max prediction matches the arg-max label.
correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(logits, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

In [None]:
# Split the test set into 10 chunks of 1000 samples so evaluation runs in
# small feeds rather than one large batch.
x_test = cifar.test.images.reshape((10, 1000, 32, 32, 3))
y_test = cifar.test.labels.reshape((10, 1000, 10))

# Per-epoch test accuracy (in percent) and test loss, appended to by test().
acc_data, loss_data = [], []

# Per-epoch mean training loss and mean training accuracy (in percent).
loss_epoch, accuracy_epoch = [], []

def test(sess):
    """Evaluate the current graph on the 10 test chunks and record the result.

    Runs ``accuracy`` and ``loss`` on each chunk of ``x_test``/``y_test`` with
    the dropout rate set to 0.0, averages across chunks, appends the mean
    accuracy (as a percentage) to ``acc_data`` and the mean loss to
    ``loss_data``, and prints both.

    Args:
        sess: Open ``tf.Session`` holding the model variables.
    """
    results = [
        sess.run([accuracy, loss],
                 feed_dict={x: x_test[k], y: y_test[k], rate: 0.0})
        for k in range(10)
    ]
    chunk_acc = [pair[0] for pair in results]
    chunk_loss = [pair[1] for pair in results]

    acc = np.mean(chunk_acc)
    los = np.mean(chunk_loss)
    acc_data.append(acc * 100)
    loss_data.append(los)

    print('test accuracy: {:.4}%, test loss: {}'.format(acc*100, los))

# Training configuration. TRAIN_SIZE is taken from the loaded data instead of
# being hard-coded, so STEPS stays correct if the training set changes.
TRAIN_SIZE = len(cifar.train.images)
BATCH_SIZE = 128
STEPS = TRAIN_SIZE // BATCH_SIZE
EPOCH = 150

# Create the checkpoint directory up front so a missing folder cannot abort
# the run at the first save, ten epochs in.
CKPT_DIR = os.path.join(DATA_PATH, 'mynet3')
os.makedirs(CKPT_DIR, exist_ok=True)

# Train the final model; after each epoch record the mean training metrics and
# evaluate on the test set. The `with` block closes the session on exit.
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver(max_to_keep=10)
    for i in range(EPOCH):
        loss_batch = []
        accuracy_batch = []

        for j in range(STEPS):
            batch = cifar.train.next_batch(BATCH_SIZE)
            # rate=0.5 is the dropout rate used while training.
            _, loss_, acc = sess.run([train, loss, accuracy], feed_dict={x: batch[0], y: batch[1], rate: 0.5})

            if j%10 == 0:
                print('epoch: {}, steps: {}, train-loss: {}, train-accuracy: {}'.format(i+1, j+1, loss_, acc))

            loss_batch.append(loss_)
            accuracy_batch.append(acc)

        # Epoch-level training metrics are the means over all steps.
        mean_loss = np.mean(loss_batch)
        mean_accuracy = np.mean(accuracy_batch)
        loss_epoch.append(mean_loss)
        accuracy_epoch.append(mean_accuracy * 100)

        # Save a checkpoint every 10 epochs.
        if (i+1) % 10 == 0:
            saver.save(sess, os.path.join(CKPT_DIR, f"epoch{i+1}.ckpt"))

        test(sess)

print('Best Train Acc : {:.2f}, Best Test Acc : {:.2f}'.format(max(accuracy_epoch), max(acc_data)))

# Plot train (red) against test (blue) curves, with a legend to tell them apart.
epochs = np.arange(1, EPOCH + 1)
fig, ax = plt.subplots(2, constrained_layout=True)

ax[0].set_xlabel('Epoch')
ax[0].set_ylabel('Accuracy')
ax[0].plot(epochs, accuracy_epoch, 'r', label='train')
ax[0].plot(epochs, acc_data, 'b', label='test')
ax[0].legend()

ax[1].set_xlabel('Epoch')
ax[1].set_ylabel('Loss')
ax[1].plot(epochs, loss_epoch, 'r', label='train')
ax[1].plot(epochs, loss_data, 'b', label='test')
ax[1].legend()
plt.show()