# OCR Text Detection - High Precision Focus

**목표**: Precision을 86% → 93%+ 로 개선하면서 Recall 유지

## 기존 문제점 분석
- threshold 0.5 → false positive 과다
- morphological cleanup 없음 → 노이즈 검출
- min_area=30 너무 작음
- 경계면 bleed → 비텍스트 영역 검출

## 개선 전략
1. **Focal + Dice + Boundary Loss**: 경계면 false positive에 강한 패널티
2. **Morphological post-processing**: 노이즈 제거 + 마스크 정제
3. **Threshold 상향 (0.5 → 0.65)**: 고신뢰 영역만 검출 (고정 threshold이며, 최적값은 validation 탐색으로 조정)
4. **Area / 최소 변 길이 filtering**: 텍스트로 보기에는 너무 작은 영역 제거 (aspect ratio 필터는 현재 코드에 구현되어 있지 않음)
5. **Multi-scale TTA**: 여러 스케일(+ 좌우 반전) 예측의 평균으로 precision 향상

In [None]:
# 1. Download the dataset (Colab environment)
# Fetch the competition archive, unpack it into the current working directory,
# and install the third-party packages that the import cell relies on.
!wget -O data.tar.gz "https://aistages-api-public-prod.s3.amazonaws.com/app/Competitions/000377/data/data.tar.gz"
!tar -xzf data.tar.gz
!pip install -q segmentation-models-pytorch albumentations opencv-python-headless

In [None]:
import os
import json
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.amp import autocast, GradScaler
import albumentations as A
from albumentations.pytorch import ToTensorV2
import segmentation_models_pytorch as smp
from tqdm import tqdm
import gc

# ----------------------------------------
# GPU tuning
# ----------------------------------------
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
# Enable TF32 for both matmul and cuDNN kernels.
for _backend in (torch.backends.cuda.matmul, torch.backends.cudnn):
    _backend.allow_tf32 = True
torch.backends.cudnn.benchmark = True

# ----------------------------------------
# Dataset locations
# ----------------------------------------
BASE_PATH = './data/datasets'


def _under_base(relative):
    """Return *relative* joined onto BASE_PATH."""
    return os.path.join(BASE_PATH, relative)


TRAIN_IMG_DIR = _under_base('images/train')
VAL_IMG_DIR = _under_base('images/val')
TEST_IMG_DIR = _under_base('images/test')
TRAIN_JSON = _under_base('jsons/train.json')
VAL_JSON = _under_base('jsons/val.json')
TEST_JSON = _under_base('jsons/test.json')
SAMPLE_SUB = _under_base('sample_submission.csv')

# ----------------------------------------
# Device selection
# ----------------------------------------
_cuda_ok = torch.cuda.is_available()
DEVICE = torch.device('cuda' if _cuda_ok else 'cpu')
print(f"Device: {DEVICE}")
if _cuda_ok:
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

# ----------------------------------------
# Hyperparameters (tuned for precision)
# ----------------------------------------
RESIZE_TARGET = 1024        # square side length the images are resized to
BATCH_SIZE = 4              # micro-batch size per forward/backward pass
ACCUMULATION_STEPS = 8      # effective batch = BATCH_SIZE * ACCUMULATION_STEPS = 32
EPOCHS = 40
LEARNING_RATE = 5e-4
WARMUP_EPOCHS = 3

In [None]:
# ========================================
# 2. 데이터셋 클래스
# ========================================
class ReceiptDataset(Dataset):
    """Receipt images paired with text-region masks built from polygon labels.

    The annotation JSON must contain a top-level ``images`` mapping keyed by
    image file name. In training mode each entry's optional ``words`` mapping
    is read, and every word's ``points`` polygon is rasterised into the mask.

    Args:
        img_dir: Directory holding the image files.
        json_path: Path to the annotation JSON file.
        transform: Optional callable invoked as ``transform(image=...)`` in
            test mode or ``transform(image=..., masks=[...])`` otherwise.
        is_test: When True no masks are built.

    Items returned by ``__getitem__``:
        * training mode with a transform: ``(image, mask, boundary)``
        * test mode with a transform: ``(image, image_name, (h, w))`` where
          ``(h, w)`` is the size of the image before the transform
        * no transform (either mode): only the RGB image array
    """

    def __init__(self, img_dir, json_path, transform=None, is_test=False):
        self.img_dir = img_dir
        self.transform = transform
        self.is_test = is_test
        with open(json_path, 'r', encoding='utf-8') as f:
            self.data = json.load(f)['images']
        self.image_names = list(self.data.keys())

    def __len__(self):
        return len(self.image_names)

    def _load_readable(self, idx):
        """Return ``(image_name, rgb_image)`` for the first readable image at or after *idx*.

        Unreadable files are skipped by moving on to the next index (wrapping
        around). The scan is bounded by the dataset size so that a dataset in
        which nothing can be read fails with a clear error instead of
        exhausting the recursion limit.

        Raises:
            IndexError: If *idx* is out of range.
            RuntimeError: If no image in the dataset can be read.
        """
        total = len(self.image_names)
        # Plain indexing first: keeps IndexError for out-of-range indices.
        img_name = self.image_names[idx]
        for _ in range(total):
            image = cv2.imread(os.path.join(self.img_dir, img_name))
            if image is not None:
                return img_name, cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            idx = (idx + 1) % total
            img_name = self.image_names[idx]
        raise RuntimeError(
            f"None of the {total} listed images could be read from {self.img_dir!r}"
        )

    def __getitem__(self, idx):
        img_name, image = self._load_readable(idx)
        h, w = image.shape[:2]

        if not self.is_test:
            # Rasterise every word polygon into a binary text mask.
            mask = np.zeros((h, w), dtype=np.float32)
            words = self.data[img_name].get('words', {})
            for word_id in words:
                pts = np.array(words[word_id]['points'], dtype=np.int32)
                cv2.fillPoly(mask, [pts], 1)

            # Boundary band = dilated mask minus eroded mask (3x3 kernel,
            # two iterations each); used as the target of the boundary loss.
            kernel = np.ones((3, 3), np.uint8)
            dilated = cv2.dilate(mask, kernel, iterations=2)
            eroded = cv2.erode(mask, kernel, iterations=2)
            boundary = dilated - eroded

            if self.transform:
                augmented = self.transform(image=image, masks=[mask, boundary])
                img_t = augmented['image']
                mask_t = augmented['masks'][0]
                boundary_t = augmented['masks'][1]
                return img_t, mask_t, boundary_t
        else:
            if self.transform:
                augmented = self.transform(image=image)
                return augmented['image'], img_name, (h, w)
        # Without a transform only the raw RGB array is returned.
        return image

In [None]:
# ========================================
# 3. 증강 파이프라인 (과도한 증강 제거 - Precision 보호)
# ========================================
def _to_model_input():
    """Fresh normalisation + tensor-conversion tail shared by both pipelines."""
    return [A.Normalize(), ToTensorV2()]


# Mild geometric augmentation.
_geometric_aug = [
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(
        shift_limit=0.05,
        scale_limit=0.1,
        rotate_limit=5,
        p=0.4,
        border_mode=cv2.BORDER_CONSTANT,
        value=0,
    ),
]

# Photometric augmentation: one brightness/contrast variant, one blur variant.
_photometric_aug = [
    A.OneOf(
        [
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=1.0),
            A.CLAHE(clip_limit=2.0, p=1.0),
        ],
        p=0.4,
    ),
    A.OneOf(
        [
            A.GaussianBlur(blur_limit=3, p=1.0),
            A.MedianBlur(blur_limit=3, p=1.0),
        ],
        p=0.2,
    ),
]

# Training pipeline: resize -> geometric -> photometric -> normalise -> tensor.
train_transform = A.Compose(
    [
        A.Resize(RESIZE_TARGET, RESIZE_TARGET),
        *_geometric_aug,
        *_photometric_aug,
        *_to_model_input(),
    ]
)

# Validation pipeline: deterministic resize -> normalise -> tensor.
val_transform = A.Compose(
    [
        A.Resize(RESIZE_TARGET, RESIZE_TARGET),
        *_to_model_input(),
    ]
)

In [None]:
# ========================================
# 4. Precision 중심 손실 함수
# ========================================

class FocalLoss(nn.Module):
    """Binary focal loss computed from logits.

    Each element's BCE is scaled by ``alpha_t * (1 - p_t) ** gamma`` where
    ``p_t`` is the probability assigned to the true class. ``alpha`` is the
    weight applied where ``target == 1`` and ``1 - alpha`` where
    ``target == 0``, so the default ``alpha=0.75`` weights positive (text)
    pixels three times as much as background pixels.
    """

    def __init__(self, alpha=0.75, gamma=2.0):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, pred, target):
        """Return the mean focal loss of logits *pred* against *target*."""
        prob = torch.sigmoid(pred)
        background = 1 - target
        # Class-balancing weight: alpha on positives, (1 - alpha) on negatives.
        alpha_t = self.alpha * target + (1 - self.alpha) * background
        per_element = F.binary_cross_entropy_with_logits(pred, target, reduction='none')
        # Probability the model gives to the correct class of each element.
        p_true = prob * target + (1 - prob) * background
        modulated = alpha_t * (1 - p_true) ** self.gamma
        return (modulated * per_element).mean()


class BoundaryLoss(nn.Module):
    """BCE-with-logits against the boundary mask, emphasising boundary pixels.

    Every element has weight ``1 + 4 * boundary_mask``, i.e. weight 5 where
    the boundary mask is 1 and weight 1 where it is 0.
    """

    def forward(self, pred, boundary_mask):
        """Return the mean weighted BCE of logits *pred* against *boundary_mask*."""
        per_element = F.binary_cross_entropy_with_logits(
            pred, boundary_mask, reduction='none'
        )
        emphasis = 4.0 * boundary_mask + 1.0
        return (per_element * emphasis).mean()


class PrecisionFocusedLoss(nn.Module):
    """Weighted sum of three segmentation losses.

    ``0.4 * focal(pred, mask) + 0.4 * dice(pred, mask)
    + 0.2 * boundary(pred, boundary_mask)``
    """

    def __init__(self):
        super().__init__()
        self.focal = FocalLoss(alpha=0.75, gamma=2.0)
        self.dice = smp.losses.DiceLoss(mode='binary')
        self.boundary = BoundaryLoss()

    def forward(self, pred, mask, boundary_mask):
        """Return the combined loss.

        Targets with three dimensions get a dimension inserted at index 1
        before being compared with *pred*.
        """
        if mask.dim() == 3:
            mask = mask.unsqueeze(1)
        if boundary_mask.dim() == 3:
            boundary_mask = boundary_mask.unsqueeze(1)

        weighted_focal = 0.4 * self.focal(pred, mask)
        weighted_dice = 0.4 * self.dice(pred, mask)
        weighted_boundary = 0.2 * self.boundary(pred, boundary_mask)
        return weighted_focal + weighted_dice + weighted_boundary

In [None]:
# ========================================
# 5. 후처리 파이프라인 (Precision 핵심)
# ========================================

def refine_mask(mask_prob, threshold=0.65):
    """Binarise a probability map and clean it up morphologically.

    Pixels strictly above *threshold* become 1. A 3x3 opening then removes
    small specks, and a closing with a ``(5, 3)`` rectangular element fills
    small holes.

    Returns:
        The cleaned mask as a ``uint8`` array.
    """
    opening_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    closing_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 3))

    cleaned = (mask_prob > threshold).astype(np.uint8)
    # Opening first (erode -> dilate), then closing (dilate -> erode).
    for operation, kernel in (
        (cv2.MORPH_OPEN, opening_kernel),
        (cv2.MORPH_CLOSE, closing_kernel),
    ):
        cleaned = cv2.morphologyEx(cleaned, operation, kernel, iterations=1)
    return cleaned


def mask_to_polygons_precise(mask, orig_h, orig_w, min_area=80):
    """Extract polygons from a binary mask, dropping tiny regions.

    For every external contour:

    * contours whose area is below *min_area* are skipped;
    * contours whose minimum-area bounding rectangle has a side shorter than
      4 pixels (including degenerate zero-sized rectangles) are skipped;
    * the contour is simplified with ``approxPolyDP`` (epsilon = 1% of the
      perimeter); a result with at least 4 vertices is emitted as-is;
    * otherwise the 4 corners of the minimum-area rectangle are emitted,
      rounded to the nearest pixel and clipped to the image bounds, since a
      rotated rectangle can extend past the image edge.

    No aspect-ratio filtering is performed.

    Args:
        mask: Binary mask passed to ``cv2.findContours``.
            Assumed to be uint8 at the original image resolution; confirm
            against callers.
        orig_h: Image height, used to clip fallback rectangle corners.
        orig_w: Image width, used to clip fallback rectangle corners.
        min_area: Minimum contour area for a region to be kept.

    Returns:
        A list of polygons, each a list of ``[x, y]`` integer pairs.
    """
    polygons = []
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    for cnt in contours:
        if cv2.contourArea(cnt) < min_area:
            continue

        # Minimum-area (rotated) bounding rectangle: (center, (w, h), angle).
        rect = cv2.minAreaRect(cnt)
        box_w, box_h = rect[1]
        # Also rejects degenerate rectangles with a zero-length side.
        if min(box_w, box_h) < 4:
            continue

        epsilon = 0.01 * cv2.arcLength(cnt, True)
        approx = cv2.approxPolyDP(cnt, epsilon, True)

        if len(approx) >= 4:
            polygons.append(approx.reshape(-1, 2).tolist())
        elif len(cnt) >= 4:
            # The simplification collapsed below 4 vertices: fall back to the
            # rotated rectangle, rounded and kept inside the image.
            box = np.rint(cv2.boxPoints(rect)).astype(np.int32)
            box[:, 0] = np.clip(box[:, 0], 0, max(orig_w - 1, 0))
            box[:, 1] = np.clip(box[:, 1], 0, max(orig_h - 1, 0))
            polygons.append(box.tolist())

    return polygons


def polygons_to_string(polygons):
    """Serialise polygons as ``"x y x y ...|x y ..."``.

    Coordinates are converted with ``int()``, points within a polygon are
    separated by spaces, and polygons are separated by ``|``. An empty or
    missing input yields an empty string.
    """
    if not polygons:
        return ""
    return "|".join(
        " ".join(f"{int(point[0])} {int(point[1])}" for point in polygon)
        for polygon in polygons
    )

In [None]:
# ========================================
# 6. 학습 함수
# ========================================

def train():
    """Train UNet++ (EfficientNet-b4 encoder) and checkpoint the best weights.

    Data locations and hyperparameters come from the module-level constants.
    Gradients are accumulated over ``ACCUMULATION_STEPS`` micro-batches. The
    trailing group of an epoch, which may be smaller, also triggers an
    optimizer step, and each micro-batch loss is divided by the actual size
    of its group. The weights with the lowest validation loss are written to
    ``best_precision_model.pth``. Training stops early after 8 epochs without
    improvement.

    Returns:
        The model. If a checkpoint was saved during this run, its weights are
        loaded back first, so the returned model matches the reported best
        validation loss rather than the last epoch.

    Raises:
        RuntimeError: If the training loader yields no batches.
    """
    torch.cuda.empty_cache()
    gc.collect()

    checkpoint_path = 'best_precision_model.pth'

    # Data loaders
    train_ds = ReceiptDataset(TRAIN_IMG_DIR, TRAIN_JSON, transform=train_transform)
    val_ds = ReceiptDataset(VAL_IMG_DIR, VAL_JSON, transform=val_transform)

    train_loader = DataLoader(
        train_ds, batch_size=BATCH_SIZE, shuffle=True,
        num_workers=4, pin_memory=True, drop_last=True, persistent_workers=True
    )
    val_loader = DataLoader(
        val_ds, batch_size=BATCH_SIZE, shuffle=False,
        num_workers=4, pin_memory=True, persistent_workers=True
    )

    n_batches = len(train_loader)
    if n_batches == 0:
        raise RuntimeError(
            "Training loader is empty: fewer than BATCH_SIZE training images were found"
        )
    # Ceil division: the trailing partial accumulation group also takes an
    # optimizer (and scheduler) step, so it must be counted here.
    steps_per_epoch = (n_batches + ACCUMULATION_STEPS - 1) // ACCUMULATION_STEPS

    # Model: UNet++ with an ImageNet-pretrained EfficientNet-b4 encoder
    model = smp.UnetPlusPlus(
        encoder_name="efficientnet-b4",
        encoder_weights="imagenet",
        in_channels=3,
        classes=1
    ).to(DEVICE)
    model = model.to(memory_format=torch.channels_last)

    # Optimizer: AdamW + OneCycleLR (warm-up for the first WARMUP_EPOCHS)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-2)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=LEARNING_RATE,
        epochs=EPOCHS,
        steps_per_epoch=steps_per_epoch,
        pct_start=WARMUP_EPOCHS / EPOCHS,
        anneal_strategy='cos'
    )

    criterion = PrecisionFocusedLoss()
    scaler = GradScaler('cuda')

    best_val_loss = float('inf')
    patience = 0
    max_patience = 8

    print(f"=== Training Start ===")
    print(f"    Model: UNet++ / EfficientNet-b4")
    print(f"    Resolution: {RESIZE_TARGET}x{RESIZE_TARGET}")
    print(f"    Effective Batch: {BATCH_SIZE * ACCUMULATION_STEPS}")
    print(f"    Epochs: {EPOCHS}, LR: {LEARNING_RATE}")
    print(f"    Loss: Focal(0.4) + Dice(0.4) + Boundary(0.2)")

    for epoch in range(1, EPOCHS + 1):
        model.train()
        train_loss = 0
        optimizer.zero_grad(set_to_none=True)

        pbar = tqdm(enumerate(train_loader), total=n_batches, desc=f"Epoch {epoch}/{EPOCHS}")
        for step, (images, masks, boundaries) in pbar:
            images = images.to(DEVICE, non_blocking=True, memory_format=torch.channels_last)
            masks = masks.to(DEVICE, non_blocking=True)
            boundaries = boundaries.to(DEVICE, non_blocking=True)

            # Number of micro-batches in the accumulation group this step
            # belongs to; smaller than ACCUMULATION_STEPS only for the
            # trailing group of the epoch.
            group_start = (step // ACCUMULATION_STEPS) * ACCUMULATION_STEPS
            group_size = min(ACCUMULATION_STEPS, n_batches - group_start)

            with autocast('cuda', dtype=torch.bfloat16):
                outputs = model(images)
                batch_loss = criterion(outputs, masks, boundaries)

            # Average (not sum) the gradients over the accumulation group.
            scaler.scale(batch_loss / group_size).backward()

            is_group_end = (step + 1) % ACCUMULATION_STEPS == 0 or (step + 1) == n_batches
            if is_group_end:
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad(set_to_none=True)
                scheduler.step()

            batch_loss_value = batch_loss.item()
            train_loss += batch_loss_value
            pbar.set_postfix({'loss': f'{batch_loss_value:.4f}'})

        torch.cuda.empty_cache()

        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for images, masks, boundaries in val_loader:
                images = images.to(DEVICE, non_blocking=True, memory_format=torch.channels_last)
                masks = masks.to(DEVICE, non_blocking=True)
                boundaries = boundaries.to(DEVICE, non_blocking=True)
                with autocast('cuda', dtype=torch.bfloat16):
                    outputs = model(images)
                    loss = criterion(outputs, masks, boundaries)
                val_loss += loss.item()

        avg_train = train_loss / n_batches
        avg_val = val_loss / len(val_loader)
        lr = optimizer.param_groups[0]['lr']

        print(f"Epoch {epoch}: Train={avg_train:.4f}, Val={avg_val:.4f}, LR={lr:.6f}")

        if avg_val < best_val_loss:
            best_val_loss = avg_val
            torch.save(model.state_dict(), checkpoint_path)
            print(f"  -> Best model saved! (val_loss={avg_val:.4f})")
            patience = 0
        else:
            patience += 1
            if patience >= max_patience:
                print(f"  -> Early stopping at epoch {epoch}")
                break

        mem = torch.cuda.max_memory_allocated() / 1e9
        print(f"  -> Peak Memory: {mem:.1f} GB")
        torch.cuda.reset_peak_memory_stats()

    # Hand back the best weights, not whatever the final epoch produced.
    if best_val_loss < float('inf'):
        model.load_state_dict(torch.load(checkpoint_path, map_location=DEVICE))

    print(f"Training complete. Best val_loss={best_val_loss:.4f}")
    return model

# Run training now; `model` is bound to whatever train() returns.
model = train()

In [None]:
# ========================================
# 7. Multi-Scale TTA 추론 (Precision 극대화)
# ========================================

def inference_multiscale_tta(threshold=0.65, min_area=80, scales=(896, 1024, 1152)):
    """Run multi-scale + horizontal-flip TTA on the test set and write a submission.

    For each test image the model is evaluated at every size in *scales*, on
    the image and on its horizontal mirror. The two probability maps are
    averaged, resized back to the original resolution and averaged across
    scales. The result is thresholded and cleaned by ``refine_mask``, turned
    into polygons by ``mask_to_polygons_precise`` and written to
    ``submission_high_precision.csv``. Images that cannot be read get an
    empty prediction.

    Args:
        threshold: Probability threshold forwarded to ``refine_mask``.
        min_area: Minimum contour area forwarded to
            ``mask_to_polygons_precise``.
        scales: Square input sizes to evaluate.
            NOTE(review): presumably these must satisfy the encoder's
            divisibility requirement (the defaults are multiples of 32);
            confirm before adding new values.

    Raises:
        ValueError: If *scales* is empty.
    """
    scales = tuple(scales)
    if not scales:
        raise ValueError("scales must contain at least one input size")

    print("=== High-Precision Inference (Multi-Scale TTA) ===")

    # Rebuild the architecture and load the best checkpoint.
    model = smp.UnetPlusPlus(
        encoder_name="efficientnet-b4",
        encoder_weights=None,
        in_channels=3,
        classes=1
    ).to(DEVICE)
    model.load_state_dict(torch.load('best_precision_model.pth', map_location=DEVICE))
    model = model.to(memory_format=torch.channels_last)
    model.eval()

    # Test image names come straight from the annotation file.
    with open(TEST_JSON, 'r', encoding='utf-8') as f:
        test_data = json.load(f)['images']
    test_names = list(test_data.keys())

    sample_df = pd.read_csv(SAMPLE_SUB)
    predictions = {}

    normalize = A.Normalize()

    with torch.no_grad():
        for img_name in tqdm(test_names, desc="Inference"):
            image = cv2.imread(os.path.join(TEST_IMG_DIR, img_name))
            if image is None:
                predictions[img_name] = ""
                continue
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            orig_h, orig_w = image.shape[:2]

            accumulated_mask = np.zeros((orig_h, orig_w), dtype=np.float32)

            for scale in scales:
                # Resize + normalise, then HWC -> 1xCxHxW.
                resized = cv2.resize(image, (scale, scale))
                normed = normalize(image=resized)['image']
                tensor = torch.from_numpy(normed.transpose(2, 0, 1)).float().unsqueeze(0)
                tensor = tensor.to(DEVICE, memory_format=torch.channels_last)

                with autocast('cuda', dtype=torch.bfloat16):
                    # Prediction on the image as-is
                    p1 = torch.sigmoid(model(tensor))
                    # Horizontal-flip TTA: predict on the mirror, flip back
                    p2 = torch.sigmoid(model(torch.flip(tensor, dims=[3])))
                    p2 = torch.flip(p2, dims=[3])

                avg = ((p1 + p2) / 2).float().cpu().numpy()[0, 0]
                # Back to the original resolution before accumulating
                avg_orig = cv2.resize(avg, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
                accumulated_mask += avg_orig

            # Mean over scales
            final_mask = accumulated_mask / len(scales)

            # Post-processing
            refined = refine_mask(final_mask, threshold=threshold)
            polygons = mask_to_polygons_precise(refined, orig_h, orig_w, min_area=min_area)
            predictions[img_name] = polygons_to_string(polygons)

    # Build the submission; files without a prediction get an empty string.
    sample_df['polygons'] = sample_df['filename'].map(predictions).fillna("")
    sample_df.to_csv('submission_high_precision.csv', index=False)

    # Summary statistics
    poly_counts = sample_df['polygons'].apply(lambda x: len(x.split('|')) if x else 0)
    print(f"\n=== Results ===")
    print(f"    Total images: {len(sample_df)}")
    print(f"    Avg polygons/image: {poly_counts.mean():.1f}")
    print(f"    Min polygons: {poly_counts.min()}, Max: {poly_counts.max()}")
    print(f"    Saved: submission_high_precision.csv")

# Run test-set inference; this loads 'best_precision_model.pth' and writes the submission CSV.
inference_multiscale_tta()

In [None]:
# ========================================
# 8. Threshold 탐색 (최적 Precision-Recall 밸런스 찾기)
# ========================================

def search_optimal_threshold():
    """Compare detection counts against GT word counts for several thresholds.

    The first 50 validation images are each run through the model once, at
    ``RESIZE_TARGET`` resolution and without TTA. Every candidate threshold
    is then applied to that same probability map. For each threshold the
    total number of extracted polygons is printed next to the total number
    of ground-truth words and their ratio.

    Images that cannot be read are excluded from both totals, so they do not
    distort the ratio. Pred/GT is only a count-based proxy: a ratio above 1.0
    suggests surplus detections, but it does not measure precision itself.
    """
    print("=== Threshold Search on Validation ===")

    model = smp.UnetPlusPlus(
        encoder_name="efficientnet-b4",
        encoder_weights=None,
        in_channels=3,
        classes=1
    ).to(DEVICE)
    model.load_state_dict(torch.load('best_precision_model.pth', map_location=DEVICE))
    model = model.to(memory_format=torch.channels_last)
    model.eval()

    # Validation ground truth
    with open(VAL_JSON, 'r', encoding='utf-8') as f:
        val_data = json.load(f)['images']

    normalize = A.Normalize()
    thresholds = [0.50, 0.55, 0.60, 0.65, 0.70, 0.75]

    # A 50-image sample keeps the search fast.
    sample_names = list(val_data.keys())[:50]

    total_gt = 0
    total_pred = dict.fromkeys(thresholds, 0)

    for img_name in sample_names:
        image = cv2.imread(os.path.join(VAL_IMG_DIR, img_name))
        if image is None:
            continue
        # Count GT only for images that are actually evaluated.
        total_gt += len(val_data[img_name].get('words', {}))

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        orig_h, orig_w = image.shape[:2]

        resized = cv2.resize(image, (RESIZE_TARGET, RESIZE_TARGET))
        normed = normalize(image=resized)['image']
        tensor = torch.from_numpy(normed.transpose(2, 0, 1)).float().unsqueeze(0)
        tensor = tensor.to(DEVICE, memory_format=torch.channels_last)

        # The probability map does not depend on the threshold: predict once.
        with torch.no_grad(), autocast('cuda', dtype=torch.bfloat16):
            pred = torch.sigmoid(model(tensor)).float().cpu().numpy()[0, 0]
        pred_resized = cv2.resize(pred, (orig_w, orig_h))

        for thresh in thresholds:
            refined = refine_mask(pred_resized, threshold=thresh)
            polygons = mask_to_polygons_precise(refined, orig_h, orig_w, min_area=80)
            total_pred[thresh] += len(polygons)

    for thresh in thresholds:
        n_pred = total_pred[thresh]
        ratio = n_pred / total_gt if total_gt > 0 else 0
        print(f"  Threshold={thresh:.2f}: GT={total_gt}, Pred={n_pred}, Pred/GT={ratio:.3f}")
        # Closer to 1.0 is better (> 1.0 suggests many false positives).

# Print the Pred/GT table for the candidate thresholds on the validation sample.
search_optimal_threshold()

In [None]:
# ========================================
# 9. (선택) 최적 threshold로 재추론
# ========================================
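# From the search output above, pick a threshold whose Pred/GT lies between 0.95 and 1.05.
# Example: keep 0.65 if it is already the best; if 0.70 looks better, adjust as described below.
#
# OPTIMAL_THRESHOLD = 0.65  # adjust according to the search result
# Update the threshold value that inference_multiscale_tta() passes to refine_mask().
# Changing only the default of refine_mask() has no effect, because the caller
# supplies the threshold explicitly. Then run inference_multiscale_tta() again.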