# In [6]:
import ast
import copy
import json
import os
import random
from collections import defaultdict
from pathlib import Path

import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from transformers import AutoProcessor, AutoModel

# In [None]:
class WorkoutImageDataset(Dataset):
    """Image dataset pairing image files with bounding-box/label annotations.

    Annotations come either from a CSV file (columns ``image_path``,
    ``bboxes``, ``labels``) or from a JSON file mapping an image name to a
    dict with optional ``bboxes`` and ``labels`` entries. Entries whose image
    file does not exist under ``data_dir`` are skipped.
    """

    def __init__(self, data_dir, annotations_file, processor, transform=None, max_samples=None):
        """Load the annotations and build the list of usable samples.

        Args:
            data_dir: Directory that image paths are resolved against.
            annotations_file: Path to a ``.csv`` file; any other extension is
                read as JSON.
            processor: Stored on the instance; not used by the dataset itself.
            transform: Optional callable applied to each loaded PIL image.
            max_samples: If truthy, keep only the first ``max_samples`` samples.
        """
        self.processor = processor
        self.data_dir = Path(data_dir)
        self.transform = transform

        # str() lets callers pass a pathlib.Path as well as a plain string.
        if str(annotations_file).endswith('.csv'):
            self.annotations = pd.read_csv(annotations_file)
        else:  # JSON format
            with open(annotations_file, 'r') as f:
                self.annotations = json.load(f)

        self.samples = self.prepare_samples()

        if max_samples:
            self.samples = self.samples[:max_samples]

    @staticmethod
    def _parse_cell(value):
        """Turn a CSV cell into a Python list.

        String cells are parsed with ``ast.literal_eval`` so that only plain
        literals are accepted; arbitrary expressions in the annotation file
        are rejected instead of being executed. Empty cells (NaN) become an
        empty list; anything else is returned unchanged.

        Raises:
            ValueError: If a string cell is not a valid Python literal.
        """
        if isinstance(value, str):
            try:
                return ast.literal_eval(value)
            except (ValueError, SyntaxError) as err:
                raise ValueError(f"Could not parse annotation value {value!r}") from err
        if isinstance(value, float) and pd.isna(value):
            return []
        return value

    def prepare_samples(self):
        """Build the list of ``{'image_path', 'bboxes', 'labels'}`` dicts.

        Returns:
            A list with one dict per annotated image that exists on disk.
        """
        samples = []

        if isinstance(self.annotations, pd.DataFrame):
            # CSV format: one row per image, list columns stored as text.
            for _, row in self.annotations.iterrows():
                image_path = self.data_dir / row['image_path']
                if image_path.exists():
                    samples.append({
                        'image_path': str(image_path),
                        'bboxes': self._parse_cell(row['bboxes']),
                        'labels': self._parse_cell(row['labels']),
                    })
        else:
            # JSON format: mapping of image name -> annotation dict.
            for image_name, image_info in self.annotations.items():
                image_path = self.data_dir / image_name
                if image_path.exists():
                    samples.append({
                        'image_path': str(image_path),
                        'bboxes': image_info.get('bboxes', []),
                        'labels': image_info.get('labels', []),
                    })

        print(f"Loaded {len(samples)} samples")
        return samples

    def __len__(self):
        """Return the number of usable samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            A dict with the RGB image (after ``transform`` if one was given),
            a text prompt built by joining the labels with ``" . "`` (or
            ``"exercise"`` when there are none), the boxes as a float tensor
            (shape ``(0, 4)`` when there are none), the raw labels and the
            image path.
        """
        sample = self.samples[idx]

        # convert() returns an independent, fully loaded image, so the file
        # handle opened by Image.open can be closed right away.
        with Image.open(sample['image_path']) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform:
            image = self.transform(image)

        bboxes = sample['bboxes']
        labels = sample['labels']

        if len(bboxes) > 0:
            # Boxes are expected as [x1, y1, x2, y2] normalized to [0, 1].
            bboxes_tensor = torch.tensor(bboxes, dtype=torch.float)
        else:
            bboxes_tensor = torch.zeros((0, 4), dtype=torch.float)

        text_prompt = " . ".join(labels) if labels else "exercise"

        return {
            'image': image,
            'text_prompt': text_prompt,
            'boxes': bboxes_tensor,
            'labels': labels,
            'image_path': sample['image_path']
        }

# Data augmentation transforms
def get_transforms(train=True):
    """Return the image transform pipeline for the requested split.

    Both splits currently receive an empty ``torch.nn.Sequential``, which
    hands its input back unchanged.
    """
    if not train:
        # Identity transform for validation
        return torch.nn.Sequential()

    # Training augmentations go in this list, for example:
    #   torchvision.transforms.RandomHorizontalFlip(p=0.5)
    #   torchvision.transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1)
    augmentations = []
    return torch.nn.Sequential(*augmentations)

# Custom collate function
def collate_fn(batch):
    """Regroup a list of per-sample dicts into one dict of per-field lists.

    No stacking or padding is done; each output value is a plain list in
    batch order.
    """
    # (output key, key inside each sample)
    field_pairs = (
        ('images', 'image'),
        ('text_prompts', 'text_prompt'),
        ('boxes', 'boxes'),
        ('labels', 'labels'),
        ('image_paths', 'image_path'),
    )
    return {
        out_key: [sample[in_key] for sample in batch]
        for out_key, in_key in field_pairs
    }

# Loss function for Grounding DINO
class GroundingDINOLoss(nn.Module):
    """Placeholder box loss for fine-tuning.

    Only an L1 box term is actually computed; the GIoU term is reported as
    zero and no classification term is produced. Predictions are paired with
    targets purely by position (first ``k`` predictions against the ``k``
    targets) -- there is no matching step.

    NOTE(review): elsewhere in this file targets are described as
    ``[x1, y1, x2, y2]`` normalized boxes; confirm that ``outputs.pred_boxes``
    uses the same box format before relying on this loss.
    NOTE(review): confirm that the model class in use actually exposes
    ``pred_boxes`` on its outputs.
    """

    def __init__(self, weight_dict=None):
        """
        Args:
            weight_dict: Optional mapping from loss name to weight. Defaults
                to weight 1.0 for ``loss_ce``, ``loss_bbox`` and ``loss_giou``.
        """
        super().__init__()
        self.weight_dict = weight_dict or {
            'loss_ce': 1.0,
            'loss_bbox': 1.0,
            'loss_giou': 1.0,
        }

    def forward(self, outputs, targets):
        """Compute the loss terms for one batch.

        Args:
            outputs: Model outputs; must expose ``pred_boxes``, iterated per
                image alongside ``targets``.
            targets: List with one box tensor per image (may be empty).

        Returns:
            Dict with ``loss_bbox``, ``loss_giou`` and ``total_loss``. All
            values are tensors, also when the batch contains no boxes, so
            callers can always use ``.backward()`` / ``.item()``.
        """
        pred_boxes = outputs.pred_boxes

        # A zero that stays attached to the prediction graph: backward() then
        # works (with zero gradients) even when no box contributes.
        zero = pred_boxes.sum() * 0.0

        loss_bbox = zero
        loss_giou = zero  # placeholder: GIoU is not implemented

        for output, target in zip(pred_boxes, targets):
            # Never index past the available predictions.
            num_pairs = min(len(target), len(output))
            if num_pairs == 0:
                continue
            # Simple L1 loss on the positionally paired boxes.
            loss_bbox = loss_bbox + torch.nn.functional.l1_loss(
                output[:num_pairs], target[:num_pairs]
            )

        num_images = max(len(targets), 1)
        losses = {
            'loss_bbox': loss_bbox / num_images,
            'loss_giou': loss_giou / num_images,
        }

        total_loss = zero
        for name, value in losses.items():
            total_loss = total_loss + value * self.weight_dict.get(name, 1.0)
        losses['total_loss'] = total_loss

        return losses

# Training function
def train_grounding_dino(model, processor, train_loader, val_loader, num_epochs=10, device='cuda', output_dir='checkpoints'):
    """Fine-tune ``model`` and write checkpoints to ``output_dir``.

    Each epoch runs one pass over ``train_loader`` with AdamW, gradient
    clipping and a cosine learning-rate schedule, optionally validates, and
    saves a per-epoch checkpoint. The weights with the lowest validation
    loss are additionally saved as ``best_model.pth``.

    Args:
        model: Module to train; called as ``model(**inputs)``.
        processor: Callable turning images and text prompts into model inputs.
        train_loader: Iterable of batches with ``images``, ``text_prompts``
            and ``boxes`` entries.
        val_loader: Validation batches; validation is skipped when falsy.
        num_epochs: Number of epochs to run.
        device: Device the model, inputs and targets are moved to.
        output_dir: Directory for checkpoints; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    model.train()
    model.to(device)

    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=1e-4,
        weight_decay=1e-4
    )
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)
    criterion = GroundingDINOLoss()

    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0

        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}')

        for batch in progress_bar:
            inputs = processor(
                images=batch['images'],
                text=batch['text_prompts'],
                return_tensors="pt",
                padding=True
            ).to(device)

            target_boxes = [boxes.to(device) for boxes in batch['boxes']]

            outputs = model(**inputs)
            loss = criterion(outputs, target_boxes)['total_loss']

            # A batch without any target boxes can yield a loss that is not
            # connected to the graph; there is nothing to optimize, so skip it
            # instead of crashing in backward().
            if not (torch.is_tensor(loss) and loss.requires_grad):
                continue

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1

            # Running average per batch, the same unit as the epoch average.
            progress_bar.set_postfix({
                'loss': f'{loss.item():.4f}',
                'avg_loss': f'{running_loss/num_batches:.4f}'
            })

        # NaN (rather than a ZeroDivisionError) when no batch was trained on.
        avg_train_loss = running_loss / num_batches if num_batches else float('nan')
        print(f'Epoch {epoch+1}, Average Train Loss: {avg_train_loss:.4f}')

        if val_loader:
            val_loss = validate_model(model, processor, val_loader, criterion, device)
            print(f'Epoch {epoch+1}, Validation Loss: {val_loss:.4f}')

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(model.state_dict(), os.path.join(output_dir, 'best_model.pth'))
                print(f"New best model saved with val_loss: {val_loss:.4f}")

        # Full training state, so a run can be resumed from any epoch.
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'loss': avg_train_loss,
        }, os.path.join(output_dir, f'checkpoint_epoch_{epoch+1}.pth'))

        scheduler.step()

    print("Training completed!")

def validate_model(model, processor, val_loader, criterion, device):
    """Return the mean per-batch ``total_loss`` over ``val_loader``.

    The model is switched to eval mode and gradients are disabled. Returns
    NaN when the loader yields no batches.

    Args:
        model: Module called as ``model(**inputs)``.
        processor: Callable turning images and text prompts into model inputs.
        val_loader: Iterable of batches with ``images``, ``text_prompts`` and
            ``boxes`` entries.
        criterion: Callable returning a dict containing ``total_loss``.
        device: Device inputs and targets are moved to.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for batch in tqdm(val_loader, desc='Validating'):
            inputs = processor(
                images=batch['images'],
                text=batch['text_prompts'],
                return_tensors="pt",
                padding=True
            ).to(device)

            target_boxes = [boxes.to(device) for boxes in batch['boxes']]

            outputs = model(**inputs)
            batch_loss = criterion(outputs, target_boxes)['total_loss']

            # float() handles both 0-dim tensors and plain numbers (the
            # criterion may return a bare 0 for a batch without boxes).
            total_loss += float(batch_loss)
            num_batches += 1

    if num_batches == 0:
        return float('nan')
    return total_loss / num_batches

# Main training script
def main():
    """Build the dataset, split it 80/20 and launch fine-tuning.

    NOTE(review): the loss used by the training loop reads
    ``outputs.pred_boxes``; confirm that the class returned by
    ``AutoModel.from_pretrained`` for this checkpoint exposes it.
    """
    model_name = "IDEA-Research/grounding-dino-base"
    processor = AutoProcessor.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)

    # Dataset paths - adjust based on your downloaded dataset structure
    data_dir = "workoutexercises-images"  # Path to extracted dataset
    annotations_file = "workoutexercises-images/annotations.json"  # or annotations.csv

    train_transform = get_transforms(train=True)
    val_transform = get_transforms(train=False)

    dataset = WorkoutImageDataset(
        data_dir=data_dir,
        annotations_file=annotations_file,
        processor=processor,
        transform=train_transform,
        max_samples=1000  # Limit for testing, remove for full training
    )

    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_split = torch.utils.data.random_split(dataset, [train_size, val_size])

    # Both subsets returned by random_split wrap the SAME dataset object, so
    # assigning val_split.dataset.transform would also replace the training
    # transform. Give validation its own shallow copy (sample list shared)
    # that carries the validation transform instead.
    val_view = copy.copy(dataset)
    val_view.transform = val_transform
    val_dataset = torch.utils.data.Subset(val_view, val_split.indices)

    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

    print(f"Training samples: {len(train_dataset)}")
    print(f"Validation samples: {len(val_dataset)}")

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")

    train_grounding_dino(
        model=model,
        processor=processor,
        train_loader=train_loader,
        val_loader=val_loader,
        num_epochs=20,
        device=device,
        output_dir='workout_finetuned_models'
    )

# Utility function to create annotations if they don't exist
def create_dummy_annotations(data_dir, output_file):
    """Write placeholder annotations for every image found under ``data_dir``.

    Images are searched recursively. Each entry is keyed by the image path
    relative to ``data_dir`` (POSIX separators), so images inside
    sub-folders resolve correctly when joined with ``data_dir`` again and
    files with the same name in different folders do not overwrite each
    other. Every image gets the same dummy box and label -- replace them
    with real annotations before training for real.

    Args:
        data_dir: Root directory to scan for images.
        output_file: Path of the JSON file to write.
    """
    root = Path(data_dir)
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp'}

    # Case-insensitive suffix match; sorted for a reproducible file.
    image_files = sorted(
        path for path in root.rglob('*')
        if path.is_file() and path.suffix.lower() in image_extensions
    )

    annotations = {
        image_path.relative_to(root).as_posix(): {
            'bboxes': [[0.1, 0.1, 0.3, 0.3]],  # [x1, y1, x2, y2] normalized
            'labels': ['dumbbell']  # Replace with actual exercise labels
        }
        for image_path in image_files
    }

    with open(output_file, 'w') as f:
        json.dump(annotations, f, indent=2)

    print(f"Created dummy annotations for {len(annotations)} images at {output_file}")

# Inference with fine-tuned model
def inference_finetuned(image_path, candidate_labels, model_path, box_threshold=0.3):
    """Run the fine-tuned model on one image and return the raw outputs.

    Args:
        image_path: Path of the image to load.
        candidate_labels: Labels joined with ``" . "`` to form the text prompt.
        model_path: File saved with ``torch.save``: either a bare state dict
            or a checkpoint dict holding the weights under
            ``model_state_dict``.
        box_threshold: Currently unused; no post-processing is applied.

    Returns:
        The unprocessed model outputs.
    """
    processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-base")
    model = AutoModel.from_pretrained("IDEA-Research/grounding-dino-base")

    # Accept both layouts written by the training code in this file: the
    # bare state dict and the per-epoch checkpoint that wraps it.
    checkpoint = torch.load(model_path, map_location='cpu')
    if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
        state_dict = checkpoint['model_state_dict']
    else:
        state_dict = checkpoint
    model.load_state_dict(state_dict)
    model.eval()

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)

    # convert() yields an independent image, so the file can be closed.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert('RGB')
    text_prompt = " . ".join(candidate_labels)

    inputs = processor(images=image, text=text_prompt, return_tensors="pt").to(device)

    with torch.no_grad():
        outputs = model(**inputs)

    # Process outputs (you'll need to adapt this based on your training)
    # This is a simplified version - you might need custom post-processing

    return outputs

if __name__ == "__main__":
    import zipfile

    zip_path = "workoutexercises-images.zip"
    annotations_path = "workoutexercises-images/annotations.json"

    # Remind the user where the data comes from; the script continues anyway.
    print("Please download the dataset from Kaggle first:")
    print("kaggle datasets download -d hasyimabdillah/workoutexercises-images")

    # Unpack the archive when it is present next to the script.
    if Path(zip_path).exists():
        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall("workoutexercises-images")
        print("Dataset extracted!")

    # Without an annotations file, fall back to generated placeholders.
    if not Path(annotations_path).exists():
        print("Creating dummy annotations for testing...")
        create_dummy_annotations("workoutexercises-images", annotations_path)
        print("Please replace dummy annotations with real ones!")

    # Kick off training.
    main()

# In [None]:

# Custom collate function
def collate_fn(batch):
    """Regroup per-sample dicts into a dict of per-field lists.

    Collects the image, text prompt, boxes and labels of every sample; no
    stacking or padding is performed.
    """
    collected = {'images': [], 'text_prompts': [], 'boxes': [], 'labels': []}
    for sample in batch:
        collected['images'].append(sample['image'])
        collected['text_prompts'].append(sample['text_prompt'])
        collected['boxes'].append(sample['boxes'])
        collected['labels'].append(sample['labels'])
    return collected

# Training function
def train_grounding_dino(model, processor, train_loader, val_loader, num_epochs=10, device='cuda'):
    """Fine-tune ``model`` using the losses the model itself reports.

    The training loss is the sum of the values in ``outputs.loss_dict``.
    After every epoch the model is optionally validated and a checkpoint is
    written to ``grounding_dino_finetuned_epoch_<n>.pth`` in the current
    directory.

    Args:
        model: Module to train; called as ``model(**inputs)``.
        processor: Callable turning images and text prompts into model inputs.
        train_loader: Iterable of batches with ``images``, ``text_prompts``
            and ``boxes`` entries.
        val_loader: Validation batches; validation is skipped when falsy.
        num_epochs: Number of epochs to run.
        device: Device the model, inputs and targets are moved to.

    Raises:
        RuntimeError: If the model outputs carry no ``loss_dict``.
    """
    model.train()
    model.to(device)

    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5, weight_decay=1e-4)

    def compute_loss(outputs, targets):
        # Simplified loss: sums whatever the model reports; ``targets`` is
        # accepted but not used here.
        loss_dict = getattr(outputs, 'loss_dict', None)
        if not loss_dict:
            # NOTE(review): the forward call below passes no labels; confirm
            # how this model is expected to populate loss_dict.
            raise RuntimeError(
                "Model outputs contain no loss_dict; cannot compute a training loss."
            )
        return sum(loss_dict.values())

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        num_batches = 0

        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}')

        for batch in progress_bar:
            inputs = processor(
                images=batch['images'],
                text=batch['text_prompts'],
                return_tensors="pt",
                padding=True
            ).to(device)

            target_boxes = [boxes.to(device) for boxes in batch['boxes']]

            outputs = model(**inputs)
            loss = compute_loss(outputs, target_boxes)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1
            progress_bar.set_postfix({'loss': f'{loss.item():.4f}'})

        # NaN (rather than a ZeroDivisionError) for an empty loader.
        avg_loss = total_loss / num_batches if num_batches else float('nan')
        print(f'Epoch {epoch+1}, Average Loss: {avg_loss:.4f}')

        if val_loader:
            validate_model(model, processor, val_loader, device)

        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': avg_loss,
        }, f'grounding_dino_finetuned_epoch_{epoch+1}.pth')

def validate_model(model, processor, val_loader, device):
    """Print and return the mean per-batch validation loss.

    The per-batch loss is the sum of the values in ``outputs.loss_dict``.
    It is computed here directly because the ``compute_loss`` helper in this
    file is local to ``train_grounding_dino`` and not visible from this
    function. Returns NaN when the loader yields no batches.

    Args:
        model: Module called as ``model(**inputs)``.
        processor: Callable turning images and text prompts into model inputs.
        val_loader: Iterable of batches with ``images``, ``text_prompts`` and
            ``boxes`` entries.
        device: Device inputs and targets are moved to.

    Raises:
        RuntimeError: If the model outputs carry no ``loss_dict``.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for batch in val_loader:
            inputs = processor(
                images=batch['images'],
                text=batch['text_prompts'],
                return_tensors="pt",
                padding=True
            ).to(device)

            # Moved to the device like in training; the summed loss below
            # does not read them.
            target_boxes = [boxes.to(device) for boxes in batch['boxes']]

            outputs = model(**inputs)
            loss_dict = getattr(outputs, 'loss_dict', None)
            if not loss_dict:
                raise RuntimeError(
                    "Model outputs contain no loss_dict; cannot compute a validation loss."
                )
            total_loss += float(sum(loss_dict.values()))
            num_batches += 1

    avg_val_loss = total_loss / num_batches if num_batches else float('nan')
    print(f'Validation Loss: {avg_val_loss:.4f}')
    return avg_val_loss

# Main training script
def main():
    """Load the pretrained model, build train/val loaders and start training.

    NOTE(review): ``WorkoutDataset`` is not defined in the visible part of
    this file; presumably it comes from another cell -- confirm.
    """
    # Pretrained model and its matching processor
    model_name = "IDEA-Research/grounding-dino-base"
    processor = AutoProcessor.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)

    # Placeholder locations -- point these at the real data
    video_dir = "/path/to/workoutfitness-video/videos"
    annotations_file = "/path/to/workoutfitness-video/annotations.json"

    dataset = WorkoutDataset(video_dir, annotations_file, processor)

    # 80/20 random split
    num_total = len(dataset)
    num_train = int(0.8 * num_total)
    split_sizes = [num_train, num_total - num_train]
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, split_sizes)

    # Only the training loader shuffles
    shared_kwargs = {'batch_size': 4, 'collate_fn': collate_fn}
    train_loader = DataLoader(train_dataset, shuffle=True, **shared_kwargs)
    val_loader = DataLoader(val_dataset, shuffle=False, **shared_kwargs)

    print(f"Training samples: {len(train_dataset)}")
    print(f"Validation samples: {len(val_dataset)}")

    # Prefer the GPU when one is available
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    train_grounding_dino(model, processor, train_loader, val_loader, num_epochs=10, device=device)

# Inference with fine-tuned model
def inference_with_finetuned_model(video_path, output_path, candidate_labels, model_path):
    """Run the fine-tuned model on sampled frames of a video.

    Roughly every ``fps // 8``-th frame is passed through the model and
    written to ``output_path``. The model outputs are not post-processed or
    drawn yet, so the written frames are the unmodified source frames.

    Args:
        video_path: Path of the input video.
        output_path: Path of the ``mp4v``-encoded output video.
        candidate_labels: Labels joined with ``" . "`` to form the text prompt.
        model_path: File saved with ``torch.save``: either a checkpoint dict
            holding the weights under ``model_state_dict`` or a bare state
            dict.

    Raises:
        OSError: If the input video cannot be opened.
    """
    processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-base")
    model = AutoModel.from_pretrained("IDEA-Research/grounding-dino-base")

    checkpoint = torch.load(model_path, map_location='cpu')
    if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
        state_dict = checkpoint['model_state_dict']
    else:
        state_dict = checkpoint
    model.load_state_dict(state_dict)
    model.eval()

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)

    # The prompt does not depend on the frame; build it once.
    text_prompt = " . ".join(candidate_labels)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise OSError(f"Could not open video: {video_path}")

    out = None
    try:
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        output_fps = 8
        frame_interval = max(1, fps // output_fps)

        # Keeping every frame_interval-th frame yields fps / frame_interval
        # frames per second of source video. Writing at exactly that rate
        # preserves playback speed (e.g. 30 fps -> interval 3 -> 10 fps);
        # fall back to the target rate when the source rate is unknown.
        writer_fps = fps / frame_interval if fps > 0 else output_fps

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, writer_fps, (width, height))

        frame_count = 0
        processed_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                # OpenCV delivers BGR frames; the processor gets an RGB image.
                pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

                inputs = processor(images=pil_image, text=text_prompt, return_tensors="pt").to(device)

                with torch.no_grad():
                    outputs = model(**inputs)

                # Process outputs and draw detections
                # (You'll need to adapt the post-processing based on your training)

                out.write(frame)
                processed_count += 1

            frame_count += 1
    finally:
        # Release both handles even if processing a frame fails.
        cap.release()
        if out is not None:
            out.release()

# Script entry point: run main() when this file is executed directly.
if __name__ == "__main__":
    main()