# Performing Discrete Convolutions

In [None]:
import numpy as np
import tensorflow as tf
# Restrict TensorFlow's logger to messages at the ERROR level.
tf.logging.set_verbosity(tf.logging.ERROR)

In [None]:
def conv1d(x, w, p=0, s=1):
    """Compute the 1-D discrete convolution of `x` with filter `w`.

    The filter is reversed and slid across the zero-padded input; at each
    position the element-wise product of the window and the reversed filter
    is summed.

    Parameters
    ----------
    x : sequence of numbers
        One-dimensional input signal.
    w : sequence of numbers
        One-dimensional filter. Must support slicing (list, tuple, ndarray).
    p : int, default 0
        Number of zeros added to each end of `x`.
    s : int, default 1
        Stride, i.e. the shift between consecutive window positions.

    Returns
    -------
    numpy.ndarray
        Array of length ``floor((len(x) + 2*p - len(w)) / s) + 1``; empty
        when the filter is longer than the padded input.
    """
    w_rot = np.array(w[::-1])
    x_padded = np.array(x)
    if p > 0:
        zero_pad = np.zeros(shape=p)
        x_padded = np.concatenate([zero_pad, x_padded, zero_pad])

    width = w_rot.shape[0]
    # Last index at which a full-width window still fits in the padded
    # input. Stepping by `s` up to (and including) this index visits every
    # valid window; the bound must not itself be divided by the stride.
    last_start = x_padded.shape[0] - width
    res = [
        np.sum(x_padded[i:i + width] * w_rot)
        for i in range(0, last_start + 1, s)
    ]
    return np.array(res)

# Testing: compare the hand-written conv1d against NumPy's reference
# convolution. With a 5-element filter, p=2 and s=1 the output has the same
# length as x, which is what np.convolve's mode='same' returns.
x = [1, 3, 2, 4, 5, 6, 1, 3]
w = [1, 0, 3, 1, 2]

print('Conv1d Implementation:', conv1d(x, w, p=2, s=1))
print('Numpy Results:', np.convolve(x, w, mode='same'))

In [None]:
import scipy.signal

def conv2d(X, W, p=(0, 0), s=(1, 1)):
    """Compute the 2-D discrete convolution of `X` with filter `W`.

    The filter is flipped along both axes and slid across the zero-padded
    input; at each position the element-wise product of the window and the
    flipped filter is summed.

    Parameters
    ----------
    X : 2-D array-like
        Input matrix.
    W : 2-D array-like
        Filter matrix.
    p : tuple of two ints, default (0, 0)
        Zero padding added on both sides of the first and second axis.
    s : tuple of two ints, default (1, 1)
        Stride along the first and second axis.

    Returns
    -------
    numpy.ndarray
        2-D array with ``floor((n + 2*p - m) / s) + 1`` entries along each
        axis, where ``n`` is the input size and ``m`` the filter size on
        that axis.
    """
    W_rot = np.array(W)[::-1, ::-1]
    X_orig = np.array(X)
    n_rows, n_cols = X_orig.shape[0], X_orig.shape[1]

    # Embed the input in the centre of a zero matrix to apply the padding.
    X_padded = np.zeros(shape=(n_rows + 2 * p[0], n_cols + 2 * p[1]))
    X_padded[p[0]:p[0] + n_rows, p[1]:p[1] + n_cols] = X_orig

    k_rows, k_cols = W_rot.shape[0], W_rot.shape[1]
    # Every start index at which a full window fits, visited in stride-sized
    # steps. The upper bound is the last valid index + 1 and must not be
    # divided by the stride, otherwise windows are skipped for s > 1.
    row_starts = range(0, X_padded.shape[0] - k_rows + 1, s[0])
    col_starts = range(0, X_padded.shape[1] - k_cols + 1, s[1])

    res = [
        [
            np.sum(X_padded[i:i + k_rows, j:j + k_cols] * W_rot)
            for j in col_starts
        ]
        for i in row_starts
    ]
    return np.array(res)

In [None]:
# Testing: a 4x4 input and a 3x3 filter with padding (1, 1) and stride
# (1, 1), printed next to SciPy's mode='same' result for comparison.
X = [[1, 3, 2, 4], [5, 6, 1, 3], [1, 2, 0, 2], [3, 4, 3, 2]]
W = [[1, 0, 3], [1, 2, 1], [0, 1, 1]]

print('Conv2d Implementation:\n', conv2d(X, W, p=(1, 1), s=(1, 1)))
print('SciPy Results:\n', scipy.signal.convolve2d(X, W, mode='same'))

# Building a CNN

In [None]:
import imageio

# Read the example image into an array (pilmode='RGB' is forwarded to
# imageio) and print a few of its properties.
img = imageio.imread('./example-image.png', pilmode='RGB')
print('Image shape:', img.shape)
# The size of the third axis is reported as the channel count.
print('Number of channels:', img.shape[2])
print('Image data type:', img.dtype)
# A 2x2 patch along the first two axes, with every entry of the third axis.
print(img[100:102, 100:102, :])

# Implementing a CNN with the Low-Level TensorFlow API

In [None]:
import os
import struct

def load_mnist(path, kind='train'):
    """Load an MNIST-style (IDX format) image/label pair from `path`.

    Reads ``{kind}-labels-idx1-ubyte`` and ``{kind}-images-idx3-ubyte``
    from the directory `path`.

    Parameters
    ----------
    path : str
        Directory containing the two uncompressed IDX files.
    kind : str, default 'train'
        File-name prefix, e.g. ``'train'`` or ``'t10k'``.

    Returns
    -------
    images : numpy.ndarray
        Float array of shape ``(n_labels, rows * cols)`` with pixel values
        rescaled from [0, 255] to [-1, 1].
    labels : numpy.ndarray
        ``uint8`` array with one label per image.

    Raises
    ------
    ValueError
        If the image payload cannot be reshaped to one row of
        ``rows * cols`` pixels per label.
    """
    labels_path = os.path.join(path, f'{kind}-labels-idx1-ubyte')
    images_path = os.path.join(path, f'{kind}-images-idx3-ubyte')

    with open(labels_path, 'rb') as lbpath:
        # 8-byte big-endian header (magic number, item count); the rest of
        # the file is one unsigned byte per label.
        _magic, _n = struct.unpack('>II', lbpath.read(8))
        labels = np.fromfile(lbpath, dtype=np.uint8)

    with open(images_path, 'rb') as imgpath:
        # 16-byte big-endian header: magic number, image count, rows, cols.
        _magic, _num, rows, cols = struct.unpack('>IIII', imgpath.read(16))
        # Use the image size from the header rather than assuming 28x28.
        images = np.fromfile(imgpath, dtype=np.uint8).reshape(
            len(labels), rows * cols
        )
        # Map pixel intensities from [0, 255] to [-1, 1].
        images = ((images / 255.) - .5) * 2

    return images, labels

In [None]:
# Loading the data: read the full training and test sets from ./mnist/ and
# report their dimensions.
X_data, y_data = load_mnist('./mnist/', kind='train')
print(f'Rows: {X_data.shape[0]},  Columns: {X_data.shape[1]}')
X_test, y_test = load_mnist('./mnist/', kind='t10k')
print(f'Rows: {X_test.shape[0]},  Columns: {X_test.shape[1]}')

# Work with small subsets only: the first 1000 training rows for training,
# the next 500 for validation, and the first 500 test rows for testing.
X_train, y_train = X_data[:1000, :], y_data[:1000]
X_valid, y_valid = X_data[1000:1500, :], y_data[1000:1500]
X_test, y_test = X_test[:500, :], y_test[:500]

print('Training:   ', X_train.shape, y_train.shape)
print('Validation: ', X_valid.shape, y_valid.shape)
print('Test Set:   ', X_test.shape, y_test.shape)

In [None]:
def batch_generator(X, y, batch_size=64, shuffle=False, random_seed=None):
    """Yield consecutive ``(X_batch, y_batch)`` mini-batches.

    Parameters
    ----------
    X : numpy.ndarray
        Two-dimensional sample array, indexed along its first axis.
    y : numpy.ndarray
        Targets aligned with the rows of `X`.
    batch_size : int, default 64
        Maximum number of rows per batch; the final batch may be shorter.
    shuffle : bool, default False
        If true, rows of `X` and `y` are permuted together before batching.
    random_seed : int or None, default None
        Seed for the ``RandomState`` used when shuffling.

    Yields
    ------
    tuple of (numpy.ndarray, numpy.ndarray)
        The next slice of `X` and the matching slice of `y`.
    """
    order = np.arange(y.shape[0])

    if shuffle:
        # One shared permutation keeps samples and targets aligned.
        np.random.RandomState(random_seed).shuffle(order)
        X, y = X[order], y[order]

    for start in range(0, X.shape[0], batch_size):
        rows = slice(start, start + batch_size)
        yield (X[rows, :], y[rows])

In [None]:
# Centre every split with the per-column mean of the training subset and
# scale by a single standard deviation computed over all training values.
mean_vals = X_train.mean(axis=0)
std_val = X_train.std()

X_train_centered, X_valid_centered, X_test_centered = (
    (subset - mean_vals) / std_val
    for subset in (X_train, X_valid, X_test)
)

In [None]:
from cnnlowlevel import conv_layer

# Try out conv_layer in a throwaway graph: feed it a float32 placeholder of
# shape [None, 28, 28, 1] and request a 3x3 kernel with 32 output channels.
# The return value is discarded.
g = tf.Graph()
with g.as_default():
    x = tf.placeholder(tf.float32, shape=[None, 28, 28, 1])
    conv_layer(x, name='convtest', kernel_size=(3, 3), n_output_channels=32)
    
# Remove the temporary names so later cells start clean.
del g, x

In [None]:
from cnnlowlevel import fc_layer

# Try out fc_layer in a throwaway graph: feed it a float32 placeholder of
# shape [None, 28, 28, 1] and request 32 output units with a ReLU
# activation. The return value is discarded.
g = tf.Graph()
with g.as_default():
    x = tf.placeholder(tf.float32, shape=[None, 28, 28, 1])
    fc_layer(x, name='fctest', n_output_units=32, activation_fn=tf.nn.relu)

# Remove the temporary names so later cells start clean.
del g, x

In [None]:
from cnnlowlevel import build_cnn

# Define random seed
random_seed = 123

# create a graph
g = tf.Graph()
with g.as_default():
    # Set the graph-level seed before any ops are added.
    tf.set_random_seed(random_seed)
    # build the graph
    # (build_cnn is called for its effect on the default graph; its return
    # value is not used here)
    build_cnn()

    # saver:
    # created while `g` is the default graph, after the model is built
    saver = tf.train.Saver()

In [None]:
from cnnlowlevel import train, save

# create a TF session and train the CNN model
with tf.Session(graph=g) as sess:
    # Train on the standardized training subset, passing the validation
    # subset alongside; initialize=True requests a fresh start.
    train(
        sess,
        training_set=(X_train_centered, y_train), 
        validation_set=(X_valid_centered, y_valid), 
        initialize=True, random_seed=123
    )
    # NOTE(review): no `path` is passed, so save() falls back to its default
    # location; later cells load from './model/' -- confirm the two agree.
    save(saver, sess, epoch=20)

In [None]:
from cnnlowlevel import load, predict

# Discard the training graph; the model is rebuilt in a new graph below.
del g

# Calculate prediction accuracy on test set restoring the saved model
# create a new graph and build the model
g2 = tf.Graph()
with g2.as_default():
    tf.set_random_seed(random_seed)
    # build the graph
    build_cnn()

    # saver:
    saver = tf.train.Saver()

# create a new session and restore the model
with tf.Session(graph=g2) as sess:
    # Load the checkpoint tagged epoch=20 from ./model/ into this session.
    load(saver, sess, epoch=20, path='./model/')
    
    # return_proba=False: the result is compared element-wise with y_test
    # below, so it is presumably an array of predicted class labels.
    preds = predict(sess, X_test_centered, return_proba=False)
    # Percentage of test samples whose prediction equals the true label.
    print(f'Test Accuracy: {(100 * np.sum(preds == y_test)/len(y_test)):.3f}')

In [None]:
# run the prediction on some test samples
# Print floats with two decimals and without scientific notation.
np.set_printoptions(precision=2, suppress=True)

with tf.Session(graph=g2) as sess:
    # A new session starts without trained weights, so restore them first.
    load(saver, sess, epoch=20, path='./model/')
        
    # First ten test samples, once with return_proba=False and once with
    # return_proba=True.
    print(predict(sess, X_test_centered[:10], return_proba=False))
    print(predict(sess, X_test_centered[:10], return_proba=True))

In [None]:
# continue training for 20 more epochs without re-initializing
# i.e. initialize=False

# create a new session and restore the model
with tf.Session(graph=g2) as sess:
    # Start from the checkpoint tagged epoch=20.
    load(saver, sess, epoch=20, path='./model/')

    train(
        sess,
        training_set=(X_train_centered, y_train), 
        validation_set=(X_valid_centered, y_valid),
        initialize=False,
        epochs=20, random_seed=123
    )

    # Store the result under the tag epoch=40 (20 restored + 20 new epochs).
    save(saver, sess, epoch=40, path='./model/')

    preds = predict(sess, X_test_centered, return_proba=False)

    # Percentage of test samples whose prediction equals the true label.
    print(f'Test Accuracy: {(100 * np.sum(preds == y_test) / len(y_test)):.3f}')

# Implementing a CNN with the TensorFlow `Layers` API

In [None]:
from cnnlayers import ConvNN

# Create the ConvNN model object with a fixed seed.
cnn = ConvNN(random_seed=123)

# train the model
cnn.train(
    training_set=(X_train_centered, y_train), 
    validation_set=(X_valid_centered, y_valid),
    initialize=True
)
# NOTE(review): no `path` is passed, so save() falls back to its default
# location; the next cell loads from './tflayers-model/' -- confirm the two
# agree.
cnn.save(epoch=20)

In [None]:
# Drop the trained object and restore the checkpoint into a new instance.
del cnn

cnn2 = ConvNN(random_seed=123)
cnn2.load(epoch=20, path='./tflayers-model/')

# Predictions for the first ten test samples.
print(cnn2.predict(X_test_centered[:10, :]))

In [None]:
# Predict the whole test subset with the restored model.
preds = cnn2.predict(X_test_centered)

# Percentage of test samples whose prediction equals the true label.
print(f'Test Accuracy: {(100 * np.sum(y_test == preds) / len(y_test)):.2f}')