# 目标检测：口罩佩戴检测  

<br>
<hr>

## 1.实验介绍

### 1.1 实验背景  

今年一场席卷全球的新型冠状病毒给人们带来了沉重的生命财产的损失。  
有效防御这种传染病毒的方法就是积极佩戴口罩。  
我国对此也采取了严肃的措施，在公共场合要求人们必须佩戴口罩。  
在本次实验中，我们要建立一个目标检测的模型，可以识别图中的人是否佩戴了口罩。

### 1.2 实验要求

1. 建立深度学习模型，检测出图中的人是否佩戴了口罩，并将其尽可能调整到最佳状态。 
2. 学习OpenCV dnn的使用方法，以及经典模型 MobileNetV2 的结构。
3. 学习训练时的方法。

### 1.3 实验环境

可以使用基于 Python 的 OpenCV 、PIL 库进行图像相关处理，使用 Numpy 库进行相关数值运算，使用 MindSpore 等深度学习框架训练模型等。


### 1.4 注意事项  
+ Python 与 Python Package 的使用方式，可在右侧 `API文档` 中查阅。
+ 当右上角的『Python 3』长时间指示为运行中的时候，造成代码无法执行时，可以重新启动 Kernel 解决（左上角『Kernel』-『Restart Kernel』）。

### 1.5 参考资料
+ 论文 Joint Face Detection and Alignment using Multi-task Cascaded Convolutional Networks：https://kpzhang93.github.io/MTCNN_face_detection_alignment/
+ OpenCV：https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_tutorials.html
+ PIL：https://pillow.readthedocs.io/en/stable/
+ Numpy：https://www.numpy.org/
+ Scikit-learn： https://scikit-learn.org/
+ mindspore：https://www.mindspore.cn/tutorial/training/zh-CN/master/index.html

### 1.6 实验思路

针对目标检测的任务，可以分为两个部分：目标识别和位置检测。  
通常情况下，特征提取需要由特有的特征提取神经网络来完成，如 VGG、MobileNet、ResNet 等，这些特征提取网络往往被称为 Backbone 。而在 BackBone 后面接全连接层(FC)就可以执行分类任务。  
但 FC 对目标的位置识别乏力。经过算法的发展，当前主要以特定的功能网络来代替 FC 的作用，如 Mask-Rcnn、SSD、YOLO 等。  
我们选择充分使用已有的人脸检测的模型，再训练一个识别口罩的模型，从而提高训练的开支、增强模型的准确率。

**常规目标检测：**  

<img src="https://imgbed.momodel.cn/20200914162156.png" width=500px/>



**本次案例：**   


<img src="https://imgbed.momodel.cn/20200918102630.png" width=500px/>

<br>
<br>

## 2. OpenCV 人脸检测

数据信息存放在 `/datasets/5f680a696ec9b83bb0037081-momodel/data` 文件夹下。    
该文件夹主要有文件夹 `image`、文件 `train.txt` 、文件夹 `keras_model_data` 和文件夹 `mindspore_model_data`共四部分：
+ **image 文件夹**：图片分成两类，戴口罩的和没有戴口罩的  
+ **train.txt**：  存放的是 image 文件夹下对应图片的标签  （keras 框架专用文件）
+ **keras_model_data** 文件夹：存放 keras 框架相关预训练好的模型 （keras 框架专用文件夹）
+ **mindspore_model_data** 文件夹：存放 mindspore 框架相关预训练好的模型（mindspore 框架专用文件）

opencv 人脸检测模型在数据集 **mindspore_model_data/opencv_dnn** 文件夹中

In [None]:
import os
# 数据集路径
basic_path = "./datasets/5f680a696ec9b83bb0037081-momodel/data/"
# opencv 人脸检测模型在数据集 mindspore_model_data/opencv_dnn 文件夹中
opencv_dnn_path = basic_path + 'mindspore_model_data/opencv_dnn'
print(opencv_dnn_path)
# 查看文件夹里面文件
os.listdir(opencv_dnn_path)


发现该文件夹下有我们需要的路径，所以
+ **依赖文件夹的路径为 opencv_dnn_path** =           
`/datasets/5f680a696ec9b83bb0037081-momodel/data/mindspore_model_data/opencv_dnn`

+ **deploy.prototxt 文件的路径**：       
`opencv_dnn_path + '/' + 'deploy.prototxt'`
+ **res10_300x300_ssd_iter_140000_fp16.caffemodel 文件的路径**：        
`opencv_dnn_path + '/' + 'res10_300x300_ssd_iter_140000_fp16.caffemodel'`

In [None]:
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt

class FaceDet():
    def __init__(self):
        self.opencv_dnn_path = 'datasets/5f680a696ec9b83bb0037081-momodel/data/mindspore_model_data/opencv_dnn/'
        self.threshold = 0.15
        self.caffe_model = self.opencv_dnn_path + "deploy.prototxt"
        self.caffe_param = self.opencv_dnn_path + "res10_300x300_ssd_iter_140000_fp16.caffemodel"

    def draw_detections(self, image, detections):
        h, w, c = image.shape
        for i in range(0, detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > self.threshold:
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                text = "{:.2f}%".format(confidence * 100)
                y = startY - 10 if startY - 10 > 10 else startY + 10
                cv2.rectangle(image, (startX, startY), (endX, endY),
                              (0, 255, 0), 1)
                cv2.putText(image, text, (startX, y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
        return image

    def detect(self, image):
        net = cv2.dnn.readNetFromCaffe(self.caffe_model, self.caffe_param)

        # def blobFromImage(image, scalefactor=None, size=None, mean=None, swapRB=None, crop=None, ddepth=None)
        # image：输入图像
        # mean：对每个通道像素值减去对应的均值，这里用(104.0, 177.0, 123.0)，和模型训练时的值一致
        # scalefactor：对像素值的缩放比例
        # size：模型输入图片的尺寸
        # swapRB：OpenCV默认的图片通道顺序是BGR，如果需要交换R和G，则设为True
        # crop: 调整图片大小后，是否裁剪
        blob = cv2.dnn.blobFromImage(image, 1.0, (300, 300), (104.0, 177.0, 123.0), False, False)
        net.setInput(blob)
        detections = net.forward()
        return detections


验证一下人脸检测的效果，其中人脸框上方的`xx%`为置信度

In [None]:
img = cv2.imread("test.jpg")
detect = FaceDet()
detections = detect.detect(img)
drawed_img = detect.draw_detections(img, detections)

# OpenCV reads image to BGR format. Transform images before showing it.
drawed_img = cv2.cvtColor(drawed_img, cv2.COLOR_BGR2RGB)
plt.figure(figsize = (8,8))
plt.imshow(drawed_img)


## 3 口罩识别

导入标准库、第三方库，已及MindSpore的模块。

In [None]:
import math
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
from easydict import EasyDict
from PIL import Image

from mindspore import context
from mindspore import nn
from mindspore import Tensor
from mindspore.train.model import Model
from mindspore.train.serialization import load_checkpoint
from mindspore.train.callback import Callback
from mindspore.train.callback import LossMonitor
from mindspore.train.callback import ModelCheckpoint
from mindspore.train.callback import CheckpointConfig

# 模型定义脚本以及数据处理脚本
from mindspore_py.mobilenetV2 import MobileNetV2Backbone
from mindspore_py.mobilenetV2 import MobileNetV2Head
from mindspore_py.mobilenetV2 import mobilenet_v2
from mindspore_py.dataset import create_dataset
# Log Level = Error
os.environ['GLOG_v'] = '3'
# 设置采用图模式执行，设备为CPU/GPU
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")


### 3.1 数据集介绍

数据信息存放在 `/datasets/5f680a696ec9b83bb0037081-momodel/data/image` 文件夹下。              
收集的图片分成 mask 和 nomask 两类，戴口罩的和没有戴口罩的。        
现在我们尝试读取数据集中的一张戴口罩的图片并显示图片名称。  
现在我们尝试读取数据集中戴口罩的图片及其名称，以下是训练集中的正样本：

In [None]:
mask_num = 4
fig = plt.figure(figsize = (15,15))
basic_path = "./datasets/5f680a696ec9b83bb0037081-momodel/data/"
for i in range(mask_num):
    sub_img = cv2.imread(basic_path + "/image/mask/mask_" + str(i + 101) + ".jpg")
    sub_img = cv2.cvtColor(sub_img, cv2.COLOR_RGB2BGR)
    ax = fig.add_subplot(4, 4, (i + 1))
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_title("mask_" + str(i + 1))
    ax.imshow(sub_img)



以下是训练集中的负样本：

In [None]:
nomask_num = 4
fig1 = plt.figure(figsize=(15, 15))
for i in range(nomask_num):
    sub_img = cv2.imread(basic_path + "/image/nomask/nomask_" + str(i + 130) + ".jpg")
    sub_img = cv2.cvtColor(sub_img, cv2.COLOR_RGB2BGR)
    ax = fig1.add_subplot(4, 4, (i + 1))
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_title("nomask_" + str(i + 1))
    ax.imshow(sub_img)


### 3.2 模型训练Tips

配置后续训练、验证、推理用到的参数。

In [None]:
basic_path = "./datasets/5f680a696ec9b83bb0037081-momodel/data/"
config = EasyDict({
    "num_classes": 2,
    "image_height": 224,
    "image_width": 224,
    "data_split": [0.9, 0.1],
    "backbone_out_channels":1280,
    "batch_size": 16,
    "eval_batch_size": 8,
    "epochs": 3,
    "lr_max": 0.01,
    "momentum": 0.9,
    "weight_decay": 1e-4,
    "save_checkpoint": True,
    "save_checkpoint_epochs": 1,
    "save_checkpoint_path": "./results",
    "dataset_path": basic_path + "image",
    "pretrained_ckpt": basic_path + "mindspore_model_data/mobilenetV2-200_1067.ckpt"
})


#### 3.2.1 动态学习率

一般情况下，模型训练时采用静态学习率，如0.01。随着训练步数的增加，模型逐渐趋于收敛，对权重参数的更新幅度应该逐渐降低，以减小模型训练后期的抖动。所以，模型训练时可以采用动态下降的学习率，常见的学习率下降策略有：

- polynomial decay/square decay
- cosine decay
- exponential decay
- stage decay

这里实现 cosine decay 下降策略。

In [None]:
def cosine_decay(total_steps, lr_init=0.0, lr_end=0.0, lr_max=0.1, warmup_steps=0):
    """
    Applies cosine decay to generate learning rate array.

    Args:
       total_steps(int): all steps in training.
       lr_init(float): init learning rate.
       lr_end(float): end learning rate
       lr_max(float): max learning rate.
       warmup_steps(int): all steps in warmup epochs.

    Returns:
       list, learning rate array.
    """
    lr_init, lr_end, lr_max = float(lr_init), float(lr_end), float(lr_max)
    decay_steps = total_steps - warmup_steps
    lr_all_steps = []
    inc_per_step = (lr_max - lr_init) / warmup_steps if warmup_steps else 0
    for i in range(total_steps):
        if i < warmup_steps:
            lr = lr_init + lr_inc * (i + 1)
        else:
            cosine_decay = 0.5 * (1 + math.cos(math.pi * (i - warmup_steps) / decay_steps))
            lr = (lr_max - lr_end) * cosine_decay + lr_end
        lr_all_steps.append(lr)

    return lr_all_steps


#### 3.2.2 边训练边验证

在面对复杂网络时，往往需要进行几十甚至几百次的epoch训练。在训练之前，很难掌握在训练到第几个epoch时，模型的精度能达到满足要求的程度，所以经常会采用一边训练的同时，在相隔固定epoch的位置对模型进行精度验证，并保存相应的模型，等训练完毕后，通过查看对应模型精度的变化就能迅速地挑选出相对最优的模型。流程如下：

- 定义回调函数EvalCallback，实现同步进行训练和验证。
- 定义训练网络并执行。
- 将不同epoch下的模型精度绘制出折线图并挑选最优模型Checkpoint。

当我们训练深度学习神经网络的时候通常希望能获得最好的泛化性能。但是深度学习神经网络很容易过拟合。当网络在训练集上表现越来越好，错误率越来越低的时候，就极有可能出现了过拟合。我们可以设计一种早停法，比如验证精度连续5次不在上升就停止训练，这样能避免继续训练导致过拟合的问题。

In [None]:
class EvalCallback(Callback):
    def __init__(self, model, eval_dataset, history, eval_epochs=1):
        self.model = model
        self.eval_dataset = eval_dataset
        self.eval_epochs = eval_epochs
        self.history = history
        self.acc_max = 0
        # acc连续5次<=过程中的最大值，则停止训练
        self.count_max = 5
        self.count = 0

    def epoch_begin(self, run_context):
        self.losses = []

    def step_end(self, run_context):
        cb_param = run_context.original_args()
        loss = cb_param.net_outputs
        self.losses.append(loss.asnumpy())

    def epoch_end(self, run_context):
        cb_param = run_context.original_args()
        cur_epoch = cb_param.cur_epoch_num
        train_loss = np.mean(self.losses)

        if cur_epoch % self.eval_epochs == 0:
            metric = self.model.eval(self.eval_dataset, dataset_sink_mode=False)
            self.history["epoch"].append(cur_epoch)
            self.history["eval_acc"].append(metric["acc"])
            self.history["eval_loss"].append(metric["loss"])
            self.history["train_loss"].append(train_loss)
            if self.acc_max < metric["acc"]:
                self.count = 0
                self.acc_max = metric["acc"]
            else:
                self.count += 1
                if self.count == self.count_max:
                    run_context.request_stop()
            print("epoch: %d, train_loss: %f, eval_loss: %f, eval_acc: %f" %(cur_epoch, train_loss, metric["loss"], metric["acc"]))


### 3.3 模型训练

在模型训练过程中，可以添加检查点（Checkpoint）用于保存模型的参数，以便进行推理及中断后再训练使用。使用场景如下：

- 训练后推理场景
    - 模型训练完毕后保存模型的参数，用于推理或预测操作。
    - 训练过程中，通过实时验证精度，把精度最高的模型参数保存下来，用于预测操作。
- 再训练场景
    - 进行长时间训练任务时，保存训练过程中的Checkpoint文件，防止任务异常退出后从初始状态开始训练。
    - Fine-tuning（微调）场景，即训练一个模型并保存参数，基于该模型，面向第二个类似任务进行模型训练。

这里加载 ImageNet 数据上预训练的 MobileNetv2 进行 Fine-tuning，**只训练最后修改的 FC 层**，并在训练过程中保存 Checkpoint。

In [None]:
# 运行该 cell 代码训练模型时，请清理 results 文件夹中 mindspore 框架之前训练好的模型，否则 model_path 格式会发生一定的变化，造成推理时可能找不到模型的报错
def train():
    train_dataset, eval_dataset = create_dataset(dataset_path=config.dataset_path, config=config)
    step_size = train_dataset.get_dataset_size()

    backbone = MobileNetV2Backbone() #last_channel=config.backbone_out_channels
    # Freeze parameters of backbone. You can comment these two lines.
    for param in backbone.get_parameters():
       param.requires_grad = False
    # load parameters from pretrained model
    load_checkpoint(config.pretrained_ckpt, backbone)

    # head = MobileNetV2Head(num_classes=config.num_classes, last_channel=config.backbone_out_channels)
    head = MobileNetV2Head(input_channel=backbone.out_channels, num_classes=config.num_classes)
    network = mobilenet_v2(backbone, head)

    # define loss, optimizer, and model
    loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    lrs = cosine_decay(config.epochs * step_size, lr_max=config.lr_max)
    opt = nn.Momentum(network.trainable_params(), lrs, config.momentum, config.weight_decay)
    model = Model(network, loss, opt, metrics={'acc', 'loss'})

    history = {'epoch': [], 'train_loss': [], 'eval_loss': [], 'eval_acc': []}
    eval_cb = EvalCallback(model, eval_dataset, history)
    cb = [eval_cb]
    if config.save_checkpoint:
        ckpt_cfg = CheckpointConfig(save_checkpoint_steps=config.save_checkpoint_epochs * step_size, keep_checkpoint_max=config.epochs)
        ckpt_cb = ModelCheckpoint(prefix="mobilenetv2_mask", directory=config.save_checkpoint_path, config=ckpt_cfg)
        cb.append(ckpt_cb)
    model.train(config.epochs, train_dataset, callbacks=cb, dataset_sink_mode=False)

    return history


将不同 epoch 下的模型精度绘制出折线图并挑选最优模型 Checkpoint。

In [None]:
history = train()

plt.plot(history['epoch'], history['train_loss'], label='train_loss')
plt.plot(history['epoch'], history['eval_loss'], 'r', label='val_loss')
plt.legend()
plt.show()

plt.plot(history['epoch'], history['eval_acc'], 'r', label = 'val_acc')
plt.legend()
plt.show()

model_path = 'results/mobilenetv2_mask-%d_39.ckpt' % (np.argmax(history['eval_acc']) + 1) # 挑选出最优模型Checkpoint
print("the path of best model checkpoint is :", model_path)


### 3.4 模型推理

加载模型 Checkpoint 进行推理。           
使用 load_checkpoint 接口加载数据时，需要把数据传入给原始网络，而不能传递给带有优化器和损失函数的训练网络。

In [None]:
def image_process(image):
    """Precess one image per time.

    Args:
        image: shape (H, W, C)
    """
    mean=[0.485*255, 0.456*255, 0.406*255]
    std=[0.229*255, 0.224*255, 0.225*255]
    image = (np.array(image) - mean) / std
    image = image.transpose((2,0,1))
    img_tensor = Tensor(np.array([image], np.float32))
    return img_tensor

def infer_one(network, image_path):
    image = Image.open(image_path).resize((config.image_height, config.image_width))
    logits = network(image_process(image))
    pred = np.argmax(logits.asnumpy(), axis=1)[0]
    print("图片路径：", image_path, "图片预测类别",pred)

def infer(basic_path, model_path):
    backbone = MobileNetV2Backbone(last_channel=config.backbone_out_channels)
    head = MobileNetV2Head(input_channel=backbone.out_channels, num_classes=config.num_classes)
    network = mobilenet_v2(backbone, head)
    load_checkpoint(model_path, network)
    for i in range(250, 258):
        infer_one(network, basic_path + 'image/mask/mask_%s.jpg' % i)
    for i in range(371, 378):
        infer_one(network, basic_path + 'image/nomask/nomask_%s.jpg' % i)


In [None]:
infer(basic_path, model_path)


### 3.5 口罩识别

In [None]:
class MaskRec():
    def __init__(self, model_path):
        self.face_det = FaceDet()
        self.mask_model_input_size = (160, 160)
        self.class_names = ['YES', 'NO']
        # 初始化MobileNetv2
        backbone = MobileNetV2Backbone(last_channel=config.backbone_out_channels)
        head = MobileNetV2Head(input_channel=backbone.out_channels, num_classes=config.num_classes)
        self.mask_model = mobilenet_v2(backbone, head)
        load_checkpoint(model_path, self.mask_model)

    def to_small_square(self, startX, startY, endX, endY):
        w = endX - startX
        h = endY - startY
        l = min(w, h)

        startX = int(startX + (w - l) / 2)
        endX = startX + l
        startY = int(startY + (h - l) / 2)
        endY = startY + l
        return startX, startY, endX, endY

    def recognize(self, image):
        # 人脸检测
        detections = self.face_det.detect(image)
        h, w, c = image.shape
        predict_labels = []
        for i in range(0, detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > self.face_det.threshold:
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                startX, startY, endX, endY = box.astype("int")
                # 截取图像
                startX, startY, endX, endY = self.to_small_square(startX, startY, endX, endY)
                crop_img = image[startY:endY, startX:endX]
                # 图像预处理, mask_model accept image with RGB
                resized = cv2.resize(crop_img, (config.image_height, config.image_width))
                img_tensor = image_process(cv2.cvtColor(crop_img, cv2.COLOR_BGR2RGB))
                # 预测
                logits = self.mask_model(img_tensor)
                predict_label = np.argmax(logits.asnumpy(), axis=1)[0]
                predict_labels.append(predict_label)
                # 画图
                y = startY - 10 if startY - 10 > 10 else startY + 10
                cv2.rectangle(image, (startX, startY), (endX, endY),
                              (0, 255, 0), 1)
                cv2.putText(image, self.class_names[predict_label], (startX, y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
        return image, len(predict_labels), predict_labels.count(0)


In [None]:
img = cv2.imread("./test1.jpg")
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

detect = MaskRec(model_path)
img, all_num, mask_num = detect.recognize(img)

# 展示图片口罩识别结果
fig = plt.figure(figsize=(8, 8))
ax1 = fig.add_subplot(111)
ax1.set_xticks([])
ax1.set_yticks([])
ax1.set_title('test_mask')
ax1.imshow(img)
print("图中的人数有：" + str(all_num) + "个")
print("戴口罩的人数有：" + str(mask_num) + "个")


## 4 作业
**作业要求及注意事项**：    

1.使用上述学到的方法，训练自己的口罩识别模型，尽可能提高准确度。将训练好的模型保存在 results 文件夹下。             
2.点击左侧栏提交作业后点击【生成文件】则需要勾选与预测 predict() 函数的 cell相关的其它cell ，并将其转化成为 main.py 文件。                       
3.请导入必要的包和第三方库以及该模型所依赖的 py 文件 (包括此文件中曾经导入过的)。             
4.请加载你认为训练最佳的模型，即请按要求填写模型路径。              
5.predict() 函数的输入输出及函数名称请不要改动。


===========================================  **模型预测代码答题区域**  ===========================================  
在下方的代码块中编写 **模型预测** 部分的代码，请勿在别的位置作答

In [None]:
# -------------------------- 请加载您最满意的模型 -----------------------------
# 加载模型(请加载你认为的最佳模型)
# 加载模型,加载请注意 model_path 是相对路径, 与当前文件同级。
# 如果你的模型是在 results 文件夹下的 mobilenetv2_mask_1-1_39.ckpt 模型，则 model_path = 'results/mobilenetv2_mask-1_39.ckpt'
model_path = None
# ---------------------------------------------------------------------------

def predict(img):
    """
    加载模型和模型预测
    :param img: cv2.imread 图像
    :return: 预测的图片中的总人数、其中佩戴口罩的人数
    """
    # -------------------------- 实现模型预测部分的代码 ---------------------------

    detect = MaskRec(model_path)
    img, all_num, mask_num = detect.recognize(img)

    # -------------------------------------------------------------------------

    return all_num,mask_num


In [None]:
# 输入图片路径和名称
img = cv2.imread("test1.jpg")
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
all_num,mask_num = predict(img)
# 打印预测该张图片中总人数以及戴口罩的人数
print(all_num, mask_num)
