In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%cd /content/drive/Shareddrives/EECS504_Computer_Vision_Project/ObjectTracking

/content/drive/Shareddrives/EECS504_Computer_Vision_Project/ObjectTracking


In [None]:
import os
import json
import cv2
import math
from matplotlib import pyplot as plt
import numpy as np

os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import tensorflow as tf
from yolov3.utils import Load_Yolo_model, image_preprocess, postprocess_boxes, nms, draw_bbox, read_class_names
from yolov3.configs import *
import time
from google.colab.patches import cv2_imshow

from deep_sort import nn_matching
from deep_sort.detection import Detection
from deep_sort.tracker import Tracker
from deep_sort import generate_detections as gdet

YOLO Object Tracking Function


In [None]:
def Object_tracking(Yolo, video_path, output_path, input_size=416, show=False, CLASSES=YOLO_COCO_CLASSES, score_threshold=0.3, iou_threshold=0.45, rectangle_colors='', Track_only = []):
    # Definition of the parameters
    max_cosine_distance = 0.7
    nn_budget = None
    
    #initialize deep sort object
    model_filename = 'model_data/mars-small128.pb'
    encoder = gdet.create_box_encoder(model_filename, batch_size=1)
    metric = nn_matching.NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)
    tracker = Tracker(metric)

    times, times_2 = [], []

    if video_path:
        vid = cv2.VideoCapture(video_path) # detect on video
    else:
        vid = cv2.VideoCapture(0) # detect from webcam

    # by default VideoCapture returns float instead of int
    width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(vid.get(cv2.CAP_PROP_FPS))
    codec = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, codec, fps, (width, height)) # output_path must be .mp4

    NUM_CLASS = read_class_names(CLASSES)
    key_list = list(NUM_CLASS.keys()) 
    val_list = list(NUM_CLASS.values())
    
    all_tracked_bboxes = []
    fps = []

    while True:
        _, frame = vid.read()

        try:
            original_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            original_frame = cv2.cvtColor(original_frame, cv2.COLOR_BGR2RGB)
        except:
            break
        
        image_data = image_preprocess(np.copy(original_frame), [input_size, input_size])
        image_data = image_data[np.newaxis, ...].astype(np.float32)

        t1 = time.time()
        if YOLO_FRAMEWORK == "tf":
            pred_bbox = Yolo.predict(image_data)
        elif YOLO_FRAMEWORK == "trt":
            batched_input = tf.constant(image_data)
            result = Yolo(batched_input)
            pred_bbox = []
            for key, value in result.items():
                value = value.numpy()
                pred_bbox.append(value)
        
        t2 = time.time()
        
        pred_bbox = [tf.reshape(x, (-1, tf.shape(x)[-1])) for x in pred_bbox]
        pred_bbox = tf.concat(pred_bbox, axis=0)

        bboxes = postprocess_boxes(pred_bbox, original_frame, input_size, score_threshold)
        bboxes = nms(bboxes, iou_threshold, method='nms')

        # extract bboxes to boxes (x, y, width, height), scores and names
        boxes, scores, names = [], [], []
        for bbox in bboxes:
            if len(Track_only) !=0 and NUM_CLASS[int(bbox[5])] in Track_only or len(Track_only) == 0:
                boxes.append([bbox[0].astype(int), bbox[1].astype(int), bbox[2].astype(int)-bbox[0].astype(int), bbox[3].astype(int)-bbox[1].astype(int)])
                scores.append(bbox[4])
                names.append(NUM_CLASS[int(bbox[5])])

        # Obtain all the detections for the given frame.
        boxes = np.array(boxes) 
        names = np.array(names)
        scores = np.array(scores)
        features = np.array(encoder(original_frame, boxes))
        detections = [Detection(bbox, score, class_name, feature) for bbox, score, class_name, feature in zip(boxes, scores, names, features)]

        # Pass detections to the deepsort object and obtain the track information.
        tracker.predict()
        tracker.update(detections)

        # Obtain info from the tracks
        tracked_bboxes = []
        for track in tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 5:
                continue 
            bbox = track.to_tlbr() # Get the corrected/predicted bounding box
            class_name = track.get_class() #Get the class name of particular object
            tracking_id = track.track_id # Get the ID for the particular track
            index = key_list[val_list.index(class_name)] # Get predicted object index by object name
            tracked_bboxes.append(bbox.tolist() + [tracking_id, index]) # Structure data, that we could use it with our draw_bbox function
        
        # Save all tracked_bboxes
        all_tracked_bboxes.append(tracked_bboxes)

        # draw detection on frame
        image = draw_bbox(original_frame, tracked_bboxes, CLASSES=CLASSES, tracking=True)

        t3 = time.time()
        times.append(t2-t1)
        times_2.append(t3-t1)
        
        times = times[-20:]
        times_2 = times_2[-20:]

        ms = sum(times)/len(times)*1000
        fps.append(1000/ms)
        fps2 = 1000 / (sum(times_2)/len(times_2)*1000)
        
        # image = cv2.putText(image, "Time: {:.1f} FPS".format(fps), (0, 30), cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 2)

        # draw original yolo detection
        #image = draw_bbox(image, bboxes, CLASSES=CLASSES, show_label=False, rectangle_colors=rectangle_colors, tracking=True)

        # print("Time: {:.2f}ms, Detection FPS: {:.1f}, total FPS: {:.1f}".format(ms, fps, fps2))
        if output_path != '': out.write(image)
        if show:
            cv2_imshow(image)
            
            if cv2.waitKey(25) & 0xFF == ord("q"):
                cv2.destroyAllWindows()
                break
            
    cv2.destroyAllWindows()
    return all_tracked_bboxes, fps

Command to Run YOLO Object Tracking

In [None]:
yolo = Load_Yolo_model()
video_path   = "./IMAGES/three_ppl_video_30fps_260frames.avi"
all_tracked_bboxes, fps = Object_tracking(yolo, video_path, "detection.mp4", input_size=YOLO_INPUT_SIZE, show=False, iou_threshold=0.1, rectangle_colors=(255,0,0), Track_only = ["person"])


GPUs [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


Calculate BBox Centroids in Pixel Coords


In [None]:
# Assign -1 to centroid coord when no bbox available
num_frames = 260
num_obj = 3
centroids_x = np.zeros((num_frames, num_obj)) 
centroids_y = np.zeros((num_frames, num_obj))
centroids_x -= 1
centroids_y -= 1
for i in range(num_frames):
  tracked_bboxes = all_tracked_bboxes[i]
  num_bboxes = len(tracked_bboxes)
  for j in range(num_bboxes):
    obj_ID = tracked_bboxes[j][4]
    corner_xmin = tracked_bboxes[j][0]
    corner_ymin = tracked_bboxes[j][1]
    corner_xmax = tracked_bboxes[j][2]
    corner_ymax = tracked_bboxes[j][3]
    centroids_x[i, obj_ID-1] = (corner_xmin + corner_xmax)/2
    centroids_y[i, obj_ID-1] = (corner_ymin + corner_ymax)/2

Load Camera Intrinsics, RGB and Depth Frame Arrays from JSON File. 

In [None]:
folder_path = "./Princeton Datasets/three_people"  ##Change the directory as needed
depth_images_path = folder_path + "/depth"
rgb_images_path = folder_path + "/rgb"
json_path = folder_path + "/frames.json"

In [None]:
with open(json_path) as json_file:
    frames_json = json.load(json_file)

In [None]:
num_frames = frames_json['length']

K_matrix = frames_json['K']
cx = K_matrix[0][2]
cy = K_matrix[1][2]
fx = K_matrix[0][0]
fy = K_matrix[1][1]

rgb_timestamps = frames_json['imageTimestamp']
depth_timestamps = frames_json['depthTimestamp']

print(cx, cy, fx, fy)

320 240 575.8157496 575.8157496


In [None]:
rgb_array = []
depth_array = []

for frame_id in range(1,num_frames+1):
  rgb_image = rgb_images_path + '/r-{}-{}.png'.format(frames_json['imageTimestamp'][frame_id-1], frames_json['imageFrameID'][frame_id-1])
  rgb = cv2.imread(rgb_image)
  depth_image = depth_images_path + '/d-{}-{}.png'.format(frames_json['depthTimestamp'][frame_id-1], frames_json['depthFrameID'][frame_id-1])
  depth = cv2.imread(depth_image,-1)
  depth = np.bitwise_or(np.right_shift(depth,3), np.left_shift(depth,13))
  rgb_array.append(rgb)
  depth_array.append(depth)


Transform from Image Coords to Camera Coords

In [None]:
cam_coords_x = np.zeros((num_frames, num_obj))
cam_coords_y = np.zeros((num_frames, num_obj))
cam_coords_z = np.zeros((num_frames, num_obj))
cam_coords_x += 1000
cam_coords_y += 1000
cam_coords_z += 1000

for i in range(num_frames):
  for j in range(num_obj):
    u = int(centroids_y[i,j])
    v = int(centroids_x[i,j])

    if (u != -1):
      z = depth_array[i][u,v]/1000            #in m
      cam_coords_x[i,j] = -(u-cx)*z*1/fx      #in m
      cam_coords_y[i,j] = (v-cy)*z*1/fy       #in m
      cam_coords_z[i,j] = z

Calculate Object Velocities

In [None]:
# If 1000 is encountered, retain same position, velocity as before

fps_const = 30

vel_x = np.zeros((num_frames, num_obj))
vel_y = np.zeros((num_frames, num_obj))
vel_z = np.zeros((num_frames, num_obj))

for i in range(1, num_frames):
  for j in range(num_obj):
    if (cam_coords_x[i,j] != 1000.0 and cam_coords_x[i-1,j] != 1000.0): 
      vel_x[i,j] = (cam_coords_x[i,j] - cam_coords_x[i-1,j])*fps_const   
      vel_y[i,j] = (cam_coords_y[i,j] - cam_coords_y[i-1,j])*fps_const                             
      vel_z[i,j] = (cam_coords_z[i,j] - cam_coords_z[i-1,j])*fps_const


Post-process Positions and Velocities

In [None]:
# First bounding box is at frame index 2
cam_coords_x[0,:] = cam_coords_x[2,:]
cam_coords_y[0,:] = cam_coords_y[2,:]
cam_coords_z[0,:] = cam_coords_z[2,:]
for i in range(1,num_frames):
  for j in range(num_obj):
    if cam_coords_x[i,j] == 1000.0: # no bbox for obj j in frame i 
      cam_coords_x[i,j] = cam_coords_x[i-1,j]
      cam_coords_y[i,j] = cam_coords_y[i-1,j]
      cam_coords_z[i,j] = cam_coords_z[i-1,j]
      vel_x[i,j] = vel_x[i-1,j]
      vel_y[i,j] = vel_y[i-1,j]
      vel_z[i,j] = vel_z[i-1,j]


Warning System Function

In [None]:
def warning_sys(frame_ind):
  i = frame_ind
  sys = []
  for j in range(0,2):
    for k in range(j+1, num_obj):

      px1 = cam_coords_x[i,j]
      pz1 = cam_coords_z[i,j]
      vx1 = vel_x[i,j]
      vz1 = vel_z[i, j]
      v1 = np.array((vx1, vz1))
      
      px2 = cam_coords_x[i,k]
      pz2 = cam_coords_z[i,k]
      vx2 = vel_x[i,k]
      vz2 = vel_z[i,k]
      v2 = np.array((vx2, vz2))

      r21 = np.array((vx2 - vx1, vz2 - vz1))  # relative vel of 2 wrt 1
      p12 = np.array((px1 - px2, pz1 - pz2))  # relative pos of 2 wrt 1

      approaching_vel = np.dot(r21, p12)
      distance = np.sqrt(p12[0]**2 + p12[1]**2)

      approaching_vel_tol = 0.5
      distance_tol = 0.3

      far_fast = False
      close_slow = False # if included, errs more on the side of safety, many warnings
      close_fast = False
      # Far away, but approaching fast
      if (distance > distance_tol and approaching_vel > approaching_vel_tol):
        far_fast = True
      # Close by and approaching slow
      if (distance < distance_tol and approaching_vel > 0):
        close_slow = False
      # Close by and approaching fast
      if (distance < distance_tol and approaching_vel > approaching_vel_tol):
        close_fast = True

      if (close_slow == True or close_fast == True):
        sys.append(["Danger", j+1, k+1])
      elif (far_fast == True):
        sys.append(["Approaching", j+1, k+1])
        
  return sys

Generate 2D figs

In [None]:
props_y_coord = dict(boxstyle='round', facecolor='wheat', alpha=0.35)
for frame_num in range(num_frames):
  
  ax = plt.axes()

  plt.text(0.77, 0.99, "Y (in m)", transform=ax.transAxes, fontsize=9,verticalalignment='top')
  
  for person in range(3):
    #PLOTTING ARROWS
    hL = 0.3
    hW = 0.2
    dx = vel_x[frame_num,person]
    dz = vel_z[frame_num,person]
    vec_ab_magnitude = math.sqrt(dx**2+dz**2)
    dx = dx / vec_ab_magnitude
    dz = dz / vec_ab_magnitude
    arrow_colors = ['blue', 'red', 'green']
    ax.arrow(cam_coords_x[frame_num,person],cam_coords_z[frame_num,person], dx,dz,head_width=hW, head_length=hL, fc=arrow_colors[person], ec=arrow_colors[person])
    #PLOTTING Y_COORDINATES
    plt.text(0.77, (0.94-person*0.05), 'Person '+ str(person+1)+': ' +  '{:.2f}'.format(-cam_coords_y[frame_num,person]), transform=ax.transAxes, fontsize=9,verticalalignment='top', color= arrow_colors[person])
  
  #SCATTER PLOT FOR POSITION OF PERSONS
  plt.scatter(cam_coords_x[frame_num,:],cam_coords_z[frame_num,:],color=arrow_colors)

  #TITLE
  plt.title("SAFETY WARNING SYSTEM")

  #WARNINGS
  warn_list = warning_sys(frame_num)
  s1=""
  s2=""
  for warn in warn_list:
    if (warn[0] == 'Danger'):
      if (s1 == ""):
        s1 = "DANGER: "+ " [" + str(warn[1])+", "+str(warn[2]) +"]"
      else: 
        s1 = s1+ " [" + str(warn[1])+", "+str(warn[2]) +"]"
      
    elif (warn[0] == 'Approaching'):
      if (s2==""):
        s2 = "Approaching: "+ " [" + str(warn[1])+", "+str(warn[2]) +"]"
      else: 
        s2 = s2+ " [" + str(warn[1])+", "+str(warn[2]) +"]"

  props1 = dict(boxstyle='round', facecolor='red', alpha=0.4)
  props2 = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
  if ((s1 != "") and (s2!="")):
    plt.text(0.05, 0.95, s1, transform=ax.transAxes, fontsize=12,verticalalignment='top', bbox=props1)
    plt.text(0.05, 0.85, s2, transform=ax.transAxes, fontsize=12,verticalalignment='top', bbox=props2)
  elif (s1!=""):
    plt.text(0.05, 0.95, s1, transform=ax.transAxes, fontsize=12,verticalalignment='top', bbox=props1)
  elif (s2!=""):
    plt.text(0.05, 0.95, s2, transform=ax.transAxes, fontsize=12,verticalalignment='top', bbox=props2)

  #AXIS LABELS
  plt.xlabel("X (in m)")
  plt.ylabel("Depth (in m)")

  #AXIS LIMITS
  plt.xlim(-1.5, 1.5)
  plt.ylim(0,4.5)
  plt.grid()

  #SAVE FIGURES
  plt.savefig('./2Dfigs/' + str(frame_num) + '.png')
  plt.clf()

  from ipykernel import kernelapp as app
  app.launch_new_instance()


<Figure size 432x288 with 0 Axes>

Convert 2D figs to mp4 video

In [None]:
import os
from os.path import isfile, join

pathIn= './2Dfigs'
pathOut = '2D_vid.mp4'
fps_const = 30
frame_array = []
files = [f for f in os.listdir(pathIn) if isfile(join(pathIn, f))]
#for sorting the file names properly
files.sort(key = lambda x: int((x.split(".")[0])))
total_frames = len(files)
num_files = 260
for i in range(num_files):
    filename = pathIn + '/' + files[i]
    #reading each files
    img = cv2.imread(filename) 
    height, width, layers = img.shape
    size = (width,height)
    #inserting the frames into an image array
    frame_array.append(img)
print("Frame array loaded!")
out = cv2.VideoWriter(pathOut,cv2.VideoWriter_fourcc(*'DIVX'), fps_const, size)
print("out initialized!")
for i in range(len(frame_array)):
    # writing to a image array
    out.write(frame_array[i])
out.release()
print("out loaded!")

Frame array loaded!
out initialized!
out loaded!
