In [None]:
!pip install mediapipe
import math
import os
import time

import cv2
import matplotlib.pyplot as plt
import mediapipe as mp
import numpy as np
import pandas as pd

# Data preprocessing and dataset construction

In [None]:
# MediaPipe handles: the landmark-drawing helpers and the pose solution itself.
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# A single estimator instance; it is passed to every detectPose(...) call in this notebook.
pose = mp_pose.Pose(
    static_image_mode=False,
    min_detection_confidence=0.5,
    model_complexity=1,
)

# Directory that holds the source videos.
videos_path = '/kaggle/input/yoga-part-2/yoga_videos'

# The six asana names used in this notebook.
asanas = [
    'bhujangasana',
    'padmasana',
    'savasana',
    'tadasana',
    'trikonasana',
    'vrikshasana',
]

# Six joint-angle feature columns followed by the target column; start with an empty frame.
columns = [
    'Left Elbow Angle',
    'Right Elbow Angle',
    'Left Shoulder Angle',
    'Right Shoulder Angle',
    'Left Knee Angle',
    'Right Knee Angle',
    'Label',
]
df = pd.DataFrame(columns=columns)


In [None]:
def detectPose(image, pose, display=False):
    """Run pose estimation on a BGR image and return the landmarks in pixel units.

    Parameters
    ----------
    image : numpy.ndarray
        Three-channel image in OpenCV's BGR order (it is converted with
        ``cv2.COLOR_BGR2RGB`` before being processed).
    pose : object
        Pose estimator whose ``process(rgb_image)`` result exposes
        ``pose_landmarks`` (here: the ``mp_pose.Pose`` instance).
    display : bool, optional
        When True, additionally show the original image next to the annotated
        copy in a matplotlib figure.

    Returns
    -------
    tuple
        ``(output_image, landmarks)``. ``output_image`` is a copy of ``image``
        with the skeleton drawn on it when a pose was found. ``landmarks`` has
        one entry per pose landmark: ``(x * width, y * height, z * width)`` for
        a detected pose, or ``[0, 0, 0]`` placeholders when nothing was detected.
        The tuple is returned regardless of ``display`` so a caller can show
        and use the result with a single call.

    Raises
    ------
    ValueError
        If ``image`` is None (for example when the image could not be read).
    """
    if image is None:
        raise ValueError("detectPose received image=None; check that the image or frame was read successfully")

    output_image = image.copy()
    imageRGB = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = pose.process(imageRGB)
    height, width, _ = image.shape

    # Placeholder rows keep the result indexable by landmark id even when no pose is found.
    landmarks = [[0, 0, 0] for _ in range(len(mp_pose.PoseLandmark))]

    if results.pose_landmarks:
        drawing_spec = mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=6, circle_radius=16)
        mp_drawing.draw_landmarks(image=output_image,
                                  landmark_list=results.pose_landmarks,
                                  connections=mp_pose.POSE_CONNECTIONS,
                                  landmark_drawing_spec=drawing_spec,
                                  connection_drawing_spec=drawing_spec)

        # Scale the landmark coordinates by the image size; z is scaled by the width.
        for i, point in enumerate(results.pose_landmarks.landmark):
            landmarks[i] = (point.x * width, point.y * height, point.z * width)

    if display:
        plt.figure(figsize=[22, 22])
        # [:, :, ::-1] reverses the channel axis (BGR -> RGB) for matplotlib.
        plt.subplot(121); plt.imshow(image[:, :, ::-1]); plt.title("Original Image"); plt.axis('off')
        plt.subplot(122); plt.imshow(output_image[:, :, ::-1]); plt.title("Output Image"); plt.axis('off')
        plt.show()

    return output_image, landmarks

In [None]:
# Read one reference image and run detectPose on it twice:
# first with display=True to show the figure, then with display=False to keep the results.
test_image_path = '/kaggle/input/bhujangasana-pose/sumit.png'
test_image = cv2.imread(test_image_path)
detectPose(test_image,pose,display=True)
test_ouput, test_landmark = detectPose(test_image,pose,display=False)

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Same two-step check on a second image: display the annotated figure,
# then capture the annotated image and landmark list for later cells.
image_path = '/kaggle/input/yoga-pose-image/young-man-is-making-yoga-isolated-white-background.jpg'
image = cv2.imread(image_path)
detectPose(image,pose,display=True)
sample_output,sample_landmark=detectPose(image,pose,display=False)

In [None]:
# Number of landmark entries, then the number of values stored per entry.
print(len(sample_landmark))
print(len(sample_landmark[0]))

In [None]:
def calculateAngle(landmark1, landmark2, landmark3):
    """Return the angle at ``landmark2`` between the other two points, in degrees.

    Each landmark is a 3-item ``(x, y, z)`` sequence. Only x and y take part in
    the computation; z is unpacked and ignored. The result is the direction of
    ``landmark3`` minus the direction of ``landmark1`` (both seen from
    ``landmark2``), with 360 added when that difference is negative.
    """
    first_x, first_y, _ = landmark1
    mid_x, mid_y, _ = landmark2
    last_x, last_y, _ = landmark3

    heading_to_last = math.atan2(last_y - mid_y, last_x - mid_x)
    heading_to_first = math.atan2(first_y - mid_y, first_x - mid_x)
    degrees = math.degrees(heading_to_last - heading_to_first)

    # Shift negative results into the non-negative range.
    return degrees + 360 if degrees < 0 else degrees

In [None]:
def classifyAngles(df, landmarks, label):
    """Append one row of six joint angles plus ``label`` to ``df`` and return ``df``.

    ``landmarks`` is indexed by ``mp_pose.PoseLandmark`` values and each entry
    is passed to ``calculateAngle``. The new row is written in place at index
    ``len(df.index)`` and is assigned positionally: left elbow, right elbow,
    left shoulder, right shoulder, left knee, right knee, label.
    """
    joint = mp_pose.PoseLandmark

    # (first point, vertex, last point) per feature, in output-column order.
    # The left and right shoulder triples list their end points in opposite order.
    triples = (
        (joint.LEFT_SHOULDER, joint.LEFT_ELBOW, joint.LEFT_WRIST),      # left elbow
        (joint.RIGHT_SHOULDER, joint.RIGHT_ELBOW, joint.RIGHT_WRIST),   # right elbow
        (joint.LEFT_ELBOW, joint.LEFT_SHOULDER, joint.LEFT_HIP),        # left shoulder
        (joint.RIGHT_HIP, joint.RIGHT_SHOULDER, joint.RIGHT_ELBOW),     # right shoulder
        (joint.LEFT_HIP, joint.LEFT_KNEE, joint.LEFT_ANKLE),            # left knee
        (joint.RIGHT_HIP, joint.RIGHT_KNEE, joint.RIGHT_ANKLE),         # right knee
    )

    row = [
        calculateAngle(landmarks[start.value], landmarks[vertex.value], landmarks[end.value])
        for start, vertex, end in triples
    ]
    row.append(label)

    df.loc[len(df.index)] = row

    return df

In [None]:

def classifyAngles_check(landmarks):
    """Compute the six joint angles for ``landmarks``, print them, and return them.

    ``landmarks`` is indexed by ``mp_pose.PoseLandmark`` values. The returned
    list is ordered: left elbow, right elbow, left shoulder, right shoulder,
    left knee, right knee. The same values are printed on one line as
    ``"<name>: <angle>"`` pairs separated by ``", "``.
    """
    joint = mp_pose.PoseLandmark

    # (printed name, first point, vertex, last point), in output order.
    # The left and right shoulder entries list their end points in opposite order.
    features = (
        ('Left Elbow Angle', joint.LEFT_SHOULDER, joint.LEFT_ELBOW, joint.LEFT_WRIST),
        ('Right Elbow Angle', joint.RIGHT_SHOULDER, joint.RIGHT_ELBOW, joint.RIGHT_WRIST),
        ('Left Shoulder Angle', joint.LEFT_ELBOW, joint.LEFT_SHOULDER, joint.LEFT_HIP),
        ('Right Shoulder Angle', joint.RIGHT_HIP, joint.RIGHT_SHOULDER, joint.RIGHT_ELBOW),
        ('Left Knee Angle', joint.LEFT_HIP, joint.LEFT_KNEE, joint.LEFT_ANKLE),
        ('Right Knee Angle', joint.RIGHT_HIP, joint.RIGHT_KNEE, joint.RIGHT_ANKLE),
    )

    angles = [
        calculateAngle(landmarks[start.value], landmarks[vertex.value], landmarks[end.value])
        for _, start, vertex, end in features
    ]

    titles = [entry[0] for entry in features]
    print(", ".join(f"{title}: {angle}" for title, angle in zip(titles, angles)))

    return angles


In [None]:
# Angles of the sample image, shown as a one-row table.
sample = classifyAngles_check(sample_landmark)

# Empty frame with one column per angle, then append the sample as row 0.
sample_dict = {
    name: []
    for name in (
        'Left Elbow Angle',
        'Right Elbow Angle',
        'Left Shoulder Angle',
        'Right Shoulder Angle',
        'Left Knee Angle',
        'Right Knee Angle',
    )
}
sample_df = pd.DataFrame(sample_dict)
sample_df.loc[len(sample_df.index)] = list(sample)
print(sample_df)

In [None]:
# Angle vector (also printed) for the reference image's landmarks; used later as a single prediction input.
test = classifyAngles_check(test_landmark)

In [None]:
# Reset the dataset table: six angle features plus the target label, no rows yet.
columns = [
    'Left Elbow Angle',
    'Right Elbow Angle',
    'Left Shoulder Angle',
    'Right Shoulder Angle',
    'Left Knee Angle',
    'Right Knee Angle',
    'Label',
]
dictionary = {name: [] for name in columns}
df = pd.DataFrame(dictionary)
df.head()

In [None]:
# Build the training table: one row of joint angles per video frame in which a pose was found.
# Each entry of `videos_path` is opened as a video and its file name is stored as the row label.
# NOTE(review): `pose` is created with static_image_mode=False and is shared across all
# videos — confirm that reusing one instance across separate clips is intended.
clip_duration = 120  # Maximum number of seconds to sample from each video

# sorted() gives a stable processing order; os.listdir() returns entries in arbitrary order.
for label in sorted(os.listdir(videos_path)):
    video_path = os.path.join(videos_path, label)

    # Capture video
    cap = cv2.VideoCapture(video_path)
    try:
        frame_rate = cap.get(cv2.CAP_PROP_FPS)

        # Read at most `clip_duration` seconds worth of frames.
        frames_to_capture = int(clip_duration * frame_rate)
        frames_collected = 0

        while frames_collected < frames_to_capture:
            ret, frame = cap.read()

            if not ret:
                break

            frames_collected += 1

            # Process frame to detect pose
            _, landmarks = detectPose(frame, pose, display=False)

            # detectPose returns all-zero placeholder landmarks when no pose is found.
            # Skip those frames so they do not become labelled rows of meaningless angles.
            if not any(any(point) for point in landmarks):
                continue

            # Compute the joint angles and append them to the DataFrame
            df = classifyAngles(df, landmarks, label)
    finally:
        # Release the capture even if processing a frame raised.
        cap.release()

# Display or save DataFrame
print(df.head())

df.to_csv('yoga_poses_dataset.csv', index=False)

# Split and Training

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Label strings as they appear in the dataset: the asana name plus the '.mp4' suffix.
asanas = [
    'bhujangasana',
    'padmasana',
    'savasana',
    'tadasana',
    'trikonasana',
    'vrikshasana',
]
labels = [asana + '.mp4' for asana in asanas]
labels

In [None]:
# Reload the angle table from disk and preview the first rows.
# NOTE(review): the CSV is written earlier with a relative path ('yoga_poses_dataset.csv');
# this absolute path points at the same file only if the working directory is /kaggle/working — confirm.
df = pd.read_csv('/kaggle/working/yoga_poses_dataset.csv')
print(df.head())

In [None]:
# Label string -> position of that label in `labels`.
label_map = dict(zip(labels, range(len(labels))))
label_map

In [None]:
from sklearn.model_selection import train_test_split

# Target: the 'Label' column. Features: every remaining column.
y = df['Label']
X = df.drop(columns='Label')

In [None]:
from sklearn.preprocessing import LabelEncoder
# Encode categorical labels using LabelEncoder
# (the label values in `y` become integer class ids in `y_encoded`).
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

In [None]:
# Hold out 30% of the rows for testing; the fixed seed keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y_encoded,
    test_size=0.3,
    random_state=42,
)


# Model Training

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization

In [None]:
# Fully connected classifier: three Dense -> BatchNormalization -> Dropout(0.5) stages
# (256, 128 and 64 units) followed by a 6-unit softmax output.
feature_count = X_train.shape[1]

stack = [
    Dense(256, activation='relu', input_shape=(feature_count,)),
    BatchNormalization(),
    Dropout(0.5),
]
for width in (128, 64):
    stack += [Dense(width, activation='relu'), BatchNormalization(), Dropout(0.5)]

# Output layer: adjust the unit count if the number of classes changes.
stack.append(Dense(6, activation='softmax'))

model = Sequential(stack)

# Integer class ids are used as targets, hence the sparse categorical loss.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

model.summary()

In [None]:
from tensorflow.keras.callbacks import TensorBoard
# TensorBoard callback configured to write under the 'Logs' directory.
log_dir = 'Logs'
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
# Train with 10% of the training split held out for validation.
# Use the already-configured `tb_callback` so events are written to `log_dir`;
# a fresh TensorBoard() would ignore that setting and use its own default directory.
history = model.fit(
    X_train,
    y_train,
    epochs=250,
    batch_size=64,
    validation_split=0.1,
    callbacks=[tb_callback],
)

In [None]:
# Model outputs for every row of the held-out test features.
res = model.predict(X_test)

In [None]:
# Turn the single angle vector into a batch of one by adding a leading axis.
numpy_test = np.array(test)[np.newaxis, ...]
numpy_test

In [None]:
# Predict with the in-memory `model`. `loaded_model` is only created later in the
# notebook (after the model is saved and reloaded), so using it here raises
# NameError when the notebook is run top to bottom on a fresh kernel.
output = model.predict(numpy_test)

# Print or use the output as needed
print(output)

In [None]:
# NOTE(review): `output_probabilities` is a hard-coded array that is not read below;
# the predicted label is taken from `output` — confirm whether this array is still needed.
output_probabilities = np.array([[7.6863284e-07, 8.7133900e-09, 1.9854947e-06, 9.9997902e-01, 1.8023882e-05, 7.0292941e-08]])

# Get the predicted label (index of the highest probability)
predicted_label = np.argmax(output)
predicted_label

In [None]:
# Reverse lookup: print every label whose index equals the predicted one.
matching = [name for name, index in label_map.items() if index == predicted_label]
for name in matching:
    print(name)

In [None]:
# Compare prediction and ground truth for one test row.
# `res[60]` is a probability vector, so argmax picks the predicted class id.
# `y_test[60]` is already an integer class id: calling np.argmax on that scalar
# always yields 0, so it is used directly as the index instead.
# NOTE(review): this relies on `asanas` being in the same order as the encoded class ids — confirm.
print(f'predicted asana: {asanas[np.argmax(res[60])]}')
print(f'actual asana: {asanas[y_test[60]]}')

In [None]:
# X_train is a DataFrame (per-column `.dtypes`); y_train is a NumPy array, which
# exposes a single `.dtype` and has no `.dtypes` attribute.
print("Data types in X_train:")
print(X_train.dtypes)

print("\nData types in y_train:")
print(y_train.dtype)


# Evaluation using Confusion Matrix and Accuracy

In [None]:
from sklearn.metrics import multilabel_confusion_matrix

# y_test holds integer class ids (not one-hot), so reduce the predicted
# probabilities to class ids before comparing.
yhat = model.predict(X_test)
yhat_labels = yhat.argmax(axis=1)

confusion_matrix = multilabel_confusion_matrix(y_test, yhat_labels)
print("Multilabel Confusion Matrix:", confusion_matrix, sep="\n")


In [None]:
from sklearn.metrics import accuracy_score

# Fraction of test rows whose predicted class id equals the true class id
# (y_test is integer-encoded, not one-hot).
yhat = model.predict(X_test)
yhat_labels = yhat.argmax(axis=1)

accuracy = accuracy_score(y_test, yhat_labels)
print("Accuracy Score:", accuracy)


In [None]:
# Shape of the held-out target array.
y_test.shape

In [None]:
from sklearn.metrics import classification_report

# Per-class summary computed from integer class ids (y_test is not one-hot).
yhat = model.predict(X_test)
yhat_labels = yhat.argmax(axis=1)

report = classification_report(y_test, yhat_labels)
print("Classification Report:\n", report)


# Save the Trained Model

In [None]:
# Write the trained model to 'action.h5' in the current working directory.
model.save('action.h5')

In [None]:
from tensorflow.keras.models import load_model

# Load the saved model
# (`loaded_model` is what prediction_pipeline uses for inference).
loaded_model = load_model('action.h5')

# Setting up pipeline

In [None]:
def prediction_pipeline(test_image):
    """Predict the asana shown in a BGR image and return its name.

    Steps: detect landmarks with ``detectPose``, compute the six joint angles
    with ``classifyAngles_check`` (which also prints them), run
    ``loaded_model`` on them as a batch of one, and map the highest-scoring
    class index back to a label through ``label_map``.

    Parameters
    ----------
    test_image : numpy.ndarray
        Image to classify; it is handed to ``detectPose`` unchanged.

    Returns
    -------
    str
        The matching ``label_map`` key with its file extension removed, or an
        empty string when no pose was detected or no label matches the
        predicted index.
    """
    # get the landmark
    _, realtime_landmark = detectPose(test_image, pose, display=False)

    # detectPose returns all-zero placeholder landmarks when no pose is found.
    # Classifying those would produce a label for an image with no person in it.
    if not any(any(point) for point in realtime_landmark):
        return str()

    # get the angles
    realtime_angles = classifyAngles_check(realtime_landmark)

    # create a numpy array for angles, shaped as a batch with a single row
    numpy_realtime = np.expand_dims(np.array(realtime_angles), axis=0)

    # predict the output
    output = loaded_model.predict(numpy_realtime)

    # get predicted label (index of the largest value in the output)
    predicted_label = np.argmax(output)

    # find corresponding label name
    for lab, num in label_map.items():
        if num == predicted_label:
            # Drop the file extension (e.g. '.mp4') whatever its length.
            return os.path.splitext(lab)[0]

    return str()

In [None]:
# End-to-end check of prediction_pipeline on the reference image: print the returned label.
test_image_path = '/kaggle/input/bhujangasana-pose/sumit.png'
test_image = cv2.imread(test_image_path)
test_label_output = prediction_pipeline(test_image)
print(test_label_output)

In [None]:
def detection_pipeline(test_image):
    """Display ``test_image`` next to its landmark-annotated copy via ``detectPose``."""
    detectPose(image=test_image, pose=pose, display=True)

# Realtime Detection and Prediction using Webcam Feed

In [None]:
# Webcam loop: press 'c' to start a timer, 'q' to quit.
# Once the timer reaches 5 seconds the current frame is classified once and the
# result stays on screen; the detection figure is shown once per elapsed second.

# Initialize OpenCV video capture
cap = cv2.VideoCapture(0)

# Variables to control the timer
timer_started = False
start_time = None
result = None                  # Label predicted for the current timer run, if any
last_detection_second = None   # Last whole second for which detection was run

try:
    # Main loop to capture frames from the webcam
    while True:
        ret, frame = cap.read()

        if not ret:
            break

        # Check for key press events
        key = cv2.waitKey(1) & 0xFF

        # (Re)start the timer on 'c' key press
        if key == ord('c'):
            timer_started = True
            start_time = time.time()
            result = None
            last_detection_second = None

        if timer_started:
            # Calculate elapsed time in whole seconds
            seconds = int(time.time() - start_time)

            # Classify exactly once per timer run, on the first frame at or after 5 seconds,
            # and before any text is drawn onto the frame.
            if seconds >= 5 and result is None:
                result = prediction_pipeline(frame)

            # Run detection once per elapsed second. The previous `seconds % 1 == 0`
            # test was always true, so detection ran on every frame.
            if seconds != last_detection_second:
                detection_pipeline(frame)
                last_detection_second = seconds

            # Display the timer on the frame
            cv2.putText(frame, f'Timer: {seconds}', (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # Keep the classification result on screen once it is available
            if result is not None:
                cv2.putText(frame, f'Predicted: {result}', (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 0, 0), 2)

        # Show the frame
        cv2.imshow('Frame', frame)

        # Exit loop on 'q' key press
        if key == ord('q'):
            break
finally:
    # Release the video capture and close all windows, even if the loop raised
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Archive /kaggle/working into working.zip, list the current directory,
# and render a link to the archive.
!zip -r working.zip /kaggle/working
!ls
from IPython.display import FileLink
FileLink(r'working.zip')

In [None]:
# Archive the input video directory into yoga_videos.zip, list the current directory,
# and render a link to the archive.
!zip -r yoga_videos.zip /kaggle/input/yoga-part-2/yoga_videos
!ls
from IPython.display import FileLink
FileLink(r'yoga_videos.zip')