### Process Dataset

In [9]:
import cv2
import os
import re
import dlib
from scipy.spatial import distance
import numpy as np
import pandas as pd

In [2]:
def calculate_ear(eye_points):
    """Return the eye aspect ratio (EAR) for a single eye.

    Args:
        eye_points: Indexable sequence of at least six 2-D points. The
            caller in this file passes six consecutive landmarks of one
            eye; with that ordering, pairs (1, 5) and (2, 4) presumably
            span the eyelids and pair (0, 3) the eye corners.

    Returns:
        ``(d(p1, p5) + d(p2, p4)) / (2 * d(p0, p3))`` where ``d`` is the
        Euclidean distance.
    """
    lid_pairs = ((1, 5), (2, 4))
    lid_span = sum(
        distance.euclidean(eye_points[upper], eye_points[lower])
        for upper, lower in lid_pairs
    )
    eye_width = distance.euclidean(eye_points[0], eye_points[3])
    return lid_span / (2.0 * eye_width)

In [3]:
def calculate_mar(mouth_points):
    """Return the mouth aspect ratio (MAR) used as a yawn indicator.

    Args:
        mouth_points: Indexable sequence of 2-D points; indices 0 and 2-8
            are read. The caller in this file passes the 20 mouth
            landmarks (48-67) of the 68-point model.

    Returns:
        ``(d(p2, p8) + d(p3, p7) + d(p4, p6)) / (2 * d(p0, p5))`` where
        ``d`` is the Euclidean distance.

    NOTE(review): these index pairs look like the 1-indexed 8-point
    inner-lip MAR formula applied directly to a 0-indexed 20-point list;
    on the 68-point layout they are not vertically aligned pairs, and
    index 5 is not a mouth corner. Confirm the intended landmarks before
    changing anything, because existing feature files depend on the
    current values.
    """
    opening_pairs = ((2, 8), (3, 7), (4, 6))
    opening = sum(
        distance.euclidean(mouth_points[first], mouth_points[second])
        for first, second in opening_pairs
    )
    reference_width = distance.euclidean(mouth_points[0], mouth_points[5])
    return opening / (2.0 * reference_width)

In [4]:
def calculate_head_angle(landmarks, frame_size=(640, 480)):
    """Estimate the total head rotation, in degrees, from facial landmarks.

    Six 2-D landmarks are matched against a generic 3-D face model with
    ``cv2.solvePnP`` using an approximate pinhole camera (no lens
    distortion).

    Args:
        landmarks: Object exposing ``part(i)`` with ``.x`` / ``.y``
            attributes; landmarks 30, 8, 36, 45, 48 and 54 are read.
        frame_size: ``(width, height)`` in pixels of the image the
            landmarks were detected in. The focal length is approximated
            by the width and the principal point by the image centre.
            The default reproduces the previously hard-coded 640x480
            camera, so existing callers get unchanged results.

    Returns:
        The magnitude of the rotation vector converted to degrees. This
        is a single overall rotation angle, not per-axis Euler angles.
    """
    # Generic 3-D reference positions of the facial features (model units).
    model_points = np.array([
        (0.0, 0.0, 0.0),           # Nose
        (0.0, -330.0, -65.0),      # Chin
        (-225.0, 170.0, -135.0),   # Left eye
        (225.0, 170.0, -135.0),    # Right eye
        (-150.0, -150.0, -125.0),  # Left mouth corner
        (150.0, -150.0, -125.0),   # Right mouth corner
    ])

    # Matching 2-D landmark indices, in the same order as model_points.
    landmark_ids = (30, 8, 36, 45, 48, 54)
    image_points = np.array(
        [(landmarks.part(i).x, landmarks.part(i).y) for i in landmark_ids],
        dtype="double",
    )

    width, height = frame_size
    focal_length = width
    center = (width / 2, height / 2)
    camera_matrix = np.array([[focal_length, 0, center[0]],
                              [0, focal_length, center[1]],
                              [0, 0, 1]], dtype="double")
    dist_coeffs = np.zeros((4, 1))  # assume no lens distortion

    # solvePnP returns (success, rotation_vector, translation_vector).
    # The rotation vector is in axis-angle (Rodrigues) form: its direction
    # is the rotation axis and its norm is the rotation angle in radians.
    # The success flag is ignored, as it was before.
    _, rotation_vector, _ = cv2.solvePnP(model_points, image_points, camera_matrix, dist_coeffs)

    # Convert the rotation magnitude from radians to degrees.
    angle = np.linalg.norm(rotation_vector) * 180 / np.pi
    return angle

In [5]:
def count_blinks(ears, ear_threshold=0.2):
    """Summarise eye closure over a sequence of per-frame EAR values.

    A frame counts as "closed" when its EAR is ``<= ear_threshold``. A
    blink is counted at every open-to-closed transition, so a sequence
    that starts closed does not count that first closure as a blink.

    Every frame, including the first one, contributes to the closed-frame
    statistics. Previously the first frame was only used as the "previous"
    value and was never counted, so a fully closed window of N frames
    reported N - 1 closed frames. An empty sequence now returns all zeros
    instead of raising ``IndexError``.

    Args:
        ears: Sequence of EAR values, one per frame (may be empty).
        ear_threshold: EAR value at or below which the eye is closed.

    Returns:
        dict with keys:
            'blinks': number of open-to-closed transitions.
            'total_closed_frames': number of closed frames.
            'max_closed_duration': longest run of consecutive closed
                frames.
    """
    blinks = 0
    total_closed_frames = 0
    current_closed_frames = 0
    max_closed_duration = 0
    # False before the first frame so a leading closed frame is not a blink.
    previous_open = False

    for ear in ears:
        if ear <= ear_threshold:
            current_closed_frames += 1
            total_closed_frames += 1
            if previous_open:
                blinks += 1
            previous_open = False
        else:
            max_closed_duration = max(max_closed_duration, current_closed_frames)
            current_closed_frames = 0
            previous_open = True

    # Account for a closed run that lasts until the end of the sequence.
    max_closed_duration = max(max_closed_duration, current_closed_frames)

    return {
        'blinks': blinks,
        'total_closed_frames': total_closed_frames,
        'max_closed_duration': max_closed_duration
    }

In [6]:
def extract_features_from_image(image_path, detector, predictor):
    """Compute EAR, MAR and head angle for the first face found in an image.

    Args:
        image_path: Path of the image file; it is loaded as grayscale.
        detector: Callable that takes the image and returns an iterable
            of face regions.
        predictor: Callable ``predictor(image, face)`` returning a
            landmark object with ``part(i).x`` / ``part(i).y``.

    Returns:
        ``{'ear': ..., 'mar': ..., 'head_angle': ...}`` for the first
        detected face, where ``ear`` is the mean of both eyes' EAR.
        Returns ``None`` if the image cannot be read or no face is
        detected.
    """
    gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        return None

    # Only the first detection is used; any further faces are ignored.
    first_face = next(iter(detector(gray)), None)
    if first_face is None:
        return None

    shape = predictor(gray, first_face)

    def points(start, stop):
        # (x, y) tuples for landmarks start .. stop - 1.
        return [(shape.part(k).x, shape.part(k).y) for k in range(start, stop)]

    left_eye = points(36, 42)
    right_eye = points(42, 48)
    mouth = points(48, 68)

    mean_ear = (calculate_ear(left_eye) + calculate_ear(right_eye)) / 2.0
    return {
        'ear': mean_ear,
        'mar': calculate_mar(mouth),
        'head_angle': calculate_head_angle(shape),
    }

In [7]:
def _collect_video_frames(videos, directory, suffix, label):
    """Group the frame images found in *directory* by source video, in place."""
    # File names look like "<digits>_<word>_<word>_<frame>_<suffix>...".
    pattern = re.compile(rf"(\d+_\w+_\w+)_(\d+)_{suffix}")
    for img_name in os.listdir(directory):
        match = pattern.match(img_name)
        if not match:
            continue
        video_id_base, frame_num = match.groups()
        video = videos.setdefault(f"{video_id_base}_{suffix}", {'frames': [], 'label': label})
        video['frames'].append((int(frame_num), os.path.join(directory, img_name)))


def _sliding_window_metrics(ears, mars, head_angles, window_size):
    """Return per-window statistics for every stride-1 window of *window_size* frames."""
    metrics = []
    for start in range(len(ears) - window_size + 1):
        stop = start + window_size
        window_ears = ears[start:stop]
        window_mars = mars[start:stop]
        window_angles = head_angles[start:stop]
        blink_info = count_blinks(window_ears)
        metrics.append({
            'blink_freq': blink_info['blinks'],
            'total_closed_frames': blink_info['total_closed_frames'],
            'max_closed_duration': blink_info['max_closed_duration'],
            'ear_mean': np.mean(window_ears),
            'ear_std': np.std(window_ears),
            'ear_min': np.min(window_ears),
            'mar_mean': np.mean(window_mars),
            'mar_std': np.std(window_mars),
            'mar_max': np.max(window_mars),
            'head_angle_mean': np.mean(window_angles)
        })
    return metrics


def _meta_window_rows(video_id, label, window_metrics, window_size, meta_window_size):
    """Build one output row per stride-1 group of *meta_window_size* consecutive windows."""
    # Columns copied from the most recent window, in output order.
    copied_keys = (
        'ear_mean', 'ear_std', 'ear_min',
        'mar_mean', 'mar_std', 'mar_max',
        'head_angle_mean',
        'blink_freq', 'total_closed_frames', 'max_closed_duration',
    )
    rows = []
    for start in range(len(window_metrics) - meta_window_size + 1):
        meta_window = window_metrics[start:start + meta_window_size]
        latest = meta_window[-1]
        # The windows overlap, so this sums closed-frame counts across
        # windows rather than counting distinct closed frames.
        total_closed_in_meta = sum(w['total_closed_frames'] for w in meta_window)
        percent_closed_in_meta = (total_closed_in_meta / (window_size * meta_window_size)) * 100
        mostly_closed_windows = sum(
            1 for w in meta_window if (w['total_closed_frames'] / window_size) > 0.5
        )

        row = {'video_id': video_id}
        row.update((key, latest[key]) for key in copied_keys)
        row['percent_closed_in_meta'] = percent_closed_in_meta
        row['mostly_closed_windows'] = mostly_closed_windows
        row['label'] = label
        rows.append(row)
    return rows


def process_dataset(drowsy_dir, notdrowsy_dir, detector, predictor, window_size=20, meta_window_size=5):
    """Build the windowed feature table for the drowsy / not-drowsy image dataset.

    Frame images are grouped per video by file name and ordered by frame
    number. EAR, MAR and head angle are extracted from each frame; frames
    for which extraction returns nothing are skipped, so a window may
    span frames that are not consecutive in the video. Statistics are
    then computed over sliding windows of ``window_size`` frames, and one
    row is emitted per group of ``meta_window_size`` consecutive windows.

    Args:
        drowsy_dir: Directory with frames named ``..._<frame>_drowsy...``
            (label 1).
        notdrowsy_dir: Directory with frames named
            ``..._<frame>_notdrowsy...`` (label 0).
        detector: Face detector passed to ``extract_features_from_image``.
        predictor: Landmark predictor passed to
            ``extract_features_from_image``.
        window_size: Number of frames per sliding window.
        meta_window_size: Number of consecutive windows per output row.

    Returns:
        pandas.DataFrame with columns ``video_id``, the statistics of the
        latest window in each group (``ear_*``, ``mar_*``,
        ``head_angle_mean``, ``blink_freq``, ``total_closed_frames``,
        ``max_closed_duration``), ``percent_closed_in_meta``,
        ``mostly_closed_windows`` and ``label``. Videos with fewer than
        ``window_size`` usable frames contribute no rows.
    """
    videos = {}
    _collect_video_frames(videos, drowsy_dir, "drowsy", 1)
    _collect_video_frames(videos, notdrowsy_dir, "notdrowsy", 0)

    data = []
    for video_id, info in videos.items():
        info['frames'].sort()  # order by frame number
        print(f"Video {video_id}: Inicio = {info['frames'][0][0]}, Fin = {info['frames'][-1][0]}, Etiqueta = {info['label']}, Imagenes = {len(info['frames'])}")

        ears, mars, head_angles = [], [], []
        for _, frame_path in info['frames']:
            features = extract_features_from_image(frame_path, detector, predictor)
            if features:
                ears.append(features['ear'])
                mars.append(features['mar'])
                head_angles.append(features['head_angle'])

        if len(ears) < window_size:
            continue

        window_metrics = _sliding_window_metrics(ears, mars, head_angles, window_size)
        data.extend(
            _meta_window_rows(video_id, info['label'], window_metrics, window_size, meta_window_size)
        )

    return pd.DataFrame(data)
        

In [8]:
# Dataset directories
drowsy_dir = "dataset/drowsy"
notdrowsy_dir = "dataset/notdrowsy"

# Load the detector and the predictor

# Create a frontal face detector with dlib. get_frontal_face_detector() uses a pretrained classifier
# based on Histogram of Oriented Gradients (HOG) to detect faces in an image. It returns an object
# that can identify the regions of an image that contain faces.
detector = dlib.get_frontal_face_detector()

# Create a facial landmark predictor with dlib. "shape_predictor_68_face_landmarks.dat" is a pretrained
# model that locates 68 specific points on a face (eyes, nose, mouth, contours). The predictor takes a
# face region found by 'detector' and returns the coordinates of those 68 points.
predictor = dlib.shape_predictor("face_landmarks/shape_predictor_68_face_landmarks.dat")  # Download this file

# Process the dataset and save the resulting feature table as CSV
df = process_dataset(drowsy_dir, notdrowsy_dir, detector, predictor)
df.to_csv("features.csv", index=False)
print("Características guardadas en features.csv")

Video 001_glasses_sleepyCombination_drowsy: Inicio = 599, Fin = 2747, Etiqueta = 1, Imagenes = 2149


KeyboardInterrupt: 