In [11]:
import tensorflow as tf
import os
import xml.etree.ElementTree as ET
import numpy as np
from typing import List, Dict, Tuple

# Pick the AUTOTUNE sentinel from wherever this TensorFlow release exposes it:
# `tf.data.AUTOTUNE` when present (TF 2.4+), otherwise the experimental
# namespace used by TF 2.0-2.3.
if hasattr(tf.data, "AUTOTUNE"):
    AUTOTUNE = tf.data.AUTOTUNE
else:
    AUTOTUNE = tf.data.experimental.AUTOTUNE

class IDDFGVDLoader:
    """tf.data input pipeline for the IDD-FGVD dataset with XML annotations.

    Expected layout under ``base_dir``::

        <base_dir>/<split>/images/<stem>.(jpg|jpeg|png)
        <base_dir>/<split>/annos/<stem>.xml

    for each split in ``train``, ``val`` and ``test``. Fine-grained class
    names are reduced to the part before the first underscore
    (e.g. ``car_honda`` -> ``car``) and mapped to integer ids starting at 1.

    TensorFlow types in annotations are written as strings so they are not
    evaluated when the class is defined.
    """

    # Dataset splits that must exist below the root directory.
    SPLITS = ('train', 'val', 'test')
    # Image file extensions (lower-case) that are picked up from ``images``.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, base_dir: str):
        """
        Initialize the data loader for IDD-FGVD dataset with XML annotations

        Args:
            base_dir: Path to the root directory (idd_fgvd)

        Raises:
            FileNotFoundError: If a required split sub-directory is missing.
            ValueError: If a split has no image/annotation pair or no label
                could be collected from the annotations.
        """
        self.base_dir = base_dir

        # Verify directory structure
        self._verify_directory_structure()

        # Get file lists
        self.train_files = self._get_file_pairs('train')
        self.val_files = self._get_file_pairs('val')
        self.test_files = self._get_file_pairs('test')

        # Initialize label mapping
        self.label_map = self._create_label_map()
        if not self.label_map:
            raise ValueError("No valid labels found in the dataset!")
        self.reverse_label_map = {v: k for k, v in self.label_map.items()}

        print("\nDataset initialized successfully with:")
        print(f"- {len(self.train_files)} training samples")
        print(f"- {len(self.val_files)} validation samples")
        print(f"- {len(self.test_files)} test samples")
        print(f"- {len(self.label_map)} classes: {self.label_map}\n")

    def _verify_directory_structure(self):
        """Check that every split has an ``images`` and an ``annos`` directory.

        Raises:
            FileNotFoundError: Listing every missing directory.
        """
        missing_dirs = []
        for split in self.SPLITS:
            for subdir in ('images', 'annos'):
                dir_path = os.path.join(self.base_dir, split, subdir)
                # isdir (not exists): a plain file with that name is not usable.
                if not os.path.isdir(dir_path):
                    missing_dirs.append(dir_path)

        if missing_dirs:
            raise FileNotFoundError(
                "Missing required directories:\n" +
                "\n".join(missing_dirs) +
                "\nPlease check your dataset structure."
            )

    def _get_file_pairs(self, split: str) -> List[Tuple[str, str]]:
        """Get matching image and XML annotation file pairs for a split.

        An image ``<stem>.<ext>`` is paired with the annotation whose name is
        ``<stem>`` plus an ``.xml`` extension in any letter case. Images
        without an annotation are reported with a warning and skipped.

        Args:
            split: Name of the split directory below ``base_dir``.

        Returns:
            List of ``(image_path, annotation_path)`` tuples ordered by image
            file name.

        Raises:
            ValueError: If the split contains no pair at all.
        """
        image_dir = os.path.join(self.base_dir, split, 'images')
        anno_dir = os.path.join(self.base_dir, split, 'annos')

        image_files = sorted(
            f for f in os.listdir(image_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )

        # Index annotations by stem for O(1) lookup. The extension is matched
        # case-insensitively; an exact lower-case ".xml" wins if several
        # case variants of the same stem exist.
        anno_by_stem = {}
        for anno_file in sorted(os.listdir(anno_dir)):
            stem, ext = os.path.splitext(anno_file)
            if ext.lower() != '.xml':
                continue
            if stem not in anno_by_stem or ext == '.xml':
                anno_by_stem[stem] = anno_file

        pairs = []
        for img_file in image_files:
            anno_file = anno_by_stem.get(os.path.splitext(img_file)[0])
            if anno_file is None:
                print(f"Warning: No annotation found for {img_file}")
                continue
            pairs.append((
                os.path.join(image_dir, img_file),
                os.path.join(anno_dir, anno_file)
            ))

        if not pairs:
            raise ValueError(f"No valid image-annotation pairs found in {split} split")

        return pairs

    @staticmethod
    def _simplify_label(name_element):
        """Return the coarse label for a ``<name>`` element, or None.

        The coarse label is the text before the first underscore with
        surrounding whitespace removed (e.g. "car_honda" -> "car"). None is
        returned when the element is missing or has no usable text.
        """
        if name_element is None or not name_element.text:
            return None
        simplified = name_element.text.strip().split('_')[0]
        return simplified or None

    def _create_label_map(self) -> Dict[str, int]:
        """Create mapping from simplified labels to unique integers.

        All annotation files of all splits are scanned. Files that cannot be
        read or parsed are reported and skipped; objects without a usable
        name are ignored without discarding the rest of the file.

        Returns:
            Dict from label to id; ids follow the sorted label order and
            start at 1.
        """
        all_labels = set()

        for split in self.SPLITS:
            anno_dir = os.path.join(self.base_dir, split, 'annos')

            for anno_file in os.listdir(anno_dir):
                # Same case-insensitive extension rule as _get_file_pairs.
                if not anno_file.lower().endswith('.xml'):
                    continue

                try:
                    root = ET.parse(os.path.join(anno_dir, anno_file)).getroot()
                except (ET.ParseError, OSError) as e:
                    print(f"Error processing {anno_file}: {str(e)}")
                    continue

                for obj in root.findall('object'):
                    label = self._simplify_label(obj.find('name'))
                    if label is not None:
                        all_labels.add(label)

        return {label: idx for idx, label in enumerate(sorted(all_labels), start=1)}

    def _load_image(self, img_path: "tf.Tensor") -> "tf.Tensor":
        """Read an image file and return it as a 3-channel float32 tensor.

        Pixel values are only cast to float32; they are not rescaled.
        """
        img = tf.io.read_file(img_path)
        img = tf.image.decode_image(img, channels=3, expand_animations=False)
        img = tf.cast(img, tf.float32)
        return img

    def _parse_xml_file(self, path: str) -> Tuple[np.ndarray, np.ndarray]:
        """Read boxes and label ids from one annotation file.

        Objects lacking a usable name, a ``bndbox`` element or any of the four
        corner values are skipped.

        Args:
            path: Path of the XML annotation file.

        Returns:
            ``(boxes, label_ids)`` where ``boxes`` is float32 of shape
            ``(N, 4)`` holding ``[xmin, ymin, xmax, ymax]`` rows and
            ``label_ids`` is int32 of shape ``(N,)``. ``N`` may be 0.
        """
        root = ET.parse(path).getroot()

        boxes = []
        label_ids = []

        for obj in root.findall('object'):
            label = self._simplify_label(obj.find('name'))
            bbox = obj.find('bndbox')
            if label is None or bbox is None:
                continue

            coords = [bbox.findtext(tag) for tag in ('xmin', 'ymin', 'xmax', 'ymax')]
            # findtext gives None for a missing tag and '' for an empty one.
            if not all(coords):
                continue

            boxes.append([float(c) for c in coords])
            label_ids.append(self.label_map[label])

        return (
            # reshape keeps the (N, 4) layout even when there are no objects.
            np.array(boxes, dtype=np.float32).reshape(-1, 4),
            np.array(label_ids, dtype=np.int32)
        )

    def _parse_annotations(self, anno_path: "tf.Tensor") -> Dict:
        """Parse XML annotations into tensors.

        Args:
            anno_path: Scalar string tensor with the annotation file path.

        Returns:
            Dict with ``boxes`` (float32, ``[N, 4]``), ``labels`` (int32,
            ``[N]``) and ``image_id`` (int64 hash bucket of the path).
        """
        def _parse_xml(path):
            return self._parse_xml_file(path.numpy().decode('utf-8'))

        # Parse using tf.py_function
        boxes, labels = tf.py_function(
            _parse_xml,
            [anno_path],
            [tf.float32, tf.int32]
        )
        # py_function outputs carry no static shape; restore the known ranks
        # so downstream ops (e.g. padded batching) can rely on them.
        boxes.set_shape([None, 4])
        labels.set_shape([None])

        # Create image ID by hashing the annotation path into 2**31 buckets
        image_id = tf.strings.to_hash_bucket_fast(anno_path, num_buckets=2**31)

        return {
            'boxes': boxes,
            'labels': labels,
            'image_id': tf.cast(image_id, tf.int64)
        }

    def _process_sample(self, img_path: "tf.Tensor", anno_path: "tf.Tensor") -> Tuple["tf.Tensor", Dict]:
        """Process a single image-annotation pair.

        Returns:
            ``(image, target)`` where ``target`` is the dict from
            ``_parse_annotations`` extended with ``original_size`` and
            ``size``, both ``[height, width]`` of the decoded image.
        """
        img = self._load_image(img_path)
        target = self._parse_annotations(anno_path)

        # Add image dimensions
        img_shape = tf.shape(img)
        target['original_size'] = tf.stack([img_shape[0], img_shape[1]])
        target['size'] = tf.identity(target['original_size'])

        return img, target

    def get_dataset(self, split: str = 'train', batch_size: int = 1) -> "tf.data.Dataset":
        """
        Get a TensorFlow Dataset for the specified split

        Args:
            split: One of 'train', 'val', or 'test'
            batch_size: Batch size. For values above 1 the samples of a batch
                are zero-padded to a common shape: images at the bottom/right,
                ``boxes`` with all-zero rows and ``labels`` with 0 (real label
                ids start at 1). ``size`` / ``original_size`` keep the
                unpadded image dimensions.

        Returns:
            A TensorFlow Dataset yielding (image, target) pairs with a leading
            batch dimension.

        Raises:
            ValueError: If ``split`` is not a known split name.
        """
        if split not in self.SPLITS:
            raise ValueError("Split must be 'train', 'val', or 'test'")

        files = getattr(self, f"{split}_files")

        # Create dataset from paths
        img_paths = tf.constant([f[0] for f in files], dtype=tf.string)
        anno_paths = tf.constant([f[1] for f in files], dtype=tf.string)

        dataset = tf.data.Dataset.from_tensor_slices((img_paths, anno_paths))

        # Process samples in parallel
        dataset = dataset.map(
            self._process_sample,
            num_parallel_calls=AUTOTUNE
        )

        # Batch handling
        if batch_size > 1:
            # Plain batch() cannot stack samples whose image size or number
            # of boxes differ, so pad every component to the batch maximum.
            dataset = dataset.padded_batch(
                batch_size,
                padded_shapes=(
                    [None, None, 3],
                    {
                        'boxes': [None, 4],
                        'labels': [None],
                        'image_id': [],
                        'original_size': [2],
                        'size': [2],
                    },
                ),
            )
        else:
            # Add batch dimension for batch_size=1
            dataset = dataset.map(
                lambda x, y: (tf.expand_dims(x, 0), {
                    k: tf.expand_dims(v, 0) for k, v in y.items()
                }),
                num_parallel_calls=AUTOTUNE
            )

        return dataset.prefetch(buffer_size=AUTOTUNE)


# Example usage: build the loader, pull one training element and print it.
if __name__ == "__main__":
    try:
        # Local dataset root -- adjust to where IDD_FGVD lives on your machine
        loader = IDDFGVDLoader(
            base_dir=r"C:\Users\Sejal Hanmante\OneDrive\Desktop\idd detection\IDD_FGVD"
        )

        # Training split with the default batch size
        train_dataset = loader.get_dataset('train')

        # take(1) limits the loop to the first element only
        for images, targets in train_dataset.take(1):
            print("\nFirst sample details:")
            print("Image shape:", images.shape)
            print("Boxes:", targets['boxes'].numpy())
            print("Labels:", targets['labels'].numpy())
            print("Original size:", targets['original_size'].numpy())

            # Translate the integer ids back to their class names
            label_ids = targets['labels'].numpy().flatten()
            label_names = [loader.reverse_label_map[label] for label in label_ids]
            print("Label names:", label_names)

    except Exception as e:
        # Report the failure together with the usual things to check
        print(f"\nError: {str(e)}")
        for hint in (
            "Please verify:",
            "1. The base directory path is correct",
            "2. The dataset follows the required structure",
            "3. Annotation files are in Pascal VOC XML format",
        ):
            print(hint)


Dataset initialized successfully with:
- 3535 training samples
- 884 validation samples
- 1083 test samples
- 7 classes: {'autorickshaw': 1, 'bus': 2, 'car': 3, 'mini-bus': 4, 'motorcycle': 5, 'scooter': 6, 'truck': 7}


First sample details:
Image shape: (1, 1080, 1920, 3)
Boxes: [[[ 996.    619.   1086.    752.  ]
  [ 855.6   565.    958.9   688.63]]]
Labels: [[5 3]]
Original size: [[1080 1920]]
Label names: ['motorcycle', 'car']


In [12]:
!git clone https://github.com/roboflow/inference.git

Cloning into 'inference'...
Updating files:  32% (590/1827)
Updating files:  33% (603/1827)
Updating files:  34% (622/1827)
Updating files:  35% (640/1827)
Updating files:  36% (658/1827)
Updating files:  37% (676/1827)
Updating files:  38% (695/1827)
Updating files:  39% (713/1827)
Updating files:  40% (731/1827)
Updating files:  41% (750/1827)
Updating files:  42% (768/1827)
Updating files:  43% (786/1827)
Updating files:  44% (804/1827)
Updating files:  45% (823/1827)
Updating files:  46% (841/1827)
Updating files:  47% (859/1827)
Updating files:  48% (877/1827)
Updating files:  49% (896/1827)
Updating files:  50% (914/1827)
Updating files:  51% (932/1827)
Updating files:  52% (951/1827)
Updating files:  53% (969/1827)
Updating files:  54% (987/1827)
Updating files:  55% (1005/1827)
Updating files:  56% (1024/1827)
Updating files:  57% (1042/1827)
Updating files:  58% (1060/1827)
Updating files:  59% (1078/1827)
Updating files:  60% (1097/1827)
Updating files:  61% (1115/1827)
Updat

RT-DETR

In [36]:
import os
import xml.etree.ElementTree as ET

def update_class_names_in_xml(xml_file):
    """Collapse fine-grained class names in one annotation file, in place.

    For every ``<object>`` whose ``<name>`` text contains an underscore, the
    text is replaced by the part before the first underscore
    (e.g. ``car_Honda_City`` -> ``car``) and the change is printed. The file
    is rewritten only if at least one name changed.

    Args:
        xml_file: Path of the XML annotation file to update.
    """
    tree = ET.parse(xml_file)
    root = tree.getroot()
    modified = False

    for obj in root.findall('object'):
        name = obj.find('name')
        # An empty <name/> has text None: nothing to simplify, and
        # `'_' in None` would raise TypeError.
        if name is None or not name.text or '_' not in name.text:
            continue

        old = name.text
        name.text = old.split('_')[0]  # Keep only first part
        print(f"Updated '{old}' → '{name.text}' in {os.path.basename(xml_file)}")
        modified = True

    if modified:
        tree.write(xml_file)

def update_all_xmls_in_dir(root_dir):
    """Run ``update_class_names_in_xml`` on every ``.xml`` file below ``root_dir``.

    The directory tree is walked recursively; files whose name does not end
    in ``.xml`` are ignored.
    """
    for folder, _subfolders, filenames in os.walk(root_dir):
        xml_names = (fn for fn in filenames if fn.endswith('.xml'))
        for xml_name in xml_names:
            update_class_names_in_xml(os.path.join(folder, xml_name))

# 🔁 Set this to the dataset root; annotations are read from <root>/<split>/annos
base_annos_dir = r"C:\Users\Sejal Hanmante\OneDrive\Desktop\idd detection\IDD_FGVD"

# Rewrite the class names in the annos folder of each split in turn
for split in ('train', 'val', 'test'):
    anno_path = os.path.join(base_annos_dir, split, 'annos')
    update_all_xmls_in_dir(anno_path)

print("\n✅ All XML files updated!")


Updated 'motorcycle_Honda_Shine' → 'motorcycle' in 0.xml
Updated 'car_MarutiSuzuki_Omni' → 'car' in 0.xml
Updated 'autorickshaw_Piaggio' → 'autorickshaw' in 1.xml
Updated 'car_MarutiSuzuki_Alto800' → 'car' in 1.xml
Updated 'scooter_TVS_Jupiter' → 'scooter' in 1.xml
Updated 'truck_Others' → 'truck' in 1.xml
Updated 'scooter_Honda_Aviator' → 'scooter' in 10.xml
Updated 'car_Toyota_Qualis' → 'car' in 10.xml
Updated 'car_MarutiSuzuki_Dzire' → 'car' in 10.xml
Updated 'car_Honda_City' → 'car' in 10.xml
Updated 'car_Hyundai_I10' → 'car' in 10.xml
Updated 'motorcycle_TVS_StarCityPlus' → 'motorcycle' in 10.xml
Updated 'motorcycle_Bajaj_Pulsar220F' → 'motorcycle' in 10.xml
Updated 'scooter_Honda_Activa' → 'scooter' in 10.xml
Updated 'motorcycle_Hero_Splendor' → 'motorcycle' in 10.xml
Updated 'autorickshaw_Bajaj' → 'autorickshaw' in 100.xml
Updated 'car_Hyundai_Santro' → 'car' in 100.xml
Updated 'scooter_Honda_Activa' → 'scooter' in 100.xml
Updated 'scooter_Honda_Activa' → 'scooter' in 100.xml
Up

In [37]:
import os
import xml.etree.ElementTree as ET
import json

def xml_to_json(xml_path):
    """Convert one XML annotation file into a JSON-serialisable dict.

    Objects without a ``<name>``, without a ``<bndbox>`` or with an
    incomplete ``<bndbox>`` are skipped instead of aborting the conversion.

    Args:
        xml_path: Path of the XML annotation file.

    Returns:
        ``{'image': <filename text or None>, 'annos': [...]}`` where each
        entry of ``annos`` is ``{'class': <name text>,
        'bbox': [xmin, ymin, xmax, ymax]}`` with float coordinates.
    """
    root = ET.parse(xml_path).getroot()

    # A missing <filename> element is recorded as None rather than crashing.
    filename = root.find('filename')
    annotation = {
        'image': filename.text if filename is not None else None,
        'annos': []
    }

    for obj in root.findall('object'):
        name = obj.find('name')
        bbox = obj.find('bndbox')
        if name is None or bbox is None:
            continue

        corners = [bbox.find(tag) for tag in ('xmin', 'ymin', 'xmax', 'ymax')]
        # Explicit None/empty checks: an Element without children is falsy,
        # so truth-testing the elements themselves would be wrong.
        if any(c is None or not c.text for c in corners):
            continue

        annotation['annos'].append({
            'class': name.text,
            'bbox': [float(c.text) for c in corners]
        })

    return annotation

def convert_folder_to_json(split_path, json_output_dir):
    """Write one JSON file per XML annotation of a split.

    Every ``*.xml`` file in ``<split_path>/annos`` is converted with
    ``xml_to_json`` and written to ``json_output_dir`` under the same name
    with the trailing ``.xml`` replaced by ``.json``.

    Args:
        split_path: Directory of the split; must contain an ``annos`` folder.
        json_output_dir: Destination directory; created if it does not exist.
    """
    xml_suffix = '.xml'
    annos_path = os.path.join(split_path, 'annos')
    os.makedirs(json_output_dir, exist_ok=True)

    for xml_file in os.listdir(annos_path):
        if not xml_file.endswith(xml_suffix):
            continue

        json_data = xml_to_json(os.path.join(annos_path, xml_file))

        # Swap only the trailing extension; str.replace would also rewrite
        # any earlier ".xml" inside the file name.
        json_filename = xml_file[:-len(xml_suffix)] + '.json'
        with open(os.path.join(json_output_dir, json_filename), 'w') as jf:
            json.dump(json_data, jf, indent=2)

# Dataset root; JSON output for each split goes to <root>/<split>_json
base_path = r'C:\Users\Sejal Hanmante\OneDrive\Desktop\idd detection\IDD_FGVD'
splits = ['train', 'val', 'test']

# Convert the XML annotations of every split
for split in splits:
    split_path = os.path.join(base_path, split)
    json_output_dir = os.path.join(base_path, f'{split}_json')
    convert_folder_to_json(split_path, json_output_dir)
