# People Tracker

Import Packages

In [1]:
from submodules.centroidtracker import CentroidTracker
from submodules.trackableobject import TrackableObject
from imutils.video import VideoStream # work with webcam
from imutils.video import FPS # calculate frames per second
import numpy as np
import argparse
import imutils
import time
import dlib # correlation tracker implementation
import cv2 # opencv bindings

Argument Parser

In [2]:
# since using notebooks, replace an argument parser with hard coded arguments and values

args = {
    "prototxt": "mobilenet_ssd/MobileNetSSD_deploy.prototxt",
    "model": "mobilenet_ssd/MobileNetSSD_deploy.caffemodel",
    "input": "videos/example_01.mp4",
    "output": "output/output_01.avi",
    "confidence": 0.4,
    "skip_frames": 30
}

Initialize set of class labels MobileNet SSD was trained to detect

In [3]:
CLASSES = ["background",
           "aeroplane",
           "bicycle",
           "bird",
           "boat",
           "bottle",
           "bus",
           "car",
           "cat",
           "chair",
           "cow",
           "diningtable",
           "dog",
           "horse",
           "motorbike",
           "person",
           "pottedpolant",
           "sheep",
           "sofa",
           "train",
           "tvmonitor"]

Load the serialized model from disk

In [4]:
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

[INFO] loading model...


Grab the input video file

In [5]:
print("[INFO] opening video file...")
vs = cv2.VideoCapture(args["input"])

[INFO] opening video file...


Initialize the video writer and frame dimensions

In [6]:
writer = None
W = None
H = None

Instantiate the centroid tracker
then initialize a list to store each dlib correlation tracker
then a dictionary to map each unique objectID to a trackableobject

In [7]:
ct = CentroidTracker(maxDisappeared=40, maxDistance=50)
trackers = []
trackableObjects = {}

Initialize the total number of frames processed so far
Initialize the total number of objects that have moved up or down

In [8]:
totalFrames = 0
totalDown = 0
totalUp = 0

Start the frames per second throughput estimator

In [9]:
fps = FPS().start()

Loop over the frames from the video stream

In [10]:
while True:
    # grab the next frame and handle from VideoCapture or VideoStream
    frame = vs.read()
    frame = frame[1] if args.get("input", False) else frame

    # if viewing a video and did not grab a frame, it's the end of the video
    if args["input"] is not None and frame is None:
        break

    # resize the frame to have a maximum width of 500px
    # less data = faster processing
    # convert from BGR to RGB to dlib
    frame = imutils.resize(frame, width=500)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if the frame dims are empty, set them
    if W is None or H is None:
        (H, W) = frame.shape[:2]

    # if writing video to disk, initialize the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30, (W, H), True)

    # initialize  the current status with the bounding box rectangles
    # returned by the objectdetector or the correlationtracker
    status = "Waiting"
    rects = []

    # check to see if we should run more detection
    # more detection = more computation = more $$ cost
    if totalFrames % args["skip_frames"] == 0:
        # set the status and initialize new set of object trackers
        status = "Detecting"
        trackers = []

        # convert the frame to a blobk
        # pass the blob through the network
        # obtain the detections
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (W, H), 127.5)
        net.setInput(blob)
        detections = net.forward()

        # loop over ddetections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (probability) associated with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring minimum confidence
            if confidence > args["confidence"]:
                # extract the index of class label from the detection list
                idx = int(detections[0, 0, i, 1])

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

                # compute the x,y coordiante of the bounding box
                box = detections[0, 0, i, 3:7] * np.array([W, H, W, H])
                (startX, startY, endX, endY) = box.astype("int")

                # construct a dlib rectangle object from the bounding
                # box coordinates and then start dlib correlation tracker
                tracker = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                tracker.start_track(rgb, rect)

                # add the tracker to list of trackers and use to skip frames
                trackers.append(tracker)

        # otherwise, utilize the object trackers rather than object detectors
        # to get a higher frame throughput
        else:
            # loop over the trackers
            for tracker in trackers:
                # set the status of the system to tracking, rather than waiting/detecting
                status = "Tracking"

                # update the tracker and grab the updated position
                tracker.update(rgb)
                pos = tracker.get_position()

                # unplac the position oobject
                startX = int(pos.left())
                startY = int(pos.top())
                endX = int(pos.right())
                endY = int(pos.bottom())

                # add the bounding box coordinates to the rectangle list
                rects.append((startX, startY, endX, endY))

        # draw a horizontal line across the center of the frame
        # once an object crosses, determine if moving up or down
        cv2.line(frame, (0, H // 2), (W, H // 2), (0, 255, 255), 2)

        # use the centroid tracker to associate
        # (1) the old object centroid with
        # (2) the newly computed object centroids
        objects = ct.update(rects)

        # loop over tracked objects
        for (objectID, centroid) in objects.items():
            # check if the trackable object exists for current objectID
            to = trackableObjects.get(objectID, None)

            # if there are no existing trackable objects, create one
            if to is None:
                to = TrackableObject(objectID, centroid)

            # otherwise, there is a trackeable object
            # use to get direction
            else:
                # the difference between the y-coord of the current centroid
                # and the mean of the previous centroid results in
                # which direction the object is moving
                # negative is up, positive is down
                y = [c[1] for c in to.centroids]
                direction = centroid[1] = np.mean(y)
                to.centroids.append(centroid)

                # check if object was counted or not
                if not to.counted:
                    # if the direction is negative (moving up)
                    # AND the centroid is above the center line
                    # count the object
                    if direction < 0 and centroid[1] < H // 2:
                        totalUp += 1
                        to.counted = True

                    # if the direction is positive (moving down)
                    # AND the centroid is below the center line
                    # count the object
                    if direction > 0 and centroid[1] > H // 2:
                        totalDown += 1
                        to.counted = True

            # store trackable object in the dictionary
            trackableObjects[objectID] = to

            # draw both the ID of the object and centroid of the oobject
            text = "Id {}".format(objectID)
            cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10),
              cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            cv2.circle(frame, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)

        # construct a tuple of information we will be displaying on the frame
        info = [
            ("Up", totalUp),
            ("Down", totalDown),
            ("Status", status),
        ]

            # loop over info tuples and draw on our frame
        for (i, (k, v)) in enumerate(info):
            text = "{}: {}".format(k, v)
            cv2.putText(frame, text, (10, H - ((i * 20) + 20)),
              cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        # check if write frame to disk
        if writer is not None:
            writer.write(frame)

        # increment the total number of frame processed and update FPS coutner
        totalFrames += 1
        fps.update()

In [11]:
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
	writer.release()

# release the video file pointer
vs.release()

[INFO] elapsed time: 1.06
[INFO] approx. FPS: 0.95


In [12]:
!ffmpeg -i output/output_01.avi output/output.mp4

ffmpeg version 4.3.1 Copyright (c) 2000-2020 the FFmpeg developers
  built with Apple clang version 12.0.0 (clang-1200.0.32.27)
  configuration: --prefix=/usr/local/Cellar/ffmpeg/4.3.1_4 --enable-shared --enable-pthreads --enable-version3 --enable-avresample --cc=clang --host-cflags= --host-ldflags= --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libbluray --enable-libdav1d --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-libass --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenjpeg --enable-librtmp --enable-libspeex --enable-libsoxr --enable-videotoolbox --disable-libjack --disable-indev=jack
  libavutil      56. 51.10

In [13]:
#@title Display video inline
from IPython.display import HTML
from base64 import b64encode

mp4 = open("output/output.mp4", "rb").read()
dataURL = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=400 controls>
      <source src="%s" type="video/mp4">
</video>
""" % dataURL)