## Imports

In [1]:
from keras.models import model_from_json

import sys
import math
import re

import cv2
import numpy as np

## Global Variables

In [2]:
#For Prediction
CLASS_LABELS = ['non-violent','violent']

## Loading Model Weights

In [3]:
file = open("ViolentModel/CNN/violent_model_json", 'r')
model_json = file.read()
file.close()

loaded_model = model_from_json(model_json)
loaded_model.load_weights("ViolentModel/CNN/violent_model_weights.h5")

## Globals & Prepare Model

---

Object Detection

---

In [4]:
#Import Object Detection Model Parameters
OBJ_config = "Model/yolov4.cfg"
OBJ_weights = "Model/yolov4.weights"
OBJ_class_names_file = "Model/coco.names"

# read class names from text file
OBJ_class_names = None
with open(OBJ_class_names_file, 'r') as f:
    OBJ_class_names = [line.strip() for line in f.readlines()]

OBJ_threshold = 0.1

---

Object Tracking

---

In [5]:
TRACK_list = cv2.legacy.MultiTracker_create()

---

Pose Estimation

---

In [6]:
#Import Pose Estimation Model Parameters
POSE_proto = "Model/pose_deploy_linevec.prototxt"
POSE_weights = "Model/pose_iter_440000.caffemodel"

POSE_body_parts = { "Nose": 0, "Neck": 1, "RShoulder": 2, "RElbow": 3, "RWrist": 4,
               "LShoulder": 5, "LElbow": 6, "LWrist": 7, "RHip": 8, "RKnee": 9,
               "RAnkle": 10, "LHip": 11, "LKnee": 12, "LAnkle": 13, "REye": 14,
               "LEye": 15, "REar": 16, "LEar": 17, "Background": 18 }

POSE_part_pairs = [ ["Neck", "RShoulder"], ["Neck", "LShoulder"], ["RShoulder", "RElbow"],
               ["RElbow", "RWrist"], ["LShoulder", "LElbow"], ["LElbow", "LWrist"],
               ["Neck", "RHip"], ["RHip", "RKnee"], ["RKnee", "RAnkle"], ["Neck", "LHip"],
               ["LHip", "LKnee"], ["LKnee", "LAnkle"], ["Neck", "Nose"], ["Nose", "REye"],
               ["REye", "REar"], ["Nose", "LEye"], ["LEye", "LEar"] ]

POSE_threshold = 0.1

In [7]:
#Create Models
OBJ_net = cv2.dnn.readNet(OBJ_config, OBJ_weights)
POSE_net = cv2.dnn.readNetFromCaffe(POSE_proto, POSE_weights)

#Set Models to use GPU
OBJ_net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
OBJ_net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
POSE_net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
POSE_net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)


## Prediction Video

In [8]:
PREDICTION_VIDEO = "Dataset/videos_test/violent/cam1/6_Trim.mp4"


## Detect People

In [9]:
def get_output_layers(net):

    layer_names = net.getLayerNames()

    output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]

    return output_layers

def detect_individuals_from_image(OBJ_img):

    OBJ_width = OBJ_img.shape[1]
    OBJ_height = OBJ_img.shape[0]
    OBJ_scale = 0.00392

    OBJ_blob = cv2.dnn.blobFromImage(OBJ_img, OBJ_scale, (416, 416), (0,0,0), True, crop=False)

    OBJ_net.setInput(OBJ_blob)
    outs = OBJ_net.forward(get_output_layers(OBJ_net))

    # initialization
    class_ids = []
    confidences = []
    boxes = []
    nms_threshold = 0.4

    # for each detection from each output layer
    # get the confidence, class id, bounding box params
    # and ignore weak detections (confidence < 0.5)
    for out in outs:
        for detection in out:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]
            if confidence > OBJ_threshold:
                center_x = int(detection[0] * OBJ_width)
                center_y = int(detection[1] * OBJ_height)
                w = int(detection[2] * OBJ_width)
                h = int(detection[3] * OBJ_height)
                x = center_x - w / 2
                y = center_y - h / 2
                class_ids.append(class_id)
                confidences.append(float(confidence))
                boxes.append([x, y, w, h])

    #Apply non-max suppression
    indices = cv2.dnn.NMSBoxes(boxes, confidences, OBJ_threshold, nms_threshold)

    confident_people_box = []

    for i in indices:
        #Only return if detected object is a Person
        if class_ids[i] == 0:
            confident_people_box.append(boxes[i])

    return confident_people_box

## Track People

In [10]:
def start_tracking_from_boxes(TRACK_BBoxes, TRACK_img):

    #New Empty Track list
    new_TRACK_list = cv2.legacy.MultiTracker_create()

    #Fill new Track list with the new Bounding boxes
    for TRACK_box in TRACK_BBoxes:

        tracker = cv2.legacy.TrackerCSRT_create()
        new_TRACK_list.add(tracker, TRACK_img, TRACK_box)

    global TRACK_list
    TRACK_list = new_TRACK_list

def track_using_trackers(TRACK_img, FINAL_img):

    global TRACK_list

    # grab the updated bounding box coordinates (if any) for each object that is being tracked
    (success, TRACK_boxes) = TRACK_list.update(TRACK_img)

def expand_tracking_box(image, FINAL_img):

    global TRACK_list
    TRACK_boxes = TRACK_list.getObjects()
    TRACK_expanded_boxes = [[] for i in range(len(TRACK_boxes))]

    new_OBJ_boxes = detect_individuals_from_image(image)

    #Take account of tracking boxes
    for TRACK_box_i in range(len(TRACK_boxes)):
        b_box = TRACK_boxes[TRACK_box_i]
        TRACK_expanded_boxes[TRACK_box_i].append((int(b_box[0]),int(b_box[1]),int(b_box[2]),int(b_box[3])))

    #Take account new object detection boxes
    for OBJ_box_i in range(len(new_OBJ_boxes)):
        new_OBJ_Center = get_center_of_box(new_OBJ_boxes[OBJ_box_i])

        distance = sys.maxsize
        trackerIndex = -1

        for TRACK_box_i in range(len(TRACK_boxes)):
            curr_TRACK_Center = get_center_of_box(TRACK_boxes[TRACK_box_i])

            curr_distance = math.dist(curr_TRACK_Center, new_OBJ_Center)

            if curr_distance < distance:
                distance = curr_distance
                trackerIndex = TRACK_box_i

        b_box = new_OBJ_boxes[OBJ_box_i]
        TRACK_expanded_boxes[trackerIndex].append((int(b_box[0]),int(b_box[1]),int(b_box[2]),int(b_box[3])))

    return TRACK_expanded_boxes

def get_center_of_box(boundingBox):

    boundingBox_Xcenter = int((boundingBox[0] + (boundingBox[0] + boundingBox[2])) / 2)
    boundingBox_Ycenter = int((boundingBox[1] + (boundingBox[1] + boundingBox[3])) / 2)

    return(boundingBox_Xcenter, boundingBox_Ycenter)

## Estimate Human Poses

In [11]:
def seperate_person(person_BB, image, finalImage):

    POSE_IMG = np.zeros(finalImage.shape, dtype=np.uint8)

    for curr_BB in person_BB:

        tl = curr_BB[0]
        tr = curr_BB[0]+curr_BB[2]
        bl = curr_BB[1]
        br = curr_BB[1]+curr_BB[3]

        #Seperate each product/contour into a new image by cropping the image to the bounding box of the product
        POSE_IMG[bl:br, tl:tr] = image[bl:br, tl:tr]

    return POSE_IMG

def get_human_pose_from_img(POSE_img_box, FINAL_img):

    IMG_HEIGHT = POSE_img_box.shape[0]
    IMG_WIDTH = POSE_img_box.shape[1]

    #Resize for prediction
    BLOB_HEIGHT=368
    BLOB_WIDTH=int((BLOB_HEIGHT/IMG_HEIGHT)*IMG_WIDTH)

    # Use the given image as input, which needs to be blob(s).
    imgBlob = cv2.dnn.blobFromImage(POSE_img_box, 1.0/255, (BLOB_WIDTH, BLOB_HEIGHT), (0,0,0), swapRB=True, crop=False)
    POSE_net.setInput(imgBlob)

    # Runs a forward pass to compute the POSE_MODEL output
    out = POSE_net.forward()
    # MobilePOSE_MODEL output [1, 57, -1, -1], we only need the first 19 elements
    out = out[:, :19, :, :]

    assert(len(POSE_body_parts) == out.shape[1])

    points = []
    for i in range(len(POSE_body_parts)):
        # Slice heatmap of corresponding body's part.
        heatMap = out[0, i, :, :]

        # Originally, we try to find all the local maximums. To simplify a sample
        # we just find a global one. However only a single pose at the same time
        # could be detected this way.
        _, conf, _, point = cv2.minMaxLoc(heatMap)
        x = (IMG_WIDTH * point[0]) / out.shape[3]
        y = (IMG_HEIGHT * point[1]) / out.shape[2]
        # Add a point if it's confidence is higher than THRESHOLD.
        points.append((int(x), int(y)) if conf > POSE_threshold else None)

    for pair in POSE_part_pairs:
        partFrom = pair[0]
        partTo = pair[1]
        assert(partFrom in POSE_body_parts)
        assert(partTo in POSE_body_parts)

        idFrom = POSE_body_parts[partFrom]
        idTo = POSE_body_parts[partTo]

        if points[idFrom] and points[idTo]:
            cv2.line(FINAL_img, points[idFrom], points[idTo], (0, 255, 0), 3)
            cv2.ellipse(FINAL_img, points[idFrom], (3, 3), 0, 0, 360, (0, 0, 255), cv2.FILLED)
            cv2.ellipse(FINAL_img, points[idTo], (3, 3), 0, 0, 360, (0, 0, 255), cv2.FILLED)

    return points

## Predict Violent Behaviour

In [12]:
def PredictViolentBehaviour(person_id, pred_keypoints, image_shape, FINAL_img):

    TRACK_boxes = TRACK_list.getObjects()

    curr_action_keys = np.array(pred_keypoints[person_id][-3:])

    curr_action_keys = augment_data(curr_action_keys)

    curr_action_keys = FormatData(curr_action_keys, image_shape)

    #Predict using curr_action_keys
    predictions = loaded_model.predict(curr_action_keys)

    pred_class = (predictions > 0.5).astype(int)

    print(predictions)

    if pred_class[0] == 1:
        pred_label = "Violent"
        pred_color = (0,0,255)
    else:
        pred_label = "Non-Violent"
        pred_color = (0,255,0)

    x = int(TRACK_boxes[person_id][0])
    y = int(TRACK_boxes[person_id][1])
    w = int(TRACK_boxes[person_id][2])
    h = int(TRACK_boxes[person_id][3])

    cv2.rectangle(FINAL_img, (x,y), (x+w,y+h), pred_color, 2)
    cv2.putText(FINAL_img, pred_label, (x+10,y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, pred_color, 2)


## Prepare Data

In [13]:
def augment_data(coord_loc_list):

    try:
        for frame_keypoints in range(len(coord_loc_list)):
            for spec_keypoint in range(len(coord_loc_list[frame_keypoints])):
                if coord_loc_list[frame_keypoints][spec_keypoint] == None:
                    coord_loc_list[frame_keypoints][spec_keypoint] = (0,0)
    except:
        print("no Nones")
    return coord_loc_list

##

In [14]:
def FormatData(coord_loc_list, frame_shape):

    predict_list = []
    list_output_keypoints = []

    for joint in range(19):
        for frame in range(3):
            for coord_count in range(2):
                list_output_keypoints.append(coord_loc_list[frame][joint][coord_count] / frame_shape[coord_count])

    predict_list.append(list_output_keypoints)

    predict_list = np.array(predict_list)

    predict_list = np.reshape(predict_list, (predict_list.shape[0],19,3,2)).swapaxes(2,3).swapaxes(1,3)
    predict_list = np.reshape(predict_list,(predict_list.shape[0],3,38), order="F")

    return predict_list

## Main Pipeline

In [15]:
def DataPrediction(VIDEO_LOC):

    frameCount = 0
    OBJ_boxes = []
    poseLoc = []

    #Initialize the video stream
    MEDIA_RAW = cv2.VideoCapture(VIDEO_LOC)

    #Loop over the frames from the video stream
    while cv2.waitKey(1) < 0:

        #Grab the frame from the threaded video stream
        hasFrame, image = MEDIA_RAW.read()

        if not hasFrame:
            cv2.waitKey()
            cv2.destroyAllWindows()
            break

        finalImage = image.copy()
        frameCount += 1

        #Only Detect and Track People in the first frame
        if frameCount == 1:
            #Detect People in Image and return their Boxes Position
            OBJ_boxes = detect_individuals_from_image(image)
            #Pass Bounding boxes to Tracker to start tracking
            start_tracking_from_boxes(OBJ_boxes, image)
            #Prepare Pose Save Locations
            poseLoc = [[] for i in range(len(OBJ_boxes))]

        #Sync trackers with new frame
        track_using_trackers(image, finalImage)

        #Expand tracker bounding box with more accurate bounding boxes
        POSE_EST_LOC = expand_tracking_box(image, finalImage)

        #Apply Pose Estimation in Bounding Boxes of every person
        for person_BBs_i in range(len(POSE_EST_LOC)):

            #Create Image focusing on a single person
            POSE_IMG = seperate_person(POSE_EST_LOC[person_BBs_i], image, finalImage)

            #Estimate Pose on a single person
            poseLoc[person_BBs_i].append(get_human_pose_from_img(POSE_IMG, finalImage))

            #Predict
            if(frameCount >= 3):
                PredictViolentBehaviour(person_BBs_i, poseLoc, image.shape, finalImage)

        #Resize for better output
        finalImage = cv2.resize(finalImage, (int((700/finalImage.shape[0])*finalImage.shape[1]),700))

        #show the output frame
        cv2.imshow(VIDEO_LOC, finalImage)

    #Stop any videos
    MEDIA_RAW.release()

    #Close all windows
    cv2.destroyAllWindows()

## Main Method

In [16]:
DataPrediction(PREDICTION_VIDEO)

  curr_action_keys = np.array(pred_keypoints[person_id][-3:])


[[0.00012809]]
[[0.07290388]]
[[5.8280725e-10]]
[[6.3479854e-07]]
[[5.106786e-05]]
[[0.09767009]]
[[3.789538e-09]]
[[1.07027e-06]]
[[2.1542126e-05]]
[[0.0872013]]
[[9.881865e-07]]
[[3.4457803e-06]]
[[2.1460046e-05]]
[[0.10563692]]
[[2.140959e-06]]
[[3.4457803e-06]]
[[1.6413715e-05]]
[[0.13007183]]
[[6.0296284e-09]]
[[3.4457803e-06]]
[[4.6171317e-06]]
[[0.23393686]]
[[1.2830336e-08]]
[[3.4457803e-06]]
[[1.5088613e-06]]
[[0.25772834]]
[[3.7061265e-09]]
[[3.4457803e-06]]
[[2.3562138e-06]]
[[0.21926585]]
[[1.7098493e-08]]
[[3.4457803e-06]]
[[2.440804e-06]]
[[0.23778042]]
[[2.2146223e-08]]
[[8.422012e-06]]
[[1.31179595e-05]]
[[0.03861733]]
[[1.3040042e-05]]
[[0.00013723]]
[[1.385648e-05]]
[[0.02073218]]
[[0.00506704]]
[[1.0412142e-05]]
[[1.0166971e-05]]
no Nones
[[0.0238553]]
[[0.00699983]]
[[8.1513383e-07]]
[[8.2756305e-05]]
no Nones
[[0.04209863]]
[[1.1206783e-05]]
[[8.7040735e-07]]
[[0.00547374]]
no Nones
[[0.07117853]]
[[1.5326076e-07]]
[[1.4004335e-06]]
[[0.03406615]]
no Nones
[[0.0530