In [1]:
import os
import shutil
import random
import numpy as np
from PIL import Image, ImageOps
import torchvision.transforms.functional as TF
from torchvision import datasets
from torch.utils.data import Subset
from sklearn.model_selection import StratifiedShuffleSplit

# 设置随机种子
random.seed(42)
np.random.seed(42)

# 原始train路径
original_train_dir = "train"  # 修改为你的路径
# 增强后的数据保存路径
output_dir = "train_improve-2"

# 如果output_dir已存在，清空重建
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)
os.makedirs(output_dir)

# 加载原始train数据集
train_dataset = datasets.ImageFolder(original_train_dir)
targets = train_dataset.targets
classes = train_dataset.classes

# StratifiedShuffleSplit：只取四分之一
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.9, random_state=42)
for train_idx, _ in sss.split(np.zeros(len(targets)), targets):
    reduced_dataset = Subset(train_dataset, train_idx)

# 数据增强函数
def augment_image(img):
    augmented_images = []
    # 原图
    augmented_images.append(('orig', img))

    # 左右翻转
    augmented_images.append(('flip', ImageOps.mirror(img)))

    # 顺时针90度
    augmented_images.append(('rot90', img.rotate(-90, expand=True)))

    # 逆时针90度
    augmented_images.append(('rot-90', img.rotate(90, expand=True)))

    # 加高斯噪声
    img_np = np.array(img).astype(np.float32)
    noise = np.random.normal(0, 10, img_np.shape).astype(np.float32)
    noisy_img = np.clip(img_np + noise, 0, 255).astype(np.uint8)
    augmented_images.append(('gaussian', Image.fromarray(noisy_img)))
    

    return augmented_images

# 保存图像函数
def save_image(image, save_path):
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    image.save(save_path)

# 执行增强和保存
for idx in train_idx:
    img_path, label = train_dataset.samples[idx]
    class_name = classes[label]
    img = Image.open(img_path).convert("RGB")

    # 获取增强图像
    aug_imgs = augment_image(img)

    base_name = os.path.splitext(os.path.basename(img_path))[0]
    for suffix, aug_img in aug_imgs:
        save_name = f"{base_name}_{suffix}.jpg"
        save_path = os.path.join(output_dir, class_name, save_name)
        save_image(aug_img, save_path)

print("增强完成，图像保存在：", output_dir)


增强完成，图像保存在： train_improve-2


In [2]:
import os
import shutil
import random
import numpy as np
from PIL import Image, ImageOps
import torchvision.transforms.functional as TF
import torchvision.transforms as transforms
from torchvision import datasets
from torch.utils.data import Subset
from sklearn.model_selection import StratifiedShuffleSplit

# 设置随机种子
random.seed(42)
np.random.seed(42)

# 原始train路径
original_train_dir = "train"  # 修改为你的路径
# 增强后的数据保存路径
output_dir = "train_improve-3"

# 如果output_dir已存在，清空重建
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)
os.makedirs(output_dir)

# 加载原始train数据集
train_dataset = datasets.ImageFolder(original_train_dir)
targets = train_dataset.targets
classes = train_dataset.classes

# StratifiedShuffleSplit：只取十分之一
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.9, random_state=42)
for train_idx, _ in sss.split(np.zeros(len(targets)), targets):
    reduced_dataset = Subset(train_dataset, train_idx)

# 数据增强函数
def augment_image(img):
    augmented_images = []
    augmented_images.append(('orig', img))

    # 水平翻转
    augmented_images.append(('flip', ImageOps.mirror(img)))

    # 顺时针90度
    augmented_images.append(('rot90', img.rotate(-90, expand=True)))

    # 逆时针90度
    augmented_images.append(('rot-90', img.rotate(90, expand=True)))

    # 高斯噪声
    img_np = np.array(img).astype(np.float32)
    noise = np.random.normal(0, 10, img_np.shape).astype(np.float32)
    noisy_img = np.clip(img_np + noise, 0, 255).astype(np.uint8)
    augmented_images.append(('gaussian', Image.fromarray(noisy_img)))

    # Color Jitter
    color_jitter = transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4)
    augmented_images.append(('jitter', color_jitter(img)))

    # Random Affine（旋转 ±15°, 平移 ±10%）
    affine = transforms.RandomAffine(degrees=15, translate=(0.1, 0.1))
    augmented_images.append(('affine', affine(img)))

    # Random Crop + Resize（模拟图像局部遮挡）
    resize = transforms.Resize(img.size)
    random_crop = transforms.RandomResizedCrop(img.size[0], scale=(0.8, 1.0))
    cropped = resize(random_crop(img))
    augmented_images.append(('crop', cropped))

    # Random Erasing（遮挡区域）
    to_tensor = transforms.ToTensor()
    to_pil = transforms.ToPILImage()
    erased = transforms.RandomErasing(p=1.0, scale=(0.02, 0.2), ratio=(0.3, 3.3))(to_tensor(img))
    augmented_images.append(('erase', to_pil(erased)))

    return augmented_images

# 保存图像函数
def save_image(image, save_path):
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    image.save(save_path)

# 执行增强和保存
for idx in train_idx:
    img_path, label = train_dataset.samples[idx]
    class_name = classes[label]
    img = Image.open(img_path).convert("RGB")

    # 获取增强图像
    aug_imgs = augment_image(img)

    base_name = os.path.splitext(os.path.basename(img_path))[0]
    for suffix, aug_img in aug_imgs:
        save_name = f"{base_name}_{suffix}.jpg"
        save_path = os.path.join(output_dir, class_name, save_name)
        save_image(aug_img, save_path)

print("增强完成，图像保存在：", output_dir)


增强完成，图像保存在： train_improve-3


In [2]:
import os
import shutil
import random
import numpy as np
from PIL import Image, ImageOps, ImageFilter, ImageEnhance
import torchvision.transforms.functional as TF
import torchvision.transforms as transforms
from torchvision import datasets
from torch.utils.data import Subset
from sklearn.model_selection import StratifiedShuffleSplit
import cv2  # 用于运动模糊

# 设置随机种子
random.seed(42)
np.random.seed(42)

# 原始train路径
original_train_dir = "train"
output_dir = "train_aug_strong"

# 清空并重建输出目录
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)
os.makedirs(output_dir)

# 加载原始数据集
train_dataset = datasets.ImageFolder(original_train_dir)
targets = train_dataset.targets
classes = train_dataset.classes

# 取10%数据
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.9, random_state=42)
for train_idx, _ in sss.split(np.zeros(len(targets)), targets):
    reduced_dataset = Subset(train_dataset, train_idx)

# --- 增强工具函数 ---
def apply_underwater_effect(img):
    """模拟水下环境：蓝绿色调+散射光效"""
    img = np.array(img).astype(np.float32) / 255.0
    
    # 添加蓝绿色偏
    img[:, :, 0] *= 0.8  # 减少红色通道
    img[:, :, 1] *= 1.2  # 增强绿色通道
    img[:, :, 2] *= 1.1  # 增强蓝色通道
    
    # 模拟光散射（径向渐变）
    rows, cols = img.shape[:2]
    center = (cols//2 + random.randint(-50, 50), rows//2 + random.randint(-50, 50))
    radius = max(rows, cols) * random.uniform(0.5, 1.5)
    mask = np.zeros((rows, cols))
    cv2.circle(mask, center, int(radius), 1, -1)
    img += mask[..., None] * 0.3  # 光效增强
    
    return Image.fromarray(np.clip(img*255, 0, 255).astype(np.uint8))

def apply_motion_blur(img):
    """运动模糊效果"""
    img_np = np.array(img)
    
    # 随机运动方向和强度
    size = random.randint(10, 30)
    kernel = np.zeros((size, size))
    if random.random() > 0.5:
        # 水平模糊（模拟水平游动）
        kernel[int((size-1)/2), :] = np.ones(size)
    else:
        # 对角模糊
        np.fill_diagonal(kernel, 1)
    
    kernel /= size
    blurred = cv2.filter2D(img_np, -1, kernel)
    return Image.fromarray(blurred)

def apply_non_rigid_deform(img):
    """非刚性变形（模拟鱼类游动姿态）"""
    img_np = np.array(img)
    rows, cols = img_np.shape[:2]
    
    # 生成随机位移场
    dx = np.random.randn(rows, cols) * 5
    dy = np.random.randn(rows, cols) * 5
    
    # 应用变形
    x, y = np.meshgrid(np.arange(cols), np.arange(rows))
    map_x = np.clip(x + dx, 0, cols-1).astype(np.float32)
    map_y = np.clip(y + dy, 0, rows-1).astype(np.float32)
    
    deformed = cv2.remap(img_np, map_x, map_y, cv2.INTER_LINEAR)
    return Image.fromarray(deformed)

def apply_advanced_noise(img):
    """混合噪声：高斯+泊松"""
    img_np = np.array(img).astype(np.float32)
    
    # 高斯噪声
    gauss_noise = np.random.normal(0, 15, img_np.shape)
    
    # 泊松噪声（模拟传感器噪声）
    poisson_noise = np.random.poisson(img_np * 0.1) / 0.1 - img_np
    
    noisy_img = img_np + gauss_noise + poisson_noise
    return Image.fromarray(np.clip(noisy_img, 0, 255).astype(np.uint8))

# --- 主增强函数 ---
def augment_image(img):
    augmented_images = []
    
    # 原始图像
    augmented_images.append(('orig', img))
    
    # 基础几何增强
    if random.random() > 0.3:
        augmented_images.append(('flip', ImageOps.mirror(img)))
    
    # 水下特效增强
    if random.random() > 0.5:
        augmented_images.append(('underwater', apply_underwater_effect(img)))
    
    # 运动模糊
    if random.random() > 0.6:
        augmented_images.append(('motion', apply_motion_blur(img)))
    
    # 非刚性变形
    if random.random() > 0.7:
        augmented_images.append(('deform', apply_non_rigid_deform(img)))
    
    # 高级噪声
    if random.random() > 0.4:
        augmented_images.append(('noise', apply_advanced_noise(img)))
    
    # 组合增强（随机选择2-3种增强叠加）
    for _ in range(2):
        temp_img = img.copy()
        if random.random() > 0.5:
            temp_img = apply_underwater_effect(temp_img)
        if random.random() > 0.5:
            temp_img = apply_motion_blur(temp_img)
        if random.random() > 0.5:
            temp_img = apply_non_rigid_deform(temp_img)
        augmented_images.append((f'combo_{_}', temp_img))
    
    return augmented_images

# 保存图像
def save_image(image, save_path):
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    image.save(save_path)

# 执行增强
for idx in train_idx:
    img_path, label = train_dataset.samples[idx]
    class_name = classes[label]
    img = Image.open(img_path).convert("RGB")
    
    aug_imgs = augment_image(img)
    base_name = os.path.splitext(os.path.basename(img_path))[0]
    
    for suffix, aug_img in aug_imgs:
        save_name = f"{base_name}_{suffix}.jpg"
        save_path = os.path.join(output_dir, class_name, save_name)
        save_image(aug_img, save_path)

print(f"增强完成！图像保存在: {output_dir}")

增强完成！图像保存在: train_aug_strong
