In [8]:
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

Looking in indexes: https://download.pytorch.org/whl/cu121
Note: you may need to restart the kernel to use updated packages.


In [9]:
import cv2
import numpy as np
import albumentations as A
from albumentations import Compose, HorizontalFlip, RandomRotate90, GaussianBlur, Normalize
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import os.path
from PIL import Image, ImageDraw
from sahi.utils.file import load_json, save_json
from tqdm import tqdm
import timm
import random

import torchvision
import torch


In [10]:
# Pick the compute device once: the GPU when PyTorch can see one, otherwise the CPU.
# torch.cuda.current_device() is deliberately not called here: it raises on machines
# without CUDA, which would defeat the CPU fallback below.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [11]:
coco_file_name = "cassette1_val"

# Determine target split from coco_file_name.
# The candidates are tested in this order, so "train" wins over "test", which wins over "val".
target_split = next(
    (split_name for split_name in ("train", "test", "val") if split_name in coco_file_name.lower()),
    None,
)
if not target_split:
    raise ValueError("Unable to determine target split from coco_file_name.")

print(target_split)

DATA_DIR = os.path.join("..", "data")
COCO_DIR = os.path.join(DATA_DIR, "coco")

ORG_ANNOTATION_PATH = os.path.join(COCO_DIR, target_split, f"{coco_file_name}_corrected_coco.json")
SLC_ANNOTATION_PATH = os.path.join(COCO_DIR, target_split, f"{coco_file_name}_sliced_coco.json")

AUGMENTATION_PATH = os.path.join(COCO_DIR, "augmentated")  # Folder for all augmentations: ../data/coco/augmentated

print(ORG_ANNOTATION_PATH)

IMAGE_DIR = os.path.join(COCO_DIR, "images")
NEW_IMAGE_DIR = os.path.join(IMAGE_DIR, "png")  # For saving the untouched originals converted to PNG
SLICED_IMAGE_DIR = os.path.join(COCO_DIR, "images_sliced", coco_file_name)

BBOX_VISUALIZATION_DIR = os.path.join(DATA_DIR, "bbox_vis", coco_file_name)
BBOX_SAVE_DIR = os.path.join(BBOX_VISUALIZATION_DIR, "each")  # Place to save each image with its pre-augmentation bounding boxes

# Report missing inputs instead of discarding the result of os.path.exists().
# This must run before the makedirs calls below, which create some of the parent folders.
MISSING_PATHS = [
    required_path
    for required_path in (
        DATA_DIR,
        COCO_DIR,
        ORG_ANNOTATION_PATH,
        SLC_ANNOTATION_PATH,
        IMAGE_DIR,
        SLICED_IMAGE_DIR,
    )
    if not os.path.exists(required_path)
]
for _missing_path in MISSING_PATHS:
    print(f"Warning: expected input path not found: {_missing_path}")

# Output folders are created on demand.
for _output_folder in (BBOX_SAVE_DIR, NEW_IMAGE_DIR, AUGMENTATION_PATH):
    os.makedirs(_output_folder, exist_ok=True)

val
..\data\coco\val\cassette1_val_corrected_coco.json


In [12]:
coco_dict = load_json(ORG_ANNOTATION_PATH)

# Reduce every image entry to its bare file name so it can be joined with IMAGE_DIR.
# Backslashes are treated as separators too, so paths exported on Windows are handled.
# NOTE: the cleaned annotations are written back over the original file.
for img in coco_dict["images"]:
    img["file_name"] = img["file_name"].replace("\\", "/").split("/")[-1]
save_json(coco_dict, save_path=ORG_ANNOTATION_PATH)

coco_dict

{'images': [{'width': 4096,
   'height': 2000,
   'id': 4,
   'file_name': '01BN02.bmp'},
  {'width': 4096, 'height': 2000, 'id': 25, 'file_name': '01FW01.bmp'}],
 'annotations': [{'id': 98,
   'image_id': 4,
   'category_id': 5,
   'segmentation': [],
   'bbox': [2214.833880112831,
    223.5076988929449,
    21.737717054118985,
    45.54569287529642],
   'ignore': 0,
   'iscrowd': 0,
   'area': 990.0593847569965},
  {'id': 99,
   'image_id': 4,
   'category_id': 5,
   'segmentation': [],
   'bbox': [2729.2931837269757,
    203.84024060588504,
    28.983622738824923,
    38.29978719059021],
   'ignore': 0,
   'iscrowd': 0,
   'area': 1110.066582909346},
  {'id': 100,
   'image_id': 4,
   'category_id': 5,
   'segmentation': [],
   'bbox': [1882.5573479998739,
    603.4001826482585,
    12.421552602353332,
    22.772846437648226],
   'ignore': 0,
   'iscrowd': 0,
   'area': 282.87410993056216},
  {'id': 101,
   'image_id': 4,
   'category_id': 7,
   'segmentation': [],
   'bbox': [2414.

In [13]:
# Index the annotations by image id once, instead of rescanning the full list for every image
anns_by_image_id = {}
for ann in coco_dict["annotations"]:
    anns_by_image_id.setdefault(ann["image_id"], []).append(ann)

# Process images with a progress bar
for img in tqdm(coco_dict["images"], desc="Processing original images"):
    # Open and convert the image to grayscale
    mono_img = Image.open(os.path.join(IMAGE_DIR, img["file_name"])).convert("L")

    # Save the grayscale image as .png (later cells derive the PNG name with the same replace)
    png_file_name = img["file_name"].replace(".bmp", ".png")
    mono_img.save(os.path.join(NEW_IMAGE_DIR, png_file_name), format="PNG")

    # Convert grayscale image to RGB so the boxes can be drawn in colour
    rgb_img = Image.merge("RGB", (mono_img, mono_img, mono_img))
    draw = ImageDraw.Draw(rgb_img)

    # Draw every box of this image; COCO boxes are [x, y, width, height]
    for ann in anns_by_image_id.get(img["id"], []):
        xywh = ann["bbox"]
        xyxy = [xywh[0], xywh[1], xywh[0] + xywh[2], xywh[1] + xywh[3]]
        draw.rectangle(xyxy, width=5, outline="lime")

    # Render and save the image with bounding boxes as .png, then release the figure
    fig, ax = plt.subplots(1, 1, figsize=(12, 9), constrained_layout=True)
    ax.axis("off")
    ax.imshow(rgb_img)
    fig.savefig(os.path.join(BBOX_SAVE_DIR, png_file_name))
    plt.close(fig)

Processing original images: 100%|██████████| 2/2 [00:01<00:00,  1.12it/s]


In [14]:
# Load the COCO annotations of the sliced tiles and echo the file they were read from.
slc_dict = load_json(SLC_ANNOTATION_PATH)
print(SLC_ANNOTATION_PATH)

slc_dict

..\data\coco\val\cassette1_val_sliced_coco.json


{'images': [{'height': 640,
   'width': 640,
   'id': 1,
   'file_name': '01BN02_0_0_0_640_640.png'},
  {'height': 640,
   'width': 640,
   'id': 2,
   'file_name': '01BN02_0_512_0_1152_640.png'},
  {'height': 640,
   'width': 640,
   'id': 3,
   'file_name': '01BN02_0_1024_0_1664_640.png'},
  {'height': 640,
   'width': 640,
   'id': 4,
   'file_name': '01BN02_0_1536_0_2176_640.png'},
  {'height': 640,
   'width': 640,
   'id': 5,
   'file_name': '01BN02_0_2048_0_2688_640.png'},
  {'height': 640,
   'width': 640,
   'id': 6,
   'file_name': '01BN02_0_2560_0_3200_640.png'},
  {'height': 640,
   'width': 640,
   'id': 7,
   'file_name': '01BN02_0_3072_0_3712_640.png'},
  {'height': 640,
   'width': 640,
   'id': 8,
   'file_name': '01BN02_0_3456_0_4096_640.png'},
  {'height': 640,
   'width': 640,
   'id': 9,
   'file_name': '01BN02_0_0_512_640_1152.png'},
  {'height': 640,
   'width': 640,
   'id': 10,
   'file_name': '01BN02_0_512_512_1152_1152.png'},
  {'height': 640,
   'width': 640

In [15]:
# Index the sliced annotations by image id once, instead of rescanning the list per tile
sliced_anns_by_image_id = {}
for annotation in slc_dict["annotations"]:
    sliced_anns_by_image_id.setdefault(annotation["image_id"], []).append(annotation)

for img in tqdm(slc_dict["images"], desc="Processing sliced images"):
    # Open the sliced image file in grayscale, convert it to RGB
    sliced_img_path = os.path.join(SLICED_IMAGE_DIR, img["file_name"])
    mono_img = Image.open(sliced_img_path).convert("L")
    rgb_img = Image.merge("RGB", (mono_img, mono_img, mono_img))
    draw = ImageDraw.Draw(rgb_img)

    # Draw every box of this tile; COCO boxes are [x, y, width, height]
    for annotation in sliced_anns_by_image_id.get(img["id"], []):
        xywh = annotation["bbox"]
        xyxy = [xywh[0], xywh[1], xywh[0] + xywh[2], xywh[1] + xywh[3]]
        draw.rectangle(xyxy, width=5, outline="lime")

    # The figure is created only after the image loaded, so a load error cannot leak one
    fig, ax = plt.subplots(1, 1, figsize=(12, 9), constrained_layout=True)
    ax.axis("off")
    ax.imshow(rgb_img)
    # splitext copes with extensions of any length (the old [:-4] assumed three characters)
    fig.savefig(os.path.join(BBOX_SAVE_DIR, os.path.splitext(img["file_name"])[0] + ".png"))
    plt.close(fig)

Processing sliced images: 100%|██████████| 64/64 [00:14<00:00,  4.41it/s]


In [16]:
splits = ["train", "test", "val"]

# Each split gets an 'original' and a 'sliced' folder under AUGMENTATION_PATH.
# os.makedirs creates the split folder itself as a parent of those two.
for split in splits:
    base_path = os.path.join(AUGMENTATION_PATH, split)
    for subset_name in ("original", "sliced"):
        os.makedirs(os.path.join(base_path, subset_name), exist_ok=True)

## Spatial Augmentation

### Horizontal Flip

In [17]:
def horizontal_flip(
    image_dir,
    coco_annotations,
    output_dir,
    aug_type="horizontal_flip",
    dataset_type="original",
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
    max_pixel_value=255.0
):
    """Horizontally flip every image of a COCO dataset and save the flipped boxes.

    For each entry of ``coco_annotations["images"]`` the PNG with the same name is
    read from ``image_dir`` as grayscale, flipped, and written twice below
    ``AUGMENTATION_PATH/target_split/dataset_type/aug_type``: the plain image in
    the ``each`` sub-folder and a copy with the flipped boxes drawn in lime one
    level above it. A JSON with the flipped boxes is saved to ``output_dir``.

    Args:
        image_dir: Folder holding the input PNG images.
        coco_annotations: COCO-style dict with "images" and "annotations".
            It is read only; the flipped annotations go into a new dict.
        output_dir: Folder receiving ``{coco_file_name}_{aug_type}.json``.
        aug_type: Name of the output sub-folder, file-name prefix and JSON suffix.
        dataset_type: Sub-folder below the split, e.g. "original" or "sliced".
        mean: Forwarded to ``A.Normalize``; only ``mean[0]`` is used to undo it.
        std: Forwarded to ``A.Normalize``; only ``std[0]`` is used to undo it.
        max_pixel_value: Forwarded to ``A.Normalize`` and used to undo it.

    Returns:
        None. Results are written to disk.
    """
    # Set directories based on dataset_type
    view_dir = os.path.join(AUGMENTATION_PATH, target_split, dataset_type, aug_type)
    save_dir = os.path.join(view_dir, "each")

    os.makedirs(save_dir, exist_ok=True)
    os.makedirs(view_dir, exist_ok=True)

    # The label field carries the position of each box in the per-image annotation
    # list, so every returned box can be matched to its source annotation even if
    # the library filters some boxes out.
    augmentation_pipeline = A.Compose([
        A.HorizontalFlip(p=1.0),  # p is the probability of applying the flip
        A.Normalize(mean=mean, std=std, max_pixel_value=max_pixel_value)
    ], bbox_params=A.BboxParams(format="coco", label_fields=["ann_index"]))

    # Group annotations by image once instead of rescanning the list per image
    anns_by_image_id = {}
    for ann in coco_annotations["annotations"]:
        anns_by_image_id.setdefault(ann["image_id"], []).append(ann)

    flipped_annotations = []

    for img in tqdm(coco_annotations["images"], desc=f"Processing {dataset_type} images"):
        file_name = img["file_name"].replace(".bmp", ".png")  # Replace .bmp with .png
        image_anns = anns_by_image_id.get(img["id"], [])

        # Load the image as a single-channel array
        image_np = np.array(Image.open(os.path.join(image_dir, file_name)).convert("L"))

        augmented = augmentation_pipeline(
            image=image_np,
            bboxes=[ann["bbox"] for ann in image_anns],
            ann_index=list(range(len(image_anns))),
        )

        # Undo the normalization for viewing and saving. Presumably a single-channel
        # image is normalized with the first mean/std entry - TODO confirm for the
        # installed albumentations version. Rounding before the uint8 cast stops
        # float error from truncating a pixel one level too low.
        augmented_image = (
            (augmented["image"] * np.array(std[0]) + mean[0]) * max_pixel_value
        ).round().clip(0, 255).astype("uint8")
        flipped_pil = Image.fromarray(augmented_image)
        flipped_pil.save(os.path.join(save_dir, f"{aug_type}_{file_name}"))

        # Convert the augmented image to RGB for bounding box visualization
        rgb_img = Image.merge("RGB", (flipped_pil,) * 3)
        draw = ImageDraw.Draw(rgb_img)

        for bbox, source_index in zip(augmented["bboxes"], augmented["ann_index"]):
            x, y, w, h = (float(value) for value in bbox[:4])
            draw.rectangle([x, y, x + w, y + h], outline="lime", width=5)

            # Copy the source annotation and replace only the box. A flip keeps the
            # box size, so "area" stays valid. Any "segmentation" polygons are NOT
            # flipped here.
            flipped_annotations.append(dict(image_anns[int(source_index)], bbox=[x, y, w, h]))

        # Save the image with bounding boxes drawn
        rgb_img.save(os.path.join(view_dir, f"{aug_type}_{file_name}"))

    # Write the flipped boxes (previously the unmodified input was saved) without
    # touching the caller's dict, which later augmentations reuse.
    augmented_coco = dict(coco_annotations)
    augmented_coco["annotations"] = flipped_annotations

    augmented_json_path = os.path.join(output_dir, f"{coco_file_name}_{aug_type}.json")
    save_json(augmented_coco, augmented_json_path)
    print(f"Augmented annotations saved to {augmented_json_path}")

# Apply the horizontal flip to the full-size images first, then to the sliced tiles
for flip_image_dir, flip_annotations, flip_dataset_type in (
    (NEW_IMAGE_DIR, coco_dict, "original"),
    (SLICED_IMAGE_DIR, slc_dict, "sliced"),
):
    horizontal_flip(
        flip_image_dir,
        flip_annotations,
        os.path.join(AUGMENTATION_PATH, target_split, flip_dataset_type, "horizontal_flip"),
        dataset_type=flip_dataset_type,
    )

Processing original images: 100%|██████████| 2/2 [00:04<00:00,  2.27s/it]


Augmented annotations saved to ..\data\coco\augmentated\val\original\horizontal_flip\cassette1_val_horizontal_flip.json


Processing sliced images: 100%|██████████| 64/64 [00:05<00:00, 10.87it/s]

Augmented annotations saved to ..\data\coco\augmentated\val\sliced\horizontal_flip\cassette1_val_horizontal_flip.json





### Vertical Flip

In [18]:
def vertical_flip(
    image_dir,
    coco_annotations,
    output_dir,
    aug_type="vertical_flip",
    dataset_type="original",
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
    max_pixel_value=255.0
):
    """Vertically flip every image of a COCO dataset and save the flipped boxes.

    For each entry of ``coco_annotations["images"]`` the PNG with the same name is
    read from ``image_dir`` as grayscale, flipped, and written twice below
    ``AUGMENTATION_PATH/target_split/dataset_type/aug_type``: the plain image in
    the ``each`` sub-folder and a copy with the flipped boxes drawn in lime one
    level above it. A JSON with the flipped boxes is saved to ``output_dir``.

    Args:
        image_dir: Folder holding the input PNG images.
        coco_annotations: COCO-style dict with "images" and "annotations".
            It is read only; the flipped annotations go into a new dict.
        output_dir: Folder receiving ``{coco_file_name}_{aug_type}.json``.
        aug_type: Name of the output sub-folder, file-name prefix and JSON suffix.
        dataset_type: Sub-folder below the split, e.g. "original" or "sliced".
        mean: Forwarded to ``A.Normalize``; only ``mean[0]`` is used to undo it.
        std: Forwarded to ``A.Normalize``; only ``std[0]`` is used to undo it.
        max_pixel_value: Forwarded to ``A.Normalize`` and used to undo it.

    Returns:
        None. Results are written to disk.
    """
    # Set directories based on dataset_type
    view_dir = os.path.join(AUGMENTATION_PATH, target_split, dataset_type, aug_type)
    save_dir = os.path.join(view_dir, "each")

    os.makedirs(save_dir, exist_ok=True)
    os.makedirs(view_dir, exist_ok=True)

    # The label field carries the position of each box in the per-image annotation
    # list, so every returned box can be matched to its source annotation even if
    # the library filters some boxes out.
    augmentation_pipeline = A.Compose([
        A.VerticalFlip(p=1.0),
        A.Normalize(mean=mean, std=std, max_pixel_value=max_pixel_value)
    ], bbox_params=A.BboxParams(format="coco", label_fields=["ann_index"]))

    # Group annotations by image once instead of rescanning the list per image
    anns_by_image_id = {}
    for ann in coco_annotations["annotations"]:
        anns_by_image_id.setdefault(ann["image_id"], []).append(ann)

    flipped_annotations = []

    for img in tqdm(coco_annotations["images"], desc=f"Processing {dataset_type} images"):
        file_name = img["file_name"].replace(".bmp", ".png")  # Replace .bmp with .png
        image_anns = anns_by_image_id.get(img["id"], [])

        # Load the image as a single-channel array
        image_np = np.array(Image.open(os.path.join(image_dir, file_name)).convert("L"))

        augmented = augmentation_pipeline(
            image=image_np,
            bboxes=[ann["bbox"] for ann in image_anns],
            ann_index=list(range(len(image_anns))),
        )

        # Undo the normalization for viewing and saving. Presumably a single-channel
        # image is normalized with the first mean/std entry - TODO confirm for the
        # installed albumentations version. Rounding before the uint8 cast stops
        # float error from truncating a pixel one level too low.
        augmented_image = (
            (augmented["image"] * np.array(std[0]) + mean[0]) * max_pixel_value
        ).round().clip(0, 255).astype("uint8")
        flipped_pil = Image.fromarray(augmented_image)
        flipped_pil.save(os.path.join(save_dir, f"{aug_type}_{file_name}"))

        # Convert the augmented image to RGB for bounding box visualization
        rgb_img = Image.merge("RGB", (flipped_pil,) * 3)
        draw = ImageDraw.Draw(rgb_img)

        for bbox, source_index in zip(augmented["bboxes"], augmented["ann_index"]):
            x, y, w, h = (float(value) for value in bbox[:4])
            draw.rectangle([x, y, x + w, y + h], outline="lime", width=5)

            # Copy the source annotation and replace only the box. A flip keeps the
            # box size, so "area" stays valid. Any "segmentation" polygons are NOT
            # flipped here.
            flipped_annotations.append(dict(image_anns[int(source_index)], bbox=[x, y, w, h]))

        # Save the image with bounding boxes drawn
        rgb_img.save(os.path.join(view_dir, f"{aug_type}_{file_name}"))

    # Write the flipped boxes (previously the unmodified input was saved) without
    # touching the caller's dict, which later augmentations reuse.
    augmented_coco = dict(coco_annotations)
    augmented_coco["annotations"] = flipped_annotations

    augmented_json_path = os.path.join(output_dir, f"{coco_file_name}_{aug_type}.json")
    save_json(augmented_coco, augmented_json_path)
    print(f"Augmented annotations saved to {augmented_json_path}")

# Apply the vertical flip to the full-size images first, then to the sliced tiles
for flip_image_dir, flip_annotations, flip_dataset_type in (
    (NEW_IMAGE_DIR, coco_dict, "original"),
    (SLICED_IMAGE_DIR, slc_dict, "sliced"),
):
    vertical_flip(
        flip_image_dir,
        flip_annotations,
        os.path.join(AUGMENTATION_PATH, target_split, flip_dataset_type, "vertical_flip"),
        dataset_type=flip_dataset_type,
    )

Processing original images: 100%|██████████| 2/2 [00:04<00:00,  2.13s/it]


Augmented annotations saved to ..\data\coco\augmentated\val\original\vertical_flip\cassette1_val_vertical_flip.json


Processing sliced images: 100%|██████████| 64/64 [00:05<00:00, 10.90it/s]

Augmented annotations saved to ..\data\coco\augmentated\val\sliced\vertical_flip\cassette1_val_vertical_flip.json





In [19]:
import random

def random_flip(
    image_dir,
    coco_annotations,
    output_dir,
    dataset_type="original",
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
    max_pixel_value=255.0,
    seed=None
):
    """Flip a whole dataset either horizontally or vertically, chosen at random.

    The direction is drawn once per call (not per image) and the work is
    delegated to ``horizontal_flip`` or ``vertical_flip`` with the fixed
    ``aug_type`` "random_flip", so the outputs land in the "random_flip" folder.

    Args:
        image_dir: Folder holding the input PNG images.
        coco_annotations: COCO-style dict passed through to the flip function.
        output_dir: Folder that receives the JSON written by the flip function.
        dataset_type: Sub-folder below the split, e.g. "original" or "sliced".
        mean: Passed through to the flip function.
        std: Passed through to the flip function.
        max_pixel_value: Passed through to the flip function.
        seed: Optional seed for a private random generator, making the choice
            reproducible. With ``None`` (default) the global ``random`` state is
            used, exactly as before.

    Returns:
        None.
    """
    # Set a fixed aug_type to ensure output is saved in the "random_flip" folder
    aug_type = "random_flip"

    # A private generator keeps a seeded call from disturbing the global random state
    chooser = random if seed is None else random.Random(seed)

    # Randomly choose between horizontal and vertical flip
    if chooser.choice([True, False]):
        print("Applying horizontal flip")
        flip_function = horizontal_flip
    else:
        print("Applying vertical flip")
        flip_function = vertical_flip

    flip_function(
        image_dir=image_dir,
        coco_annotations=coco_annotations,
        output_dir=output_dir,
        aug_type=aug_type,  # Use "random_flip" as aug_type
        dataset_type=dataset_type,
        mean=mean,
        std=std,
        max_pixel_value=max_pixel_value
    )

# Apply the random flip to the full-size images first, then to the sliced tiles
for flip_image_dir, flip_annotations, flip_dataset_type in (
    (NEW_IMAGE_DIR, coco_dict, "original"),
    (SLICED_IMAGE_DIR, slc_dict, "sliced"),
):
    random_flip(
        flip_image_dir,
        flip_annotations,
        os.path.join(AUGMENTATION_PATH, target_split, flip_dataset_type, "random_flip"),
        dataset_type=flip_dataset_type,
    )


Applying horizontal flip


Processing original images: 100%|██████████| 2/2 [00:04<00:00,  2.20s/it]


Augmented annotations saved to ..\data\coco\augmentated\val\original\random_flip\cassette1_val_random_flip.json
Applying vertical flip


Processing sliced images: 100%|██████████| 64/64 [00:05<00:00, 10.87it/s]

Augmented annotations saved to ..\data\coco\augmentated\val\sliced\random_flip\cassette1_val_random_flip.json





### Safe Rotate

In [20]:
def safe_rotate(
    image_dir,
    coco_annotations,
    output_dir,
    limit=(-90, 90),  # Range for random rotation angle
    interpolation=1,  # Default is cv2.INTER_LINEAR
    border_mode=4,  # Default is cv2.BORDER_REFLECT_101
    rotate_method="largest_box",  # How to handle bounding boxes
    aug_type="safe_rotate",
    dataset_type="original",
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
    max_pixel_value=255.0
):
    """Rotate every image of a COCO dataset with ``A.SafeRotate`` and save the new boxes.

    For each entry of ``coco_annotations["images"]`` the PNG with the same name is
    read from ``image_dir`` as grayscale, rotated, and written twice below
    ``AUGMENTATION_PATH/target_split/dataset_type/aug_type``: the plain image in
    the ``each`` sub-folder and a copy with the rotated boxes drawn in lime one
    level above it. A JSON with freshly numbered annotations for the rotated
    boxes is saved to ``output_dir``.

    Args:
        image_dir: Folder holding the input PNG images.
        coco_annotations: COCO-style dict with "images" and "annotations".
            It is read only; the rotated annotations go into a new dict.
        output_dir: Folder receiving ``{coco_file_name}_{aug_type}.json``.
        limit: Forwarded to ``A.SafeRotate``.
        interpolation: Forwarded to ``A.SafeRotate``.
        border_mode: Forwarded to ``A.SafeRotate``.
        rotate_method: Forwarded to ``A.SafeRotate``.
        aug_type: Name of the output sub-folder, file-name prefix and JSON suffix.
        dataset_type: Sub-folder below the split, e.g. "original" or "sliced".
        mean: Forwarded to ``A.Normalize``; only ``mean[0]`` is used to undo it.
        std: Forwarded to ``A.Normalize``; only ``std[0]`` is used to undo it.
        max_pixel_value: Forwarded to ``A.Normalize`` and used to undo it.

    Returns:
        None. Results are written to disk.
    """
    # Set directories based on dataset_type
    view_dir = os.path.join(AUGMENTATION_PATH, target_split, dataset_type, aug_type)
    save_dir = os.path.join(view_dir, "each")

    os.makedirs(save_dir, exist_ok=True)
    os.makedirs(view_dir, exist_ok=True)

    # The label field carries the position of each box in the per-image annotation
    # list, so the category of every returned box is looked up from its own source
    # annotation rather than by zipping against the unfiltered input labels.
    augmentation_pipeline = A.Compose([
        A.SafeRotate(
            limit=limit,  # Random rotation within specified range
            interpolation=interpolation,
            border_mode=border_mode,
            rotate_method=rotate_method,
            p=1.0
        ),
        A.Normalize(mean=mean, std=std, max_pixel_value=max_pixel_value)
    ], bbox_params=A.BboxParams(format="coco", label_fields=["ann_index"]))

    # Group annotations by image once instead of rescanning the list per image
    anns_by_image_id = {}
    for ann in coco_annotations["annotations"]:
        anns_by_image_id.setdefault(ann["image_id"], []).append(ann)

    # Initialize a new list to store augmented annotations
    new_annotations = []

    for img in tqdm(coco_annotations["images"], desc=f"Processing {dataset_type} images"):
        file_name = img["file_name"].replace(".bmp", ".png")  # Replace .bmp with .png
        image_id = img["id"]
        image_anns = anns_by_image_id.get(image_id, [])

        # Load the image as a single-channel array
        image_np = np.array(Image.open(os.path.join(image_dir, file_name)).convert("L"))

        augmented = augmentation_pipeline(
            image=image_np,
            bboxes=[ann["bbox"] for ann in image_anns],
            ann_index=list(range(len(image_anns))),
        )

        # Undo the normalization for viewing and saving. Presumably a single-channel
        # image is normalized with the first mean/std entry - TODO confirm for the
        # installed albumentations version. Rounding before the uint8 cast stops
        # float error from truncating a pixel one level too low.
        augmented_image = (
            (augmented["image"] * np.array(std[0]) + mean[0]) * max_pixel_value
        ).round().clip(0, 255).astype("uint8")
        rotated_pil = Image.fromarray(augmented_image)
        rotated_pil.save(os.path.join(save_dir, f"{aug_type}_{file_name}"))

        # Convert the augmented image to RGB for bounding box visualization
        rgb_img = Image.merge("RGB", (rotated_pil,) * 3)
        draw = ImageDraw.Draw(rgb_img)

        for bbox, source_index in zip(augmented["bboxes"], augmented["ann_index"]):
            x, y, w, h = (float(value) for value in bbox[:4])
            draw.rectangle([x, y, x + w, y + h], outline="lime", width=5)

            # Save the updated annotation
            new_annotations.append({
                "image_id": image_id,
                "category_id": image_anns[int(source_index)]["category_id"],
                "bbox": [x, y, w, h],
                "area": w * h,
                "iscrowd": 0,
                "id": len(new_annotations) + 1  # Unique ID for each annotation
            })

        # Save the image with bounding boxes drawn
        rgb_img.save(os.path.join(view_dir, f"{aug_type}_{file_name}"))

    # Build the output on a shallow copy. Overwriting coco_annotations["annotations"]
    # in place would hand rotated boxes to every later augmentation that reuses the
    # caller's dict together with the unrotated source images.
    augmented_coco = dict(coco_annotations)
    augmented_coco["annotations"] = new_annotations

    # Save the updated COCO JSON annotations
    augmented_json_path = os.path.join(output_dir, f"{coco_file_name}_{aug_type}.json")
    save_json(augmented_coco, augmented_json_path)
    print(f"Augmented annotations saved to {augmented_json_path}")

# Apply the safe rotation to the full-size images first, then to the sliced tiles
for rotate_image_dir, rotate_annotations, rotate_dataset_type in (
    (NEW_IMAGE_DIR, coco_dict, "original"),
    (SLICED_IMAGE_DIR, slc_dict, "sliced"),
):
    safe_rotate(
        rotate_image_dir,
        rotate_annotations,
        os.path.join(AUGMENTATION_PATH, target_split, rotate_dataset_type, "safe_rotate"),
        dataset_type=rotate_dataset_type,
    )

Processing original images: 100%|██████████| 2/2 [00:05<00:00,  2.59s/it]


Augmented annotations saved to ..\data\coco\augmentated\val\original\safe_rotate\cassette1_val_safe_rotate.json


Processing sliced images: 100%|██████████| 64/64 [00:06<00:00,  9.58it/s]

Augmented annotations saved to ..\data\coco\augmentated\val\sliced\safe_rotate\cassette1_val_safe_rotate.json





### Grid Distortion

In [24]:
def grid_distortion(
    image_dir,
    coco_annotations,
    output_dir,
    grid_num_steps=5,  # Number of grid cells along each side
    grid_distort_limit=(-0.3, 0.3),  # Distortion strength range
    interpolation=1,  # Default is cv2.INTER_LINEAR
    border_mode=4,  # Default is cv2.BORDER_REFLECT_101
    aug_type="grid_distortion",
    dataset_type="original",
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
    max_pixel_value=255.0
):
    """Warp every image of a COCO dataset with ``A.GridDistortion`` and save the new boxes.

    For each entry of ``coco_annotations["images"]`` the PNG with the same name is
    read from ``image_dir`` as grayscale, distorted, and written twice below
    ``AUGMENTATION_PATH/target_split/dataset_type/aug_type``: the plain image in
    the ``each`` sub-folder and a copy with the distorted boxes drawn in lime one
    level above it. A JSON with freshly numbered annotations for the distorted
    boxes is saved to ``output_dir``.

    Args:
        image_dir: Folder holding the input PNG images.
        coco_annotations: COCO-style dict with "images" and "annotations".
            It is read only; the distorted annotations go into a new dict.
        output_dir: Folder receiving ``{coco_file_name}_{aug_type}.json``.
        grid_num_steps: Forwarded to ``A.GridDistortion`` as ``num_steps``.
        grid_distort_limit: Forwarded to ``A.GridDistortion`` as ``distort_limit``.
        interpolation: Forwarded to ``A.GridDistortion``.
        border_mode: Forwarded to ``A.GridDistortion``.
        aug_type: Name of the output sub-folder, file-name prefix and JSON suffix.
        dataset_type: Sub-folder below the split, e.g. "original" or "sliced".
        mean: Forwarded to ``A.Normalize``; only ``mean[0]`` is used to undo it.
        std: Forwarded to ``A.Normalize``; only ``std[0]`` is used to undo it.
        max_pixel_value: Forwarded to ``A.Normalize`` and used to undo it.

    Returns:
        None. Results are written to disk.
    """
    # Set directories based on dataset_type
    view_dir = os.path.join(AUGMENTATION_PATH, target_split, dataset_type, aug_type)
    save_dir = os.path.join(view_dir, "each")

    os.makedirs(save_dir, exist_ok=True)
    os.makedirs(view_dir, exist_ok=True)

    # The label field carries the position of each box in the per-image annotation
    # list, so the category of every returned box is looked up from its own source
    # annotation rather than by zipping against the unfiltered input labels.
    augmentation_pipeline = A.Compose([
        A.GridDistortion(
            num_steps=grid_num_steps,
            distort_limit=grid_distort_limit,
            interpolation=interpolation,
            border_mode=border_mode,
            normalized=True,
            p=1.0
        ),
        A.Normalize(mean=mean, std=std, max_pixel_value=max_pixel_value)
    ], bbox_params=A.BboxParams(format="coco", label_fields=["ann_index"]))

    # Group annotations by image once instead of rescanning the list per image
    anns_by_image_id = {}
    for ann in coco_annotations["annotations"]:
        anns_by_image_id.setdefault(ann["image_id"], []).append(ann)

    # Initialize a new list to store augmented annotations
    new_annotations = []

    for img in tqdm(coco_annotations["images"], desc=f"Processing {dataset_type} images"):
        file_name = img["file_name"].replace(".bmp", ".png")
        image_id = img["id"]
        image_anns = anns_by_image_id.get(image_id, [])

        # Load the image as a single-channel array
        image_np = np.array(Image.open(os.path.join(image_dir, file_name)).convert("L"))

        augmented = augmentation_pipeline(
            image=image_np,
            bboxes=[ann["bbox"] for ann in image_anns],
            ann_index=list(range(len(image_anns))),
        )

        # Undo the normalization for viewing and saving. Presumably a single-channel
        # image is normalized with the first mean/std entry - TODO confirm for the
        # installed albumentations version. Rounding before the uint8 cast stops
        # float error from truncating a pixel one level too low.
        augmented_image = (
            (augmented["image"] * np.array(std[0]) + mean[0]) * max_pixel_value
        ).round().clip(0, 255).astype("uint8")
        distorted_pil = Image.fromarray(augmented_image)
        distorted_pil.save(os.path.join(save_dir, f"{aug_type}_{file_name}"))

        # Convert the augmented image to RGB for bounding box visualization
        rgb_img = Image.merge("RGB", (distorted_pil,) * 3)
        draw = ImageDraw.Draw(rgb_img)

        for bbox, source_index in zip(augmented["bboxes"], augmented["ann_index"]):
            x, y, w, h = (float(value) for value in bbox[:4])
            draw.rectangle([x, y, x + w, y + h], outline="lime", width=5)

            # Save the updated annotation
            new_annotations.append({
                "image_id": image_id,
                "category_id": image_anns[int(source_index)]["category_id"],
                "bbox": [x, y, w, h],
                "area": w * h,
                "iscrowd": 0,
                "id": len(new_annotations) + 1
            })

        # Save the image with bounding boxes drawn
        rgb_img.save(os.path.join(view_dir, f"{aug_type}_{file_name}"))

    # Build the output on a shallow copy. Overwriting coco_annotations["annotations"]
    # in place would change the dict the caller keeps using for other augmentations.
    augmented_coco = dict(coco_annotations)
    augmented_coco["annotations"] = new_annotations

    # Save the updated COCO JSON annotations
    augmented_json_path = os.path.join(output_dir, f"{coco_file_name}_{aug_type}.json")
    save_json(augmented_coco, augmented_json_path)
    print(f"Augmented annotations saved to {augmented_json_path}")

# Apply the grid distortion to the full-size images first, then to the sliced tiles
for distort_image_dir, distort_annotations, distort_dataset_type in (
    (NEW_IMAGE_DIR, coco_dict, "original"),
    (SLICED_IMAGE_DIR, slc_dict, "sliced"),
):
    grid_distortion(
        distort_image_dir,
        distort_annotations,
        os.path.join(AUGMENTATION_PATH, target_split, distort_dataset_type, "grid_distortion"),
        dataset_type=distort_dataset_type,
    )

Processing original images: 100%|██████████| 2/2 [00:05<00:00,  2.68s/it]


Augmented annotations saved to ..\data\coco\augmentated\val\original\grid_distortion\cassette1_val_grid_distortion.json


Processing sliced images: 100%|██████████| 64/64 [00:07<00:00,  8.41it/s]

Augmented annotations saved to ..\data\coco\augmentated\val\sliced\grid_distortion\cassette1_val_grid_distortion.json





# Augmentation class (`ImageAugmentor`)

In [22]:
class ImageAugmentor:
    """Apply flip, contrast and noise augmentations to the images of a COCO file.

    Every source image yields three augmented PNGs (one per augmentation), a
    bounding-box preview for each, and matching entries in
    ``self.augmented_annotations``, which ``save_augmented_coco_json`` writes out.
    """

    def __init__(self, coco_file_name, config, split="train", image_type="original"):
        """Resolve folders from ``config``, create output folders and load annotations.

        Args:
            coco_file_name: Base name of the annotation file (without suffix).
            config: Dict read with ``.get`` for "DATA_DIR", "AUGMENTATION_PATH"
                and "VISUALIZATION_DIR"; missing keys fall back to defaults.
            split: Dataset split folder below ``DATA_DIR/coco``.
            image_type: "original" or "sliced"; selects image and output folders.
        """
        self.split = split
        self.coco_file_name = coco_file_name
        self.config = config
        self.image_type = image_type

        self.DATA_DIR = config.get("DATA_DIR", "../data")
        self.AUGMENTATION_PATH = config.get("AUGMENTATION_PATH", os.path.join(self.DATA_DIR, "augmentation"))
        self.VISUALIZATION_DIR = config.get("VISUALIZATION_DIR", os.path.join(self.DATA_DIR, "bbox_vis"))

        image_subdir = "original_images" if self.image_type == "original" else "sliced_images"
        self.output_dir = os.path.join(self.AUGMENTATION_PATH, image_subdir, f"{split}_images")
        self.bbox_vis_dir = os.path.join(self.VISUALIZATION_DIR, coco_file_name, image_subdir)

        # NOTE(review): the "_corrected_coco.json" file is used for sliced images too,
        # and the sliced image folder has no per-dataset sub-folder - confirm intended.
        self.org_annotation_path = os.path.join(self.DATA_DIR, "coco", split, f"{coco_file_name}_corrected_coco.json")
        self.image_dir = os.path.join(self.DATA_DIR, "coco", "images_sliced" if self.image_type == "sliced" else "images")

        self._create_directories([self.output_dir, self.bbox_vis_dir])

        # Load COCO annotations and initialize augmented annotations structure
        self.coco_dict = load_json(self.org_annotation_path)
        self.augmented_annotations = {
            "images": [],
            "annotations": [],
            # Tolerate annotation files without a category list
            "categories": self.coco_dict.get("categories", [])
        }
        self.annotation_id = 1  # Initialize annotation ID counter

    def _create_directories(self, directories):
        """Create every folder in ``directories`` (no error if it already exists)."""
        for directory in directories:
            os.makedirs(directory, exist_ok=True)

    def process_images(self):
        """Augment every image listed in the loaded COCO dict; missing files are skipped with a warning."""
        annotations_by_image = self._organize_annotations_by_image_id()

        for img in tqdm(self.coco_dict["images"], desc="Processing Images"):
            image_path = os.path.join(self.image_dir, img["file_name"])
            if not os.path.exists(image_path):
                print(f"Warning: Image {img['file_name']} not found.")
                continue

            image = self._load_image(image_path)
            bboxes, class_labels = self._extract_bboxes_and_labels(img["id"], annotations_by_image)

            # Apply and save each augmentation type separately
            for aug_type in ("flip", "contrast", "noise"):
                self._apply_and_save(image, bboxes, class_labels, img["file_name"], aug_type)

    def _apply_and_save(self, image, bboxes, class_labels, filename, aug_type):
        """Run one augmentation on ``image`` and store image, preview and annotations.

        Args:
            image: Image array as returned by ``_load_image``.
            bboxes: COCO ``[x, y, w, h]`` boxes of the image.
            class_labels: Category id per box, aligned with ``bboxes``.
            filename: Source file name; its last four characters are replaced
                by ``_{aug_type}.png`` for the output name.
            aug_type: "flip", "contrast" or "noise".

        Raises:
            ValueError: If ``aug_type`` is not one of the supported names.
        """
        if aug_type == "flip":
            transform = A.HorizontalFlip(p=1)
        elif aug_type == "contrast":
            transform = A.RandomBrightnessContrast(p=1)
        elif aug_type == "noise":
            transform = A.GaussNoise(var_limit=(10, 50), p=1)
        else:
            # Previously an unknown name surfaced later as an UnboundLocalError
            raise ValueError(f"Unsupported aug_type: {aug_type!r}")

        # The label field carries each box's position so the surviving boxes can be
        # matched to their own labels even if the library filters some boxes out.
        augmentation = A.Compose(
            [transform],
            bbox_params=A.BboxParams(format='coco', label_fields=['bbox_index'])
        )
        augmented = augmentation(image=image, bboxes=bboxes, bbox_index=list(range(len(bboxes))))
        augmented_image = self._as_uint8_image(augmented['image'])
        augmented_bboxes = augmented['bboxes']
        kept_labels = [class_labels[int(index)] for index in augmented['bbox_index']]

        # Save augmented image and bbox visualization
        save_filename = f"{filename[:-4]}_{aug_type}.png"
        self._save_augmented_image(augmented_image, save_filename)
        self._visualize_augmented_with_bboxes(augmented_image, augmented_bboxes, save_filename, aug_type)

        # Save augmented annotation
        self._save_augmented_annotation(
            save_filename,
            augmented_bboxes,
            kept_labels,
            img_width=augmented_image.shape[1],
            img_height=augmented_image.shape[0]
        )

    def _save_augmented_annotation(self, filename, bboxes, class_labels, img_width, img_height):
        """Append one image entry and its box annotations to ``self.augmented_annotations``.

        Image ids are assigned sequentially from the number of images recorded so
        far; annotation ids come from the running ``self.annotation_id`` counter.
        """
        image_entry = {
            "id": len(self.augmented_annotations["images"]) + 1,
            "file_name": filename,
            "width": img_width,
            "height": img_height
        }
        self.augmented_annotations["images"].append(image_entry)

        for bbox, label in zip(bboxes, class_labels):
            # Plain floats keep the record serializable with the standard json module
            x, y, w, h = (float(value) for value in bbox[:4])
            annotation_entry = {
                "id": self.annotation_id,
                "image_id": image_entry["id"],
                "category_id": self._to_builtin(label),
                "bbox": [x, y, w, h],
                "area": w * h,
                "iscrowd": 0
            }
            self.augmented_annotations["annotations"].append(annotation_entry)
            self.annotation_id += 1

    def save_augmented_coco_json(self, output_path):
        """Write the collected augmented annotations to ``output_path`` as JSON."""
        # Imported here because the notebook's import cell does not import json
        import json

        with open(output_path, 'w', encoding="utf-8") as f:
            json.dump(self.augmented_annotations, f)

    def _load_image(self, image_path):
        """Read ``image_path`` as grayscale and return it replicated to three RGB channels.

        Raises:
            IOError: If OpenCV cannot decode the file.
        """
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising
            raise IOError(f"Could not read image: {image_path}")
        return cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)

    @staticmethod
    def _to_builtin(value):
        """Return ``value.item()`` for NumPy-style scalars, otherwise ``value`` unchanged."""
        return value.item() if hasattr(value, "item") else value

    @staticmethod
    def _as_uint8_image(image):
        """Return ``image`` as a uint8 NumPy array in height-width-channel layout.

        uint8 arrays pass through unchanged. Objects with ``permute`` are treated
        as channel-first tensors. Other dtypes are assumed to hold values in
        [0, 1] and are scaled by 255 - TODO confirm if a float pipeline is added.
        """
        if hasattr(image, "permute"):
            image = image.permute(1, 2, 0).cpu().numpy()
        image = np.asarray(image)
        if image.dtype != np.uint8:
            image = (image * 255).clip(0, 255).astype(np.uint8)
        return image

    def _save_augmented_image(self, image, filename):
        """Write ``image`` (RGB) to ``self.output_dir/filename`` using OpenCV."""
        output_path = os.path.join(self.output_dir, filename)
        # The pipeline returns NumPy arrays, so no tensor permute or rescale is applied;
        # multiplying uint8 data by 255 would overflow.
        image = self._as_uint8_image(image)
        cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))

    def _visualize_augmented_with_bboxes(self, image, bboxes, filename, aug_type):
        """Save a grayscale preview of ``image`` with ``bboxes`` drawn in lime."""
        # Convert image to PIL format for drawing bounding boxes
        pil_image = Image.fromarray(self._as_uint8_image(image)).convert("L")
        rgb_image = Image.merge("RGB", (pil_image, pil_image, pil_image))

        # Draw bounding boxes on the image; boxes are COCO [x, y, width, height]
        draw = ImageDraw.Draw(rgb_image)
        for bbox in bboxes:
            x_min, y_min, width, height = bbox[:4]
            x_max, y_max = x_min + width, y_min + height
            draw.rectangle([x_min, y_min, x_max, y_max], outline="lime", width=3)

        save_path = os.path.join(self.bbox_vis_dir, f"{filename[:-4]}_{aug_type}_bbox.png")
        rgb_image.save(save_path)

    def _organize_annotations_by_image_id(self):
        """Return a dict mapping each image id to the list of its annotations."""
        annotations_by_image = {}
        for annotation in self.coco_dict["annotations"]:
            annotations_by_image.setdefault(annotation["image_id"], []).append(annotation)
        return annotations_by_image

    def _extract_bboxes_and_labels(self, image_id, annotations_by_image):
        """Return ``(bboxes, class_labels)`` for ``image_id``; both are empty if it has no annotations."""
        bboxes, class_labels = [], []
        for annotation in annotations_by_image.get(image_id, []):
            x, y, w, h = annotation["bbox"]
            bboxes.append([x, y, w, h])
            class_labels.append(annotation["category_id"])
        return bboxes, class_labels


def plot_augmented_bboxes_coco(annotation: dict, img_dir: str, save_dir: str, aug_type: str):
    """Save one preview PNG per image with its COCO bounding boxes drawn in lime.

    Args:
        annotation: COCO-style dict with "images" and "annotations".
        img_dir: Folder containing the files named in ``annotation["images"]``.
        save_dir: Output folder; created if missing.
        aug_type: Label used in the progress bar and in the output file names
            (``<image stem>_<aug_type>_bbox.png``).

    Returns:
        None. Previews are written to ``save_dir``.
    """
    os.makedirs(save_dir, exist_ok=True)

    # Group annotations by image once instead of rescanning the list per image
    anns_by_image_id = {}
    for ann in annotation["annotations"]:
        anns_by_image_id.setdefault(ann["image_id"], []).append(ann)

    for img in tqdm(annotation["images"], desc=f"Plotting {aug_type} augmented images with bounding boxes"):
        mono_img = Image.open(os.path.join(img_dir, img["file_name"])).convert("L")
        rgb_img = Image.merge("RGB", (mono_img, mono_img, mono_img))
        draw = ImageDraw.Draw(rgb_img)

        # COCO boxes are [x, y, width, height]; PIL expects corner coordinates
        for ann in anns_by_image_id.get(img["id"], []):
            xywh = ann["bbox"]
            xyxy = [xywh[0], xywh[1], xywh[0] + xywh[2], xywh[1] + xywh[3]]
            draw.rectangle(xyxy, width=3, outline="lime")

        # The figure is created only after the image loaded, so a load error cannot leak one
        fig, ax = plt.subplots(1, 1, figsize=(12, 9), constrained_layout=True)
        ax.axis("off")
        ax.imshow(rgb_img)

        # splitext copes with extensions of any length (the old [:-4] assumed three characters)
        image_stem = os.path.splitext(img["file_name"])[0]
        fig.savefig(os.path.join(save_dir, f"{image_stem}_{aug_type}_bbox.png"))
        # Close this specific figure rather than whichever one is current
        plt.close(fig)


In [23]:
# Configuration for grayscale-friendly augmentations.
# ImageAugmentor reads the three path entries through config.get(); the
# "augmentation_params" values are not read by the code visible in this notebook.
config = dict(
    DATA_DIR="../data",
    AUGMENTATION_PATH="../data/augmentation",
    VISUALIZATION_DIR="../data/bbox_vis",
    augmentation_params=dict(
        horizontal_flip=0.5,
        brightness_contrast=0.5,
        blur=3,
        noise=0.3,
        normalize=True,
    ),
)