In [30]:
# Import the required packages
from download import download
from mindspore.dataset import MnistDataset
import mindspore.dataset as ds        
import mindspore.dataset.transforms as C   
import mindspore.dataset.vision as CV                
from mindspore.dataset.vision import Inter      
from mindspore import dtype as mstype
import mindspore.nn as nn
import mindspore

In [31]:
# Download the MNIST archive and extract it into the current directory
url = (
    "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/"
    "notebook/datasets/MNIST_Data.zip"
)
path = download(url, "./", kind="zip", replace=True)


Downloading data from https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip (10.3 MB)

file_sizes: 100%|██████████████████████████| 10.8M/10.8M [00:01<00:00, 9.37MB/s]
Extracting zip file...
Successfully downloaded / unzipped to ./


In [32]:
def datapipe(data_path, batch_size=32, buffer_size=10000, drop_remainder=True):
    """Build the MNIST input pipeline: cast labels, transform images, shuffle, batch.

    Args:
        data_path: Dataset directory handed to ``ds.MnistDataset``.
        batch_size: Number of samples per batch.
        buffer_size: Shuffle buffer size. Defaults to 10000, the value that
            used to be hard-coded.
        drop_remainder: Whether a final batch smaller than ``batch_size`` is
            discarded. Defaults to ``True`` (the previously hard-coded
            behavior); pass ``False``, e.g. for evaluation, so that no sample
            is silently skipped.

    Returns:
        The dataset after the label cast, image transforms, shuffle and batch.
    """
    # Load the dataset
    mnist_ds = ds.MnistDataset(data_path)

    # Image transforms, listed in the order they are applied below.
    # Resize to 32x32 so that a 5x5 'valid' convolution yields 28x28 feature maps.
    resize_op = CV.Resize((32, 32), interpolation=Inter.LINEAR)
    # Scale pixel values by 1/255 (presumably 0-255 inputs -> [0, 1]; confirm dtype).
    rescale_op = CV.Rescale(1.0 / 255.0, 0.0)
    # Standardize: x / 0.3081 - 0.1307 / 0.3081 == (x - 0.1307) / 0.3081.
    # Per the original note, these are the dataset's normalization coefficients.
    rescale_nml_op = CV.Rescale(1 / 0.3081, -1 * 0.1307 / 0.3081)
    # Move the channel axis first (HWC -> CHW).
    hwc2chw_op = CV.HWC2CHW()
    # Labels are cast to int32.
    type_cast_op = C.TypeCast(mstype.int32)

    # Apply the operations to their columns with map
    mnist_ds = mnist_ds.map(operations=type_cast_op, input_columns="label")
    mnist_ds = mnist_ds.map(
        operations=[resize_op, rescale_op, rescale_nml_op, hwc2chw_op],
        input_columns="image",
    )

    # Shuffle, then batch
    mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size)
    mnist_ds = mnist_ds.batch(batch_size, drop_remainder=drop_remainder)

    return mnist_ds

In [33]:
# Build the training and test pipelines
train_dataset = datapipe(data_path='./MNIST_Data/train', batch_size=32)
test_dataset = datapipe(data_path='./MNIST_Data/test', batch_size=32)


In [34]:
# LeNet model
class LeNet(nn.Cell):
    """LeNet-style CNN: two conv/pool/ReLU stages followed by three dense layers.

    ``fc1`` takes 16*5*5 flattened features, which is what a single-channel
    32x32 input produces after the two 5x5 'valid' convolutions and 2x2
    poolings (32 -> 28 -> 14 -> 10 -> 5). The final layer emits 10 values
    per sample.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6,
                               kernel_size=5, stride=1, pad_mode='valid')
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16,
                               kernel_size=5, stride=1, pad_mode='valid')
        self.flatten = nn.Flatten()
        # Fully connected classifier head
        self.fc1 = nn.Dense(in_channels=16*5*5, out_channels=120)
        self.fc2 = nn.Dense(in_channels=120, out_channels=84)
        self.fc3 = nn.Dense(in_channels=84, out_channels=10)
        # Parameter-free layers reused across stages
        self.relu = nn.ReLU()
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)

    def construct(self, x):
        """Run the forward pass on ``x`` and return the output of ``fc3``."""
        # Stage 1: conv -> pool -> ReLU
        stage1 = self.conv1(x)
        stage1 = self.max_pool2d(stage1)
        stage1 = self.relu(stage1)

        # Stage 2: conv -> pool -> ReLU
        stage2 = self.conv2(stage1)
        stage2 = self.max_pool2d(stage2)
        stage2 = self.relu(stage2)

        # Classifier head
        hidden = self.flatten(stage2)
        hidden = self.relu(self.fc1(hidden))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [35]:
# Instantiate the LeNet network
model = LeNet()

# Loss function and optimizer (SGD over the network's trainable parameters)
loss_fn = nn.CrossEntropyLoss()
optimizer = nn.SGD(params=model.trainable_params(), learning_rate=1e-2)

# Forward pass
def forward_fn(data, label):
    """Run the module-level ``model`` on ``data`` and score it with ``loss_fn``.

    Returns:
        A ``(loss, logits)`` pair: the loss of the predictions against
        ``label`` and the raw network output.
    """
    logits = model(data)
    return loss_fn(logits, label), logits

# Gradient function: differentiates forward_fn with respect to the optimizer's
# parameters (no input positions). has_aux=True is passed because forward_fn
# returns the logits alongside the loss.
grad_fn = mindspore.value_and_grad(
    forward_fn,
    grad_position=None,
    weights=optimizer.parameters,
    has_aux=True,
)

# Single training step
def train_step(data, label):
    """Compute loss and gradients for one batch, apply them, return the loss."""
    outputs, grads = grad_fn(data, label)
    batch_loss, _ = outputs  # the second element is the logits, unused here
    optimizer(grads)
    return batch_loss

def train(model, dataset):
    """Run one pass over ``dataset``, printing the loss every 100 batches.

    Args:
        model: Network that is switched to training mode via ``set_train()``.
            Note that the optimization itself goes through the module-level
            ``train_step``, so this argument only receives the mode switch.
        dataset: Dataset exposing ``get_dataset_size()`` and
            ``create_tuple_iterator()`` yielding ``(data, label)`` batches.
    """
    size = dataset.get_dataset_size()
    model.set_train()
    batches = dataset.create_tuple_iterator()
    for current, (data, label) in enumerate(batches):
        step_loss = train_step(data, label)

        # Only report on every 100th batch (including the first).
        if current % 100 != 0:
            continue
        loss = step_loss.asnumpy()
        print(f"loss: {loss:>7f}  [{current:>3d}/{size:>3d}]")


In [36]:
def test(model, dataset, loss_fn):
    num_batches = dataset.get_dataset_size()
    model.set_train(False)
    total, test_loss, correct = 0, 0, 0
    for data, label in dataset.create_tuple_iterator():
        pred = model(data)
        total += len(data)
        test_loss += loss_fn(pred, label).asnumpy()
        correct += (pred.argmax(1) == label).asnumpy().sum()
    test_loss /= num_batches
    correct /= total
    print(f"Test: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


In [38]:
# Number of full passes over the training set
epochs = 10
for t in range(epochs):
    # Each epoch: one training pass, then an evaluation on the test pipeline.
    print(f"Epoch {t+1}\n-------------------------------")
    train(model, train_dataset)
    test(model, test_dataset, loss_fn)
print("Done!")


Epoch 1
-------------------------------
loss: 0.212117  [  0/1875]
loss: 0.001447  [100/1875]
loss: 0.063531  [200/1875]
loss: 0.026033  [300/1875]
loss: 0.006008  [400/1875]
loss: 0.043197  [500/1875]
loss: 0.001799  [600/1875]
loss: 0.005551  [700/1875]
loss: 0.068206  [800/1875]
loss: 0.050063  [900/1875]
loss: 0.000482  [1000/1875]
loss: 0.000850  [1100/1875]
loss: 0.000387  [1200/1875]
loss: 0.045702  [1300/1875]
loss: 0.004178  [1400/1875]
loss: 0.008348  [1500/1875]
loss: 0.007381  [1600/1875]
loss: 0.190744  [1700/1875]
loss: 0.000445  [1800/1875]
Test: 
 Accuracy: 98.5%, Avg loss: 0.045345 

Epoch 2
-------------------------------
loss: 0.006005  [  0/1875]
loss: 0.000688  [100/1875]
loss: 0.004597  [200/1875]
loss: 0.002803  [300/1875]
loss: 0.002727  [400/1875]
loss: 0.011277  [500/1875]
loss: 0.028346  [600/1875]
loss: 0.009765  [700/1875]
loss: 0.003821  [800/1875]
loss: 0.004284  [900/1875]
loss: 0.004983  [1000/1875]
loss: 0.037588  [1100/1875]
loss: 0.004987  [1200/1875