In [1]:
import os
from PIL import Image
import cv2
import numpy as np
import matplotlib.pyplot as plt
import torch

%matplotlib inline
# Report the accelerator. The device name is only queried when CUDA is
# present, because torch.cuda.get_device_name(0) raises on CPU-only machines.
cuda_available = torch.cuda.is_available()
print("CUDA Available:", cuda_available)
print("GPU Name:", torch.cuda.get_device_name(0) if cuda_available else "N/A (no CUDA device)")

CUDA Available: True
GPU Name: NVIDIA GeForce RTX 4070 SUPER


# Step 1: Setup Dataset Paths for Kaggle

In [2]:
# Root folder of the UA-DETRAC dataset on the local disk. It is expected to
# contain images/ and labels/ sub-folders, each split into train/ and val/.
base_path = "C:\\Users\\admin\\Downloads\\258project\\content\\UA-DETRAC\\DETRAC_Upload"

# Define image and label paths. Every component is passed separately so the
# joined path uses one platform-native separator throughout.
train_image_path = os.path.join(base_path, "images", "train")
val_image_path = os.path.join(base_path, "images", "val")
train_label_path = os.path.join(base_path, "labels", "train")
val_label_path = os.path.join(base_path, "labels", "val")

# Verify paths: echo each one and flag any directory that does not exist, so
# a wrong base_path is noticed here rather than in the middle of training.
for description, directory in (
    ("Train images", train_image_path),
    ("Validation images", val_image_path),
    ("Train labels", train_label_path),
    ("Validation labels", val_label_path),
):
    print(f"{description} path: {directory}")
    if not os.path.isdir(directory):
        print(f"  WARNING: directory not found: {directory}")

Train images path: C:\Users\admin\Downloads\258project\content\UA-DETRAC\DETRAC_Upload\images/train
Validation images path: C:\Users\admin\Downloads\258project\content\UA-DETRAC\DETRAC_Upload\images/val
Train labels path: C:\Users\admin\Downloads\258project\content\UA-DETRAC\DETRAC_Upload\labels/train
Validation labels path: C:\Users\admin\Downloads\258project\content\UA-DETRAC\DETRAC_Upload\labels/val


# Step 2: Convert Images to JPG

In [3]:
def convert_images_to_jpg(folder_path):
    """Convert every non-JPG file directly inside ``folder_path`` to ``.jpg``.

    Each converted image is re-encoded as an RGB JPEG saved next to the
    original under the same base name, and the original file is then deleted.

    Entries that are left untouched:
      * sub-directories and anything else that is not a regular file;
      * files whose name already ends in ``.jpg`` in any letter case. On a
        case-insensitive filesystem ``photo.JPG`` and ``photo.jpg`` are the
        same file, so saving the copy and then deleting the "old" file would
        destroy the image.

    Args:
        folder_path: Directory whose files are converted in place.

    Returns:
        None.

    Raises:
        OSError: Propagated unchanged when the folder cannot be listed or a
            file cannot be opened, written or removed.
    """
    for filename in os.listdir(folder_path):
        source_path = os.path.join(folder_path, filename)
        if not os.path.isfile(source_path) or filename.lower().endswith('.jpg'):
            continue

        target_path = os.path.join(folder_path, os.path.splitext(filename)[0] + ".jpg")
        # The context manager closes the source handle before os.remove();
        # an open handle can block deletion on Windows.
        with Image.open(source_path) as img:
            img.convert("RGB").save(target_path, "JPEG")
        os.remove(source_path)  # Remove old file

# Normalise both dataset splits to JPG, announcing each split before it starts.
for announcement, image_folder in (
    ("Converting train images...", train_image_path),
    ("Converting validation images...", val_image_path),
):
    print(announcement)
    convert_images_to_jpg(image_folder)

Converting train images...
Converting validation images...


# **Step 3: Install YOLOv5**

In [4]:
# Install YOLOv5 dependencies in Kaggle
# pip install -q torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# git clone https://github.com/ultralytics/yolov5.git
# pip install -r yolov5/requirements.txt

# Create Dataset YAML for YOLOv5

In [5]:
import os

# Dataset description passed to the YOLOv5 scripts via --data: where the
# train/val images live, how many classes there are, and the class names.
yaml_path = "runs/detrac.yaml"
yaml_text = f"""
train: {train_image_path}
val: {val_image_path}
nc: 4
names:
  0: "car"
  1: "bus"
  2: "van"
  3: "truck"
"""

# Make sure the target folder exists before writing the file.
os.makedirs(os.path.dirname(yaml_path), exist_ok=True)
with open(yaml_path, "w") as yaml_file:
    yaml_file.write(yaml_text)

print(f"YAML file saved to: {yaml_path}")


YAML file saved to: runs/detrac.yaml


In [6]:
# Notes on silencing the FutureWarning printed while YOLOv5 trains.
# Excerpt of the warning as it appears in the training log:
"""
n.py:412: FutureWarning: `torch.cuda.amp.autocast(args...)` is deprecated. Please use `torch.amp.autocast('cuda', args...)` instead.
  with torch.cuda.amp.autocast(amp):
        0/3      3.43G     0.1222    0.06848     0.0459        116        640:  runs/yolov5/trai
"""
# Option 1 (shell command): rewrite the deprecated call inside train.py.
# sed -i 's/torch\.cuda\.amp\.autocast(amp)/torch.amp.autocast("cuda", enabled=amp)/g' runs/yolov5/train.py

# Option 2: if the patch above does not work, suppress the warning instead.
# import warnings
# warnings.filterwarnings("ignore", category=FutureWarning)



# Step 4: Training YOLOv5 on GPU

In [None]:
# To resume from the last checkpoint (last.pt) instead, run:
# python yolov5/train.py --img 640 --batch 8 --epochs 4 --data {yaml_path} --weights yolov5m.pt --device 0 --save-period 1 --cache --resume

# If there is no saved checkpoint, train from the pretrained weights below.
import sys
import subprocess

# sys.executable guarantees the training script runs in the kernel's own
# Python environment.
command = [
    sys.executable, "yolov5/train.py",
    "--img", "640",
    "--batch", "32",
    "--epochs", "5",
    "--data", "runs/detrac.yaml",
    "--weights", "yolov5n.pt",
    "--device", "0",
    "--cache",
    "--workers", "4"
]

# stderr is merged into stdout so the log arrives in order. encoding='utf-8'
# is set explicitly to avoid GBK decode errors, and errors='replace' keeps
# the stream alive on undecodable bytes.
with subprocess.Popen(
    command,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
    encoding='utf-8',
    errors='replace',
) as process:
    # Echo the training log line by line while the child is running.
    for line in process.stdout:
        print(line, end="")

# Leaving the with-block closes the pipe and waits for the child, so the
# exit status is available here. Fail loudly instead of letting later cells
# silently evaluate stale weights.
if process.returncode != 0:
    raise subprocess.CalledProcessError(process.returncode, command)



[34m[1mtrain: [0mweights=yolov5n.pt, cfg=, data=runs/detrac.yaml, hyp=yolov5\data\hyps\hyp.scratch-low.yaml, epochs=5, batch_size=32, imgsz=640, rect=False, resume=False, nosave=False, noval=False, noautoanchor=False, noplots=False, evolve=None, evolve_population=yolov5\data\hyps, resume_evolve=None, bucket=, cache=ram, image_weights=False, device=0, multi_scale=False, single_cls=False, optimizer=SGD, sync_bn=False, workers=4, project=yolov5\runs\train, name=exp, exist_ok=False, quad=False, cos_lr=False, label_smoothing=0.0, patience=100, freeze=[0], save_period=-1, seed=0, local_rank=-1, entity=None, upload_dataset=False, bbox_interval=-1, artifact_alias=latest, ndjson_console=False, ndjson_file=False
[34m[1mgithub: [0mup to date with https://github.com/ultralytics/yolov5 
YOLOv5  v7.0-416-gfe1d4d99 Python-3.10.4 torch-2.5.1+cu118 CUDA:0 (NVIDIA GeForce RTX 4070 SUPER, 12282MiB)

[34m[1mhyperparameters: [0mlr0=0.01, lrf=0.01, momentum=0.937, weight_decay=0.0005, warmup_epochs

In [8]:
# check weight directory
# ls runs/yolov5/runs/train/exp/weights/

#  **5. Evaluate the Model**

In [36]:
import os

# YOLOv5 creates one sub-folder per training run (exp, exp2, ...) here.
train_dir = "yolov5/runs/train"
run_folders = os.listdir(train_dir)
print("🗂️ List of training results folders:", run_folders)


🗂️ List of training results folders: ['exp', 'exp2', 'exp3', 'exp4', 'exp5', 'exp6', 'exp7']


In [16]:
# Report every training run that produced a best.pt checkpoint.
checkpoint_candidates = [
    os.path.join(train_dir, run_name, "weights", "best.pt")
    for run_name in os.listdir(train_dir)
]
for best_path in filter(os.path.exists, checkpoint_candidates):
    print(f"✅ Found best.pt at: {best_path}")


✅ Found best.pt at: yolov5/runs/train\exp7\weights\best.pt


In [None]:
import subprocess
import sys

# Validation command. Paths are relative to the notebook's working directory.
# NOTE(review): the training cell uses --img 640 while validation uses 416;
# confirm that evaluating at a different image size is intentional.
command = [
    sys.executable, "yolov5/val.py",
    "--weights", "yolov5/runs/train/exp7/weights/best.pt",
    "--data", "runs/detrac.yaml",
    "--img", "416"
]

# Run the script with stderr merged into stdout and echo each line as it is
# produced; undecodable bytes are replaced rather than raising.
with subprocess.Popen(
    command,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
    encoding="utf-8",
    errors="replace",
) as process:
    for line in process.stdout:
        print(line, end="")

# The with-block has waited for the child; surface a failed validation run
# instead of silently continuing.
if process.returncode != 0:
    raise subprocess.CalledProcessError(process.returncode, command)


[34m[1mval: [0mdata=runs/detrac.yaml, weights=['yolov5/runs/train/exp7/weights/best.pt'], batch_size=32, imgsz=416, conf_thres=0.001, iou_thres=0.6, max_det=300, task=val, device=, workers=8, single_cls=False, augment=False, verbose=False, save_txt=False, save_hybrid=False, save_conf=False, save_json=False, project=yolov5\runs\val, name=exp, exist_ok=False, half=False, dnn=False
YOLOv5  v7.0-416-gfe1d4d99 Python-3.10.4 torch-2.5.1+cu118 CUDA:0 (NVIDIA GeForce RTX 4070 SUPER, 12282MiB)

Fusing layers... 
Model summary: 157 layers, 1764577 parameters, 0 gradients, 4.1 GFLOPs

[34m[1mval: [0mScanning C:\Users\admin\Downloads\258project\content\UA-DETRAC\DETRAC_Upload\labels\val.cache... 56167 images, 173 backgrounds, 0 corrupt: 100%|██████████| 56340/56340 [00:00<?, ?it/s]
[34m[1mval: [0mScanning C:\Users\admin\Downloads\258project\content\UA-DETRAC\DETRAC_Upload\labels\val.cache... 56167 images, 173 backgrounds, 0 corrupt: 100%|██████████| 56340/56340 [00:00<?, ?it/s]

         

# **6. Inference on Videos**

In [None]:
import os
import sys
import cv2
import torch
from pathlib import Path


# Put the local YOLOv5 checkout first on sys.path so that its `models` and
# `utils` packages can be imported.
yolov5_path = r"C:\Users\admin\Downloads\258project\yolov5"
sys.path.insert(0, yolov5_path)

from models.common import DetectMultiBackend

# Checkpoint produced by training run exp7
weights_path = os.path.join(yolov5_path, "runs", "train", "exp7", "weights", "best.pt")

# Load the model on the GPU when one is available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = DetectMultiBackend(weights_path, device=device)
print(f"✅ Model loaded successfully on {device}")

# Collect the .mp4 files (extension matched case-insensitively) to process
videos_dir = r"C:\Users\admin\Downloads\258project\videotraffic"
video_files = []
for entry in os.listdir(videos_dir):
    if entry.lower().endswith(".mp4"):
        video_files.append(entry)

# Annotated videos are written to <yolov5_path>/runs/inference
output_dir = os.path.join(yolov5_path, "runs", "inference")
os.makedirs(output_dir, exist_ok=True)

# Pad images so that height and width become multiples of 32
def pad_to_multiple_of_32(img):
    """Return ``img`` padded on the bottom and right to multiples of 32.

    Padding is added only below and to the right, so pixel coordinates of the
    original content are unchanged. The border is filled with the constant
    colour (114, 114, 114).

    Args:
        img: Image array whose shape unpacks as (height, width, channels).

    Returns:
        The padded image produced by ``cv2.copyMakeBorder``.
    """
    rows, cols, _ = img.shape
    # -n % 32 is the distance from n up to the next multiple of 32 (0 if aligned).
    extra_rows = -rows % 32
    extra_cols = -cols % 32
    return cv2.copyMakeBorder(
        img, 0, extra_rows, 0, extra_cols, cv2.BORDER_CONSTANT, value=[114, 114, 114]
    )

# Run inference on each video and write an annotated copy.
# The YOLOv5 helpers are imported once here (yolov5_path is already on
# sys.path) instead of being re-imported for every frame.
from utils.general import non_max_suppression
from utils.plots import Annotator, colors

for video_file in video_files:
    video_path = os.path.join(videos_dir, video_file)
    print(f"\n📹 Processing video: {video_path}")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"❌ Cannot open video {video_file}")
        cap.release()
        continue

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    print(f"🧾 Resolution: {width}x{height}, FPS: {fps}")

    # Set the output file path
    output_file = f"annotated_{video_file}"
    output_path = os.path.join(output_dir, output_file)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    if not out.isOpened():
        # Without this check every out.write() would silently do nothing.
        print(f"❌ Cannot create output video {output_path}")
        cap.release()
        out.release()
        continue

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("✅ Video processing completed.")
                break

            # Pre-processing: BGR → RGB, then pad bottom/right to a multiple
            # of 32. Padding only on those sides keeps box coordinates
            # aligned with the original frame.
            img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            padded_img = pad_to_multiple_of_32(img)

            # HWC uint8 → 1xCxHxW float in [0, 1] on the model's device
            tensor_img = torch.from_numpy(padded_img).permute(2, 0, 1).float() / 255.0
            tensor_img = tensor_img.unsqueeze(0).to(device)

            # Inference; gradients are never needed here
            with torch.no_grad():
                pred_raw = model(tensor_img, augment=False, visualize=False)

            # NMS post-processing (the batch holds a single image)
            pred = non_max_suppression(pred_raw, conf_thres=0.25, iou_thres=0.45)[0]

            # Draw on a copy so the decoded frame itself is left untouched
            annotator = Annotator(frame.copy(), line_width=2, example=str(model.names))
            for det in pred.cpu():
                x1, y1, x2, y2, conf, cls = det[:6]
                cls_id = int(cls.item())
                conf_val = conf.item()
                label = f"{model.names[cls_id]} {conf_val:.2f}"
                annotator.box_label([x1, y1, x2, y2], label, color=colors(cls_id, True))

            out.write(annotator.result())
    finally:
        # Always release the capture and the writer, even if a frame fails
        cap.release()
        out.release()

    print(f"✅ Output saved to: {output_path}")



Fusing layers... 
Model summary: 157 layers, 1764577 parameters, 0 gradients, 4.1 GFLOPs


✅ Model loaded successfully on cuda

📹 Processing video: C:\Users\admin\Downloads\258project\videotraffic\Road Safety.mp4
🧾 Resolution: 1920x1080, FPS: 30.0
✅ Video processing completed.
✅ Output saved to: C:\Users\admin\Downloads\258project\yolov5\runs\inference\annotated_Road Safety.mp4


# **Example Code to Simulate Traffic Signal Management:**

# Simulate traffic signals for 4 roads: each road starts with no vehicles
# counted and a red light. Every road gets its own independent state dict.
traffic_signals = {
    road_name: {'vehicles': 0, 'signal': 'RED'}
    for road_name in ('road_1', 'road_2', 'road_3', 'road_4')
}

# Function to update traffic lights based on vehicle count
def update_traffic_signals():
    """Turn the busiest road green and every other road red.

    Reads and mutates the module-level ``traffic_signals`` dict in place,
    then prints the resulting signal and vehicle count of every road. When
    several roads share the highest count, the one that comes first in the
    dict gets the green light.
    """
    print("\nUpdating Traffic Signals...")

    # max() keeps the first road holding the highest count; default=None
    # covers an empty dict, in which case no road is updated.
    busiest_road = max(
        traffic_signals,
        key=lambda name: traffic_signals[name]['vehicles'],
        default=None,
    )

    for name, state in traffic_signals.items():
        state['signal'] = 'GREEN' if name == busiest_road else 'RED'

    print("Traffic Signal Update:")
    for road, info in traffic_signals.items():
        print(f"  {road}: Signal = {info['signal']}, Vehicles = {info['vehicles']}")

# Load the trained model through PyTorch Hub.
# NOTE(review): earlier cells use the exp7 checkpoint under yolov5/runs/train
# and local Windows paths, while this section points at exp3 and /kaggle/input
# paths — confirm these are the intended locations.
print("Loading YOLOv5 model...")
model = torch.hub.load(
    'ultralytics/yolov5',
    'custom',
    path='runs/train/exp3/weights/best.pt',
)
print("Model loaded successfully!")

# One video feed per simulated road
video_paths = dict(
    road_1='/kaggle/input/videotraffic/Traffic.mp4',
    road_2='/kaggle/input/videotraffic/Traffic2.mp4',
    road_3='/kaggle/input/videotraffic/Traffic3.mp4',
    road_4='/kaggle/input/videotraffic/Traffic4.mp4',
)

# Open a capture for every feed, keyed by road name
caps = {}
for road, path in video_paths.items():
    caps[road] = cv2.VideoCapture(path)

# Function to skip frames based on the video FPS and the required skip time
def skip_frames(caps, skip_time):
    """Advance every capture by ``skip_time`` seconds' worth of frames.

    The frame count is computed from the un-truncated FPS and then rounded,
    so fractional rates such as 29.97 fps skip the right amount of video.
    Frames are discarded with ``grab()``, and grabbing stops as soon as it
    reports failure.

    Args:
        caps: Mapping of road name to a capture object exposing
            ``get(prop)`` and ``grab()`` (``cv2.VideoCapture`` here).
        skip_time: Number of seconds of video to skip on each capture.

    Returns:
        None.
    """
    for cap in caps.values():
        fps = cap.get(cv2.CAP_PROP_FPS)
        # A capture that reports no usable FPS (0, negative, NaN) is left as is.
        if not fps > 0:
            continue
        frames_to_skip = max(0, round(fps * skip_time))
        for _ in range(frames_to_skip):
            if not cap.grab():
                break  # nothing more to grab from this capture

# Process video frames
import time  # needed for the pause between iterations

skip_time = 3  # Skip 3 seconds of video
iterations = 10  # Number of iterations

try:
    for iteration in range(iterations):
        print(f"\nIteration {iteration + 1}")
        for road, cap in caps.items():
            # Read a frame from the video feed
            ret, frame = cap.read()
            if not ret:
                print(f"  End of video for {road}")
                # Drop the stale count so a finished feed cannot keep
                # winning the green light with its last reading.
                traffic_signals[road]['vehicles'] = 0
                continue

            # Perform inference to detect vehicles. OpenCV decodes frames as
            # BGR, while the hub model expects RGB for numpy inputs, so
            # convert before inference.
            print(f"  Detecting vehicles on {road}...")
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = model(frame_rgb)
            vehicle_count = len(results.xyxy[0])

            # Update vehicle count for the road
            traffic_signals[road]['vehicles'] = vehicle_count
            print(f"  {road}: {vehicle_count} vehicles detected")

            # Render the bounding boxes and labels. The model was given an
            # RGB image, so the rendered frame is already in the channel
            # order Matplotlib expects.
            rendered_frame_rgb = results.render()[0]

            # Display the frame using Matplotlib
            plt.figure(figsize=(10, 6))
            plt.imshow(rendered_frame_rgb)
            plt.title(f'{road} Frame with Detection')
            plt.axis('off')
            plt.show()

        # Skip frames equivalent to skip_time seconds for each video
        skip_frames(caps, skip_time)

        # Update traffic signals based on vehicle count
        update_traffic_signals()

        # Pause for 1 second before the next iteration
        time.sleep(1)
finally:
    # Release video captures even if an iteration fails
    for cap in caps.values():
        cap.release()

print("End of program")