Install Necessary Libraries (Database Training Part)

In [None]:
!pip install ultralytics
!nvidia-smi
pip install wandb
!pip install GPUtil

Set Training Configuration

In [None]:
# Build from YAML and transfer weights
model = YOLO('yolov8x.yaml').load('yolov8x.pt')  

# Training The Final Model
results = model.train(data="/kaggle/input/addbdvss/traffic_update.yaml",epochs=30, imgsz = 416, batch = 64 ,lr0=0.0001, dropout= 0.15, device = 0)

In [None]:
from ultralytics import YOLO
import os
!yolo task=detect mode=train model=yolov8x.pt data="/kaggle/input/addbdvss/traffic_update.yaml" epochs=30 imgsz=640

Free Up Occupied GPU Storage

In [None]:
import torch
from GPUtil import showUtilization as gpu_usage
from numba import cuda

def free_gpu_cache():
    print("Initial GPU Usage")
    gpu_usage()                             

    torch.cuda.empty_cache()

    cuda.select_device(0)
    cuda.close()
    cuda.select_device(0)

    print("GPU Usage after emptying the cache")
    gpu_usage()

free_gpu_cache()                           


Install Necessary Libraries (Test Part)

In [None]:
!pip install ultralytics
!pip install cvzone
!pip install supervision

Vehicles Classification, Zone Based Counting, Speed Estimation

In [None]:
# Import necessary libraries
import os
from ultralytics import YOLO, solutions
from ultralytics.utils.plotting import Annotator, colors
import cv2
from cvzone import putTextRect
import numpy as np
from cvzone import cornerRect
import utili as speed_estimation
import counter_utili as object_counter  #Added New 
import supervision as sv
from supervision import Color
import matplotlib.pyplot as plt
import time

# Initialize a dictionary to store tracking information for each vehicle
vehicle_data = {}
class_count = {}

# Known distance between the lines (in meters)
distance = 8  # example distance in meters

#counting parameters 

in_counts = 0 
out_counts = 0 
countZone=0

#Set Input Directory
input_video_dir = '/kaggle/input/testoll1'
input_video_filename = 'Test2_2.mp4'
video_path = os.path.join(input_video_dir, input_video_filename)

# Derive output video path
output_video_filename = f'{os.path.splitext(input_video_filename)[0]}_out.mp4'
output_video_dir = '/kaggle/working/'
video_path_out = os.path.join(output_video_dir, output_video_filename)



# Open the input video file for reading
cap = cv2.VideoCapture(video_path)

#Calculate the Input FPS
inputFps = cap.get(cv2.CAP_PROP_FPS)

# Read the first frame of the video
ret, frame = cap.read()

# Get the height, width, and number of channels of the frame
H, W, _ = frame.shape

# Coordinates of the rectangular portion (ROI)
roix1, roiy1, roix2, roiy2 = 0, int(H/3), W, H  # Update these coordinates according to your ROI

# Create a VideoWriter object to save the output video
out = cv2.VideoWriter(video_path_out, cv2.VideoWriter_fourcc(*'MP4V'), int(cap.get(cv2.CAP_PROP_FPS)), (W, H))

# Specify the path of the YOLO model
model_path2 = os.path.join('.', r"/kaggle/input/test-data/Test_On_Kaggle/results_BDVS1/runs/detect/train8/weights/best.pt")

# Load the YOLO model

model2 = YOLO(model_path2)

names = model2.model.names
flagLine1 = False
flagLine2 = False


# Set a threshold for object detection confidence
threshold = 0.5

 #Polyzone data for counting in zones 
    
p1 = (21, 542)
p2 = (190, 410)
p3 = (538, 395)
p4 = (541, 617)

#Array from direct generated zone
ptsArrayZone1 = [(93, 499), (290, 345), (545, 329), (548, 573)]
ptsArrayZone2 = [(649, 360), (821, 317), (1195, 514), (658, 610)]
# Convert points to a numpy array
#pts = np.array([p1, p2, p3, p4], np.int32)
ptsZone1 = np.array(ptsArrayZone1, np.int32).reshape((-1, 1, 2))
ptsZone2 = np.array(ptsArrayZone2, np.int32).reshape((-1, 1, 2))

zones = [ptsZone1, ptsZone2]

countZoneList=[]
curFrameId = []


#Function for countZone 
countZoneLists = [[] , []]
def findZoneCounts(zones, centerPoint, track_id, countZoneLists):
    """
    Updates the vehicle count for multiple zones based on the position of the bounding box center.

    Args:
        zones (list): List of zones where each zone is defined as a list of points [(x1, y1), (x2, y2), ...].
        centerPoint (int): XY coordinate of the bounding box center.
        track_id (int): Unique identifier for the object being tracked.
        countZoneLists (list): List of lists where each sublist contains track IDs for a specific zone.

    Returns:
        countZoneLists (list): Updated list of track IDs for each zone.
    """

    for i, pts in enumerate(zones):
        if track_id not in countZoneLists[i] and cv2.pointPolygonTest(pts, centerPoint, False) >= 0:
            countZoneLists[i].append(track_id)
          
        elif track_id in countZoneLists[i] and cv2.pointPolygonTest(pts, centerPoint, False) < 0:
            countZoneLists[i].remove(track_id)

    return countZoneLists


#Function for storing Traces
# Initialize the dictionary to store track history
trk_history = {}
def store_track_info(track_id, box, trk_history):
    """
    Store track data.

    Args:
        track_id (int): Object track id.
        box (list): Object bounding box data.
        trk_history (dict): Dictionary to store the history of tracks.
    """
    # Initialize history if track_id is not in trk_history
    if track_id not in trk_history:
        trk_history[track_id] = []

    # Calculate the center of the bounding box
    bbox_center = (float((box[0] + box[2]) / 2), float((box[1] + box[3]) / 2))

    # Store the center point in the track history
    trk_history[track_id].append(bbox_center)

    # Limit the history to the last 30 points/length of traces
    if len(trk_history[track_id]) > 30:
        trk_history[track_id].pop(0)

    # Convert the list of points to the format required by cv2.polylines
    trk_pts = np.hstack(trk_history[track_id]).astype(np.int32).reshape((-1, 1, 2))
    
    return trk_pts



# Process each frame of the input video
while ret:

    # Extract the ROI from the frame
    roi = frame[roiy1:roiy2, roix1:roix2]
    
    # Speed Estimation Data
    width = int(W*0.9) #0.4
    speedLine1x , speedLine1y = 93 , 484
    speedLine2x , speedLine2y = 189 , 404
    offSet = 15  #For count region blink
    
    #Set Annotators
    annotator = Annotator(
    frame,
    line_width=None, #2
    font_size=None, #2
    font=cv2.FONT_HERSHEY_SIMPLEX,
    pil=False)
    

    
    tracks = model2.track(roi, persist=True, show=False) #frame
    # detections = sv.Detections.from_ultralytics(tracks[0])
#     print(detections)
#     break
#     #print("Tracks Reslut", tracks)
#     print("Preprocess", tracks[0].speed["preprocess"])
#     print("Inference", tracks[0].speed["inference"])
#     print("Postprocess", tracks[0].speed["postprocess"])
#     break

    for track in tracks[0]: 
        
        x1, y1, x2, y2, track_id, score, class_id = track.boxes.data.numpy()[0] 
        
        #Adjust the ROI
        x1+=roix1
        x2+=roix1
        y1+=roiy1
        y2+=roiy1
        cv2.line(frame, (0, int(roiy1)),(W,int(roiy1)), (0,255,255),2)
        
        x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
        track_id = int(track_id)
        class_id = int(class_id)
        curFrameId.append(track_id)
        confidence = round(score * 100, 2)
        class_name = names[int(class_id)].upper()
        box =  x1, y1, x2, y2
        # Calculate the center of the bounding box
        center_x = int((x1 + x2) / 2)
        center_y = int((y1 + y2) / 2)
        centerPoint = center_x, center_y
        
        #Adjust the colourPalatte from roboflow/ultralytics
        clrplate= sv.ColorPalette.DEFAULT
        b,g,r = clrplate.by_idx(track_id).b , clrplate.by_idx(track_id).g, clrplate.by_idx(track_id).r
        colorr = b,g,r
        #print(colors(int(track_id), True))
        #print(colorr)
        #cornerRect(frame, (int(x1), int(y1), int(x2)-int(x1), int(y2)-int(y1)), l=8, rt=2, t=5, colorR=(153, 51, 255), colorC=(0, 255, 0))
        #putTextRect(frame, f'{class_name} {track_id}', [int(x1), int(y1) -10],
                                #scale=1, thickness=2, colorR=(153, 51, 255), offset=5)
        
     #if score > threshold:
        #cv2.circle(frame, (center_x,center_y),2,(0,0,210),2, cv2.FILLED)
        
        #Count in zones 
#         if track_id not in countZoneList and cv2.pointPolygonTest(pts, (center_x, center_y), False) >= 0:
#             countZoneList.append(track_id)
#             countZone+=1
#         elif track_id in countZoneList and cv2.pointPolygonTest(pts, (center_x, center_y), False) < 0:
#             countZoneList.remove(track_id)
        countZoneListUpdated = findZoneCounts(zones, centerPoint, track_id, countZoneLists)
                
        
        #Create the dictionary with lists 
        if track_id not in vehicle_data:
            vehicle_data[track_id] = {"start_time": None, "end_time": None, "timeDiff": None, 
                                      "frame_count": 0, 'speed': None, 'speedFlag': False, "inFlag": False, 
                                      "outFlag": False, "processTime": []}
        
        if class_id not in class_count:
            class_count[class_id] = {"outCount": 0, "inCount": 0}
            
        #Determine Direction and Update the Counter
        if vehicle_data[track_id]["inFlag"] is False and vehicle_data[track_id]["outFlag"] is False and center_y >= (speedLine1y-10) and center_y <= (speedLine1y + 10):
                
            vehicle_data[track_id]["outFlag"] = True
            out_counts+=1
            class_count[class_id]["outCount"]+=1
            
            
        elif vehicle_data[track_id]["inFlag"] is False and vehicle_data[track_id]["outFlag"] is False and center_y <= (speedLine2y+10) and center_y >= (speedLine2y - 10):
                
            vehicle_data[track_id]["inFlag"] = True
            in_counts+=1
            class_count[class_id]["inCount"]+=1
        
        ## Speed for outgoing vehicles
        if  vehicle_data[track_id]["outFlag"]==True:

            # Check if the vehicle crosses the first line (red line)
            if vehicle_data[track_id]["start_time"] is None and center_y >= speedLine1y and center_y <= (speedLine1y + 3):
                
                #vehicle_data[track_id]["start_time"] = time.time()
                curFrameFps = cap.get(cv2.CAP_PROP_POS_FRAMES)
                vehicle_data[track_id]["start_time"] = curFrameFps/inputFps
                
                vehicle_data[track_id]["frame_count"] = 0  # Reset frame count
            # Increment the frame count if the vehicle has crossed the first line & Time count
            if vehicle_data[track_id]["start_time"] is not None and vehicle_data[track_id]["end_time"] is None:
                vehicle_data[track_id]["frame_count"] += 1
                curTime=tracks[0].speed["preprocess"]+tracks[0].speed["inference"]+tracks[0].speed["postprocess"]
                vehicle_data[track_id]['processTime'].append(curTime)
           
            # Check if the vehicle crosses the second line (green line)
            if vehicle_data[track_id]["start_time"] is not None and vehicle_data[track_id]["end_time"] is None and center_y >= speedLine2y and center_y <= (speedLine2y + 3):
                #vehicle_data[track_id]["end_time"] = time.time()
                curFrameFps = cap.get(cv2.CAP_PROP_POS_FRAMES)
                vehicle_data[track_id]["end_time"] = curFrameFps/inputFps
                #frame_duration = vehicle_data[track_id]["frame_count"] * 0.03  # 30ms per frame
                #total_time = (vehicle_data[track_id]["end_time"] - vehicle_data[track_id]["start_time"]) + frame_duration
                processTimeTotal= sum(vehicle_data[track_id]['processTime'])
                #total_time = (vehicle_data[track_id]["end_time"] - vehicle_data[track_id]["start_time"]) + (processTimeTotal/1000)
                total_time = (vehicle_data[track_id]["end_time"] - vehicle_data[track_id]["start_time"])
                vehicle_data[track_id]["timeDiff"] = total_time
          
        ##Speed for incoming vehicles  
        if  vehicle_data[track_id]["inFlag"]==True:

            # Check if the vehicle crosses the first line (red line)
            if vehicle_data[track_id]["start_time"] is None and center_y <= speedLine2y and center_y >= (speedLine2y - 3):
                vehicle_data[track_id]["start_time"] = time.time()
                vehicle_data[track_id]["frame_count"] = 0  # Reset frame count
            
            # Increment the frame count if the vehicle has crossed the first line & Time count
            if vehicle_data[track_id]["start_time"] is not None and vehicle_data[track_id]["end_time"] is None:
                vehicle_data[track_id]["frame_count"] += 1
                curTime=tracks[0].speed["preprocess"]+tracks[0].speed["inference"]+tracks[0].speed["postprocess"]
                vehicle_data[track_id]['processTime'].append(curTime)
            
            # Check if the vehicle crosses the second line (green line)
            if vehicle_data[track_id]["start_time"] is not None and vehicle_data[track_id]["end_time"] is None and center_y <= speedLine1y and center_y >= (speedLine1y - 3):
                vehicle_data[track_id]["end_time"] = time.time()
                #frame_duration = vehicle_data[track_id]["frame_count"] * 0.03  # 30ms per frame
                #total_time = (vehicle_data[track_id]["end_time"] - vehicle_data[track_id]["start_time"]) + frame_duration
                processTimeTotal= sum(vehicle_data[track_id]['processTime'])
                #total_time = (vehicle_data[track_id]["end_time"] - vehicle_data[track_id]["start_time"]) + (processTimeTotal/1000)
                total_time = (vehicle_data[track_id]["end_time"] - vehicle_data[track_id]["start_time"])
                vehicle_data[track_id]["timeDiff"] = total_time
     
            
         # Calculate speed (distance/time)
        if vehicle_data[track_id]["speedFlag"] is False and vehicle_data[track_id]["end_time"] is not None:
            
            timeGap = vehicle_data[track_id]["timeDiff"]
            speed = (distance / timeGap)*3.6
            speed = int(speed)
            vehicle_data[track_id]['speed'] = speed 
            vehicle_data[track_id]['speedFlag'] = True 
            speed_text = f"{(speed)} km/h"
            
        #Write the speed of the vehicles    
        if  vehicle_data[track_id]["speed"] is not None:
              #annotator.box_label(box, label=f'{track_id} #{vehicle_data[track_id]["speed"]}km/h', txt_color=(255,255,255) ,color=(0,0,255))  
              cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 3)
              putTextRect(frame, f'{track_id} #{vehicle_data[track_id]["speed"]}km/h', [int(x1+6), int(y1) -8],
                                  scale=0.7, thickness=2, colorR=(0, 0, 255), font=cv2.FONT_HERSHEY_SIMPLEX, offset=8)
              traceColor= (0, 0, 255)
        else:
              #annotator.box_label(box, label=f'{track_id} {class_name}', txt_color=(255,255,210) ,color=colors(int(track_id), True))  #color = colorr
              cv2.rectangle(frame, (x1, y1), (x2, y2), colors(int(track_id), True), 4)
              putTextRect(frame, f'{track_id} {class_name}', [int(x1+6), int(y1) -8], 
                                  scale=0.7, thickness=2, colorR=colors(int(track_id), True), font=cv2.FONT_HERSHEY_SIMPLEX,offset=8) 
              traceColor= colors(int(track_id), True)
        
        #Write alert for wrong side vehicles #Right Zone
        if vehicle_data[track_id]["outFlag"] and center_x > int(W*0.5):
            putTextRect(frame, 'WRONG SIDE', [int(x1+6), int(y2) + 20], 
                                  scale=0.7, thickness=2, colorR= (0,0,0), font=cv2.FONT_HERSHEY_SIMPLEX,offset=8) 
        
        # Set flags for line blink while crossing
        if vehicle_data[track_id]["outFlag"] and abs(center_y - speedLine1y) <= offSet and not flagLine1:
            flagLine1= True
        
        if vehicle_data[track_id]["inFlag"] and abs(center_y - speedLine2y) <= offSet:
            flagLine2 = True 
            
       
        # Store track info and draw traces
        trk_pts = store_track_info(track_id, box, trk_history) ##below add traceColor
        cv2.polylines(frame, [trk_pts], isClosed=False, color=colors(int(track_id), True), thickness=2)
    
    #Remove from count lsit if vehicle suddenly disappear from frame
    for countZoneList in countZoneLists:
        for id in countZoneList: 
            if id not in curFrameId:
                countZoneList.remove(id)
    curFrameId=[]
    
    #Show the counts 
    putTextRect(frame, f'RIGHT ZONE:{in_counts}', [(int(W*0.7)-30), 70], scale=1.5, thickness=2, colorR=(51, 51, 0), border=3, colorB=(0,0,0), font=cv2.FONT_HERSHEY_SIMPLEX)
    putTextRect(frame, f'LEFT ZONE:{out_counts}', [20,70], scale=1.5, thickness=2, colorR=(51, 0, 25), border=3, colorB=(0,0,0), font=cv2.FONT_HERSHEY_SIMPLEX)
    putTextRect(frame, f'INSIDE ZONE:{len(countZoneLists[0])}', [(int(W*0.4)-80), 70], scale=1.5, thickness=2, colorR=(51, 51, 225), border=3, colorB=(0,0,0), font=cv2.FONT_HERSHEY_SIMPLEX)
    putTextRect(frame, f'INSIDE ZONE2:{len(countZoneLists[1])}', [(int(W*0.4)-80), 150], scale=1.5, thickness=2, colorR=(153, 0, 76), border=3, colorB=(0,0,0), font=cv2.FONT_HERSHEY_SIMPLEX)
    putTextRect(frame, f'CAR:Out:{class_count[4]["outCount"]},In:{class_count[4]["inCount"]}', 
            [20,150], scale=1.5, thickness=2, colorR=(0, 69, 255), border=3, colorB=(0,0,0), font=cv2.FONT_HERSHEY_SIMPLEX)
    
    # Draw the polygon corresponding the zones
    cv2.polylines(frame, [ptsZone1], isClosed=True, color=(127, 0, 255), thickness=3)
    cv2.polylines(frame, [ptsZone2], isClosed=True, color=(153, 0, 76), thickness=3)
    
    #Draw the lines that blink with count
    if flagLine1: 
        flagLine1 = False
        cv2.line(frame, (speedLine1x,speedLine1y),(speedLine1x+width,speedLine1y), (255,0,0),3)
    else: 
        cv2.line(frame, (speedLine1x,speedLine1y),(speedLine1x+width,speedLine1y), (0,0,255),3)
        
    if flagLine2: 
        flagLine2 = False
        cv2.line(frame, (speedLine2x,speedLine2y), (speedLine2x+width,speedLine2y ), (0,210,0),3)
    else: 
        cv2.line(frame, (speedLine2x,speedLine2y), (speedLine2x+width,speedLine2y ), (0,128,255),3)
        
   
    #Plot Temporary 
    #Convert the image from BGR (OpenCV format) to RGB (matplotlib format)
    
#     img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#     # Plot the image using matplotlib
#     plt.figure(figsize=(10, 6))  # Optional: adjust the figure size
#     plt.imshow(img_rgb)
#     plt.axis('off')  # Hide the axis
#     plt.show()
#     break 
    
    #frame = heatmap_obj.generate_heatmap(frame, tracks)

    # Write the frame to the output video file
    out.write(frame)
    
    # Read the next frame
    ret, frame = cap.read()

# Release the video capture and writer objects
cap.release()
out.release()

# Close all OpenCV windows
cv2.destroyAllWindows()
