In [12]:
import os
import kornia as K
from kornia.augmentation import AugmentationSequential
from torchvision.utils import save_image
import torch

In [13]:
# Paths
working_dir = os.getcwd()

train_set = os.path.join(working_dir, 'dataset', 'train')
train_set_augment = os.path.join(working_dir, 'augmented_dataset', 'train')
train_y = [os.path.join(train_set, i) for i in os.listdir(train_set)]
train_y_augment = [os.path.join(train_set_augment, i) for i in os.listdir(train_set)]

(train_set, train_set_augment, train_y, train_y_augment)

('d:\\Kuliah\\master\\deep-learning-project-1\\deeplearning-project-1\\dataset\\train',
 'd:\\Kuliah\\master\\deep-learning-project-1\\deeplearning-project-1\\augmented_dataset\\train',
 ['d:\\Kuliah\\master\\deep-learning-project-1\\deeplearning-project-1\\dataset\\train\\disgust'],
 ['d:\\Kuliah\\master\\deep-learning-project-1\\deeplearning-project-1\\augmented_dataset\\train\\disgust'])

In [14]:
# Pipeline
# https://kornia.readthedocs.io/en/stable/augmentation.container.html
# https://kornia.readthedocs.io/en/stable/augmentation.module.html
from pathlib import Path

def augment_pipe(image_path: str, save_path: str, n: int):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    pipe = AugmentationSequential(
        K.augmentation.ColorJitter(0.1, 0.1, 0.1, 0.1, p=0.5),
        K.augmentation.RandomBoxBlur(kernel_size=(3,3), keepdim=True, p=0.5),
        K.augmentation.RandomBrightness(p=0.5, keepdim=True),
        K.augmentation.RandomGaussianNoise(mean=0, std=0.05, p=0.5, keepdim=True),
        K.augmentation.RandomSaltAndPepperNoise(amount=(0.01, 0.06), salt_vs_pepper=(0.4, 0.6), p=0.5, same_on_batch=False, keepdim=True),
        K.augmentation.RandomSharpness(sharpness=0.5, same_on_batch=False, p=0.5, keepdim=True),
        K.augmentation.RandomErasing(scale=(0.02, 0.33), ratio=(0.3, 3.3), value=0.0, same_on_batch=False, p=0.5, keepdim=True),
        K.augmentation.RandomHorizontalFlip(p=0.25, keepdim=True),
        K.augmentation.RandomVerticalFlip(p=0.25, keepdim=True),
        K.augmentation.RandomRotation(p=0.5, degrees=20, keepdim=True),
        data_keys=["input"],
        same_on_batch=False,
    ).to(device)
    
    for i in range(n):
        in_tensor = K.io.load_image(image_path, K.io.ImageLoadType.RGB32)[None, ...]  # BxCxHxW
        in_tensor.to(device)
        out_tensor = pipe(in_tensor)
        
        image_name = Path(image_path).name
        save_image(out_tensor[0].cpu(), fp=os.path.join(save_path, f'{image_name}_{i}.jpg'))

In [15]:
# parrarelization
from threading import Thread, Lock
from typing import Any, Callable, Tuple, List

def process_parallel(function: Callable[..., Any],
                     args: List[Any]) -> list[Any]:
    """Run all items in args in parallel using threads.

    Args:
        function (Callable[..., Any]): function to apply
        args (Sequence[Sequence[Any]]): list of sequences, one per argument
                                        (like zip), e.g. [[a1, a2], [b1, b2]]
                                        -> calls function(a1, b1), function(a2, b2)

    Returns:
        list[Any]: results of each function call
    """

    threads: list[Thread] = []
    results: list[Any] = []

    lock = Lock()
    
    def wrapper(*args: Tuple[Any, ...]) -> None:
        result = function(*args)
        with lock:
            results.append(result)
    
    function_args = []
    
    for i in range(len(args)):
        function_args.append((args[0][i], args[1][i], args[2][i]))
    
    for item in function_args:
        t = Thread(target=wrapper, args=item)
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    return results


In [18]:
from pathlib import Path

sublist_n = 25
augment_n = 250

for raw_class, augmented_class in zip(train_y, train_y_augment):
    # read all raw_class
    image_paths = os.listdir(raw_class)
    y_class = Path(raw_class).name
    
    image_path_sublists = [image_paths[i:i + sublist_n] for i in range(0, len(image_paths), sublist_n)]
    for image_path_sublist in image_path_sublists:
        image_full_path_sublist = [os.path.join(train_set, y_class, image) for image in image_path_sublist]
        image_full_path_augment_sublist = [os.path.join(train_set_augment, y_class) for _ in image_full_path_sublist]
        augment_n_list = sublist_n * [augment_n]

        process_parallel(augment_pipe, (image_full_path_sublist, image_full_path_augment_sublist, augment_n_list))
    