<a href="https://colab.research.google.com/github/Huni1i1/Running-Posture-Correction-AI/blob/main/RPC_Test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

In [None]:
# Check which GPU (if any) is attached to this runtime
!nvidia-smi

In [None]:
# Install Detectron2 (built from its GitHub source) together with torch, torchvision and pyyaml.
# NOTE(review): none of these installs is version-pinned, so a fresh runtime may resolve
# different releases than the ones this notebook was developed against.
!pip3 install -q torch
!pip3 install -q torchvision
!pip3 install pyyaml
!python -m pip install 'git+https://github.com/facebookresearch/detectron2.git'

In [None]:
# Clone VideoPose3D into /content (later cells refer to it as /content/VideoPose3D)
import os

# Work from /content so the clone lands at a fixed absolute location
os.chdir('/content')
!git clone https://github.com/facebookresearch/VideoPose3D

In [None]:
# Change infer_video_d2.py and run.py
import shutil

os.chdir('/content')
!git clone https://github.com/Huni1i1/RPC_requirements
shutil.move('/content/RPC_requirements/infer_video_d2.py', '/content/VideoPose3D/inference/infer_video_d2.py')
shutil.move('/content/RPC_requirements/run.py', '/content/VideoPose3D/run.py')

In [None]:
# Download pre-trained model
os.makedirs('/content/VideoPose3D/checkpoint', exist_ok = True)
!wget https://dl.fbaipublicfiles.com/video-pose-3d/pretrained_h36m_detectron_coco.bin -P /content/VideoPose3D/checkpoint

In [None]:
# Mount Google Drive at /content/drive; every input/output folder used below lives under it
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Create the Drive folders used by the pipeline stages (no-op for folders that already exist)
for stage_dir in (
    '/content/drive/MyDrive/CapstoneDesign/test/videos',
    '/content/drive/MyDrive/CapstoneDesign/test/Det2',
    '/content/drive/MyDrive/CapstoneDesign/test/VP3D_joint',
    '/content/drive/MyDrive/CapstoneDesign/test/angles',
    '/content/drive/MyDrive/CapstoneDesign/test/RPC',
):
    os.makedirs(stage_dir, exist_ok=True)

In [None]:
# Input clips as recorded; read by the 50 fps resampling step.
# NOTE(review): unlike the folders below, this one is not created by the notebook.
ORIGINAL_VIDEOS = '/content/drive/MyDrive/CapstoneDesign/test/original_videos'

# 50 fps copies of the input clips
VIDEOS = '/content/drive/MyDrive/CapstoneDesign/test/videos'
# Scratch folder for Detectron2 2D keypoint .npz files (emptied after each video)
DET2 = '/content/drive/MyDrive/CapstoneDesign/test/Det2'
# Per-video "<name>_joint" exports from VideoPose3D
VP3D_JOINT = '/content/drive/MyDrive/CapstoneDesign/test/VP3D_joint'
# Per-video "*_angles.npy" arrays derived from the joint exports
ANGLES = '/content/drive/MyDrive/CapstoneDesign/test/angles'
# Final videos with the predicted posture label drawn on each frame
RPC = '/content/drive/MyDrive/CapstoneDesign/test/RPC'

# Infer Pose

In [None]:
# Resample videos to 50fps
import os
import glob
import subprocess

def resample_videos_to_50fps(input_folder, output_folder):
    """Re-encode every ``*.mp4`` in ``input_folder`` at 50 fps into ``output_folder``.

    Each output keeps the input's file name. The audio stream is copied unchanged;
    only the video stream goes through ffmpeg's ``fps`` filter.

    Args:
        input_folder: Folder that is searched (non-recursively) for ``*.mp4`` files.
        output_folder: Folder that receives the resampled files; created if missing.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    os.makedirs(output_folder, exist_ok=True)
    video_files = sorted(glob.glob(os.path.join(input_folder, '*.mp4')))

    if not video_files:
        # Avoid claiming success when there was nothing to do.
        print(f"No .mp4 files found in {input_folder}; nothing to resample.")
        return

    for input_video_path in video_files:
        output_video_path = os.path.join(output_folder, os.path.basename(input_video_path))

        print(f"Resample video to 50fps: {input_video_path} -> {output_video_path}")

        command = [
            "ffmpeg",
            # Overwrite existing outputs: without -y ffmpeg stops at an interactive
            # "Overwrite? [y/N]" prompt, which breaks re-running this cell.
            "-y",
            "-i", input_video_path,
            "-filter:v", "fps=fps=50",
            "-c:a", "copy",
            output_video_path
        ]

        subprocess.run(command, check=True)

    print(f"All videos have been resampled and saved in {output_folder}.")

# Resample every clip in ORIGINAL_VIDEOS and store the 50 fps copies in VIDEOS
resample_videos_to_50fps(ORIGINAL_VIDEOS, VIDEOS)

In [None]:
# Infer 2D keypoints with Detectron2 and prepare custom dataset
def infer_Det2_and_prepare_dataset(VIDEOS, DET2):
    """Run 2D keypoint inference on each video and build one custom dataset per video.

    For every ``*.mp4`` in ``VIDEOS`` (in sorted order) this:
      1. runs ``infer_video_d2.py`` with ``--output-dir DET2``,
      2. runs ``prepare_data_2d_custom.py -i DET2 -o <video stem>``,
      3. deletes all ``*.npz`` files from ``DET2``.

    ``DET2`` is emptied before and after each video so that the dataset built for
    one video only sees that video's keypoints (presumably prepare_data_2d_custom.py
    consumes every .npz in its input folder -- confirm against that script).

    Args:
        VIDEOS: Folder containing the ``*.mp4`` files to process.
        DET2: Scratch folder for the intermediate keypoint ``*.npz`` files.

    Raises:
        subprocess.CalledProcessError: If either script exits with a non-zero status.
    """
    mp4_files = sorted(glob.glob(os.path.join(VIDEOS, '*.mp4')))

    for mp4_file in mp4_files:
        print(f"Inferring 2D keypoints of {mp4_file}")

        # Leftovers from an interrupted run must not leak into this video's dataset.
        _remove_npz_files(DET2)
        try:
            # cwd= scopes the working directory to the child process instead of
            # changing it for the whole notebook via os.chdir.
            subprocess.run(['python', 'infer_video_d2.py',
                            '--cfg', 'COCO-Keypoints/keypoint_rcnn_R_101_FPN_3x.yaml',
                            '--output-dir', DET2,
                            '--image-ext', 'mp4',
                            mp4_file],
                           cwd='/content/VideoPose3D/inference', check=True)

            subprocess.run(['python', 'prepare_data_2d_custom.py',
                            '-i', DET2,
                            '-o', os.path.splitext(os.path.basename(mp4_file))[0]],
                           cwd='/content/VideoPose3D/data', check=True)
        finally:
            # Clean up even when a step fails so the next run starts from an empty folder.
            _remove_npz_files(DET2)


def _remove_npz_files(folder):
    """Delete every ``*.npz`` file directly inside ``folder``."""
    for npz_file in glob.glob(os.path.join(folder, '*.npz')):
        os.remove(npz_file)

# Process every resampled clip in VIDEOS, using DET2 as the scratch folder for keypoint files
infer_Det2_and_prepare_dataset(VIDEOS, DET2)

In [None]:
# Infer 3D keypoints with VideoPose3D
def infer_VP3D(npz_folder, VP3D_JOINT):
    """Run VideoPose3D's ``run.py`` once per custom 2D dataset and export the result.

    Only files named ``data_2d_custom_<video_name>.npz`` are processed. The previous
    implementation took every ``*.npz`` in the folder and dropped its first 15
    characters (exactly ``len('data_2d_custom_')``), which turned any other .npz
    stored there into a meaningless video name.

    For each dataset, ``run.py`` is invoked with ``-k <video_name>``,
    ``--viz-subject <video_name>.mp4`` and ``--viz-export <VP3D_JOINT>/<video_name>_joint``.

    Args:
        npz_folder: Folder holding the ``data_2d_custom_*.npz`` datasets.
        VP3D_JOINT: Folder that receives the exports; created if missing.

    Raises:
        subprocess.CalledProcessError: If ``run.py`` exits with a non-zero status.
    """
    dataset_prefix = 'data_2d_custom_'
    npz_files = sorted(glob.glob(os.path.join(npz_folder, f'{dataset_prefix}*.npz')))

    os.makedirs(VP3D_JOINT, exist_ok=True)

    for npz_file in npz_files:
        dataset_stem = os.path.splitext(os.path.basename(npz_file))[0]
        video_name = dataset_stem[len(dataset_prefix):]

        print(f"Inferring 3D keypoints of {npz_file}")

        # cwd= scopes the working directory to the child process instead of changing it
        # for the whole notebook; check=True surfaces a failed inference immediately
        # rather than leaving a silently missing export for later cells.
        subprocess.run(['python', 'run.py',
            '-d', 'custom',
            '-k', f'{video_name}',
            '-arc', '3,3,3,3,3',
            '-c', 'checkpoint',
            '--evaluate', 'pretrained_h36m_detectron_coco.bin',
            '--render',
            '--viz-subject', f'{video_name}.mp4',
            '--viz-action', 'custom',
            '--viz-camera', '0',
            '--viz-size', '10',
            '--viz-export', f'{VP3D_JOINT}/{video_name}_joint'
            ], cwd='/content/VideoPose3D', check=True)

# Folder scanned for the per-video .npz datasets (presumably where the
# prepare_data_2d_custom.py step above writes its output -- confirm)
npz_folder = '/content/VideoPose3D/data'

# Run 3D inference for every dataset found and export the results into VP3D_JOINT
infer_VP3D(npz_folder, VP3D_JOINT)

In [None]:
# Switch into the cloned RPC_requirements folder so its modules can be imported by name
os.chdir('/content/RPC_requirements')
# NOTE(review): this binds only the module names `angle` and `dl`, while later cells call
# AngleProcessor and PostureClassifierFCNN unqualified. Presumably those classes live in
# these modules -- confirm, and import them explicitly if so.
import angle, dl

In [None]:
# Get angle dataset
def process_files(VP3D_JOINT, ANGLES):
    """Convert every keypoint ``.npy`` file in ``VP3D_JOINT`` into an angles file.

    Each input ``<stem>.npy`` is loaded, passed to ``AngleProcessor(...).getAngle()``
    and the result is saved as ``<ANGLES>/<stem>_angles.npy``.

    Args:
        VP3D_JOINT: Folder searched (non-recursively) for ``*.npy`` keypoint files.
        ANGLES: Folder that receives the ``*_angles.npy`` files; created if missing.
    """
    # Local import: the notebook's only visible `import numpy as np` sits in a later
    # cell, so on a fresh kernel run top-to-bottom `np` is not defined yet here.
    import numpy as np

    os.makedirs(ANGLES, exist_ok=True)
    npy_files = sorted(glob.glob(os.path.join(VP3D_JOINT, '*.npy')))

    for file_path in npy_files:
        print(f"Processing {file_path}...")
        keypoints_data = np.load(file_path)

        # NOTE(review): AngleProcessor is not bound by the notebook's `import angle, dl`;
        # presumably it is defined in the `angle` module -- confirm how it gets into scope.
        processor = AngleProcessor(keypoints_data)
        angles = processor.getAngle()

        # Build the name from the stem so only the real extension is replaced
        # (str.replace would rewrite every '.npy' occurring in the file name).
        stem = os.path.splitext(os.path.basename(file_path))[0]
        output_path = os.path.join(ANGLES, f"{stem}_angles.npy")
        np.save(output_path, angles)

        print(f"Saved angles to {output_path}")


# Convert every keypoint file in VP3D_JOINT into an angles file stored in ANGLES
process_files(VP3D_JOINT, ANGLES)

In [None]:
# Infer posture and render video
import os
import glob
import cv2
import numpy as np
import torch

# Maps a predicted class index to the text drawn on each rendered frame
label_mapping = {
    0: "Correct Posture",
    1: "Leaning Forward",
    2: "Leaning Backward",
    3: "Excessive Movement"
}

def estimate_posture(model, angles_data, device):
    """Return the predicted class index for every row of ``angles_data``.

    The model is put into eval mode and moved to ``device``; the input is converted
    to a float32 tensor on the same device and evaluated without gradient tracking.

    Args:
        model: A torch module whose output has one score per class along dimension 1.
        angles_data: Array-like input accepted by ``torch.tensor``.
        device: Torch device used for both the model and the input.

    Returns:
        A NumPy array holding, per row, the index of the highest score.
    """
    model.eval()
    model.to(device)

    features = torch.tensor(angles_data, dtype=torch.float32).to(device)
    with torch.no_grad():
        scores = model(features)
        # torch.max over dim 1 yields (values, indices); only the indices are needed
        predicted_classes = torch.max(scores, 1)[1]

    return predicted_classes.cpu().numpy()

def render_video_with_posture(video_path, angles_data, predictions, output_path):
    """Write a copy of ``video_path`` with the predicted posture label on each frame.

    Frame ``i`` is labelled with ``label_mapping[predictions[i]]``. Rendering stops at
    whichever ends first: the video or ``predictions``.

    Args:
        video_path: Path of the video to read.
        angles_data: Unused; kept so existing callers keep working.
        predictions: Sequence of integer class indices, one per frame.
        output_path: Path of the labelled video to write (``mp4v`` codec).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Do not write an empty file and report success for an unreadable input.
        cap.release()
        print(f"Could not open video {video_path}; skipping.")
        return

    out = None
    try:
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes the output
        # play slower than the source. VideoWriter accepts a floating-point fps.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        for prediction in predictions:
            ret, frame = cap.read()
            if not ret:
                break

            label = label_mapping[int(prediction)]

            cv2.putText(frame, f"Posture: {label}", (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

            out.write(frame)
    finally:
        # Release both handles even if reading or writing raised.
        cap.release()
        if out is not None:
            out.release()

    print(f"Rendered video saved at {output_path}")

def process_all_files(angles_folder, videos_folder, output_folder, model_path):
    """Classify each angles file and render its matching video with posture labels.

    Angles files are matched to videos by name: ``<name>_joint_angles.npy`` (or
    ``<name>_angles.npy``) belongs to ``<name>.mp4``. The previous positional
    ``zip`` of two independently sorted lists could mis-pair files, because
    ``run10_joint_angles.npy`` sorts before ``run1_joint_angles.npy`` while
    ``run1.mp4`` sorts before ``run10.mp4``. Angles files without a matching
    video are reported and skipped.

    Args:
        angles_folder: Folder searched for ``*.npy`` angles files.
        videos_folder: Folder searched for ``*.mp4`` videos.
        output_folder: Folder that receives ``output_<video name>``; created if missing.
        model_path: Path to the saved ``state_dict`` of the classifier.
    """
    angles_files = sorted(glob.glob(os.path.join(angles_folder, '*.npy')))
    video_files = sorted(glob.glob(os.path.join(videos_folder, '*.mp4')))
    videos_by_stem = {
        os.path.splitext(os.path.basename(path))[0]: path for path in video_files
    }

    # Pair by shared name rather than by list position.
    pairs = []
    for angles_file in angles_files:
        stem = os.path.splitext(os.path.basename(angles_file))[0]
        for suffix in ('_joint_angles', '_angles'):
            if stem.endswith(suffix):
                stem = stem[:-len(suffix)]
                break
        video_file = videos_by_stem.get(stem)
        if video_file is None:
            print(f"Skipping {angles_file}: no video named {stem}.mp4 in {videos_folder}")
            continue
        pairs.append((angles_file, video_file))

    if not pairs:
        print(f"No matching angle/video pairs found in {angles_folder} and {videos_folder}.")
        return

    os.makedirs(output_folder, exist_ok=True)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # NOTE(review): PostureClassifierFCNN is not bound by the notebook's `import angle, dl`;
    # presumably it is defined in the `dl` module -- confirm how it gets into scope.
    model = PostureClassifierFCNN(input_size=15)
    # map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)

    for angles_file, video_file in pairs:
        video_name = os.path.basename(video_file)
        output_video_path = os.path.join(output_folder, f"output_{video_name}")

        angles_data = np.load(angles_file)

        predictions = estimate_posture(model, angles_data, device)

        render_video_with_posture(video_file, angles_data, predictions, output_video_path)

# Classifier weights, expected inside the cloned RPC_requirements folder
model_path = '/content/RPC_requirements/model.pth'

# Classify the angle files in ANGLES and write labelled copies of the VIDEOS clips into RPC
process_all_files(ANGLES, VIDEOS, RPC, model_path)