# Clean Traffic Analysis - Essential Features Only

This notebook contains only the essential traffic analysis features:
1. Vehicle detection and counting
2. Green signal timing calculation
3. Red light time analysis
4. Improvement calculations

# Essential Setup

In [75]:
# Essential imports
import torch
import cv2
import numpy as np
from ultralytics import YOLO
from pathlib import Path
import yaml

print(f"CUDA available: {torch.cuda.is_available()}")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using: {device}")

CUDA available: False
Using: cpu


In [76]:
# Load dataset config
DATASET_PATH = Path("vehicle-detection.v21i.yolov11")
DATA_YAML = DATASET_PATH / "data.yaml"

with open(DATA_YAML, 'r') as f:
    data_config = yaml.safe_load(f)
    
print(f"Classes: {data_config['names']}")

Classes: ['mobil', 'motor']


# Load Model

In [77]:
# Load trained model
model = YOLO('saved_models/vehicle_detector.pt')
print("Model loaded successfully")

Model loaded successfully


# Essential Functions

In [81]:
# Essential functions
def process_detections(results, conf_threshold=0.25):
    """Process YOLO detection results and count vehicles by type"""
    counts = {'mobil': 0, 'motor': 0, 'total': 0}
    
    for r in results:
        boxes = r.boxes
        for box in boxes:
            conf = float(box.conf)
            if conf < conf_threshold:
                continue
            cls = int(box.cls)
            class_name = data_config['names'][cls]
            counts[class_name] += 1
            counts['total'] += 1
    return counts

def calculate_green_time(car_count, motorcycle_count, base_time=10, time_per_car=6, time_per_motorcycle=5, min_time=15, max_time=60):
    """Calculate green signal time based on vehicle counts"""
    green_time = base_time + (car_count * time_per_car) + (motorcycle_count * time_per_motorcycle)
    green_time = max(min_time, min(green_time, max_time))
    return green_time

def process_image(image_path, model, conf_threshold=0.25):
    """Process a single image and return detection results"""
    img = cv2.imread(str(image_path))
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")
    
    results = model.predict(img, conf=conf_threshold)
    counts = process_detections(results, conf_threshold)
    return counts


# Test Images and Analysis

In [82]:
# Test images
test_images = [
    Path("vehicle-detection.v21i.yolov11/test/images/2023-05-09-5-_png.rf.7d5d7a7c0c345b65b11d68be03906e77.jpg"),
    Path("vehicle-detection.v21i.yolov11/test/images/2023-05-10-15-_png.rf.ede4ef4864720d2d19cac73085637827.jpg"),
    Path("vehicle-detection.v21i.yolov11/test/images/Screenshot-1626-_png.rf.1fbe085d11ab26e88b06e7965e69c1f7.jpg"),
    Path("vehicle-detection.v21i.yolov11/test/images/Screenshot-1865-_png.rf.c40e666579f5ac8bea7d9d4f3c9e6743.jpg"),
]

# Process all lanes
lane_data = []
recommended_times = []

for i, img_path in enumerate(test_images):
    counts = process_image(img_path, model)
    green_time = calculate_green_time(counts['mobil'], counts['motor'])
    
    lane_info = {
        'lane_number': i+1,
        'image_name': img_path.name,
        'counts': counts,
        'green_time': green_time
    }
    lane_data.append(lane_info)
    recommended_times.append(green_time)




0: 416x416 4 mobils, 2 motors, 61.9ms
Speed: 31.5ms preprocess, 61.9ms inference, 1.4ms postprocess per image at shape (1, 3, 416, 416)

0: 416x416 2 mobils, 5 motors, 56.2ms
Speed: 1.1ms preprocess, 56.2ms inference, 1.3ms postprocess per image at shape (1, 3, 416, 416)

0: 416x416 2 mobils, 3 motors, 67.3ms
Speed: 1.9ms preprocess, 67.3ms inference, 1.4ms postprocess per image at shape (1, 3, 416, 416)

0: 416x416 3 mobils, 3 motors, 90.0ms
Speed: 1.1ms preprocess, 90.0ms inference, 2.0ms postprocess per image at shape (1, 3, 416, 416)


# Essential Results Only

In [88]:
print("="*80)
print("TRAFFIC ANALYSIS - ESSENTIAL RESULTS")
print("="*80)

# Process each lane and show only essential information
for i, (lane, green_time) in enumerate(zip(lane_data, recommended_times)):
    print(f"\nLane {i+1}: {lane['image_name']}")
    print(f"  Vehicles: {lane['counts']['total']} (Cars: {lane['counts']['mobil']}, Motorcycles: {lane['counts']['motor']})")
    print(f"  Recommended Green Time: {green_time:.1f} seconds")
    
    # Calculate red time and improvement
    total_cycle_time = sum(recommended_times)
    red_time = total_cycle_time - green_time
    constant_time = 60  # Previous constant green time
    improvement_time = constant_time - green_time
    time_saved = improvement_time
    improvement_percentage = (improvement_time / constant_time) * 100
    
    print(f"  Overall Red Time: {red_time:.1f} seconds")
    print(f"  Improvement Time: {improvement_time:.1f} seconds")
    print(f"  Time Saved: {time_saved:.1f} seconds")
    print(f"  Improvement: {improvement_percentage:.1f}%")

# Overall summary
total_cycle_time = sum(recommended_times)
constant_total = 60 * 4  # If all lanes had constant 60s
ai_total = sum(recommended_times)
overall_time_saved = constant_total - ai_total
overall_improvement = (overall_time_saved / constant_total) * 100

print(f"\n" + "="*80)
print("OVERALL SUMMARY")
print("="*80)
print(f"Total Cycle Time: {total_cycle_time:.1f} seconds")
print(f"Previous Constant Time: {constant_total:.1f} seconds")
print(f"Total Time Saved: {overall_time_saved:.1f} seconds")
print(f"Overall Improvement: {overall_improvement:.1f}%")

TRAFFIC ANALYSIS - ESSENTIAL RESULTS

Lane 1: 2023-05-09-5-_png.rf.7d5d7a7c0c345b65b11d68be03906e77.jpg
  Vehicles: 6 (Cars: 4, Motorcycles: 2)
  Recommended Green Time: 44.0 seconds
  Overall Red Time: 127.0 seconds
  Improvement Time: 16.0 seconds
  Time Saved: 16.0 seconds
  Improvement: 26.7%

Lane 2: 2023-05-10-15-_png.rf.ede4ef4864720d2d19cac73085637827.jpg
  Vehicles: 7 (Cars: 2, Motorcycles: 5)
  Recommended Green Time: 47.0 seconds
  Overall Red Time: 124.0 seconds
  Improvement Time: 13.0 seconds
  Time Saved: 13.0 seconds
  Improvement: 21.7%

Lane 3: Screenshot-1626-_png.rf.1fbe085d11ab26e88b06e7965e69c1f7.jpg
  Vehicles: 5 (Cars: 2, Motorcycles: 3)
  Recommended Green Time: 37.0 seconds
  Overall Red Time: 134.0 seconds
  Improvement Time: 23.0 seconds
  Time Saved: 23.0 seconds
  Improvement: 38.3%

Lane 4: Screenshot-1865-_png.rf.c40e666579f5ac8bea7d9d4f3c9e6743.jpg
  Vehicles: 6 (Cars: 3, Motorcycles: 3)
  Recommended Green Time: 43.0 seconds
  Overall Red Time: 128.0 s

# Save Clean Model

In [89]:
# Save the clean model
save_dir = Path('saved_models')
save_dir.mkdir(exist_ok=True)

# Save model
model_name = 'clean_traffic_model'
torch_path = save_dir / f'{model_name}.pt'
model.save(str(torch_path))

# Save clean functions
clean_script = save_dir / 'clean_traffic_functions.py'
with open(clean_script, 'w') as f:
    f.write('''
# Clean Traffic Analysis Functions
import torch
import cv2
import numpy as np
from ultralytics import YOLO
from pathlib import Path
import yaml

def process_detections(results, conf_threshold=0.25):
    counts = {'mobil': 0, 'motor': 0, 'total': 0}
    for r in results:
        boxes = r.boxes
        for box in boxes:
            conf = float(box.conf)
            if conf < conf_threshold:
                continue
            cls = int(box.cls)
            class_name = ['mobil', 'motor'][cls]
            counts[class_name] += 1
            counts['total'] += 1
    return counts

def calculate_green_time(car_count, motorcycle_count, base_time=10, time_per_car=6, time_per_motorcycle=5, min_time=15, max_time=60):
    green_time = base_time + (car_count * time_per_car) + (motorcycle_count * time_per_motorcycle)
    green_time = max(min_time, min(green_time, max_time))
    return green_time

def process_image(image_path, model, conf_threshold=0.25):
    img = cv2.imread(str(image_path))
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")
    results = model.predict(img, conf=conf_threshold)
    counts = process_detections(results, conf_threshold)
    return counts

# Usage:
# model = YOLO('saved_models/clean_traffic_model.pt')
# counts = process_image('image.jpg', model)
# green_time = calculate_green_time(counts['mobil'], counts['motor'])
''')

print(f"Clean model saved at: {torch_path}")
print(f"Clean functions saved at: {clean_script}")
print("\nClean model ready for use!")

Clean model saved at: saved_models\clean_traffic_model.pt
Clean functions saved at: saved_models\clean_traffic_functions.py

Clean model ready for use!


In [97]:
# Function to process a new image
def detect_vehicles_in_image(image_path):
    """
    Detect and count vehicles in a new image, and calculate green signal time.
    """
    try:
        counts = process_image(image_path, model)
        
        # Calculate green signal time
        green_time = calculate_green_time(
            counts['mobil'],
            counts['motor']
        )
        
        print("\nDetection Results:")
        print(f"Total vehicles: {counts['total']}")
        print(f"Cars: {counts['mobil']}")
        print(f"Motorcycles: {counts['motor']}")
        print(f"Recommended Green Signal Time: {green_time} seconds")
        
        # Calculate and display improvement compared to previous constant (60s)
        previous_time = 60
        improvement = ((previous_time - green_time) / previous_time) * 100
        print(f"Improvement compared to previous constant ({previous_time}s): {improvement:.2f}%")
        
    except Exception as e:
        print(f"Error processing image: {str(e)}")

# Test the function with one of the test images
print("Testing single image processing:")
detect_vehicles_in_image(test_images[0])

Testing single image processing:

0: 416x416 4 mobils, 2 motors, 69.6ms
Speed: 2.7ms preprocess, 69.6ms inference, 1.2ms postprocess per image at shape (1, 3, 416, 416)

Detection Results:
Total vehicles: 6
Cars: 4
Motorcycles: 2
Recommended Green Signal Time: 44 seconds
Improvement compared to previous constant (60s): 26.67%


In [99]:
# Test on four images and display improvement for each

test_images = [
    Path("vehicle-detection.v21i.yolov11/test/images/2023-05-09-5-_png.rf.7d5d7a7c0c345b65b11d68be03906e77.jpg"),
    Path("vehicle-detection.v21i.yolov11/test/images/2023-05-10-15-_png.rf.ede4ef4864720d2d19cac73085637827.jpg"),
    Path("vehicle-detection.v21i.yolov11/test/images/Screenshot-1626-_png.rf.1fbe085d11ab26e88b06e7965e69c1f7.jpg"),
    Path("vehicle-detection.v21i.yolov11/test/images/Screenshot-1865-_png.rf.c40e666579f5ac8bea7d9d4f3c9e6743.jpg"),
]

previous_time = 60
recommended_times = []
improvements = []

counts_list = []
for img_path in test_images:
    counts = process_image(img_path, model)  # Fixed: process_image returns only counts
    green_time = calculate_green_time(counts['mobil'], counts['motor'])
    recommended_times.append(green_time)
    improvement = ((previous_time - green_time) / previous_time) * 100
    improvements.append(improvement)
    counts_list.append(counts)
    print(f"{img_path.name}:")
    print(f"  Total vehicles: {counts['total']}")
    print(f"  Cars: {counts['mobil']}")
    print(f"  Motorcycles: {counts['motor']}")
    print(f"  Recommended Green Time: {green_time} seconds")
    print(f"  Improvement: {improvement:.2f}%\n")

avg_improvement = sum(improvements) / len(improvements) if improvements else 0
print(f"Average improvement over four images: {avg_improvement:.2f}%")

# Calculate waiting time for each lane in one cycle
print("\nWaiting time for each lane in one cycle (sum of other lanes' green times):")
for i in range(4):
    waiting_time = sum(recommended_times) - recommended_times[i]
    print(f"Lane {i+1} ({test_images[i].name}): Waiting time = {waiting_time:.1f} seconds")

# If you want to run for multiple cycles, e.g., 3 cycles:
num_cycles = 3
print(f"\nAverage waiting time for each lane over {num_cycles} cycles:")
for i in range(4):
    waiting_time = (sum(recommended_times) - recommended_times[i]) * num_cycles / num_cycles
    print(f"Lane {i+1} ({test_images[i].name}): Average waiting time = {waiting_time:.1f} seconds")

# Compare AI-recommended vs constant waiting time (180s)
constant_wait_time = 180
print(f"\nComparison with constant waiting time ({constant_wait_time}s):")

# Calculate average wait time with AI recommendations
ai_avg_wait_time = sum(recommended_times) * 3 / 4  # Average wait time across all lanes

# Calculate savings
time_saved = constant_wait_time - ai_avg_wait_time
improvement_percentage = (time_saved / constant_wait_time) * 100

print(f"AI-recommended average wait time: {ai_avg_wait_time:.1f} seconds")
print(f"Constant wait time: {constant_wait_time} seconds")
print(f"Time saved: {time_saved:.1f} seconds")
print(f"Improvement: {improvement_percentage:.2f}%")

# Calculate average waiting time for all vehicles in the cycle
# (weighted by number of vehicles in each image)
total_waiting_time = 0
total_vehicles = 0
for i in range(len(test_images)):
    total_waiting_time += counts_list[i]['total'] * (recommended_times[i] / 2)
    total_vehicles += counts_list[i]['total']






0: 416x416 4 mobils, 2 motors, 64.3ms
Speed: 31.2ms preprocess, 64.3ms inference, 1.2ms postprocess per image at shape (1, 3, 416, 416)
2023-05-09-5-_png.rf.7d5d7a7c0c345b65b11d68be03906e77.jpg:
  Total vehicles: 6
  Cars: 4
  Motorcycles: 2
  Recommended Green Time: 44 seconds
  Improvement: 26.67%


0: 416x416 2 mobils, 5 motors, 52.6ms
Speed: 1.1ms preprocess, 52.6ms inference, 1.2ms postprocess per image at shape (1, 3, 416, 416)
2023-05-10-15-_png.rf.ede4ef4864720d2d19cac73085637827.jpg:
  Total vehicles: 7
  Cars: 2
  Motorcycles: 5
  Recommended Green Time: 47 seconds
  Improvement: 21.67%


0: 416x416 2 mobils, 3 motors, 52.9ms
Speed: 1.1ms preprocess, 52.9ms inference, 1.1ms postprocess per image at shape (1, 3, 416, 416)
Screenshot-1626-_png.rf.1fbe085d11ab26e88b06e7965e69c1f7.jpg:
  Total vehicles: 5
  Cars: 2
  Motorcycles: 3
  Recommended Green Time: 37 seconds
  Improvement: 38.33%


0: 416x416 3 mobils, 3 motors, 60.0ms
Speed: 1.3ms preprocess, 60.0ms inference, 1.3ms p

In [100]:
# Create a directory for saving models if it doesn't exist
save_dir = Path('saved_models')
save_dir.mkdir(exist_ok=True)

# Save the model in PyTorch format
model_name = 'vehicle_detector_with_timing'
torch_path = save_dir / f'{model_name}.pt'
model.save(str(torch_path))

# Also save the complete analysis functions as a Python script
analysis_script = save_dir / 'traffic_analysis_functions.py'

with open(analysis_script, 'w') as f:
    f.write('''
# Traffic Analysis Functions
# This script contains all the functions for vehicle detection and traffic timing analysis

import torch
import cv2
import numpy as np
import matplotlib.pyplot as plt
from ultralytics import YOLO
from pathlib import Path
import yaml

def calculate_green_time(car_count, motorcycle_count, base_time=10, time_per_car=4.5, time_per_motorcycle=1.5, min_time=15, max_time=60):
    """Calculate green signal time based on car and motorcycle counts, with min and max cap."""
    green_time = base_time + (car_count * time_per_car) + (motorcycle_count * time_per_motorcycle)
    green_time = max(min_time, min(green_time, max_time))
    return green_time

def calculate_waiting_times(recommended_times):
    """Calculate waiting times for each lane in one cycle."""
    waiting_times = []
    for i in range(len(recommended_times)):
        waiting_time = sum(recommended_times) - recommended_times[i]
        waiting_times.append(waiting_time)
    return waiting_times

def compare_with_constant(recommended_times, constant_wait_time=180):
    """Compare AI recommendations with constant waiting time."""
    ai_avg_wait_time = sum(recommended_times) * 3 / 4
    time_saved = constant_wait_time - ai_avg_wait_time
    improvement_percentage = (time_saved / constant_wait_time) * 100
    return ai_avg_wait_time, time_saved, improvement_percentage

# Load the trained model
model = YOLO('saved_models/vehicle_detector_with_timing.pt')

# Example usage:
# recommended_times = [31.0, 26.5, 23.5, 28.0]  # Example times
# waiting_times = calculate_waiting_times(recommended_times)
# ai_avg, saved, improvement = compare_with_constant(recommended_times)
''')

print(f"Model saved in PyTorch format at: {torch_path}")
print(f"Analysis functions saved at: {analysis_script}")
print("\nModel saving complete! You can use this file for future inference without retraining.")
print("All features included:")
print("- Vehicle detection and counting")
print("- Green signal timing calculation")
print("- Waiting time analysis")
print("- Improvement comparison with constant timing") 

Model saved in PyTorch format at: saved_models\vehicle_detector_with_timing.pt
Analysis functions saved at: saved_models\traffic_analysis_functions.py

Model saving complete! You can use this file for future inference without retraining.
All features included:
- Vehicle detection and counting
- Green signal timing calculation
- Waiting time analysis
- Improvement comparison with constant timing
