In [1]:
import time
from collections import defaultdict
from pathlib import Path

import cv2
import mediapipe as mp
import numpy as np
from adafruit_servokit import ServoKit
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
import libcamera
from picamera2 import Picamera2
import traitlets
import ipywidgets.widgets as widgets
from IPython.display import display
import pandas as pd

class DriverMonitor:
    """Camera-based face monitor with pan/tilt tracking and blendshape logging.

    The monitor grabs frames from a Picamera2 camera, feeds them to a
    MediaPipe face landmarker running in LIVE_STREAM mode, steers two servos
    so that faces found by a Haar cascade stay near the image centre, renders
    the landmarks and blendshape scores into a Jupyter image widget, and
    records once-per-second averages of every blendshape score.
    """

    # ServoKit channels that receive the pan and tilt angles.
    _PAN_CHANNEL = 10
    _TILT_CHANNEL = 11
    # Pixel error below which the servos are left alone, and the divisor that
    # converts a pixel error into an angle step.
    _TRACKING_DEADBAND_PX = 15
    _TRACKING_GAIN_DIVISOR = 40

    def __init__(self):
        # UI display widget
        self.face_image_widget = widgets.Image(format='jpeg', width=640, height=480)
        display(self.face_image_widget)

        # Servo setup: start with both axes centred.
        self.kit = ServoKit(channels=16)
        self.pan = 90
        self.tilt = 90
        self.kit.servo[self._PAN_CHANNEL].angle = self.pan
        self.kit.servo[self._TILT_CHANNEL].angle = self.tilt

        # Camera setup
        self.picamera = Picamera2()
        config = self.picamera.create_preview_configuration(main={"format": 'XRGB8888', "size": (640, 480)})
        config["transform"] = libcamera.Transform(hflip=0, vflip=1)
        self.picamera.configure(config)
        self.picamera.start()

        # Face tracking parameters
        self.dispW = 640
        self.dispH = 480
        self.face_cascade = cv2.CascadeClassifier('./images/haarcascade_frontalface_default.xml')

        # MediaPipe face mesh
        self.detector = self.initialize_face_landmarker()

        # Drawing utils
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_face_mesh = mp.solutions.face_mesh
        self.mp_drawing_styles = mp.solutions.drawing_styles

        # FPS tracking. Intervals are measured on the monotonic clock so that
        # wall-clock adjustments cannot produce negative or huge durations.
        self.COUNTER = 0
        self.FPS = 0
        self.START_TIME = time.monotonic()
        self.DETECTION_RESULT = None
        self.fps_avg_frame_count = 10

        # Data collection
        self.blendshape_data = []  # one row per (second, category) average
        self.current_blendshapes = defaultdict(list)  # per-frame scores since the last log
        self.last_log_time = time.monotonic()  # monotonic time of the last log

    def initialize_face_landmarker(self):
        """Create the MediaPipe face landmarker in LIVE_STREAM mode.

        Results are delivered asynchronously to :meth:`save_result`.
        """
        base_options = python.BaseOptions(model_asset_path='face_landmarker.task')
        options = vision.FaceLandmarkerOptions(
            base_options=base_options,
            running_mode=vision.RunningMode.LIVE_STREAM,
            num_faces=2,
            min_face_detection_confidence=0.5,
            min_face_presence_confidence=0.5,
            min_tracking_confidence=0.5,
            output_face_blendshapes=True,
            result_callback=self.save_result)
        return vision.FaceLandmarker.create_from_options(options)

    def save_result(self, result: "vision.FaceLandmarkerResult", unused_output_image: "mp.Image", timestamp_ms: int) -> None:
        """Landmarker callback: store the newest result and refresh the FPS.

        The FPS estimate is refreshed every ``fps_avg_frame_count`` results.
        The very first result only opens the measuring window, because the
        time since construction does not span a full window of frames.
        """
        if self.COUNTER % self.fps_avg_frame_count == 0:
            now = time.monotonic()
            elapsed = now - self.START_TIME
            # Skip the update instead of dividing by a non-positive interval.
            if self.COUNTER > 0 and elapsed > 0:
                self.FPS = self.fps_avg_frame_count / elapsed
            self.START_TIME = now
        self.DETECTION_RESULT = result
        self.COUNTER += 1

    def bgr8_to_jpeg(self, value, quality=75):
        """Encode an image array as JPEG bytes.

        Args:
            value: image array accepted by ``cv2.imencode``.
            quality: JPEG quality passed to the encoder.

        Returns:
            The encoded image as ``bytes``.
        """
        encode_params = [cv2.IMWRITE_JPEG_QUALITY, int(quality)]
        return bytes(cv2.imencode('.jpg', value, encode_params)[1])

    def update_servo(self, x, y, w, h):
        """Nudge the pan/tilt servos toward the centre of a bounding box.

        Args:
            x, y: top-left corner of the box in pixels.
            w, h: width and height of the box in pixels.
        """
        error_pan = (x + w / 2) - self.dispW / 2
        error_tilt = (y + h / 2) - self.dispH / 2

        # Ignore small errors so the servos do not hunt around the centre.
        if abs(error_pan) > self._TRACKING_DEADBAND_PX:
            self.pan -= error_pan / self._TRACKING_GAIN_DIVISOR
        if abs(error_tilt) > self._TRACKING_DEADBAND_PX:
            self.tilt -= error_tilt / self._TRACKING_GAIN_DIVISOR

        # Keep both angles inside the 0..180 range before writing them out.
        self.pan = max(0, min(180, self.pan))
        self.tilt = max(0, min(180, self.tilt))
        self.kit.servo[self._PAN_CHANNEL].angle = 180 - self.pan
        self.kit.servo[self._TILT_CHANNEL].angle = 180 - self.tilt

    def draw_blendshapes(self, frame):
        """Append a blendshape legend to the right of ``frame``.

        Besides drawing, this also appends every score of the first detected
        face to ``current_blendshapes`` for the per-second averages.

        Returns:
            ``frame`` unchanged when no detection result exists yet, otherwise
            a new, wider image containing the legend.
        """
        # Read the shared result once; the callback thread may replace it.
        result = self.DETECTION_RESULT
        if not result:
            return frame

        label_padding_width = 1500
        label_background_color = (255, 255, 255)
        frame = cv2.copyMakeBorder(frame, 0, 0, 0, label_padding_width, cv2.BORDER_CONSTANT, None, label_background_color)

        legend_x = frame.shape[1] - label_padding_width + 20
        legend_y = 30
        bar_max_width = label_padding_width - 40
        bar_height = 8
        gap_between_bars = 5
        text_gap = 5

        face_blendshapes = result.face_blendshapes
        if not face_blendshapes:
            return frame

        # NOTE(review): rows advance 13 px each and the frame height is not
        # extended, so a long category list may run past the bottom edge.
        for category in face_blendshapes[0]:
            category_name = category.category_name
            score = round(category.score, 2)
            text = f"{category_name} ({score:.2f})"
            # Accumulate data for the per-second average.
            self.current_blendshapes[category_name].append(score)
            (text_width, _), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
            cv2.putText(frame, text, (legend_x, legend_y + bar_height // 2 + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 0), 1, cv2.LINE_AA)
            bar_left = legend_x + text_width + text_gap
            bar_width = int(bar_max_width * score)
            cv2.rectangle(frame, (bar_left, legend_y),
                          (bar_left + bar_width, legend_y + bar_height),
                          (0, 255, 0), -1)
            legend_y += (bar_height + gap_between_bars)
        return frame

    def draw_landmarks(self, frame):
        """Draw tesselation, contours and irises of every detected face in place."""
        # Read the shared result once; the callback thread may replace it.
        result = self.DETECTION_RESULT
        if not result:
            return

        styles = self.mp_drawing_styles
        for face_landmarks in result.face_landmarks:
            face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
            face_landmarks_proto.landmark.extend([
                landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z)
                for landmark in face_landmarks
            ])
            self.mp_drawing.draw_landmarks(
                image=frame,
                landmark_list=face_landmarks_proto,
                connections=self.mp_face_mesh.FACEMESH_TESSELATION,
                landmark_drawing_spec=None,
                connection_drawing_spec=styles.get_default_face_mesh_tesselation_style())
            self.mp_drawing.draw_landmarks(
                image=frame,
                landmark_list=face_landmarks_proto,
                connections=self.mp_face_mesh.FACEMESH_CONTOURS,
                landmark_drawing_spec=None,
                connection_drawing_spec=styles.get_default_face_mesh_contours_style())
            self.mp_drawing.draw_landmarks(
                image=frame,
                landmark_list=face_landmarks_proto,
                connections=self.mp_face_mesh.FACEMESH_IRISES,
                landmark_drawing_spec=None,
                connection_drawing_spec=styles.get_default_face_mesh_iris_connections_style())

    def log_blendshapes(self, log_time):
        """Fold the accumulated per-frame scores into one averaged row per category.

        Args:
            log_time: value stored in the ``'Time'`` column of every new row.

        Does nothing when no scores were accumulated since the last call.
        """
        if not self.current_blendshapes:
            return

        # Average score of each category, appended as one row per category.
        self.blendshape_data.extend(
            {
                'Time': log_time,
                'Category': category_name,
                'Average Score': round(sum(scores) / len(scores), 2),
            }
            for category_name, scores in self.current_blendshapes.items()
        )

        # Start a fresh accumulation window.
        self.current_blendshapes.clear()

    def save_blendshapes(self, file_path):
        """Write the logged averages to ``file_path`` as CSV.

        Missing parent directories are created first so that the collected
        data is not lost to a missing output folder.
        """
        Path(file_path).parent.mkdir(parents=True, exist_ok=True)
        df = pd.DataFrame(self.blendshape_data)
        df.to_csv(file_path, index=False)
        print(f"Blendshapes data saved to {file_path}")

    def close(self):
        """Release the face landmarker and the camera.

        This is not called automatically by :meth:`run`, so an interrupted
        ``run()`` can be resumed; call it once the monitor is no longer needed.
        """
        self.detector.close()
        self.picamera.stop()
        self.picamera.close()

    def run(self):
        """Capture, detect, track, draw and log in an endless loop.

        The loop only ends when an exception (for example
        ``KeyboardInterrupt``) propagates out of it.
        """
        last_timestamp_ms = -1
        while True:
            frame = self.picamera.capture_array()
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            image = cv2.flip(frame, 1)
            image = image.astype(np.uint8)
            rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            raw_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR)  # force BGR layout
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_image)

            # LIVE_STREAM mode needs increasing timestamps: use the monotonic
            # clock and never repeat or go below the previous value.
            timestamp_ms = max(time.monotonic_ns() // 1_000_000, last_timestamp_ms + 1)
            self.detector.detect_async(mp_image, timestamp_ms)
            last_timestamp_ms = timestamp_ms

            faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
            for (x, y, w, h) in faces:
                self.update_servo(x, y, w, h)

            fps_text = f'FPS = {self.FPS:.1f}'
            cv2.putText(raw_image, fps_text, (24, 50), cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA)
            self.draw_landmarks(raw_image)
            raw_image = self.draw_blendshapes(raw_image)
            self.face_image_widget.value = self.bgr8_to_jpeg(raw_image)

            # Flush the averages roughly once per second.
            now = time.monotonic()
            if now - self.last_log_time >= 1.0:
                log_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
                self.log_blendshapes(log_time)
                self.last_log_time = now


# Example: run the monitor until it is interrupted, then persist the data.
if __name__ == "__main__":
    monitor = DriverMonitor()
    try:
        monitor.run()
    except KeyboardInterrupt:
        # Interrupting the loop is the normal way to stop the monitor.
        pass
    finally:
        # Save on every exit path so that an unexpected error inside the loop
        # does not discard the averages collected so far.
        monitor.save_blendshapes("data/Camera.csv")

Image(value=b'', format='jpeg', height='480', width='640')

[0:17:28.840861281] [3612] [1;32m INFO [1;37mCamera [1;34mcamera_manager.cpp:284 [0mlibcamera v0.1.0+118-563cd78e
[0:17:28.876641780] [3644] [1;32m INFO [1;37mRPI [1;34mpisp.cpp:653 [0mlibpisp version v1.0.2 fa44a258644a 22-11-2023 (21:59:22)
[0:17:28.886235571] [3644] [1;32m INFO [1;37mRPI [1;34mpisp.cpp:1112 [0mRegistered camera /base/axi/pcie@120000/rp1/i2c@80000/imx708@1a to CFE device /dev/media2 and ISP device /dev/media0 using PiSP variant BCM2712_C0
[0:17:28.889390423] [3612] [1;33m WARN [1;37mV4L2 [1;34mv4l2_pixelformat.cpp:338 [0mUnsupported V4L2 pixel format Y16 
[0:17:28.889420481] [3612] [1;33m WARN [1;37mV4L2 [1;34mv4l2_pixelformat.cpp:338 [0mUnsupported V4L2 pixel format RGB6
[0:17:28.889424982] [3612] [1;33m WARN [1;37mV4L2 [1;34mv4l2_pixelformat.cpp:338 [0mUnsupported V4L2 pixel format BGR6
[0:17:28.889431019] [3612] [1;33m WARN [1;37mV4L2 [1;34mv4l2_pixelformat.cpp:338 [0mUnsupported V4L2 pixel format PC1M
[0:17:28.890135001] [3612] [1;32m

Blendshapes data saved to data/Camera.csv
