In [None]:
!pip install ultralytics torch torchvision torchaudio scikit-learn opencv-python

Collecting ultralytics
  Downloading ultralytics-8.3.233-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.233-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m30.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.233 ultralytics-thop-2.0.18


In [None]:

from google.colab import drive
drive.mount('/content/drive', force_remount=True)
import os, glob, math, json, time, collections, random
import numpy as np
import cv2
from pathlib import Path

import torch, torch.nn as nn, torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from ultralytics import YOLO

from tqdm import tqdm

# PyTorch imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms



# Utilities
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

# ==========================================================
# UTILITIES
# ==========================================================

def make_dirs(d):
    """Create directory ``d`` together with any missing parent directories.

    Calling it on a directory that already exists is a no-op.
    """
    os.makedirs(name=d, exist_ok=True)

def timestamp():
    """Return the current local time formatted as ``YYYYMMDD_HHMMSS``."""
    now = time.localtime()
    return time.strftime('%Y%m%d_%H%M%S', now)


Mounted at /content/drive
Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


Video Cropper


In [None]:
# ============================================
# VIDEO CROPPER
# ============================================

class VideoCropper:
    """Crop a frame to the most confident person detection of a YOLO model.

    Args:
        model_name: Weights file handed to ``YOLO``. If loading fails, the
            error is reported and ``'yolo11n.pt'`` is loaded instead.
        device: Device string stored on the instance; defaults to ``'cuda'``
            when available, else ``'cpu'``.
        person_class_id: Class index the detector is restricted to. ``0`` is
            "person" for COCO-trained weights such as ``yolo11n.pt``. Pass
            ``None`` to accept any class (e.g. for custom weights).
    """

    def __init__(self, model_name='yolo11n.pt', device=None, person_class_id=0):
        print(f"Loading YOLO model: {model_name}...")
        try:
            self.model = YOLO(model_name)
            print(f"✓ Loaded {model_name}")
        except Exception as exc:
            # Report why the requested weights failed instead of hiding it,
            # then fall back to the nano model.
            print(f"Warning: could not load {model_name}: {exc}")
            self.model = YOLO('yolo11n.pt')
            print("✓ Loaded yolo11n.pt (fallback)")

        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.person_class_id = person_class_id

    def crop_player(self, frame):
        """Detect a person and return the cropped frame.

        Falls back to a centre square crop when nothing is detected, when the
        detected box is smaller than 20 px on either side, or when inference
        raises.
        """
        try:
            predict_kwargs = {'imgsz': 640, 'verbose': False}
            # Without a class filter the first box may be a racket, ball or
            # any other detected class rather than the player.
            class_id = getattr(self, 'person_class_id', 0)
            if class_id is not None:
                predict_kwargs['classes'] = [class_id]

            results = self.model(frame, **predict_kwargs)

            # Check if valid results
            if not results or not results[0].boxes or len(results[0].boxes) == 0:
                return self._center_crop(frame)

            # First remaining detection
            box = results[0].boxes[0]
            x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())

            # Clamp the box to the frame and reject degenerate boxes
            h, w = frame.shape[:2]
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w, x2), min(h, y2)

            if x2 - x1 < 20 or y2 - y1 < 20:
                return self._center_crop(frame)

            return frame[y1:y2, x1:x2]

        except Exception:
            # Deliberate best-effort: any inference failure degrades to the
            # centre crop so a single bad frame cannot abort a whole video.
            return self._center_crop(frame)

    def _center_crop(self, frame):
        """Fallback: crop the centred square whose side is ``min(h, w)``."""
        h, w = frame.shape[:2]
        size = min(h, w)
        y1 = (h - size) // 2
        x1 = (w - size) // 2
        return frame[y1:y1+size, x1:x1+size]


In [None]:
class AdaptiveKeyframeExtractor:
    """
    Adaptive keyframe extraction for 1-3 second action videos.

    Strategy:
    - < 1s: spread ``target_frames`` indices over ALL frames (frames are
      repeated evenly when the clip has fewer frames than the target)
    - 1-2s: Uniform sampling
    - > 2s: Dense sampling in middle 60% (action region)
    """

    def __init__(self, target_frames=16):
        self.target_frames = target_frames

    @staticmethod
    def _spread(start, stop, count):
        """Return ``count`` evenly spaced integer indices within ``[start, stop)``."""
        span = stop - start
        return [start + (i * span) // count for i in range(count)]

    def select_indices(self, total_frames, fps):
        """Choose frame indices from clip metadata alone (no file access).

        Args:
            total_frames: Number of frames reported for the clip.
            fps: Frames per second; a non-positive value is treated as an
                unknown duration and handled like a very short clip.

        Returns:
            List of at most ``target_frames`` indices, each within
            ``[0, total_frames)``. Empty when the clip has no frames.
        """
        target = self.target_frames
        if total_frames <= 0 or target <= 0:
            # Nothing to sample; avoids dividing by zero below.
            return []

        duration = total_frames / fps if fps > 0 else 0

        if duration < 1.0:
            # Very short: cover the whole clip. Spreading (rather than
            # repeating each frame and truncating) keeps the last frames.
            return self._spread(0, total_frames, target)

        if duration <= 2.0:
            # Medium: uniform sampling
            if total_frames <= target:
                return list(range(total_frames))
            return self._spread(0, total_frames, target)

        # Longer: dense middle sampling (skip first 20% and last 20%)
        start_frame = int(total_frames * 0.2)
        end_frame = int(total_frames * 0.8)
        if end_frame <= start_frame:
            start_frame, end_frame = 0, total_frames
        # Even spacing keeps every index inside the action window, even when
        # the window holds fewer frames than the target.
        return self._spread(start_frame, end_frame, target)

    def extract_frame_indices(self, video_path):
        """
        Get frame indices to extract based on video duration.

        Returns:
            List of frame indices (empty if the video cannot be opened or
            reports no frames)
        """
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return []
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
        finally:
            # Always release the handle, even if a property read raises.
            cap.release()

        return self.select_indices(total_frames, fps)[:self.target_frames]

In [None]:
# ============================================
# KEYPOINT EXTRACTOR
# ============================================

class KeypointExtractor:
    """Extract human pose keypoints using YOLO11 pose model."""

    def __init__(self, device='cpu'):
        from ultralytics import YOLO
        print("Loading YOLO11 Pose model...")
        self.model = YOLO('yolo11n-pose.pt')  # Nano pose model
        # NOTE(review): the device is stored but not forwarded to the model
        # call below — confirm whether inference should be pinned to it.
        self.device = device
        self.feature_dim = 51  # 17 keypoints × 3 (x, y, confidence)

    def extract(self, frame):
        """Extract keypoints from frame.

        Args:
            frame: Image array whose first two dimensions are (height, width).

        Returns:
            np.array of shape (feature_dim,): flattened
            [x1,y1,conf1, x2,y2,conf2, ...] of the first detected person,
            with x divided by the frame width and y by the frame height.
            All zeros when no usable detection is available or inference
            fails.
        """
        try:
            results = self.model(frame, imgsz=640, verbose=False)

            if len(results) == 0 or not hasattr(results[0], 'keypoints'):
                return np.zeros(self.feature_dim, dtype=np.float32)

            kpts = results[0].keypoints

            if kpts is None or len(kpts) == 0:
                return np.zeros(self.feature_dim, dtype=np.float32)

            # Get first person's keypoints. Copy so the in-place
            # normalisation below never writes into the model's own result.
            kpt_data = np.array(kpts.data[0].cpu().numpy(), dtype=np.float32)

            # A degenerate keypoint tensor (e.g. zero keypoints for a
            # detection) would otherwise produce a vector of the wrong
            # length and break stacking of per-frame features.
            if kpt_data.shape != (self.feature_dim // 3, 3):
                return np.zeros(self.feature_dim, dtype=np.float32)

            # Normalize coordinates by frame dimensions
            h, w = frame.shape[:2]
            kpt_data[:, 0] /= w  # Normalize x
            kpt_data[:, 1] /= h  # Normalize y

            # Flatten to 1D
            return kpt_data.flatten().astype(np.float32)

        except Exception as e:
            print(f"Keypoint extraction failed: {e}")
            return np.zeros(self.feature_dim, dtype=np.float32)


# ==========================================================
# FEATURE CACHE
# ==========================================================

class VideoFeatureCache:
    """Cache extracted features to disk to speed up training.

    Each video maps to one ``.npy`` file inside ``cache_dir``. The file name
    combines the video's stem with a short hash of its full path, so clips
    with the same file name in different folders (e.g. one per class) do not
    overwrite each other. Files written under the older stem-only naming are
    not picked up and will be recomputed.
    """

    def __init__(self, cache_dir):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def cache_path(self, video_path):
        """Return the cache file path for ``video_path`` (stable per path)."""
        import hashlib

        source = Path(video_path)
        digest = hashlib.sha1(source.as_posix().encode('utf-8')).hexdigest()[:10]
        return self.cache_dir / f"{source.stem}_{digest}.npy"

    def exists(self, video_path):
        """Return True when a completed cache file exists for ``video_path``."""
        return self.cache_path(video_path).exists()

    def save(self, video_path, arr):
        """Write ``arr`` for ``video_path``; failures are reported, not raised.

        The array is written to a temporary file and then renamed, so an
        interrupted run never leaves a truncated ``.npy`` that ``exists``
        would later accept.
        """
        target = self.cache_path(video_path)
        scratch = target.with_suffix('.tmp')
        try:
            with open(scratch, 'wb') as fh:
                np.save(fh, arr)
            scratch.replace(target)
        except Exception as e:
            print(f"Warning: Failed to save cache for {video_path}: {e}")
            try:
                scratch.unlink()
            except OSError:
                pass

    def load(self, video_path):
        """Return the cached array for ``video_path``, or None if unreadable."""
        try:
            return np.load(str(self.cache_path(video_path)))
        except Exception as e:
            print(f"Warning: Failed to load cache for {video_path}: {e}")
            return None




# ============================================
# ADAPTIVE FEATURE EXTRACTION
# ============================================

def precompute_keypoint_features_adaptive(videos, extractor, cache, seq_length=16):
    """Pre-extract keypoint features with adaptive sampling.

    For every video not yet present in ``cache``, selects frame indices with
    ``AdaptiveKeyframeExtractor``, runs ``extractor.extract`` on those frames
    and stores a ``(seq_length, extractor.feature_dim)`` float32 array.

    Args:
        videos: Iterable of video paths.
        extractor: Object exposing ``feature_dim`` and ``extract(frame)``.
        cache: Object exposing ``exists(path)`` and ``save(path, array)``.
        seq_length: Number of frames per cached sequence.

    Sequences that come up short are padded by repeating the last feature
    vector; a video that yields no frames, or raises, is cached as zeros.
    """

    print("\n" + "="*60)
    print("ADAPTIVE KEYFRAME EXTRACTION + FEATURE EXTRACTION")
    print("="*60)

    keyframe_extractor = AdaptiveKeyframeExtractor(target_frames=seq_length)
    feature_dim = extractor.feature_dim
    videos_to_process = [v for v in videos if not cache.exists(v)]

    if len(videos_to_process) == 0:
        print("✓ All features already cached!")
        return

    print(f"Processing {len(videos_to_process)} videos...\n")

    for video_path in tqdm(videos_to_process, desc="Extracting features"):
        try:
            # Get adaptive frame indices
            frame_indices = keyframe_extractor.extract_frame_indices(video_path)

            # Extract features from selected frames
            cap = cv2.VideoCapture(video_path)
            feats = []
            # Short clips repeat indices; reuse the result instead of
            # decoding and running pose inference on the same frame again.
            feat_by_index = {}

            try:
                for frame_idx in frame_indices:
                    if frame_idx not in feat_by_index:
                        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                        ok, frame = cap.read()

                        if not ok:
                            break

                        # The frame is passed on in OpenCV's native BGR
                        # order: Ultralytics models expect BGR for numpy
                        # input, so converting to RGB would swap channels.
                        feat_by_index[frame_idx] = extractor.extract(frame)

                    feats.append(feat_by_index[frame_idx])
            finally:
                # Release the decoder even when extraction raises.
                cap.release()

            # Pad if needed (shouldn't happen with adaptive)
            if len(feats) < seq_length:
                if len(feats) == 0:
                    feats = [np.zeros(feature_dim, dtype=np.float32)] * seq_length
                else:
                    last = feats[-1]
                    while len(feats) < seq_length:
                        feats.append(last.copy())

            arr = np.stack(feats[:seq_length], axis=0).astype(np.float32)
            cache.save(video_path, arr)

        except Exception as e:
            print(f"\nError: {video_path}: {e}")
            zero_features = np.zeros((seq_length, feature_dim), dtype=np.float32)
            cache.save(video_path, zero_features)

    print("\n✓ Feature extraction complete!")




Dataset: loading pre-extracted features and applying augmentation

In [None]:
class PickleballDataset(Dataset):
    """Dataset of cached per-video feature sequences with optional augmentation.

    Args:
        videos: Sequence of video paths (used as cache keys).
        labels: Sequence of integer class labels aligned with ``videos``.
        cache: Object exposing ``load(video_path)`` that returns an array or
            ``None``.
        seq_length: Sequence length used for the all-zero fallback sample.
        augment: Apply the random augmentations below when True.
        feature_dim: Feature width used for the all-zero fallback sample.
            NOTE(review): the default (2048) differs from the 51-dimensional
            keypoint features used elsewhere in this notebook — callers
            should pass the real width explicitly.
    """

    def __init__(self, videos, labels, cache, seq_length=16,
                 augment=False, feature_dim=2048):
        self.videos = videos
        self.labels = labels
        self.cache = cache
        self.seq_length = seq_length
        self.augment = augment
        self.feature_dim = feature_dim

    def __len__(self):
        return len(self.videos)

    def temporal_augmentation(self, features):
        """Apply temporal augmentation to feature sequence.

        Works on a copy, so the caller's array is never modified. Returns an
        array with the same shape as ``features``.
        """
        features = np.array(features, copy=True)
        seq_len = features.shape[0]

        # Random temporal shift (±20% of sequence). The bound is computed
        # once: ``-seq_len // 5`` floors toward -inf and would allow a larger
        # shift to the left than to the right.
        max_shift = seq_len // 5
        if random.random() < 0.5:
            shift = random.randint(-max_shift, max_shift)
            if shift > 0:
                # Shift right: pad left, trim right
                features = np.concatenate([
                    np.repeat(features[0:1], shift, axis=0),
                    features[:-shift]
                ], axis=0)
            elif shift < 0:
                # Shift left: trim left, pad right
                features = np.concatenate([
                    features[-shift:],
                    np.repeat(features[-1:], -shift, axis=0)
                ], axis=0)

        # Random temporal dropout (drop 1-2 frames, duplicate neighbors).
        # Needs at least one interior frame, i.e. three frames in total.
        if seq_len >= 3 and random.random() < 0.3:
            num_drops = random.randint(1, 2)
            for _ in range(num_drops):
                drop_idx = random.randint(1, seq_len - 2)
                # Replace dropped frame with average of neighbors
                features[drop_idx] = (features[drop_idx-1] + features[drop_idx+1]) / 2

        # Random temporal reverse (for symmetric actions)
        if random.random() < 0.2:
            features = features[::-1].copy()

        return features

    def add_feature_noise(self, x, std=0.01):
        """Add small Gaussian noise to precomputed features."""
        noise = np.random.normal(0, std, x.shape).astype(np.float32)
        return x + noise

    def frame_dropout(self, x, max_drops=3):
        """Zero out between 1 and ``max_drops`` randomly chosen frames.

        Returns a modified copy; never drops more frames than ``x`` has.
        """
        x = np.array(x, copy=True)
        L = x.shape[0]
        upper = min(max_drops, L)
        if upper < 1:
            return x
        drops = random.randint(1, upper)
        idxs = np.random.choice(L, drops, replace=False)
        x[idxs] = 0
        return x

    def __getitem__(self, idx):
        """Return ``(features, label)`` tensors for sample ``idx``."""
        v = self.videos[idx]

        # Load from cache
        x = self.cache.load(v)

        if x is None:
            # Fallback: return zeros if cache missing
            print(f"Warning: Cache missing for {v}, using zeros")
            x = np.zeros((self.seq_length, self.feature_dim), dtype=np.float32)

        # Apply augmentation during training
        if self.augment:
            x = self.temporal_augmentation(x)

        if self.augment and random.random() < 0.5:
            x = self.add_feature_noise(x, std=0.02)

        if self.augment and random.random() < 0.3:
            x = self.frame_dropout(x)

        y = int(self.labels[idx])
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.long)


LSTM classifier, early stopping, and training loop

In [None]:
import os, glob, math, json, time, collections, random
import numpy as np
import cv2
from pathlib import Path

import torch, torch.nn as nn, torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from ultralytics import YOLO

from tqdm import tqdm

# PyTorch imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms



# Utilities
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

# ==========================================================
# UTILITIES
# ==========================================================

def make_dirs(d):
    """Ensure directory ``d`` exists, creating intermediate directories as needed."""
    os.makedirs(name=d, exist_ok=True)

def timestamp():
    """Format the current local time as ``YYYYMMDD_HHMMSS``."""
    current = time.localtime()
    return time.strftime('%Y%m%d_%H%M%S', current)

# ============================================
# UPDATED MODEL (smaller LSTM for keypoints)
# ============================================

class KeypointLSTMClassifier(nn.Module):
    """Small LSTM classifier over per-frame keypoint feature vectors.

    The final hidden state of the last LSTM layer goes through dropout and
    layer normalisation before a linear layer produces class logits.

    Args:
        feature_dim: Size of each per-frame feature vector.
        hidden: LSTM hidden size.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
        dropout: Dropout probability for the head; also used between LSTM
            layers when ``num_layers > 1``.
    """

    def __init__(self, feature_dim=51, hidden=128, num_layers=1,
                 num_classes=3, dropout=0.2):
        super().__init__()

        # Inter-layer dropout only applies to stacked LSTMs.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size=feature_dim,
            hidden_size=hidden,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(hidden)
        self.classifier = nn.Linear(hidden, num_classes)

    def forward(self, x):
        """Map a batch-first sequence tensor to logits of shape (batch, num_classes)."""
        _, (hidden_states, _) = self.lstm(x)
        summary = hidden_states[-1]  # final hidden state of the top layer
        return self.classifier(self.ln(self.dropout(summary)))
#Earlystop
class EarlyStopping:
    """Signal when a monitored score has stopped improving.

    Args:
        patience: Number of consecutive non-improving calls that triggers
            the stop flag.
        min_delta: Margin by which a score must beat the best one to count
            as an improvement.
        mode: ``'min'`` if lower scores are better; any other value treats
            higher scores as better.

    Calling the instance with a score returns the current stop flag. Once
    set, the flag stays set.
    """

    def __init__(self, patience=7, min_delta=0.0, mode='min'):
        self.patience = patience
        self.min_delta = min_delta
        self.mode = mode
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, score):
        # The very first score only establishes the baseline.
        if self.best_score is None:
            self.best_score = score
            return False

        if self.mode == 'min':
            is_better = score < (self.best_score - self.min_delta)
        else:
            is_better = score > (self.best_score + self.min_delta)

        if not is_better:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
            return self.early_stop

        self.best_score = score
        self.counter = 0
        return self.early_stop

def list_videos(root, classes, extensions=('.mp4', '.avi', '.mov')):
    """List all video files and their labels.

    Args:
        root: Dataset directory containing one sub-directory per class.
        classes: Class (sub-directory) names; a file's label is the index of
            its class in this sequence.
        extensions: File suffixes treated as videos, matched
            case-insensitively (so ``clip.MP4`` and ``clip.mov`` are found).

    Returns:
        ``(vids, labels, counts)``: video paths as strings, their integer
        labels, and a ``{class_name: n_files}`` dict. Files are listed in
        sorted order within each class so the result — and any seeded split
        built from it — is reproducible. Missing class directories are
        reported and counted as 0.
    """
    wanted = {ext.lower() for ext in extensions}
    vids, labels, counts = [], [], {}

    for ci, cname in enumerate(classes):
        p = Path(root) / cname
        if not p.is_dir():
            print(f"Warning: {p} does not exist")
            counts[cname] = 0
            continue

        files = sorted(
            f for f in p.iterdir()
            if f.is_file() and f.suffix.lower() in wanted
        )
        vids.extend(str(x) for x in files)
        labels.extend([ci] * len(files))
        counts[cname] = len(files)

    return vids, labels, counts


# ============================================
# UPDATED TRAINING FUNCTION
# ============================================

def train_adaptive_model(params):
    """Train model with adaptive keyframe extraction.

    Args:
        params: Dict with keys ``data_root``, ``classes``, ``seq_length``,
            ``batch_size``, ``lr``, ``epochs``, ``out_dir`` and ``cache_dir``.

    Returns:
        ``(model, history)``: the model as it stands after the last epoch
        that ran (the best-validation-loss weights are saved separately to
        ``<out_dir>/best_adaptive_model.pth``), and a dict with per-epoch
        ``train_loss``, ``val_loss`` and ``val_acc`` lists.

    Raises:
        ValueError: If no videos are found under ``data_root``.
    """

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"\nUsing device: {device}")

    # Initialize
    print("\nInitializing YOLO Pose model...")
    extractor = KeypointExtractor(device=device)
    cache = VideoFeatureCache(params['cache_dir'])

    # Load data
    print("\nLoading video data...")
    vids, labels, counts = list_videos(params['data_root'], params['classes'])
    print(f"Dataset: {counts}")
    print(f"Total videos: {len(vids)}")

    if len(vids) == 0:
        raise ValueError("No videos found! Check data_root path.")

    tr_v, val_v, tr_y, val_y = train_test_split(
        vids, labels, test_size=0.2, stratify=labels, random_state=42
    )

    print(f"\nTrain: {len(tr_v)} videos")
    print(f"Val: {len(val_v)} videos")

    # Pre-extract with adaptive sampling
    all_videos = list(set(tr_v + val_v))
    precompute_keypoint_features_adaptive(
        all_videos, extractor, cache, seq_length=params['seq_length']
    )

    # Free memory
    del extractor
    if device == 'cuda':
        torch.cuda.empty_cache()

    # Create datasets
    train_ds = PickleballDataset(
        tr_v, tr_y, cache,
        seq_length=params['seq_length'],
        augment=True,
        feature_dim=51
    )
    val_ds = PickleballDataset(
        val_v, val_y, cache,
        seq_length=params['seq_length'],
        augment=False,
        feature_dim=51
    )

    train_loader = DataLoader(train_ds, batch_size=params['batch_size'],
                             shuffle=True, num_workers=0)
    val_loader = DataLoader(val_ds, batch_size=params['batch_size'],
                           shuffle=False, num_workers=0)

    # Initialize model
    print("\nInitializing Keypoint LSTM model...")
    model = KeypointLSTMClassifier(
        feature_dim=51,
        hidden=128,
        num_layers=1,
        num_classes=len(params['classes']),
        dropout=0.3
    ).to(device)

    print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

    # Optimizer
    optimizer = optim.AdamW(
        model.parameters(),
        lr=params['lr'],
        weight_decay=0.01
    )
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=5
    )

    early_stopping = EarlyStopping(patience=10, min_delta=0.001, mode='min')

    # Checkpoint destination; make sure the directory exists so the first
    # save cannot fail when the caller has not created it.
    os.makedirs(params['out_dir'], exist_ok=True)
    checkpoint_path = os.path.join(params['out_dir'], 'best_adaptive_model.pth')

    # Every class index, so the report works even if a class is absent from
    # the validation labels/predictions.
    class_indices = list(range(len(params['classes'])))

    # Training
    best_val_loss = float('inf')
    # Initialised up front: if no epoch ever improves on +inf (e.g. NaN loss
    # or epochs == 0) the summary below would otherwise hit an unbound name.
    best_val_acc = 0.0
    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}

    print("\n" + "="*60)
    print("STARTING TRAINING")
    print("="*60)

    for epoch in range(params['epochs']):
        print(f"\nEpoch {epoch+1}/{params['epochs']}")

        # Train
        model.train()
        train_losses = []

        for X, y in tqdm(train_loader, desc='Training', leave=False):
            X, y = X.to(device), y.to(device)

            optimizer.zero_grad()
            outputs = model(X)
            loss = criterion(outputs, y)
            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            train_losses.append(loss.item())

        avg_train_loss = np.mean(train_losses)

        # Validate
        model.eval()
        val_losses = []
        all_preds, all_labels = [], []

        with torch.no_grad():
            for X, y in tqdm(val_loader, desc='Validation', leave=False):
                X, y = X.to(device), y.to(device)
                outputs = model(X)
                loss = criterion(outputs, y)
                val_losses.append(loss.item())

                preds = torch.argmax(outputs, dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(y.cpu().numpy())

        avg_val_loss = np.mean(val_losses)
        val_acc = np.mean(np.array(all_preds) == np.array(all_labels))

        history['train_loss'].append(avg_train_loss)
        history['val_loss'].append(avg_val_loss)
        history['val_acc'].append(val_acc)

        print(f"Train Loss: {avg_train_loss:.4f}")
        print(f"Val Loss:   {avg_val_loss:.4f}")
        print(f"Val Acc:    {val_acc*100:.2f}%")

        # Classification report every 5 epochs
        if (epoch + 1) % 5 == 0:
            print("\nClassification Report:")
            print(classification_report(
                all_labels, all_preds,
                labels=class_indices,
                target_names=params['classes'],
                zero_division=0
            ))

        scheduler.step(avg_val_loss)
        print(f"Learning Rate: {optimizer.param_groups[0]['lr']:.6f}")

        # Save best
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            best_val_acc = val_acc
            torch.save({
                'model_state_dict': model.state_dict(),
                'params': params,
                'val_loss': avg_val_loss,
                'val_acc': val_acc,
            }, checkpoint_path)
            print("✓ Saved best model")

        # Early stopping
        if early_stopping(avg_val_loss):
            print(f"\n⚠ Early stopping triggered at epoch {epoch+1}")
            break

    print("\n" + "="*60)
    print("TRAINING COMPLETE!")
    print("="*60)
    print(f"Best Val Loss: {best_val_loss:.4f}")
    print(f"Best Val Acc:  {best_val_acc*100:.2f}%")
    print(f"Total Epochs:  {len(history['train_loss'])}")

    return model, history


# ============================================
# HOW TO USE
# ============================================

if __name__ == '__main__':
    # Run configuration: dataset location, class folders, sequence/batch
    # sizes, optimiser settings, and where checkpoints and cached features go.
    params = dict(
        data_root='/content/drive/MyDrive/DatasetVideoPickle',
        classes=['Serve', 'DriveBackhand', 'DriveForehand'],
        seq_length=16,
        batch_size=16,  # Can use larger batch now
        lr=0.001,
        epochs=50,
        out_dir='/content/drive/MyDrive/keypoint_experiments',
        cache_dir='/content/drive/MyDrive/keypoint_cache',
    )

    # Make sure both output locations exist before training starts.
    for dir_key in ('out_dir', 'cache_dir'):
        os.makedirs(params[dir_key], exist_ok=True)

    model, history = train_adaptive_model(params)

Using device: cuda

Initializing YOLO Pose model...
Loading YOLO11 Pose model...
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo11n-pose.pt to 'yolo11n-pose.pt': 100% ━━━━━━━━━━━━ 6.0MB 100.3MB/s 0.1s

Loading video data...
Dataset: {'Serve': 283, 'DriveBackhand': 241, 'DriveForehand': 311}

PRE-EXTRACTING KEYPOINT FEATURES
Processing 835 videos...


Extracting keypoints:   2%|▏         | 20/835 [01:14<50:33,  3.72s/it]


KeyboardInterrupt: 