In [3]:
import json
import os
from PIL import Image, UnidentifiedImageError
from tqdm import tqdm
import logging

# Setup logging
# logging.basicConfig() silently does nothing when the root logger already has
# handlers, which is the usual situation in a notebook kernel once any cell has
# configured logging (or when this cell is re-run). Detach and close whatever is
# attached first so this cell's messages really end up in its own log file.
_root_logger = logging.getLogger()
for _stale_handler in list(_root_logger.handlers):
    _root_logger.removeHandler(_stale_handler)
    _stale_handler.close()
logging.basicConfig(filename='yolo_conversion_json_to_yolov5_100k.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Define category mapping
# Each tuple groups every category spelling that shares one YOLO class id; the
# tuple's position in the list is that id.
_category_aliases = [
    ("person", "pedestrian"),   # 0: pedestrian is merged into person
    ("rider",),                 # 1
    ("car",),                   # 2
    ("truck",),                 # 3
    ("bus",),                   # 4
    ("train",),                 # 5
    ("motor", "motorcycle"),    # 6: motorcycle is merged into motor
    ("bike", "bicycle"),        # 7: bicycle is merged into bike
    ("traffic light",),         # 8
    ("traffic sign",),          # 9
    ("trailer",),               # 10
    ("other person",),          # 11
    ("other vehicle",),         # 12
]
category_mapping = {
    alias: class_id
    for class_id, aliases in enumerate(_category_aliases)
    for alias in aliases
}

# Paths (Update these as needed)
bdd_annotations_path_train = r"C:\Users\sathish\Downloads\FL_ModelForAV\data\bdd100k\label_json\bdd100k_labels_images_train.json"
image_folder_path = r"C:\Users\sathish\Downloads\FL_ModelForAV\data\bdd100k\images\train"
output_label_folder = r"C:\Users\sathish\Downloads\FL_ModelForAV\data\bdd100k\labels\train"

# Create output folder
os.makedirs(output_label_folder, exist_ok=True)

# Load JSON annotations
# Decode explicitly as UTF-8: without an encoding argument open() falls back to
# the platform default (cp1252 on Windows), which can mis-decode or reject a
# UTF-8 JSON file. Any failure to read or parse the file is logged and turned
# into a clean exit instead of a raw traceback.
try:
    with open(bdd_annotations_path_train, 'r', encoding='utf-8') as file:
        annotations = json.load(file)
except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
    logging.error(f"Error loading JSON file: {e}")
    raise SystemExit(f"Error loading JSON file: {e}")

# Track statistics
skipped_labels = 0
unrecognized_categories = set()
processed_images = 0
successful_conversions = 0
error_images = 0
missing_images = 0
invalid_boxes = 0  # labels whose box2d is malformed or has no area after clamping

logging.info("Starting YOLO conversion process...")

# One iteration per annotated image: read the image size, convert every label
# with a known category and a usable 'box2d' into a normalized
# "class x_center y_center width height" line, then write one .txt per image.
for annotation in tqdm(annotations, desc="Converting to YOLO format"):
    image_name = annotation['name']
    # 'labels' may be missing or explicitly null for frames without annotations.
    labels = annotation.get('labels') or []
    image_path = os.path.join(image_folder_path, image_name)

    if not os.path.exists(image_path):
        missing_images += 1
        logging.warning(f"Warning: Image {image_name} not found. Skipping.")
        continue

    try:
        with Image.open(image_path) as img:
            image_width, image_height = img.size
    except (UnidentifiedImageError, OSError) as e:
        # OSError also covers truncated or unreadable files, so one bad image
        # cannot abort the whole conversion run.
        error_images += 1
        logging.error(f"Error opening image {image_name}: {e}. Skipping.")
        continue

    processed_images += 1
    label_file_path = os.path.join(output_label_folder, os.path.splitext(image_name)[0] + '.txt')

    label_lines = []
    for label in labels:
        category = label.get('category', '').strip().lower()

        # Skip unrecognized categories but log them
        if category not in category_mapping:
            unrecognized_categories.add(category)
            logging.warning(f"Skipping unrecognized category '{category}' in image {image_name}.")
            continue

        class_id = category_mapping[category]

        # Check for 'box2d' key
        if 'box2d' not in label:
            skipped_labels += 1
            logging.warning(f"Skipping label: 'box2d' missing for image {image_name}.")
            continue

        # Clamp the corners to the image so every normalized value stays inside
        # [0, 1]; a box that pokes past the border would otherwise produce an
        # out-of-range label.
        box = label['box2d']
        try:
            x1 = min(max(float(box['x1']), 0.0), image_width)
            y1 = min(max(float(box['y1']), 0.0), image_height)
            x2 = min(max(float(box['x2']), 0.0), image_width)
            y2 = min(max(float(box['y2']), 0.0), image_height)
        except (KeyError, TypeError, ValueError) as e:
            invalid_boxes += 1
            logging.warning(f"Skipping label: malformed 'box2d' in image {image_name}: {e!r}")
            continue

        # Inverted or zero-area boxes carry no usable training signal.
        if x2 <= x1 or y2 <= y1:
            invalid_boxes += 1
            logging.warning(f"Skipping label: degenerate 'box2d' in image {image_name}.")
            continue

        x_center = ((x1 + x2) / 2) / image_width
        y_center = ((y1 + y2) / 2) / image_height
        width = (x2 - x1) / image_width
        height = (y2 - y1) / image_height

        label_lines.append(f"{class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}")

    # Write to label file only if there are valid labels
    if label_lines:
        try:
            with open(label_file_path, 'w') as label_file:
                label_file.write("\n".join(label_lines) + "\n")
            successful_conversions += len(label_lines)
        except IOError as e:
            error_images += 1
            logging.error(f"Error writing label file {label_file_path}: {e}")
    else:
        logging.info(f"Skipping file creation for {image_name} (no valid labels).")

    logging.info(f"Processed image: {image_name}")

if invalid_boxes:
    logging.warning(f"Total labels skipped due to malformed or degenerate 'box2d': {invalid_boxes}")

# Summary logging
completion_msg = f"Conversion complete! YOLOv5 labels saved in {output_label_folder}"
logging.info(completion_msg)

# One multi-line log record with the counters collected during conversion.
stats_msg = (
    f"Total processed images: {processed_images}\n"
    f"Total successful label entries: {successful_conversions}\n"
    f"Total skipped labels due to missing 'box2d': {skipped_labels}\n"
    f"Total unrecognized categories: {len(unrecognized_categories)}\n"
    f"Total missing images: {missing_images}\n"
    f"Total error images (unreadable): {error_images}"
)
logging.info(stats_msg)

# Log unrecognized categories in a separate file
if unrecognized_categories:
    # Sort once so the log line and the text file list the names in the same,
    # reproducible order (set iteration order is arbitrary).
    sorted_unrecognized = sorted(unrecognized_categories)
    unrecognized_msg = f"Unrecognized categories found: {', '.join(sorted_unrecognized)}"
    logging.warning(unrecognized_msg)
    # Category names come straight from the JSON, so write them as UTF-8
    # rather than the platform default encoding.
    with open(os.path.join(output_label_folder, "unrecognized_categories.txt"), "w", encoding="utf-8") as f:
        f.write("\n".join(sorted_unrecognized))


Converting to YOLO format: 100%|██████████| 69863/69863 [06:46<00:00, 171.81it/s]


In [4]:
import json
import os
from PIL import Image, UnidentifiedImageError
from tqdm import tqdm
import logging

# Setup logging
# logging.basicConfig() silently does nothing when the root logger already has
# handlers. When an earlier cell in the same kernel has configured logging,
# everything logged here would therefore keep going to that cell's file.
# Detach and close the existing root handlers first so the validation run
# gets its own log.
_root_logger = logging.getLogger()
for _stale_handler in list(_root_logger.handlers):
    _root_logger.removeHandler(_stale_handler)
    _stale_handler.close()
logging.basicConfig(filename='yolo_conversion_json_to_yolov5_val.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Define category mapping
# Each tuple groups every category spelling that shares one YOLO class id; the
# tuple's position in the list is that id.
_category_aliases = [
    ("person", "pedestrian"),   # 0: pedestrian is merged into person
    ("rider",),                 # 1
    ("car",),                   # 2
    ("truck",),                 # 3
    ("bus",),                   # 4
    ("train",),                 # 5
    ("motor", "motorcycle"),    # 6: motorcycle is merged into motor
    ("bike", "bicycle"),        # 7: bicycle is merged into bike
    ("traffic light",),         # 8
    ("traffic sign",),          # 9
    ("trailer",),               # 10
    ("other person",),          # 11
    ("other vehicle",),         # 12
]
category_mapping = {
    alias: class_id
    for class_id, aliases in enumerate(_category_aliases)
    for alias in aliases
}

# Paths (Update these as needed)
bdd_annotations_path_val = r"C:\Users\sathish\Downloads\FL_ModelForAV\data\bdd100k\label_json\bdd100k_labels_images_val.json"
image_folder_path = r"C:\Users\sathish\Downloads\FL_ModelForAV\data\bdd100k\images\val"
output_label_folder = r"C:\Users\sathish\Downloads\FL_ModelForAV\data\bdd100k\labels\val"

# Create output folder
os.makedirs(output_label_folder, exist_ok=True)

# Load JSON annotations
# Decode explicitly as UTF-8: without an encoding argument open() falls back to
# the platform default (cp1252 on Windows), which can mis-decode or reject a
# UTF-8 JSON file. Any failure to read or parse the file is logged and turned
# into a clean exit instead of a raw traceback.
try:
    with open(bdd_annotations_path_val, 'r', encoding='utf-8') as file:
        annotations = json.load(file)
except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
    logging.error(f"Error loading JSON file: {e}")
    raise SystemExit(f"Error loading JSON file: {e}")

# Track statistics
skipped_labels = 0
unrecognized_categories = set()
processed_images = 0
successful_conversions = 0
error_images = 0
missing_images = 0
invalid_boxes = 0  # labels whose box2d is malformed or has no area after clamping

logging.info("Starting YOLO conversion process for validation dataset...")

# One iteration per annotated image: read the image size, convert every label
# with a known category and a usable 'box2d' into a normalized
# "class x_center y_center width height" line, then write one .txt per image.
for annotation in tqdm(annotations, desc="Converting to YOLO format"):
    image_name = annotation['name']
    # 'labels' may be missing or explicitly null for frames without annotations.
    labels = annotation.get('labels') or []
    image_path = os.path.join(image_folder_path, image_name)

    if not os.path.exists(image_path):
        missing_images += 1
        logging.warning(f"Warning: Image {image_name} not found. Skipping.")
        continue

    try:
        with Image.open(image_path) as img:
            image_width, image_height = img.size
    except (UnidentifiedImageError, OSError) as e:
        # OSError also covers truncated or unreadable files, so one bad image
        # cannot abort the whole conversion run.
        error_images += 1
        logging.error(f"Error opening image {image_name}: {e}. Skipping.")
        continue

    processed_images += 1
    label_file_path = os.path.join(output_label_folder, os.path.splitext(image_name)[0] + '.txt')

    label_lines = []
    for label in labels:
        category = label.get('category', '').strip().lower()

        # Skip unrecognized categories but log them
        if category not in category_mapping:
            unrecognized_categories.add(category)
            logging.warning(f"Skipping unrecognized category '{category}' in image {image_name}.")
            continue

        class_id = category_mapping[category]

        # Check for 'box2d' key
        if 'box2d' not in label:
            skipped_labels += 1
            logging.warning(f"Skipping label: 'box2d' missing for image {image_name}.")
            continue

        # Clamp the corners to the image so every normalized value stays inside
        # [0, 1]; a box that pokes past the border would otherwise produce an
        # out-of-range label.
        box = label['box2d']
        try:
            x1 = min(max(float(box['x1']), 0.0), image_width)
            y1 = min(max(float(box['y1']), 0.0), image_height)
            x2 = min(max(float(box['x2']), 0.0), image_width)
            y2 = min(max(float(box['y2']), 0.0), image_height)
        except (KeyError, TypeError, ValueError) as e:
            invalid_boxes += 1
            logging.warning(f"Skipping label: malformed 'box2d' in image {image_name}: {e!r}")
            continue

        # Inverted or zero-area boxes carry no usable training signal.
        if x2 <= x1 or y2 <= y1:
            invalid_boxes += 1
            logging.warning(f"Skipping label: degenerate 'box2d' in image {image_name}.")
            continue

        x_center = ((x1 + x2) / 2) / image_width
        y_center = ((y1 + y2) / 2) / image_height
        width = (x2 - x1) / image_width
        height = (y2 - y1) / image_height

        label_lines.append(f"{class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}")

    # Write to label file only if there are valid labels
    if label_lines:
        try:
            with open(label_file_path, 'w') as label_file:
                label_file.write("\n".join(label_lines) + "\n")
            successful_conversions += len(label_lines)
        except IOError as e:
            error_images += 1
            logging.error(f"Error writing label file {label_file_path}: {e}")
    else:
        logging.info(f"Skipping file creation for {image_name} (no valid labels).")

    logging.info(f"Processed image: {image_name}")

if invalid_boxes:
    logging.warning(f"Total labels skipped due to malformed or degenerate 'box2d': {invalid_boxes}")

# Summary logging
completion_msg = f"Validation dataset conversion complete! YOLOv5 labels saved in {output_label_folder}"
logging.info(completion_msg)

# One multi-line log record with the counters collected during conversion.
stats_msg = (
    f"Total processed images: {processed_images}\n"
    f"Total successful label entries: {successful_conversions}\n"
    f"Total skipped labels due to missing 'box2d': {skipped_labels}\n"
    f"Total unrecognized categories: {len(unrecognized_categories)}\n"
    f"Total missing images: {missing_images}\n"
    f"Total error images (unreadable): {error_images}"
)
logging.info(stats_msg)

# Log unrecognized categories in a separate file
if unrecognized_categories:
    # Sort once so the log line and the text file list the names in the same,
    # reproducible order (set iteration order is arbitrary).
    sorted_unrecognized = sorted(unrecognized_categories)
    unrecognized_msg = f"Unrecognized categories found: {', '.join(sorted_unrecognized)}"
    logging.warning(unrecognized_msg)
    # Category names come straight from the JSON, so write them as UTF-8
    # rather than the platform default encoding.
    with open(os.path.join(output_label_folder, "unrecognized_categories.txt"), "w", encoding="utf-8") as f:
        f.write("\n".join(sorted_unrecognized))


Converting to YOLO format: 100%|██████████| 10000/10000 [00:19<00:00, 514.44it/s]


In [9]:
import os
import shutil
import yaml
import logging
import random
from concurrent.futures import ThreadPoolExecutor

# ======================= SETUP LOGGING ===========================
def setup_logging():
    """Send root-logger output to ``logs/process.log`` at DEBUG level.

    Creates the ``logs`` directory (relative to the current working directory)
    if needed and truncates the log file on every call (``filemode="w"``).

    ``logging.basicConfig`` is a no-op when the root logger already has
    handlers, which is the case in a notebook kernel once any earlier cell has
    configured logging. Existing root handlers are therefore removed and closed
    first, so the file named in the printed message is the one that actually
    receives the records.
    """
    log_dir = "logs"
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, "process.log")

    # Drop whatever an earlier configuration attached to the root logger.
    root_logger = logging.getLogger()
    for stale_handler in list(root_logger.handlers):
        root_logger.removeHandler(stale_handler)
        stale_handler.close()

    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s - %(levelname)s - %(message)s",
        filename=log_file,
        filemode="w"
    )
    print("Logging initialized. Check logs/process.log for details.")
    logging.info("Logging setup complete.")

setup_logging()

# ======================= CATEGORY MAPPING ========================
# Class names in class-id order: a name's position in the list is its id.
CATEGORY_NAMES = [
    "person",
    "rider",
    "car",
    "truck",
    "bus",
    "train",
    "motorcycle",
    "bicycle",
    "traffic light",
    "traffic sign",
    "trailer",
    "other person",
    "other vehicle",
]

# Reverse lookup, name -> class id, derived from the ordered list above.
CATEGORY_MAPPING = {name: index for index, name in enumerate(CATEGORY_NAMES)}

# ======================= INITIALIZE DATASET =======================
def initialize_dataset_directory(dataset_path):
    """Create the dataset skeleton under ``dataset_path``.

    Makes ``images/<split>`` and ``labels/<split>`` for train/val/test,
    (re)writes ``data.yaml`` with the absolute dataset path, the three image
    directories, the class count and the class names, and creates empty
    ``train.txt`` / ``val.txt`` / ``test.txt`` files when they are absent.
    Existing split list files are left untouched; ``data.yaml`` is always
    overwritten.
    """
    logging.info(f"Initializing dataset directory: {dataset_path}")

    os.makedirs(dataset_path, exist_ok=True)

    split_names = ("train", "val", "test")

    # images/<split> for every split first, then labels/<split>.
    for kind in ("images", "labels"):
        for split_name in split_names:
            os.makedirs(os.path.join(dataset_path, f"{kind}/{split_name}"), exist_ok=True)

    # data.yaml: insertion order is the on-disk key order (sort_keys=False).
    yaml_fields = {"path": os.path.abspath(dataset_path)}
    for split_name in split_names:
        yaml_fields[split_name] = f"images/{split_name}"
    yaml_fields["nc"] = len(CATEGORY_NAMES)
    yaml_fields["names"] = CATEGORY_NAMES

    with open(os.path.join(dataset_path, "data.yaml"), "w", encoding="utf-8") as yaml_file:
        yaml.dump(yaml_fields, yaml_file, default_flow_style=False, sort_keys=False)

    # Touch <split>.txt only where it is missing, so existing lists survive.
    for split_name in split_names:
        list_path = os.path.join(dataset_path, f"{split_name}.txt")
        if os.path.exists(list_path):
            continue
        with open(list_path, "w", encoding="utf-8"):
            pass
        logging.info(f"Created empty {split_name}.txt in {dataset_path}")

    logging.info(f"Dataset directory structure initialized in {dataset_path}.")

# ======================= CREATE MINI DATASET =======================
def copy_files(source_file, dest_file):
    """Copy ``source_file`` to ``dest_file`` with ``shutil.copy2``.

    Returns True when the copy succeeds. Any exception is logged as an error
    and reported to the caller as False instead of being raised.
    """
    copied = False
    try:
        shutil.copy2(source_file, dest_file)
    except Exception as copy_error:
        logging.error(f"Error copying {source_file} to {dest_file}: {copy_error}")
    else:
        copied = True
    return copied

def create_mini_dataset(source_root, dest_root, num_images=10, num_batches=3, max_threads=5, seed=None):
    """Build a small copy of a batched dataset by sampling images per split.

    For each ``batch_<n>`` (n = 1..num_batches) found under ``source_root``
    and each of the train/val/test splits, up to ``num_images`` images are
    picked at random from ``<batch>/<split>/images`` and copied, together with
    the same-named ``.txt`` file from ``<batch>/<split>/labels`` when it
    exists, into the mirrored location under ``dest_root``. A batch-level
    ``data.yaml`` is copied as well if present.

    Args:
        source_root: Directory containing the ``batch_<n>`` folders.
        dest_root: Directory that receives the mini dataset (created if needed).
        num_images: Maximum number of images sampled per split; values below
            zero are treated as zero.
        num_batches: Highest batch number to look for.
        max_threads: Worker count of the thread pool used for copying.
        seed: Optional seed for a private random generator so the selection
            can be reproduced. ``None`` keeps using the global ``random``
            module state.

    Returns:
        None. Missing inputs are logged and skipped rather than raised.
    """
    if not os.path.exists(source_root):
        logging.error(f"Source directory '{source_root}' does not exist.")
        return

    os.makedirs(dest_root, exist_ok=True)

    # A private generator keeps a seeded run from disturbing global random state.
    rng = random.Random(seed) if seed is not None else random
    image_extensions = ('.jpg', '.jpeg', '.png')
    batches_found = 0

    for batch_num in range(1, num_batches + 1):
        source_batch = os.path.join(source_root, f"batch_{batch_num}")
        dest_batch = os.path.join(dest_root, f"batch_{batch_num}")

        if not os.path.exists(source_batch):
            logging.warning(f"Skipping batch {batch_num}, missing: {source_batch}")
            continue

        batches_found += 1
        os.makedirs(dest_batch, exist_ok=True)

        for split in ["train", "val", "test"]:
            source_images_dir = os.path.join(source_batch, split, "images")
            source_labels_dir = os.path.join(source_batch, split, "labels")
            dest_images_dir = os.path.join(dest_batch, split, "images")
            dest_labels_dir = os.path.join(dest_batch, split, "labels")

            if not os.path.exists(source_images_dir):
                logging.warning(f"Skipping {split} in batch {batch_num}, missing: {source_images_dir}")
                continue

            os.makedirs(dest_images_dir, exist_ok=True)
            os.makedirs(dest_labels_dir, exist_ok=True)

            # Match extensions case-insensitively (e.g. ".JPG") and sort the
            # listing: os.listdir order is arbitrary, and a stable order is
            # what makes a seeded sample repeatable.
            image_files = sorted(
                f for f in os.listdir(source_images_dir)
                if f.lower().endswith(image_extensions)
            )
            if not image_files:
                logging.warning(f"No images found in {source_images_dir}, skipping...")
                continue

            # Guard against a negative request, which random.sample rejects.
            sample_size = max(0, min(num_images, len(image_files)))
            selected_images = rng.sample(image_files, sample_size)

            # Use multithreading to speed up file copying; the pool is drained
            # when the with-block exits. copy_files logs its own failures.
            with ThreadPoolExecutor(max_threads) as executor:
                for image in selected_images:
                    src_img = os.path.join(source_images_dir, image)
                    dest_img = os.path.join(dest_images_dir, image)
                    executor.submit(copy_files, src_img, dest_img)

                    label_name = os.path.splitext(image)[0] + ".txt"
                    src_label = os.path.join(source_labels_dir, label_name)
                    dest_label = os.path.join(dest_labels_dir, label_name)

                    if os.path.exists(src_label):
                        executor.submit(copy_files, src_label, dest_label)

        # Copy `data.yaml` if it exists
        source_yaml = os.path.join(source_batch, "data.yaml")
        dest_yaml = os.path.join(dest_batch, "data.yaml")
        if os.path.exists(source_yaml):
            copy_files(source_yaml, dest_yaml)

    if batches_found:
        logging.info(f"Mini dataset created successfully at {dest_root}")
    else:
        # Do not report success when there was nothing to copy from.
        logging.warning(
            f"No batch_<n> directories found under {source_root}; "
            f"mini dataset at {dest_root} is empty."
        )

# ======================= PROCESS BATCHES =======================
def process_batches(batch_root, batch_prefix="batch_"):
    """Initialize the dataset structure of every batch directory in ``batch_root``.

    Only sub-directories whose name starts with ``batch_prefix`` are treated
    as batches. Without that filter every sub-directory of the dataset root
    (such as its ``images`` or ``labels`` folder) would get a nested dataset
    skeleton and its own ``data.yaml`` written into it.

    Args:
        batch_root: Directory that contains the batch folders.
        batch_prefix: Name prefix identifying a batch folder. The default
            matches the ``batch_<n>`` naming used by ``create_mini_dataset``;
            pass ``""`` to process every sub-directory.

    Returns:
        None. A missing ``batch_root`` is logged as an error and skipped.
    """
    logging.info(f"Starting batch processing in {batch_root}...")

    if not os.path.exists(batch_root):
        logging.error(f"Batch root directory {batch_root} does not exist.")
        return

    # Sorted so batches are processed (and logged) in a stable order.
    for batch in sorted(os.listdir(batch_root)):
        batch_path = os.path.join(batch_root, batch)
        if not os.path.isdir(batch_path):
            continue
        if not batch.startswith(batch_prefix):
            logging.debug(f"Skipping non-batch directory: {batch_path}")
            continue

        logging.info(f"Processing batch: {batch_path}")
        initialize_dataset_directory(batch_path)

    logging.info("Batch processing completed successfully.")
    print("Batch processing completed. Check logs/process.log for details.")

# ======================= MAIN EXECUTION =======================
if __name__ == "__main__":
    # Full dataset location and the destination of the sampled-down copy.
    dataset_path = r"C:\Users\sathish\Downloads\FL_ModelForAV\data\bdd100k"
    mini_dataset_path = r"C:\Users\sathish\Downloads\FL_ModelForAV\my-project\data\bdd100_mini"

    # Step 1: make sure the dataset root has its folders, data.yaml and split lists.
    initialize_dataset_directory(dataset_path)

    # Step 2: run the same initialization for the batches under the root.
    process_batches(batch_root=dataset_path)

    # Step 3: sample a mini dataset out of the original one.
    create_mini_dataset(
        source_root=dataset_path,
        dest_root=mini_dataset_path,
        num_images=10,
        num_batches=10,
        max_threads=10,
    )


Logging initialized. Check logs/process.log for details.
Batch processing completed. Check logs/process.log for details.
