In [1]:
# Mount Google Drive (If using Colab)
# The mount point "drive" is a relative path, so the Drive contents appear
# under ./drive of the notebook's current working directory.
from google.colab import drive
drive.mount("drive")

Mounted at drive


In [2]:
# Install the DSFD face-detection library directly from its GitHub repository
!pip install git+https://github.com/hukkelas/DSFD-Pytorch-Inference.git

Collecting git+https://github.com/hukkelas/DSFD-Pytorch-Inference.git
  Cloning https://github.com/hukkelas/DSFD-Pytorch-Inference.git to /tmp/pip-req-build-pz6s7l88
  Running command git clone -q https://github.com/hukkelas/DSFD-Pytorch-Inference.git /tmp/pip-req-build-pz6s7l88
Building wheels for collected packages: face-detection
  Building wheel for face-detection (setup.py) ... [?25l[?25hdone
  Created wheel for face-detection: filename=face_detection-0.2.1-py3-none-any.whl size=29730 sha256=f9936f6177a1530fc7b56dd2401b8dc1e6948b52505b17b383894d102b07fdcc
  Stored in directory: /tmp/pip-ephem-wheel-cache-ts7k7pnn/wheels/11/5d/8c/04ffb7a0ca5427f3e674703ea75ecb16542e94efcc46d6bc1b
Successfully built face-detection
Installing collected packages: face-detection
Successfully installed face-detection-0.2.1


In [3]:
# Import Required Libraries
import os
import numpy as np
import cv2
import face_detection 
from sklearn.cluster import DBSCAN
from keras.models import load_model
from tensorflow.keras.applications.resnet50 import preprocess_input
from google.colab.patches import cv2_imshow

In [4]:
def social_with_mask_face(BASE_PATH, FILE_PATH, detector, mask_classifier, threshold_distance, save_faces=True):
  """Annotate a video with social-distancing and face-mask status.

  For every frame: people are detected with YOLOv3, the centres of their
  bounding boxes are clustered with DBSCAN (``eps=threshold_distance``,
  ``min_samples=2``), and the first face found inside each person box is
  classified as masked / unmasked. People that DBSCAN labels as noise (-1)
  are drawn green ("safe"); clustered people are drawn red and joined by red
  lines. A status bar with the counts is drawn at the top of the frame and
  the frame is appended to ``<BASE_PATH>/Results/Output.mp4``.

  Args:
    BASE_PATH: Working directory containing ``Models/yolov3.weights``,
      ``Models/yolov3.cfg`` and ``Models/coco.names``; results are written
      below ``<BASE_PATH>/Results``.
    FILE_PATH: Path of the input video, relative to ``BASE_PATH``.
    detector: Face detector exposing ``detect(rgb_image)``; each returned row
      is read as ``[xmin, ymin, xmax, ymax, ...]`` relative to the crop.
    mask_classifier: Model exposing ``predict(batch)``; ``score[0][0] < 0.5``
      is treated as "masked".
    threshold_distance: Minimum safe distance between people, in pixels.
    save_faces: When True (default), every classified face crop is written to
      ``<BASE_PATH>/Results/Extracted_Faces/<frame>_<person>.jpg`` where
      ``frame`` is the 0-based frame index and ``person`` the 1-based index of
      the person within that frame.

  Raises:
    IOError: If the input video cannot be opened.
  """
  models_dir = os.path.join(BASE_PATH, "Models")
  results_dir = os.path.join(BASE_PATH, "Results")
  faces_dir = os.path.join(results_dir, "Extracted_Faces")

  # Load YOLOv3
  net = cv2.dnn.readNet(os.path.join(models_dir, "yolov3.weights"),
                        os.path.join(models_dir, "yolov3.cfg"))

  # Load COCO Classes
  with open(os.path.join(models_dir, "coco.names"), "r") as f:
    classes = [line.strip() for line in f]

  # getUnconnectedOutLayers() yields 1-based layer ids; its array shape differs
  # between OpenCV versions, hence the flattening helper.
  layer_names = net.getLayerNames()
  output_layers = [layer_names[i - 1] for i in _flat_indices(net.getUnconnectedOutLayers())]

  video_path = os.path.join(BASE_PATH, FILE_PATH)
  cap = cv2.VideoCapture(video_path)
  if not cap.isOpened():
    cap.release()
    raise IOError("Could not open input video: " + video_path)

  out_stream = None
  try:
    # Fetch Video Properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)

    # Create Directories for Results
    os.makedirs(results_dir, exist_ok=True)
    if save_faces:
      os.makedirs(faces_dir, exist_ok=True)

    # Initialize Output Video Stream
    # NOTE(review): an XVID stream inside an .mp4 container may not be playable
    # in every player/browser - confirm before relying on in-browser playback.
    out_stream = cv2.VideoWriter(
        os.path.join(results_dir, 'Output.mp4'),
        cv2.VideoWriter_fourcc('X', 'V', 'I', 'D'),
        fps,
        (int(width), int(height)))

    frame = 0
    while True:
      # Capture Frame-by-Frame; ret is False at end of stream
      ret, img = cap.read()
      if not ret:
        break

      persons = _detect_people(net, output_layers, classes, img)

      # Classify one face per person. Crops are saved before anything is drawn
      # on the frame so that the saved images carry no overlay.
      masked_faces = []
      unmasked_faces = []
      for person_idx, person_box in enumerate(persons, start=1):
        result = _classify_face(img, person_box, detector, mask_classifier)
        if result is None:
          continue
        face_box, is_masked = result
        if is_masked:
          masked_faces.append(face_box)
        else:
          unmasked_faces.append(face_box)
        if save_faces:
          x1, y1, x2, y2 = face_box
          crop_name = str(frame) + "_" + str(person_idx) + ".jpg"
          cv2.imwrite(os.path.join(faces_dir, crop_name), img[y1:y2, x1:x2])

      # Centre of every person box, used as the position of that person
      person_coordinates = [(x + int(w / 2), y + int(h / 2)) for (x, y, w, h) in persons]

      # DBSCAN label -1 means "no other person within threshold_distance"
      if persons:
        clustering = DBSCAN(eps=threshold_distance, min_samples=2).fit(person_coordinates)
        isSafe = clustering.labels_
      else:
        isSafe = np.array([], dtype=int)

      person_count = len(persons)
      safe_count = int(np.sum(isSafe == -1))

      _draw_detections(img, persons, person_coordinates, isSafe, masked_faces, unmasked_faces)
      _draw_status_bar(img, person_count, safe_count, len(masked_faces), len(unmasked_faces))

      # Write Frame to the Output File
      out_stream.write(img)
      frame += 1
  finally:
    # Release Streams even if a frame failed, so the output file is finalized
    if out_stream is not None:
      out_stream.release()
    cap.release()


def _flat_indices(raw):
  """Return an OpenCV DNN index result as a flat list of Python ints.

  Handles the (N, 1) array, flat (N,) array and empty-tuple forms.
  """
  return [int(v) for v in np.array(raw).reshape(-1)]


def _detect_people(net, output_layers, classes, img, conf_threshold=0.5, nms_threshold=0.4):
  """Run YOLOv3 on a BGR frame and return [x, y, w, h] boxes labelled 'person'."""
  height, width = img.shape[:2]

  blob = cv2.dnn.blobFromImage(img, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
  net.setInput(blob)
  outs = net.forward(output_layers)

  class_ids = []
  confidences = []
  boxes = []

  for out in outs:
    for detection in out:
      # detection[0:4] is read as the relative centre/size, detection[5:] as class scores
      scores = detection[5:]
      class_id = int(np.argmax(scores))
      confidence = float(scores[class_id])
      if confidence > conf_threshold:
        center_x = int(detection[0] * width)
        center_y = int(detection[1] * height)
        w = int(detection[2] * width)
        h = int(detection[3] * height)

        # Topleft Co-ordinates
        x = int(center_x - w / 2)
        y = int(center_y - h / 2)

        boxes.append([x, y, w, h])
        confidences.append(confidence)
        class_ids.append(class_id)

  if not boxes:
    return []

  kept = _flat_indices(cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold))

  persons = []
  # sorted() keeps people in detection order, independent of the NMS score order
  for i in sorted(kept):
    if str(classes[class_ids[i]]) == 'person':
      # Negative values are clamped to 0 so the box can be used for slicing
      persons.append([max(int(v), 0) for v in boxes[i]])
  return persons


def _classify_face(img, person_box, detector, mask_classifier):
  """Return ([x1, y1, x2, y2], is_masked) for the first face in a person box, or None."""
  import warnings

  x, y, w, h = person_box

  person_rgb = img[y:y + h, x:x + w, ::-1]   # Crop & BGR to RGB
  if person_rgb.size == 0:
    return None

  detections = detector.detect(person_rgb)
  if detections.shape[0] == 0:
    return None

  detection = np.array(detections[0])
  detection = np.where(detection < 0, 0, detection)

  # Face coordinates are relative to the person crop; shift them to the frame
  x1 = x + int(detection[0])
  x2 = x + int(detection[2])
  y1 = y + int(detection[1])
  y2 = y + int(detection[3])

  face_rgb = img[y1:y2, x1:x2, ::-1]   # Crop & BGR to RGB
  if face_rgb.size == 0:
    return None

  try:
    face_arr = cv2.resize(face_rgb, (224, 224), interpolation=cv2.INTER_NEAREST)
    face_arr = np.expand_dims(face_arr, axis=0)
    face_arr = preprocess_input(face_arr)
    score = mask_classifier.predict(face_arr)
  except Exception as exc:
    # Best effort: one unusable crop must not abort the whole video, but the
    # failure is reported instead of being silently discarded.
    warnings.warn("Mask classification skipped for a face crop: %s" % (exc,))
    return None

  return [x1, y1, x2, y2], bool(score[0][0] < 0.5)


def _draw_detections(img, persons, person_coordinates, isSafe, masked_faces, unmasked_faces):
  """Draw cluster lines, person boxes and face boxes onto the frame in place."""
  green = (0, 255, 0)
  red = (0, 0, 255)

  # Sorting by label places members of the same cluster next to each other, so
  # linking consecutive entries with equal labels chains each cluster together.
  arg_sorted = np.argsort(isSafe)
  for i in range(1, len(persons)):
    cur = arg_sorted[i]
    prev = arg_sorted[i - 1]
    if isSafe[cur] != -1 and isSafe[cur] == isSafe[prev]:
      cv2.line(img, person_coordinates[cur], person_coordinates[prev], red, 2)

  # People: Green if Safe, Red if UnSafe
  for (a, b, c, d), label in zip(persons, isSafe):
    cv2.rectangle(img, (a, b), (a + c, b + d), green if label == -1 else red, 2)

  # Faces: Green if Masked, Red if Unmasked
  for a, b, c, d in masked_faces:
    cv2.rectangle(img, (a, b), (c, d), green, 2)
  for a, b, c, d in unmasked_faces:
    cv2.rectangle(img, (a, b), (c, d), red, 2)


def _draw_status_bar(img, person_count, safe_count, masked_face_count, unmasked_face_count):
  """Draw the black status bar with people / mask counts at the top of the frame."""
  width = img.shape[1]
  font = cv2.FONT_HERSHEY_SIMPLEX

  cv2.rectangle(img, (0, 0), (width, 50), (0, 0, 0), -1)
  cv2.rectangle(img, (1, 1), (width - 1, 50), (255, 255, 255), 2)

  unsafe_count = person_count - safe_count
  unknown_count = person_count - masked_face_count - unmasked_face_count

  segments = [
      ("Total People = " + str(person_count), (255, 255, 255)),
      (" ( " + str(safe_count) + " Safe ", (0, 255, 0)),
      (str(unsafe_count) + " Unsafe ) ", (0, 0, 255)),
      ("( " + str(masked_face_count) + " Masked " + str(unmasked_face_count) + " Unmasked "
       + str(unknown_count) + " Unknown )", (0, 255, 255)),
  ]

  # Each segment starts where the previous one ended
  xpos = 15
  for text, color in segments:
    cv2.putText(img, text, (xpos, 35), font, 1, color, 2)
    xpos += cv2.getTextSize(text, font, 1, 2)[0][0]

In [5]:
# Path to the Working Environment
# Relative to the notebook's working directory, where Drive is mounted as "drive".
BASE_PATH = "drive/MyDrive/Social_Distancing_with_AI/"

# Initialize a Face Detector
# Confidence Threshold can be Adjusted, Greater values would Detect only Clear Faces

detector = face_detection.build_detector("DSFDDetector", confidence_threshold=.5, nms_iou_threshold=.3)

# Load Pretrained Face Mask Classifier (Keras Model)
# The path is derived from BASE_PATH so that all model files are located through
# a single setting instead of a second, hard-coded absolute path.

mask_classifier = load_model(BASE_PATH + "Models/ResNet50_Classifier.h5")

# Set the Safe Distance in Pixel Units (Minimum Distance Expected to be Maintained between People)
# This Parameter would Affect the Results, Adjust according to the Footage captured by CCTV Camera

threshold_distance = 150 # Try with different Values before Finalizing

Downloading: "https://folk.ntnu.no/haakohu/WIDERFace_DSFD_RES152.pth" to /root/.cache/torch/hub/checkpoints/WIDERFace_DSFD_RES152.pth


  0%|          | 0.00/459M [00:00<?, ?B/s]

In [6]:
# Install Dash, its component packages and the Jupyter integration used by the
# upload / preview app defined in the next cell.
!pip install dash
!pip install dash_core_components
!pip install dash_html_components
!pip install dash_player
!pip install jupyter-dash

Collecting dash
  Downloading dash-2.0.0-py3-none-any.whl (7.3 MB)
[K     |████████████████████████████████| 7.3 MB 5.2 MB/s 
[?25hCollecting dash-table==5.0.0
  Downloading dash_table-5.0.0.tar.gz (3.4 kB)
Collecting dash-core-components==2.0.0
  Downloading dash_core_components-2.0.0.tar.gz (3.4 kB)
Collecting plotly>=5.0.0
  Downloading plotly-5.5.0-py2.py3-none-any.whl (26.5 MB)
[K     |████████████████████████████████| 26.5 MB 1.5 MB/s 
[?25hCollecting dash-html-components==2.0.0
  Downloading dash_html_components-2.0.0.tar.gz (3.8 kB)
Collecting flask-compress
  Downloading Flask_Compress-1.10.1-py3-none-any.whl (7.9 kB)
Collecting tenacity>=6.2.0
  Downloading tenacity-8.0.1-py3-none-any.whl (24 kB)
Collecting brotli
  Downloading Brotli-1.0.9-cp37-cp37m-manylinux1_x86_64.whl (357 kB)
[K     |████████████████████████████████| 357 kB 50.6 MB/s 
[?25hBuilding wheels for collected packages: dash-core-components, dash-html-components, dash-table
  Building wheel for dash-core-

In [None]:
import base64
import os
from urllib.parse import quote as urlquote
from flask import Flask, send_from_directory
import dash
import shutil
from jupyter_dash import JupyterDash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output


# Uploaded videos are stored inside the working environment. The directory is
# derived from BASE_PATH so that uploads land in the same "Inputs" folder that
# the processing callback cleans up and reads from.
UPLOAD_DIRECTORY = BASE_PATH + "Inputs/"


# JupyterDash application. Stylesheets and scripts are loaded from a web server
# expected to be running on 127.0.0.1:8887.
app = JupyterDash(__name__,
  external_stylesheets= [
    'http://127.0.0.1:8887/css/bootstarp/bootstrap.min.css',
    'http://127.0.0.1:8887/css/style.css',
  ],
  external_scripts=[
    'http://127.0.0.1:8887/js/jquery-3.6.0.min.js',
    'http://127.0.0.1:8887/js/popper.min.js',
    'http://127.0.0.1:8887/js/bootstarp/bootstrap.min.js'
  ])


# Page layout: a title, an upload drop zone and a placeholder ("output-video")
# that the callback fills once processing is done.
app.layout = html.Div(
    [
        html.H1("Social Distance with Mask Face Detect"),
        html.H2("Upload Your Video"),
        dcc.Upload(
            id="upload-data",
            children=html.Div(
                ["Drag and drop or click to select a file to upload."]
            ),
            style={
                "width": "100%",
                "height": "60px",
                "lineHeight": "60px",
                "borderWidth": "1px",
                "borderStyle": "dashed",
                "borderRadius": "5px",
                "textAlign": "center",
                "margin": "10px",
            },
        ),
      # The output placeholder is wrapped in dcc.Loading so that a spinner is
      # shown while the callback is running.
      dcc.Loading(
          id="loading-2",
          children=[html.Div([html.Div(id="output-video")])],
          type="circle",
      ),
    ],
)


def save_file(name, content):
  """Decode and store a file uploaded with Plotly Dash.

  Args:
    name: File name reported by the client. Only its final path component is
      used, so the file is always written directly inside UPLOAD_DIRECTORY.
    content: Upload contents as a string; everything after the first
      ";base64," marker is decoded as base64.

  Raises:
    ValueError: If ``name`` has no usable file-name component or ``content``
      does not contain the ";base64," marker.
  """
  # The name is client-controlled: dropping directory components prevents an
  # upload such as "../x" from being written outside UPLOAD_DIRECTORY.
  safe_name = os.path.basename(name)
  if safe_name in ("", ".", ".."):
    raise ValueError("Uploaded file has no usable name: %r" % (name,))

  _, marker, payload = content.partition(";base64,")
  if not marker:
    raise ValueError("Upload contents are not base64 encoded")

  os.makedirs(UPLOAD_DIRECTORY, exist_ok=True)
  with open(os.path.join(UPLOAD_DIRECTORY, safe_name), "wb") as fp:
      fp.write(base64.b64decode(payload))


# social_with_mask_face(BASE_PATH, 'Inputs/'+'v.mp4', detector, mask_classifier, threshold_distance) #Load Model

@app.callback(
    Output("output-video", "children"),
    [Input("upload-data", "filename"), Input("upload-data", "contents")],
)
def update_output(uploaded_filenames, uploaded_file_contents):
  """Process an uploaded video and return a player for the annotated result.

  Any previous ``Results`` and ``Inputs`` directories below BASE_PATH are
  removed, the upload is stored with ``save_file`` and that same file is then
  passed to ``social_with_mask_face``. Returns None while nothing has been
  uploaded (or the upload carries no usable file name), otherwise an
  ``html.Video`` component.
  """
  if uploaded_filenames is None or uploaded_file_contents is None:
    return None

  # The file name comes from the client; keep only its final path component.
  safe_name = os.path.basename(uploaded_filenames)
  if safe_name in ("", ".", ".."):
    return None

  # Start from a clean state so that results of an earlier upload cannot leak
  # into this run.
  for stale_dir in (BASE_PATH + "Results", BASE_PATH + "Inputs"):
    if os.path.isdir(stale_dir):
      shutil.rmtree(stale_dir)

  save_file(safe_name, uploaded_file_contents) #Save uploaded file

  # social_with_mask_face expects a path relative to BASE_PATH, so the location
  # of the file that was just saved is expressed relative to it.
  saved_path = os.path.join(UPLOAD_DIRECTORY, safe_name)
  relative_input = os.path.relpath(saved_path, BASE_PATH)
  social_with_mask_face(BASE_PATH, relative_input, detector, mask_classifier, threshold_distance)

  # NOTE(review): src is a filesystem path and this cell registers no route that
  # serves it, so the browser is presumably unable to load the video - confirm
  # and expose Results/Output.mp4 through the app's server if needed.
  return html.Video(
          controls = True,
          id = 'movie_player',
          src = "drive/My Drive/Social_Distancing_with_AI/Results/Output.mp4",
          autoPlay=True)

# Start the Dash app when this cell runs as the main module.
# mode='external' and debug=True are passed straight to JupyterDash.run_server.
if __name__ == "__main__":
  app.run_server(mode='external',debug=True)


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provided device_type of 'cuda', but CUDA is not available. Disabling


User provid