In [None]:
import cv2
import os
from screeninfo import get_monitors
import pickle
import numpy as np
from skimage.io import imread
from skimage.transform import resize
from skimage.color import rgba2rgb, gray2rgb
from sklearn.metrics import accuracy_score, classification_report
import threading
import logging
from ultralytics import YOLO
from flask import Flask, jsonify
import time

In [None]:
def classify_image_with_unknown(img, model, threshold=0.1):
    """Classify an image array, answering "unknown" for low-confidence predictions.

    Args:
        img: Image array. A 2-D array is expanded with ``gray2rgb`` and an
            array whose last axis has length 4 is converted with ``rgba2rgb``;
            any other input is used as given.
        model: Classifier exposing ``predict_proba`` and ``predict``.
        threshold: Minimum value the largest predicted probability must reach
            for the predicted label to be returned.

    Returns:
        ``model.predict(...)[0]`` when the largest probability is at least
        ``threshold``; otherwise the string "unknown".
    """
    # Bring the input to a three-channel layout before feature extraction.
    dims = img.shape
    if len(dims) == 2:
        rgb_image = gray2rgb(img)
    elif dims[-1] == 4:
        rgb_image = rgba2rgb(img)
    else:
        rgb_image = img

    # Resize to the module-level ``img_size`` and flatten into one feature row.
    features = resize(rgb_image, img_size, anti_aliasing=True).flatten()
    sample = features.reshape(1, -1)

    # The label is only computed once the confidence check has passed.
    top_probability = np.max(model.predict_proba(sample))
    if top_probability >= threshold:
        return model.predict(sample)[0]
    return "unknown"

In [None]:
def test(image, test_results, best_estimator):
    if image is not None and image.size  > 0:
        # Classify the image
        result = classify_image_with_unknown(image, best_estimator, threshold=0.1)
        test_results[f"frame_{frame_count}"] = result
        print(f"Frame: {frame_count}, Classification Result: {result}")
    else:
        print(f"Skipping frame_{frame_count}: Not a useful frame.")

In [None]:
# Flask application object; the /data route is registered on it and it is
# served by run_flask().
app = Flask(__name__)

In [None]:
def run_flask():
    """Start the Flask app on all interfaces at port 5000 with debug disabled.

    Used as the target of the background thread started elsewhere in this
    notebook.
    """
    listen_options = dict(host='0.0.0.0', port=5000, debug=False)
    app.run(**listen_options)

In [None]:
@app.route('/data', methods=['GET'])
def get_data():
    """Return the current ``report`` as JSON.

    The server thread is started before the cell that loads ``report`` runs,
    so a request can arrive while the name does not exist yet. In that case a
    503 with a JSON error body is returned instead of failing with a
    NameError.

    Returns:
        ``(response, 200)`` with the report, or ``(response, 503)`` while the
        report has not been loaded.
    """
    current_report = globals().get("report")
    if current_report is None:
        return jsonify({"error": "Report is not loaded yet."}), 503
    return jsonify(current_report), 200

In [None]:
# Serve the Flask app from a background daemon thread so the remaining cells
# keep executing. Note that the server is started before `report` is loaded
# in a later cell.
threading.Thread(target=run_flask, daemon=True).start()

In [None]:
# Restore the trained classifier from its pickle file.
# NOTE: pickle.load can execute arbitrary code; only load files from a trusted source.
model_file_name = r"classifier"
with open(model_file_name, mode='rb') as model_stream:
    best_estimator = pickle.load(model_stream)

In [None]:
# Load the pickled classification report and add the two entries that the
# video-processing loop fills in later.
# NOTE: pickle.load can execute arbitrary code; only load files from a trusted source.
with open('Scripts/Resources/classification_report', mode='rb') as report_stream:
    report = pickle.load(report_stream)

report["Execution_Time_of_Prediction"] = None  # set once the video loop has finished
report["Test_Results"] = []  # one dict of classification results is appended per detection

In [None]:
# Target size handed to skimage's resize() inside classify_image_with_unknown.
# NOTE(review): presumably this must match the size used when the classifier
# was trained — confirm.
img_size = (15, 15) # Resizing for consistency

In [None]:
# Read the resolution of the first monitor reported by screeninfo; it is used
# to shrink frames that would not fit on screen.
screen = get_monitors()[0]  # treated as the primary monitor
screen_width = screen.width
screen_height = screen.height

In [None]:
# Load YOLO model for traffic light detection
# Note: this module-level `model` is the YOLO detector; the `model` parameter
# of classify_image_with_unknown is a different object (the pickled classifier).
model = YOLO('yolov8n.pt') 

In [None]:
# Path of the video to analyse. Built with os.path.join so it is not tied to
# Windows-style backslashes; on Windows the resulting string is unchanged.
video_path = os.path.join("Scripts", "Resources", "Videos", "Real_life_test_daylight_4.mp4")

In [None]:
# Open the video and fail loudly if it cannot be read.
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Raise instead of calling exit(): exit() is an interactive helper that
    # shuts the kernel down in a notebook, whereas an exception stops the run
    # with a clear message and keeps the session alive for inspection.
    cap.release()
    raise OSError(f"Error: Cannot open video file! ({video_path})")

In [None]:
# Sample roughly one frame per second of video, detect traffic lights with
# YOLO, classify the first detected light, and preview each sampled frame.
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = 0

# The reported FPS can be 0 (or NaN) when it is unknown, and values below 1
# truncate to 0; fall back to sampling every frame instead of dividing by zero
# in the modulo below.
frame_interval = max(1, int(fps)) if fps > 0 else 1

cv2.namedWindow("Video", cv2.WINDOW_NORMAL)  # Create a resizable window
start_time = time.time()

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("End of video or cannot access the video.")
            break

        frame_count += 1
        # Process one frame per second; skip the rest before doing any work on them
        if frame_count % frame_interval != 0:
            continue

        # OpenCV decodes frames as BGR. YOLO (numpy input) and cv2.imshow both
        # expect BGR, so `frame` stays as decoded; only the crop handed to the
        # classifier is taken from an RGB copy, as it was before.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # YOLO detection
        results = model(frame)  # Run the YOLO model on the current frame

        # Access the first result in the list
        result = results[0]

        test_results = {}

        # Iterate through detected boxes
        for box in result.boxes:
            class_id = int(box.cls)  # Object class ID
            if class_id == 9:  # class 9 corresponds to traffic lights in YOLO
                # Extract bounding box coordinates and crop the traffic light region
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                traffic_light_roi = rgb_frame[y1:y2, x1:x2]

                test(traffic_light_roi, test_results, best_estimator)
                report["Test_Results"].append(test_results)

                break  # Only the first traffic light of a frame is recorded

        # Resize frame to fit screen size
        frame_height, frame_width = frame.shape[:2]
        aspect_ratio = frame_width / frame_height

        # Calculate new dimensions while maintaining aspect ratio
        if frame_width > screen_width or frame_height > screen_height:
            if frame_width / screen_width > frame_height / screen_height:
                # Width is the limiting dimension
                new_width = screen_width
                new_height = int(screen_width / aspect_ratio)
            else:
                # Height is the limiting dimension
                new_height = screen_height
                new_width = int(screen_height * aspect_ratio)
        else:
            new_width, new_height = frame_width, frame_height

        resized_frame = cv2.resize(frame, (new_width, new_height))
        cv2.imshow("Video", resized_frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):  # Press 'q' to quit
            break
finally:
    # Always free the capture and the preview window, even if a frame fails.
    end_time = time.time()
    cap.release()
    cv2.destroyAllWindows()

execution_time = end_time - start_time
report["Execution_Time_of_Prediction"] = execution_time

# frame_count counts every frame read from the video, including skipped ones.
print(f"Total frames processed: {frame_count}")