#Installing Libraries

In [None]:
!pip install ultralytics torch torchvision opencv-python mediapipe numpy matplotlib

Collecting ultralytics
  Downloading ultralytics-8.3.50-py3-none-any.whl.metadata (35 kB)
Collecting mediapipe
  Downloading mediapipe-0.10.20-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.13-py3-none-any.whl.metadata (9.4 kB)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading ultralytics-8.3.50-py3-none-any.whl (898 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m899.0/899.0 kB[0m [31m37.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading mediapipe-0.10.20-cp310-cp310-manylinux_2_28_x86_64.whl (35.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m54.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.1-py3-none-any.whl (32 kB)
Downloading ultralytics_thop-2.0.13-py3-none-any.whl (26 kB)
Installing collected packages: sounddevice, ultralytics-t

In [None]:
pip install tqdm



In [None]:
pip install twilio

Collecting twilio
  Downloading twilio-9.4.1-py2.py3-none-any.whl.metadata (12 kB)
Collecting aiohttp-retry==2.8.3 (from twilio)
  Downloading aiohttp_retry-2.8.3-py3-none-any.whl.metadata (8.9 kB)
Downloading twilio-9.4.1-py2.py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m49.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading aiohttp_retry-2.8.3-py3-none-any.whl (9.8 kB)
Installing collected packages: aiohttp-retry, twilio
Successfully installed aiohttp-retry-2.8.3 twilio-9.4.1


#YOLO Model Setup

##Function

In [None]:
from ultralytics import YOLO
import cv2
import os

# Initialize YOLO model
model = YOLO('yolov8n.pt')

def detect_bounding_boxes(video_path, output_folder):
    """
    Detects bounding boxes of people in a video and saves cropped frames.
    Args:
        video_path (str): Path to the input video.
        output_folder (str): Folder to save cropped frames.
    """
    cap = cv2.VideoCapture(video_path)
    frame_count = 0

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Perform detection
        results = model(frame)
        for result in results:
            boxes = result.boxes.xyxy.cpu().numpy()
            confs = result.boxes.conf.cpu().numpy()
            classes = result.boxes.cls.cpu().numpy()

            # Save frames with bounding boxes
            for box, conf, cls in zip(boxes, confs, classes):
                if int(cls) == 0 and conf > 0.5:  # Class 0 is for 'person'
                    x1, y1, x2, y2 = map(int, box)
                    cropped_person = frame[y1:y2, x1:x2]
                    cv2.imwrite(f"{output_folder}/frame_{frame_count}.jpg", cropped_person)

        frame_count += 1

    cap.release()
    print(f"Bounding boxes detected and saved in {output_folder}")


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt'...


100%|██████████| 6.25M/6.25M [00:00<00:00, 254MB/s]


##Execution

In [None]:
from google.colab import files

# Upload video
uploaded = files.upload()
video_path = list(uploaded.keys())[0]
output_folder = '/content/cropped_frames'

# Detect bounding boxes and save frames
detect_bounding_boxes(video_path, output_folder)


Saving person_walking.mp4 to person_walking.mp4


AttributeError: 'numpy.ndarray' object has no attribute 'dim'

In [None]:
print(video_path)

person_walking.mp4


#Pose Keypoint Extraction

##Function

In [None]:
import mediapipe as mp
import numpy as np
import cv2
import os

def extract_keypoints_and_labels(image_folder, output_keypoints_path, output_labels_path):
    """
    Extracts pose keypoints and labels from images.
    Args:
        image_folder (str): Path to the folder containing images.
        output_keypoints_path (str): Path to save the keypoints numpy file.
        output_labels_path (str): Path to save the labels numpy file.
    """
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(static_image_mode=True)
    keypoints_list = []
    labels_list = []

    for image_name in os.listdir(image_folder):
        if image_name.endswith('.jpg') or image_name.endswith('.png'):
            image_path = os.path.join(image_folder, image_name)
            label = 1 if 'fallen' in image_name.lower() else 0

            image = cv2.imread(image_path)
            image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            results = pose.process(image_rgb)

            if results.pose_landmarks:
                keypoints = [
                    [lm.x, lm.y, lm.z]
                    for lm in results.pose_landmarks.landmark
                ]
            else:
                # If no pose is detected, add zeros
                keypoints = [[0, 0, 0] for _ in range(33)]

            keypoints_list.append(keypoints)
            labels_list.append(label)

    pose.close()

    # Save keypoints and labels as numpy arrays
    keypoints_array = np.array(keypoints_list)
    labels_array = np.array(labels_list)
    np.save(output_keypoints_path, keypoints_array)
    np.save(output_labels_path, labels_array)
    print(f"Keypoints saved to {output_keypoints_path}")
    print(f"Labels saved to {output_labels_path}")


##Execution

In [None]:
# Mount Drive to load dataset
from google.colab import drive
drive.mount('/content/drive')

# Paths to your datasets
train_images_path = '/content/drive/MyDrive/STGCN_Dataset/images/train'
validation_images_path = '/content/drive/MyDrive/STGCN_Dataset/images/validation'

# Output paths for keypoints and labels
train_keypoints_path = '/content/drive/MyDrive/STGCN_Dataset/train_keypoints.npy'
train_labels_path = '/content/drive/MyDrive/STGCN_Dataset/train_labels.npy'
validation_keypoints_path = '/content/drive/MyDrive/STGCN_Dataset/validation_keypoints.npy'
validation_labels_path = '/content/drive/MyDrive/STGCN_Dataset/validation_labels.npy'

# Extract keypoints and labels for train and validation datasets
extract_keypoints_and_labels(train_images_path, train_keypoints_path, train_labels_path)
extract_keypoints_and_labels(validation_images_path, validation_keypoints_path, validation_labels_path)


Mounted at /content/drive
Keypoints saved to /content/drive/MyDrive/STGCN_Dataset/train_keypoints.npy
Labels saved to /content/drive/MyDrive/STGCN_Dataset/train_labels.npy
Keypoints saved to /content/drive/MyDrive/STGCN_Dataset/validation_keypoints.npy
Labels saved to /content/drive/MyDrive/STGCN_Dataset/validation_labels.npy


In [None]:
# Verify train keypoints and labels
train_keypoints = np.load(train_keypoints_path)
train_labels = np.load(train_labels_path)
print("Train Keypoints Shape:", train_keypoints.shape)
print("Train Labels Shape:", train_labels.shape)

# Verify validation keypoints and labels
validation_keypoints = np.load(validation_keypoints_path)
validation_labels = np.load(validation_labels_path)
print("Validation Keypoints Shape:", validation_keypoints.shape)
print("Validation Labels Shape:", validation_labels.shape)


Train Keypoints Shape: (332, 33, 3)
Train Labels Shape: (332,)
Validation Keypoints Shape: (86, 33, 3)
Validation Labels Shape: (86,)


#STGCN Model Definition

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Define the STGCN layer
class STGCNLayer(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size_spatial=25, kernel_size_temporal=3):
        super(STGCNLayer, self).__init__()
        self.spatial_conv = nn.Conv2d(in_channels, out_channels, kernel_size=(1, kernel_size_spatial), padding=(0, kernel_size_spatial // 2))
        self.temporal_conv = nn.Conv2d(out_channels, out_channels, kernel_size=(kernel_size_temporal, 1), padding=(kernel_size_temporal // 2))
        self.bn = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        x = self.spatial_conv(x)  # Spatial convolution
        x = torch.relu(x)
        x = self.temporal_conv(x)  # Temporal convolution
        x = self.bn(torch.relu(x))  # Batch normalization
        return x

# Define the full STGCN model
class STGCN(nn.Module):
    def __init__(self, num_keypoints, num_classes):
        super(STGCN, self).__init__()
        self.stgcn1 = STGCNLayer(1, 64, num_keypoints)  # Change input channels to 1
        self.stgcn2 = STGCNLayer(64, 64, num_keypoints)
        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        if x.dim() == 3:
            x = x.unsqueeze(1)  # Shape becomes (batch_size, 1, num_keypoints, num_frames)

        x = x.permute(0, 1, 3, 2)  # Reorder to (batch_size, in_channels, num_frames, num_keypoints)
        x = self.stgcn1(x)
        x = self.stgcn2(x)
        x = x.mean(dim=[2, 3])  # Global average pooling (across frames and keypoints)
        return self.fc(x)


# Dataset class
class PoseDataset(Dataset):
    def __init__(self, keypoints, labels):
        self.keypoints = torch.tensor(keypoints, dtype=torch.float32)  # Shape: (N, num_keypoints, 3)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.keypoints[idx], self.labels[idx]


#Training the STGCN Model

##Function

In [None]:
from tqdm import tqdm

def train_stgcn(train_keypoints, train_labels, val_keypoints, val_labels, model_path, num_epochs=10, batch_size=16):
    # Define the device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Convert inputs to tensors, if not already done
    train_keypoints = torch.tensor(train_keypoints, dtype=torch.float32).to(device)
    train_labels = torch.tensor(train_labels, dtype=torch.int64).to(device)
    val_keypoints = torch.tensor(val_keypoints, dtype=torch.float32).to(device)
    val_labels = torch.tensor(val_labels, dtype=torch.int64).to(device)

    # Create DataLoader
    train_dataset = torch.utils.data.TensorDataset(train_keypoints, train_labels)
    val_dataset = torch.utils.data.TensorDataset(val_keypoints, val_labels)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Initialize the model with correct input channels and output classes
    model = STGCN(num_keypoints=train_keypoints.shape[2], num_classes=2)  # Assuming 33 keypoints and 2 classes
    model = model.to(device)

    # Define Loss and Optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for keypoints, labels in tqdm(train_loader, desc=f"Training Epoch {epoch+1}"):
            keypoints, labels = keypoints.to(device), labels.to(device)
            optimizer.zero_grad()

            # Forward pass
            outputs = model(keypoints)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        avg_train_loss = running_loss / len(train_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_train_loss:.4f}")

        # Validation step
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for keypoints, labels in val_loader:
                keypoints, labels = keypoints.to(device), labels.to(device)
                outputs = model(keypoints)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = 100 * correct / total
        print(f"Validation Accuracy: {accuracy:.2f}%")

    # Save the trained model
    torch.save(model.state_dict(), model_path)


##Execution

In [None]:
# Load dataset
train_keypoints = np.load('/content/drive/MyDrive/STGCN_Dataset/train_keypoints.npy')
train_labels = np.load('/content/drive/MyDrive/STGCN_Dataset/train_labels.npy')

validation_keypoints = np.load('/content/drive/MyDrive/STGCN_Dataset/validation_keypoints.npy')
validation_labels = np.load('/content/drive/MyDrive/STGCN_Dataset/validation_labels.npy')

# Reshape the keypoints for STGCN
train_keypoints = train_keypoints.transpose(0, 2, 1)  # Shape: (332, 33, 3)
validation_keypoints = validation_keypoints.transpose(0, 2, 1)  # Shape: (86, 33, 3)

# Ensure the shapes are correct
print(f'Train Keypoints shape: {train_keypoints.shape}')
print(f'Train Labels shape: {train_labels.shape}')
print(f'Validation Keypoints shape: {validation_keypoints.shape}')
print(f'Validation Labels shape: {validation_labels.shape}')

# Train STGCN model
model_path = '/content/drive/MyDrive/STGCN_Dataset/stgcn_model.pth'

#train_stgcn(train_keypoints, train_labels, validation_keypoints, validation_labels, model_path, num_epochs=10, batch_size=16)


Train Keypoints shape: (332, 3, 33)
Train Labels shape: (332,)
Validation Keypoints shape: (86, 3, 33)
Validation Labels shape: (86,)


In [None]:
import torch

# Convert labels from numpy arrays to PyTorch tensors and ensure they are integers
train_labels = torch.tensor(train_labels).long()
val_labels = torch.tensor(validation_labels).long()

print(f"Train Keypoints shape: {train_keypoints.shape}, dtype: {train_keypoints.dtype}")
print(f"Train Labels shape: {train_labels.shape}, dtype: {train_labels.dtype}")
print(f"Validation Keypoints shape: {validation_keypoints.shape}, dtype: {validation_keypoints.dtype}")
print(f"Validation Labels shape: {val_labels.shape}, dtype: {val_labels.dtype}")


Train Keypoints shape: (332, 3, 33), dtype: float64
Train Labels shape: torch.Size([332]), dtype: torch.int64
Validation Keypoints shape: (86, 3, 33), dtype: float64
Validation Labels shape: torch.Size([86]), dtype: torch.int64


In [None]:
# Example inputs
'''train_keypoints = torch.rand(332, 3, 33)  # Shape: (num_samples, channels, num_keypoints)
train_labels = torch.randint(0, 2, (332,))  # Shape: (num_samples,)
val_keypoints = torch.rand(86, 3, 33)       # Shape: (num_samples, channels, num_keypoints)
val_labels = torch.randint(0, 2, (86,))     # Shape: (num_samples,)'''

model_path = '/content/drive/MyDrive/STGCN_Dataset/stgcn_model.pth'

train_stgcn(train_keypoints, train_labels, validation_keypoints, val_labels, model_path, num_epochs=10, batch_size=16)


  train_labels = torch.tensor(train_labels, dtype=torch.int64).to(device)
  val_labels = torch.tensor(val_labels, dtype=torch.int64).to(device)
Training Epoch 1: 100%|██████████| 21/21 [00:00<00:00, 29.42it/s]


Epoch [1/10], Loss: 0.6128
Validation Accuracy: 52.33%


Training Epoch 2: 100%|██████████| 21/21 [00:00<00:00, 163.54it/s]


Epoch [2/10], Loss: 0.4866
Validation Accuracy: 76.74%


Training Epoch 3: 100%|██████████| 21/21 [00:00<00:00, 200.41it/s]


Epoch [3/10], Loss: 0.4341
Validation Accuracy: 59.30%


Training Epoch 4: 100%|██████████| 21/21 [00:00<00:00, 204.77it/s]


Epoch [4/10], Loss: 0.4128
Validation Accuracy: 59.30%


Training Epoch 5: 100%|██████████| 21/21 [00:00<00:00, 202.45it/s]


Epoch [5/10], Loss: 0.3793
Validation Accuracy: 65.12%


Training Epoch 6: 100%|██████████| 21/21 [00:00<00:00, 200.47it/s]


Epoch [6/10], Loss: 0.3951
Validation Accuracy: 86.05%


Training Epoch 7: 100%|██████████| 21/21 [00:00<00:00, 206.55it/s]


Epoch [7/10], Loss: 0.3708
Validation Accuracy: 80.23%


Training Epoch 8: 100%|██████████| 21/21 [00:00<00:00, 195.45it/s]


Epoch [8/10], Loss: 0.3615
Validation Accuracy: 79.07%


Training Epoch 9: 100%|██████████| 21/21 [00:00<00:00, 181.61it/s]


Epoch [9/10], Loss: 0.3606
Validation Accuracy: 81.40%


Training Epoch 10: 100%|██████████| 21/21 [00:00<00:00, 213.26it/s]


Epoch [10/10], Loss: 0.3509
Validation Accuracy: 82.56%


In [None]:
model_path = '/content/drive/MyDrive/STGCN_Dataset/stgcn_model.pth'
torch.save(model.state_dict(), model_path)

#Prediction

##Function

In [None]:
import torch
import cv2
import numpy as np

def predict_fall_in_video(video_path, model, device, pose_extractor, frame_skip=50):
    """
    Predict whether a fall occurs in a given video using a trained ST-GCN model.

    Parameters:
        video_path (str): Path to the input video.
        model (torch.nn.Module): Trained ST-GCN model.
        device (torch.device): Device to run the model on ('cuda' or 'cpu').
        pose_extractor (function): Function to extract pose keypoints from video frames.
        frame_skip (int): Number of frames to skip during processing to reduce RAM usage.

    Returns:
        dict: Dictionary with per-frame predictions and overall result.
    """
    model.eval()  # Set model to evaluation mode

    # Step 1: Extract frames from the video
    cap = cv2.VideoCapture(video_path)
    frames = []
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if frame_count % frame_skip == 0:  # Process every `frame_skip` frame
            frames.append(frame)
        frame_count += 1
    cap.release()

    # Step 2: Extract pose keypoints from the selected frames
    keypoints_sequence = []
    for frame in frames:
        keypoints = pose_extractor(frame)  # Extract keypoints for the frame
        if keypoints is not None:
            keypoints_sequence.append(keypoints)
        else:
            keypoints_sequence.append(np.zeros((33, 4)))  # Handle missing frames

    # Step 3: Convert keypoints to tensor and reshape for ST-GCN
    keypoints_sequence = np.array(keypoints_sequence)  # Shape: (T, J, C)
    keypoints_sequence = np.expand_dims(keypoints_sequence, axis=0)  # Add batch dimension

    # Optionally collapse the keypoints to a single channel, e.g., by averaging x, y, z, visibility
    keypoints_sequence = keypoints_sequence.mean(axis=-1, keepdims=True)  # Average across the last axis (x, y, z, visibility)

    # Convert to tensor and move to the appropriate device
    keypoints_tensor = torch.tensor(keypoints_sequence, dtype=torch.float32).to(device)

    # Ensure the tensor is of the correct shape: [batch_size, channels, height, width]
    keypoints_tensor = keypoints_tensor.permute(0, 3, 1, 2)  # Reorder to [batch_size, channels, height, width]

    # Step 4: Pass keypoints through the ST-GCN model
    with torch.no_grad():
        outputs = model(keypoints_tensor)  # Model inference
        predictions = torch.argmax(outputs, dim=1).cpu().numpy()  # Get class predictions

    # Step 5: Analyze predictions
    has_fallen = 1 in predictions  # Check if any frame predicts 'fallen' (class 1)
    frame_results = {f"Frame {i * frame_skip + 1}": "Fallen" if pred == 1 else "Not Fallen"
                     for i, pred in enumerate(predictions)}

    # Return results
    return {
        "Frame Results": frame_results,
        "Overall Result": "Fall Detected" if has_fallen else "No Fall Detected"
    }


##Execution

In [None]:
from google.colab import files

# Upload video
uploaded = files.upload()
video_path = list(uploaded.keys())[0]

Saving person_walking.mp4 to person_walking.mp4


In [None]:
import torch
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("Using GPU:", torch.cuda.get_device_name(0))
else:
    device = torch.device("cpu")
    print("Using CPU")


Using GPU: Tesla T4


In [None]:
import cv2
import mediapipe as mp
import numpy as np

# Path to your uploaded video
video_path = '/content/person_falling_new.mp4'  # Replace with your video path

# Open video using OpenCV
cap = cv2.VideoCapture(video_path)

# List to store frames
frames = []

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    # Optionally preprocess (resize, etc.)
    frame_resized = cv2.resize(frame, (640, 480))  # Resize to 640x480 for faster processing
    frames.append(frame_resized)

cap.release()

# Initialize MediaPipe pose model
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# List to store keypoints for each frame
keypoints_list = []

for frame in frames:
    # Convert frame to RGB (MediaPipe works with RGB frames)
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Process the frame to extract pose landmarks
    results = pose.process(frame_rgb)

    # Extract keypoints if detected
    if results.pose_landmarks:
        keypoints = []
        for landmark in results.pose_landmarks.landmark:
            keypoints.append([landmark.x, landmark.y, landmark.z])
        keypoints_list.append(keypoints)
    else:
        keypoints_list.append(None)  # If no pose is detected, append None

pose.close()

# Print the number of frames and some example keypoints for debugging
print(f"Total frames: {len(frames)}")
print(f"First frame keypoints (if available): {keypoints_list[0]}")


Total frames: 199
First frame keypoints (if available): [[0.3686886131763458, 0.019866913557052612, -0.2704041600227356], [0.3765782117843628, 0.001132369041442871, -0.2532722055912018], [0.3814921975135803, 0.001101166009902954, -0.25336506962776184], [0.386398583650589, 0.0012714564800262451, -0.2534179091453552], [0.3635946214199066, -5.647540092468262e-05, -0.24685418605804443], [0.35973626375198364, -0.000982135534286499, -0.2468380481004715], [0.3561495542526245, -0.001795053482055664, -0.24692454934120178], [0.39508256316185, 0.008215129375457764, -0.11848311126232147], [0.35692110657691956, 0.005233645439147949, -0.08810247480869293], [0.377853661775589, 0.034045781940221786, -0.21703632175922394], [0.3624286651611328, 0.032154083251953125, -0.20829716324806213], [0.44082823395729065, 0.11007609963417053, -0.045047711580991745], [0.32432693243026733, 0.12767410278320312, 0.02514456771314144], [0.46256619691848755, 0.2580544650554657, -0.012743744067847729], [0.3108488619327545,

In [None]:
import cv2
import mediapipe as mp
import numpy as np

# Path to your uploaded video
video_path = '/content/person_walking.mp4'  # Replace with your video path

# Open video using OpenCV
cap = cv2.VideoCapture(video_path)

# List to store frames
frames = []

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    # Optionally preprocess (resize, etc.)
    frame_resized = cv2.resize(frame, (640, 480))  # Resize to 640x480 for faster processing
    frames.append(frame_resized)

cap.release()

# Initialize MediaPipe pose model
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# List to store keypoints for each frame
keypoints_list = []

for frame in frames:
    # Convert frame to RGB (MediaPipe works with RGB frames)
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Process the frame to extract pose landmarks
    results = pose.process(frame_rgb)

    # Extract keypoints if detected
    if results.pose_landmarks:
        keypoints = []
        for landmark in results.pose_landmarks.landmark:
            keypoints.append([landmark.x, landmark.y, landmark.z])
        keypoints_list.append(keypoints)
    else:
        keypoints_list.append(None)  # If no pose is detected, append None

pose.close()

# Print the number of frames and some example keypoints for debugging
print(f"Total frames: {len(frames)}")
print(f"First frame keypoints (if available): {keypoints_list[3]}")


Total frames: 178
First frame keypoints (if available): [[0.48274102807044983, 0.524723470211029, 0.06934764981269836], [0.48136845231056213, 0.5177784562110901, 0.05716276913881302], [0.48001083731651306, 0.517755389213562, 0.05714453384280205], [0.47871294617652893, 0.5178438425064087, 0.057123295962810516], [0.4847719073295593, 0.5178069472312927, 0.054578136652708054], [0.48585644364356995, 0.5179035663604736, 0.05459362268447876], [0.4870249629020691, 0.5181229114532471, 0.05455111712217331], [0.47675877809524536, 0.5212246179580688, 0.01810361072421074], [0.48929929733276367, 0.5222467184066772, 0.005639493931084871], [0.4805240035057068, 0.5304532647132874, 0.058937229216098785], [0.484630286693573, 0.531627893447876, 0.05520838499069214], [0.4577752649784088, 0.5603043437004089, -0.0005099961417727172], [0.4972974359989166, 0.5665785074234009, -0.013322453014552593], [0.45488423109054565, 0.6230266094207764, -0.008424973115324974], [0.5024628043174744, 0.6276262402534485, -0.03

In [None]:
# Initialize your model
model = STGCN(num_keypoints=len(keypoints_list), num_classes=2)  # Assuming 33 keypoints and 2 classes  # Use your actual model class name here
#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Load the saved state_dict
model_path = '/content/drive/MyDrive/STGCN_Dataset/stgcn_model.pth'
model.load_state_dict(torch.load(model_path, map_location=device), strict=False)


# Set the model to evaluation mode
model.eval()


STGCN(
  (stgcn1): STGCNLayer(
    (spatial_conv): Conv2d(1, 64, kernel_size=(1, 178), stride=(1, 1), padding=(0, 89))
    (temporal_conv): Conv2d(64, 64, kernel_size=(3, 1), stride=(1, 1), padding=(1, 1))
    (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (stgcn2): STGCNLayer(
    (spatial_conv): Conv2d(64, 64, kernel_size=(1, 178), stride=(1, 1), padding=(0, 89))
    (temporal_conv): Conv2d(64, 64, kernel_size=(3, 1), stride=(1, 1), padding=(1, 1))
    (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (fc): Linear(in_features=64, out_features=2, bias=True)
)

In [None]:
import torch
import cv2
import numpy as np
import mediapipe as mp

# Define pose extraction function using MediaPipe
def pose_extractor(frame):
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose()

    # Convert frame to RGB for MediaPipe
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    # Extract keypoints
    if results.pose_landmarks:
        keypoints = np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility]
                              for landmark in results.pose_landmarks.landmark])
        return keypoints
    else:
        return None  # No keypoints detected

# Call the prediction function with video path and model
video_path = '/content/person_falling_new.mp4'  # Replace with your video path
results = predict_fall_in_video(video_path, model, device, pose_extractor)

# Print the frame results and overall result
print("Per-frame results:", results['Frame Results'])
print("Overall result:", results['Overall Result'])


Per-frame results: {'Frame 1': 'Fallen'}
Overall result: Fall Detected


In [None]:
import torch
import cv2
import numpy as np
import mediapipe as mp

# Define pose extraction function using MediaPipe
def pose_extractor2(frame):
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose()

    # Convert frame to RGB for MediaPipe
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    # Extract keypoints
    if results.pose_landmarks:
        keypoints = np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility]
                              for landmark in results.pose_landmarks.landmark])
        return keypoints
    else:
        return None  # No keypoints detected

# Call the prediction function with video path and model
video_path2 = '/content/person_walking.mp4'  # Replace with your video path
results = predict_fall_in_video(video_path2, model, device, pose_extractor2)

# Print the frame results and overall result
print("Per-frame results:", results['Frame Results'])
print("Overall result:", results['Overall Result'])

Per-frame results: {'Frame 1': 'Fallen'}
Overall result: Fall Detected


#Twilio Integration with G-Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import cv2
import os

def save_to_gdrive_and_get_link(frame, file_name, gdrive_folder='MyDrive/Fall_Detection_Alerts'):
    """
    Saves the frame to Google Drive and retrieves a shareable link.

    Parameters:
        frame (numpy.ndarray): Image frame to save.
        file_name (str): Name for the saved file.
        gdrive_folder (str): Folder in Google Drive where the file will be saved.

    Returns:
        str: Google Drive shareable link for the saved file.
    """
    # Ensure Google Drive folder exists
    folder_path = f"/content/drive/{gdrive_folder}"
    os.makedirs(folder_path, exist_ok=True)

    # Save frame to Google Drive
    file_path = os.path.join(folder_path, file_name)
    cv2.imwrite(file_path, frame)
    print(f"Frame saved to: {file_path}")

    # Return the relative path in Google Drive for easier access
    relative_file_path = os.path.join(gdrive_folder, file_name)
    print(f"Manual location: {relative_file_path}")

    return relative_file_path  # Return relative path within MyDrive


In [None]:
def send_fall_notification_via_gdrive_simple(prediction_results, video_path, account_sid, auth_token, from_number, to_number):
    from twilio.rest import Client

    # Extract the overall result
    overall_result = prediction_results["Overall Result"]

    # Initialize Twilio client
    client = Client(account_sid, auth_token)

    if overall_result == "Fall Detected":
        # Extract a key frame from the video
        cap = cv2.VideoCapture(video_path)
        fallen_frame_index = None
        for i, (frame_key, frame_result) in enumerate(prediction_results["Frame Results"].items()):
            if frame_result == "Fallen":
                fallen_frame_index = i
                break

        # Extract and save the fallen frame
        if fallen_frame_index is not None:
            cap.set(cv2.CAP_PROP_POS_FRAMES, fallen_frame_index)
            ret, frame = cap.read()
            if ret:
                # Save to Google Drive
                fallen_frame_path = save_to_gdrive_and_get_link(frame, "fallen_frame.jpg")
                # Send MMS with a manual location
                message = client.messages.create(
                    body=f"Fall detected! Immediate assistance may be needed. Check Google Drive for the alert frame: {fallen_frame_path}",
                    from_=from_number,
                    to=to_number
                )
                print(f"Fall alert sent via SMS: SID {message.sid}")
            else:
                print("Failed to extract fallen frame.")
        else:
            print("No specific fallen frame detected in video.")
        cap.release()
    else:
        # Send a simple SMS indicating no fall was detected
        message = client.messages.create(
            body="Patient is okay. No fall detected.",
            from_=from_number,
            to=to_number
        )
        print(f"No fall detected notification sent: SID {message.sid}")


In [None]:
# Dummy prediction results for testing
fall_detected_results = {
    "Frame Results": {
        "Frame 1": "Not Fallen",
        "Frame 2": "Fallen",
        "Frame 3": "Fallen",
    },
    "Overall Result": "Fall Detected"
}

no_fall_detected_results = {
    "Frame Results": {
        "Frame 1": "Not Fallen",
        "Frame 2": "Not Fallen",
        "Frame 3": "Not Fallen",
    },
    "Overall Result": "No Fall Detected"
}

# Dummy video path
video_path = "person_falling_new.mp4"  # Replace this with the path to your test video file

##Twilio Credentials

In [None]:
# Twilio account details (replace with your actual credentials)
account_sid = "AC4711d8b017004bc588e6141e749cdf60"
auth_token = "ec36ccc0fad09bd6e411d2c7f8337006"
from_number = "whatsapp:+14155238886"
to_number = "whatsapp:+918593811202"

In [None]:
# Fall Detected Case
print("Testing Fall Detected Case:")
send_fall_notification_via_gdrive_simple(
    fall_detected_results,
    video_path,
    account_sid,
    auth_token,
    from_number,
    to_number
)

Testing Fall Detected Case:
Frame saved to: /content/drive/MyDrive/Fall_Detection_Alerts/fallen_frame.jpg
Manual location: MyDrive/Fall_Detection_Alerts/fallen_frame.jpg
Fall alert sent via SMS: SID SMf053af337749a6963b5161d753deda26


In [None]:
# No Fall Detected Case
print("\nTesting No Fall Detected Case:")
send_fall_notification_via_gdrive_simple(
    no_fall_detected_results,
    video_path,
    account_sid,
    auth_token,
    from_number,
    to_number
)


Testing No Fall Detected Case:
No fall detected notification sent: SID SM86a22a9d076522ae109d380a82aae1c5
