In [None]:
rm -rf /kaggle/working/*

In [None]:
import os
import json
import shutil
import random
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Set
from dataclasses import dataclass, field
from collections import defaultdict, Counter
import warnings
import xml.etree.ElementTree as ET

# Import required libraries
try:
    from PIL import Image
    from tqdm import tqdm
except ImportError as e:
    print(f"Missing required library: {e}")
    print("Please install with: pip install tqdm pillow")
    exit(1)

In [None]:
@dataclass
class DatasetConfig:
    """Settings for converting Pascal VOC into the test dataset.

    Besides the declared fields, two attributes are derived after
    construction:

    * ``full_output_path`` -- ``output_path`` joined with ``dataset_name``.
    * ``voc_classes`` -- the keys of ``class_mapping``, in insertion order.

    Raises:
        ValueError: if ``class_mapping`` maps to a class that is missing
            from ``target_classes``.
    """

    # Seed for random sampling
    random_seed: int = 42
    # Name of the dataset folder created under ``output_path``
    dataset_name: str = "pascal_voc_test_dataset"

    # Root folder of the Pascal VOC input data
    voc_path: str = '/kaggle/input/pascal-voc-2012-dataset'

    # Folder under which the dataset folder is written
    output_path: str = '/kaggle/working'

    # VOC class name -> target class name
    class_mapping: Dict[str, str] = field(default_factory=lambda: dict(
        person='person',
        cat='pet',
        dog='pet',
        bus='car',
        truck='car',
        car='car',
    ))

    # Target classes; the order of this list is fixed on purpose
    target_classes: List[str] = field(default_factory=lambda: ['person', 'pet', 'car'])

    # Number of images requested per target class (plus 'negative')
    dataset_sizes: Dict[str, int] = field(default_factory=lambda: dict(
        person=2000,
        pet=2000,
        car=2000,
        negative=1000,
    ))

    def __post_init__(self):
        """Derive helper attributes, validate the mapping and print a summary."""
        self.full_output_path = os.path.join(self.output_path, self.dataset_name)
        self.voc_classes = [voc_name for voc_name in self.class_mapping]

        # Every mapped-to class has to be one of the declared target classes.
        missing = set(self.class_mapping.values()) - set(self.target_classes)
        if missing:
            raise ValueError(f"Class mapping contains classes that are not in target_classes: {missing}")

        summary_lines = (
            f"VOC Dataset config initialized: {self.dataset_name}",
            f"VOC classes: {self.voc_classes}",
            f"Target classes (ordered): {self.target_classes}",
            f"Output path: {self.full_output_path}",
        )
        for summary_line in summary_lines:
            print(summary_line)

In [None]:
class VOCProcessor:
    """Build a COCO-format test split from a Pascal VOC directory tree.

    Pipeline (see ``process_dataset``): create output folders, parse the VOC
    XML annotations, compute per-class statistics, sample images per class and
    write the sampled images plus ``annotations/instances_test.json``.

    Attributes:
        config: the configuration object passed to the constructor.
        voc_data: image id -> dict with ``filename``, ``width``, ``height``,
            ``objects`` and ``image_path``.
        class_stats: result of ``analyze_class_distribution``.
        coco_categories: COCO ``categories`` list, in ``target_classes`` order.
        category_id_mapping: target class name -> COCO category id.
    """

    def __init__(self, config: DatasetConfig):
        """Store the config, seed the global RNG and build the category table."""
        self.config = config
        self.voc_data = {}
        self.class_stats = {}
        self.coco_categories = []
        self.category_id_mapping = {}

        # Seed the module-level generator that sample_images_by_class uses
        random.seed(config.random_seed)
        print(f"Random seed set to: {config.random_seed}")

        # Create COCO category mapping
        self._create_coco_categories()

    def _create_coco_categories(self):
        """Create COCO format categories from target classes (ids start at 1)."""
        # IMPORTANT: use the fixed order from config.target_classes
        for category_id, target_class in enumerate(self.config.target_classes, start=1):
            self.coco_categories.append({
                'id': category_id,
                'name': target_class,
                'supercategory': target_class
            })
            self.category_id_mapping[target_class] = category_id

        print(f"Created {len(self.coco_categories)} COCO categories in fixed order:")
        for cat in self.coco_categories:
            print(f"  ID {cat['id']}: {cat['name']} (supercategory: {cat['supercategory']})")

    def setup_directories(self):
        """Create the output root plus its ``test`` and ``annotations`` folders."""
        print("Setting up output directories...")

        output_root = Path(self.config.full_output_path)
        output_root.mkdir(exist_ok=True, parents=True)
        (output_root / 'test').mkdir(exist_ok=True)
        (output_root / 'annotations').mkdir(exist_ok=True)

        print("Directories created successfully")

    def load_voc_data(self):
        """Load Pascal VOC annotations from both the test and train_val sets.

        Fills ``self.voc_data``. Only objects whose VOC class appears in
        ``config.class_mapping`` are kept; images without such objects are
        still stored (with an empty ``objects`` list). Annotations whose image
        file is missing are skipped, as are image ids already loaded from an
        earlier directory.

        Raises:
            FileNotFoundError: if no image could be loaded at all.
            Exception: any parsing error is printed and re-raised.
        """
        print("Loading Pascal VOC dataset...")

        try:
            # Define both test and train_val paths
            voc_paths = [
                os.path.join(self.config.voc_path, 'VOC2012_test', 'VOC2012_test'),
                os.path.join(self.config.voc_path, 'VOC2012_train_val', 'VOC2012_train_val')
            ]

            total_processed = 0

            for voc_root in voc_paths:
                if not os.path.exists(voc_root):
                    print(f"Path not found: {voc_root}")
                    continue

                print(f"Processing: {voc_root}")

                annotations_path = os.path.join(voc_root, 'Annotations')
                images_path = os.path.join(voc_root, 'JPEGImages')

                if not os.path.exists(annotations_path):
                    print(f"Annotations directory not found: {annotations_path}")
                    continue

                if not os.path.exists(images_path):
                    print(f"Images directory not found: {images_path}")
                    continue

                # Sorted so the processing order does not depend on the filesystem
                annotation_files = sorted(
                    f for f in os.listdir(annotations_path) if f.endswith('.xml')
                )
                print(f"Found {len(annotation_files)} annotation files in {os.path.basename(voc_root)}")

                for ann_file in tqdm(annotation_files, desc=f"Loading {os.path.basename(voc_root)} annotations"):
                    ann_path = os.path.join(annotations_path, ann_file)
                    # Strip only the extension; str.replace would also remove
                    # any '.xml' occurring inside the file name.
                    image_id = os.path.splitext(ann_file)[0]

                    # Skip if we already processed this image (avoid duplicates)
                    if image_id in self.voc_data:
                        continue

                    # Parse XML annotation
                    root = ET.parse(ann_path).getroot()

                    # Get image info
                    filename = root.find('filename').text
                    size = root.find('size')
                    width = int(size.find('width').text)
                    height = int(size.find('height').text)

                    # Check if image file exists
                    img_path = os.path.join(images_path, filename)
                    if not os.path.exists(img_path):
                        continue

                    # Parse objects
                    objects = []
                    for obj in root.findall('object'):
                        voc_class = obj.find('name').text

                        # Only include classes we want to map
                        if voc_class not in self.config.class_mapping:
                            continue

                        bbox = obj.find('bndbox')
                        # Coordinates may be written as floats, hence float() first
                        xmin, ymin, xmax, ymax = (
                            int(float(bbox.find(tag).text))
                            for tag in ('xmin', 'ymin', 'xmax', 'ymax')
                        )

                        objects.append({
                            'voc_class': voc_class,
                            'target_class': self.config.class_mapping[voc_class],
                            'bbox': [xmin, ymin, xmax, ymax]
                        })

                    # Store image data
                    self.voc_data[image_id] = {
                        'filename': filename,
                        'width': width,
                        'height': height,
                        'objects': objects,
                        'image_path': img_path
                    }
                    total_processed += 1

            print(f"Loaded {len(self.voc_data)} unique images with annotations")
            print(f"Total files processed: {total_processed}")

            if len(self.voc_data) == 0:
                print("No valid VOC data found!")
                raise FileNotFoundError("No valid VOC data found")

        except Exception as e:
            print(f"Error loading VOC data: {e}")
            raise

    def analyze_class_distribution(self):
        """Analyze distribution of target classes.

        Stores in ``self.class_stats``:
            ``class_counts``: target class -> number of annotations,
            ``class_images``: target class -> set of image ids containing it,
            ``negative_images``: set of image ids without any target class.
        """
        print("Analyzing class distribution...")

        try:
            # Initialize counters using the fixed order
            class_counts = {target_class: 0 for target_class in self.config.target_classes}
            class_images = {target_class: set() for target_class in self.config.target_classes}

            # Count images and annotations for each target class
            for image_id, data in self.voc_data.items():
                for obj in data['objects']:
                    target_class = obj['target_class']
                    class_counts[target_class] += 1
                    class_images[target_class].add(image_id)

            # Negative images are the ones that belong to no target class
            all_target_images = set()
            for class_imgs in class_images.values():
                all_target_images.update(class_imgs)
            negative_images = set(self.voc_data) - all_target_images

            # Store statistics
            self.class_stats = {
                'class_counts': class_counts,
                'class_images': class_images,
                'negative_images': negative_images
            }

            # Log statistics in the fixed order
            print("=== CLASS DISTRIBUTION ANALYSIS ===")
            for target_class in self.config.target_classes:
                img_count = len(class_images[target_class])
                ann_count = class_counts[target_class]
                print(f"{target_class}: {img_count:,} images, {ann_count:,} annotations")

            print(f"negative: {len(negative_images):,} images")
            print(f"Total images: {len(self.voc_data):,}")

        except Exception as e:
            print(f"Error analyzing class distribution: {e}")
            raise

    @staticmethod
    def _draw_sample(label: str, candidates, requested: int):
        """Pick up to ``requested`` ids from ``candidates``; None if nothing to pick."""
        # Sets of strings iterate in an order that changes with the per-process
        # hash seed; sorting first makes the seeded draw reproducible.
        available_ids = sorted(candidates)
        sample_count = min(requested, len(available_ids))
        if sample_count <= 0:
            return None

        sampled = random.sample(available_ids, sample_count)
        print(f"  {label}: {sample_count:,} / {len(available_ids):,} images")
        return sampled

    def sample_images_by_class(self, target_counts: Dict[str, int]) -> Dict[str, List[str]]:
        """Sample images from each class according to target counts.

        Args:
            target_counts: class name (a target class or ``'negative'``) ->
                requested number of images. Requests larger than the number
                of available images are capped.

        Returns:
            Class name -> list of sampled image ids. Classes with nothing to
            sample are omitted. For a given RNG state the result is the same
            on every run.
        """
        print(f"Sampling images with target counts: {target_counts}")

        try:
            sampled_images = {}

            # Sample from each target class in the fixed order
            for target_class in self.config.target_classes:
                if target_class not in target_counts:
                    continue
                sampled = self._draw_sample(
                    target_class,
                    self.class_stats['class_images'][target_class],
                    target_counts[target_class],
                )
                if sampled is not None:
                    sampled_images[target_class] = sampled

            # Handle negative images
            if 'negative' in target_counts:
                sampled = self._draw_sample(
                    'negative',
                    self.class_stats['negative_images'],
                    target_counts['negative'],
                )
                if sampled is not None:
                    sampled_images['negative'] = sampled

            return sampled_images

        except Exception as e:
            print(f"Error sampling images: {e}")
            raise

    @staticmethod
    def _stable_image_id(image_id: str, used_ids: set) -> int:
        """Return a unique integer id for ``image_id`` and record it in ``used_ids``.

        The built-in ``hash()`` is avoided because string hashes are salted
        per process, which would give different ids on every kernel restart.
        """
        import zlib  # local import: only needed for the fallback below

        if image_id.isdecimal():
            candidate = int(image_id)
        else:
            compact = image_id.replace('_', '')
            if compact.isdecimal():
                # e.g. '2008_000001' -> 2008000001
                candidate = int(compact)
            else:
                candidate = zlib.crc32(image_id.encode('utf-8')) % (2**31)

        # Guarantee uniqueness within one dataset
        while candidate in used_ids:
            candidate += 1
        used_ids.add(candidate)
        return candidate

    def create_test_dataset(self, sampled_images: Dict[str, List[str]]) -> Tuple[int, int]:
        """Create test dataset with sampled images in COCO format.

        Copies every sampled image into ``<full_output_path>/test`` and writes
        ``<full_output_path>/annotations/instances_test.json``. Images are
        processed in sorted id order and get deterministic, unique integer ids.
        Images listed under ``'negative'`` get an image entry but no
        annotations.

        Args:
            sampled_images: class name -> list of image ids (keys of
                ``self.voc_data``); an id may appear under several classes.

        Returns:
            ``(number of images, number of annotations)`` written.
        """
        print("Creating test dataset...")

        try:
            # Collect all sampled image IDs; sorted for a reproducible file
            all_img_ids = []
            for img_ids in sampled_images.values():
                all_img_ids.extend(img_ids)

            unique_img_ids = sorted(set(all_img_ids))
            print(f"Processing {len(unique_img_ids)} unique images for test")

            # Create COCO format data
            coco_images = []
            coco_annotations = []
            annotation_id = 1
            used_image_ids = set()

            negative_imgs = set(sampled_images.get('negative', []))

            for image_id in tqdm(unique_img_ids, desc="Processing test images"):
                voc_data = self.voc_data[image_id]

                # Create COCO image entry
                coco_image = {
                    'id': self._stable_image_id(image_id, used_image_ids),
                    'file_name': voc_data['filename'],
                    'width': voc_data['width'],
                    'height': voc_data['height']
                }
                coco_images.append(coco_image)

                # Create COCO annotations (skip negative images)
                if image_id in negative_imgs:
                    continue

                for obj in voc_data['objects']:
                    # Convert VOC [xmin, ymin, xmax, ymax] to COCO [x, y, w, h]
                    xmin, ymin, xmax, ymax = obj['bbox']
                    width = xmax - xmin
                    height = ymax - ymin

                    coco_annotations.append({
                        'id': annotation_id,
                        'image_id': coco_image['id'],
                        'category_id': self.category_id_mapping[obj['target_class']],
                        'bbox': [xmin, ymin, width, height],
                        'area': width * height,
                        'iscrowd': 0
                    })
                    annotation_id += 1

            # Copy image files
            print(f"Copying {len(coco_images)} images to test directory...")

            # Look each record up by id instead of scanning voc_data by filename
            for image_id in tqdm(unique_img_ids, desc="Copying test images"):
                record = self.voc_data[image_id]
                src_path = record['image_path']
                dst_path = os.path.join(self.config.full_output_path, 'test', record['filename'])

                if os.path.exists(src_path):
                    shutil.copy2(src_path, dst_path)
                else:
                    print(f"Image not found: {src_path}")

            # Save annotation file
            annotation_data = {
                'images': coco_images,
                'annotations': coco_annotations,
                'categories': self.coco_categories
            }

            ann_file = os.path.join(self.config.full_output_path, 'annotations', 'instances_test.json')
            with open(ann_file, 'w') as f:
                json.dump(annotation_data, f, indent=2)

            print(f"✅ Test dataset created: {len(coco_images):,} images, {len(coco_annotations):,} annotations")
            return len(coco_images), len(coco_annotations)

        except Exception as e:
            print(f"Error creating test dataset: {e}")
            raise

    def process_dataset(self):
        """Run the whole pipeline.

        Returns:
            ``{'test': {'images': <int>, 'annotations': <int>}}``.
        """
        print("Starting Pascal VOC dataset processing pipeline...")

        try:
            # Step 1: Setup
            self.setup_directories()

            # Step 2: Load VOC data
            self.load_voc_data()

            # Step 3: Analyze class distribution
            self.analyze_class_distribution()

            # Step 4: Sample images for test set
            print(f"\n{'='*50}")
            print("Processing TEST split")
            print(f"{'='*50}")

            sampled_images = self.sample_images_by_class(self.config.dataset_sizes)

            # Step 5: Create test dataset
            img_count, ann_count = self.create_test_dataset(sampled_images)

            # Final summary
            print(f"\n{'='*50}")
            print("FINAL DATASET SUMMARY")
            print(f"{'='*50}")

            print(f"Test: {img_count:,} images, {ann_count:,} annotations")
            print(f"✅ Dataset created successfully at: {self.config.full_output_path}")

            return {'test': {'images': img_count, 'annotations': ann_count}}

        except Exception as e:
            print(f"Error in processing pipeline: {e}")
            raise

In [None]:
# Build the configuration with its default values; constructing it also
# validates the class mapping and prints a short summary.
config = DatasetConfig()

In [None]:
# Create the processor; this seeds the random module with config.random_seed
# and builds the COCO category table.
processor = VOCProcessor(config)

In [None]:
# Run the full pipeline; `results` holds the image and annotation counts of
# the generated test split.
results = processor.process_dataset()

In [None]:
import os
from IPython.core.display import display, HTML

os.chdir('/kaggle/working')

# 🧠 Отримуємо ім'я директорії з датасетом
dataset_folder = DatasetConfig.dataset_name  # наприклад, "coco"
zip_filename = f"{dataset_folder}.zip"
print(f"Creating archive: {zip_filename}")

# 🧩 Використовуємо змінну у shell-команді через подвійні дужки
!zip -r -q "$zip_filename" "$dataset_folder"

zip_path = f'/kaggle/working/{zip_filename}'
if os.path.exists(zip_path):
    file_size = os.path.getsize(zip_path) / (1024*1024)  # MB
    print(f"\n✅ Archive created successfully!")
    print(f"📁 File: {zip_filename}")
    print(f"📊 Size: {file_size:.1f} MB")
    print(f"📍 Path: {zip_path}")

    print(f"\n📋 Archive contents:")
    !zipinfo "$zip_filename" | head -20

    display(HTML(f"""
    <div style="background-color: #e8f5e8; padding: 15px; border-radius: 10px; margin: 10px 0;">
        <h3>📥 Download Ready</h3>
        <a href="{zip_filename}" download style="background-color: #4CAF50; color: white; padding: 10px 20px; text-decoration: none; border-radius: 5px; font-weight: bold;">
        📥 Download {zip_filename} ({file_size:.1f} MB)
        </a>
    </div>
    """))
else:
    print("❌ Error: Archive not created!")
    print("Checking working directory contents:")
    !ls -la /kaggle/working/