In [None]:
!pip install imutils

In [1]:
# Run configuration, read by the functions below through the global name
# `parameters`. The same mapping is defined again in the "init" section.
parameters = {
    # Path given to cv2.VideoCapture; a falsy value selects the webcam stream.
    'video': 'video/campus4-c0.avi',
    # Key into OPENCV_OBJECT_TRACKERS.
    'tracker': 'kcf',
    # Detection runs on frames whose index is a multiple of this value;
    # all other frames are handled by the trackers.
    'frame_skip': 3,
    # Predictions must score strictly above this value to be kept.
    'confidence': 0.3,
    # Time-to-live given to a new object and restored on every detection match.
    'ttl': 23,
    # Upper bound on get_ang_dist() for a box to be matched to an object.
    'min_dist': 190,
}

In [2]:
# NOTE: this cell relies on the definitions in the cells further down; on a
# fresh kernel it raises NameError until those cells have been executed.
track_objects_in_video(to_show=True, to_save=False)

NameError: name 'track_objects_in_video' is not defined

# Run everything below first, then come back to the cells above

# model 

Load the model

In [3]:
import keras.models as lm

from ssd_model.keras_layer_L2Normalization import L2Normalization
from ssd_model.keras_layer_AnchorBoxes import AnchorBoxes
from ssd_model.keras_layer_DecodeDetections import DecodeDetections
from ssd_model.keras_ssd_loss import SSDLoss

# Loss object; its compute_loss method is passed to the loader below.
ssd_loss = SSDLoss(neg_pos_ratio=3, alpha=1.0)

# Load the saved SSD model, handing the project-specific layers and the loss
# function to Keras through custom_objects.
model = lm.load_model(
    'ssd_model/ssd_300.h5',
    custom_objects={
        'L2Normalization': L2Normalization,
        'AnchorBoxes': AnchorBoxes,
        'DecodeDetections': DecodeDetections,
        'compute_loss': ssd_loss.compute_loss,
    },
)

Using TensorFlow backend.


# init 

In [4]:
# import the necessary packages
from imutils.video import VideoStream
from imutils.video import FPS
import operator

import time
import cv2
import numpy as np

In [5]:
# Run configuration, read by the functions below through the global name
# `parameters`.
parameters = {
    # Path given to cv2.VideoCapture; a falsy value selects the webcam stream.
    'video': 'video/campus4-c0.avi',
    # Key into OPENCV_OBJECT_TRACKERS.
    'tracker': 'kcf',
    # Detection runs on frames whose index is a multiple of this value;
    # all other frames are handled by the trackers.
    'frame_skip': 3,
    # Predictions must score strictly above this value to be kept.
    'confidence': 0.3,
    # Time-to-live given to a new object and restored on every detection match.
    'ttl': 23,
    # Upper bound on get_ang_dist() for a box to be matched to an object.
    'min_dist': 190,
}

In [6]:
def _find_tracker_factory(factory_name):
    """Look up an OpenCV tracker factory function by attribute name.

    ``cv2.legacy`` is searched first when the installed OpenCV has it, then
    ``cv2`` itself, so that the factories come from the same namespace as the
    MultiTracker on builds that moved this API under ``cv2.legacy``.

    Parameters
    -------
    factory_name : str
        Attribute name such as ``"TrackerKCF_create"``.

    Returns
    -------
    factory : callable or None
        The factory function, or None when neither namespace provides it.
    """
    for namespace in (getattr(cv2, "legacy", None), cv2):
        factory = getattr(namespace, factory_name, None)
        if factory is not None:
            return factory
    return None


# Tracker key -> name of the OpenCV factory function that creates it.
_TRACKER_FACTORY_NAMES = {
    "csrt": "TrackerCSRT_create",
    "kcf": "TrackerKCF_create",
    "boosting": "TrackerBoosting_create",
    "mil": "TrackerMIL_create",
    "tld": "TrackerTLD_create",
    "medianflow": "TrackerMedianFlow_create",
    "mosse": "TrackerMOSSE_create",
}

# initialize a dictionary that maps strings to their corresponding
# OpenCV object tracker implementations. Trackers that the installed OpenCV
# build does not provide are left out instead of aborting the whole cell.
OPENCV_OBJECT_TRACKERS = {
    key: factory
    for key, factory in (
        (key, _find_tracker_factory(name))
        for key, name in _TRACKER_FACTORY_NAMES.items()
    )
    if factory is not None
}

# funcs


In [7]:
class Trackable_object():
    """Trackable object describes a person in the frame.

    All boxes are stored in corner form ``(x1, y1, x2, y2)`` where
    ``(x1, y1)`` is the upper-left corner and ``(x2, y2)`` is the
    bottom-right corner.

    Attributes
    -------
    object_id : int
        Id of the object
    box : np.array
        Current position of the object.
    prev_box : np.array
        Previous position of the object. All zeros until the first update.
    future_box : np.array
        Predicted next position of the object. Equal to the initial box
        until the first update.
    is_tracked : bool
        Indicates if the object has assigned tracker.
    has_assigned_box : bool
        Indicates if the object has assigned box.
    ttl : int
        Time To Live of the object. Initialised from ``parameters['ttl']``.
        TTL is decreasing every frame. When ttl is not greater than 0, the
        object is considered to be dead and will not be tracked. TTL is
        restored every time the object was successfully detected.

    """
    def __init__(self, object_id, box, is_tracked):
        self.object_id = object_id
        self.box = box
        self.prev_box = np.array([0,0,0,0])
        # No motion history yet, so the prediction is the current position.
        # Starting from the origin would make the first tracking step match
        # tracker boxes against (0, 0, 0, 0) instead of the object.
        self.future_box = np.array(box)
        self.is_tracked = is_tracked
        self.has_assigned_box = False
        self.ttl = parameters.get('ttl')

    def get_diag_len(self):
        """Calculates the diagonal length of the object box.

            Returns
            -------
            Diagonal length : float

        """
        # The coordinates must be wrapped in a list: np.array(a, b) would
        # interpret the second argument as a dtype, not as a coordinate.
        top_left = np.array([self.box[0], self.box[1]])
        bottom_right = np.array([self.box[2], self.box[3]])
        return np.linalg.norm(bottom_right - top_left)

    def compute_future_box(self):
        ''' Computes future box of the object, by adding difference of previous and current box to current box.'''
        diff = get_difference(self.prev_box, self.box)
        self.future_box = self.box + diff

    def update(self,ttl_n, is_tracked, box):
        """Updates object's ttl, is_tracked, prev_box, box and future_box.

            Parameters
            -------
            ttl_n : int
                ttl to assign
            is_tracked : bool
                set if object is tracked
            box : np.array
                new object position

        """
        self.ttl = ttl_n
        self.is_tracked = is_tracked
        # Shift the position history by one step before predicting.
        self.prev_box = self.box
        self.box = box
        self.compute_future_box()

In [8]:
def get_difference(box1, box2):
    """Computes the per-coordinate shift from box1 to box2.

    Parameters
    -------
    box1 : np.array
        First box (x1, y1, x2, y2). Considered to be previous location.
    box2 : np.array
        Second box (x1, y1, x2, y2). Considered to be current location.

    Returns
    -------
    shift : np.array
        Four values: the (x, y) shift of the upper-left corner followed by
        the (x, y) shift of the bottom-right corner.

    """
    # One row per corner: upper-left first, bottom-right second.
    previous_corners = np.array([[box1[0], box1[1]],
                                 [box1[2], box1[3]]])
    current_corners = np.array([[box2[0], box2[1]],
                                [box2[2], box2[3]]])

    return (current_corners - previous_corners).ravel()

def roi_from_box(box):
    """Converts a corner-form box (x1, y1, x2, y2) to a roi (x1, y1, w, h).

    Parameters
    -------
    box : np.array
        Box as (x1, y1, x2, y2).

    Returns
    -------
    roi : tuple
        (x1, y1, x2 - x1, y2 - y1)

    """
    left, top, right, bottom = box[0], box[1], box[2], box[3]
    return (left, top, right - left, bottom - top)

def box_from_roi(roi):
    """Converts a roi (x1, y1, w, h) to a corner-form box (x1, y1, x2, y2).

    Parameters
    -------
    roi : tuple
        Region as (x1, y1, w, h).

    Returns
    -------
    box : np.array
        (x1, y1, x1 + w, y1 + h)

    """
    left, top, width, height = roi[0], roi[1], roi[2], roi[3]
    return np.array([left, top, left + width, top + height])

def get_ang_dist(box1, box2):
    """Computes the corner distance between two boxes.

    The result is the sum of the Euclidean distances between the four pairs
    of corresponding corners (upper-left, upper-right, bottom-right,
    bottom-left).

    Parameters
    -------
    box1 : np.array
        Box as (x1, y1, x2, y2).
    box2 : np.array
        Box as (x1, y1, x2, y2).

    Returns
    -------
    distance : float

    """
    def corners(box):
        # Upper corners use y1 (box[1]) and bottom corners use y2 (box[3]);
        # the x value must never end up in the y slot.
        return np.array([
            (box[0], box[1]),  # upper-left
            (box[2], box[1]),  # upper-right
            (box[2], box[3]),  # bottom-right
            (box[0], box[3]),  # bottom-left
        ], dtype=float)

    # Row-wise norm gives one distance per corner pair.
    return np.linalg.norm(corners(box1) - corners(box2), axis=1).sum()


def get_intersection(bb1, bb2):
    """Computes the overlap of two boxes relative to the smaller box.

    The intersection area is divided by the area of the smaller box, so a box
    that lies completely inside the other one yields 1.

    Parameters
    -------
    bb1 : np.array
        Box as (x1, y1, x2, y2).
    bb2 : np.array
        Box as (x1, y1, x2, y2).

    Returns
    -------
    intersection area : float
        from 0.0 to 1.0. 0.0 is also returned when the boxes do not overlap
        or when the smaller box has no area.
    """
    # Determine the coordinates of the intersection rectangle
    x_left = max(bb1[0], bb2[0])
    y_top = max(bb1[1], bb2[1])

    x_right = min(bb1[2], bb2[2])
    y_bottom = min(bb1[3], bb2[3])

    if x_right < x_left or y_bottom < y_top:
        return 0.0

    intersection_area = (x_right - x_left) * (y_bottom - y_top)

    bb1_area = (bb1[2] - bb1[0]) * (bb1[3] - bb1[1])
    bb2_area = (bb2[2] - bb2[0]) * (bb2[3] - bb2[1])

    # A degenerate (zero-area) box cannot be overlapped; returning 0.0 here
    # avoids a division by zero.
    smaller_area = min(bb1_area, bb2_area)
    if smaller_area <= 0:
        return 0.0

    # Divide intersection on smaller box.
    return intersection_area / smaller_area

# code

#### Detection 

In [9]:
def update_create_objects(trackable_objects, bbox):
    """Assigns a detected box to the closest eligible object, or creates one.

    An object is eligible when it has not been claimed yet (``is_tracked`` is
    False), is alive (``ttl > 0``) and its distance to the box is below
    ``parameters['min_dist']``. The dictionary is modified in place.

    Parameters
    -------
    trackable_objects : dict
        Dictionary of all trackable objects.
    bbox : np.array
        Box that is to be assigned to an object.

    """
    best_distance = parameters.get('min_dist')
    best_id = None

    # Search for the nearest unclaimed, living object.
    for candidate_id, candidate in trackable_objects.items():
        distance = get_ang_dist(candidate.box, bbox)
        if not distance < best_distance:
            continue
        if candidate.is_tracked or not candidate.ttl > 0:
            continue
        best_distance = distance
        best_id = candidate_id

    if best_id is None:
        # Nothing suitable: register a new object under the next free index.
        new_id = len(trackable_objects)
        trackable_objects[new_id] = Trackable_object(object_id=new_id, box=bbox, is_tracked=True)
        return

    # Match found: move it to the box, restore its ttl and mark it claimed.
    trackable_objects[best_id].update(parameters['ttl'], True, bbox)



In [10]:
def assign_tracker_to_object(multitracker, trackable_objects, frame):
    """Adds a fresh tracker to the multitracker for every object that is alive.

    Every object loses one unit of ttl per call. Objects whose ``is_tracked``
    flag is set get a tracker right away and the flag is cleared. All other
    objects first go through an overlap check that may raise a ttl, and get
    a tracker only if their ttl is still positive afterwards.

    Parameters
    -------
    multitracker : cv2.MultiTracker
        Multitracker object. New trackers are added there.
    trackable_objects : dict
        Dictionary of trackable objects. The ttl and is_tracked attributes of
        its values are modified in place.
    frame : Image
        Current video frame, used to initialise each tracker.

    """
    # Assign tracker to objects
    for (object_id, obj) in trackable_objects.items():

        if obj.is_tracked and obj.ttl>0:
            # Flagged and alive: start a tracker of the configured type on
            # the object's current box (converted to x, y, w, h).
            tracker = OPENCV_OBJECT_TRACKERS[parameters["tracker"]]()
            multitracker.add(tracker, frame, roi_from_box((obj.box[0],obj.box[1],obj.box[2],obj.box[3])))
            # Clear the flag so the object can be claimed by a later box.
            obj.is_tracked = False
            obj.ttl -= 1
        else:
            # Check if object is overlaped by another object. If so, ttl+=1
            # Only the first other object with overlap above 0.6 is
            # considered (note the break); both ttl values must be >= 0.
            for (temp_object_id, temp_obj) in trackable_objects.items() :
                if get_intersection(obj.box,temp_obj.box) > 0.6 and object_id != temp_object_id and temp_obj.ttl>=0 and obj.ttl>=0:
                    # The object with the shorter diagonal gets the bonus:
                    # +1 for this object, otherwise +0.5 for the other one.
                    if obj.get_diag_len() < temp_obj.get_diag_len():
                        obj.ttl+=1
                    else: temp_obj.ttl+=0.5
                    break

            # Regular per-call decay; it cancels a +1 bonus granted above.
            obj.ttl -= 1

            # If the objects is still alive, assign tracker
            if obj.ttl>0:
                tracker = OPENCV_OBJECT_TRACKERS[parameters['tracker']]()
                multitracker.add(tracker, frame, roi_from_box((obj.box[0],obj.box[1],obj.box[2],obj.box[3])))


In [11]:
def detect(frame, trackable_objects):
    """Runs the detector on a frame and rebuilds the multitracker.

    Every kept prediction is matched to an existing object or creates a new
    one. A new multitracker is then created and a tracker is added to it for
    every object that is still alive.

    Parameters
    -------
    frame : image
        Current video frame. It is reshaped to (1, 300, 300, 3), so it must
        hold exactly 300 * 300 * 3 values.
    trackable_objects : dict
        Dictionary of trackable objects. Modified in place.

    Returns
    -------
    multitracker : cv2.MultiTracker
        Multitracker object, that containes trackers.

    boxes_to_draw : list
        List of boxes found. May be used for visualization.

    """
    # Get list of predictions for a batch that contains only this frame
    y_pred = model.predict(np.reshape(frame,(1,300,300,3)))

    # Keep predictions of class id 3 (treated as "person" by this notebook)
    # whose score is above the configured threshold
    min_confidence = parameters['confidence']
    y_pred_thresh = [
        pred for pred in y_pred[0]
        if pred[0]==3 and pred[1]>min_confidence
    ]

    # Run through every prediction and assign an Object to it.
    boxes_to_draw = []
    for box in y_pred_thresh:

        # Each prediction is (class, score, xmin, ymin, xmax, ymax)
        (_,_,xmin,ymin,xmax,ymax) = [int(val) for val in box]
        bbox = np.array([xmin,ymin,xmax,ymax])
        boxes_to_draw.append(bbox)

        update_create_objects(trackable_objects, bbox)

    # After updating all objects. Create multitracker and track every object.
    # Use cv2.legacy when the installed OpenCV exposes MultiTracker there.
    tracking_api = cv2.legacy if hasattr(cv2, 'legacy') else cv2
    multitracker = tracking_api.MultiTracker_create()

    assign_tracker_to_object(multitracker, trackable_objects, frame)

    return multitracker, boxes_to_draw

#### Tracking

In [12]:
def assign_box_to_obj(obj, boxes_pred, taken_boxes):
    """Assigns the closest free tracker box to the object.

    Candidate boxes are compared with the object's predicted position
    (``obj.future_box``). Only boxes that are not taken yet and are closer
    than ``parameters['min_dist']`` qualify.

    Parameters
    -------
    obj : Trackable_object
        Object needed to be assigned.
    boxes_pred : np.array
        Array of boxes predicted by multitracker, each as (x, y, w, h).
    taken_boxes : np.array
        Array of 0 and 1. 0 indicates the box is not taken. 1 indicates the
        box is taken. The entry of the chosen box is set to 1.

    Returns
    -------
    obj.box : np.array
        Position of the object after the assignment. Unchanged when no box
        qualified.

    """
    best_dist = parameters.get('min_dist')
    best_index = None

    # Find closest box among those that are still free
    for index, roi in enumerate(boxes_pred):
        if taken_boxes[index] != 0:
            continue
        cur_dist = get_ang_dist(box_from_roi(roi), obj.future_box)
        if cur_dist < best_dist:
            best_dist = cur_dist
            best_index = index

    # The result of this search decides the flag, so a stale value left on
    # the object can never trigger an update with a placeholder box.
    obj.has_assigned_box = best_index is not None

    # Assign box to object if one was found
    if best_index is not None:
        obj.update(obj.ttl, obj.is_tracked, box_from_roi(boxes_pred[best_index]))
        taken_boxes[best_index] = 1

    return obj.box

In [13]:
def track(frame, multitracker,trackable_objects):
    """Updates the trackers and moves every living object to its new box.

    Parameters
    -------
    frame : Image
        Current frame of the video.
    multitracker : cv2.Multitracker
        Multitracker object to predict positions.
    trackable_objects : dict
        Dictionary of trackable objects.

    Returns
    -------
    objects_to_draw : list
        List of tuples (ID, BOX) for objects with ttl > 0. May be used for
        visualization.

    """
    # Only the predicted boxes are used; the success flag is ignored.
    _, raw_boxes = multitracker.update(frame)

    # Every object starts the frame without an assigned box.
    for tracked in trackable_objects.values():
        tracked.has_assigned_box = False

    # One slot per predicted box, so each box is given to at most one object.
    taken_boxes = np.zeros(len(raw_boxes))
    boxes_pred = np.array(raw_boxes).astype(int)

    objects_to_draw = []
    for tracked in trackable_objects.values():
        if not tracked.ttl > 0:
            continue
        assigned_box = assign_box_to_obj(tracked, boxes_pred, taken_boxes)
        objects_to_draw.append((tracked.object_id, assigned_box))

    return objects_to_draw

In [14]:
def draw(to_show, to_save, frame, frames_processed, boxes_to_draw, objects_to_draw, out=None):
    """Draws boxes, ids and the frame number, then shows or saves the frame.

    The annotations are drawn directly onto ``frame``.

    Parameters
    -------
    to_show : bool
        Indicates the need in showing the frame.
    to_save : bool
        Indicates the need in saving the frame.
    frame : Image
        Current frame of the video.
    frames_processed : int
        Number of the current frame
    boxes_to_draw : list
        List of boxes (x1, y1, x2, y2) to visualize.
    objects_to_draw : list
        List of tuples (ID, BOX) to visualize.
    out : cv2.VideoWriter, optional
        Writer that receives the frame when ``to_save`` is True.

    Raises
    -------
    ValueError
        If ``to_save`` is True and no writer was given.

    """
    # The writer has to be passed in explicitly: it is not a global name.
    if to_save and out is None:
        raise ValueError("to_save=True requires an open video writer passed as `out`")

    for box in boxes_to_draw:
        # Plain ints keep cv2 independent of the numpy dtype of the box.
        x1, y1, x2, y2 = (int(box[i]) for i in range(4))
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)

    for object_id, box in objects_to_draw:
        x1, y1, x2, y2 = (int(box[i]) for i in range(4))
        # The id label is placed slightly above the upper-left corner.
        cv2.putText(frame, '%d'%(object_id), (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6,(25,111,255),2)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0,0,255), 2)

    cv2.putText(frame, 'Frame: '+str(frames_processed), (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (0,0,255),1)

    if to_show:
        cv2.imshow('Frame', frame)
    if to_save:
        out.write(frame)

#### Running 

In [15]:
def init_vs():
    """Opens the video source selected by ``parameters['video']``.

    Returns
    -------
    vs : cv2.VideoCapture or imutils.video.VideoStream
        A capture of the configured video file, or a started webcam stream
        (source 0) when no video path is configured.

    """
    # a configured path means: grab a reference to the video file
    if parameters.get('video', False):
        return cv2.VideoCapture(parameters['video'])

    # otherwise grab the reference to the web cam and pause for one second
    # before handing the stream to the caller
    print("[INFO] starting video stream...")
    stream = VideoStream(src=0).start()
    time.sleep(1.0)
    return stream

In [16]:
def track_objects_in_video(to_show, to_save):
    """ General method to run object tracking.

    Frames are read from the source opened by ``init_vs`` and resized to
    300x300. Detection runs on every ``parameters['frame_skip']``-th frame,
    tracking on all others. The loop ends when the source is exhausted or
    'q' is pressed.

    Parameters
    -------
    to_show : bool
        Indicates the need in showing the frame.
    to_save : bool
        Indicates the need in saving the frame. Frames are written to
        'output_0.avi'.

    """

    vs = init_vs()
    # init_vs returns a cv2.VideoCapture for a file and a VideoStream for
    # the web cam; the two are read and closed differently.
    reading_from_file = bool(parameters.get("video", False))

    trackable_objects = {} # dict of trackable objects 'id':TrackableObject
    frames_processed = 0
    # Use cv2.legacy when the installed OpenCV exposes MultiTracker there.
    tracking_api = cv2.legacy if hasattr(cv2, "legacy") else cv2
    multitracker = tracking_api.MultiTracker_create()
    boxes_to_draw = []
    objects_to_draw = []

    out = None
    if to_save:
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter('output_0.avi',fourcc, 24.0, (300,300))

    try:
        while True:
            # get the frame
            frame = vs.read()
            frame = frame[1] if reading_from_file else frame

            # if no frame -> break
            if frame is None:
                break

            # resize frame to fit into SSD
            frame = cv2.resize(frame,(300,300))

            if frames_processed % parameters['frame_skip'] == 0:
                # Detect. The returned multitracker must replace the current
                # one, otherwise track() keeps updating an empty tracker.
                multitracker, boxes_to_draw = detect(frame, trackable_objects)
            else:
                # Track
                objects_to_draw = track(frame,multitracker,trackable_objects)

            # Draw. The frame is written below because the writer is local
            # to this function.
            draw(to_show, False, frame, frames_processed, boxes_to_draw, objects_to_draw)
            if out is not None:
                out.write(frame)

            key = cv2.waitKey(1) & 0xFF

            if key == ord('q'):
                break

            frames_processed+=1
    finally:
        # Release everything even when a frame fails to process.
        if out is not None:
            out.release()

        if reading_from_file:
            vs.release()
        else:
            # the web cam stream is closed with stop()
            vs.stop()
        cv2.destroyAllWindows()

In [17]:
# Run the pipeline on the configured source: show every annotated frame in a
# window and do not write an output video.
track_objects_in_video(to_show=True, to_save=False)