In [None]:
import itertools
import os
import sys
from collections import defaultdict
from pathlib import Path
from time import time

import cv2
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tqdm import tqdm_notebook as tqdm

from object_detection.utils import label_map_util
from object_detection.utils import ops as utils_ops
from object_detection.utils import visualization_utils as vis_util
# This is needed to display the images.
plt.rcParams['figure.figsize'] = (20, 12)
%matplotlib inline

In [None]:
def run_inference_for_single_image(image, graph):
    """Run the detection graph on a single image and return its outputs.

    A new ``tf.Session`` is opened on ``graph`` for every call. The function
    collects whichever of the tensors ``num_detections``, ``detection_boxes``,
    ``detection_scores``, ``detection_classes`` and ``detection_masks`` exist
    in the graph, feeds ``image`` (with a leading batch axis added) into
    ``image_tensor:0`` and strips the batch axis from the results.

    Args:
        image: Array-like image; ``image.shape[0]`` and ``image.shape[1]`` are
            passed to the mask reframing helper when the graph produces masks.
            # NOTE(review): presumably (height, width, channels) - confirm.
        graph: ``tf.Graph`` that contains an ``image_tensor:0`` input.

    Returns:
        dict with ``num_detections`` (int), ``detection_classes`` (uint8
        array), ``detection_boxes``, ``detection_scores`` and, when the graph
        provides it, ``detection_masks`` - all for the single input image.

    Raises:
        KeyError: if the graph lacks one of the four non-mask output tensors,
            because they are read unconditionally from the result dict.
    """
    with graph.as_default():
        # NOTE(review): the session is created per call; callers that invoke
        # this once per video frame pay the session start-up cost every time.
        with tf.Session() as sess:
            # Get handles to input and output tensors
            ops = tf.get_default_graph().get_operations()
            all_tensor_names = {output.name for op in ops for output in op.outputs}
            tensor_dict = {}
            # Only request the outputs that this particular graph actually exposes.
            for key in [
                'num_detections', 'detection_boxes', 'detection_scores',
                'detection_classes', 'detection_masks'
            ]:
                tensor_name = key + ':0'
                if tensor_name in all_tensor_names:
                    tensor_dict[key] = tf.get_default_graph().get_tensor_by_name(
                        tensor_name)
            if 'detection_masks' in tensor_dict:
                # The following processing is only for single image
                detection_boxes = tf.squeeze(tensor_dict['detection_boxes'], [0])
                detection_masks = tf.squeeze(tensor_dict['detection_masks'], [0])
                # Reframe is required to translate mask from box coordinates to image coordinates and fit the image size.
                # Keep only the first `num_detections` boxes/masks before reframing.
                real_num_detection = tf.cast(tensor_dict['num_detections'][0], tf.int32)
                detection_boxes = tf.slice(detection_boxes, [0, 0], [real_num_detection, -1])
                detection_masks = tf.slice(detection_masks, [0, 0, 0], [real_num_detection, -1, -1])
                detection_masks_reframed = utils_ops.reframe_box_masks_to_image_masks(
                    detection_masks, detection_boxes, image.shape[0], image.shape[1])
                # Binarise the masks at a 0.5 threshold.
                detection_masks_reframed = tf.cast(
                    tf.greater(detection_masks_reframed, 0.5), tf.uint8)
                # Follow the convention by adding back the batch dimension
                tensor_dict['detection_masks'] = tf.expand_dims(
                    detection_masks_reframed, 0)

            image_tensor = tf.get_default_graph().get_tensor_by_name('image_tensor:0')

            # Run inference
            # The image is fed as a batch of one.
            output_dict = sess.run(tensor_dict,
                                 feed_dict={image_tensor: np.expand_dims(image, 0)})

            # all outputs are float32 numpy arrays, so convert types as appropriate
            # Index [0] drops the batch dimension of each output.
            output_dict['num_detections'] = int(output_dict['num_detections'][0])
            output_dict['detection_classes'] = output_dict[
              'detection_classes'][0].astype(np.uint8)
            output_dict['detection_boxes'] = output_dict['detection_boxes'][0]
            output_dict['detection_scores'] = output_dict['detection_scores'][0]
            if 'detection_masks' in output_dict:
                output_dict['detection_masks'] = output_dict['detection_masks'][0]
    return output_dict

In [None]:
def filter_boxes(min_score, boxes, scores, classes, categories):
    """Keep the detections of the wanted categories scoring at least `min_score`.

    Detection ``i`` is kept when ``classes[i]`` is a member of ``categories``
    and ``scores[i] >= min_score``.

    Args:
        min_score: Minimum confidence a detection must reach to be kept.
        boxes: Array of boxes, indexed along its first axis per detection.
        scores: Array of confidences, one per detection.
        classes: Array of class ids, one per detection.
        categories: Container of class ids to keep.

    Returns:
        Tuple ``(boxes, scores, classes)`` restricted to the kept detections,
        in their original order.
    """
    keep = [
        pos for pos in range(len(classes))
        if classes[pos] in categories and scores[pos] >= min_score
    ]
    return boxes[keep, ...], scores[keep, ...], classes[keep, ...]

In [None]:
def calculate_coord(bbox, width, height):
    """Scale a ``[ymin, xmin, ymax, xmax]`` box to ``[x, y, box_width, box_height]``.

    The x entries of ``bbox`` (indices 1 and 3) are multiplied by ``width``
    and the y entries (indices 0 and 2) by ``height``.
    """
    left = bbox[1] * width
    top = bbox[0] * height
    right = bbox[3] * width
    bottom = bbox[2] * height

    box_width = right - left
    box_height = bottom - top
    return [left, top, box_width, box_height]
    
def calculate_centr(coord):
    """Return the centre ``(x, y)`` of a box given as ``[x, y, width, height]``."""
    left, top = coord[0], coord[1]
    half_width = coord[2] / 2
    half_height = coord[3] / 2
    return (left + half_width, top + half_height)
  
def calculate_centr_distances(centroid_1, centroid_2):
    """Return the Euclidean distance between two ``(x, y)`` centroids."""
    delta_x = centroid_2[0] - centroid_1[0]
    delta_y = centroid_2[1] - centroid_1[1]
    return np.sqrt(delta_x**2 + delta_y**2)
  
def calculate_perm(centroids):
    """Return every unordered pair of centroids exactly once.

    Pairs are produced with ``itertools.combinations``, so for ``n`` centroids
    the result holds ``n * (n - 1) / 2`` tuples ``(first, second)`` where
    ``first`` appears before ``second`` in ``centroids``. This replaces the
    earlier approach of generating all ordered pairs and discarding mirrored
    ones through repeated list lookups.

    Args:
        centroids: Iterable of centroids, e.g. ``(x, y)`` tuples.

    Returns:
        list of 2-tuples of centroids; empty when fewer than two are given.
    """
    return list(itertools.combinations(centroids, 2))
  
def midpoint(p1, p2):
    """Return the point halfway between the two ``(x, y)`` points ``p1`` and ``p2``."""
    mid_x = (p1[0] + p2[0]) / 2
    mid_y = (p1[1] + p2[1]) / 2
    return (mid_x, mid_y)

def calculate_slope(x1, y1, x2, y2):
    """Return the slope of the line through ``(x1, y1)`` and ``(x2, y2)``.

    For a vertical line given as plain Python numbers, the division would
    raise ``ZeroDivisionError``; in that case the function returns positive or
    negative infinity according to the sign of ``y2 - y1``, or NaN when both
    points coincide. Inputs whose division does not raise are passed through
    unchanged.

    Returns:
        The slope ``(y2 - y1) / (x2 - x1)``, or ``inf`` / ``-inf`` / ``nan``
        for the vertical and degenerate cases described above.
    """
    rise = y2 - y1
    run = x2 - x1
    try:
        return rise / run
    except ZeroDivisionError:
        # Vertical line (or a single point): report it instead of crashing.
        if rise == 0:
            return float('nan')
        return float('inf') if rise > 0 else float('-inf')

In [None]:
# for testing
def show_image(image):
    """Display an image with matplotlib after converting it from BGR to RGB."""
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    plt.imshow(rgb_image)
    plt.show()

In [None]:
def video_to_images(video, size = None):
    """Yield the frames of a video one at a time, optionally resized.

    Args:
        video: An opened capture object exposing ``read()`` (returning a
            ``(success, frame)`` pair) and ``release()``, such as
            ``cv2.VideoCapture``.
        size: Optional target size, e.g. ``(512, 512)``, passed to
            ``cv2.resize``. When ``None`` (the default) frames are yielded
            as read.

    Yields:
        Each frame for which ``read()`` reported success.

    The capture is released when the frames are exhausted and also when the
    generator is closed early or the consumer raises.
    """
    try:
        success, image = video.read()
        while success:
            # Resize only when a target size was actually requested.
            if size is not None:
                image = cv2.resize(image, size, interpolation = cv2.INTER_AREA)
            yield image
            success, image = video.read()
    finally:
        video.release()

In [None]:
def draw_rects(image, coordinates, color = (0, 0, 255), thickness = 5):
    """Draw one rectangle per entry of `coordinates` onto `image`.

    Args:
        image: Image handed to ``cv2.rectangle``.
        coordinates: Sequence of ``[x, y, width, height]`` entries; each value
            is truncated to ``int`` before drawing.
        color: Line colour as a BGR tuple.
        thickness: Line thickness in pixels.

    Returns:
        The image returned by the last ``cv2.rectangle`` call, or `image`
        itself when `coordinates` is empty.
    """
    for rect in coordinates:
        left, top = int(rect[0]), int(rect[1])
        right = left + int(rect[2])
        bottom = top + int(rect[3])
        image = cv2.rectangle(image, (left, top), (right, bottom), color, thickness)

    return image

In [None]:
# SETUP: locate the frozen detection model and its label map, then load both.
# Name of the model directory that holds the frozen graph.
MODEL_NAME = 'ssd_mobilenet_v2_coco_2018_03_29'

# Path to frozen detection graph. This is the actual model that is used for the object detection.
PATH_TO_FROZEN_GRAPH = MODEL_NAME+"/frozen_inference_graph.pb"

# Label map file used to build `category_index` below.
PATH_TO_LABELS = "models/research/object_detection/data/mscoco_label_map.pbtxt"

# Load the serialized GraphDef from disk and import it into a dedicated graph.
detection_graph = tf.Graph()
with detection_graph.as_default():
    od_graph_def = tf.GraphDef()
    with tf.gfile.GFile(PATH_TO_FROZEN_GRAPH, 'rb') as fid:
        serialized_graph = fid.read()
        od_graph_def.ParseFromString(serialized_graph)
        # name='' imports the nodes without a name prefix, so tensors keep
        # names such as 'image_tensor:0'.
        tf.import_graph_def(od_graph_def, name='')

# Build the category index from the label map, using display names.
category_index = label_map_util.create_category_index_from_labelmap(PATH_TO_LABELS, use_display_name=True)

In [None]:
# Path to video and output
DATA_FOLDER = Path("data")

input_video_filename = 'test_video.mp4'
output_video_filename = 'output.avi'

input_video_filepath = str(DATA_FOLDER/input_video_filename)
output_video_filepath = str(DATA_FOLDER/output_video_filename)
print("Input   - ",input_video_filepath)
print("Output - ",output_video_filepath)

# Read the frame size from the input video itself. This used to rely on an
# `original_video` capture that is only created (and released) in a later
# testing cell, so the cell failed on a fresh kernel.
# Video size does not affect object detection time
_size_probe = cv2.VideoCapture(input_video_filepath)
try:
    if not _size_probe.isOpened():
        raise IOError("Could not open input video: " + input_video_filepath)
    width  = int(_size_probe.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(_size_probe.get(cv2.CAP_PROP_FRAME_HEIGHT))
finally:
    _size_probe.release()

In [None]:
# Detect people in every frame of the input video, draw their boxes and write
# the annotated frames to the output video, timing each stage along the way.
input_video = cv2.VideoCapture(input_video_filepath)
fps = input_video.get(cv2.CAP_PROP_FPS)

fourcc = cv2.VideoWriter_fourcc(*'XVID')
output_video = cv2.VideoWriter(output_video_filepath, fourcc, fps, (width, height))

# Minimum detection score to keep; constant for the whole run.
confidence_cutoff = 0.3

# Accumulated wall-clock seconds spent in each stage of the loop.
time_taken = {"video_to_images" : 0, "run_inference" : 0, "filter_boxes":0,
              "calc_coords" : 0, "draw_rects" : 0, "write" : 0}
try:
    with tqdm(total=int(input_video.get(cv2.CAP_PROP_FRAME_COUNT))) as pbar:
        start = time()
        for image in video_to_images(input_video):
            time_taken["video_to_images"] += time()-start

            # Actual detection
            start = time()
            # OpenCV decodes frames as BGR, while the detection model expects
            # RGB input, so convert a copy for inference and keep the BGR
            # frame for drawing and writing.
            output_dict = run_inference_for_single_image(
                cv2.cvtColor(image, cv2.COLOR_BGR2RGB), detection_graph)
            time_taken["run_inference"] += time()-start

            start = time()
            # Category [1] keeps only "person" detections.
            boxes, scores, classes = filter_boxes(confidence_cutoff, output_dict['detection_boxes'],
                output_dict['detection_scores'],
                output_dict['detection_classes'], [1])
            time_taken["filter_boxes"] += time()-start

            start = time()
            # Tuples of (x, y) coords
            centroids = []
            # Lists of  [x, y, width, height]
            coordinates = []
            for box in boxes:
                coord = calculate_coord(box, width, height)
                centr = calculate_centr(coord)
                centroids.append(centr)
                coordinates.append(coord)
            time_taken["calc_coords"] += time()-start

            start = time()
            # Draw rects on images
            image = draw_rects(image, coordinates)
            time_taken["draw_rects"] += time()-start
            # Centroid points

            # Distance measuring??
            # Draw lines??

            # Write to output video file
            start = time()
            output_video.write(image)
            pbar.update(1)
            time_taken["write"] += time()-start

            # Restart the clock so the next iteration measures frame decoding.
            start = time()
finally:
    # Always finalise the output file and free the capture, even when a frame
    # fails midway; releasing an already released capture is harmless.
    output_video.release()
    input_video.release()

In [None]:
# Show the per-stage timings (in seconds) accumulated by the processing loop.
time_taken

# Testing

In [None]:
# For extracting one frame from video for testing
original_video = cv2.VideoCapture('data/test_video.mp4')
try:
    success,curr_image = original_video.read()
finally:
    # Free the capture even if reading fails.
    original_video.release()

# Fail with a clear message instead of passing an empty frame to imwrite.
if not success:
    raise IOError("Could not read a frame from data/test_video.mp4")
cv2.imwrite("data/first_image.jpg", curr_image)

In [None]:
image = cv2.imread("data/first_image.jpg")
# cv2.imread returns None rather than raising when the file cannot be read.
if image is None:
    raise IOError("Could not read data/first_image.jpg")
print(image.shape)
# Expanding dimensions
# Since the model expects images to have shape: [1, None, None, 3]
# (shown for illustration only: run_inference_for_single_image adds the batch
# dimension itself, so the unexpanded image is passed below).
image_expanded = np.expand_dims(image, axis=0)
print(image_expanded.shape)

# Actual detection.
# OpenCV loads images as BGR while the detection model expects RGB input.
output_dict = run_inference_for_single_image(
    cv2.cvtColor(image, cv2.COLOR_BGR2RGB), detection_graph)

In [None]:
# Get boxes only for person: keep detections of category 1 whose score is at
# least `confidence_cutoff`.
confidence_cutoff = 0.3
boxes, scores, classes = filter_boxes(confidence_cutoff, output_dict['detection_boxes'], 
output_dict['detection_scores'], 
output_dict['detection_classes'], [1])
# Relative coordinates (scaled to pixels by calculate_coord in a later cell)
boxes

In [None]:
# Take the size from the test image (shape[1] is used as width, shape[0] as height).
# NOTE(review): this overwrites the global `width`/`height` that the video
# processing cell passes to the VideoWriter - re-run that setup before
# processing a video again.
width, height = image.shape[1], image.shape[0]

In [None]:
# Convert every filtered box into [x, y, width, height] coordinates scaled by
# the image size, together with the centre point of each box.
centroids = []
coordinates = []
for box in boxes:
    coord = calculate_coord(box, width, height)
    centr = calculate_centr(coord)
    centroids.append(centr)
    coordinates.append(coord)

In [None]:
# Reload the test frame so boxes are drawn on an unmodified copy.
image = cv2.imread("data/first_image.jpg")

# Use the image returned by draw_rects, as the video processing loop does,
# instead of relying on the input being modified in place.
image = draw_rects(image, coordinates)
show_image(image)