In [1]:
import numpy as np
import math

In [None]:
def get_w_indices(height_or_width, filter_size, stride, ith_element):
    n = 0
    result = []
    while True:
        min = stride * n
        max = filter_size - 1 + stride * n
        if max > height_or_width - 1:
            break
        if min <= ith_element <= max:
            result.append(ith_element-stride*n)
        else:
            if result:
                break
        n += 1

    return result

In [3]:
class ThreeLayerConvNet:
    """
    A three-layer convolutional network with the following architecture:

    conv - relu - 2x2 max pool - affine - relu - affine - softmax

    The network operates on minibatches of data that have shape (N, C, H, W)
    consisting of N images, each with height H and width W and with C input
    channels.
    """
        
    def __init__(self, input_dim=(3, 32, 32), filter_num=32, filter_size=7, 
                 pad=0, stride=1, pool_size=2, hidden_dim=100, num_classes=10, 
                 weight_scale=1e-3, reg=0.0, dtype=np.float32):
        self.params = {}
        self.filter_num = filter_num
        self.filter_size = filter_size
        self.pad = pad
        self.stride = stride
        self.pool_size = pool_size
        self.reg = reg
        self.dtype = dtype
        
        W1 = np.random.randn(filter_num, input_dim[0], filter_size, filter_size) * weight_scale
        b1 = np.zeros((filter_num,))
        
        feature_map_h = (input_dim[1] - filter_size + 2*pad)/stride + 1 # H2 = (H1-F+2*P)/S+1
        feature_map_w = (input_dim[2] - filter_size + 2*pad)/stride + 1 # W2 = (W1-F+2*P)/S+1
        
        if not feature_map_h//1 == feature_map_h or not feature_map_w//1 == feature_map_w: # W2, H2 should be int
            raise ValueError('feature map width and height must be int, adjust filter size, padding or stride.')
        
        self.feature_map_h, self.feature_map_w = int(feature_map_h), int(feature_map_w)
        
        self.pooled_feature_map_h = math.ceil(feature_map_h/pool_size)
        self.pooled_feature_map_w = math.ceil(feature_map_h/pool_size)
        
        W2 = np.random.randn(self.pooled_feature_map_h * self.pooled_feature_map_w * filter_num, hidden_dim) \
             * weight_scale
        b2 = np.zeros((hidden_dim,))
        
        W3 = np.random.randn(hidden_dim, num_classes) * weight_scale
        b3 = np.zeros((num_classes,))
        
        self.params['W1'], self.params['b1'] = W1, b1
        self.params['W2'], self.params['b2'] = W2, b2
        self.params['W3'], self.params['b3'] = W3, b3
        
    def loss(self, X, y):
        W1, b1 = self.params['W1'], self.params['b1']
        W2, b2 = self.params['W2'], self.params['b2']
        W3, b3 = self.params['W3'], self.params['b3']
        
        X = np.pad(X, pad_width=((0,), (0,), (self.pad,), (self.pad,)), mode='constant', constant_values=(0,))
        feature_map = np.zeros((X.shape[0], self.filter_num, self.feature_map_h, 
                                                             self.feature_map_w)) # with shape (H2, W2)
    
        # forward pass

        # conv layer:
        # todo: is there more efficient way?
        for i in range(self.feature_map_h):
            for j in range(self.feature_map_w):
                for k in range(self.filter_num):
                    feature_map[:,k,i,j] = np.sum(X[:, :, i*self.stride: i*self.stride + self.filter_size, 
                                                         j*self.stride: j*self.stride + self.filter_size]
                                                  * W1[k,:,:,:], axis=(1, 2, 3))
        feature_map += b1[None, :, None, None]
        
        # relu:
        feature_map[feature_map < 0] = 0
        
        # max pooling
        pooled_feature_map = np.zeros((X.shape[0], self.filter_num, self.pooled_feature_map_h, 
                                                                    self.pooled_feature_map_w))
        
        patch = np.ones_like(feature_map)
        for i in range(self.pooled_feature_map_h):
            for j in range(self.pooled_feature_map_w):
                # max value of all 2*2 grid (pooling size) through batch axis and channel axis
                local_max = np.max(feature_map[:, :, i*self.pool_size: (i+1)*self.pool_size, 
                                                     j*self.pool_size: (j+1)*self.pool_size], axis=(2, 3))
                pooled_feature_map[:, :, i, j] = local_max
                # patch has same shape with feature map but stores max values of all 2*2 grid in all 4 grids
                patch[:, :, i*self.pool_size: (i+1)*self.pool_size, j*self.pool_size: (j+1)*self.pool_size] \
                    *= local_max[:, :, None, None]
        
        # indices of max values of all 2*2 grid
        # to be used in d_loss_feature_map
        indices = np.argwhere(patch==feature_map)
                
        # first fully connected layer
        scores1 = pooled_feature_map.reshape((X.shape[0], -1)) # Assumed shape: (n, 1000)
        scores2 = scores1.dot(W2) + b2 # (n, 100)
        
        # relu
        scores2[scores2 < 0] = 0
        
        # second fully connected layer
        scores = scores2.dot(W3) + b3
        
        # softmax
        exp_scores = np.exp(scores)
        sum_exp_scores = np.sum(exp_scores, axis=1)
        
        loss = np.sum(np.log(sum_exp_scores) - scores[range(scores.shape[0]), y])
        loss = loss/X.shape[0]
        loss += 0.5 * reg* (np.sum(W1*W1) + np.sum(W2*W2) + np.sum(W3*W3))
        
        # Backpropagation
        correct_matrix = np.zeros_like(scores)
        correct_matrix[range(scores.shape[0]), y] = 1
        d_loss_scores = exp_scores / sum_exp_scores.reshape(-1,1) - correct_matrix # (n, 10)
        
                                #  Assumed shape for bp deduction
        dW3 = np.zeros_like(W3) # (100, 10)
        db3 = np.zeros_like(b3) # (10,)
        dW2 = np.zeros_like(W2) # (1000, 100)
        db2 = np.zeros_like(b2) # (1000,)
        dW1 = np.zeros_like(W1) # (64, 3, 2, 2)
        db1 = np.zeros_like(b1) # (64,)
        dX = np.zeros_like(X)   # (n, 3, 28, 28)
        
        dW3 += scores2.T.dot(d_loss_scores)
        dW3 /= X.shape[0]
        dW3 += reg * W3
        
        db3 += np.sum(d_loss_scores, axis=0)
        db3 /= X.shape[0]
        
        d_loss_scores2 = d_loss_scores.dot(W3.T)
        d_loss_scores2[scores2==0] = 0
        
        dW2 += scores1.T.dot(d_loss_scores2)
        dW2 /= X.shape[0]
        dW2 += reg * W2
        
        db2 += np.sum(d_loss_scores2, axis=0)
        db2 /= X.shape[0]
        
        d_loss_scores1 = d_loss_scores2.dot(W2.T)
        d_loss_pooled_feature_map = d_loss_scores1.reshape(pooled_feature_map.shape) # recover its original shape
        
        # Pooling layer bp, only max elements gets gradients, we've stored max elements' indices
        d_loss_feature_map = np.zeros_like(feature_map) # (n, 64, pooled_size, pooled_size)
        d_loss_feature_map[indices] = d_loss_pooled_feature_map.reshape(X.shape[0],-1) # flatten all non-batch axis
        d_loss_feature_map[feature_map==0] = 0 # because of relu
        
        # conv layer bp, only consider 2D circumstance, intuition:
        # d_loss_Wij = Wij_scanned_X * d_loss_feature_map
        for i in range(self.filter_size):
            for j in range(self.filter_size):
                for k in range(X.shape[1]):
                    # shape: (n, 1, w, h)
                    scanned_X = X[:, k, i:X.shape[0]-self.filter_size+1+i:self.stride, 
                                        j:X.shape[1]-self.filter_size+1+j:self.stride]
                    # broadcasting: (n, 1, w, h) * (64, w, h) = (n, 64, w, h)
                    dW1[:, k, i, j] = np.sum(scanned_X * d_loss_feature_map, axis=(0, 2, 3)) 
        
        dW1 /= X.shape[0]
        dW1 += reg * W1
        
        db1 += np.sum(d_loss_feature_map, axis=(0, 2, 3))
        db1 /= X.shape[0]
        
        # d_loss_X
        # When we have more than one conv layers we need this, if only one conv layer, not needed.
        for i in range(X.shape[2]):
            for j in range(X.shape[3]):
                # Ws scanned through Xij, we get their x,y indices here
                W_indices_x = get_w_indices(X.shape[2], self.filter_size, self.stride, i)
                W_indices_y = get_w_indices(X.shape[3], self.filter_size, self.stride, j)
                
                # indices of Ws and ys that are related to Xij
                W_indices = np.array([[x, y] for x in W_indices_x for y in W_indices_y])
                y_indices = (np.array([i, j]) - W_indices)/self.stride
                
                # Ws and ys that are related to Xij
                scanned_w = W1[:, :, W_indices[:, 0], W_indices[:, 1]] # Assumed: (64, 3, m, n)
                scanned_y = feature_map[:, y_indices[:, 0], y_indices[:, 1]] # Assumed (n, 64, m, n)
                
                # Found that y is mapped with W rotated 180 degree
                dX[:, :, i, j] = np.sum(np.rot90(scanned_w, 2) * scanned_y[:, :, None, :, :], axis=(1, 3, 4))
                
        
        grad = dict()
        grads['W1'] = dW1
        grads['W2'] = dW2
        grads['b1'] = db1
        grads['b2'] = db2
        
        return loss, grads