In [None]:
import mindspore.nn as nn
import mindspore.ops as ops
from download import download
import mindspore
from mindspore.dataset import MnistDataset
from mindspore.dataset import transforms
from mindspore.dataset import vision
import time
from matplotlib import pyplot as plt
from PIL import Image
import numpy as np
import os
import cv2

In [None]:
class LeNet(nn.Cell):
    def __init__(self):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5) 
        self.relu = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.maxpool2 = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Dense(16*8*8, 120)
        self.fc2 = nn.Dense(120, 84)
        self.fc3 = nn.Dense(84, 10)
 
    def construct(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.maxpool2(x)
        x = x.reshape((-1, 16*8*8))
        x = ops.relu(self.fc1(x))
        x = ops.relu(self.fc2(x))
        x = self.fc3(x)
        output = ops.log_softmax(x, axis=1)
        return output
   

In [None]:
#定义训练过程  
def train_runner(model,  trainloader, optimizer, epoch):
    #训练模型, 启用 BatchNormalization 和 Dropout, 将BatchNormalization和Dropout置为True
    
    total = 0
    correct =0.0
    
    #enumerate迭代已加载的数据集,同时获取数据和数据下标
    for i, (inputs,labels) in enumerate(trainloader.create_tuple_iterator()):
        
        #保存训练结果
        (loss,outputs),grad = model(inputs,labels)
        
        #获取最大概率的预测结果
        #dim=1表示返回每一行的最大值对应的列下标
        predict = outputs.argmax(axis=1)
        total += labels.shape[0]
        correct += (predict == labels).sum().item()
        #反向传播
        optimizer(grad)
        if i % 1000 == 0:
            #loss.item()表示当前loss的数值
            print("Train Epoch{} \t Loss: {:.6f}, accuracy: {:.6f}%".format(epoch, loss.item(), 100*(correct/total)))
            Loss.append(loss.item())
            Accuracy.append(correct/total)
    return loss.item(), correct/total



In [None]:
#定义测试过程
def test_runner(model,  testloader):
    #设置成测试模式
    model.set_train(False)
    #统计模型正确率, 设置初始值
    correct = 0.0
    test_loss = 0.0
    total = 0
    
    for (data, label) in testloader.create_tuple_iterator():
        output = model(data)
        test_loss += ops.cross_entropy(output, label).item()
        predict = output.argmax(axis=1)
        #计算正确数量
        total += label.shape[0]
        correct += (predict == label).sum().item()
    #计算损失值
    print("test_avarage_loss: {:.6f}, accuracy: {:.6f}%".format(test_loss/total, 100*(correct/total)))


In [None]:
#利用训练好的模型进行数字识别
def test_model(path):
    param_dict = mindspore.load_checkpoint( './models/mnist.ckpt') #加载模型
    model=LeNet()
    mindspore.load_param_into_net(model, param_dict)
    model.set_train(False)    #把模型转为test模式
    
    #读取要预测的图片
    img = cv2.imread("./img/image.png")
    img=cv2.resize(img,dsize=(32,32),interpolation=cv2.INTER_NEAREST)
    cv2.imwrite('./img/resize.jpg',img) # 显示图片
    
    
    # 导入图片，图片扩展后为[1，1，32，32]
    trans = transforms.Compose(
        [   
            vision.ToTensor(),
            vision.Normalize((0.1307,), (0.3081,),is_hwc=False),
            
        ])
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)#图片转为灰度图，因为mnist数据集都是灰度图
    cv2.imwrite('./img/trans.jpg',img) # 显示图片
    img = trans(img)
    img=mindspore.tensor(img)
    img = img.unsqueeze(0)  #图片扩展多一维,因为输入到保存的模型中是4维的[batch_size,通道,长，宽]，而普通图片只有三维，[通道,长，宽]
    print(img.shape)
    
    # 预测 
    #output = model(img)
    #predict = output.argmax(dim=1)
    #print(predict.item())
    
    # 预测 
    output = model(img)
    predict = output.argmax(axis=1)
    print("预测类别：",predict.item())


In [None]:
#定义数据预处理方式
def datapipe(path, batch_size):
    image_transforms = [
        #随机旋转图片
        vision.RandomHorizontalFlip(),
        #将图片尺寸resize到32x32
        vision.Resize((32,32)),
        #正则化(当模型出现过拟合的情况时，用来降低模型的复杂度)
        vision.Normalize((0.1307,),(0.3081,)) ,
        vision.HWC2CHW() 
    ]
    label_transform = transforms.TypeCast(mindspore.int32)

    dataset = MnistDataset(path)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset


In [None]:
#下载数据集 
    
    url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/" \
        "notebook/datasets/MNIST_Data.zip"
    path = download(url, "./", kind="zip", replace=True)
    #加载数据集
    train_dataset = datapipe('MNIST_Data/train', batch_size=64)
    test_dataset = datapipe('MNIST_Data/test', batch_size=64)  

In [None]:
#定义模型与优化器

In [None]:
#创建模型，设置运行设备CPU，并设置图模式为动态图模式方便调试
    mindspore.set_context(device_target='CPU',mode=mindspore.PYNATIVE_MODE)
    model = LeNet()
    #定义优化器
    optimizer = nn.Adam(model.trainable_params(), learning_rate=0.001)
    #计算损失和
    #多分类情况通常使用cross_entropy(交叉熵损失函数), 而对于二分类问题, 通常使用sigmod
    loss_fn = nn.CrossEntropyLoss()
    #定义前向函数
    def forward_fn(data, label):
        logits = model(data)
        loss = loss_fn(logits, label)
        return loss, logits
    #根据前向函数定义梯度函数,对网络参数求导
    grad_fn=mindspore.value_and_grad(forward_fn,None,optimizer.parameters,has_aux=True)

In [None]:
#模型训练并计算loss和accuracy

In [None]:
#调用
    epoch = 5
    Loss = []
    Accuracy = []
    #设置为训练模式
    model.set_train()
    for epoch in range(1, epoch+1):
        print("start_time",time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())))
        loss, acc = train_runner(grad_fn,  train_dataset, optimizer, epoch)
        Loss.append(loss)
        Accuracy.append(acc)
        test_runner(model, test_dataset)
        print("end_time: ",time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())),'\n')

    print('Finished Training')

In [None]:
#模型保存

In [None]:
best_ckpt_dir='./models'
    if not os.path.exists(best_ckpt_dir):
        os.mkdir(best_ckpt_dir)
    mindspore.save_checkpoint(model, './models/mnist.ckpt') #保存模型  

In [None]:
#模型测试

In [None]:
test_model('./models/mnist.ckpt')