In [83]:
import os
import pandas as pd
import numpy as np
import cv2
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import random
import torch



In [84]:
# Configurations
SEED = 42  # 랜덤 시드는 42로 고정
BATCH_SIZE = 32
IMG_SIZE = 500
DATA_PATH = '/data/ephemeral/home/1.datasets_fin'
MIXUP_ALPHA = 0.2  # mixup alpha parameter
MIXUP_PROB = 0.3   # probability of applying mixup

# 이미지 경로
train_original = os.path.join(DATA_PATH, 'train')
train_rotate_path = os.path.join(DATA_PATH, '1_train_rotate')



In [85]:

###############################################################
#                        데이터 준비 함수                         #
###############################################################
def create_directories():
    """필요한 디렉토리 생성"""
    train_aug_path = os.path.join(DATA_PATH, '1_train_aug')
    validation_aug_path = os.path.join(DATA_PATH, '1_validation_aug')
    os.makedirs(train_aug_path, exist_ok=True)
    os.makedirs(validation_aug_path, exist_ok=True)
    return train_aug_path, validation_aug_path

def split_data():
    """데이터 분할"""
    train_df = pd.read_csv(os.path.join(DATA_PATH, 'train.csv'))
    train_data, val_data = train_test_split(
        train_df, 
        test_size=0.2, 
        random_state=SEED, 
        stratify=train_df['target']
    )

    train_data.to_csv(os.path.join(DATA_PATH, '1_train_split.csv'), index=False)
    val_data.to_csv(os.path.join(DATA_PATH, '1_val_split.csv'), index=False)

    print("\n--------------------------------")
    print(f"\n데이터 분할 완료!")
    print(f"분할 결과:")
    print(f"학습 데이터 수: {len(train_data)}")
    print(f"검증 데이터 수: {len(val_data)}")

    return train_data, val_data



In [86]:
###############################################################
#                           회전 함수                           #
###############################################################

def is_nearly_horizontal_or_vertical(angle, threshold=0.1):
    """각도가 수평(0도 또는 180도) 또는 수직(90도 또는 270도)에 너무 가까운지 확인"""
    angle = angle % 360    
    return any(abs((angle - target) % 180) <= threshold for target in [0, 90])

def rotate_document(image, angle):
    """이미지를 주어진 각도로 회전"""
    (img_h, img_w) = image.shape[:2]
    angle_rad = np.deg2rad(angle)
    
    # 회전 후 필요한 새 이미지 크기
    cos = abs(np.cos(angle_rad))
    sin = abs(np.sin(angle_rad))
    new_w = int(img_h * sin + img_w * cos)
    new_h = int(img_h * cos + img_w * sin)
    
    # 회전 행렬
    M = cv2.getRotationMatrix2D((img_w / 2, img_h / 2), angle, 1.0)
    
    # 새 캔버스 중심 보정
    M[0, 2] += (new_w - img_w) / 2
    M[1, 2] += (new_h - img_h) / 2
    
    rotated = cv2.warpAffine(
        image, 
        M, 
        (new_w, new_h),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=(255, 255, 255)  # 흰색 배경
    )
    
    return rotated

def augment_with_rotation(image):
    """
    문서 이미지 증강:
    1. 원본 이미지의 윤곽선 감지
    2. 문서 방향 감지
    3. 0, 90, 180, 270도 회전 버전 생성
    """
    rotated_images = []
    
    # 원본 이미지 추가
    rotated_images.append(image.copy())
    
    # 전처리
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (3, 3), 0)
    
    # 이진화
    _, thresh = cv2.threshold(gray, 250, 255, cv2.THRESH_BINARY_INV)
    kernel = np.ones((3, 3), np.uint8)
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel, iterations=2)
    
    # 윤곽선 찾기
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return [image]  # 윤곽선이 없으면 원본만 반환

    # 가장 큰 윤곽선
    largest_contour = max(contours, key=cv2.contourArea)
    
    # 45, 135, 225, 315도 회전 버전 생성
    for angle in [45, 135, 225, 315]:
        rotated = rotate_document(image, angle)
        rotated_images.append(rotated)
    
    return rotated_images

def process_rotations():
    """원본 이미지들의 회전된 버전을 생성하고 저장하는 함수"""
    # train_split.csv 읽기
    train_split = pd.read_csv(os.path.join(DATA_PATH, '1_train_split.csv'))
    train_files = train_split['ID'].tolist()
    class_num = train_split['target'].tolist()
    
    # train_aug 디렉토리 확인/생성
    train_rot_path = os.path.join(DATA_PATH, '1_train_rotate')
    train_rot_csv = os.path.join(DATA_PATH, '1_train_rotate.csv')
    os.makedirs(train_rot_path, exist_ok=True)
    
    print("\n--------------------------------")
    print("\n회전된 이미지 생성 중...")
    rotated_data = []
    for file, class_num in tqdm(zip(train_files, class_num), total=len(train_files)):
        # 원본 이미지 읽기
        input_path = os.path.join(train_original, file)
        img = cv2.imread(input_path)
        
        if img is None:
            print(f"이미지를 읽을 수 없습니다: {input_path}")
            continue
        
        # 회전된 이미지들 생성
        rotated_images = augment_with_rotation(img)
        
        # 회전된 이미지들 저장
        for idx, rotated_img in enumerate(rotated_images):
            # 원본 파일명에서 확장자 분리
            filename, ext = os.path.splitext(file)
            # 새 파일명 생성 (예: image_001_rot_0.jpg, image_001_rot_45.jpg, etc.)
            if idx == 0:
                new_filename = f"{filename}{ext}"
            else:
                angle = (idx - 1) * 90 + 45  # 45, 135, 225, 315
                new_filename = f"{filename}_r_{angle}{ext}"
            
            output_path = os.path.join(train_rot_path, new_filename)
            cv2.imwrite(output_path, rotated_img, [cv2.IMWRITE_JPEG_QUALITY, 70])

            rotated_data.append((new_filename, class_num))

    rotated_df = pd.DataFrame(rotated_data, columns=['ID', 'target'])
    rotated_df.to_csv(train_rot_csv, index=False)

    
    print(f"\n회전된 이미지 생성 완료!")
    print(f"회전 후 이미지 수: {len(rotated_df)}")
    
    


In [87]:
###############################################################
#                           증강 함수                           #
###############################################################
def get_train_augmentation():
    """학습 이미지에 대한 증강 파이프라인 정의"""
    return A.Compose([
        A.Resize(height=IMG_SIZE, width=IMG_SIZE),
        # 노이즈 증강
        A.SomeOf([
            A.GaussNoise(var_limit=(10.0, 50.0), p=1),
            A.ISONoise(color_shift=(0.01, 0.05), intensity=(0.1, 0.5), p=1),
            A.RandomFog(fog_coef_lower=0.1, fog_coef_upper=0.3, p=1),
        ], n=2, p=0.8),
        # 블러 증강
        A.OneOf([
            A.GaussianBlur(blur_limit=(3, 7), p=1),
            A.MotionBlur(blur_limit=3, p=1),
        ], p=1),
        # 색상 증강
        A.OneOf([
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=1),
            A.RGBShift(r_shift_limit=20, g_shift_limit=20, b_shift_limit=20, p=1),
        ], p=0.8),
        # 기하학적 증강
        A.OneOf([
            # A.Rotate(limit=45, p=1),
            A.RandomRotate90(p=1),
            A.HorizontalFlip(p=1),
            A.VerticalFlip(p=1),
        ], p=0.5),
        A.OneOf([            
            A.RandomCrop(height=IMG_SIZE, width=IMG_SIZE, p=0.8),
        ], p=0.5),
        # A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        # ToTensorV2(),
    ])

def get_validation_augmentation():
    """검증 이미지에 대한 증강 파이프라인 정의"""
    return A.Compose([
        A.Resize(height=IMG_SIZE, width=IMG_SIZE),
        # 라이트 노이즈 감소
        A.OneOf([
            A.GaussNoise(var_limit=(5.0, 20.0), p=1),
            A.ISONoise(color_shift=(0.01, 0.03), intensity=(0.1, 0.3), p=1),
        ], p=0.5),
        # 라이트 블러 클리어
        A.GaussianBlur(blur_limit=(3, 5), p=0.3),
        # 기본 색상 보정
        A.OneOf([
            A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=1),
            A.RandomGamma(gamma_limit=(90, 110), p=1),
        ], p=0.5),
        A.CLAHE(clip_limit=2.0, tile_grid_size=(8,8), p=0.5),
        # A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        # ToTensorV2(),
    ])

# 이미지를 믹스업하는 함수
def mixup_images(image1, image2, alpha=MIXUP_ALPHA):
    """두 이미지를 mixup하는 함수 (NumPy arrays 사용)"""
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1
    
    # NumPy array를 사용한 mixup
    mixed_image = lam * image1 + (1 - lam) * image2
    
    # uint8로 변환
    mixed_image = mixed_image.astype(np.uint8)
    
    return mixed_image






In [88]:
###############################################################
#                        이미지 처리 함수                         #
###############################################################
def batch_train_process(batch_files, train_rotate_path, train_aug_path, transform):
    """학습 이미지 배치 처리"""
    train_results = []
    augmented_images = []

    for file in batch_files:
        input_path = os.path.join(train_rotate_path, file)
        img = cv2.imread(input_path)
        if img is None:
            print(f"이미지를 읽을 수 없습니다. {input_path}")
            train_results.append((file, False, "이미지를 읽을 수 없습니다."))
            continue
        
        transformed = transform(image=img)["image"]

        train_aug_path = os.path.join(train_aug_path, file)# 증강 이미지 저장 경로
        cv2.imwrite(train_aug_path, transformed, 
                    [cv2.IMWRITE_JPEG_QUALITY, 70, 
                     cv2.IMWRITE_PNG_COMPRESSION, 9])
        augmented_images.append(transformed)    # 믹스업을 위해 증강 이미지 저장
        train_results.append((file, True, "성공"))


    return train_results

def batch_val_process(batch_files, train_original, val_aug_dir, transform):
    """검증 이미지 배치 처리"""
    val_results = []
    for file in batch_files:
        input_path = os.path.join(train_original, file)
        img = cv2.imread(input_path)
        if img is None:
            print(f"이미지를 읽을 수 없습니다. {input_path}")
            val_results.append((file, False, "이미지를 읽을 수 없습니다."))
            continue
        transformed = transform(image=img)["image"]
        output_path = os.path.join(val_aug_dir, file)
        cv2.imwrite(output_path, transformed, [cv2.IMWRITE_JPEG_QUALITY, 70, cv2.IMWRITE_PNG_COMPRESSION, 9])
        val_results.append((file, True, "성공"))
    return val_results



In [89]:

###############################################################
#                          메인 함수                            #
###############################################################
def main():


    
    # 데이터 분할 및 저장
    train_data, val_data = split_data()

    # 회전 이미지 생성
    process_rotations()

    # 랜덤 시드 설정
    random.seed(SEED)
    np.random.seed(SEED)
    
    # 트랜스폼한 이디지 저장 디렉토리 생성
    train_aug_path, validation_aug_path = create_directories()
    
    # 작업을 위한 파일 읽기
    train_split = pd.read_csv(os.path.join(DATA_PATH, '1_train_split.csv'))
    val_split = pd.read_csv(os.path.join(DATA_PATH, '1_val_split.csv'))
    train_rotate = pd.read_csv(os.path.join(DATA_PATH, '1_train_rotate.csv'))
    
    # 증강 파이프라인 함수 불러오기
    train_transform = get_train_augmentation()
    val_transform = get_validation_augmentation()
    
    # 학습 이미지 처리 (회전 시킨 train_rotate.csv에서만)
    print("\n--------------------------------")
    print("\n학습 이미지 처리...")
    train_files = train_rotate['ID'].tolist()
    train_batches = [train_files[i:i + BATCH_SIZE] for i in range(0, len(train_files), BATCH_SIZE)]
    
    train_results = []
    for batch in tqdm(train_batches, desc="학습 이미지 처리 중..."):
        results = batch_train_process(batch, train_rotate_path, train_aug_path, transform=train_transform)
        train_results.extend(results)
    
    # 검증 이미지 처리 (val_split.csv에서만)
    print("검증 이미지 처리...")
    val_files = val_split['ID'].tolist()
    val_batches = [val_files[i:i + BATCH_SIZE] for i in range(0, len(val_files), BATCH_SIZE)]
    
    val_results = []
    for batch in tqdm(val_batches, desc="검증 이미지 처리 중..."):
        results = batch_val_process(batch, train_original, validation_aug_path, transform=val_transform)
        val_results.extend(results)
    
    # 결과 계산
    train_success = sum(1 for _, success, _ in train_results if success)
    train_failed = len(train_results) - train_success
    val_success = sum(1 for _, success, _ in val_results if success)
    val_failed = len(val_results) - val_success
    
    # 결과 출력
    print("\n--------------------------------")
    print("\n결과:")
    print(f"학습 이미지: {train_success}개 성공, {train_failed}개 실패")
    print(f"검증 이미지: {val_success}개 성공, {val_failed}개 실패")
    print("\n데이터 준비 완료!")



In [90]:
if __name__ == "__main__":
    main() 


--------------------------------

데이터 분할 완료!
분할 결과:
학습 데이터 수: 1256
검증 데이터 수: 314

--------------------------------

회전된 이미지 생성 중...


  0%|          | 0/1256 [00:00<?, ?it/s]

100%|██████████| 1256/1256 [00:19<00:00, 63.30it/s]
  Expected `dict[str, any]` but got `UniformParams` with value `UniformParams(noise_type=...6, 0.0784313725490196)])` - serialized value may not be as expected
  return self.__pydantic_serializer__.to_python(



회전된 이미지 생성 완료!
회전 후 이미지 수: 6280

--------------------------------

학습 이미지 처리...


학습 이미지 처리 중...: 100%|██████████| 197/197 [03:06<00:00,  1.06it/s]


검증 이미지 처리...


검증 이미지 처리 중...: 100%|██████████| 10/10 [00:02<00:00,  3.39it/s]


--------------------------------

결과:
학습 이미지: 6280개 성공, 0개 실패
검증 이미지: 314개 성공, 0개 실패

데이터 준비 완료!



