In [2]:
from ultralytics import YOLO
import time
import datetime
import streamlit as st
import cv2
from pytube import YouTube
from count_obj import CountObject
import settings
import torch
import pandas as pd
import PIL
import math
import json
import requests
import settings
from streamlit_drawable_canvas import st_canvas
import speed_check
import base64


In [1]:
def detect_speed(source, list_line, conf, model):
    vid_cap = cv2.VideoCapture(source)
    print(vid_cap)
    w, h, fps = (
        int(vid_cap.get(x))
        for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS)
    )
    print(w, h, fps)
    line_pts = list_line
    # st_frame = st.empty()
    # btn = st.button("stop")
    # Get video info
    speed_obj = speed_check.SpeedEstimator()
    speed_obj.set_args(reg_pts=line_pts, names=model.model.names)
    frame_time = 1.0 / fps
    frames_processed = 0
    frames_to_skip = 0
    while vid_cap.isOpened():
        success, image = vid_cap.read()

        if success:
            # Process and display every nth frame, where n is frames_to_skip + 1
            if frames_processed % (frames_to_skip + 1) == 0:
                process_start_time = time.time()
                tracks = model.track(image, persist=True, show=False)

                image = speed_obj.estimate_speed(image, tracks)
                # display_count(st_frame, image)
                print(speed_obj.get_speed_data())
                process_end_time = time.time()
                processing_time = process_end_time - process_start_time
                frames_to_skip = max(
                    0, math.floor((processing_time - frame_time) / frame_time)
                )
                print(
                    f"Processing time: {processing_time}, Frames to skip: {frames_to_skip}"
                )
                frames_processed = 0

            frames_processed += 1
        else:
            vid_cap.release()
            break

        # if btn:
        #     st.write("stop")


In [4]:
def convert_to_base64(image):
    _, im_arr = cv2.imencode(
        ".jpg", image
    )  # im_arr: image in Numpy one-dim array format.
    im_bytes = im_arr.tobytes()
    im_b64 = base64.b64encode(im_bytes)
    return im_b64


In [5]:
import datetime

def create_dict(ip_camera, length, speed, imgbase64, license_province=None, license_front=None, license_back=None):
    """
    Create a dictionary containing information about a vehicle.

    Parameters:
        ip_camera (str): The IP address of the camera capturing the vehicle.
        length (float): The length of the vehicle.
        speed (float): The speed of the vehicle.
        imgbase64 (str): The base64 encoded image of the vehicle.
        license_province (str, optional): The province of the vehicle's license plate.
        license_front (str, optional): The front image of the vehicle's license plate.
        license_back (str, optional): The back image of the vehicle's license plate.

    Returns:
        dict: A dictionary containing the vehicle information.
    """
    # Type checking
    if not isinstance(ip_camera, str):
        raise TypeError("ip_camera must be a string")
    if not isinstance(length, float):
        raise TypeError("length must be a float")
    if not isinstance(speed, float):
        raise TypeError("speed must be a float")
    if not isinstance(imgbase64, str):
        raise TypeError("imgbase64 must be a string")
    if license_province is not None and not isinstance(license_province, str):
        raise TypeError("license_province must be a string or None")
    if license_front is not None and not isinstance(license_front, str):
        raise TypeError("license_front must be a string or None")
    if license_back is not None and not isinstance(license_back, str):
        raise TypeError("license_back must be a string or None")

    # Create the dictionary
    vehicle_dict = {}
    vehicle_dict['cameraIp'] = ip_camera
    vehicle_dict['trxDatetime'] = str(datetime.datetime.now())
    vehicle_dict['vehicleLength'] = length
    vehicle_dict['vehicleSpeed'] = speed
    vehicle_dict['licencePlateProvince'] = license_province
    vehicle_dict['licencePlateFront'] = license_front
    vehicle_dict['licencePlateBack'] = license_back
    vehicle_dict['vehicleImage'] = imgbase64
    return vehicle_dict


In [8]:
device: str = "cuda" if torch.cuda.is_available() else "cpu"
model = YOLO('yolov8s.pt')
model.to(device)
path='videos/video1.mp4'
list_line = [(0, 360), (1920, 360)]
detect_speed(path, list_line, conf=0.5, model=model)




< cv2.VideoCapture 000001B807BD8DF0>
1920 1080 59

0: 384x640 21 cars, 1 truck, 458.8ms
Speed: 3.0ms preprocess, 458.8ms inference, 4.0ms postprocess per image at shape (1, 3, 384, 640)
{}
Processing time: 1.0404431819915771, Frames to skip: 60

0: 384x640 17 cars, 1 truck, 827.5ms
Speed: 5.7ms preprocess, 827.5ms inference, 5.2ms postprocess per image at shape (1, 3, 384, 640)
{}
Processing time: 1.0268161296844482, Frames to skip: 59

0: 384x640 17 cars, 2 trucks, 711.1ms
Speed: 5.0ms preprocess, 711.1ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)
{}
Processing time: 0.8169064521789551, Frames to skip: 47

0: 384x640 21 cars, 3 trucks, 407.0ms
Speed: 5.0ms preprocess, 407.0ms inference, 4.0ms postprocess per image at shape (1, 3, 384, 640)
{}
Processing time: 0.5276579856872559, Frames to skip: 30

0: 384x640 21 cars, 2 trucks, 455.8ms
Speed: 4.9ms preprocess, 455.8ms inference, 4.0ms postprocess per image at shape (1, 3, 384, 640)
{}
Processing time: 0.594647884

: 

In [9]:
import json
json_file='list_line.json'
with open(json_file) as f:
    data = json.load(f)
    print(data)

data

{'1.1.1.1': [[0, 360], [1920, 360]], '2.2.2.2': [[0, 360], [1920, 360]]}


{'1.1.1.1': [[0, 360], [1920, 360]], '2.2.2.2': [[0, 360], [1920, 360]]}

In [13]:
import json

class IPCameraDatabase:
    def __init__(self, filename):
        self.filename = filename
        self.data = self.load_data()

    def load_data(self):
        try:
            with open(self.filename, 'r') as file:
                data = json.load(file)
            return data
        except FileNotFoundError:
            return {}

    def save_data(self):
        with open(self.filename, 'w') as file:
            json.dump(self.data, file, indent=4)

    def add_ip_camera(self, ip_camera, lines):
        self.data[ip_camera] = lines
        self.save_data()

    def edit_ip_camera(self, ip_camera, new_lines):
        if ip_camera in self.data:
            self.data[ip_camera] = new_lines
            self.save_data()
            return True
        return False

# Example usage:
filename = "list_line.json"

# Create instance of IPCameraDatabase
ip_camera_db = IPCameraDatabase(filename)

# Add new ip_camera and lines
ip_camera_db.add_ip_camera("3.3.3.3", [[0, 360], [1920, 360]])

# Edit existing ip_camera to new lines
if ip_camera_db.edit_ip_camera("1.1.1.1", [[0, 200], [1920, 200]]):
    print("Edit successful.")
else:
    print("Edit failed. IP camera not found.")

# Output the updated data
print("Updated data:")
print(ip_camera_db.data)


Edit successful.
Updated data:
{'1.1.1.1': [[100, 200], [300, 400]], '2.2.2.2': [[0, 360], [1920, 360]], '3.3.3.3': [[0, 360], [1920, 360]]}
