In [180]:
import os
import cv2
import numpy as np
import random
from tqdm import tqdm  # Import tqdm for the progress bar
import logging
import shutil

In [181]:
# Set up logging for detailed debugging and information.
# Records at INFO level and above go to 'augmentation.log'; the path is
# relative, so the file is created in the current working directory.
logging.basicConfig(filename='augmentation.log', level=logging.INFO)

In [182]:
def create_directory(path):
    """Make sure `path` exists as a directory.

    Missing parent directories are created as well. Calling this for a
    directory that is already present does nothing.
    """
    # exist_ok=True is what makes repeated calls safe.
    os.makedirs(name=path, exist_ok=True)

In [183]:
def get_file_bases(folder, extension):
    """Collect the extension-less names of entries in `folder` ending with `extension`.

    The suffix match is case-sensitive and the result is a set, so the order
    of the directory listing does not matter.
    """
    bases = set()
    for entry in os.listdir(folder):
        if not entry.endswith(extension):
            continue
        stem, _ = os.path.splitext(entry)
        bases.add(stem)
    return bases

In [184]:
def log_and_print(message):
    """Record `message` at INFO level via the root logger, then echo it to stdout."""
    logging.log(logging.INFO, message)
    print(message)

In [185]:
def rotate_image(image, angle):
    """Rotate `image` about its centre and return ``(rotated_image, matrix)``.

    The output keeps the input's width and height, the scale factor is 1.0,
    pixels are resampled with linear interpolation and the uncovered border
    is filled by reflecting the image. `angle` is handed straight to
    ``cv2.getRotationMatrix2D`` (degrees in OpenCV's convention). The returned
    2x3 affine matrix is the one used for the warp, so callers can apply it
    to annotation coordinates.
    """
    height, width = image.shape[:2]
    pivot = (width / 2, height / 2)
    affine = cv2.getRotationMatrix2D(pivot, angle, 1.0)
    warped = cv2.warpAffine(
        image,
        affine,
        (width, height),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REFLECT,
    )
    return warped, affine

In [186]:
def rotate_bounding_box(x_center, y_center, w, h, M):
    """Map a centre/size box through the 2x3 affine matrix `M`.

    The four corners of the box are transformed and the axis-aligned extent
    of the transformed corners is returned as ``(x_min, y_min, x_max, y_max)``.
    """
    half_w, half_h = w / 2, h / 2
    left, right = x_center - half_w, x_center + half_w
    top, bottom = y_center - half_h, y_center + half_h

    # One corner per row, already in homogeneous form (trailing 1).
    corners = np.array([
        [left, top, 1.0],
        [right, top, 1.0],
        [left, bottom, 1.0],
        [right, bottom, 1.0],
    ])

    # Result has shape (2, 4): row 0 holds the x values, row 1 the y values.
    moved = M.dot(corners.T)
    xs, ys = moved[0], moved[1]
    return xs.min(), ys.min(), xs.max(), ys.max()

In [187]:
def clip_bounding_box(x, y, w, h, frame_width, frame_height):
    """Intersect a box with the frame and return it as ``(x, y, w, h)``.

    `x`, `y` is the top-left corner and `w`, `h` the size, in the same units
    as `frame_width` / `frame_height`. Both edges of the box are clamped to
    the frame, so clamping the top-left corner also shrinks the size by the
    amount that was cut off. A box that lies completely outside the frame
    comes back with a width and/or height of 0 (never negative).
    """
    # Clamp each edge independently; working with edges rather than
    # (corner, size) keeps the far edge fixed when the near edge is clamped.
    x_min = min(max(0, x), frame_width)
    y_min = min(max(0, y), frame_height)
    x_max = min(max(0, x + w), frame_width)
    y_max = min(max(0, y + h), frame_height)

    # max(..., 0) guards against a negative input size.
    return x_min, y_min, max(x_max - x_min, 0), max(y_max - y_min, 0)

In [188]:
def is_within_frame(x, y, w, h, frame_width, frame_height):
    """Tell whether the box ``(x, y, w, h)`` overlaps the frame at all.

    A box that merely touches a frame edge from the outside counts as
    outside.
    """
    # Ends at or before the left/top edge.
    if x + w <= 0 or y + h <= 0:
        return False
    # Starts at or after the right/bottom edge.
    if x >= frame_width or y >= frame_height:
        return False
    return True

In [189]:
def adaptive_rotation_angle(object_size, distance_from_center):
    """Draw a random rotation angle whose range depends on `object_size`.

    Sizes below 0.2 (an example threshold) draw uniformly from [-30, 30];
    everything else draws from [-10, 10]. `distance_from_center` is accepted
    but currently has no influence on the result.
    """
    is_small = object_size < 0.2
    limit = 30 if is_small else 10
    return random.uniform(-limit, limit)

In [190]:
def count_labels(file_path):
    """Count the class 0 and class 1 entries in one label file.

    Each non-blank line is split on whitespace and its first field is taken
    as the class id. Only ids that are exactly ``'0'`` or ``'1'`` are counted,
    so ids such as ``10`` or ``01`` are not mistaken for class 1 / class 0,
    and leading whitespace on a line does not hide the entry.

    Returns:
        ``(count_0, count_1)``.
    """
    count_0 = 0
    count_1 = 0
    with open(file_path, 'r') as file:
        for line in file:
            fields = line.split()
            if not fields:
                continue  # blank line
            class_id = fields[0]
            if class_id == '0':
                count_0 += 1
            elif class_id == '1':
                count_1 += 1
    return count_0, count_1

In [191]:
def display_label_count_ratio(folder_path, description):
    """Report class 0 / class 1 totals and their shares for a label folder.

    Every ``.txt`` file in `folder_path` is tallied with ``count_labels``.
    The summary line, prefixed with `description`, is logged and printed,
    and the two totals are returned as ``(total_count_0, total_count_1)``.
    """
    totals = [0, 0]
    label_names = (name for name in os.listdir(folder_path) if name.endswith('.txt'))
    for name in label_names:
        zeros, ones = count_labels(os.path.join(folder_path, name))
        totals[0] += zeros
        totals[1] += ones
    total_count_0, total_count_1 = totals

    total = total_count_0 + total_count_1
    if total != 0:
        ratio_0 = total_count_0 / total
        ratio_1 = total_count_1 / total
    else:
        # No labels at all: report both shares as 0 instead of dividing by zero.
        ratio_0 = ratio_1 = 0

    log_and_print(f"{description} - Total: {total}, Class 0: {total_count_0}, Class 1: {total_count_1}, Ratio 0: {ratio_0:.2f}, Ratio 1: {ratio_1:.2f}")
    return total_count_0, total_count_1

In [192]:
def sort_files_by_label_count(folder_path):
    """Rank the label files of a folder by their class counts.

    Returns a list of ``(file_name, count_0, count_1)`` tuples for every
    ``.txt`` file in `folder_path`, ordered by class 1 count (highest first)
    and, among equals, by class 0 count (lowest first).
    """
    label_counts = [
        (name, *count_labels(os.path.join(folder_path, name)))
        for name in os.listdir(folder_path)
        if name.endswith('.txt')
    ]
    # Negating the class 1 count gives descending order for that key only.
    label_counts.sort(key=lambda entry: (-entry[2], entry[1]))
    return label_counts

In [193]:
def adjust_ratio(sorted_files, ratio, output_folder, label_folder, image_folder):
    """Select label files so class 0 stays within a budget tied to class 1.

    The class 0 budget is ``int(total class 1 labels * ratio)``. Files are
    visited in the order given by `sorted_files`; a file is kept when adding
    its class 0 labels does not exceed the budget, otherwise it is dropped.
    Visiting continues after a drop, so later files with fewer class 0 labels
    can still be kept.

    Args:
        sorted_files: iterable of ``(file_name, count_0, count_1)`` tuples.
        ratio: multiplier applied to the class 1 total to get the class 0 budget.
        output_folder: created if needed; kept label files are copied here.
        label_folder: folder the label files are read from.
        image_folder: folder whose ``.jpg`` files are DELETED for every
            dropped label file. This is destructive.

    The dropped label files themselves are left in `label_folder`.
    """
    total_count_1 = sum(count_1 for _, _, count_1 in sorted_files)
    target_count_0 = int(total_count_1 * ratio)

    kept_files = []
    deleted_files = []
    kept_count_0 = 0

    # Only the class 0 budget can reject a file: the class 1 running total
    # can never exceed the overall class 1 total it is drawn from.
    for file, count_0, _ in sorted_files:
        if kept_count_0 + count_0 <= target_count_0:
            kept_files.append(file)
            kept_count_0 += count_0
        else:
            deleted_files.append(file)

    # Ensure the new folder exists
    create_directory(output_folder)

    # Copy the label files that made the cut
    for file in kept_files:
        shutil.copy(os.path.join(label_folder, file), os.path.join(output_folder, file))

    # Remove the images that belong to dropped label files
    for file in deleted_files:
        # Swap only the extension; str.replace would also rewrite a '.txt'
        # that happens to occur in the middle of the name.
        img_name = os.path.splitext(file)[0] + '.jpg'
        img_path = os.path.join(image_folder, img_name)
        if os.path.exists(img_path):
            os.remove(img_path)

    log_and_print("Ratio adjustment and deletion completed.")

In [194]:
def move_files(files_base, img_src_folder, lbl_src_folder, img_dest_folder, lbl_dest_folder):
    """Relocate image/label pairs identified by their base names.

    For each base name the ``.jpg`` is moved from `img_src_folder` to
    `img_dest_folder` and the ``.txt`` from `lbl_src_folder` to
    `lbl_dest_folder`. A pair is only moved when both files exist; missing
    pairs and errors raised while moving are reported through
    ``log_and_print`` and processing continues with the next base name.
    """
    for base_name in files_base:
        image_name = base_name + '.jpg'
        label_name = base_name + '.txt'

        jpg_source_path = os.path.join(img_src_folder, image_name)
        txt_source_path = os.path.join(lbl_src_folder, label_name)
        image_target = os.path.join(img_dest_folder, image_name)
        label_target = os.path.join(lbl_dest_folder, label_name)

        try:
            pair_present = os.path.exists(jpg_source_path) and os.path.exists(txt_source_path)
            if not pair_present:
                log_and_print(f"Missing files for base {base_name}: {jpg_source_path} or {txt_source_path}")
                continue
            shutil.move(jpg_source_path, image_target)
            shutil.move(txt_source_path, label_target)
        except Exception as e:
            log_and_print(f"Error moving files for base {base_name}: {e}")

In [195]:
def split_dataset(source_folder, train_folder, val_folder, val_ratio=0.2, class_1_boost_factor=1.5):
    """Split a flat folder of image/label pairs into training and validation sets.

    `source_folder` holds both the ``.jpg`` images and the ``.txt`` labels.
    Only base names that have both files are used. Each pair is assigned to
    the "class 1" group when its label file has strictly more class 1 than
    class 0 entries, otherwise to the "class 0" group. The validation set is
    sampled per group so that it contains about `val_ratio` of each. Files
    are MOVED out of `source_folder` into ``<train|val>_folder/images`` and
    ``<train|val>_folder/labels``.

    Args:
        source_folder: folder containing the image/label pairs.
        train_folder: destination root for the training set.
        val_folder: destination root for the validation set.
        val_ratio: fraction of pairs that goes to the validation set.
        class_1_boost_factor: oversampling factor for class-1 training pairs.
            The factor is truncated to an integer N and each class-1 training
            pair ends up present N times (the original plus N-1 copies named
            ``<base>_boost<k>``). Values below 2, including the default 1.5,
            therefore add no copies.

    Returns:
        None. When images, labels or matching pairs are missing, a message is
        logged and the function returns without moving anything.
    """
    train_images_folder = os.path.join(train_folder, 'images')
    train_labels_folder = os.path.join(train_folder, 'labels')
    val_images_folder = os.path.join(val_folder, 'images')
    val_labels_folder = os.path.join(val_folder, 'labels')
    for folder in (train_images_folder, train_labels_folder, val_images_folder, val_labels_folder):
        create_directory(folder)

    # Images and labels live side by side in the same source folder.
    jpg_files_base = get_file_bases(source_folder, '.jpg')
    txt_files_base = get_file_bases(source_folder, '.txt')

    if not jpg_files_base:
        log_and_print("No image files found in the source images folder. Cannot proceed with dataset splitting.")
        return
    if not txt_files_base:
        log_and_print("No label files found in the source labels folder. Cannot proceed with dataset splitting.")
        return

    # Keep only base names that have both an image and a label file.
    common_files_base = jpg_files_base & txt_files_base
    if not common_files_base:
        log_and_print("No common files found between images and labels. Cannot proceed with dataset splitting.")
        return

    # Group the pairs by dominant class. Iterating in sorted order makes the
    # lists (and therefore random.sample under a fixed seed) reproducible;
    # raw set iteration order is not.
    class_0_files = []
    class_1_files = []
    for base_name in sorted(common_files_base):
        count_0, count_1 = count_labels(os.path.join(source_folder, base_name + '.txt'))
        if count_1 > count_0:
            class_1_files.append(base_name)
        else:
            class_0_files.append(base_name)

    # Overall validation size, then the per-group shares.
    total_files = len(class_0_files) + len(class_1_files)
    expected_val_count = round(total_files * val_ratio)
    val_size_class_1 = round(len(class_1_files) * val_ratio)
    val_size_class_0 = expected_val_count - val_size_class_1

    # Shift the remainder to the other group if rounding overshoots one of them.
    if val_size_class_0 > len(class_0_files):
        val_size_class_0 = len(class_0_files)
        val_size_class_1 = expected_val_count - val_size_class_0
    elif val_size_class_1 > len(class_1_files):
        val_size_class_1 = len(class_1_files)
        val_size_class_0 = expected_val_count - val_size_class_1

    # Final safety clamp so random.sample is never asked for an impossible size.
    val_size_class_0 = min(max(val_size_class_0, 0), len(class_0_files))
    val_size_class_1 = min(max(val_size_class_1, 0), len(class_1_files))

    val_files_class_1 = random.sample(class_1_files, val_size_class_1)
    val_files_class_0 = random.sample(class_0_files, val_size_class_0)
    val_files_base = sorted(val_files_class_1 + val_files_class_0)

    # Whatever was not sampled for validation is training data.
    train_files_class_1 = sorted(set(class_1_files) - set(val_files_class_1))
    train_files_class_0 = sorted(set(class_0_files) - set(val_files_class_0))

    move_files(val_files_base, source_folder, source_folder, val_images_folder, val_labels_folder)
    move_files(train_files_class_1 + train_files_class_0, source_folder, source_folder, train_images_folder, train_labels_folder)

    # Boost class 1 in the training set. Repeating names in a list cannot
    # duplicate anything because each file can be moved only once, so the
    # copies are made explicitly after the move.
    extra_copies = max(int(class_1_boost_factor) - 1, 0)
    if extra_copies:
        for base_name in train_files_class_1:
            moved_image = os.path.join(train_images_folder, base_name + '.jpg')
            moved_label = os.path.join(train_labels_folder, base_name + '.txt')
            if not (os.path.exists(moved_image) and os.path.exists(moved_label)):
                continue  # the move was reported as failed; nothing to copy
            for k in range(1, extra_copies + 1):
                copy_base = f"{base_name}_boost{k}"
                shutil.copy(moved_image, os.path.join(train_images_folder, copy_base + '.jpg'))
                shutil.copy(moved_label, os.path.join(train_labels_folder, copy_base + '.txt'))

    log_and_print("Training and validation split completed!")

In [196]:
def _rotate_labels(label_path, M, frame_width, frame_height):
    """Map every box of one label file through the affine matrix `M`.

    Each non-blank line is expected to be
    ``class_id x_center y_center width height`` with the four numbers
    normalised to the frame size. Boxes whose rotated extent does not overlap
    the frame, or that have no area left after clipping, are dropped.

    Returns:
        A list of ``(label_line, (x1, y1, x2, y2))`` tuples: the new
        normalised label text and the clipped box in integer pixel
        coordinates.

    Raises:
        Whatever ``open`` raises, or ``ValueError`` for a line that does not
        have exactly four numeric fields after the class id.
    """
    results = []
    with open(label_path, 'r') as file:
        for line in file:
            fields = line.split()
            if not fields:
                continue  # tolerate blank lines instead of rejecting the file
            class_id = fields[0]
            x_center, y_center, width, height = map(float, fields[1:])

            # Normalised -> pixel coordinates, then rotate.
            x_min, y_min, x_max, y_max = rotate_bounding_box(
                x_center * frame_width,
                y_center * frame_height,
                width * frame_width,
                height * frame_height,
                M,
            )

            # Decide visibility on the unclipped box: once a box has been
            # clamped into the frame it can no longer be told apart from one
            # that really was inside.
            if not is_within_frame(x_min, y_min, x_max - x_min, y_max - y_min, frame_width, frame_height):
                continue

            x_min, y_min, box_w, box_h = clip_bounding_box(
                x_min, y_min, x_max - x_min, y_max - y_min, frame_width, frame_height
            )
            if box_w <= 0 or box_h <= 0:
                continue  # nothing left after clipping

            # Pixel -> normalised coordinates.
            x_norm = (x_min + box_w / 2) / frame_width
            y_norm = (y_min + box_h / 2) / frame_height
            w_norm = box_w / frame_width
            h_norm = box_h / frame_height
            results.append((
                f"{class_id} {x_norm:.6f} {y_norm:.6f} {w_norm:.6f} {h_norm:.6f}",
                (int(x_min), int(y_min), int(x_min + box_w), int(y_min + box_h)),
            ))
    return results


def augment_images_and_labels(image_folder, label_folder, output_folder, desired_ratio):
    """Rotate every labelled image once, then rebalance and split the result.

    Steps, all under `output_folder`:

    1. For each ``.jpg`` in `image_folder` with a matching ``.txt`` in
       `label_folder`, write a rotated image and its transformed labels to
       ``all/`` and a preview with the boxes drawn to
       ``image_with_bounding_box/``.
    2. Run ``adjust_ratio`` with `desired_ratio` on ``all/``; kept labels go
       to ``adjusted_labels/`` and images of dropped labels are deleted from
       ``all/``.
    3. Copy the remaining images next to the kept labels and call
       ``split_dataset`` to fill ``train/`` and ``val/`` (validation ratio 0.2).

    Label counts are logged after each stage. Images that cannot be read,
    have no label file, or whose label file cannot be parsed are skipped with
    a log message.
    """
    log_and_print("Starting augmentation process...")

    # Display initial label count and ratio
    display_label_count_ratio(label_folder, "Initial")

    all_folder = os.path.join(output_folder, 'all')
    boxes_folder = os.path.join(output_folder, 'image_with_bounding_box')
    adjusted_folder = os.path.join(output_folder, 'adjusted_labels')

    # Create the output directories if they don't exist
    create_directory(all_folder)
    create_directory(boxes_folder)

    # Get list of image files for progress bar
    image_files = [f for f in os.listdir(image_folder) if f.endswith('.jpg')]

    for image_filename in tqdm(image_files, desc="Augmenting images", unit="image"):
        # Derive sibling names from the stem; str.replace would also touch a
        # '.jpg' occurring in the middle of the name.
        stem = os.path.splitext(image_filename)[0]
        image_path = os.path.join(image_folder, image_filename)
        label_path = os.path.join(label_folder, stem + '.txt')

        if not os.path.exists(label_path):
            log_and_print(f"Label file for {image_filename} not found. Skipping this image.")
            continue

        image = cv2.imread(image_path)
        if image is None:
            log_and_print(f"Could not read image {image_path}. Skipping.")
            continue
        h, w = image.shape[:2]

        # Placeholder arguments: with a size of 1.0 the angle is always drawn
        # from the "large object" range. Use real data here for object size
        # and distance.
        angle = adaptive_rotation_angle(1.0, 1.0)

        rotated_image, M = rotate_image(image, angle)

        try:
            rotated_labels = _rotate_labels(label_path, M, w, h)
        except Exception as e:
            log_and_print(f"Error reading label file {label_path}: {e}")
            continue

        # Draw the clipped boxes on a copy so the training image stays clean.
        rotated_image_with_boxes = rotated_image.copy()
        for _, (x1, y1, x2, y2) in rotated_labels:
            cv2.rectangle(rotated_image_with_boxes, (x1, y1), (x2, y2), (0, 0, 255), 2)

        # Define output paths
        base_filename = f"{stem}_rotated_{angle:.2f}"
        output_all_image_path = os.path.join(all_folder, base_filename + '.jpg')
        output_all_label_path = os.path.join(all_folder, base_filename + '.txt')
        output_image_with_boxes_path = os.path.join(boxes_folder, base_filename + '_with_boxes.jpg')

        # cv2.imwrite reports failure through its return value. Without the
        # image, a label file would only distort the later counts.
        if not cv2.imwrite(output_all_image_path, rotated_image):
            log_and_print(f"Could not write image {output_all_image_path}. Skipping.")
            continue
        if not cv2.imwrite(output_image_with_boxes_path, rotated_image_with_boxes):
            log_and_print(f"Could not write image {output_image_with_boxes_path}.")

        try:
            with open(output_all_label_path, 'w') as file:
                for label_line, _ in rotated_labels:
                    file.write(label_line + '\n')
        except Exception as e:
            log_and_print(f"Error writing to label files: {e}")
            continue

    log_and_print("Augmentation process finished.")

    # Display counts and ratios after augmentation
    display_label_count_ratio(all_folder, "After Augmentation")

    # Adjust label ratio after augmentation
    sorted_files = sort_files_by_label_count(all_folder)
    adjust_ratio(sorted_files, desired_ratio, adjusted_folder, all_folder, all_folder)

    # Copy images to the adjusted_labels folder to ensure all files are present for splitting
    for img_file in os.listdir(all_folder):
        if img_file.endswith('.jpg'):
            shutil.copy(os.path.join(all_folder, img_file), os.path.join(adjusted_folder, img_file))

    # Display counts and ratios after adjusting the ratio
    display_label_count_ratio(adjusted_folder, "After Ratio Adjustment")

    # Split dataset into training and validation sets based on adjusted ratio
    split_dataset(adjusted_folder, os.path.join(output_folder, 'train'), os.path.join(output_folder, 'val'), 0.2)

    # Display final counts and ratios for training and validation sets
    display_label_count_ratio(os.path.join(output_folder, 'train', 'labels'), "Training Set")
    display_label_count_ratio(os.path.join(output_folder, 'val', 'labels'), "Validation Set")

In [197]:
# Usage: run the whole pipeline (augment -> adjust class ratio -> train/val split).
# NOTE(review): the two input folders are absolute, machine-specific Windows
# paths; change them before running this cell on another machine.
image_folder = r'C:\personal file\work\TKU\news\train_val image'
label_folder = r'C:\personal file\work\TKU\news\train_val label'
# Relative path, so all outputs are created under the current working directory.
output_folder = 'rotation_output_0.4_test6'
# Forwarded to adjust_ratio: kept class 0 labels are capped at
# int(total class 1 labels * desired_ratio).
desired_ratio = 0.4  # Adjust ratio here

augment_images_and_labels(image_folder, label_folder, output_folder, desired_ratio)

Starting augmentation process...
Initial - Total: 3089, Class 0: 1737, Class 1: 1352, Ratio 0: 0.56, Ratio 1: 0.44


Augmenting images: 100%|██████████| 1002/1002 [02:05<00:00,  8.01image/s]


Augmentation process finished.
After Augmentation - Total: 3089, Class 0: 1737, Class 1: 1352, Ratio 0: 0.56, Ratio 1: 0.44
Ratio adjustment and deletion completed.
After Ratio Adjustment - Total: 1693, Class 0: 540, Class 1: 1153, Ratio 0: 0.32, Ratio 1: 0.68
Training and validation split completed!
Training Set - Total: 1363, Class 0: 435, Class 1: 928, Ratio 0: 0.32, Ratio 1: 0.68
Validation Set - Total: 330, Class 0: 105, Class 1: 225, Ratio 0: 0.32, Ratio 1: 0.68
