In [1]:
import os
import urllib.request as request
import zipfile

# dataset from  http://www1.cs.columbia.edu/CAVE/software/softlib/coil-100.php
# found on  http://deeplearning.net/datasets/

# Facts about the dataset that the model code relies on.
N_CLASSES = 100   # size of the classifier's output layer
N_CHANNELS = 1    # channel count requested when each PNG is decoded

# Remote archive that is fetched when the dataset is missing locally.
url = "http://www.cs.columbia.edu/CAVE/databases/SLAM_coil-20_coil-100/coil-100/coil-100.zip"

# Local layout: the images are expected under <dataset_prfix>/<dataset_name>.
dataset_name = 'coil-100'
dataset_prfix = './data/'
dataset_path = os.path.join(dataset_prfix, dataset_name)

def dataset_prepare():
    """Make sure the dataset directory exists, downloading it if necessary.

    If ``dataset_path`` is already a directory nothing happens. Otherwise the
    archive at ``url`` is downloaded to a temporary file, extracted into
    ``dataset_prfix`` and the temporary file is removed again.

    Returns:
        None.

    Raises:
        Exception: if ``dataset_path`` exists but is not a directory.
        urllib.error.URLError / zipfile.BadZipFile: propagated unchanged when
            the download or the extraction fails.
    """
    if os.path.exists(dataset_path):
        if os.path.isdir(dataset_path):
            return
        raise Exception("{} is not an directory".format(dataset_path))
    print(dataset_name, "is not exist, download & unzip")

    prev_perdeca = 0

    def reporthook(chunk_count, max_chunk_size, total_size):
        """Print the download progress each time another 10% is reached."""
        nonlocal prev_perdeca
        if total_size <= 0:
            # The server did not announce a size; a percentage is meaningless.
            return
        # The last chunk is usually partial, so clamp to avoid reporting >100%.
        estimated_size = min(chunk_count * max_chunk_size, total_size)
        perdeca = int(estimated_size / total_size * 10)
        if prev_perdeca != perdeca:
            prev_perdeca = perdeca
            print("download: ", perdeca * 10, "%")

    try:
        filename, _ = request.urlretrieve(url, reporthook=reporthook)
        print(dataset_name, "is downloaded to", filename)
        with zipfile.ZipFile(filename, 'r') as zip_ref:
            # Extract into the parent directory; dataset_path is that
            # directory joined with dataset_name.
            zip_ref.extractall(dataset_prfix)
    finally:
        # urlretrieve() wrote to a temporary file; do not leave it behind.
        request.urlcleanup()
    print(dataset_name, "is decompressed to", dataset_path)
    
# Ensure the dataset is available locally before any later cell reads it.
dataset_prepare()

In [2]:
import tensorflow as tf
import re
import random

# Spatial size every decoded image is resized to before batching.
IMG_HEIGHT = 32
IMG_WIDTH = 32

def get_image_paths_and_labels(train_fraction=0.5, seed=None):
    """Collect the dataset images and split them into train and test sets.

    Only regular files in ``dataset_path`` whose whole name has the form
    ``obj<label>__<number>.png`` are used. Labels are shifted by one so that
    ``obj1`` becomes label 0.

    Args:
        train_fraction: share of the shuffled examples assigned to the
            training set (default 0.5); the remainder is the test set.
        seed: optional seed for a reproducible shuffle. When ``None`` the
            module-level ``random`` generator is used.

    Returns:
        A tuple ``(train_image_paths, train_labels, test_image_paths,
        test_labels)`` of four lists.
    """
    # The dot is escaped and the whole name must match, so names such as
    # "obj3__0.png.bak" are not mistaken for images.
    pattern = re.compile(r"obj(?P<label>\d+)__\d+\.png")

    classified = []
    # Sorted so that a given seed always produces the same split.
    for file in sorted(os.listdir(dataset_path)):
        path = os.path.join(dataset_path, file)
        if not os.path.isfile(path):
            continue
        m = pattern.fullmatch(file)
        if m is None:
            continue
        classified.append((int(m.group('label')) - 1, path))

    if seed is None:
        random.shuffle(classified)
    else:
        random.Random(seed).shuffle(classified)

    slice_at = int(len(classified) * train_fraction)
    train, test = classified[:slice_at], classified[slice_at:]

    train_image_paths = [path for (_, path) in train]
    train_labels = [label for (label, _) in train]
    test_image_paths = [path for (_, path) in test]
    test_labels = [label for (label, _) in test]

    return train_image_paths, train_labels, test_image_paths, test_labels

def make_batch(paths, labels, batch_size):
    """Build a queue-based input pipeline that yields image batches.

    Args:
        paths: image file paths, converted to a string tensor.
        labels: labels aligned with ``paths``, converted to an int32 tensor.
        batch_size: number of examples per batch.

    Returns:
        A tuple ``(images, labels, paths)`` of batched tensors; ``paths`` holds
        the file each image in the batch was read from.
    """
    path_tensor = tf.convert_to_tensor(paths, dtype=tf.string)
    label_tensor = tf.convert_to_tensor(labels, dtype=tf.int32)

    # The path tensor is sliced twice: once to be read as a file, and once to
    # be passed through unchanged so callers can see which file was used.
    file_name, label, source_path = tf.train.slice_input_producer(
        [path_tensor, label_tensor, path_tensor], shuffle=True)

    encoded = tf.read_file(file_name)
    decoded = tf.image.decode_png(encoded, channels=N_CHANNELS)
    resized = tf.image.resize_images(decoded, [IMG_HEIGHT, IMG_WIDTH])
    # Map pixel values from the 0..255 range to -1..1.
    normalized = resized / 255 * 2 - 1.0

    image_batch, label_batch, path_batch = tf.train.batch(
        [normalized, label, source_path],
        batch_size=batch_size,
        capacity=batch_size * 8,
        num_threads=4)
    return image_batch, label_batch, path_batch

  return f(*args, **kwds)


In [3]:
# Training schedule.
num_steps = 1000       # total number of optimisation steps
display_step = 100     # loss/accuracy are printed every this many steps
batch_size = 128       # examples per training batch
learning_rate = 0.001  # passed to the Adam optimizer

# Forwarded by conv_net to tf.layers.dropout as `rate`.
# NOTE(review): `rate` is the fraction of units that is dropped, so 0.75
# discards 75% of the activations. Confirm this is intended and not a
# keep-probability carried over from the tf.nn.dropout convention.
dropout = 0.75

# Split the dataset once; the test half is kept for the evaluation cell.
train_paths, train_labels, test_paths, test_labels = get_image_paths_and_labels()
# Training input pipeline. The third output (source file paths) is not needed here.
X, Y, _ = make_batch(train_paths, train_labels, batch_size)

In [4]:
def conv_net(x, n_classes, dropout, reuse, is_training):
    """Two-convolution classifier built under the 'ConvNet' variable scope.

    Args:
        x: input image batch.
        n_classes: number of units in the output layer.
        dropout: value passed to ``tf.layers.dropout`` as ``rate``.
        reuse: forwarded to ``tf.variable_scope``; pass True to share the
            weights of a previously built copy of the network.
        is_training: enables dropout and selects the kind of output.

    Returns:
        The raw output-layer activations when ``is_training`` is true,
        otherwise their softmax.
    """
    with tf.variable_scope('ConvNet', reuse=reuse):
        # Layers are created in a fixed order so their default variable names
        # are identical between the training and the reused copies.
        net = tf.layers.conv2d(x, filters=32, kernel_size=5, activation=tf.nn.relu)
        net = tf.layers.max_pooling2d(net, pool_size=2, strides=2)

        net = tf.layers.conv2d(net, filters=64, kernel_size=3, activation=tf.nn.relu)
        net = tf.layers.max_pooling2d(net, pool_size=2, strides=2)

        flat = tf.contrib.layers.flatten(net)

        hidden = tf.layers.dense(flat, units=1024)
        hidden = tf.layers.dropout(hidden, rate=dropout, training=is_training)

        scores = tf.layers.dense(hidden, units=n_classes)
        if is_training:
            # The loss applies softmax itself, so training gets raw scores.
            return scores
        return tf.nn.softmax(scores)

In [5]:
# Training copy of the network: creates the 'ConvNet' variables (reuse=False)
# and returns raw scores because is_training=True.
logits_train = conv_net(X, N_CLASSES, dropout, reuse=False, is_training=True)

# Mean cross-entropy between the raw scores and the integer labels.
loss_op = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
    logits=logits_train, labels=Y))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
train_op = optimizer.minimize(loss_op)

# Inference copy sharing the same weights (reuse=True). With is_training=False
# conv_net returns softmax probabilities, despite the `logits_` name.
# Note that it is fed from X/Y, so `accuracy` is measured on training batches.
logits_test = conv_net(X, N_CLASSES, dropout, reuse=True, is_training=False)
correct_pred = tf.equal(tf.argmax(logits_test, 1), tf.cast(Y, tf.int64))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

# Initializer for every global variable defined up to this point.
init = tf.global_variables_initializer()

# Used to write the checkpoint after training and to restore it later.
saver = tf.train.Saver()

# Train the network, report progress every `display_step` steps and write a
# checkpoint to ./model/coil-100 once all steps have completed.
with tf.Session() as sess:
    sess.run(init)

    # Created before the try block so the handlers below can always use them.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    try:
        for step in range(1, num_steps + 1):
            if step % display_step == 0:
                # One run call, so loss and accuracy describe the same batch
                # that this training step consumed.
                _, loss, acc = sess.run([train_op, loss_op, accuracy])
                print("Step {}, loss={:.4f}, accuracy={:.3f}".format(step, loss, acc))
            else:
                # Only run the optimization op (backprop)
                sess.run(train_op)

        print("Optimization Finished!")
        # Make sure the checkpoint directory exists before saving into it.
        os.makedirs('./model', exist_ok=True)
        saver.save(sess, './model/coil-100')
    except Exception as e:
        print("Optimization Failed!")
        # Hand the error to the coordinator; join() below re-raises it so the
        # failure is visible instead of being silently swallowed.
        coord.request_stop(e)
    finally:
        # Always stop and join the queue-runner threads, on success or
        # failure, before the session is closed.
        coord.request_stop()
        coord.join(threads)

Step 100, loss=0.6392, accuracy=0.914
Step 200, loss=0.1868, accuracy=0.977
Step 300, loss=0.0946, accuracy=0.984
Step 400, loss=0.0322, accuracy=1.000
Step 500, loss=0.0193, accuracy=1.000
Step 600, loss=0.0479, accuracy=1.000
Step 700, loss=0.0113, accuracy=1.000
Step 800, loss=0.0202, accuracy=1.000
Step 900, loss=0.0045, accuracy=1.000
Step 1000, loss=0.0216, accuracy=1.000
Optimization Finished!


In [13]:
# Input pipeline over the held-out half, 16 images per batch. Zt carries the
# source file path of every image in the batch.
Xt, Yt, Zt = make_batch(test_paths, test_labels, 16)
# Inference copy of the network sharing the trained weights (reuse=True).
# NOTE: this rebinds `logits_test` and `correct_pred` from the training cell;
# with is_training=False the values are softmax probabilities.
logits_test = conv_net(Xt, N_CLASSES, dropout, reuse=True, is_training=False)
correct_pred = tf.equal(tf.argmax(logits_test, 1), tf.cast(Yt, tf.int64))
test_accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
# Zero-based predicted class index for each image.
pred = tf.argmax(logits_test, 1)

# Restore the trained weights and classify a single batch of test images.
with tf.Session() as sess:
    sess.run(init)
    # Overwrites the freshly initialised variables with the trained values.
    saver.restore(sess, './model/coil-100')

    # Created before the try block so the handlers below can always use them.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    try:
        # One run call, so paths, predictions and accuracy all describe the
        # same batch.
        real_path, pred_label, acc = sess.run([Zt, pred, test_accuracy])

        print("accuracy:", acc)
        for path_bytes, class_index in zip(real_path, pred_label):
            # Labels were shifted down by one when loaded; shift back so the
            # printed class matches the objNN number in the file name.
            print(path_bytes.decode("utf-8"), "is classified as", class_index + 1)

        # TODO: also display the images themselves (not done yet).
    except Exception as e:
        print("Prediction Failed!")
        # Hand the error to the coordinator; join() below re-raises it so the
        # failure is visible instead of being silently swallowed.
        coord.request_stop(e)
    finally:
        # Always stop and join the queue-runner threads before the session
        # is closed.
        coord.request_stop()
        coord.join(threads)

INFO:tensorflow:Restoring parameters from ./model/coil-100
accuracy: 1.0
./data/coil-100/obj40__115.png is classified as 40
./data/coil-100/obj59__325.png is classified as 59
./data/coil-100/obj50__295.png is classified as 50
./data/coil-100/obj51__120.png is classified as 51
./data/coil-100/obj51__230.png is classified as 51
./data/coil-100/obj84__110.png is classified as 84
./data/coil-100/obj86__85.png is classified as 86
./data/coil-100/obj58__325.png is classified as 58
./data/coil-100/obj11__225.png is classified as 11
./data/coil-100/obj37__325.png is classified as 37
./data/coil-100/obj36__255.png is classified as 36
./data/coil-100/obj73__135.png is classified as 73
./data/coil-100/obj19__160.png is classified as 19
./data/coil-100/obj64__265.png is classified as 64
./data/coil-100/obj48__50.png is classified as 48
./data/coil-100/obj70__115.png is classified as 70
