# Curio Face Detection

In [1]:
import sys
import os

# Put the notebook's parent directory on the import path (once), so that
# packages located one level up can be imported by the cells below.
module_path = os.path.abspath(os.pardir)
if module_path not in sys.path:
    sys.path.append(module_path)

## Import Necessary Libraries

In [2]:
import asyncio
import numpy as np
from curio import Curio
import cv2
from IPython.display import display, Image, clear_output
from threading import Event, Thread
import time
import ipywidgets as widgets

## Define Constants

In [3]:
# --- Connection settings ---------------------------------------------------
# Device identifier and camera endpoint handed to `Curio(...)`.
# These values are specific to one robot/network; edit them for your setup.
mac_address = "EC4B2DA9-6088-D0F1-BF8F-4A0271D7BBCC"
camera_ip, camera_port = "192.168.87.114", 4747

# --- Drive commands ----------------------------------------------------------
# Raw byte strings passed to `curio.send_command`, keyed by direction name.
# NOTE(review): the go(...) arguments look like (left speed, right speed,
# duration in ms) — confirm against the robot firmware.
commands = dict(
    forward=b"go(50, 50, 1000);\n",
    back=b"go(-50, -50, 1000);\n",
    right=b"go(-50, 50, 1000);\n",
    left=b"go(50, -50, 1000);\n",
    stop=b"go(0, 0, 0);\n",
)

# --- Face detector files -----------------------------------------------------
# Caffe network definition and weights, resolved relative to the notebook's
# working directory.
config_path = 'deploy.prototxt'
model_path = 'res10_300x300_ssd_iter_140000.caffemodel'

## Initialize Curio Device

In [4]:
# Create the robot handle from the device identifier and camera endpoint
# defined in the constants cell.
# NOTE(review): the recorded output ("Connected" plus a traceback from a
# background thread) suggests the constructor also opens the connection and
# spawns a thread — confirm in the curio package.
curio = Curio(mac_address, camera_ip, camera_port)

Connected


Exception in thread Thread-5 (run_loop):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/talha/Desktop/Curio Python/curio/bluetooth_controller.py", line 24, in run_loop
    loop.run_until_complete(self.run_client())
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/nest_asyncio.py", line 96, in run_until_complete
    raise RuntimeError(
RuntimeError: Event loop stopped before Future completed.


## Load OpenCV's pre-trained DNN (Caffe SSD) face detector

In [5]:
# Load the Caffe face-detection network: `config_path` is the .prototxt
# network description, `model_path` the .caffemodel weights file.
net = cv2.dnn.readNetFromCaffe(config_path, model_path)

In [6]:
# Image widget that the stream loop fills with JPEG-encoded frames.
video_output = widgets.Image(format='jpeg')

In [7]:
def display_stream(confidence_threshold=0.5, dead_zone_px=50):
    """Detect faces on the camera stream, steer toward one, and show the video.

    Loops until ``curio.stream_stop_event`` is set. Each frame returned by
    ``curio.get_frame()`` is rotated 90 degrees clockwise, run through the DNN
    face detector ``net``, annotated with one rectangle per detection above
    the confidence threshold, and pushed to the ``video_output`` widget as a
    JPEG.

    Steering uses only the most confident face in the frame. Issuing a command
    per detection would send contradictory left/right commands within the same
    frame whenever several faces are visible.

    Args:
        confidence_threshold: Minimum detection confidence for a face to be
            drawn and considered for steering.
        dead_zone_px: Half-width, in pixels, of the band around the horizontal
            frame centre inside which the face counts as centred ('stop').

    Returns:
        None. Any exception raised inside the loop ends the stream and is
        printed rather than propagated.
    """
    try:
        last_command = None
        while not curio.stream_stop_event.is_set():
            frame = curio.get_frame()
            if frame is not None:
                # Rotate the frame clockwise before detection and display.
                rotated_frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
                (h, w) = rotated_frame.shape[:2]
                frame_center_x = w // 2

                # Build the network input: 300x300, mean (104, 117, 123)
                # subtracted, no channel swap, no crop.
                blob = cv2.dnn.blobFromImage(rotated_frame, 1.0, (300, 300), [104, 117, 123], False, False)
                net.setInput(blob)
                detections = net.forward()

                # detections[0, 0, i, 2] is the confidence of detection i and
                # detections[0, 0, i, 3:7] its box corners as fractions of the
                # frame size (hence the scaling by [w, h, w, h]).
                best_confidence = None
                best_center_x = None
                for i in range(detections.shape[2]):
                    confidence = detections[0, 0, i, 2]
                    if confidence > confidence_threshold:
                        box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                        (x1, y1, x2, y2) = box.astype("int")

                        cv2.rectangle(rotated_frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

                        # Remember only the strongest face for steering.
                        if best_confidence is None or confidence > best_confidence:
                            best_confidence = confidence
                            best_center_x = (x1 + x2) // 2

                if best_center_x is not None:
                    if best_center_x < frame_center_x - dead_zone_px:
                        command = commands['left']
                    elif best_center_x > frame_center_x + dead_zone_px:
                        command = commands['right']
                    else:
                        command = commands['stop']

                    # Movement commands are re-sent on every frame; 'stop' is
                    # sent only when it differs from the previous command.
                    if command != last_command or command != commands['stop']:
                        curio.send_command(command)
                        last_command = command

                # Only publish the frame if JPEG encoding succeeded.
                encoded_ok, jpeg = cv2.imencode('.jpg', rotated_frame)
                if encoded_ok:
                    video_output.value = jpeg.tobytes()
            # Short pause so the loop does not spin at full speed.
            time.sleep(0.01)
    except Exception as e:
        print(f"Error displaying stream: {e}")

In [8]:
# Run the detection/display loop on a background thread so the notebook stays
# interactive. The thread is a daemon so that a still-running stream loop does
# not keep the interpreter alive if "Stop Curio" was never clicked.
display_thread = Thread(target=display_stream, daemon=True)
display_thread.start()

In [9]:
# Render the video widget; display_stream refreshes it by assigning
# video_output.value for each processed frame.
display(video_output)

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

In [None]:
def handle_command(command):
    """Send the drive command stored under ``command`` to the robot.

    ``command`` must be a key of the ``commands`` dict (for example
    ``'forward'``); an unknown key raises ``KeyError``.
    """
    payload = commands[command]
    curio.send_command(payload)


def _make_command_button(label, command):
    """Return a button captioned ``label`` whose click sends ``command``."""
    button = widgets.Button(description=label)
    # The click callback receives the button instance, which is not needed.
    button.on_click(lambda _clicked: handle_command(command))
    return button


# Manual driving controls, one button per entry used from `commands`.
forward_button = _make_command_button("Forward", 'forward')
back_button = _make_command_button("Back", 'back')
left_button = _make_command_button("Left", 'left')
right_button = _make_command_button("Right", 'right')
stop_button = _make_command_button("Stop", 'stop')

# Lay the buttons out in a single row and show them.
control_box = widgets.HBox(
    [forward_button, back_button, left_button, right_button, stop_button]
)
display(control_box)

In [10]:
def stop_curio():
    """Call ``curio.stop()`` and then block until ``display_thread`` finishes."""
    curio.stop()
    display_thread.join()


def _on_stop_curio_clicked(_button):
    """Click handler: ignores the button argument and calls ``stop_curio``."""
    stop_curio()


# Button that shuts the session down when clicked.
stop_curio_button = widgets.Button(description="Stop Curio")
stop_curio_button.on_click(_on_stop_curio_clicked)
display(stop_curio_button)

Button(description='Stop Curio', style=ButtonStyle())