<a href="https://colab.research.google.com/github/Rafna123/AutoCrashDetect/blob/main/AutoCrashDetect_Real_Time_Road_Accident_Detection_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#This project focuses on detecting road accidents automatically using computer vision and deep learning techniques.
#The system analyzes live or recorded video footage to identify collisions or sudden vehicle stops that may indicate an accident.

In [1]:
# Install required libraries
!pip install ultralytics opencv-python-headless numpy
import os, time, math
from collections import deque
import cv2
import numpy as np
from ultralytics import YOLO
from google.colab import files

Collecting ultralytics
  Downloading ultralytics-8.3.220-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.17-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.220-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m18.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.17-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.220 ultralytics-thop-2.0.17
Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [2]:
#Upload Video
print(" Upload your accident video file")
uploaded = files.upload()
video_path = list(uploaded.keys())[0]
print(" Uploaded:", video_path)

#CONFIG
VEHICLE_CLASSES = {"car","truck","bus","motorcycle","bicycle"}
HUMAN_CLASSES = {"person"}
IOU_COLLISION_THRESHOLD = 0.15
REL_SPEED_THRESHOLD = 2.0
SUDDEN_STOP_SPEED_DROP = 1.0
STATIONARY_FRAMES_THRESHOLD = 5
MAX_TRACK_LOST = 30
MIN_DETECTION_CONF = 0.2
ALERT_DIR = "alerts"
os.makedirs(ALERT_DIR, exist_ok=True)

#UTILS
def box_area(box):
    x1,y1,x2,y2=box
    return max(0,x2-x1)*max(0,y2-y1)
def iou(boxA,boxB):
    xA,yA=max(boxA[0],boxB[0]), max(boxA[1],boxB[1])
    xB,yB=min(boxA[2],boxB[2]), min(boxA[3],boxB[3])
    inter=max(0,xB-xA)*max(0,yB-yA)
    union=box_area(boxA)+box_area(boxB)-inter
    return inter/union if union>0 else 0.0
def centroid_from_box(box):
    x1,y1,x2,y2=box
    return ((x1+x2)/2.0,(y1+y2)/2.0)

# TRACKER
class Track:
    def __init__(self,tid,box,frame_idx):
        self.id=tid
        self.box=box
        self.centroid=centroid_from_box(box)
        self.history=deque(maxlen=30)
        self.history.append((frame_idx,self.centroid,box))
        self.lost_frames=0
        self.stationary_count=0
        self.alerted=False
    def update(self,box,frame_idx):
        self.box=box
        self.centroid=centroid_from_box(box)
        self.history.append((frame_idx,self.centroid,box))
        self.lost_frames=0
    def mark_lost(self): self.lost_frames+=1
    def speed(self):
        if len(self.history)<2: return 0.0
        (_,c1,_),(_,c2,_)=self.history[-2],self.history[-1]
        return math.hypot(c2[0]-c1[0],c2[1]-c1[1])
    def average_speed(self,n=5):
        hist=list(self.history)[-n:]
        if len(hist)<2: return 0.0
        speeds=[math.hypot(hist[i][1][0]-hist[i-1][1][0], hist[i][1][1]-hist[i-1][1][1])
                for i in range(1,len(hist))]
        return sum(speeds)/len(speeds)

# ----- DETECTOR -----
class AccidentDetector:
    def __init__(self, model_name="yolov8s.pt"):
        print("[INFO] Loading YOLO model...")
        self.model=YOLO(model_name)
        self.tracks={}
        self.next_id=0
        self.frame_idx=0

    def detect(self,frame):
        results=self.model(frame,imgsz=640,conf=MIN_DETECTION_CONF)[0]
        dets=[]
        if results.boxes is None: return []
        for b in results.boxes:
            x1,y1,x2,y2=b.xyxy[0].tolist()
            cls_id=int(b.cls[0].item())
            name=self.model.model.names.get(cls_id,"").lower()
            if name in VEHICLE_CLASSES or name in HUMAN_CLASSES:
                dets.append({"box":[int(x1),int(y1),int(x2),int(y2)],"class":name})
        return dets

    def match_tracks(self,detections):
        det_centroids=[centroid_from_box(d["box"]) for d in detections]
        track_ids=list(self.tracks.keys())
        track_centroids=[self.tracks[t].centroid for t in track_ids]
        if len(track_centroids)==0:
            for d in detections: self._new_track(d["box"])
            return
        if len(det_centroids)==0:
            for tid in track_ids: self.tracks[tid].mark_lost()
            self.tracks={tid:tr for tid,tr in self.tracks.items() if tr.lost_frames<=MAX_TRACK_LOST}
            return
        dist_mat=np.full((len(track_centroids),len(det_centroids)),1e6)
        for i,tc in enumerate(track_centroids):
            for j,dc in enumerate(det_centroids):
                dist_mat[i,j]=math.hypot(tc[0]-dc[0], tc[1]-dc[1])
        assigned_tracks,assigned_dets=set(),set()
        while dist_mat.size>0 and np.min(dist_mat)<1e6:
            i,j=np.unravel_index(np.argmin(dist_mat),dist_mat.shape)
            tid=track_ids[i]
            if tid in assigned_tracks or j in assigned_dets:
                dist_mat[i,j]=1e6
                if np.min(dist_mat)>1e5: break
                continue
            self.tracks[tid].update(detections[j]["box"],self.frame_idx)
            assigned_tracks.add(tid)
            assigned_dets.add(j)
            dist_mat[i,:]=1e6
            dist_mat[:,j]=1e6
        for j,d in enumerate(detections):
            if j not in assigned_dets: self._new_track(d["box"])
        for tid in track_ids:
            if tid not in assigned_tracks: self.tracks[tid].mark_lost()
        self.tracks={tid:tr for tid,tr in self.tracks.items() if tr.lost_frames<=MAX_TRACK_LOST}

    def _new_track(self,box):
        self.tracks[self.next_id]=Track(self.next_id,box,self.frame_idx)
        self.next_id+=1

    def analyze(self,frame):
        alerts=[]
        active=[tr for tr in self.tracks.values() if tr.lost_frames==0]
        vehicles=[tr for tr in active if tr.id in [t.id for t in active]]
        # Collision detection
        for i in range(len(vehicles)):
            for j in range(i+1,len(vehicles)):
                tr1,tr2=vehicles[i],vehicles[j]
                if iou(tr1.box,tr2.box)>IOU_COLLISION_THRESHOLD and abs(tr1.speed()-tr2.speed())>REL_SPEED_THRESHOLD:
                    if not(tr1.alerted or tr2.alerted):
                        tr1.alerted=tr2.alerted=True
                        x1=min(tr1.box[0],tr2.box[0]); y1=min(tr1.box[1],tr2.box[1])
                        x2=max(tr1.box[2],tr2.box[2]); y2=max(tr1.box[3],tr2.box[3])
                        alerts.append({"type":"collision","bbox":[x1,y1,x2,y2]})
        # Sudden stop detection
        for tr in vehicles:
            if len(tr.history)>=5 and not tr.alerted:
                avg_prev=tr.average_speed(); last_speed=tr.speed()
                if avg_prev-last_speed>SUDDEN_STOP_SPEED_DROP and last_speed<1.5:
                    tr.stationary_count+=1
                    if tr.stationary_count>=STATIONARY_FRAMES_THRESHOLD:
                        tr.alerted=True
                        alerts.append({"type":"sudden_stop","bbox":tr.box})
        # Draw all boxes
        for tr in active:
            x1,y1,x2,y2=[int(v) for v in tr.box]
            color=(0,0,255) if tr.alerted else (0,255,0)
            cv2.rectangle(frame,(x1,y1),(x2,y2),color,2)
            cv2.putText(frame,f"ID:{tr.id}",(x1,y2+15),cv2.FONT_HERSHEY_SIMPLEX,0.5,color,1)
        # Save alerts
        for a in alerts:
            x1,y1,x2,y2=[int(v) for v in a["bbox"]]
            cv2.rectangle(frame,(x1,y1),(x2,y2),(0,0,255),3)
            cv2.putText(frame,"ACCIDENT",(x1,y1-10),cv2.FONT_HERSHEY_SIMPLEX,0.8,(0,0,255),2)
            cv2.imwrite(f"{ALERT_DIR}/alert_{time.time()}.jpg",frame)
        return alerts

    def process(self,frame):
        self.frame_idx+=1
        dets=self.detect(frame)
        self.match_tracks(dets)
        self.analyze(frame)
        return frame

# ----- Run Detection -----
detector=AccidentDetector("yolov8s.pt")
cap=cv2.VideoCapture(video_path)
w=int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h=int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps=cap.get(cv2.CAP_PROP_FPS) or 20.0
out=cv2.VideoWriter("output.mp4",cv2.VideoWriter_fourcc(*"mp4v"),fps,(w,h))

while True:
    ret,frame=cap.read()
    if not ret: break
    frame=detector.process(frame)
    out.write(frame)

cap.release()
out.release()
print(" Processing complete! Download 'output.mp4' and check 'alerts/' folder for accident snapshots.")

# ----- Download Output Video -----
files.download("output.mp4")


📤 Upload your accident video file


Saving istockphoto-948764164-640_adpp_is.mp4 to istockphoto-948764164-640_adpp_is.mp4
✅ Uploaded: istockphoto-948764164-640_adpp_is.mp4
[INFO] Loading YOLO model...
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8s.pt to 'yolov8s.pt': 100% ━━━━━━━━━━━━ 21.5MB 179.1MB/s 0.1s

0: 384x640 (no detections), 529.0ms
Speed: 20.0ms preprocess, 529.0ms inference, 21.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 310.5ms
Speed: 3.3ms preprocess, 310.5ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 311.6ms
Speed: 2.7ms preprocess, 311.6ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 319.3ms
Speed: 3.2ms preprocess, 319.3ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 303.9ms
Speed: 3.1ms preprocess, 303.9ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>