# Effect of image filters on prediction


Using a minimal dataset of 40 images in 2 classes (10 in each class) to predict the image using a pretrained network.  In this case I used VGGNet (https://arxiv.org/pdf/1409.1556.pdf) trained on the ImageNet dataset(http://www.image-net.org/) as the feature extractor.  The idea here is to keep all convolutional layers but replace the final fully connected layer with a custom classifier.

Below is a diagram of the VGGNet architecture.

Here we're using the vgg16 module from tensorflow_vgg. The network takes images of size 224×224×3


<img src=images/imagenet_vgg16.png width=700px>



Image manipulation involved using photoshop to manipulate the test images.  The test involved using original un-manipulated pictures, Sharpen, Invert, Gausian Blur and Edges.

In [2]:
# Put these at the top of every notebook, to get automatic reloading and inline plotting
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [None]:
import time
import math
import random

#import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import dataset
import cv2
import PIL
import scipy.ndimage as spi
import matplotlib.image as mpimg
from PIL import Image
from PIL import ImageFilter

from matplotlib import pyplot
from datetime import timedelta

In [None]:
from urllib.request import urlretrieve
from os.path import isfile, isdir
from tqdm import tqdm

vgg_dir = 'tensorflow_vgg/'
# Make sure vgg exists
if not isdir(vgg_dir):
    raise Exception("VGG directory doesn't exist!")

class DLProgress(tqdm):
    last_block = 0

    def hook(self, block_num=1, block_size=1, total_size=None):
        self.total = total_size
        self.update((block_num - self.last_block) * block_size)
        self.last_block = block_num

if not isfile(vgg_dir + "vgg16.npy"):
    with DLProgress(unit='B', unit_scale=True, miniters=1, desc='VGG16 Parameters') as pbar:
        urlretrieve(
            'https://s3.amazonaws.com/content.udacity-data.com/nd101/vgg16.npy',
            vgg_dir + 'vgg16.npy',
            pbar.hook)
else:
    print("Parameter file already exists!")

In [None]:
# Using different directories each containing identical sets of pictures.  Using 'invert' folder in this example
data_dir = 'Invert/'
contents = os.listdir(data_dir)
classes = [each for each in contents if os.path.isdir(data_dir + each)]

In [None]:
classes = ['Capsules', 'Tablets', 'Unknown']
num_classes = len(classes)
 
train_path='training_data'
 
# validation split
validation_size = 0.2
 
# batch size
batch_size = 16

# image size
img_size = 128

train_path = 'Invert/'
test_path = 'Invert/test'
checkpoint_dir = "models"

data = dataset.read_train_sets(train_path, img_size, classes, validation_size=validation_size)
test_images, test_ids = dataset.read_test_set(test_path, img_size)

In [None]:
# print size of training set, test set and validation set
print("Size of:")
print("- Training-set:\t\t{}".format(len(data.train.labels)))
print("- Test-set:\t\t{}".format(len(test_images)))
print("- Validation-set:\t{}".format(len(data.valid.labels)))

In [None]:
# Set the batch size higher if you can fit in in your GPU memory
batch_size = 10
codes_list = []
labels = []
batch = []

codes = None

with tf.Session() as sess:
    vgg = vgg16.Vgg16()
    input_ = tf.placeholder(tf.float32, [None, 224, 224, 3], name='input')
    with tf.name_scope("content_vgg"):
        vgg.build(input_)

    for each in classes:
        print("Starting {} images".format(each))
        class_path = data_dir + each
        files = os.listdir(class_path)
        for ii, file in enumerate(files, 1):
            # Add images to the current batch
            # utils.load_image crops the input images for us, from the center
            img = utils.load_image(os.path.join(class_path, file))
            batch.append(img.reshape((1, 224, 224, 3)))
            labels.append(each)
            
            # Running the batch through the network to get the codes
            if ii % batch_size == 0 or ii == len(files):
                images = np.concatenate(batch)

                feed_dict = {input_: images}
                codes_batch = sess.run(vgg.relu6, feed_dict=feed_dict)
                
                # Here I'm building an array of the codes
                if codes is None:
                    codes = codes_batch
                else:
                    codes = np.concatenate((codes, codes_batch))
                
                # Reset to start building the next batch
                batch = []
                print('{} images processed'.format(ii))
                print

In [None]:
# write codes to file
with open('codes', 'w') as f:
    codes.tofile(f)
    
# write labels to file
import csv
with open('labels', 'w') as f:
    writer = csv.writer(f, delimiter='\n')
    writer.writerow(labels)

In [None]:
# build the classifier
# read codes and labels from file
import csv

with open('labels') as f:
    reader = csv.reader(f, delimiter='\n')
    labels = np.array([each for each in reader if len(each) > 0]).squeeze()
with open('codes') as f:
    codes = np.fromfile(f, dtype=np.float32)
    codes = codes.reshape((len(labels), -1))
    
print (labels)
print (codes)

In [None]:
# use LabelBinarizer from sklearn to create one-hot encoded vectors from the labels.
from sklearn.preprocessing import LabelBinarizer
from sklearn.preprocessing import label_binarize

lb = LabelBinarizer()
lb.fit(labels)

labels_vecs = label_binarize(labels, classes=['Tablets', 'Capsules', 'unknown'])[:,0:3]
print(labels_vecs)

In [None]:
# randomize labels and data so that validation and test sets contain datat from all classes
from sklearn.model_selection import StratifiedShuffleSplit

ss = StratifiedShuffleSplit(n_splits=1, test_size=0.2)

train_idx, val_idx = next(ss.split(codes, labels))

half_val_len = int(len(val_idx)/2)
val_idx, test_idx = val_idx[:half_val_len], val_idx[half_val_len:]

train_x, train_y = codes[train_idx], labels_vecs[train_idx]
val_x, val_y = codes[val_idx], labels_vecs[val_idx]
test_x, test_y = codes[test_idx], labels_vecs[test_idx]

In [None]:
# print shapes of train, validation and test data
print("Train shapes (x, y):", train_x.shape, train_y.shape)
print("Validation shapes (x, y):", val_x.shape, val_y.shape)
print("Test shapes (x, y):", test_x.shape, test_y.shape)

In [None]:
inputs_ = tf.placeholder(tf.float32, shape=[None, codes.shape[1]], name ="inputs")
labels_ = tf.placeholder(tf.int64, shape=[None, labels_vecs.shape[1]], name = "labels")

W = tf.Variable(tf.zeros([4096,3]), name = "weights")

b = tf.Variable(tf.zeros([3]), name = "biases")

fs = tf.nn.softmax(tf.reshape(inputs_,[-1,4096]))

fc = tf.contrib.layers.fully_connected(fs, 4096)
    
logits = tf.contrib.layers.fully_connected(fc, labels_vecs.shape[1], activation_fn=None)
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=labels_, logits=logits)
cost = tf.reduce_mean(cross_entropy)

optimizer = tf.train.AdamOptimizer().minimize(cost)

predicted = tf.nn.softmax(logits)
correct_pred = tf.equal(tf.argmax(predicted, 1), tf.argmax(labels_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

print (cost)
print (logits)
print (fc)
print (predicted)

In [None]:
# get batches
def get_batches(x, y, n_batches=10):
    """ Return a generator that yields batches from arrays x and y. """
    batch_size = len(x)//n_batches
    
    for ii in range(0, n_batches*batch_size, batch_size):
        # If we're not on the last batch, grab data with size batch_size
        if ii != (n_batches-1)*batch_size:
            X, Y = x[ii: ii+batch_size], y[ii: ii+batch_size] 
        # On the last batch, grab the rest of the data
        else:
            X, Y = x[ii:], y[ii:]
        yield X, Y

In [None]:
# train the network using the get_batches function
epochs = 10
iteration = 0.01
saver = tf.train.Saver()
with tf.Session() as sess:
    
    sess.run(tf.global_variables_initializer())
    for e in range(epochs):
        for x, y in get_batches(train_x, train_y):
            feed = {inputs_: x,
                    labels_: y}
            loss, _ = sess.run([cost, optimizer], feed_dict=feed)
            print("Epoch: {}/{}".format(e+1, epochs),
                  "Iteration: {}".format(iteration),
                  "Training loss: {:.5f}".format(loss))
            iteration += 1
            
            if iteration % 5 == 0:
                feed = {inputs_: val_x,
                        labels_: val_y}
                val_acc = sess.run(accuracy, feed_dict=feed)
                print("Epoch: {}/{}".format(e, epochs),
                      "Iteration: {}".format(iteration),
                      "Validation Acc: {:.4f}".format(val_acc))
    saver.save(sess, "/output/pill.ckpt")
    file_writer = tf.summary.FileWriter('./logs/tensorboard', sess.graph)

In [None]:
# test for accuracy
with tf.Session() as sess:
    saver.restore(sess, tf.train.latest_checkpoint('/output'))
    
    feed = {inputs_: test_x,
            labels_: test_y}
    test_acc = sess.run(accuracy, feed_dict=feed)
    print("Test accuracy: {:.4f}".format(test_acc))