## Before start

Let's make sure that we have access to GPU. We can use `nvidia-smi` command to do that. In case of any problems navigate to `Edit` -> `Notebook settings` -> `Hardware accelerator`, set it to `GPU`, and then click `Save`.

# Object Detection, Counting amd Tracking
User interface for live and recorded video.

## User Interface




In [14]:
!nvidia-smi

Mon Oct  9 11:21:01 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.105.17   Driver Version: 525.105.17   CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   37C    P8     9W /  70W |      0MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

## Interface

In [16]:
!pip install ffmpeg -q
# !git clone https://github.com/DmytroNorth/Text_To_Subtitles-Python.git

!pip install gradio -q


In [15]:
# Getting current working direoctory
import os
HOME = os.getcwd()
print(HOME)

/content


### Install YOLOv8


In [17]:
# Pip install method (recommended)


from IPython import display
display.clear_output()

import ultralytics
ultralytics.checks()

Ultralytics YOLOv8.0.195 🚀 Python-3.10.12 torch-2.0.1+cu118 CUDA:0 (Tesla T4, 15102MiB)
Setup complete ✅ (2 CPUs, 12.7 GB RAM, 27.0/78.2 GB disk)


### Install Requirements


In [18]:


# workaround related to https://github.com/roboflow/notebooks/issues/80
!sed -i 's/onnx==1.8.1/onnx==1.9.0/g' requirements.txt

!pip3 install -q -r requirements.txt
!python3 setup.py -q develop

from IPython import display
display.clear_output()
import yolox
print("yolox.__version__:", yolox.__version__)

yolox.__version__: 0.1.0


In [19]:
from yolox.tracker.byte_tracker import BYTETracker, STrack
from onemetric.cv.utils.iou import box_iou_batch
from dataclasses import dataclass


@dataclass(frozen=True)
class BYTETrackerArgs:
    track_thresh: float = 0.25
    track_buffer: int = 30
    match_thresh: float = 0.8
    aspect_ratio_thresh: float = 3.0
    min_box_area: float = 1.0
    mot20: bool = False

In [None]:
%cd {HOME}/ByteTrack

/content/ByteTrack


In [20]:
from IPython import display
display.clear_output()

import supervision
print("supervision.__version__:", supervision.__version__)

supervision.__version__: 0.1.0


In [21]:
from supervision.draw.color import ColorPalette
from supervision.geometry.dataclasses import Point
from supervision.video.dataclasses import VideoInfo
from supervision.video.source import get_video_frames_generator
from supervision.video.sink import VideoSink
from supervision.notebook.utils import show_frame_in_notebook
from supervision.tools.detections import Detections, BoxAnnotator
from supervision.tools.line_counter import LineCounter, LineCounterAnnotator

### Tracking utils

Unfortunately, we have to manually match the bounding boxes coming from our model with those created by the tracker.

In [22]:
from typing import List

import numpy as np


# converts Detections into format that can be consumed by match_detections_with_tracks function
def detections2boxes(detections: Detections) -> np.ndarray:
    return np.hstack((
        detections.xyxy,
        detections.confidence[:, np.newaxis]
    ))


# converts List[STrack] into format that can be consumed by match_detections_with_tracks function
def tracks2boxes(tracks: List[STrack]) -> np.ndarray:
    return np.array([
        track.tlbr
        for track
        in tracks
    ], dtype=float)


# matches our bounding boxes with predictions
def match_detections_with_tracks(
    detections: Detections,
    tracks: List[STrack]
) -> Detections:
    if not np.any(detections.xyxy) or len(tracks) == 0:
        return np.empty((0,))

    tracks_boxes = tracks2boxes(tracks=tracks)
    iou = box_iou_batch(tracks_boxes, detections.xyxy)
    track2detection = np.argmax(iou, axis=1)

    tracker_ids = [None] * len(detections)

    for tracker_index, detection_index in enumerate(track2detection):
        if iou[tracker_index, detection_index] != 0:
            tracker_ids[detection_index] = tracks[tracker_index].track_id

    return tracker_ids

### Load pre-trained YOLOv8 model
Also available in releases.

In [23]:
# settings
MODEL = "yolov8x.pt"

In [24]:
from ultralytics import YOLO

model = YOLO(MODEL)
model.fuse()

YOLOv8x summary (fused): 268 layers, 68200608 parameters, 0 gradients, 257.8 GFLOPs


### Predict and annotate Image




In [25]:
# Import Libraries
import gradio as gr
from PIL import Image
from tqdm.notebook import tqdm

In [26]:
# dict maping class_id to class_name
CLASS_NAMES_DICT = model.model.names

In [27]:
def image_processing(img : Image.Image) -> Image.Image:
  """
    In this method processing will be done on image.

    Parameters
    ----------
    img
      Image send by iser

    Return
    ------
    Image
      Return the process image .
    """
  # create instance of BoxAnnotator
  box_annotator = BoxAnnotator(color=ColorPalette(), thickness=4, text_thickness=4, text_scale=2)

  # model prediction on single frame and conversion to supervision Detections
  results = model(img)
  detections = Detections(
      xyxy=results[0].boxes.xyxy.cpu().numpy(),
      confidence=results[0].boxes.conf.cpu().numpy(),
      class_id=results[0].boxes.cls.cpu().numpy().astype(int)
  )
  # format custom labels
  labels = [
      f"{CLASS_NAMES_DICT[class_id]} {confidence:0.2f}"
      for _, confidence, class_id, tracker_id
      in detections
  ]
  # annotate and display frame
  frame = box_annotator.annotate(frame= img, detections=detections, labels=labels)
  return frame
  # %matplotlib inline
  # show_frame_in_notebook(frame, (16, 16))

### Predict and annotate whole video

In [28]:
# settings
LINE_START = Point(50, 1500)
LINE_END = Point(3840-50, 1500)



In [29]:
import os

In [30]:
def text_to_srt()-> bool:
  """
  Convert Text File into .srt file for caption

  Parameters
  ----------
  None

  Return
  ------
  bool
  """
  flag = os.system("python3 text_to_subtitles.py")
  if flag == 0:
    return True
  return False


In [31]:
print(text_to_srt()) # unit testing

True


In [32]:
%pwd

'/content/ByteTrack'

In [33]:
import subprocess

In [79]:
def merget_srt_to_video() -> bool:
    """
    Merge .srt file with video

    Parameters
    ----------
    None

    Return
    ------
    bool
    """
    query = "ffmpeg -i result.mp4 -vf subtitles=subtitles.srt output.mp4 -y"
    print(query)
    var = os.system(query)
    print(var)
    if var==0:
      return True
    return False


In [80]:
print(merget_srt_to_video()) # unit testing

ffmpeg -i result.mp4 -vf subtitles=subtitles.srt output.mp4 -y
0
True


In [36]:
def process_video(SOURCE_VIDEO_PATH : str )-> Image.Image:
  """
    In this method video processing will be done.

    Parameters
    ----------
    SOURCE_VIDEO_PATH
      Video to detect track and count object

    Return
    ------
    Image
      Return the process frame .
    """
  # create BYTETracker instance
  byte_tracker = BYTETracker(BYTETrackerArgs())
  # create VideoInfo instance
  video_info = VideoInfo.from_video_path(SOURCE_VIDEO_PATH)
  # create frame generator
  generator = get_video_frames_generator(SOURCE_VIDEO_PATH)
  # create LineCounter instance
  line_counter = LineCounter(start=LINE_START, end=LINE_END)
  # create instance of BoxAnnotator and LineCounterAnnotator
  box_annotator = BoxAnnotator(color=ColorPalette(), thickness=4, text_thickness=4, text_scale=2)
  line_annotator = LineCounterAnnotator(thickness=4, text_thickness=4, text_scale=2)
  print("Video Info :", video_info)
  # open target video file
      # loop over video frames
  for frame in tqdm(generator, total=video_info.total_frames):
      # model prediction on single frame and conversion to supervision Detections
      results = model(frame)
      detections = Detections(
          xyxy=results[0].boxes.xyxy.cpu().numpy(),
          confidence=results[0].boxes.conf.cpu().numpy(),
          class_id=results[0].boxes.cls.cpu().numpy().astype(int)
      )
      # filtering out detections with unwanted classes
      mask = np.array([class_id for class_id in detections.class_id], dtype=bool)
      detections.filter(mask=mask, inplace=True)
      # tracking detections
      tracks = byte_tracker.update(
          output_results=detections2boxes(detections=detections),
          img_info=frame.shape,
          img_size=frame.shape
      )
      tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
      detections.tracker_id = np.array(tracker_id)
      # filtering out detections without trackers
      mask = np.array([tracker_id is not None for tracker_id in detections.tracker_id], dtype=bool)
      detections.filter(mask=mask, inplace=True)
      # format custom labels
      labels = [
          f"#{tracker_id} {model.model.names[class_id]} {confidence:0.2f}"
          for _, confidence, class_id, tracker_id
          in detections
      ]
      with open("subtitles.txt","a") as file:
        file.writelines(str(labels) + '\n' + '\n')
      ids = detections.class_id
      for id in ids:
        if object_count.count(id) == 0 :
          object_count.append(id)
      # updating line counter
      # line_counter.update(detections=detections)
      # annotate and display frame
      # line_annotator.annotate(frame=frame, line_counter=line_counter)
      yield (box_annotator.annotate(frame=frame, detections=detections, labels=labels), len(object_count))




In [66]:
object_count = []
def process_live(frame):
    """
    In this method live video processing will be done.

    Parameters
    ----------
    frame
      Image capture by web cam

    Return
    ------
    Image
      Return the process image realtime.
    """
    byte_tracker = BYTETracker(BYTETrackerArgs())
    box_annotator = BoxAnnotator(color=ColorPalette(), thickness=4, text_thickness=4, text_scale=2)

    results = model(frame)[0]
    detections = Detections(
        xyxy=results[0].boxes.xyxy.cpu().numpy(),
        confidence=results[0].boxes.conf.cpu().numpy(),
        class_id=results[0].boxes.cls.cpu().numpy().astype(int)
    )
    # filtering out detections with unwanted classes
    mask = np.array([class_id  for class_id in detections.class_id], dtype=bool)
    detections.filter(mask=mask, inplace=True)
    # tracking detections
    tracks = byte_tracker.update(
        output_results=detections2boxes(detections=detections),
        img_info=frame.shape,
        img_size=frame.shape
    )
    tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
    detections.tracker_id = np.array(tracker_id)
    # filtering out detections without trackers
    mask = np.array([tracker_id is not None for tracker_id in detections.tracker_id], dtype=bool)
    detections.filter(mask=mask, inplace=True)
    # format custom labels
    labels = [
        f"#{tracker_id} {model.model.names[class_id]} {confidence:0.2f}"
        for _, confidence, class_id, tracker_id
        in detections
    ]
    with open("subtitles.txt","a") as file:
        file.writelines(str(labels) + '\n' + '\n')
    ids = detections.class_id
    for id in ids:
      object_count.append(id)
    # updating line counter
    # line_counter.update(detections=detections)
    # annotate and display frame
    # line_annotator.annotate(frame=frame, line_counter=line_counter)
    frame.resize(380, 640)
    print(frame.size)
    yield (box_annotator.annotate(frame=frame, detections=detections, labels=labels), len(object_count))



In [39]:
%pwd

'/content/ByteTrack'

In [40]:
# Uncooment following lines for if working on colab
#from google.colab import files

In [41]:
def export_video() -> None:
  """
  Download resultant video

  Parameters
  ----------
  None

  Return
  ------
  None
  """
  if text_to_srt():
    if merget_srt_to_video():
      #files.download("output.mp4") # Uncooment on colab
      print("Caption's file ok")


### Front End

In [None]:
title = "Object Detect Tracking and Counting"
with gr.Blocks(theme= gr.themes.Soft()) as io:
    with gr.Tab("Video Tracking") as record:
        gr.Markdown(f"<center><h1>{title}</h1></center>")
        with gr.Row():
            with gr.Column():
                input_image = gr.Video()

            with gr.Column():
                output_image = gr.Image()

        with gr.Row():
              total_count = gr.Textbox(label = "Number of Object")

        with gr.Row():
                input_button = gr.Button("Start Tracking")
                input_button.click(process_video, inputs=[input_image], outputs=[output_image, total_count])
        with gr.Row():
                input_button = gr.Button("Export Result")
                input_button.click(export_video, inputs= None, outputs = None)

    with gr.Tab("Live Tracking") as live:
        gr.Markdown(f"<center><h1>{title}</h1></center>")
        with gr.Row():
          with gr.Column():
             input_image = gr.Image(source='webcam', streaming=True)

          with gr.Column():
             output_image = gr.Image()
        with gr.Row():
              total_count = gr.Textbox(label = "Number of Object")
        with gr.Row():
             button  = gr.Button("Start Tracking")
             button.click(process_live, inputs=[input_image], outputs=[output_image])
        with gr.Row():
                input_button = gr.Button("Export Result")
                input_button.click(export_video, inputs= None, outputs = None)
io.queue()
io.launch(debug = True)

In [None]:
totalCount = []

# FLASK API

In [42]:
TARGET_VIDEO_PATH = f"result.mp4"

In [75]:
def flask_video(SOURCE_VIDEO_PATH : str ):
    """
    In this method video processing will be done.

    Parameters
    ----------
    SOURCE_VIDEO_PATH
      Video to detect track and count object

    Return
    ------
    Image
      Return the process frame .
    """
    # if os.path.isdir(TARGET_VIDEO_PATH):

    #     os.system("rm -rf /content/vid_out")
    #     os.system("mkdir vid_out")
      # create BYTETracker instance
    tokenize = SOURCE_VIDEO_PATH.split(".")
    if tokenize[-1] not in ("mp4"):
      return "Type Error"

    byte_tracker = BYTETracker(BYTETrackerArgs())
    # create VideoInfo instance
    video_info = VideoInfo.from_video_path(SOURCE_VIDEO_PATH)
    # create frame generator
    generator = get_video_frames_generator(SOURCE_VIDEO_PATH)
    # create LineCounter instance
    line_counter = LineCounter(start=LINE_START, end=LINE_END)
    # create instance of BoxAnnotator and LineCounterAnnotator
    box_annotator = BoxAnnotator(color=ColorPalette(), thickness=4, text_thickness=4, text_scale=2)
    line_annotator = LineCounterAnnotator(thickness=4, text_thickness=4, text_scale=2)
    print("Video Info" , video_info)
    # open target video file
    with VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
        # loop over video frames
        for frame in tqdm(generator, total=video_info.total_frames):
            # model prediction on single frame and conversion to supervision Detections
            results = model(frame)
            detections = Detections(
                xyxy=results[0].boxes.xyxy.cpu().numpy(),
                confidence=results[0].boxes.conf.cpu().numpy(),
                class_id=results[0].boxes.cls.cpu().numpy().astype(int)
            )
            # filtering out detections with unwanted classes
            mask = np.array([class_id  for class_id in detections.class_id], dtype=bool)
            detections.filter(mask=mask, inplace=True)
            # tracking detections
            tracks = byte_tracker.update(
                output_results=detections2boxes(detections=detections),
                img_info=frame.shape,
                img_size=frame.shape
            )
            tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
            detections.tracker_id = np.array(tracker_id)
            # filtering out detections without trackers
            mask = np.array([tracker_id is not None for tracker_id in detections.tracker_id], dtype=bool)
            detections.filter(mask=mask, inplace=True)
            # format custom labels
            labels = [
                f"#{tracker_id} {CLASS_NAMES_DICT[class_id]} {confidence:0.2f}"
                for _, confidence, class_id, tracker_id
                in detections
            ]
            with open("subtitles.txt","a") as file:
              file.writelines(str(labels) + '\n' + '\n')
            # updating line counter
            line_counter.update(detections=detections)
            # annotate and display frame
            frame = box_annotator.annotate(frame=frame, detections=detections, labels=labels)
            line_annotator.annotate(frame=frame, line_counter=line_counter)
            sink.write_frame(frame)
    return "0k"


In [44]:
def image_processing(path : str) -> None:
  """
    In this method processing will be done on image.

    Parameters
    ----------
    path
      Path of Image send by user

    Return
    ------
    None
    """
  # create instance of BoxAnnotator
  box_annotator = BoxAnnotator(color=ColorPalette(), thickness=4, text_thickness=4, text_scale=2)
  img = Image.open(path)
  # model prediction on single frame and conversion to supervision Detections
  results = model(img)
  detections = Detections(
      xyxy=results[0].boxes.xyxy.cpu().numpy(),
      confidence=results[0].boxes.conf.cpu().numpy(),
      class_id=results[0].boxes.cls.cpu().numpy().astype(int)
  )
  # format custom labels
  labels = [
      f"{CLASS_NAMES_DICT[class_id]} {confidence:0.2f}"
      for _, confidence, class_id, tracker_id
      in detections
  ]
  # annotate and display frame
  frame = box_annotator.annotate(frame= img, detections=detections, labels=labels)
  frame.save("temp.jpg")
  # %matplotlib inline
  # show_frame_in_notebook(frame, (16, 16))

In [None]:
flask_video("traffic.mp4") # testing either working or not

In [52]:
from flask_uploads import UploadSet

In [49]:
from flask import (Flask, abort,
                   request, jsonify)
from werkzeug.utils import secure_filename

from pyngrok import ngrok
import gofile

### Request Body
for video
'''
{
  video : <video>
}
'''
for image
'''
{
   video : <img> 
}
'''

In [None]:
from dotenv import load_dotenv

load_dotenv()

NGROK = os.getenv('NGROK')

In [83]:
ngrok.set_auth_token(NGROK)
public_url = ngrok.connect(5000).public_url
app = Flask(__name__)

def upload_on_go(path : str)-> str:
  """
  Upload file on go server.

  Parameters
  ----------
  path
    path to upload on server

  Return
  ------
  str
    Downloadable link
  """
  server = gofile.getServer()
  dict_data = gofile.uploadFile(path)
  return dict_data['downloadPage']


print("PUBLIC URL", public_url)
@app.route('/')
def upload_file():
   return 'Hello'

@app.route('/upload', methods = ['GET', 'POST'])
def uploadfile():
   if request.method == 'POST': # check if the method is post
    if "video" in request.files:
        f = request.files['video'] # get the file from the files object
        path = f.filename
        f.save(secure_filename(f.filename)) # this will secure the file
        print("File save successfully")
        flag = flask_video(path)
        if flag=="0k":
          export_video()
          link = upload_on_go("output.mp4")
          return jsonify({'Download link': link ,
                          'status' : 'Ok'})
        else:
          return abort(400,jsonify({'Error': 'Type Error'}))

    elif "image" in request.files:
          f = request.files['image'] # get the file from the files object
          path = f.filename
          f.save(secure_filename(f.filename)) # this will secure the file
          print("File save successfully")
          image_processing(path)
          link = upload_on_go(f"content/ByteTrack/{TARGET_VIDEO_PATH}")
          return jsonify({'Download link': link ,
                          "type" : "JPG"})
    else:
        abort(400, msg = "Bad Params")

if __name__ == '__main__':
   app.run() # running the flask app



PUBLIC URL https://6b83-35-203-182-44.ngrok-free.app
 * Serving Flask app '__main__' (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://127.0.0.1:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
ERROR:__main__:Exception on /upload [GET]
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/flask/app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.10/dist-packages/flask/app.py", line 1520, in full_dispatch_request
    return self.finalize_request(rv)
  File "/usr/local/lib/python3.10/dist-packages/flask/app.py", line 1539, in finalize_request
    response = self.make_response(rv)
  File "/usr/local/lib/python3.10/dist-packages/flask/app.py", line 1695, in make_response
    raise TypeError(
TypeError: The view function for 'uploadfile' did not return a valid response. The function either returned None or ended without a return statement.
INFO:werkzeug:127.0.0.1 - - [09/Oct/2023 12:24:03] "[35m[1mGET /upload HTTP/1.1[0m" 500 -
INFO:werkzeug:127.0.0.1 - - [09/Oct/2023 12:24:04] "[33mGET /favicon.ico HTTP/1.1[0m" 40

File save successfully
Video Info VideoInfo(width=1920, height=1080, fps=30, total_frames=99)


  0%|          | 0/99 [00:00<?, ?it/s]


0: 384x640 10 cars, 70.4ms
Speed: 3.3ms preprocess, 70.4ms inference, 2.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 traffic light, 69.9ms
Speed: 7.8ms preprocess, 69.9ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 4 trucks, 1 traffic light, 63.6ms
Speed: 3.7ms preprocess, 63.6ms inference, 1.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 3 trucks, 2 traffic lights, 64.1ms
Speed: 3.1ms preprocess, 64.1ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 bus, 3 trucks, 1 traffic light, 43.0ms
Speed: 4.4ms preprocess, 43.0ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 4 trucks, 1 traffic light, 42.7ms
Speed: 3.1ms preprocess, 42.7ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 3 trucks, 2 traffic lights, 45.2ms
Speed: 3.0ms preprocess, 45.2ms inference, 1.8ms postprocess per

ffmpeg -i result.mp4 -vf subtitles=subtitles.srt output.mp4 -y
0


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

INFO:werkzeug:127.0.0.1 - - [09/Oct/2023 12:26:07] "POST /upload HTTP/1.1" 200 -


In [None]:
# Request Body
'''
{
  video : <video>
}
'''
