In [1]:
import torch
import cv2
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort
from ultralytics import YOLO

class ANPRPipelineWithTracking:
    def __init__(self, plate_model_path, char_model_path, confidence_threshold=0.55):
        # Load the YOLO models
        self.plate_model = YOLO(plate_model_path)  # Model for detecting number plates
        self.char_model = YOLO(char_model_path)    # Model for detecting characters
        self.confidence_threshold = confidence_threshold
        
        # Initialize the DeepSORT tracker
        self.deepsort = DeepSort()

    def detect_number_plate(self, image):
        """Detect the number plate using the first YOLO model."""
        results = self.plate_model(image, verbose=False)
        plates = []

        # Process the results to extract bounding boxes
        for result in results:
            boxes = result.boxes.xyxy.cpu().numpy()  # Bounding box coordinates
            confidences = result.boxes.conf.cpu().numpy()  # Confidence scores
            class_ids = result.boxes.cls.cpu().numpy()  # Class IDs (for number plates)

            for box, confidence, class_id in zip(boxes, confidences, class_ids):
                if confidence > self.confidence_threshold:
                    x1, y1, x2, y2 = map(int, box)

                    # # Add padding of 20 pixels
                    # padding = 20

                    # # Extend the bounding box by the padding, ensuring we don't go out of image bounds
                    # x1 = max(1, x1 - padding)  # Ensure x1 is not less than 0
                    # y1 = max(1, y1 - padding)  # Ensure y1 is not less than 0
                    # x2 = min(image.shape[1], x2 + padding)  # Ensure x2 doesn't exceed image width
                    # y2 = min(image.shape[0], y2 + padding)  # Ensure y2 doesn't exceed image height

                    plates.append((x1, y1, x2, y2, confidence))

        return plates

    def track_number_plates(self, image, plates):
        """Track number plates using DeepSORT."""
        # Convert plates to the required format for DeepSORT ([left, top, width, height], confidence, detection_class)
        detections = []

        for x1, y1, x2, y2, confidence in plates:
            # Ensure the detection is formatted correctly as ([left, top, width, height], confidence, detection_class)
            if confidence > self.confidence_threshold:  # Optional: Check for confidence threshold
                # The class_id is generally needed to indicate the detection class; for plates, it could be a fixed class (e.g., 0)
                detection_class = 0  # Assuming class '0' corresponds to number plates
                width = max(x2 - x1,x1-x2)
                height =max( y2 - y1,y1-y2)
                detections.append(([x1, y1, width, height], confidence, detection_class))

        # Ensure detections are passed as a list of tuples in the correct format
        if len(detections) > 0:
            trackers = self.deepsort.update_tracks(detections, frame=image)
        else:
            trackers = []

        # Collect the tracked plates with their IDs
        # print(detections)
        # print(len(trackers))
        # print(len(plates))
        tracked_plates = []
        for track in trackers:
            track_id = track.track_id # Unique ID for each track (DeepSORT assigns this)
            x1, y1, x2, y2 = track.to_tlbr()  # Get the bounding box coordinates
            # x1, y1, x2, y2 = plate[:4]  # Get the bounding box coordinates

            # Append the tracked plate information (ID, bounding box coordinates)
            tracked_plates.append((track_id, int(x1), int(y1), int(x2), int(y2)))

        return tracked_plates





    def detect_characters(self, plate_image):
        """Detect characters in the number plate using the second YOLO model."""
        results = self.char_model(plate_image, verbose=False)
        characters = []
        
        # Process character detection results
        for result in results:
            # print(result)
            boxes = result.boxes.xyxy.cpu().numpy()  # Bounding boxes
            confidences = result.boxes.conf.cpu().numpy()  # Confidence scores
            class_ids = result.boxes.cls.cpu().numpy()  # Character class IDs
            names = result.names  # Character class names

            # Sort by x-coordinate to maintain left-to-right order
            sorted_indices = np.argsort(boxes[:, 0])
            sorted_boxes = boxes[sorted_indices]
            sorted_texts = class_ids[sorted_indices]

            # Collect detected characters
            for box, text in zip(sorted_boxes, sorted_texts):
                char = names[int(text)]
                characters.append(char)

        return characters
    
    def get_plate_text(self, image, tracked_plates) -> list:
        """Get the text detected on the number plate along with the ID."""
        plate_info = []
        for track_id, x1, y1, x2, y2 in tracked_plates:
            # Crop the number plate region and process characters
            plate_image = image[y1:y2, x1:x2]
            if plate_image.shape[0] == 0 or plate_image.shape[1] == 0:
                print("Invalid plate image dimensions:", plate_image.shape)
                continue  
            characters = self.detect_characters(plate_image)
            plate_text = ''.join(characters)
            
            # Add the ID and text of the plate to the result
            plate_info.append((track_id, plate_text))

        return plate_info

    def draw_bounding_boxes(self, image, plates):
        """Draw bounding boxes around the detected number plates and their characters."""
        for x1, y1, x2, y2, confidence in plates:
            # Draw the bounding box for the number plate
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            
            # Crop the number plate region and process characters
            plate_image = image[y1:y2, x1:x2]
            # plate_text = self.get_plate_text(plate_image)[0]


            # Display the detected text on the image
            cv2.putText(image, "license plate", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            # print(f"Detected Number Plate: {plate_text}")

        return image

    def show_results(self, image, tracked_plates):
        """Visualize the detected number plates with bounding boxes and IDs on the image."""
        for track_id, x1, y1, x2, y2 in tracked_plates:
            # Draw the bounding box for the number plate
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            # Display the ID
            cv2.putText(image, f"ID: {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        
        result_image = cv2.resize(image, (1280, 720))
        cv2.imshow("Detected Number Plates with Tracking", result_image)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    def process_video(self, video_path):
        """Process a video file with the full pipeline."""
        cap = cv2.VideoCapture(video_path)
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Detect number plates
            plates = self.detect_number_plate(frame)

            # Track number plates
            tracked_plates = self.track_number_plates(frame, plates)

            # Get plate text along with their IDs
            plate_info = self.get_plate_text(frame, tracked_plates)

            # Show the results (number plates with bounding boxes and IDs)
            self.show_results(frame, tracked_plates)

            # Print plate text with ID
            for track_id, plate_text in plate_info:
                print(f"Plate ID: {track_id}, Plate Text: {plate_text}")

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

        cap.release()
        cv2.destroyAllWindows()
print("doneee")
# Example Usage
# if __name__ == "__main__":
#     # Define the paths to the YOLO model weights
#     plate_model_path = "best_l.pt"  # Replace with your plate detection model path
#     char_model_path = "best_charseg.pt"  # Replace with your character segmentation model path

#     # Initialize the ANPR pipeline with tracking
#     anpr_pipeline = ANPRPipelineWithTracking(plate_model_path, char_model_path)

#     # Process a video
#     video_path = r"videos/192.168.1.108_IP Camera_main_20241115162510.mp4"  # Replace with your video file path
#     anpr_pipeline.process_video(video_path)


doneee


In [2]:
# from ANPRPipeline import ANPRPipelineWithTracking
import cv2

# Define the paths to the YOLO model weights
plate_model_path = "best_l.pt"
char_model_path = "best_charseg.pt"  

video_path = r"C:\Users\samar\Desktop\capstone\anprtest4\videos\192.168.1.108_IP Camera_main_20241115162510.mp4"

# Initialize the ANPR pipeline with tracking
anpr_pipeline = ANPRPipelineWithTracking(plate_model_path, char_model_path)

# Process a video
cap = cv2.VideoCapture(video_path)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Detect number plates and draw bounding boxes
    plates = anpr_pipeline.detect_number_plate(frame)
    if plates:
        tracked_plates = anpr_pipeline.track_number_plates(frame, plates)

        number_plates = anpr_pipeline.get_plate_text(frame, tracked_plates)
        result_image = anpr_pipeline.draw_bounding_boxes(frame, plates)
        # anpr_pipeline.show_results(frame, plates)
        for number_plate in number_plates:
            print(number_plate)
    else:
        # No number plates detected
        result_image = frame
    result_image = cv2.resize(result_image, (1280, 720))
    cv2.imshow("Detected Number Plates", result_image)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()


  from .autonotebook import tqdm as notebook_tqdm


('1', 'L')
('2', 'L')
('3', '11ZLIE347')
('4', 'JFLZA43347')
('5', 'CH0DALA4S347')
('6', 'CH0AAB347')
('7', 'CHDAA8347')
('7', 'CH0AAB8347')
('7', 'CH0AA8347')
('7', 'CH0AA8347')
('7', 'CH0AA8347')
('7', 'CH0AA8347')
('7', 'C5H0AAB347')
('7', 'CN0AAB347')
('7', 'CNH0ZAAB347')
('7', 'CHW0ZAA8B347')
('7', 'CW0ZAA83A47')
('7', 'CNW0AA8B347')
('7', 'CH0ZAAB347')
('7', '')
('8', '')
('7', '')
('9', '')
('7', '')
('10', '4S5')
('7', '')
('11', 'CHDQ1AB143')
('7', '')
('12', 'CHO01AH1485')
('7', '')
('13', 'CH01AH1485')
('7', '')
('14', 'CH01AMH1485')
('7', '')
('14', 'CHD1AM1485')
('7', '')
('14', 'CN01AH1485')
('7', '')
('14', 'CWH01AM1485')
('7', '05')
('14', 'CH01AM1485')
('7', '')
('14', 'CH01AM1485')
('7', '')
('14', 'C3W01AM1485')
('7', '11')
('14', 'CW01AM1485')
('7', '')
('14', 'CN01AM1485')
('7', '')
('14', 'CA01AMT485')


In [None]:
# Initialize the ANPR pipeline with tracking
anpr_pipeline = ANPRPipelineWithTracking(plate_model_path, char_model_path)

# Process a video
video_path = r"videos/192.168.1.108_IP Camera_main_20241115162510.mp4"  # Replace with your video file path
anpr_pipeline.process_video(video_path)

In [27]:
cap.release()
cv2.destroyAllWindows()

In [2]:
import datetime
import difflib  # To calculate similarity using sequence matching

class NumberPlatePredictor:
    def __init__(self, existing_plates=None):
        # Initialize with an empty history and existing valid number plates
        self.history = {}
        """ 
        key: number plate id
        value: dict {
            number_plate: str
            similarity: float
            last_time_stamp: datetime
        }
        """
        self.existing_plates = existing_plates if existing_plates else []

    def _calculate_similarity(self, plate1, plate2):
        """Calculates the similarity between two number plates using sequence matching."""
        return difflib.SequenceMatcher(None, plate1, plate2).ratio()

    def add_existing_plate(self, plates:list[str]):
        """Adds a valid plates to the list of existing plates."""
        for plate in plates:
            self.existing_plates.append(plate.upper())
    
    # todo:Imporve this function
    def is_plate_text_valid(self,plate_text):
        """Checks if the given plate text is valid."""
        return plate_text.isalnum() and len(plate_text) >=7
    
    def get_similar_plate(self, plate_text):
        """Returns the most similar plate from existing plates."""
        highest_similarity = 0.0
        most_similar_plate = None

        for existing_plate in self.existing_plates:
            similarity = self._calculate_similarity(plate_text, existing_plate)
            if similarity > highest_similarity:
                highest_similarity = similarity
                most_similar_plate = existing_plate

        return most_similar_plate, highest_similarity

    def update_history(self, plate_id, plate_text) ->str:
        """Updates the history dictionary with a new plate and its most similar existing plate."""
        if not plate_text:
            return "" 
        plate_text = plate_text.upper()
        if not self.is_plate_text_valid(plate_text):
            return ""

        if plate_id in self.history and self.history[plate_id]["similarity"]>0.95:
            return self.history[plate_id]["number_plate"]
        
        # Find the most similar plate from existing plates
        most_similar_plate = None
        highest_similarity = 0.0

        most_similar_plate, highest_similarity = self.get_similar_plate(plate_text)

        if plate_id in self.history and self.history[plate_id]["similarity"] >highest_similarity:
            return self.history[plate_id]["number_plate"]
            
        
        # Update the history with the most similar plate and current timestamp
        if most_similar_plate and highest_similarity > 0.75:
            self.history[plate_id] = {
                'number_plate': most_similar_plate,
                'similarity': highest_similarity,
                'last_time_stamp': datetime.datetime.now()
            }
            return most_similar_plate
        else:
            return plate_text

    def get_history(self):
        """Returns the history dictionary."""
        return self.history

    


In [3]:
number_plate_db = ['HR01AR4949', 'PB01A4470', 'PB01D4802', 'PB11DB4699', 'PB11DF1112', 'PB11DG8713', 'PB11V0012', 'PB13AN9198', 'PB13AW0055', 'PB23U1292', 'PB31N1297', 'PB39B0002', 'HP02Z1086', 'PB11DC0012', 'PB91D2222', 'PB11DB5138', 'PB91N0593', 'PB11DD2667', 'T1024PB0311G',"CH02AA8347","CH01AM1485","UP15CX4041","PB23U1292","PB11DC0012","PB11BB9800"]

In [27]:
# from ANPRPipeline import ANPRPipelineWithTracking
import cv2

# Define the paths to the YOLO model weights
plate_model_path = "best_l.pt"
char_model_path = "best_char_200.pt"

video_path = r"C:\Users\samar\Desktop\capstone\anprtest4\VIDEOS_NUMBERPLATE\PB91N0593_PB11DD2667_PB13AW0055.mp4"

# Initialize the ANPR pipeline with tracking
anpr_pipeline = ANPRPipelineWithTracking(plate_model_path, char_model_path)
number_plate_predictor = NumberPlatePredictor(number_plate_db)
# Process a video
cap = cv2.VideoCapture(video_path)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    number_plate_text = ""
    # Detect number plates and draw bounding boxes
    plates = anpr_pipeline.detect_number_plate(frame)
    if plates:
        tracked_plates = anpr_pipeline.track_number_plates(frame, plates)

        number_plates = anpr_pipeline.get_plate_text(frame, tracked_plates)
        result_image = anpr_pipeline.draw_bounding_boxes(frame, plates)
        # anpr_pipeline.show_results(frame, plates)
        for number_plate in number_plates:
            print(number_plate)
            number_plate_text = number_plate_predictor.update_history(
                number_plate[0], number_plate[1]
            )

    else:
        # No number plates detected
        result_image = frame
    result_image = cv2.resize(result_image, (1280, 720))
    if number_plate_text:
        cv2.putText(
            result_image,
            number_plate_text,
            (200, 200),
            cv2.FONT_HERSHEY_SIMPLEX,
            4,
            (0, 255, 0),
            4,
        )
    cv2.imshow("Detected Number Plates", result_image)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('2', '10067')
('1', 'PB91N0593')
('2', '100G67')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91N0593')
('1', 'PB91WN0593')
('3', '1100')
('1', 'PB91WN0593')
('3', '11067')
('1', 'PB91N0593')
('3', '1100G667')
('1', 'PB91N0593')
('3', '100667')
('1', 'PB91N0593')
('3', '1100667')
('1', 'PB91N0593')
('3', '1100667')
('1', 'PB91N0593')
('3', '31100667')
('1', 'PB91N0593')
('3', '11500667')
('1', 'PB91N0593')
('3', '31100667')
('1', 'PB91N0593')
('3', '31100667')
('1', 'PB91N0593')
('3', '1100667')
('1', 'PB91N0593')
('3', '1100667')
('1', 'PB91N0593')
('3', 'P110066')
('1', 'PB91N0593')
('3', 'P1100')
('1', 'PB91N0593')
('3', '1100')
('1', 'PB91N0593')
('3', '31100667')
('1', 'PB91N0593')
('3', '1100667')
('1', 'PB91N0593')
('3', '1100667')
('1', 'PB91N0593')
(

In [31]:
cap.release()
cv2.destroyAllWindows()

In [4]:
numberplate_vids = r"C:\Users\samar\Desktop\capstone\anprtest4\VIDEOS_NUMBERPLATE"

In [7]:
import cv2

# Define the paths to the YOLO model weights
plate_model_path = "best_l.pt"
char_model_path = "best_char_200.pt"

# video_path = r"C:\Users\samar\Desktop\capstone\anprtest4\videos\192.168.1.108_IP Camera_main_20241115162510.mp4"

# Initialize the ANPR pipeline with tracking
anpr_pipeline = ANPRPipelineWithTracking(plate_model_path, char_model_path)
number_plate_predictor = NumberPlatePredictor(number_plate_db)

In [None]:
import os
import numpy as np
# np = set()
# List the files in the directory

files = os.listdir(numberplate_vids)
for file in files:
    numberplates = file.split(".")[0].split("_")
    video_path = os.path.join(numberplate_vids, file)
    print(numberplates)
    cap = cv2.VideoCapture(video_path)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Detect number plates and draw bounding boxes
        plates = anpr_pipeline.detect_number_plate(frame)
        if plates:
            tracked_plates = anpr_pipeline.track_number_plates(frame, plates)

            number_plates = anpr_pipeline.get_plate_text(frame, tracked_plates)
            result_image = anpr_pipeline.draw_bounding_boxes(frame, plates)
            # anpr_pipeline.show_results(frame, plates)
            for number_plate in number_plates:
                # print(number_plate)
                # print(number_plate)
                number_plate_predictor.update_history(number_plate[0], number_plate[1])

        else:
            # No number plates detected
            result_image = frame
        result_image = cv2.resize(result_image, (1280, 720))
        if number_plate_text:
            cv2.putText(result_image, number_plate_text, (200,200), cv2.FONT_HERSHEY_SIMPLEX, 4, (0, 255, 0), 4)
        cv2.imshow("Detected Number Plates", result_image)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    print("--------------------------------------------------------------------------------")

cap.release()
cv2.destroyAllWindows()









['CH02AA8347', 'CH01AM1485']
('1', '')
('2', 'LT')
('3', '1934T7')
('4', 'JLZA49347')
('5', 'CH0AA4B347')
('6', 'CH0AAB347')
('7', 'CHDAA8B347')
('8', 'CH0AAB8347')
('9', 'CH0AA8347')
('10', 'CH0AA8347')
('11', 'CH0AA8347')
('12', 'CH0AA8347')
('13', 'H0AA8347')
('13', 'CH0AAB34')
('13', 'CH0AAB347')
('13', 'CH0AA8347')
('13', 'CWH0AA834')
('13', 'CRH0AA')
('13', 'CH0')
('14', 'H0AA834')
('13', '')
('15', '')
('13', '')
('16', '')
('13', '')
('17', '5')
('13', '')
('18', 'CHQ1A1485')
('13', '')
('19', 'CH01AMH1485')
('13', '')
('20', 'CH01AM1485')
('13', '')
('21', 'CH01AM1485')
('13', '')
('22', 'CH01AM1485')
('13', '')
('23', 'CH01AM1485')
('13', '')
('24', 'CH01AM1485')
('13', '')
('24', 'CH01AM1485')
('13', '')
('24', 'CH01AM1485')
('13', '')
('24', 'CW01AM1485')
('13', '1')
('24', 'CH01AM1485')
('13', '')
('24', 'CHW01AM1485')
('13', '')
('24', 'CW01AM1485')
--------------------------------------------------------------------------------
['HR01AR4949']
('13', '5')
('24', '')
('25'

In [10]:
cap.release()
cv2.destroyAllWindows()

In [8]:
import os
import numpy as np
import cv2

# Assuming 'numberplate_vids' and 'anpr_pipeline' are already defined

# Set the output video file name and codec
output_video_path = "combined_output.mp4"
frame_width, frame_height = 1280, 720  # Set resolution for output video

# Initialize the VideoWriter to save the combined video
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for mp4
out = cv2.VideoWriter(output_video_path, fourcc, 30.0, (frame_width, frame_height))

# List the files in the directory
files = os.listdir(numberplate_vids)
for file in files:
    numberplates = file.split(".")[0].split("_")
    video_path = os.path.join(numberplate_vids, file)
    print(f"Processing video: {file}")
    
    cap = cv2.VideoCapture(video_path)
    number_plate_text = ""
    if not cap.isOpened():
        print(f"Error opening video file: {file}")
        continue

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Detect number plates and draw bounding boxes
        plates = anpr_pipeline.detect_number_plate(frame)
        if plates:
            tracked_plates = anpr_pipeline.track_number_plates(frame, plates)
            number_plates = anpr_pipeline.get_plate_text(frame, tracked_plates)
            result_image = anpr_pipeline.draw_bounding_boxes(frame, plates)

            # Update history with detected number plates
            for number_plate in number_plates:
                number_plate_text = number_plate_predictor.update_history(number_plate[0], number_plate[1])
        else:
            # No number plates detected
            result_image = frame

        result_image = cv2.resize(result_image, (frame_width, frame_height))
        if number_plate_text:
            cv2.putText(result_image, number_plate_text, (200,200), cv2.FONT_HERSHEY_SIMPLEX, 4, (0, 255, 0), 4)
        out.write(result_image)  # Write the processed frame to the output video
        # cv2.imshow("Detected Number Plates", result_image)
        # if cv2.waitKey(1) & 0xFF == ord("q"):
        #     break
        

    # After each video, add a black frame to separate videos
    black_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
    out.write(black_frame)  # Add a black frame as separator
    black_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
    out.write(black_frame)  # Add a black frame as separator
    black_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
    out.write(black_frame)  # Add a black frame as separator

    print(f"Finished processing video: {file}")
    print("--------------------------------------------------------------------------------")

cap.release()

# Release the VideoWriter
out.release()

# Close all OpenCV windows
cv2.destroyAllWindows()

print(f"Output video saved as {output_video_path}")


Processing video: CH02AA8347_CH01AM1485.mp4
Finished processing video: CH02AA8347_CH01AM1485.mp4
--------------------------------------------------------------------------------
Processing video: HR01AR4949.avi
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Invalid plate image dimensions: (0, 0, 3)
Finished processing video: HR01AR4949.avi
--------------------------------------------------------------------------------
Processing video: PB01A4470.mp4
Finished processing video: PB01A4470.mp4
--------------------------------------------------------------------------------
Processi