In [1]:
pip install download

Looking in indexes: http://repo.myhuaweicloud.com/repository/pypi/simple
Collecting download
  Downloading http://repo.myhuaweicloud.com/repository/pypi/packages/37/45/01e7455a9659528e77a414b222326d4c525796e4f571bbabcb2e0ff3d1f4/download-0.3.5-py3-none-any.whl (8.8 kB)
Collecting urllib3!=1.25.0,!=1.25.1,<1.26,>=1.21.1
  Downloading http://repo.myhuaweicloud.com/repository/pypi/packages/56/aa/4ef5aa67a9a62505db124a5cb5262332d1d4153462eb8fd89c9fa41e5d92/urllib3-1.25.11-py2.py3-none-any.whl (127 kB)
[K     |████████████████████████████████| 127 kB 56.7 MB/s eta 0:00:01
[?25hInstalling collected packages: urllib3, download
  Attempting uninstall: urllib3
    Found existing installation: urllib3 1.26.10
    Uninstalling urllib3-1.26.10:
      Successfully uninstalled urllib3-1.26.10
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
moxing-framework 2.0.0.rc2.4

In [2]:
"""
LLdois
mindspore 1.7.0
推理文件
"""
import mindspore as ms
from typing import Type, Union, List, Optional
from mindspore import nn
from mindspore.common.initializer import Normal
from mindspore import load_checkpoint, load_param_into_net
from download import download
import os
from mindspore import context
from PIL import Image
import numpy as np


# Build the ResNet-50 network
class ResidualBlockBase(nn.Cell):
    """Residual block with two 3x3 convolutions on the main branch and an additive shortcut.

    Args:
        in_channel: Number of channels of the block input.
        out_channel: Number of filters of both convolutions.
        stride: Stride of the first convolution.
        norm: Normalisation cell applied after each convolution. When ``None``
            a ``nn.BatchNorm2d(out_channel)`` is created.
        down_sample: Optional cell applied to the block input to produce the
            shortcut. When ``None`` the input itself is the shortcut.
    """
    expansion: int = 1  # the last convolution has as many filters as the first

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, norm: Optional[nn.Cell] = None,
                 down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlockBase, self).__init__()
        # Explicit None check: do not rely on the truthiness of a Cell object.
        if norm is None:
            self.norm = nn.BatchNorm2d(out_channel)
        else:
            self.norm = norm

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=Normal(mean=0, sigma=0.02))
        # conv2 consumes the output of conv1, so its input width is out_channel
        # (declaring it with in_channel breaks whenever in_channel != out_channel).
        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3, weight_init=Normal(mean=0, sigma=0.02))
        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):
        """Apply conv-norm-relu-conv-norm, add the shortcut, then apply ReLU."""
        identity = x  # shortcut branch

        out = self.conv1(x)  # main branch, first layer: 3x3 convolution
        # NOTE(review): the same ``self.norm`` cell is applied after both
        # convolutions. Splitting it into two cells would rename parameters and
        # could break existing checkpoints, so it is left unchanged here.
        out = self.norm(out)
        out = self.relu(out)
        out = self.conv2(out)  # main branch, second layer: 3x3 convolution
        out = self.norm(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)
        out += identity  # output is the sum of the main branch and the shortcut
        out = self.relu(out)

        return out


class ResidualBlock(nn.Cell):
    """Bottleneck residual block: 1x1 -> 3x3 -> 1x1 convolutions plus an additive shortcut.

    Args:
        in_channel: Number of channels of the block input.
        out_channel: Number of filters of the first two convolutions; the
            third convolution produces ``out_channel * expansion`` channels.
        stride: Stride of the 3x3 convolution.
        down_sample: Optional cell applied to the block input to produce the
            shortcut. When ``None`` the input itself is the shortcut.
    """
    expansion = 4  # the last convolution has 4x as many filters as the first

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
        super().__init__()
        wide_channel = out_channel * self.expansion

        # 1x1 convolution: in_channel -> out_channel
        self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=1,
                               weight_init=Normal(mean=0, sigma=0.02))
        self.norm1 = nn.BatchNorm2d(out_channel)

        # 3x3 convolution, carries the block's stride
        self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3,
                               stride=stride,
                               weight_init=Normal(mean=0, sigma=0.02))
        self.norm2 = nn.BatchNorm2d(out_channel)

        # 1x1 convolution: out_channel -> out_channel * expansion
        self.conv3 = nn.Conv2d(out_channel, wide_channel, kernel_size=1,
                               weight_init=Normal(mean=0, sigma=0.02))
        self.norm3 = nn.BatchNorm2d(wide_channel)

        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):
        """Run the three-convolution main branch, add the shortcut, apply ReLU."""
        # Main branch: conv/norm pairs, with ReLU after the first two.
        main = self.relu(self.norm1(self.conv1(x)))
        main = self.relu(self.norm2(self.conv2(main)))
        main = self.norm3(self.conv3(main))

        # Shortcut branch: the raw input unless a down-sampling cell was given.
        if self.down_sample is None:
            shortcut = x
        else:
            shortcut = self.down_sample(x)

        return self.relu(main + shortcut)


def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
               channel: int, block_nums: int, stride: int = 1):
    """Stack residual blocks into one ``nn.SequentialCell`` stage.

    Args:
        last_out_channel: Number of channels produced by the previous stage.
        block: Residual block class to instantiate.
        channel: Base filter count passed to every block.
        block_nums: The stage holds one leading block plus
            ``block_nums - 1`` further blocks.
        stride: Stride given to the leading block and to the shortcut projection.

    Returns:
        ``nn.SequentialCell`` containing the stacked blocks.
    """
    stage_width = channel * block.expansion

    # A 1x1 conv + batch-norm projection is placed on the shortcut only when
    # the leading block changes the stride or the channel count.
    projection = None
    if stride != 1 or last_out_channel != stage_width:
        projection = nn.SequentialCell([
            nn.Conv2d(last_out_channel, stage_width,
                      kernel_size=1, stride=stride, weight_init=Normal(mean=0, sigma=0.02)),
            nn.BatchNorm2d(stage_width, gamma_init=Normal(mean=1, sigma=0.02))
        ])

    stacked = [block(last_out_channel, channel, stride=stride, down_sample=projection)]
    # Remaining blocks take the stage width as input and use the default stride.
    stacked.extend(block(stage_width, channel) for _ in range(block_nums - 1))

    return nn.SequentialCell(stacked)


class ResNet(nn.Cell):
    """ResNet backbone: stem convolution, four residual stages, pooling and a dense head.

    Args:
        block: Residual block class used in every stage.
        layer_nums: Block counts for the four stages (indices 0..3 are read).
        num_classes: Output size of the final dense layer.
        input_channel: Input size of the final dense layer (number of
            flattened features), not the number of image channels.
    """

    def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                 layer_nums: List[int], num_classes: int, input_channel: int) -> None:
        super().__init__()
        widen = block.expansion

        self.relu = nn.ReLU()
        # Stem convolution: 3 input channels (colour image) -> 64 output channels.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2,
                               weight_init=Normal(mean=0, sigma=0.02))
        self.norm = nn.BatchNorm2d(64)
        # Max pooling reduces the spatial size of the feature map.
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        # Residual stages; each stage's input width is the previous stage's output width.
        self.layer1 = make_layer(64, block, 64, layer_nums[0])
        self.layer2 = make_layer(64 * widen, block, 128, layer_nums[1], stride=2)
        self.layer3 = make_layer(128 * widen, block, 256, layer_nums[2], stride=2)
        self.layer4 = make_layer(256 * widen, block, 512, layer_nums[3], stride=2)
        # Average pooling, created with the library's default arguments.
        self.avg_pool = nn.AvgPool2d()
        # Flatten layer.
        self.flatten = nn.Flatten()
        # Fully connected classification head.
        self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)

    def construct(self, x):
        """Forward pass: stem -> four residual stages -> pool, flatten, dense."""
        stem = self.max_pool(self.relu(self.norm(self.conv1(x))))
        features = self.layer4(self.layer3(self.layer2(self.layer1(stem))))
        return self.fc(self.flatten(self.avg_pool(features)))


def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
            layers: List[int], num_classes: int, pretrained: bool, pretrianed_ckpt: str,
            input_channel: int):
    """Build a ``ResNet`` and optionally load pretrained weights into it.

    Args:
        model_url: URL the pretrained checkpoint is downloaded from.
        block: Residual block class forwarded to ``ResNet``.
        layers: Per-stage block counts forwarded to ``ResNet``.
        num_classes: Output size of the dense head.
        pretrained: When true, download and load the checkpoint.
        pretrianed_ckpt: Local path the checkpoint is downloaded to and read from.
        input_channel: Input size of the dense head.

    Returns:
        The constructed network.
    """
    net = ResNet(block, layers, num_classes, input_channel)
    if not pretrained:
        return net

    # Fetch the pretrained checkpoint, then copy its parameters into the network.
    download(url=model_url, path=pretrianed_ckpt)
    load_param_into_net(net, load_checkpoint(pretrianed_ckpt))
    return net


def resnet50(num_classes: int = 1000, pretrained: bool = False):
    """Build a ResNet-50 (bottleneck blocks, stage sizes [3, 4, 6, 3]).

    Args:
        num_classes: Output size of the dense head.
        pretrained: When true, the pretrained checkpoint is downloaded to
            ``./LoadPretrainedModel/resnet50_224_new.ckpt`` and loaded.

    Returns:
        The constructed network.
    """
    # The URL must not end with whitespace: the original literal had a stray
    # trailing space after ".ckpt".
    resnet50_url = "https://obs.dualstack.cn-north-4.myhuaweicloud.com/mindspore-website/notebook/models/application" \
                   "/resnet50_224_new.ckpt"
    resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
    # 2048 is the input size of the dense head.
    return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
                   pretrained, resnet50_ckpt, 2048)


def load_testdata(testdata_path):
    """Load one image file and preprocess it into a network input batch.

    The image is converted to RGB, resized to 256x256, centre-cropped to
    224x224, normalised per channel and rearranged to channel-first layout.

    Args:
        testdata_path: Path of the image file to read.

    Returns:
        ``ms.Tensor`` of dtype float32 and shape (1, 3, 224, 224).
    """
    # Per-channel mean / std, expressed on the 0..255 pixel scale.
    mean = np.array([0.485 * 255, 0.456 * 255, 0.406 * 255], dtype=np.float32)
    std = np.array([0.229 * 255, 0.224 * 255, 0.225 * 255], dtype=np.float32)
    scale = 32
    image_size = 224
    margin = scale // 2

    # The context manager closes the underlying file handle once decoding is done.
    with Image.open(testdata_path) as raw:
        # Force 3 channels so grayscale / RGBA / CMYK files do not break the
        # per-channel normalisation below.
        rgb = raw.convert("RGB")
        rgb = rgb.resize((image_size + scale, image_size + scale))
        rgb = rgb.crop((margin, margin, image_size + margin, image_size + margin))
        pixels = np.array(rgb, dtype=np.float32)  # layout: (H, W, C)

    pixels = (pixels - mean) / std
    # Explicit HWC -> CHW permutation. A bare transpose() reverses all axes,
    # giving (C, W, H), i.e. a spatially transposed image.
    # NOTE(review): assumes training used the usual (C, H, W) layout - confirm.
    pixels = pixels.transpose((2, 0, 1))
    batch = np.ascontiguousarray(pixels[np.newaxis, ...])  # add the batch axis
    return ms.Tensor(batch, dtype=ms.float32)


def main():
    """Run inference on the 500 test images and write the predictions to ./result.txt.

    Rebuilds ResNet-50 with a 10-class head and a kernel-size-7 average pool,
    loads ``./model/resnet50-best.ckpt``, predicts
    ``./Data/test/img_0.jpg`` .. ``./Data/test/img_499.jpg`` and writes one
    class index per line. Any existing result file is overwritten.
    """
    lr = 0.001  # learning rate
    momentum = 0.9  # momentum
    # Run on the Ascend device in graph mode.
    context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
    # Build the network that the trained parameters are loaded into.
    network = resnet50()
    # Fine-tuning head: reuse the input size of the original dense layer.
    in_channels = network.fc.in_channels
    # The output size is the number of classes, 10.
    head = nn.Dense(in_channels, 10)
    # Replace the dense layer.
    network.fc = head

    # Average pooling with kernel size 7.
    avg_pool = nn.AvgPool2d(kernel_size=7)
    # Replace the average pooling layer.
    network.avg_pool = avg_pool
    # Path of the best checkpoint.
    best_ckpt_path = "./model/resnet50-best.ckpt"
    # Load the parameters.
    param_dict = load_checkpoint(best_ckpt_path)
    load_param_into_net(network, param_dict)
    # Optimizer and loss function passed to the Model wrapper below.
    opt = nn.Momentum(params=network.trainable_params(), learning_rate=lr, momentum=momentum)
    loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    # High-level Model API.
    model = ms.Model(network, loss_fn, opt, metrics={"Accuracy": nn.Accuracy()})
    # Class index -> label name; kept for reference, only the index is written out.
    class_name = {0: "冰激凌", 1: "鸡蛋布丁", 2: "烤冷面", 3: "芒果班戟", 4: " 三明治", 5: "松鼠鱼",
                  6: "甜甜圈", 7: "土豆泥", 8: "小米粥", 9: "玉米饼"}
    # Open the result file once. Mode "w" truncates any previous file, which
    # replaces the remove-then-append-per-image sequence, and the context
    # manager closes the file, so no explicit close() is needed.
    with open("./result.txt", "w") as op:
        for i in range(500):
            testdata_path = "./Data/test/img_" + str(i) + ".jpg"
            image = load_testdata(testdata_path=testdata_path)
            logits = model.predict(image)
            # Convert to NumPy explicitly and take the single batch element;
            # int() on a 1-element array is deprecated in newer NumPy.
            result = int(np.argmax(logits.asnumpy(), axis=1)[0])
            # Write the predicted class index.
            op.write(str(result) + "\n")
    print("已完成")


# Entry point: run inference only when this code is executed directly, not when imported.
if __name__ == "__main__":
    main()

已完成
