In [2]:
print("Hello World!!")

Hello World!!


## Installing libraries

In [6]:
!pip install tensorflow tensorflow-hub joblib



# Running the model for single person (very latent)

### Imports

In [None]:
# 📦 Imports
import cv2
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import joblib

### Load Model

In [None]:
# ✅ Load model & label encoder
model = tf.keras.models.load_model("yoga_pose_nn.h5")
label_encoder = joblib.load("label_encoder.pkl")

# 🔄 Load MoveNet model from TF Hub
movenet = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")

def detect_keypoints_from_frame(frame):
    """Extract 17 keypoints (x, y) from an RGB frame using MoveNet."""
    img = tf.image.resize_with_pad(tf.convert_to_tensor(frame), 256, 256)
    input_img = tf.expand_dims(tf.cast(img, dtype=tf.int32), axis=0)
    keypoints = movenet.signatures['serving_default'](input_img)['output_0'].numpy()
    return keypoints[0, 0, :, :2].flatten()


### Checking in real-time working on single person

In [None]:
# 🎥 Start webcam
cap = cv2.VideoCapture('yog.mp4')
print("Press 'q' to quit.")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    try:
        keypoints = detect_keypoints_from_frame(frame_rgb)
        keypoints = np.expand_dims(keypoints, axis=0)
        prediction = model.predict(keypoints)
        class_id = np.argmax(prediction)
        label = label_encoder.inverse_transform([class_id])[0]

        cv2.putText(frame, f"Pose: {label}", (10, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)
    except Exception as e:
        cv2.putText(frame, "Pose: Not detected", (10, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)

    cv2.imshow("Yoga Pose Classifier", frame)

    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

# Running the model for multi person using yolo

### Imports

In [None]:
!pip install tensorflow tensorflow-hub opencv-python joblib ultralytics

In [None]:
import cv2
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import joblib
from ultralytics import YOLO

print("Cell Executed")

### Loading Model

In [None]:
# Load YOLOv8 model (pretrained on COCO)
yolo_model = YOLO("yolov8n.pt")

# Load MoveNet and classifier
movenet = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")
pose_model = tf.keras.models.load_model("yoga_pose_nn.h5")
label_encoder = joblib.load("label_encoder.pkl")

# Helper to extract keypoints
def detect_keypoints_from_crop(crop):
    img = tf.image.resize_with_pad(tf.convert_to_tensor(crop), 256, 256)
    input_img = tf.expand_dims(tf.cast(img, dtype=tf.int32), axis=0)
    keypoints = movenet.signatures['serving_default'](input_img)['output_0'].numpy()
    return keypoints[0, 0, :, :2].flatten()

### Model checking on video

In [None]:
# Start webcam
cap = cv2.VideoCapture('my.mp4')
print("Press 'q' to quit.")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # YOLOv8 inference
    results = yolo_model(frame)[0]
    boxes = results.boxes.xyxy.cpu().numpy()
    classes = results.boxes.cls.cpu().numpy()

    for i, cls in enumerate(classes):
        if int(cls) != 0:  # 0 = person class in COCO
            continue

        x1, y1, x2, y2 = map(int, boxes[i])
        person_crop = frame[y1:y2, x1:x2]

        try:
            keypoints = detect_keypoints_from_crop(person_crop)
            prediction = pose_model.predict(np.expand_dims(keypoints, axis=0))
            class_id = np.argmax(prediction)
            label = label_encoder.inverse_transform([class_id])[0]
            confidence = np.max(prediction)

            # Draw box + label
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, f"{label} ({confidence:.2f})", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        except Exception as e:
            cv2.putText(frame, "Pose not detected", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

    # Add this right before cv2.imshow()
    resized_frame = cv2.resize(
        frame, 
        (1280, 720),  # (width, height)
        interpolation=cv2.INTER_LINEAR  # Use INTER_AREA for downsizing
    )
    cv2.imshow("Multi-Person Yoga Pose Classifier", resized_frame)

    # cv2.imshow("Multi-Person Yoga Pose Classifier", frame)

    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

# Reducing latency code for multi person

### Check if GPU is available

In [4]:
# Check if GPU is available
import torch
print(torch.cuda.is_available())  # Should print True if GPU is detected


True


In [5]:
import tensorflow as tf
print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))


Num GPUs Available: 0


In [6]:
import torch
print("Is CUDA available?", torch.cuda.is_available())
print("Device:", torch.cuda.get_device_name(0))


Is CUDA available? True
Device: NVIDIA GeForce GTX 1650


In [7]:
import torch

print("Number of GPU: ", torch.cuda.device_count())
print("GPU Name: ", torch.cuda.get_device_name())


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Number of GPU:  1
GPU Name:  NVIDIA GeForce GTX 1650
Using device: cuda


### Example Code to check optimization

In [None]:
import cv2
import threading
import queue
import numpy as np
import torch
import tensorflow as tf
import tensorflow_hub as hub
from sklearn.preprocessing import LabelEncoder

In [None]:
import torch
from ultralytics import YOLO  # Install ultralytics first

# Load model (correct method for YOLOv8+)
yolo_model = YOLO('yolov8n.pt')  # Automatically uses GPU if available

print("Executed")

In [None]:
# Load MoveNet Thunder from TF Hub
movenet = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")
input_size = 256

# Dummy functions to load your pose classification model and label encoder
# Replace with your actual model loading code!
def load_pose_model():
    # Example: TensorFlow/Keras model
    from tensorflow.keras.models import load_model
    return load_model("yoga_pose_nn.h5")

def load_label_encoder():
    import joblib
    return joblib.load("label_encoder.pkl")

pose_model = load_pose_model()
label_encoder = load_label_encoder()

print("Done")

In [None]:
# === Helper functions ===

def detect_keypoints_from_crop(crop_img):
    # Preprocess crop for MoveNet
    img = tf.image.resize_with_pad(tf.expand_dims(crop_img, axis=0), input_size, input_size)
    img = tf.cast(img, dtype=tf.int32)
    
    # Run MoveNet
    outputs = movenet.signatures['serving_default'](img)
    keypoints = outputs['output_0'].numpy()
    keypoints = keypoints[0, 0, :, :2].flatten()  # (17*2,)
    return keypoints

print("Executed")

In [None]:
# === Video Capture Thread ===

frame_queue = queue.Queue(maxsize=5)
stop_signal = False

def video_capture_thread(video_path=0):  # 0 for webcam, or filename for video
    global stop_signal
    cap = cv2.VideoCapture(video_path)
    while not stop_signal:
        ret, frame = cap.read()
        if not ret:
            break
        if not frame_queue.full():
            frame_queue.put(frame)
    cap.release()

# === Main Processing Loop ===

def main(video_path=0):
    global stop_signal

    # Start video capture thread
    threading.Thread(target=video_capture_thread, args=(video_path,), daemon=True).start()

    frame_skip = 2  # skip frames to speed up, adjust as needed
    frame_count = 0

    while True:
        if not frame_queue.empty():
            frame = frame_queue.get()

            if frame_count % frame_skip != 0:
                frame_count += 1
                continue
            frame_count += 1

            # Resize for faster YOLO inference
            small_frame = cv2.resize(frame, (640, 360))
            
            # YOLO expects images in RGB
            small_frame_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)

            # Run YOLO on GPU
            results = yolo_model(small_frame_rgb)[0]

            scale_x = frame.shape[1] / 640
            scale_y = frame.shape[0] / 360

            boxes = results.boxes.xyxy.cpu().numpy() * [scale_x, scale_y, scale_x, scale_y]
            boxes = boxes.astype(int)
            classes = results.boxes.cls.cpu().numpy()

            for i, cls in enumerate(classes):
                if int(cls) != 0:
                    continue  # Only person class

                x1, y1, x2, y2 = boxes[i]
                person_crop = frame[y1:y2, x1:x2]

                try:
                    keypoints = detect_keypoints_from_crop(person_crop)
                    prediction = pose_model.predict(np.expand_dims(keypoints, axis=0))
                    class_id = np.argmax(prediction)
                    label = label_encoder.inverse_transform([class_id])[0]
                    confidence = np.max(prediction)

                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(frame, f"{label} ({confidence:.2f})", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

                except Exception as e:
                    cv2.putText(frame, "Pose not detected", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

            cv2.imshow("Multi-Person Yoga Pose Classifier", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                stop_signal = True
                break

    cv2.destroyAllWindows()

# === Run ===
if __name__ == "__main__":
    main("my.mp4")  # 0 for webcam, or replace with filename like "myvideo.mp4"

## Using GPU for better performance

### No latency but can't detect the pose 

In [30]:
import cv2
import numpy as np
import tensorflow_hub as hub
import tensorflow as tf
from ultralytics import YOLO
from concurrent.futures import ThreadPoolExecutor
from sklearn.preprocessing import LabelEncoder
import joblib
import os

# Load YOLOv8
yolo_model = YOLO('yolov8n.pt')  # or yolov8s.pt for better accuracy

# Load MoveNet
movenet = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")
input_size = 256

# Load trained pose classifier (Neural Network model)
pose_model = tf.keras.models.load_model("yoga_pose_nn.h5")
label_encoder = joblib.load("label_encoder.pkl")




In [None]:

# Helper to extract keypoints from cropped person image
def detect_keypoints_from_crop(image):
    img = cv2.resize(image, (input_size, input_size))
    img = tf.image.convert_image_dtype(img, tf.float32)
    input_img = tf.expand_dims(img, axis=0)
    outputs = movenet.signatures['serving_default'](input_img)
    keypoints = outputs['output_0'].numpy()[0, 0, :, :2]
    return keypoints.flatten()

# Threaded per-person inference
def process_person(crop, box, results_list):
    try:
        keypoints = detect_keypoints_from_crop(crop)
        prediction = pose_model.predict(np.expand_dims(keypoints, axis=0), verbose=0)
        class_id = np.argmax(prediction)
        label = label_encoder.inverse_transform([class_id])[0]
        confidence = np.max(prediction)
        results_list.append((box, label, confidence))
    except Exception as e:
        results_list.append((box, "Pose Not Detected", 0.0))

def main(video_path="yog.mp4", save_output=True):
    cap = cv2.VideoCapture(video_path)
    print("Starting video stream... Press 'q' to quit.")

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = None

    if not cap.isOpened():
        print("Failed to open video.")
        return

    executor = ThreadPoolExecutor(max_workers=6)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        results = yolo_model(frame)[0]
        boxes = results.boxes.xyxy.cpu().numpy().astype(int)
        classes = results.boxes.cls.cpu().numpy().astype(int)

        tasks = []
        results_list = []

        for i, cls in enumerate(classes):
            if cls != 0:  # Only person class
                continue
            x1, y1, x2, y2 = boxes[i]
            crop = frame[y1:y2, x1:x2]
            tasks.append(executor.submit(process_person, crop, (x1, y1, x2, y2), results_list))

        # Wait for all threads to finish
        for task in tasks:
            task.result()

        # Draw results
        for box, label, conf in results_list:
            x1, y1, x2, y2 = box
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, f"{label} ({conf:.2f})", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        # Resize frame for display/output
        frame = cv2.resize(frame, (1280, 720))

        # Save output if enabled
        if save_output:
            if out is None:
                os.makedirs("outputs", exist_ok=True)
                out = cv2.VideoWriter("outputs/predicted_output.avi", fourcc, 20.0, (1280, 720))
            out.write(frame)

        cv2.imshow("Yoga Pose Classifier", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    cap.release()
    if out:
        out.release()
    cv2.destroyAllWindows()
    print("Inference complete. Output saved to outputs/predicted_output.avi")


In [None]:
main("yog.mp4")  # Replace with 0 for webcam

### Little latency but detecting pose good

In [None]:
import cv2
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from ultralytics import YOLO
from sklearn.preprocessing import LabelEncoder
import threading
import joblib

# Load models
yolo_model = YOLO("yolov8n.pt")
movenet = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")
pose_model = tf.keras.models.load_model("yoga_pose_nn.h5")
label_encoder = joblib.load("label_encoder.pkl")

input_size = 256  # Thunder model requires 256x256 input

In [57]:
def detect_keypoints_from_crop(crop_img):
    # Convert to TensorFlow tensor and maintain aspect ratio
    img_tensor = tf.convert_to_tensor(cv2.cvtColor(crop_img, cv2.COLOR_BGR2RGB))
    img_tensor = tf.image.resize_with_pad(img_tensor, input_size, input_size)
    
    # Convert to int32 with 0-255 range (CRITICAL FIX)
    img_tensor = tf.cast(img_tensor, dtype=tf.int32)
    
    # Add batch dimension and run inference
    input_image = tf.expand_dims(img_tensor, axis=0)
    outputs = movenet.signatures['serving_default'](input_image)
    
    # Get normalized keypoints and convert to pixel coordinates
    keypoints = outputs['output_0'].numpy()[0, 0]  # Shape: (17, 3)
    
    # Denormalize to crop coordinates
    h, w = crop_img.shape[:2]
    keypoints[:, 0] *= h  # y-coordinates
    keypoints[:, 1] *= w  # x-coordinates
    
    return keypoints[:, :2].flatten()  # Return (34,) array of (x,y) pairs

def classify_pose(crop_img):
    try:
        keypoints = detect_keypoints_from_crop(crop_img)
        # Normalize keypoints for classifier
        normalized_kps = keypoints / np.array([crop_img.shape[1], crop_img.shape[0]] * 17)
        prediction = pose_model.predict(np.expand_dims(normalized_kps, axis=0), verbose=0)
        class_id = np.argmax(prediction)
        label = label_encoder.inverse_transform([class_id])[0]
        confidence = np.max(prediction)
        return label, confidence
    except Exception as e:
        return "Pose not detected", 0

def process_person(frame, box, results_list, idx):
    x1, y1, x2, y2 = map(int, box)
    crop = frame[y1:y2, x1:x2]
    label, confidence = classify_pose(crop)
    results_list[idx] = (label, confidence, (x1, y1, x2, y2))

def process_frame(frame):
    results = yolo_model(frame)[0]
    boxes = results.boxes.xyxy.cpu().numpy()
    classes = results.boxes.cls.cpu().numpy().astype(int)

    threads = []
    results_list = [None] * len(boxes)

    for i, cls in enumerate(classes):
        if cls != 0:
            continue
        thread = threading.Thread(target=process_person, args=(frame, boxes[i], results_list, i))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    for result in results_list:
        if result is None:
            continue
        label, confidence, (x1, y1, x2, y2) = result
        color = (0, 255, 0) if label != "Pose not detected" else (0, 0, 255)
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        cv2.putText(frame, f"{label} ({confidence:.2f})", (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    return frame

def main(video_path):
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = None
    
    print("Press 'q' to quit.")
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
    
        frame = process_frame(frame)
        frame = cv2.resize(frame, (1280, 720))
    
        if out is None:
            fps = cap.get(cv2.CAP_PROP_FPS)
            out = cv2.VideoWriter("output_pose.mp4", fourcc, fps, (1280, 720))
        out.write(frame)
    
        cv2.imshow("Yoga Pose Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break


    cap.release()
    out.release()
    cv2.destroyAllWindows()


In [58]:
main("yog.mp4")  # Or use 0 for webcam

Press 'q' to quit.

0: 384x640 1 person, 89.4ms
Speed: 247.2ms preprocess, 89.4ms inference, 503.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 61.0ms
Speed: 10.1ms preprocess, 61.0ms inference, 3.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 61.4ms
Speed: 11.3ms preprocess, 61.4ms inference, 4.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 60.5ms
Speed: 8.7ms preprocess, 60.5ms inference, 3.1ms postprocess per image at shape (1, 3, 384, 640)


### Using lite movenet

In [4]:
# Old: Using TensorFlow Hub
# movenet = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")

# New: Using TensorFlow Lite
import os
import requests
import tensorflow as tf

TFLITE_MODEL_URL = "https://tfhub.dev/google/lite-model/movenet/singlepose/thunder/tflite/float16/4?lite-format=tflite"
TFLITE_MODEL_PATH = "movenet_thunder.tflite"

if not os.path.exists(TFLITE_MODEL_PATH):
    print("Downloading MoveNet Thunder TFLite model...")
    r = requests.get(TFLITE_MODEL_URL)
    with open(TFLITE_MODEL_PATH, "wb") as f:
        f.write(r.content)

interpreter = tf.lite.Interpreter(model_path=TFLITE_MODEL_PATH)
interpreter.allocate_tensors()

input_size = 256


In [10]:
def detect_keypoints_from_crop(crop_img):
    # Preprocess
    img_rgb = cv2.cvtColor(crop_img, cv2.COLOR_BGR2RGB)
    img_resized = tf.image.resize_with_pad(img_rgb, input_size, input_size)
    input_image = tf.cast(tf.expand_dims(img_resized, axis=0), dtype=tf.uint8)  # uint8 for TFLite

    # TFLite inference
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    interpreter.set_tensor(input_details[0]['index'], input_image.numpy())
    interpreter.invoke()
    keypoints_with_scores = interpreter.get_tensor(output_details[0]['index'])  # [1, 1, 17, 3]

    # Convert to crop coordinates
    keypoints = keypoints_with_scores[0, 0, :, :]  # (17, 3)
    h, w = crop_img.shape[:2]
    keypoints[:, 0] *= h  # y
    keypoints[:, 1] *= w  # x

    return keypoints[:, :2].flatten()  # (34,)


def classify_pose(crop_img):
    try:
        keypoints = detect_keypoints_from_crop(crop_img)
        # Normalize keypoints for classifier
        normalized_kps = keypoints / np.array([crop_img.shape[1], crop_img.shape[0]] * 17)
        prediction = pose_model.predict(np.expand_dims(normalized_kps, axis=0), verbose=0)
        class_id = np.argmax(prediction)
        label = label_encoder.inverse_transform([class_id])[0]
        confidence = np.max(prediction)
        return label, confidence
    except Exception as e:
        return "Pose not detected", 0

def process_person(frame, box, results_list):
    x1, y1, x2, y2 = map(int, box)
    crop = frame[y1:y2, x1:x2]
    label, confidence = classify_pose(crop)
    results_list.append((label, confidence, (x1, y1, x2, y2)))

def process_frame(frame):
    results = yolo_model(frame)[0]
    boxes = results.boxes.xyxy.cpu().numpy()
    classes = results.boxes.cls.cpu().numpy().astype(int)

    threads = []
    results_list = []

    for i, cls in enumerate(classes):
        if cls != 0:
            continue
        thread = threading.Thread(
            target=process_person,
            args=(frame, boxes[i], results_list)
        )
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    # Draw results
    for label, confidence, (x1, y1, x2, y2) in results_list:
        color = (0, 255, 0) if label != "Pose not detected" else (0, 0, 255)
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        cv2.putText(frame, f"{label} ({confidence:.2f})", (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    return frame

def main(video_path="yog.mp4", save_output=True):
    cap = cv2.VideoCapture(video_path)
    print("Starting video stream... Press 'q' to quit.")

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = None

    if not cap.isOpened():
        print("Failed to open video.")
        return

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame = process_frame(frame)
        frame = cv2.resize(frame, (1280, 720))

        # Save output if enabled
        if save_output:
            if out is None:
                os.makedirs("outputs", exist_ok=True)
                out = cv2.VideoWriter("outputs/predicted_output.avi", fourcc, 20.0, (1280, 720))
            out.write(frame)

        cv2.imshow("Yoga Pose Classifier", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    cap.release()
    if out:
        out.release()
    cv2.destroyAllWindows()
    print("Inference complete. Output saved to outputs/predicted_output.avi")



In [11]:
if __name__ == "__main__":
    main("yog.mp4")  # Replace with your actual video file


NameError: name 'cv2' is not defined

### Both for Img and Vid

In [9]:
def process_image(image_path, display=True, save_path=None):
    """
    Process single image for yoga pose detection
    
    Args:
        image_path (str): Path to input image
        display (bool): Whether to show the result
        save_path (str): Optional path to save result image
    
    Returns:
        np.ndarray: Processed image with annotations
    """
    # Read image
    frame = cv2.imread(image_path)
    if frame is None:
        raise ValueError(f"Could not read image at {image_path}")
    
    # Process frame
    processed_frame = process_frame(frame)
    
    # Resize for display
    processed_frame = cv2.resize(processed_frame, (1280, 720))
    
    # Save/output results
    if save_path:
        cv2.imwrite(save_path, processed_frame)
        print(f"Saved result to {save_path}")
    
    if display:
        cv2.imshow("Yoga Pose Detection", processed_frame)
        cv2.waitKey(0)
        cv2.destroyAllWindows()
    
    return processed_frame

def process_video(video_path, save_output=True):
    """
    Process video file or webcam stream
    
    Args:
        video_path (str/int): Path to video file or 0 for webcam
        save_path (str): Optional path to save output video
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video source {video_path}")

    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Initialize writer
    if save_output:
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter("output.mp4", fourcc, fps, (1280, 720))

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Process and resize frame
        processed = process_frame(frame)
        processed = cv2.resize(processed, (1280, 720))

        # Save/show results
        if save_output:
            out.write(processed)
        
        cv2.imshow("Yoga Pose Detection", processed)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    if save_output:
        out.release()
    cv2.destroyAllWindows()


In [11]:
# Unified interface
def yoga_pose_detection(input_path, display=True, save_output=False):
    """
    Main interface for yoga pose detection
    
    Args:
        input_path (str/int): Image path, video path, or 0 for webcam
        display (bool): Whether to show results
        save_output (bool/str): True to save with default path, or custom path
    """
    if isinstance(input_path, str):
        if input_path.lower().endswith(('.png', '.jpg', '.jpeg')):
            save_path = "output.jpg" if save_output else None
            return process_image(input_path, display, save_path)
        else:
            save_path = "output_video.mp4" if save_output else None
            process_video(input_path, save_path)
    elif isinstance(input_path, int):  # Webcam
        process_video(input_path, save_output)
    else:
        raise ValueError("Invalid input type. Use image path, video path, or 0 for webcam")

In [None]:
yoga_pose_detection("j.jpeg", True, True)


0: 448x640 1 person, 1 bottle, 1 chair, 1 couch, 3 potted plants, 1 vase, 455.2ms
Speed: 168.5ms preprocess, 455.2ms inference, 1162.2ms postprocess per image at shape (1, 3, 448, 640)
Saved result to output.jpg
