In [2]:
# Export → Training data conversion  
# Flexible Label Studio Export Converter
#!/usr/bin/env python3
"""
Complete Flexible Label Studio Export Converter
Converts Label Studio JSON exports to object detection training format
Features:
- Flexible filename matching (handles Label Studio prefixes, Unicode chars, etc.)
- Configurable train/val splits (ratio or manual)
- Custom output directory naming
- Visualization generation
- YOLO format output
- Comprehensive error handling and logging
"""

import json
import os
import shutil
import re
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches

class FlexibleLabelStudioConverter:
    
    def __init__(self, class_mapping=None, create_visualizations=True):
        """
        Initialize converter with custom class mapping
        
        Args:
            class_mapping (dict): Maps label names to class IDs
                                 If None, will auto-generate from data
            create_visualizations (bool): Whether to create bbox visualization images
        """
        self.class_mapping = class_mapping or {}
        self.auto_generate_classes = class_mapping is None
        self.create_visualizations = create_visualizations
        
        # Color mapping for visualizations
        self.colors = ['red', 'blue', 'green', 'orange', 'purple', 'brown', 
                      'pink', 'gray', 'olive', 'cyan', 'magenta', 'yellow']
    
    def smart_filename_match(self, expected_filename, actual_files):
        """
        Intelligent filename matching using multiple strategies
        
        Args:
            expected_filename (str): Filename from Label Studio export
            actual_files (list): List of actual filenames in directory
            
        Returns:
            str or None: Best matched filename or None if no match
        """
        
        # Strategy 1: Exact match
        if expected_filename in actual_files:
            return expected_filename
        
        # Strategy 2: Remove Label Studio prefixes (e.g., "abc123-filename.jpg")
        base_expected = expected_filename
        if '-' in expected_filename and len(expected_filename.split('-')[0]) <= 8:
            base_expected = expected_filename.split('-', 1)[1]
        
        for actual in actual_files:
            if base_expected == actual:
                return actual
        
        # Strategy 3: Extract specific time pattern and match precisely
        time_match = re.search(r'(\d+\.\d+\.\d+)', expected_filename)
        if time_match:
            expected_time = time_match.group(1)
            
            for actual_file in actual_files:
                # Clean Unicode characters that might interfere
                cleaned_actual = actual_file.encode('ascii', 'ignore').decode('ascii')
                if expected_time in cleaned_actual:
                    return actual_file
                
                # Try colon format as well
                expected_time_colon = expected_time.replace('.', ':')
                if expected_time_colon in cleaned_actual:
                    return actual_file
        
        # Strategy 4: Normalize spaces, underscores, and URL encoding
        normalized_expected = base_expected.replace('_', ' ').replace('%20', ' ')
        for actual in actual_files:
            if normalized_expected.lower() == actual.lower():
                return actual
        
        # Strategy 5: Look for other number patterns
        number_patterns = re.findall(r'\d{3,}', expected_filename)
        for number in number_patterns:
            for actual in actual_files:
                if number in actual:
                    return actual
        
        # Strategy 6: Similarity matching (last resort)
        best_match = None
        best_score = 0
        
        for actual in actual_files:
            # Simple similarity: count common characters
            common_chars = sum(1 for a, b in zip(expected_filename.lower(), actual.lower()) if a == b)
            similarity = common_chars / max(len(expected_filename), len(actual))
            
            if similarity > 0.8 and similarity > best_score:  # 80% similarity threshold
                best_score = similarity
                best_match = actual
        
        return best_match
    
    def extract_classes_from_annotations(self, annotations):
        """Extract unique class names from annotations"""
        classes = set()
        
        for item in annotations:
            if not item.get('annotations'):
                continue
                
            for annotation in item['annotations']:
                for result in annotation.get('result', []):
                    if result.get('type') == 'rectanglelabels':
                        labels = result['value'].get('rectanglelabels', [])
                        classes.update(labels)
        
        return sorted(list(classes))
    
    def determine_split_assignment(self, num_annotations, split_method='ratio', split_ratio=0.8, train_indices=None):
        """
        Determine train/val split assignments
        
        Args:
            num_annotations (int): Total number of annotations
            split_method (str): 'ratio' or 'manual'
            split_ratio (float): Fraction for training (used if split_method='ratio')
            train_indices (list): Manual indices for training (used if split_method='manual')
            
        Returns:
            list: Boolean list where True = train, False = val
        """
        
        if split_method == 'manual':
            if train_indices is None:
                raise ValueError("train_indices must be provided when using manual split")
            
            assignments = ['val'] * num_annotations
            for idx in train_indices:
                if 0 <= idx < num_annotations:
                    assignments[idx] = 'train'
            return assignments
        
        elif split_method == 'ratio':
            train_count = int(num_annotations * split_ratio)
            assignments = ['train'] * train_count + ['val'] * (num_annotations - train_count)
            return assignments
        
        else:
            raise ValueError("split_method must be 'ratio' or 'manual'")
    
    def extract_bounding_boxes(self, results):
        """Extract bounding boxes from Label Studio results"""
        boxes = []
        
        for result in results:
            if result.get('type') == 'rectanglelabels':
                # Convert Label Studio percentage coordinates to normalized format
                x = result['value']['x'] / 100.0
                y = result['value']['y'] / 100.0
                width = result['value']['width'] / 100.0
                height = result['value']['height'] / 100.0
                
                # Convert to center coordinates (YOLO format)
                center_x = x + width / 2
                center_y = y + height / 2
                
                label = result['value']['rectanglelabels'][0]
                
                boxes.append({
                    'center_x': center_x,
                    'center_y': center_y,
                    'width': width,
                    'height': height,
                    'label': label
                })
        
        return boxes
    
    def create_visualization(self, image_path, bounding_boxes, output_path):
        """
        Create visualization of bounding boxes on image

        Args:
            image_path (str): Image to draw on
            bounding_boxes (list): Dicts with normalized 'center_x',
                                   'center_y', 'width', 'height' and a
                                   'label', as produced by
                                   extract_bounding_boxes
            output_path (str): Where the rendered figure is saved
        """

        fig, ax = plt.subplots(1, figsize=(12, 8))

        try:
            # Close the file handle as soon as the pixels are handed over
            with Image.open(image_path) as img:
                img_width, img_height = img.size
                ax.imshow(img)

            for i, bbox in enumerate(bounding_boxes):
                # Convert back to pixel coordinates
                width = bbox['width'] * img_width
                height = bbox['height'] * img_height
                x = bbox['center_x'] * img_width - width / 2
                y = bbox['center_y'] * img_height - height / 2

                # Colors cycle by box position, not by class
                color = self.colors[i % len(self.colors)]

                # Draw rectangle
                rect = patches.Rectangle((x, y), width, height,
                                         linewidth=2, edgecolor=color,
                                         facecolor='none', alpha=0.8)
                ax.add_patch(rect)

                # Add label
                ax.text(x, y-10, bbox['label'],
                        bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.7),
                        fontsize=10, color='white', weight='bold')

            ax.set_title(f"Annotations: {os.path.basename(image_path)}")
            ax.axis('off')
            fig.tight_layout()
            fig.savefig(output_path, dpi=150, bbox_inches='tight')
        finally:
            # Always release this figure, even if drawing or saving failed;
            # otherwise figures pile up across a large dataset.
            plt.close(fig)
    
    def create_yolo_config(self, output_dir, class_mapping):
        """Create YOLO configuration file"""
        
        yaml_content = f"""# Generated by Flexible Label Studio Converter
path: {os.path.abspath(output_dir)}
train: images/train
val: images/val

# Classes
names:
"""
        
        for class_name, class_id in class_mapping.items():
            yaml_content += f"  {class_id}: {class_name}\n"
        
        yaml_content += f"\n# Number of classes\nnc: {len(class_mapping)}\n"
        
        with open(f"{output_dir}/data.yaml", 'w') as f:
            f.write(yaml_content)
    
    def convert_to_object_detection(self, export_file, images_dir, output_dir, 
                                   split_method='ratio', split_ratio=0.8, train_indices=None):
        """
        Convert Label Studio export to object detection format
        
        Args:
            export_file (str): Path to Label Studio JSON export
            images_dir (str): Directory containing source images
            output_dir (str): Output directory name for converted dataset
            split_method (str): 'ratio' or 'manual'
            split_ratio (float): Fraction of data for training (if using ratio method)
            train_indices (list): Manual indices for training (if using manual method)
        
        Returns:
            dict: Conversion statistics and info
        """
        
        print("🔄 Converting Label Studio export to object detection format...")
        print(f"📂 Output directory: {output_dir}")
        print(f"📊 Split method: {split_method}")
        
        if split_method == 'ratio':
            print(f"📊 Split ratio: {split_ratio:.0%} train / {(1-split_ratio):.0%} val")
        else:
            print(f"📊 Manual split: train indices = {train_indices}")
        
        # Load annotations
        with open(export_file, 'r') as f:
            annotations = json.load(f)
        
        # Get actual image files
        images_dir = os.path.abspath(images_dir)
        actual_files = [f for f in os.listdir(images_dir) 
                       if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))
                       and not f.startswith('.') and 'test-copy' not in f.lower()]
        
        print(f"📁 Found {len(actual_files)} image files in {images_dir}")
        print(f"📊 Found {len(annotations)} annotations in export")
        
        # Auto-generate class mapping if needed
        if self.auto_generate_classes:
            unique_classes = self.extract_classes_from_annotations(annotations)
            self.class_mapping = {cls: idx for idx, cls in enumerate(unique_classes)}
            print(f"🏷️  Auto-detected classes: {unique_classes}")
        
        # Determine split assignments
        split_assignments = self.determine_split_assignment(
            len(annotations), split_method, split_ratio, train_indices
        )
        
        # Create output directories
        dirs_to_create = [
            f"{output_dir}/images/train",
            f"{output_dir}/images/val",
            f"{output_dir}/labels/train", 
            f"{output_dir}/labels/val"
        ]
        
        if self.create_visualizations:
            dirs_to_create.append(f"{output_dir}/visualizations")
        
        for dir_path in dirs_to_create:
            os.makedirs(dir_path, exist_ok=True)
        
        # Process annotations
        stats = {
            'processed': 0,
            'skipped': 0,
            'train_images': 0,
            'val_images': 0,
            'total_boxes': 0,
            'matched_files': [],
            'unmatched_files': [],
            'class_counts': {cls: 0 for cls in self.class_mapping.keys()}
        }
        
        print(f"\n📋 Processing annotations...")
        
        for i, item in enumerate(annotations):
            if not item.get('annotations') or not item['annotations'][0].get('result'):
                print(f"⚠️  Skipping annotation {i+1} - no bounding boxes")
                stats['skipped'] += 1
                continue
            
            # Find matching image file
            expected_filename = os.path.basename(item['data']['image'])
            matched_file = self.smart_filename_match(expected_filename, actual_files)
            
            if not matched_file:
                print(f"❌ No match for: {expected_filename}")
                stats['unmatched_files'].append(expected_filename)
                stats['skipped'] += 1
                continue
            
            print(f"✅ {expected_filename} → {matched_file}")
            stats['matched_files'].append((expected_filename, matched_file))
            
            # Get split assignment
            split = split_assignments[i]
            
            # Copy image
            source_path = os.path.join(images_dir, matched_file)
            target_path = f"{output_dir}/images/{split}/{matched_file}"
            shutil.copy2(source_path, target_path)
            
            # Process bounding boxes
            bounding_boxes = self.extract_bounding_boxes(item['annotations'][0]['result'])
            
            if bounding_boxes:
                # Save YOLO format labels
                base_name = os.path.splitext(matched_file)[0]
                label_path = f"{output_dir}/labels/{split}/{base_name}.txt"
                
                with open(label_path, 'w') as f:
                    for bbox in bounding_boxes:
                        class_id = self.class_mapping.get(bbox['label'], 0)
                        f.write(f"{class_id} {bbox['center_x']:.6f} {bbox['center_y']:.6f} "
                               f"{bbox['width']:.6f} {bbox['height']:.6f}\n")
                        
                        # Update class counts
                        stats['class_counts'][bbox['label']] += 1
                
                # Create visualization
                if self.create_visualizations:
                    self.create_visualization(target_path, bounding_boxes, 
                                            f"{output_dir}/visualizations/{matched_file}")
                
                stats['total_boxes'] += len(bounding_boxes)
            
            stats['processed'] += 1
            if split == 'train':
                stats['train_images'] += 1
            else:
                stats['val_images'] += 1
            
            print(f"   → {split} folder - {len(bounding_boxes)} boxes")
        
        # Save dataset info
        dataset_info = {
            'project_name': output_dir,
            'classes': list(self.class_mapping.keys()),
            'class_mapping': self.class_mapping,
            'num_classes': len(self.class_mapping),
            'split_method': split_method,
            'split_ratio': split_ratio if split_method == 'ratio' else None,
            'train_indices': train_indices if split_method == 'manual' else None,
            'statistics': stats
        }
        
        with open(f"{output_dir}/dataset_info.json", 'w') as f:
            json.dump(dataset_info, f, indent=2)
        
        # Create YOLO config
        self.create_yolo_config(output_dir, self.class_mapping)
        
        # Print summary
        print(f"\n🎉 Conversion complete!")
        print(f"📁 Output directory: {output_dir}/")
        print(f"📊 Successfully processed: {stats['processed']}/{len(annotations)} annotations")
        print(f"🏋️  Training images: {stats['train_images']}")
        print(f"🧪 Validation images: {stats['val_images']}")
        print(f"📦 Total bounding boxes: {stats['total_boxes']}")
        
        if stats['unmatched_files']:
            print(f"⚠️  Unmatched files: {len(stats['unmatched_files'])}")
        
        print(f"\n📈 Class distribution:")
        for class_name, count in stats['class_counts'].items():
            print(f"   {class_name}: {count} boxes")
        
        return dataset_info

# Convenience functions for different use cases
def convert_roof_damage_dataset(export_file, images_dir, output_dir,
                               split_method='ratio', split_ratio=0.8, train_indices=None):
    """
    Convert a roof damage dataset using the built-in damage classes

    Class IDs follow severity order: no_damage=0 ... missing_shingle=4.

    Args:
        export_file (str): Path to Label Studio JSON export
        images_dir (str): Directory containing source images
        output_dir (str): Name for output dataset directory
        split_method (str): 'ratio' or 'manual'
        split_ratio (float): Train ratio (if using ratio method)
        train_indices (list): Manual train indices (if using manual method)

    Returns:
        dict: Whatever convert_to_object_detection returns
    """

    damage_levels = (
        'no_damage',
        'light_damage',
        'moderate_damage',
        'severe_damage',
        'missing_shingle',
    )
    roof_classes = {name: class_id for class_id, name in enumerate(damage_levels)}

    converter = FlexibleLabelStudioConverter(class_mapping=roof_classes)
    return converter.convert_to_object_detection(
        export_file=export_file,
        images_dir=images_dir,
        output_dir=output_dir,
        split_method=split_method,
        split_ratio=split_ratio,
        train_indices=train_indices,
    )

def convert_custom_dataset(export_file, images_dir, output_dir, class_mapping,
                          split_method='ratio', split_ratio=0.8, train_indices=None):
    """
    Convert a dataset using a caller-supplied class mapping

    Args:
        export_file (str): Path to Label Studio JSON export
        images_dir (str): Directory containing source images
        output_dir (str): Name for output dataset directory
        class_mapping (dict): Custom class name to ID mapping
        split_method (str): 'ratio' or 'manual'
        split_ratio (float): Train ratio (if using ratio method)
        train_indices (list): Manual train indices (if using manual method)

    Returns:
        dict: Whatever convert_to_object_detection returns
    """

    return FlexibleLabelStudioConverter(class_mapping=class_mapping).convert_to_object_detection(
        export_file=export_file,
        images_dir=images_dir,
        output_dir=output_dir,
        split_method=split_method,
        split_ratio=split_ratio,
        train_indices=train_indices,
    )

def convert_auto_detect_dataset(export_file, images_dir, output_dir,
                               split_method='ratio', split_ratio=0.8, train_indices=None):
    """
    Convert any Label Studio dataset, deriving the classes from the export

    Args:
        export_file (str): Path to Label Studio JSON export
        images_dir (str): Directory containing source images
        output_dir (str): Name for output dataset directory
        split_method (str): 'ratio' or 'manual'
        split_ratio (float): Train ratio (if using ratio method)
        train_indices (list): Manual train indices (if using manual method)

    Returns:
        dict: Whatever convert_to_object_detection returns
    """

    # No class mapping given, so the converter builds one from the data
    return FlexibleLabelStudioConverter().convert_to_object_detection(
        export_file=export_file,
        images_dir=images_dir,
        output_dir=output_dir,
        split_method=split_method,
        split_ratio=split_ratio,
        train_indices=train_indices,
    )

# Example usage and help
if __name__ == "__main__":
    # Usage banner, one entry per printed line; an empty entry prints a
    # blank line.
    usage_lines = (
        "🏠 Flexible Label Studio Converter - Usage Examples",
        "="*60,

        "\n1️⃣  ROOF DAMAGE DATASET (predefined classes):",
        "   # 80/20 ratio split",
        "   convert_roof_damage_dataset('export.json', 'images/', 'roof_dataset_v1')",
        "",
        "   # 70/30 ratio split",
        "   convert_roof_damage_dataset('export.json', 'images/', 'roof_dataset_v2', split_ratio=0.7)",
        "",
        "   # Manual split (first 6 for train, rest for val)",
        "   convert_roof_damage_dataset('export.json', 'images/', 'roof_dataset_v3',",
        "                              split_method='manual', train_indices=[0,1,2,3,4,5])",

        "\n2️⃣  CUSTOM DATASET (your own classes):",
        "   custom_classes = {'defect': 0, 'normal': 1, 'critical': 2}",
        "   convert_custom_dataset('export.json', 'images/', 'quality_control_v1', custom_classes)",

        "\n3️⃣  AUTO-DETECT DATASET (any classes):",
        "   convert_auto_detect_dataset('export.json', 'images/', 'my_dataset_v1')",

        "\n4️⃣  ADVANCED CUSTOM USAGE:",
        "   converter = FlexibleLabelStudioConverter({'cat': 0, 'dog': 1})",
        "   converter.convert_to_object_detection('export.json', 'images/', 'pets_v1',",
        "                                        split_method='manual', train_indices=[0,2,4])",
    )

    for usage_line in usage_lines:
        print(usage_line)

🏠 Flexible Label Studio Converter - Usage Examples

1️⃣  ROOF DAMAGE DATASET (predefined classes):
   # 80/20 ratio split
   convert_roof_damage_dataset('export.json', 'images/', 'roof_dataset_v1')

   # 70/30 ratio split
   convert_roof_damage_dataset('export.json', 'images/', 'roof_dataset_v2', split_ratio=0.7)

   # Manual split (first 6 for train, rest for val)
   convert_roof_damage_dataset('export.json', 'images/', 'roof_dataset_v3',
                              split_method='manual', train_indices=[0,1,2,3,4,5])

2️⃣  CUSTOM DATASET (your own classes):
   custom_classes = {'defect': 0, 'normal': 1, 'critical': 2}
   convert_custom_dataset('export.json', 'images/', 'quality_control_v1', custom_classes)

3️⃣  AUTO-DETECT DATASET (any classes):
   convert_auto_detect_dataset('export.json', 'images/', 'my_dataset_v1')

4️⃣  ADVANCED CUSTOM USAGE:
   converter = FlexibleLabelStudioConverter({'cat': 0, 'dog': 1})
   converter.convert_to_object_detection('export.json', 'images/', 'pet

In [7]:
# Option 1 (recommended): ratio split - the first int(N * 0.67) tasks of the
# export go to train, the remainder to val
convert_roof_damage_dataset(
    export_file='projects/roof_shingle_inspection/project-labels.json',
    images_dir='projects/roof_shingle_inspection/raw_images',
    output_dir='projects/roof_shingle_inspection/pipeline',
    split_ratio=0.67,
)

🔄 Converting Label Studio export to object detection format...
📂 Output directory: projects/roof_shingle_inspection/pipeline
📊 Split method: ratio
📊 Split ratio: 67% train / 33% val
📁 Found 9 image files in /Users/justinmartin/github/my_llm_project/projects/roof_shingle_inspection/raw_images
📊 Found 10 annotations in export

📋 Processing annotations...
✅ 14257669-Screenshot_2025-06-28_at_3.32.26PM.png → Screenshot 2025-06-28 at 3.35.18 PM.png
   → train folder - 2 boxes
✅ b73dafad-Screenshot_2025-06-28_at_3.32.36PM.png → Screenshot 2025-06-28 at 3.32.36 PM.png
   → train folder - 1 boxes
✅ 132000fe-Screenshot_2025-06-28_at_3.32.56PM.png → Screenshot 2025-06-28 at 3.32.56 PM.png
   → train folder - 1 boxes
✅ 2d9f9cac-Screenshot_2025-06-28_at_3.33.09PM.png → Screenshot 2025-06-28 at 3.33.09 PM.png
   → train folder - 1 boxes
✅ 005ab42e-Screenshot_2025-06-28_at_3.33.23PM.png → Screenshot 2025-06-28 at 3.33.23 PM.png
   → train folder - 1 boxes
✅ 2fce1fad-Screenshot_2025-06-28_at_3.33.39PM

{'project_name': 'projects/roof_shingle_inspection/pipeline',
 'classes': ['no_damage',
  'light_damage',
  'moderate_damage',
  'severe_damage',
  'missing_shingle'],
 'class_mapping': {'no_damage': 0,
  'light_damage': 1,
  'moderate_damage': 2,
  'severe_damage': 3,
  'missing_shingle': 4},
 'num_classes': 5,
 'split_method': 'ratio',
 'split_ratio': 0.67,
 'train_indices': None,
 'statistics': {'processed': 10,
  'skipped': 0,
  'train_images': 6,
  'val_images': 4,
  'total_boxes': 11,
  'matched_files': [('14257669-Screenshot_2025-06-28_at_3.32.26PM.png',
    'Screenshot 2025-06-28 at 3.35.18\u202fPM.png'),
   ('b73dafad-Screenshot_2025-06-28_at_3.32.36PM.png',
    'Screenshot 2025-06-28 at 3.32.36\u202fPM.png'),
   ('132000fe-Screenshot_2025-06-28_at_3.32.56PM.png',
    'Screenshot 2025-06-28 at 3.32.56\u202fPM.png'),
   ('2d9f9cac-Screenshot_2025-06-28_at_3.33.09PM.png',
    'Screenshot 2025-06-28 at 3.33.09\u202fPM.png'),
   ('005ab42e-Screenshot_2025-06-28_at_3.33.23PM.png',


In [None]:
# Option 2: manual split - tasks at the listed export positions go to train,
# every other task goes to val
convert_roof_damage_dataset(
    export_file='project-1-export.json',
    images_dir='roof_damage_annotation_project/raw_images',
    output_dir='roof_damage_manual',
    split_method='manual',
    train_indices=[0,1,2,3,4,5],  # First 6 for train
)