# In [None]:
import mediapipe as mp
import cv2
import pandas as pd
import numpy as np
import csv
import pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.utils import to_categorical

# MediaPipe handles: the pose solution module and the drawing utilities
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Names of the pose landmarks that are exported as features.
# The order of this list fixes the column order used for the dataset CSV.
IMPORTANT_LMS = [
    "NOSE",
    "LEFT_SHOULDER",
    "RIGHT_SHOULDER",
    "LEFT_HIP",
    "RIGHT_HIP",
    "LEFT_KNEE",
    "RIGHT_KNEE",
    "LEFT_ANKLE",
    "RIGHT_ANKLE",
]

# Function to extract landmarks and save to CSV
def export_landmark_to_csv(dataset_path: str, results, action: str) -> None:
    """Append one labelled row of pose keypoints to the dataset CSV.

    The row consists of ``action`` followed by ``x``, ``y``, ``z`` and
    ``visibility`` of every landmark named in ``IMPORTANT_LMS``, in list order.

    This is a best-effort helper: when there is nothing to export, or a
    landmark cannot be read, or the file cannot be written, a message is
    printed and the function returns without raising.

    Args:
        dataset_path: Path of the CSV file to append to.
        results: Pose-processing result whose ``pose_landmarks`` attribute is
            read. A missing or ``None`` value is treated as "no pose in this
            frame".
        action: Label written to the first column of the row.
    """
    pose_landmarks = getattr(results, "pose_landmarks", None)
    if pose_landmarks is None:
        # Previously this case raised AttributeError outside the try block
        # and aborted the whole capture loop.
        print("No pose landmarks detected; nothing exported.")
        return

    # Step 1: collect the keypoints. Only lookup-style failures are expected
    # here (unknown landmark name, index out of range, missing attribute).
    try:
        landmarks = pose_landmarks.landmark
        keypoints = []
        for lm in IMPORTANT_LMS:
            keypoint = landmarks[mp_pose.PoseLandmark[lm].value]
            keypoints.append([keypoint.x, keypoint.y, keypoint.z, keypoint.visibility])
    except (AttributeError, IndexError, KeyError) as e:
        print(f"Could not read pose landmarks: {e}")
        return

    # Flatten [[x, y, z, v], ...] into one row and put the label first.
    row = [action, *np.array(keypoints).flatten()]

    # Step 2: append the row. File-system problems are reported, not raised.
    try:
        with open(dataset_path, mode="a", newline="") as f:
            csv_writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(row)
    except OSError as e:
        print(f"Could not write to {dataset_path}: {e}")

# Function to initialize CSV
def init_csv(dataset_path: str):
    """Create (or overwrite) the dataset CSV and write its header row.

    The header is ``label`` followed by ``<name>_x``, ``<name>_y``,
    ``<name>_z`` and ``<name>_v`` for each lower-cased entry of
    ``IMPORTANT_LMS``.
    """
    header = ["label"]
    for name in IMPORTANT_LMS:
        prefix = name.lower()
        header.extend((f"{prefix}_x", f"{prefix}_y", f"{prefix}_z", f"{prefix}_v"))

    # Mode "w" truncates any existing file, so only the header remains.
    with open(dataset_path, mode="w", newline="") as csv_file:
        writer = csv.writer(csv_file, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(header)

# Function to describe dataset
def describe_dataset(dataset_path: str) -> pd.DataFrame:
    """Load the dataset CSV, print a short summary and return it.

    Printed information: column headers, row/column counts, per-label
    counts (the CSV must have a ``label`` column), whether any value is
    missing, and the number of fully duplicated rows.

    Args:
        dataset_path: Path of the CSV file to read.

    Returns:
        The loaded data as a ``pandas.DataFrame``.
    """
    data = pd.read_csv(dataset_path)
    print(f"Headers: {list(data.columns.values)}")
    print(f'Number of rows: {data.shape[0]} \nNumber of columns: {data.shape[1]}\n')
    print(f"Labels: \n{data['label'].value_counts()}\n")
    print(f"Missing values: {data.isnull().values.any()}\n")

    # Report how many rows repeat an earlier row. The previous code summed
    # the *values* of each duplicate row, which is not a count and fails on
    # the string label column as soon as a duplicate exists.
    duplicate_count = int(data.duplicated().sum())
    print(f"Duplicate Rows : {duplicate_count}")
    return data

# Function to round up metric results
def round_up_metric_results(results) -> list:
    """Return every value of *results* rounded to three decimals, as a list."""
    return [round(value, 3) for value in results]

# Function to train and save the model
def train_and_save_model(dataset_path: str, model_path: str, scaler_path: "str | None" = None):
    """Train a small dense classifier on the landmark dataset and pickle it.

    Steps: load and summarise the CSV, encode the ``label`` column
    (``"incorrect"`` -> 0, ``"correct"`` -> 1), split 80/20 with a fixed
    seed, standardise the features, train for 50 epochs, print accuracy and
    weighted F1 on the held-out split, then pickle the model.

    Args:
        dataset_path: CSV file with a ``label`` column plus feature columns.
        model_path: Destination file for the pickled model.
        scaler_path: Optional destination for the pickled ``StandardScaler``.
            The model is trained on standardised features, so the same
            scaler is needed to prepare inputs at prediction time. When
            ``None`` (the default) the scaler is not saved, as before.

    Raises:
        ValueError: If a label cannot be converted to an integer class id.
    """
    df = describe_dataset(dataset_path)

    # Encode labels without writing integers back into the string column;
    # values other than the two known labels pass through unchanged and
    # must already be integer-like for the cast below to succeed.
    label_ids = {"incorrect": 0, "correct": 1}
    y = df["label"].map(lambda value: label_ids.get(value, value)).astype("int")
    X = df.drop("label", axis=1)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1234)

    # Fit the scaler on the training split only, so that statistics of the
    # held-out rows do not leak into training and inflate the metrics.
    sc = StandardScaler()
    X_train = sc.fit_transform(X_train)
    X_test = sc.transform(X_test)

    y_train = to_categorical(y_train, num_classes=2)
    y_test = to_categorical(y_test, num_classes=2)

    model = Sequential()
    model.add(Dense(128, input_dim=X_train.shape[1], activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(64, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(2, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # NOTE: the held-out split doubles as validation data; it is only used
    # for per-epoch reporting here, not for model selection.
    model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_test, y_test))

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    accuracy = accuracy_score(y_true, y_pred_classes)
    f1 = f1_score(y_true, y_pred_classes, average='weighted')

    print(f"Accuracy: {accuracy}")
    print(f"F1 Score: {f1}")

    # NOTE(review): the on-disk format is kept as pickle so existing loaders
    # keep working; whether a Keras model pickles cleanly presumably depends
    # on the installed Keras/TensorFlow version - confirm, or migrate to the
    # library's native save format together with the loading code.
    with open(model_path, "wb") as f:
        pickle.dump(model, f)

    if scaler_path is not None:
        with open(scaler_path, "wb") as f:
            pickle.dump(sc, f)

# Initialize CSV (overwrites any existing file with just the header row)
DATASET_PATH = "./squat_data.csv"
init_csv(DATASET_PATH)

# Extract landmarks from videos and save to CSV (example).
# While the video is shown: press "c" to store the current frame's pose as
# "correct", "i" to store it as "incorrect", and "q" to stop early.
cap = cv2.VideoCapture("squat_video.mp4")
if not cap.isOpened():
    # Without frames the dataset stays empty and training would only fail
    # later with a far less helpful error.
    cap.release()
    raise RuntimeError("Could not open video source 'squat_video.mp4'")

try:
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, image = cap.read()
            if not ret:
                break

            # Pose processing is run on an RGB copy marked read-only; the
            # frame is converted back to BGR for drawing and display.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False
            results = pose.process(image)
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
            cv2.imshow("CV2", image)

            k = cv2.waitKey(1) & 0xFF
            if k == ord("q"):
                break
            if k == ord('c') or k == ord('i'):
                if results.pose_landmarks is None:
                    # Nothing to label on this frame; keep the loop running.
                    print("No pose detected in this frame; nothing saved.")
                elif k == ord('c'):
                    export_landmark_to_csv(DATASET_PATH, results, "correct")
                else:
                    export_landmark_to_csv(DATASET_PATH, results, "incorrect")
finally:
    # Always free the capture device and close the preview window, even if
    # processing a frame raised.
    cap.release()
    cv2.destroyAllWindows()

# Train and save the model
MODEL_PATH = "./squat_model.pkl"
train_and_save_model(DATASET_PATH, MODEL_PATH)