In [6]:
import cv2
import numpy as np
from ultralytics import YOLO
import mediapipe
import time
import miniaudio
from mutagen.mp3 import MP3
import os

def _calculate_angle(a, b, c):
    """Return the unsigned angle in degrees (0-180) at vertex ``b``.

    ``a``, ``b`` and ``c`` are ``(x, y)`` pairs; the angle is measured between
    the rays ``b -> a`` and ``b -> c``.
    """
    a, b, c = np.array(a), np.array(b), np.array(c)

    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180 / np.pi)
    # The difference of two arctan2 results can exceed 180 degrees in
    # magnitude; fold it back so the interior angle is always reported.
    if angle > 180:
        angle = 360 - angle
    return angle


def _landmark_xy(landmarks, landmark_id):
    """Return the ``[x, y]`` coordinates stored on one pose landmark."""
    point = landmarks[landmark_id.value]
    return [point.x, point.y]


def _arm_raised(shoulder, elbow, wrist, angle, min_angle=150):
    """Return True when wrist is above elbow, elbow above shoulder, and the arm is nearly straight.

    "Above" means a smaller ``y`` value, matching image coordinates where
    ``y`` grows downwards.
    """
    return wrist[1] < elbow[1] < shoulder[1] and angle > min_angle


def process_video(path, video_file, audio_file, model_path, classes_file, frame_check=7):
    """Run person tracking and raised-arm distress detection on a video.

    Every frame is passed through a YOLO model (person boxes, fed to the
    project ``Tracker``) and through MediaPipe Pose (elbow angles). When an
    arm has been seen raised and almost straight on ``frame_check`` frames,
    the alarm sound is played once. Annotated frames are shown in an OpenCV
    window; pressing ``q`` stops processing.

    Args:
        path: Directory to switch into before anything is loaded. This
            changes the working directory of the whole process, so the
            remaining file arguments may be given relative to it.
        video_file: Video to read frames from.
        audio_file: MP3 file played when the alarm fires.
        model_path: Weights file handed to ``YOLO``.
        classes_file: Text file with one class name per line, indexed by the
            class id predicted by the model.
        frame_check: Number of raised-arm frames required before the alarm
            is triggered.
    """
    os.chdir(path)

    # Imported only after the chdir above; presumably tracker.py lives in
    # `path` and is found through the current directory — confirm.
    from tracker import Tracker

    my_pose = mediapipe.solutions.pose
    my_drawing = mediapipe.solutions.drawing_utils
    joints = my_pose.PoseLandmark

    # Model
    model = YOLO(model_path)

    # Data: splitlines() avoids the empty trailing entry that split("\n")
    # produces when the file ends with a newline.
    with open(classes_file, "r") as df:
        classes = df.read().splitlines()

    tracker = Tracker()
    alarm_length = MP3(audio_file).info.length

    flags = 0
    alarm_triggered = False

    pose = my_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    cap = cv2.VideoCapture(video_file)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            results = model(frame)
            result = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            frame_h, frame_w = frame.shape[:2]

            if result.pose_landmarks:
                my_drawing.draw_landmarks(frame, result.pose_landmarks, my_pose.POSE_CONNECTIONS,
                                          my_drawing.DrawingSpec((0, 255, 0), 2, 2),
                                          my_drawing.DrawingSpec((255, 0, 0), 2, 2))

                landmarks = result.pose_landmarks.landmark

                l_shoulder = _landmark_xy(landmarks, joints.LEFT_SHOULDER)
                l_elbow = _landmark_xy(landmarks, joints.LEFT_ELBOW)
                l_wrist = _landmark_xy(landmarks, joints.LEFT_WRIST)

                r_shoulder = _landmark_xy(landmarks, joints.RIGHT_SHOULDER)
                r_elbow = _landmark_xy(landmarks, joints.RIGHT_ELBOW)
                r_wrist = _landmark_xy(landmarks, joints.RIGHT_WRIST)

                l_angle = _calculate_angle(l_shoulder, l_elbow, l_wrist)
                r_angle = _calculate_angle(r_shoulder, r_elbow, r_wrist)

                # Scale the landmark coordinates by the real frame size so the
                # labels sit on the elbows at any resolution (this used to be
                # a fixed 640x480).
                for elbow, angle in ((l_elbow, l_angle), (r_elbow, r_angle)):
                    anchor = (int(elbow[0] * frame_w), int(elbow[1] * frame_h))
                    cv2.putText(frame, str(int(angle)), anchor,
                                cv2.FONT_HERSHEY_PLAIN, 2, (255, 255, 0), 2)

                if (_arm_raised(l_shoulder, l_elbow, l_wrist, l_angle)
                        or _arm_raised(r_shoulder, r_elbow, r_wrist, r_angle)):
                    # NOTE(review): the counter is cumulative and never reset,
                    # so the frames need not be consecutive — confirm intent.
                    flags += 1
                    if flags >= frame_check and not alarm_triggered:
                        cv2.putText(frame, "Warning!!! Someone Needs Help.", (20, 75), cv2.FONT_HERSHEY_PLAIN, 3, (0, 0, 255), 2)
                        stream = miniaudio.stream_file(audio_file)
                        with miniaudio.PlaybackDevice() as device:
                            device.start(stream)
                            # Keep the device open for the length of the clip.
                            # This blocks frame processing while the alarm plays.
                            time.sleep(alarm_length)
                        alarm_triggered = True

            # Keep confident "person" detections as [x1, y1, x2, y2] boxes.
            person_boxes = []
            for res in results:
                for box in res.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    name = classes[int(box.cls[0])]
                    if "person" in name and box.conf[0] > 0.5:
                        person_boxes.append([x1, y1, x2, y2])

            tracked = tracker.update(person_boxes)
            for x1, y1, x2, y2, person_id in tracked:
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                # Label each box with its own ID; previously every box showed
                # the list of all IDs handled so far in this frame.
                cv2.putText(frame, f"Person ID Is = {person_id}", (x1, y1), cv2.FONT_HERSHEY_PLAIN, 1, (0, 255, 255), 2)

            # Draw the count once per frame instead of once per tracked box;
            # as before, nothing is drawn when no one is tracked.
            if len(tracked) > 0:
                cv2.putText(frame, f"Number Of Persons is = {str(len(person_boxes))}", (20, 50), cv2.FONT_HERSHEY_PLAIN, 3, (0, 255, 255), 2)

            cv2.imshow("Frame", frame)

            if cv2.waitKey(1) == ord("q"):
                break
    finally:
        # Release the capture, the pose solution and the window even when a
        # frame fails part-way through processing.
        cap.release()
        pose.close()
        cv2.destroyAllWindows()

# Run the pipeline only when this code is executed directly (script or
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    process_video(
        path=r"D:\Computer Vision\Save-people-from-drowning-in-the-swimming-pool-by-mediapipe-and-YOLO-main\save-people-from-swimming",
        video_file=r"vid.mp4",
        audio_file=r"audio.mp3",
        model_path="yolov8s.pt",
        classes_file=r"coco.txt"
    )



0: 384x640 1 person, 311.7ms
Speed: 7.0ms preprocess, 311.7ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 475.7ms
Speed: 10.0ms preprocess, 475.7ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 278.2ms
Speed: 3.0ms preprocess, 278.2ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 276.9ms
Speed: 8.0ms preprocess, 276.9ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 247.9ms
Speed: 4.0ms preprocess, 247.9ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 256.9ms
Speed: 4.0ms preprocess, 256.9ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 260.8ms
Speed: 3.0ms preprocess, 260.8ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 250.8ms
Speed: 3.5ms preprocess, 250.8ms inference, 3.0ms postprocess per imag