In [None]:
import cv2
import mediapipe as mp
import numpy as np
from tqdm import tqdm
import subprocess
import json
import os
import tensorflow as tf
from collections import deque

# --- 1. SETUP, CONSTANTS & MODEL LOADING ---

# --- Model and Processing Constants ---
MODEL_PATH = 'best_model.h5'  # <--- IMPORTANT: SET YOUR MODEL FILENAME HERE
SEQUENCE_LENGTH = 150
NUM_FEATURES = 66

# --- Directory Setup ---
COMPRESSED_FOLDER = "compressed_videos"
OUTPUT_FOLDER = "output_videos"
os.makedirs(COMPRESSED_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# --- Load the Trained GRU Model ---
print("--- Loading TensorFlow Model ---")
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"Model file not found at: {MODEL_PATH}")
model = tf.keras.models.load_model(MODEL_PATH)
print("âœ… Model loaded successfully.")

# --- MediaPipe Initialization ---
mp_pose = mp.solutions.pose

# --- 2. HELPER FUNCTIONS ---

def draw_body_landmarks(image, landmarks):
    """Draws the body skeleton on an image."""
    h, w, _ = image.shape
    # Define connections between keypoints to form a skeleton
    pairs = [
        (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.RIGHT_SHOULDER),
        (mp_pose.PoseLandmark.LEFT_HIP, mp_pose.PoseLandmark.RIGHT_HIP),
        (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.LEFT_HIP),
        (mp_pose.PoseLandmark.RIGHT_SHOULDER, mp_pose.PoseLandmark.RIGHT_HIP),
        (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.LEFT_ELBOW),
        (mp_pose.PoseLandmark.LEFT_ELBOW, mp_pose.PoseLandmark.LEFT_WRIST),
        (mp_pose.PoseLandmark.RIGHT_SHOULDER, mp_pose.PoseLandmark.RIGHT_ELBOW),
        (mp_pose.PoseLandmark.RIGHT_ELBOW, mp_pose.PoseLandmark.RIGHT_WRIST),
        (mp_pose.PoseLandmark.LEFT_HIP, mp_pose.PoseLandmark.LEFT_KNEE),
        (mp_pose.PoseLandmark.LEFT_KNEE, mp_pose.PoseLandmark.LEFT_ANKLE),
        (mp_pose.PoseLandmark.RIGHT_HIP, mp_pose.PoseLandmark.RIGHT_KNEE),
        (mp_pose.PoseLandmark.RIGHT_KNEE, mp_pose.PoseLandmark.RIGHT_ANKLE),
    ]
    # Draw lines for each pair of connected keypoints
    for a, b in pairs:
        pa, pb = landmarks.landmark[a.value], landmarks.landmark[b.value]
        # Only draw if both points are reasonably visible
        if pa.visibility > 0.5 and pb.visibility > 0.5:
            cv2.line(image, (int(pa.x * w), int(pa.y * h)), (int(pb.x * w), int(pb.y * h)), (255, 0, 0), 2)
            cv2.circle(image, (int(pa.x * w), int(pa.y * h)), 4, (0, 0, 255), -1)
            cv2.circle(image, (int(pb.x * w), int(pb.y * h)), 4, (0, 0, 255), -1)
    return image

def draw_prediction_info(image, status, confidence):
    """Draws the prediction status and confidence on the image."""
    color = (0, 255, 0) if status == "Correct" else (0, 0, 255)
    
    # Status box
    cv2.rectangle(image, (0, 0), (320, 60), (245, 117, 16), -1)
    
    # Display Status
    cv2.putText(image, 'STATUS', (15, 20),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
    cv2.putText(image, status, (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2, cv2.LINE_AA)

    # Display Confidence
    cv2.putText(image, 'CONF', (220, 20),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
    cv2.putText(image, f'{confidence:.2%}', (210, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2, cv2.LINE_AA)
    return image


def get_video_resolution(video_path):
    """Gets video resolution using ffprobe."""
    command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0',
               '-show_entries', 'stream=width,height', '-of', 'json', video_path]
    try:
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        info = json.loads(result.stdout)
        return info['streams'][0]['width'], info['streams'][0]['height']
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Error: ffprobe not found. Please ensure ffmpeg is installed and in your system's PATH.")
        return None, None

# --- 3. USER INPUT ---

choice = input("Enter 'v' for video upload or 'w' for webcam: ").lower().strip()

# --- 4. VIDEO vs WEBCAM LOGIC ---

if choice == 'v':
    input_video = input("Enter the full path to your video file: ").strip().strip('"') # Strip quotes for drag-and-drop
    if not os.path.exists(input_video):
        raise FileNotFoundError(f"The file was not found at: {input_video}")

    base_filename = os.path.basename(input_video)
    compressed_video = os.path.join(COMPRESSED_FOLDER, f"compressed_{base_filename}")
    output_filename = os.path.join(OUTPUT_FOLDER, f"output_{base_filename}")

    print(f"\n--- Step 1: Compressing '{input_video}' ---")
    command = [
        'ffmpeg', '-i', input_video, '-vf', 'scale=-1:480', '-an', 
        '-vcodec', 'libx264', '-crf', '28', compressed_video, '-y'
    ]
    subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    print(f"âœ… Compression complete. Processing '{compressed_video}'...")
    video_path = compressed_video
    use_webcam = False

elif choice == 'w':
    print("\nðŸŽ¥ Using webcam.")
    video_path = 0
    use_webcam = True
    output_filename = None

else:
    raise ValueError("Invalid choice. Please enter 'v' or 'w'.")

# --- 5. MAIN PROCESSING LOOP ---

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise IOError(f"Cannot open video source: {video_path}")

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) if not use_webcam else 30

out = None
if not use_webcam:
    out = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*'mp4v'),
                          fps, (frame_width, frame_height))

# A deque is a highly efficient list for adding/removing from the ends.
# It will store the last SEQUENCE_LENGTH frames of landmark data.
sequence_data = deque(maxlen=SEQUENCE_LENGTH)
current_status = "Waiting..."
prediction_confidence = 0.0

with mp_pose.Pose(
    static_image_mode=False, model_complexity=1,
    min_detection_confidence=0.5, min_tracking_confidence=0.5
) as pose:

    # Determine total frames for tqdm progress bar
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if not use_webcam else None
    
    # Initialize tqdm
    progress_bar = tqdm(total=total_frames, desc="Processing Video", unit="frame") if not use_webcam else None
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if use_webcam:
            frame = cv2.flip(frame, 1) # Mirror webcam feed
            
        # --- Landmark Extraction ---
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)
        
        # --- Feature Vector Creation ---
        frame_features = []
        if results.pose_landmarks:
            landmarks = results.pose_landmarks.landmark
            body_landmark_indices = list(range(11, 33)) # 22 body landmarks

            for index in body_landmark_indices:
                lm = landmarks[index]
                frame_features.extend([lm.x, lm.y, lm.z])
        else:
            # If no landmarks, append a zero vector
            frame_features = [0.0] * NUM_FEATURES
            
        sequence_data.append(frame_features)

        # --- Prediction Logic ---
        # Only predict when we have a full sequence
        if len(sequence_data) == SEQUENCE_LENGTH:
            input_data = np.expand_dims(np.array(sequence_data), axis=0)
            prediction = model.predict(input_data, verbose=0)[0][0]
            prediction_confidence = prediction
            
            current_status = "Correct" if prediction > 0.5 else "Wrong"

        # --- Annotation and Display ---
        annotated_image = frame.copy()
        if results.pose_landmarks:
            annotated_image = draw_body_landmarks(annotated_image, results.pose_landmarks)
        
        annotated_image = draw_prediction_info(annotated_image, current_status, prediction_confidence)

        if use_webcam:
            cv2.imshow("Push-Up Form Checker", annotated_image)
            if cv2.waitKey(5) & 0xFF == ord('q'):
                break
        else:
            out.write(annotated_image)
            progress_bar.update(1)

# --- 6. CLEANUP ---
if progress_bar:
    progress_bar.close()
cap.release()
if out:
    out.release()
cv2.destroyAllWindows()

if not use_webcam:
    print(f"\nâœ… Processing complete. Output saved as '{output_filename}'.")

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'



--- Loading TensorFlow Model ---
âœ… Model loaded successfully.

--- Step 1: Compressing 'D:\visionpipeline\sampledata\example2.mp4' ---


CalledProcessError: Command '['ffmpeg', '-i', 'D:\\visionpipeline\\sampledata\\example2.mp4', '-vf', 'scale=-1:480', '-an', '-vcodec', 'libx264', '-crf', '28', 'compressed_videos\\compressed_example2.mp4', '-y']' returned non-zero exit status 3752568763.

OVERFITTED

In [3]:
import cv2
import mediapipe as mp
import numpy as np
from tqdm import tqdm
import subprocess
import json
import os
import tensorflow as tf
from collections import deque

# --- 1. SETUP, CONSTANTS & MODEL LOADING ---

# --- Model and Processing Constants ---
MODEL_PATH = 'best_model.h5'  # <--- IMPORTANT: SET YOUR MODEL FILENAME HERE
SEQUENCE_LENGTH = 150
NUM_FEATURES = 66

# --- Directory Setup ---
COMPRESSED_FOLDER = "compressed_videos"
OUTPUT_FOLDER = "output_videos"
os.makedirs(COMPRESSED_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# --- Load the Trained GRU Model ---
print("--- Loading TensorFlow Model ---")
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"Model file not found at: {MODEL_PATH}")
model = tf.keras.models.load_model(MODEL_PATH)
print("âœ… Model loaded successfully.")

# --- MediaPipe Initialization ---
mp_pose = mp.solutions.pose

# --- 2. HELPER FUNCTIONS ---

def draw_body_landmarks(image, landmarks):
    """Draws the body skeleton on an image."""
    h, w, _ = image.shape
    # Define connections between keypoints to form a skeleton
    pairs = [
        (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.RIGHT_SHOULDER),
        (mp_pose.PoseLandmark.LEFT_HIP, mp_pose.PoseLandmark.RIGHT_HIP),
        (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.LEFT_HIP),
        (mp_pose.PoseLandmark.RIGHT_SHOULDER, mp_pose.PoseLandmark.RIGHT_HIP),
        (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.LEFT_ELBOW),
        (mp_pose.PoseLandmark.LEFT_ELBOW, mp_pose.PoseLandmark.LEFT_WRIST),
        (mp_pose.PoseLandmark.RIGHT_SHOULDER, mp_pose.PoseLandmark.RIGHT_ELBOW),
        (mp_pose.PoseLandmark.RIGHT_ELBOW, mp_pose.PoseLandmark.RIGHT_WRIST),
        (mp_pose.PoseLandmark.LEFT_HIP, mp_pose.PoseLandmark.LEFT_KNEE),
        (mp_pose.PoseLandmark.LEFT_KNEE, mp_pose.PoseLandmark.LEFT_ANKLE),
        (mp_pose.PoseLandmark.RIGHT_HIP, mp_pose.PoseLandmark.RIGHT_KNEE),
        (mp_pose.PoseLandmark.RIGHT_KNEE, mp_pose.PoseLandmark.RIGHT_ANKLE),
    ]
    # Draw lines for each pair of connected keypoints
    for a, b in pairs:
        pa, pb = landmarks.landmark[a.value], landmarks.landmark[b.value]
        if pa.visibility > 0.5 and pb.visibility > 0.5:
            cv2.line(image, (int(pa.x * w), int(pa.y * h)), (int(pb.x * w), int(pb.y * h)), (255, 0, 0), 2)
            cv2.circle(image, (int(pa.x * w), int(pa.y * h)), 4, (0, 0, 255), -1)
            cv2.circle(image, (int(pb.x * w), int(pb.y * h)), 4, (0, 0, 255), -1)
    return image

def draw_prediction_info(image, status, confidence):
    """Draws the prediction status and confidence on the image."""
    color = (0, 255, 0) if status == "Correct" else (0, 0, 255)
    cv2.rectangle(image, (0, 0), (320, 60), (245, 117, 16), -1)
    cv2.putText(image, 'STATUS', (15, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
    cv2.putText(image, status, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2, cv2.LINE_AA)
    cv2.putText(image, 'CONF', (220, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
    cv2.putText(image, f'{confidence:.2%}', (210, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2, cv2.LINE_AA)
    return image


def get_video_resolution(video_path):
    """Gets video resolution using ffprobe."""
    command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0',
               '-show_entries', 'stream=width,height', '-of', 'json', video_path]
    try:
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        info = json.loads(result.stdout)
        return info['streams'][0]['width'], info['streams'][0]['height']
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Error: ffprobe not found. Please ensure ffmpeg is installed and in your system's PATH.")
        return None, None

# --- 3. USER INPUT ---

choice = input("Enter 'v' for video upload or 'w' for webcam: ").lower().strip()

# --- 4. VIDEO vs WEBCAM LOGIC --- (THIS BLOCK IS UPDATED)

if choice == 'v':
    input_video = input("Enter the full path to your video file: ").strip().strip('"')
    if not os.path.exists(input_video):
        raise FileNotFoundError(f"The file was not found at: {input_video}")

    # Generate organized file paths for compressed and final output
    base_filename = os.path.basename(input_video)
    compressed_video = os.path.join(COMPRESSED_FOLDER, f"compressed_{base_filename}")
    output_filename = os.path.join(OUTPUT_FOLDER, f"output_{base_filename}")

    print(f"\n--- Step 1: Compressing '{input_video}' ---")

    width, height = get_video_resolution(input_video)
    if width is None:
        raise IOError("Could not read video resolution. Is ffprobe installed?")

    # This is your proven FFmpeg logic
    if height < 720:
        print("Video is below 720p. Removing audio only...")
        command = ['ffmpeg', '-i', input_video, '-c', 'copy', '-an', compressed_video, '-y']
    else:
        print("Video is 720p or higher. Resizing to 720p, removing audio, and compressing...")
        command = [
            'ffmpeg', '-i', input_video,
            '-vf', 'scale=trunc((iw/ih)*720/2)*2:720,setsar=1:1',
            '-an', '-vcodec', 'libx264', '-crf', '28', compressed_video, '-y'
        ]

    # Run the command, hiding the verbose output unless there's an error
    try:
        subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        print("\n--- FFMPEG ERROR ---")
        print("FFmpeg command failed. Error message:")
        print(e.stderr.decode())
        print("--------------------")
        raise

    print(f"âœ… Compression complete. Processing '{compressed_video}'...")
    video_path = compressed_video
    use_webcam = False

elif choice == 'w':
    print("\nðŸŽ¥ Using webcam.")
    video_path = 0
    use_webcam = True
    output_filename = None

else:
    raise ValueError("Invalid choice. Please enter 'v' or 'w'.")

# --- 5. MAIN PROCESSING LOOP ---

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise IOError(f"Cannot open video source: {video_path}")

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) if not use_webcam else 30

out = None
if not use_webcam:
    out = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*'mp4v'),
                          fps, (frame_width, frame_height))

sequence_data = deque(maxlen=SEQUENCE_LENGTH)
current_status = "Waiting..."
prediction_confidence = 0.0

with mp_pose.Pose(
    static_image_mode=False, model_complexity=1,
    min_detection_confidence=0.5, min_tracking_confidence=0.5
) as pose:

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if not use_webcam else None
    progress_bar = tqdm(total=total_frames, desc="Analyzing Video", unit="frame") if not use_webcam else None
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if use_webcam:
            frame = cv2.flip(frame, 1)
            
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)
        
        frame_features = []
        if results.pose_landmarks:
            landmarks = results.pose_landmarks.landmark
            body_landmark_indices = list(range(11, 33))
            for index in body_landmark_indices:
                lm = landmarks[index]
                frame_features.extend([lm.x, lm.y, lm.z])
        else:
            frame_features = [0.0] * NUM_FEATURES
            
        sequence_data.append(frame_features)

        if len(sequence_data) == SEQUENCE_LENGTH:
            input_data = np.expand_dims(np.array(sequence_data), axis=0)
            prediction = model.predict(input_data, verbose=0)[0][0]
            prediction_confidence = prediction
            current_status = "Correct" if prediction > 0.5 else "Wrong"

        annotated_image = frame.copy()
        if results.pose_landmarks:
            annotated_image = draw_body_landmarks(annotated_image, results.pose_landmarks)
        
        annotated_image = draw_prediction_info(annotated_image, current_status, prediction_confidence)

        if use_webcam:
            cv2.imshow("Push-Up Form Checker", annotated_image)
            if cv2.waitKey(5) & 0xFF == ord('q'):
                break
        else:
            out.write(annotated_image)
            progress_bar.update(1)

# --- 6. CLEANUP ---
if progress_bar:
    progress_bar.close()
cap.release()
if out:
    out.release()
cv2.destroyAllWindows()

if not use_webcam:
    print(f"\nâœ… Processing complete. Output saved to '{output_filename}'.")



--- Loading TensorFlow Model ---
âœ… Model loaded successfully.

--- Step 1: Compressing 'D:\visionpipeline\sampledata\example2.mp4' ---
Video is below 720p. Removing audio only...
âœ… Compression complete. Processing 'compressed_videos\compressed_example2.mp4'...


Analyzing Video: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 325/325 [00:26<00:00, 12.41frame/s]


âœ… Processing complete. Output saved to 'output_videos\output_example2.mp4'.





In [2]:
!ffmpeg --version


ffmpeg version 8.0-essentials_build-www.gyan.dev Copyright (c) 2000-2025 the FFmpeg developers
  built with gcc 15.2.0 (Rev8, Built by MSYS2 project)
  configuration: --enable-gpl --enable-version3 --enable-static --disable-w32threads --disable-autodetect --enable-fontconfig --enable-iconv --enable-gnutls --enable-libxml2 --enable-gmp --enable-bzlib --enable-lzma --enable-zlib --enable-libsrt --enable-libssh --enable-libzmq --enable-avisynth --enable-sdl2 --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxvid --enable-libaom --enable-libopenjpeg --enable-libvpx --enable-mediafoundation --enable-libass --enable-libfreetype --enable-libfribidi --enable-libharfbuzz --enable-libvidstab --enable-libvmaf --enable-libzimg --enable-amf --enable-cuda-llvm --enable-cuvid --enable-dxva2 --enable-d3d11va --enable-d3d12va --enable-ffnvcodec --enable-libvpl --enable-nvdec --enable-nvenc --enable-vaapi --enable-openal --enable-libgme --enable-libopenmpt --enable-libopencore-amrwb --enabl