# Video Annotation with Grounding DINO (YOLOv11 Format)

This notebook uses **Grounding DINO** to automatically annotate road defect videos and export labels in **YOLOv11 format**.

## Target Classes:
- 0: road_crack_longitudinal
- 1: road_crack_transverse
- 2: road_crack_alligator
- 3: road_rutting
- 4: pothole
- 5: marking_faded
- 6: distractor_manhole
- 7: distractor_patch
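
For reference, each line of a YOLO label file holds a class ID followed by a box center and size, all normalized by the image dimensions. The sketch below is illustrative only: `format_yolo_line` and the sample box are hypothetical and are not part of the pipeline defined later in this notebook.

```python
# Illustrative sketch: build one YOLO label line from a pixel-space box.
# format_yolo_line and the sample values are hypothetical, for explanation only.

def format_yolo_line(class_id, box_xyxy, img_w, img_h):
    """Return 'class_id x_center y_center width height' with normalized values."""
    x_min, y_min, x_max, y_max = box_xyxy
    x_c = (x_min + x_max) / 2 / img_w
    y_c = (y_min + y_max) / 2 / img_h
    w = (x_max - x_min) / img_w
    h = (y_max - y_min) / img_h
    return f"{class_id} {x_c:.6f} {y_c:.6f} {w:.6f} {h:.6f}"


# A pothole (class 4) spanning pixels (320, 180) to (640, 360) in a 1280x720 frame
line = format_yolo_line(4, (320, 180, 640, 360), 1280, 720)
print(line)  # 4 0.375000 0.375000 0.250000 0.250000
```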

## 1. Installation and Setup

In [None]:
# Install required packages
!pip install transformers torch torchvision pillow opencv-python matplotlib numpy pandas timm tqdm pyyaml

In [None]:
# Import libraries
import torch
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
from PIL import Image, ImageDraw, ImageFont
import cv2
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from tqdm import tqdm
import yaml
from collections import defaultdict

# Check GPU availability
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
print(f"PyTorch version: {torch.__version__}")

## 2. Load Grounding DINO Model

In [None]:
# Load model and processor
model_id = "IDEA-Research/grounding-dino-tiny"  # Use "IDEA-Research/grounding-dino-base" for better accuracy

print(f"Loading {model_id}...")
processor = AutoProcessor.from_pretrained(model_id)
model = AutoModelForZeroShotObjectDetection.from_pretrained(model_id).to(device)

print("Model loaded successfully!")

## 3. Define Class Mapping and Text Prompts

In [None]:
# Class mapping (YOLO format)
CLASS_MAP = {
    0: "road_crack_longitudinal",
    1: "road_crack_transverse",
    2: "road_crack_alligator",
    3: "road_rutting",
    4: "pothole",
    5: "marking_faded",
    6: "distractor_manhole",
    7: "distractor_patch"
}

# Reverse mapping (label text -> class ID)
LABEL_TO_CLASS = {
    "longitudinal crack": 0,
    "longitudinal": 0,
    "transverse crack": 1,
    "transverse": 1,
    "alligator crack": 2,
    "alligator": 2,
    "rutting": 3,
    "rut": 3,
    "pothole": 4,
    "hole": 4,
    "faded marking": 5,
    "faded": 5,
    "marking": 5,
    "manhole": 6,
    "manhole cover": 6,
    "patch": 7,
    "road patch": 7
}

# Text prompts for Grounding DINO (use periods to separate)
text_prompt = (
    "a longitudinal crack in the road. "
    "a transverse crack in the road. "
    "an alligator crack in the pavement. "
    "road rutting. "
    "a pothole in the road. "
    "faded road marking. "
    "a manhole cover. "
    "a road patch."
)

print("Class Mapping:")
for idx, name in CLASS_MAP.items():
    print(f"  {idx}: {name}")
print(f"\nText Prompt: {text_prompt}")

## 4. Helper Functions

In [None]:
def map_label_to_class_id(label_text):
    """Map a Grounding DINO label phrase to a class ID, or None if no keyword matches."""
    label_lower = label_text.lower().strip()
    
    # Try exact match first
    if label_lower in LABEL_TO_CLASS:
        return LABEL_TO_CLASS[label_lower]
    
    # Try partial match, longest key first, so that "manhole" is tested
    # before "hole" and "a manhole cover" is not mislabeled as a pothole
    for key in sorted(LABEL_TO_CLASS, key=len, reverse=True):
        if key in label_lower:
            return LABEL_TO_CLASS[key]
    
    return None


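def bbox_to_yolo_format(bbox, img_width, img_height):
    """Convert a pixel-space (x_min, y_min, x_max, y_max) box to normalized YOLO format."""
    x_min, y_min, x_max, y_max = (float(v) for v in bbox)
    
    # Clip to the image bounds so the normalized values stay within [0, 1]
    x_min, x_max = max(0.0, x_min), min(float(img_width), x_max)
    y_min, y_max = max(0.0, y_min), min(float(img_height), y_max)
    
    # Calculate center and dimensions
    x_center = (x_min + x_max) / 2.0
    y_center = (y_min + y_max) / 2.0
    width = x_max - x_min
    height = y_max - y_min
    
    # Normalize by image dimensions
    x_center /= img_width
    y_center /= img_height
    width /= img_width
    height /= img_height
    
    return [x_center, y_center, width, height]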


def save_yolo_annotation(labels, output_path):
    with open(output_path, 'w') as f:
        for class_id, bbox in labels:
            # Format: class_id x_center y_center width height
            line = f"{class_id} {bbox[0]:.6f} {bbox[1]:.6f} {bbox[2]:.6f} {bbox[3]:.6f}\n"
            f.write(line)


def detect_and_convert_to_yolo(image, text_prompt, threshold=0.3):
    # Prepare inputs
    inputs = processor(images=image, text=text_prompt, return_tensors="pt").to(device)
    
    # Run inference
    with torch.no_grad():
        outputs = model(**inputs)
    
    # Post-process
    img_width, img_height = image.size
    target_sizes = torch.tensor([[img_height, img_width]]).to(device)
    results = processor.post_process_grounded_object_detection(
        outputs,
        target_sizes=target_sizes,
        threshold=threshold
    )
    
    # Extract and convert to YOLO format
    result = results[0]
    boxes = result['boxes'].cpu().numpy()
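    # Newer transformers releases expose the phrase strings under 'text_labels';
    # fall back to 'labels' when that key is not present
    labels = result['text_labels'] if 'text_labels' in result else result['labels']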
    scores = result['scores'].cpu().numpy()
    
    yolo_labels = []
    for box, label, score in zip(boxes, labels, scores):
        # Map label to class ID
        class_id = map_label_to_class_id(label)
        if class_id is None:
            print(f"Warning: Could not map label '{label}' to class ID, skipping...")
            continue
        
        # Convert bbox to YOLO format
        yolo_bbox = bbox_to_yolo_format(box, img_width, img_height)
        yolo_labels.append((class_id, yolo_bbox))
    
    return yolo_labels, boxes, labels, scores


print("Helper functions loaded!")

## 5. Visualization Function

In [None]:
def visualize_detections(image, boxes, labels, scores, threshold=0.3):
    img_draw = image.copy()
    draw = ImageDraw.Draw(img_draw)
    
    # Define colors for each class
    class_colors = {
        0: 'blue',      # longitudinal crack
        1: 'green',     # transverse crack
        2: 'orange',    # alligator crack
        3: 'purple',    # rutting
        4: 'red',       # pothole
        5: 'yellow',    # faded marking
        6: 'cyan',      # manhole
        7: 'magenta'    # patch
    }
    
    # Try to load font
    try:
        font = ImageFont.truetype("arial.ttf", 14)
    except OSError:  # font file not available on this system
        font = ImageFont.load_default()
    
    for box, label, score in zip(boxes, labels, scores):
        if score < threshold:
            continue
        
        # Get class ID and color
        class_id = map_label_to_class_id(label)
        if class_id is None:
            continue
        
        color = class_colors.get(class_id, 'white')
        class_name = CLASS_MAP[class_id]
        
        # Draw box
        draw.rectangle(box.tolist(), outline=color, width=2)
        
        # Draw label
        text = f"{class_name}: {score:.2f}"
        text_bbox = draw.textbbox((box[0], box[1] - 15), text, font=font)
        draw.rectangle(text_bbox, fill=color)
        draw.text((box[0], box[1] - 15), text, fill='black', font=font)
    
    return img_draw

print("Visualization function ready!")

## 6. Video Processing Pipeline

In [None]:
def process_video_to_yolo_dataset(
    video_path,
    output_dir,
    text_prompt,
    frame_interval=1,
    threshold=0.3,
    max_frames=None,
    save_visualizations=True
):
    # Create output directories
    output_dir = Path(output_dir)
    images_dir = output_dir / "images"
    labels_dir = output_dir / "labels"
    viz_dir = output_dir / "visualizations"
    
    images_dir.mkdir(parents=True, exist_ok=True)
    labels_dir.mkdir(parents=True, exist_ok=True)
    if save_visualizations:
        viz_dir.mkdir(parents=True, exist_ok=True)
    
    # Open video
    cap = cv2.VideoCapture(str(video_path))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    print(f"Video: {video_path}")
    print(f"Total frames: {total_frames}")
    print(f"FPS: {fps}")
    print(f"Processing every {frame_interval} frame(s)\n")
    
    # Statistics
    stats = {
        'total_frames_processed': 0,
        'total_detections': 0,
        'class_counts': defaultdict(int),
        'frames_with_detections': 0
    }
    
    frame_count = 0
    processed_count = 0
    
    # Progress bar
    pbar = tqdm(total=min(total_frames // frame_interval, max_frames) if max_frames else total_frames // frame_interval)
    
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            
            # Skip frames based on interval
            if frame_count % frame_interval != 0:
                frame_count += 1
                continue
            
            # Check max frames limit
            if max_frames and processed_count >= max_frames:
                break
            
            # Convert BGR to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image = Image.fromarray(frame_rgb)
            
            # Run detection and convert to YOLO
            yolo_labels, boxes, labels, scores = detect_and_convert_to_yolo(
                image, text_prompt, threshold
            )
            
            # Generate filename
            frame_name = f"frame_{processed_count:06d}"
            
            # Save image
            image_path = images_dir / f"{frame_name}.jpg"
            image.save(image_path)
            
            # Save YOLO labels
            label_path = labels_dir / f"{frame_name}.txt"
            save_yolo_annotation(yolo_labels, label_path)
            
            # Save visualization
            if save_visualizations and len(yolo_labels) > 0:
                viz_image = visualize_detections(image, boxes, labels, scores, threshold)
                viz_path = viz_dir / f"{frame_name}_viz.jpg"
                viz_image.save(viz_path)
            
            # Update statistics
            stats['total_frames_processed'] += 1
            stats['total_detections'] += len(yolo_labels)
            if len(yolo_labels) > 0:
                stats['frames_with_detections'] += 1
            
            for class_id, _ in yolo_labels:
                stats['class_counts'][class_id] += 1
            
            processed_count += 1
            frame_count += 1
            pbar.update(1)
    
    finally:
        cap.release()
        pbar.close()
    
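    # Save dataset.yaml for YOLO (train and val both point at the same images
    # folder; split the data before using validation metrics)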
    dataset_yaml = {
        'path': str(output_dir.absolute()),
        'train': 'images',
        'val': 'images',
        'nc': len(CLASS_MAP),
        'names': CLASS_MAP
    }
    
    yaml_path = output_dir / 'dataset.yaml'
    with open(yaml_path, 'w') as f:
        yaml.dump(dataset_yaml, f, default_flow_style=False)
    
    print(f"\n{'='*60}")
    print("Processing Complete!")
    print(f"{'='*60}")
    print(f"Total frames processed: {stats['total_frames_processed']}")
    print(f"Total detections: {stats['total_detections']}")
    print(f"Frames with detections: {stats['frames_with_detections']}")
    print(f"\nDetections by class:")
    for class_id in sorted(stats['class_counts'].keys()):
        count = stats['class_counts'][class_id]
        class_name = CLASS_MAP[class_id]
        print(f"  {class_id} ({class_name}): {count}")
    
    print(f"\nDataset saved to: {output_dir}")
    print(f"  - Images: {images_dir}")
    print(f"  - Labels: {labels_dir}")
    if save_visualizations:
        print(f"  - Visualizations: {viz_dir}")
    print(f"  - Config: {yaml_path}")
    
    return stats

print("Video processing function ready!")

## 7. Run Video Annotation

**Instructions:**
1. Update `video_path` to point to your video file
2. Adjust `frame_interval` (1 = every frame, 30 = every 30th frame)
3. Tune `threshold` (0.2-0.4 typically works well)
4. Run the cell!

In [None]:
# Configuration
VIDEO_PATH = "path/to/your/road_defect_video.mp4"  # UPDATE THIS
OUTPUT_DIR = "./annotated_dataset"
FRAME_INTERVAL = 30  # Process every 30th frame (adjust based on video FPS)
THRESHOLD = 0.3  # Detection confidence threshold
MAX_FRAMES = 100  # Limit on processed frames for a quick test; set to None to process the whole video
SAVE_VISUALIZATIONS = True  # Save annotated images

# Run annotation
stats = process_video_to_yolo_dataset(
    video_path=VIDEO_PATH,
    output_dir=OUTPUT_DIR,
    text_prompt=text_prompt,
    frame_interval=FRAME_INTERVAL,
    threshold=THRESHOLD,
    max_frames=MAX_FRAMES,
    save_visualizations=SAVE_VISUALIZATIONS
)

## 7b. Batch Process Multiple Videos

Process all videos in a folder automatically.

In [None]:
def process_multiple_videos(
    video_dir,
    output_base_dir,
    text_prompt,
    frame_interval=30,
    threshold=0.3,
    max_frames=None,
    save_visualizations=True,
    video_extensions=['.mp4', '.avi', '.mov', '.MP4', '.AVI', '.MOV']
):
    """
    Process all videos in a directory
    
    Args:
        video_dir: Directory containing videos
        output_base_dir: Base directory for all outputs
        text_prompt: Grounding DINO text prompt
        frame_interval: Process every Nth frame
        threshold: Detection confidence threshold
        max_frames: Maximum frames per video (None = all)
        save_visualizations: Save annotated images
        video_extensions: List of video file extensions to process
    
    Returns:
        Dictionary with combined statistics
    """
    video_dir = Path(video_dir)
    output_base_dir = Path(output_base_dir)
    
    # Find all video files
    video_files = []
    for ext in video_extensions:
        video_files.extend(video_dir.glob(f"*{ext}"))
    
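    # De-duplicate: where glob matching is case-insensitive, '.mp4' and '.MP4' can match the same file
    video_files = sorted(set(video_files))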
    
    if not video_files:
        print(f"No video files found in {video_dir}")
        return None
    
    print(f"Found {len(video_files)} video(s) to process:")
    for vf in video_files:
        print(f"  - {vf.name}")
    print()
    
    # Process each video
    all_stats = {}
    combined_stats = {
        'total_videos': len(video_files),
        'total_frames': 0,
        'total_detections': 0,
        'class_counts': defaultdict(int),
        'video_stats': {}
    }
    
    for idx, video_path in enumerate(video_files, 1):
        print(f"\n{'='*60}")
        print(f"Processing video {idx}/{len(video_files)}: {video_path.name}")
        print(f"{'='*60}\n")
        
        # Create output directory for this video
        video_name = video_path.stem
        output_dir = output_base_dir / video_name
        
        try:
            # Process video
            stats = process_video_to_yolo_dataset(
                video_path=video_path,
                output_dir=output_dir,
                text_prompt=text_prompt,
                frame_interval=frame_interval,
                threshold=threshold,
                max_frames=max_frames,
                save_visualizations=save_visualizations
            )
            
            # Update combined statistics
            combined_stats['total_frames'] += stats['total_frames_processed']
            combined_stats['total_detections'] += stats['total_detections']
            
            for class_id, count in stats['class_counts'].items():
                combined_stats['class_counts'][class_id] += count
            
            combined_stats['video_stats'][video_name] = stats
            
        except Exception as e:
            print(f"\nError processing {video_path.name}: {e}")
            continue
    
    # Print combined statistics
    print(f"\n\n{'='*60}")
    print("BATCH PROCESSING COMPLETE")
    print(f"{'='*60}")
    print(f"Videos processed successfully: {len(combined_stats['video_stats'])}/{combined_stats['total_videos']}")
    print(f"Total frames extracted: {combined_stats['total_frames']}")
    print(f"Total detections: {combined_stats['total_detections']}")
    print(f"\nCombined class distribution:")
    for class_id in sorted(combined_stats['class_counts'].keys()):
        count = combined_stats['class_counts'][class_id]
        class_name = CLASS_MAP[class_id]
        print(f"  {class_id} ({class_name}): {count}")
    
    print(f"\nAll datasets saved to: {output_base_dir}")
    
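    # Create combined dataset.yaml, listing each processed video's image folder
    # explicitly (a list of directories replaces the '*/images' wildcard)
    image_dirs = [f"{name}/images" for name in combined_stats['video_stats']]
    combined_yaml = {
        'path': str(output_base_dir.absolute()),
        'train': image_dirs,
        'val': list(image_dirs),  # separate list object so yaml.dump emits no alias
        'nc': len(CLASS_MAP),
        'names': CLASS_MAP
    }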
    
    yaml_path = output_base_dir / 'combined_dataset.yaml'
    with open(yaml_path, 'w') as f:
        yaml.dump(combined_yaml, f, default_flow_style=False)
    
    print(f"Combined config: {yaml_path}")
    
    return combined_stats

print("Batch processing function ready!")

In [None]:
# Configuration for batch processing
VIDEO_DIR = "/Users/adelainesuhendro/personal/capstone/data/video"  # Your video folder
OUTPUT_BASE_DIR = "./annotated_videos"  # Base output directory
FRAME_INTERVAL = 30  # Process every 30th frame (adjust based on video FPS)
THRESHOLD = 0.3  # Detection confidence threshold
MAX_FRAMES = None  # Set to a number to limit processing per video (e.g., 100 for testing)
SAVE_VISUALIZATIONS = True  # Save annotated images for review

# Run batch processing on all videos in the folder
combined_stats = process_multiple_videos(
    video_dir=VIDEO_DIR,
    output_base_dir=OUTPUT_BASE_DIR,
    text_prompt=text_prompt,
    frame_interval=FRAME_INTERVAL,
    threshold=THRESHOLD,
    max_frames=MAX_FRAMES,
    save_visualizations=SAVE_VISUALIZATIONS
)

## 8. Test on Single Frame

Test the annotation pipeline on a single frame before processing the entire video.

In [None]:
# Extract a single test frame
test_video_path = "path/to/your/road_defect_video.mp4"  # UPDATE THIS
cap = cv2.VideoCapture(test_video_path)

# Get a frame from the middle of the video
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.set(cv2.CAP_PROP_POS_FRAMES, total_frames // 2)
ret, frame = cap.read()
cap.release()

if ret:
    # Convert to RGB
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    test_image = Image.fromarray(frame_rgb)
    
    # Display original
    plt.figure(figsize=(15, 10))
    plt.imshow(test_image)
    plt.title("Test Frame")
    plt.axis('off')
    plt.show()
    
    print(f"Frame size: {test_image.size}")
else:
    print("Could not read frame from video")

In [None]:
# Run detection on test frame
yolo_labels, boxes, labels, scores = detect_and_convert_to_yolo(
    test_image, text_prompt, threshold=0.3
)

print(f"\nDetected {len(yolo_labels)} objects:\n")
for class_id, bbox in yolo_labels:
    class_name = CLASS_MAP[class_id]
    print(f"  Class {class_id} ({class_name}): bbox = {bbox}")

# Visualize
viz_image = visualize_detections(test_image, boxes, labels, scores, threshold=0.3)

plt.figure(figsize=(15, 10))
plt.imshow(viz_image)
plt.title(f"Detections (Threshold: 0.3) - {len(yolo_labels)} objects")
plt.axis('off')
plt.show()

## 9. View Sample Annotations

After processing, view some sample annotated frames.

In [None]:
# Load and display sample annotations
output_dir = Path(OUTPUT_DIR)
viz_dir = output_dir / "visualizations"

if viz_dir.exists():
    viz_files = sorted(viz_dir.glob("*.jpg"))[:6]  # Show the first 6 in frame order
    
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    axes = axes.flatten()
    
    # Hide every axis up front so unused panels stay blank when fewer than 6 files exist
    for ax in axes:
        ax.axis('off')
    
    for idx, viz_path in enumerate(viz_files):
        img = Image.open(viz_path)
        axes[idx].imshow(img)
        axes[idx].set_title(viz_path.stem)
    
    plt.tight_layout()
    plt.show()
else:
    print(f"No visualizations found in {viz_dir}")
    print("Run the video annotation first (Section 7)")

## 10. Dataset Statistics and Analysis

In [None]:
# Analyze the created dataset
import pandas as pd

output_dir = Path(OUTPUT_DIR)
labels_dir = output_dir / "labels"

if labels_dir.exists():
    label_files = list(labels_dir.glob("*.txt"))
    
    # Count classes across all annotations
    class_counts = defaultdict(int)
    total_annotations = 0
    files_with_annotations = 0
    
    for label_file in label_files:
        with open(label_file, 'r') as f:
            lines = f.readlines()
            if lines:
                files_with_annotations += 1
            for line in lines:
                if line.strip():
                    class_id = int(line.split()[0])
                    class_counts[class_id] += 1
                    total_annotations += 1
    
    # Create DataFrame
    df = pd.DataFrame([
        {
            'Class ID': class_id,
            'Class Name': CLASS_MAP[class_id],
            'Count': count,
            'Percentage': f"{count/total_annotations*100:.1f}%"
        }
        for class_id, count in sorted(class_counts.items())
    ])
    
    print("\n" + "="*60)
    print("DATASET STATISTICS")
    print("="*60)
    print(f"Total frames: {len(label_files)}")
    print(f"Frames with annotations: {files_with_annotations}")
    print(f"Total annotations: {total_annotations}")
    print(f"Average annotations per frame: {total_annotations / max(len(label_files), 1):.2f}")
    print(f"\nClass Distribution:")
    print(df.to_string(index=False))
    
    # Plot distribution
    plt.figure(figsize=(12, 6))
    plt.bar([CLASS_MAP[cid] for cid in sorted(class_counts.keys())],
            [class_counts[cid] for cid in sorted(class_counts.keys())])
    plt.xlabel('Defect Class')
    plt.ylabel('Count')
    plt.title('Distribution of Detected Defects')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()
else:
    print(f"Labels directory not found: {labels_dir}")
    print("Run the video annotation first (Section 7)")

## 11. Tips for Better Results

### Threshold Tuning:
- **Too high (0.5+)**: May miss valid defects
- **Too low (< 0.2)**: Many false positives
- **Recommended**: Start at 0.3 and adjust
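
One way to see the trade-off is to count how many detections survive at several thresholds. The sketch below is a minimal illustration: `count_kept` is a hypothetical helper and the scores are made-up values, not output from Grounding DINO.

```python
# Illustrative sketch: how the number of kept detections changes with the threshold.
# count_kept and sample_scores are hypothetical; real scores come from the detector.

def count_kept(scores, thresholds):
    """Return {threshold: number of scores at or above that threshold}."""
    return {t: sum(s >= t for s in scores) for t in thresholds}


sample_scores = [0.18, 0.22, 0.31, 0.35, 0.47, 0.52, 0.64]  # made-up confidences
kept = count_kept(sample_scores, [0.2, 0.3, 0.4, 0.5])

for threshold, n in kept.items():
    print(f"threshold {threshold:.1f}: {n} detections kept")
```

In practice the same count could be run over the `scores` returned for a handful of test frames, then checked against the visualizations to see which threshold drops mostly false positives.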

### Text Prompt Engineering:
- Be specific: "longitudinal crack in asphalt"
- Use natural descriptions
- Separate with periods
- Test different phrasings
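
When trying several phrasings, it can help to build the prompt from a list of phrases instead of editing one long string. The sketch below assumes the period-separated, lowercase format used in Section 3; `build_prompt` is a hypothetical helper, not part of the notebook.

```python
# Illustrative sketch: assemble a period-separated prompt from candidate phrases.
# build_prompt is a hypothetical helper; the lowercase convention mirrors Section 3.

def build_prompt(phrases):
    """Lowercase each phrase, drop stray periods, and join with '. '."""
    cleaned = [p.strip().strip(".").strip().lower() for p in phrases]
    cleaned = [p for p in cleaned if p]  # discard empty entries
    return ". ".join(cleaned) + "." if cleaned else ""


prompt = build_prompt([
    "A pothole in the road",
    "faded road marking.",
    "a manhole cover",
])
print(prompt)  # a pothole in the road. faded road marking. a manhole cover.
```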

### Frame Interval:
- **1**: Process every frame (slow, redundant)
- **15-30**: Good for 30 FPS videos (1-2 frames/sec)
- **60**: For high FPS or redundant footage
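
The interval can also be derived from the video's frame rate and the number of frames per second you want to keep. This is a minimal sketch; `frame_interval_for` is a hypothetical helper, and the FPS value would come from `cap.get(cv2.CAP_PROP_FPS)` in the pipeline.

```python
# Illustrative sketch: derive frame_interval from the video FPS and a target sampling rate.
# frame_interval_for is a hypothetical helper name.

def frame_interval_for(fps, samples_per_second):
    """Return N so that keeping every Nth frame yields about samples_per_second."""
    if fps <= 0 or samples_per_second <= 0:
        raise ValueError("fps and samples_per_second must be positive")
    return max(1, round(fps / samples_per_second))


print(frame_interval_for(30, 1))     # 30
print(frame_interval_for(30, 2))     # 15
print(frame_interval_for(29.97, 1))  # 30
```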

### Model Selection:
- **grounding-dino-tiny**: Faster, less accurate
- **grounding-dino-base**: Slower, more accurate (recommended)

### Post-Processing:
- Review visualizations for false positives
- Manually correct/remove bad annotations
- Use this as pseudo-labels for training YOLO
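
Part of the cleanup can be scripted before the manual pass. The sketch below drops boxes whose normalized area falls under a cutoff, on the assumption that very small boxes are more likely to be noise. `drop_small_boxes` and the `min_area` value are hypothetical and should be tuned against your own visualizations.

```python
# Illustrative sketch: filter YOLO label lines by normalized box area.
# drop_small_boxes and min_area=0.0005 are hypothetical, not recommended values.

def drop_small_boxes(label_lines, min_area=0.0005):
    """Keep only well-formed label lines whose width * height is at least min_area."""
    kept = []
    for line in label_lines:
        parts = line.split()
        if len(parts) != 5:
            continue  # skip blank or malformed lines
        width, height = float(parts[3]), float(parts[4])
        if width * height >= min_area:
            kept.append(line.strip())
    return kept


lines = [
    "4 0.375000 0.375000 0.250000 0.250000",  # area 0.0625, kept
    "0 0.500000 0.500000 0.010000 0.010000",  # area 0.0001, dropped
    "",                                        # blank line, skipped
]
filtered = drop_small_boxes(lines)
print(filtered)
```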

## 12. Next Steps

### Option 1: Manual Review
1. Review visualizations in `OUTPUT_DIR/visualizations/`
2. Manually correct labels in `OUTPUT_DIR/labels/`
3. Remove bad frames/annotations

### Option 2: Train YOLOv11
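1. Split the annotated dataset into train/val sets (the generated `dataset.yaml` points both `train` and `val` at the same `images` folder)
2. Update `dataset.yaml` to reference the split
3. Train with: `yolo train data=dataset.yaml model=yolo11n.pt epochs=100`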
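
A train/val split could be sketched as below, assuming the `images/` and `labels/` layout written by Section 6. `split_dataset`, the 80/20 ratio, and the `train`/`val` subfolder names are assumptions made for illustration. Because neighboring video frames look alike, a random split can leak near-duplicates into validation; splitting by video or by time range may give more honest metrics.

```python
# Illustrative sketch: copy frames and labels into train/val subfolders.
# split_dataset, val_fraction, and the subfolder names are hypothetical choices.
import random
import shutil
from pathlib import Path


def split_dataset(dataset_dir, val_fraction=0.2, seed=0):
    """Copy images/*.jpg and matching labels into images/{train,val} and labels/{train,val}."""
    dataset_dir = Path(dataset_dir)
    images = sorted((dataset_dir / "images").glob("*.jpg"))
    random.Random(seed).shuffle(images)  # seeded for a reproducible split

    n_val = int(len(images) * val_fraction)
    splits = {"val": images[:n_val], "train": images[n_val:]}

    for split, files in splits.items():
        img_out = dataset_dir / "images" / split
        lbl_out = dataset_dir / "labels" / split
        img_out.mkdir(parents=True, exist_ok=True)
        lbl_out.mkdir(parents=True, exist_ok=True)
        for img in files:
            shutil.copy2(img, img_out / img.name)
            label = dataset_dir / "labels" / f"{img.stem}.txt"
            if label.exists():
                shutil.copy2(label, lbl_out / label.name)

    return {split: len(files) for split, files in splits.items()}


# Example call (commented out so nothing is written until you choose a folder):
# counts = split_dataset("./annotated_dataset")
```

After splitting, `dataset.yaml` would need `train: images/train` and `val: images/val` so that training and validation read different files.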

### Option 3: Hybrid Approach
1. Use Grounding DINO for initial labeling
2. Manually review and correct
3. Train YOLOv11 for production use

### Using the Dataset:
```python
from ultralytics import YOLO

# Train YOLOv11 on your annotated dataset
model = YOLO('yolo11n.pt')
results = model.train(
    data='./annotated_dataset/dataset.yaml',
    epochs=100,
    imgsz=640,
    batch=16
)
```