Don't forget to edit the following:
> ESP32-CAM web IP address\
> `best.pt` location (directory path)\

Both are set down below, in the `main` function.

In [None]:
import cv2
import numpy as np
import requests
from ultralytics import YOLO
import time
import threading
from queue import Queue
import urllib.request

class ESP32CamDetector:
    """Run YOLO object detection on images served by an ESP32-CAM over HTTP.

    Frames come either from the ``/stream`` endpoint (read on a background
    thread that feeds ``frame_queue``) or one at a time from the ``/capture``
    endpoint. Each frame is passed to the YOLO model and the annotated result
    is shown in an OpenCV window.
    """

    # JPEG start-of-image / end-of-image markers, used to cut individual
    # frames out of the raw MJPEG byte stream.
    _JPEG_SOI = b'\xff\xd8'
    _JPEG_EOI = b'\xff\xd9'

    def __init__(self, esp32_ip, model_path):
        """
        Initialize ESP32 Camera Object Detector

        Args:
            esp32_ip (str): IP address of ESP32-CAM
            model_path (str): Path to your trained YOLO model (.pt file)
        """
        self.esp32_ip = esp32_ip
        self.model = YOLO(model_path)
        self.stream_url = f"http://{esp32_ip}/stream"
        self.capture_url = f"http://{esp32_ip}/capture"
        # Bounded hand-off between the streaming thread and the display loop;
        # producers drop frames when it is full instead of blocking.
        self.frame_queue = Queue(maxsize=10)
        # Flag polled by the streaming loops; cleared to make them stop.
        self.running = False

        # Confidence threshold
        self.conf_threshold = 0.25

        print(f"ESP32-CAM URL: {self.stream_url}")
        print(f"Model loaded: {model_path}")

    def test_connection(self):
        """Test connection to ESP32-CAM.

        Returns:
            bool: True if a GET on the capture URL answers with HTTP 200,
            False on any other status code or on any request error.
        """
        try:
            response = requests.get(self.capture_url, timeout=10)
        except Exception as e:
            print(f"✗ ESP32-CAM connection error: {e}")
            return False

        if response.status_code == 200:
            print("✓ ESP32-CAM connection successful!")
            return True

        print(f"✗ ESP32-CAM connection failed. Status code: {response.status_code}")
        return False

    def capture_single_frame(self):
        """Capture a single frame from ESP32-CAM.

        Returns:
            The image decoded by ``cv2.imdecode`` from the response body, or
            None if the request fails or the status code is not 200.
        """
        try:
            response = requests.get(self.capture_url, timeout=10)
            if response.status_code == 200:
                # Convert bytes to numpy array
                nparr = np.frombuffer(response.content, np.uint8)
                # Decode image
                return cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            print(f"Failed to capture frame. Status code: {response.status_code}")
            return None
        except Exception as e:
            print(f"Error capturing frame: {e}")
            return None

    def stream_frames(self):
        """Stream frames from ESP32-CAM using multiple methods.

        Intended as the target of the streaming thread. OpenCV is tried
        first; if it cannot open the stream or gives up after repeated read
        failures, the urllib MJPEG reader takes over.
        """
        # Try Method 1: OpenCV first
        if self._stream_opencv():
            return

        print("OpenCV streaming failed, trying alternative method...")
        # Try Method 2: urllib with MJPEG parsing
        self._stream_urllib()

    def _stream_opencv(self):
        """Stream using OpenCV (Method 1).

        Returns:
            bool: True if streaming ran until ``self.running`` was cleared;
            False if the stream could not be opened, an error occurred, or
            more than 10 consecutive reads failed (so the caller can switch
            to another method).
        """
        cap = None
        try:
            cap = cv2.VideoCapture(self.stream_url)
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

            if not cap.isOpened():
                return False

            print("✓ OpenCV streaming started")

            consecutive_failures = 0
            while self.running:
                ret, frame = cap.read()
                if ret:
                    consecutive_failures = 0
                    if not self.frame_queue.full():
                        self.frame_queue.put(frame)
                    continue

                consecutive_failures += 1
                if consecutive_failures > 10:
                    print("Too many consecutive failures, switching method...")
                    # Report failure so stream_frames() really switches method.
                    return False
                time.sleep(0.1)

            return True

        except Exception as e:
            print(f"OpenCV streaming error: {e}")
            return False
        finally:
            # Release the capture handle on every exit path.
            if cap is not None:
                cap.release()

    @staticmethod
    def _split_jpeg_frames(buffer, min_size=0):
        """Cut complete JPEG images out of a raw MJPEG byte buffer.

        Args:
            buffer (bytes): Bytes accumulated from the stream so far.
            min_size (int): Images shorter than this many bytes are dropped.

        Returns:
            tuple: ``(frames, remainder)`` where ``frames`` is a list of
            complete JPEG byte strings in stream order and ``remainder`` is
            the unconsumed tail to prepend to the next chunk.
        """
        frames = []
        while True:
            start = buffer.find(ESP32CamDetector._JPEG_SOI)
            if start == -1:
                # No image start in the buffer: discard it, keeping only a
                # trailing 0xFF that may be the first half of a marker split
                # across two chunks.
                buffer = buffer[-1:] if buffer.endswith(b'\xff') else b''
                break

            # Search for the end marker only after the start marker, so a
            # stray end marker earlier in the buffer cannot stall parsing.
            end = buffer.find(ESP32CamDetector._JPEG_EOI, start + 2)
            if end == -1:
                # Incomplete image: keep it (without leading junk) for later.
                buffer = buffer[start:]
                break

            jpeg = buffer[start:end + 2]
            buffer = buffer[end + 2:]
            if len(jpeg) >= min_size:
                frames.append(jpeg)

        return frames, buffer

    def _stream_urllib(self):
        """Stream using urllib with MJPEG parsing (Method 2).

        Falls back to repeated single-frame capture if the stream URL cannot
        be opened.
        """
        try:
            import urllib.request
            stream = urllib.request.urlopen(self.stream_url, timeout=30)
        except Exception as e:
            print(f"urllib streaming error: {e}")
            # Fallback to single frame capture
            self._stream_single_frames()
            return

        print("✓ urllib streaming started")

        bytes_data = b''
        try:
            while self.running:
                try:
                    chunk = stream.read(4096)
                    if not chunk:
                        break

                    # Look for JPEG boundaries
                    jpegs, bytes_data = self._split_jpeg_frames(bytes_data + chunk)

                    for jpg in jpegs:
                        # Decode frame
                        frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
                        if frame is not None and not self.frame_queue.full():
                            self.frame_queue.put(frame)

                except Exception as e:
                    print(f"urllib chunk error: {e}")
                    time.sleep(0.1)
        finally:
            stream.close()

    def _stream_single_frames(self):
        """Fallback method using single frame capture.

        Polls ``capture_single_frame`` roughly every 0.1 s while
        ``self.running`` is set and queues every frame that was captured.
        """
        print("✓ Using single frame capture method")

        while self.running:
            try:
                frame = self.capture_single_frame()
                if frame is not None and not self.frame_queue.full():
                    self.frame_queue.put(frame)
                time.sleep(0.1)  # 10 FPS fallback
            except Exception as e:
                print(f"Single frame capture error: {e}")
                time.sleep(0.5)

    def process_frame(self, frame):
        """Process frame with YOLO detection.

        Args:
            frame: Image to run the model on.

        Returns:
            tuple: ``(annotated_frame, detections)``. ``detections`` is a
            list of dicts with keys ``'class'``, ``'confidence'`` and
            ``'bbox'``. If detection raises, the unmodified ``frame`` and an
            empty list are returned.
        """
        try:
            # Run YOLO detection
            results = self.model(frame, conf=self.conf_threshold, verbose=False)
            result = results[0]

            # Draw detections on frame
            annotated_frame = result.plot()

            # Get detection info
            detections = []
            if result.boxes is not None:
                for box in result.boxes:
                    cls_id = int(box.cls.item())
                    detections.append({
                        'class': self.model.names[cls_id],
                        'confidence': box.conf.item(),
                        'bbox': box.xyxy.tolist()[0]  # [x1, y1, x2, y2]
                    })

            return annotated_frame, detections

        except Exception as e:
            print(f"Detection error: {e}")
            return frame, []

    def run_detection_stream(self):
        """Run real-time detection on stream.

        Starts ``stream_frames`` on a daemon thread, then processes and
        displays queued frames until 'q' is pressed or the loop is
        interrupted. Returns immediately if the connection test fails.
        """
        if not self.test_connection():
            return

        self.running = True

        # Start streaming thread
        stream_thread = threading.Thread(target=self.stream_frames)
        stream_thread.daemon = True
        stream_thread.start()

        print("Starting detection... Press 'q' to quit")

        fps_counter = 0
        fps_start_time = time.time()

        try:
            while True:
                if self.frame_queue.empty():
                    time.sleep(0.01)  # Small delay if no frames available
                    continue

                frame = self.frame_queue.get()

                # Process frame
                processed_frame, detections = self.process_frame(frame)

                # Calculate and display FPS (averaged over 30 frames)
                fps_counter += 1
                if fps_counter % 30 == 0:
                    fps_end_time = time.time()
                    fps = 30 / (fps_end_time - fps_start_time)
                    print(f"FPS: {fps:.2f}")
                    fps_start_time = fps_end_time

                # Display detections info
                if detections:
                    print(f"Detected {len(detections)} objects:")
                    for det in detections:
                        print(f"  - {det['class']}: {det['confidence']:.2f}")

                # Display frame
                cv2.imshow('ESP32-CAM Object Detection', processed_frame)

                # Check for quit
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        except KeyboardInterrupt:
            print("\nStopping detection...")
        finally:
            # Clearing the flag lets the streaming loop end.
            self.running = False
            cv2.destroyAllWindows()

    def run_detection_single(self):
        """Run detection on single captured frames.

        Repeatedly captures one frame, runs detection and shows the result.
        'q' quits; 's' saves the annotated frame as
        ``detection_<unix timestamp>.jpg``. Returns immediately if the
        connection test fails.
        """
        if not self.test_connection():
            return

        print("Starting single frame detection... Press 'q' to quit, 's' to save frame")

        try:
            while True:
                frame = self.capture_single_frame()
                if frame is not None:
                    # Process frame
                    processed_frame, detections = self.process_frame(frame)

                    # Display detections info
                    if detections:
                        print(f"Detected {len(detections)} objects:")
                        for det in detections:
                            print(f"  - {det['class']}: {det['confidence']:.2f}")
                    else:
                        print("No objects detected")

                    # Display frame
                    cv2.imshow('ESP32-CAM Object Detection (Single Frame)', processed_frame)

                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        break
                    elif key == ord('s'):
                        # Save frame
                        timestamp = int(time.time())
                        filename = f"detection_{timestamp}.jpg"
                        cv2.imwrite(filename, processed_frame)
                        print(f"Frame saved as {filename}")

                time.sleep(0.1)  # Small delay between captures

        except KeyboardInterrupt:
            print("\nStopping detection...")
        finally:
            cv2.destroyAllWindows()

def main():
    """Interactive entry point: build a detector and run the chosen mode.

    Prompts for "1" (stream mode) or "2" (single frame mode); any other
    answer prints a notice and runs stream mode.
    """
    # Configuration
    ESP32_IP = "192.168.1.167"  # Your ESP32-CAM IP
    MODEL_PATH = "./runs/detect/train/weights/best.pt"  # Path to your model

    # Create detector instance
    detector = ESP32CamDetector(ESP32_IP, MODEL_PATH)

    menu_lines = (
        "ESP32-CAM Object Detection System",
        "1. Stream Mode (Real-time detection)",
        "2. Single Frame Mode (Capture and detect)",
    )
    for menu_line in menu_lines:
        print(menu_line)

    choice = input("Choose mode (1/2): ").strip()

    if choice == "2":
        detector.run_detection_single()
        return

    # Everything except "2" ends up in stream mode; only "1" does so silently.
    if choice != "1":
        print("Invalid choice. Using stream mode by default.")
    detector.run_detection_stream()

# Run the interactive entry point only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

In [2]:
import cv2
import numpy as np
import requests
from ultralytics import YOLO
import time
import threading
from queue import Queue
import urllib.request

class ESP32CamDetector:
    """Run YOLO object detection on images served by an ESP32-CAM over HTTP.

    Frames come either from the ``/stream`` endpoint (read on a background
    thread that feeds ``frame_queue``) or one at a time from the ``/capture``
    endpoint. Each frame is passed to the YOLO model and the annotated result
    is shown in a resizable OpenCV window of a configurable size.
    """

    # JPEG start-of-image / end-of-image markers, used to cut individual
    # frames out of the raw MJPEG byte stream.
    _JPEG_SOI = b'\xff\xd8'
    _JPEG_EOI = b'\xff\xd9'

    def __init__(self, esp32_ip, model_path, window_width=1280, window_height=720):
        """
        Initialize ESP32 Camera Object Detector

        Args:
            esp32_ip (str): IP address of ESP32-CAM
            model_path (str): Path to your trained YOLO model (.pt file)
            window_width (int): Desired window width (default: 1280)
            window_height (int): Desired window height (default: 720)
        """
        self.esp32_ip = esp32_ip
        self.model = YOLO(model_path)
        self.stream_url = f"http://{esp32_ip}/stream"
        self.capture_url = f"http://{esp32_ip}/capture"
        # Bounded hand-off between the streaming thread and the display loop;
        # producers drop frames when it is full instead of blocking.
        self.frame_queue = Queue(maxsize=10)
        # Flag polled by the streaming loops; cleared to make them stop.
        self.running = False

        # Window size configuration
        self.window_width = window_width
        self.window_height = window_height

        # Confidence threshold
        self.conf_threshold = 0.25

        print(f"ESP32-CAM URL: {self.stream_url}")
        print(f"Model loaded: {model_path}")
        print(f"Window size: {window_width}x{window_height}")

    def setup_window(self, window_name):
        """Setup OpenCV window with custom size.

        Args:
            window_name (str): Name of the window to create; pass the same
                name to ``cv2.imshow`` later.
        """
        # Create named window with NORMAL flag (allows resizing)
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)

        # Set window size (this will force the window size regardless of image resolution)
        cv2.resizeWindow(window_name, self.window_width, self.window_height)

        # Optional: Set window position
        cv2.moveWindow(window_name, 100, 100)

        print(f"Window '{window_name}' setup with size {self.window_width}x{self.window_height}")

    def test_connection(self):
        """Test connection to ESP32-CAM using same method as CekESP2.

        Checks the root URL first, then the capture endpoint.

        Returns:
            bool: True only if both requests answer with HTTP 200; False on
            any other status code or on any request error.
        """
        print("Testing ESP32-CAM connection...")
        try:
            # Test basic connection first
            response = requests.get(f"http://{self.esp32_ip}/", timeout=10)
            if response.status_code != 200:
                print(f"✗ Basic connection failed. Status code: {response.status_code}")
                return False
            print("✓ Basic connection successful!")

            # Test capture endpoint
            response = requests.get(self.capture_url, timeout=10)
            if response.status_code != 200:
                print(f"✗ Capture endpoint failed. Status code: {response.status_code}")
                return False
            print("✓ ESP32-CAM connection successful!")
            return True
        except Exception as e:
            print(f"✗ ESP32-CAM connection error: {e}")
            return False

    def capture_single_frame(self):
        """Capture a single frame from ESP32-CAM.

        Returns:
            The image decoded by ``cv2.imdecode`` from the response body, or
            None if the request fails or the status code is not 200.
        """
        try:
            response = requests.get(self.capture_url, timeout=10)
            if response.status_code == 200:
                # Convert bytes to numpy array
                nparr = np.frombuffer(response.content, np.uint8)
                # Decode image
                return cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            print(f"Failed to capture frame. Status code: {response.status_code}")
            return None
        except Exception as e:
            print(f"Error capturing frame: {e}")
            return None

    def stream_frames(self):
        """Stream frames from ESP32-CAM using enhanced methods from CekESP2.

        Intended as the target of the streaming thread. OpenCV is tried
        first; if it cannot open the stream or gives up after repeated read
        failures, the urllib MJPEG reader takes over.
        """
        # Try Method 1: OpenCV first (enhanced version)
        if self._stream_opencv_enhanced():
            return

        print("OpenCV streaming failed, trying urllib method...")
        # Try Method 2: urllib with robust MJPEG parsing (from CekESP2)
        self._stream_urllib_enhanced()

    def _stream_opencv_enhanced(self):
        """Enhanced OpenCV streaming method from CekESP2.

        Returns:
            bool: True if streaming ran until ``self.running`` was cleared;
            False if the stream could not be opened, an error occurred, or
            more than 10 consecutive reads failed (so the caller can switch
            to another method).
        """
        cap = None
        try:
            cap = cv2.VideoCapture(self.stream_url)

            # Enhanced settings from CekESP2
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
            cap.set(cv2.CAP_PROP_FPS, 30)

            if not cap.isOpened():
                print("✗ OpenCV cannot open stream")
                return False

            print("✓ OpenCV streaming started")

            consecutive_failures = 0
            frame_count = 0

            while self.running:
                ret, frame = cap.read()
                if ret and frame is not None:
                    consecutive_failures = 0
                    frame_count += 1

                    if not self.frame_queue.full():
                        self.frame_queue.put(frame)

                    # Log every 30 frames
                    if frame_count % 30 == 0:
                        print(f"OpenCV: {frame_count} frames processed")
                    continue

                consecutive_failures += 1
                if consecutive_failures > 10:
                    print("Too many consecutive failures, switching method...")
                    # Report failure so stream_frames() really switches method.
                    return False
                time.sleep(0.1)

            return True

        except Exception as e:
            print(f"OpenCV streaming error: {e}")
            return False
        finally:
            # Release the capture handle on every exit path.
            if cap is not None:
                cap.release()

    @staticmethod
    def _split_jpeg_frames(buffer, min_size=0):
        """Cut complete JPEG images out of a raw MJPEG byte buffer.

        Args:
            buffer (bytes): Bytes accumulated from the stream so far.
            min_size (int): Images shorter than this many bytes are dropped.

        Returns:
            tuple: ``(frames, remainder)`` where ``frames`` is a list of
            complete JPEG byte strings in stream order and ``remainder`` is
            the unconsumed tail to prepend to the next chunk.
        """
        frames = []
        while True:
            # Find start of JPEG (0xFFD8)
            start = buffer.find(ESP32CamDetector._JPEG_SOI)
            if start == -1:
                # No image start in the buffer: discard it, keeping only a
                # trailing 0xFF that may be the first half of a marker split
                # across two chunks.
                buffer = buffer[-1:] if buffer.endswith(b'\xff') else b''
                break

            # Find end of JPEG (0xFFD9) after the start marker
            end = buffer.find(ESP32CamDetector._JPEG_EOI, start + 2)
            if end == -1:
                # Incomplete image: keep it (without leading junk) for later.
                buffer = buffer[start:]
                break

            # Extract JPEG frame
            jpeg = buffer[start:end + 2]
            buffer = buffer[end + 2:]
            if len(jpeg) >= min_size:
                frames.append(jpeg)

        return frames, buffer

    def _stream_urllib_enhanced(self):
        """Enhanced urllib streaming method from CekESP2.

        Reads the MJPEG stream in 4096-byte chunks, cuts out the JPEG images
        and queues the decoded frames. Falls back to repeated single-frame
        capture if the stream URL cannot be opened.
        """
        try:
            # Use the same robust method from CekESP2
            req = urllib.request.Request(self.stream_url)
            response = urllib.request.urlopen(req, timeout=15)
        except Exception as e:
            print(f"urllib streaming error: {e}")
            # Fallback to single frame capture
            self._stream_single_frames()
            return

        print("✓ urllib streaming started")
        print(f"Content-Type: {response.headers.get('content-type')}")

        # Buffer that accumulates stream data between reads
        buffer = b''
        frame_count = 0

        try:
            while self.running:
                try:
                    # Read a chunk of data (increased size like CekESP2)
                    chunk = response.read(4096)
                    if not chunk:
                        print("No more data from stream")
                        break

                    # Parse JPEG frames; images under 100 bytes are treated
                    # as too small to be valid (from CekESP2)
                    jpegs, buffer = self._split_jpeg_frames(buffer + chunk, min_size=100)

                    for jpeg_data in jpegs:
                        try:
                            # Decode frame
                            nparr = np.frombuffer(jpeg_data, np.uint8)
                            frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                        except Exception:
                            # Deliberately skip undecodable frames silently,
                            # like in CekESP2
                            continue

                        if frame is not None and frame.size > 0:
                            frame_count += 1

                            if not self.frame_queue.full():
                                self.frame_queue.put(frame)

                            # Log progress
                            if frame_count % 30 == 0:
                                print(f"urllib: {frame_count} frames processed")

                except Exception as e:
                    print(f"urllib chunk error: {e}")
                    time.sleep(0.1)
        finally:
            response.close()

    def _stream_single_frames(self):
        """Fallback method using single frame capture.

        Polls ``capture_single_frame`` roughly every 0.1 s while
        ``self.running`` is set and queues every frame that was captured.
        """
        print("✓ Using single frame capture method")

        while self.running:
            try:
                frame = self.capture_single_frame()
                if frame is not None and not self.frame_queue.full():
                    self.frame_queue.put(frame)
                time.sleep(0.1)  # 10 FPS fallback
            except Exception as e:
                print(f"Single frame capture error: {e}")
                time.sleep(0.5)

    def process_frame(self, frame):
        """Process frame with YOLO detection.

        Args:
            frame: Image to run the model on.

        Returns:
            tuple: ``(annotated_frame, detections)``. ``detections`` is a
            list of dicts with keys ``'class'``, ``'confidence'`` and
            ``'bbox'``. If detection raises, the unmodified ``frame`` and an
            empty list are returned.
        """
        try:
            # Run YOLO detection
            results = self.model(frame, conf=self.conf_threshold, verbose=False)
            result = results[0]

            # Draw detections on frame
            annotated_frame = result.plot()

            # Get detection info
            detections = []
            if result.boxes is not None:
                for box in result.boxes:
                    cls_id = int(box.cls.item())
                    detections.append({
                        'class': self.model.names[cls_id],
                        'confidence': box.conf.item(),
                        'bbox': box.xyxy.tolist()[0]  # [x1, y1, x2, y2]
                    })

            return annotated_frame, detections

        except Exception as e:
            print(f"Detection error: {e}")
            return frame, []

    def run_detection_stream(self):
        """Run real-time detection on stream.

        Starts ``stream_frames`` on a daemon thread, then processes and
        displays queued frames until 'q' is pressed or the loop is
        interrupted. Returns immediately if the connection test fails.
        """
        if not self.test_connection():
            print("Cannot proceed without ESP32-CAM connection")
            return

        self.running = True

        # Set up the window with the desired size
        window_name = 'ESP32-CAM Object Detection'
        self.setup_window(window_name)

        # Start streaming thread
        stream_thread = threading.Thread(target=self.stream_frames)
        stream_thread.daemon = True
        stream_thread.start()

        print("Starting detection... Press 'q' to quit")
        print("Waiting for frames...")

        fps_counter = 0
        fps_start_time = time.time()
        # Time of the most recent frame (or of the last "no frames" warning);
        # refreshed on every frame so the warning only fires after a real
        # 30-second gap.
        last_frame_time = time.time()

        try:
            while True:
                if not self.frame_queue.empty():
                    frame = self.frame_queue.get()
                    last_frame_time = time.time()

                    # Show original frame info (for debugging)
                    if fps_counter == 0:  # Show only once
                        h, w = frame.shape[:2]
                        print(f"Original frame resolution: {w}x{h}")
                        print(f"Window size: {self.window_width}x{self.window_height}")

                    # Process frame
                    processed_frame, detections = self.process_frame(frame)

                    # Calculate and display FPS (averaged over 30 frames)
                    fps_counter += 1
                    if fps_counter % 30 == 0:
                        fps_end_time = time.time()
                        fps = 30 / (fps_end_time - fps_start_time)
                        print(f"Detection FPS: {fps:.2f}")
                        fps_start_time = fps_end_time

                    # Display detections info (less frequent to avoid spam)
                    if detections and fps_counter % 10 == 0:
                        print(f"Detected {len(detections)} objects:")
                        for det in detections:
                            print(f"  - {det['class']}: {det['confidence']:.2f}")

                    # Display frame (will be automatically resized to fit window)
                    cv2.imshow(window_name, processed_frame)

                    # Check for quit
                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        break
                else:
                    # Check if we haven't received frames for too long
                    if time.time() - last_frame_time > 30:  # 30 seconds timeout
                        print("No frames received for 30 seconds. Check connection.")
                        last_frame_time = time.time()

                    time.sleep(0.01)  # Small delay if no frames available

        except KeyboardInterrupt:
            print("\nStopping detection...")
        finally:
            # Clearing the flag lets the streaming loop end.
            self.running = False
            cv2.destroyAllWindows()

    def run_detection_single(self):
        """Run detection on single captured frames.

        Repeatedly captures one frame, runs detection and shows the result.
        'q' quits; 's' saves the annotated frame as
        ``detection_<unix timestamp>.jpg``. Returns immediately if the
        connection test fails.
        """
        if not self.test_connection():
            print("Cannot proceed without ESP32-CAM connection")
            return

        # Set up the window with the desired size
        window_name = 'ESP32-CAM Object Detection (Single Frame)'
        self.setup_window(window_name)

        print("Starting single frame detection... Press 'q' to quit, 's' to save frame")

        try:
            frame_captured = False
            while True:
                frame = self.capture_single_frame()
                if frame is not None:
                    # Show original frame info (for debugging) - only once
                    if not frame_captured:
                        h, w = frame.shape[:2]
                        print(f"Original frame resolution: {w}x{h}")
                        print(f"Window size: {self.window_width}x{self.window_height}")
                        frame_captured = True

                    # Process frame
                    processed_frame, detections = self.process_frame(frame)

                    # Display detections info
                    if detections:
                        print(f"Detected {len(detections)} objects:")
                        for det in detections:
                            print(f"  - {det['class']}: {det['confidence']:.2f}")
                    else:
                        print("No objects detected")

                    # Display frame (will be automatically resized to fit window)
                    cv2.imshow(window_name, processed_frame)

                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        break
                    elif key == ord('s'):
                        # Save frame
                        timestamp = int(time.time())
                        filename = f"detection_{timestamp}.jpg"
                        cv2.imwrite(filename, processed_frame)
                        print(f"Frame saved as {filename}")

                time.sleep(0.1)  # Small delay between captures

        except KeyboardInterrupt:
            print("\nStopping detection...")
        finally:
            cv2.destroyAllWindows()

def main():
    """Interactive entry point: build a detector and run the chosen mode.

    Returns early with an error message if the model file does not exist.
    Otherwise prompts for "1" (stream mode) or "2" (single frame mode); any
    other answer prints a notice and runs stream mode.
    """
    # Local import: only this function needs ``os``.
    import os

    # Configuration - Update these values
    ESP32_IP = "192.168.1.7"  # Your ESP32-CAM IP (same as CekESP2)
    MODEL_PATH = "./runs/detect/train/weights/best.pt"  # Path to your YOLO model

    # Window size configuration (customize as needed)
    WINDOW_WIDTH = 1280   # Desired window width
    WINDOW_HEIGHT = 720   # Desired window height

    # Verify model file exists before the detector tries to load it
    if not os.path.exists(MODEL_PATH):
        print(f"❌ Model file not found: {MODEL_PATH}")
        print("Please check the path to your YOLO model file")
        return

    # Create detector instance with custom window size
    detector = ESP32CamDetector(ESP32_IP, MODEL_PATH, WINDOW_WIDTH, WINDOW_HEIGHT)

    print("ESP32-CAM Object Detection System (Enhanced with Custom Window Size)")
    print("=" * 70)
    print("1. Stream Mode (Real-time detection)")
    print("2. Single Frame Mode (Capture and detect)")
    print(f"Window size will be: {WINDOW_WIDTH}x{WINDOW_HEIGHT}")

    choice = input("Choose mode (1/2): ").strip()

    if choice == "1":
        # A real newline escape: the doubled backslash used before printed
        # the two characters "\n" instead of a blank line.
        print("\nStarting stream mode...")
        detector.run_detection_stream()
    elif choice == "2":
        print("\nStarting single frame mode...")
        detector.run_detection_single()
    else:
        print("Invalid choice. Using stream mode by default.")
        detector.run_detection_stream()

# Run the interactive entry point only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

ESP32-CAM URL: http://192.168.1.7/stream
Model loaded: ./runs/detect/train/weights/best.pt
Window size: 1280x720
ESP32-CAM Object Detection System (Enhanced with Custom Window Size)
1. Stream Mode (Real-time detection)
2. Single Frame Mode (Capture and detect)
Window size will be: 1280x720
\nStarting single frame mode...
Testing ESP32-CAM connection...
✓ Basic connection successful!
✓ ESP32-CAM connection successful!
Window 'ESP32-CAM Object Detection (Single Frame)' setup with size 1280x720
Starting single frame detection... Press 'q' to quit, 's' to save frame
Original frame resolution: 400x296
Window size: 1280x720
Detected 2 objects:
  - car: 0.71
  - bus: 0.67
Detected 2 objects:
  - car: 0.71
  - bus: 0.68
Detected 2 objects:
  - car: 0.70
  - bus: 0.66
Detected 3 objects:
  - bus: 0.66
  - bus: 0.60
  - car: 0.51
Detected 3 objects:
  - car: 0.68
  - bus: 0.64
  - bus: 0.55
Detected 2 objects:
  - car: 0.71
  - bus: 0.65
Detected 2 objects:
  - bus: 0.70
  - car: 0.68
Detected 3