<a href="https://colab.research.google.com/github/Jonikpatel/Jonikpatel.github.io/blob/main/Trafficanalyiser.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install ultralytics opencv-python-headless torch torchvision numpy matplotlib

Collecting ultralytics
  Downloading ultralytics-8.3.223-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.223-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m54.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.223 ultralytics-thop-2.0.18


In [2]:
# First, upgrade ultralytics to the latest version (this often resolves PyTorch compatibility issues)
!pip install --upgrade ultralytics

# Then, add the safe global to allow loading the DetectionModel class
import torch.serialization
from ultralytics.nn.tasks import DetectionModel
torch.serialization.add_safe_globals([DetectionModel])

# Now proceed with the rest of the code as before
import cv2
import torch
import numpy as np
from ultralytics import YOLO
import matplotlib.pyplot as plt
from google.colab import files

# ... (rest of the code from the previous response)

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [12]:
   # Upload video
   uploaded = files.upload()
   video_path = list(uploaded.keys())[0]

   # Open video
   cap = cv2.VideoCapture(video_path)
   fps = cap.get(cv2.CAP_PROP_FPS)
   width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
   height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))


Saving snapsave-app_1275663753415032_hd.mp4 to snapsave-app_1275663753415032_hd (1).mp4


In [7]:
model = YOLO('yolov8x.pt')  # Nano version for speed; we also can use 'yolov8x.pt' for accuracy

[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8x.pt to 'yolov8x.pt': 100% ━━━━━━━━━━━━ 130.5MB 303.8MB/s 0.4s


In [None]:
# ... (previous imports and setup code)

# Initialize variables before the loop
prev_frame = None
speed_threshold = 60  # mph
accident_threshold = 0.99  # Increased to 0.98 for very high overlap (indicating actual collision/hit)
accident_detected = False  # Flag to track if accident is ongoing (optional, for persistence)

# Set up video writer to save output as a video file instead of displaying static images
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output_video.mp4', fourcc, fps, (width, height))

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Detect objects
    results = model(frame)
    detections = results[0].boxes.data.cpu().numpy()  # [x1, y1, x2, y2, conf, class]

    # Initialize variables
    speeds = []
    accidents = []

    # Speed estimation using optical flow
    if prev_frame is not None:
        gray_prev = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        gray_curr = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        flow = cv2.calcOpticalFlowFarneback(gray_prev, gray_curr, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        # Estimate speed (simplified: pixels/frame to mph; calibrate with known distance)
        for det in detections:
            if det[5] in [2, 7]:  # Car or truck classes
                # Placeholder: Compute average flow in bounding box
                x1, y1, x2, y2 = det[:4].astype(int)
                flow_roi = flow[y1:y2, x1:x2]
                mag = np.sqrt(flow_roi[..., 0]**2 + flow_roi[..., 1]**2).mean()
                speed_mph = mag * (fps / 100) * 2.23694  # Rough conversion; adjust for calibration
                speeds.append(speed_mph)

    # Accident detection: Check for overlaps or sudden stops
    # Updated: Only detect if both vehicles have very low speed (<2 mph) and high IoU (indicating actual hit/collision)
    for i, det1 in enumerate(detections):
        for j, det2 in enumerate(detections):
            if i != j and det1[5] in [2,7] and det2[5] in [2,7]:  # Vehicle-vehicle
                # IoU calculation
                x1 = max(det1[0], det2[0])
                y1 = max(det1[1], det2[1])
                x2 = min(det1[2], det2[2])
                y2 = min(det1[3], det2[3])
                inter = max(0, x2 - x1) * max(0, y2 - y1)
                union = (det1[2]-det1[0])*(det1[3]-det1[1]) + (det2[2]-det2[0])*(det2[3]-det2[1]) - inter
                iou = inter / union if union > 0 else 0
                if iou > accident_threshold:
                    # Additional check: Both speeds very low (<2 mph) to confirm actual collision (not just nearby)
                    speed1 = speeds[i] if i < len(speeds) else 0
                    speed2 = speeds[j] if j < len(speeds) else 0
                    if speed1 < 2 and speed2 < 2:  # Stricter threshold for true hits
                        accidents.append((det1, det2))

    # Annotate frame
    annotated_frame = results[0].plot()
    for i, speed in enumerate(speeds):
        cv2.putText(annotated_frame, f"Speed: {speed:.1f} mph", (10, 30 + i*30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
    # Only annotate accident if detected in this frame (transient, clears when cars separate)
    if accidents:
        cv2.putText(annotated_frame, "ACCIDENT DETECTED!", (10, height-30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
        accident_detected = True  # Optional: Set flag for logging/alerts
    else:
        accident_detected = False

    # Write frame to output video
    out.write(annotated_frame)

    prev_frame = frame

    # Extra: Log to CSV
    with open('traffic_log.csv', 'a') as f:
        avg_speed = sum(speeds)/len(speeds) if speeds else 0
        f.write(f"{cap.get(cv2.CAP_PROP_POS_FRAMES)}, {len(detections)}, {avg_speed}, {len(accidents)}\n")

    # Extra: Alert for violations (only if accident detected in this frame)
    if any(s > speed_threshold for s in speeds) or accidents:
        print("ALERT: Speed violation or accident detected!")  # Replace with email/SMS API

cap.release()
out.release()  # Close the video writer

# Download the output video
files.download('output_video.mp4')

# Optional: Display a sample frame or plot logs
from google.colab.patches import cv2_imshow
cv2_imshow(annotated_frame)  # Shows the last frame

# Dashboard
import pandas as pd
log = pd.read_csv('traffic_log.csv', header=None, names=['Frame', 'Objects', 'Avg_Speed', 'Accidents'])
log.plot(x='Frame', y=['Avg_Speed', 'Accidents'], kind='line')
plt.show()



0: 640x384 1 person, 16 cars, 3 trucks, 2398.4ms
Speed: 3.1ms preprocess, 2398.4ms inference, 3.2ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 15 cars, 3 trucks, 2404.4ms
Speed: 2.3ms preprocess, 2404.4ms inference, 2.9ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 14 cars, 3 trucks, 2399.3ms
Speed: 2.5ms preprocess, 2399.3ms inference, 4.1ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 15 cars, 3 trucks, 3004.5ms
Speed: 2.5ms preprocess, 3004.5ms inference, 4.4ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 14 cars, 3 trucks, 2977.3ms
Speed: 5.2ms preprocess, 2977.3ms inference, 2.7ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 15 cars, 3 trucks, 2402.6ms
Speed: 2.4ms preprocess, 2402.6ms inference, 3.1ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 16 cars, 3 trucks, 2406.6ms
Speed: 2.3ms preprocess, 2406.6ms inference, 3.3ms postp

In [14]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
