In [1]:
import gzip
import os
import tensorflow as tf
import numpy

In [2]:
data_path = '/home/hadoop/data/mnist'
def load_mnist(data_dir=None):
    """Read the four gzipped MNIST IDX files and return them as numpy arrays.

    Args:
        data_dir: Directory containing the ``*-ubyte.gz`` files. When omitted,
            the module-level ``data_path`` is used.

    Returns:
        A tuple ``(train_images, train_labels, test_images, test_labels)``.
        Image arrays are float32 with one flattened image per row
        (``num_images`` rows of ``rows * cols`` values, raw byte values cast
        to float without rescaling); label arrays are 1-D int32.

    Raises:
        ValueError: If a file does not start with the expected magic number
            (2051 for image files, 2049 for label files).
    """
    if data_dir is None:
        data_dir = data_path
    train_data = os.path.join(data_dir, 'train-images-idx3-ubyte.gz')
    train_label = os.path.join(data_dir, 'train-labels-idx1-ubyte.gz')
    test_data = os.path.join(data_dir, 't10k-images-idx3-ubyte.gz')
    test_label = os.path.join(data_dir, 't10k-labels-idx1-ubyte.gz')

    def _read32(bytestream):
        """Read one big-endian unsigned 32-bit header field as a plain int."""
        big_endian_u32 = numpy.dtype(numpy.uint32).newbyteorder('>')
        # int() keeps later size arithmetic out of fixed-width uint32 math.
        return int(numpy.frombuffer(bytestream.read(4), dtype=big_endian_u32)[0])

    def _read_image(filename):
        """Parse an image file into a float32 array, one flattened image per row."""
        with tf.gfile.Open(filename, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
            magic = _read32(bytestream)
            if magic != 2051:
                raise ValueError(
                    'Invalid magic number %d in MNIST image file: %s' %
                    (magic, filename))
            # Header order after the magic number: image count, rows, columns.
            num_images = _read32(bytestream)
            rows = _read32(bytestream)
            cols = _read32(bytestream)
            buf = bytestream.read(rows * cols * num_images)
            data = numpy.frombuffer(buf, dtype=numpy.uint8)
            data = data.reshape(num_images, rows * cols)
            return data.astype(numpy.float32)

    def _read_label(filename):
        """Parse a label file into a 1-D int32 array."""
        with tf.gfile.Open(filename, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
            magic = _read32(bytestream)
            if magic != 2049:
                # The original built this exception without raising it, so a
                # wrong file was parsed as if it were valid.
                raise ValueError(
                    'Invalid magic number %d in MNIST label file: %s' %
                    (magic, filename))
            num_labels = _read32(bytestream)
            buf = bytestream.read(num_labels)
            labels = numpy.frombuffer(buf, dtype=numpy.uint8)
            return labels.astype(numpy.int32)

    return _read_image(train_data), _read_label(train_label), _read_image(test_data), _read_label(test_label)

In [3]:
# Load the raw arrays and sanity-check the expected MNIST split sizes.
train_data, train_label, test_data, test_label = load_mnist()
assert train_data.shape == (60000, 784)
assert train_label.shape == (60000,)
assert test_data.shape == (10000, 784)
assert test_label.shape == (10000,)

# Scale every image (row) to unit Euclidean length for the k-NN step.
# keepdims=True leaves the norms as a column so they broadcast across each row.
train_data_norm = numpy.sqrt(numpy.square(train_data).sum(axis=1, keepdims=True))
train_data = train_data / train_data_norm

test_data_norm = numpy.sqrt(numpy.square(test_data).sum(axis=1, keepdims=True))
test_data = test_data / test_data_norm

In [4]:
def knn_predict(_k):
    """Build the graph ops that find the ``_k`` best-matching training rows.

    The module-level ``train_data`` array is read when this function is
    called. Each fed row is scored against every training row by dot product,
    and ``tf.nn.top_k`` keeps the ``_k`` highest scores per fed row.

    Args:
        _k: Number of top-scoring training rows to keep per fed row.

    Returns:
        A pair ``(placeholder, top_k_result)``: the float32 placeholder of
        shape ``(None, 784)`` to feed, and the result of ``tf.nn.top_k``.
    """
    query = tf.placeholder(dtype=tf.float32, shape=(None, 784))
    # transpose_b=True multiplies by train_data transposed: one score per
    # (fed row, training row) pair.
    similarity = tf.matmul(query, train_data, transpose_b=True)
    nearest = tf.nn.top_k(similarity, k=_k)
    return query, nearest
# Try knn_predict with k=1, k=3, or k=5 when experimenting.

In [5]:
# Open the TensorFlow session that the following cells use to run graph ops.
session = tf.Session()
# NOTE(review): no tf.Variable is created in the visible cells, so this
# initializer presumably has nothing to initialize -- confirm before removing.
session.run(tf.initialize_all_variables())

In [6]:
# Build the k=1 graph: `image` is the input placeholder and `pred` the top-k
# op returned by knn_predict.
image, pred = knn_predict(1)
# Score the first 10000 test rows; this may take a few seconds.
# NOTE(review): despite its name, p_labels is later used to index train_label,
# so it presumably holds training-row indices rather than class labels.
p_value, p_labels = session.run(pred, feed_dict={image:test_data[:10000]})

In [7]:
from collections import Counter

# Majority vote per test row: train_label[p_labels] maps each row of
# neighbour indices to the labels of those training rows, and the most common
# label becomes the prediction. `correct` therefore holds predicted labels.
correct = [Counter(neighbour_labels).most_common(1)[0][0]
           for neighbour_labels in train_label[p_labels]]

# Count predictions that match the true label of the same test row.
correct_count = sum(int(predicted == actual)
                    for predicted, actual in zip(correct, test_label[:10000]))

# The misspelt name `accurary` is kept so any later cell that reads it still
# works. `* 1.0` forces float division on Python 2.
accurary = correct_count * 1.0 / len(test_label[:10000])
# Parenthesised form prints the same text on Python 2 and is valid on Python 3.
print(accurary)

0.9723
