In [1]:
import cv2
import mediapipe as mp
import numpy as np
import torch
import torch.nn as nn

# Device setup: use the GPU when CUDA is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# The 18 skeleton joints, in the same order as used during training.
# A joint's position in this list is its index in the spatial graph below.
landmark_order = [
    "Nose",
    "Neck",
    "Right Shoulder",
    "Right Elbow",
    "Right Wrist",
    "Left Shoulder",
    "Left Elbow",
    "Left Wrist",
    "Right Hip",
    "Right Knee",
    "Right Ankle",
    "Left Hip",
    "Left Knee",
    "Left Ankle",
    "Right Eye",
    "Left Eye",
    "Right Ear",
    "Left Ear",
]

# Spatial graph edges for the 18 joints (added to match training script).
# Written as joint-name pairs and resolved to index pairs via landmark_order.
_bone_names = [
    ("Nose", "Neck"),
    ("Neck", "Right Shoulder"),
    ("Right Shoulder", "Right Elbow"),
    ("Right Elbow", "Right Wrist"),
    ("Neck", "Left Shoulder"),
    ("Left Shoulder", "Left Elbow"),
    ("Left Elbow", "Left Wrist"),
    ("Neck", "Right Hip"),
    ("Right Hip", "Right Knee"),
    ("Right Knee", "Right Ankle"),
    ("Neck", "Left Hip"),
    ("Left Hip", "Left Knee"),
    ("Left Knee", "Left Ankle"),
    ("Nose", "Right Eye"),
    ("Nose", "Left Eye"),
    ("Right Eye", "Right Ear"),
    ("Left Eye", "Left Ear"),
]
edges = [
    (landmark_order.index(start), landmark_order.index(end))
    for start, end in _bone_names
]

# Function to create adjacency matrix
def create_adjacency_matrix(num_joints, edges):
    """Build the symmetrically normalised adjacency matrix of the joint graph.

    Every (i, j) pair in `edges` is treated as an undirected connection, a
    self-loop is added to every joint, and the result is normalised as
    D^-1/2 (A + I) D^-1/2, where D is the diagonal matrix of row sums.

    Args:
        num_joints: Number of graph vertices (joints).
        edges: Iterable of (i, j) joint-index pairs, or None for a graph with
            no connections (only the self-loops remain, giving the identity).

    Returns:
        torch.Tensor of shape (num_joints, num_joints), dtype float32.
    """
    # EnhancedSTGCN defaults graph_edges to None; treat that as "no edges"
    # instead of failing with a TypeError while iterating over None.
    if edges is None:
        edges = ()

    A = np.zeros((num_joints, num_joints))
    for i, j in edges:
        A[i, j] = 1
        A[j, i] = 1
    A += np.eye(num_joints)

    # Self-loops guarantee every row sum is >= 1, so the inverse square root
    # never divides by zero.
    D = np.diag(np.sum(A, axis=1) ** -0.5)
    A = D @ A @ D
    return torch.tensor(A, dtype=torch.float32)

# Graph Convolution Layer
class GraphConv(nn.Module):
    """Spatial graph convolution: mix joints through A, then a 1x1 convolution.

    Args:
        in_channels: Number of input feature channels.
        out_channels: Number of output feature channels.
        A: (vertices, vertices) adjacency tensor applied along the joint axis.
    """

    def __init__(self, in_channels, out_channels, A):
        super(GraphConv, self).__init__()
        # Registered as a non-persistent buffer: it follows the module through
        # .to()/.cuda()/.double() without relying on a module-level `device`,
        # and it is left out of state_dict so existing checkpoints (which only
        # contain the conv weights) still load unchanged.
        self.register_buffer("A", A, persistent=False)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=(1, 1))

    def forward(self, x):
        """Apply the graph convolution.

        Args:
            x: Tensor laid out as (batch, channels, time, vertices).

        Returns:
            Tensor of shape (batch, out_channels, time, vertices).
        """
        # Move vertices to the second-to-last axis so that A @ x mixes joints:
        # (batch, channels, time, vertices) -> (batch, time, vertices, channels)
        x = x.permute(0, 2, 3, 1)
        x = torch.matmul(self.A, x)
        # Back to (batch, channels, time, vertices) for the 1x1 convolution.
        x = x.permute(0, 3, 1, 2)
        x = self.conv(x)
        return x

# ST-GCN Block
class STGCNBlock(nn.Module):
    """One spatial-temporal unit: graph conv, temporal conv, batch norm,
    optional residual branch, ReLU and dropout.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the block.
        A: Adjacency tensor handed to the GraphConv layer.
        temporal_kernel_size: Kernel length along the time axis.
        stride: Stride along the time axis (the joint axis always uses 1).
        residual: When True, a 1x1 conv + batch norm of the input is added
            to the main branch before the activation.
    """

    def __init__(self, in_channels, out_channels, A, temporal_kernel_size=3, stride=1, residual=True):
        super().__init__()
        # Half-kernel padding on the time axis; for odd kernel sizes this
        # keeps the sequence length unchanged when stride is 1.
        time_padding = temporal_kernel_size // 2

        self.gcn = GraphConv(in_channels, out_channels, A)
        self.tcn = nn.Conv2d(
            out_channels,
            out_channels,
            kernel_size=(temporal_kernel_size, 1),
            padding=(time_padding, 0),
            stride=(stride, 1),
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(0.5)

        self.residual = residual
        if self.residual:
            # The shortcut uses the same temporal stride as the main branch so
            # both outputs have matching shapes when they are summed.
            self.res_conv = nn.Conv2d(
                in_channels,
                out_channels,
                kernel_size=(1, 1),
                stride=(stride, 1),
            )
            self.res_bn = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        """Run the block on x and return the activated, dropout-regularised output."""
        out = self.bn(self.tcn(self.gcn(x)))
        if self.residual:
            out = out + self.res_bn(self.res_conv(x))
        return self.dropout(self.relu(out))

# Enhanced ST-GCN Model
class EnhancedSTGCN(nn.Module):
    """Six stacked STGCNBlocks followed by pooling and a linear classifier.

    Args:
        in_channels: Feature channels per joint in the input.
        num_joints: Number of skeleton joints (graph vertices).
        num_classes: Size of the output logit vector.
        graph_edges: Joint-index pairs passed to create_adjacency_matrix.
    """

    def __init__(self, in_channels=3, num_joints=18, num_classes=2, graph_edges=None):
        super().__init__()
        self.num_joints = num_joints
        self.A = create_adjacency_matrix(num_joints, graph_edges)

        # (input channels, output channels, temporal stride) for block1..block6.
        # The blocks are stored under those attribute names, in that order.
        layer_plan = [
            (in_channels, 64, 1),
            (64, 64, 1),
            (64, 128, 2),
            (128, 128, 1),
            (128, 256, 2),
            (256, 256, 1),
        ]
        for position, (c_in, c_out, t_stride) in enumerate(layer_plan, start=1):
            stage = STGCNBlock(c_in, c_out, self.A, temporal_kernel_size=3, stride=t_stride)
            setattr(self, f"block{position}", stage)

        # Average the time axis down to length 1 with num_joints columns, then
        # classify from the flattened 256 * num_joints features.
        self.pool = nn.AdaptiveAvgPool2d((1, num_joints))
        self.fc = nn.Linear(256 * num_joints, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: Tensor laid out as (batch, in_channels, time, num_joints).
        """
        stages = (
            self.block1,
            self.block2,
            self.block3,
            self.block4,
            self.block5,
            self.block6,
        )
        for stage in stages:
            x = stage(x)
        pooled = self.pool(x)
        flattened = pooled.reshape(pooled.size(0), -1)
        return self.fc(flattened)

# Load the pre-trained model
model_path = "enhanced_stgcn_fall_detection.pth"
try:
    model = EnhancedSTGCN(in_channels=3, num_joints=18, num_classes=2, graph_edges=edges).to(device)
    # NOTE(security): torch.load unpickles the checkpoint, which can execute
    # arbitrary code. Only load files from a trusted source; recent PyTorch
    # releases accept weights_only=True to restrict what is deserialised.
    model.load_state_dict(torch.load(model_path, map_location=device))
    # Inference mode: disables dropout and uses the stored batch-norm statistics.
    model.eval()
    print(f"Model loaded successfully from {model_path}")
except Exception as e:
    print(f"Error loading model: {e}")
    # Re-raise instead of calling exit(): inside a Jupyter kernel exit() does
    # not stop the running cell, so the code below would carry on without a
    # usable `model` (and the kernel would be shut down afterwards).
    raise

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(
    static_image_mode=False,
    model_complexity=1,
    enable_segmentation=False,
    min_detection_confidence=0.7,
)
mp_drawing = mp.solutions.drawing_utils

# MediaPipe landmark index for each named joint in landmark_order.
# "Neck" has no entry: the main loop derives it as the midpoint of the
# two shoulders.
landmark_indices = {
    # Head
    "Nose": 0,
    # Right arm
    "Right Shoulder": 12,
    "Right Elbow": 14,
    "Right Wrist": 16,
    # Left arm
    "Left Shoulder": 11,
    "Left Elbow": 13,
    "Left Wrist": 15,
    # Right leg
    "Right Hip": 24,
    "Right Knee": 26,
    "Right Ankle": 28,
    # Left leg
    "Left Hip": 23,
    "Left Knee": 25,
    "Left Ankle": 27,
    # Face
    "Right Eye": 5,
    "Left Eye": 2,
    "Right Ear": 8,
    "Left Ear": 7,
}

# Initialize buffer and settings
buffer = []        # per-frame skeleton arrays fed to the model
prob_buffer = []   # recent fall probabilities in percent, plotted on the graph
window_size = 30   # frames per model input; adjusted with 'w' / 's'
step_size = 1      # run inference every step_size frames; adjusted with 'd' / 'a'
threshold = 0.5    # fall probability at or above this counts as a fall; '+' / '-'
frame_count = 0
fps = 30           # assumed camera frame rate, used only for the graph's time label

# Graph settings
graph_height = 100
graph_width = 400
max_points = 100   # maximum number of probabilities kept for the graph

# Start webcam
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    # Raise instead of exit(): inside a Jupyter kernel exit() does not stop
    # the running cell, so the code below would still execute against a
    # capture device that never opened.
    cap.release()
    raise RuntimeError("Error: Unable to open camera.")

# Read one frame up front to learn the frame size; fall back to 640x480 when
# the first read returns nothing.
_, frame = cap.read()
if frame is not None:
    frame_height, frame_width = frame.shape[:2]
else:
    frame_height, frame_width = 480, 640
# The graph overlay sits 10 px above the bottom edge of the frame.
graph_y_offset = frame_height - graph_height - 10

# Largest window size selectable with the 'w' key. The skeleton history is
# trimmed to this many frames so it cannot grow without bound while the
# camera keeps running.
MAX_WINDOW_SIZE = 60


def _skeleton_from_landmarks(landmarks):
    """Build the hip-centred joint array for one frame.

    Args:
        landmarks: Indexable sequence of pose landmarks, each exposing
            .x, .y and .z (as returned by MediaPipe Pose).

    Returns:
        np.ndarray of shape (len(landmark_order), 3): one [x, y, z] row per
        joint in landmark_order, expressed relative to the midpoint of the
        two hips. The "Neck" row is the midpoint of the two shoulders.
    """
    left_hip, right_hip = landmarks[23], landmarks[24]
    left_shoulder, right_shoulder = landmarks[11], landmarks[12]

    ref_x = (left_hip.x + right_hip.x) / 2
    ref_y = (left_hip.y + right_hip.y) / 2
    ref_z = (left_hip.z + right_hip.z) / 2
    neck_x = (left_shoulder.x + right_shoulder.x) / 2
    neck_y = (left_shoulder.y + right_shoulder.y) / 2
    neck_z = (left_shoulder.z + right_shoulder.z) / 2

    joints = np.zeros((len(landmark_order), 3))
    for i, part in enumerate(landmark_order):
        if part == "Neck":
            joints[i] = [neck_x - ref_x, neck_y - ref_y, neck_z - ref_z]
        else:
            lm = landmarks[landmark_indices[part]]
            joints[i] = [lm.x - ref_x, lm.y - ref_y, lm.z - ref_z]
    return joints


def _draw_probability_graph(history):
    """Render recent fall probabilities (in percent) as a small line chart.

    Args:
        history: Sequence of probabilities in the range 0-100, oldest first.

    Returns:
        uint8 image of shape (graph_height, graph_width, 3); entirely black
        when history is empty.
    """
    graph_img = np.zeros((graph_height, graph_width, 3), dtype=np.uint8)
    if not history:
        return graph_img

    white = (255, 255, 255)
    font = cv2.FONT_HERSHEY_SIMPLEX

    # Axes and labels
    cv2.line(graph_img, (0, graph_height - 10), (graph_width, graph_height - 10), white, 1)
    cv2.line(graph_img, (10, 0), (10, graph_height), white, 1)
    cv2.putText(graph_img, "Time (s)", (graph_width - 50, graph_height - 5), font, 0.4, white, 1)
    # Placed to the right of the "100" tick label; both labels previously
    # started at almost the same x position and were drawn over each other.
    cv2.putText(graph_img, "Prob (%)", (30, 15), font, 0.4, white, 1)
    cv2.putText(graph_img, "100", (2, 15), font, 0.3, white, 1)
    cv2.putText(graph_img, "0", (2, graph_height - 15), font, 0.3, white, 1)

    # Map each probability to pixel coordinates inside the 10 px margins.
    points = [
        (
            int(i * (graph_width - 20) / max_points) + 10,
            int((1 - prob / 100) * (graph_height - 20)) + 10,
        )
        for i, prob in enumerate(history)
    ]
    for start, end in zip(points, points[1:]):
        cv2.line(graph_img, start, end, (0, 255, 0), 1)

    # NOTE(review): this assumes one prediction per frame at `fps` frames per
    # second; with step_size > 1 the displayed span under-reports real time.
    time_span = len(history) / fps
    cv2.putText(graph_img, f"{time_span:.1f}s", (graph_width - 30, graph_height - 15), font, 0.3, white, 1)
    return graph_img


# Main loop
try:
    while True:
        success, image = cap.read()
        if not success:
            print("Error: Unable to read frame from camera.")
            break

        frame_count += 1
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        if results.pose_landmarks:
            skeleton_frame = _skeleton_from_landmarks(results.pose_landmarks.landmark)
            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
        else:
            # No person detected: feed an all-zero skeleton for this frame.
            skeleton_frame = np.zeros((len(landmark_order), 3))

        buffer.append(skeleton_frame)
        # Only the most recent MAX_WINDOW_SIZE frames can ever be used.
        if len(buffer) > MAX_WINDOW_SIZE:
            del buffer[:-MAX_WINDOW_SIZE]

        if len(buffer) >= window_size and frame_count % step_size == 0:
            window = buffer[-window_size:]
            skeleton_sequence = np.stack(window, axis=0)
            # (time, joints, xyz) -> (1, xyz, time, joints), the layout the model expects.
            skeleton_sequence = torch.tensor(skeleton_sequence, dtype=torch.float32).permute(2, 0, 1).unsqueeze(0)

            with torch.no_grad():
                output = model(skeleton_sequence.to(device))
                probabilities = torch.softmax(output, dim=1)
                fall_prob = probabilities[0, 1].item()
                prediction = 1 if fall_prob >= threshold else 0

            prob_buffer.append(fall_prob * 100)
            if len(prob_buffer) > max_points:
                prob_buffer.pop(0)

            text = f"Fall Detected ({fall_prob:.2f})" if prediction == 1 else f"No Fall ({fall_prob:.2f})"
            color = (0, 0, 255) if prediction == 1 else (0, 255, 0)
            cv2.putText(image, text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

            info_text = f"Threshold: {threshold:.2f} (+/-), Window: {window_size}, Step: {step_size}"
            cv2.putText(image, info_text, (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        elif len(buffer) < window_size:
            cv2.putText(image, f"Buffering... ({len(buffer)}/{window_size})", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        graph_img = _draw_probability_graph(prob_buffer)

        # Paste the graph onto the frame, clipped to the frame bounds so a
        # camera frame smaller than the overlay does not raise a shape error.
        img_h, img_w = image.shape[:2]
        top = max(graph_y_offset, 0)
        bottom = min(top + graph_height, img_h)
        right = min(10 + graph_width, img_w)
        if bottom > top and right > 10:
            image[top:bottom, 10:right] = graph_img[:bottom - top, :right - 10]

        cv2.imshow("Live Fall Detection", image)

        # Keyboard controls: q quits, +/- threshold, w/s window size, d/a step size.
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break
        elif key == ord('+') or key == ord('='):
            threshold = min(1.0, threshold + 0.05)
        elif key == ord('-'):
            threshold = max(0.0, threshold - 0.05)
        elif key == ord('w'):
            window_size = min(MAX_WINDOW_SIZE, window_size + 5)
        elif key == ord('s'):
            window_size = max(10, window_size - 5)
        elif key == ord('d'):
            step_size = min(10, step_size + 1)
        elif key == ord('a'):
            step_size = max(1, step_size - 1)
finally:
    # Always free the camera, the window and the pose graph, including when
    # the kernel is interrupted or a frame raises an exception mid-loop.
    cap.release()
    cv2.destroyAllWindows()
    pose.close()

Model loaded successfully from enhanced_stgcn_fall_detection.pth


I0000 00:00:1744344024.787235 2633731 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M1
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1744344024.882593 2633925 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744344024.897301 2633931 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744344036.347069 2633928 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.
