In [None]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
import os
from tqdm import tqdm
import glob 
import shutil
from sklearn.model_selection import train_test_split
from collections import defaultdict


## MediaPipe, normalization and smoothing

In [None]:
class TennisSkeletonExtractor:
    """Extract, normalize and smooth MediaPipe Pose landmarks from a video.

    Typical pipeline: ``get_landmarks`` -> ``normalize_skeleton`` -> ``smooth_data``.
    Every frame is stored as one flat row of ``NUM_LANDMARKS * VALUES_PER_LANDMARK``
    values laid out as ``x_i, y_i, z_i, v_i`` (``v`` = visibility) for each landmark ``i``.
    """

    # Layout of one flattened frame.
    NUM_LANDMARKS = 33
    VALUES_PER_LANDMARK = 4  # x, y, z, visibility

    # Landmark indices used as anchors for the sanity check and the normalization.
    LEFT_SHOULDER, RIGHT_SHOULDER = 11, 12
    LEFT_HIP, RIGHT_HIP = 23, 24

    def __init__(self, min_detection_confidence=0.5, min_tracking_confidence=0.5):
        """Create the MediaPipe Pose model used by ``get_landmarks``.

        Args:
            min_detection_confidence: forwarded to ``mp.solutions.pose.Pose``.
            min_tracking_confidence: forwarded to ``mp.solutions.pose.Pose``.
        """
        self.mp_pose = mp.solutions.pose
        # NOTE(review): one Pose instance is reused for every video; in video mode
        # the tracker may carry state from the previous clip into the next one —
        # confirm whether it should be reset/recreated per video.
        self.pose = self.mp_pose.Pose(
            static_image_mode=False,       # optimized for video stream
            model_complexity=2,            # highest accuracy model
            smooth_landmarks=True,         # reduce jitter internally
            min_detection_confidence=min_detection_confidence,
            min_tracking_confidence=min_tracking_confidence
        )

    @classmethod
    def _column_names(cls):
        """Return the column names x_i, y_i, z_i, v_i in flattened-row order."""
        return [
            f'{axis}_{i}'
            for i in range(cls.NUM_LANDMARKS)
            for axis in ('x', 'y', 'z', 'v')
        ]

    @classmethod
    def _nan_row(cls):
        """Return a fresh all-NaN row marking a frame without usable landmarks."""
        return [np.nan] * (cls.NUM_LANDMARKS * cls.VALUES_PER_LANDMARK)

    @classmethod
    def _hip_center(cls, flat_landmarks):
        """Return the (x, y) midpoint of the two hip landmarks of a flattened row."""
        left = cls.LEFT_HIP * cls.VALUES_PER_LANDMARK
        right = cls.RIGHT_HIP * cls.VALUES_PER_LANDMARK
        return np.array([
            (flat_landmarks[left] + flat_landmarks[right]) / 2,
            (flat_landmarks[left + 1] + flat_landmarks[right + 1]) / 2,
        ])

    def get_landmarks(self, video_path, jump_threshold=0.15):
        """Run pose estimation on every frame of a video.

        A frame is accepted when a pose is detected and its hip center lies closer
        than ``jump_threshold`` to the hip center of the last accepted frame (the
        first detected pose is always accepted). Every other decoded frame —
        no pose, hip-center jump, or failed color conversion — is stored as a row
        of NaN so that the row index always equals the frame index.

        Args:
            video_path: path handed to ``cv2.VideoCapture``.
            jump_threshold: maximum allowed hip-center displacement between an
                accepted frame and the next one (presumably in MediaPipe's
                normalized image coordinates — TODO confirm).

        Returns:
            pandas.DataFrame with one row per decoded frame and the columns
            ``x_0, y_0, z_0, v_0, ..., x_32, y_32, z_32, v_32``. Empty when the
            video cannot be opened or contains no frames.
        """
        landmarks_data = []

        # Hip center of the last *accepted* frame. It is not advanced by rejected
        # frames, so a genuine relocation larger than the threshold rejects every
        # following frame as well.
        prev_hip_center = None

        cap = cv2.VideoCapture(video_path)
        try:
            while cap.isOpened():
                success, image = cap.read()

                # stop if video ends or frame is empty
                if not success or image is None:
                    break

                try:
                    # MediaPipe expects RGB, OpenCV decodes to BGR
                    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                except cv2.error:
                    # keep the timeline aligned instead of silently dropping the frame
                    landmarks_data.append(self._nan_row())
                    continue

                results = self.pose.process(image_rgb)

                accepted = None
                if results.pose_landmarks:
                    candidate = []
                    for landmark in results.pose_landmarks.landmark:
                        candidate.extend(
                            [landmark.x, landmark.y, landmark.z, landmark.visibility]
                        )

                    current_center = self._hip_center(candidate)
                    if (
                        prev_hip_center is None
                        or np.linalg.norm(current_center - prev_hip_center) < jump_threshold
                    ):
                        accepted = candidate
                        prev_hip_center = current_center

                landmarks_data.append(accepted if accepted is not None else self._nan_row())
        finally:
            # release the capture handle even if pose estimation raises
            cap.release()

        return pd.DataFrame(landmarks_data, columns=self._column_names())

    def normalize_skeleton(self, df):
        """Center x/y on the hip midpoint and scale them by the torso size.

        The torso size is the distance between the shoulder midpoint and the hip
        midpoint in the x/y plane; a size of exactly 0 is replaced by 1.0 to avoid
        dividing by zero. ``z_*`` and ``v_*`` columns are returned unchanged.

        Args:
            df: landmark frame as produced by ``get_landmarks``.

        Returns:
            A new DataFrame; ``df`` itself is not modified.
        """
        hip_x = (df[f'x_{self.LEFT_HIP}'] + df[f'x_{self.RIGHT_HIP}']) / 2
        hip_y = (df[f'y_{self.LEFT_HIP}'] + df[f'y_{self.RIGHT_HIP}']) / 2

        sh_x = (df[f'x_{self.LEFT_SHOULDER}'] + df[f'x_{self.RIGHT_SHOULDER}']) / 2
        sh_y = (df[f'y_{self.LEFT_SHOULDER}'] + df[f'y_{self.RIGHT_SHOULDER}']) / 2

        torso_size = np.sqrt((sh_x - hip_x)**2 + (sh_y - hip_y)**2)
        torso_size = torso_size.replace(0, 1.0)  # avoid division by zero

        norm_df = df.copy()
        for i in range(self.NUM_LANDMARKS):
            norm_df[f'x_{i}'] = (df[f'x_{i}'] - hip_x) / torso_size
            norm_df[f'y_{i}'] = (df[f'y_{i}'] - hip_y) / torso_size

        return norm_df

    def smooth_data(self, df, window=5, poly=3):
        """Fill NaN gaps and apply a Savitzky-Golay filter to every column.

        Gaps (rows rejected by the sanity check) are linearly interpolated in both
        directions first. If the filter rejects the input with ``ValueError`` —
        e.g. the clip has fewer rows than ``window`` — the interpolated but
        unsmoothed data is returned.

        Args:
            df: landmark frame, typically the output of ``normalize_skeleton``.
            window: ``window_length`` passed to ``savgol_filter``.
            poly: ``polyorder`` passed to ``savgol_filter``.

        Returns:
            A new DataFrame with the same index and columns as ``df``.
        """
        # fix NaNs from sanity check
        df_filled = df.interpolate(method='linear', limit_direction='both')

        try:
            # Filter along the time axis; every column is smoothed independently.
            values = savgol_filter(df_filled.to_numpy(dtype=float), window, poly, axis=0)
        except ValueError:
            # video too short (or otherwise unsuitable) for the filter window
            return df_filled.copy()

        return pd.DataFrame(values, index=df_filled.index, columns=df_filled.columns)

### Main Processing Loop

In [32]:

# NOTE(review): absolute, machine-specific paths — consider a configurable DATA_DIR.
base_path = r'C:\Users\Noam\OneDrive\Desktop\DL_project\dataset\VIDEO_RGB'
output_base = r'C:\Users\Noam\OneDrive\Desktop\DL_project\dataset\PROCESSED_DATA'

# Container formats picked up from each class folder (matched case-insensitively).
VIDEO_EXTENSIONS = ('.avi', '.mp4')

if not os.path.exists(output_base):
    os.makedirs(output_base, exist_ok=True)
    print(f"Created output directory: {output_base}")

extractor = TennisSkeletonExtractor()

# Every sub-folder of base_path is one class; sorted for a deterministic run order.
classes = sorted(
    d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))
)
print(f"Found {len(classes)} classes: {classes}")

# iterate over each class folder
for class_name in classes:
    class_input_dir = os.path.join(base_path, class_name)
    class_output_dir = os.path.join(output_base, class_name)

    # create class output folder (no-op when it already exists)
    os.makedirs(class_output_dir, exist_ok=True)

    # list of video files
    videos = sorted(
        f for f in os.listdir(class_input_dir) if f.lower().endswith(VIDEO_EXTENSIONS)
    )
    print(f"\nProcessing class: {class_name} ({len(videos)} videos)...")

    for video_file in tqdm(videos):
        video_path = os.path.join(class_input_dir, video_file)
        # Swap only the real extension; str.replace would also rewrite '.avi'/'.mp4'
        # occurring elsewhere in the file name.
        save_path = os.path.join(class_output_dir, os.path.splitext(video_file)[0] + '.csv')

        # already processed in an earlier run
        if os.path.exists(save_path):
            continue

        try:
            # extract raw landmarks
            raw_df = extractor.get_landmarks(video_path)

            # skip unreadable videos and videos in which no pose was ever detected
            # (an all-NaN frame cannot be interpolated and would poison training)
            if raw_df.empty or raw_df.isna().all().all():
                tqdm.write(f"Skipping {video_file}: no pose landmarks extracted")
                continue

            # normalization
            norm_df = extractor.normalize_skeleton(raw_df)

            # smoothing
            clean_df = extractor.smooth_data(norm_df)

            # save to CSV
            clean_df.to_csv(save_path, index=False)

        except Exception as e:
            # best effort: report the failure and continue with the next video
            tqdm.write(f"Error processing {video_file}: {e}")

print("\n--- Full Dataset Processing Complete! ---")

Created output directory: C:\Users\Noam\OneDrive\Desktop\DL_project\dataset\PROCESSED_DATA
Found 12 classes: ['backhand', 'backhand2hands', 'backhand_slice', 'backhand_volley', 'flat_service', 'forehand_flat', 'forehand_openstands', 'forehand_slice', 'forehand_volley', 'kick_service', 'slice_service', 'smash']

Processing class: backhand (165 videos)...


100%|██████████| 165/165 [18:22<00:00,  6.68s/it]



Processing class: backhand2hands (165 videos)...


100%|██████████| 165/165 [19:16<00:00,  7.01s/it]



Processing class: backhand_slice (165 videos)...


100%|██████████| 165/165 [17:41<00:00,  6.43s/it]



Processing class: backhand_volley (165 videos)...


100%|██████████| 165/165 [16:55<00:00,  6.15s/it]



Processing class: flat_service (165 videos)...


100%|██████████| 165/165 [19:56<00:00,  7.25s/it]



Processing class: forehand_flat (165 videos)...


100%|██████████| 165/165 [18:56<00:00,  6.89s/it]



Processing class: forehand_openstands (165 videos)...


100%|██████████| 165/165 [16:39<00:00,  6.06s/it]



Processing class: forehand_slice (165 videos)...


100%|██████████| 165/165 [17:08<00:00,  6.23s/it]



Processing class: forehand_volley (165 videos)...


100%|██████████| 165/165 [15:43<00:00,  5.72s/it]



Processing class: kick_service (165 videos)...


100%|██████████| 165/165 [19:58<00:00,  7.26s/it]



Processing class: slice_service (165 videos)...


100%|██████████| 165/165 [18:41<00:00,  6.79s/it]



Processing class: smash (165 videos)...


100%|██████████| 165/165 [17:08<00:00,  6.23s/it]


--- Full Dataset Processing Complete! ---





## Data augmentation

### Mirroring the data

In [6]:

# Folder holding the per-class landmark CSVs that get mirrored.
ROOT_DIR = r'C:\Users\Noam\OneDrive\Desktop\DL_project\dataset\PROCESSED_DATA'

# Left <-> right landmark index pairs that trade places when a pose is mirrored.
# Landmark 0 has no counterpart and therefore appears in no pair.
# Face landmarks are paired irregularly, so they are listed one by one.
_FACE_PAIRS = [(1, 4), (2, 5), (3, 6), (7, 8), (9, 10)]
# From index 11 up to 32 each odd index is paired with the following even one:
# (11, 12), (13, 14), ..., (31, 32). Per the MediaPipe Pose landmark map these
# cover shoulders, arms, hands, hips, legs and feet (verify against the docs).
_BODY_PAIRS = [(first, first + 1) for first in range(11, 33, 2)]

POSE_PAIRS = _FACE_PAIRS + _BODY_PAIRS

def _mirror_skeleton(df, pose_pairs, mirror_axis=0.0):
    """Return a left/right mirrored copy of a landmark DataFrame.

    Every ``x_*`` column is reflected about ``mirror_axis`` and afterwards the
    x/y/z/v columns of each (left, right) pair trade places, so that e.g. the
    mirrored "left" landmark carries the reflected data of the original right one.
    """
    reflected = df.copy()
    x_cols = [col for col in df.columns if str(col).startswith('x_')]
    if x_cols:
        reflected[x_cols] = 2 * mirror_axis - df[x_cols]

    # Swap from the *reflected* frame; reading from the original frame here would
    # overwrite the reflection of every paired landmark with un-mirrored x values.
    mirrored = reflected.copy()
    for idx_a, idx_b in pose_pairs:
        for prefix in ('x', 'y', 'z', 'v'):
            col_a = f'{prefix}_{idx_a}'
            col_b = f'{prefix}_{idx_b}'
            if col_a in reflected.columns and col_b in reflected.columns:
                mirrored[col_a] = reflected[col_b]
                mirrored[col_b] = reflected[col_a]
    return mirrored


def create_mirror_csv(file_path, pose_pairs=None, mirror_axis=0.0):
    """Write a horizontally mirrored copy of a landmark CSV next to the original.

    ``data.csv`` becomes ``data_flipped.csv`` in the same folder (an existing file
    is overwritten). Failures are reported with ``print`` and not re-raised so a
    batch run keeps going.

    Args:
        file_path: path of the source CSV with ``x_i, y_i, z_i, v_i`` columns.
        pose_pairs: iterable of (left, right) landmark index pairs to swap;
            defaults to the module-level ``POSE_PAIRS``.
        mirror_axis: x position of the mirror line. The default 0.0 matches
            hip-centered (normalized) skeletons; use 0.5 for raw image
            coordinates in [0, 1].
    """
    try:
        pairs = POSE_PAIRS if pose_pairs is None else pose_pairs
        df = pd.read_csv(file_path)

        flipped_df = _mirror_skeleton(df, pairs, mirror_axis)

        # data.csv -> data_flipped.csv (only the file name is touched, never a
        # directory that happens to contain '.csv')
        stem, extension = os.path.splitext(file_path)
        new_filename = f"{stem}_flipped{extension}"
        flipped_df.to_csv(new_filename, index=False)
        print(f"Created: {os.path.basename(new_filename)}")

    except Exception as e:
        print(f"Error processing {file_path}: {e}")

# Mirror every original CSV below ROOT_DIR; files that are already mirrored
# copies (name contains "_flipped") are left alone.
for folder, _subfolders, names in os.walk(ROOT_DIR):
    originals = [
        name for name in names
        if name.endswith(".csv") and "_flipped" not in name
    ]
    for name in originals:
        create_mirror_csv(os.path.join(folder, name))

print("Succeeded. Mirror augmentation complete.")

Created: p10_backhand_s1_flipped.csv
Created: p10_backhand_s2_flipped.csv
Created: p10_backhand_s3_flipped.csv
Created: p11_backhand_s1_flipped.csv
Created: p11_backhand_s2_flipped.csv
Created: p11_backhand_s3_flipped.csv
Created: p12_backhand_s1_flipped.csv
Created: p12_backhand_s2_flipped.csv
Created: p12_backhand_s3_flipped.csv
Created: p13_backhand_s1_flipped.csv
Created: p13_backhand_s2_flipped.csv
Created: p13_backhand_s3_flipped.csv
Created: p14_backhand_s1_flipped.csv
Created: p14_backhand_s2_flipped.csv
Created: p14_backhand_s3_flipped.csv
Created: p15_backhand_s1_flipped.csv
Created: p15_backhand_s2_flipped.csv
Created: p15_backhand_s3_flipped.csv
Created: p16_backhand_s1_flipped.csv
Created: p16_backhand_s2_flipped.csv
Created: p16_backhand_s3_flipped.csv
Created: p17_backhand_s1_flipped.csv
Created: p17_backhand_s2_flipped.csv
Created: p17_backhand_s3_flipped.csv
Created: p18_backhand_s1_flipped.csv
Created: p18_backhand_s2_flipped.csv
Created: p18_backhand_s3_flipped.csv
C

### Train/Test/validation split

#### To prevent data leakage, each video and its flipped copy must end up in the same split

In [10]:
# Root folder of the processed per-class CSVs. (Was misspelled `OOT_DIR`, so this
# cell only worked when ROOT_DIR was still defined by an earlier cell.)
ROOT_DIR = r'C:\Users\Noam\OneDrive\Desktop\DL_project\dataset\PROCESSED_DATA'

def get_base_name(filename):
    """Return the file name without its extension and without a trailing '_flipped'.

    An original clip and its mirrored copy map to the same base name, which lets
    both be assigned to the same split (prevents data leakage). Only the suffix
    is removed; '_flipped' occurring elsewhere in the name is preserved.

    Args:
        filename: file name such as 'clip.csv' or 'clip_flipped.csv'.

    Returns:
        The shared base name, e.g. 'clip'.
    """
    suffix = '_flipped'
    stem = os.path.splitext(filename)[0]
    if stem.endswith(suffix):
        return stem[:-len(suffix)]
    return stem


# Split configuration: 80% train, and the remaining 20% halved into val/test.
HOLDOUT_FRACTION = 0.2
TEST_SHARE_OF_HOLDOUT = 0.5
SPLIT_SEED = 42

# unique sequence id -> paths of the original CSV and its mirrored copy
grouped_files = defaultdict(list)
# unique sequence id -> class label
file_labels = {}

for root, dirs, files in os.walk(ROOT_DIR):
    # os.walk yields entries in filesystem order; sort so the grouping — and with
    # it the seeded split below — is reproducible on every machine.
    dirs.sort()
    for file in sorted(files):
        if file.endswith(".csv"):
            full_path = os.path.join(root, file)

            # folder name is the label
            label = os.path.basename(root)

            base_name = get_base_name(file)
            unique_id = f"{label}_{base_name}"  # same base name may exist in several classes

            grouped_files[unique_id].append(full_path)
            file_labels[unique_id] = label

# Split by sequence id rather than by file, so a clip and its mirrored copy
# always land in the same split.
# NOTE(review): file names look like '<player>_<stroke>_<take>'; the same player can
# still appear in train and test — confirm whether a per-player split is wanted.
all_unique_ids = list(grouped_files.keys())
all_labels = [file_labels[uid] for uid in all_unique_ids]

print(f"Total unique sequences found: {len(all_unique_ids)}")

# train vs. held-out (validation + test), stratified so every class is in every split
train_ids, temp_ids, train_labels, temp_labels = train_test_split(
    all_unique_ids, all_labels,
    test_size=HOLDOUT_FRACTION,
    random_state=SPLIT_SEED,
    stratify=all_labels
)

# held-out -> validation / test
val_ids, test_ids, _, _ = train_test_split(
    temp_ids, temp_labels,
    test_size=TEST_SHARE_OF_HOLDOUT,
    random_state=SPLIT_SEED,
    stratify=temp_labels
)

# return list
def flatten_file_list(id_list):
    """Expand sequence ids into parallel lists of file paths and labels.

    For each id, in order, all paths stored in ``grouped_files`` are emitted and
    the id's label from ``file_labels`` is repeated once per path.

    Returns:
        (paths, labels) — two lists of equal length.
    """
    paths, path_labels = [], []
    for sequence_id in id_list:
        members = grouped_files[sequence_id]
        paths += members
        path_labels += [file_labels[sequence_id] for _ in members]
    return paths, path_labels

# Materialize each split as parallel (file paths, labels) lists.
train_files, train_y = flatten_file_list(train_ids)
val_files, val_y = flatten_file_list(val_ids)
test_files, test_y = flatten_file_list(test_ids)

# Report how many files ended up in each split (captions padded to align counts).
for caption, split_files in (
    ("Train files:", train_files),
    ("Val files:  ", val_files),
    ("Test files: ", test_files),
):
    print(f"{caption} {len(split_files)}")

# Show one training sample as a sanity check.
print("Example Train File:", train_files[0])
print("Example Label:", train_y[0])

Total unique sequences found: 1980
Train files: 3168
Val files:   396
Test files:  396
Example Train File: C:\Users\Noam\OneDrive\Desktop\DL_project\dataset\PROCESSED_DATA\forehand_openstands\p22_foreopen_s2.csv
Example Label: forehand_openstands
