# Preprocesamiento II

## 5. Data Augmentation

### 5.1 Creación de los subsets

#### i. Creación del JSON

In [38]:
import os
import json
import cv2
import albumentations as A
from pathlib import Path
from tqdm import tqdm
from copy import deepcopy

In [2]:
def create_subset_data_augmentation(
    image_folder_path,
    coco_json_path,
    output_image_folder_path,
    output_json_path,
    num_augmentations
):
    """
    Write augmented copies of every annotated image plus a COCO JSON describing them.

    Only the augmented images are written and listed in the output JSON; the
    original images are not copied. Images without annotations are skipped, and
    images that cannot be read are skipped and reported in a warning at the end.

    Parameters:
        image_folder_path (str): Path to the original image folder.
        coco_json_path (str): Path to the original COCO JSON annotations.
        output_image_folder_path (str): Directory to save augmented images.
        output_json_path (str): Path to save the new COCO JSON file.
        num_augmentations (int): Number of augmentations to apply per image.
    """
    os.makedirs(output_image_folder_path, exist_ok=True)

    with open(coco_json_path, 'r', encoding='utf-8') as f:
        coco = json.load(f)

    transform = A.Compose([
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=10, p=0.9),
        A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.5),
        A.MotionBlur(blur_limit=3, p=0.3)
    ], bbox_params=A.BboxParams(format='coco', label_fields=['category_ids']))

    new_images = []
    new_annotations = []
    # New ids continue after the largest original id; default=0 keeps an empty
    # "images"/"annotations" list from crashing max().
    new_image_id = max((img['id'] for img in coco['images']), default=0) + 1
    new_annotation_id = max((ann['id'] for ann in coco['annotations']), default=0) + 1

    image_id_to_anns = {}
    for ann in coco['annotations']:
        image_id_to_anns.setdefault(ann['image_id'], []).append(ann)

    unreadable = []

    for img_info in tqdm(coco['images'], desc="Augmenting images"):
        # Check annotations first so images with nothing to augment are never decoded.
        anns = image_id_to_anns.get(img_info['id'], [])
        if not anns:
            continue

        img_path = os.path.join(image_folder_path, img_info['file_name'])
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread returns None instead of raising; remember it for the summary.
            unreadable.append(img_info['file_name'])
            continue

        bboxes = [ann['bbox'] for ann in anns]
        category_ids = [ann['category_id'] for ann in anns]
        source_name = Path(img_info['file_name'])

        for i in range(1, num_augmentations + 1):
            transformed = transform(image=image, bboxes=bboxes, category_ids=category_ids)
            aug_file_name = f"{source_name.stem}_aug-{i}{source_name.suffix}"
            aug_path = os.path.join(output_image_folder_path, aug_file_name)

            # Do not list an image in the JSON if it never reached the disk.
            if not cv2.imwrite(aug_path, transformed['image']):
                print(f"[WARNING] Could not write {aug_path}; skipping this augmentation.")
                continue

            new_images.append({
                "id": new_image_id,
                "file_name": aug_file_name,
                "width": img_info['width'],
                "height": img_info['height']
            })

            for bbox, cat_id in zip(transformed['bboxes'], transformed['category_ids']):
                # Plain Python floats so json.dump never meets a NumPy scalar.
                x, y, w, h = (float(v) for v in bbox[:4])
                new_annotations.append({
                    "id": new_annotation_id,
                    "image_id": new_image_id,
                    "category_id": cat_id,
                    "bbox": [x, y, w, h],
                    "iscrowd": 0,
                    "area": w * h
                })
                new_annotation_id += 1

            new_image_id += 1

    augmented_coco = {
        "images": new_images,
        "annotations": new_annotations,
        "categories": coco['categories']
    }

    # Create the JSON's folder if needed; dirname is '' for a bare file name,
    # and os.makedirs('') would raise.
    json_dir = os.path.dirname(output_json_path)
    if json_dir:
        os.makedirs(json_dir, exist_ok=True)

    with open(output_json_path, 'w', encoding='utf-8') as f:
        json.dump(augmented_coco, f, indent=2)

    if unreadable:
        print(
            f"[WARNING] {len(unreadable)} annotated image(s) could not be read "
            f"from {image_folder_path} and were skipped."
        )

    print("Data augmentation complete and saved.")


In [3]:
# Generate 4 augmented variants per annotated image of the train split; only the
# augmented images and their annotations are written to train_augmented.json.
create_subset_data_augmentation(
    image_folder_path='merged-dataset/images',
    coco_json_path='merged-dataset/subsets/train.json',
    output_image_folder_path='merged-dataset/train_augmented/images',
    output_json_path='merged-dataset/train_augmented/train_augmented.json',
    num_augmentations=4
)

  original_init(self, **validated_kwargs)
Augmenting images: 100%|██████████| 1160/1160 [07:21<00:00,  2.63it/s]


Data augmentation complete and saved.


#### ii. Train unido

In [39]:
import os
import json
import shutil
from pathlib import Path
from tqdm import tqdm
import cv2
import albumentations as A

In [40]:
def create_subset_data_augmentation_unified(
    image_folder_path,
    coco_json_path,
    output_image_folder_path,
    output_json_path,
    num_augmentations
):
    """
    Build one dataset holding the original images plus their augmented copies.

    Every image listed in the COCO file is copied to the output folder and kept
    in the output JSON with its original ids. Each annotated, readable image then
    gets `num_augmentations` augmented copies whose boxes are clipped to the image.

    Parameters:
        image_folder_path (str): Path to the original image folder.
        coco_json_path (str): Path to the original COCO JSON annotations.
        output_image_folder_path (str): Directory receiving originals and augmentations.
        output_json_path (str): Path to save the combined COCO JSON file.
        num_augmentations (int): Number of augmentations to apply per image.

    Raises:
        FileNotFoundError: From shutil.copy when a listed original image is missing.
    """
    os.makedirs(output_image_folder_path, exist_ok=True)

    with open(coco_json_path, 'r', encoding='utf-8') as f:
        coco = json.load(f)

    transform = A.Compose([
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=10, p=0.9),
        A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.5),
        A.MotionBlur(blur_limit=3, p=0.3)
    ], bbox_params=A.BboxParams(format='yolo', label_fields=['category_ids']))

    new_images = []
    new_annotations = []
    # Augmented ids continue after the largest original id; default=0 keeps an
    # empty "images"/"annotations" list from crashing max().
    new_image_id = max((img['id'] for img in coco['images']), default=0) + 1
    new_annotation_id = max((ann['id'] for ann in coco['annotations']), default=0) + 1

    image_id_to_anns = {}
    for ann in coco['annotations']:
        image_id_to_anns.setdefault(ann['image_id'], []).append(ann)

    # Save original images and annotations
    for img_info in tqdm(coco['images'], desc="Copying original images and annotations"):
        orig_img_path = os.path.join(image_folder_path, img_info['file_name'])
        dest_img_path = os.path.join(output_image_folder_path, img_info['file_name'])

        # Skip the copy when a previous run already placed the file.
        if not os.path.exists(dest_img_path):
            shutil.copy(orig_img_path, dest_img_path)

        new_images.append({
            "id": img_info['id'],
            "file_name": img_info['file_name'],
            "width": img_info['width'],
            "height": img_info['height']
        })

        for ann in image_id_to_anns.get(img_info['id'], []):
            new_annotations.append({
                "id": ann['id'],
                "image_id": ann['image_id'],
                "category_id": ann['category_id'],
                "bbox": ann['bbox'],
                "iscrowd": ann.get('iscrowd', 0),
                "area": ann['bbox'][2] * ann['bbox'][3]
            })

    unreadable = []

    # Augment
    for img_info in tqdm(coco['images'], desc="Applying augmentations"):
        # Check annotations first so images with nothing to augment are never decoded.
        anns = image_id_to_anns.get(img_info['id'], [])
        if not anns:
            continue

        img_path = os.path.join(image_folder_path, img_info['file_name'])
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread returns None instead of raising; remember it for the summary.
            unreadable.append(img_info['file_name'])
            continue

        img_w, img_h = img_info['width'], img_info['height']

        # Convert COCO [x, y, w, h] to normalized YOLO boxes. Clipping happens on
        # the box corners, so the resulting box always lies inside the image.
        yolo_bboxes = []
        category_ids = []
        for ann in anns:
            yolo_bbox = _coco_bbox_to_clipped_yolo(ann['bbox'], img_w, img_h)
            if yolo_bbox is None:
                print(
                    f"Dropping bbox of annotation {ann['id']} in {img_info['file_name']}: "
                    "no area left inside the image."
                )
                continue
            yolo_bboxes.append(yolo_bbox)
            category_ids.append(ann['category_id'])

        if not yolo_bboxes:
            continue

        source_name = Path(img_info['file_name'])

        for i in range(1, num_augmentations + 1):
            try:
                transformed = transform(image=image, bboxes=yolo_bboxes, category_ids=category_ids)
            except Exception as e:
                # Best effort: one failing transform must not abort the whole run.
                print(f"Skipping augmentation for {img_info['file_name']} due to error: {e}")
                continue

            aug_file_name = f"{source_name.stem}_aug-{i}{source_name.suffix}"
            aug_path = os.path.join(output_image_folder_path, aug_file_name)

            # Do not list an image in the JSON if it never reached the disk.
            if not cv2.imwrite(aug_path, transformed['image']):
                print(f"[WARNING] Could not write {aug_path}; skipping this augmentation.")
                continue

            new_images.append({
                "id": new_image_id,
                "file_name": aug_file_name,
                "width": img_info['width'],
                "height": img_info['height']
            })

            for bbox, cat_id in zip(transformed['bboxes'], transformed['category_ids']):
                x, y, w, h = _yolo_bbox_to_clipped_coco(bbox, img_w, img_h)

                new_annotations.append({
                    "id": new_annotation_id,
                    "image_id": new_image_id,
                    "category_id": cat_id,
                    "bbox": [x, y, w, h],
                    "iscrowd": 0,
                    "area": w * h
                })
                new_annotation_id += 1

            new_image_id += 1

    final_coco = {
        "images": new_images,
        "annotations": new_annotations,
        "categories": coco['categories']
    }

    # dirname is '' for a bare file name, and os.makedirs('') would raise.
    json_dir = os.path.dirname(output_json_path)
    if json_dir:
        os.makedirs(json_dir, exist_ok=True)
    with open(output_json_path, 'w', encoding='utf-8') as f:
        json.dump(final_coco, f, indent=2)

    if unreadable:
        print(
            f"[WARNING] {len(unreadable)} annotated image(s) could not be read "
            f"from {image_folder_path} and were not augmented."
        )

    print("Original and augmented images with annotations saved.")


def _coco_bbox_to_clipped_yolo(bbox, img_w, img_h):
    """Clip a COCO [x, y, w, h] box to the image and return normalized YOLO
    [x_center, y_center, w, h], or None when nothing of the box is inside."""
    x, y, w, h = bbox
    x1 = max(0.0, min(float(x), float(img_w)))
    y1 = max(0.0, min(float(y), float(img_h)))
    x2 = max(0.0, min(float(x) + float(w), float(img_w)))
    y2 = max(0.0, min(float(y) + float(h), float(img_h)))
    if x2 <= x1 or y2 <= y1:
        return None
    return [
        (x1 + x2) / 2 / img_w,
        (y1 + y2) / 2 / img_h,
        (x2 - x1) / img_w,
        (y2 - y1) / img_h,
    ]


def _yolo_bbox_to_clipped_coco(bbox, img_w, img_h):
    """Convert a normalized YOLO box to COCO [x, y, w, h] in pixels, clipped to
    the image and at least one pixel wide and tall."""
    x_center, y_center, w_norm, h_norm = (float(v) for v in bbox[:4])
    w = w_norm * img_w
    h = h_norm * img_h
    x = x_center * img_w - w / 2
    y = y_center * img_h - h / 2

    # Clip both corners so a box cut by the left/top edge shrinks instead of
    # keeping its full size from the clamped origin.
    x1 = max(0.0, min(x, img_w - 1))
    y1 = max(0.0, min(y, img_h - 1))
    x2 = max(x1 + 1, min(x + w, img_w))
    y2 = max(y1 + 1, min(y + h, img_h))
    return [float(x1), float(y1), float(x2 - x1), float(y2 - y1)]


In [41]:
# Build the unified train set from merged-dataset/redim: originals plus 4
# augmentations per annotated image, described by a single train_augmented.json.
create_subset_data_augmentation_unified(
    image_folder_path='merged-dataset/redim/train',
    coco_json_path='merged-dataset/redim/train.json',
    output_image_folder_path='merged-dataset/redim/train_augmented',
    output_json_path='merged-dataset/redim/train_augmented.json',
    num_augmentations=4
)

Copying original images and annotations: 100%|██████████| 1160/1160 [00:56<00:00, 20.68it/s]
Applying augmentations: 100%|██████████| 1160/1160 [06:40<00:00,  2.90it/s]


Original and augmented images with annotations saved.


#### iii. Agregación de imágenes

In [15]:
import os
import json
import cv2
from tqdm import tqdm
from pathlib import Path
import albumentations as A

# NOTE: this definition reuses the name of the earlier
# `create_subset_data_augmentation` cell and shadows it once this cell runs.
def create_subset_data_augmentation(
    image_folder_path,
    coco_json_path,
    output_image_folder_path,
    output_json_path,
    num_augmentations
):
    """
    Applies data augmentation to new images and appends new annotations to an existing COCO JSON.

    Augmentations whose file name is already listed in the existing output JSON
    are skipped, so the function can be re-run to fill in missing images. Images
    without annotations are skipped; images that cannot be read are skipped and
    reported in a warning at the end.

    Parameters:
        image_folder_path (str): Path to the original image folder.
        coco_json_path (str): Path to the original COCO JSON annotations.
        output_image_folder_path (str): Directory to save augmented images.
        output_json_path (str): Path to save the COCO JSON file (appended, not overwritten).
        num_augmentations (int): Number of augmentations to apply per image.
    """
    os.makedirs(output_image_folder_path, exist_ok=True)

    # Load original COCO annotations
    with open(coco_json_path, 'r', encoding='utf-8') as f:
        original_coco = json.load(f)

    # Load existing augmented JSON if exists
    if os.path.exists(output_json_path):
        with open(output_json_path, 'r', encoding='utf-8') as f:
            output_coco = json.load(f)
        new_images = output_coco['images']
        new_annotations = output_coco['annotations']
    else:
        new_images = []
        new_annotations = []
    existing_augmented_names = {img['file_name'] for img in new_images}

    # Continue numbering after every id already in use, in the original file or
    # in the existing output. default=0 keeps empty lists from crashing max().
    new_image_id = max(
        (img['id'] for img in original_coco['images'] + new_images), default=0
    ) + 1
    new_annotation_id = max(
        (ann['id'] for ann in original_coco['annotations'] + new_annotations), default=0
    ) + 1

    transform = A.Compose([
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=10, p=0.9),
        A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.5),
        A.MotionBlur(blur_limit=3, p=0.3)
    ], bbox_params=A.BboxParams(format='coco', label_fields=['category_ids']))

    image_id_to_anns = {}
    for ann in original_coco['annotations']:
        image_id_to_anns.setdefault(ann['image_id'], []).append(ann)

    unreadable = []
    created = 0

    for img_info in tqdm(original_coco['images'], desc="Checking images for augmentation"):
        anns = image_id_to_anns.get(img_info['id'], [])
        if not anns:
            continue

        base_name = Path(img_info['file_name']).stem
        suffix = Path(img_info['file_name']).suffix

        # Work out what is still missing before touching the disk, so images
        # that are already fully augmented are never decoded.
        pending_names = [
            f"{base_name}_aug-{i}{suffix}" for i in range(1, num_augmentations + 1)
        ]
        pending_names = [
            name for name in pending_names if name not in existing_augmented_names
        ]
        if not pending_names:
            continue

        img_path = os.path.join(image_folder_path, img_info['file_name'])
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread returns None instead of raising; remember it for the summary.
            unreadable.append(img_info['file_name'])
            continue

        bboxes = [ann['bbox'] for ann in anns]
        category_ids = [ann['category_id'] for ann in anns]

        for aug_file_name in pending_names:
            transformed = transform(image=image, bboxes=bboxes, category_ids=category_ids)
            aug_path = os.path.join(output_image_folder_path, aug_file_name)

            # Do not list an image in the JSON if it never reached the disk.
            if not cv2.imwrite(aug_path, transformed['image']):
                print(f"[WARNING] Could not write {aug_path}; skipping this augmentation.")
                continue

            new_images.append({
                "id": new_image_id,
                "file_name": aug_file_name,
                "width": img_info['width'],
                "height": img_info['height']
            })

            for bbox, cat_id in zip(transformed['bboxes'], transformed['category_ids']):
                # Plain Python floats so json.dump never meets a NumPy scalar.
                x, y, w, h = (float(v) for v in bbox[:4])
                new_annotations.append({
                    "id": new_annotation_id,
                    "image_id": new_image_id,
                    "category_id": cat_id,
                    "bbox": [x, y, w, h],
                    "iscrowd": 0,
                    "area": w * h
                })
                new_annotation_id += 1

            new_image_id += 1
            created += 1

    final_coco = {
        "images": new_images,
        "annotations": new_annotations,
        "categories": original_coco['categories']
    }

    # dirname is '' for a bare file name, and os.makedirs('') would raise.
    json_dir = os.path.dirname(output_json_path)
    if json_dir:
        os.makedirs(json_dir, exist_ok=True)
    with open(output_json_path, 'w', encoding='utf-8') as f:
        json.dump(final_coco, f, indent=2)

    if unreadable:
        print(
            f"[WARNING] {len(unreadable)} image(s) still missing augmentations could "
            f"not be read from {image_folder_path} and were skipped."
        )
    print(f"New augmented images written: {created}")

    print("Data augmentation complete. New annotations appended.")


In [16]:
# Fill in augmentations for the images placed in merged-dataset/missing_images,
# appending them to the existing train_augmented.json.
# NOTE(review): the recorded run below processed 1160 entries in under a second,
# which suggests every image was skipped (unreadable in this folder, or already
# listed in the JSON) - confirm that new augmentations were actually produced.
create_subset_data_augmentation(
    image_folder_path='merged-dataset/missing_images',
    coco_json_path='merged-dataset/subsets/train.json',
    output_image_folder_path='merged-dataset/train_augmented/images',
    output_json_path='merged-dataset/train_augmented/train_augmented.json',
    num_augmentations=4
)

  original_init(self, **validated_kwargs)
Checking images for augmentation: 100%|██████████| 1160/1160 [00:00<00:00, 1287.73it/s]


Data augmentation complete. New annotations appended.


### 5.2 Verificación de BBOX

In [22]:
import os
from PIL import Image, ImageDraw
from pycocotools.coco import COCO
import random

In [23]:
def save_coco_bboxes_to_images(image_dir, annotation_path, output_dir='output', max_images=None):
    """
    Save images with bounding boxes drawn from COCO annotations.

    Each selected image is opened, every annotation's [x, y, w, h] box is drawn
    as a red rectangle, and the result is saved under `output_dir` using the
    image's `file_name`. Images missing from `image_dir` are reported and skipped.

    Parameters:
        image_dir (str): Directory containing the images.
        annotation_path (str): Full path to the COCO annotation file.
        output_dir (str): Directory to save output images with drawn bounding boxes.
        max_images (int, optional): If provided, randomly selects up to this number of images (without repetition).
    """
    os.makedirs(output_dir, exist_ok=True)
    coco = COCO(annotation_path)
    image_ids = coco.getImgIds()

    # Only sample when the cap is smaller than the dataset; otherwise use all ids.
    if max_images is not None and max_images < len(image_ids):
        image_ids = random.sample(image_ids, max_images)

    for img_id in image_ids:
        img_data = coco.loadImgs(img_id)[0]
        img_path = os.path.join(image_dir, img_data['file_name'])
        output_path = os.path.join(output_dir, img_data['file_name'])

        if not os.path.exists(img_path):
            print(f"[WARNING] Image file not found: {img_path}")
            continue

        # The context manager closes the source file; convert() returns an
        # independent in-memory copy that stays usable afterwards.
        with Image.open(img_path) as source:
            image = source.convert("RGB")
        draw = ImageDraw.Draw(image)

        ann_ids = coco.getAnnIds(imgIds=img_id)
        anns = coco.loadAnns(ann_ids)

        for ann in anns:
            x, y, w, h = ann['bbox']
            draw.rectangle([x, y, x + w, y + h], outline='red', width=3)

        # file_name may contain sub-directories; make sure the target folder exists.
        output_parent = os.path.dirname(output_path)
        if output_parent:
            os.makedirs(output_parent, exist_ok=True)

        image.save(output_path)


In [6]:
# Visual sanity check: draw the boxes on a random sample of 10 augmented images.
save_coco_bboxes_to_images(
    image_dir='merged-dataset/train_augmented/images',
    annotation_path='merged-dataset/train_augmented/train_augmented.json',
    output_dir='merged-dataset/train_augmented/images_bbox_test',
    max_images=10
)


loading annotations into memory...
Done (t=0.02s)
creating index...
index created!


In [44]:
# Same visual check on the unified set (originals + augmentations), 40 random images.
save_coco_bboxes_to_images(
    image_dir='merged-dataset/redim/train_augmented',
    annotation_path='merged-dataset/redim/train_augmented.json',
    output_dir='merged-dataset/redim/train_augmented_bbox_test',
    max_images=40
)


loading annotations into memory...
Done (t=0.02s)
creating index...
index created!


### 5.3 Cantidad de Imágenes COCO

In [3]:
import json

def count_images_in_coco(coco_json_path):
    """
    Print how many entries the "images" list of a COCO JSON file contains.

    A file without an "images" key is reported as having 0 images.

    Parameters:
        coco_json_path (str): Path to the COCO JSON annotation file.
    """
    with open(coco_json_path, 'r', encoding='utf-8') as handle:
        image_entries = json.load(handle).get("images", [])

    print(f"Total number of images: {len(image_entries)}")


In [4]:
# Number of images listed in the train split annotations.
count_images_in_coco('merged-dataset/subsets/train.json')

Total number of images: 1160


In [6]:
# Number of images listed in the redim test annotations.
count_images_in_coco('merged-dataset/redim/test.json')

Total number of images: 185


### 5.4 Cantidad de Imágenes Path

In [8]:
import os

def count_images_in_folder(folder_path, recursive=False):
    """
    Print the number of files in a folder whose extension marks them as images.

    Extensions are compared case-insensitively against a fixed set
    (.jpg, .jpeg, .png, .bmp, .tif, .tiff, .webp, .gif).

    Parameters:
        folder_path (str): Path to the folder.
        recursive (bool): If True, searches subdirectories as well.
    """
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff', '.webp', '.gif'}

    def has_image_extension(file_name):
        return os.path.splitext(file_name)[1].lower() in image_extensions

    if not recursive:
        # Top level only: directory entries must be real files to count.
        count = sum(
            1
            for entry in os.listdir(folder_path)
            if os.path.isfile(os.path.join(folder_path, entry)) and has_image_extension(entry)
        )
    else:
        count = sum(
            1
            for _, _, file_names in os.walk(folder_path)
            for file_name in file_names
            if has_image_extension(file_name)
        )

    print(f"Total image files in folder: {count}")


In [18]:
# Image files directly inside the original train folder (non-recursive).
count_images_in_folder('merged-dataset/train')

Total image files in folder: 1160


In [34]:
# Image files directly inside the redim train folder (non-recursive).
count_images_in_folder('merged-dataset/redim/train')

Total image files in folder: 1160


In [36]:
# Image files directly inside the redim test folder (non-recursive).
count_images_in_folder('merged-dataset/redim/test')

Total image files in folder: 185


In [43]:
# Unified folder: originals plus 4 augmentations each, so 1160 * 5 = 5800 is the
# expected total (matches the recorded output).
count_images_in_folder('merged-dataset/redim/train_augmented')

Total image files in folder: 5800


### 5.5 Verificar aumento de datos

In [1]:
import os

def check_augmented_images(original_path, augmented_path, num_augmentations=4):
    """
    Compares original and augmented images and prints:
    1. Original images missing some of the required augmented versions.
    2. Augmented-folder files with no matching original.

    An augmented file matches when its name (without extension) is
    `<original base name>_aug-<i>` with 1 <= i <= num_augmentations. Only the
    trailing suffix is stripped, so originals whose own name contains `_aug-<i>`
    are resolved correctly. Sub-directories of `augmented_path` are ignored.
    Both listings are printed in sorted order.

    Parameters:
        original_path (str): Path to folder containing original images.
        augmented_path (str): Path to folder containing augmented images.
        num_augmentations (int): Number of augmented versions expected per original.
    """
    # Get original base names without extensions
    original_bases = {
        os.path.splitext(f)[0] for f in os.listdir(original_path)
        if os.path.isfile(os.path.join(original_path, f))
    }

    # Expected suffixes in numeric order; a name can end with at most one of them.
    aug_suffixes = [f"_aug-{i}" for i in range(1, num_augmentations + 1)]
    # Mapping from original base name to found suffixes
    aug_tracking = {base: set() for base in original_bases}
    unmatched_augmented = []

    for f in sorted(os.listdir(augmented_path)):
        if not os.path.isfile(os.path.join(augmented_path, f)):
            continue

        name, _ = os.path.splitext(f)
        suffix = next((s for s in aug_suffixes if name.endswith(s)), None)
        # Strip only the trailing suffix; str.replace would also remove earlier
        # occurrences inside the base name.
        base_name = name[:-len(suffix)] if suffix else None

        if base_name in original_bases:
            aug_tracking[base_name].add(suffix)
        else:
            unmatched_augmented.append(f)

    print("Images missing augmented versions:")
    for base in sorted(aug_tracking):
        missing = [s for s in aug_suffixes if s not in aug_tracking[base]]
        if missing:
            print(f"- {base}: missing {missing}")

    print("\nAugmented images with no matching original:")
    for aug in unmatched_augmented:
        print(f"- {aug}")


In [2]:
# Verify that every original train image has its 4 augmented versions and that
# no augmented file lacks an original; empty listings mean everything matches.
check_augmented_images("merged-dataset/train", "merged-dataset/train_augmented/images")

Images missing augmented versions:

Augmented images with no matching original:
