In [7]:
import cv2
import numpy as np
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
import argparse
import time
import mediapipe as mp
import os
import joblib

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

def load_yoga_postures(posture_dir):
    yoga_postures = []
    yoga_posture_labels = []

    for posture_name in os.listdir(posture_dir):
        if posture_name.startswith("."):
            continue
        posture_folder = os.path.join(posture_dir, posture_name)
        for image_name in os.listdir(posture_folder):
            if image_name.startswith("."):
                continue
            image_path = os.path.join(posture_folder, image_name)
            if image_name.endswith(".jpg") or image_name.endswith(".png"):
                image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
                image = cv2.resize(image, (64, 64))
                yoga_postures.append(image)
                yoga_posture_labels.append(posture_name)

    return yoga_postures, yoga_posture_labels

def train_svm_model(X_train, yoga_posture_labels):
    svm = SVC(kernel="linear", C=1.0, random_state=42)
    svm.fit(X_train, yoga_posture_labels)
    return svm

def classify_posture(frame, pose, svm, threshold=0.75):
        # Convert the frame to grayscale
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # Detect the pose landmarks using Mediapipe
        results = pose.process(frame_gray)
        if results.pose_landmarks is not None:
            # Extract the pose landmarks and calculate the HOG features
            landmarks = np.array([[lmk.x, lmk.y] for lmk in results.pose_landmarks.landmark])
            landmarks = landmarks.flatten()
            # Predict the posture using the SVM model
            scores = svm.decision_function([landmarks])
            if np.max(scores) >= threshold:
                posture_pred = svm.predict([landmarks])[0]
            else:
                posture_pred = None
        else:
            posture_pred = None
        return posture_pred

def display_posture(frame, pose, posture_pred):
    # Draw the pose landmarks and the predicted posture on the frame
    mp_drawing.draw_landmarks(frame, pose.process(frame).pose_landmarks, mp_pose.POSE_CONNECTIONS)
    cv2.putText(frame, posture_pred, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.imshow('frame', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        return False
    else:
        return True

def save_video(frame, out):
    out.write(frame)

def yoga_classifier(input_path, display=False, output_path=None, save_model_path=None):
    # Load the yoga postures and their labels
    posture_dir = "/home/khaleb.dabakuyo@Digital-Grenoble.local/Documents/ACV/Panther_trainer2/assets/images/train"
    yoga_postures, yoga_posture_labels = load_yoga_postures(posture_dir)

    if save_model_path is not None and os.path.exists(save_model_path):
        # Load the saved SVM model
        svm = joblib.load(save_model_path)
    else:
        # Extract features from the images using mediapipe
        X_train = []
        for posture in yoga_postures:
            results = mp_pose.Pose().process(posture)
            if results.pose_landmarks is not None:
                landmarks = np.array([[lmk.x, lmk.y] for lmk in results.pose_landmarks.landmark])
                landmarks = landmarks.flatten()
                X_train.append(landmarks)
        X_train = np.array(X_train)
        # Train a SVM model on the extracted features
        svm = train_svm_model(X_train, yoga_posture_labels)
        if save_model_path is not None:
            # Save the trained SVM model
            joblib.dump(svm, save_model_path)

    # Initialize Mediapipe pose detection
    pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

    # Use the trained model to classify new images of the yoga postures
    if input_path == "camera":
        cap = cv2.VideoCapture(0)
    else:
        cap = cv2.VideoCapture(input_path)
    if output_path is not None:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, 20.0, (640, 480))
    start_time = time.time()
    posture_timer = 0
    posture_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        posture_pred = classify_posture(frame, pose, svm)
        if posture_pred == "maintained":
            posture_timer = time.time() - start_time
            if posture_timer >= 30:
                # Flash the image
                cv2.imshow('frame', frame)
                cv2.waitKey(1000)
                cv2.imshow('frame', np.zeros_like(frame))
                cv2.waitKey(1000)
                # Display progress line
                posture_count += 1
                progress = int((posture_timer / 30) * 100)
                cv2.putText(frame, f"Progress: {progress}%", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        else:
            start_time = time.time()
            posture_timer = 0
        if display:
            display = display_posture(frame, pose, posture_pred)
            if not display:
                break
        if output_path is not None:
            save_video(frame, out)
    cap.release()
    if output_path is not None:
        out.release()
    cv2.destroyAllWindows()


In [10]:
yoga_classifier('camera', display=True, output_path=None, save_model_path='True')

Corrupt JPEG data: premature end of data segment
Premature end of JPEG file
I0000 00:00:1700046578.734850   30009 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1700046578.795137   30872 gl_context.cc:344] GL version: 3.2 (OpenGL ES 3.2 NVIDIA 525.125.06), renderer: NVIDIA GeForce GTX 1650/PCIe/SSE2


IndexError: tuple index out of range

In [8]:
num_to_class_dict = {
    0: "downdog",
    1: "goddess",
    2: "plank",
    3: "tree",
    4: "warrior2",
}

class_to_num_dict = {
    "downdog": 0,
    "goddess": 1,
    "plank": 2,
    "tree": 3,
    "warrior2": 4,
}


import pandas as pd
import os

from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from streamClassifier.utils import build_dataframe
from streamClassifier.config import num_to_class_dict, class_to_num_dict
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import pandas as pd
import os
from PoseClassification.bootstrap_copy import BootstrapHelper
from datetime import datetime
import cv2


class SvcClassifier:
    def __init__(self, training_csv_dir: str, stream_csv_dir:str):
        self.model = None
        self._training_dataset = self._get_training_dataset(training_csv_dir)
        self._stream_csv_dir = stream_csv_dir

    def _get_training_dataset(self, training_csv_dir) -> pd.DataFrame:
        df = build_dataframe(source_dir=training_csv_dir)
        return df

    def _prepare_X_y_set(self) -> tuple:
        df = self._training_dataset
        X = df.drop(
            ["filename", "class", "class_num"], axis=1
        )  # Assuming 'label' is the column with class names/numbers
        y = df["class_num"]
        return X, y
    
    def get_training_data_infos(self) -> dict:
        infos = {
            "number_of_samples": len(self._training_dataset),
            "num_class_0": len(self._training_dataset[self._training_dataset["class_num"] == 0]),
            "num_class_1": len(self._training_dataset[self._training_dataset["class_num"] == 1]),
            "num_class_2": len(self._training_dataset[self._training_dataset["class_num"] == 2]),
            "num_class_3": len(self._training_dataset[self._training_dataset["class_num"] == 3]),
            "num_class_4": len(self._training_dataset[self._training_dataset["class_num"] == 4]),
        }
        return infos
    
    def get_precision_infos(self):
        print("Confusion Matrix:")
        print(self._confusion_matrix)
        print("Classification Report:")
        print(self._classification_report)
    
    def fit(self,  show_value: bool = True):
        """
        Evaluates the model
        """
        # Assuming X is your feature matrix and y is the target vector
        X, y = self._prepare_X_y_set()
        X_train, X_test, y_train, y_test = train_test_split(
         X, y, test_size=0.3, random_state=42
        )

        svc = SVC(kernel="linear")  # You can change the kernel based on your data characteristics
        svc.fit(X_train, y_train)
        y_pred = svc.predict(X_test)

        self._confusion_matrix = confusion_matrix(y_test, y_pred)
import cv2
import numpy as np
import os, csv
import sys
import tqdm
from datetime import datetime
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC
from sklearn.metrics import classification_report
from matplotlib import pyplot as plt
from PIL import Image, ImageDraw

from mediapipe.python.solutions import drawing_utils as mp_drawing
from mediapipe.python.solutions import pose as mp_pose

class PoseClassifier:
    def __init__(self, stream_csv_dir: str, show_value: bool = True):
        self._stream_csv_dir = stream_csv_dir
        self._confusion_matrix = None
        self._classification_report = None
        self.model = None
        self.show_value = show_value

    def fit(self) -> None:
        """
        Trains the model
        """
        df = build_dataframe(source_dir=self._stream_csv_dir)
        X = df.drop(
            ["filename", "class", "class_num"], axis=1
        )
        y = df["class_num"]

        svc = SVC(kernel="linear", C=1, gamma="auto")
        svc.fit(X, y)

        y_pred = svc.predict(X)
        self._confusion_matrix = confusion_matrix(y, y_pred)
        self._classification_report = classification_report(y, y_pred)
        self.model = svc
        
        if self.show_value:
            print("Confusion Matrix:")
            print(self._confusion_matrix)
            print("Classification Report:")
            print(self._classification_report)

    def predict(self) -> list:
        """
        Predicts the class of the input
        """
        if self.model is None:
            raise Exception("Model not found. Run fit() first.")
        
        df = build_dataframe(source_dir=self._stream_csv_dir)
        X = df.drop(
            ["filename", "class", "class_num"], axis=1
        )
                
        y_pred = self.model.predict(X)
        
        classnum_pred = y_pred[0]
        classname_pred = num_to_class_dict[classnum_pred]
        
        print(f"Predicted class: {classname_pred}")
        print(f"Predicted class number: {classnum_pred}")
            

class StreamEmbedder:
    def __init__(
        self,
        stream_video: str,
        stream_image_out_dir: str,
        stream_csv_out_dir: str,
        output_video_path: str,
    ):
        self.stream_video = stream_video
        self.stream_image_out_dir = stream_image_out_dir
        self.stream_csv_out_dir = stream_csv_out_dir
        self.output_video_path = output_video_path
        self.bootstrap_helper = BootstrapHelper(
            images_out_folder=stream_image_out_dir,
            csvs_out_folder=stream_csv_out_dir,
        )

    def _get_frame_from_stream(self):
        """
        Returns the next frame from the video stream
        """
        cap = cv2.VideoCapture(self.stream_video)
        _, frame = cap.read()
        cap.release()
        return frame

    def _save_frame_to_directory(self, frame, directory):
        """
        Saves the given frame to the given directory
        """
        if not os.path.exists(directory):
            os.makedirs(directory)
        filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S-%f')}.jpg"
        filepath = os.path.join(directory, filename)
        cv2.imwrite(filepath, frame)
        
    def _remove_image_from_in_out_dir(self) -> None:
        target_dir_to_clean_in = os.path.join(self.stream_image_out_dir, "stream")
        self._clean_directory(directory=target_dir_to_clean_in)

    def generate_embbedings(self) -> None:
        """
        Returns the embeddings from the video stream
        """

        self.bootstrap_helper.bootstrap()
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(self.output_video_path, fourcc, 20.0, (640, 480))
        while True:
            frame = self._get_frame_from_stream()
            self._save_frame_to_directory(frame, os.path.join(self.stream_image_out_dir, "stream"))
            out.write(frame)
            cv2.imshow('frame', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                self._save_frame_to_directory(frame, os.path.join(self.stream_image_out_dir, "stream"))
                break
        out.release()
        self._remove_image_from_in_out_dir()
        cv2.destroyAllWindows()




import cv2
import time
import argparse
import numpy as np
from PIL import Image, ImageDraw
from pose_classification import classify_posture
from pose_estimation import PoseEstimator

def save_video(frame, out):
    out.write(frame)

def display_posture(frame, pose, posture_pred):
    display = pose.draw_landmarks(frame, posture_pred)
    cv2.imshow('frame', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        return False
    return True

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Yoga posture classifier')
    parser.add_argument('-i', '--input', type=str, required=True, help='Path to input video file or "camera" for webcam')
    parser.add_argument('--display', action='store_true', help='Display the video stream with the predicted posture')
    parser.add_argument('--output_path', type=str, help='Path to output video file')
    parser.add_argument('--save_model_path', type=str, help='Path to save the trained SVM model')
    args = parser.parse_args()

    if args.input == "camera":
        cap = cv2.VideoCapture(0)
    else:
        cap = cv2.VideoCapture(args.input)
    if args.output_path is not None:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(args.output_path, fourcc, 20.0, (640, 480))
    start_time = time.time()
    posture_timer = 0
    posture_count = 0
    pose = PoseEstimator()
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        posture_pred = classify_posture(frame, pose, svm)
        if posture_pred == "maintained":
            posture_timer = time.time() - start_time
            if posture_timer >= 30:
                # Flash the image
                cv2.imshow('frame', frame)
                cv2.waitKey(1000)
                cv2.imshow('frame', np.zeros_like(frame))
                cv2.waitKey(1000)
                # Display progress line
                posture_count += 1
                progress = int((posture_timer / 30) * 100)
                cv2.putText(frame, f"Progress: {progress}%", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        else:
            start_time = time.time()
            posture_timer = 0
        if args.display:
            display = display_posture(frame, pose, posture_pred)
            if not display:
                break
        if args.output_path is not None:
            save_video(frame, out)
    cap.release()
    if args.output_path is not None:
        out.release()
    cv2.destroyAllWindows()


KeyboardInterrupt: 