<a href="https://colab.research.google.com/github/IqbalLx/Physical-Distnace-Tracker/blob/master/Physical_Distance_Tracker.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import cv2
import numpy as np

%tensorflow_version 1.x
import tensorflow as tf
tf.__version__

TensorFlow 1.x selected.


'1.15.2'

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

In [0]:
cd '/content/gdrive/My Drive/physicaldistance'

/content/gdrive/My Drive/physicaldistance


In [0]:
def euclidean_dist(point_a, point_b):
  return np.sqrt((point_a[1] - point_b[1])**2 + (point_a[0] - point_b[0])**2)

In [0]:
#Transforming into bird's eye view
def transform(img, points):
    #Ordering points
    #rect = order_points(points)
    tl, tr, br, bl = points

    #Construct bird's eye image matrix
    #I want it vertical, so it'll find minimum width
    #between distance for tl to tr and bl to br
    upperW = euclidean_dist(tl, tr)
    lowerW = euclidean_dist(bl, br)
    minW = min(upperW, lowerW)

    #And max height between distance for tl to bl
    #and tr to br
    upperH = euclidean_dist(tl, bl)
    lowerH = euclidean_dist(tr, br)
    maxH = max(upperH, lowerH)

    #Put all the information
    tmp = np.array([
        [0, 0],
        [minW-1, 0],
        [minW-1, maxH-1],
        [0, maxH-1]
    ], dtype='float32')

    #Warping and transform
    M = cv2.getPerspectiveTransform(points, tmp)
    warped = cv2.warpPerspective(img, M, (int(minW), int(maxH)))

    return warped, M

In [0]:
#----------------------------
#
#Code originated from https://gist.github.com/madhawav/1546a4b99c8313f06c0b2d7d7b4a09e2
#Tensorflow Human Detection - with minor tweaks
#
#-----------------------------

class DetectorAPI:
    def __init__(self, path_to_ckpt):
        self.path_to_ckpt = path_to_ckpt

        self.detection_graph = tf.Graph()
        with self.detection_graph.as_default():
            od_graph_def = tf.GraphDef()
            with tf.gfile.GFile(self.path_to_ckpt, 'rb') as fid:
                serialized_graph = fid.read()
                od_graph_def.ParseFromString(serialized_graph)
                tf.import_graph_def(od_graph_def, name='')

        self.default_graph = self.detection_graph.as_default()
        self.sess = tf.Session(graph=self.detection_graph)

        # Definite input and output Tensors for detection_graph
        self.image_tensor = self.detection_graph.get_tensor_by_name('image_tensor:0')
        # Each box represents a part of the image where a particular object was detected.
        self.detection_boxes = self.detection_graph.get_tensor_by_name('detection_boxes:0')
        # Each score represent how level of confidence for each of the objects.
        # Score is shown on the result image, together with the class label.
        self.detection_scores = self.detection_graph.get_tensor_by_name('detection_scores:0')
        self.detection_classes = self.detection_graph.get_tensor_by_name('detection_classes:0')
        self.num_detections = self.detection_graph.get_tensor_by_name('num_detections:0')

    def processFrame(self, image):
        # Expand dimensions since the trained_model expects images to have shape: [1, None, None, 3]
        image_np_expanded = np.expand_dims(image, axis=0)
        # Actual detection.
        (boxes, scores, classes, num) = self.sess.run(
            [self.detection_boxes, self.detection_scores, self.detection_classes, self.num_detections],
            feed_dict={self.image_tensor: image_np_expanded})

        im_height, im_width,_ = image.shape
        boxes_list = [None for i in range(boxes.shape[1])]
        for i in range(boxes.shape[1]):
            boxes_list[i] = (int(boxes[0,i,0] * im_height),
                        int(boxes[0,i,1]*im_width),
                        int(boxes[0,i,2] * im_height),
                        int(boxes[0,i,3]*im_width))

        return boxes_list, scores[0].tolist(), [int(x) for x in classes[0].tolist()], int(num[0])

    def close(self):
        self.sess.close()
        self.default_graph.close()

You can run one of this cell, each output is defined by the title

## Normal view video with Human bounding box

In [0]:
model_path = 'faster_rcnn_nas/frozen_inference_graph.pb'
odapi = DetectorAPI(path_to_ckpt=model_path)
threshold = 0.7
cap = cv2.VideoCapture('sample.mp4')
baseW = 640
res=(baseW, 360) #resolution horizontal
fourcc = cv2.VideoWriter_fourcc(*'MP4V') #codec
out = cv2.VideoWriter('video-out-nas.mp4', fourcc, 20.0, res)

while True:
  r, img = cap.read()
  if r:
      height, width = img.shape[:2]
      ratio = baseW / width
      newH = int(ratio * height)
      img = cv2.resize(img, (baseW, newH), interpolation= cv2.INTER_LINEAR)

      boxes, scores, classes, num = odapi.processFrame(img)

      human_boxes = [boxes[i] for i in range(len(boxes)) if classes[i]==1 and scores[i]>threshold]
      for box in human_boxes:
          w = box[3] - box[1]
          h = box[2] - box[0]
          if h > w:
            cv2.rectangle(img, (box[1], box[0]), (box[3], box[2]), (0, 255, 0), 2)

      out.write(img)
  else: 
      break

out.release()
cap.release()

##Warped view (birds eye view)

In [0]:
model_path = 'faster_rcnn_nas/frozen_inference_graph.pb'
odapi = DetectorAPI(path_to_ckpt=model_path)
threshold = 0.7
cap = cv2.VideoCapture('sample.mp4')
res=(265, 405) #resulotion vertical
fourcc = cv2.VideoWriter_fourcc(*'MP4V') #codec
out = cv2.VideoWriter('video-vert.mp4', fourcc, 20.0, res)

while True:
  r, img = cap.read()
  if r:
      height, width = img.shape[:2]
      baseW = 640
      ratio = baseW / width
      newH = int(ratio * height)
      img = cv2.resize(img, (baseW, newH), interpolation= cv2.INTER_LINEAR)
      copy_img = img.copy()

      boxes, scores, classes, num = odapi.processFrame(img)

      human_boxes = [boxes[i] for i in range(len(boxes)) if classes[i]==1 and scores[i]>threshold]
      warp_img, mat = transform(copy_img, np.array([[353, 11], [618, 13], [530, 350], [4, 217]], dtype='float32'))
      for box in human_boxes:
          x= box[1]
          w = box[3] - box[1]
          h = box[2] - box[0]
          if h > w:
            points = [x + (w//2), box[2]]
            src = np.array(points, dtype='float32').reshape(1, 1, 2)
            dst = np.zeros((1, 2), dtype='float32')

            warp_pts = cv2.perspectiveTransform(src, mat, dst).reshape(2,)
            cv2.circle(warp_img, (warp_pts[0], warp_pts[1]), 5, (0, 0, 255), 3)

      out.write(warp_img)
  else: 
      break

out.release()
cap.release()

##Black vertical video with point mapped

In [0]:
model_path = 'faster_rcnn_nas/frozen_inference_graph.pb'
odapi = DetectorAPI(path_to_ckpt=model_path)
threshold = 0.7
cap = cv2.VideoCapture('sample.mp4')
res=(265, 405) #resulotion vertical
fourcc = cv2.VideoWriter_fourcc(*'MP4V') #codec
out = cv2.VideoWriter('video-black.mp4', fourcc, 20.0, res)

while True:
  r, img = cap.read()
  if r:
      height, width = img.shape[:2]
      baseW = 640
      ratio = baseW / width
      newH = int(ratio * height)
      img = cv2.resize(img, (baseW, newH), interpolation= cv2.INTER_LINEAR)
      copy_img = img.copy()

      boxes, scores, classes, num = odapi.processFrame(img)

      human_boxes = [boxes[i] for i in range(len(boxes)) if classes[i]==1 and scores[i]>threshold]
      warp_img, mat = transform(copy_img, np.array([[353, 11], [618, 13], [530, 350], [4, 217]], dtype='float32'))
      black_img = np.zeros((warp_img.shape[0], warp_img.shape[1], 3), np.uint8)
      for box in human_boxes:
          x= box[1]
          w = box[3] - box[1]
          h = box[2] - box[0]
          if h > w:
            points = [x + (w//2), box[2]]
            src = np.array(points, dtype='float32').reshape(1, 1, 2)
            dst = np.zeros((1, 2), dtype='float32')

            warp_pts = cv2.perspectiveTransform(src, mat, dst).reshape(2,)
            cv2.circle(black_img, (warp_pts[0], warp_pts[1]), 5, (0, 0, 255), 3)

      out.write(black_img)
  else: 
      break

out.release()
cap.release()

##Measured distance video

In [0]:
model_path = 'faster_rcnn_nas/frozen_inference_graph.pb'
odapi = DetectorAPI(path_to_ckpt=model_path)
threshold = 0.4
cap = cv2.VideoCapture('sample.mp4')

fourcc = cv2.VideoWriter_fourcc(*'MP4V') #codec
#Black vid
res=(265, 405) #resulotion vertical
out = cv2.VideoWriter('video-black-measureed.mp4', fourcc, 20.0, res)

#Measured normal vid
res_norm=(640, 360) #resolution horizontal
out_norm = cv2.VideoWriter('video-out-measured.mp4', fourcc, 20.0, res_norm)

ref_pts = [[353, 11], [618, 13], [530, 350], [4, 217]]
counter = 0
while True:
  r, img = cap.read()
  if r:
      height, width = img.shape[:2]
      baseW = 640
      ratio = baseW / width
      newH = int(ratio * height)
      img = cv2.resize(img, (baseW, newH), interpolation= cv2.INTER_LINEAR)
      copy_img = img.copy()

      boxes, scores, classes, num = odapi.processFrame(img)

      human_boxes = [boxes[i] for i in range(len(boxes)) if classes[i]==1 and scores[i]>threshold]
      warp_img, mat = transform(copy_img, np.array(ref_pts, dtype='float32'))
      black_img = np.zeros((warp_img.shape[0], warp_img.shape[1], 3), np.uint8)

      coord_points = []
      centroid = []
      for box in human_boxes:
          #For easiness
          x= box[1]
          w= box[3] - x
          y= box[0]
          h= box[2] - y 
          if h > w:
            #Human boxes' centroid in img (normal image)
            cv2.rectangle(img, (box[1], box[0]), (box[3], box[2]), (0, 255, 0), 3)
            ctr= [x + (w//2), y + (h//2)]
            centroid.append(ctr)

            #Perspective transformation foot circle in black_img
            points = [x + (w//2), box[2]]
            src = np.array(points, dtype='float32').reshape(1, 1, 2)
            dst = np.zeros((1, 2), dtype='float32')
            warp_pts = cv2.perspectiveTransform(src, mat, dst).reshape(2,)

            current_a, current_b = warp_pts[:2]
            coord_points.append([current_a, current_b])
            cv2.circle(black_img, (current_a, current_b), 5, (0, 255, 0), 3)

      coord_points = np.array(coord_points, dtype='float32')
      distances = np.linalg.norm(coord_points - coord_points[:,None], axis=-1)
      distances = np.where(distances == 0, np.nan, distances)

      violation = np.where((distances < 30) & (distances > 15))
      if len(violation[0]) !=0:
        for violate in violation:
          x, y = violate[0], violate[1]
          cv2.line(img, tuple(centroid[x]), tuple(centroid[y]), (0, 0, 255), 2)
          cv2.line(black_img, tuple(coord_points[x]), tuple(coord_points[y]), (0, 0, 255), 2)
          counter += 1
      
      cv2.putText(img, f"Close contact: {counter}", (23, 23), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1)
      out.write(black_img)
      out_norm.write(img)
  else: 
      break

out.release()
out_norm.release()
cap.release()