In [1]:
"""
1. 卷积层
"""
# Input data layout:  batch x channel x height x width
# Weight layout:      output_channels x input_channels x height x width
# In this example both the input and the output channel counts are 1.

from mxnet import nd

w = nd.arange(4).reshape((1, 1, 2, 2))
b = nd.array([1])
data = nd.arange(9).reshape((1, 1, 3, 3))
# num_filter is the number of OUTPUT channels, i.e. axis 0 of the weight.
# Axis 1 is the input-channel count; the two only coincide here because
# both happen to be 1, so always read num_filter from w.shape[0].
out = nd.Convolution(data, w, b, kernel=w.shape[2:], num_filter=w.shape[0])

print('input: ',data, '\n weight:', w, '\nbias:', b, '\noutput: ',out)

input:  
[[[[ 0.  1.  2.]
   [ 3.  4.  5.]
   [ 6.  7.  8.]]]]
<NDArray 1x1x3x3 @cpu(0)> 
 weight: 
[[[[ 0.  1.]
   [ 2.  3.]]]]
<NDArray 1x1x2x2 @cpu(0)> 
bias: 
[ 1.]
<NDArray 1 @cpu(0)> 
output:  
[[[[ 20.  26.]
   [ 38.  44.]]]]
<NDArray 1x1x2x2 @cpu(0)>


In [2]:
# Same convolution, now with an explicit stride and zero-padding.
# stride=(2, 2) moves the window two cells at a time; pad=(1, 1) adds one
# ring of zeros around the 3x3 input, so the output is again 2x2.
# num_filter must be the output-channel count (axis 0 of the weight).
out = nd.Convolution(data, w, b, kernel=w.shape[2:], num_filter=w.shape[0],
                     stride=(2, 2), pad=(1, 1))

print('input: ',data, '\n weight:', w, '\nbias:', b, '\noutput: ',out)

input:  
[[[[ 0.  1.  2.]
   [ 3.  4.  5.]
   [ 6.  7.  8.]]]]
<NDArray 1x1x3x3 @cpu(0)> 
 weight: 
[[[[ 0.  1.]
   [ 2.  3.]]]]
<NDArray 1x1x2x2 @cpu(0)> 
bias: 
[ 1.]
<NDArray 1 @cpu(0)> 
output:  
[[[[  1.   9.]
   [ 22.  44.]]]]
<NDArray 1x1x2x2 @cpu(0)>


当输入数据有多个通道的时候，每个通道会有对应的权重；先对每个通道分别做卷积，再在通道之间求和，偏置 $b$ 只在求和之后加一次：
$$conv(data,w,b)=\sum_{i}{conv(data[:,i,:,:],w[:,i,:,:],0)}+b$$

In [3]:
# Two input channels, one output channel: the weight is laid out as
# (output_channels=1, input_channels=2, height=2, width=2).
w = nd.arange(8).reshape((1, 2, 2, 2))
data = nd.arange(18).reshape((1, 2, 3, 3))

# The bias `b` is reused from the earlier cell (one value per output channel).
out = nd.Convolution(
    data=data,
    weight=w,
    bias=b,
    kernel=w.shape[2:],
    num_filter=w.shape[0],
)

print('input:', data, '\n\nweight:', w, '\n\nbias:', b, '\n\noutput:', out)

input: 
[[[[  0.   1.   2.]
   [  3.   4.   5.]
   [  6.   7.   8.]]

  [[  9.  10.  11.]
   [ 12.  13.  14.]
   [ 15.  16.  17.]]]]
<NDArray 1x2x3x3 @cpu(0)> 

weight: 
[[[[ 0.  1.]
   [ 2.  3.]]

  [[ 4.  5.]
   [ 6.  7.]]]]
<NDArray 1x2x2x2 @cpu(0)> 

bias: 
[ 1.]
<NDArray 1 @cpu(0)> 

output: 
[[[[ 269.  297.]
   [ 353.  381.]]]]
<NDArray 1x1x2x2 @cpu(0)>


当输出需要多通道时，每个输出通道有对应权重，然后每个通道上做卷积。

$$conv(data,w,b)[:,i,:,:]=conv(data,w[i,:,:,:],b[i])$$

## 池化层（pooling）
因为卷积层每次作用在一个窗口，它对位置很敏感。池化层能够很好地缓解这个问题。它跟卷积类似，每次看一个小窗口，然后选出窗口里面最大的元素，或者取窗口内元素的平均值作为输出。

In [4]:
# A 1x2x3x3 input: pooling is applied to each channel independently.
data = nd.arange(18).reshape((1, 2, 3, 3))

# Run the same 2x2 pooling window twice, once per reduction mode.
max_pool, avg_pool = (
    nd.Pooling(data=data, pool_type=mode, kernel=(2, 2))
    for mode in ("max", "avg")
)

print('data:', data, '\n\nmax pooling:', max_pool, '\n\navg pooling:', avg_pool)

data: 
[[[[  0.   1.   2.]
   [  3.   4.   5.]
   [  6.   7.   8.]]

  [[  9.  10.  11.]
   [ 12.  13.  14.]
   [ 15.  16.  17.]]]]
<NDArray 1x2x3x3 @cpu(0)> 

max pooling: 
[[[[  4.   5.]
   [  7.   8.]]

  [[ 13.  14.]
   [ 16.  17.]]]]
<NDArray 1x2x2x2 @cpu(0)> 

avg pooling: 
[[[[  2.   3.]
   [  5.   6.]]

  [[ 11.  12.]
   [ 14.  15.]]]]
<NDArray 1x2x2x2 @cpu(0)>


## 建立一个卷积网络模型

In [5]:
"""
1. 获取数据
"""
import sys
# Put the parent directory on the module search path before importing
# `utils` (presumably utils.py lives one level up — verify against the repo layout).
sys.path.append('..')
import utils

# Mini-batch size used by both loaders; the training cell also uses it.
batch_size = 256
# Two batch iterators; later cells iterate them as (data, label) pairs.
train_data, test_data = utils.load_data_fashion_mnist(batch_size)

In [6]:
"""
2. 定义模型
"""
# Pick the default compute device: try the GPU first, fall back to the CPU.
import mxnet as mx

try:
    ctx = mx.gpu()
    # Creating a context object does not touch the device; allocating a tiny
    # array on it is what actually reveals whether a usable GPU is present.
    _ = nd.zeros((1,), ctx=ctx)
except mx.base.MXNetError:
    # Only MXNet's own failure means "no GPU". A bare `except:` would also
    # swallow KeyboardInterrupt and unrelated programming errors.
    ctx = mx.cpu()
ctx

cpu(0)

In [7]:
# We use LeNet, a network commonly used on MNIST: two convolutional layers
# followed by two fully connected layers. Every parameter is created on `ctx`.
weight_scale = 0.01

# Conv layer 1: 20 output channels, 1 input channel, 5x5 kernel.
W1 = nd.random_normal(shape=(20, 1, 5, 5), scale=weight_scale, ctx=ctx)
b1 = nd.zeros(W1.shape[0], ctx=ctx)

# Conv layer 2: 50 output channels, 20 input channels, 3x3 kernel.
W2 = nd.random_normal(shape=(50, 20, 3, 3), scale=weight_scale, ctx=ctx)
b2 = nd.zeros(W2.shape[0], ctx=ctx)

# Dense layer 1: flattened conv features -> 128 hidden units.
# The flattened size is 50 channels x 5 x 5 = 1250.
W3 = nd.random_normal(
    shape=(W2.shape[0] * 5 * 5, 128), scale=weight_scale, ctx=ctx)
b3 = nd.zeros(W3.shape[1], ctx=ctx)

# Dense layer 2: 128 hidden units -> 10 outputs.
W4 = nd.random_normal(
    shape=(W3.shape[1], 10), scale=weight_scale, ctx=ctx)
b4 = nd.zeros(W4.shape[1], ctx=ctx)

# Allocate gradient buffers so autograd can record gradients for each parameter.
params = [W1, b1, W2, b2, W3, b3, W4, b4]
for param in params:
    param.attach_grad()

In [8]:
# A convolutional block is usually "convolution -> activation -> pooling".
# Its output is then flattened into a 2-D matrix for the dense layers.

def net(X, verbose=False):
    """Run the LeNet-style forward pass and return the raw class scores.

    Args:
        X: input batch; it is copied to the context of ``W1`` before use.
            Assumed to be laid out as batch x channel x height x width.
        verbose: when True, print the shape after each stage and the output.

    Returns:
        The output of the second dense layer (one row per sample, one
        column per output unit of ``W4``); no softmax is applied.
    """
    X = X.as_in_context(W1.context)

    # Conv block 1: convolution -> ReLU -> 2x2 max pooling with stride 2.
    conv1 = nd.Convolution(
        data=X, weight=W1, bias=b1,
        kernel=W1.shape[2:], num_filter=W1.shape[0])
    pooled1 = nd.Pooling(
        data=nd.relu(conv1), pool_type="max", kernel=(2, 2), stride=(2, 2))

    # Conv block 2: same structure, fed by the first block's output.
    conv2 = nd.Convolution(
        data=pooled1, weight=W2, bias=b2,
        kernel=W2.shape[2:], num_filter=W2.shape[0])
    pooled2 = nd.Pooling(
        data=nd.relu(conv2), pool_type="max", kernel=(2, 2), stride=(2, 2))

    # Collapse everything but the batch axis so the dense layers can use dot().
    flat = nd.flatten(pooled2)

    # Dense layer 1 with ReLU, then dense layer 2 producing the final scores.
    hidden = nd.relu(nd.dot(flat, W3) + b3)
    scores = nd.dot(hidden, W4) + b4

    if verbose:
        print('1st conv block:', pooled1.shape)
        # Reported after flattening, so this is a 2-D shape.
        print('2nd conv block:', flat.shape)
        print('1st dense:', hidden.shape)
        print('2nd dense:', scores.shape)
        print('output:', scores)
    return scores

In [9]:
# Sanity-check the intermediate output shapes on a single batch.
# Note: this rebinds the name `data` that earlier demo cells also used.
for data, _ in train_data:
    net(data, verbose=True)
    # One batch is enough to inspect the shapes; stop after the first.
    break

1st conv block: (256, 20, 12, 12)
2nd conv block: (256, 1250)
1st dense: (256, 128)
2nd dense: (256, 10)
output: 
[[ -4.84412931e-06   2.09520440e-05  -6.79103759e-05 ...,  -3.71463611e-05
   -3.03670331e-05  -1.42374010e-05]
 [ -9.25582845e-06  -2.90281114e-05  -9.70217152e-05 ...,  -4.37744930e-05
   -3.45401641e-05   8.94459117e-06]
 [ -5.31731675e-05   8.54495647e-06  -7.31798864e-05 ...,  -2.39260971e-05
   -4.76452005e-05   3.00158972e-05]
 ..., 
 [ -4.38514799e-05   1.87298847e-05  -1.20235411e-04 ...,  -3.87516120e-05
   -6.14443852e-05   2.86689974e-05]
 [ -8.40173107e-06   1.25905194e-06  -5.86998176e-05 ...,  -4.25174512e-05
   -2.62760859e-05   3.83658016e-06]
 [ -2.01232906e-05   8.44011538e-06  -8.99356382e-05 ...,  -3.84814484e-05
   -3.75229756e-05  -4.81420739e-06]]
<NDArray 256x10 @cpu(0)>


In [None]:
"""
3. 训练
"""
from mxnet import autograd
from utils import SGD, accuracy, evaluate_accuracy
from mxnet import gluon

# Per-sample softmax cross-entropy computed from the raw scores of `net`.
softmax_cross_entropy = gluon.loss.SoftmaxCrossEntropyLoss()

learning_rate = .2

for epoch in range(5):
    # Running sums over the batches of this epoch.
    train_loss = 0.
    train_acc = 0.
    for data, label in train_data:
        # `net` moves the data itself; the labels must be moved explicitly.
        label = label.as_in_context(ctx)
        with autograd.record():
            output = net(data)
            loss = softmax_cross_entropy(output, label)
        # backward() on the per-sample loss vector accumulates the gradient
        # of the SUM over the batch.
        loss.backward()
        # Normalise by the size of THIS batch rather than the nominal
        # batch_size, so a smaller final batch is not under-weighted.
        # (Assumes utils.SGD applies `param -= lr * grad` — verify.)
        SGD(params, learning_rate / data.shape[0])

        train_loss += nd.mean(loss).asscalar()
        train_acc += accuracy(output, label)

    test_acc = evaluate_accuracy(test_data, net, ctx)
    # Report the per-batch averages of loss and training accuracy.
    print("Epoch %d. Loss: %f, Train acc %f, Test acc %f" % (
        epoch, train_loss/len(train_data),
        train_acc/len(train_data), test_acc))