In [6]:
import cv2
import time 
import threading
import numpy as np
from mss import mss
from ultralytics import YOLO
from tensorflow import keras 
from tensorflow.lite.python.interpreter import Interpreter

In [7]:
# Locations of the trained model files (relative paths; presumably resolved
# against the notebook's working directory).

# Bee / wasp classifiers stored as .tflite files.
bee_path = 'Yolos/Insects/Quantized_bee.tflite'
wasp_path = 'Yolos/Insects/Quantized_wasp.tflite'

# YOLO detector weights.
yolo_path = 'Yolos/Insects/BIG dataset/train34/weights/best.pt'

# Keras "buzz or not" model.
buzz_path = '../Classifiers/classifiers/buzzOrNot.keras'

In [8]:
class insect_classifier:
    """Two-stage insect pipeline: YOLO detection followed by bee/wasp scoring.

    Every box returned by the YOLO model is cropped out of the frame, resized
    to ``input_size`` and scored by a bee interpreter and a wasp interpreter.
    One worker thread is started per box. The workers share a pool of
    ``thread_num`` interpreter pairs; each interpreter has its own lock so it
    is only ever invoked by one thread at a time.
    """

    def __init__(self, bee_path, wasp_path, buzz_path, yolo_path, yolo_threshold=0.3, classifier_threshold=0.8, wasp_threshold=4, input_size=(224, 224), skip_frames=1, thread_num=16):
        """Load the models and build the pool of interpreters and locks.

        Args:
            bee_path: Path handed to ``Interpreter`` for the bee model.
            wasp_path: Path handed to ``Interpreter`` for the wasp model.
            buzz_path: Path handed to ``keras.models.load_model``.
            yolo_path: Path handed to ``YOLO``.
            yolo_threshold: ``conf`` value passed to ``YOLO.predict``.
            classifier_threshold: Minimum bee or wasp score for a crop to be
                labelled 'Bee' or 'Wasp' instead of 'Other'.
            wasp_threshold: Number of consecutive processed frames containing
                a wasp that triggers the alert in ``process_frame``.
            input_size: Size passed to ``cv2.resize`` for every crop.
            skip_frames: Only every ``skip_frames``-th frame is analysed by
                ``process_frame``; 0 is treated like 1 (analyse every frame).
            thread_num: Number of bee/wasp interpreter pairs in the pool.
        """
        self.yolo_model = YOLO(yolo_path)

        # Stand-alone interpreters and the Keras model are loaded here but are
        # not used by any method of this class; kept as public attributes.
        self.bee_model = Interpreter(bee_path)
        self.bee_model.allocate_tensors()
        self.wasp_model = Interpreter(wasp_path)
        self.wasp_model.allocate_tensors()
        self.buzz_model = keras.models.load_model(buzz_path)

        self.yolo_threshold = yolo_threshold
        self.classifier_threshold = classifier_threshold
        self.input_size = input_size
        self.skip_frames = skip_frames
        self.frame_counter = 0
        self.wasp_counter = 0
        self.wasp_threshold = wasp_threshold

        # Colour per predicted class, passed straight to the cv2 drawing calls
        # (presumably BGR, since the frames come from OpenCV - TODO confirm).
        self.colors = {'Bee': (0, 255, 0),
                       'Wasp': (0, 0, 255),
                       'Other': (255, 0, 0)}

        self.bee_models = []
        self.wasp_models = []
        self.bee_locks = []
        self.wasp_locks = []
        self.thread_num = thread_num
        self.lock = threading.Lock()  # general-purpose lock; not used by the methods below

        # Pool slot k is used by every box whose index i satisfies i % thread_num == k.
        for _ in range(self.thread_num):
            bee_model = Interpreter(bee_path)
            bee_model.allocate_tensors()
            wasp_model = Interpreter(wasp_path)
            wasp_model.allocate_tensors()
            self.bee_models.append(bee_model)
            self.wasp_models.append(wasp_model)
            self.bee_locks.append(threading.Lock())
            self.wasp_locks.append(threading.Lock())

    def detect_insect(self, frame):
        """Run the YOLO model on ``frame`` and return its prediction results."""
        return self.yolo_model.predict(source=frame, conf=self.yolo_threshold, verbose=False)

    def quantized_classify(self, interpreter, input_data):
        """Feed ``input_data`` (cast to float32) to ``interpreter`` and return its first output tensor.

        The caller is responsible for making sure no other thread is using
        ``interpreter`` at the same time.
        """
        input_data = np.array(input_data, dtype=np.float32)
        interpreter.set_tensor(interpreter.get_input_details()[0]['index'], input_data)
        interpreter.invoke()
        return interpreter.get_tensor(interpreter.get_output_details()[0]['index'])

    def classify_box(self, frame, box, yolo_confidence, wasp_bool, i):
        """Classify one detection box and draw its label onto ``frame``.

        Args:
            frame: Image array; it is annotated in place.
            box: Four values ``x1, y1, x2, y2``; each is converted with ``int``.
            yolo_confidence: Detector confidence, shown in the label.
            wasp_bool: One-element list; ``wasp_bool[0]`` is set to True when
                the box is classified as 'Wasp'.
            i: Box index; selects the interpreter pair ``i % thread_num``.

        Returns:
            ``(frame, wasp_bool)``. Any exception raised while processing the
            box is printed and the same tuple is still returned, so callers
            never receive ``None``.
        """
        try:
            x1, y1, x2, y2 = map(int, box)

            # Clamp the box to the image.
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(x2, frame.shape[1] - 1), min(y2, frame.shape[0] - 1)

            if x2 > x1 and y2 > y1:
                cropped_region = frame[y1:y2, x1:x2]
                cropped_input = cv2.resize(cropped_region, self.input_size)
                cropped_input = np.expand_dims(cropped_input, axis=0)  # add leading batch axis

                slot = i % self.thread_num
                with self.bee_locks[slot]:
                    bee_score = np.max(self.quantized_classify(self.bee_models[slot], cropped_input))
                with self.wasp_locks[slot]:
                    wasp_score = np.max(self.quantized_classify(self.wasp_models[slot], cropped_input))

                if bee_score < self.classifier_threshold and wasp_score < self.classifier_threshold:
                    predicted_class = "Other"
                    confidence_score = 1.0 - (bee_score + wasp_score) / 2.0
                elif bee_score > wasp_score:
                    predicted_class = 'Bee'
                    confidence_score = bee_score
                else:
                    predicted_class = 'Wasp'
                    confidence_score = wasp_score
                    wasp_bool[0] = True

                # Drawn for every class, including 'Other', which has its own
                # entry in self.colors.
                label = f"Class {predicted_class} ({confidence_score:.2f}, Insect {yolo_confidence:.2f})"
                cv2.rectangle(frame, (x1, y1), (x2, y2), self.colors[predicted_class], 2)
                cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors[predicted_class], 2)
        except Exception as e:
            # Best effort: one bad box must not stop the remaining boxes.
            print(e)
        return frame, wasp_bool

    def classify_insect(self, yolo_detections, frame):
        """Classify every box of every detection result, one thread per box.

        Returns:
            ``(frame, wasp_bool)`` where ``wasp_bool`` is a one-element list
            whose value is True if at least one box was classified as 'Wasp'.
        """
        wasp_bool = [False]
        for detection in yolo_detections:
            boxes = detection.boxes.xyxy.cpu().numpy()
            yolo_confidences = detection.boxes.conf.cpu().numpy()

            # Fresh list per detection so finished threads are not joined again.
            threads = []
            for i in range(len(boxes)):
                thread = threading.Thread(target=self.classify_box, args=(frame, boxes[i], yolo_confidences[i], wasp_bool, i))
                threads.append(thread)
                thread.start()
            for thread in threads:
                thread.join()
        return frame, wasp_bool

    def process_frame_noskip(self, frame):
        """Detect and classify on ``frame`` unconditionally; returns ``(frame, wasp_bool)``."""
        yolo_detections = self.detect_insect(frame)
        return self.classify_insect(yolo_detections, frame)

    def process_frame(self, frame):
        """Analyse every ``skip_frames``-th frame and track consecutive wasp sightings.

        When a wasp is seen on ``wasp_threshold`` consecutive analysed frames
        a warning is printed and the counter restarts. An analysed frame
        without a wasp resets the counter; skipped frames leave it untouched.

        Returns:
            The frame (annotated in place when it was analysed).
        """
        self.frame_counter += 1
        # "or 1": skip_frames == 0 means "skip nothing" instead of raising
        # ZeroDivisionError.
        if self.frame_counter % (self.skip_frames or 1) == 0:
            yolo_detections = self.detect_insect(frame)
            frame, wasp_bool = self.classify_insect(yolo_detections, frame)
            if wasp_bool[0]:
                self.wasp_counter += 1
                print(self.wasp_counter)
                if self.wasp_counter == self.wasp_threshold:
                    print('DANGER: WASP detected CLOSING DOOR !!')
                    self.wasp_counter = 0
            else:
                self.wasp_counter = 0
        return frame

In [9]:
import concurrent.futures
import threading


class insect_classifier:
    """Two-stage insect pipeline: YOLO detection followed by bee/wasp scoring.

    Every box returned by the YOLO model is cropped out of the frame, resized
    to ``input_size`` and scored by a bee interpreter and a wasp interpreter.
    Boxes are processed on a ``ThreadPoolExecutor`` with ``thread_num``
    workers that share a pool of ``thread_num`` interpreter pairs. Each
    interpreter has its own lock, because two boxes whose indices are equal
    modulo ``thread_num`` map to the same interpreter and may be scheduled at
    the same time.
    """

    def __init__(self, bee_path, wasp_path, buzz_path, yolo_path, yolo_threshold=0.3, classifier_threshold=0.8, wasp_threshold=4, input_size=(224, 224), skip_frames=1, thread_num=16, device='cuda'):
        """Load the models and build the pool of interpreters and locks.

        Args:
            bee_path: Path handed to ``Interpreter`` for the bee model.
            wasp_path: Path handed to ``Interpreter`` for the wasp model.
            buzz_path: Path handed to ``keras.models.load_model``.
            yolo_path: Path handed to ``YOLO``.
            yolo_threshold: ``conf`` value passed to ``YOLO.predict``.
            classifier_threshold: Minimum bee or wasp score for a crop to be
                labelled 'Bee' or 'Wasp' instead of 'Other'.
            wasp_threshold: Number of consecutive processed frames containing
                a wasp that triggers the alert in ``process_frame``.
            input_size: Size passed to ``cv2.resize`` for every crop.
            skip_frames: Only every ``skip_frames``-th frame is analysed by
                ``process_frame``; 0 is treated like 1 (analyse every frame).
            thread_num: Number of executor workers and of interpreter pairs.
            device: Device the YOLO model is moved to with ``.to(device)``.
                Defaults to ``'cuda'``; pass e.g. ``'cpu'`` on a machine
                without a GPU, or ``None`` to leave the model where ``YOLO``
                loaded it.
        """
        yolo_model = YOLO(yolo_path)
        self.yolo_model = yolo_model.to(device) if device is not None else yolo_model

        # Stand-alone interpreters and the Keras model are loaded here but are
        # not used by any method of this class; kept as public attributes.
        self.bee_model = Interpreter(bee_path)
        self.bee_model.allocate_tensors()
        self.wasp_model = Interpreter(wasp_path)
        self.wasp_model.allocate_tensors()
        self.buzz_model = keras.models.load_model(buzz_path)

        self.yolo_threshold = yolo_threshold
        self.classifier_threshold = classifier_threshold
        self.input_size = input_size
        self.skip_frames = skip_frames
        self.frame_counter = 0
        self.wasp_counter = 0
        self.wasp_threshold = wasp_threshold

        # Colour per predicted class, passed straight to the cv2 drawing calls
        # (presumably BGR, since the frames come from OpenCV - TODO confirm).
        self.colors = {'Bee': (0, 255, 0),
                       'Wasp': (0, 0, 255),
                       'Other': (255, 0, 0)}

        self.bee_models = []
        self.wasp_models = []
        self.bee_locks = []
        self.wasp_locks = []
        self.thread_num = thread_num
        self.lock = threading.Lock()  # general-purpose lock; not used by the methods below

        # Pool slot k is used by every box whose index i satisfies i % thread_num == k.
        for _ in range(self.thread_num):
            bee_model = Interpreter(bee_path)
            bee_model.allocate_tensors()
            wasp_model = Interpreter(wasp_path)
            wasp_model.allocate_tensors()
            self.bee_models.append(bee_model)
            self.wasp_models.append(wasp_model)
            self.bee_locks.append(threading.Lock())
            self.wasp_locks.append(threading.Lock())

    def detect_insect(self, frame):
        """Run the YOLO model on ``frame`` and return its prediction results."""
        return self.yolo_model.predict(source=frame, conf=self.yolo_threshold, verbose=False)

    def quantized_classify(self, interpreter, input_data):
        """Feed ``input_data`` (cast to float32) to ``interpreter`` and return its first output tensor.

        The caller is responsible for making sure no other thread is using
        ``interpreter`` at the same time.
        """
        input_data = np.array(input_data, dtype=np.float32)
        interpreter.set_tensor(interpreter.get_input_details()[0]['index'], input_data)
        interpreter.invoke()
        return interpreter.get_tensor(interpreter.get_output_details()[0]['index'])

    def classify_box(self, frame, box, yolo_confidence, wasp_bool, i):
        """Classify one detection box and draw its label onto ``frame``.

        Args:
            frame: Image array; it is annotated in place.
            box: Four values ``x1, y1, x2, y2``; each is converted with ``int``.
            yolo_confidence: Detector confidence, shown in the label.
            wasp_bool: One-element list; ``wasp_bool[0]`` is set to True when
                the box is classified as 'Wasp'.
            i: Box index; selects the interpreter pair ``i % thread_num``.

        Returns:
            ``(frame, wasp_bool)``. Any exception raised while processing the
            box is printed and the same tuple is still returned, so callers
            never receive ``None``.
        """
        try:
            x1, y1, x2, y2 = map(int, box)

            # Clamp the box to the image.
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(x2, frame.shape[1] - 1), min(y2, frame.shape[0] - 1)

            if x2 > x1 and y2 > y1:
                cropped_region = frame[y1:y2, x1:x2]
                cropped_input = cv2.resize(cropped_region, self.input_size)
                cropped_input = np.expand_dims(cropped_input, axis=0)  # add leading batch axis

                # Hold the interpreter's lock for the whole
                # set_tensor / invoke / get_tensor sequence: boxes i and
                # i + thread_num share a slot and can run on different workers
                # simultaneously.
                slot = i % self.thread_num
                with self.bee_locks[slot]:
                    bee_score = np.max(self.quantized_classify(self.bee_models[slot], cropped_input))
                with self.wasp_locks[slot]:
                    wasp_score = np.max(self.quantized_classify(self.wasp_models[slot], cropped_input))

                if bee_score < self.classifier_threshold and wasp_score < self.classifier_threshold:
                    predicted_class = "Other"
                    confidence_score = 1.0 - (bee_score + wasp_score) / 2.0
                elif bee_score > wasp_score:
                    predicted_class = 'Bee'
                    confidence_score = bee_score
                else:
                    predicted_class = 'Wasp'
                    confidence_score = wasp_score
                    wasp_bool[0] = True

                label = f"Class {predicted_class} ({confidence_score:.2f}, Insect {yolo_confidence:.2f})"
                cv2.rectangle(frame, (x1, y1), (x2, y2), self.colors[predicted_class], 2)
                cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors[predicted_class], 2)
        except Exception as e:
            # Best effort: one bad box must not stop the remaining boxes.
            print(e)
        return frame, wasp_bool

    def classify_insect(self, yolo_detections, frame):
        """Classify every box of every detection result on a thread pool.

        Returns:
            ``(frame, wasp_bool)`` where ``wasp_bool`` is a one-element list
            whose value is True if at least one box was classified as 'Wasp'.
        """
        wasp_bool = [False]
        tasks = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.thread_num) as executor:
            for detection in yolo_detections:
                boxes = detection.boxes.xyxy.cpu().numpy()
                yolo_confidences = detection.boxes.conf.cpu().numpy()
                for i in range(len(boxes)):
                    tasks.append(executor.submit(self.classify_box, frame, boxes[i], yolo_confidences[i], wasp_bool, i))
            # Wait for every box before the frame is handed back.
            for future in concurrent.futures.as_completed(tasks):
                future.result()
        return frame, wasp_bool

    def process_frame_noskip(self, frame):
        """Detect and classify on ``frame`` unconditionally; returns ``(frame, wasp_bool)``."""
        yolo_detections = self.detect_insect(frame)
        return self.classify_insect(yolo_detections, frame)

    def process_frame(self, frame):
        """Analyse every ``skip_frames``-th frame and track consecutive wasp sightings.

        When a wasp is seen on ``wasp_threshold`` consecutive analysed frames
        a warning is printed and the counter restarts. An analysed frame
        without a wasp resets the counter; skipped frames leave it untouched.

        Returns:
            The frame (annotated in place when it was analysed).
        """
        self.frame_counter += 1
        # "or 1": skip_frames == 0 means "skip nothing" instead of raising
        # ZeroDivisionError.
        if self.frame_counter % (self.skip_frames or 1) == 0:
            yolo_detections = self.detect_insect(frame)
            frame, wasp_bool = self.classify_insect(yolo_detections, frame)
            if wasp_bool[0]:
                self.wasp_counter += 1
                print(self.wasp_counter)
                if self.wasp_counter == self.wasp_threshold:
                    print('DANGER: WASP detected CLOSING DOOR !!')
                    self.wasp_counter = 0
            else:
                self.wasp_counter = 0
        return frame


In [10]:
# Build the pipeline from the model paths configured above; skip_frames=1
# means process_frame analyses every frame it is given.
classifier = insect_classifier(
    bee_path=bee_path,
    wasp_path=wasp_path,
    buzz_path=buzz_path,
    yolo_path=yolo_path,
    skip_frames=1,
)

In [11]:
# Live demo: grab the screen, run the insect pipeline on each grab and show
# the annotated result in a resizable window. Press 'q' in the window (or
# interrupt the kernel) to stop.
window_name = "YOLOResNet insect detection and classification"
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
cv2.resizeWindow(window_name, 400, 300)

try:
    with mss() as sct:
        while True:
            # monitors[0] is, per the mss documentation, the combined area of
            # all monitors.
            screenshot = sct.grab(sct.monitors[0])
            frame = np.array(screenshot, dtype=np.uint8)
            # Drop the alpha channel of the grab before detection and drawing.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
            annotated_frame = classifier.process_frame(frame)
            # asarray avoids copying the frame again when it is already an ndarray.
            cv2.imshow(window_name, np.asarray(annotated_frame))
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
except KeyboardInterrupt:
    # Interrupting the kernel is treated as a normal way to stop the loop.
    pass
finally:
    # Always close the OpenCV window, even if the loop ended with an error.
    cv2.destroyAllWindows()

1
1
There is at least 1 reference to internal data
      in the interpreter in the form of a numpy array or slice. Be sure to
      only hold the function returned from tensor() if you are using raw
      data access.
