# Tennis Swing Analysis v2 — Reference Feature ExtractionThis notebook extracts biomechanical features from pro player training videosusing MediaPipe Pose Landmarker, producing CSV reference files for the DTW comparison engine.**Steps:**1. Mount Google Drive2. Install dependencies & download MediaPipe model3. Configure paths to your training videos4. Extract pose landmarks + biomechanical features5. Download/save reference CSVs

## 1. Setup

In [None]:
# Mount Google Drivefrom google.colab import drivedrive.mount('/content/drive/')

In [None]:
# Install dependencies!pip install -q mediapipe opencv-python-headless numpy pandas scipy

In [None]:
# Download MediaPipe Pose Landmarker modelimport osMODEL_PATH = '/content/pose_landmarker_heavy.task'if not os.path.exists(MODEL_PATH):    !wget -q -O {MODEL_PATH} https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task    print(f"Downloaded model to {MODEL_PATH}")else:    print("Model already exists.")

## 2. Configure Paths

In [None]:
# =====================================================# EDIT THESE PATHS to match your Google Drive structure# =====================================================# Root directory containing player subdirectories with MP4 videos# Expected structure:#   VIDEOS_DIR/#       Federer/#           forehand_1.mp4#           forehand_2.mp4#       Nadal/#           forehand_1.mp4#       ...VIDEOS_DIR = 'drive/My Drive/tennis-swing-classification/videos/MP4'# Where to save extracted reference CSVsOUTPUT_DIR = 'drive/My Drive/tennis-swing-classification/references_v2'# Max seconds to process per video (None = full video)MAX_SECONDS = None# Sliding window settings# Each long video is sliced into overlapping segments for DTW comparisonWINDOW_SECONDS = 10   # Each reference segment length (matches user upload)STRIDE_SECONDS = 5    # Slide step (50% overlap for good coverage)

In [None]:
# List available videosfrom pathlib import Pathvideos_dir = Path(VIDEOS_DIR)print(f"Looking for videos in: {videos_dir}\n")if not videos_dir.exists():    print("ERROR: Directory not found! Check VIDEOS_DIR path above.")else:    for player_dir in sorted(videos_dir.iterdir()):        if player_dir.is_dir():            videos = list(player_dir.glob('*.mp4')) + list(player_dir.glob('*.MP4'))            print(f"{player_dir.name}: {len(videos)} videos")            for v in sorted(videos):                print(f"  - {v.name}")    print()

## 3. Biomechanical Feature Extraction Functions

In [None]:
import numpy as npimport pandas as pdfrom scipy.signal import savgol_filter# MediaPipe Pose landmark indicesL_SHOULDER, R_SHOULDER = 11, 12L_ELBOW, R_ELBOW = 13, 14L_WRIST, R_WRIST = 15, 16L_HIP, R_HIP = 23, 24L_KNEE, R_KNEE = 25, 26L_ANKLE, R_ANKLE = 27, 28def compute_angle(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> float:    """Compute angle (degrees) at point b, formed by points a-b-c."""    ba = a - b    bc = c - b    cos_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-8)    return float(np.degrees(np.arccos(np.clip(cos_angle, -1.0, 1.0))))def extract_frame_features(landmarks) -> dict:    """Extract biomechanical features from one frame of landmarks."""    pts = np.array([[lm.x, lm.y, lm.z] for lm in landmarks])    f = {}    # Joint angles    f['r_elbow_angle'] = compute_angle(pts[R_SHOULDER], pts[R_ELBOW], pts[R_WRIST])    f['l_elbow_angle'] = compute_angle(pts[L_SHOULDER], pts[L_ELBOW], pts[L_WRIST])    f['r_shoulder_angle'] = compute_angle(pts[R_HIP], pts[R_SHOULDER], pts[R_ELBOW])    f['l_shoulder_angle'] = compute_angle(pts[L_HIP], pts[L_SHOULDER], pts[L_ELBOW])    f['r_knee_angle'] = compute_angle(pts[R_HIP], pts[R_KNEE], pts[R_ANKLE])    f['l_knee_angle'] = compute_angle(pts[L_HIP], pts[L_KNEE], pts[L_ANKLE])    f['r_hip_angle'] = compute_angle(pts[R_SHOULDER], pts[R_HIP], pts[R_KNEE])    f['l_hip_angle'] = compute_angle(pts[L_SHOULDER], pts[L_HIP], pts[L_KNEE])    # Torso rotation    shoulder_vec = pts[R_SHOULDER][:2] - pts[L_SHOULDER][:2]    hip_vec = pts[R_HIP][:2] - pts[L_HIP][:2]    f['torso_rotation'] = float(np.degrees(        np.arctan2(shoulder_vec[1], shoulder_vec[0]) -        np.arctan2(hip_vec[1], hip_vec[0])    ))    # Distances    f['stance_width'] = float(np.linalg.norm(pts[R_ANKLE][:2] - pts[L_ANKLE][:2]))    f['shoulder_width'] = float(np.linalg.norm(pts[R_SHOULDER][:2] - pts[L_SHOULDER][:2]))    f['r_wrist_height'] = float(pts[R_SHOULDER][1] - pts[R_WRIST][1])    f['l_wrist_height'] = float(pts[L_SHOULDER][1] - pts[L_WRIST][1])    f['wrist_separation'] = float(np.linalg.norm(pts[R_WRIST][:2] - pts[L_WRIST][:2]))    # Body center & balance    hip_center = (pts[L_HIP] + pts[R_HIP]) / 2    shoulder_center = (pts[L_SHOULDER] + pts[R_SHOULDER]) / 2    f['body_lean'] = float(shoulder_center[0] - hip_center[0])    f['forward_lean'] = float(shoulder_center[1] - hip_center[1])    # Arm extension    f['r_arm_extension'] = float(np.linalg.norm(pts[R_WRIST] - pts[R_SHOULDER]))    f['l_arm_extension'] = float(np.linalg.norm(pts[L_WRIST] - pts[L_SHOULDER]))    return fdef extract_sequence_features(landmarks_sequence: list) -> pd.DataFrame:    """Extract features from all frames, with smoothing."""    frames = []    for lm in landmarks_sequence:        if lm is not None:            frames.append(extract_frame_features(lm))        else:            frames.append(None)    df = pd.DataFrame(frames)    df = df.ffill().bfill()    # Smooth    for col in df.columns:        if len(df[col].dropna()) > 11:            try:                df[col] = savgol_filter(df[col].values, window_length=11, polyorder=3)            except Exception:                pass    return dfprint("Feature extraction functions loaded.")print(f"Features per frame: {len(extract_frame_features.__code__.co_varnames)}")# Show feature namessample_features = [    'r_elbow_angle', 'l_elbow_angle', 'r_shoulder_angle', 'l_shoulder_angle',    'r_knee_angle', 'l_knee_angle', 'r_hip_angle', 'l_hip_angle',    'torso_rotation', 'stance_width', 'shoulder_width',    'r_wrist_height', 'l_wrist_height', 'wrist_separation',    'body_lean', 'forward_lean', 'r_arm_extension', 'l_arm_extension']print(f"Features ({len(sample_features)}): {sample_features}")

## 4. Pose Detection Functions

In [None]:
import cv2import mediapipe as mpBaseOptions = mp.tasks.BaseOptionsPoseLandmarker = mp.tasks.vision.PoseLandmarkerPoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptionsVisionRunningMode = mp.tasks.vision.RunningMode# Check if GPU is availableimport subprocesstry:    result = subprocess.run(['nvidia-smi'], capture_output=True, text=True)    GPU_AVAILABLE = result.returncode == 0    if GPU_AVAILABLE:        gpu_name = [l for l in result.stdout.split('\n') if 'Tesla' in l or 'A100' in l or 'V100' in l or 'T4' in l]        print(f"✅ GPU detected: {gpu_name[0].strip() if gpu_name else 'Yes'}")        print("   MediaPipe will use GPU delegate for faster inference.")    else:        print("⚠️  No GPU detected. Using CPU (slower but works fine).")except FileNotFoundError:    GPU_AVAILABLE = False    print("⚠️  No GPU detected. Using CPU (slower but works fine).")# Select delegateDELEGATE = BaseOptions.Delegate.GPU if GPU_AVAILABLE else BaseOptions.Delegate.CPUdef extract_landmarks_from_video(video_path: str, max_seconds: float = None):    """Run MediaPipe Pose Landmarker on a video, return (landmarks_sequence, fps)."""    options = PoseLandmarkerOptions(        base_options=BaseOptions(            model_asset_path=MODEL_PATH,            delegate=DELEGATE,        ),        running_mode=VisionRunningMode.VIDEO,        num_poses=1,        min_pose_detection_confidence=0.5,        min_pose_presence_confidence=0.5,        min_tracking_confidence=0.5,    )    landmarker = PoseLandmarker.create_from_options(options)    cap = cv2.VideoCapture(video_path)    fps = cap.get(cv2.CAP_PROP_FPS)    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))    max_frames = int(fps * max_seconds) if max_seconds else total_frames    landmarks_sequence = []    frame_idx = 0    while cap.isOpened() and frame_idx < max_frames:        ret, frame = cap.read()        if not ret:            break        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)        timestamp_ms = int(frame_idx * 1000 / fps)        result = landmarker.detect_for_video(mp_image, timestamp_ms)        if result.pose_landmarks and len(result.pose_landmarks) > 0:            landmarks_sequence.append(result.pose_landmarks[0])        else:            landmarks_sequence.append(None)        frame_idx += 1    cap.release()    landmarker.close()    return landmarks_sequence, fpsprint("\nPose detection functions loaded.")print(f"Delegate: {'GPU' if GPU_AVAILABLE else 'CPU'}")

## 5. Run Extraction

In [None]:
import timeoutput_dir = Path(OUTPUT_DIR)total_processed = 0total_segments = 0total_skipped = 0results_summary = []for player_dir in sorted(videos_dir.iterdir()):    if not player_dir.is_dir():        continue    player_name = player_dir.name    video_files = sorted(list(player_dir.glob('*.mp4')) + list(player_dir.glob('*.MP4')))    if not video_files:        continue    sep = '=' * 60    print(f"\n{sep}")    print(f"Player: {player_name} ({len(video_files)} videos)")    print(sep)    for video_file in video_files:        start_time = time.time()        print(f"\n  📹 {video_file.name}")        # Extract landmarks from full video        print("     Pose detection...", end=' ', flush=True)        landmarks_seq, fps = extract_landmarks_from_video(            str(video_file), MAX_SECONDS        )        valid = sum(1 for lm in landmarks_seq if lm is not None)        total = len(landmarks_seq)        ratio = valid / total if total > 0 else 0        duration = total / fps if fps > 0 else 0        print(f"{total} frames ({duration:.1f}s), {valid} with pose ({ratio*100:.0f}%)")        if ratio < 0.5:            print("     ⚠️  Skipping — too few valid frames")            total_skipped += 1            continue        # Extract features from full video        print("     Extracting features...", end=' ', flush=True)        all_frames_features = []        for lm in landmarks_seq:            if lm is not None:                all_frames_features.append(extract_frame_features(lm))        full_df = pd.DataFrame(all_frames_features)        # Smooth the full sequence        for col in full_df.columns:            if len(full_df[col].dropna()) > 11:                try:                    full_df[col] = savgol_filter(full_df[col].values, window_length=11, polyorder=3)                except Exception:                    pass        print(f"{len(full_df)} frames x {len(full_df.columns)} features")        # Slice into windows        window_frames = int(WINDOW_SECONDS * fps)        stride_frames = int(STRIDE_SECONDS * fps)        num_segments = 0        for win_start in range(0, len(full_df) - window_frames + 1, stride_frames):            win_end = win_start + window_frames            segment_df = full_df.iloc[win_start:win_end].reset_index(drop=True)            # Save segment            seg_name = f"{video_file.stem}_seg{num_segments:03d}"            save_path = output_dir / player_name / f"{seg_name}.csv"            save_path.parent.mkdir(parents=True, exist_ok=True)            segment_df.to_csv(save_path, index=False)            num_segments += 1        elapsed = time.time() - start_time        print(f"     ✅ {num_segments} segments saved ({elapsed:.1f}s)")        total_processed += 1        total_segments += num_segments        results_summary.append({            'player': player_name,            'video': video_file.name,            'duration_s': round(duration, 1),            'frames': total,            'segments': num_segments,            'time_s': round(elapsed, 1),        })sep = '=' * 60print(f"\n\n{sep}")print(f"DONE! Videos: {total_processed} | Segments: {total_segments} | Skipped: {total_skipped}")print(f"Output: {output_dir}")

## 6. Verify Results

In [None]:
# Summary tableimport pandas as pdif results_summary:    df_summary = pd.DataFrame(results_summary)    print("Extraction Summary:")    print(df_summary.to_string(index=False))    print(f"\nTotal processing time: {df_summary['time_s'].sum():.0f}s")    print(f"Total segments: {df_summary['segments'].sum()}")    print("\nPer player:")    per_player = df_summary.groupby('player').agg(        videos=('video', 'count'),        segments=('segments', 'sum'),        total_duration=('duration_s', 'sum')    )    print(per_player.to_string())

In [None]:
# Quick sanity check — plot one reference's featuresimport matplotlib.pyplot as pltoutput_dir = Path(OUTPUT_DIR)csv_files = list(output_dir.rglob('*.csv'))if csv_files:    sample_csv = csv_files[0]    df = pd.read_csv(sample_csv)    print(f"Sample: {sample_csv}")    print(f"Shape: {df.shape}")    print(f"Columns: {df.columns.tolist()}")    fig, axes = plt.subplots(2, 2, figsize=(16, 10))    fig.suptitle(f"Feature Visualization — {sample_csv.parent.name}/{sample_csv.name}", fontsize=14)    # Joint angles    angle_cols = [c for c in df.columns if 'angle' in c]    df[angle_cols].plot(ax=axes[0, 0], title='Joint Angles (degrees)')    # Distances    dist_cols = ['stance_width', 'shoulder_width', 'wrist_separation', 'r_arm_extension', 'l_arm_extension']    df[[c for c in dist_cols if c in df.columns]].plot(ax=axes[0, 1], title='Distances')    # Heights & lean    height_cols = ['r_wrist_height', 'l_wrist_height', 'body_lean', 'forward_lean']    df[[c for c in height_cols if c in df.columns]].plot(ax=axes[1, 0], title='Heights & Lean')    # Torso rotation    if 'torso_rotation' in df.columns:        df['torso_rotation'].plot(ax=axes[1, 1], title='Torso Rotation (degrees)')    plt.tight_layout()    plt.show()else:    print("No CSV files found. Check OUTPUT_DIR.")

## 6b. Visual Check — Pose Overlay on Video

In [None]:
# =====================================================# Pick a video to visualize (edit these)# =====================================================VIZ_PLAYER = "Djokovic"VIZ_VIDEO = "Djokovic1_v3.mp4"   # Pick any video you already processedVIZ_START_SEC = 0                 # Start time in secondsVIZ_DURATION_SEC = 10             # Duration to visualize# =====================================================import cv2import mediapipe as mpfrom mediapipe import solutionsfrom IPython.display import HTMLimport tempfileimport base64import numpy as npviz_video_path = str(Path(VIDEOS_DIR) / VIZ_PLAYER / VIZ_VIDEO)print(f"Video: {viz_video_path}")# Set up MediaPipeoptions = PoseLandmarkerOptions(    base_options=BaseOptions(model_asset_path=MODEL_PATH),    running_mode=VisionRunningMode.VIDEO,    num_poses=1,)landmarker = PoseLandmarker.create_from_options(options)cap = cv2.VideoCapture(viz_video_path)fps = cap.get(cv2.CAP_PROP_FPS)width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))start_frame = int(VIZ_START_SEC * fps)max_frames = int(VIZ_DURATION_SEC * fps)# Skip to startcap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)# MediaPipe drawing specPOSE_CONNECTIONS = solutions.pose.POSE_CONNECTIONSLANDMARK_STYLE = solutions.drawing_styles.get_default_pose_landmarks_style()# Process frames and draw landmarksoutput_frames = []for i in range(max_frames):    ret, frame = cap.read()    if not ret:        break    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)    timestamp_ms = int((start_frame + i) * 1000 / fps)    result = landmarker.detect_for_video(mp_image, timestamp_ms)    # Draw landmarks on frame    annotated = frame.copy()    if result.pose_landmarks and len(result.pose_landmarks) > 0:        # Convert to drawing format        landmark_proto = mp.framework.formats.landmark_pb2.NormalizedLandmarkList()        for lm in result.pose_landmarks[0]:            landmark_proto.landmark.add(x=lm.x, y=lm.y, z=lm.z, visibility=lm.visibility)        solutions.drawing_utils.draw_landmarks(            annotated,            landmark_proto,            POSE_CONNECTIONS,            landmark_drawing_spec=LANDMARK_STYLE,        )    output_frames.append(annotated)cap.release()landmarker.close()print(f"Processed {len(output_frames)} frames ({len(output_frames)/fps:.1f}s)")# Write to temp video filetmp = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)out = cv2.VideoWriter(tmp.name, cv2.VideoWriter_fourcc(*'avc1'), fps, (width, height))for f in output_frames:    out.write(f)out.release()# Display in Colabwith open(tmp.name, 'rb') as f:    video_bytes = f.read()b64 = base64.b64encode(video_bytes).decode()html = f"""<p><b>{VIZ_PLAYER} — {VIZ_VIDEO}</b> (frames {start_frame}–{start_frame + len(output_frames)}, {len(output_frames)/fps:.1f}s)</p><video width="640" controls loop autoplay muted>  <source src="data:video/mp4;base64,{b64}" type="video/mp4"></video>"""display(HTML(html))print("\n✅ Video with pose overlay displayed above.")

## 7. Copy to ProjectAfter extraction, copy the  folder into your project:Or download the folder and place it manually.

In [None]:
# Optional: zip the references for easy downloadimport shutiloutput_dir = Path(OUTPUT_DIR)zip_path = output_dir.parent / 'references_v2'if output_dir.exists():    shutil.make_archive(str(zip_path), 'zip', str(output_dir))    print(f"Zipped to: {zip_path}.zip")    print("Download this file and extract into backend/data/references/")