# Traffic Red-Light Running Violation Detection

In [1]:
import cv2
import numpy as np
import time
import imutils
import sqlite3

## Constants

In [2]:
# consts
COLORS = {
    "GREEN" :(0, 255, 0),
    "YELLOW" : (0, 255, 255),
    "RED" : (0, 0, 255),
    "BLUE": (255, 0, 0)
}

TRAFFICLIGHTS = {
    "RED": 13,
    "YELLOW": 16,
    "GREEN": 19,
}


## Cross Line Detector

In [21]:

def detect_draw_cross_line(image, color="GREEN"):
    # set color
    color = COLORS.get(color.upper(), (255, 0, 0))

    # Convert image to HSV colorspace
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # Define range of yellow color in HSV
    lower_yellow = np.array([20, 100, 100])
    upper_yellow = np.array([30, 255, 255])

    # Threshold the HSV image to get only yellow colors
    yellow_mask = cv2.inRange(hsv_image, lower_yellow, upper_yellow)

    # Bitwise-AND mask and original image
    color_isolated = cv2.bitwise_and(image, image, mask=yellow_mask)

    # Convert to Grayscale
    gray = cv2.cvtColor(color_isolated, cv2.COLOR_BGR2GRAY)

    # Define a kernel size and apply Gaussian smoothing
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)

    # Define our parameters for Canny and apply it to detect edges
    edges = cv2.Canny(blurred, 50, 120)

    # Create a masked edges image using cv2.fillPoly()
    mask = np.zeros_like(edges)
    ignore_mask_color = 255
    # Define a four-sided polygon to mask
    imshape = image.shape
    vertices = np.array([
        [
            (0, int(imshape[0] * 0.9)),  # bottom left
            (0, int(imshape[0] * 0.6)),  # top left 
            (imshape[1], int(imshape[0] * 0.6)),  # top right
            (imshape[1], int(imshape[0] * 0.9))  # bottom right
        ]
    ], dtype=np.int32)

    # Apply the polygon as a mask to the edges image
    cv2.fillPoly(mask, vertices, ignore_mask_color)
    masked_edges = cv2.bitwise_and(edges, mask)

    # # start test
    # # Draw the polygon on the image
    # image_with_polygon = image.copy()
    # cv2.polylines(image_with_polygon, [vertices], isClosed=True, color=(255, 255, 255), thickness=2)
    # return image_with_polygon
    # # end test

    # Run Hough on edge detected image
    lines = cv2.HoughLinesP(masked_edges, rho=1, theta=np.pi / 180, threshold=30, minLineLength=15, maxLineGap=1)

    cross_line = None
    if lines is not None:
        # Find the line with the maximum y2 value to get the lowest line in the image
        line = max(lines, key=lambda line: line[0][3])

        # Draw the line across the bottom of the detected speed bump
        x, y, w, h = cross_line = line[0]
        cv2.line(image, (0, h), (imshape[1], h), color, 2)  # color line with thickness 2
        cv2.putText(image, "Cross Line", (20, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

    # # start debug
    # # Visualize the intermediate steps for debugging
    # cv2.imshow("Yellow Mask", cv2.resize(yellow_mask, (0, 0), fx=0.5, fy=0.5))
    # cv2.imshow("Color Isolated", cv2.resize(color_isolated, (0, 0), fx=0.5, fy=0.5))
    # cv2.imshow("Edges", cv2.resize(edges, (0, 0), fx=0.5, fy=0.5))
    # cv2.imshow("Masked Edges", cv2.resize(masked_edges, (0, 0), fx=0.5, fy=0.5))
    # # end of debug


    return image, cross_line

## License Plate Detector

In [4]:
def crop_plate_with_error_margin(frame, plate_location, error_margin_px):
    x, y, w, h = plate_location

    # Get the frame dimensions
    frame_height, frame_width = frame.shape[:2]

    # Calculate new coordinates with the error margin
    x_start = max(x - error_margin_px, 0)
    y_start = max(y - error_margin_px, 0)
    x_end = min(x + w + error_margin_px, frame_width)
    y_end = min(y + h + error_margin_px, frame_height)

    # Crop the frame using the new coordinates
    cropped_frame = frame[y_start:y_end, x_start:x_end]

    return cropped_frame

def plate_detector(frame, car):
    # Create an array of zeros with the same shape as the frame
    mask = np.zeros_like(frame, dtype=np.uint8)

    # Extract the car coordinates
    x, y, w, h = car

    # Set the car region to the original frame values
    mask[y:y+h, x:x+w] = frame[y:y+h, x:x+w]

    gray = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
    # apply a blur to make contours more thick
    filtered = cv2.GaussianBlur(gray, (5, 5), 0)
    # perform canny edge detection
    edged = cv2.Canny(filtered, 10, 100) 
    # find coutours on edge detected image
    contours = cv2.findContours(edged.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    contours_redefined = imutils.grab_contours(contours)

    # sort contours by their area size so we search in biggest shapres first
    contours_redefined = sorted(contours_redefined, key=cv2.contourArea, reverse=True)

    # for contour in contours_redefined:
    #     contour_approx = cv2.approxPolyDP(contour, 10, True)
    #     if len(contour_approx) == 4:
    #         plate_location = contour_approx
    #         break

    # frame = cv2.drawContours(frame, [plate_location], -1, (0, 255, 0), 2)

    plate = False
    plate_location = None
    # loop over our contours
    for c in contours_redefined:
        # approximate the contour
        perimeter = cv2.arcLength(c, True)
        approx = cv2.approxPolyDP(c, 20, True)
        # if our approximated contour has four points
        # and have big enough area, then
        # we can assume that we have found license plate
        if len(approx) == 4 and cv2.contourArea(c) > 1000:
            x, y, w, h = cv2.boundingRect(c)
            if 2.5 < w / h < 4.1:
                plate = True
                plate_location = x, y, w, h
                # # comment it for test -> start
                # cv2.drawContours(frame, c, -1, (0, 255, 0), 1)
                # cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 3)
                # # comment it for test -> end
                break
    
    # if we still could'nt find the plate we keep looking with some diffrent rules
    if not plate:
        for c in contours_redefined:
            perimeter = cv2.arcLength(c, True)
            approx = cv2.approxPolyDP(c, 20, True)
            # if our approximated contour has more than four points
            # then we check for its radio and if its between 2.5 and 4.5
            # we could use that as plate (but its not accurate and needs work)
            if len(approx) >= 4:
                x, y, w, h = cv2.boundingRect(c)
                if 2.5 < w / h < 4.5 and 10000 <= (w * h):
                    # b, g, r = cv2.split(img[y:y + h, x - 20:x])
                    plate = True
                    plate_location = x, y, w, h
                    # # comment it for test -> start
                    # cv2.drawContours(frame, c, -1, (0, 255, 0), 1)
                    # cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 3)
                    # # comment it for test -> end
                    break

    # # show the plate image
    # cv2.imshow('image', frame)
    # cv2.imshow('edged', edged)

    # if we found any plates then crop and save that part
    cropped = None
    if plate:
        x, y, w, h = plate_location
        error_margin_px = 0
        cropped = crop_plate_with_error_margin(frame, plate_location, error_margin_px)
        # cropped = frame[y+error_margin_px:y+h-error_margin_px, x+error_margin_px:x+w-error_margin_px]
        # cv2.imshow('plate', cropped)
        # cv2.imwrite('plate.jpg', cropped)

    return frame, cropped


### test for license plate detector

In [5]:
# start test for plate detector sections
cars = [(1351, 988, 417, 72), (1100, 777, 540, 255), (1181, 653, 299, 132), (995, 637, 228, 199), (833, 625, 182, 276), (796, 596, 70, 66)]
frame = cv2.imread('tests/frame2.jpg', -1)
frame, cropped = plate_detector(frame, cars[1])
# frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)
# cv2.imshow("remove", frame)
cv2.imshow("plate", cropped)
cv2.waitKey(0)
cv2.destroyAllWindows()
# end test for plate detector sections

## plate image enhancer

In [6]:
def detect_edges_and_enhance_image(image, resize_factor=2, low_threshold=50, high_threshold=150, mean_filter_size=5):
    # Step 1: Edge Detection with Canny
    edges = cv2.Canny(image, low_threshold, high_threshold)

    # Step 2: Resize the Edge Image to a Larger Size
    original_size = (image.shape[1], image.shape[0])
    new_size = (int(original_size[0] * resize_factor), int(original_size[1] * resize_factor))
    resized_edges = cv2.resize(edges, new_size, interpolation=cv2.INTER_LINEAR)

    # Step 3: Enhance the Image in Color
    # Resize the original color image
    resized_image = cv2.resize(image, new_size, interpolation=cv2.INTER_LINEAR)
    
    # Convert the resized color image to YUV color space
    yuv_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2YUV)
    
    # Apply histogram equalization to the Y channel
    yuv_image[:, :, 0] = cv2.equalizeHist(yuv_image[:, :, 0])
    
    # Convert back to BGR color space
    enhanced_image = cv2.cvtColor(yuv_image, cv2.COLOR_YUV2BGR)
    
    # Apply sharpening to the enhanced color image
    kernel = np.array([[0, -1, 0],
                       [-1, 5, -1],
                       [0, -1, 0]])
    sharpened_image = cv2.filter2D(enhanced_image, -1, kernel)

    # Apply mean filter to the sharpened image
    mean_filtered_image = cv2.blur(sharpened_image, (mean_filter_size, mean_filter_size))

    # # Step 4: Convert blue pixels to white in HSV color space
    # # Convert the mean filtered image to HSV color space
    # # (just for remove left side of plate)
    # hsv_image = cv2.cvtColor(mean_filtered_image, cv2.COLOR_BGR2HSV)

    # # Define range of blue color in HSV
    # lower_blue = np.array([100, 150, 0])
    # upper_blue = np.array([140, 255, 255])

    # # Create a mask for blue color
    # blue_mask = cv2.inRange(hsv_image, lower_blue, upper_blue)

    # # Convert blue pixels to white
    # mean_filtered_image[blue_mask > 0] = [255, 255, 255]

    return resized_edges, mean_filtered_image


### test for edge detection and image enhancement

In [7]:
# start for test
# Perform edge detection and enhancement
plate = cv2.imread("tests/cropped.jpg", -1)
edges, enhanced_image = detect_edges_and_enhance_image(plate, resize_factor=2, low_threshold=50, high_threshold=150, mean_filter_size=5)

# Display the results
cv2.imshow("Edges", edges)
cv2.imshow("Enhanced Image", enhanced_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# end for test

## plate recognition

In [8]:
def plate_recognizer(plate_image):
    IMAGE_WIDTH = 70
    IMAGE_HEIGHT = 70
    # for test
    SHOW_STEPS = False

    # load trained data from images and classifications
    npa_classifications = np.loadtxt("classifications.txt", np.float32)
    npa_flattenedImages = np.loadtxt("flattened_images.txt", np.float32)
    # create KNN object
    k_nearest = cv2.ml.KNearest_create()
    # train KNN object with training data
    k_nearest.train(npa_flattenedImages, cv2.ml.ROW_SAMPLE, npa_classifications)

    # load plate image
    img = plate_image
    # apply a blur to make edges thick
    img = cv2.GaussianBlur(img, (5, 5), 0)
    # convert image to grayscale
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # apply a threshhold to make image black & white
    gray = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2)

    # find contours on the black & white image
    contours, hir = cv2.findContours(gray.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    # validating characters
    # Note: Since in Farsi characters we have characters with more than one characters (like chars with dots)
    # so we need to validate that they are characters or not 
    valid_contours = []
    for c in contours:
        [_, _, w, h] = cv2.boundingRect(c)
        if w * h >= 400:
            valid_contours.append(c)

    # cv2.drawContours(img, valid_contours, -1, (0, 255, 0), 2)

    # sort characters by their bouding size
    valid_contours = sorted(valid_contours, key=lambda ctr: cv2.boundingRect(ctr)[0])

    # store the plate number
    plate_number = []

    # loop through the valid contours (characters)
    for c in valid_contours:
        # store boundings
        [x, y, w, h] = cv2.boundingRect(c)
        # check for dots or etc
        # Method: check anything in Y axis if their boundings are in X axis
        # Explain: cause dots in Arabic/Farsi characters are above and below the character itself
        for c2 in contours:
            [x2, y2, w2, h2] = cv2.boundingRect(c2)
            if x2 >= x and x2 <= x + w:
                if y2 < y:
                    h += abs(y - y2)
                    y = y2
                if y2 > y + h:
                    h += abs(y2 - (y + h))

        # draw rectangle around the characters
        cv2.rectangle(img, (x, y), (x + w, y + h), (0, 0, 255), 1)

        # crop the character from grayscale image
        char_image = gray[y:y + h, x:x + w]
        # resize character to training size so we could compare those
        char_image = cv2.resize(char_image, (IMAGE_WIDTH, IMAGE_HEIGHT))
        # reshape resized character to numpy array
        npa = np.float32(char_image.reshape((1, IMAGE_WIDTH * IMAGE_HEIGHT)))

        # store results of compare between character and training data and find nearest possible example
        retval, npaResults, neigh_resp, dists = k_nearest.findNearest(npa, k=1)

        # convert the result found in classifications to normal character and print it on output
        # print(chr(int(npaResults[0][0])), end='')
        plate_number.append(chr(int(npaResults[0][0])))

        if SHOW_STEPS:
            cv2.imshow('test', char_image)
            cv2.waitKey(0)

    if SHOW_STEPS:
        cv2.imshow('test', gray)
        cv2.imshow('test2', img)
        cv2.waitKeyEx(0)

    # use for test
    # cv2.destroyAllWindows()

    # for removing letter from start of plate
    while True:
        try:
            if plate_number[0].isdigit():
                break
            else:
                plate_number.pop(0)
        except IndexError:
            break

    # for removing letter from end of plate
    while True:
        try:
            if plate_number[-1].isdigit():
                break
            else:
                plate_number.pop(-1)
        except IndexError:
            break
    
    valid = False
    # check if plate is valid
    plate_number = "".join(plate_number)
    if  5 <= len(plate_number) <= 8:
        valid = True
    
    return valid, plate_number

### test for plate recognition

In [9]:
# start for test
enhanced_image = cv2.imread("tests/enhanced_image.jpg", -1)
valid, plate_number = plate_recognizer(enhanced_image)
print(f'Valid plate: {valid}')
print(f'License plate: {plate_number}')
# end for test

Valid plate: True
License plate: ۸۴و۷۶۱۴۲


## DB section

In [10]:
# connect to db
conn = sqlite3.connect('database.db')

# create cursor for execute cmd
cursor = conn.cursor()

# create table to store license plates of offending cars
cursor.execute("CREATE TABLE IF NOT EXISTS Plates (id INTEGER PRIMARY KEY, plate TEXT, time INTEGER)")

# commit changes
conn.commit()

# close the connection
conn.close()

## Insert plate into DB

In [23]:
def insert_panalty(plate_text, crime_time):
    # connect to db
    conn = sqlite3.connect('database.db')

    # create cursor for execute cmd
    cursor = conn.cursor()

    # get the last time of penalty for this plate
    cursor.execute("SELECT time FROM Plates WHERE plate=? ORDER BY time DESC LIMIT 1", (plate_text,))
 
    last_time_row = cursor.fetchone()
    
    if last_time_row is not None:
        last_time = last_time_row[0]
        time_difference = crime_time - last_time
        if time_difference >= 60: # Penalty for every 60 seconds
            # insert new data
            cursor.execute("INSERT INTO Plates (plate, time) VALUES (?, ?)", (plate_text, crime_time))
            print(f"Penalty for {plate_text}")
    else:
        # insert new data
        cursor.execute("INSERT INTO Plates (plate, time) VALUES (?, ?)", (plate_text, crime_time))
        print(f"Penalty for {plate_text}")

    # commit changes
    conn.commit()

    # close the connection
    conn.close()

## Main Section

In [31]:
# Initialize video capture
cap = cv2.VideoCapture("DIP_Proj_short.mp4")

# Get the original FPS of the video
fps = cap.get(cv2.CAP_PROP_FPS)
frame_delay = int(1000 / fps)  # Calculate the delay between frames in milliseconds

# set config for save video
output_video = 'DIP_output_video.mp4'
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the codec and create a VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

# Initialize background subtractor
fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=100, detectShadows=False)
fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=50, detectShadows=False) # better result

# set detect car color
blue_color = COLORS.get("blue".upper())
red_color = COLORS.get("red".upper())
yellow_color = COLORS.get("yellow".upper())
green_color = COLORS.get("green".upper())

frame_count = 0
seconds = 0

while cap.isOpened():
    start_time = time.time()  # Start time for frame processing

    ret, frame = cap.read()
    if not ret:
        break

    # Specifying the color of traffic lights
    if seconds >= TRAFFICLIGHTS["GREEN"]:
        lights_color = "green"
    elif seconds >= TRAFFICLIGHTS["YELLOW"]:
        lights_color = "yellow"
    else:
        lights_color = "red"

    # Write Light Color status on corner of frame
    lights_color_rgb = COLORS.get(lights_color.upper())
    cv2.putText(frame, f"Lights Color: {lights_color.capitalize()}", (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 2, lights_color_rgb, 5)
    

    # find out seconds of video based reading frame
    if frame_count % fps == 0:
        seconds += 1
        print(f"Video time: {seconds}s")
        print(f"Lights color: {lights_color}")

    frame_count += 1

    # Resize frame to half of its original size to speed up processing
    # frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)

    # copy frame
    org_frame = frame.copy()

    # Apply background subtraction
    fgmask = fgbg.apply(frame)

    # Apply some morphological operations to remove noise
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_OPEN, kernel)
    fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_CLOSE, kernel)

    # Detect and draw the yellow speed bump line
    frame, cross_line = detect_draw_cross_line(frame, lights_color)
    if cross_line is not None:
        _, _, _, h_cross = cross_line
    else:
        h_cross = -1

    # List to store car detections for the current frame
    car_detections = []

    # Find contours in the mask
    contours, _ = cv2.findContours(fgmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    for contour in contours:
        # Filter out small contours
        if cv2.contourArea(contour) < 1000:
        # if cv2.contourArea(contour) < 5000: better result
            continue

        # Get bounding box for each contour
        car_color = blue_color
        x, y, w, h = cv2.boundingRect(contour)
        
        if (y+h) >= h_cross and lights_color == "red":
            car_color = red_color
        elif (y+h) >= h_cross and lights_color == "yellow":
            car_color = yellow_color
        elif (y+h) >= h_cross and lights_color == "green":
            car_color = green_color

        cv2.rectangle(frame, (x, y), (x + w, y + h), car_color, 2)
        cv2.putText(frame, "Car", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 1, car_color, 2)
        
        # Add the detection to the list
        car_detections.append((x, y, w, h))

    # detect crime car
    for car in car_detections:
        x, y, w, h = car
        if (y+h) >= h_cross:
            _frame, cropped = plate_detector(org_frame, car)
            if type(cropped) != np.ndarray:
                continue
            edges, enhanced_image = detect_edges_and_enhance_image(plate, resize_factor=2, low_threshold=50, high_threshold=150, mean_filter_size=5)
            valid, plate_number = plate_recognizer(enhanced_image)
            if valid:
                insert_panalty(plate_number, seconds)


    # Write the modified frame to the output video
    out.write(frame)

    # Display the frame
    frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)
    cv2.imshow("Frame", frame)

    # # Print or save car detections for the current frame
    # print(f"Frame {int(cap.get(cv2.CAP_PROP_POS_FRAMES))} detections: {car_detections}")

    # Calculate the time spent processing the frame
    elapsed_time = (time.time() - start_time) * 1000  # Convert to milliseconds

    # Calculate the wait time to match the original FPS
    wait_time = max(int(frame_delay - elapsed_time), 1)

    if cv2.waitKey(wait_time) & 0xFF == ord('q'):
        break

cap.release()
out.release()
cv2.destroyAllWindows()


Video time: 1s
Lights color: red
Penalty for ۸۴و۷۶۱۶۲
Video time: 2s
Lights color: red
Video time: 3s
Lights color: red
Video time: 4s
Lights color: red
Video time: 5s
Lights color: red
Video time: 6s
Lights color: red
Video time: 7s
Lights color: red
Video time: 8s
Lights color: red
Video time: 9s
Lights color: red
Video time: 10s
Lights color: red
Video time: 11s
Lights color: red
Video time: 12s
Lights color: red
Video time: 13s
Lights color: red
Video time: 14s
Lights color: red
Video time: 15s
Lights color: red
Video time: 16s
Lights color: red
Video time: 17s
Lights color: yellow
Video time: 18s
Lights color: yellow
Video time: 19s
Lights color: yellow
Video time: 20s
Lights color: green
Video time: 21s
Lights color: green
Video time: 22s
Lights color: green
Video time: 23s
Lights color: green


## Clear all rows the plate table of DB

In [30]:
# connect to db
conn = sqlite3.connect('database.db')

# create cursor for execute cmd
cursor = conn.cursor()

# delete all rows from table
cursor.execute("DELETE FROM plates")

# commit changes
conn.commit()

# close the connection
conn.close()