In [1]:
import cv2
import numpy as np
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import img_to_array
import threading
import queue

# Parameters
cam = 0                                  # camera index handed to cv2.VideoCapture
fps = 240                                # frame rate requested from the camera
model_version = "my_model_20.0.keras"    # model file passed to load_model
max_frames = 3                           # frames collected before a batch is queued
frame_buffer = list()                    # shared list the capture thread fills
raw_frame_queue = queue.Queue(20)        # Queue for raw frames
processed_frame_queue = queue.Queue(20)  # Queue for processed frames
stop_event = threading.Event()           # set to ask every thread to finish

# Image settings
Im_Width = Im_Height = 128               # model input size (width, height)
# Crop rectangle used for the brightness check: left, top, width, height.
x = 120
y = 0
w = 400
h = 200
# Frames whose cropped mean brightness is ABOVE this value are forwarded.
max_brightness = 175
image_path = "no_bottle.jpg"             # not referenced by the code visible in this cell

# Utility functions
def crop_image(frame, x, y, w, h):
    """Return the sub-region of ``frame`` starting at (x, y) with size w x h.

    The first array axis is indexed with the vertical range ``y .. y+h`` and
    the second with the horizontal range ``x .. x+w``. Plain slicing is used,
    so a rectangle that extends past the array edge is silently clipped.
    """
    row_span = slice(y, y + h)
    col_span = slice(x, x + w)
    return frame[row_span, col_span]

def measure_brightness(frame):
    """Return the mean pixel value of ``frame`` after BGR-to-grayscale conversion."""
    return np.mean(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))

def measure_edge_richness(frame):
    """Return the sum of the Canny edge map computed from ``frame``.

    The frame is converted from BGR to grayscale, then passed to
    ``cv2.Canny`` with thresholds 200 and 400; every value of the resulting
    edge map is added up, so a larger total means more detected edge pixels.
    """
    edge_map = cv2.Canny(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), 200, 400)
    return np.sum(edge_map)

# Thread Functions
def capture(cam, fps, frame_buffer, raw_frame_queue, max_frames, stop_event):
    """Read frames from a camera and enqueue them in batches of ``max_frames``.

    Args:
        cam: Camera index (or source) handed to ``cv2.VideoCapture``.
        fps: Frame rate requested through ``CAP_PROP_FPS``.
        frame_buffer: Caller-supplied list used to accumulate the current batch.
        raw_frame_queue: Queue that receives a copy of each completed batch.
        max_frames: Number of frames per batch.
        stop_event: Event polled every iteration; the loop ends once it is set.
            It is also set here when the camera cannot be opened or a read
            fails, so the downstream threads can wind down too.

    A batch that cannot be queued within one second is dropped. The buffer is
    cleared whether or not the batch was queued; otherwise it would grow past
    ``max_frames`` after the first drop and no batch would ever be queued again.
    """
    cap = cv2.VideoCapture(cam)
    cap.set(cv2.CAP_PROP_FPS, fps)

    if not cap.isOpened():
        print("Error: Cannot open camera.")
        stop_event.set()
        return

    try:
        while not stop_event.is_set():
            ret, frame = cap.read()
            if not ret:
                print("Failed to capture frame. Exiting...")
                # Mirror the open-failure path: without a producer the rest of
                # the pipeline has nothing left to do.
                stop_event.set()
                break

            frame_buffer.append(frame)

            # ">=" rather than "==" so an over-full buffer can still recover.
            if len(frame_buffer) >= max_frames:
                try:
                    raw_frame_queue.put(frame_buffer.copy(), timeout=1)
                except queue.Full:
                    print("Raw frame queue is full. Dropping buffer.")
                finally:
                    # Start a fresh batch whether it was queued or dropped.
                    frame_buffer.clear()
    finally:
        # Release the device even if the loop is left through an exception.
        cap.release()

    print("Capture thread stopped.")

def process_frames(raw_frame_queue, processed_frame_queue, x, y, w, h, max_brightness, stop_event):
    """Filter batches of raw frames by the brightness of a cropped region.

    Each batch taken from ``raw_frame_queue`` is reduced to the frames whose
    crop (``x, y, w, h``) has a mean brightness strictly greater than
    ``max_brightness``. The original, uncropped frames are forwarded. A
    non-empty result is put on ``processed_frame_queue``; if that queue stays
    full for one second the batch is dropped. The loop keeps running until
    ``stop_event`` is set and the raw queue is empty.
    """
    def passes_brightness(candidate):
        region = crop_image(candidate, x, y, w, h)
        return measure_brightness(region) > max_brightness

    while True:
        if stop_event.is_set() and raw_frame_queue.empty():
            break

        try:
            batch = raw_frame_queue.get(timeout=1)
        except queue.Empty:
            continue

        # Pass frames meeting the condition
        kept = [candidate for candidate in batch if passes_brightness(candidate)]
        if not kept:
            continue

        try:
            processed_frame_queue.put(kept, timeout=1)
        except queue.Full:
            print("Processed frame queue is full. Dropping buffer.")

    print("Processing thread stopped.")

def runModel(model_version, processed_frame_queue, stop_event, target_size=(Im_Width, Im_Height)):
    """Load the model, then classify every frame arriving on the processed queue.

    Args:
        model_version: Path handed to ``load_model``.
        processed_frame_queue: Queue of frame batches to classify.
        stop_event: Once set, the loop ends as soon as the queue is empty.
        target_size: Size each frame is resized to before prediction; the
            default is taken from ``Im_Width`` / ``Im_Height`` when the
            function is defined.

    One line is printed per frame with the label and score returned by
    ``predict_frame``.
    """
    print("Loading model...")
    model = load_model(model_version)
    print("Model loaded successfully.")

    while True:
        if stop_event.is_set() and processed_frame_queue.empty():
            break

        try:
            batch = processed_frame_queue.get(timeout=1)
        except queue.Empty:
            continue

        for item in batch:
            feedback, confidence = predict_frame(item, model, target_size)
            print(f"Prediction: {feedback} (Confidence: {confidence:.2f})")

    print("Model thread stopped.")

# Helper Functions
def process_frame(frame, target_size):
    """Turn one frame into a single-item batch for the model.

    The frame is resized to ``target_size``, divided by 255.0, converted with
    ``img_to_array``, and given a new leading axis.

    NOTE(review): no colour conversion happens here. Presumably frames arrive
    in OpenCV's BGR order; confirm the model was trained on the same order.
    """
    scaled = cv2.resize(frame, target_size) / 255.0
    return np.expand_dims(img_to_array(scaled), axis=0)

def predict_frame(frame, model, target_size):
    """Classify one frame and return ``(label, score)``.

    ``score`` is element ``[0][0]`` of ``model.predict`` on the preprocessed
    frame. The label is "Non-Defective" when the score is above 0.5 and
    "Defective" otherwise. The score is returned unchanged for both labels, so
    a low value accompanies "Defective"; it is not a per-class confidence.
    """
    batch = process_frame(frame, target_size)
    prediction = model.predict(batch)[0][0]  # assumes a single output per sample; TODO confirm
    if prediction > 0.5:
        label = "Non-Defective"
    else:
        label = "Defective"
    return (label, prediction)

# Threads
# One worker per pipeline stage: capture -> brightness filter -> model.
# Arguments are bound by keyword so each value is visibly matched to its parameter.
capture_thread = threading.Thread(
    target=capture,
    kwargs={
        "cam": cam,
        "fps": fps,
        "frame_buffer": frame_buffer,
        "raw_frame_queue": raw_frame_queue,
        "max_frames": max_frames,
        "stop_event": stop_event,
    },
)
processing_thread = threading.Thread(
    target=process_frames,
    kwargs={
        "raw_frame_queue": raw_frame_queue,
        "processed_frame_queue": processed_frame_queue,
        "x": x,
        "y": y,
        "w": w,
        "h": h,
        "max_brightness": max_brightness,
        "stop_event": stop_event,
    },
)
runModel_thread = threading.Thread(
    target=runModel,
    kwargs={
        "model_version": model_version,
        "processed_frame_queue": processed_frame_queue,
        "stop_event": stop_event,
    },
)

if __name__ == "__main__":
    # Start threads
    capture_thread.start()
    processing_thread.start()
    runModel_thread.start()

    try:
        # Sleep on the event instead of spinning: a bare `while True: pass`
        # keeps a core busy and competes with the worker threads for the GIL.
        # Waking every half second also lets the loop end by itself when a
        # worker sets stop_event (e.g. the camera could not be opened).
        while not stop_event.wait(timeout=0.5):
            pass
    except KeyboardInterrupt:
        print("Stopping threads...")
    finally:
        # Always signal the workers, whatever ended the wait, so the joins
        # below cannot block forever.
        stop_event.set()

    # Wait for threads to complete
    capture_thread.join()
    processing_thread.join()
    runModel_thread.join()
    print("All tasks completed.")


Loading model...
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Raw frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Processed frame queue is full. Dropping buffer.
Model loaded successfully.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 25s/step
Prediction: Defective (Confidence: 0.00)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━