In [None]:
# ===============================================================
# üß© 1. Setup
# ===============================================================

!pip install mediapipe opencv-python tensorflow matplotlib tqdm

import os
import cv2
import numpy as np
import tensorflow as tf
import mediapipe as mp
import matplotlib.pyplot as plt
from tqdm import tqdm
from google.colab import files


In [None]:
# ===============================================================
# üß† 2. Model Definition (src/model.py)
# ===============================================================

def build_model(num_features=2, output_dim=1):
    """
    Simple feed-forward model for posture risk estimation based on angles.
    Each input sample is a single frame (no sequence).
    """
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(num_features,)),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(16, activation='relu'),
        tf.keras.layers.Dense(output_dim, activation='sigmoid')  # 0‚Äì1 risk probability
    ])
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-3),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model

In [None]:
# ===============================================================
# üß™ 3. Mock Training (src/mock_training.py)
# ===============================================================

def train_basic_model(X, y, num_features=2, save_path="models/model.h5"):
    """
    Train a simple posture risk model using angle-based features.
    """
    model = build_model(num_features=num_features)
    X = X.reshape((X.shape[0], num_features))

    model.fit(
        X, y,
        validation_split=0.2,
        epochs=15,
        batch_size=16,
        verbose=1
    )

    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    model.save(save_path)
    print(f"‚úÖ Model saved at {save_path}")

    return model


# --- Simulate training data
N_SAMPLES = 500
N_FEATURES = 2  # e.g., shoulder_angle, neck_angle

# Generate synthetic angles (0‚Äì180¬∞)
X = np.random.uniform(0, 180, size=(N_SAMPLES, N_FEATURES))

# Synthetic labels: risky if average angle > 90¬∞
y = (np.mean(X, axis=1) > 90).astype(int)

# Train and save model
model = train_basic_model(X, y, num_features=N_FEATURES)


In [None]:
# ===============================================================
# ‚öôÔ∏è 4. Risk Assessment (src/risk_assessment.py)
# ===============================================================

MODEL_PATH = "models/model.h5"

if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"‚ùå Model not found at {MODEL_PATH}. Run mock training first.")

model = tf.keras.models.load_model(MODEL_PATH)

def assess_posture_risk(angles: dict) -> dict:
    """
    Assess risk based on learned features.
    Returns a dictionary with score (0‚Äì100), qualitative risk, and feedback.
    """
    if not angles:
        return {
            "score": 0,
            "risk_level": "Unknown",
            "feedback": ["No posture data available."]
        }

    # Convert input angles to a feature vector
    feature_vector = np.array(list(angles.values())).reshape(1, -1)
    feature_vector = feature_vector / 180.0  # normalize 0‚Äì1

    # Predict risk probability
    risk_prob = float(model.predict(feature_vector, verbose=0)[0][0])

    # Convert probability to score (0‚Äì100)
    posture_score = 100 * (1 - risk_prob)
    risk_level = (
        "Low" if posture_score > 85 else
        "Medium" if posture_score > 60 else
        "High"
    )

    feedback = []
    if risk_level == "High":
        feedback.append("High-risk posture detected ‚Äî likely instability or strain risk.")
    elif risk_level == "Medium":
        feedback.append("Posture slightly off ideal form, minor adjustment suggested.")
    else:
        feedback.append("Good posture detected ‚Äî maintain form.")

    return {
        "score": float(np.clip(posture_score, 0, 100)),
        "risk_level": risk_level,
        "feedback": feedback
    }


In [None]:
# ===============================================================
# ü¶µ 5. Pose Detection (src/pose_detection.py)
# ===============================================================

mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

def calculate_angle(a, b, c):
    """Utility function to calculate the angle between three points."""
    a, b, c = np.array(a), np.array(b), np.array(c)
    ba, bc = a - b, c - b
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
    return np.degrees(np.arccos(np.clip(cosine_angle, -1.0, 1.0)))


class PoseDetector:
    """Extracts human keypoints and calculates relevant posture angles."""

    def __init__(self):
        self.pose = mp_pose.Pose(
            static_image_mode=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

    def detect(self, frame):
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.pose.process(rgb)

        if not results.pose_landmarks:
            return {}, {}

        landmarks = results.pose_landmarks.landmark
        h, w = frame.shape[:2]

        left_shoulder = (int(landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER].x * w),
                         int(landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER].y * h))
        right_shoulder = (int(landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER].x * w),
                          int(landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER].y * h))
        left_ear = (int(landmarks[mp_pose.PoseLandmark.LEFT_EAR].x * w),
                    int(landmarks[mp_pose.PoseLandmark.LEFT_EAR].y * h))

        # Calculate posture angles
        shoulder_angle = calculate_angle(left_shoulder, right_shoulder, (right_shoulder[0], 0))
        neck_angle = calculate_angle(left_ear, left_shoulder, (left_shoulder[0], 0))

        angles = {"shoulder_angle": shoulder_angle, "neck_angle": neck_angle}
        keypoints = {
            "left_shoulder": left_shoulder,
            "right_shoulder": right_shoulder,
            "left_ear": left_ear
        }

        mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
        return keypoints, angles

In [None]:
# ===============================================================
# üé® 6. Visualization Helper (src/visualization.py)
# ===============================================================

def draw_feedback(frame, angles, risk_info):
    """
    Overlay posture angles and risk feedback on the frame.
    """
    color = (0, 255, 0) if risk_info["risk_level"] == "Low" else \
            (0, 255, 255) if risk_info["risk_level"] == "Medium" else \
            (0, 0, 255)

    cv2.putText(frame, f"Risk: {risk_info['risk_level']}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
    cv2.putText(frame, f"Score: {risk_info['score']:.1f}", (10, 60),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 1)

    y = 100
    for line in risk_info["feedback"]:
        cv2.putText(frame, line, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1)
        y += 25

    return frame

In [None]:
# ===============================================================
# üé• 7. Upload and Test on a Video
# ===============================================================

uploaded = files.upload()
video_path = list(uploaded.keys())[0]

detector = PoseDetector()

cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out_path = "processed_output.mp4"
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
ret, frame = cap.read()
if not ret:
    print("‚ùå Failed to read video")
else:
    h, w = frame.shape[:2]
    out = cv2.VideoWriter(out_path, fourcc, fps, (w, h))

    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    for _ in tqdm(range(frames)):
        ret, frame = cap.read()
        if not ret:
            break

        _, angles = detector.detect(frame)
        if angles:
            risk = assess_posture_risk(angles)
            frame = draw_feedback(frame, angles, risk)

        out.write(frame)

    cap.release()
    out.release()
    print(f"‚úÖ Processed video saved: {out_path}")





In [None]:
# ===============================================================
# ‚ñ∂Ô∏è 8. Display a Few Frames Inline
# ===============================================================

cap = cv2.VideoCapture("processed_output.mp4")
for i in range(50):
    ret, frame = cap.read()
    if not ret:
        break
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    plt.imshow(frame)
    plt.axis('off')
    plt.show()
cap.release()