## Vehicle counting with CNNs

---

(Always be aware of your imports and preserve namespaces!!!)

In [None]:
import os
import glob
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import scipy.ndimage as nd
from tensorflow.examples.tutorials.mnist import input_data

# -- IPython magic selecting matplotlib's inline backend
%matplotlib inline

# -- use the "gist_gray" colormap by default when displaying images
plt.rcParams["image.cmap"] = "gist_gray"

### Loading the data

Our goal will be similar to the last lecture: to count cars on the roadway.  We'll concentrate on these two cameras:

In [None]:
# -- locate the frames for the two cameras of interest
impath  = os.path.join("images", "dot_dl")
imlist0 = sorted(glob.glob(os.path.join(impath, "cctv528*.jpg")))
imlist1 = sorted(glob.glob(os.path.join(impath, "cctv679*.jpg")))

# -- read the first frame from each camera
#    NOTE: scipy.ndimage.imread was deprecated in SciPy 1.0 and removed in
#    SciPy 1.2, so use matplotlib's reader instead
img0 = plt.imread(imlist0[0])
img1 = plt.imread(imlist1[0])

# -- show the two frames side by side without axis decorations
fig, ax = plt.subplots(1, 2, figsize=(10, 6))
for iax in ax:
    iax.axis("off")
ims = [iax.imshow(img) for iax, img in zip(ax, (img0, img1))]
fig.canvas.draw()

I have generated training/testing data consisting of $\sim1000$ positive and $\sim1000$ negative square postage stamps of side length $50$ (I am going to trim off a 1 pixel border for reasons that will become apparent later, making the side length 48).  Let's read those in:

In [None]:
# -- get path to images
stpath = os.path.join("images", "dl_training_lum", "*.npy")
stlist = sorted(glob.glob(stpath))
nstamp = len(stlist)

# -- fail early with a clear message if nothing matched the pattern
if nstamp == 0:
    raise IOError("no postage stamps found matching {0}".format(stpath))

# -- set a shuffle index (seeded so that the shuffle is reproducible)
np.random.seed(314)
sind = np.random.rand(nstamp).argsort()

# -- read the postage stamps, trim the 1 pixel border, flatten, and shuffle
stamps = np.array([np.load(i)[1:-1, 1:-1].flatten() for i in stlist], dtype=np.float32)[sind]
npix   = stamps.shape[1]
nside  = int(np.sqrt(npix))

# -- set the labels: [1, 0] if "pos" is in the file name, [0, 1] otherwise
#    NOTE: only the file name is tested so that a "pos" appearing in a
#    directory name cannot mislabel every stamp as positive
labels = np.array([[1.0, 0.0] if "pos" in os.path.basename(i) else [0.0, 1.0] for i in stlist],
                  dtype=np.float32)[sind]

In [None]:
# -- display the first nax x nax stamps whose label is [1, 0] (positives)
nax = 5
fig, ax = plt.subplots(nax, nax, figsize=(8, 8))

# -- select the positive stamps once and walk the panels in row-major order
pos_stamps = stamps[labels[:, 0] == 1]
panels     = ax.ravel()

for panel in panels:
    panel.axis("off")

ims = []
for ii, panel in enumerate(panels):
    ims.append(panel.imshow(pos_stamps[ii].reshape(nside, nside)))

fig.canvas.draw()

In [None]:
# -- display the first nax x nax stamps whose label is [0, 1] (negatives)
nax = 5
fig, ax = plt.subplots(nax, nax, figsize=(8, 8))

# -- select the negative stamps once and walk the panels in row-major order
neg_stamps = stamps[labels[:, 1] == 1]
panels     = ax.ravel()

for panel in panels:
    panel.axis("off")

ims = []
for ii, panel in enumerate(panels):
    ims.append(panel.imshow(neg_stamps[ii].reshape(nside, nside)))

fig.canvas.draw()

---

### Building the CNN

The data is now in **exactly** the same form as the MNIST data was, and so we can apply the same model:

In [None]:
# -- prototype weight and bias variables
def weight_variable(shape):
    """ Create a variable of the given shape filled with truncated normal noise (stddev 0.1)."""
    return tf.Variable(tf.truncated_normal(shape, stddev=0.1))

def bias_variable(shape):
    """ Create a variable of the given shape with every element set to 0.1."""
    return tf.Variable(tf.constant(0.1, shape=shape))


# -- prototype convolutional and pooling functions
def conv2d(x, W):
    """ Convolve x with the filter bank W using unit strides and "SAME" padding."""
    unit_strides = [1, 1, 1, 1]
    return tf.nn.conv2d(x, W, strides=unit_strides, padding="SAME")

def max_pool_2x2(x):
    """ Max pool x over 2x2 windows with a stride of 2 and "SAME" padding."""
    window = [1, 2, 2, 1]
    return tf.nn.max_pool(x, ksize=window, strides=window, padding="SAME")


# -- define data prototypes and reshape input
#    NOTE: the image side length comes from nside (derived from the stamps)
#    rather than a hard-coded 48 so that it always agrees with npix
x       = tf.placeholder(tf.float32, shape=[None, npix])
y_      = tf.placeholder(tf.float32, shape=[None, 2]) # only two possibilities car vs not car
x_image = tf.reshape(x, [-1, nside, nside, 1]) # last channel is color channel


# -- side length after each 2x2 max pool (with "SAME" padding the output size
#    is the input size divided by the stride, rounded up)
nside_p1 = (nside + 1) // 2
nside_p2 = (nside_p1 + 1) // 2
nflat    = nside_p2 * nside_p2 * 64


# -- first layer: 5x5 convolution, 1 -> 32 channels, ReLU, 2x2 max pool
W_conv1 = weight_variable([5, 5, 1, 32])
b_conv1 = bias_variable([32])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)


# -- second layer: 5x5 convolution, 32 -> 64 channels, ReLU, 2x2 max pool
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
h_pool2 = max_pool_2x2(h_conv2)


# -- fully connected layer
#    for 48x48 stamps nflat is 12 * 12 * 64 (48 halves evenly twice, which is
#    presumably why the stamps were trimmed from 50x50)
W_fc1        = weight_variable([nflat, 1024])
b_fc1        = bias_variable([1024])
h_pool2_flat = tf.reshape(h_pool2, [-1, nflat])
h_fc1        = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)


# -- dropout (keep_prob is fed at run time)
keep_prob  = tf.placeholder(tf.float32)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)


# -- output layer: two logits per stamp, matching the two columns of y_
W_fc2  = weight_variable([1024, 2])
b_fc2  = bias_variable([2])
y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

---

### Training the CNN

Again, we can train exactly as before.  The only difference is that we need a training and testing set, so let's do a 70/30 split.

In [None]:
# -- sizes of the 70/30 training/testing split
ntrain = int(0.7 * nstamp)
ntest  = nstamp - ntrain

# -- the first ntrain stamps (and their labels) train, the remainder test
st_train, st_test = stamps[:ntrain], stamps[ntrain:]
lb_train, lb_test = labels[:ntrain], labels[ntrain:]

In [None]:
# -- define the loss as the mean softmax cross entropy between labels and logits
xent_per_stamp = tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=y_conv)
cross_entropy  = tf.reduce_mean(xent_per_stamp)

# -- define the optimizer (Adam, constructed with 1e-3) minimizing that loss
optimizer  = tf.train.AdamOptimizer(1e-3)
train_step = optimizer.minimize(cross_entropy)

# -- define prediction and accuracy: a stamp is correct when the argmax of
#    its logits matches the argmax of its label
class_pred         = tf.argmax(y_conv, 1)
class_true         = tf.argmax(y_, 1)
correct_prediction = tf.equal(class_pred, class_true)
accuracy           = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

Now let's train:

In [None]:
nepoch      = 10
batch_sz    = 50
print_every = 1 # report the training accuracy every print_every steps
nstep       = ntrain // batch_sz
loss        = np.zeros(nepoch * nstep)

np.random.seed(1519)

with tf.Session() as sess:
    # -- initialize **ALL** of those weights and biases
    sess.run(tf.global_variables_initializer())

    # -- loop through epochs
    for epoch in range(nepoch):

        # -- reshuffle the training stamps and labels together each epoch
        rind = np.random.rand(ntrain).argsort()
        st_train = st_train[rind]
        lb_train = lb_train[rind]

        for ii in range(nstep):
            # get the next minibatch of images
            lo = batch_sz * ii
            hi = lo + batch_sz
            batch = [st_train[lo:hi], lb_train[lo:hi]]

            # alert the user how the training is going every print_every steps
            if ii % print_every == 0:
                acc_in = {x:batch[0], y_:batch[1], keep_prob:1.0} # don't use dropout for accuracy estimate
                train_accuracy = sess.run(accuracy, feed_dict=acc_in)
                print("epoch,step {0:2},{1:2} - training accuracy {2}".format(epoch, ii, train_accuracy))

            # take a gradient descent step and record the loss in the same
            # call, so that the recorded value is the one that was minimized
            # (no second forward pass with a different dropout mask)
            mod_in = {x:batch[0], y_:batch[1], keep_prob:0.5}
            _, step_loss = sess.run([train_step, cross_entropy], feed_dict=mod_in)
            loss[epoch * nstep + ii] = step_loss

    # -- print the final accuracy on the test data
    test_in = {x:st_test, y_:lb_test, keep_prob:1.0} # don't use dropout for testing
    test_accuracy = sess.run(accuracy, feed_dict=test_in)
    print("test accuracy {0}".format(test_accuracy))

In [None]:
# -- plot the loss (log10 scale), keeping only strictly positive values since
#    the logarithm is undefined at zero
pos_loss = loss[loss > 0]

fig, ax = plt.subplots(figsize=(8,5))
ax.grid(True)
ax.plot(np.arange(pos_loss.size), np.log10(pos_loss))
ax.set_xlabel("step", fontsize=20)
# raw string so that "\l" is not treated as an (invalid) escape sequence
ax.set_ylabel(r"$\log_{10}$ loss", fontsize=20)

---