# CARPK Parking Dataset Preparation

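This notebook downloads the CARPK parking-lot dataset from the HuggingFace Hub, validates and clips its car bounding boxes, splits the images into train/val/test sets in dataset order, and writes each split as JPEG images plus a COCO-format annotation file ready for object detection training.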

## Notebook Structure
1. Environment Setup and Imports
2. Dataset Configuration
3. Data Loading and Inspection
4. Bounding Box Processing Functions
5. Dataset Splitting and Processing
6. COCO Format Conversion
7. Data Analysis and Visualization

## 1. Environment Setup and Imports

In [None]:
import os
import json
import numpy as np
from pathlib import Path
from tqdm import tqdm
from datasets import load_dataset
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from io import BytesIO

# Reaching this line means every import in this cell resolved without error.
print("All required packages imported successfully")

## 2. Dataset Configuration

In [None]:
# Tunable parameters for the whole preparation run.
config = dict(
    output_dir='./data',
    train_split=0.7,
    val_split=0.2,
    test_split=0.1,
    min_box_size=3,
    max_samples=None,  # Set to a number for testing, None for full dataset
    image_quality=95,
)

# Per-split counters; each split gets its own independent dict of zeros.
stats = {
    split_name: {'images': 0, 'boxes': 0, 'tiny_boxes': 0, 'invalid_boxes': 0}
    for split_name in ('train', 'val', 'test')
}

# Module-level state shared with the bounding-box helpers defined later:
# accepted (width, height) pairs and the detected box format (None = not yet detected).
box_sizes = []
bbox_format = None

# Echo the configuration so the run is self-describing.
print("Configuration:")
print("\n".join(f"  {name}: {setting}" for name, setting in config.items()))

### Create Directory Structure

In [None]:
# Root folder for everything this notebook writes.
output_dir = Path(config['output_dir'])

# One image sub-folder per split; existing folders are left in place so the cell can be re-run.
for split in ('train', 'val', 'test'):
    os.makedirs(output_dir / split, exist_ok=True)

print(f"Created directory structure at {output_dir.absolute()}")

## 3. Data Loading and Inspection

In [None]:
print("Loading dataset from HuggingFace...")
dataset = load_dataset("backseollgi/parking_dataset", "carpk", streaming=False)

print("Dataset loaded successfully")
print(f"Available splits: {list(dataset.keys())}")

# Work from the 'train' split when it exists, otherwise from the first split available.
if 'train' in dataset:
    source_split = 'train'
else:
    source_split = list(dataset.keys())[0]
    print(f"Using '{source_split}' split instead of 'train'")

# Keep the split as a Dataset instead of copying it into a Python list: a list would
# hold every sample (including its image) in memory at once, whereas the Dataset hands
# out rows on demand. Later cells only need len(samples) and samples[i], which it supports.
samples = dataset[source_split]

total_samples = len(samples)
print(f"Found {total_samples} samples")

# Optional cap for quick test runs (a falsy value such as None means "use everything").
if config['max_samples']:
    total_samples = min(total_samples, config['max_samples'])
    # Slicing a Dataset returns columns rather than rows, so take a row subset with select().
    samples = samples.select(range(total_samples))
    print(f"Limited to {total_samples} samples for testing")

### Inspect First Sample

In [None]:
print("Inspecting first sample...")
sample = samples[0]

print(f"Keys: {list(sample.keys())}")
print(f"Image type: {type(sample['image'])}")

# Report the bounding-box field that is present; 'bboxes' takes precedence over 'bbox'.
bbox_field = next((name for name in ('bboxes', 'bbox') if name in sample), None)
if bbox_field is None:
    print("Warning: No 'bboxes' or 'bbox' field found")
else:
    first_sample_boxes = sample[bbox_field]
    print(f"Number of bboxes: {len(first_sample_boxes)}")
    if len(first_sample_boxes) > 0:
        print(f"First bbox: {first_sample_boxes[0]}")

# Report which label field, if any, the sample carries.
if 'labels' in sample:
    print(f"Labels field exists")
elif 'label' in sample:
    print(f"Label field exists")

## 4. Bounding Box Processing Functions

In [None]:
def detect_bbox_format(bbox, img_width, img_height):
    '''
    Auto-detect the bounding box format and remember it for the rest of the run.

    The decision is made once, from the first box this function sees, and is
    cached in the module-level ``bbox_format``; every later call returns the
    cached value without looking at its arguments. Setting ``bbox_format``
    beforehand therefore overrides detection.

    Heuristic: if the third/fourth value is smaller than the first/second, the
    box cannot be a pair of corners, so it must be [x, y, w, h]. Otherwise the
    box is treated as [x1, y1, x2, y2]. A value that exceeds the image size is
    NOT taken as evidence for 'xywh': a width or height larger than the image
    is impossible, whereas a corner slightly past the border is a common
    annotation artifact that clipping handles later.

    Args:
        bbox: sequence of four numbers.
        img_width: image width in pixels (kept for interface compatibility;
            not needed by the current heuristic).
        img_height: image height in pixels (same note as img_width).

    Returns: 'xyxy' or 'xywh'
    '''
    global bbox_format

    if bbox_format is not None:
        return bbox_format

    x1, y1, x2_or_w, y2_or_h = bbox

    if x2_or_w < x1 or y2_or_h < y1:
        # A far corner can never lie before the near corner, so these are sizes.
        bbox_format = 'xywh'
    else:
        # Ambiguous or out-of-bounds boxes default to corner format.
        bbox_format = 'xyxy'

    print(f"Detected bounding box format: {bbox_format}")
    return bbox_format

def normalize_bbox(bbox, img_width, img_height):
    '''
    Return the box as corner coordinates [x1, y1, x2, y2].

    The input format is whatever detect_bbox_format reports: an 'xywh' box has
    its size added to its origin, any other format is copied into a new list.
    '''
    if detect_bbox_format(bbox, img_width, img_height) != 'xywh':
        return list(bbox)

    left, top, box_w, box_h = bbox
    return [left, top, left + box_w, top + box_h]

def validate_and_clean_bbox(bbox, img_width, img_height, min_size=3):
    '''
    Check one bounding box and clip it to the image.

    The box is first converted to corner form via normalize_bbox. It is then
    rejected if its corners are not strictly increasing, if clipping to the
    image collapses it, or if either side ends up shorter than min_size.
    Each accepted box also records its (width, height) in the module-level
    box_sizes list.

    Returns: (is_valid, cleaned_bbox, reason) where cleaned_bbox is a list of
    four floats [x1, y1, x2, y2] for accepted boxes and None otherwise, and
    reason is "valid", "invalid_coordinates", "invalid_after_clipping",
    "too_small" or "normalization_error: ...".
    '''
    def clamp(value, upper):
        # Confine a coordinate to the range [0, upper].
        return max(0, min(value, upper))

    try:
        corners = normalize_bbox(bbox, img_width, img_height)
    except Exception as e:
        return False, None, f"normalization_error: {e}"

    left, top, right, bottom = corners

    if right <= left or bottom <= top:
        return False, None, "invalid_coordinates"

    # The near corner must land on a pixel inside the image; the far corner may touch the edge.
    left = clamp(left, img_width - 1)
    top = clamp(top, img_height - 1)
    right = clamp(right, img_width)
    bottom = clamp(bottom, img_height)

    if right <= left or bottom <= top:
        return False, None, "invalid_after_clipping"

    box_w = right - left
    box_h = bottom - top

    if box_w < min_size or box_h < min_size:
        return False, None, "too_small"

    box_sizes.append((box_w, box_h))

    cleaned = [float(coord) for coord in (left, top, right, bottom)]
    return True, cleaned, "valid"

print("Bounding box processing functions defined")

### Image Loading Function

In [None]:
def _array_to_rgb(array):
    '''Coerce an (H, W), (H, W, 1), (H, W, 3) or (H, W, 4) array to shape (H, W, 3).'''
    if array.ndim == 2:
        return np.stack([array] * 3, axis=-1)
    if array.ndim == 3:
        channels = array.shape[2]
        if channels == 1:
            # Grayscale stored with an explicit channel axis.
            return np.repeat(array, 3, axis=2)
        if channels == 3:
            return array
        if channels == 4:
            # The fourth channel is dropped, not blended.
            return array[:, :, :3]
    raise ValueError(f"Unsupported array shape: {array.shape}")


def load_and_normalize_image(image):
    '''
    Load an image given in one of several forms and return it as an RGB numpy array.

    Accepted inputs:
      - numpy array: (H, W) and (H, W, 1) are expanded to three channels,
        (H, W, 4) loses its fourth channel, (H, W, 3) is returned as-is
      - PIL image: converted to RGB
      - str or os.PathLike: opened as an image file
      - bytes or bytearray: decoded as an encoded image
      - dict with a 'bytes' and/or 'path' entry (the shape an undecoded
        HuggingFace image column takes); 'bytes' is preferred when both are set

    Returns:
        numpy array of shape (H, W, 3).

    Raises:
        ValueError: for unsupported types or array shapes, and for any failure
            while opening or decoding (the original error is chained).
    '''
    try:
        if isinstance(image, np.ndarray):
            return _array_to_rgb(image)

        if isinstance(image, dict):
            if image.get('bytes') is not None:
                image = image['bytes']
            elif image.get('path') is not None:
                image = image['path']
            else:
                raise ValueError("Image dict has neither 'bytes' nor 'path'")

        source = None
        if isinstance(image, (bytes, bytearray)):
            source = BytesIO(image)
        elif isinstance(image, (str, os.PathLike)):
            source = image

        if source is not None:
            # Context manager releases the underlying file once the pixels are copied out.
            with Image.open(source) as opened:
                return np.array(opened.convert('RGB'))

        if isinstance(image, Image.Image):
            return np.array(image.convert('RGB'))

        raise ValueError(f"Unsupported image type: {type(image)}")

    except Exception as e:
        raise ValueError(f"Failed to load image: {e}") from e

print("Image loading function defined")

## 5. Dataset Splitting and Processing

In [None]:
def get_split_name(index, total_samples):
    '''
    Determine which split a sample belongs to, based on its position.

    Samples are assigned in order: the first config['train_split'] share goes
    to 'train', the next config['val_split'] share to 'val', and whatever is
    left to 'test' (config['test_split'] is not read here).

    Boundaries are rounded to 6 decimals before truncation so that binary
    floating-point error cannot pull a whole-number boundary down by one
    (e.g. 10 * (0.7 + 0.2) evaluates to 8.999999999999998, which plain int()
    would turn into 8 instead of 9).

    Returns: 'train', 'val' or 'test'
    '''
    train_fraction = config['train_split']
    val_fraction = config['val_split']

    train_end = int(round(total_samples * train_fraction, 6))
    val_end = int(round(total_samples * (train_fraction + val_fraction), 6))

    if index < train_end:
        return 'train'
    elif index < val_end:
        return 'val'
    else:
        return 'test'

def _remove_if_present(path):
    '''Best-effort delete of an output file that may or may not exist.'''
    try:
        if path is not None and path.exists():
            path.unlink()
    except OSError:
        # Cleanup only: failing to delete must not replace the outcome being reported.
        pass


def process_sample(sample, sample_idx, split, annotation_id):
    '''
    Process a single sample: validate its boxes, then save its image.

    Boxes are validated first and the JPEG is written only when at least one
    box survives, so a skipped or failed sample never leaves an image file
    behind that the annotation JSON does not mention. Every annotation gets
    category_id 1; label fields in the sample are not consulted.

    Args:
        sample: mapping with an 'image' entry and a 'bboxes' or 'bbox' entry.
        sample_idx: position of the sample; used as the COCO image id and in
            the file name.
        split: 'train', 'val' or 'test'; selects the output folder and the
            statistics bucket.
        annotation_id: id to give the first annotation produced here.

    Returns: (image_info, annotations, new_annotation_id). For a skipped or
    failed sample this is (None, None, annotation_id) with the id unchanged.
    Per-split statistics are updated only for samples that are kept.
    '''
    img_path = None
    try:
        # Load and normalize image
        image_np = load_and_normalize_image(sample['image'])
        img_height, img_width = image_np.shape[:2]

        img_filename = f"{split}_{sample_idx:06d}.jpg"
        img_path = output_dir / split / img_filename

        # Accept either field name; a missing or None field means "no boxes".
        raw_bboxes = sample.get('bboxes')
        if raw_bboxes is None:
            raw_bboxes = sample.get('bbox')
        bboxes = [] if raw_bboxes is None else list(raw_bboxes)

        annotations = []
        invalid_count = 0
        tiny_count = 0
        next_id = annotation_id

        for bbox in bboxes:
            if bbox is None or len(bbox) != 4:
                invalid_count += 1
                continue

            is_valid, cleaned_bbox, reason = validate_and_clean_bbox(
                bbox, img_width, img_height, config['min_box_size']
            )

            if not is_valid:
                if reason == "too_small":
                    tiny_count += 1
                else:
                    invalid_count += 1
                continue

            x1, y1, x2, y2 = cleaned_bbox
            width = x2 - x1
            height = y2 - y1

            # COCO format: [x, y, width, height]
            annotations.append({
                'id': next_id,
                'image_id': sample_idx,
                'category_id': 1,
                'bbox': [x1, y1, width, height],
                'area': float(width * height),
                'iscrowd': 0
            })
            next_id += 1

        # Skip images with no valid annotations (and drop any stale file from an earlier run)
        if not annotations:
            _remove_if_present(img_path)
            return None, None, annotation_id

        Image.fromarray(image_np).save(img_path, quality=config['image_quality'])

        # Update statistics
        stats[split]['images'] += 1
        stats[split]['boxes'] += len(annotations)
        stats[split]['tiny_boxes'] += tiny_count
        stats[split]['invalid_boxes'] += invalid_count

        image_info = {
            'id': sample_idx,
            'file_name': img_filename,
            'width': img_width,
            'height': img_height
        }

        return image_info, annotations, next_id

    except Exception as e:
        # The sample is dropped, so make sure no partially written image survives it.
        _remove_if_present(img_path)
        print(f"Error processing sample {sample_idx}: {e}")
        return None, None, annotation_id

print("Processing functions defined")

### Process All Samples

In [None]:
print("Processing samples...\n")

# Start from a clean slate so that re-running this cell does not add a second
# round of counts and box sizes on top of the previous run. The containers are
# emptied in place because the helper functions look them up by name.
for split_counters in stats.values():
    for counter_name in split_counters:
        split_counters[counter_name] = 0
box_sizes.clear()

train_images, train_annotations = [], []
val_images, val_annotations = [], []
test_images, test_annotations = [], []

# Route each split to its own output lists and its own annotation-id counter
# (annotation ids start at 1 within every split).
split_outputs = {
    'train': (train_images, train_annotations),
    'val': (val_images, val_annotations),
    'test': (test_images, test_annotations),
}
next_ann_ids = {'train': 1, 'val': 1, 'test': 1}

for idx in tqdm(range(total_samples), desc="Processing"):
    sample = samples[idx]
    split = get_split_name(idx, total_samples)

    img_info, anns, next_ann_ids[split] = process_sample(
        sample, idx, split, next_ann_ids[split]
    )

    # process_sample returns None for samples it skipped or failed on.
    if img_info is not None:
        split_images, split_annotations = split_outputs[split]
        split_images.append(img_info)
        split_annotations.extend(anns)

# Keep the per-split counters available under their original names.
train_ann_id = next_ann_ids['train']
val_ann_id = next_ann_ids['val']
test_ann_id = next_ann_ids['test']

print(f"\nProcessing complete!")
print(f"Train images: {len(train_images)}")
print(f"Val images: {len(val_images)}")
print(f"Test images: {len(test_images)}")

## 6. COCO Format Conversion

In [None]:
def create_coco_json(split, images, annotations):
    '''
    Write one split as a COCO-format JSON file at <output_dir>/<split>.json.

    Besides 'images', 'annotations' and the single 'car' category, the file
    carries the top-level 'info' and 'licenses' sections that the COCO layout
    defines, so that tools which read those sections find them
    ('licenses' is an empty list).

    Args:
        split: split name; used for the file name and the description.
        images: list of COCO image records.
        annotations: list of COCO annotation records.
    '''
    coco_format = {
        'info': {
            'description': f'CARPK parking dataset ({split} split)'
        },
        'licenses': [],
        'images': images,
        'annotations': annotations,
        'categories': [
            {
                'id': 1,
                'name': 'car',
                'supercategory': 'vehicle'
            }
        ]
    }

    json_path = output_dir / f'{split}.json'
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(coco_format, f, indent=2)

    print(f"Created {json_path}")
    print(f"  Images: {len(images)}")
    print(f"  Annotations: {len(annotations)}")

print("Creating COCO format files...\n")

# One JSON file per split that ended up with at least one image;
# empty splits only get a notice.
coco_jobs = (
    ('train', train_images, train_annotations, "No training images processed"),
    ('val', val_images, val_annotations, "No validation images processed"),
    ('test', test_images, test_annotations, "No test images processed"),
)

for job_split, job_images, job_annotations, empty_notice in coco_jobs:
    if job_images:
        create_coco_json(job_split, job_images, job_annotations)
    else:
        print(empty_notice)

## 7. Data Analysis and Visualization

### Dataset Statistics

In [None]:
separator = "=" * 70
print(separator)
print("Dataset Statistics")
print(separator)

total_images = sum(counts['images'] for counts in stats.values())
total_boxes = sum(counts['boxes'] for counts in stats.values())

if total_images != 0:
    # Splits that received no images are left out of the report.
    populated_splits = [name for name in ('train', 'val', 'test') if stats[name]['images'] != 0]

    for split in populated_splits:
        counts = stats[split]
        image_count = counts['images']
        box_count = counts['boxes']

        print(f"\n{split.upper()}:")
        print(f"  Images: {image_count} ({image_count/total_images*100:.1f}%)")
        print(f"  Valid boxes: {box_count}")
        print(f"  Avg boxes/image: {box_count / image_count:.2f}")
        print(f"  Rejected (too small): {counts['tiny_boxes']}")
        print(f"  Rejected (invalid): {counts['invalid_boxes']}")

    print(f"\nTOTAL:")
    print(f"  Images: {total_images}")
    print(f"  Valid boxes: {total_boxes}")
    print(f"  Avg boxes/image: {total_boxes/total_images:.2f}")
else:
    print("\nNo images were processed successfully!")

### Bounding Box Size Analysis

In [None]:
if not box_sizes:
    print("No box sizes to analyze")
else:
    # box_sizes holds one (width, height) pair per accepted box.
    widths = [size[0] for size in box_sizes]
    heights = [size[1] for size in box_sizes]
    areas = [box_w * box_h for box_w, box_h in zip(widths, heights)]

    rule = "=" * 70
    print(rule)
    print("Bounding Box Size Analysis")
    print(rule)

    # Width and height get the same six-line summary.
    for dimension, values in (("Width", widths), ("Height", heights)):
        print(f"\n{dimension} Statistics:")
        print(f"  Min: {np.min(values):.1f}px")
        print(f"  Max: {np.max(values):.1f}px")
        print(f"  Mean: {np.mean(values):.1f}px")
        print(f"  Median: {np.median(values):.1f}px")
        print(f"  25th percentile: {np.percentile(values, 25):.1f}px")
        print(f"  75th percentile: {np.percentile(values, 75):.1f}px")

    print(f"\nArea Statistics:")
    print(f"  Min: {np.min(areas):.1f}px²")
    print(f"  Max: {np.max(areas):.1f}px²")
    print(f"  Mean: {np.mean(areas):.1f}px²")
    print(f"  Median: {np.median(areas):.1f}px²")

    # Anchor suggestions: box side length (square root of area) at five percentiles,
    # truncated to whole pixels.
    percentiles = [10, 30, 50, 70, 90]
    sqrt_areas = np.sqrt(areas)
    anchor_sizes = [int(side) for side in np.percentile(sqrt_areas, percentiles)]
    anchor_arg = ','.join(str(size) for size in anchor_sizes)

    print("\nRecommended Anchor Sizes:")
    print(f"  {tuple(anchor_sizes)}")
    print(f"  (Based on {percentiles}th percentiles of sqrt(area))")
    print(f"\nUse this when training:")
    print(f"  --anchor-sizes \"{anchor_arg}\"")

### Visualize Sample Images

In [None]:
def visualize_samples(split, num_samples=3):
    '''
    Draw the first few images of a split with their boxes, to verify data quality.

    Reads <output_dir>/<split>.json, shows up to num_samples images side by
    side with every annotation outlined in red, saves the figure as
    <output_dir>/<split>_samples.png and displays it. Returns None; when the
    JSON is missing, the split is empty, or num_samples is below 1, it prints
    a notice and returns without drawing.

    Args:
        split: split name ('train', 'val' or 'test').
        num_samples: maximum number of images to show.
    '''
    json_path = output_dir / f'{split}.json'

    if not json_path.exists():
        print(f"Cannot visualize {split}: JSON file not found")
        return

    with open(json_path, 'r') as f:
        data = json.load(f)

    if len(data['images']) == 0:
        print(f"No images in {split} split to visualize")
        return

    # Clamp before reporting so the message states how many images are actually shown.
    num_samples = min(num_samples, len(data['images']))
    if num_samples < 1:
        print(f"Nothing to visualize for {split}: num_samples must be at least 1")
        return

    print(f"\nVisualizing {num_samples} samples from {split}...")

    shown_images = data['images'][:num_samples]

    # Group the relevant annotations by image in a single pass over the list.
    shown_ids = {img_info['id'] for img_info in shown_images}
    anns_by_image = {}
    for ann in data['annotations']:
        if ann['image_id'] in shown_ids:
            anns_by_image.setdefault(ann['image_id'], []).append(ann)

    # squeeze=False always yields a 2-D grid of axes, even for a single panel.
    fig, axes_grid = plt.subplots(1, num_samples, figsize=(15, 5), squeeze=False)

    for ax, img_info in zip(axes_grid[0], shown_images):
        # Hide the frame up front so a panel whose image is missing stays blank.
        ax.axis('off')

        img_path = output_dir / split / img_info['file_name']
        if not img_path.exists():
            print(f"Image not found: {img_path}")
            continue

        img_anns = anns_by_image.get(img_info['id'], [])

        # The file is closed as soon as the pixels have been handed to matplotlib.
        with Image.open(img_path) as img:
            ax.imshow(img)
        ax.set_title(f"{split} - {len(img_anns)} boxes")

        for ann in img_anns:
            x, y, w, h = ann['bbox']
            ax.add_patch(patches.Rectangle(
                (x, y), w, h,
                linewidth=2,
                edgecolor='red',
                facecolor='none'
            ))

    fig.tight_layout()
    save_path = output_dir / f'{split}_samples.png'
    fig.savefig(save_path, dpi=150, bbox_inches='tight')
    print(f"Saved visualization to {save_path}")
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)

# Show train and val previews, skipping any split that ended up without images
for split in ('train', 'val'):
    if not stats[split]['images']:
        continue
    visualize_samples(split, num_samples=3)

## Summary

Dataset preparation is complete. The data is now ready for training.

### Next Steps
1. Review the visualization images above
2. Check the statistics to ensure data quality
3. Note the recommended anchor sizes for training
4. Proceed to train your object detection model

In [None]:
banner = "=" * 70
print(banner)
print("Dataset Preparation Complete")
print(banner)
print(f"\nData saved to: {output_dir.absolute()}")

# Image count per split, reported once for the JSON files and once for the folders.
split_counts = (
    ('train', len(train_images)),
    ('val', len(val_images)),
    ('test', len(test_images)),
)

print("\nFiles created:")
for split_label, image_count in split_counts:
    print(f"  - {split_label}.json ({image_count} images)")
for split_label, image_count in split_counts:
    print(f"  - {split_label}/ directory with {image_count} images")