In [2]:
from ultralytics import YOLO
import cv2
import mediapipe as mp
import numpy as np
from stopwatch import Stopwatch

In [3]:
segmentationModel = YOLO("YOLO_models/yolov8n-seg.pt")
# n s m l x

In [4]:
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

In [5]:
# segmentation color map
fixed_color_map = [
    [255, 0, 0],  # Red
    [0, 255, 0],  # Green
    [0, 0, 255],  # Blue
    [255, 255, 0],  # Yellow
    [255, 0, 255],  # Magenta
    [0, 255, 255],  # Cyan
    [128, 0, 0],  # Maroon
    [128, 128, 0],  # Olive
    [0, 128, 0],  # Dark Green
    [128, 0, 128],  # Purple
    [0, 128, 128],  # Teal
    [0, 0, 128],  # Navy
    [192, 192, 192],  # Silver
    [128, 128, 128],  # Gray
    [255, 165, 0],  # Orange
    [255, 192, 203],  # Pink
    [75, 0, 130],  # Indigo
    [245, 222, 179],  # Wheat
    [255, 228, 196],  # Bisque
    [34, 139, 34],  # Forest Green
    [255, 215, 0],  # Gold
    [173, 216, 230],  # Light Blue
    [0, 255, 127],  # Spring Green
    [70, 130, 180],  # Steel Blue
    [255, 69, 0],  # Red-Orange
    [124, 252, 0],  # Lawn Green
    [0, 206, 209],  # Dark Turquoise
    [147, 112, 219],  # Medium Purple
    [199, 21, 133],  # Medium Violet-Red
    [255, 99, 71],  # Tomato
]

In [6]:
# Close camera and output window for termination
cap = object()


def closeAll():
    cap.release()
    cv2.destroyAllWindows()


# Calculate angle for 3 landmarks.
def calculateAngle(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)

    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(
        a[1] - b[1], a[0] - b[0]
    )
    angle = np.abs(radians * 180.0 / np.pi)

    if angle > 180.0:
        angle = 360.0 - angle

    return angle

In [7]:
def segmentFrame(frame):

    # Perform segmentation
    results = segmentationModel.predict(frame, task="segment")

    # Extract the segmentation masks
    masks = results[0].masks.data

    # Create a color map for the masks
    # color_map = np.random.randint(0, 255, (len(masks), 3), dtype=np.uint8)

    if len(fixed_color_map) < len(masks):
        raise ValueError(
            "The fixed color map does not have enough colors for all masks."
        )

    # color
    for i, mask in enumerate(masks):
        color = fixed_color_map[i]
        binary_mask = mask.cpu().numpy() > 0.5  # Convert to binary mask
        binary_mask = binary_mask.astype(np.uint8) * 255  # Convert to uint8

        # Find contours and draw them on the frame
        contours, _ = cv2.findContours(
            binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        cv2.drawContours(frame, contours, -1, color, thickness=cv2.FILLED)

In [8]:
# pose detection by mediapipe
def poseDetection(frame, pose):
    # recolor image to rgb from cv2 default (bgr)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame.flags.writeable = False

    # make detection and store in result
    # writeable flag is unset before processing to improve performance and avoid unintended write ops.
    # frame is read and processed and the data is written to results.
    results = pose.process(frame)

    # convert back to cv2 default bgr
    frame.flags.writeable = True
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    return results


# marking landmarks, pose and joining lines
def poseMarking(frame, results):
    landmarks = results.pose_landmarks.landmark
    shoulderStats = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
    hipStats = landmarks[mp_pose.PoseLandmark.LEFT_HIP.value]
    ankleStats = landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value]

    shoulder = [
        shoulderStats.x,
        shoulderStats.y,
    ]
    hip = [
        hipStats.x,
        hipStats.y,
    ]
    ankle = [
        ankleStats.x,
        ankleStats.y,
    ]

    angle = int(calculateAngle(shoulder, hip, ankle))

    # dictionary for label : coordinates
    dct = {
        "shoulder": tuple(np.multiply(shoulder, [640, 480]).astype(int)),
        "hip": tuple(
            np.multiply(hip, [640, 480]).astype(int)
            + np.array([0, -10])  # -10 to avoid overlap with angle
        ),
        "ankle": tuple(np.multiply(ankle, [640, 480]).astype(int)),
        str(angle): tuple(
            np.multiply(hip, [640, 480]).astype(int)
            + np.array([0, 10])  # +10 to avoid overlap with hip
        ),
    }

    # add labels and angle measure
    for part in dct:
        cv2.putText(
            frame,
            part,
            tuple(dct[part]),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )

    # draw landmarks and connecting lines
    mp_drawing.draw_landmarks(
        frame,  # output
        results.pose_landmarks,  # passing landmarks
        mp_pose.POSE_CONNECTIONS,  # passing landmark connections
        mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
        mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2),
    )

    return angle


#

In [10]:
running = True
landmarksAvailable = (
    True  # to flag landmarks not available for marking in case person not in frame
)

# CAMERA MEDIA PIPING
lying = Stopwatch(2)
sitting = Stopwatch(2)
state = None  # can be "standing", "sitting", "lying"


with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while running:

        # capture single frame
        # ~50ms per run. Can be removed if camera has FPS customization. 10 FPS
        cap = cv2.VideoCapture(0)
        ret, frame = cap.read()
        cap.release()

        if not ret:
            print("Failed to grab frame")
            break

        # 1 Find Body Landmarks
        try:
            results = poseDetection(frame, pose)
        except Exception as e:
            # closeAll()
            print("Error with pose detection", e)
            landmarksAvailable = False
            pass  # to prevent crash when no person pose is detected.

        # 2 Segment and Color Objects
        try:
            segmentFrame(frame)
        except Exception as e:
            # closeAll()
            print("Error in segmentation :", e)

        # 3 Apply Landmarks and Angles
        if landmarksAvailable:
            try:
                # angle is returned after marking
                angle = poseMarking(frame, results)
            except:
                print("Error in landmarks output")
                continue

        # change to lying
        if state != "lying" and angle > 150:
            lying.start()
            sitting.stop()
            state = "lying"

        # change to sitting
        if state == "lying" and angle < 150:
            sitting.start()
            lying.stop()
            state = "sitting"

        cv2.rectangle(frame, (0, 0), (225, 73), (245, 117, 16), -1)

        displayDict = {
            "ANGLE": (15, 12),
            str(angle): (15, 40),
            "STATE": (85, 12),
            state: (85, 40),
            "Lying": (135, 12),
            str(int(lying.duration)): (135, 40),
            "Sitting": (185, 12),
            str(int(sitting.duration)): (185, 40),
        }

        for text in displayDict:
            cv2.putText(
                frame,
                text,
                displayDict[text],
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 0, 0),
                1,
                cv2.LINE_AA,
            )

        # Display the frame
        cv2.imshow("Video", frame)

        # wait for "q" click to quit
        if cv2.waitKey(1) & 0xFF == ord("q"):
            running = False
            break

cv2.destroyAllWindows()
cap.release()

I0000 00:00:1720689276.711633  162094 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1720689276.734878  162705 gl_context.cc:357] GL version: 3.2 (OpenGL ES 3.2 Mesa 23.2.1-1ubuntu3.1~22.04.2), renderer: Mesa Intel(R) HD Graphics 4400 (HSW GT2)
W0000 00:00:1720689276.945417  162701 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1720689276.995452  162701 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.



0: 480x640 1 person, 1 remote, 301.4ms
Speed: 27.4ms preprocess, 301.4ms inference, 18.9ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 241.8ms
Speed: 3.4ms preprocess, 241.8ms inference, 3.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 239.0ms
Speed: 3.8ms preprocess, 239.0ms inference, 4.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 275.1ms
Speed: 4.2ms preprocess, 275.1ms inference, 3.7ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 cat, 237.7ms
Speed: 3.0ms preprocess, 237.7ms inference, 7.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 233.0ms
Speed: 2.2ms preprocess, 233.0ms inference, 0.8ms postprocess per image at shape (1, 3, 480, 640)
Error in segmentation : 'NoneType' object has no attribute 'data'
Error in landmarks output

0: 480x640 1 bed, 242.1ms
Speed: 3.1ms preprocess, 242.1ms inference, 9.0ms postprocess per image at shape (1, 3,