In [1]:
import os
import glob
import cv2
import numpy as np
import mediapipe as mp
import torch
from torch_geometric.data import Data, DataLoader

In [2]:
# Dataset locations.
# All scene folders are expected under a single root directory. The root defaults
# to the original author's local path; set the LE2I_ROOT environment variable
# before starting the kernel to point the notebook at a different copy of the
# data without editing this cell.
DATASET_ROOT = os.environ.get("LE2I_ROOT", r"C:\Users\johna\Desktop\CS3264proj")

# Each scene is laid out as <root>/<scene>/<scene>/{Videos,Annotation_files}.
# The Coffee_room video directories are currently excluded from the run; to
# include them, uncomment the two lines below and add them (and the matching
# label directories) to the lists at the end of this cell.
# video_directory1 = os.path.join(DATASET_ROOT, "Coffee_room_01", "Coffee_room_01", "Videos")
# video_directory2 = os.path.join(DATASET_ROOT, "Coffee_room_02", "Coffee_room_02", "Videos")
video_directory3 = os.path.join(DATASET_ROOT, "Home_01", "Home_01", "Videos")
video_directory4 = os.path.join(DATASET_ROOT, "Home_02", "Home_02", "Videos")
label_directory1 = os.path.join(DATASET_ROOT, "Coffee_room_01", "Coffee_room_01", "Annotation_files")
label_directory2 = os.path.join(DATASET_ROOT, "Coffee_room_02", "Coffee_room_02", "Annotation_files")
label_directory3 = os.path.join(DATASET_ROOT, "Home_01", "Home_01", "Annotation_files")
label_directory4 = os.path.join(DATASET_ROOT, "Home_02", "Home_02", "Annotation_files")

# Parallel lists: video_directory[k] is paired with label_directory[k].
video_directory = [video_directory3, video_directory4]
label_directory = [label_directory3, label_directory4]

![image.png](attachment:image.png)


In [3]:
# MediaPipe pose initialization.
# Two Pose instances are created:
#   - pose_detector: configured with detection/tracking confidence thresholds.
#     It is not referenced by any of the visible cells in this notebook.
#   - pose_estimator: created with static_image_mode=True; this is the instance
#     used by extract_pose_from_frame.
mp_pose = mp.solutions.pose
pose_detector = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)
pose_estimator = mp_pose.Pose(static_image_mode=True)

# Skeleton edges as (landmark_index, landmark_index) pairs (MediaPipe format).
# build_graph adds every pair in both directions for each frame, in the order
# listed here, so reordering this list changes the column order of edge_index.
# Landmark indices that appear in no pair (17-22, 29, 30) receive no spatial
# edges; in build_graph they are connected only through the temporal edges.
EDGES = [
    (0,1),(1,2),(2,3),(3,7),(0,4),(4,5),(5,6),(6,8),
    (9,10),(11,12),(12,14),(14,16),(11,13),(13,15),
    (12,24),(24,23),(23,11),(24,26),(26,28),(28,32),
    (23,25),(25,27),(27,31)
]

In [4]:
def extract_pose_from_frame(frame):
    """Run the module-level ``pose_estimator`` on one BGR frame.

    Args:
        frame: Image as returned by OpenCV (BGR channel order).

    Returns:
        A NumPy array with one row per detected landmark and the columns
        ``[x, y, z, visibility]``. When no pose is detected, an all-zero
        array of shape ``(33, 4)`` is returned instead.
    """
    # MediaPipe expects RGB input, while OpenCV decodes frames as BGR.
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    detection = pose_estimator.process(rgb_frame)

    if not detection.pose_landmarks:
        # No person found: fall back to a zero-filled placeholder.
        return np.zeros((33, 4))

    landmarks = detection.pose_landmarks.landmark
    return np.array([[point.x, point.y, point.z, point.visibility] for point in landmarks])

In [5]:
def parse_ground_truth(gt_file):
    """Read the fall start/end frame numbers from an annotation file.

    The first two lines of the file are expected to hold the start frame and
    the end frame of the fall, one integer per line. Only those two lines are
    read; the rest of the file is ignored.

    Args:
        gt_file: Path to the annotation text file.

    Returns:
        ``(fall_start, fall_end)`` as integers, or ``(None, None)`` when the
        first two lines are missing or are not plain integers (a message is
        printed in that case).

    NOTE(review): if some annotation files encode "no fall" as two zero lines
    rather than omitting them, this returns ``(0, 0)`` and callers will treat
    frame 0 as a fall event -- confirm against the dataset.
    """
    with open(gt_file, 'r') as f:
        # readline() returns '' at end of file, so a file with fewer than two
        # lines ends up in the ValueError branch below instead of raising
        # IndexError as indexing into readlines() would.
        first_line = f.readline()
        second_line = f.readline()

    try:
        fall_start = int(first_line.strip())
        fall_end = int(second_line.strip())
    except ValueError:
        print("This file does not have any fall start and end frame", gt_file)
        return None, None
    return fall_start, fall_end

In [6]:
def build_graph(sequence):
    """Convert a pose sequence into a single spatio-temporal graph.

    Args:
        sequence: Array of shape ``(frames, joints, channels)``; the callers
            in this notebook pass ``(30, 33, 4)``.

    Returns:
        A ``Data`` object whose ``x`` holds one float feature row per
        (frame, joint) pair in frame-major order, and whose ``edge_index``
        (shape ``(2, num_edges)``, dtype long) lists, in this order:
        the skeleton edges from ``EDGES`` for every frame, then the edges
        linking each joint to itself in the following frame. Every edge is
        stored in both directions.
    """
    num_frames, num_joints, _ = sequence.shape

    # One node per joint per frame; node id = frame * num_joints + joint.
    node_rows = [
        sequence[frame, joint]
        for frame in range(num_frames)
        for joint in range(num_joints)
    ]

    pairs = []

    # Spatial edges: the skeleton inside each individual frame.
    for frame in range(num_frames):
        offset = frame * num_joints
        for joint_a, joint_b in EDGES:
            pairs.append([offset + joint_a, offset + joint_b])
            pairs.append([offset + joint_b, offset + joint_a])

    # Temporal edges: the same joint in consecutive frames.
    for frame in range(num_frames - 1):
        current_offset = frame * num_joints
        next_offset = current_offset + num_joints
        for joint in range(num_joints):
            pairs.append([current_offset + joint, next_offset + joint])
            pairs.append([next_offset + joint, current_offset + joint])

    x = torch.tensor(np.array(node_rows), dtype=torch.float)
    # Transpose the (num_edges, 2) list into the (2, num_edges) layout.
    edge_index = torch.tensor(pairs, dtype=torch.long).t().contiguous()
    return Data(x=x, edge_index=edge_index)

In [7]:
def create_dataset_from_video(video_path, gt_path, window_size=30, stride=10, max_frames=300):
    """Build labelled sliding-window pose graphs from one video.

    Args:
        video_path: Path of the video file to decode.
        gt_path: Path of the matching annotation file (see parse_ground_truth).
        window_size: Number of consecutive frames per graph.
        stride: Step, in frames, between the starts of successive windows.
        max_frames: At most this many leading frames of the video are used;
            anything after them (including a fall that happens later) is
            never seen.

    Returns:
        A list of graphs as produced by build_graph, each with ``y`` set to a
        long tensor ``[1]`` if the window is considered to overlap the
        annotated fall and ``[0]`` otherwise. Videos without a usable fall
        annotation yield only ``0`` labels.
    """
    # Parse the annotation first so a missing/unreadable file fails before
    # the expensive pose-estimation pass.
    fall_start, fall_end = parse_ground_truth(gt_path)
    has_fall = fall_start is not None and fall_end is not None

    # Estimate the pose exactly once per frame, while decoding. Windows overlap
    # (a frame belongs to window_size / stride windows), so running the
    # estimator per window repeats the same work many times. Keeping only the
    # small pose arrays also avoids holding every raw frame in memory.
    poses = []
    cap = cv2.VideoCapture(video_path)
    try:
        while cap.isOpened() and len(poses) < max_frames:
            ret, frame = cap.read()
            if not ret:  # end of stream or decode failure
                break
            poses.append(extract_pose_from_frame(frame))
    finally:
        # Release the capture even if pose extraction raises.
        cap.release()

    dataset = []
    for i in range(0, len(poses) - window_size + 1, stride):
        pose_sequence = np.array(poses[i:i + window_size])
        if pose_sequence.shape != (window_size, 33, 4):
            continue
        graph = build_graph(pose_sequence)

        # Label is 1 if the window is considered to overlap the fall event.
        # NOTE(review): the window holds frame indices i .. i+window_size-1
        # (0-based), but this test treats it as i .. i+window_size, and the
        # annotation's frame numbering base is not visible here. One of the
        # two boundaries is therefore off by one frame -- confirm the
        # annotation convention before tightening the comparison.
        label = int(has_fall and not (i + window_size < fall_start or i > fall_end))
        graph.y = torch.tensor([label], dtype=torch.long)
        dataset.append(graph)
    return dataset

In [8]:
def load_le2i_dataset(video_dir, label_dir, window_size=30, stride=5, max_frames=300):
    """Build graphs for every annotated ``.avi`` video in a directory.

    A video ``<name>.avi`` is paired with ``<label_dir>/<name>.txt``. Videos
    without such an annotation file are skipped and reported.

    Args:
        video_dir: Directory containing the ``.avi`` files.
        label_dir: Directory containing the annotation ``.txt`` files.
        window_size: Frames per graph, forwarded to create_dataset_from_video.
        stride: Step between windows, forwarded to create_dataset_from_video.
        max_frames: Per-video frame cap, forwarded to
            create_dataset_from_video (default matches that function's own).

    Returns:
        A flat list with the graphs of all processed videos, in sorted
        file-name order.
    """
    # glob.escape keeps characters such as '[' or '*' in the directory name
    # from being interpreted as wildcard syntax.
    pattern = os.path.join(glob.escape(video_dir), '*.avi')
    video_files = sorted(glob.glob(pattern))

    all_graphs = []
    skipped = []
    for video_file in video_files:
        base_name = os.path.splitext(os.path.basename(video_file))[0]
        gt_file = os.path.join(label_dir, base_name + '.txt')
        if not os.path.exists(gt_file):
            skipped.append(video_file)
            continue
        graphs = create_dataset_from_video(
            video_file, gt_file, window_size, stride, max_frames=max_frames
        )
        all_graphs.extend(graphs)

    if skipped:
        print(f"Skipped {len(skipped)} videos without an annotation file in {label_dir}.")
    # Report only the videos that actually contributed graphs.
    print(f"Loaded {len(all_graphs)} graphs from {len(video_files) - len(skipped)} videos.")
    return all_graphs


In [9]:
# Build ONE combined dataset from every (video, annotation) directory pair.
# Assigning the result of each load directly to `dataset` would keep only the
# graphs of the last directory, so the per-directory graphs are accumulated.
if len(video_directory) != len(label_directory):
    raise ValueError("video_directory and label_directory must have the same length")

dataset = []
for i, (video_dir, label_dir) in enumerate(zip(video_directory, label_directory)):
    graphs = load_le2i_dataset(video_dir, label_dir)
    dataset.extend(graphs)
    print(f"Dataset {i+1} loaded with {len(graphs)} graphs.")
print(f"Combined dataset contains {len(dataset)} graphs.")

Loaded 1266 graphs from 30 videos.
Dataset 1 loaded with 1266 graphs.
Loaded 1276 graphs from 30 videos.
Dataset 2 loaded with 1276 graphs.


Save the dataset to disk

In [12]:
# Save the dataset.
# Serializes whatever the `dataset` variable holds when this cell runs into
# 'dataset.pt'. The path is relative, so the file is written to the kernel's
# current working directory and an existing file of that name is overwritten.
# NOTE(review): the saved object is a Python list of graph objects, so loading
# it back presumably requires torch_geometric to be importable -- confirm.
torch.save(dataset, 'dataset.pt')

In [None]:
#test dataset