In [1]:
pip install pytesseract

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install easyocr

Note: you may need to restart the kernel to use updated packages.


In [3]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
import pytesseract
import easyocr
import re
from PIL import Image
from collections import deque
import time

In [4]:
def detect_traffic_light_color(image, rect):
    # Extract rectangle dimensions
    x, y, w, h = rect
    # Extract region of interest (ROI) from the image based on the rectangle
    roi = image[y:y+h, x:x+w]
    
    # Convert ROI to HSV color space
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    
    # Define HSV range for red color
    red_lower = np.array([0, 120, 70])
    red_upper = np.array([10, 255, 255])

    # Define HSV range for yellow color
    yellow_lower = np.array([20, 100, 100])
    yellow_upper = np.array([30, 255, 255])

    # Create binary masks for detecting red and yellow in the ROI
    red_mask = cv2.inRange(hsv, red_lower, red_upper)
    yellow_mask = cv2.inRange(hsv, yellow_lower, yellow_upper)
    
    # Font details for overlaying text on the image
    font = cv2.FONT_HERSHEY_TRIPLEX
    font_scale = 1  
    font_thickness = 2  
    
    # Check which color is present based on the masks
    if cv2.countNonZero(red_mask) > 0:
        text_color = (0, 0, 255)
        message = "Detected Signal Status: Stop"
        color = 'red'
    elif cv2.countNonZero(yellow_mask) > 0:
        text_color = (0, 255, 255)
        message = "Detected Signal Status: Caution"
        color = 'yellow'
    else:
        text_color = (0, 255, 0)
        message = "Detected Signal Status: Go"
        color = 'green'
        
    # Overlay the detected traffic light status on the main image
    cv2.putText(image, message, (15, 70), font, font_scale+0.5, text_color, font_thickness+1, cv2.LINE_AA)
    # Add a separator line
    cv2.putText(image, 34*'-', (10, 115), font, font_scale, (255,255,255), font_thickness, cv2.LINE_AA)
    
    # Return the modified image and detected color
    return image, color

pass

In [5]:
class LineDetector:
    def __init__(self, num_frames_avg=10):
        # Initialize two deque queues to hold y-coordinate values across frames
        self.y_start_queue = deque(maxlen=num_frames_avg)
        self.y_end_queue = deque(maxlen=num_frames_avg)

    
    def detect_white_line(self, frame, color, 
                          slope1=0.03, intercept1=920, slope2=0.03, intercept2=770, slope3=-0.8, intercept3=2420):
        
        # Function to map color names to BGR values
        def get_color_code(color_name):
            color_codes = {
                'red': (0, 0, 255),
                'green': (0, 255, 0),
                'yellow': (0, 255, 255)
                 }
            return color_codes.get(color_name.lower())

        frame_org = frame.copy()
        
        # Line equations for defining region of interest (ROI)
        def line1(x): return slope1 * x + intercept1
        def line2(x): return slope2 * x + intercept2
        def line3(x): return slope3 * x + intercept3

        height, width, _ = frame.shape
        
        # Create a mask to spotlight the line's desired area
        mask1 = frame.copy()
        # Set pixels below the first line to black in mask1
        for x in range(width):
            y_line = line1(x)
            mask1[int(y_line):, x] = 0

        mask2 = mask1.copy()
        # Set pixels above the second line to black in mask2
        for x in range(width):
            y_line = line2(x)
            mask2[:int(y_line), x] = 0

        mask3 = mask2.copy()
        # Set pixels to the left of the third line to black in mask3 (final mask)
        for y in range(height):
            x_line = line3(y)
            mask3[y, :int(x_line)] = 0

        # Convert the mask to grayscale
        gray = cv2.cvtColor(mask3, cv2.COLOR_BGR2GRAY)

        # Apply a Gaussian filter to the ROI
        blurred_gray = cv2.GaussianBlur(gray, (7, 7), 0)

        # Apply CLAHE to equalize the histogram
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
        gray_clahe = clahe.apply(blurred_gray)

        # Perform edge detection
        edges = cv2.Canny(gray, 30, 100)

        # Perform a dilation and erosion to close gaps in between object edges
        dilated_edges = cv2.dilate(edges, None, iterations=1)
        edges = cv2.erode(dilated_edges, None, iterations=1)

        # Perform Hough Line Transform
        lines = cv2.HoughLinesP(edges, 1, np.pi/180, 100, minLineLength=160, maxLineGap=5)

        # Calculate x coordinates for the start and end of the line
        x_start = 0
        x_end = width - 1

        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = line[0]

                # Calculate line parameters
                slope = (y2 - y1) / (x2 - x1 + np.finfo(float).eps)  # Add a small number to avoid division by zero
                intercept = y1 - slope * x1

                # Calculate corresponding y coordinates
                y_start = int(slope * x_start + intercept)
                y_end = int(slope * x_end + intercept)

                # Add the y_start and y_end values to the queues
                self.y_start_queue.append(y_start)
                self.y_end_queue.append(y_end)

        # Compute the average y_start and y_end values
        avg_y_start = int(sum(self.y_start_queue) / len(self.y_start_queue)) if self.y_start_queue else 0
        avg_y_end = int(sum(self.y_end_queue) / len(self.y_end_queue)) if self.y_end_queue else 0

        # Draw the line
        line_start_ratio=0.32
        x_start_adj = x_start + int(line_start_ratio * (x_end - x_start))  # Adjusted x_start
        avg_y_start_adj = avg_y_start + int(line_start_ratio * (avg_y_end - avg_y_start))  # Adjusted avg_y_start

        # Create a mask with the same size as the frame and all zeros (black)
        mask = np.zeros_like(frame)

        # Draw the line on the mask
        cv2.line(mask, (x_start_adj, avg_y_start_adj), (x_end, avg_y_end), (255, 255, 255), 4)

        # Determine which color channel(s) to change based on the color argument
        color_code = get_color_code(color)
        if color_code == (0, 255, 0):  # Green
            channel_indices = [1]
        elif color_code == (0, 0, 255):  # Red
            channel_indices = [2]
        elif color_code == (0, 255, 255):  # Yellow
            # Yellow in BGR is a combination of green and red channels.
            # Here we modify both green and red channels.
            channel_indices = [1, 2]
        else:
            raise ValueError('Unsupported color')

        # Change the specified color channels of the frame where the mask is white
        for channel_index in channel_indices:
            frame[mask[:,:,channel_index] == 255, channel_index] = 255
                  
        # Calculate slope and intercept for the average green line
        slope_avg = (avg_y_end - avg_y_start) / (x_end - x_start + np.finfo(float).eps)
        intercept_avg = avg_y_start - slope_avg * x_start

        # Create a mask with the pixels above the green line set to black
        mask_line = np.copy(frame_org)
        for x in range(width):
            y_line = slope_avg * x + intercept_avg - 35
            mask_line[:int(y_line), x] = 0  # set pixels above the line to black

        return frame, mask_line
    
pass

In [6]:
def extract_license_plate(frame, mask_line):    
    # Convert the image to grayscale (Haar cascades are typically trained on grayscale images)
    gray = cv2.cvtColor(mask_line, cv2.COLOR_BGR2GRAY)
    
    # Apply CLAHE to equalize the histogram
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    gray = clahe.apply(gray)
    
    # Erode the image using a 2x2 kernel to remove noise
    kernel = np.ones((2, 2), np.uint8)
    gray = cv2.erode(gray, kernel, iterations=1)

    # Find the bounding box of non-black pixels
    non_black_points = cv2.findNonZero(gray)
    x, y, w, h = cv2.boundingRect(non_black_points)

    # Calculate the new width of the bounding box, excluding 30% on the right side
    w = int(w * 0.7)

    # Crop the image to the bounding box
    cropped_gray = gray[y:y+h, x:x+w]

    # Detect license plates in the image (this returns a list of rectangles)
    license_plates = license_plate_cascade.detectMultiScale(cropped_gray, scaleFactor=1.07, minNeighbors=15, minSize=(20, 20))

    # List to hold cropped license plate images
    license_plate_images = []

    # Loop over the license plates
    for (x_plate, y_plate, w_plate, h_plate) in license_plates:
        # Draw a rectangle around the license plate in the original frame (here you need the original coordinates)
        cv2.rectangle(frame, (x_plate + x, y_plate + y), (x_plate + x + w_plate, y_plate + y + h_plate), (0, 255, 0), 3)
    
        # Crop the license plate and append it to the list (here x_plate and y_plate are relative to cropped_gray)
        license_plate_image = cropped_gray[y_plate:y_plate+h_plate, x_plate:x_plate+w_plate]
        license_plate_images.append(license_plate_image)

    return frame, license_plate_images

pass

In [7]:
def extract_license_plate(frame, mask_line):    
    # Convert the image to grayscale (Haar cascades are typically trained on grayscale images)
    gray = cv2.cvtColor(mask_line, cv2.COLOR_BGR2GRAY)
    
    # Apply CLAHE to equalize the histogram
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    gray = clahe.apply(gray)
    
    # Erode the image using a 2x2 kernel to remove noise
    kernel = np.ones((2, 2), np.uint8)
    gray = cv2.erode(gray, kernel, iterations=1)

    # Find the bounding box of non-black pixels
    non_black_points = cv2.findNonZero(gray)
    x, y, w, h = cv2.boundingRect(non_black_points)

    # Calculate the new width of the bounding box, excluding 30% on the right side
    w = int(w * 0.7)

    # Crop the image to the bounding box
    cropped_gray = gray[y:y+h, x:x+w]
    
    # Print dimensions of cropped_gray for debugging
    print("Dimensions of cropped_gray:", cropped_gray.shape)

    # Detect license plates in the image (this returns a list of rectangles)
    license_plates = license_plate_cascade.detectMultiScale(cropped_gray, scaleFactor=1.2, minNeighbors=5, minSize=(30, 30))

    # List to hold cropped license plate images
    license_plate_images = []

    # Loop over the license plates
    for (x_plate, y_plate, w_plate, h_plate) in license_plates:
        cv2.rectangle(frame, (x_plate + x, y_plate + y), (x_plate + x + w_plate, y_plate + y + h_plate), (0, 255, 0), 3)

        # Crop the license plate and append it to the list (here x_plate and y_plate are relative to cropped_gray)
        license_plate_image = cropped_gray[y_plate:y_plate+h_plate, x_plate:x_plate+w_plate]
        license_plate_images.append(license_plate_image)

    return frame, license_plate_images


In [8]:
def apply_ocr_to_image(license_plate_image):    
    # Threshold the image
    _, img = cv2.threshold(license_plate_image, 120, 255, cv2.THRESH_BINARY)

    # Convert OpenCV image format to PIL Image format for pytesseract
    pil_img = Image.fromarray(img)

    # Use pytesseract to extract text from the image
    full_text = pytesseract.image_to_string(pil_img, config='--psm 6')

    return full_text.strip()  # Removing any extra white spaces from the ends

pass

In [9]:
def draw_penalized_text(frame):
    # Set font, scale, thickness, and color
    font = cv2.FONT_HERSHEY_TRIPLEX
    font_scale = 1  
    font_thickness = 2
    color = (255, 255, 255)  # White color
    
    # Initial position for Y-coordinate
    y_pos = 180
    
    # Put title on the frame
    cv2.putText(frame, 'Fined license plates:', (25, y_pos), font, font_scale, color, font_thickness)
    
    # Update Y-coordinate position
    y_pos += 80

    # Loop through all fined license plates
    for text in penalized_texts:
        # Add fined license plate text on the frame
        cv2.putText(frame, '->  '+text, (40, y_pos), font, font_scale, color, font_thickness)
        
        # Update Y-coordinate for next license plate
        y_pos += 60

pass

In [10]:
def create_text_file(file_path):
    try:
        with open(file_path, 'w') as file:
            # Write header for the data
            file.write("id,plate_number,violation_count\n")
        print(f"Text file {file_path} created successfully!")

    except IOError as e:
        print("Error while creating text file", e)

def create_database_and_table_txt(file_path):
    try:
        # Create a text file
        create_text_file(file_path)

    except Exception as e:
        print("Error occurred:", e)

create_database_and_table_txt("license_plates.txt")

pass

Text file license_plates.txt created successfully!


In [11]:
def update_text_file_with_violation(plate_number, file_path):
    try:
        # Check if the text file exists, if not, create it
        if not os.path.exists(file_path):
            create_text_file(file_path)

        # Open the text file in append mode
        with open(file_path, 'a') as file:
            # Check if the plate_number already exists in the text file
            with open(file_path, 'r') as f:
                for line in f:
                    if plate_number in line:
                        # If plate_number exists, update the violation_count by 1
                        line_parts = line.strip().split(',')
                        new_count = int(line_parts[2]) + 1
                        updated_line = f"{line_parts[0]},{line_parts[1]},{new_count}\n"
                        file.write(updated_line)
                        return

            # If plate_number doesn't exist, append a new record
            file.write(f"{len(open(file_path).readlines())},{plate_number},1\n")

        print(f"Violation for plate number {plate_number} updated in {file_path} successfully!")

    except IOError as e:
        print("Error while updating text file with violation", e)

def update_database_with_violation_txt(plate_number, file_path):
    try:
        # Update the text file with the violation
        update_text_file_with_violation(plate_number, file_path)

    except Exception as e:
        print("Error occurred:", e)

# Example usage:
update_database_with_violation_txt("ABC123", "license_plates.txt")

pass

Violation for plate number ABC123 updated in license_plates.txt successfully!


In [12]:
def print_all_violations_txt(file_path):
    try:
        # Check if the text file exists
        if os.path.exists(file_path):
            # Open the text file and read its contents
            with open(file_path, 'r') as file:
                print("\n")
                print("-"*66)
                print("\nAll Registered Traffic Violations:\n")
                # Skip the header line
                next(file)
                for line in file:
                    id, plate_number, violation_count = line.strip().split(',')
                    print(f"Plate Number: {plate_number}, Violations: {violation_count}")
        else:
            print("No violations found.")

    except IOError as e:
        print("Error while reading text file", e)

def print_all_violations_from_txt(file_path):
    try:
        # Print all violations from the text file
        print_all_violations_txt(file_path)

    except Exception as e:
        print("Error occurred:", e)

# Example usage:
print_all_violations_from_txt("license_plates.txt")



------------------------------------------------------------------

All Registered Traffic Violations:

Plate Number: ABC123, Violations: 1


In [15]:
def clear_license_plates_txt(file_path):
    try:
        # Open the text file in write mode to truncate its content
        with open(file_path, 'w') as file:
            file.write("id,plate_number,violation_count\n")
        print("All license plates cleared successfully!")

    except IOError as e:
        print("Error while clearing license plates", e)

def clear_license_plates_from_txt(file_path):
    try:
        # Clear license plates from the text file
        clear_license_plates_txt(file_path)

    except Exception as e:
        print("Error occurred:", e)

# Example usage:
clear_license_plates_from_txt("license_plates.txt")

All license plates cleared successfully!


In [16]:
%%capture
# Load the trained Haar Cascade
license_plate_cascade = cv2.CascadeClassifier('haarcascade_russian_plate_number.xml')

# Create a list to store unique penalized license plate texts
penalized_texts = []

In [17]:
def calculate_speed(distance, time_interval):
    # Convert distance from pixels to meters (assuming 1 pixel = 0.05 meters)
    distance_meters = distance * 0.05
    # Convert time_interval from seconds to hours
    time_hours = time_interval / 3600
    # Calculate speed in km/hr
    speed_kmh = (distance_meters / 1000) / time_hours
    return speed_kmh

In [23]:
# Function to write car information to a text file
def write_to_file(car_speeds):
    with open('speeding_cars.txt', 'a') as file:
        for car_id, speed in car_speeds.items():
            # Check if speed is within the desired range
            if 20 <= speed <= 100:
                file.write(f"Car {car_id}: Speed {speed:.2f} km/hr\n")

pass


In [26]:
def main():
    try:
        # Ensure the text file exists and create it if necessary
        create_text_file('license_plates.txt')

        # Clear the license plates from the previous run. (Comment out this line if desired!)
        clear_license_plates_txt('license_plates.txt')

        # Load pre-trained car detection model
        car_cascade = cv2.CascadeClassifier('haarcascade_car.xml')

        # Initialize video capture
        vid = cv2.VideoCapture('traffic.mp4')
        
        # Check if the video file was opened successfully
        if not vid.isOpened():
            print("Error: Unable to open video file.")
            return


        # Create detector object
        detector = LineDetector()

        # Initialize variables for tracking
        prev_frame_time = 0
        prev_car_positions = {}

        # Coordinates and dimensions of the rectangle where the traffic light is located
        rect = (1700, 40, 100, 250)

        # Loop through each frame in the video
        while True:
            # Read frame
            ret, frame = vid.read()

            # Break if frame is not returned
            if not ret:
                break

            # Detect cars in the frame
            cars = car_cascade.detectMultiScale(frame, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

            # Track car positions
            current_car_positions = {}
            for i, (x, y, w, h) in enumerate(cars):
                # Draw rectangle around detected cars
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)

                # Store current car positions
                current_car_positions[i] = (x, y)

            # Calculate time interval between frames
            current_time = time.time()
            time_interval = current_time - prev_frame_time

            # Calculate distance traveled by each car and speed
            distances = {}
            for car_id, current_pos in current_car_positions.items():
                min_distance = float('inf')
                for prev_id, prev_pos in prev_car_positions.items():
                    distance = ((current_pos[0] - prev_pos[0])**2 + (current_pos[1] - prev_pos[1])**2)**0.5
                    min_distance = min(min_distance, distance)
                distances[car_id] = min_distance

            # Calculate speed for each car
            speeds = {car_id: calculate_speed(distance, time_interval) for car_id, distance in distances.items()}

            # Update previous frame time and car positions
            prev_frame_time = current_time
            prev_car_positions = current_car_positions

            # Write car speeds to file
            write_to_file(speeds)

            # Detect traffic light color
            frame, color = detect_traffic_light_color(frame, rect)

            # If traffic light is red, attempt license plate detection
            if color == 'red':
                
                # Detect license plates
                plates = license_plate_cascade.detectMultiScale(frame, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
                for (x, y, w, h) in plates:
                    # Extract license plate region
                    license_plate_roi = frame[y:y+h, x:x+w]
                    # Apply OCR to recognize characters
                    plate_number = apply_ocr_to_image(license_plate_roi)
                    # Check if plate number is valid and save to file
                    if plate_number is not None and re.match("^[A-Z]{2}\s[0-9]{3,4}$", plate_number):
                        update_text_file_with_violation(plate_number, 'license_plates.txt')
                        print(f"Detected license plate: {plate_number}")

            # Display the frame
            cv2.imshow('frame', frame)

            # Break if ESC key is pressed
            if cv2.waitKey(1) == 27:
                break

        # Release the video
        vid.release()

        # Close all OpenCV windows
        cv2.destroyAllWindows()

    except Exception as e:
        print(f"An error occurred: {e}")


  if plate_number is not None and re.match("^[A-Z]{2}\s[0-9]{3,4}$", plate_number):


In [27]:

if __name__ == "__main__":
    main()

Text file license_plates.txt created successfully!
All license plates cleared successfully!
Violation for plate number YB 6433 updated in license_plates.txt successfully!
Detected license plate: YB 6433
Detected license plate: YB 6433
Detected license plate: YB 6433
Detected license plate: YB 6433
Detected license plate: YB 6433


In [28]:
def remove_duplicate_speeds_from_file(file_path):
    unique_speeds = set()
    # Read speeds from file and remove duplicates
    with open(file_path, 'r') as file:
        lines = file.readlines()
        for line in lines:
            
            # Find the index of 'Speed' in the line
            speed_index = line.find('Speed')  
            if speed_index != -1:

                # Extract the speed value
                speed_str = line[speed_index:].split()[1]  
                # Add speed to set to remove duplicates
                unique_speeds.add(speed_str.strip())  

    # Write the unique speeds back to the file
    with open(file_path, 'w') as file:
        for speed in unique_speeds:
            file.write(f"Car: Speed {speed}: km/hr\n")


In [29]:
remove_duplicate_speeds_from_file('speeding_cars.txt')