<a href="https://colab.research.google.com/github/NyanSwanAung/Pothole-Detection-using-MaskRCNN/blob/main/inference.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h2 align="center"><b>Mask R-CNN using TensorFlow Object Detection API (TF Version 2)</b></h2>

Use a pre-trained model to identify potholes in live webcam feeds and videos. The model was pre-trained on [COCO2017](https://cocodataset.org/) and fine-tuned on the pothole dataset from this [repo](https://github.com/SamdenLepcha/Pothole-Detection-With-Mask-R-CNN/tree/master/place_in_object_detection/images).

This pre-trained model is taken from [TensorFlow2 Object Detection Model Zoo](https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/tf2_detection_zoo.md)

# Prepare prerequisites

### Install TensorFlow Object Detection API

In [None]:
from IPython.display import clear_output
# Clone the tensorflow models repository
!git clone --depth 1 https://github.com/tensorflow/models

# # API Installation 
!sudo apt install -y protobuf-compiler
%cd models/research
!protoc object_detection/protos/*.proto --python_out=.
!cp object_detection/packages/tf2/setup.py .
!python -m pip install .

%cd ~/../content
clear_output()

### Import Dependencies

In [None]:
import numpy as np
import cv2
import os
import sys
import time
import tensorflow as tf
import math
import pathlib
import math
from collections import defaultdict
from io import StringIO
from matplotlib import pyplot as plt
from PIL import Image
from IPython.display import display

from IPython.display import HTML
from base64 import b64encode
import time

Import the object detection module.

In [None]:
from object_detection.utils import ops as utils_ops
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util

### Download my pre-trained inference graph or import yours

In [None]:
# Download from github release page 
!wget https://github.com/NyanSwanAung/Pothole-Detection-using-MaskRCNN/releases/download/v1.0/inference_graph.zip
!unzip inference_graph.zip

# Remove unnecessary files
!rm -r __MACOSX
!rm -r inference_graph.zip


### Download test dataset and label map from github release page

In [None]:
# Test dataset
!wget https://github.com/NyanSwanAung/Pothole-Detection-using-MaskRCNN/releases/download/v1.0/test.zip
!unzip test.zip 

# Label Map
!wget https://github.com/NyanSwanAung/Pothole-Detection-using-MaskRCNN/releases/download/v1.0/label.pbtxt

# Remove unnecessary files
!rm -r __MACOSX
!rm -r test.zip

# Model preparation 

## Setup Paths

In [None]:
# Directory of the unzipped inference graph; load_model() appends "saved_model" to it.
IG_PATH = '/content/inference_graph'
# Label map file passed to label_map_util when building category_index.
LABEL_MAP_PATH = '/content/label.pbtxt'
# Directory that is globbed for *.jpg test images.
TEST_IMG_DIR = '/content/test'
# NOTE: despite the "_DIR" suffix this is the path of a single video file.
TEST_VID_DIR = '/content/test/test_vid.mp4'

## Loader

In [None]:
def load_model():
    """Load the SavedModel stored under ``IG_PATH`` and print the load time.

    Returns:
        The object produced by ``tf.saved_model.load`` for the
        ``saved_model`` sub-directory of ``IG_PATH``.
    """
    t0 = time.time()
    saved_model_dir = pathlib.Path(IG_PATH) / "saved_model"
    loaded = tf.saved_model.load(str(saved_model_dir))
    # Report whole seconds, rounded up.
    elapsed = math.ceil(time.time() - t0)
    print(f'It took {elapsed}s to load model')
    return loaded

## Loading label map and test dataset

In [None]:
# Category index built from the label map; it is passed to the visualization
# helper so that each detection box gets its label.
category_index = label_map_util.create_category_index_from_labelmap(LABEL_MAP_PATH, use_display_name=True)

For the sake of simplicity we will test on 2 images:

In [None]:
# To test the code with your own images, add their paths to TEST_IMAGE_PATHS.
PATH_TO_TEST_IMAGES_DIR = pathlib.Path(TEST_IMG_DIR)
# sorted() accepts any iterable, so the glob generator needs no list() wrapper.
TEST_IMAGE_PATHS = sorted(PATH_TO_TEST_IMAGES_DIR.glob("*.jpg"))
TEST_IMAGE_PATHS

# Detection

Load an object detection model:

In [None]:
detection_model = load_model()

Check the model's input signature; it expects a batch of 3-channel color images of type uint8:

In [None]:
print(detection_model.signatures['serving_default'].inputs)

It returns several outputs:

In [None]:
detection_model.signatures['serving_default'].output_dtypes

In [None]:
detection_model.signatures['serving_default'].output_shapes

Add a wrapper function to call the model and clean up the outputs:

In [None]:
def run_inference_for_single_image(model, image):
    """Run the detection model on one image and return its outputs as NumPy arrays.

    Args:
        model: Loaded SavedModel exposing a ``serving_default`` signature.
        image: Array-like image. ``image.shape[0]`` and ``image.shape[1]`` are
            passed as height and width when reframing masks
            (presumably an H x W x 3 uint8 array -- confirm against the
            model's input signature).

    Returns:
        dict containing ``num_detections`` (int) and, for each of
        ``detection_classes``, ``detection_boxes``, ``detection_masks`` and
        ``detection_scores`` that the model actually emits, the first
        ``num_detections`` entries of batch element 0 as a NumPy array.
        ``detection_classes`` is cast to ``np.int64``. When the model emits
        masks, ``detection_masks_reframed`` holds the masks reframed to the
        image size and thresholded at 0.5 as ``uint8``.
    """
    image = np.asarray(image)

    # The model expects a batched tensor, so convert and add a leading axis.
    input_tensor = tf.convert_to_tensor(image)[tf.newaxis, ...]

    # Run inference
    model_fn = model.signatures['serving_default']
    output_dict = model_fn(input_tensor)

    # All outputs are batched tensors. Take the single count explicitly rather
    # than relying on implicit 1-element-array -> scalar conversion, which
    # NumPy has deprecated.
    num_detections = int(np.ravel(output_dict.pop('num_detections').numpy())[0])

    # Keep only the keys we use, and only those the model really produced:
    # box-only models have no 'detection_masks', and indexing it
    # unconditionally would raise KeyError before the mask check below.
    need_detection_key = ['detection_classes', 'detection_boxes',
                          'detection_masks', 'detection_scores']
    output_dict = {key: output_dict[key][0, :num_detections].numpy()
                   for key in need_detection_key if key in output_dict}

    output_dict['num_detections'] = num_detections

    # detection_classes should be ints.
    output_dict['detection_classes'] = output_dict['detection_classes'].astype(np.int64)

    # Handle models with masks:
    if 'detection_masks' in output_dict:
        # Reframe the bbox mask to the image size.
        detection_masks_reframed = utils_ops.reframe_box_masks_to_image_masks(
            tf.convert_to_tensor(output_dict['detection_masks']),
            output_dict['detection_boxes'],
            image.shape[0], image.shape[1])
        # Binarize the mask values at 0.5.
        detection_masks_reframed = tf.cast(detection_masks_reframed > 0.5,
                                           tf.uint8)
        output_dict['detection_masks_reframed'] = detection_masks_reframed.numpy()

    return output_dict

Run it on each test image and show the results:

In [None]:
def run_inference_image(model, image_path):
  """Detect objects in one image file, draw the results and display them.

  Args:
      model: Loaded detection model, forwarded to
          ``run_inference_for_single_image``.
      image_path: Path of the image file to open with PIL.

  Prints the elapsed time (whole seconds, rounded up) for loading,
  inference and drawing, after displaying the annotated image.
  """
  start = time.time()

  # The array form of the image is what the boxes and labels get drawn onto.
  # Close the file promptly, and force 3 channels so grayscale/RGBA/CMYK
  # files do not break a model that expects 3-channel input.
  with Image.open(image_path) as img:
    image_np = np.array(img.convert('RGB'))

  # Actual detection.
  output_dict = run_inference_for_single_image(model, image_np)

  # Visualization of the results of a detection (draws on image_np in place).
  vis_util.visualize_boxes_and_labels_on_image_array(
      image_np,
      output_dict['detection_boxes'],
      output_dict['detection_classes'],
      output_dict['detection_scores'],
      category_index,
      instance_masks=output_dict.get('detection_masks_reframed', None),
      use_normalized_coordinates=True,
      line_thickness=5)

  end = time.time()
  total = math.ceil(end - start)
  display(Image.fromarray(image_np))
  print(f'It took {total}s for above image')

In [None]:
def run_inference_video(model, video_path, output_path='detected_output.avi', fps=None):
  """Run detection on every frame of a video and write the annotated result.

  Args:
      model: Loaded detection model, forwarded to
          ``run_inference_for_single_image``.
      video_path: Path of the input video.
      output_path: Where the annotated XVID-encoded video is written.
      fps: Frame rate of the output. ``None`` uses the frame rate reported by
          the input, falling back to 20.0 when none is reported.

  Prints the per-frame processing time (whole seconds, rounded up). If the
  video cannot be opened, a message is printed and nothing is written.
  """
  cap = cv2.VideoCapture(str(video_path))

  if not cap.isOpened():
    cap.release()
    print(f'Could not open video: {video_path}')
    return

  out = None
  try:
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    if fps is None:
      fps = cap.get(cv2.CAP_PROP_FPS)
    # `not fps > 0` also catches NaN and the 0.0 returned for unknown rates.
    if not fps > 0:
      fps = 20.0

    # Initialize the video writer for the annotated output.
    fourcc = cv2.VideoWriter_fourcc(*"XVID")  # codec
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    while True:
      try:
        is_success, frame_bgr = cap.read()
      except cv2.error as err:
        # Retrying a failing read could spin forever, so stop instead.
        print(f'Stopping: failed to read frame ({err})')
        break

      if not is_success:
        break

      start = time.time()

      # OpenCV decodes frames as BGR; convert to RGB so the model sees the
      # same channel order as in the PIL-based image path.
      image_np = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

      # Actual detection.
      output_dict = run_inference_for_single_image(model, image_np)

      # Visualization of the results of a detection (draws in place).
      vis_util.visualize_boxes_and_labels_on_image_array(
          image_np,
          output_dict['detection_boxes'],
          output_dict['detection_classes'],
          output_dict['detection_scores'],
          category_index,
          instance_masks=output_dict.get('detection_masks_reframed', None),
          use_normalized_coordinates=True,
          line_thickness=8)

      end = time.time()
      total = math.ceil(end - start)
      print(f'{total}s per frame')

      # VideoWriter expects BGR, so convert the annotated frame back.
      out.write(cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR))
  finally:
    # Always release the writer and the capture, even if inference fails.
    if out is not None:
      out.release()
    cap.release()


# Inference

In [None]:
# Inference on Image
# The helper defined above is run_inference_image; `show_inference` is not
# defined anywhere in this notebook and would raise NameError.
for image_path in TEST_IMAGE_PATHS:
  run_inference_image(detection_model, image_path)

In [None]:
# Inference on Video
run_inference_video(detection_model, TEST_VID_DIR)

In [None]:
# Show inference video in colab
import subprocess

# detected video path
input_path = 'detected_output.avi'

# Compressed video path
compressed_path = "/content/compressed_output.mp4"

# Re-encode to H.264/MP4 so the browser can play it.
# -y overwrites the output of a previous run instead of refusing to;
# the argument list avoids shell parsing, and check=True raises if ffmpeg
# fails rather than going on to display a stale or missing file.
subprocess.run(
    ["ffmpeg", "-y", "-i", input_path, "-vcodec", "libx264", compressed_path],
    check=True,
)

# Show video: embed the file as a base64 data URL in an HTML <video> tag.
with open(compressed_path, 'rb') as video_file:
    mp4 = video_file.read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=400 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)