In [25]:
# Data Harmonizer - Pydantic Models and Configuration Loading
# This notebook demonstrates how to load and work with data harmonization configurations

from pydantic import BaseModel
from typing import List
import yaml


In [26]:

class FieldMapping(BaseModel):
    """Represents a single field mapping between source and target."""
    source_field: str
    target_field: str


class DataMapping(BaseModel):
    """Represents a complete data mapping configuration."""
    origin: str
    fields_map: List[FieldMapping]


In [27]:
def load_config_mappings(config_path: str) -> dict:
    """
    Load configuration from YAML file and convert to dict with target_classes, target_dataset_path and datasets.
    
    Args:
        config_path: Path to the YAML configuration file
        
    Returns:
        Dict with target_classes, target_dataset_path and datasets fields
    """
    with open(config_path, 'r') as file:
        config_data = yaml.safe_load(file)
    
    # Convert datasets to Pydantic objects
    datasets = []
    for mapping_config in config_data["datasets"]:
        # Convert each mapping to Pydantic objects
        field_mappings = [
            FieldMapping(
                source_field=field["source_field"],
                target_field=field["target_field"]
            )
            for field in mapping_config["fields_map"]
        ]
        
        data_mapping = DataMapping(
            origin=mapping_config["origin"],
            fields_map=field_mappings
        )
        datasets.append(data_mapping)
    
    return {
        "target_classes": config_data["target_classes"],
        "target_dataset_path": config_data["target_dataset_path"],
        "datasets": datasets,
        "target_size": config_data["target_size"]
    }

In [28]:
import os
def generate_internal_structure(config, step_path):
    if not os.path.exists(config["target_dataset_path"] + f"/{step_path}" + "/images"):
        os.makedirs(config["target_dataset_path"] + f"/{step_path}" + "/images")
    if not os.path.exists(config["target_dataset_path"] + f"/{step_path}" + "/labels"):
        os.makedirs(config["target_dataset_path"] + f"/{step_path}" + "/labels")

def generate_target_dataset_path(config):
    if not os.path.exists(config["target_dataset_path"]):
        os.makedirs(config["target_dataset_path"])
    generate_internal_structure(config, "train")
    generate_internal_structure(config, "valid")
    generate_internal_structure(config, "test")
    
    return config["target_dataset_path"]

In [29]:
def generate_target_yolo_data_file(config):
    import yaml
    import os
    
    data_yaml_path = os.path.join(config["target_dataset_path"], "data.yaml")
    names = f"[{', '.join(config["target_classes"])}]"
    print(names)
    # Create the YOLO data structure
    yolo_data = {
        'train': '../train/images',
        'val': '../valid/images', 
        'test': '../test/images',
        'nc': len(config["target_classes"]),
        'names': names
    }
    
    if not os.path.exists(data_yaml_path):
        with open(data_yaml_path, "w") as file:
            yaml.dump(yolo_data, file)
    
    return data_yaml_path

In [30]:
def filter_segmentation_labels(target_dataset_path: str):
    """
    Filter out segmentation labels and keep only bounding box labels.
    In YOLO format:
    - Bounding box: <class_id> <x_center> <y_center> <width> <height> (5 items)
    - Segmentation: <class_id> <x1> <y1> <x2> <y2> ... (more than 5 items)
    
    Args:
        target_dataset_path: Path to the target dataset directory
    """
    import os
    
    split_names = ['train', 'valid', 'test']
    
    for split in split_names:
        labels_dir = os.path.join(target_dataset_path, split, "labels")
        images_dir = os.path.join(target_dataset_path, split, "images")
        
        if not os.path.exists(labels_dir):
            continue
        
        files_to_remove = []
        
        # Process each label file
        for filename in os.listdir(labels_dir):
            if not filename.endswith('.txt'):
                continue
                
            label_file = os.path.join(labels_dir, filename)
            
            with open(label_file, 'r') as f:
                lines = f.readlines()
            
            # Filter lines to keep only bounding boxes (exactly 5 items)
            filtered_lines = []
            for line in lines:
                line = line.strip()
                if not line:
                    continue
                parts = line.split()
                # Keep only lines with exactly 5 items (bounding boxes)
                if len(parts) == 5:
                    filtered_lines.append(line)
            
            # Update the label file
            if filtered_lines:
                with open(label_file, 'w') as f:
                    f.write('\n'.join(filtered_lines))
                print(f"✅ Filtered {filename}: kept {len(filtered_lines)} bounding boxes (removed {len(lines) - len(filtered_lines)} segmentation lines)")
            else:
                # Mark for removal if no valid labels remain
                files_to_remove.append(filename)
                print(f"🗑️  Marking for removal: {filename} (only segmentation labels found)")
        
        # Remove images and labels with no valid bounding boxes
        if files_to_remove:
            for filename in files_to_remove:
                # Remove label file
                label_file = os.path.join(labels_dir, filename)
                if os.path.exists(label_file):
                    os.remove(label_file)
                    print(f"🗑️  Removed label: {filename}")
                
                # Remove corresponding image
                image_name = filename[:-4]  # Remove .txt extension
                for ext in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']:
                    image_file = os.path.join(images_dir, image_name + ext)
                    if os.path.exists(image_file):
                        os.remove(image_file)
                        print(f"🗑️  Removed image: {image_name + ext}")
                        break


In [31]:
from typing import Dict

def extract_origin_classes(dataset: DataMapping):
    with open(dataset.origin + "/data.yaml", 'r') as file:
        classes = yaml.safe_load(file)
    return classes["names"]

def map_classes(config_classes: List[FieldMapping], target_classes: List[str], origin_classes_list: List[str]) -> Dict[int, int]:
    """
     THe result is a dict {idx_origin_class: idx_target_class}
     The idea is that this mapping will be used to map the classes of the origin dataset to the target dataset
    """
    origin_classes_indexed = {origin_classes_list[i]: i for i in range(len(origin_classes_list))}
    target_classes_indexed = {target_classes[i]: i for i in range(len(target_classes))}

    
    result = {}
    for config_class in config_classes:
        result[origin_classes_indexed[config_class.source_field]] = target_classes_indexed[config_class.target_field]
    return result

def copy_and_resize_images(origin_path: str, target_dataset_path: str, target_size: int):
    """
    Copy and resize images from origin to target dataset.
    
    Args:
        origin_path: Path to the origin dataset directory
        target_dataset_path: Path to the target dataset directory
        target_size: Size for the square output (e.g., 640 for 640x640)
    """
    import os
    import cv2
    
    # Define source and target image directories
    source_images_dir = os.path.join(origin_path, "images")
    target_images_dir = os.path.join(target_dataset_path, "images")
    
    # Check if source images directory exists
    if not os.path.exists(source_images_dir):
        print(f"❌ Source images directory not found: {source_images_dir}")
        return False
    
    # Create target images directory if it doesn't exist
    os.makedirs(target_images_dir, exist_ok=True)
    
    # Get list of image files
    image_files = [f for f in os.listdir(source_images_dir) 
                   if f.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".tiff"))]
    
    if not image_files:
        print(f"❌ No image files found in {source_images_dir}")
        return False
    
    print(f"🔄 Processing {len(image_files)} images...")
    
    # Define the new size as a square
    new_size = (target_size, target_size)
    
    processed_count = 0
    try:
        for filename in image_files:
            source_file = os.path.join(source_images_dir, filename)
            target_file = os.path.join(target_images_dir, filename)
            
            # Only process files (not directories)
            if os.path.isfile(source_file):
                # Read the image
                img = cv2.imread(source_file)
                
                if img is None:
                    print(f"⚠️  Could not read image: {filename}")
                    continue
                
                # Resize the image
                resized = cv2.resize(img, new_size)
                
                # Save the resized image
                cv2.imwrite(target_file, resized)
                
                processed_count += 1
                # print(f"✅ Processed: {filename}")
        
        print(f"✅ Successfully processed {processed_count}/{len(image_files)} images to {target_size}x{target_size}")
        return processed_count > 0
        
    except Exception as e:
        print(f"❌ Error processing images: {str(e)}")
        return False

def transform_and_copy_labels(origin_path: str, target_dataset_path: str, classes: Dict[int, int]):
    """
    Transform and copy label files from origin to target dataset.
    
    Args:
        origin_path: Path to the origin dataset directory
        target_dataset_path: Path to the target dataset directory  
        classes: Dictionary mapping origin class indices to target class indices
        
    Returns:
        List of filenames (without extension) that had 0 transformed lines
    """
    import os
    
    # Define source and target label directories
    source_labels_dir = os.path.join(origin_path, "labels")
    target_labels_dir = os.path.join(target_dataset_path, "labels")
    
    # Check if source labels directory exists
    if not os.path.exists(source_labels_dir):
        print(f"❌ Source labels directory not found: {source_labels_dir}")
        return []
    
    # Create target labels directory if it doesn't exist
    os.makedirs(target_labels_dir, exist_ok=True)
    
    # Track files with 0 transformed lines
    empty_files = []
    
    try:
        # Process each label file in the source directory
        for filename in os.listdir(source_labels_dir):
            source_file = os.path.join(source_labels_dir, filename)
            
            # Only process text files
            if os.path.isfile(source_file) and filename.endswith('.txt'):
                target_file = os.path.join(target_labels_dir, filename)
                
                # Read and transform the label file
                transformed_lines = []
                
                with open(source_file, 'r') as f:
                    for line in f:
                        line = line.strip()
                        if not line:  # Skip empty lines
                            continue
                            
                        # Split the line into parts
                        parts = line.split()
                        if len(parts) < 5:  # YOLO format requires at least class + 4 coordinates
                            continue
                            
                        try:
                            # Extract the class number (first value)
                            origin_class = int(parts[0])
                            
                            # Check if this class is in our mapping
                            if origin_class in classes:
                                # Map to target class
                                target_class = classes[origin_class]
                                
                                # Create new line with mapped class and same coordinates
                                new_line = f"{target_class} {' '.join(parts[1:])}"
                                transformed_lines.append(new_line)
                            # If class not in mapping, drop the line (as requested)
                            
                        except ValueError:
                            print(f"❌ Error transforming labels: {source_file}, line: {line}")
                            # Skip lines that don't have valid integer class
                            continue
                
                # Write the transformed content to target file
                with open(target_file, 'w') as f:
                    if len(transformed_lines) > 0:
                        f.write('\n'.join(transformed_lines))
                    else:
                        # Track files with 0 transformed lines
                        empty_files.append(filename[:-4])  # Remove .txt extension
                
                print(f"✅ Transformed: {filename} ({len(transformed_lines)} lines kept) from {origin_path}")
        
        print(f"✅ Successfully transformed all labels from {source_labels_dir} to {target_labels_dir}")
        return empty_files
        
    except Exception as e:
        print(f"❌ Error transforming labels: {str(e)}")
        return []

def remove_empty_images(target_dataset_path: str, empty_files: List[str]):
    """
    Remove images and labels corresponding to files that had 0 transformed lines.
    
    Args:
        target_dataset_path: Path to the target dataset directory
        empty_files: List of filenames (without extension) to remove
    """
    import os
    
    if not empty_files:
        return
    
    target_images_dir = os.path.join(target_dataset_path, "images")
    target_labels_dir = os.path.join(target_dataset_path, "labels")
    
    if not os.path.exists(target_images_dir):
        print(f"❌ Target images directory not found: {target_images_dir}")
        return
    
    if not os.path.exists(target_labels_dir):
        print(f"❌ Target labels directory not found: {target_labels_dir}")
        return
    
    removed_images = 0
    removed_labels = 0
    
    for filename in empty_files:
        # Remove corresponding image file
        for ext in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']:
            image_file = os.path.join(target_images_dir, filename + ext)
            if os.path.exists(image_file):
                try:
                    os.remove(image_file)
                    print(f"🗑️  Removed empty image: {filename + ext}")
                    removed_images += 1
                    break  # Found and removed, no need to try other extensions
                except Exception as e:
                    print(f"❌ Error removing image {filename + ext}: {str(e)}")
        
        # Remove corresponding label file
        label_file = os.path.join(target_labels_dir, filename + '.txt')
        if os.path.exists(label_file):
            try:
                os.remove(label_file)
                print(f"🗑️  Removed empty label: {filename + '.txt'}")
                removed_labels += 1
            except Exception as e:
                print(f"❌ Error removing label {filename + '.txt'}: {str(e)}")
    
    print(f"✅ Removed {removed_images} images and {removed_labels} labels with no useful content")

def process_dataset(dataset: str, classes: Dict[int, int], target_dataset_path: str, target_size: int):
    copy_and_resize_images(dataset, target_dataset_path, target_size)
    empty_files = transform_and_copy_labels(dataset, target_dataset_path, classes)
    print(f"Empty files: {empty_files}")
    remove_empty_images(target_dataset_path, empty_files)
    pass

def process_origin(dataset, target_classes, target_dataset_path, target_size):
    mapped_classes = map_classes(dataset.fields_map, target_classes, extract_origin_classes(dataset))
    process_dataset(dataset.origin + "/test", mapped_classes, target_dataset_path + "/test", target_size)
    # process_dataset(dataset.origin + "/valid", mapped_classes, target_dataset_path + "/valid", target_size)
    # process_dataset(dataset.origin + "/train", mapped_classes, target_dataset_path + "/train", target_size)


def process_origins(config):
    for dataset in config["datasets"]:
        print("--------------------------------")
        print(dataset)
        print(config["target_classes"])
        print(config["target_dataset_path"])
        print(f"Target size: {config["target_size"]}x{config["target_size"]}")
        process_origin(dataset, config["target_classes"], config["target_dataset_path"], config["target_size"])

In [32]:
# Load the configuration with the new structure
config = load_config_mappings("./config.yml")
generate_target_dataset_path(config)
generate_target_yolo_data_file(config)
process_origins(config)

# filter_segmentation_labels(config["target_dataset_path"])

[Healthy Container]
--------------------------------
origin='/home/emma/facultad/pps/datasets/containers/raw/container.v1i.yolov11' fields_map=[FieldMapping(source_field='0', target_field='Healthy Container')]
['Healthy Container']
/home/emma/facultad/pps/validacion/containers/dataset
Target size: 640x640
❌ Source images directory not found: /home/emma/facultad/pps/datasets/containers/raw/container.v1i.yolov11/test/images
❌ Source labels directory not found: /home/emma/facultad/pps/datasets/containers/raw/container.v1i.yolov11/test/labels
Empty files: []
--------------------------------
origin='/home/emma/facultad/pps/datasets/containers/raw/Shipping Containers.v4i.yolov11' fields_map=[FieldMapping(source_field='container', target_field='Healthy Container')]
['Healthy Container']
/home/emma/facultad/pps/validacion/containers/dataset
Target size: 640x640
🔄 Processing 9 images...
✅ Successfully processed 9/9 images to 640x640
✅ Transformed: Tips-and-tricks-on-how-to-park-a-yard-jockey-_m