In [None]:
!pip install ultralytics opencv-python-headless

Collecting ultralytics
  Downloading ultralytics-8.3.252-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.252-py3-none-any.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.252 ultralytics-thop-2.0.18


In [None]:
import cv2
import numpy as np
from ultralytics import YOLO

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [None]:
model = YOLO("yolov8n.pt")

VEHICLE_CLASSES = [2, 3, 5, 7]  # car, motorcycle, bus, truck

[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 71.4MB/s 0.1s


In [None]:
def detect_edges(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    return cv2.Canny(blur, 50, 150)


def region_of_interest(edges):
    h, w = edges.shape
    mask = np.zeros_like(edges)

    polygon = np.array([[
        (100, h),
        (w//2 - 80, int(h*0.6)),
        (w//2 + 80, int(h*0.6)),
        (w - 100, h)
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)
    return cv2.bitwise_and(edges, mask), polygon


def average_slope_intercept(lines):
    left, right = [], []
    if lines is None:
        return None, None

    for line in lines:
        x1, y1, x2, y2 = line[0]
        if x1 == x2:
            continue
        slope = (y2 - y1) / (x2 - x1)
        intercept = y1 - slope * x1
        if slope < -0.5:
            left.append((slope, intercept))
        elif slope > 0.5:
            right.append((slope, intercept))

    return (np.mean(left, axis=0) if left else None,
            np.mean(right, axis=0) if right else None)


def make_line(frame, line):
    if line is None:
        return None
    h = frame.shape[0]
    slope, intercept = line
    y1 = h
    y2 = int(h * 0.6)
    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)
    return (x1, y1, x2, y2)

In [None]:
vehicle_ids = {}
vehicle_count = 0
next_id = 0

def in_roi(cx, cy, polygon):
    return cv2.pointPolygonTest(polygon, (cx, cy), False) >= 0

In [None]:
video_path = "/content/7.mp4"
cap = cv2.VideoCapture(video_path)

w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
if fps == 0:
    fps = 25

out = cv2.VideoWriter(
    "/content/adas_output.mp4",
    cv2.VideoWriter_fourcc(*"mp4v"),
    fps,
    (w, h)
)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    edges = detect_edges(frame)
    roi_edges, lane_poly = region_of_interest(edges)

    lines = cv2.HoughLinesP(
        roi_edges, 1, np.pi/180, 100,
        minLineLength=60, maxLineGap=150
    )

    left, right = average_slope_intercept(lines)
    left_line = make_line(frame, left)
    right_line = make_line(frame, right)

    lane_center = None
    if left_line and right_line:
        cv2.line(frame, left_line[:2], left_line[2:], (0,255,0), 5)
        cv2.line(frame, right_line[:2], right_line[2:], (0,255,0), 5)
        lane_center = (left_line[0] + right_line[0]) // 2
        cv2.line(frame, (lane_center, h), (lane_center, int(h*0.6)), (255,0,0), 3)

    cv2.polylines(frame, lane_poly, True, (255,255,0), 2)

    results = model(frame, conf=0.25, verbose=False)[0]

    collision_warning = False

    for box in results.boxes:
        cls = int(box.cls.item())
        if cls not in VEHICLE_CLASSES:
            continue

        x1, y1, x2, y2 = map(int, box.xyxy[0])
        cx, cy = (x1 + x2)//2, (y1 + y2)//2
        area = (x2 - x1) * (y2 - y1)
        confidence = float(box.conf.item())

        label = f"{model.names[cls]} {confidence:.2f}"

        cv2.rectangle(frame, (x1,y1), (x2,y2), (255,255,0), 1)
        cv2.putText(frame, label, (x1,y1-8),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 2)

        if not in_roi(cx, cy, lane_poly[0]):
            continue

        matched = False
        for vid, (px, py) in vehicle_ids.items():
            if abs(cx - px) < 40 and abs(cy - py) < 40:
                vehicle_ids[vid] = (cx, cy)
                matched = True
                break

        if not matched:
            vehicle_ids[next_id] = (cx, cy)
            vehicle_count += 1
            next_id += 1

        if cy > 0.75 * h or area > 40000:
            collision_warning = True
            cv2.rectangle(frame, (x1,y1), (x2,y2), (0,0,255), 3)

    if collision_warning:
        cv2.putText(frame, "FORWARD COLLISION RISK!",
                    (50, 100),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1.1, (0,0,255), 3)

    if lane_center is not None:
        car_center = w // 2
        if abs(car_center - lane_center) > 50:
            cv2.putText(frame, "LANE DEPARTURE WARNING!",
                        (50, 150),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.1, (0,0,255), 3)

    cv2.putText(frame, f"Vehicles Counted: {vehicle_count}",
                (20, 40),
                cv2.FONT_HERSHEY_SIMPLEX,
                1, (0,255,255), 2)

    out.write(frame)

cap.release()
out.release()

print("ADAS video saved successfully.")

ADAS video saved successfully.


In [None]:
from google.colab import files
files.download("/content/adas_output.mp4")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>