In [1]:
import importlib
import smoke_detection
importlib.reload(smoke_detection)
import vehicle_detection
importlib.reload(vehicle_detection)
import license_plate_detection
importlib.reload(license_plate_detection)


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


<module 'license_plate_detection' from 'c:\\Users\\Abad\\Desktop\\FYP\\yolo_test\\combine_2\\final_realtime\\license_plate_detection.py'>

In [2]:
import os
import cv2
import time
import csv
from collections import Counter
from ultralytics import YOLO
import easyocr

# Import existing functions and counters
from license_plate_detection import process_license_plate_frame, license_plate_counts
from smoke_detection import detect_smoke  # assume this function works on any input image (cropped or full)
from vehicle_detection import detect_vehicle

# --- Paths for Output ---
output_dir = r"./output"
os.makedirs(output_dir, exist_ok=True)
csv_file_path = os.path.join(output_dir, "detected_plates.csv")

# --- Track plates saved this session ---
saved_plates_session = set()

def save_plate_csv(plate_text, extra_info="SmokingVehicle"):
    if plate_text in saved_plates_session:
        return

    saved_plates_session.add(plate_text)
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    row = [timestamp, plate_text, extra_info]

    file_exists = os.path.isfile(csv_file_path)
    with open(csv_file_path, mode="a", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        if not file_exists:
            writer.writerow(["DateTime", "LicensePlate", "ExtraInfo"])
        writer.writerow(row)
    print(f"Saved new license plate to CSV: {plate_text}")

def process_frame_strategy(frame):
    """
    Process a frame using a step-by-step strategy:
      1. Detect vehicles.
      2. For each vehicle, crop the region and run smoke detection.
      3. If smoke is confirmed, run license plate detection on that vehicle region.
      4. Annotate the full frame with the "smoking vehicle" info and save the license plate to CSV.
    
    Returns:
        Annotated frame.
    """
    output_frame = frame.copy()

    # Detect vehicles in the full frame
    vehicle_detections = detect_vehicle(frame)
    
    # Iterate over each detected vehicle
    for det in vehicle_detections:
        box = det["box"]  # [x1, y1, x2, y2]
        # Crop the region corresponding to the vehicle
        x1, y1, x2, y2 = box
        vehicle_roi = frame[y1:y2, x1:x2]
        
        # Safety check: if the ROI is empty, skip
        if vehicle_roi.size == 0:
            continue
        
        # Run smoke detection on the cropped vehicle ROI.
        # (Assuming detect_smoke can be run on any image; if it returns (smoke_detected, processed_image),
        # adapt accordingly. Here we assume it returns a boolean and a processed image.)
        smoke_detected, _ = detect_smoke(vehicle_roi)
        
        # If the vehicle region shows smoke, run license plate detection on the same region.
        if smoke_detected:
            # Option 1: Run license plate detection on the vehicle ROI.
            lp_roi = process_license_plate_frame(vehicle_roi.copy())
            # Optionally, if process_license_plate_frame returns an annotated image, you could extract
            # the license plate string from the global license_plate_counts (if that's how your module works)
            if license_plate_counts:
                most_common_plate = license_plate_counts.most_common(1)[0][0]
                # Save the license plate to CSV (only once per session)
                save_plate_csv(most_common_plate)
                # Annotate the full frame: Draw the bounding box in red and label it.
                cv2.rectangle(output_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                cv2.putText(output_frame, f"vehicle (smoke) - Plate: {most_common_plate}", 
                            (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
            else:
                # If no plate is detected, at least label the vehicle as smoking.
                cv2.rectangle(output_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                cv2.putText(output_frame, "vehicle (smoke)", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        else:
            # For vehicles without smoke, you might want to annotate them differently (optional)
            cv2.rectangle(output_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(output_frame, "vehicle", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

    return output_frame

def run_detection(source=0):
    """
    Runs the combined strategy on either:
      - Webcam (source=0)
      - Video file (source="video.mp4")
    Saves the resulting video to ./output/predicted_output.avi
    """
    # Clear global counters and session data
    license_plate_counts.clear()
    saved_plates_session.clear()

    cap = cv2.VideoCapture(source)
    if not cap.isOpened():
        print(f"Error: Could not open source: {source}")
        return

    # --- Setup VideoWriter ---
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps == 0 or fps is None:
        fps = 25

# Find next available filename
    i = 1
    while True:
        output_path = os.path.join(output_dir, f"predicted_output{i}.avi")
        if not os.path.exists(output_path):
            break
        i += 1

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    print(f"Saving output video to: {output_path}")


    while True:
        ret, frame = cap.read()
        if not ret:
            print("End of video or error reading frame.")
            break

        processed_frame = process_frame_strategy(frame)
        out.write(processed_frame)
        cv2.imshow("Vehicle with Confirmed Smoke Detection", processed_frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    out.release()
    cv2.destroyAllWindows()


def main():
    # Use 0 for real-time camera, or provide a path like 'input_video.mp4'
    # Example usage:
    # run_detection(0)  # real-time
    run_detection("input/smoke.mp4")  # video file

if __name__ == "__main__":
    main()


Saving output video to: ./output\predicted_output22.avi

0: 640x480 1 car, 272.2ms
Speed: 0.0ms preprocess, 272.2ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 car, 186.1ms
Speed: 0.0ms preprocess, 186.1ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 car, 163.3ms
Speed: 2.0ms preprocess, 163.3ms inference, 1.9ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 car, 186.8ms
Speed: 0.0ms preprocess, 186.8ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 car, 179.4ms
Speed: 0.0ms preprocess, 179.4ms inference, 1.5ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 car, 233.6ms
Speed: 0.0ms preprocess, 233.6ms inference, 9.2ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 car, 179.3ms
Speed: 4.3ms preprocess, 179.3ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 car, 218.7ms
Speed: 11.8ms preprocess, 218.7ms inferenc

method #2 (license palte on full frame)

In [3]:
import os
import cv2
import time
import csv
from collections import Counter
from ultralytics import YOLO
import easyocr

# Import license plate and smoke detection functions and global counters.
from license_plate_detection import process_license_plate_frame, license_plate_counts
from smoke_detection import detect_smoke  # Assume detect_smoke(frame) returns a tuple: (smoke_detected, processed_smoke_frame)
from vehicle_detection import detect_vehicle

# --- Paths for Output ---
output_dir = r"./output"
os.makedirs(output_dir, exist_ok=True)
csv_file_path = os.path.join(output_dir, "detected_plates.csv")

# --- Track plates saved this session ---
saved_plates_session = set()

def save_plate_csv(plate_text, extra_info="SmokingVehicle"):
    if plate_text in saved_plates_session:
        return

    saved_plates_session.add(plate_text)
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    row = [timestamp, plate_text, extra_info]

    file_exists = os.path.isfile(csv_file_path)
    with open(csv_file_path, mode="a", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        if not file_exists:
            writer.writerow(["DateTime", "LicensePlate", "ExtraInfo"])
        writer.writerow(row)
    print(f"Saved new license plate to CSV: {plate_text}")

def compute_iou(box1, box2):
    """
    Compute the Intersection over Union (IoU) of two bounding boxes.
    Each box is defined as [x1, y1, x2, y2].
    """
    x1_inter = max(box1[0], box2[0])
    y1_inter = max(box1[1], box2[1])
    x2_inter = min(box1[2], box2[2])
    y2_inter = min(box1[3], box2[3])
    
    inter_width = max(0, x2_inter - x1_inter)
    inter_height = max(0, y2_inter - y1_inter)
    inter_area = inter_width * inter_height

    box1_area = (box1[2]-box1[0]) * (box1[3]-box1[1])
    box2_area = (box2[2]-box2[0]) * (box2[3]-box2[1])
    
    union_area = box1_area + box2_area - inter_area
    if union_area == 0:
        return 0
    return inter_area / union_area

def detect_license_plate(frame):
    """
    Run license plate detection on the full frame.
    This function is assumed to return a list of detections.
    Each detection is a dictionary with keys:
         "box": [x1, y1, x2, y2] bounding box,
         "text": recognized license plate text.
         
    Here we call process_license_plate_frame to draw and update the global counter,
    but then we assume that the license_plate_counts (or another mechanism) provides the detections.
    For demonstration purposes, we assume this function returns an empty list
    if you haven't implemented a proper LP detector.
    
    Replace this placeholder with your actual license plate detection code.
    """
    # You might already have code that shows LP annotations on the frame.
    # For demonstration, call the existing function:
    _ = process_license_plate_frame(frame.copy())
    # If your module stores detections in a global variable (e.g., license_plate_counts),
    # you could convert that into a list of detection dictionaries.
    # As a placeholder, we return an empty list.
    return []  # Replace with actual return value if available

def process_frame_strategy(frame):
    """
    This function:
      1. Runs full-frame license plate detection (draws green boxes + text).
      2. Runs vehicle detection to get vehicle boxes.
      3. For each vehicle, run smoke detection on a cropped ROI.
      4. If smoke is confirmed, highlight the vehicle in red, otherwise green.
      5. Optionally match the vehicle with the plate if needed via IoU, etc.
    """
    # 1) License Plate Detection (full frame)
    #    The function returns an annotated frame with bounding boxes in green + text.
    lp_annotated_frame = process_license_plate_frame(frame.copy())
    
    # 2) Vehicle Detection
    vehicle_detections = detect_vehicle(frame)

    # Prepare final annotation frame by copying the license plate annotated version
    # so that we keep the green LP bounding boxes always visible.
    output_frame = lp_annotated_frame.copy()

    # 3) For each vehicle, do smoke detection
    for veh in vehicle_detections:
        vbox = veh["box"]  # [x1, y1, x2, y2]

        # Crop vehicle ROI
        vehicle_roi = frame[vbox[1]:vbox[3], vbox[0]:vbox[2]]
        if vehicle_roi.size == 0:
            continue

        # Suppose detect_smoke returns a boolean
        smoke_found, _ = detect_smoke(vehicle_roi)

        # 4) Annotate the vehicle box
        if smoke_found:
            color = (0, 0, 255)  # red
            label = "vehicle (smoke)"
        else:
            color = (0, 255, 0)  # green
            label = "vehicle"

        cv2.rectangle(output_frame, (vbox[0], vbox[1]), (vbox[2], vbox[3]), color, 2)
        cv2.putText(output_frame, label, (vbox[0], vbox[1] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

    return output_frame


def run_detection(source=0):
    """
    Runs the detection strategy on either:
      - A Webcam (source=0)
      - A Video file (source="path/to/video.mp4")
    Saves the resulting output video to a uniquely named file.
    """
    # Clear global counters and session data.
    license_plate_counts.clear()
    saved_plates_session.clear()
    
    cap = cv2.VideoCapture(source)
    if not cap.isOpened():
        print(f"Error: Could not open source: {source}")
        return
    
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps == 0 or fps is None:
        fps = 25

    # Generate a unique output filename.
    i = 1
    while True:
        output_path = os.path.join(output_dir, f"predicted_output{i}.avi")
        if not os.path.exists(output_path):
            break
        i += 1
    
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    print(f"Saving output video to: {output_path}")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            print("End of video or error reading frame.")
            break
        
        processed_frame = process_frame_strategy(frame)
        out.write(processed_frame)
        cv2.imshow("Advanced Vehicle-Smoke-Plate Detection", processed_frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    out.release()
    cv2.destroyAllWindows()

def main():
    # Use 0 for webcam feed or provide the path to a video file.
    run_detection("input/smoke.mp4")  # Replace with your input source

if __name__ == "__main__":
    main()


Saving output video to: ./output\predicted_output23.avi

0: 640x480 1 ANPR, 134.7ms
Speed: 0.0ms preprocess, 134.7ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)
Detected License Plate: ELATLIN

0: 640x480 1 car, 204.8ms
Speed: 0.0ms preprocess, 204.8ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 ANPR, 94.9ms
Speed: 11.4ms preprocess, 94.9ms inference, 1.6ms postprocess per image at shape (1, 3, 640, 480)
Detected License Plate: ZLATLIN

0: 640x480 1 car, 167.5ms
Speed: 10.8ms preprocess, 167.5ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 ANPR, 87.6ms
Speed: 12.1ms preprocess, 87.6ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)
Detected License Plate: FLATLIN

0: 640x480 1 car, 148.9ms
Speed: 0.0ms preprocess, 148.9ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 ANPR, 78.7ms
Speed: 0.0ms preprocess, 78.7ms inference, 3.7ms postprocess per imag

In [4]:
import os
import cv2
import time
import csv
from collections import Counter
from ultralytics import YOLO

# Import license plate and smoke detection functions and global counters.
from license_plate_detection import process_license_plate_frame, license_plate_counts
from smoke_detection import detect_smoke  # Expected to return (smoke_detected, processed_smoke_frame)
from vehicle_detection import detect_vehicle  # Returns list of dicts with keys "box", "confidence", "class"

# --- Paths for Output ---
output_dir = r"./output"
os.makedirs(output_dir, exist_ok=True)
csv_file_path = os.path.join(output_dir, "detected_plates.csv")

# --- Paths for Output ---
output_dir = r"./output"
os.makedirs(output_dir, exist_ok=True)
csv_file_path = os.path.join(output_dir, "detected_plates.csv")

# --- Track plates saved this session ---
saved_plates_session = set()

def save_plate_csv(plate_text, extra_info="SmokingVehicle"):
    if plate_text in saved_plates_session:
        return

    saved_plates_session.add(plate_text)
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    row = [timestamp, plate_text, extra_info]

    file_exists = os.path.isfile(csv_file_path)
    with open(csv_file_path, mode="a", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        if not file_exists:
            writer.writerow(["DateTime", "LicensePlate", "ExtraInfo"])
        writer.writerow(row)
    print(f"Saved new license plate to CSV: {plate_text}")


def compute_iou(box1, box2):
    """
    Compute the Intersection over Union (IoU) of two bounding boxes.
    Each box is defined as [x1, y1, x2, y2].
    """
    x1_inter = max(box1[0], box2[0])
    y1_inter = max(box1[1], box2[1])
    x2_inter = min(box1[2], box2[2])
    y2_inter = min(box1[3], box2[3])
    
    inter_width = max(0, x2_inter - x1_inter)
    inter_height = max(0, y2_inter - y1_inter)
    inter_area = inter_width * inter_height

    box1_area = (box1[2]-box1[0]) * (box1[3]-box1[1])
    box2_area = (box2[2]-box2[0]) * (box2[3]-box2[1])
    
    union_area = box1_area + box2_area - inter_area
    if union_area == 0:
        return 0
    return inter_area / union_area

def detect_license_plate(frame):
    """
    Run license plate detection on the full frame.
    This function calls process_license_plate_frame (which draws annotations and updates a global counter)
    and is intended to return a list of detections in the format:
    [{"box": [x1, y1, x2, y2], "text": "ABC123"}, ...].
    Currently, it returns an empty list as a placeholder.
    """
    _ = process_license_plate_frame(frame.copy())
    # TODO: Update this function to extract and return structured detection results.
    return []  # Placeholder return

def process_frame_combined(frame):
    """
    Processes a frame by running:
      1. Full-frame license plate detection.
      2. Vehicle detection.
      3. For each detected vehicle, running smoke detection on its cropped ROI.
      4. Matching the vehicle bounding box with global license plate detections (using IoU).
      5. Annotating the frame with:
           - License plate boxes (blue) from global detection.
           - Vehicle bounding boxes (red if smoke detected, green otherwise).
           - If a smoking vehicle has a matched license plate, overlay the plate text and save it to CSV.
    """
    # Step 1: Global license plate detection
    global_lp_detections = detect_license_plate(frame)  # Expected format: [{"box": [...], "text": ...}, ...]
    
    # Step 2: Vehicle detection
    vehicle_detections = detect_vehicle(frame)
    
    # Get a version of the frame with LP annotations (so LP boxes are always visible)
    lp_annotated_frame = process_license_plate_frame(frame.copy())
    output_frame = lp_annotated_frame.copy()
    
    iou_threshold = 0.3
    
    for veh in vehicle_detections:
        vbox = veh["box"]  # [x1, y1, x2, y2]
        vehicle_roi = frame[vbox[1]:vbox[3], vbox[0]:vbox[2]]
        if vehicle_roi.size == 0:
            continue
        
        smoke_found, _ = detect_smoke(vehicle_roi)
        
        if smoke_found:
            color = (0, 0, 255)  # red for smoking vehicle
            label = "vehicle (smoke)"
            lp_matched = False
            # Match vehicle box to global license plate detections using IoU.
            for lp in global_lp_detections:
                if compute_iou(vbox, lp["box"]) >= iou_threshold:
                    lp_text = lp["text"]
                    cv2.putText(output_frame, f"vehicle (smoke) - Plate: {lp_text}",
                                (vbox[0], vbox[1]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
                    save_plate_csv(lp_text)
                    lp_matched = True
                    break
            if not lp_matched:
                cv2.putText(output_frame, label, (vbox[0], vbox[1]-10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        else:
            color = (0, 255, 0)  # green for non-smoking vehicles
            label = "vehicle"
            cv2.putText(output_frame, label, (vbox[0], vbox[1]-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        
        cv2.rectangle(output_frame, (vbox[0], vbox[1]), (vbox[2], vbox[3]), color, 2)
    
    # Step 5: Draw the global license plate detections (blue boxes) so they are always visible.
    for lp in global_lp_detections:
        lp_box = lp["box"]
        lp_text = lp["text"]
        cv2.rectangle(output_frame, (lp_box[0], lp_box[1]), (lp_box[2], lp_box[3]), (255, 0, 0), 2)
        cv2.putText(output_frame, lp_text, (lp_box[0], lp_box[1]-10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
    
    return output_frame

def run_detection(source=0):
    """
    Runs detection strategy on a webcam (source=0) or video file.
    Saves the annotated video to a uniquely named file.
    """
    # Clear counters/session data.
    license_plate_counts.clear()
    saved_plates_session.clear()
    
    cap = cv2.VideoCapture(source)
    if not cap.isOpened():
        print(f"Error: Could not open source: {source}")
        return
    
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps == 0 or fps is None:
        fps = 25
    
    i = 1
    while True:
        output_path = os.path.join(output_dir, f"predicted_output{i}.avi")
        if not os.path.exists(output_path):
            break
        i += 1
    
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    print(f"Saving output video to: {output_path}")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            print("End of video or error reading frame.")
            break
        
        processed_frame = process_frame_combined(frame)
        out.write(processed_frame)
        cv2.imshow("Advanced Vehicle-Smoke-LP Detection", processed_frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    out.release()
    cv2.destroyAllWindows()

def main():
    # Use 0 for webcam feed or a path for a video file.
    run_detection("input/smoke.mp4")  # Update with your input source

if __name__ == "__main__":
    main()


Saving output video to: ./output\predicted_output17.avi

0: 640x480 1 ANPR, 98.7ms
Speed: 2.0ms preprocess, 98.7ms inference, 12.6ms postprocess per image at shape (1, 3, 640, 480)
Detected License Plate: ELATLIN

0: 640x480 1 car, 160.6ms
Speed: 0.0ms preprocess, 160.6ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 ANPR, 66.9ms
Speed: 0.0ms preprocess, 66.9ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)
Detected License Plate: ELATLIN

0: 640x480 1 ANPR, 75.4ms
Speed: 0.0ms preprocess, 75.4ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)
Detected License Plate: ZLATLIN

0: 640x480 1 car, 180.3ms
Speed: 0.0ms preprocess, 180.3ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 480)

0: 640x480 1 ANPR, 83.0ms
Speed: 0.0ms preprocess, 83.0ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 480)
Detected License Plate: ZLATLIN

0: 640x480 1 ANPR, 66.6ms
Speed: 5.3ms preprocess, 66.6ms inference,