<a href="https://colab.research.google.com/github/caionms/csis-suspicious-behavior-close-to-vehicles/blob/main/YOLOv8_Suspicious_Behavior_Close_To_Vehicles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **REAL-TIME POSE ESTIMATION WITH YOLOv8**
## Author: Caio Nery Matos Santos

Code based on: https://github.com/ultralytics/ultralytics

### **Sobre o projeto:**

O objetivo deste projeto é melhorar a segurança nos estacionamentos universitários, construindo um sistema capaz de reconhecer veículos e indivíduos, avaliando sua proximidade e detectando gestos e ações que são previamente classificados como comportamento suspeitos, e interoperar com sistemas de câmeras a fim de alertar as autoridades competentes.



### **Conexão com o Google Drive (Opcional)**

In [None]:
from google.colab import drive
drive.mount ("/content/drive")

Mounted at /content/drive


### **Execute para preparar o ambiente para a detecção**

#### **Instalando o YOLOv8 ...**

In [None]:
# Install the ultralytics package from PyPI
! pip install ultralytics

#### **Importando o YOLOv8 e outras bibliotecas ...**

In [None]:
import os
import time
import random
import math
import cv2
import numpy as np
from collections import defaultdict
from os import listdir
from os.path import exists, join, basename, splitext, isfile, isdir

from ultralytics import YOLO

# Carrega modelos
model = YOLO('yolov8n.pt')
model_kpts = YOLO('yolov8n-pose.pt')

Downloading https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-pose.pt to 'yolov8n-pose.pt'...
100%|██████████| 6.51M/6.51M [00:00<00:00, 72.0MB/s]


#### **Utils**

In [None]:
def bbox_iou_vehicle(box1, box2):
    # Extrai as coordenadas das caixas delimitadoras
    x1_box1, y1_box1, x2_box1, y2_box1 = box1
    x1_box2, y1_box2, x2_box2, y2_box2 = box2

    # Coordenadas da intersecção
    x1_intersection = max(x1_box1, x1_box2)
    y1_intersection = max(y1_box1, y1_box2)
    x2_intersection = min(x2_box1, x2_box2)
    y2_intersection = min(y2_box1, y2_box2)

    # Área da intersecção
    intersection_area = max(0, x2_intersection - x1_intersection + 1) * max(0, y2_intersection - y1_intersection + 1)

    # Áreas das caixas delimitadoras
    box1_area = (x2_box1 - x1_box1 + 1) * (y2_box1 - y1_box1 + 1)
    box2_area = (x2_box2 - x1_box2 + 1) * (y2_box2 - y1_box2 + 1)

    # União das áreas
    union_area = box1_area + box2_area - intersection_area

    # Cálculo do IoU
    iou = intersection_area / union_area

    return iou

In [None]:
def is_front(kpts):
  # Extrai as coordenadas x e y do nariz
  x_nariz, y_nariz = kpts[0][0], kpts[0][1]

  # Extrai as coordenadas x e y dos olhos
  x_olho_esquerdo, y_olho_esquerdo = kpts[1][0], kpts[1][1]
  x_olho_direito, y__olho_direito = kpts[2][0], kpts[2][1]

  # Ordena as coordenadas
  coord_x_olhos = sorted([x_olho_esquerdo, x_olho_direito])

  # Extrai as coordenadas x e y dos ouvidos
  x_ouvido_esquerdo, y_ouvido_esquerdo = kpts[3][0], kpts[3][1]
  x_ouvido_direito, y_ouvido_direito = kpts[4][0], kpts[4][1]

  # Ordena as coordenadas
  coord_x_ouvidos = sorted([x_ouvido_esquerdo, x_ouvido_direito])

  #Se o nariz não estiver entre os olhos ou entre os ouvidos, está de lado
  return (coord_x_olhos[0] <= x_nariz <= coord_x_olhos[1]) and (coord_x_ouvidos[0] <= x_nariz <= coord_x_ouvidos[1])

In [None]:
def three_points_angle(kpts, kpts_ind):
  # Extrai os pontos para definir o angulo (linha = k_1-k_2 | k_2-k_3)
  k_1, k_2, k_3 = kpts_ind[0], kpts_ind[1], kpts_ind[2]

  # Ponto 1
  x1_coord, y1_coord = kpts[k_1][0], kpts[k_1][1]
  # Ponto 2
  x2_coord, y2_coord = kpts[k_2][0], kpts[k_2][1]
  # Ponto 3
  x3_coord, y3_coord = kpts[k_3][0], kpts[k_3][1]

  # Calcula os vetores
  v1 = (x1_coord - x2_coord, y1_coord - y2_coord)
  v2 = (x3_coord - x2_coord, y3_coord - y2_coord)

  # Calcula o produto escalar
  dot_product = v1[0]*v2[0] + v1[1]*v2[1]

  # Calcula as magnitudes dos vetores
  v1_mag = math.sqrt(v1[0]**2 + v1[1]**2)
  v2_mag = math.sqrt(v2[0]**2 + v2[1]**2)

  # Calcula o angulo entre os vetores
  # Versão para lidar como erro math domain error
  angle = math.acos(min(max(dot_product / (v1_mag * v2_mag), -1), 1))

  # Converte o angulo de radianos para graus
  angle_degrees = math.degrees(angle)

  return angle_degrees

In [None]:
def is_squat(kpts):
  # Indices dos pontos-chave dos joelhos
  kpts_ind_joelho_dir_int = [12, 14, 16]
  kpts_ind_joelho_esq_int = [11, 13, 15]

  # Calcula os angulos internos dos joelhos
  angle_joelho_dir_int = three_points_angle(kpts, kpts_ind_joelho_dir_int)
  angle_joelho_esq_int = three_points_angle(kpts, kpts_ind_joelho_esq_int)

  # Calcula os angulos externos dos joelhos
  angle_joelho_dir_ext = 180 - angle_joelho_dir_int
  angle_joelho_esq_ext = 180 - angle_joelho_esq_int

  # Calcula a media dos angulos dos joelhos
  avg_leg_angle_ext = ((angle_joelho_esq_ext) + (angle_joelho_dir_ext)) // 2
  avg_leg_angle_int = ((angle_joelho_esq_int) + (angle_joelho_dir_int)) // 2

  # Condicao para definir agachamento (frontal e lateral)
  return (is_front(kpts) and angle_joelho_dir_int < 145 and angle_joelho_esq_int < 145 and avg_leg_angle_int < 130)
   or (angle_joelho_dir_ext > 60 and angle_joelho_esq_ext > 60 and avg_leg_angle_ext > 80)

In [None]:
def plot_detections(frame, persons, color=None, label=None, line_thickness=3):
  # Percorre todas as caixas de pessoas
  for person_box in persons:
    # Define espessura da linha
    tl = line_thickness or round(0.002 * (frame.shape[0] + frame.shape[1]) / 2) + 1

    # Define cores para as caixas
    color = color or [random.randint(0, 255) for _ in range(3)]

    # Extrai coordenadas da caixa
    c1, c2 = (int(person_box[0]), int(person_box[1])), (int(person_box[2]), int(person_box[3]))

    # Plota a caixa delimitadora
    cv2.rectangle(frame, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)

    # Se existir um label, plota ele
    if label:
      # Define espessura da fonte
      tf = max(tl - 1, 1)

      # Extrai o tamanho do texto
      t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]

      # Define nova coordenada para o retangulo do texto
      c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3

      # Plota o retangulo do texto
      cv2.rectangle(frame, c1, c2, color, -1, cv2.LINE_AA)

      # Plota o texto definido
      cv2.putText(frame, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)


In [None]:
def plot_skeleton_kpts(frame, kpts, kpts_conf, color=None, orig_shape=None):
    # Define cores para pontos-chave
    palette = np.array([[255, 128, 0], [255, 153, 51], [255, 178, 102],
                        [230, 230, 0], [255, 153, 255], [153, 204, 255],
                        [255, 102, 255], [255, 51, 255], [102, 178, 255],
                        [51, 153, 255], [255, 153, 153], [255, 102, 102],
                        [255, 51, 51], [153, 255, 153], [102, 255, 102],
                        [51, 255, 51], [0, 255, 0], [0, 0, 255], [255, 0, 0],
                        [255, 255, 255]])

    # Conexoes entre pontos-chave
    skeleton = [
                #Rosto
                [1, 2], [1, 3], [2, 3], [2, 4], [3, 5], [4, 6], [5, 7],
                #Braços
                [6, 7], [6, 8], [7, 9],  [8, 10], [9, 11],
                #Tronco
                [7, 13], [6, 12], [12, 13],
                #Cintura e pernas
                [14, 12], [15, 13], [16, 14], [17, 15]
      ]

    # Cores base para as linhas (seguindo a ordem de skeleton)
    pose_limb_color = palette[[16, 16, 16, 16, 16, 16, 16, 0, 0, 0, 0, 0, 7, 7, 7, 9, 9, 9, 9]]

    # Cores base para keypoints (seguindo a ordem definida)
    pose_kpt_color = palette[[16, 16, 16, 16, 16, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 9, 9]]

    # Define raio
    radius = 5

    # Recebe cores caso tenham sido passadas
    r, g, b = None
    if color is not None:
      r, g, b = color[0], color[1], color[2]

    # Percorre cada ponto-chave
    for kid in range(len(kpts)):
      # Extrai as coordenadas desse ponto-chave
      x_coord, y_coord = kpts[kid]

      # Verifica se as coordenadas estão no limite proporcional e sua confiança
      if x_coord % 640 != 0 and y_coord % 640 != 0 and kpts_conf[kid] >= 0.1:
        # Plota o circulo desse ponto chave
        cv2.circle(frame, (int(x_coord), int(y_coord)), radius, (int(r), int(g), int(b)), -1)

    # Percorre cada conexao do esqueleto
    for sk_id, sk in enumerate(skeleton):
      # Extrai as coordenadas e confiança do primeiro ponto
      x,y = kpts[sk[0]-1]
      conf1 = kpts_conf[sk[0]-1]
      pos1 = (int(x), int(y))

      # Extrai as coordenadas e confiança do segundo ponto
      x,y = kpts[sk[1]-1]
      conf2 = kpts_conf[sk[1]-1]
      pos2 =(int(x), int(y))

      # Verifica se as coordenadas estão no limite proporcional e sua confiança
      if (conf1 >= 0.1 and conf2 >= 0.1 and
          0 < pos1[0] % 640 < 640 and 0 < pos1[1] % 640 < 640 and
          0 < pos2[0] % 640 < 640 and 0 < pos2[1] % 640 < 640):
        # Plota a linha dessa conexao
        cv2.line(frame, pos1, pos2, (int(r), int(g), int(b)), thickness=2)

In [None]:
def plot_keypoints_detection(frame, kpts, kpts_conf, box, estado, id, frames_passados, orig_shape=None):
  # Cria um texto para plot com o id da pessoa
  label = str(id)

  # Adiciona no texto o estado em que a pessoa se encontra e define a cor correspondente
  if(estado == 0):
    label = label + ": Em pe proximo a um veiculo"
    color = [0, 215, 255]
  elif(estado == 1):
    label = label + ": Agachado(a) proximo a um veiculo"
    color = [0, 95, 255]
  else:
    label = label + ": Suspeito(a)"
    color = [0, 0, 255]

  # Adiciona os frames de deteccao da pessoa no texto
  label = label + "(" + str(frames_passados) + "f)"

  # Chama o metodo de plot do esqueleto usando as cores definidas
  plot_skeleton_kpts(frame, kpts, kpts_conf, color, orig_shape=None)

  # Extrai os tons
  r,g,b = color[:]

  # Extrai as coordenadas da caixa
  x1 = int(box[0].item())
  y1 = int(box[1].item())
  x2 = int(box[2].item())
  y2 = int(box[3].item())

  # Define espessura da linha
  tl = round(0.002 * (frame.shape[0] + frame.shape[1]) / 2) + 1

  # Define espessura da fonte
  tf = max(tl - 1, 1)

  # Extrai o tamanho do texto
  t_size = cv2.getTextSize(label, 0, fontScale=tl / 3.7, thickness=tl)[0]

  # Define nova coordenada para o retangulo do texto
  c2 = x1 + t_size[0], y1 - t_size[1] - 3

  # Extrai altura e largura do texto para ser proporcional com o tamanho
  (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)

  # Plota o retangulo da caixa
  cv2.rectangle(frame, (x1, y1), (x2, y2), (r, g, b), 2)

  # Plota o retangulo do texto
  cv2.rectangle(frame, (x1, y1), c2, (r, g, b), -1, cv2.LINE_AA)

  # Plota o texto
  cv2.putText(frame, label, (x1, y1 - 2), 0, tl / 3.7, [255, 255, 255], 2, cv2.LINE_AA)

#### **Método que efetua detecção de comportamentos pré-definidos como suspeitos**


In [None]:
def run_detection(video_path, video, out_path='/content/'):

  # Obtem o caminho do video
  colab_video_path = video_path + video

  # Abre o video
  cap = cv2.VideoCapture(colab_video_path)

  # Finaliza se o video for vazio
  if cap is None:
    return

  # Define nome do video de saida
  if video.endswith('.mp4'):
    filename = out_path + (video.replace('.mp4', '') +'_output.mp4')
  elif video.endswith('.avi'):
    filename = out_path + (video.replace('.avi', '') +'_output.avi')

  # Define VideoWriter para salvar video de saida
  fourcc = cv2.VideoWriter_fourcc(*'MP4V')
  out = cv2.VideoWriter(video, fourcc, (30.0 if fps is None else fps), (int(cap.get(3)), int(cap.get(4))))

  # Obtem o framerate para contabilizar o tempo
  fps = None
  if cap is not None:
      fps = cap.get(cv2.CAP_PROP_FPS)
  else:
      fps = 30

  # Dicionários para armazenar o histórico de track
  track_history_reference = defaultdict(lambda: 30)
  track_history_time = defaultdict(lambda: 0)

  # Percorre os frames do video
  while cap.isOpened():
      # Faz leitura do frame atual
      success, frame = cap.read()

      # Verifica se a leitura foi um sucesso e o frame nao eh nulo
      if success and frame is not None:
          # Executa a inferencia do YOLOv8 de pessoas, motos e carros sem tracking
          results = model(frame, classes=[0,2,3], conf=0.7, iou=0.7)

          # Array para testar interseccao
          persons = []
          vehicles = []

          # Verifica se a caixa eh de uma pessoa ou veiculo e guarda no array correspondente
          for r in results: # 1 resultado por entrada
              boxes = r.boxes
              for box in boxes:
                  b = box.xyxy[0].numpy()  # get box coordinates in (top, left, bottom, right) format (transformando em array)
                  c = int(box.cls)
                  if (c == 0):
                    persons.append(b)
                  elif (c == 2 or c == 3):
                    vehicles.append(b)

          # Testa ocorreu interseccao entre pessoa e veiculo para rodar o outro modelo
          intersection = False
          if persons and vehicles:
            for aux, person_box in enumerate(persons):
                for vehicle_box in vehicles:
                    if bbox_iou_vehicle(vehicle_box, person_box) > 0: # Detecta a intersecao
                        intersection = True
                        persons.pop(aux) # Retira a pessoa a ser analisada da lista de pessoas
                        break

          # Se houve alguma interseccao, roda o modelo de pontos-chave
          pessoas_atualizadas = [] # Pessoas detectadas no frame
          if(intersection):
            # Executa a inferencia do YOLOv8 de pontos-chave com tracking
            results = model_kpts.track(frame, persist=True, conf=0.3, iou=0.7)

            # Obtem caixas das pessoas com pontos-chave
            boxes = results[0].boxes.xyxy.cpu()

            # Obtem IDs
            track_ids = None
            if results[0].boxes.id is not None:
              track_ids = results[0].boxes.id.int().cpu().tolist()

            # Obtem coordenadas dos pontos-chave
            keypoints = results[0].keypoints.xy.numpy(); # Limita a informação para xyxy

            # Obtem confianca dos pontos-chave
            keypoints_conf = None
            if results[0].keypoints.conf is not None:
              keypoints_conf = results[0].keypoints.conf.numpy(); # Conf de cada keypoint

            # Verifica se todas as pessoas detectadas tem ids
            if track_ids and keypoints_conf is not None and (len(track_ids) == len(keypoints_conf)):
              # Percorre cada pessoa detectada com o modelo de pontos-chave
              for box, track_id, kpts, kpts_conf in zip(boxes, track_ids, keypoints, keypoints_conf):
                # Adiciona 1 frame no historico da pessoa com o ID correspondente
                track_history_time[track_id] += 1

                # Testa se pessoa esta agachada e define o estado baseado no tempo (frames)
                estado = None
                squat = is_squat(kpts)
                if not squat and track_history_time[track_id] < (fps*6):
                  estado = 0
                elif squat and track_history_time[track_id] < (fps*3):
                  estado = 1
                else:
                  estado = 2
                  suspeito = True

                # Chama o metodo de plot
                plot_keypoints_detection(frame, kpts, kpts_conf, box, estado, track_id, track_history_time[track_id])

                # Atualiza a lista de pessoas atualizadas
                pessoas_atualizadas.append(track_id)

          # Percorre cada ID no dic de pessoas rastreadas e diminui em 1 frame a contagem limite de referencia caso nao esteja no frame
          for chave in list(track_history_reference.keys()):
              if chave not in pessoas_atualizadas:
                  track_history_reference[chave] -= 1
                  if track_history_reference[chave] <= 0: # Se a contagem chegar a 0, deleta a referência dessa pessoa
                    track_history_reference.pop(chave)
                    track_history_time.pop(chave)

          # Plota pessoas sem interseccao e todos os veiculos
          if persons:
            plot_detections(frame, persons, [0, 255, 0], 'pessoa', 3)
          if vehicles:
            plot_detections(frame, vehicles, [140, 45, 45], 'veiculo', 3)

          # Escreve o frame no video de saida
          frame = cv2.resize(frame, (int(cap.get(3)), int(cap.get(4))))
          out.write(frame)

          # Interrompe o loop se a tecla 'q' for pressionada
          if cv2.waitKey(1) & 0xFF == ord("q"):
              break
      else:
          # Interrompe o loop se o final do video for alcançado
          break

  # Libera o objeto de captura de video e fecha a janela de exibição
  cap.release()
  out.release()
  cv2.destroyAllWindows()

### **Execução da detecção de comportamentos pré-definidos como suspeitos**
#### Para definir o caminho do diretório ou vídeo, basta alterar o caminho na variável *path*.

In [None]:
# Caminho do video (mude aqui)
path = '/content/drive/MyDrive/Rodar/SBSI/positivo/'

# Inicia contador
inicio_tempo = time.time()

# Verifica se eh um diretorio
if isdir(path):
  # Obtem a lista de videos na pasta (caso seja uma pasta)
  videos = [f for f in listdir(path) if isfile(join(path, f))]
  # Percorre os videos
  for video in videos:
    # Verifica se o video esta no formato correto e nao eh um video processado
    if video.endswith('.mp4') or video.endswith('.avi') and not video.endswith('_output.mp4'):
      print("Rodando o video: " + str(video))
      # Executa deteccao
      run_detection(path, video)
elif isfile(path):
  # Executa deteccao
  run_detection(path)

# Finaliza contador
fim_tempo = time.time()

print('Tempo médio de execução: ' + str((fim_tempo-inicio_tempo)/len(videos)) + 's')