In [None]:
import cv2
import mediapipe as mp
from deepface import DeepFace
from scipy.spatial.distance import cosine
import os
import numpy as np
import statistics
import pandas as pd
from datetime import datetime
from collections import deque

In [None]:
# === CONFIG ===
face_db_path = "face_db"
model_name = "VGG-Face"
threshold = 0.4
smoothing_decay = 0.1
activity_queue = deque(maxlen=10)
prev_left_hand, prev_right_hand = None, None
last_label, last_distance = None, 1.0

In [None]:
# === Load Face Embeddings ===
print("[INFO] Loading face embeddings from DB...")
database = {}
for person_folder in os.listdir(face_db_path):
    person_path = os.path.join(face_db_path, person_folder)
    if not os.path.isdir(person_path): continue

    for img_file in os.listdir(person_path):
        if not img_file.lower().endswith(('.jpg', '.jpeg', '.png')):
            continue
        try:
            img_path = os.path.join(person_path, img_file)
            rep = DeepFace.represent(img_path=img_path, model_name=model_name, enforce_detection=False)[0]["embedding"]
            database.setdefault(person_folder, []).append(rep)
            print(f"[LOADED] {person_folder}/{img_file}")
        except Exception as e:
            print(f"[SKIPPED] {img_file} - {e}")

if not database:
    print("[FATAL] No face data found.")
    exit()

In [None]:
# === Init MediaPipe ===
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

In [None]:
# === Activity Logging CSV ===
csv_file = "activity_log.csv"
if not os.path.exists(csv_file):
    pd.DataFrame(columns=["Time", "Person", "Activity"]).to_csv(csv_file, index=False)

In [None]:
# === Start Video ===
cap = cv2.VideoCapture(0)

with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        rgb.flags.writeable = False
        results = pose.process(rgb)
        rgb.flags.writeable = True
        frame = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)

        detected_name = "Detecting..."
        activity = "Unknown"

        try:
            face_objs = DeepFace.extract_faces(frame, enforce_detection=False)
            for face in face_objs:
                x, y, w, h = face['facial_area'].values()
                cropped_face = face["face"]
                emb = DeepFace.represent(cropped_face, model_name=model_name, enforce_detection=False)[0]["embedding"]
                best_match, best_dist = "Unknown", 1.0

                for name, embeddings in database.items():
                    for ref_emb in embeddings:
                        dist = cosine(emb, ref_emb)
                        if dist < threshold and dist < best_dist:
                            best_match, best_dist = name, dist

                if best_dist < last_distance:
                    last_label = best_match
                    last_distance = best_dist
                else:
                    last_distance += smoothing_decay
                    if last_distance > threshold:
                        last_label = "Detecting..."

                detected_name = last_label
                color = (0, 255, 0) if last_label != "Unknown" else (0, 0, 255)
                cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
                cv2.putText(frame, last_label, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2)
                break  # only one face

        except Exception as e:
            print(f"[Face Detection Error]: {e}")

        # === Pose Estimation + Activity Detection ===
        if results.pose_landmarks:
            landmarks = results.pose_landmarks.landmark
            lh, lk, ls = landmarks[mp_pose.PoseLandmark.LEFT_HIP], landmarks[mp_pose.PoseLandmark.LEFT_KNEE], landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER]
            l_hand, r_hand = landmarks[mp_pose.PoseLandmark.LEFT_WRIST], landmarks[mp_pose.PoseLandmark.RIGHT_WRIST]

            hip_knee_diff = abs(lh.y - lk.y)
            shoulder_hip_diff = abs(ls.y - lh.y)
            hand_diff = abs(l_hand.x - r_hand.x)
            pose_ratio = hip_knee_diff / (shoulder_hip_diff + 1e-5)

            if pose_ratio < 0.6 and hand_diff < 0.1:
                posture, idle = "Sitting", True
            elif pose_ratio < 0.6:
                posture, idle = "Sitting", False
            elif hand_diff < 0.1 and shoulder_hip_diff < 0.65:
                posture, idle = "Standing", True
            else:
                posture, idle = "Standing", False

            working = False
            if prev_left_hand and prev_right_hand:
                l_move = abs(l_hand.x - prev_left_hand[0]) + abs(l_hand.y - prev_left_hand[1])
                r_move = abs(r_hand.x - prev_right_hand[0]) + abs(r_hand.y - prev_right_hand[1])
                if l_move > 0.02 or r_move > 0.02:
                    working = True

            prev_left_hand = (l_hand.x, l_hand.y)
            prev_right_hand = (r_hand.x, r_hand.y)

            if idle and not working:
                activity = f"{posture} + Idle"
            elif working:
                activity = f"{posture} + Working"
            else:
                activity = posture

            activity_queue.append(activity)
            stable_activity = statistics.mode(activity_queue)
        else:
            stable_activity = "No Person Detected"

        # === Display Info ===
        mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
        cv2.putText(frame, f"{detected_name}: {stable_activity}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)

        # === Logging ===
        if detected_name not in ["Unknown", "Detecting..."] and stable_activity != "No Person Detected":
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            log_row = pd.DataFrame([[timestamp, detected_name, stable_activity]], columns=["Time", "Person", "Activity"])
            log_row.to_csv(csv_file, mode='a', header=False, index=False)

        cv2.imshow("Worker Monitoring", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break

cap.release()
cv2.destroyAllWindows()