In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import numpy as np
import cv2
import random
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import tensorflow as tf

class DepthAttenuation:
    def __init__(self, attenuation_rate=(0.0, 3.0), max_attenuation=0.0, p=0.5):
        self.attenuation_rate = attenuation_rate
        self.max_attenuation = max_attenuation
        self.p = p

    def apply(self, img: np.ndarray, scan_mask: np.ndarray = None) -> np.ndarray:
        if random.random() > self.p:
            return img

        img = img.copy()
        attenuation_map = self._generate_attenuation_map(img.shape[0], img.shape[1], scan_mask).astype(img.dtype)

        # Apply the attenuation map to the image
        if img.ndim == 2:
            img = img * attenuation_map
        else:
            img = img * attenuation_map[:, :, None]

        return img

    def _generate_attenuation_map(self, height, width, scan_mask=None):
        x = np.linspace(0, 1, width)
        y = np.linspace(0, 1, height)
        xv, yv = np.meshgrid(x, y)
        distances = np.sqrt((xv - 0.5) ** 2 + yv**2)

        # Randomly choose the attenuation rate within the specified range
        attenuation_rate = random.uniform(*self.attenuation_rate)

        # Apply bounded exponential decay based on the distance
        attenuation_map = self._bounded_exponential_decay(distances, attenuation_rate, self.max_attenuation)

        if scan_mask is not None:
            attenuation_map = attenuation_map * scan_mask

        return attenuation_map

    def _bounded_exponential_decay(self, distances, attenuation_rate, max_attenuation=0):
        intensities = (1 - max_attenuation) * np.exp(-attenuation_rate * distances) + max_attenuation
        return intensities

In [16]:
from typing import Any, Dict, Tuple
import numpy as np
from albumentations.core.transforms_interface import ImageOnlyTransform

class GaussianShadow(ImageOnlyTransform):
    def __init__(
        self,
        strength: float | Tuple[float, float] = (0.25, 0.8),
        sigma_x: float | Tuple[float, float] = (0.01, 0.2),
        sigma_y: float | Tuple[float, float] = (0.01, 0.2),
        p: float = 0.5,
    ) -> None:
        super(GaussianShadow, self).__init__(p=p)
        self.strength = strength
        self.sigma_x = sigma_x
        self.sigma_y = sigma_y

    def apply(self, img: np.ndarray, **params: Any):
        img = img.copy()

        shadow_image = self._generate_shadow_image(
            height=img.shape[0], width=img.shape[1], scan_mask=params["scan_mask"]
        ).astype(img.dtype)

        scan_mask = params["scan_mask"].astype(bool)
        shadow_image = np.where(scan_mask, shadow_image, 1.0)

        if img.ndim == 2:
            img = img * shadow_image
        else:
            shadow_image = np.expand_dims(shadow_image, axis=-1)
            img = img * shadow_image

        return img

    def get_params_dependent_on_data(
        self, params: Dict[str, Any], data: Dict[str, Any]
    ) -> Dict[str, Any]:
        return {"scan_mask": data["scan_mask"]}

    def _generate_shadow_image(
        self,
        height,
        width,
        scan_mask,
    ):
        x = np.arange(0, width)
        y = np.arange(0, height)
        xv, yv = np.meshgrid(x, y)

        mu_x = np.random.choice(x)
        mu_y = np.random.choice(y)

        if isinstance(self.strength, tuple) or isinstance(self.strength, list):
            strength = np.random.uniform(*self.strength)
        else:
            strength = self.strength

        if isinstance(self.sigma_x, tuple) or isinstance(self.sigma_x, list):
            sigma_x = np.random.uniform(*self.sigma_x)
        else:
            sigma_x = self.sigma_x

        if isinstance(self.sigma_y, tuple) or isinstance(self.sigma_y, list):
            sigma_y = np.random.uniform(*self.sigma_y)
        else:
            sigma_y = self.sigma_y

        sigma_x = sigma_x * width
        sigma_y = sigma_y * height

        shadow_image = 1 - strength * np.exp(
            -((xv - mu_x) ** 2 / (2 * sigma_x**2) + (yv - mu_y) ** 2 / (2 * sigma_y**2))
        )
        shadow_image = shadow_image * scan_mask

        return shadow_image


In [17]:
from typing import Any, Dict, Tuple

import numpy as np
from albumentations.core.transforms_interface import ImageOnlyTransform


class HazeArtifact(ImageOnlyTransform):
    def __init__(
        self,
        radius: float | Tuple[float, float] = (0.05, 0.95),
        sigma: float | Tuple[float, float] = (0, 0.1),
        p: float = 0.5,
    ) -> None:
        super(HazeArtifact, self).__init__(p=p)
        self.radius = radius
        self.sigma = sigma

    def apply(self, img: np.ndarray, **params: Any):
        img = img.copy()

        haze = self._generate_haze(width=img.shape[1], height=img.shape[0])
        haze = haze * params["scan_mask"]

        if img.ndim == 2:
            img = img + 0.5 * haze.astype(img.dtype)
        else:
            img = img + 0.5 * haze[:, :, None].astype(img.dtype)
        img = np.clip(img, 0, 1)

        return img

    def get_params_dependent_on_data(
        self, params: Dict[str, Any], data: Dict[str, Any]
    ) -> Dict[str, Any]:
        return {"scan_mask": data["scan_mask"]}

    def _generate_haze(
        self,
        height,
        width,
    ):
        x = np.linspace(0, 1, width)
        y = np.linspace(0, 1, height)
        xv, yv = np.meshgrid(x, y)

        r = np.sqrt((xv - 0.5) ** 2 + (yv - 0) ** 2)

        if isinstance(self.radius, tuple) or isinstance(self.radius, list):
            haze_radius = np.random.uniform(*self.radius)
        else:
            haze_radius = self.radius

        if isinstance(self.sigma, tuple) or isinstance(self.sigma, list):
            haze_sigma = np.random.uniform(*self.sigma)
        else:
            haze_sigma = self.sigma

        haze = np.random.uniform(0, 1, (height, width))
        haze *= np.exp(-((r - haze_radius) ** 2) / (2 * haze_sigma**2))

        return haze

In [18]:
from typing import Any, Dict, Tuple

import numpy as np
from albumentations.core.transforms_interface import ImageOnlyTransform
from skimage.restoration import denoise_bilateral


class SpeckleReduction(ImageOnlyTransform):
    def __init__(
        self,
        sigma_spatial: float | Tuple[float, float] = (0.1, 2.0),
        sigma_color: float | Tuple[float, float] = (0.0, 1.0),
        window_size: int = 5,
        p: float = 0.5,
    ) -> None:
        super(SpeckleReduction, self).__init__(p=p)
        self.sigma_spatial = sigma_spatial
        self.sigma_color = sigma_color
        self.window_size = window_size

    def apply(self, img: np.ndarray, **params: Any):
        img = img.copy()

        if isinstance(self.sigma_spatial, tuple) or isinstance(
            self.sigma_spatial, list
        ):
            sigma_spatial = np.random.uniform(*self.sigma_spatial)
        else:
            sigma_spatial = self.sigma_spatial

        if isinstance(self.sigma_color, tuple) or isinstance(self.sigma_color, list):
            sigma_color = np.random.uniform(*self.sigma_color)
        else:
            sigma_color = self.sigma_color

        channel_axis = -1 if img.ndim == 3 else None
        denoised_img = denoise_bilateral(
            img,
            sigma_color=sigma_color,
            sigma_spatial=sigma_spatial,
            win_size=self.window_size,
            channel_axis=channel_axis,
        )

        if img.ndim == 2:
            # Single-channel image
            img = np.where(params["scan_mask"], denoised_img, img)
        else:
            # Multi-channel image
            img = np.where(params["scan_mask"][:, :, None], denoised_img, img)

        return img

    def get_params_dependent_on_data(
        self, params: Dict[str, Any], data: Dict[str, Any]
    ) -> Dict[str, Any]:
        return {"scan_mask": data["scan_mask"]}

In [19]:
# Khởi tạo augmentation
augmentations = {
    "depth": DepthAttenuation((0.0, 1.0), max_attenuation=0.6, p=1.0),
    "gaussian": GaussianShadow((0.25, 0.8), sigma_x=(0.01, 0.2), sigma_y=(0.01, 0.2), p=1.0),
    "speckle": SpeckleReduction(sigma_spatial=(0.05, 1.0), sigma_color=(0.0, 0.6), window_size=5, p=1.0),
    "haze": HazeArtifact(radius=(0.05, 0.95), sigma=(0, 0.1), p=1.0)
}

In [24]:
import os
import cv2
import numpy as np
from tqdm import tqdm

# Đường dẫn
dataset_path = "/content/drive/MyDrive/OTU-2D-Dataset-main/OTU-2D-Dataset-main/dataset_split/train/images"
scan_mask_path = "/content/drive/MyDrive/OTU-2D-Dataset-main/OTU-2D-Dataset-main/dataset_split/train/scan_mask"
base_path = "/content/drive/MyDrive/OTU-2D-Dataset-main/OTU-2D-Dataset-main/dataset_split/train"

image_files = sorted([f for f in os.listdir(dataset_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))])

for aug_name, aug_transform in augmentations.items():
    print(f"\n🔧 Đang xử lý augmentation: {aug_name.upper()}")
    save_dir = os.path.join(base_path, f"augmented_{aug_name}")
    os.makedirs(save_dir, exist_ok=True)

    count = 0

    for image_name in tqdm(image_files, desc=aug_name):
        base_name = os.path.splitext(image_name)[0]
        image_path = os.path.join(dataset_path, image_name)
        mask_name = f"{base_name}_scan_mask.png"
        mask_path = os.path.join(scan_mask_path, mask_name)

        if not os.path.exists(mask_path):
            print(f"Thiếu mask cho {image_name}")
            continue

        # Đọc ảnh và mask
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if image is None:
            print(f"Không đọc được ảnh: {image_name}")
            continue
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        h, w = image.shape[:2]

        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            print(f"Không đọc được mask: {mask_name}")
            continue
        if mask.shape != (h, w):
            print(f"Kích thước không khớp: {image_name} ({h}, {w}) vs {mask.shape}")
            continue

        # Chuẩn hóa và nhị phân hóa mask
        image_norm = image.astype(np.float32) / 255.0
        mask_bin = (mask > 127).astype(np.float32)

        try:
            augmented_image = aug_transform.apply(image_norm, scan_mask=mask_bin)

            # Xử lý NaN hoặc inf nếu có
            if not np.isfinite(augmented_image).all():
                print(f"NaN/inf trong ảnh {image_name} → thay thế bằng 0")
                augmented_image = np.nan_to_num(augmented_image, nan=0.0, posinf=1.0, neginf=0.0)

        except Exception as e:
            print(f"Lỗi áp dụng {aug_name} cho {image_name}: {e}")
            continue

        # Chuyển ảnh về uint8
        image_uint8 = image.astype(np.uint8)
        augmented_uint8 = (np.clip(augmented_image, 0, 1) * 255).astype(np.uint8)

        # Đường dẫn lưu
        save_original = os.path.join(save_dir, f"original_{image_name}")
        save_aug = os.path.join(save_dir, f"{aug_name}_{image_name}")

        cv2.imwrite(save_original, cv2.cvtColor(image_uint8, cv2.COLOR_RGB2BGR))
        cv2.imwrite(save_aug, cv2.cvtColor(augmented_uint8, cv2.COLOR_RGB2BGR))

        count += 2

    print(f"augmented_{aug_name}: {count} ảnh (gồm gốc và augment)")

print("\nHoàn tất toàn bộ quá trình tạo và augment ảnh.")



🔧 Đang xử lý augmentation: DEPTH


depth: 100%|██████████| 1177/1177 [02:24<00:00,  8.12it/s]


augmented_depth: 2354 ảnh (gồm gốc và augment)

🔧 Đang xử lý augmentation: GAUSSIAN


gaussian: 100%|██████████| 1177/1177 [02:19<00:00,  8.45it/s]


augmented_gaussian: 2354 ảnh (gồm gốc và augment)

🔧 Đang xử lý augmentation: SPECKLE


speckle:   3%|▎         | 39/1177 [00:17<10:25,  1.82it/s]

NaN/inf trong ảnh 1040.JPG → thay thế bằng 0


speckle:   4%|▎         | 43/1177 [00:21<14:39,  1.29it/s]

NaN/inf trong ảnh 1047.JPG → thay thế bằng 0


speckle:   7%|▋         | 87/1177 [00:42<09:38,  1.88it/s]

NaN/inf trong ảnh 1097.JPG → thay thế bằng 0


speckle:  15%|█▍        | 175/1177 [01:30<11:35,  1.44it/s]

NaN/inf trong ảnh 1199.JPG → thay thế bằng 0


speckle:  19%|█▉        | 228/1177 [01:55<10:39,  1.48it/s]

NaN/inf trong ảnh 1264.JPG → thay thế bằng 0


speckle:  24%|██▎       | 277/1177 [02:20<04:24,  3.40it/s]

NaN/inf trong ảnh 1319.JPG → thay thế bằng 0


speckle:  24%|██▍       | 288/1177 [02:25<07:18,  2.03it/s]

NaN/inf trong ảnh 1331.JPG → thay thế bằng 0


speckle:  31%|███       | 367/1177 [03:00<08:02,  1.68it/s]

NaN/inf trong ảnh 1420.JPG → thay thế bằng 0


speckle:  41%|████      | 480/1177 [03:54<04:57,  2.34it/s]

NaN/inf trong ảnh 224.JPG → thay thế bằng 0


speckle:  74%|███████▍  | 876/1177 [07:04<02:54,  1.73it/s]

NaN/inf trong ảnh 670.JPG → thay thế bằng 0


speckle:  78%|███████▊  | 919/1177 [07:23<02:06,  2.04it/s]

NaN/inf trong ảnh 718.JPG → thay thế bằng 0


speckle:  86%|████████▌ | 1015/1177 [08:09<02:15,  1.20it/s]

NaN/inf trong ảnh 822.JPG → thay thế bằng 0


speckle: 100%|██████████| 1177/1177 [09:28<00:00,  2.07it/s]


augmented_speckle: 2354 ảnh (gồm gốc và augment)

🔧 Đang xử lý augmentation: HAZE


haze: 100%|██████████| 1177/1177 [02:32<00:00,  7.74it/s]

augmented_haze: 2354 ảnh (gồm gốc và augment)

Hoàn tất toàn bộ quá trình tạo và augment ảnh.



