In [3]:
import os
import json
import shutil
import tarfile
from collections import defaultdict
import random

In [4]:

def extract_tar_contents(tar_path, extract_path):
    """Extract every member of a tar archive into ``extract_path``.

    Args:
        tar_path: Path of the tar archive to read.
        extract_path: Directory the members are extracted into.

    Raises:
        tarfile.TarError: If the archive cannot be read, or (when extraction
            filters are available) a member is rejected by the ``data`` filter,
            e.g. because it would be written outside ``extract_path``.
        OSError: If the archive or the destination cannot be accessed.
    """
    with tarfile.open(tar_path, 'r') as tar:
        # Extraction filters only exist on Python versions that ship
        # ``tarfile.data_filter``; on those, refuse members that escape the
        # destination (absolute paths, ``..`` components, outside links).
        if hasattr(tarfile, 'data_filter'):
            tar.extractall(path=extract_path, filter='data')
        else:
            tar.extractall(path=extract_path)

In [10]:
# Absolute, machine-specific dataset root. None of the cells visible here read
# this global: load_annotations() receives its own `base_path` argument.
base_path= 'E:/Matrice.ai/'

In [12]:

def load_annotations(base_path):
    """Load the train and val annotation files and flatten them into one list.

    For each phase in ``('train', 'val')`` the file
    ``<base_path>/annotations/instances_<phase>2024.json`` is read. Every entry
    of its ``annotations`` list is joined with the matching entry of its
    ``images`` list (matched on ``image_id`` == ``id``). Annotations whose
    image cannot be found are skipped.

    Args:
        base_path: Directory containing the ``annotations`` folder; also used
            to build each record's ``filepath`` as
            ``<base_path>/images/<phase>/<file_name>``.

    Returns:
        A list with one dict per annotation, with keys ``filename``,
        ``filepath``, ``width``, ``height``, ``bbox`` and ``category_id``.

    Raises:
        OSError: If an annotation file cannot be opened.
        KeyError: If a required key is missing from the JSON content.
    """
    combined_annotations = []
    for phase in ['train', 'val']:  # Test set assumed to have no annotations
        annotation_file = os.path.join(base_path, 'annotations', f'instances_{phase}2024.json')
        # Read as UTF-8 explicitly rather than relying on the platform default.
        with open(annotation_file, 'r', encoding='utf-8') as file:
            data = json.load(file)

        # Index images by id once instead of scanning the whole image list for
        # every annotation. setdefault keeps the first image for a repeated id,
        # which is what a first-match linear search would return.
        images_by_id = {}
        for img in data['images']:
            images_by_id.setdefault(img['id'], img)

        for ann in data['annotations']:
            img_info = images_by_id.get(ann['image_id'])
            if img_info:
                combined_annotations.append({
                    'filename': img_info['file_name'],
                    'filepath': os.path.join(base_path, 'images', phase, img_info['file_name']),
                    'width': img_info['width'],
                    'height': img_info['height'],
                    'bbox': ann['bbox'],
                    'category_id': ann['category_id']
                })
    return combined_annotations

In [13]:
def filter_by_class_frequency(annotations, min_images=50):
    """Keep only the annotations whose class occurs often enough.

    The frequency of a class is the number of annotation records carrying its
    ``category_id``; each record counts once, so several records that share a
    file each add to the total.

    Args:
        annotations: Sequence of dicts, each with a ``category_id`` key.
        min_images: Minimum number of records a class needs to be kept.

    Returns:
        A new list holding, in their original order, the records whose class
        reaches ``min_images``.
    """
    # First pass: tally records per class.
    per_class = defaultdict(int)
    for record in annotations:
        per_class[record['category_id']] += 1

    # Second pass: retain records belonging to sufficiently frequent classes.
    kept = []
    for record in annotations:
        if per_class[record['category_id']] >= min_images:
            kept.append(record)
    return kept

In [14]:
def split_data(annotations, train_ratio=0.7, val_ratio=0.2, test_ratio=0.1):
    """Randomly split records into train, val, and test lists.

    The input is copied before shuffling, so the caller's sequence keeps its
    order. Shuffling uses the module-level ``random`` generator; seed it with
    ``random.seed(...)`` beforehand for a reproducible split.

    Args:
        annotations: Iterable of records to split.
        train_ratio: Fraction of records placed in the train list.
        val_ratio: Fraction of records placed in the val list.
        test_ratio: Accepted for interface symmetry but not used to compute a
            boundary; the test list is whatever remains after train and val.

    Returns:
        A ``(train_set, val_set, test_set)`` tuple of lists. Sizes are
        ``int(train_ratio * total)``, ``int(val_ratio * total)`` and the
        remainder, respectively.
    """
    # Work on a copy: random.shuffle reorders its argument in place.
    shuffled = list(annotations)
    random.shuffle(shuffled)

    total = len(shuffled)
    train_end = int(train_ratio * total)
    val_end = train_end + int(val_ratio * total)

    train_set = shuffled[:train_end]
    val_set = shuffled[train_end:val_end]
    test_set = shuffled[val_end:]
    return train_set, val_set, test_set

In [15]:
def write_yolo_annotations(data, output_dir):
    """Write YOLO-style label files and copy the matching images.

    Each record's ``bbox`` is read as ``[x, y, box_width, box_height]`` and
    converted to ``class x_center y_center width height`` with the four
    coordinates divided by the image width/height. All records that share a
    ``filename`` are written to the same ``<stem>.txt`` file, one line per
    record, so images with several boxes keep every box. Each distinct source
    image is copied into ``output_dir`` once.

    Args:
        data: Iterable of dicts with keys ``filename``, ``filepath``,
            ``width``, ``height``, ``bbox`` and ``category_id``.
        output_dir: Directory receiving both the ``.txt`` label files and the
            copied images; created if missing.

    Raises:
        OSError: If a label file cannot be written or an image cannot be copied.
        ZeroDivisionError: If a record has a zero ``width`` or ``height``.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Collect every line per label file first; opening the file in 'w' mode
    # once per record would discard all but the last box of an image.
    lines_by_label = {}
    sources_by_label = {}
    for item in data:
        label_path = os.path.join(output_dir, os.path.splitext(item['filename'])[0] + '.txt')
        left, top, box_w, box_h = item['bbox'][:4]
        x_center = (left + box_w / 2) / item['width']
        y_center = (top + box_h / 2) / item['height']
        width = box_w / item['width']
        height = box_h / item['height']
        # NOTE(review): category_id is written unchanged; confirm it is already
        # in the class-index space the downstream trainer expects.
        lines_by_label.setdefault(label_path, []).append(
            f"{item['category_id']} {x_center} {y_center} {width} {height}\n"
        )
        sources = sources_by_label.setdefault(label_path, [])
        if item['filepath'] not in sources:
            sources.append(item['filepath'])

    for label_path, lines in lines_by_label.items():
        with open(label_path, 'w') as file:
            file.writelines(lines)
        for source in sources_by_label[label_path]:
            shutil.copy(source, output_dir)

In [16]:

def main(tar_path, output_base, sample_size=500):
    """Run the whole pipeline: extract, load, filter, sample, split, write.

    The archive is extracted into a ``temp_dataset`` directory relative to the
    current working directory. That directory is removed when the function
    finishes, whether it succeeds or raises.

    Args:
        tar_path: Path of the dataset tar archive.
        output_base: Directory under which ``train``, ``val`` and ``test``
            sub-directories are written.
        sample_size: Maximum number of annotation records to keep after
            filtering; fewer are used when fewer are available.
    """
    extract_path = 'temp_dataset'
    try:
        extract_tar_contents(tar_path, extract_path)

        annotations = load_annotations(extract_path)
        filtered_annotations = filter_by_class_frequency(annotations)
        # NOTE(review): sampling and splitting operate on annotation records,
        # not on images, so records of one image can land in different splits.
        random_sample = random.sample(filtered_annotations, min(sample_size, len(filtered_annotations)))
        train_data, val_data, test_data = split_data(random_sample)

        for phase, data in zip(['train', 'val', 'test'], [train_data, val_data, test_data]):
            output_dir = os.path.join(output_base, phase)
            write_yolo_annotations(data, output_dir)
    finally:
        # Always remove the extracted copy, even when a step above failed;
        # skip if extraction never created the directory.
        if os.path.isdir(extract_path):
            shutil.rmtree(extract_path)
    
# Entry point of the notebook: run the pipeline on a local archive.
# Both paths are absolute and machine-specific; adjust them before running.
tar_path = 'E:/Matrice.ai/deep_fashion.tar'
output_base = 'E:/Matrice.ai/output'
main(tar_path, output_base)
