In [1]:
import os
import argparse
from mindspore import context

# Command-line options: only the execution backend is configurable.
parser = argparse.ArgumentParser(description='Recycle Grad-CAM model')
parser.add_argument(
    '--device_target',
    type=str,
    default="Ascend",
    choices=['Ascend', 'GPU', 'CPU'],
)

# parse_known_args() yields (namespace, unrecognised arguments); only the namespace is kept.
args, _unknown_args = parser.parse_known_args()
context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target)

In [2]:
import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as C
import mindspore.dataset.vision.c_transforms as CV
from mindspore.dataset.vision import Inter
from mindspore import dtype as mstype

In [3]:
import mindspore.nn as nn
from mindspore.common.initializer import TruncatedNormal
from mindspore.ops import operations as P


def weight_variable():
    """Return the TruncatedNormal(0.02) initializer used for the conv and dense layers."""
    initializer = TruncatedNormal(0.02)
    return initializer


class Conv2dBlock(nn.Cell):
    """
    Convolution followed by batch normalisation and ReLU.

    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        kernel_size (int): Convolution kernel size. Default: 1.
        stride (int): Convolution stride. Default: 1.
        padding (int): Padding value forwarded to nn.Conv2d. Default: 0.
        pad_mode (str): Padding mode forwarded to nn.Conv2d. Default: "same".

    Returns:
        Tensor, the activated feature map.
    """

    def __init__(self, in_channels, out_channels, kernel_size=1, stride=1, padding=0, pad_mode="same"):
        super().__init__()
        conv_options = dict(kernel_size=kernel_size, stride=stride, padding=padding,
                            pad_mode=pad_mode, weight_init=weight_variable())
        self.conv = nn.Conv2d(in_channels, out_channels, **conv_options)
        self.bn = nn.BatchNorm2d(out_channels, eps=0.001)
        self.relu = nn.ReLU()

    def construct(self, x):
        """Apply conv -> batch norm -> ReLU to `x`."""
        return self.relu(self.bn(self.conv(x)))


class Inception(nn.Cell):
    """
    Inception block: four parallel branches whose outputs are concatenated along axis 1.

    Args:
        in_channels (int): Channels of the input tensor.
        n1x1 (int): Output channels of the 1x1 branch.
        n3x3red (int): Channels after the 1x1 reduction of the second branch.
        n3x3 (int): Output channels of the second branch.
        n5x5red (int): Channels after the 1x1 reduction of the third branch.
        n5x5 (int): Output channels of the third branch. Despite the name, this
            branch is built with a 3x3 kernel here.
        pool_planes (int): Output channels of the max-pool branch.

    Returns:
        Tensor, the concatenation of the four branch outputs.
    """

    def __init__(self, in_channels, n1x1, n3x3red, n3x3, n5x5red, n5x5, pool_planes):
        super().__init__()

        def reduce_then_conv(reduced_channels, branch_channels):
            # 1x1 channel reduction followed by a 3x3 convolution.
            return nn.SequentialCell([
                Conv2dBlock(in_channels, reduced_channels, kernel_size=1),
                Conv2dBlock(reduced_channels, branch_channels, kernel_size=3, padding=0),
            ])

        self.b1 = Conv2dBlock(in_channels, n1x1, kernel_size=1)
        self.b2 = reduce_then_conv(n3x3red, n3x3)
        self.b3 = reduce_then_conv(n5x5red, n5x5)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=1, pad_mode="same")
        self.b4 = Conv2dBlock(in_channels, pool_planes, kernel_size=1)
        self.concat = P.Concat(axis=1)

    def construct(self, x):
        """Run the four branches on `x` and concatenate their outputs."""
        branch_outputs = (
            self.b1(x),
            self.b2(x),
            self.b3(x),
            self.b4(self.maxpool(x)),
        )
        return self.concat(branch_outputs)


class GoogleNet(nn.Cell):
    """
    GoogleNet classifier assembled from Conv2dBlock and Inception cells.

    Args:
        num_classes (int): Size of the classifier output. Only used when
            `include_top` is True.
        include_top (bool): When True, append global mean, flatten and a dense
            classifier; when False, `construct` returns the block5b feature map.
            Default: True.

    Returns:
        Tensor, class logits (include_top=True) or the block5b features.
    """

    def __init__(self, num_classes, include_top=True):
        super().__init__()

        # Stem.
        self.conv1 = Conv2dBlock(3, 64, kernel_size=7, stride=2, padding=0)
        self.maxpool1 = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode="same")

        self.conv2 = Conv2dBlock(64, 64, kernel_size=1)
        self.conv3 = Conv2dBlock(64, 192, kernel_size=3, padding=0)
        self.maxpool2 = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode="same")

        # Each Inception outputs n1x1 + n3x3 + n5x5 + pool_planes channels, which is
        # the in_channels of the next block (e.g. 64 + 128 + 32 + 32 = 256).
        self.block3a = Inception(192, 64, 96, 128, 16, 32, 32)
        self.block3b = Inception(256, 128, 128, 192, 32, 96, 64)
        self.maxpool3 = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode="same")

        self.block4a = Inception(480, 192, 96, 208, 16, 48, 64)
        self.block4b = Inception(512, 160, 112, 224, 24, 64, 64)
        self.block4c = Inception(512, 128, 128, 256, 24, 64, 64)
        self.block4d = Inception(512, 112, 144, 288, 32, 64, 64)
        self.block4e = Inception(528, 256, 160, 320, 32, 128, 128)
        self.maxpool4 = nn.MaxPool2d(kernel_size=2, stride=2, pad_mode="same")

        self.block5a = Inception(832, 256, 160, 320, 32, 128, 128)
        self.block5b = Inception(832, 384, 192, 384, 48, 128, 128)

        # NOTE(review): dropout is created but never called in construct().
        self.dropout = nn.Dropout(keep_prob=0.8)
        self.include_top = include_top
        if self.include_top:
            # block5b emits 384 + 384 + 128 + 128 = 1024 channels, hence Dense(1024, ...).
            self.mean = P.ReduceMean(keep_dims=True)
            self.flatten = nn.Flatten()
            self.classifier = nn.Dense(1024, num_classes, weight_init=weight_variable(),
                                       bias_init=weight_variable())

    def construct(self, x):
        """Forward pass: stem, Inception stages 3-5, then the optional classifier head."""
        x = self.maxpool1(self.conv1(x))

        x = self.conv2(x)
        x = self.conv3(x)
        x = self.maxpool2(x)

        x = self.block3a(x)
        x = self.block3b(x)
        x = self.maxpool3(x)

        x = self.block4a(x)
        x = self.block4b(x)
        x = self.block4c(x)
        x = self.block4d(x)
        x = self.block4e(x)
        x = self.maxpool4(x)

        x = self.block5a(x)
        x = self.block5b(x)

        if self.include_top:
            # Mean over axes 2 and 3, then flatten and classify.
            x = self.mean(x, (2, 3))
            x = self.flatten(x)
            x = self.classifier(x)

        return x


In [4]:
# Network instance with a four-way classifier head.
net = GoogleNet(num_classes=4)

In [5]:
# Loss applied to the network output; labels are fed as int32 class ids by create_dataset.
net_loss = nn.SoftmaxCrossEntropyWithLogits(
    sparse=True,
    reduction='mean',
)

In [6]:
# Momentum optimizer over every trainable parameter of `net`.
net_opt = nn.Momentum(
    net.trainable_params(),
    learning_rate=0.01,
    momentum=0.9,
)

In [7]:
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig
# Checkpoint policy: save every 14 steps and keep at most 20 checkpoint files.
config_ck = CheckpointConfig(
    save_checkpoint_steps=14,
    keep_checkpoint_max=20,
)
# Callback that writes the checkpoints with the "recycle_huawei" prefix.
ckpoint = ModelCheckpoint(prefix="recycle_huawei", config=config_ck)

In [8]:
import io
import pandas as pd
from skimage import io
class RecycleData:
    """Random-access image dataset described by a CSV annotation file.

    The first CSV column is read as the image path relative to `root_dir`, the
    second column as the integer class label.

    Args:
        csv_file (str): Path of the annotation CSV, read with pandas.
        root_dir (str): Directory that the image paths are joined onto.
        transform (callable, optional): Applied to each loaded image before it is
            returned. Default: None (image returned as read).
    """

    def __init__(self, csv_file, root_dir, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        # Previously accepted but discarded; keep it so __getitem__ can apply it.
        self.transform = transform

    def __len__(self):
        """Number of annotated samples (rows of the CSV)."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load sample `index` and return it as an (image, label) tuple."""
        img_path = os.path.join(self.root_dir, self.annotations.iloc[index, 0])
        image = io.imread(img_path)
        if self.transform is not None:
            image = self.transform(image)
        y_label = int(self.annotations.iloc[index, 1])

        return (image, y_label)

        

In [9]:
# import tensorflow as tf
def create_dataset(data_path, batch_size=32, repeat_size=1,
                   num_parallel_workers=1, csv_file="train.csv", image_dir="Train_data"):
    """Build the shuffled, batched image pipeline fed to the network.

    Args:
        data_path (str): Not used. Kept so existing positional calls keep working;
            the data location is taken from `csv_file` and `image_dir` instead.
        batch_size (int): Samples per batch; incomplete final batches are dropped.
            Default: 32.
        repeat_size (int): Number of times the batched dataset is repeated. Values
            of 1 or less leave the dataset unrepeated. Default: 1.
        num_parallel_workers (int): Workers used by each map operation. Default: 1.
        csv_file (str): Annotation CSV handed to RecycleData. Default: "train.csv".
        image_dir (str): Image root directory handed to RecycleData.
            Default: "Train_data".

    Returns:
        The dataset with columns "image" and "label".
    """
    # Define the dataset.
    train_ds = ds.GeneratorDataset(source=RecycleData(csv_file, image_dir),
                                   column_names=["image", "label"])

    resize_height, resize_width = 224, 224
    rescale = 1.0 / 255.0
    shift = 0.0
    # NOTE(review): 0.1307 / 0.3081 look like statistics carried over from an MNIST
    # example. Changing them alters the inputs an existing checkpoint was fed, so
    # confirm before editing.
    rescale_nml = 1 / 0.3081
    shift_nml = -1 * 0.1307 / 0.3081

    # Define the mapping to be operated.
    resize_op = CV.Resize((resize_height, resize_width), interpolation=Inter.LINEAR)
    rescale_nml_op = CV.Rescale(rescale_nml, shift_nml)
    rescale_op = CV.Rescale(rescale, shift)
    hwc2chw_op = CV.HWC2CHW()
    type_cast_op = C.TypeCast(mstype.int32)

    # Use the map function to apply data operations to the dataset.
    train_ds = train_ds.map(operations=type_cast_op, input_columns="label",
                            num_parallel_workers=num_parallel_workers)
    # Image operations, applied one map at a time in this exact order.
    for image_op in (resize_op, rescale_op, rescale_nml_op, hwc2chw_op):
        train_ds = train_ds.map(operations=image_op, input_columns="image",
                                num_parallel_workers=num_parallel_workers)

    # Perform shuffle and batch operations.
    buffer_size = 1000
    train_ds = train_ds.shuffle(buffer_size=buffer_size)
    train_ds = train_ds.batch(batch_size, drop_remainder=True)

    # Honour repeat_size, which used to be accepted and silently ignored.
    if repeat_size > 1:
        train_ds = train_ds.repeat(repeat_size)

    return train_ds

In [10]:
# dataset = ds.GeneratorDataset(source = RecycleData("train.csv", "Train_data"), column_names = ["data","label"])

# for data in dataset.create_dict_iterator():
#         print("Image shape {}, label {}".format(data['data'], data['label']))
# iterator = iter(dataset)
# x, y = next(iterator)
# print(x.shape)

In [11]:
# Import the library required for model training.
from mindspore.nn import Accuracy
from mindspore.train.callback import LossMonitor
from mindspore import Model
import tensorflow as tf

def train_net(args, model, epoch_size, data_path, repeat_size, ckpoint_cb, sink_mode):
    """Train `model` for `epoch_size` epochs on the dataset built by create_dataset.

    `args` is accepted but not used. The loss is reported every 14 steps through
    LossMonitor, and `ckpoint_cb` is passed along as a training callback.
    """
    train_dir = os.path.join(data_path, "Train_data")
    ds_train = create_dataset(train_dir, 32, repeat_size)
    callbacks = [ckpoint_cb, LossMonitor(14)]
    model.train(epoch_size, ds_train, callbacks=callbacks, dataset_sink_mode=sink_mode)

In [12]:
def test_net(network, model, data_path):
    """Evaluate `model` and print the resulting metrics.

    `network` is accepted but not used. The dataset is requested for the
    "Train_data" directory under `data_path`.
    """
    eval_dir = os.path.join(data_path, "Train_data")
    acc = model.eval(create_dataset(eval_dir), dataset_sink_mode=False)
    print("{}".format(acc))

In [13]:
# Settings for the (currently commented-out) training call below.
train_epoch = 15   # number of epochs
mnist_path = ""    # base directory that "Train_data" is joined onto
dataset_size = 1   # passed as repeat_size
# model = Model(net, net_loss, net_opt, metrics={"Accuracy": Accuracy()})
# train_net(args, model, train_epoch, mnist_path, dataset_size, ckpoint, False)


In [14]:
from mindspore import load_checkpoint, load_param_into_net

# Fresh four-class network that receives the saved weights.
googlenet = GoogleNet(num_classes=4)
# Read the checkpoint into a parameter dictionary and copy it into the network.
param_dict = load_checkpoint("recycle_huawei-11_14.ckpt")
load_param_into_net(googlenet, param_dict)
# No optimizer is given: this Model is used for eval() and predict() only.
model = Model(network=googlenet, loss_fn=net_loss, metrics={"accuracy"})

In [15]:
# Evaluate the restored model; the printed dict holds the "accuracy" metric.
test_net(network=googlenet, model=model, data_path="")

{'accuracy': 0.9375}


In [None]:
# ds_eval = create_dataset(os.path.join("", "Train_data"))
# image = io.imread("Train_data/Can/can001.jpg")
# import cv2
# image = cv2.resize(image,(224,224))
# val = model.predict(image.tolist())
# print(val)

In [None]:
# print(net)

In [None]:
import numpy as np
from mindspore import Tensor

# Build a dataset with batch_size=1 so that each item holds a single image.
ds_test = create_dataset(
    os.path.join(mnist_path, "Train_data"),
    batch_size=1,
).create_dict_iterator()
data = next(ds_test)

# `images` is the sampled image batch, `labels` its ground-truth class.
images = data["image"].asnumpy()
labels = data["label"].asnumpy()

# Run the model on the sample and take the highest-scoring class.
output = model.predict(Tensor(data['image']))
print(output)
predicted = output.asnumpy().argmax(axis=1)

# Report the predicted class next to the actual one.
print(f'Predicted: "{predicted[0]}", Actual: "{labels[0]}"')

In [None]:
import mindspore_xai

In [None]:
from mindspore import context, load_checkpoint, load_param_into_net
from mindspore import load_checkpoint, load_param_into_net


# Switch the execution mode from graph mode to PyNative.
context.set_context(mode=context.PYNATIVE_MODE)

num_classes = 4

# Rebind `net` to a fresh network and load the trained classifier weights into it.
net = GoogleNet(num_classes)
param_dict = load_checkpoint("recycle_huawei-11_14.ckpt")
load_param_into_net(net, param_dict)

In [None]:
from PIL import Image
import os
# Take one image (batch_size=1) to explain.
# NOTE(review): create_dataset never reads its data_path argument, so although
# "Test_data" is requested here the sample is drawn from train.csv / Train_data.
sample_dir = os.path.join("", "Test_data")
ds_sample = create_dataset(sample_dir, batch_size=1).create_dict_iterator()
data_sample = next(ds_sample)
x = data_sample["image"]
print(x.shape)

In [None]:
import mindspore as ms
from mindspore import Tensor
from mindspore_xai.explanation import GradCAM

# Usually the last convolutional layer is specified; "block5b" is the final
# Inception block of GoogleNet.
# print(net)
grad_cam = GradCAM(net, layer="block5b")

# `targets` is the class id to explain. The ids are the integer labels read from
# the second column of train.csv, and this network has 4 classes.
saliency = grad_cam(x, targets=2)

In [None]:
import numpy as np

# Show the saliency map returned by GradCAM for the sampled image.
print(saliency)

In [None]:
import cv2
# Load the image file with OpenCV.
# NOTE(review): cv2.imread is documented to return None instead of raising when
# the file cannot be read — confirm and consider checking the result before use.
image = cv2.imread('paper015.jpg')

In [None]:
# print(net)