# Convolutional Neural Networks

With the basic principles of multi-layer perceptrons understood, extending the same ideas to convolution relies on one observation: from the point of view of a convolutional filter, a single input sample is actually many input samples (one for every patch the filter visits), and the discrete optimizer has to work with all of them.

When dealing with convolutions, we can consider each patch of the image that the filter visits to be a legitimate source of blame for any dodgy assessment made by the network. For simplicity (and to save time running this), we will assume there is a single 5-by-5 convolutional filter moving with a stride of 1 across a 28-by-28 MNIST image.

In this case the filter fits in 28 - 5 + 1 = 24 positions along each axis, so we expect 24 * 24 = 576 outputs, which can then be fed into an ordinary perceptron to train.

We'll begin by implementing the convolutional filter and its forward/backward passes:

In [None]:
from scipy.signal import convolve2d
import numpy as np

# We'll assume that the input is correctly folded into 2D
def conv_forward(filter,bias,x):
    """Slide ``filter`` over the 2D input ``x`` (stride 1) and binarize.

    Args:
        filter: 2D array of filter weights.
        bias: one threshold per output position (or a scalar), subtracted
            from the flattened filter responses.
        x: 2D input array.

    Returns:
        1D array holding ``np.sign(response - bias)`` for every position
        where the filter lies fully inside ``x``, in row-major order.
    """
    # convolve2d mirrors its kernel; pre-flipping it makes output (r, c)
    # equal sum(x[r:r+h, c:c+w] * filter), which is the patch/weight
    # pairing conv_blame relies on.
    kernel = np.asarray(filter)[::-1, ::-1]

    # mode="valid" keeps only fully overlapping positions, i.e.
    # (H - h + 1) * (W - w + 1) outputs (24 * 24 for a 28x28 image and a
    # 5x5 filter). The default "full" mode would return 32 * 32 values.
    out = convolve2d(x, kernel, mode="valid").flatten()
    return np.sign(out - bias)

def conv_blame(x, false_pos, false_neg, filter, bias, filter_blame, bias_blame, filter_thres, bias_thres):
    """Accumulate blame for the convolutional layer and apply updates.

    Every output position flagged as a false positive / false negative
    blames the filter weights that pushed the response the wrong way and
    nudges that position's bias-blame counter. Weights whose blame reaches
    ``filter_thres`` are sign-flipped; biases whose absolute blame reaches
    ``bias_thres`` are moved by one in the direction of the blame.

    Args:
        x: 2D input; the agreement test below assumes +/-1 values.
        false_pos: per-output flags; an output counts as flagged when its
            entry is > 0, so both boolean masks and +1/-1 encodings work.
        false_neg: same encoding as ``false_pos``.
        filter: 2D array of +/-1 weights, updated in place.
        bias: per-output bias array, updated in place.
        filter_blame: blame counters shaped like ``filter``, updated in place.
        bias_blame: signed per-output blame counters, updated in place.
        filter_thres: blame level at which a weight is flipped.
        bias_thres: absolute blame level at which a bias is adjusted.

    Returns:
        ``(filter, bias, filter_blame, bias_blame)`` after the update.
    """
    f_rows, f_cols = filter.shape
    # Outputs are laid out row-major over the valid stride-1 positions.
    out_cols = x.shape[1] - f_cols + 1

    def patch(idx):
        # Input window that produced flattened output number `idx`.
        row, col = divmod(idx, out_cols)
        return x[row:row + f_rows, col:col + f_cols]

    for idx in np.flatnonzero(np.asarray(false_pos) > 0):
        # |s + w| // 2 is 1 where input and weight agree in sign, i.e. where
        # the weight pushed the response up: blame it for the false positive.
        filter_blame += abs(patch(idx) + filter) // 2
        bias_blame[idx] += 1

    for idx in np.flatnonzero(np.asarray(false_neg) > 0):
        # Opposite case: disagreeing weights pulled the response down.
        filter_blame += 1 - abs(patch(idx) + filter) // 2
        bias_blame[idx] -= 1

    # Flip weights whose blame reached the threshold and clear their blame.
    flip = filter_blame >= filter_thres
    filter[flip] *= -1
    filter_blame[flip] = 0

    # Positive blame (false positives) raises the bias, negative blame lowers
    # it. Only the offending positions are touched, and their counters reset.
    over = np.abs(bias_blame) >= bias_thres
    bias[over & (bias_blame > 0)] += 1
    bias[over & (bias_blame < 0)] -= 1
    bias_blame[over] = 0

    return filter, bias, filter_blame, bias_blame
        

We can now add all of the original boilerplate from the previous two chapters and write up the final network to examine the performance of our proposed convolutional neural network optimization.

In [None]:
import tensorflow as tf

# Pixel intensity above which a pixel is treated as "on"
THRESHOLD = 128

# Fetch the MNIST train/test splits
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Map images to {-1, +1}: +1 where the pixel exceeds THRESHOLD, -1 otherwise
x_train = 2 * (x_train > THRESHOLD) - 1
x_test = 2 * (x_test > THRESHOLD) - 1

# Map labels to a one-hot code over 10 classes, rescaled from {0, 1} to {-1, +1}
y_train = 2 * tf.one_hot(y_train, depth=10) - 1
y_test = 2 * tf.one_hot(y_test, depth=10) - 1


# Fix the NumPy seed for reproducibility
SEED = 1337
np.random.seed(SEED)

# Feedforward one sample
def forward(W,b,x):
    """Return the sign of ``x @ W - b`` (the binarized layer output)."""
    pre_activation = np.matmul(x, W) - b
    return np.sign(pre_activation)

def compute_fpfn(z,zhat):
    """Flag false positives and false negatives of ``zhat`` against ``z``.

    ``z`` is the reference and ``zhat`` the estimate; both are binarized
    with ``> 0``. The two returned arrays use a +1 / -1 encoding (+1 marks
    a hit), so they must be tested with ``> 0`` rather than by truthiness.
    """
    target_on = z > 0
    predicted_on = zhat > 0

    # Estimate fired although the reference did not.
    fp_mask = np.logical_and(predicted_on, np.logical_not(target_on))
    # Estimate stayed silent although the reference fired.
    fn_mask = np.logical_and(np.logical_not(predicted_on), target_on)

    # Re-encode the boolean masks from {False, True} to {-1, +1}.
    return 2 * fp_mask - 1, 2 * fn_mask - 1

# Discover which columns/bias terms are to blame
def blame_columns(z,zhat,b_blame,bias,param):
    """Find erroneous outputs and update the per-output bias blame.

    Args:
        z: reference outputs; binarized with ``> 0``.
        zhat: estimated outputs; binarized with ``> 0``.
        b_blame: signed blame counter per output, updated in place.
        bias: bias per output, updated in place.
        param: blame magnitude that must be exceeded before a bias moves.

    Returns:
        ``(false_pos, false_neg, b_blame, bias)`` where the first two are
        boolean masks over the outputs.
    """
    # Binarize inputs
    target_on = z > 0
    predicted_on = zhat > 0

    # Compute where there are false positives and false negatives
    false_pos = np.logical_and(predicted_on, np.logical_not(target_on))
    false_neg = np.logical_and(np.logical_not(predicted_on), target_on)

    # False positives push the blame up, false negatives push it down
    b_blame[false_pos] += 1
    b_blame[false_neg] -= 1

    # Where the blame magnitude exceeds the threshold, move the bias one
    # step in the direction of the blame ...
    crossed = np.abs(b_blame) > param
    bias[np.logical_and(crossed, b_blame > 0)] += 1
    bias[np.logical_and(crossed, b_blame < 0)] -= 1

    # ... and reset that counter, otherwise the same stale blame would keep
    # shifting the bias on every subsequent sample.
    b_blame[crossed] = 0

    return false_pos, false_neg, b_blame, bias

def blame_weights(x,false_pos,false_neg,W_blame,Weight,param):
    """Blame the weights behind each erroneous output and flip repeat offenders.

    For an output flagged in ``false_pos``, every weight in its column whose
    sign matches the input's sign gains one blame point; for an output
    flagged in ``false_neg``, every weight whose sign differs gains one.
    Weights whose blame reaches ``param`` are negated and their counter
    cleared. ``W_blame`` and ``Weight`` are modified in place and returned.
    """
    # Binarize inputs
    input_on = x > 0

    # False positive: weights that agree with the input drove the output up
    for col, flagged in enumerate(false_pos):
        if not flagged:
            continue
        same_sign = np.logical_not(np.logical_xor(Weight[:, col] > 0, input_on))
        W_blame[same_sign, col] += 1

    # False negative: weights that disagree with the input held the output down
    for col, flagged in enumerate(false_neg):
        if not flagged:
            continue
        opposite_sign = np.logical_xor(Weight[:, col] > 0, input_on)
        W_blame[opposite_sign, col] += 1

    # Clear the counter and negate the weight wherever the threshold is met
    over = W_blame >= param
    W_blame[over] = 0
    Weight[over] = Weight[over] * -1

    return W_blame, Weight

# Majority vote inter-layer glue
def majority_vote(z,W,fp,fn):
    """Derive a target for the previous layer from this layer's errors.

    With no flagged outputs, ``z`` is handed back untouched. Otherwise each
    flagged false-positive column of ``W`` votes against its weights, each
    flagged false-negative column votes for them, and the sign of the
    tally (one entry per row of ``W``) is returned.
    """
    # Nothing went wrong: no vote to hold
    if not (fp.any() or fn.any()):
        return z

    tally = np.zeros(W.shape[0])

    # False positives vote against their column's weights
    for col in np.flatnonzero(fp):
        tally -= W[:, col]

    # False negatives vote for their column's weights
    for col in np.flatnonzero(fn):
        tally += W[:, col]

    return np.sign(tally)

Let us define and train this CNN!

In [None]:
from tqdm import tqdm

# Convolutional filter and bias
conv_filter = (np.random.uniform(0,1,(5,5)) < 0.5) * 2 - 1
conv_bias = np.random.randint(0,25,24*24)

# Linear weights and biases
linear_weight = (np.random.uniform(0,1,(24*24,10)) < 0.5) * 2 - 1
linear_bias = np.random.randint(-144,144,10)

# Blame counters
filter_blame = np.zeros((5,5))
conv_bias_blame = np.zeros(24*24)
weight_blame = np.zeros((24*24,10))
lin_bias_blame = np.zeros(10)

# Counters
epochs = 3
counter = 0
acc = 0  # running count of hits; must exist before the first `acc +=`
acc_count = 0
COUNT_RESET = 10
ACC_RESET = 10000

# Iterate over epochs
for e in range(epochs):

    print("EPOCH "+str(e+1))
    indices = np.arange(x_train.shape[0])
    np.random.shuffle(indices)

    # Iterate over samples
    for i in tqdm(indices):

        # Load training samples
        x = x_train[i]
        y = y_train[i]

        # Forward pass
        z1 = conv_forward(conv_filter, conv_bias, x)
        z2 = forward(linear_weight, linear_bias, z1)

        # Backward pass on perceptron layer.
        # blame_columns treats its first argument as the target and its second
        # as the prediction (false_pos = zhat and not z), so the label goes first.
        fp,fn,lin_bias_blame,linear_bias = blame_columns(y,z2,lin_bias_blame,linear_bias,4)
        weight_blame, linear_weight = blame_weights(z1,fp,fn,weight_blame,linear_weight,4)

        # Compute inter-layer majority vote.
        # The first argument is returned unchanged when the output layer made
        # no mistakes, so it has to be the hidden activation (same length as
        # the vote), not the 10-way label.
        y2 = majority_vote(z1,linear_weight,fp,fn)

        # Backward pass on convolutional layer
        fp, fn = compute_fpfn(y2,z1)
        conv_filter, conv_bias, filter_blame, conv_bias_blame = conv_blame(x,fp,fn,conv_filter,
                                                                           conv_bias,filter_blame,
                                                                           conv_bias_blame,4,4)

        # Forgiveness counter: advances by one whenever the true class was not
        # predicted positive; every COUNT_RESET misses all blame decays by one.
        counter += 1 - np.sum(np.logical_and(z2 > 0, y > 0))
        if counter >= COUNT_RESET:
            filter_blame -= 1
            filter_blame[filter_blame < 0] = 0
            conv_bias_blame = np.sign(conv_bias_blame) * (np.abs(conv_bias_blame) - 1)
            # Decay the blame counters of the linear layer, not its weights
            weight_blame -= 1
            weight_blame[weight_blame < 0] = 0
            lin_bias_blame = np.sign(lin_bias_blame) * (np.abs(lin_bias_blame) - 1)
            counter = 0

        # Accuracy metric counter
        acc += np.sum(np.logical_and(z2 > 0, y > 0))
        acc_count += 1
        if acc_count >= ACC_RESET:
            print("Current Accuracy: " + str(acc / ACC_RESET))
            acc_count = 0
            acc = 0

And finally, let us test on an independent sample.

In [None]:
# Evaluate the trained network once on the held-out test set.
# The tallies are reset here so the result does not depend on whatever the
# training cell left behind, and a single pass is enough because the model
# is no longer being updated.
acc = 0
acc_count = 0

# Iterate over samples
for i in tqdm(range(x_test.shape[0])):

    # Load test samples
    x = x_test[i]
    y = y_test[i]

    # Forward pass
    z1 = conv_forward(conv_filter, conv_bias, x)
    z2 = forward(linear_weight, linear_bias, z1)

    # Accuracy metric counter: a hit is the true class being predicted positive
    acc += np.sum(np.logical_and(z2 > 0, y > 0))
    acc_count += 1

# Report over every sample seen, whatever the size of the test set
if acc_count:
    print("Current Accuracy: " + str(acc / acc_count))