# process_data - batch feature extraction
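This notebook processes all images under `Datasets_ReRecorded/` (one sub-folder per class: `Awake`, `Sleep`, `Yawning`) and computes features using the same formulas as `real_time_demo.py`. Images in which no face is detected are skipped. It saves up to 10 annotated previews per class into `Outputs/previews/` and writes the feature table to both `Outputs/training-feature.csv` and `Outputs/final.csv` with columns:
[EAR, MAR, PITCH, YAW, ROLL, D_SLUMP, R_TILT, FACIAL_DISPLAYED, POSE_DISPLAYED, LABEL, Image_Path]

Note: the `EYE_CL` column that appears in older exports (and in some stored outputs below) is no longer computed by this notebook.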

In [None]:
import pandas as pd

In [None]:
# Imports and constants
import os, math, cv2, time
import numpy as np
import pandas as pd
from scipy.spatial import distance as dist

# Import MediaPipe and record whether the import succeeded.
# NOTE(review): both branches execute the very same `import mediapipe as mp`,
# so if the first import raises, the fallback will most likely raise the same
# error again and TASKS_AVAILABLE will not end up False in practice. The flag
# is also not read by any code visible in this notebook — confirm whether this
# guard (perhaps a leftover from a Tasks-API probe) is still needed.
try:
    import mediapipe as mp
    TASKS_AVAILABLE = True
except Exception:
    import mediapipe as mp
    TASKS_AVAILABLE = False

# --- Filesystem layout (everything is resolved against the current working directory) ---
ROOT = os.path.abspath(os.curdir)
DATA_ROOT = os.path.join(ROOT, 'Datasets_ReRecorded')  # dataset root; one sub-folder per class
OUTPUT_DIR = os.path.join(ROOT, 'Outputs')
PREVIEW_DIR = os.path.join(OUTPUT_DIR, 'previews')
# Creating the preview folder also creates OUTPUT_DIR as its parent.
os.makedirs(PREVIEW_DIR, exist_ok=True)

# --- Face-landmark indices consumed by the feature helpers ---
# Eye lists are ordered (p1..p6) as eye_aspect_ratio unpacks them:
# p1/p4 span the horizontal axis, p2-p6 and p3-p5 are the two vertical pairs.
RIGHT_EYE_INDICES = [33, 160, 158, 133, 153, 144]
LEFT_EYE_INDICES = [362, 385, 387, 263, 373, 380]
# Mouth list is ordered as mouth_aspect_ratio unpacks it:
# two horizontal end points followed by two vertical end points.
MOUTH_INDICES = [61, 291, 0, 17]

# Placeholder emitted when a feature cannot be computed.
# Kept at 0.0 "to match Scaler expectations" (per the original note).
DUMMY_VALUE = 0.0

In [None]:
# Helper functions: EAR, MAR, head-pose, slump (same formulas as real_time_demo.py)
def eye_aspect_ratio(eye_coords):
    """Compute the eye aspect ratio (EAR) from six eye points.

    Args:
        eye_coords: sequence of exactly six points ``(p1..p6)``. ``p1``-``p4``
            is the horizontal span; ``p2``-``p6`` and ``p3``-``p5`` are the
            two vertical spans.

    Returns:
        ``(|p2-p6| + |p3-p5|) / (2 * |p1-p4|)``, or the sentinel ``0.001``
        when the horizontal span is exactly zero (avoids division by zero).
    """
    h_start, v1_start, v2_start, h_end, v2_end, v1_end = eye_coords
    span_v1 = dist.euclidean(v1_start, v1_end)
    span_v2 = dist.euclidean(v2_start, v2_end)
    span_h = dist.euclidean(h_start, h_end)
    if span_h == 0:
        return 0.001
    return (span_v1 + span_v2) / (2.0 * span_h)

def mouth_aspect_ratio(mouth_coords):
    """Compute the mouth aspect ratio (MAR) from four mouth points.

    Args:
        mouth_coords: sequence of exactly four points ordered as
            (horizontal end A, horizontal end B, vertical end A, vertical end B).

    Returns:
        Vertical span divided by horizontal span, or the sentinel ``0.001``
        when the horizontal span is exactly zero (avoids division by zero).
    """
    h_a, h_b, v_a, v_b = mouth_coords
    opening = dist.euclidean(v_a, v_b)
    width = dist.euclidean(h_a, h_b)
    if width == 0:
        return 0.001
    return opening / width

def calculate_geometric_head_pose(landmarks, w, h, overlay=None):
    """Estimate head pitch, yaw and roll from four face landmarks.

    Uses landmarks 1 (nose), 33 and 263 (the two eye corners) and 13 (mouth
    centre), scaled to pixels by ``w`` / ``h``:

    * roll  - angle in degrees of the line joining the two eye corners;
    * yaw   - normalised difference between the nose-to-eye-corner distances,
              multiplied by 150 (a scaled ratio);
    * pitch - (nose-to-eye-midpoint / nose-to-mouth distance - 1) * 100
              (a scaled ratio).

    Args:
        landmarks: indexable collection whose items expose ``.x`` and ``.y``.
        w, h: scale factors applied to ``.x`` and ``.y`` respectively.
        overlay: optional image; when given, the eye-to-eye line is drawn on
            it on a best-effort basis (drawing errors are ignored).

    Returns:
        ``(pitch, yaw, roll)``; ``DUMMY_VALUE`` for all three if anything fails.
    """
    def _scaled(index):
        # Landmark -> (x, y) scaled by the supplied width/height.
        point = landmarks[index]
        return point.x * w, point.y * h

    try:
        nose_x, nose_y = _scaled(1)
        eye_a_x, eye_a_y = _scaled(33)
        eye_b_x, eye_b_y = _scaled(263)
        mouth_x, mouth_y = _scaled(13)

        if overlay is not None:
            try:
                cv2.line(
                    overlay,
                    (int(eye_a_x), int(eye_a_y)),
                    (int(eye_b_x), int(eye_b_y)),
                    (255, 0, 255),
                    2,
                )
            except Exception:
                pass  # the overlay is purely visual; never fail the features for it

        # Roll: slope of the eye line (a zero x-offset is nudged to 1e-6).
        eye_dy = eye_b_y - eye_a_y
        eye_dx = eye_b_x - eye_a_x
        if eye_dx == 0:
            eye_dx = 1e-6
        roll = math.degrees(math.atan2(eye_dy, eye_dx))

        # Yaw: asymmetry between the nose's distance to each eye corner.
        nose_to_a = math.hypot(nose_x - eye_a_x, nose_y - eye_a_y)
        nose_to_b = math.hypot(nose_x - eye_b_x, nose_y - eye_b_y)
        yaw = ((nose_to_a - nose_to_b) / (nose_to_a + nose_to_b + 1e-6)) * 150

        # Pitch: nose-to-eye-midpoint distance relative to nose-to-mouth distance.
        mid_x = (eye_a_x + eye_b_x) / 2
        mid_y = (eye_a_y + eye_b_y) / 2
        nose_to_eyes = math.hypot(nose_x - mid_x, nose_y - mid_y)
        nose_to_mouth = math.hypot(nose_x - mouth_x, nose_y - mouth_y) + 1e-6
        pitch = (nose_to_eyes / nose_to_mouth - 1.0) * 100

        return pitch, yaw, roll
    except Exception:
        return DUMMY_VALUE, DUMMY_VALUE, DUMMY_VALUE

def calculate_slump_geometry(pose_landmarks, face_landmarks, w, h, overlay=None):
    """Compute the slump ratio and shoulder tilt from pose (and face) landmarks.

    Uses pose landmarks 0 (nose), 11 and 12 (the two shoulders), converted to
    integer pixel coordinates, and face landmarks 152 / 10 to measure the
    face height used for normalisation.

    Args:
        pose_landmarks: indexable collection of items with ``.x`` / ``.y``;
            a falsy value short-circuits to the dummy result.
        face_landmarks: indexable collection of items with ``.y``; when falsy
            the slump ratio falls back to ``DUMMY_VALUE``.
        w, h: scale factors applied to ``.x`` and ``.y`` respectively.
        overlay: optional image on which the shoulder line and the
            nose-to-shoulder-midpoint line are drawn (best effort).

    Returns:
        ``(d_slump, r_tilt)`` where ``d_slump`` is the vertical nose-to-shoulder
        midpoint offset divided by the face height (clamped to at least 1) and
        ``r_tilt`` is the shoulder-line angle in degrees.
        ``(DUMMY_VALUE, DUMMY_VALUE)`` on missing pose or any error.
    """
    if not pose_landmarks:
        return DUMMY_VALUE, DUMMY_VALUE

    def _pixel(point):
        # Truncate to integer pixel coordinates, as the drawing calls require.
        return int(point.x * w), int(point.y * h)

    try:
        nose_px = _pixel(pose_landmarks[0])
        left_px = _pixel(pose_landmarks[11])
        right_px = _pixel(pose_landmarks[12])
        mid_px = (
            int((left_px[0] + right_px[0]) / 2),
            int((left_px[1] + right_px[1]) / 2),
        )

        if overlay is not None:
            try:
                cv2.line(overlay, left_px, right_px, (255, 0, 0), 3)
                cv2.line(overlay, nose_px, mid_px, (0, 255, 255), 3)
            except Exception:
                pass  # drawing is cosmetic only

        # Shoulder tilt (a zero x-offset is nudged to 1e-6).
        shoulder_dy = left_px[1] - right_px[1]
        shoulder_dx = left_px[0] - right_px[0]
        if shoulder_dx == 0:
            shoulder_dx = 1e-6
        r_tilt = math.degrees(math.atan2(shoulder_dy, shoulder_dx))

        if not face_landmarks:
            return DUMMY_VALUE, r_tilt

        # Normalise the nose-to-shoulder drop by the face height (min. 1 px).
        face_height = abs(face_landmarks[152].y * h - face_landmarks[10].y * h)
        if face_height < 1:
            face_height = 1
        d_slump = (mid_px[1] - nose_px[1]) / face_height
        return d_slump, r_tilt
    except Exception:
        return DUMMY_VALUE, DUMMY_VALUE

In [None]:
# Process single image and return features in required order; also save annotated preview when requested
from IPython.display import display, Image as IPyImage

# Body features imputed when no usable pose is available for an image.
DEFAULT_D_SLUMP = 0.8  # "average upright sitting value" (per the original inline note)
DEFAULT_R_TILT = 0.0   # straight shoulders


def process_and_annotate(image_path, save_preview=True, preview_dir=PREVIEW_DIR):
    """Extract the feature vector for one image and build an annotated overlay.

    Args:
        image_path: path of the image to read with OpenCV.
        save_preview: when True, the annotated overlay is written into
            ``preview_dir`` under the image's base name (best effort).
        preview_dir: destination folder for the preview image.

    Returns:
        ``(features, overlay)`` where ``features`` is
        ``[EAR, MAR, PITCH, YAW, ROLL, D_SLUMP, R_TILT, FACIAL_DISPLAYED,
        POSE_DISPLAYED]`` (floats rounded to 4 decimals, flags as 0/1) and
        ``overlay`` is the annotated BGR image.
        ``(None, None)`` when no face is detected - such images are meant to
        be dropped by the caller.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``image_path``.
    """
    img = cv2.imread(image_path)
    if img is None:
        raise FileNotFoundError(f'Cannot read image: {image_path}')
    h, w = img.shape[:2]
    overlay = img.copy()
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    mp_face_mesh = mp.solutions.face_mesh
    mp_pose = mp.solutions.pose
    mp_drawing = mp.solutions.drawing_utils
    mp_styles = mp.solutions.drawing_styles

    # static_image_mode=True: every call handles an unrelated still image, so
    # detection must run on each input instead of assuming a video stream.
    with mp_face_mesh.FaceMesh(static_image_mode=True, refine_landmarks=True,
                               max_num_faces=1) as fm:
        fres = fm.process(rgb)

    # Strict filtering: images without a detected face are rejected. Checking
    # this first avoids building and running the pose model for them.
    if not fres.multi_face_landmarks:
        return None, None

    with mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.5,
                      min_tracking_confidence=0.5) as pm:
        pres = pm.process(rgb)

    face_detected = True
    face_lms = fres.multi_face_landmarks[0]

    pose_lms = getattr(pres, 'pose_landmarks', None)
    pose_detected = bool(pose_lms)
    if not pose_detected:
        pose_lms = None

    # Draw the face mesh (visual only, never allowed to fail the extraction).
    try:
        mp_drawing.draw_landmarks(
            image=overlay,
            landmark_list=face_lms,
            connections=mp_face_mesh.FACEMESH_TESSELATION,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp_styles.get_default_face_mesh_tesselation_style())
    except Exception:
        pass

    # --- Face features ---
    # `except Exception` (not a bare `except`) so that Ctrl-C / kernel
    # interrupts stop the batch instead of being recorded as zero features.
    fl = face_lms.landmark
    pts = [(lm.x * w, lm.y * h) for lm in fl]

    try:
        left_eye = [pts[i] for i in LEFT_EYE_INDICES]
        right_eye = [pts[i] for i in RIGHT_EYE_INDICES]
        ear = (eye_aspect_ratio(left_eye) + eye_aspect_ratio(right_eye)) / 2.0
    except Exception:
        ear = DUMMY_VALUE

    try:
        mouth_pts = [pts[i] for i in MOUTH_INDICES]
        mar = mouth_aspect_ratio(mouth_pts)
    except Exception:
        mar = DUMMY_VALUE

    # The former eye-closure (EYE_CL) feature is intentionally not computed.

    try:
        pitch, yaw, roll = calculate_geometric_head_pose(fl, w, h, overlay)
    except Exception:
        pitch = yaw = roll = DUMMY_VALUE

    # --- Body features, with "healthy" defaults when the body is missing ---
    d_slump, r_tilt = DEFAULT_D_SLUMP, DEFAULT_R_TILT
    if pose_detected and pose_lms is not None:
        try:
            d_slump, r_tilt = calculate_slump_geometry(pose_lms.landmark, fl, w, h, overlay)
        except Exception:
            # NOTE(review): calculate_slump_geometry catches its own errors and
            # returns DUMMY_VALUE, so this fallback is rarely reached; internal
            # failures surface as DUMMY_VALUE rather than these defaults.
            d_slump, r_tilt = DEFAULT_D_SLUMP, DEFAULT_R_TILT

    # Annotate the overlay with the computed values.
    texts = [
        f'EAR:{ear:.2f} MAR:{mar:.2f}',
        f'P:{pitch:.1f} Y:{yaw:.1f} R:{roll:.1f}',
        f'SLUMP:{d_slump:.2f} TILT:{r_tilt:.1f}'
    ]
    for i, t in enumerate(texts):
        cv2.putText(overlay, t, (10, 30 + i*18), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)

    if save_preview:
        # NOTE: previews are keyed by base name only, so identically named
        # files from different class folders overwrite each other.
        outpath = os.path.join(preview_dir, os.path.basename(image_path))
        try:
            os.makedirs(preview_dir, exist_ok=True)
            # cv2.imwrite reports most failures by returning False, not raising.
            written = cv2.imwrite(outpath, overlay)
        except Exception:
            written = False
        if not written:
            print('Warning: could not write preview', outpath)

    features = [
        round(ear, 4), round(mar, 4),
        round(pitch, 4), round(yaw, 4), round(roll, 4),
        round(d_slump, 4), round(r_tilt, 4),
        int(face_detected), int(pose_detected)
    ]
    return features, overlay

In [None]:
# Batch processor: iterate dataset folders, process images, save up to 10 previews per class, export CSV
desired_map = {'Awake': 0, 'Sleep': 1, 'Yawning': 2}

# Column names, in the order process_and_annotate returns its feature list.
FEATURE_COLUMNS = ['EAR', 'MAR', 'PITCH', 'YAW', 'ROLL', 'D_SLUMP', 'R_TILT',
                   'FACIAL_DISPLAYED', 'POSE_DISPLAYED']
FLOAT_COLUMNS = ['EAR', 'MAR', 'PITCH', 'YAW', 'ROLL', 'D_SLUMP', 'R_TILT']
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp')
MAX_PREVIEWS_PER_CLASS = 10

rows = []
preview_counts = {v: 0 for v in desired_map.values()}
no_face_counts = {v: 0 for v in desired_map.values()}   # images dropped: no face found
error_counts = {v: 0 for v in desired_map.values()}     # images dropped: processing error

# Walk dataset folders (each recognised sub-folder name maps to one class label).
for folder in sorted(os.listdir(DATA_ROOT)):
    folder_path = os.path.join(DATA_ROOT, folder)
    if not os.path.isdir(folder_path):
        continue

    label = desired_map.get(folder, None)
    if label is None:
        print('Skipping unknown folder (no mapping):', folder)
        continue

    imgs = [f for f in sorted(os.listdir(folder_path)) if f.lower().endswith(IMAGE_EXTENSIONS)]
    print(f'Processing {folder} (label={label}) - {len(imgs)} images')

    for fname in imgs:
        path = os.path.join(folder_path, fname)
        try:
            save_preview = (preview_counts[label] < MAX_PREVIEWS_PER_CLASS)
            result = process_and_annotate(path, save_preview=save_preview, preview_dir=PREVIEW_DIR)

            # Image rejected by the extractor (no face detected).
            if result is None or result[0] is None:
                no_face_counts[label] += 1
                continue

            features, overlay = result
            if len(features) != len(FEATURE_COLUMNS):
                raise ValueError(f'expected {len(FEATURE_COLUMNS)} features, got {len(features)}')

            if save_preview:
                preview_counts[label] += 1

            # Map features to columns by name instead of by hand-written index.
            row = dict(zip(FEATURE_COLUMNS, features))
            row['LABEL'] = label
            row['Image_Path'] = path
            rows.append(row)
        except Exception as e:
            # Best effort: one bad image must not abort the whole batch.
            error_counts[label] += 1
            print('Skipped', path, '->', e)

    # Make silent data loss visible per class.
    print(f'  dropped: {no_face_counts[label]} without a detected face, '
          f'{error_counts[label]} with errors')

# Export CSV
if rows:
    df = pd.DataFrame(rows)

    # 1. Rounding (sanity check; the extractor already rounds to 4 decimals)
    df[FLOAT_COLUMNS] = df[FLOAT_COLUMNS].astype(float).round(4)

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # 2. Save the raw export (good for backup)
    out_csv = os.path.join(OUTPUT_DIR, 'training-feature.csv')
    df.to_csv(out_csv, index=False)

    # 3. Save the FINAL file (for the training model)
    # We use this one in model_Khang.ipynb
    final_csv = os.path.join(OUTPUT_DIR, 'final.csv')
    df.to_csv(final_csv, index=False)

    print(f"SUCCESS: Processed {len(df)} images.")
    print(f"Saved to: {final_csv}")

    # 4. Final class count check (important!)
    print("\n--- Final Class Distribution ---")
    print(df['LABEL'].value_counts())
    print("--------------------------------")
else:
    print('No rows to export')

Processing Awake (label=0) - 422 images
Processing Passed_out (label=3) - 237 images
Processing Sleep (label=1) - 510 images
Processing Yawning (label=2) - 892 images
Wrote c:\VGU_projects\Drowsiness_detector\AI_Drowsiness_Detection\Outputs\training-feature.csv rows= 2061


In [17]:
# Reload the exported feature table from disk (path is relative to the working directory).
feature_csv_path = "./Outputs/training-feature.csv"
df = pd.read_csv(feature_csv_path, header=0)

In [18]:
# Per-column non-null counts of the rows exported without a detected face.
no_face_rows = df.loc[df['FACIAL_DISPLAYED'].eq(0)]
no_face_rows.count()

EAR                 0
MAR                 0
PITCH               0
YAW                 0
ROLL                0
D_SLUMP             0
R_TILT              0
EYE_CL              0
FACIAL_DISPLAYED    0
POSE_DISPLAYED      0
LABEL               0
Image_Path          0
dtype: int64

In [None]:
# Filter dataframe:
# 1) Drop any images with LABEL==2 (Yawning) when FACIAL_DISPLAYED == 0
# 2) Change LABEL from 1 (Sleep) to 3 (Passed_out) when FACIAL_DISPLAYED == 0
# NOTE(review): the extraction function in this notebook returns no features
# for images without a face, so for a freshly generated CSV both steps are
# expected to be no-ops; they only matter for older exports — confirm.

# Drop rows where label==2 and no face displayed. The .copy() comes AFTER the
# boolean selection so the relabel below writes into an independent frame
# (assigning into a slice is what raises SettingWithCopyWarning).
drop_mask = (df['LABEL'] == 2) & (df['FACIAL_DISPLAYED'] == 0)
df_filtered = df.loc[~drop_mask].copy()

# Relabel rows where label==1 and no face displayed -> 3
# (an all-False mask simply assigns nothing, so no guard is needed)
mask = (df_filtered['LABEL'] == 1) & (df_filtered['FACIAL_DISPLAYED'] == 0)
df_filtered.loc[mask, 'LABEL'] = 3

# Summary counts after filtering
print('Counts by LABEL after filtering:', df_filtered['LABEL'].value_counts())

Counts by LABEL after filtering: LABEL
0    296
Name: count, dtype: int64


In [10]:
# Per-column non-null counts of LABEL 0 rows that have a detected face.
awake_with_face = df_filtered["LABEL"].eq(0) & df_filtered["FACIAL_DISPLAYED"].eq(1)
df_filtered.loc[awake_with_face].count()

EAR                 422
MAR                 422
PITCH               422
YAW                 422
ROLL                422
D_SLUMP             422
R_TILT              422
EYE_CL              422
FACIAL_DISPLAYED    422
POSE_DISPLAYED      422
LABEL               422
Image_Path          422
dtype: int64

In [None]:
# # Finalize numeric columns and export CSVs
# df_filtered.iloc[:, 0:8] = df_filtered.iloc[:, 0:8].round(4)
# # keep the original training-feature export (OUTPUT_CSV) and also write the final.csv as requested
# df_filtered.to_csv(OUTPUT_CSV, index=False)
# OUTPUT_FINAL = os.path.join(OUTPUT_DIR, 'final.csv')
# df_filtered.to_csv(OUTPUT_FINAL, index=False)
# print('Wrote', OUTPUT_CSV, 'and', OUTPUT_FINAL, 'rows:', len(df_filtered))
# display(df_filtered.head())

Wrote c:\VGU_projects\Drowsiness_detector\AI_Drowsiness_Detection\Outputs\training-feature.csv and c:\VGU_projects\Drowsiness_detector\AI_Drowsiness_Detection\Outputs\final.csv rows: 2055


Unnamed: 0,EAR,MAR,PITCH,YAW,ROLL,D_SLUMP,R_TILT,EYE_CL,FACIAL_DISPLAYED,POSE_DISPLAYED,LABEL,Image_Path
0,0.3331,0.5861,31.6382,-5.2346,-5.0504,1.064,0.507,0.0621,1,1,0,c:\VGU_projects\Drowsiness_detector\AI_Drowsin...
1,0.2696,0.2103,23.0431,-1.6258,-0.4376,0.9222,-1.8882,0.0536,1,1,0,c:\VGU_projects\Drowsiness_detector\AI_Drowsin...
2,0.4142,0.2831,145.9168,0.9384,-1.5578,0.7423,-1.2571,0.0878,1,1,0,c:\VGU_projects\Drowsiness_detector\AI_Drowsin...
3,0.4271,0.2635,32.7882,-25.6324,3.5415,1.0925,4.0142,0.0811,1,1,0,c:\VGU_projects\Drowsiness_detector\AI_Drowsin...
4,0.359,0.2906,57.2217,-1.0224,-0.2451,0.9877,0.7673,0.0683,1,1,0,c:\VGU_projects\Drowsiness_detector\AI_Drowsin...
