In [None]:
# import all the libraries
import time

import cv2
import numpy as np
import supervision as sv
# supervision==0.19.0
import torch

In [None]:
# download model
# The weights file name is kept in one place so it is easy to switch to another
# YOLOv8 checkpoint.
from ultralytics import YOLO

YOLO_WEIGHTS = 'yolov8s.pt'
model = YOLO(YOLO_WEIGHTS)

In [None]:
# connect model to device (mps, cuda or cpu)
# The backend is detected instead of being hard-coded to 'mps', so the notebook
# also runs on CUDA machines and on CPU-only hosts. On Apple Silicon the result
# is still 'mps'.
if torch.cuda.is_available():
    device = 'cuda'
elif getattr(torch.backends, 'mps', None) is not None and torch.backends.mps.is_available():
    # `torch.backends.mps` does not exist in older torch builds, hence the getattr guard
    device = 'mps'
else:
    device = 'cpu'
model.to(device)

In [None]:
# connecting to our firebase database

import firebase_admin

from firebase_admin import credentials
from firebase_admin import db


# Service-account key file and Realtime Database URL used by this project.
FIREBASE_KEY_FILE = "sirius-6b2b4-firebase-adminsdk-w0nf0-d148ad0356.json"
FIREBASE_DB_URL = 'https://sirius-6b2b4-default-rtdb.europe-west1.firebasedatabase.app/'

cred = credentials.Certificate(FIREBASE_KEY_FILE)

# `initialize_app` raises ValueError when the default app already exists, which
# happens every time this cell is re-run in the same kernel. Reuse the existing
# app in that case so the cell is safe to execute more than once.
try:
    firebase_admin.get_app()
except ValueError:
    firebase_admin.initialize_app(cred, {'databaseURL': FIREBASE_DB_URL})

# database nodes that the cells below write to
ref_time = db.reference("/waitingtime")
ref_count = db.reference("/countofpeople")

In [None]:
# 1st way of calculating the waiting time
#
# Every time a new tracked person appears in the entry zone, the video time that
# passed since the previous entry is multiplied by the current number of people
# in the queue. These products are collected in `times` and published to Firebase.

# --- settings of this cell ---
SOURCE_VIDEO = 'videos/file5.mp4'
TARGET_VIDEO = 'videos_results/file5_result.mp4'
FRAME_STRIDE = 5        # only every 5th frame of the video is processed
PUBLISH_PERIOD_S = 10   # waiting time is pushed to Firebase every 10 s of video time
WARMUP_S = 10           # wall-clock seconds to wait before publishing the people count
WAIT_WINDOW = 3         # number of most recent samples averaged for the published waiting time
CHECK_ZONE_BONUS = 7    # added to the count while somebody is in the check zone
                        # NOTE(review): presumably compensates for people the camera cannot see - confirm

# get all variables that we need to work with the supervision Detections object
# (creating polygons, tracker, box and zone annotators)
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO)
# NOTE(review): the tracker is given the source frame rate although only every
# FRAME_STRIDE-th frame is fed to it - confirm this is intended.
byte_track = sv.ByteTrack(frame_rate=video_info.fps)
frames_generator = sv.get_video_frames_generator(SOURCE_VIDEO, stride=FRAME_STRIDE)

# Video seconds covered by one processed frame. The original code used 5/10,
# i.e. it assumed a 10 fps video; the real frame rate is used instead.
seconds_per_step = FRAME_STRIDE / video_info.fps
# Publishing is scheduled by step number so that float rounding of the video
# clock cannot make the "every 10 seconds" check miss.
steps_per_publish = max(1, round(PUBLISH_PERIOD_S / seconds_per_step))

polygon_entry = np.array([
[495, 315],[639, 403],[755, 319],[555, 215],[495, 311]
])
polygon_queue = np.array([
[8, 83],[8, 251],[192, 203],[808, 571],[1016, 567],[1028, 295],[244, 87],[100, 127],[101, 63],[7, 83]
])
polygon_table = np.array([
[39, 209],[55, 253],[175, 229],[159, 169],[51, 181],[35, 205]
])
polygon_check = np.array([
[206, 106],[218, 198],[182, 206],[170, 114],[206, 106]
])

# the annotator is built once here instead of being re-created for every frame
box_annotator = sv.BoxAnnotator(thickness=1, text_thickness=1, text_scale=0.5)
zone_entry = sv.PolygonZone(polygon=polygon_entry, frame_resolution_wh=video_info.resolution_wh)
zone_queue = sv.PolygonZone(polygon=polygon_queue, frame_resolution_wh=video_info.resolution_wh)
zone_table = sv.PolygonZone(polygon=polygon_table, frame_resolution_wh=video_info.resolution_wh)
zone_check = sv.PolygonZone(polygon=polygon_check, frame_resolution_wh=video_info.resolution_wh)
zone_annotator_queue = sv.PolygonZoneAnnotator(zone=zone_queue, color=sv.Color.white(), thickness=3, text_thickness=3, text_scale=2)
zone_annotator_table = sv.PolygonZoneAnnotator(zone=zone_table, color=sv.Color.white(), thickness=3, text_thickness=3, text_scale=2)
zone_annotator_check = sv.PolygonZoneAnnotator(zone=zone_check, color=sv.Color.white(), thickness=3, text_thickness=3, text_scale=2)


# tracker ids that were already seen in the entry zone, the video time of the
# latest entry and the collected waiting-time samples
entry_ids = []
last_entry_time = 0
times = []
# Wall-clock start of the run. The video clock lives in `video_time`, so the
# name `time` keeps referring to the standard-library module.
start = time.time()

# now iterate through frames and save everything we need
with sv.VideoSink(target_path=TARGET_VIDEO, video_info=video_info) as sink:
    for step, frame in enumerate(frames_generator, start=1):
        video_time = step * seconds_per_step

        results = model(frame, imgsz=1280)[0]
        detections = sv.Detections.from_ultralytics(results)
        detections = detections[detections.class_id == 0]  # class 0 is "person" for the COCO-trained weights
        detections = byte_track.update_with_detections(detections=detections)

        # each zone is evaluated exactly once per frame; the masks are reused below
        in_entry = zone_entry.trigger(detections=detections)
        in_queue = zone_queue.trigger(detections=detections)
        at_table = zone_table.trigger(detections=detections)
        in_check = zone_check.trigger(detections=detections)

        bonus = CHECK_ZONE_BONUS if in_check.any() else 0
        count = int(in_queue.sum()) - int(at_table.sum()) + bonus

        # The original test `start >= 10` compared an epoch timestamp with 10 and
        # was always true; the elapsed run time is what is checked now.
        if time.time() - start >= WARMUP_S:
            ref_count.set(count)  # change our firebase database value for number of people in the queue

        # from here on only the people standing in the entry zone are used
        entering = detections[in_entry]
        for tracker_id in entering.tracker_id:
            if tracker_id in entry_ids:
                continue
            if not entry_ids:
                # very first person: there is no previous entry, so the sample is 0
                last_entry_time = video_time
            times.append((video_time - last_entry_time) * count)
            last_entry_time = video_time
            entry_ids.append(tracker_id)

        if step % steps_per_publish == 0 and len(times) >= WAIT_WINDOW:
            # The original averaged `times[-1:]` (a single sample) while requiring
            # at least three; the last WAIT_WINDOW samples are averaged instead.
            recent = times[-WAIT_WINDOW:]
            ref_time.set(int(sum(recent) / len(recent)))  # changing database value for waiting time

        labels = [f'#{tracker_id}' for tracker_id in entering.tracker_id]
        frame = box_annotator.annotate(scene=frame, detections=entering, labels=labels)
        for zone_annotator in (zone_annotator_table, zone_annotator_queue, zone_annotator_check):
            frame = zone_annotator.annotate(scene=frame)

        sink.write_frame(frame=frame)

print(times)
if times:
    print(f'average time for waiting: {sum(times)/len(times)}s')
else:
    # nobody entered the zone, so there is nothing to average
    print('no entries were detected, average waiting time is not available')

In [None]:
# 2nd way of calculating waiting time
#
# People are registered when they first show up in the right zone (entry) and
# in the left zone (exit). The i-th exit is paired with the i-th entry and the
# mean difference over the latest pairs is reported as the waiting time.

# --- settings of this cell ---
# NOTE(review): machine-specific absolute path - adjust before running elsewhere.
SOURCE_VIDEO = '/Users/vladimirkalajcidi/Downloads/itog.mp4'
TARGET_VIDEO = 'queues_results.mp4'
WAIT_WINDOW = 3  # number of latest entry/exit pairs that are averaged

# do everything the same except for other polygons
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO)
byte_track = sv.ByteTrack(frame_rate=video_info.fps)
frames_generator = sv.get_video_frames_generator(SOURCE_VIDEO)
polygon_left = np.array([
[11, 583],[19, 699],[171, 703],[159, 575],[15, 579]
])
polygon_right = np.array([
[964, 541],[1068, 489],[972, 393],[1068, 345],[1152, 405],[1132, 705],[980, 709],[964, 537]
])

# the annotator is built once here instead of being re-created for every frame
box_annotator = sv.BoxAnnotator(thickness=1, text_thickness=1, text_scale=0.5)
zone_left = sv.PolygonZone(polygon=polygon_left, frame_resolution_wh=video_info.resolution_wh)
zone_right = sv.PolygonZone(polygon=polygon_right, frame_resolution_wh=video_info.resolution_wh)
zone_annotator_left = sv.PolygonZoneAnnotator(zone=zone_left, color=sv.Color.white(), thickness=3, text_thickness=3, text_scale=2)
zone_annotator_right = sv.PolygonZoneAnnotator(zone=zone_right, color=sv.Color.white(), thickness=3, text_thickness=3, text_scale=2)


# here we initialize twice as many arrays (as we want to see the number of
# people entering and exiting the queue)
entry_ids = []
exit_ids = []
entry_times = []
exit_times = []
last_reported = None  # last printed waiting time, used to avoid one print per frame

with sv.VideoSink(target_path=TARGET_VIDEO, video_info=video_info) as sink:
    for frame_number, frame in enumerate(frames_generator, start=1):
        # The video clock is kept in `video_time` so the name `time` is not
        # overwritten with a number (other cells use the `time` module).
        video_time = frame_number / video_info.fps

        results = model(frame, imgsz=1280)[0]
        detections = sv.Detections.from_ultralytics(results)
        detections = detections[detections.class_id == 0]  # class 0 is "person" for the COCO-trained weights
        detections = byte_track.update_with_detections(detections=detections)

        # each zone is evaluated once per frame, on the tracked detections
        left_detections = detections[zone_left.trigger(detections=detections)]
        right_detections = detections[zone_right.trigger(detections=detections)]

        for tracker_id in left_detections.tracker_id:
            if tracker_id not in exit_ids:
                exit_times.append(video_time)
                exit_ids.append(tracker_id)
        for tracker_id in right_detections.tracker_id:
            if tracker_id not in entry_ids:
                entry_times.append(video_time)
                entry_ids.append(tracker_id)

        # Only exits that have a matching entry can be used. The original sliced
        # `entry_times` by the number of exits, which produced arrays of different
        # length (an error or a silently broadcast result) whenever fewer entries
        # than exits had been registered.
        paired = min(len(exit_times), len(entry_times))
        if paired >= WAIT_WINDOW:
            recent_exits = np.asarray(exit_times[paired - WAIT_WINDOW:paired])
            recent_entries = np.asarray(entry_times[paired - WAIT_WINDOW:paired])
            waiting_time = int(np.mean(recent_exits - recent_entries))
            if waiting_time != last_reported:
                print(f'waiting time is {waiting_time}s')
                last_reported = waiting_time

        labels_left = [f'#{tracker_id}' for tracker_id in left_detections.tracker_id]
        labels_right = [f'#{tracker_id}' for tracker_id in right_detections.tracker_id]

        # only people inside one of the two zones get a box
        frame = box_annotator.annotate(scene=frame, detections=left_detections, labels=labels_left)
        frame = box_annotator.annotate(scene=frame, detections=right_detections, labels=labels_right)
        frame = zone_annotator_right.annotate(scene=frame)
        frame = zone_annotator_left.annotate(scene=frame)

        sink.write_frame(frame=frame)

In [None]:
import time

In [None]:
# This is the same kind of code: it counts how many people are at the tables and
# publishes that number so it can be used in further calculations
# (the original note was cut off after "so we can calcu").

# --- settings of this cell ---
SOURCE_VIDEO = 'videos/table.mp4'
# The original target path had no file extension; '.mp4' is added so the video
# writer can pick a container format.
# NOTE(review): machine-specific absolute path - adjust before running elsewhere.
TARGET_VIDEO = '/Users/vladimirkalajcidi/Downloads/table_result.mp4'
FRAME_STRIDE = 300   # only every 300th frame is analysed
PAUSE_SECONDS = 10   # pause before each analysed frame
                     # NOTE(review): presumably emulates a camera polled every 10 s - confirm

# NOTE(review): the original cell wrote to `ref`, a name that is never defined in
# this notebook (only `ref_time` and `ref_count` exist), so it failed with a
# NameError. The database path below is a placeholder - confirm the real key.
TABLES_DB_PATH = "/peopleattables"
ref_tables = db.reference(TABLES_DB_PATH)

video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO)
polygon = np.array([
[554, 697],[62, 201],[62, 145],[474, 33],[1210, 237],[1210, 689],[558, 693]
])
zone = sv.PolygonZone(polygon=polygon, frame_resolution_wh=video_info.resolution_wh)

# the annotators are built once here instead of being re-created for every frame
box_annotator = sv.BoxAnnotator(thickness=1, text_thickness=1, text_scale=0.5)
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.white(), thickness=3, text_thickness=3, text_scale=2)

with sv.VideoSink(target_path=TARGET_VIDEO, video_info=video_info) as sink:

    for frame in sv.get_video_frames_generator(SOURCE_VIDEO, stride=FRAME_STRIDE):
        time.sleep(PAUSE_SECONDS)

        results = model(frame, imgsz=1280)[0]
        detections = sv.Detections.from_ultralytics(results)
        detections = detections[detections.class_id == 0]  # class 0 is "person" for the COCO-trained weights

        # the zone is evaluated once and the result reused for printing and publishing
        people_at_tables = int(zone.trigger(detections=detections).sum())
        print(f'People at the tables {people_at_tables}')
        ref_tables.set(people_at_tables)

        labels = [
            f"{model.names[class_id]} {confidence:0.2f}"
            for confidence, class_id in zip(detections.confidence, detections.class_id)
        ]
        frame = box_annotator.annotate(scene=frame, detections=detections, labels=labels)
        frame = zone_annotator.annotate(scene=frame)

        sink.write_frame(frame=frame)