In [1]:
# pip install torchreid

In [2]:
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="h5py")

In [3]:
import cv2
import time
import csv
import os
import numpy as np
import torch
from collections import defaultdict
from ultralytics import YOLO
import mediapipe as mp
from insightface.app import FaceAnalysis
import torchreid  # New lightweight ReID



In [4]:
# Initialize face recognition
face_app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
face_app.prepare(ctx_id=0, det_size=(640, 640))

Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\HP/.insightface\models\buffalo_l\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\HP/.insightface\models\buffalo_l\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\HP/.insightface\models\buffalo_l\det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\HP/.insightface\models\buffalo_l\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\HP/.insightface\models\buffalo_l\w600k_r50.onnx recognition ['None', 3, 112, 112] 127.5 127.5
set det

In [5]:
# Initialize lightweight ReID (Torchreid)
reid_model = torchreid.models.build_model(
    name='osnet_x0_25',
    num_classes=1000,
    pretrained=True
)
reid_model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
reid_model.to(device)


Successfully loaded imagenet pretrained weights from "C:\Users\HP/.cache\torch\checkpoints\osnet_x0_25_imagenet.pth"


OSNet(
  (conv1): ConvLayer(
    (conv): Conv2d(3, 16, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
  )
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (conv2): Sequential(
    (0): OSBlock(
      (conv1): Conv1x1(
        (conv): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
      )
      (conv2a): LightConv3x3(
        (conv1): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
        (bn): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
      )
      (conv2b): Sequential(
        (

In [6]:
# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

In [7]:
# Employee and ReID databases
employee_db = {}  # {name: face features}
locked_reid_db = {}  # {name: reid features}
gesture_log = defaultdict(lambda: {"Perfect": 0, "Lazy": 0})

In [8]:
# Camera URLs
CAMERAS = {
    'Cam1': 'rtsp://admin:Admin%25123@192.168.29.200:554/Streaming/Channels/101',
    'Cam2': 'rtsp://admin:Admin%25123@192.168.29.200:554/Streaming/Channels/201',
    'Cam3': 'rtsp://admin:Admin%25123@192.168.29.200:554/Streaming/Channels/301',
    'Cam4': 'rtsp://admin:Admin%25123@192.168.29.200:554/Streaming/Channels/401'
}

In [9]:
# Load Employee Faces
def load_employee_faces(image_folder='Images'):
    for file in os.listdir(image_folder):
        if file.endswith('.jpg') or file.endswith('.png'):
            name = os.path.splitext(file)[0]
            img = cv2.imread(os.path.join(image_folder, file))
            faces = face_app.get(img)
            if faces:
                employee_db[name] = faces[0].normed_embedding
                print(f"✅ Loaded {name}")
            else:
                print(f"❌ No face detected in {file}")

In [10]:
# Match Face
def match_face(face_feat, threshold=0.8):
    for name, db_feat in employee_db.items():
        dist = np.linalg.norm(db_feat - face_feat)
        if dist < threshold:
            return name
    return "Unknown"

In [11]:
# Extract ReID feature (Torchreid)
def extract_reid_feature(frame):
    img = cv2.resize(frame, (128, 256))
    img = img[:, :, ::-1]  # BGR to RGB
    img = torch.from_numpy(img).permute(2, 0, 1).unsqueeze(0).float()
    img = img / 255.0  # Normalize
    img = img.to(device)

    with torch.no_grad():
        feat = reid_model(img)
    return feat.squeeze().cpu().numpy()

In [12]:
# Register ReID
def register_reid(name, frame):
    body_feat = extract_reid_feature(frame)
    locked_reid_db[name] = body_feat

In [13]:
# Match ReID
def match_reid(frame, threshold=1.2):
    body_feat = extract_reid_feature(frame)
    for name, db_feat in locked_reid_db.items():
        dist = np.linalg.norm(db_feat - body_feat)
        if dist < threshold:
            return name
    return "Unknown"

In [14]:
# Load YOLOv8 model
model = YOLO("yolov8n.pt")

In [15]:
# Open camera streams
caps = {name: cv2.VideoCapture(url) for name, url in CAMERAS.items()}

In [16]:
# Tracking gesture times
gesture_times = defaultdict(lambda: {'Perfect': 0, 'Lazy': 0, 'Yawning': 0})
person_last_seen = {}

In [17]:
# Gesture Classification
def classify_gesture(landmarks):
    left_shoulder = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
    right_shoulder = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
    nose = landmarks[mp_pose.PoseLandmark.NOSE.value]

    shoulder_slope = abs(left_shoulder.y - right_shoulder.y)
    mouth_open = abs(nose.y - (left_shoulder.y + right_shoulder.y) / 2) < 0.05

    if shoulder_slope > 0.1:
        return 'Lazy'
    elif mouth_open:
        return 'Yawning'
    else:
        return 'Perfect'

In [18]:
# Load employee faces
load_employee_faces()

✅ Loaded Faez


In [None]:
# Start Processing
start_time = time.time()
frame_count = 0

try:
    while True:
        for cam_name, cap in caps.items():
            ret, frame = cap.read()
            if not ret:
                print(f"[ERROR] Cannot read frame from {cam_name}")
                continue

            results = model(frame)[0]

            person_boxes = [box for box in results.boxes if int(box.cls) == 0]

            for box in person_boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                person_crop = frame[y1:y2, x1:x2]

                rgb_crop = cv2.cvtColor(person_crop, cv2.COLOR_BGR2RGB)
                result = pose.process(rgb_crop)

                # Face recognition
                faces = face_app.get(person_crop)
                name = "Unknown"
                if faces:
                    name = match_face(faces[0].normed_embedding)

                if result.pose_landmarks:
                    landmarks = result.pose_landmarks.landmark
                    gesture = classify_gesture(landmarks)

                    duration = 1
                    person_id = f"{cam_name}_{x1}_{y1}"
                    gesture_times[person_id][gesture] += duration

                    color = (0, 255, 0) if gesture == 'Perfect' else (0, 0, 255) if gesture == 'Lazy' else (255, 0, 0)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                    # Draw name above gesture
                    cv2.putText(frame, name, (x1, y1 - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
                    cv2.putText(frame, gesture, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

            cv2.imshow(cam_name, frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        frame_count += 1

except KeyboardInterrupt:
    print("[INFO] Stopped by user.")

finally:
    end_time = time.time()
    print("[INFO] Saving CSV results...")

    with open("gesture_tracking_summary.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Person_ID", "Perfect_Time", "Lazy_Time", "Yawning_Time"])
        for person_id, times in gesture_times.items():
            writer.writerow([person_id, times['Perfect'], times['Lazy'], times['Yawning']])

    for cap in caps.values():
        cap.release()
    cv2.destroyAllWindows()

    print("[INFO] CSV file saved as gesture_tracking_summary.csv")


0: 384x640 (no detections), 135.5ms
Speed: 16.2ms preprocess, 135.5ms inference, 8.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 1 chair, 1 couch, 54.3ms
Speed: 2.6ms preprocess, 54.3ms inference, 9.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 persons, 5 chairs, 6 laptops, 55.0ms
Speed: 6.1ms preprocess, 55.0ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 73.4ms
Speed: 3.2ms preprocess, 73.4ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 persons, 2 chairs, 3 tvs, 3 laptops, 49.8ms
Speed: 2.5ms preprocess, 49.8ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 1 chair, 1 couch, 58.0ms
Speed: 1.9ms preprocess, 58.0ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 persons, 5 chairs, 6 laptops, 50.5ms
Speed: 1.7ms preprocess, 50.5ms inference, 0.9ms postprocess per image at shape (1, 3,

In [None]:
conda list opencv