In [None]:
import os
import cv2
import numpy as np
from PIL import Image
import torch
from detectron2.config import get_cfg
from detectron2.engine import DefaultPredictor
from detectron2.data import MetadataCatalog
from detectron2 import model_zoo
from ultralytics import YOLO

def get_keypoints(result):
    keypoints = result.keypoints.data.cpu().numpy()
    return keypoints

def calculate_angle(a, b, c):
    """Calculate the angle at point b given points a, b, and c."""
    ab = np.array(a) - np.array(b)
    cb = np.array(c) - np.array(b)
    dot_product = np.dot(ab, cb)
    ab_magnitude = np.linalg.norm(ab)
    cb_magnitude = np.linalg.norm(cb)
    cosine_angle = dot_product / (ab_magnitude * cb_magnitude)
    angle = np.arccos(cosine_angle)
    return np.degrees(angle)

def classify_pose(keypoints):
    try:
        left_shoulder = keypoints[5][:2]
        right_shoulder = keypoints[6][:2]
        left_hip = keypoints[11][:2]
        right_hip = keypoints[12][:2]
        left_knee = keypoints[13][:2]
        right_knee = keypoints[14][:2]
        left_ankle = keypoints[15][:2]
        right_ankle = keypoints[16][:2]
    except IndexError as e:
        print(f"IndexError: {e}")
        return "Unknown"

    left_hip_angle = calculate_angle(left_shoulder, left_hip, left_knee) if not np.all(left_ankle) else 180
    right_hip_angle = calculate_angle(right_shoulder, right_hip, right_knee) if not np.all(right_ankle) else 180

    left_knee_angle = calculate_angle(left_hip, left_knee, left_ankle) if np.all(left_ankle) else 0
    right_knee_angle = calculate_angle(right_hip, right_knee, right_ankle) if np.all(right_ankle) else 0

    if ((left_hip_angle > 150 or right_hip_angle > 150) and (not np.all(left_ankle) or not np.all(right_ankle))) or \
       (left_knee_angle > 150 or right_knee_angle > 150):
        return "Standing"
    else:
        return "Sitting"

def visualize_keypoints(image, keypoints, color=(0, 255, 0)):
    colors = {
        'head': (0, 255, 0),
        'body': (0, 0, 255),
        'left_arm': (255, 0, 0),
        'right_arm': (255, 255, 0),
        'left_leg': (0, 255, 255),
        'right_leg': (255, 0, 255)
    }

    skeleton = {
        'head': [(0, 1), (0, 2), (1, 3), (2, 4)],
        'body': [(5, 6)],
        'left_arm': [(5, 7), (7, 9)],
        'right_arm': [(6, 8), (8, 10)],
        'left_leg': [(11, 13), (13, 15)],
        'right_leg': [(12, 14), (14, 16)]
    }
    for part, joints in skeleton.items():
        for joint in joints:
            pt1, pt2 = joint
            if keypoints[pt1][2] > 0.5 and keypoints[pt2][2] > 0.5:
                cv2.line(image, (int(keypoints[pt1][0]), int(keypoints[pt1][1])), (int(keypoints[pt2][0]), int(keypoints[pt2][1])), colors[part], 2)

    for i, (x, y, conf) in enumerate(keypoints):
        if conf > 0.5:
            cv2.circle(image, (int(x), int(y)), 5, colors['left_leg'] if i in [11, 13, 15] else colors['right_leg'], -1)
    return image

def match_keypoints(prev_keypoints, current_keypoints):
    if prev_keypoints is None:
        return list(range(len(current_keypoints)))
    
    prev_centroids = [np.mean(person[:2], axis=0) for person in prev_keypoints]
    current_centroids = [np.mean(person[:2], axis=0) for person in current_keypoints]

    distances = np.zeros((len(prev_centroids), len(current_centroids)))
    for i, prev in enumerate(prev_centroids):
        for j, curr in enumerate(current_centroids):
            distances[i, j] = np.linalg.norm(prev - curr)

    matches = []
    for i in range(len(prev_centroids)):
        min_index = np.argmin(distances[i])
        matches.append(min_index)
        distances[:, min_index] = np.inf
    
    return matches

def get_custom_model(model_path):
    cfg = get_cfg()
    cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml"))
    cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1
    cfg.MODEL.WEIGHTS = model_path
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.7
    MetadataCatalog.get("chair_val").thing_classes = ["chair"]
    return DefaultPredictor(cfg)

def iou(box1, box2):
    x1, y1, x2, y2 = box1
    x3, y3, x4, y4 = box2
    xi1, yi1 = max(x1, x3), max(y1, y3)
    xi2, yi2 = min(x2, x4), min(y2, y4)
    inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x4 - x3) * (y4 - y3)
    union_area = box1_area + box2_area - inter_area
    iou = inter_area / union_area
    return iou

pose_model = YOLO('pose.pt')
model_path = "model_final.pth"
chair_model = get_custom_model(model_path)

video_path = "D:/ARTIFICIAL INTELLIGENCE/SEMESTER 1/Computer Vision and Deep Learning/CV_PROJECT_LAZY_TRAIN/human-pose-estimation-opencv/Chair_occupancy/Smart_Seats/Videos/One_person.mp4"
cap = cv2.VideoCapture(video_path)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)

out = cv2.VideoWriter('D:/ARTIFICIAL INTELLIGENCE/SEMESTER 1/Computer Vision and Deep Learning/CV_PROJECT_LAZY_TRAIN/human-pose-estimation-opencv/Chair_occupancy/Smart_Seats/Output/output15_combined.avi', cv2.VideoWriter_fourcc('M','J','P','G'), fps, (frame_width, frame_height))

confidence_threshold = 0.5

prev_keypoints = None

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    pose_results = pose_model(frame, conf=confidence_threshold)
    chair_outputs = chair_model(frame)
    chair_instances = chair_outputs["instances"].to("cpu")
    chair_boxes = chair_instances.pred_boxes.tensor.numpy()
    chair_scores = chair_instances.scores.numpy()
    chair_classes = chair_instances.pred_classes.numpy()

    num_persons = len(pose_results[0].keypoints)
    print(f"Processing frame: {num_persons} persons detected")

    current_keypoints = [get_keypoints(pose_results[0])[i] for i in range(num_persons)]
    
    matches = match_keypoints(prev_keypoints, current_keypoints)
    
    for i, match in enumerate(matches):
        keypoints = current_keypoints[match]
        
        if keypoints.size == 0:
            continue

        frame = visualize_keypoints(frame, keypoints)
        
        pose = classify_pose(keypoints)
        
        conf = pose_results[0].keypoints.conf[match].mean().item() if hasattr(pose_results[0].keypoints, 'conf') and len(pose_results[0].keypoints.conf) > match else None
        
        if conf is not None:
            print(f"Person {i+1}: Pose: {pose}, Confidence: {conf:.2f}")
            cv2.putText(frame, f"Person {i+1}: Pose: {pose}, Confidence: {conf:.2f}", 
                        (int(keypoints[0][0]), int(keypoints[0][1])-30), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2, cv2.LINE_AA)
        else:
            print(f"Person {i+1}: Pose: {pose}")
            cv2.putText(frame, f"Person {i+1}: Pose: {pose}", 
                        (int(keypoints[0][0]), int(keypoints[0][1])-30), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2, cv2.LINE_AA)

    occupied_chairs = []
    for box, score, class_id in zip(chair_boxes, chair_scores, chair_classes):
        if score > 0.7:
            cords = [round(x) for x in box]
            if class_id == 0:
                chair_center = ((cords[0] + cords[2]) // 2, (cords[1] + cords[3]) // 2)
                occupied = False
                best_overlap = 0
                best_keypoints = None
                for i in range(num_persons):
                    keypoints = current_keypoints[i]
                    if keypoints.size == 0:
                        continue
                    overlap = 0
                    for kp in keypoints:
                        if cords[0] <= kp[0] <= cords[2] and cords[1] <= kp[1] <= cords[3]:
                            overlap += 1
                    if overlap > best_overlap:
                        best_overlap = overlap
                        best_keypoints = keypoints
                
                if best_keypoints is not None and classify_pose(best_keypoints) == "Sitting":
                    occupied = True

                occupied_chairs.append((cords, occupied, best_keypoints))

    final_chairs = []
    for i, (box1, occupied1, keypoints1) in enumerate(occupied_chairs):
        keep = True
        for j, (box2, occupied2, keypoints2) in enumerate(occupied_chairs):
            if i != j and iou(box1, box2) > 0.7:
                keep = False
                if not occupied1 and occupied2:
                    keep = True
                    occupied_chairs[j] = (box2, False, None)
        if keep:
            final_chairs.append((box1, occupied1))

    for cords, occupied in final_chairs:
        color = (0, 0, 255) if occupied else (0, 255, 0)
        status = "Occupied" if occupied else "Not Occupied"
        cv2.rectangle(frame, (cords[0], cords[1]), (cords[2], cords[3]), color, 2)
        cv2.putText(frame, f"Chair: {status}",
                    (cords[0], cords[1]-10), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2, cv2.LINE_AA)
    
    prev_keypoints = current_keypoints

    # Write the summary on the top right corner
    text = f"Chairs: {len(chair_boxes)}, Occupied: {sum(1 for _, occupied in final_chairs if occupied)}, Persons: {num_persons}"
    text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
    text_x = frame.shape[1] - text_size[0] - 10
    text_y = text_size[1] + 10
    cv2.putText(frame, text, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

    out.write(frame)

    cv2.imshow('Pose Detection', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
out.release()
cv2.destroyAllWindows()