# Convolutional Layer

In [1]:
import tensorflow as tf
import numpy as np

from d2l.tensorflow import config, layers, initializers, activations

# Project-level setup helper from d2l.tensorflow; presumably it is what reports
# the device TensorFlow runs on in the cell output below (TODO confirm).
config.setup()

Tensorflow running on CPU


## Convolution

In [2]:
def corr2d(X, kernel):
    """Compute the 2-D cross-correlation of ``X`` with ``kernel``.

    The kernel is slid over every position where it fits entirely inside
    ``X`` (stride 1, no padding); each output element is the sum of the
    element-wise product of the kernel and the window under it.

    Args:
        X: 2-D tensor of shape (rows, cols).
        kernel: 2-D tensor of shape (k_rows, k_cols).

    Returns:
        A ``tf.Variable`` of shape (rows - k_rows + 1, cols - k_cols + 1),
        created from ``tf.zeros`` and filled in element by element.
    """
    k_rows, k_cols = kernel.shape
    out_rows = X.shape[0] - k_rows + 1
    out_cols = X.shape[1] - k_cols + 1

    # A Variable is used because plain tensors do not support item assignment.
    Y = tf.Variable(tf.zeros((out_rows, out_cols)))
    for row in range(out_rows):
        for col in range(out_cols):
            window = X[row:row + k_rows, col:col + k_cols]
            response = tf.reduce_sum(window * kernel)
            Y.scatter_nd_update([[row, col]], [response])
    return Y

In [3]:
# Small sanity-check inputs: a 3x3 image holding 0..8 and a 2x2 kernel holding 0..3.
X = tf.reshape(tf.range(9, dtype=tf.float32), (3, 3))
kernel = tf.reshape(tf.range(4, dtype=tf.float32), (2, 2))

In [4]:
# Sanity check on the 3x3 input and 2x2 kernel: expected result is [[19, 25], [37, 43]].
corr2d(X, kernel)

<tf.Variable 'Variable:0' shape=(2, 2) dtype=float32, numpy=
array([[19., 25.],
       [37., 43.]], dtype=float32)>

In [5]:
# 6x8 image of ones with a band of zeros in columns 2..5, which creates two
# vertical edges: 1 -> 0 between columns 1 and 2, and 0 -> 1 between columns 5 and 6.
image = np.ones((6, 8))
image[:, 2:6] = 0
X = tf.convert_to_tensor(image, dtype=tf.float32)
X

<tf.Tensor: shape=(6, 8), dtype=float32, numpy=
array([[1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.]], dtype=float32)>

In [6]:
# 1x2 difference kernel: under corr2d each output is X[i, j] - X[i, j + 1],
# so it is non-zero only where horizontally adjacent values differ.
kernel = tf.convert_to_tensor([[1., -1.]])
kernel

<tf.Tensor: shape=(1, 2), dtype=float32, numpy=array([[ 1., -1.]], dtype=float32)>

In [7]:
# Apply the difference kernel to the image: 1 marks a 1 -> 0 transition,
# -1 marks a 0 -> 1 transition, and 0 means no horizontal change.
Y = corr2d(X, kernel)
Y

<tf.Variable 'Variable:0' shape=(6, 7) dtype=float32, numpy=
array([[ 0.,  1.,  0.,  0.,  0., -1.,  0.],
       [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
       [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
       [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
       [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
       [ 0.,  1.,  0.,  0.,  0., -1.,  0.]], dtype=float32)>

In [8]:
# After transposing, every row of the image is constant, so a kernel that only
# compares horizontally adjacent values produces all zeros.
Z = corr2d(tf.transpose(X), kernel)
Z

<tf.Variable 'Variable:0' shape=(8, 5) dtype=float32, numpy=
array([[0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.]], dtype=float32)>

## Learning a kernel

In [9]:
# Display the input image that the kernel-learning cell below convolves.
X

<tf.Tensor: shape=(6, 8), dtype=float32, numpy=
array([[1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.],
       [1., 1., 0., 0., 0., 0., 1., 1.]], dtype=float32)>

In [10]:
# Learn a kernel that maps X to the edge map Y with plain gradient descent on
# the squared error. The second value returned by initialize_parameters is unused.
kernel, _ = initializers.initialize_parameters(1, 2)

learning_rate = 3e-2
# X never changes, so add the batch and channel axes for tf.nn.conv2d once up front.
X_batched = tf.reshape(X, (1,) + X.shape + (1,))

for i in range(10):
    with tf.GradientTape() as t:
        # Append in/out channel axes so the kernel is a valid conv2d filter.
        kernel_4d = tf.reshape(kernel, (kernel.shape + (1, 1)))
        conv_out = tf.nn.conv2d(X_batched, kernel_4d, strides=1, padding='VALID')
        # Drop the size-1 batch and channel axes again to compare against Y.
        Y_hat = tf.squeeze(conv_out)
        loss = (Y_hat - Y) ** 2
    # loss is element-wise; the tape sums over its elements when differentiating.
    dW = t.gradient(loss, kernel)
    kernel.assign_sub(dW * learning_rate)
    print('batch: {}, loss: {}'.format(i + 1, tf.reduce_sum(loss)))

batch: 1, loss: 10.153135299682617
batch: 2, loss: 4.190521240234375
batch: 3, loss: 1.7367876768112183
batch: 4, loss: 0.724412202835083
batch: 5, loss: 0.30505460500717163
batch: 6, loss: 0.13028497993946075
batch: 7, loss: 0.0567789152264595
batch: 8, loss: 0.02544170618057251
batch: 9, loss: 0.011819346807897091
batch: 10, loss: 0.005736197344958782


## Multiple channels

In [11]:
def corr2d_multi_in(X, kernel):
    """Cross-correlate a multi-channel input with a multi-channel kernel.

    ``X`` and ``kernel`` are iterated together along their first axis; each
    pair is passed to ``corr2d`` and the per-channel results are summed.

    Args:
        X: tensor whose first axis indexes input channels.
        kernel: tensor whose first axis indexes the matching kernel channels.

    Returns:
        The element-wise sum of the per-channel ``corr2d`` outputs.
    """
    per_channel = []
    for channel, channel_kernel in zip(X, kernel):
        per_channel.append(corr2d(channel, channel_kernel))
    return tf.add_n(per_channel)

In [12]:
# Two-channel 3x3 input and a matching two-channel 2x2 kernel.
# The dtype is set explicitly (as for the single-channel example) so the data
# is float32 from the start instead of relying on an implicit int32 -> float32
# cast when corr2d writes into its float32 output.
X = tf.convert_to_tensor([
    [[0, 1, 2], [3, 4, 5], [6, 7, 8]],
    [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
], dtype=tf.float32)

kernel = tf.convert_to_tensor([
    [[0, 1], [2, 3]],
    [[1, 2], [3, 4]]
], dtype=tf.float32)

In [13]:
# Expected result is [[56, 72], [104, 120]]: the two per-channel
# cross-correlations added together.
corr2d_multi_in(X, kernel)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[ 56.,  72.],
       [104., 120.]], dtype=float32)>

In [14]:
def corr2d_multi_in_out(X, kernels):
    """Cross-correlate a multi-channel input with several multi-channel kernels.

    Args:
        X: multi-channel input, passed unchanged to ``corr2d_multi_in``.
        kernels: tensor whose first axis indexes output channels; each slice
            is one multi-channel kernel.

    Returns:
        The ``corr2d_multi_in`` results stacked along a new leading
        (output-channel) axis.
    """
    outputs = []
    for single_kernel in kernels:
        outputs.append(corr2d_multi_in(X, single_kernel))
    return tf.stack(outputs)

In [15]:
# Three output channels: the base kernel plus copies shifted by +1 and +2.
kernels = tf.stack([kernel + shift for shift in range(3)])

In [16]:
# One 2x2 output map per stacked kernel, giving a (3, 2, 2) result.
corr2d_multi_in_out(X, kernels)

<tf.Tensor: shape=(3, 2, 2), dtype=float32, numpy=
array([[[ 56.,  72.],
        [104., 120.]],

       [[ 76., 100.],
        [148., 172.]],

       [[ 96., 128.],
        [192., 224.]]], dtype=float32)>

In [17]:
def corr2d_multi_in_out_1x1(X, kernels):
    """Multi-channel cross-correlation with 1x1 kernels, as one matrix multiply.

    Args:
        X: tensor of shape (c_i, h, w).
        kernels: tensor with c_o along its first axis that can be reshaped to
            (c_o, c_i), i.e. one weight per (output, input) channel pair.

    Returns:
        Tensor of shape (c_o, h, w).
    """
    in_channels, height, width = X.shape
    out_channels = kernels.shape[0]

    # Flatten the spatial axes so every pixel becomes a column of channel values.
    pixels = tf.reshape(X, (in_channels, height * width))
    weights = tf.reshape(kernels, (out_channels, in_channels))

    mixed = tf.matmul(weights, pixels)
    return tf.reshape(mixed, (out_channels, height, width))

In [18]:
# Random input with 3 channels of size 3x1, and 2 output filters that each hold
# one 1x1 kernel per input channel. No seed is set, so values differ between runs.
X = tf.random.uniform(shape=(3, 3, 1))
kernels = tf.random.uniform(shape=(2, 3, 1, 1))

# Compute the same 1x1 cross-correlation two ways so the results can be compared.
Y1 = corr2d_multi_in_out(X, kernels)
Y2 = corr2d_multi_in_out_1x1(X, kernels)

In [19]:
# Compare the magnitude of the difference: a signed comparison would also
# report True for arbitrarily large negative differences.
tf.abs(Y1 - Y2) < 1e-6

<tf.Tensor: shape=(2, 3, 1), dtype=bool, numpy=
array([[[ True],
        [ True],
        [ True]],

       [[ True],
        [ True],
        [ True]]])>

## Conv2D Layer

In [20]:
class Conv2D(layers.BaseLayer):
    """2-D convolution layer (stride 1, 'VALID' padding) built on ``tf.nn.conv2d``.

    Args:
        filters: number of filters (output channels).
        kernel_shape: ``(height, width)`` tuple giving the size of each filter.
        channels: number of input channels.
        activation: forwarded to ``activations.get_activation``; the object it
            returns is applied to the convolution output when it is not ``None``
            (assumes it is a callable taking a tensor -- TODO confirm against
            the activations module).
        initialization: forwarded to ``initializers.initialize_weights`` as ``method``.
        magnitude: forwarded to ``initializers.initialize_weights``.
        scale: forwarded to ``initializers.initialize_weights``.
    """

    def __init__(
        self, filters, kernel_shape, channels, activation=None,
        initialization='gaussian', magnitude=None, scale=None
    ):
        Conv2D.__type__ = 'compute'
        # Class-wide counter gives each instance a unique name, e.g. 'conv2d_1'.
        Conv2D._identifier += 1
        self.__name__ = '{}_{}'.format(Conv2D.__name__, Conv2D._identifier).lower()

        # Each filter is initialized independently with shape (channels, height, width);
        # stacking them yields a variable of shape (filters, channels, height, width).
        single_filter_shape = (channels,) + kernel_shape
        self.filters = tf.Variable(tf.stack([
            initializers.initialize_weights(
                single_filter_shape, method=initialization, magnitude=magnitude, scale=scale
            ) for _ in range(filters)
        ]))
        # Layout tf.nn.conv2d expects for its filter argument:
        # (height, width, in_channels, out_channels).
        self.filter_shape = kernel_shape + (channels, filters)
        self.kernel_shape = kernel_shape
        self.channels = channels
        self.activation = activations.get_activation(activation)

    def __call__(self, X):
        """Convolve ``X`` with the layer's filters and apply the activation.

        Args:
            X: 4-D input of shape (batch_size, height, width, channels).

        Returns:
            The activated convolution output with every size-1 axis removed by
            ``tf.squeeze``; a batch of one or a single filter therefore loses
            that axis.

        Raises:
            AssertionError: if ``X`` is not 4-dimensional.
        """
        assert len(X.shape) == 4, 'Input must be 4 dimentional (batch_size, height, width, channels)'
        # Permute (filters, channels, height, width) -> (height, width, channels, filters).
        # A plain reshape would keep the element count but scramble which stored
        # weight belongs to which filter/channel/position.
        kernel = tf.transpose(self.filters, perm=(2, 3, 1, 0))
        Y = tf.nn.conv2d(X, kernel, strides=1, padding='VALID')
        # Apply before squeezing so the activation sees the channel axis last.
        if self.activation is not None:
            Y = self.activation(Y)
        return tf.squeeze(Y)

    def __repr__(self):
        """Return a dict-style string describing the layer and its current filters."""
        return str({
            'name': self.__name__,
            'type': self.__type__,
            'kernel_shape': self.kernel_shape,
            'channels': self.channels,
            # getattr keeps repr usable even if the activation has no __name__.
            'activation': getattr(self.activation, '__name__', None),
            'filters': self.filters.numpy()
        })

In [32]:
# 6x8 single-channel edge image (zeros in columns 2..5) with a leading batch
# axis added, giving shape (1, 6, 8, 1).
image = np.ones((6, 8, 1))
image[:, 2:6] = 0
X = tf.expand_dims(tf.convert_to_tensor(image, dtype=tf.float32), axis=0)

In [33]:
# Target edge map: 1 in column 1, -1 in column 5, zeros elsewhere,
# with a leading batch axis added, giving shape (1, 6, 7).
target = np.zeros((6, 7))
target[:, 1] = 1
target[:, 5] = -1
Y = tf.expand_dims(tf.convert_to_tensor(target, dtype=tf.float32), axis=0)

In [34]:
# Fit a Conv2D layer with one 1x2 filter over one input channel to the target
# edge map, using plain gradient descent on the squared error.
conv2d = Conv2D(1, (1, 2), 1)

learning_rate = 3e-2
num_steps = 10

for i in range(num_steps):
    with tf.GradientTape() as t:
        Y_hat = conv2d(X)
        loss = (Y_hat - Y) ** 2
    # loss is element-wise; the tape sums over its elements when differentiating.
    dW = t.gradient(loss, conv2d.filters)
    conv2d.filters.assign_sub(learning_rate * dW)
    print('batch: {}, loss: {}'.format(i + 1, tf.reduce_sum(loss)))

batch: 1, loss: 14.498390197753906
batch: 2, loss: 5.938615798950195
batch: 3, loss: 2.432504653930664
batch: 4, loss: 0.9963845610618591
batch: 5, loss: 0.40813878178596497
batch: 6, loss: 0.1671862006187439
batch: 7, loss: 0.06848753243684769
batch: 8, loss: 0.028057638555765152
batch: 9, loss: 0.011495687067508698
batch: 10, loss: 0.004710735287517309


# Pooling

In [37]:
def pool2d(X, pool_size, mode='max'):
    """Apply 2-D pooling (stride 1, no padding) to a single 2-D input.

    Args:
        X: 2-D array-like of shape (rows, cols); converted with ``np.asarray``.
        pool_size: ``(p_h, p_w)`` height and width of the pooling window.
        mode: ``'max'`` for max pooling or ``'avg'`` for average pooling.

    Returns:
        A tensor of shape (rows - p_h + 1, cols - p_w + 1) built from a
        float64 NumPy array.

    Raises:
        ValueError: if ``mode`` is neither ``'max'`` nor ``'avg'``.
    """
    reducers = {'max': np.max, 'avg': np.mean}
    # Fail loudly on a misspelled mode instead of silently returning all zeros.
    if mode not in reducers:
        raise ValueError(
            "Unknown pooling mode {!r}; expected one of {}".format(mode, sorted(reducers))
        )
    # Resolve the reduction once rather than re-checking the mode per window.
    reduce_window = reducers[mode]

    # Accept any array-like so slicing and both reductions behave uniformly.
    X = np.asarray(X)
    p_h, p_w = pool_size
    Y = np.zeros((X.shape[0] - p_h + 1, X.shape[1] - p_w + 1))
    for i in range(Y.shape[0]):
        for j in range(Y.shape[1]):
            Y[i, j] = reduce_window(X[i: i + p_h, j: j + p_w])
    return tf.convert_to_tensor(Y)

In [42]:
# 3x3 grid holding 1..9; 2x2 max pooling should give [[5, 6], [8, 9]].
X = np.arange(1, 10).reshape(3, 3)
print(X, pool2d(X, (2, 2)))

[[1 2 3]
 [4 5 6]
 [7 8 9]] tf.Tensor(
[[5. 6.]
 [8. 9.]], shape=(2, 2), dtype=float64)
