In [1]:
from deep_sort_realtime.deepsort_tracker import DeepSort

#Initialise the object tracker class
object_tracker = DeepSort()

In [2]:
import os
import time

In [3]:
# Creating a class for object detection which plots boxes and scores frames in addition to detecting an
# object
import cv2
import torch

class YoloDetector():

    def __init__(self, model):
        #Using yolov5s for our purposes of object detection, you may use a larger model
        self.model = torch.hub.load('ultralytics/yolov5', 'custom', path=model, force_reload= True)
        self.classes = self.model.names
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print('Using Device: ', self.device)

    def score_frame(self, frame):
        self.model.to(self.device)
        downscale_factor = 2
        width = int(frame.shape[1] / downscale_factor)
        height = int(frame.shape[0] / downscale_factor)
        frame = cv2.resize(frame, (width, height))

        results = self.model(frame)

        labels, cord = results.xyxyn[0][:, -1], results.xyxyn[0][:, :-1]

        return labels, cord

    def class_to_label(self, x):
        return self.classes[int(x)]

    def plot_boxes(self, results, frame, height, width, confidence=0.15):

      labels, cord = results
      detections = []

      n = len(labels)
      x_shape, y_shape = width, height

      for i in range(n):
        row = cord[i]
        if row[4]>=confidence:
            x1, y1, x2, y2 = int(row[0]*x_shape), int(row[1]*y_shape), int(row[2]*x_shape), int(row[3]*y_shape)
            detections.append(([x1, y1, int(x2-x1), int(y2-y1)], row[4].item(), str(self.class_to_label(labels[i]))))
      return frame, detections

In [4]:
import numpy as np
import math
from PIL import Image, ImageEnhance
import cv2
import onnxruntime
import torch
class PreprocessingTracker():

    def __init__(self):
        #Using yolov5s for our purposes of object detection, you may use a larger model
        # self.onnx_model_path = '/home/firmen@student.ub.ac.id/Skripsi_Object_Tracking/uji_ablasi/model640.onnx'
        self.onnx_model_path = 'C:/Users/acer/Documents/coolyeah/skripsi/Revisi/real time code/model640.onnx'
        self.onnx_session = onnxruntime.InferenceSession(self.onnx_model_path)
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print('Using Device: ', self.device)
    def learning_preprocessing(self, img):
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert to RGB format
        
        resized_image = cv2.resize(img, (640, 640)).astype(np.float32) /255.0
        input_data = np.transpose(resized_image, (2, 0, 1))
        input_data = np.expand_dims(input_data, axis=0)
        output = self.onnx_session.run(None, {'input': input_data})
        prediction = np.squeeze(output[0])

        pred_mask = prediction
        pred_mask[pred_mask < 0] = 0
        pred_mask[pred_mask > 0] = 1
        pred_mask = np.uint8(pred_mask * 255)
        resized_image = cv2.resize(img, (1920, 1080))
        resized_mask = cv2.resize(pred_mask, (1920, 1080))
        inpainted_image = cv2.inpaint(resized_image, resized_mask, 3, cv2.INPAINT_TELEA)
        fix_image = cv2.cvtColor(inpainted_image, cv2.COLOR_BGR2RGB)
        return fix_image

    def CV2_to_PIL_img(self, cv2_im):
        cv2_im = cv2.cvtColor(cv2_im, cv2.COLOR_BGR2RGB)
        pil_im = Image.fromarray(cv2_im)
        return pil_im

    def PIL_to_CV2_img(self, img):
        cv_image = np.array(img.convert('RGB')) 
        cv_image = cv_image[:, :, ::-1].copy() 
        return cv_image

    def first_polynomial_function(self, image):
        table = np.array([1.657766*i-0.009157128*(i**2) + 0.00002579473*(i**3)
            for i in np.arange(0, 256)]).astype("uint8")
        return cv2.LUT(image, table)

    def second_polynomial_function(self, image):
        table = np.array([
            -4.263256 * math.exp(-14)+1.546429*i-0.005558036*(i**2)+0.00001339286*(i**3)
            for i in np.arange(0, 256)]).astype("uint8")
        return cv2.LUT(image, table)

    def adjust_gamma(self, image, gamma=1.0):
        invGamma = 1.0 / gamma
        table = np.array([((i / 255.0) ** invGamma) * 255
            for i in np.arange(0, 256)]).astype("uint8")
                
        return cv2.LUT(image, table)

    def enhance_contrast(self, image, factor=1.4):
        _image = ImageEnhance.Contrast(
            self.CV2_to_PIL_img(image)
        ).enhance(factor)
        
        return self.PIL_to_CV2_img(_image)

    def reduce_glare(self, image):
        _image = self.adjust_gamma(
            self.second_polynomial_function(
                self.adjust_gamma(
                    self.first_polynomial_function(image), 
                    0.75
                )
            ),
            0.8
        )
        return _image

    def mix_filter(self, image):
        _image = self.enhance_contrast(
            self.reduce_glare(
                self.enhance_contrast(
                    self.reduce_glare(image), 
                    factor=1.6
                )
            ), 
            factor=1.4
        )
        return _image

In [5]:
cap = cv2.VideoCapture("rtsp://10.45.188.254:31554/nvstream/home/vst/vst_release/streamer_videos/veteran_in_20230522T111503.mkv")
object_tracker = DeepSort()

detector = YoloDetector('C:/Users/acer/Documents/coolyeah/skripsi/Revisi/real time code/no_preprocessing.pt')
preprocessing_frame = PreprocessingTracker()
set_id_datang = set()
set_id_lewat = set()
# Define the counting line
line_start = (0, 600)
line_end = (1700, 600)

# Initialize counters
counter_car = 0
counter_motorbike = 0

while True or cap.isOpened():
    start = time.perf_counter()

    success, img = cap.read()
    if not success:
        break
    # if preprocessing_type == 'machine_learning':
    #         preprocessing_img = preprocessing_frame.learning_preprocessing(img)
    # elif preprocessing_type == 'reduce_glare':
    #     preprocessing_img = preprocessing_frame.mix_filter(img)
    #     best_confidence = 0.421
    # else:
    #     preprocessing_img = img
    #     best_confidence = 0.566
    preprocessing_img = img
    best_confidence = 0.566

    results = detector.score_frame(preprocessing_img)
    img, detections = detector.plot_boxes(results, preprocessing_img, height=preprocessing_img.shape[0], width=preprocessing_img.shape[1], confidence=best_confidence)
    tracks = object_tracker.update_tracks(detections, frame=img)

    for track in tracks:
        if not track.is_confirmed():
            continue
        track_id = track.track_id
        ltrb = track.to_ltrb()

        bbox = ltrb
        # c_x, c_y, _, _ =  track.to_tlbr()

        # Draw the counting line
        cv2.line(img, line_start, line_end, (255, 250, 250), 2)

        # Draw bounding box
        cv2.rectangle(img, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (0, 0, 255), 2)

        # Draw track ID and class label
        cv2.putText(img, f"ID: {str(track_id)} {track.get_det_class()}", (int(bbox[0]), int(bbox[1] - 10)), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)

        # Check if the bounding box crosses the counting line
        if bbox[1] < line_start[1] and bbox[3] > line_start[1]:
            if track.get_det_class() == 'Car' and track_id not in set_id_lewat and track_id in set_id_datang:
                counter_car += 1
                set_id_lewat.add(track_id)
            elif track.get_det_class() == 'Motorbike' and track_id not in set_id_lewat and track_id in set_id_datang:
                counter_motorbike += 1
                set_id_lewat.add(track_id)
        else:
            set_id_datang.add(track_id) if track_id not in set_id_datang else None

    end = time.perf_counter()
    totalTime = end - start
    fps = 1 / totalTime

    cv2.putText(img, f'FPS: {int(fps)}', (20, 1080 - 170), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)

    # Display counters
    cv2.putText(img, f'Car Count: {counter_car}', (20, 1080 - 120), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)
    cv2.putText(img, f'Motorbike Count: {counter_motorbike}', (20, 1080 - 70), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)

    cv2.imshow("Object Counting", img)
    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

Downloading: "https://github.com/ultralytics/yolov5/zipball/master" to C:\Users\acer/.cache\torch\hub\master.zip
YOLOv5  2024-4-1 Python-3.9.2 torch-2.1.1+cpu CPU



Exception: cannot instantiate 'PosixPath' on your system. Cache may be out of date, try `force_reload=True` or see https://docs.ultralytics.com/yolov5/tutorials/pytorch_hub_model_loading for help.

In [None]:
import cv2
import time
import matplotlib.pyplot as plt
# Define video writer parameters
def make_track_video(src_video_path, dest_path, model, preprocessing_type):
    object_tracker = DeepSort()

    detector = YoloDetector(model)
    preprocessing_frame = PreprocessingTracker()

    cap = cv2.VideoCapture(src_video_path)

    # Setting resolution for webcam
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 640)
    output_path = dest_path
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Use 'XVID' or 'MJPG' for different video codecs
    fps = 20  # Adjust the frames per second as needed
    video_writer = cv2.VideoWriter(output_path, fourcc, fps, (1920, 1080))  # Make sure to set 'width' and 'height' to the appropriate values

    # Define the counting line
    line_start = (0, 600)
    line_end = (1700, 600)

    # Initialize counters
    counter_car = 0
    counter_motorbike = 0
    best_confidence = 0.3

    set_id_datang = set()
    set_id_lewat = set()
    while cap.isOpened():
        success, img = cap.read()
        if not success:
            break
        # print(img.shape)
        start = time.perf_counter()
        if preprocessing_type == 'machine_learning':
            preprocessing_img = preprocessing_frame.learning_preprocessing(img)
        elif preprocessing_type == 'reduce_glare':
            preprocessing_img = preprocessing_frame.mix_filter(img)
            best_confidence = 0.421
        else:
            preprocessing_img = img
            best_confidence = 0.566

        results = detector.score_frame(preprocessing_img)
        img, detections = detector.plot_boxes(results, preprocessing_img, height=preprocessing_img.shape[0], width=preprocessing_img.shape[1], confidence=best_confidence)
        tracks = object_tracker.update_tracks(detections, frame=img)

        for track in tracks:
            if not track.is_confirmed():
                continue
            track_id = track.track_id
            ltrb = track.to_ltrb()

            bbox = ltrb
            # c_x, c_y, _, _ =  track.to_tlbr()

            # Draw the counting line
            cv2.line(img, line_start, line_end, (255, 250, 250), 2)

            # Draw bounding box
            cv2.rectangle(img, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (0, 0, 255), 2)

            # Draw track ID and class label
            cv2.putText(img, f"ID: {str(track_id)} {track.get_det_class()}", (int(bbox[0]), int(bbox[1] - 10)), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)

            # Check if the bounding box crosses the counting line
            if bbox[1] < line_start[1] and bbox[3] > line_start[1]:
                if track.get_det_class() == 'Car' and track_id not in set_id_lewat and track_id in set_id_datang:
                    counter_car += 1
                    set_id_lewat.add(track_id)
                elif track.get_det_class() == 'Motorbike' and track_id not in set_id_lewat and track_id in set_id_datang:
                    counter_motorbike += 1
                    set_id_lewat.add(track_id)
            else:
                set_id_datang.add(track_id) if track_id not in set_id_datang else None

        end = time.perf_counter()
        totalTime = end - start
        fps = 1 / totalTime

        cv2.putText(img, f'FPS: {int(fps)}', (20, 1080 - 170), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)

        # Display counters
        cv2.putText(img, f'Car Count: {counter_car}', (20, 1080 - 120), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)
        cv2.putText(img, f'Motorbike Count: {counter_motorbike}', (20, 1080 - 70), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)

        # Write the frame to the video
        video_writer.write(img)

        if cv2.waitKey(1) & 0xFF == 27:
            break

    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()


In [None]:
# import cv2

# cap = cv2.VideoCapture("rtsp://10.45.188.254:31554/nvstream/home/vst/vst_release/streamer_videos/veteran_in_20230522T111503.mkv")

# while True:
#     _, img = cap.read()
#     cv2.imshow("Object Counting", img)
#     key = cv2.waitKey(1) & 0xFF
#     if key == ord('q'):
#         break
# cap.release()
# cv2.destroyAllWindows()