In [1]:
from ultralytics import YOLO
import pandas as pd
import os
import cv2
import torch


# GENERATE keypoints here
model = YOLO("yolo11n-pose.pt")  # load an official model

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
# Predict with the model
results = model("bus.jpg")  # predict on an image

print(len(results))
for i in range(len(results[0].boxes)):
    print(results[0].boxes[i].id)

len(results[0])


image 1/1 C:\Users\sonnpm\OneDrive - UTS\Desktop\UTS\Semester 3\DL\Assignment\ass 3\bus.jpg: 640x480 4 persons, 122.8ms
Speed: 9.0ms preprocess, 122.8ms inference, 152.3ms postprocess per image at shape (1, 3, 640, 480)
1
None
None
None
None


4

In [3]:
import os
import cv2
import torch
from utils import is_none_or_empty

def process_video_with_model(yolo_model, dl_model, device, input_path="./fall_videos/processed/fall-01-cam0.mp4", output_path="./output/output.mp4", using_conf=True):

    cap = cv2.VideoCapture(input_path)

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Set up the output writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    print(f"Processing {input_path}")
    results = yolo_model.track(source=input_path, verbose=False)

    max_pred = 0.0
    fall_cnt = 0
    for frame_number, result in enumerate(results):
        frame = result.orig_img.copy()
        boxes = result.boxes
        keypoints = result.keypoints

        if is_none_or_empty(boxes) or is_none_or_empty(keypoints):
            continue

        # test mode so predict all
        for index in range(len(boxes)):

            conf = float(boxes.conf[index])
            cls = int(boxes.cls[index])

            if cls != 0 or conf < 0.2:
                continue

            keypoints_tensor = keypoints.data[index]
            keypoints_np = keypoints_tensor.cpu().detach().numpy()
            flat = keypoints_np.flatten().tolist()

            for i in range(0, 51, 3):
                flat[i] = float(flat[i]) / float(width)
                flat[i + 1] = float(flat[i + 1]) / float(height)

            if not using_conf:
                flat = [flat[i] for i in range(len(flat)) if i % 3 != 2]

            input_tensor = torch.tensor(flat, dtype=torch.float32).unsqueeze(0).to(device)
            print(input_tensor)

            with torch.no_grad():
                output = dl_model(input_tensor)

            prediction_value = output.item()
            max_pred = max(max_pred, prediction_value)
            prediction_label = "FALL" if prediction_value > 0.5 else "SAFE"
            print("Prediction value: ", prediction_value)

            if prediction_label == "FALL":
                fall_cnt+= 1

            x, y = int(keypoints_np[0][0]), int(keypoints_np[0][1])
            color = (0, 0, 255) if prediction_label == "FALL" else (0, 255, 0)
            cv2.putText(frame, prediction_label, (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
        out.write(frame)

    out.release()
    print(f"Saved annotated video to: {output_path}")
    # print("max_pred", max_pred)
    print("fall_cnt", fall_cnt)


In [32]:
# # Model with confidence
#
# from nn_model import NN_Model
#
# nn_model = NN_Model()
# nn_model.load_state_dict(torch.load('./model/nn_model.pth'))
#
# # Set the model to evaluation mode (important for inference)
# nn_model.eval()
# nn_model.to(device)
#
# process_video_with_model(yolo_model=model, dl_model=nn_model, device=device,
#                          input_path='./fall_videos/mc_videos/Coffee_room_01/Coffee_room_01/Videos/video (1).avi',
#                          output_path='./output/nn_annotated_output.mp4')

NN_Model(
  (classifier): Sequential(
    (0): Linear(in_features=51, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=64, bias=True)
    (3): ReLU()
    (4): Linear(in_features=64, out_features=1, bias=True)
    (5): Sigmoid()
  )
)

In [3]:
# from nn_model import NN_Model_NO_CONF
#
# nn_model_no_conf = NN_Model_NO_CONF()
# nn_model_no_conf.load_state_dict(torch.load('./model/nn_model_no_conf.pth'))
#
# # Set the model to evaluation mode (important for inference)
# nn_model_no_conf.eval()
# nn_model_no_conf.to(device)
#
# process_video_with_model(yolo_model=model, dl_model=nn_model_no_conf, device=device,
#                          input_path='./fall_videos/test/video_1.mp4',
#                          output_path='./output/nn_no_conf_annotated_output.mp4', using_conf=False)

NN_Model_NO_CONF(
  (classifier): Sequential(
    (0): Linear(in_features=34, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=64, bias=True)
    (3): ReLU()
    (4): Linear(in_features=64, out_features=1, bias=True)
    (5): Sigmoid()
  )
)

In [4]:
# Complex NN model

from nn_model import NN_Model_NO_CONF_2

nn_model_no_conf_2 = NN_Model_NO_CONF_2()
nn_model_no_conf_2.load_state_dict(torch.load('./model/nn_model_no_conf2.pth'))

nn_model_no_conf_2.eval()
nn_model_no_conf_2.to(device)

process_video_with_model(yolo_model=model, dl_model=nn_model_no_conf_2, device=device,
                         input_path='./fall_videos/mc_videos_test/chute01/cam1.avi',
                         output_path='./output/nn_no_conf_2.mp4', using_conf=False)

NN_Model_NO_CONF_2(
  (classifier): Sequential(
    (0): Linear(in_features=34, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=256, bias=True)
    (3): ReLU()
    (4): Linear(in_features=256, out_features=128, bias=True)
    (5): ReLU()
    (6): Linear(in_features=128, out_features=64, bias=True)
    (7): ReLU()
    (8): Linear(in_features=64, out_features=32, bias=True)
    (9): ReLU()
    (10): Linear(in_features=32, out_features=1, bias=True)
    (11): Sigmoid()
  )
)

In [8]:
import cv2
import os
from rule_model import rule_fall_detection
from utils import is_none_or_empty

# cv2.putText() is another OpenCV function used to add text to an image.
# image: The image to which the text will be added.
# 'Person Fell down': The text message that will be displayed on the image.
# (11, 100): The position of the text (top-left corner of the text box) in (x, y) coordinates. This is the point where the text starts to be drawn on the image.
# 0: The font type (0 corresponds to the default font in OpenCV).
# 1: The font scale, i.e., the size of the text. A scale of 1 means the text will be the default size.
# [0, 0, 2550]: The color of the text. It uses the BGR format (Blue, Green, Red). This is a red color with a high intensity (2550 is likely an error and should be something like [0, 0, 255]).
# thickness=3: The thickness of the text.
# lineType=cv2.LINE_AA: This ensures that the text is drawn with antialiased lines for smoother edges.
def falling_alarm(image, bbox):
    x_min, y_min, x_max, y_max = bbox
    cv2.rectangle(image, (int(x_min), int(y_min)), (int(x_max), int(y_max)), color=(0, 0, 255),
                  thickness=5, lineType=cv2.LINE_AA)
    cv2.putText(image, 'Person Fell down', (11, 100), 0, 1, [0, 0, 2550], thickness=3, lineType=cv2.LINE_AA)

def Rule_Model(frame):
    height, width = frame.orig_img.shape[:2]
    for pose in frame:

        conf = float(pose.boxes.conf)
        cls = int(pose.boxes.cls)

        if cls != 0 or conf < 0.2:
            continue

        bbox = pose.boxes.xyxy.squeeze(0).tolist()
        xmin, ymin, xmax, ymax = bbox

        flatten_pose = pose.keypoints.data.squeeze(0).flatten().tolist()
        for i in range(0, 51, 3):
            flatten_pose[i] /= width  # normalize x
            flatten_pose[i + 1] /= height  # normalize y
        prediction = rule_fall_detection(flatten_pose, xmin, ymin, xmax, ymax)

        if prediction:
            return prediction, bbox
    return False, None

def process_video_rule(video_path, model, output_path='./output/rule_annotated_output.mp4'):
    cap = cv2.VideoCapture(video_path)

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Set up the output writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    print(f"Processing {video_path}")
    results = model.track(source=video_path, verbose=False)

    for frame_number, result in enumerate(results):
        is_fall, bbox = Rule_Model(result)
        frame = result.orig_img.copy()

        if is_fall:
            falling_alarm(frame, bbox)

        out.write(frame)
        # print(f"Frame {frame_number}: pred = {is_fall}")

    out.release()
    print("Saved annotated video to:", output_path)


# === Main ===
video_path = './fall_videos/mc_videos_test/chute01/cam1.avi'

process_video_rule(video_path, model, output_path='./output/rule_unseen.mp4')


Processing ./fall_videos/mc_videos_test/chute01/cam1.avi

errors for large sources or long-running streams and videos. See https://docs.ultralytics.com/modes/predict/ for help.

Example:
    results = model(source=..., stream=True)  # generator of Results objects
    for r in results:
        boxes = r.boxes  # Boxes object for bbox outputs
        masks = r.masks  # Masks object for segment masks outputs
        probs = r.probs  # Class probabilities for classification outputs

Saved annotated video to: ./output/rule_unseen.mp4


In [13]:
from collections import defaultdict, deque
from utils import is_none_or_empty

def process_video_with_lstm(yolo_model, lstm_model, device, input_path, output_path, sequence_length=20, using_conf=True):
    cap = cv2.VideoCapture(input_path)

    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    print(f"Processing {input_path}")
    results = yolo_model.track(source=input_path, verbose=False)

    person_sequences = defaultdict(lambda: deque(maxlen=sequence_length))  # person_id → keypoint buffer

    for result in results:
        frame = result.orig_img.copy()
        boxes = result.boxes
        keypoints = result.keypoints

        if is_none_or_empty(boxes) or is_none_or_empty(keypoints):
            out.write(frame)
            continue

        for i in range(len(boxes)):
            box = boxes[i]
            if int(box.cls) != 0 or float(box.conf) < 0.2:
                continue

            box_id = boxes[i].id
            if box_id is not None:
                person_id = int(box_id.item())  # works for tensor([1.])
            else:
                person_id = i  # fallback to index

            keypoint = keypoints.data[i].cpu().detach().numpy()
            flat = keypoint.flatten().tolist()

            for i in range(0, 51, 3):
                flat[i] = float(flat[i]) / float(width)
                flat[i + 1] = float(flat[i + 1]) / float(height)

            if not using_conf:
                flat = [flat[i] for i in range(len(flat)) if i % 3 != 2]

            person_sequences[person_id].append(flat)

            if len(person_sequences[person_id]) == sequence_length:
                input_tensor = torch.tensor([person_sequences[person_id]], dtype=torch.float32).to(device)
                with torch.no_grad():
                    output = lstm_model(input_tensor)
                pred_score = torch.sigmoid(output).item() if output.shape[-1] == 1 else torch.softmax(output, dim=-1)[0,1].item()
                prediction_label = "FALL" if pred_score > 0.5 else "SAFE LSTM"
                # print(prediction_label)
                color = (0, 0, 255) if prediction_label == "FALL" else (0, 255, 0)

                x, y = int(keypoint[0][0]), int(keypoint[0][1])
                cv2.putText(frame, f"{prediction_label}", (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

        out.write(frame)

    cap.release()
    out.release()
    print(f"Saved annotated video to: {output_path}")


In [None]:
# from sequence_model import LSTM_Model
#
# lstm_model_path = "./model/lstm_model.pth"
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#
# lstm_model = LSTM_Model()
# lstm_model.load_state_dict(torch.load(lstm_model_path, map_location=device))
# lstm_model.to(device)
# lstm_model.eval()
#
# process_video_with_lstm(yolo_model=model, lstm_model=lstm_model, device=device,
#                         input_path="./fall_videos/mc_videos_test/coffee1/coffee1/Videos/video (1).avi", output_path='./output/lstm_unseen_test.mp4', sequence_length=10)

In [9]:
# # process lstm no conf
#
# from sequence_model import LSTM_Model
#
# lstm_model_no_conf_path = "./model/lstm_model_no_conf.pth"
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#
# lstm_model_no_conf = LSTM_Model(input_dim=34)
# lstm_model_no_conf.load_state_dict(torch.load(lstm_model_no_conf_path, map_location=device))
# lstm_model_no_conf.to(device)
# lstm_model_no_conf.eval()
#
# process_video_with_lstm(yolo_model=model, lstm_model=lstm_model_no_conf, device=device,
#                         input_path="./fall_videos/test/video_1.mp4", output_path='./output/lstm_no_conf_unseen_test.mp4', sequence_length=10, using_conf=False)

LSTM_Model(
  (lstm): LSTM(34, 64, num_layers=2, batch_first=True)
  (fc): Linear(in_features=64, out_features=2, bias=True)
)

In [11]:
# process lstm no conf

from sequence_model import LSTM_Model_2

lstm_model_no_conf2_path = "./model/lstm_model_no_conf2.pth"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

lstm_model_no_conf2 = LSTM_Model_2(input_dim=34)
lstm_model_no_conf2.load_state_dict(torch.load(lstm_model_no_conf2_path, map_location=device))
lstm_model_no_conf2.to(device)
lstm_model_no_conf2.eval()

process_video_with_lstm(yolo_model=model, lstm_model=lstm_model_no_conf2, device=device,
                        input_path="./fall_videos/test/video_1.mp4", output_path='./output/lstm_no_conf_unseen_test.mp4', sequence_length=10, using_conf=False)

LSTM_Model_2(
  (lstm): LSTM(34, 128, num_layers=3, batch_first=True)
  (fc1): Linear(in_features=128, out_features=64, bias=True)
  (relu): ReLU()
  (dropout): Dropout(p=0.3, inplace=False)
  (fc2): Linear(in_features=64, out_features=2, bias=True)
)