In [1]:
#@title Mount Drive
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
#@title Copy Models for Inference
!cp -r /content/gdrive/MyDrive/TugasAkhir/models models

In [3]:
#@title Install Dependencies {display-mode: "form"}
from IPython.display import clear_output

# YOLOv8 Dependencies
!pip install -q ultralytics
!pip install -q lap

# BoT-SORT Dependencies
!pip install -q boxmot==10.0.16

# OpenPose Dependencies
!git clone -q https://github.com/HaritsNasution/JAAD-TF-Pose-Estimation.git pose
!mkdir pose/models/graph/jaad && cp -r /content/models/OpenPose/jaad pose/models/graph
!pip install -q swig
!cd pose/ && pip install -q -r requirements.txt
!cd pose/tf_pose/pafprocess && swig -python -c++ pafprocess.i && python3 setup.py build_ext --inplace
!pip install -q git+https://github.com/adrianc-a/tf-slim.git@remove_contrib

# I3D Dependencies

clear_output(wait=False)

exit()

In [1]:
#@title Import Libraries {display-mode: "form"}
from IPython.display import clear_output
import cv2
from google.colab.patches import cv2_imshow
import numpy as np
import matplotlib.pyplot as plt
import statistics as st

import os
import time
import shutil
from google.colab import files

# YOLO Lib
from ultralytics import YOLO

# BoT-SORT Lib
from boxmot import BoTSORT
from pathlib import Path

# Pose Lib
import sys
sys.path.append('/content/pose')
from tf_pose import common
from tf_pose.estimator import TfPoseEstimator
from tf_pose.networks import get_graph_path, model_wh

# I3D Lib
import torch

# Load Model and Define Helper Functions

In [3]:
# Load YOLO Model
yolo = YOLO('/content/models/YOLO/YOLOv8.3.pt')

# Load Pose Model
model='jaad'
pose = TfPoseEstimator(get_graph_path(model), target_size=(224, 224))

# Load CNN Model
cnn = torch.jit.load('/content/models/CNN/I3Dv4.pt')
cnn = cnn.cuda()

clear_output(wait=False)

In [4]:
def get_human_pose(image, showBG = True):

  humans = pose.inference(image, resize_to_default=True, upsample_size=4.0)

  humans.sort(key=lambda human: human.score, reverse=True)

  if showBG == False:
    image = np.zeros(image.shape)

  image = pose.draw_humans(image, humans, imgcopy=False)

  return image

In [5]:
def pred_func(seq, conf_thresh):
  # pred = model_j.predict(X_test[0:1], verbose=0)
  pred = torch.softmax(cnn(seq[0:1].cuda()), dim=1).detach().cpu().numpy()
  Y = pred[0][0]>=conf_thresh

  return pred[0][0],Y

In [6]:
#@title Uji Coba
filename = "Faiz3.mp4"

conf_thresh = 0.63

# Hitung waktu runtime per video
start = time.time()

# Pendefinisian input dan output
video_path = f"/content/gdrive/MyDrive/TugasAkhir/DataSimulasi/{filename}"
output_name = f"/content/{os.path.splitext(filename)[0]}.avi"

# Detect with YOLOv8
results = yolo(video_path, stream=True, classes=0, verbose=False, conf=0.5, iou=0.5)

# Initialize BoT-SORT Tracker
tracker = BoTSORT(
          model_weights=Path('osnet_x0_25_msmt17.pt'),  # which ReID model to use
          device='cuda:0',  # 'cpu', 'cuda:0', 'cuda:1', ... 'cuda:N'
          fp16=True,  # wether to run the ReID model with half precision or not
          )

# Get video info, such as FPS and Size
cap = cv2.VideoCapture(video_path)
FPS = cap.get(cv2.CAP_PROP_FPS)
SIZE = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
cap.release()

# Define video writer to create output
out = cv2.VideoWriter(output_name, cv2.VideoWriter_fourcc(*'MJPG'), FPS, SIZE)

# Initializations
rolling_data={}
cls = {}
count = -1
color = (255, 0, 0)
thickness = 2
patience = 5*30
cooldown = 0
K = 30

# used to record the time when we processed last frame
prev_frame_time = 0

# used to record the time at which we processed current frame
new_frame_time = 0

# Process results generator
for result in results:
  # Read frame from YOLO
  frame = result.orig_img
  count+=1

  # Get all bboxes in a frame
  boxes = result.boxes  # Boxes object for bbox outputs
  dets = boxes.data.cpu().numpy()

  # Track detection results
  ts = tracker.update(dets,frame)

  # Save ID that Appears
  ID = []

  for d in ts:
    x0, y0 = tuple((np.array(d[0:2])).astype(np.int32))
    x0, y0 = np.max([(x0,y0),(0,0)],axis=0)
    x1, y1 = tuple((np.array(d[2:4])).astype(np.int32))
    x1, y1 = np.min([(x1,y1),(1920,1080)],axis=0)
    id = int(d[4])

    y = 0
    conf = 0.5

    if id in list(rolling_data.keys()):
      if len(rolling_data[id]) == 16:
        seq = ((torch.tensor(np.array(rolling_data[id])).permute(3,0,1,2)/255)-0.45)/0.225 # (3,16,224,224)
        seq = seq[None,:] # (1,3,16,224,224)
        conf,y = pred_func(seq,conf_thresh) # classification output
      else:
        seq = ((torch.tensor(np.array([rolling_data[id][-1]]*16)).permute(3,0,1,2)/255)-0.45)/0.225 # (3,16,224,224)
        seq = seq[None,:] # (1,3,16,224,224)
        conf,y = pred_func(seq,conf_thresh) # classification output

    # Keep each ID's state
    if id in list(cls.keys()):
      if len(cls[id])<K:
        cls[id].append(y)
      else:
        del cls[id][0]
        cls[id].append(y)
    else:
      cls[id] = [y]

    # Keep ID appearance in frame
    ID.append(id)

    # Determine color and label of crossing action (C = Cross, NC = Not Cross)
    if y == 1:
      color = (0, 0, 255)
      lab = 'C'
    else:
      color = (0, 255, 0)
      lab = 'NC'

    # Estimate pose for the detected pedestrian
    cropped = frame[y0:y1,x0:x1]
    frame[y0:y1,x0:x1] = get_human_pose(cropped)

    # storing the data for last 16 frames
    try:
      if id in list(rolling_data.keys()): # ID exists in dict
        if len(rolling_data[id]) < 16: # bboxes values for 16 frames
          cropped_img = cv2.resize(frame[y0:y1, x0:x1],(224,224))
          rolling_data[id].append(np.asarray(cropped_img)) # append the image
        else:
          del rolling_data[id][0] # delete oldest frame bbox and append latest frame bbox
          cropped_img = cv2.resize(frame[y0:y1, x0:x1],(224,224))
          rolling_data[id].append(np.asarray(cropped_img))
      else:
        cropped_img = cv2.resize(frame[y0:y1, x0:x1],(224,224))
        rolling_data[id] = [np.asarray(cropped_img)]
    except:
      pass

    # Annotate BBox
    frame = cv2.rectangle(frame, (x0,y0), (x1,y1), color, thickness)

    # Allocate Text Size
    label = f"ID:{id}|{lab}"
    (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 1, 1)

    # Annotates ID and Label on BBox.
    frame = cv2.rectangle(frame, (x0-2, y0 - 30), (x0 + w, y0), color, -1)
    frame = cv2.putText(frame, label, (x0, y0 - 5), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,0), 2)

  new_frame_time = time.time()

  # Calculates FPS and add to Video
  fps = 1/(new_frame_time-prev_frame_time)
  frame = cv2.putText(frame,f'FPS:{fps:.2f}',(0, 50),cv2.FONT_HERSHEY_SIMPLEX, 2, (255,0,0), 3)

  # Calculate modes of state every K frames for car status (STOP or GO)
  if count%K == 0:
    check = [st.mode(cls[i]) for i in ID]
    if len(check)>0:
      if any(c!=0 for c in check):
        state = {'car':'STOP','color':(0,0,255)}
        cooldown = patience
      else:
        state = {'car':'GO','color':(0,255,0)} if cooldown==0 else state
    else:
      state = {'car':'GO','color':(0,255,0)} if cooldown==0 else state

  # Annotating car status
  frame = cv2.putText(frame,f"{state['car']}",(0+5, 1080-10),cv2.FONT_HERSHEY_SIMPLEX, 3, state['color'], 5)

  # Reduce STOP cooldown per frame
  cooldown = max(0,cooldown-1)

  # Write frame into video
  out.write(frame)

  prev_frame_time = new_frame_time

out.release()
end = time.time() - start
print(f'{filename} finished in {end/3600:.0f}:{end/60%60:.0f}:{end%60:.0f} seconds')

Downloading...
From: https://drive.google.com/uc?id=1Kkx2zW89jq_NETu4u42CFZTMVD5Hwm6e
To: /content/osnet_x0_25_msmt17.pt
100%|██████████| 9.34M/9.34M [00:00<00:00, 130MB/s]
[32m2023-08-02 23:49:09.778[0m | [32m[1mSUCCESS [0m | [36mboxmot.appearance.reid_model_factory[0m:[36mload_pretrained_weights[0m:[36m229[0m - [32m[1mSuccessfully loaded pretrained weights from "osnet_x0_25_msmt17.pt"[0m


Faiz3.mp4 finished in 0:1:48 seconds


In [7]:
!cp Faiz3.avi /content/gdrive/MyDrive/Faiz3.avi