# Material Utilizado

- Script para gerar TFrecords - https://tensorflow-object-detection-api-tutorial.readthedocs.io/en/latest/_downloads/da4babe668a8afb093cc7776d7e630f3/generate_tfrecord.py
- Setup https://tensorflow-object-detection-api-tutorial.readthedocs.io/en/latest/install.html
- VisualStudio C++ Build Tools - https://visualstudio.microsoft.com/pt-br/vs/community/
- CUDA Toolkit v10.1 - https://developer.nvidia.com/cuda-10.1-download-archive-base
- cuDNN v7.6.5 - https://developer.nvidia.com/rdp/cudnn-archive
- Protoc v3.14.0 - https://github.com/protocolbuffers/protobuf/releases
- Modelos Tensorflow pré-treinados - https://github.com/tensorflow/models
- GitBash (para comandos Unix no Windows) - https://git-scm.com/downloads

# Inicializa os diretórios

In [1]:
WORKSPACE_PATH = 'Tensorflow/workspace'
SCRIPTS_PATH = 'Tensorflow/scripts'
APIMODEL_PATH = 'Tensorflow/models'
ANNOTATION_PATH = WORKSPACE_PATH+'/annotations'
IMAGE_PATH = WORKSPACE_PATH+'/images'
MODEL_PATH = WORKSPACE_PATH+'/models'
PRETRAINED_MODEL_PATH = WORKSPACE_PATH+'/pre-trained-models'
CONFIG_PATH = MODEL_PATH+'/my_ssd_mobnet/pipeline.config'
CHECKPOINT_PATH = MODEL_PATH+'/my_ssd_mobnet/'
VIDEOS_PATH = WORKSPACE_PATH+'/videos'

# Download dos modelos pré-treinados TF do Tensorflow Model Zoo

In [None]:
# Faz o download do repositório 'models' da biblioteca Tensorflow
!cd Tensorflow && git clone https://github.com/tensorflow/models

In [None]:
import wget
# Faz o download do modelo pré-treinado escolhido e o armazena na pasta 'pre-trained-models' em 'workspace'
wget.download('http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz')
!mv ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz {PRETRAINED_MODEL_PATH}
!cd {PRETRAINED_MODEL_PATH} && tar -zxvf ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz

# Copia a configuração do modelo para o diretório de treinamento

In [4]:
# Nome da pasta a ser criada, que armazenará o pipeline e os checkpoints do treino
CUSTOM_MODEL_NAME = 'my_ssd_mobnet' 

In [None]:
# Cria a pasta my_ssd_mobnet para o modelo a ser treinado
!mkdir {'Tensorflow\workspace\models\\'+CUSTOM_MODEL_NAME}

# Copia o pipeline do modelo pré-treinado para a pasta my_ssd_mobnet, criada acima
!cp {PRETRAINED_MODEL_PATH+'/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/pipeline.config'} {MODEL_PATH+'/'+CUSTOM_MODEL_NAME}

# Converte as imagens em RGB

In [None]:
from PIL import Image     
import os       
import warnings

In [None]:
# Ignora alguns avisos durante a execução
warnings.filterwarnings('error')

# Pasta de imagens
ALL_IMAGES_PATH = IMAGE_PATH + '/allimages/'
for file in os.listdir(ALL_IMAGES_PATH):
    # Guarda a exensão da imagem
    extension = file.split('.')[-1]
    if extension == 'jpg':
        # Caminho da imagem em relação ao script
        fileLoc = ALL_IMAGES_PATH+file
        try:
            # Abre a imagem
            img = Image.open(fileLoc) 
            exif_data = img._getexif()
        except ValueError as err:
            print(err)
            print("Erro na image: ", img)

        if img.mode != 'RGB':
            print(file+', '+img.mode)
            
             # Converte a imagem em RGB
            new = Image.new("RGB", img.size, (255, 255, 255))
            new.paste(img,None)
            
            # Salva a nova imagem
            new.save(file, 'JPEG', quality=100)

print('Imagens pré-processadas com sucesso!');

# Criação do Label Map

In [None]:
# Declara um vetor de rótulos
labels = [{'name':'Com Mascara', 'id':1}, {'name':'Sem Mascara', 'id':2}]

# Abre o arquivo de rótulos
with open(ANNOTATION_PATH + '\\label_map.pbtxt', 'w') as f:
    # Escreve as linhas do arquivo, para cada rótulo
    for label in labels:
        f.write('item { \n') 
        f.write('\tname:\'{}\'\n'.format(label['name']))
        f.write('\tid:{}\n'.format(label['id']))
        f.write('}\n')

# Separação de imagens para treino e teste

In [None]:
!python {SCRIPTS_PATH + '/train-test.py'} {IMAGE_PATH} {0.9}

# Criação dos TF records

In [None]:
# Gera o arquivo TFRecord com base nas imagens de treino
!python {SCRIPTS_PATH + '/generate_tfrecord.py'} -x {IMAGE_PATH + '/train'} -l {ANNOTATION_PATH + '/label_map.pbtxt'} -o {ANNOTATION_PATH + '/train.record'}

# Gera o arquivo TFRecord com base nas imagens de teste
!python {SCRIPTS_PATH + '/generate_tfrecord.py'} -x{IMAGE_PATH + '/test'} -l {ANNOTATION_PATH + '/label_map.pbtxt'} -o {ANNOTATION_PATH + '/test.record'}

# Atualiza as configurações para o treinamento

In [2]:
# Importa as bibliotecas necessárias para a atualização da pipeline.config
# com base nas duas classes que a aplicação proposta deve reconhecer:
# - Com Mascara;
# - Sem Mascara

import tensorflow as tf
from object_detection.utils import config_util
from object_detection.protos import pipeline_pb2
from google.protobuf import text_format

In [5]:
# Caminho para a configuração do modelo a ser treinado
CONFIG_PATH = MODEL_PATH+'/'+CUSTOM_MODEL_NAME+'/pipeline.config'

In [6]:
# Abre o arquivo 'pipeline.config' e armazena em uma variável, para chamada posterior
config = config_util.get_configs_from_pipeline_file(CONFIG_PATH)

In [7]:
# Cria um arquivo modelo de pipeline config e o trata como uma string
pipeline_config = pipeline_pb2.TrainEvalPipelineConfig()
with tf.io.gfile.GFile(CONFIG_PATH, "r") as f:                                                                                                                                                                                                                     
    proto_str = f.read()                                                                                                                                                                                                                  
    text_format.Merge(proto_str, pipeline_config)  

In [8]:
# Altera os valores de interesse

# Altera o número de classes para o treinamento
pipeline_config.model.ssd.num_classes = 2

# Altera o tamanho do lote de treinamento
pipeline_config.train_config.batch_size = 4

# Altera o caminho para o último ponto de verificação do modelo pré-treinado
pipeline_config.train_config.fine_tune_checkpoint = PRETRAINED_MODEL_PATH+'/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/checkpoint/ckpt-0'

# Altera o tipo de ponto de verificação para Detecção
pipeline_config.train_config.fine_tune_checkpoint_type = "detection"

# Altera o caminho para as anotações (mapa de rótulos e TFRecords de treino e teste)
pipeline_config.train_input_reader.label_map_path= ANNOTATION_PATH + '/label_map.pbtxt'
pipeline_config.train_input_reader.tf_record_input_reader.input_path[:] = [ANNOTATION_PATH + '/train.record']
pipeline_config.eval_input_reader[0].label_map_path = ANNOTATION_PATH + '/label_map.pbtxt'
pipeline_config.eval_input_reader[0].tf_record_input_reader.input_path[:] = [ANNOTATION_PATH + '/test.record']

In [9]:
# Salva o arquivo atualizado em sua extensão original
config_text = text_format.MessageToString(pipeline_config)                                                                                                                                                                                                        
with tf.io.gfile.GFile(CONFIG_PATH, "wb") as f:                                                                                                                                                                                                                     
    f.write(config_text)   

# Linha de comando para treinar o modelo

In [11]:
print("""python {}/research/object_detection/model_main_tf2.py --model_dir={}/{} --pipeline_config_path={}/{}/pipeline.config --num_train_steps=5000""".format(APIMODEL_PATH, MODEL_PATH,CUSTOM_MODEL_NAME,MODEL_PATH,CUSTOM_MODEL_NAME))

python Tensorflow/models/research/object_detection/model_main_tf2.py --model_dir=Tensorflow/workspace/models/my_ssd_mobnet --pipeline_config_path=Tensorflow/workspace/models/my_ssd_mobnet/pipeline.config --num_train_steps=5000


In [None]:
print("""python {}/research/object_detection/model_main_tf2.py --model_dir={}/{} --pipeline_config_path={}/{}/pipeline.config --checkpoint_dir={}/{}
""".format(APIMODEL_PATH, MODEL_PATH, CUSTOM_MODEL_NAME, MODEL_PATH, CUSTOM_MODEL_NAME, MODEL_PATH, CUSTOM_MODEL_NAME))

# Carrega o modelo treinado a partir do Checkpoint

In [None]:
import os
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder

In [None]:
# Carrega o arquivo pipeline.config e constrói o modelo
configs = config_util.get_configs_from_pipeline_file(CONFIG_PATH)
detection_model = model_builder.build(model_config=configs['model'], is_training=False)

# Restaura checkpoint
ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt.restore(os.path.join(CHECKPOINT_PATH, 'ckpt-1')).expect_partial()

@tf.function
def detect_fn(image):
    image, shapes = detection_model.preprocess(image)
    prediction_dict = detection_model.predict(image, shapes)
    detections = detection_model.postprocess(prediction_dict, shapes)
    return detections

# Detecção de máscara em tempo real

In [None]:
import cv2 
import numpy as np

In [None]:
category_index = label_map_util.create_category_index_from_labelmap(ANNOTATION_PATH+'/label_map.pbtxt')

In [None]:
# Configura a captura de imagens
cap = cv2.VideoCapture('http://192.168.0.3:4747/mjpegfeed?640x480')
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

In [None]:
while True: 
    ret, frame = cap.read()
    image_np = np.array(frame)
    
    input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
    detections = detect_fn(input_tensor)
    
    num_detections = int(detections.pop('num_detections'))
    detections = {key: value[0, :num_detections].numpy()
                for key, value in detections.items()}
    detections['num_detections'] = num_detections

    # detection_classes precisam ser inteiros.
    detections['detection_classes'] = detections['detection_classes'].astype(np.int64)

    label_id_offset = 1
    image_np_with_detections = image_np.copy()

    viz_utils.visualize_boxes_and_labels_on_image_array(
                image_np_with_detections,
                detections['detection_boxes'],
                detections['detection_classes']+label_id_offset,
                detections['detection_scores'],
                category_index,
                use_normalized_coordinates=True,
                max_boxes_to_draw=5,
                min_score_thresh=.5,
                agnostic_mode=False)

    cv2.imshow('Detector de Máscaras', image_np_with_detections)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        cap.release()
        break