# Loading libs and algorithms

In [1]:
import numpy as np
import cv2 as cv
import os
from collections import defaultdict
from itertools import combinations
import threading
from os import listdir
from os.path import isfile, join
import time
# --- detection thresholds -------------------------------------------------
# Passed to model.detect() as the confidence and NMS thresholds.
CONF_THRESHHOLD = 0.6
NMS_THRESHHOLD = 0.4

# --- distance limits --------------------------------------------------------
# Default limits for distance_detection(); presumably measured in pixels of
# the analysed frame -- confirm against the callers.
CRITICAL_DISTANCE = 300
WARNING_DISTANCE = 500

# --- colours handed to the OpenCV drawing calls -----------------------------
CRITICAL_COL = (20, 20, 220)  # red
WARNING_COL = (20, 120, 220)  # orange
SAFE_COL = (20, 220, 20)  # green
DIST_COL = (220, 20, 20)  # blue
OBJ_COL = (20, 220, 20)

# --- class labels -----------------------------------------------------------
# classes.txt holds one class name per line; the index of a line is the class
# id used to look the name up after detection.
CLASS_NAMES = []
with open('classes.txt', 'r') as f:
    CLASS_NAMES = [cname.strip() for cname in f]

# --- detection network ------------------------------------------------------
# Weights + config are read from the res folder and the network is asked to
# run on the CUDA backend/target.
weights = 'res\\yolov4-home_best.weights'
res = 'res\\yolov4-home.cfg'
net = cv.dnn.readNet(weights, res)
net.setPreferableBackend(cv.dnn.DNN_BACKEND_CUDA)
net.setPreferableTarget(cv.dnn.DNN_TARGET_CUDA)
model = cv.dnn_DetectionModel(net)
model.setInputParams(size=(416, 416), scale=1/255, swapRB=True)


def draw_text(img, text,
          font=cv.FONT_HERSHEY_PLAIN,
          pos=(0, 0),
          font_scale=3,
          font_thickness=2,
          text_color=(255, 255, 255),
          text_color_bg=SAFE_COL
          ):
    """Draw `text` on `img` over a filled background rectangle.

    Args:
        img: image to draw on (passed straight to the OpenCV drawing calls).
        text: string to render.
        font: OpenCV font identifier.
        pos: (x, y) of the top-left corner of the background rectangle.
        font_scale: scale factor handed to OpenCV.
        font_thickness: stroke thickness of the glyphs.
        text_color: colour of the glyphs.
        text_color_bg: fill colour of the background rectangle.

    Returns:
        The image returned by cv.putText.
    """
    left, top = pos
    (label_w, label_h), _baseline = cv.getTextSize(text, font, font_scale, font_thickness)

    # Filled (-1 thickness) rectangle slightly larger than the text extent.
    far_corner = (left + label_w + 10, top + label_h + 5)
    cv.rectangle(img, pos, far_corner, text_color_bg, -1)

    # putText anchors at the text's bottom-left, hence the downward shift.
    text_origin = (left, top + label_h + font_scale - 1)
    return cv.putText(img, text, text_origin, font, font_scale, text_color, font_thickness)

def max_image_res(w, h, max_res):
    """Scale (w, h) so that the longer side becomes `max_res`.

    The aspect ratio is preserved up to rounding. Each returned side is at
    least 1 so that a very elongated input (e.g. 1000x1 scaled to 100) never
    yields a zero-sized target, which a resize call cannot handle.

    Args:
        w: source width.
        h: source height.
        max_res: desired length of the longer side.

    Returns:
        (width, height) as a tuple of ints.

    Raises:
        ValueError: if the longer side of the input is not positive.
    """
    longest = max(w, h)
    if longest <= 0:
        raise ValueError(f"image dimensions must be positive, got {w}x{h}")
    coeff = max_res / longest
    return (max(1, round(w * coeff)), max(1, round(h * coeff)))


# Object detection and Distance Detection algorithms

In [2]:

def object_detection(frame, model=model):
    """Run the detector on `frame`, annotate it and list what was found.

    Every detection gets a label and a bounding box drawn onto `frame`.

    Args:
        frame: image to analyse; it is drawn on by the OpenCV calls below.
        model: detection model exposing detect(frame, conf, nms); defaults to
            the module-level model.

    Returns:
        (frame, objects) where objects is a list of
        (class_name, box, (top, right, bottom, left)) tuples. `box` is the
        (x, y, w, h) entry returned by the model and the four edge points are
        the midpoints of the box sides.
    """
    objects = []
    classes, _scores, boxes = model.detect(frame, CONF_THRESHHOLD, NMS_THRESHHOLD)

    for class_id, box in zip(classes, boxes):
        # Depending on the OpenCV version each class id arrives either as a
        # 1-element array or as a plain scalar; ravel() accepts both, whereas
        # indexing a scalar with [0] raises IndexError.
        class_name = CLASS_NAMES[int(np.ravel(class_id)[0])]

        # default colour for a freshly detected object
        col = (20, 220, 20)

        # label and bounding box
        frame = draw_text(frame, class_name, pos=(box[0]+5, box[1]+5), text_color_bg=col, font_scale=2)
        cv.rectangle(frame, box, col, thickness=4)

        # midpoints of the four box sides
        (x, y) = (box[0], box[1])
        (w, h) = (box[2], box[3])
        left, right = ((x, y + h // 2), (x + w, y + h // 2))
        top, bottom = ((x + w//2, y), (x + w//2, y+h))

        objects.append((class_name, box, (top, right, bottom, left)))
    return frame, objects

def _box_gap(box_a, box_b):
    """Return (overlap, distance, point_a, point_b) for two (x, y, w, h) boxes.

    `distance` is the shortest straight-line gap between the two rectangles
    (0 when they touch or intersect); `point_a` / `point_b` are the endpoints
    of that shortest segment on box_a and box_b respectively.
    """
    ax1, ay1 = int(box_a[0]), int(box_a[1])
    ax2, ay2 = ax1 + int(box_a[2]), ay1 + int(box_a[3])
    bx1, by1 = int(box_b[0]), int(box_b[1])
    bx2, by2 = bx1 + int(box_b[2]), by1 + int(box_b[3])

    def axis_gap(a_lo, a_hi, b_lo, b_hi):
        # Gap along one axis plus the coordinate of the closest point on each box.
        if b_lo > a_hi:  # b lies entirely after a
            return b_lo - a_hi, a_hi, b_lo
        if b_hi < a_lo:  # b lies entirely before a
            return a_lo - b_hi, a_lo, b_hi
        # Intervals share a range: no gap, anchor both points mid-way through it.
        shared_mid = (max(a_lo, b_lo) + min(a_hi, b_hi)) // 2
        return 0, shared_mid, shared_mid

    dx, pax, pbx = axis_gap(ax1, ax2, bx1, bx2)
    dy, pay, pby = axis_gap(ay1, ay2, by1, by2)

    overlap = dx == 0 and dy == 0
    # distance formula ( (x2 - x1) ^ 2 + (y2 - y1) ^ 2 ) ** 0.5
    distance = (dx * dx + dy * dy) ** 0.5
    return overlap, distance, (pax, pay), (pbx, pby)


def distance_detection(frame, objects, CRITICAL_DISTANCE=CRITICAL_DISTANCE, WARNING_DISTANCE=WARNING_DISTANCE, show_distance=False):
    """Grade how close each person is to each non-person object.

    Only pairs made of exactly one 'person' and one other object are checked.
    A pair is critical (level 2) when the boxes touch/overlap or their gap is
    at most CRITICAL_DISTANCE, a warning (level 1) when the gap is at most
    WARNING_DISTANCE, and safe (level 0) otherwise. Each object that took part
    in a checked pair is redrawn once, in the colour of its worst pair.

    Args:
        frame: image to draw on.
        objects: iterable of (class_name, box, edges) tuples, box being
            (x, y, w, h).
        CRITICAL_DISTANCE: gap limit for level 2 (same unit as the box
            coordinates -- presumably pixels).
        WARNING_DISTANCE: gap limit for level 1.
        show_distance: when true, draw the shortest connecting line for every
            checked, non-overlapping pair.

    Returns:
        (frame, level, color): the annotated frame, the highest level found
        over all checked pairs and the colour belonging to that level. When no
        pair qualifies for checking, level is 0 and color is (0, 0, 0).
    """
    objects = list(objects)

    level = 0
    color = (0, 0, 0)
    any_pair_checked = False
    # object index -> (worst level seen, colour of that level)
    worst_by_object = {}

    for (idx1, obj1), (idx2, obj2) in combinations(enumerate(objects), 2):
        class_name1, box1, _ = obj1
        class_name2, box2, _ = obj2

        # person-person and object-object pairs are of no interest
        if (class_name1 == 'person') == (class_name2 == 'person'):
            continue

        overlap, distance, coord_a, coord_b = _box_gap(box1, box2)

        if overlap or distance <= CRITICAL_DISTANCE:
            pair_level, pair_color, line_color = 2, CRITICAL_COL, CRITICAL_COL
        elif distance <= WARNING_DISTANCE:
            pair_level, pair_color, line_color = 1, WARNING_COL, WARNING_COL
        else:
            pair_level, pair_color, line_color = 0, SAFE_COL, DIST_COL

        if show_distance and not overlap:
            frame = cv.line(frame, coord_a, coord_b, line_color, thickness=4)

        # Report the worst pair, not merely the last one that was looked at.
        if not any_pair_checked or pair_level > level:
            level, color = pair_level, pair_color
        any_pair_checked = True

        for idx in (idx1, idx2):
            if idx not in worst_by_object or pair_level > worst_by_object[idx][0]:
                worst_by_object[idx] = (pair_level, pair_color)

    # Paint every involved object once, so that a safe pair processed later
    # cannot cover up a warning/critical colour.
    for idx, (_, obj_color) in worst_by_object.items():
        class_name, box, _ = objects[idx]
        frame = cv.rectangle(frame, box, obj_color, thickness=4)
        frame = draw_text(frame, class_name, pos=(box[0]+5, box[1]+5), text_color_bg=obj_color, font_scale=2)

    return frame, level, color





## Detecting Violations on a folder with images

In [3]:
# as a service: checking distance violation on a folder of 25 images
def detect_on_image(path, model=None, max_size=416, save_modified=True, save_raw=False):
    """Check one image file for distance violations and save the evidence.

    When a violation (level > 0) is found, the annotated image and/or an
    untouched copy are written to a 'violations' folder next to the image.

    Args:
        path: path of the image file.
        model: detection model; when falsy the default of object_detection()
            is used.
        max_size: currently unused by this function.
        save_modified: write the annotated image on a violation.
        save_raw: additionally write the unannotated image with a '_raw'
            suffix on a violation.
    """
    iframe = cv.imread(path)  # orig image
    if iframe is None:
        # cv.imread signals a missing/undecodable file by returning None.
        print(f"viewed: {path}, image could not be read")
        return

    directory, file = os.path.split(path)

    # The detection step draws onto the array it is given, so keep a clean
    # copy up front instead of re-reading the file later.
    raw_frame = iframe.copy() if save_raw else None

    if not model:
        frame, objects = object_detection(iframe)
    else:
        frame, objects = object_detection(iframe, model)

    if len(objects) <= 1:
        print(f"viewed: {path}, there are there are no recognizable objects")
        return

    frame, level, color = distance_detection(frame, objects, CRITICAL_DISTANCE=30, WARNING_DISTANCE=50)
    print(f"viewed: {path}, level: {level}, color: {color}")
    if level <= 0:
        return

    print(f"-violation (level: {level})")
    if not (save_modified or save_raw):
        return

    result = os.path.join(directory, 'violations')
    if not os.path.exists(result):
        print("violations folder does not exist. Creating folder")
        os.makedirs(result, exist_ok=True)

    if save_modified:
        cv.imwrite(os.path.join(result, file), frame)
        print(f"saved {file} to /violations")
    if save_raw:
        filename, ext = os.path.splitext(file)
        cv.imwrite(os.path.join(result, f"{filename}_raw{ext}"), raw_frame)


def main():
    """Load the detector and scan up to 100 .jpg files of the test folder."""
    # loading models
    weights_path = 'res\\yolov4-home_best.weights'
    config_path = 'res\\yolov4-home.cfg'
    net = cv.dnn.readNet(weights_path, config_path)
    net.setPreferableBackend(cv.dnn.DNN_BACKEND_CUDA)
    net.setPreferableTarget(cv.dnn.DNN_TARGET_CUDA)
    model = cv.dnn_DetectionModel(net)
    model.setInputParams(size=(416, 416), scale=1/255, swapRB=True)

    # detecting on /test directory
    test_path = '.\\test\\image'
    images = []
    for entry in listdir(test_path):
        if isfile(join(test_path, entry)) and entry.endswith('.jpg'):
            images.append(os.path.join(test_path, entry))

    # only the first 100 matches are processed
    for path in images[:100]:
        detect_on_image(path, model, save_raw=True)
# main()
# 820 size ---
# test 1: 25 images takes 3.9 seconds 
# test 2: 100 images takes 9.6 seconds
# test 3: 1000 images - 1m23 seconds ~ 1m43 seconds 
# ---
# test 4: 1000 images (512 size)  - 1m 22s

## Detecting Violations on a video file

In [4]:
# as a service: detecting violations on a video file
import time
import copy
def detect_on_video(path, model=None, max_size=512, save_modified=True, save_raw=False):
    """Scan a video file frame by frame and save frames showing violations.

    Frames are resized so their longer side is `max_size`, analysed, and on a
    violation (level > 0) written to '<video dir>/violations/<video name>/'.
    At most one set of images is saved every 3 seconds of processing time.

    Args:
        path: path of the video file.
        model: detection model; when falsy the default of object_detection()
            is used.
        max_size: length of the longer frame side after resizing.
        save_modified: write the annotated frame on a violation.
        save_raw: write the unannotated (resized) frame on a violation.
    """
    cap = cv.VideoCapture(path)
    directory, file = os.path.split(path)
    filename, _ = os.path.splitext(file)

    idx = 1
    frame_id = 1
    # Time of the most recent save; None until something has been saved.
    last_saved = None

    # generate violation folder
    result = os.path.join(directory, 'violations', filename)
    if not os.path.exists(result):
        print("violation folder does not exist. Creating folder")
        os.makedirs(result, exist_ok=True)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            height, width = frame.shape[:2]
            frame = cv.resize(frame, max_image_res(width, height, max_size), interpolation = cv.INTER_CUBIC)
            # Clean copy taken before the detection step draws on the frame.
            frame_raw = frame.copy()
            if not model:
                frame, objects = object_detection(frame)
            else:
                frame, objects = object_detection(frame, model)

            if len(objects) > 1:
                frame, level, color = distance_detection(frame, objects, CRITICAL_DISTANCE=30, WARNING_DISTANCE=50)
                print(f"viewed: {path} frame: {frame_id}, level: {level}, color: {color}")
                if level > 0:
                    print(f"-violation (level: {level})")
                    now = time.monotonic()
                    # Throttle: the 3 s window starts at the last save, for
                    # raw and modified output alike.
                    if last_saved is None or now - last_saved >= 3:
                        if save_modified:
                            modified = filename + f"_{idx}.jpg"
                            cv.imwrite(os.path.join(result, modified), frame)
                            print(f"saved modified {filename} to /violations folder")
                        if save_raw:
                            raw = filename + f"_{idx}_raw.jpg"
                            cv.imwrite(os.path.join(result, raw), frame_raw)
                            print(f"saved raw {filename} to /violations folder")
                        idx += 1
                        last_saved = now
            else:
                print(f"viewed: {path}, frame: {frame_id}, there are there are no recognizable objects")
            frame_id += 1
    finally:
        # Always hand the capture back, even if processing raised.
        cap.release()
            
def main():
    """Load the detector and scan the sample video for violations."""
    # loading models
    net = cv.dnn.readNet('res\\yolov4-home_best.weights', 'res\\yolov4-home.cfg')
    net.setPreferableBackend(cv.dnn.DNN_BACKEND_CUDA)
    net.setPreferableTarget(cv.dnn.DNN_TARGET_CUDA)
    model = cv.dnn_DetectionModel(net)
    model.setInputParams(size=(416, 416), scale=1/255, swapRB=True)
    # detecting on /test directory
    test_path = '.\\test\\video'
    # detecting on video
    # alternative input: path = os.path.join(test_path, 'man.mkv')
    path = os.path.join(test_path, 'festivalyurifancam.mp4')
    # Hand over the model built above; previously it was created and ignored.
    detect_on_video(path, model, save_raw=True)
# main()
# test 1: a 5 min 37 s CCTV video feed took 13 minutes to check
# test 2: festival yuri fancam (a 2-minute video) took 4 min 49.9 s to check

## Detecting violations on a Webcam feed

In [5]:
# as a service: detecting violations on a single webcam feed
import time
import datetime
import copy
def detect_on_feed(output_path, name, capture=0, model=None, max_size=512, save_modified=True, save_raw=False):
    """Watch a capture device and save frames showing violations.

    Frames are resized so their longer side is `max_size`, analysed, and on a
    violation (level > 0) written to '<output_path>/<name>/' under a timestamp
    file name. At most one set of images is saved every 3 seconds. The loop
    ends when the device stops delivering frames.

    Args:
        output_path: base folder for saved frames.
        name: label of the feed; used as sub-folder name and in log lines.
        capture: source handed to cv.VideoCapture (device index by default).
        model: detection model; when falsy the default of object_detection()
            is used.
        max_size: length of the longer frame side after resizing.
        save_modified: write the annotated frame on a violation.
        save_raw: write the unannotated (resized) frame on a violation.
    """
    cap = cv.VideoCapture(capture)

    frame_id = 1
    # Time of the most recent save; None until something has been saved.
    last_saved = None

    # generate violation folder
    result = os.path.join(output_path, name)
    if not os.path.exists(result):
        print("violation folder does not exist. Creating folder")
        os.makedirs(result, exist_ok=True)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("False")
                break

            height, width = frame.shape[:2]
            frame = cv.resize(frame, max_image_res(width, height, max_size), interpolation = cv.INTER_CUBIC)
            # Clean copy taken before the detection step draws on the frame.
            frame_raw = frame.copy()
            if not model:
                frame, objects = object_detection(frame)
            else:
                frame, objects = object_detection(frame, model)

            if len(objects) > 1:
                frame, level, color = distance_detection(frame, objects, CRITICAL_DISTANCE=30, WARNING_DISTANCE=50)
                print(f"viewed: {name} frame: {frame_id}, level: {level}, color: {color}")
                if level > 0:
                    print(f"-violation (level: {level})")
                    now = time.monotonic()
                    # Throttle: the 3 s window starts at the last save, for
                    # raw and modified output alike.
                    if last_saved is None or now - last_saved >= 3:
                        timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H_%M_%S")
                        if save_modified:
                            modified = f"{timestamp}.jpg"
                            cv.imwrite(os.path.join(result, modified), frame)
                            print(f"saved modified frame to /violations folder. detected on {timestamp} ")
                        if save_raw:
                            raw = f"{timestamp}_raw.jpg"
                            cv.imwrite(os.path.join(result, raw), frame_raw)
                            print(f"saved raw frame to /violations folder. detected on {timestamp} ")
                        last_saved = now
            else:
                print(f"viewed: {name}, frame: {frame_id}, there are there are no recognizable objects")
            frame_id += 1
    finally:
        # Always hand the device back, even if processing raised.
        cap.release()
            
def main():
    """Load the detector and watch the default webcam for violations."""
    # loading models
    net = cv.dnn.readNet('res\\yolov4-home_best.weights', 'res\\yolov4-home.cfg')
    net.setPreferableBackend(cv.dnn.DNN_BACKEND_CUDA)
    net.setPreferableTarget(cv.dnn.DNN_TARGET_CUDA)
    model = cv.dnn_DetectionModel(net)
    model.setInputParams(size=(416, 416), scale=1/255, swapRB=True)
    output_path = '.\\output\\capture\\no_ui'
    # Hand over the model built above; previously it was created and ignored.
    detect_on_feed(output_path, 'webcam 1', model=model, save_raw=True)
# main()


## Detecting violations on 2 webcam feeds

In [6]:
# as a service: detecting violations on 2 webcam feeds
import time
import datetime
import copy
import threading 

def detect_on_feed(output_path, name, capture=0, model=None, max_size=512, save_modified=True, save_raw=False):
    """Watch a capture device and save frames showing violations.

    Frames are resized so their longer side is `max_size`, analysed, and on a
    violation (level > 0) written to '<output_path>/<name>/' under a timestamp
    file name. At most one set of images is saved every 3 seconds. The loop
    ends when the device stops delivering frames.

    Args:
        output_path: base folder for saved frames.
        name: label of the feed; used as sub-folder name and in log lines.
        capture: source handed to cv.VideoCapture (device index by default).
        model: detection model; when falsy the default of object_detection()
            is used.
        max_size: length of the longer frame side after resizing.
        save_modified: write the annotated frame on a violation.
        save_raw: write the unannotated (resized) frame on a violation.
    """
    cap = cv.VideoCapture(capture)

    frame_id = 1
    # Time of the most recent save; None until something has been saved.
    last_saved = None

    # generate violation folder
    result = os.path.join(output_path, name)
    if not os.path.exists(result):
        print("violation folder does not exist. Creating folder")
        # exist_ok guards against another feed thread creating a shared parent
        # folder at the same moment.
        os.makedirs(result, exist_ok=True)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("False")
                break

            height, width = frame.shape[:2]
            frame = cv.resize(frame, max_image_res(width, height, max_size), interpolation = cv.INTER_CUBIC)
            # Clean copy taken before the detection step draws on the frame.
            frame_raw = frame.copy()
            if not model:
                frame, objects = object_detection(frame)
            else:
                frame, objects = object_detection(frame, model)

            if len(objects) > 1:
                frame, level, color = distance_detection(frame, objects, CRITICAL_DISTANCE=30, WARNING_DISTANCE=50)
                print(f"viewed: {name} frame: {frame_id}, level: {level}, color: {color}")
                if level > 0:
                    print(f"-violation (level: {level})")
                    now = time.monotonic()
                    # Throttle: the 3 s window starts at the last save, for
                    # raw and modified output alike.
                    if last_saved is None or now - last_saved >= 3:
                        timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H_%M_%S")
                        if save_modified:
                            modified = f"{timestamp}.jpg"
                            cv.imwrite(os.path.join(result, modified), frame)
                            print(f"saved modified frame to /violations folder. detected on {timestamp} ")
                        if save_raw:
                            raw = f"{timestamp}_raw.jpg"
                            cv.imwrite(os.path.join(result, raw), frame_raw)
                            print(f"saved raw frame to /violations folder. detected on {timestamp} ")
                        last_saved = now
            else:
                print(f"viewed: {name}, frame: {frame_id}, there are there are no recognizable objects")
            frame_id += 1
    finally:
        # Always hand the device back, even if processing raised.
        cap.release()

def main(name, camera):
    """Build a detector of its own and watch one camera feed.

    Args:
        name: label of the feed, forwarded to detect_on_feed.
        camera: capture source forwarded to detect_on_feed.
    """
    # loading models
    weights_path = 'res\\yolov4-home_best.weights'
    config_path = 'res\\yolov4-home.cfg'
    net = cv.dnn.readNet(weights_path, config_path)
    net.setPreferableBackend(cv.dnn.DNN_BACKEND_CUDA)
    net.setPreferableTarget(cv.dnn.DNN_TARGET_CUDA)

    model = cv.dnn_DetectionModel(net)
    model.setInputParams(size=(416, 416), scale=1/255, swapRB=True)

    output_path = '.\\output\\capture\\no_ui'
    detect_on_feed(
        output_path,
        name,
        camera,
        model=model,
        save_raw=True,
        save_modified=True,
    )

# One worker thread per webcam: each runs main() with its own feed label and
# capture index, so every feed gets its own model instance.
x, y = (
    threading.Thread(target=main, args=feed)
    for feed in (('webcam 1', 0), ('webcam 2', 1))
)
x.start()
y.start()


viewed: webcam 2, frame: 1, there are there are no recognizable objects
viewed: webcam 2, frame: 2, there are there are no recognizable objects
viewed: webcam 2, frame: 3, there are there are no recognizable objects
viewed: webcam 2, frame: 4, there are there are no recognizable objects
viewed: webcam 2, frame: 5, there are there are no recognizable objects
viewed: webcam 2, frame: 6, there are there are no recognizable objects
viewed: webcam 2, frame: 7, there are there are no recognizable objects
viewed: webcam 2, frame: 8, there are there are no recognizable objects
viewed: webcam 2, frame: 9, there are there are no recognizable objects
viewed: webcam 2, frame: 10, there are there are no recognizable objects
viewed: webcam 2, frame: 11, there are there are no recognizable objects
viewed: webcam 2, frame: 12, there are there are no recognizable objects
viewed: webcam 2, frame: 13, there are there are no recognizable objects
viewed: webcam 1, frame: 1, there are there are no recogniza