In [1]:
import cv2
import numpy as np
import math
import matplotlib.pyplot as plt

## Functions

In [2]:
def detect_cars_location(frame, min_area=100):
    """Return the bounding boxes of the blobs found in a motion mask.

    Parameters
    ----------
    frame : numpy.ndarray
        Motion mask. It is cast to ``uint8`` and every pixel greater than 1
        is treated as foreground. Expected to be single-channel, which is
        what ``cv2.findContours`` requires.
    min_area : float, optional
        Contours whose area is not strictly greater than this value are
        dropped as noise. Defaults to 100, the value previously hard-coded.

    Returns
    -------
    list of tuple
        One ``(x, y, w, h)`` bounding rectangle per retained contour, in the
        order the contours were reported.
    """
    mask = frame.astype(np.uint8)

    # Apply a threshold to create a binary image
    _, binary = cv2.threshold(mask, 1, 255, cv2.THRESH_BINARY)

    # OpenCV 3.x returns (image, contours, hierarchy) whereas 2.x and 4.x
    # return (contours, hierarchy); the contour list is always second from
    # the end, so index from the back instead of unpacking two values.
    contours = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

    # Keep only the contours large enough to be a car
    return [
        tuple(cv2.boundingRect(contour))
        for contour in contours
        if cv2.contourArea(contour) > min_area
    ]


In [3]:
def get_road_frame(video_path, output_width, output_height, frames_to_process=1000, show=True):
    """Estimate the road mask and the static background of a video.

    Consecutive grayscale frames are differenced with ``detect_cars`` and the
    resulting motion masks are accumulated into the road image. The colour
    frames read along the way are stored, and their per-pixel median is used
    as the background.

    Parameters
    ----------
    video_path : str
        Path handed to ``cv2.VideoCapture``.
    output_width, output_height : int
        Size the grayscale frames and the returned background are resized to.
    frames_to_process : int, optional
        Maximum number of frames to accumulate (default 1000). The colour
        frames are kept in memory at source resolution until the median is
        taken, so lower this value for long or high-resolution videos.
    show : bool, optional
        When true (default) the running road image is shown in a window named
        'Road', pressing 'q' stops the accumulation early, and all OpenCV
        windows are destroyed on exit. When false no GUI call is made.

    Returns
    -------
    road : numpy.ndarray
        Accumulated motion mask, clipped to [0, 255] and rounded.
    background : numpy.ndarray
        ``uint8`` per-pixel median of the collected colour frames, resized to
        ``(output_width, output_height)``.

    Raises
    ------
    IOError
        If the video cannot be opened or its first frame cannot be read.
    ValueError
        If no frame could be accumulated (the video has fewer than two
        frames, or ``frames_to_process`` is not positive).
    """
    size = (output_width, output_height)

    # Define a kernel
    kernel = np.ones((3, 3), np.uint8)

    # Create video capture
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise IOError(f"Cannot open video: {video_path!r}")

        # Make sure reading starts at the first frame
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        # Read the first frame
        ret, prev_frame = cap.read()
        if not ret:
            raise IOError(f"Cannot read the first frame of: {video_path!r}")

        # Convert prev_frame to grayscale at the working resolution
        prev_frame_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY).astype('float64')
        prev_frame_gray = cv2.resize(prev_frame_gray, size)

        road = None
        all_frames = None
        processed = 0

        # Accumulate until enough frames were seen, the video ends, or the
        # user asks to stop; every exit path falls through to the same return.
        while processed < frames_to_process:
            ret, curr_frame = cap.read()
            if not ret:
                break

            # Convert curr_frame to grayscale at the working resolution
            curr_frame_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY).astype('float64')
            curr_frame_gray = cv2.resize(curr_frame_gray, size)

            # Calculate the difference between the frames
            diff_frame = detect_cars(prev_frame_gray, curr_frame_gray, kernel)

            if road is None:
                # Buffers are sized from the first processed frame
                road = np.zeros_like(diff_frame)
                all_frames = np.zeros((frames_to_process, *curr_frame.shape), dtype=curr_frame.dtype)

            road += diff_frame
            road = np.clip(road, 0, 255)
            road = cv2.normalize(road, None, 0, 255, cv2.NORM_MINMAX)
            road = np.round(road)
            all_frames[processed] = curr_frame
            processed += 1

            # Update the previous frame
            prev_frame_gray = curr_frame_gray

            if show:
                # Display the road frame; stop early if 'q' is pressed
                cv2.imshow('Road', road)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        if processed == 0:
            raise ValueError(
                f"No frame could be processed from {video_path!r}; "
                "at least two frames are required"
            )

        # Only the slots that were actually filled take part in the median,
        # otherwise the zero-initialised tail would darken a short video.
        background = np.uint8(np.round(np.median(all_frames[:processed], axis=0)))
        background = cv2.resize(background, size)
        return road, background
    finally:
        # Runs on success, early stop and errors alike
        cap.release()
        if show:
            cv2.destroyAllWindows()

In [4]:
def detect_cars(frame1, frame2, kernel, threshold=20):
    """Build a binary motion mask from two frames.

    Parameters
    ----------
    frame1, frame2 : numpy.ndarray
        Frames to compare; they are passed to ``cv2.absdiff`` and are not
        modified.
    kernel : numpy.ndarray
        Structuring element used by the morphological operations.
    threshold : float, optional
        Absolute differences below this value become 0, the others become
        255. Defaults to 20, the value previously hard-coded.

    Returns
    -------
    numpy.ndarray
        The thresholded difference after a dilation (2 iterations) followed
        by an opening (3 iterations).
    """
    # Calculate the difference between the frames
    motion = cv2.absdiff(frame1, frame2)

    # Binarise: both masks are computed before any pixel is overwritten
    still = motion < threshold
    moving = motion >= threshold
    motion[still] = 0
    motion[moving] = 255

    # Dilate first to merge fragments of the same vehicle, then open to
    # remove the small isolated specks that remain
    motion = cv2.morphologyEx(motion, cv2.MORPH_DILATE, kernel, iterations=2)
    motion = cv2.morphologyEx(motion, cv2.MORPH_OPEN, kernel, iterations=3)

    return motion

## Program

In [6]:
# Live traffic analysis: estimate the road mask and background once, then
# detect moving cars frame by frame, track them with integer IDs, display the
# intermediate images and record the annotated frames to `output_path`.

# Import the live traffic video
liveTrafficVideo = 'video3.mp4'
cap = cv2.VideoCapture(liveTrafficVideo)
if not cap.isOpened():
    raise IOError(f"Cannot open video: {liveTrafficVideo!r}")

# Reset video capture to process frames again
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

# Define output video settings
output_width = 800  # Output video width
output_height = 400  # Output video height
output_fps = 30  # Output video frame rate
frame_size = (output_width, output_height)

# Two centre points closer than this many pixels are treated as the same car
max_match_distance = 20

# Create output video writer ('mp4v' is the FourCC that goes with an .mp4 file)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output_path = 'output_video.mp4'
out = cv2.VideoWriter(output_path, fourcc, output_fps, frame_size)

# Define a kernel
kernel = np.ones((3, 3), np.uint8)

try:
    # Read the first frame
    ret, prev_frame = cap.read()
    if not ret:
        raise IOError(f"Cannot read the first frame of: {liveTrafficVideo!r}")

    # Convert prev_frame to grayscale
    prev_frame_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY).astype('float64')

    # Calculate the road
    road, backround = get_road_frame(liveTrafficVideo, output_width, output_height)
    backround = cv2.cvtColor(backround, cv2.COLOR_BGR2GRAY).astype('float64')
    road = cv2.morphologyEx(road, cv2.MORPH_OPEN, kernel, iterations=2)

    # cv2.imshow multiplies floating-point pixels by 255, so the background is
    # converted to 8-bit once for display and for the background difference
    backround_u8 = backround.astype('uint8')

    count = 0
    center_pt_prev = []
    tracking_objects = {}
    tracking_id = 0

    # Process each frame in the video
    while cap.isOpened():
        ret, curr_frame = cap.read()

        count += 1

        if not ret:
            break

        # Convert curr_frame to grayscale
        curr_frame_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY).astype('float64')

        # Resize frames to the desired output size
        curr_frame = cv2.resize(curr_frame, frame_size)
        curr_frame_gray = cv2.resize(curr_frame_gray, frame_size)
        prev_frame_gray = cv2.resize(prev_frame_gray, frame_size)

        # ROI: lower half of the frame (a view, so later drawings show up in it)
        roi = curr_frame[output_height // 2:, :]

        # Calculate the difference between the frames
        diff_frame = detect_cars(prev_frame_gray, curr_frame_gray, kernel)

        # Detect cars in the frame
        cars = detect_cars_location(diff_frame)

        # Draw rectangles around the cars and collect their centre points
        center_pt_curr = []
        for (x, y, w, h) in cars:
            cx = int((x + x + w) / 2)
            cy = int((y + y + h) / 2)
            center_pt_curr.append((cx, cy))
            cv2.rectangle(curr_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        if count <= 2:
            # Bootstrap: a detection close to one from the previous frame
            # starts a new track
            for pt in center_pt_curr:
                for pt2 in center_pt_prev:
                    distance = math.hypot(pt2[0] - pt[0], pt2[1] - pt[1])

                    if distance < max_match_distance:
                        tracking_objects[tracking_id] = pt
                        tracking_id += 1
        else:
            # Iterate over snapshots because both containers are edited below
            candidates = center_pt_curr.copy()
            for object_id, pt2 in list(tracking_objects.items()):
                object_exists = False
                for pt in candidates:
                    distance = math.hypot(pt2[0] - pt[0], pt2[1] - pt[1])

                    # Update IDs position
                    if distance < max_match_distance:
                        tracking_objects[object_id] = pt
                        object_exists = True
                        # A matched detection must not spawn a new ID later
                        if pt in center_pt_curr:
                            center_pt_curr.remove(pt)

                # Remove IDs lost
                if not object_exists:
                    tracking_objects.pop(object_id)

            # Add new IDs found
            for pt in center_pt_curr:
                tracking_objects[tracking_id] = pt
                tracking_id += 1

        for object_id, pt in tracking_objects.items():
            cv2.circle(curr_frame, pt, 5, (0, 0, 255), -1)
            text = f"ID: {object_id}"
            cv2.putText(curr_frame, text, (pt[0], pt[1] - 7), cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 1)

        # Record the annotated frame (it already has the writer's frame size)
        out.write(curr_frame)

        # Display the frame with bounding boxes and road sides
        cv2.imshow('roi', roi)
        cv2.imshow('B', backround_u8)
        cv2.imshow('car B', cv2.absdiff(backround_u8, curr_frame_gray.astype('uint8')))
        cv2.imshow('Car detection in the current frame', curr_frame.astype('uint8'))
        cv2.imshow('Cars and Road Detection', np.vstack((road, diff_frame)))

        # Update the previous frame
        prev_frame_gray = curr_frame_gray

        center_pt_prev = center_pt_curr.copy()

        # Step through the video: any key advances one frame, 'q' quits.
        # A single blocking wait is used so the 'q' press cannot be swallowed.
        if cv2.waitKey(0) & 0xFF == ord('q'):
            break
finally:
    # Release the video capture, video writer, and close all windows
    cap.release()
    out.release()
    cv2.destroyAllWindows()