# Weapon Detector

В инструменте используется Object Detection API и модель [RetinaNet](https://arxiv.org/abs/1708.02002) для обнаружения объектов оружия, которую можно переобучить, используя минимальный набор учебных изображений.

RetinaNet - одна из лучших одноступенчатых моделей обнаружения объектов, которая хорошо зарекомендовала себя при работе с плотными и мелкомасштабными объектами.

Модель была представлена Facebook AI Research для решения проблемы обнаружения плотных объектов. Она была разработана для устранения несбалансированности и противоречивости одноступенчатых систем обнаружения объектов, таких как YOLO и SSD, при работе с классами переднего и заднего плана.

По сути, оригинальная сеть RetinaNet является составной сетью, состоящей из:

* Backbone – основная (базовая) сеть, служащая для извлечения признаков из поступающего на вход изображения. Данная часть сети является вариативной и в её основу могут входить классификационные нейросети, такие как ResNet, VGG, EfficientNet и другие

* Feature Pyramid Net (FPN) – свёрточная нейронная сеть, построенная в виде пирамиды, служащая для объединения достоинств карт признаков нижних и верхних уровней сети, первые имеют высокое разрешение, но низкую семантическую, обобщающую способность; вторые — наоборот

* Classification Subnet – подсеть, извлекающая из FPN информацию о классах объектов, решая задачу классификации

* Regression Subnet – подсеть, извлекающая из FPN информацию о координатах объектов на изображении, решая задачу регрессии

<img src='https://i115.fastpic.ru/big/2021/0628/5b/a64b2f27cf2dca5feae0b3097599be5b.png' alt='model'>

Прежде всего, необходимо клонировать Tensorflow Model Garden и установить Tensorflow 2 [Object Detection API](https://github.com/tensorflow/models/tree/master/research/object_detection).

In [None]:
!rm -rf ./models/

!git clone --depth 1 https://github.com/tensorflow/models/

Установка Object Detection API

In [None]:
!cd models/research/ && protoc object_detection/protos/*.proto --python_out=. && cp object_detection/packages/tf2/setup.py . && python3 -m pip install .

## Импорт библиотек

Импорт библиотек, необходимых для проекта

In [None]:
import glob
import imageio
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage
import matplotlib
import matplotlib.pyplot as plt
import os
import random
import zipfile
import io
import scipy.misc
import numpy as np
import tensorflow as tf

# Import Object Detection API packages

from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils  import visualization_utils as viz_utils
from object_detection.utils import colab_utils
from object_detection.utils import colab_utils
from object_detection.builders import model_builder

print('Done')

## Утилиты


In [None]:
def load_image_into_numpy_array(path):
    
    image_data = tf.io.gfile.GFile(path, 'rb').read()
    image = Image.open(BytesIO(image_data))
    (im_width, im_height) = image.size
    
    return np.array(image.getdata()).reshape(
        (im_height, im_width, 3)).astype(np.uint8)


def plot_detections(image_np,
                    boxes,
                    classes,
                    scores,
                    category_index,
                    figsize=(12, 16),
                    image_name=None):
    
    image_np_with_annotations = image_np.copy()
    
    viz_utils.visualize_boxes_and_labels_on_image_array(
        image_np_with_annotations,
        boxes,
        classes,
        scores,
        category_index,
        use_normalized_coordinates=True,
        min_score_thresh=0.8)
    
    if image_name:
        plt.imsave(image_name, image_np_with_annotations)
    
    else:
        plt.imshow(image_np_with_annotations)

## Визуализация данных изображений оружия

Для обучения будут использованы 4 изображения оружия.

In [None]:
%matplotlib inline
train_image_dir = './training'
train_images_np = []

for i in range(1, 5):

    # define the path (string) for each image
    image_path = os.path.join('./training/gun_'+str(i)+'.jpg')
    print(image_path)

    # load images into numpy arrays and append to a list
    train_images_np.append(load_image_into_numpy_array(image_path))

# configure plot settings via rcParams
plt.rcParams['axes.grid'] = False
plt.rcParams['xtick.labelsize'] = False
plt.rcParams['ytick.labelsize'] = False
plt.rcParams['xtick.top'] = False
plt.rcParams['xtick.bottom'] = False
plt.rcParams['ytick.left'] = False
plt.rcParams['ytick.right'] = False
plt.rcParams['figure.figsize'] = [14, 7]

# plot images
for idx, train_image_np in enumerate(train_images_np):
    plt.subplot(1, 4, idx+1)
    plt.imshow(train_image_np)

plt.show()

## Подготовка данных для обучения

В процессе работы будут создаваться ячейки для установления объектов поиска. Другими словами, необходимо обвести объекты на изображениях, данные которых будут использоваться для последующего обучения. Для этих целей будет использоваться **colab_utils.annotate** 

In [None]:
# Define the list of ground truth boxes
gt_boxes = []
colab_utils.annotate(train_images_np, box_storage_pointer=gt_boxes)

## Определение словаря указателя категорий

Необходимо указать модели, а точнее какой идентификатор класса присвоить категории "имеет оружие", и какое "имя" связать с этим целочисленным идентификатором.

In [None]:
has_gun_class_id = 1

category_index = {
    has_gun_class_id: 
        {
            'id': has_gun_class_id, 
            'name': 'has gun'
        }
}

num_classes = 1

print(category_index[has_gun_class_id])

Теперь необходимо проветси некоторую предварительную обработку данных, чтобы они были правильно отформатированы перед подачей в модель:

* Преобразовать метки классов в одноточечные представления.
* Преобразовать все (т.е. учебные изображения, gt-боксы и метки классов) в тензоры.

In [None]:
label_id_offset = 1
train_image_tensors = []
gt_classes_one_hot_tensors = []
gt_box_tensors = []

for (train_image_np, gt_box_np) in zip(train_images_np, gt_boxes):
  train_image_tensors.append(tf.expand_dims(tf.convert_to_tensor( train_image_np, dtype=tf.float32), axis=0))
    
  gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32))
    
  zero_indexed_groundtruth_classes = tf.convert_to_tensor(np.ones(shape=[gt_box_np.shape[0]], dtype=np.int32) - label_id_offset)
    
  gt_classes_one_hot_tensors.append(tf.one_hot(zero_indexed_groundtruth_classes, num_classes))

print('Done')

## Загрузка контрольных точек с предварительно обученными весами

Далее нужно скачать [RetinaNet](https://arxiv.org/abs/1708.02002) и скопировать его в каталог обнаружения объектов. Загрузим сжатую контрольную точку SSD Resnet 50 версии 1, 640 x 640. Затем распакуем загруженный файл и переместим распакованную контрольную точку в models/research/object_detection/test_data/


In [None]:
!wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz

!tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz

!mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/

## Настройка модели


#### pipeline_config
- В Colab, слева от оглавления, выбираем значок папки, чтобы отобразить браузер файлов для текущего рабочего пространства.  
- Переходим в папку `models/research/object_detection/configs/tf2`. В папке есть несколько файлов с расширением .config.  
- Находим файл, соответствующий ssd resnet 50 version 1 640x640.
- Устанавливаем `pipeline_config` в строку, содержащую полный путь к файлу конфигурации resnet, другими словами: `models/research/.../... .config`

#### configs
Если изучить модуль [config_util](https://github.com/tensorflow/models/blob/master/research/object_detection/utils/config_util.py) который был исопртирован ранее, то можно найти функцию:

```
def get_configs_from_pipeline_file(pipeline_config_path, config_override=None):
```
- Необходимо использовать эту функцию, для загрузки конфигурации из `pipeline_config`.
  - после этого `configs` будет содержать словарь.


In [None]:
tf.keras.backend.clear_session()

pipeline_config = "/content/models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config"

configs = config_util.get_configs_from_pipeline_file(pipeline_config)

# Read in the object stored at the key 'model' of the configs dictionary
model_config = configs['model']

# Modify the number of classes from its default of 90
model_config.ssd.num_classes = num_classes

model_config.ssd.freeze_batchnorm = True
model_config

## Построение пользовательской модели

Используем model_builder, который имеет функцию *build*

In [None]:
detection_model = model_builder.build(
    model_config, is_training= True
)

print(type(detection_model))

## Восстановление весов из контрольной точки

Теперь выборочно восстановливаем веса из контрольной точки.
- Конечная цель - создать пользовательскую модель, которая повторно использует некоторые слои RetinaNet (в настоящее время хранящиеся в переменной `detection_model`).
  - Слои RetinaNet, которые будут использоваться повторно, это:
    - Слои извлечения признаков
    - Слой предсказания регрессии по граничным ячейкам
  - Слой RetinaNet, которую не будет использоваться повторно, это слой прогнозирования классификации (так как будем определять и обучать собственный слой классификации, специфичный для оружия).
  - Для слоев RetinaNet, которые будем использовать повторно, восстановливаем веса из контрольной точки, которую выбрали ранее.

In [None]:
detection_model._box_predictor

# View variables in _box_predictor
vars(detection_model._box_predictor)

## Определение контрольных точек для предсказателя ячеек

Определим box_predictor_checkpoint как контрольную точку для этих двух слоев предсказателя ячеек модели detection_model

In [None]:
tmp_box_predictor_checkpoint = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
    )

#Define the temporary model checkpoint
tmp_model_checkpoint = tf.compat.v2.train.Checkpoint(
          _feature_extractor=detection_model._feature_extractor,
          _box_predictor=tmp_box_predictor_checkpoint)

tmp_model_checkpoint = tf.compat.v2.train.Checkpoint(model=tmp_model_checkpoint)

## Восстановление контрольной точки
Сначала нужно найти и установить `checkpoint_path`.

- checkpoint_path: 
  - Если была выполнена предыдущая ячейка кода, которая загружает и перемещает контрольную точку, то в результате её выполнения создается папка с именем "checkpoint".  
    - Папка "checkpoint" содержит три файла:
      - checkpoint
      - ckpt-0.data-00000-of-00001
      - ckpt-0.index
    - В качестве checkpoint_path нужно установить полный путь, а точнее `models/.../ckpt-0`. 
      - Нужно обратить внимание, что нет необходимости указывать расширение файла после `ckpt-0`.

Далее определяем последнюю контрольную точку, используя `tf.train.Checkpoint()`.
- Для аргумента с одним ключевым словом, 
  - Задаем ключ как `model=` 
  - Устанавливаем значение временной контрольной точки модели, которую только что определили.
- **ВАЖНО**: Нужно задать ключевое слово аргумента `model=`, а не что-то другое, например `detection_model=`.
- Если задать этот ключевой аргумент каким-либо другим значением, это не приведет к немедленному появлению ошибки, но когда будет осуществляться процесс обучения модели на изображениях, потери модели не уменьшатся (модель не будет обучаться).

Наконец, вызовем функцию `.restore()` этой контрольной точки, передав ей путь к контрольной точке.

In [None]:
checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0'

checkpoint =tf.train.Checkpoint(model = detection_model) 

# Restore the checkpoint to the checkpoint path
tmp_model_checkpoint.restore(checkpoint_path).expect_partial()

## Запуск фиктивного изображения для создания переменных модели

Пропустим фиктивное изображение через модель, чтобы были созданы переменные. Нужно будет выбрать обучаемые переменные позже, а сейчас они все еще пусты. Если выполнить `len(detection_model.trainable_variables)`, то в ячейке кода получим `0`. Передадим фиктивное изображение через прямой проход, чтобы создать эти переменные.

Важными методами, доступными в объекте `detection_model`, являются:
- [preprocess()](https://github.com/tensorflow/models/blob/dc4d11216b738920ddb136729e3ae71bddb75c7e/research/object_detection/meta_architectures/ssd_meta_arch.py#L459): 
    - принимает тензор, представляющий изображение и возвращает его
    - возвращает `изображения, формы`.
    - для фиктивного изображения можно объявить [tensor of zeros](https://www.tensorflow.org/api_docs/python/tf/zeros), имеющий форму, которую может принять метод `preprocess()` (т.е. [партия, высота, ширина, каналы]). 
    - при создании фиктивного изображения можно передать партию, равную 1. 

- [predict()](https://github.com/tensorflow/models/blob/dc4d11216b738920ddb136729e3ae71bddb75c7e/research/object_detection/meta_architectures/ssd_meta_arch.py#L525)
  - принимает `изображение, формы`, которые создаются вызовом функции `preprocess()`.
  - возвращает предсказание в словаре Python.
  - позволит пропустить фиктивное изображение через прямой проход сети и создать переменные модели

- [postprocess()](https://github.com/tensorflow/models/blob/dc4d11216b738920ddb136729e3ae71bddb75c7e/research/object_detection/meta_architectures/ssd_meta_arch.py#L655)
  - принимает prediction_dict и формы
  - возвращает словарь постпроцессированных предсказаний обнаруженных объектов ("detections").
   

In [None]:
tmp_image, tmp_shapes = detection_model.preprocess(tf.zeros([1, 640, 640, 3]))
tmp_prediction_dict = detection_model.predict(tmp_image, tmp_shapes)

tmp_detections = detection_model.postprocess(tmp_prediction_dict, tmp_shapes)

print('Weights restored!')

In [None]:
#Set training hyperparameters
tf.keras.backend.set_learning_phase(True)
batch_size = 4

num_batches = 100

learning_rate = 0.01

# set the optimizer and pass in the learning_rate
optimizer = tf.keras.optimizers.SGD(learning_rate= learning_rate, momentum= 0.9)

In [None]:
# define a list that contains the layers that  wish to fine tune
trainable_variables = detection_model.trainable_variables
to_fine_tune = []
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
for var in trainable_variables:
  if any([var.name.startswith(prefix) for prefix in prefixes_to_train]):
    to_fine_tune.append(var)

## Обучение модели
Определим функцию, которая обрабатывает обучение для одной партии, которую позже будем использовать в цикле обучения.

Сначала пройдемся по ячейкам кода, чтобы узнать, как будем проводить обучение с помощью этой модели.

In [None]:
# Get a batch of your training images
g_images_list = train_image_tensors[0:2]

# Use .preprocess to preprocess an image
g_preprocessed_image = detection_model.preprocess(g_images_list[0])
print(f"g_preprocessed_image type: {type(g_preprocessed_image)}")
print(f"g_preprocessed_image length: {len(g_preprocessed_image)}")
print(f"index 0 has the preprocessed image of shape {g_preprocessed_image[0].shape}")
print(f"index 1 has information about the image's true shape excluding padding: {g_preprocessed_image[1]}")

Можно предварительно обработать каждое изображение и сохранить результаты в два отдельных списка

* Один список предварительно обработанных изображений.
* Один список истинной формы для каждого предварительно обработанного изображения

In [None]:
preprocessed_image_list = []
true_shape_list = []

for img in g_images_list:
    processed_img, true_shape = detection_model.preprocess(img)
    preprocessed_image_list.append(processed_img)
    true_shape_list.append(true_shape)

## Предсказание
В `detection_model` также есть функция `.predict`.  Согласно исходному коду для [predict](https://github.com/tensorflow/models/blob/dc4d11216b738920ddb136729e3ae71bddb75c7e/research/object_detection/meta_architectures/ssd_meta_arch.py#L525)


In [None]:
# Turn a list of tensors into a tensor
preprocessed_image_tensor = tf.concat(preprocessed_image_list, axis=0)
true_shape_tensor = tf.concat(true_shape_list, axis=0)

# Make predictions on the images
prediction_dict = detection_model.predict(preprocessed_image_tensor, true_shape_tensor)

print("keys in prediction_dict:")
for key in prediction_dict.keys():
    print(key)

In [None]:
# Get the ground truth bounding boxes
gt_boxes_list = gt_box_tensors[0:2]

# Get the ground truth class labels
gt_classes_list = gt_classes_one_hot_tensors[0:2]

# Provide the ground truth to the model
detection_model.provide_groundtruth(
            groundtruth_boxes_list=gt_boxes_list,
            groundtruth_classes_list=gt_classes_list)

Теперь можно посчитать потери

In [None]:
# Calculate the loss after you've provided the ground truth 
losses_dict = detection_model.loss(prediction_dict, true_shape_tensor)

# View the loss dictionary
losses_dict = detection_model.loss(prediction_dict, true_shape_tensor)
print(f"loss dictionary keys: {losses_dict.keys()}")
print(f"localization loss {losses_dict['Loss/localization_loss']:.8f}")
print(f"classification loss {losses_dict['Loss/classification_loss']:.8f}")

## Определение одного шага обучения

In [None]:

# decorate with @tf.function for faster training (remember, graph mode!)
@tf.function
def train_step_fn(image_list,
                groundtruth_boxes_list,
                groundtruth_classes_list,
                model,
                optimizer,
                vars_to_fine_tune):

    shapes = tf.constant(batch_size * [[640, 640, 3]], dtype=tf.int32)
    model.provide_groundtruth(
        groundtruth_boxes_list=groundtruth_boxes_list,
        groundtruth_classes_list=groundtruth_classes_list)
    with tf.GradientTape() as tape:
        # Preprocess the images
        
        preprocessed_image_tensor = tf.concat(
            [detection_model.preprocess(image_tensor)[0]
             for image_tensor in image_list], axis=0) 
        true_shape_tensor = preprocessed_image_tensor.shape 

        # Make a prediction
        prediction_dict = model.predict(preprocessed_image_tensor, shapes)

        # Calculate the total loss (sum of both losses)
        losses_dict = model.loss(prediction_dict, shapes)
        total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']

        # Calculate the gradients
        gradients = tape.gradient(total_loss, vars_to_fine_tune)

        optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))
        
    return total_loss

## Запуск цикла обучения

Запустим цикл обучения, используя функцию шага обучения, которую определили ранее.

In [None]:
print('Start', flush=True)

for idx in range(num_batches):
    all_keys = list(range(len(train_images_np)))
    random.shuffle(all_keys)
    example_keys = all_keys[:batch_size]

    # Get the ground truth
    gt_boxes_list = [gt_box_tensors[key] for key in example_keys]
    gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys]
    
    # get the images
    image_tensors = [train_image_tensors[key] for key in example_keys]

    # Training step (forward pass + backwards pass)
    total_loss = train_step_fn(image_tensors, 
                               gt_boxes_list, 
                               gt_classes_list,
                               detection_model,
                               optimizer,
                               to_fine_tune
                              )

    if idx % 10 == 0:
        print('batch ' + str(idx) + ' of ' + str(num_batches)
        + ', loss=' +  str(total_loss.numpy()), flush=True)

print('Done')

## Загрузка тестовых изображений и запуск вывода с новой моделью

Теперь протестируем модель на новом наборе изображений, которые хранятся в каталоге result. В ячейке ниже эти изображения загружаются в массивы numpy, чтобы подготовить их к выводу.

In [None]:
test_image_dir = './result/'
test_images_np = []

# load images into a numpy array. this will take a few minutes to complete.
for i in range(0, 2):
    image_path = os.path.join(test_image_dir, 'test' + "{0:04}".format(i) + '.jpeg')
    print(image_path)
    test_images_np.append(np.expand_dims(
      load_image_into_numpy_array(image_path), axis=0))

In [None]:
#  Preprocess, predict, and post process an image

# Again, uncomment this decorator if you want to run inference eagerly
@tf.function
def detect(input_tensor):

    preprocessed_image, shapes = detection_model.preprocess(input_tensor)
    prediction_dict = detection_model.predict(preprocessed_image, shapes)

    detections = detection_model.postprocess(prediction_dict, shapes)
    
    return detections

In [None]:
label_id_offset = 1
results1 = {'boxes': [], 'scores': []}

for i in range(len(test_images_np)):
    input_tensor = tf.convert_to_tensor(test_images_np[i], dtype=tf.float32)
    detections = detect(input_tensor)
    plot_detections(
      test_images_np[i][0],
      detections['detection_boxes'][0].numpy(),
      detections['detection_classes'][0].numpy().astype(np.uint32)
      + label_id_offset,
      detections['detection_scores'][0].numpy(),
      category_index, figsize=(15, 20), image_name="./result/gif_frame_" + ('%04d' % i) + ".jpg")
    results1['boxes'].append(detections['detection_boxes'][0][0].numpy())
    results1['scores'].append(detections['detection_scores'][0][0].numpy())

Также можно проверить, обнаруживает ли модель класс оружия на изображениях, изучив значение ключа scores в словаре результатов.

In [None]:
x = np.array(results1['scores'])

# percent of frames where a gun is detected
gun_detected = (np.where(x > 0.9, 1, 0).sum())/237*100
print(gun_detected)

Есть возможность показать несколько стоп-кадров и провести визуальный осмотр. 

In [None]:
print('Frame 0')
display(IPyImage('./result/gif_frame_0001.jpg'))
print()

In [None]:
print('Frame 1')
display(IPyImage('./result/gif_frame_0000.jpg'))
print()

In [None]:
#Save results
import pickle

with open('results.data', 'wb') as filehandle:
    pickle.dump(results1['boxes'], filehandle)

#from google.colab import files
#files.download('results.data')