In [10]:
import cv2
import mediapipe as mp
import pandas as pd
import numpy as np
import math
import os
from IPython.display import display
import ipywidgets as widgets
from pathlib import Path
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Create dataset folder if needed
os.makedirs('uploaded_dataset', exist_ok=True)


In [11]:
# Cell 2: Upload widget
uploader = widgets.FileUpload(
    accept='.mp4,.avi,.mov', 
    multiple=False
)
display(uploader)


FileUpload(value=(), accept='.mp4,.avi,.mov', description='Upload')

In [12]:
# Cell 3: Save uploaded video
video_path = None

if uploader.value:
    uploaded_file = uploader.value[0]
    video_name = uploaded_file.name
    video_path = f"uploaded_dataset/{video_name}"
    with open(video_path, 'wb') as f:
        f.write(uploaded_file.content)
    print(f"Saved to: {video_path}")

Saved to: uploaded_dataset/AnteriorDiplegic.mp4


In [13]:
# Cell 4: Create Dataset
def extract_gait_features(video_path, label=None, output_csv=True):
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(static_image_mode=False, model_complexity=1)
    scale_factor = 0.3
    frame_skip = 5
    max_duration = 10

    def resize_image(image):
        h, w = int(image.shape[0] * scale_factor), int(image.shape[1] * scale_factor)
        return cv2.resize(image, (w, h))

    def calculate_angle(a, b, c):
        a, b, c = np.array(a), np.array(b), np.array(c)
        radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
        angle = np.abs(radians * 180.0 / np.pi)
        return 360 - angle if angle > 180.0 else angle

    def get_coords(landmarks, index):
        return (landmarks[index].x, landmarks[index].y)

    RIGHT_HIP, RIGHT_KNEE, RIGHT_ANKLE = 24, 26, 28
    LEFT_HIP, LEFT_KNEE, LEFT_ANKLE = 23, 25, 27

    cap = cv2.VideoCapture(video_path)
    prev_angle, prev_angular_velocity, prev_time = None, None, 0
    data_list = []
    frame_idx = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_idx += 1
        if frame_idx % frame_skip != 0:
            continue

        current_time = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000
        if current_time > max_duration:
            break

        frame = resize_image(frame)
        results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        if results.pose_landmarks:
            lm = results.pose_landmarks.landmark
            rh, rk, ra = get_coords(lm, RIGHT_HIP), get_coords(lm, RIGHT_KNEE), get_coords(lm, RIGHT_ANKLE)
            lh, lk, la = get_coords(lm, LEFT_HIP), get_coords(lm, LEFT_KNEE), get_coords(lm, LEFT_ANKLE)

            rk_angle = calculate_angle(rh, rk, ra)
            lk_angle = calculate_angle(lh, lk, la)

            v1 = (rk[0] - ra[0], rk[1] - ra[1])
            v2 = (rh[0] - ra[0], rh[1] - ra[1])
            dot = v1[0]*v2[0] + v1[1]*v2[1]
            mag1 = math.sqrt(v1[0]**2 + v1[1]**2)
            mag2 = math.sqrt(v2[0]**2 + v2[1]**2)
            angle_rad = math.acos(dot / (mag1 * mag2 + 1e-6))
            angle_deg = math.degrees(angle_rad)

            time_diff = current_time - prev_time
            angular_velocity = (angle_deg - prev_angle) / time_diff if prev_angle is not None and time_diff > 0 else None
            linear_acceleration = (angular_velocity - prev_angular_velocity) if prev_angular_velocity is not None and angular_velocity is not None else None

            # Skip this frame if any values are None
            if None in [rk_angle, lk_angle, angular_velocity, linear_acceleration]:
                continue

            prev_angle = angle_deg
            prev_angular_velocity = angular_velocity
            prev_time = current_time

            data_list.append({
                'time': current_time,
                'right_knee_angle': rk_angle,
                'left_knee_angle': lk_angle,
                'angular_velocity': angular_velocity,
                'linear_acceleration': linear_acceleration,
                'label': label
            })

    cap.release()
    pose.close()

    df = pd.DataFrame(data_list)
    if output_csv and not df.empty:
        csv_name = Path(video_path).stem + '_gait.csv'
        df.to_csv(f'uploaded_dataset/{csv_name}', index=False)
        print(f"Saved: uploaded_dataset/{csv_name}")
    elif df.empty:
        print(f"No valid data extracted from {video_path}")
    return df


In [6]:
# Labeling for model training
label_selector = widgets.Dropdown(
    options=[('Normal', 0), ('Abnormal', 1)],
    description='Label this video:',
    style={'description_width': 'initial'}
)
display(label_selector)

save_button = widgets.Button(description="Save & Extract Features")

def on_save_button_clicked(b):
    if not uploader.value:
        print("Please upload a video first.")
        return
    # Save uploaded video
    uploaded_file = uploader.value[0]
    video_name = uploaded_file.name
    video_path = f"uploaded_dataset/{video_name}"
    with open(video_path, 'wb') as f:
        f.write(uploaded_file.content)
    print(f"Saved video as: {video_path}")
    
    # Extract features with selected label
    label = label_selector.value
    df = extract_gait_features(video_path, label=label)
    display(df.head())
    
    print("Video labeled as:", "Normal" if label == 0 else "Abnormal")

save_button.on_click(on_save_button_clicked)
display(save_button)

Dropdown(description='Label this video:', options=(('Normal', 0), ('Abnormal', 1)), style=DescriptionStyle(des…

Button(description='Save & Extract Features', style=ButtonStyle())

In [14]:
# Cell 5: Prediction
required_columns = {'right_knee_angle', 'left_knee_angle', 'angular_velocity', 'linear_acceleration', 'label'}

all_csvs = list(Path("uploaded_dataset").glob("*_gait.csv"))

valid_csvs = []
for csv_file in all_csvs:
    df = pd.read_csv(csv_file).dropna()
    if not df.empty and required_columns.issubset(df.columns):
        valid_csvs.append(csv_file)

if valid_csvs:
    combined_df = pd.concat([pd.read_csv(f).dropna() for f in valid_csvs], ignore_index=True)
    X = combined_df[['right_knee_angle', 'left_knee_angle', 'angular_velocity', 'linear_acceleration']]
    y = combined_df['label']

    if not X.empty and not y.empty:
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)

        most_recent_csv = max(all_csvs, key=os.path.getmtime)
        df_recent = pd.read_csv(most_recent_csv).dropna()

        if all(col in df_recent.columns for col in X.columns) and not df_recent.empty:
            input_features = df_recent[X.columns].mean().values.reshape(1, -1)
            prediction = model.predict(input_features)
            print(f"Prediction for {most_recent_csv.name}: {'Normal' if prediction[0] == 0 else 'Abnormal'}")
        else:
            print(f"Most recent file {most_recent_csv.name} lacks required features or is empty.")
    else:
        print("Insufficient valid data to train model.")
else:
    print("No valid CSV datasets with required features found.")


Prediction for Parkinsonian Gait (Dr. Yehia Mishriki)_gait.csv: Abnormal


