In [1]:
from typing import Optional, Tuple
import cv2
import matplotlib.pyplot as plt
import numpy as np
import os
import albumentations as A
import scipy.stats as sps
from tqdm import tqdm

In [30]:
class BaseAugmentator():
    def compare_image_vs_processed(self, image, save_path: Optional[str] = None):
        processed_image = self(image)

        f, axarr = plt.subplots(1, 3, figsize=(20, 8))

        axarr[0].set_title('Initial')
        axarr[0].imshow(image)

        axarr[1].set_title('Processed')
        axarr[1].imshow(processed_image)

        axarr[2].set_title('Diff')
        if image.shape == processed_image.shape:
            axarr[2].imshow(processed_image - image)

        if save_path is not None:
            plt.savefig(save_path)

        plt.show()

    def __call__(self, image: np.ndarray, **kwargs) -> np.ndarray:
        raise NotImplementedError()


class PipelineAugmentor(BaseAugmentator):
    class Noise(BaseAugmentator):
        def __init__(self) -> None:
            super().__init__()

        def __call__(self, image: np.ndarray, **kwargs) -> np.ndarray:
            intensity = sps.uniform(0.5, 0.8).rvs(1)
            color_shift = sps.uniform(0.01, 0.05).rvs(1)

            image = image.astype(np.float32) / 255.0
            hls = cv2.cvtColor(image, cv2.COLOR_RGB2HLS)
            _, stddev = cv2.meanStdDev(hls)

            luminance_noise = sps.poisson(stddev[1] * intensity * 255).rvs(size=hls.shape[:2])
            color_noise = sps.norm(0, color_shift * 360 * intensity).rvs(size=hls.shape[:2])

            hue = hls[..., 0]
            hue += color_noise
            hue[hue < 0] += 360
            hue[hue > 360] -= 360

            luminance = hls[..., 1]
            luminance += (luminance_noise / 255) * (1.0 - luminance)

            image = cv2.cvtColor(hls, cv2.COLOR_HLS2RGB) * 255
            return image.astype(np.uint8)

    class Compression(BaseAugmentator):
        def __init__(self) -> None:
            super().__init__()

        def __call__(self, image: np.ndarray, **kwargs) -> np.ndarray:
            quality = int(sps.uniform(-100, 100).rvs(1))
            _, encoded_img = cv2.imencode('.jpg', image, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
            image = cv2.imdecode(encoded_img, cv2.IMREAD_COLOR)

            return image

    class Blur(BaseAugmentator):
        def __init__(self, kernel_size=7) -> None:
            super().__init__()
            self.kernel_size = kernel_size

        def __call__(self, image: np.ndarray, **kwargs) -> np.ndarray:
            kernel_size = self.kernel_size
            angle = int(sps.uniform(-180, 360).rvs(1))
            kernel = np.zeros((kernel_size, kernel_size))
            kernel[int((kernel_size-1)/2), :] = np.ones(kernel_size)
            kernel = cv2.warpAffine(kernel, cv2.getRotationMatrix2D((kernel_size/2 - 0.5, kernel_size/2 - 0.5), angle, 1.0), (kernel_size, kernel_size))
            kernel = kernel / np.sum(kernel)

            output = cv2.filter2D(image, -1, kernel)
            return output

    class Affine(BaseAugmentator):
        def __init__(self) -> None:
            super().__init__()

        def __call__(self, image: np.ndarray, **kwargs) -> np.ndarray:
            rows, cols, channels = image.shape
            rotation_angle = int(sps.uniform(-10, 20).rvs(1))
            translation = [int(sps.uniform(-15, 15).rvs(1)), int(sps.uniform(-15, 15).rvs(1))]
            scale = float(max(sps.norm(1.0, 0.1).rvs(1), 0.2))

            rotation_matrix = cv2.getRotationMatrix2D((cols/2, rows/2), rotation_angle, scale)
            translation_matrix = np.float32([[1, 0, translation[0]], [0, 1, translation[1]]])
            affine_matrix = np.concatenate([rotation_matrix, np.zeros((1, 3))], axis=0)
            affine_matrix = np.concatenate([affine_matrix, translation_matrix], axis=0)
            affine_matrix[2, 2] = 1

            max_dim = int(np.sqrt(rows**2 + cols**2))
            padded_image = np.zeros((max_dim, max_dim, channels), dtype=np.uint8)
            y_start = int((max_dim-rows)/2)
            x_start = int((max_dim-cols)/2)
            padded_image[y_start:y_start+rows, x_start:x_start+cols] = image

            augmented_image = cv2.warpAffine(padded_image, affine_matrix[:2,:], (max_dim, max_dim), flags=cv2.INTER_LINEAR)

            augmented_image = augmented_image[y_start:y_start+rows, x_start:x_start+cols]

            return augmented_image

    class Sharpen(Blur):
        def __init__(self, kernel_size: Optional[Tuple[int, int]] = None, sigma: Optional[float] = None, alpha: Optional[float] = None, beta: Optional[float] = None) -> None:
            super().__init__()
            if kernel_size is None:
                kernel_size = int(sps.uniform(3, 6).rvs(1))
            if sigma is None:
                sigma = np.abs(sps.norm(1.0, 0.4).rvs(1))
            if alpha is None:
                alpha = float(sps.norm(1.5, 0.4).rvs(1))
            if beta is None:
                beta = float(sps.norm(1.0, 0.4).rvs(1))

            self.kernel_size = kernel_size
            self.sigma = sigma
            self.alpha = alpha
            self.beta = beta

        def __call__(self, image: np.ndarray, **kwargs) -> np.ndarray:
            blurred = super().__call__(image, **kwargs)
            laplacian = cv2.Laplacian(blurred, cv2.CV_64F)
            laplacian = np.uint8(255 * np.absolute(laplacian) / np.max(np.absolute(laplacian)))

            sharpened = cv2.addWeighted(blurred, self.alpha, laplacian, self.beta, 0)
            return sharpened

    class RandomBrightnessContrast(BaseAugmentator):
        def __init__(self) -> None:
            super().__init__()

        def __call__(self, image: np.ndarray, **kwargs) -> np.ndarray:
            brightness = np.random.randint(-50, 50)
            contrast = np.random.randint(-50, 50)
            brightness_image = cv2.add(image, brightness)
            contrast_image = cv2.convertScaleAbs(brightness_image, alpha=(1 + contrast / 100), beta=0)
            return contrast_image

    def __init__(self, stages):
        super().__init__()
        for stage in stages:
            assert stage in self.inner_classes_list()
        self.augmentation_pipeline = [getattr(self, stage)() for stage in stages]

    @classmethod
    def inner_classes_list(cls):
        results = []
        for attrname in dir(cls):
            obj = getattr(cls, attrname)
            if isinstance(obj, type) and issubclass(obj, BaseAugmentator):
                results.append(str(obj)[len(f"""<class '__main__.{cls.__name__}."""):-2])
        return results

    def __call__(self, image: np.ndarray, **kwargs) -> np.ndarray:
        for stage in self.augmentation_pipeline:
            image = stage(image)
        return image

# For testing one aug
need_test = False
if need_test:
    stage = 'RandomBrightnessContrast'
    dirname = 'one_augment'
    n_tests = 5
    augmentator = PipelineAugmentor(stages=[stage])
    sample_image = cv2.imread(f'input_data/1.png')

    os.makedirs(dirname, exist_ok=True)
    for i in range(n_tests):
        augmentator.compare_image_vs_processed(sample_image, save_path=f'{dirname}/{stage}.png')

In [27]:
# Final submission

augmentator = PipelineAugmentor(stages=PipelineAugmentor.inner_classes_list())
for i in tqdm(range(1, 11)):
    assert os.path.exists(f'input_data/{i}.png')
    sample_image = cv2.imread(f'input_data/{i}.png')
    for j in range(10):
        output_image = augmentator(sample_image)
        cv2.imwrite(f'output_images/{i}_{j}.png', output_image)

100%|██████████| 10/10 [00:30<00:00,  3.04s/it]
