In [167]:
from ultralytics import YOLO
import cv2
import torch
import os
import numpy as np
import math
from collections import deque

Importing dependecies and paths

In [168]:
DATA_path = r'D:\Datasets\fight'
actions = ['fighting','not_fighting']
fighting_path = r'D:\Datasets\fight\fighting'
not_fighting_path = r'D:\Datasets\fight\not_fighting'

Main functions to process and extract keypoints

In [169]:

def calc_distances(hands_dict,body_kp,head_kp):

    # creating a dictionary of distances between each keypoint (except of the same object) in the keypoint_dict
    dist_dict = {}
    keyhead = head_kp.keys()
    keysh = hands_dict.keys()
    keysb = body_kp.keys()

    # calculating distances between keypoints on each hand: left to left, right to left, left to right and right to right
    for i,keyi in enumerate(keysh,start =1):
        for j,keyj in enumerate(keysh,start =1):
            if j>=i:
                break
            distll = calc_euclid_dist(hands_dict[keyi][0],hands_dict[keyj][0])
            distlr = calc_euclid_dist(hands_dict[keyi][0],hands_dict[keyj][1])
            distrl = calc_euclid_dist(hands_dict[keyi][1],hands_dict[keyj][0])
            distrr = calc_euclid_dist(hands_dict[keyi][1],hands_dict[keyj][1])
            dist_dict[f'{keyi}'+f'{keyj}'] = list([distll,distlr,distrl,distrr])
 
    # calculating distances between hands and bodies
    for i,keyi in enumerate(keysh,start =1):
        for j,keyj in enumerate(keysb,start =1):
            if j>=i:
                break

            distlb = calc_euclid_dist(hands_dict[keyi][0],body_kp[keyj])
            
            distrb = calc_euclid_dist(body_kp[keyj],hands_dict[keyi][1])

            dist_dict[f'{keyi}'+f'{keyj}'].append(distlb)
            dist_dict[f'{keyi}'+f'{keyj}'].append(distrb)
                
    for i,keyi in enumerate(keysb,start =1):
        for j,keyj in enumerate(keysh,start =1):
            if j>=i:
                break

            distlb = calc_euclid_dist(hands_dict[keyj][0],body_kp[keyi])
            
            distrb = calc_euclid_dist(body_kp[keyi],hands_dict[keyj][1])
            

            dist_dict[f'{keyi}'+f'{keyj}'].append(distlb)
            dist_dict[f'{keyi}'+f'{keyj}'].append(distrb)


    # calculating distances between hands and heads
    for i,keyi in enumerate(keysh,start =1):
        for j,keyj in enumerate(keyhead,start =1):
            if j>=i:
                break

            distlh = calc_euclid_dist(hands_dict[keyi][0],head_kp[keyj])
            
            distrh = calc_euclid_dist(head_kp[keyj],hands_dict[keyi][1])
            
            dist_dict[f'{keyi}'+f'{keyj}'].append(distlh)
            dist_dict[f'{keyi}'+f'{keyj}'].append(distrh)



    for i,keyi in enumerate(keyhead,start =1):
        for j,keyj in enumerate(keysh,start =1):
            if j>=i:
                break

            distlh = calc_euclid_dist(hands_dict[keyj][0],head_kp[keyi])
            
            distrh = calc_euclid_dist(head_kp[keyi],hands_dict[keyj][1])
            
 
            dist_dict[f'{keyi}'+f'{keyj}'].append(distlh)
            dist_dict[f'{keyi}'+f'{keyj}'].append(distrh)



    # calculating distances between bodies
    for i,keyi in enumerate(keysb,start =1):
        for j,keyj in enumerate(keysb,start =1):
            if j>=i:
                break

            distbb = calc_euclid_dist(body_kp[keyi],body_kp[keyj])

            dist_dict[f'{keyi}'+f'{keyj}'].append(distbb)


    # calculating distances between heads
    for i,keyi in enumerate(keyhead,start =1):
        for j,keyj in enumerate(keyhead,start =1):
            if j>=i:
                break

            disthh = calc_euclid_dist(head_kp[keyi],head_kp[keyj])

            dist_dict[f'{keyi}'+f'{keyj}'].append(disthh)
    
    return dist_dict


def extract_hands_keypoints(results, threshold_class, threshold_keypoint):
    # creating a dictionary to collect keypoints to each object id as dictionary key
    existing_kp = {}
    for result,i_d in zip(results[0],results[0].boxes.id):
        # There results for bounding boxes, and confidence scores for general detect
        x1, y1, x2, y2,_, conf_for_detect, class_id_detected = (result.boxes.data.tolist())[0]
        # If the confidence score for general detect is lower than threshold, skip
        if conf_for_detect < threshold_class:
            continue
        # keypoints
        keys = (result.keypoints.data.tolist())[0]
        xl_key, yl_key, confl = keys[9]
        if confl > threshold_keypoint:
           l = [int(xl_key),int(yl_key)]
        else:
            l = []
        xr_key, yr_key, confr = keys[10]
        if confr > threshold_keypoint:
           r = [int(xr_key),int(yr_key)]
        else:
            r = []
        hands_coords = list([l,r])
        # Adding existing hand keypoints of an object in a frame to the dictionary   
        existing_kp[int(i_d)] = hands_coords
    return existing_kp

def extract_body_keypoints(results,threshold_class, threshold_keypoint):
    # creating a dictionary to collect keypoints to each object id as dictionary key
    existing_kp = {}
    for result,i_d in zip(results[0],results[0].boxes.id):
        # There results for bounding boxes, and confidence scores for general detect
        x1, y1, x2, y2,_, conf_for_detect, class_id_detected = (result.boxes.data.tolist())[0]
        # If the confidence score for general detect is lower than threshold, skip
        if conf_for_detect < threshold_class:
            continue
        # keypoints
        keys = (result.keypoints.data.tolist())[0]
        xl_key, yl_key, confl = keys[5]
        xr_key, yr_key, confr = keys[6]
        if (confl>threshold_keypoint) and (confr>threshold_keypoint):
            # Adding existing hand keypoints of an object in a frame to the dictionary   
            mid_point  = list([int((xr_key+xl_key)/2),int((yl_key+yr_key)/2)])
            
        else:
            mid_point = []

        existing_kp[int(i_d)] = mid_point

    return existing_kp

def extract_head_keypoints(results,threshold_class, threshold_keypoint):
    # creating a dictionary to collect keypoints to each object id as dictionary key
    existing_kp = {}
    for result,i_d in zip(results[0],results[0].boxes.id):
        # There results for bounding boxes, and confidence scores for general detect
        x1, y1, x2, y2,_, conf_for_detect, class_id_detected = (result.boxes.data.tolist())[0]
        # If the confidence score for general detect is lower than threshold, skip
        if conf_for_detect < threshold_class:
            continue
        # keypoints
        keys = (result.keypoints.data.tolist())[0]
        xh_key, yh_key, confh = keys[0]
        if confh>threshold_keypoint:
            # Adding existing hand keypoints of an object in a frame to the dictionary   
            mid_point  = list([int(xh_key),int(yh_key)])
        else:
            mid_point = []
        existing_kp[int(i_d)] = mid_point
    return existing_kp


def extract_keypoints(results, threshold_class):
    existing_kp = {}
    for result,i_d in zip(results[0],results[0].boxes.id):
        # There results for bounding boxes, and confidence scores for general detect
        x1, y1, x2, y2,_, conf_for_detect, class_id_detected = (result.boxes.data.tolist())[0]
        # If the confidence score for general detect is lower than threshold, skip
        if conf_for_detect < threshold_class:
            continue
        # keypoints
        keys = (result.keypoints.data.tolist())[0]
        keyp_arr = list()
        for key in keys:
            keyp_arr.append(key)
        # Adding existing hand keypoints of an object in a frame to the dictionary   
        existing_kp[int(i_d)] = keyp_arr
    return existing_kp

def calc_kp_to_kp_dist(keypoints_dict):
    # creating a dictionary of distances between each keypoint (except of the same object) in the keypoint_dict
    dist_dict = {}
    keys = keypoints_dict.keys()
    # calculating distances between keypoints 
    for l,keyi in enumerate(keys,start =1):
        for m,keyj in enumerate(keys,start =1):
            if m>=l:
                break  
            for i,p1 in enumerate(keypoints_dict[keyi]):
                for j,p2 in enumerate(keypoints_dict[keyj]):
                    dist = calc_euclid_dist(p1,p2)
                    dist_dict[f'{keyi}'+f'{keyj}'+f'{i}'+f'{j}'] = dist
    return dist_dict

def calc_euclid_dist(p1,p2):
    if (len(p1)>0) and (len(p2)>0):
        dist = int(math.sqrt((p1[0]-p2[0])*(p1[0]-p2[0]) + (p1[1]-p2[1])*(p1[1]-p2[1])))
        return dist
    else: 
        return np.nan
    
def calc_grad(dist_dict):
    return

Initializing dictionaries and parametres

In [170]:
text2 = "No suspicious activity"
text1 = "Suspicious activity"
text3 = "No people in sight"
color2 = (100, 200, 0)
color1 = (100, 0, 200)
color3 = (100, 100, 100)
font_scale = 1.6
thickness = 2

winsize = 40
all_keypoints = {}
distance_dict = {}
average_dist = {}
grad_dict = {}
outputs = [0,1]
nums_sequences = 0

Initializing YOLOv8 pose model and caption from file

In [5]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
modely = YOLO('yolov8l-pose.pt')  # load a pretrained YOLOv8n classification model
modely.to(device)
video_path = r"D:\videos\fight4.mp4"
vid_name = 'v4'
cap = cv2.VideoCapture(video_path)
# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fps = cap.get(cv2.CAP_PROP_FPS) # or number
# Create a VideoWriter object to save the output video
output_video_path = r"D:\videos_processed\fight4_processed.mp4"
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

Data coleccting cycle. Each (winsize) frames you will have to press 'f' key if fight was on those frames and any other key if not

In [6]:
while cap.isOpened():
# Read a frame from the video
    success, frame = cap.read()
    if success:

        results = modely.track(frame, persist=True, retina_masks=True, boxes=True, show_conf=False, line_width=1,  conf=0.8, iou=0.5,  classes=0, show_labels=False, device=device,verbose = False,tracker="bytetrack.yaml")
        if results[0].boxes.id is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy().astype(int)
            ids = results[0].boxes.id.cpu().numpy().astype(int)
            for box, i_d in zip(boxes, ids):
                x1, y1, x2, y2 = box[0], box[1], box[2], box[3]



                # Draw bounding box
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                # Draw customer id on the frame above the bounding box
                text = f"{i_d}"
                font = cv2.FONT_HERSHEY_SIMPLEX
                font_scale = 0.6
                font_thickness = 1
                text_size = cv2.getTextSize(text, font, font_scale, font_thickness)[0]

                # Calculate the position to align the label with the top of the bounding box
                text_x = x1 + (x2 - x1 - text_size[0]) // 2
                text_y = y1 - 10  # Adjust this value for the desired vertical offset

                # Make sure the text_y position is within the frame's bounds
                if text_y < 0:
                    text_y = 0

                # Draw the label background rectangle
                cv2.rectangle(frame, (text_x - 5, text_y - text_size[1] - 5), (text_x + text_size[0] + 5, text_y + 5), (0, 0, 0), -1)

                #Draw the customer id text
                cv2.putText(
                    frame,
                    text,
                    (text_x, text_y),
                    font,
                    font_scale,
                    (255, 255, 255),  # White color
                    font_thickness,
                    lineType=cv2.LINE_AA
                )
    
            #extracting keypoints
            body_kp = extract_body_keypoints(results = results,threshold_class=0.4,threshold_keypoint=0.4)
            hands_kp = extract_hands_keypoints(results = results,threshold_class=0.4,threshold_keypoint=0.4)
            head_kp = extract_head_keypoints(results = results,threshold_class=0.4,threshold_keypoint=0.4)
            #calculating distances between keypoints

            dd = calc_distances(hands_kp,body_kp,head_kp)
            #appending distances dictionary and evaluating average distance and classification based on it
            for key in dd.keys():

                if key not in distance_dict.keys():
                    distance_dict[key] = deque(maxlen=40)

                distance_dict[key].append(dd[key])
                
                if len(distance_dict[key]) == winsize:
                    nums_sequences = nums_sequences + 1
                    print(f'Processing pair {key}.')
                    keypoints = np.array(distance_dict[key])
                    if cv2.waitKey(-1) & 0xFF == ord('f'):
                        if cv2.waitKey(-1) & 0xFF == ord('f'):
                            save_path = fighting_path   + f'\{vid_name}' +  f'{nums_sequences}'
                        else:
                            save_path = not_fighting_path  + f'\{vid_name}' +  f'{nums_sequences}'
                    else:
                        distance_dict[key].clear()
                        continue
                    np.save(save_path,keypoints)
                    distance_dict[key].clear()
            

        annotated_frame_show = cv2.resize(frame, (1080, 720))
        cv2.imshow("YOLOv8 Inference", annotated_frame_show)
        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
            
        
    else:
        # Break the loop if the end of the video is reached
        break

cap.release()
cv2.destroyAllWindows()



Processing pair 42.
Processing pair 41.
Processing pair 12.
Processing pair 92.
Processing pair 21.
Processing pair 12.
Processing pair 12.
Processing pair 228.
Processing pair 3528.
Processing pair 3628.
Processing pair 3928.
Processing pair 3936.
Processing pair 3628.
Processing pair 3628.
Processing pair 3628.
Processing pair 4436.
Processing pair 4536.
Processing pair 2836.
Processing pair 4636.
Processing pair 4836.
Processing pair 4936.
Processing pair 4948.
Processing pair 4836.
Processing pair 4936.
Processing pair 4948.
Processing pair 5036.
Processing pair 5048.
Processing pair 5049.
Processing pair 4836.
Processing pair 4936.
Processing pair 4948.
Processing pair 5036.
Processing pair 5048.
Processing pair 5049.
Processing pair 4836.
Processing pair 4936.
Processing pair 4948.
Processing pair 4836.
Processing pair 5036.
Processing pair 5048.
Processing pair 4836.
Processing pair 4836.
Processing pair 5848.
Processing pair 5948.
Processing pair 5958.
Processing pair 5948.
Pro

Preprocessing data

In [171]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [172]:
from math import sqrt

In [305]:
label_map = {label: num for num, label in enumerate(actions)}
sequences,labels = [],[]
for action in actions:
    for file in os.listdir(os.path.join(DATA_path,action)):
        sequences.append(np.load(os.path.join(DATA_path,action,file),allow_pickle=True))
        labels.append(label_map[action])


seq_data = np.array(sequences)
data_shape = seq_data.shape
transposed_data = np.zeros((data_shape[0],data_shape[2],data_shape[1]))


for i,row in enumerate(seq_data):
    for j,line in enumerate(row):
        nums = 0
        num_num = 0
        deviation = 0
        for num in line:
            if np.isnan(num) == False:        
                nums = nums + num
                num_num = num_num + 1

        if num_num == 0:
            line = np.nan_to_num(x = line,copy= False,nan = 0)
            continue

        mean = nums/num_num

        for num in line:
            if np.isnan(num) == False:        
                deviation = deviation + (num - mean)*(num-mean)

        std_dev = sqrt(deviation/num_num)

        for k,num in enumerate(line):
            if np.isnan(num) == False:  
                seq_data[i][j][k]= (num - mean)/std_dev

        line = np.nan_to_num(x = line,copy= False,nan = 0)
        

for i,sample in enumerate(seq_data):
    sample = sample.transpose()
    transposed_data[i] = sample
   
seq_labels = to_categorical(labels).astype(int)
X_train, X_test, y_train, y_test = train_test_split(transposed_data,seq_labels,test_size=0.1) 

  seq_data[i][j][k]= (num - mean)/std_dev


Transfering data to pytorch compatible type

In [306]:
class Keypoint_sequence_dataset(torch.utils.data.Dataset):

    def __init__(self, x, y):
        self.x = torch.from_numpy(x)
        self.y = torch.from_numpy(y)

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx, :], self.y[idx]
        
    


Building neural network and train

In [307]:
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from torchvision import datasets, models, transforms
import torch.optim as optim
import torch.nn.functional as F
from sklearn.metrics import accuracy_score
from torch.utils.tensorboard import SummaryWriter

device  = ("cuda")

In [406]:
class Seq_conv_net(nn.Module):
    def __init__(self):
        super().__init__()
        
        # размер исходной картинки 32х32

        # conv 1
        self.conv1 = nn.Conv1d(in_channels=14, out_channels=32, kernel_size= 10,padding= 'same') 
        
        #bn 1
        self.bn1 = nn.BatchNorm1d(32)
        self.dp1 = nn.Dropout1d(0.3)
        # conv 1
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size= 10,padding= 'same') 
        
        #bn 1
        self.bn2 = nn.BatchNorm1d(64)
        self.dp2 = nn.Dropout1d(0.3)
        # conv 1
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size= 20,padding= 'same') 

        #bn 1
        self.bn3 = nn.BatchNorm1d(128)
        self.dp3 = nn.Dropout1d(0.3)

        self.lstm1 = nn.LSTM(128,128,5,batch_first=True)
        self.dp4 = nn.Dropout1d(0.3)
        #self.lstm3 = nn.LSTM(64,128,1,batch_first=True)


        self.flatten = nn.Flatten()
        
        # linear 1
        self.fc1 = nn.Linear(128*40, 1024)
        # bnl1
        self.bnl1 = nn.BatchNorm1d(1024)
        # dp1
        self.dp5 = nn.Dropout1d(0.3)
        # linear 2
        self.fc2 = nn.Linear(1024, 128)
        # bnl2
        self.bnl2 = nn.BatchNorm1d(128)
        # dp2
        self.dp6 = nn.Dropout1d(0.3)
        # linear 3
        self.fc3 = nn.Linear(128, 2)

    
    def forward(self, x):
        # forward pass сети

        x = F.gelu(self.bn1(self.conv1(x)))
        x = self.dp1(x)
        x = F.gelu(self.bn2(self.conv2(x)))
        x = self.dp2(x)
        x = F.gelu(self.bn3(self.conv3(x)))
        x = self.dp3(x)

        x = torch.transpose(x,1,2)

        x,_ = self.lstm1(x)
        x = self.dp4(x)
        #x = F.gelu(self.bn1(self.lstm1(x)))
        #x = F.gelu(self.bn2(self.lstm2(x)))
        #x = F.gelu(self.bn3(self.lstm3(x)))

        x = self.flatten(x)

        x = F.gelu(self.bnl1(self.fc1(x)))
        x = self.dp5(x)
        x = F.gelu(self.bnl2(self.fc2(x)))
        x = self.dp6(x)
        x = self.fc3(x)
        x = F.softmax(x,dim  = 1)
        
        return x



In [407]:
def evaluate(model, dataloader, loss_fn):
    
    losses = []

    num_correct = 0
    num_elements = len(dataloader.dataset)

    for i, batch in enumerate(dataloader):
        
        # так получаем текущий батч
        X_batch, y_batch = batch
        
        with torch.no_grad():
            logits = model(X_batch.to(device,dtype = torch.float))
            
            loss = loss_fn(logits, y_batch.to(device,dtype = torch.float))
            
            losses.append(loss.item())
            
            y_pred = torch.argmax(logits, dim=1).cpu()
            
            y_answers = torch.argmax(y_batch, dim=1).cpu()

            num_correct += torch.sum(y_answers == y_pred)
    accuracy = num_correct / num_elements   
    return accuracy, np.mean(losses)

def train(model, loss_fn, optimizer,train_loader,val_loader, n_epoch=3):

    num_iter = 0

    # цикл обучения сети
    for epoch in range(n_epoch):

        

        model.train(True)
        for i, batch in enumerate(train_loader):
            # так получаем текущий батч
            X_batch, y_batch = batch 
            
            # forward pass (получение ответов на батч картинок)
            logits = model(X_batch.to(device,dtype = torch.float)) 
            
            # вычисление лосса от выданных сетью ответов и правильных ответов на батч
            loss = loss_fn(logits, y_batch.to(device,dtype = torch.float)) 
            
            
            loss.backward() # backpropagation (вычисление градиентов)
            optimizer.step() # обновление весов сети
            optimizer.zero_grad() # обнуляем веса

            num_iter += 1


        # после каждой эпохи получаем метрику качества на валидационной выборке
        model.train(False)

        if epoch%10 ==0:
            print("Epoch:", epoch)
            val_accuracy, val_loss = evaluate(model, val_loader, loss_fn=loss_fn)
            train_accuracy, train_loss = evaluate(model, train_loader, loss_fn=loss_fn)
            #print('Loss/train', train_loss.item(), epoch)
            print(f'Accuracy/train{train_accuracy.item():0.6f}')
            #print('Loss/val', val_loss.item(), epoch)
            print(f'Accuracy/val{val_accuracy.item():0.6f}')
        
    return model

In [408]:
train_data = Keypoint_sequence_dataset(X_train,y_train)
test_data = Keypoint_sequence_dataset(X_test,y_test)

train_size = int(len(train_data) * 0.9)

val_size = len(train_data) - train_size

train_data, val_data = torch.utils.data.random_split(train_data, [train_size, val_size])

train_loader = torch.utils.data.DataLoader(train_data, batch_size=16, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_data, batch_size=16, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=16, shuffle=True)



    
conv_net = Seq_conv_net()
conv_net = conv_net.to(device)


loss_fn = torch.nn.CrossEntropyLoss()

# выбираем алгоритм оптимизации и learning_rate
learning_rate = 1e-5
optimizer = torch.optim.Adam(conv_net.parameters(), lr=learning_rate)

conv_net = train(model=conv_net,loss_fn=loss_fn,optimizer=optimizer,train_loader=train_loader,val_loader=val_loader,n_epoch=300)

train_accuracy, _ = evaluate(conv_net, train_loader, loss_fn)
print('Train accuracy is', train_accuracy)

test_accuracy, _ = evaluate(conv_net, test_loader, loss_fn)
print('Test accuracy is', test_accuracy)

Epoch: 0
Accuracy/train0.795045
Accuracy/val0.800000
Epoch: 10
Accuracy/train0.795045
Accuracy/val0.800000
Epoch: 20
Accuracy/train0.795045
Accuracy/val0.800000
Epoch: 30
Accuracy/train0.795045
Accuracy/val0.800000
Epoch: 40
Accuracy/train0.797297
Accuracy/val0.780000
Epoch: 50
Accuracy/train0.808559
Accuracy/val0.760000
Epoch: 60
Accuracy/train0.810811
Accuracy/val0.760000
Epoch: 70
Accuracy/train0.815315
Accuracy/val0.800000
Epoch: 80
Accuracy/train0.824324
Accuracy/val0.800000
Epoch: 90
Accuracy/train0.824324
Accuracy/val0.760000
Epoch: 100
Accuracy/train0.824324
Accuracy/val0.800000
Epoch: 110
Accuracy/train0.824324
Accuracy/val0.780000
Epoch: 120
Accuracy/train0.819820
Accuracy/val0.800000
Epoch: 130
Accuracy/train0.831081
Accuracy/val0.780000
Epoch: 140
Accuracy/train0.851351
Accuracy/val0.760000
Epoch: 150
Accuracy/train0.842342
Accuracy/val0.800000
Epoch: 160
Accuracy/train0.842342
Accuracy/val0.780000
Epoch: 170
Accuracy/train0.828829
Accuracy/val0.800000
Epoch: 180
Accuracy/t

In [415]:
model_scripted = torch.jit.script(conv_net)
model_scripted.save('fight_detection.pt')

Testing on a real video

In [409]:
def preprocess_keypoints(keypoints):
    kepoint = np.array(keypoints)
    for i,line in enumerate(kepoint):
        nums = 0
        num_num = 0
        deviation = 0
        for num in line:
            if np.isnan(num) == False:        
                nums = nums + num
                num_num = num_num + 1

            if num_num == 0:
                line = np.nan_to_num(x = line,copy= False,nan = 0)
                continue

        mean = nums/num_num

        for num in line:
            if np.isnan(num) == False:        
                deviation = deviation + (num - mean)*(num-mean)

        std_dev = sqrt(deviation/num_num)

        for j,num in enumerate(line):
            if np.isnan(num) == False:  
                kepoint[i][j]= (num - mean)/std_dev

        line = np.nan_to_num(x = line,copy= False,nan = 0)
    return torch.Tensor([kepoint.T])



In [411]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
modely = YOLO('yolov8l-pose.pt')  # load a pretrained YOLOv8n classification model
modely.to(device)
video_path = r"D:\videos\hands3.mp4"
cap = cv2.VideoCapture(video_path)
# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fps = cap.get(cv2.CAP_PROP_FPS) # or number
# Create a VideoWriter object to save the output video
output_video_path = r"D:\videos_processed\fight1_processed.mp4"
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))


actions = ['fighting','not_fighting']

text2 = "No suspicious activity"
text1 = "Suspicious activity"
text3 = "No people in sight"
color_map = {'fighting': (200,100,0),'not_fighting': (0,100,200)}
font_scale = 1.6
thickness = 2

winsize = 40

distance_dict = {}


label_map = {num: label for num, label in enumerate(actions)}


In [412]:
ans = 'not_fighting'
while cap.isOpened():
# Read a frame from the video
    success, frame = cap.read()
    if success:

        results = modely.track(frame, persist=True, retina_masks=True, boxes=True, show_conf=False, line_width=1,  conf=0.6, iou=0.5,  classes=0, show_labels=False, device=device,verbose = False,tracker="bytetrack.yaml")




        if results[0].boxes.id is not None:
            
            #extracting keypoints
            body_kp = extract_body_keypoints(results = results,threshold_class=0.4,threshold_keypoint=0.4)
            hands_kp = extract_hands_keypoints(results = results,threshold_class=0.4,threshold_keypoint=0.4)
            head_kp = extract_head_keypoints(results = results,threshold_class=0.4,threshold_keypoint=0.4)
            #calculating distances between keypoints

            dd = calc_distances(hands_kp,body_kp,head_kp)
            #appending distances dictionary and evaluating average distance and classification based on it
            for key in dd.keys():

                if key not in distance_dict.keys():
                    distance_dict[key] = deque(maxlen=40)

                distance_dict[key].append(dd[key])
                
                if len(distance_dict[key]) == winsize:
                    nums_sequences = nums_sequences + 1
                    keypoints = preprocess_keypoints(distance_dict[key])
                    logits = conv_net(keypoints.to(device,dtype = torch.float))
                    prediction = int(torch.argmax(logits, dim=1).cpu())
                    print(logits)
                    ans = label_map[prediction]
                    distance_dict[key].clear()
                    if ans == 'fighting':
                        break
                    

            text_size, _ = cv2.getTextSize(ans, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)
            text_position = (frame_width - text_size[0] - 10, text_size[1] + 10)
            cv2.rectangle(frame, (text_position[0] - 5, text_position[1] - text_size[1] - 5),
                                    (text_position[0] + text_size[0] + 5, text_position[1] + 5), color=(0, 0, 0),
                                    thickness=cv2.FILLED)
            cv2.putText(frame, ans, text_position, cv2.FONT_HERSHEY_SIMPLEX, font_scale, color_map[ans], thickness, cv2.LINE_AA)


                    
            

        annotated_frame_show = cv2.resize(frame, (1080, 720))
        out.write(frame)
        cv2.imshow("YOLOv8 Inference", annotated_frame_show)
        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
            
        
    else:
        # Break the loop if the end of the video is reached
        break
out.release()
cap.release()
cv2.destroyAllWindows()



tensor([[0.1414, 0.8586]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.1488, 0.8512]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.1460, 0.8540]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.0917, 0.9083]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.1766, 0.8234]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.1853, 0.8147]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.1427, 0.8573]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.1096, 0.8904]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.1707, 0.8293]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.0872, 0.9128]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.0937, 0.9063]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.0897, 0.9103]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.3615, 0.6385]], device='cuda:0', grad_fn=<SoftmaxBackward0>)
tensor([[0.9319, 0.0681]], device='cuda:0', grad_fn=<SoftmaxBack