## Importing libraries, modules and files

### importing modules

In [1]:

import numpy as np
import random
import cv2
import sort


In [2]:
# %pip install ultralytics
import ultralytics
from ultralytics import YOLO
ultralytics.__version__

'8.0.132'

In [3]:
import imageio
import time
import matplotlib.pyplot as plt
import shutil
import os


In [4]:
import skimage
skimage.__version__ 

'0.21.0'

## One-Way-Traffic

In [5]:
camera_url = "one_way.mp4"

In [6]:
class YOLOv8_ObjectDetector:

    def __init__(self, model_file = 'yolov8n.pt', labels= None, classes = None, conf = 0.40, iou = 0.45 ):

        self.classes = classes
        self.conf = conf
        self.iou = iou

        self.model = YOLO(model_file)
        self.model_name = model_file.split('.')[0]
        self.results = None

        if labels == None:
            self.labels = self.model.names

    def predict_img(self, img, verbose=True):

        results = self.model(img, classes=self.classes, conf=self.conf, iou=self.iou, verbose=verbose)
        self.orig_img = img
        self.results = results[0]

        # Return the detection results
        return results[0]
        

    def custom_display(self, colors, show_cls = True, show_conf = True):

        img = self.orig_img

        bbx_thickness = (img.shape[0] + img.shape[1]) // 450

        for box in self.results.boxes:
            textString = ""

            score = box.conf.item() * 100
            class_id = int(box.cls.item())

            x1 , y1 , x2, y2 = np.squeeze(box.xyxy.numpy()).astype(int)

            if show_cls:
                textString += f"{self.labels[class_id]}"

            if show_conf:
                textString += f" {score:,.2f}%"

            font = cv2.FONT_HERSHEY_COMPLEX
            fontScale = (((x2 - x1) / img.shape[0]) + ((y2 - y1) / img.shape[1])) / 2 * 2.5
            fontThickness = 1
            textSize, baseline = cv2.getTextSize(textString, font, fontScale, fontThickness)

            # Draw bounding box, a centroid and label on the image
            img = cv2.rectangle(img, (x1,y1), (x2,y2), colors[class_id], bbx_thickness)
            center_coordinates = ((x1 + x2)//2, (y1 + y2) // 2)

            img =  cv2.circle(img, center_coordinates, 5 , (0,0,255), -1)
            
             # If there are no details to show on the image
            if textString != "":
                if (y1 < textSize[1]):
                    y1 = y1 + textSize[1]
                else:
                    y1 -= 2
                # show the details text in a filled rectangle
                img = cv2.rectangle(img, (x1, y1), (x1 + textSize[0] , y1 -  textSize[1]), colors[class_id], cv2.FILLED)
                img = cv2.putText(img, textString , 
                    (x1, y1), font, 
                    fontScale,  (0, 0, 0), fontThickness, cv2.LINE_AA)

        return img


    def predict_video(self,**display_args):
        cap = cv2.VideoCapture(camera_url)
        cv2.namedWindow('Video', cv2.WINDOW_NORMAL)
        cv2.resizeWindow('Video', 800, 600)

        width = int(cap.get(3))  # get `width`
        height = int(cap.get(4))  # get `height`
        if not cap.isOpened():
            print("Error opening video stream or file")


        while cap.isOpened():
            ret, frame = cap.read()

            if not ret:
                break
            beg = time.time()
            results = self.predict_img(frame, verbose=False)
            if results is None:
                print('***********************************************')
            fps = 1 / (time.time() - beg)
            frame == self.custom_display(**display_args)
            cv2.waitKey(1)
            cv2.imshow('Video', frame)
            cv2.waitKey(1) 
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        cap.release()
        cv2.destroyAllWindows()

    


In [7]:
class YOLOv8_ObjectCounter(YOLOv8_ObjectDetector):

    def __init__(self, model_file = 'yolov8n.pt', labels= None, classes = [2, 3, 5, 7], conf = 0.25, iou = 0.45, 
                 track_max_age = 45, track_min_hits= 15, track_iou_threshold = 0.3 ):

        super().__init__(model_file , labels, classes, conf, iou)

        self.track_max_age = track_max_age
        self.track_min_hits = track_min_hits
        self.track_iou_threshold = track_iou_threshold


        

    def predict_video(self, verbose = True, **display_args):
        
        cap = cv2.VideoCapture(camera_url)
        cv2.namedWindow('Video', cv2.WINDOW_NORMAL)
        cv2.resizeWindow('Video', 800, 600)
        width  = int(cap.get(3) )  # get `width` 
        height = int(cap.get(4) )  # get `height` 
       
        if not cap.isOpened():
            print("Error opening video stream or file")

        # Initialize object tracker
        tracker = sort.Sort(max_age = self.track_max_age, min_hits= self.track_min_hits , 
                            iou_threshold = self.track_iou_threshold)
        
        # Initialize variables for object counting
        totalCount = []
        currentArray = np.empty((0, 5))


        # Read the video frames
        while cap.isOpened():
            detections = np.empty((0, 5))
            ret, frame = cap.read()
            if not ret:
                # print("Error reading frame")
                break

            beg = time.time()
            results = self.predict_img(frame, verbose = False)
            if results == None:
                print('***********************************************')
            fps = 1 / (time.time() - beg)
            for box in results.boxes:
                score = box.conf.item() * 100
                class_id = int(box.cls.item())

                x1 , y1 , x2, y2 = np.squeeze(box.xyxy.numpy()).astype(int)

                currentArray = np.array([x1, y1, x2, y2, score])
                detections = np.vstack((detections, currentArray))
            resultsTracker = tracker.update(detections)
            for result in resultsTracker:
                x1, y1, x2, y2, id = result
                x1, y1, x2, y2, id = int(x1), int(y1), int(x2), int(y2), int(id)


                # Display current objects IDs
                w, h = x2 - x1, y2 - y1
                cx, cy = x1 + w // 2, y1 + h // 2
                id_txt = f"ID: {str(id)}"
                cv2.putText(frame, id_txt, (cx, cy), 4, 0.5, (0, 0, 255), 1)

                # if we haven't seen aprticular object ID before, register it in a list 
                if totalCount.count(id) == 0:
                    totalCount.append(id)

            frame == self.custom_display(**display_args)


            count_txt = f"TOTAL COUNT : {len(totalCount)}"
            frame = cv2.putText(frame, count_txt, (5,45), cv2.FONT_HERSHEY_COMPLEX, 0.5, (255, 0, 0), 1,cv2.LINE_AA)
            cv2.imshow('Video', frame)
            cv2.waitKey(1)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()

### Instanciating YOLOv8_ObjectCounter objects 

In [8]:
yolo_names = ['yolov8n.pt']
colors = []
for _ in range(80):
    rand_tuple = (random.randint(50, 255), random.randint(50, 255), random.randint(50, 255))
    colors.append(rand_tuple)


In [9]:
counters = []
for yolo_name in yolo_names:
    counter = YOLOv8_ObjectCounter(yolo_name, conf = 0.25 )
    counters.append(counter)

### Performing object detection, tracking and counting 

In [10]:
for counter in counters:
    counter.predict_video( colors = colors)

## Two-Way-Traffic

In [17]:
import cv2
import numpy as np
import random
import time
import sort

class YOLOv8_ObjectDetector2:
    def __init__(self, model_file='yolov8n.pt', labels=None, classes=None, conf=0.40, iou=0.45):
        self.classes = classes
        self.conf = conf
        self.iou = iou

        self.model = YOLO(model_file)
        self.model_name = model_file.split('.')[0]
        self.results = None

        if labels is None:
            self.labels = self.model.names

    def predict_img(self, img, verbose=True):
        results = self.model(img, classes=self.classes, conf=self.conf, iou=self.iou, verbose=verbose)
        self.orig_img = img
        self.results = results[0]
        return results[0]

    def custom_display(self, colors, show_cls=True, show_conf=True):
        img = self.orig_img
        bbx_thickness = (img.shape[0] + img.shape[1]) // 450

        for box in self.results.boxes:
            textString = ""
            score = box.conf.item() * 100
            class_id = int(box.cls.item())
            x1, y1, x2, y2 = np.squeeze(box.xyxy.numpy()).astype(int)

            if show_cls:
                textString += f"{self.labels[class_id]}"

            if show_conf:
                textString += f" {score:,.2f}%"

            font = cv2.FONT_HERSHEY_COMPLEX
            fontScale = (((x2 - x1) / img.shape[0]) + ((y2 - y1) / img.shape[1])) / 2 * 2.5
            fontThickness = 1
            textSize, baseline = cv2.getTextSize(textString, font, fontScale, fontThickness)

            if textString != "":
                if y1 < textSize[1]:
                    y1 = y1 + textSize[1]
                else:
                    y1 -= 2
                img = cv2.rectangle(img, (x1, y1), (x1 + textSize[0], y1 - textSize[1]), colors[class_id], cv2.FILLED)
                img = cv2.putText(img, textString, (x1, y1), font, fontScale, (0, 0, 0), fontThickness, cv2.LINE_AA)

        return img

    def predict_video(self, camera_url, colors, **display_args):
        cap = cv2.VideoCapture(camera_url)
        cv2.namedWindow('Video0', cv2.WINDOW_NORMAL)
        cv2.resizeWindow('Video0', 800, 600)
        width = int(cap.get(3))
        height = int(cap.get(4))
        if not cap.isOpened():
            print("Error opening video stream or file")

        while cap.isOpened():
            cv2.namedWindow('Video0', cv2.WINDOW_NORMAL)
            cv2.resizeWindow('Video0', 800, 600)
            ret, frame = cap.read()

            if not ret:
                print("Error reading frame")
                break

            results = self.predict_img(frame, verbose=False)
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame = self.custom_display(colors, **display_args)
            cv2.imshow('Video0', frame)
            cv2.waitKey(1)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()


class YOLOv8_ObjectCounter2(YOLOv8_ObjectDetector2):
    def __init__(self, model_file='yolov8n.pt', labels=None, classes=[2, 3, 5, 7], conf=0.25, iou=0.45,
                 track_max_age=45, track_min_hits=15, track_iou_threshold=0.3):
        super().__init__(model_file, labels, classes, conf, iou)
        self.track_max_age = track_max_age
        self.track_min_hits = track_min_hits
        self.track_iou_threshold = track_iou_threshold

        self.count_in = 0
        self.count_out = 0
        self.counted_ids = []

    def predict_video(self, camera_url, colors, **display_args):
        cap = cv2.VideoCapture(camera_url)

        cv2.namedWindow('Video1', cv2.WINDOW_NORMAL)
        cv2.resizeWindow('Video1', 800, 600)
        width = int(cap.get(3))
        height = int(cap.get(4))
        if not cap.isOpened():
            print("Error opening video stream or file")

        tracker = sort.Sort(max_age=self.track_max_age, min_hits=self.track_min_hits,
                            iou_threshold=self.track_iou_threshold)
       
        while cap.isOpened():
            cv2.namedWindow('Video1', cv2.WINDOW_NORMAL)
            cv2.resizeWindow('Video1', 800, 600)
            ret, frame = cap.read()

            if not ret:
                # print("Error reading frame"):
                break

            results = self.predict_img(frame, verbose=False)
            detections = np.empty((0, 5))
            for box in results.boxes:
                score = box.conf.item() * 100
                class_id = int(box.cls.item())
                x1, y1, x2, y2 = np.squeeze(box.xyxy.numpy()).astype(int)
                currentArray = np.array([x1, y1, x2, y2, score])
                detections = np.vstack((detections, currentArray))

            resultsTracker = tracker.update(detections)

            for result in resultsTracker:
                x1, y1, x2, y2, id = result
                x1, y1, x2, y2, id = int(x1), int(y1), int(x2), int(y2), int(id)

                if self.is_crossing_in(x1, y1, x2, y2, height, width) and id not in self.counted_ids:
                    frame = cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                    self.count_in += 1
                    self.counted_ids.append(id)
                    
                elif self.is_crossing_out(x1, y1, x2, y2, height, width) and id not in self.counted_ids:
                    frame = cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    self.count_out += 1
                    self.counted_ids.append(id)
                    
                w, h = x2 - x1, y2 - y1
                cx, cy = x1 + w // 2, y1 + h // 2
                
                if id in self.counted_ids:
                    id_color = (0, 0, 255)  # red counted yet
                else:
                    id_color = (255, 0, 0)  # Blue not counted yet

                id_txt = f"ID: {str(id)}"
                cv2.putText(frame, id_txt, (cx, cy), cv2.FONT_HERSHEY_SIMPLEX, 0.5, id_color, 1)

            count_in_txt = f"Count In: {self.count_in}"
            count_out_txt = f"Count Out: {self.count_out}"

            frame = cv2.putText(frame, count_in_txt, (12, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 0), 2)
            frame = cv2.putText(frame, count_out_txt, (17, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)

            line_y = int(0.8 * height)
            segment_length = int(0.8 * width)  

            cv2.line(frame, (0, line_y), (width, line_y), (0, 255, 0), 1)  # Segment horizontal plus fin
            
            line_y2 = int(0.5 * height)  # Ligne horizontale à 50% de la hauteur
            segment_start2 = int((width - segment_length) / 2)  # Point de départ du segment

            cv2.line(frame, (segment_start2, line_y2), (segment_start2 + segment_length, line_y2), (255, 255, 255), 1)  # Segment horizontal plus fin
            
            cv2.imshow('Video1', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()

    @staticmethod
    def is_crossing_in(x1, y1, x2, y2, height, width):
        line_y = int(0.8 * height)  # ligne horizontale
        line_x = int(0.5 * width)  # ligne verticale
        line_y2 = int(0.50 * height)  # ligne
        return y2 < line_y and x2 < line_x and y2 > line_y2

    @staticmethod
    def is_crossing_out(x1, y1, x2, y2, height, width):
        line_y = int(0.8 * height)  # Coordonnée y de la ligne horizontale
        line_x = int(0.5 * width)  # Coordonnée x de la ligne verticale
        line_y2 = int(0.50 * height)  # ligne
        return y2 < line_y and x2 > line_x and y2 > line_y2

yolo_names = ['yolov8n.pt']
colors = []
for _ in range(80):
    rand_tuple = (random.randint(50, 255), random.randint(50, 255), random.randint(50, 255))
    colors.append(rand_tuple)

counters = [YOLOv8_ObjectCounter2(model_file='yolov8n.pt')]
camera_url = 'twoo_way_traffic.mp4'
for counter in counters:
    counter.predict_video(camera_url, colors=colors)
