In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# 1. 기존 리포지토리 폴더로 이동
import os
os.chdir('/content/drive/MyDrive/Codeit_AI_4th_Drug_image_CV_project')

In [12]:
"""
어노테이션 전처리 MVP (3단계) - COCO 포맷 입력 버전
코랩 환경에서 실행 가능한 기초적인 어노테이션 전처리 코드

3단계: 어노테이션 전처리 (원본: COCO 포맷)
- COCO 포맷 JSON 파일 읽기
- 좌표 시스템 정규화 (절대 → 상대 좌표)
- 라벨 인코딩 (category_id 매핑)
- 어노테이션 포맷 변환 (COCO → YOLO, Pascal VOC)
- 계층적 라벨링 파싱
- 데이터 검증 및 통계
"""

import os
import json
import glob
import csv
from collections import defaultdict, Counter
import pandas as pd

def parse_pill_code(filename):
    """
    파일명에서 알약 코드 추출 및 파싱

    Args:
        filename: 파일명 (예: K-003544-010221-016551-027926_0_2_0_2_70_000_200.png)

    Returns:
        dict: 파싱된 알약 코드 정보
    """
    try:
        # 확장자 제거
        name_without_ext = filename.replace('.png', '').replace('.json', '')

        # '_' 기준으로 분할 (첫 번째 부분이 알약 코드)
        parts = name_without_ext.split('_')
        pill_code_part = parts[0]

        # '-' 기준으로 알약 코드 분할
        code_parts = pill_code_part.split('-')

        if len(code_parts) >= 4:
            return {
                'full_code': pill_code_part,
                'prefix': code_parts[0],  # K
                'code1': code_parts[1],   # 003544
                'code2': code_parts[2],   # 010221
                'code3': code_parts[3],   # 016551
                'code4': code_parts[4] if len(code_parts) > 4 else None,  # 027926
                'is_valid': True
            }
        else:
            return {
                'full_code': pill_code_part,
                'prefix': None,
                'code1': None,
                'code2': None,
                'code3': None,
                'code4': None,
                'is_valid': False
            }

    except Exception as e:
        return {
            'full_code': filename,
            'prefix': None,
            'code1': None,
            'code2': None,
            'code3': None,
            'code4': None,
            'is_valid': False,
            'error': str(e)
        }

def read_coco_annotation(annotation_path):
    """
    COCO 포맷 JSON 파일 읽기

    Args:
        annotation_path: COCO JSON 파일 경로

    Returns:
        dict: COCO 데이터 또는 에러 정보
    """
    try:
        with open(annotation_path, 'r', encoding='utf-8') as f:
            coco_data = json.load(f)

        # COCO 포맷 검증
        required_keys = ['images', 'annotations', 'categories']
        missing_keys = [key for key in required_keys if key not in coco_data]

        if missing_keys:
            return {
                'status': 'error',
                'error': f'COCO 포맷 키 누락: {missing_keys}',
                'data': None
            }

        return {
            'status': 'success',
            'data': coco_data,
            'num_images': len(coco_data['images']),
            'num_annotations': len(coco_data['annotations']),
            'num_categories': len(coco_data['categories'])
        }

    except json.JSONDecodeError as e:
        return {
            'status': 'error',
            'error': f'JSON 파싱 오류: {str(e)}',
            'data': None
        }
    except Exception as e:
        return {
            'status': 'error',
            'error': f'파일 읽기 오류: {str(e)}',
            'data': None
        }

def create_category_mapping_from_coco(coco_data):
    """
    COCO 데이터에서 카테고리 매핑 생성

    Args:
        coco_data: COCO 포맷 데이터

    Returns:
        dict: 카테고리 매핑 정보
    """
    print("🏷️ COCO 카테고리 매핑 생성 중...")

    categories = coco_data['categories']

    # 카테고리 ID → 이름 매핑
    id_to_name = {cat['id']: cat['name'] for cat in categories}
    name_to_id = {cat['name']: cat['id'] for cat in categories}

    # 알약 코드 기반 새로운 매핑 생성 (파일명 기반)
    pill_codes = set()

    # 이미지 파일명에서 알약 코드 추출
    for image_info in coco_data['images']:
        filename = image_info['file_name']
        parsed = parse_pill_code(filename)
        if parsed['is_valid']:
            pill_codes.add(parsed['full_code'])

    # 정렬된 알약 코드 리스트
    sorted_pill_codes = sorted(list(pill_codes))

    # 새로운 ID 매핑 (0부터 시작)
    new_code_to_id = {code: idx for idx, code in enumerate(sorted_pill_codes)}
    new_id_to_code = {idx: code for code, idx in new_code_to_id.items()}

    mapping_info = {
        'original_coco': {
            'id_to_name': id_to_name,
            'name_to_id': name_to_id,
            'num_categories': len(categories)
        },
        'pill_code_mapping': {
            'code_to_id': new_code_to_id,
            'id_to_code': new_id_to_code,
            'sorted_codes': sorted_pill_codes,
            'total_classes': len(sorted_pill_codes)
        }
    }

    print(f"✅ COCO 원본 카테고리: {len(categories)}개")
    print(f"✅ 파일명 기반 알약 코드: {len(sorted_pill_codes)}개")

    return mapping_info

def convert_bbox_format(bbox, from_format, to_format, image_width=None, image_height=None):
    """
    바운딩 박스 포맷 변환

    Args:
        bbox: 바운딩 박스 좌표
        from_format: 입력 포맷 ('coco', 'yolo', 'pascal')
        to_format: 출력 포맷 ('coco', 'yolo', 'pascal', 'relative')
        image_width: 이미지 너비
        image_height: 이미지 높이

    Returns:
        list: 변환된 바운딩 박스
    """
    # COCO 포맷을 기준으로 정규화
    if from_format == 'coco':
        # COCO: [x, y, width, height] (좌상단 기준)
        x, y, w, h = bbox
    elif from_format == 'pascal' or from_format == 'pascal_voc':
        # Pascal VOC: [x_min, y_min, x_max, y_max]
        x_min, y_min, x_max, y_max = bbox
        x, y = x_min, y_min
        w, h = x_max - x_min, y_max - y_min
    elif from_format == 'yolo':
        # YOLO: [center_x_rel, center_y_rel, width_rel, height_rel] (상대 좌표)
        if image_width is None or image_height is None:
            raise ValueError("YOLO 포맷 변환에는 이미지 크기가 필요합니다")
        cx_rel, cy_rel, w_rel, h_rel = bbox
        w = w_rel * image_width
        h = h_rel * image_height
        x = (cx_rel * image_width) - w/2
        y = (cy_rel * image_height) - h/2
    else:
        raise ValueError(f"지원하지 않는 입력 포맷: {from_format}")

    # 목표 포맷으로 변환
    if to_format == 'coco':
        return [x, y, w, h]
    elif to_format == 'pascal':
        return [x, y, x + w, y + h]
    elif to_format == 'yolo':
        if image_width is None or image_height is None:
            raise ValueError("YOLO 포맷 변환에는 이미지 크기가 필요합니다")
        center_x = (x + w/2) / image_width
        center_y = (y + h/2) / image_height
        w_rel = w / image_width
        h_rel = h / image_height
        return [center_x, center_y, w_rel, h_rel]
    elif to_format == 'relative':
        if image_width is None or image_height is None:
            raise ValueError("상대 좌표 변환에는 이미지 크기가 필요합니다")
        return [x/image_width, y/image_height, w/image_width, h/image_height]
    else:
        raise ValueError(f"지원하지 않는 출력 포맷: {to_format}")

def process_coco_annotations(coco_data, category_mapping):
    """
    COCO 어노테이션 데이터 처리

    Args:
        coco_data: COCO 포맷 데이터
        category_mapping: 카테고리 매핑 정보

    Returns:
        list: 처리된 어노테이션 리스트
    """
    print("🔄 COCO 어노테이션 처리 중...")

    # 이미지 정보를 ID로 인덱싱
    images_by_id = {img['id']: img for img in coco_data['images']}

    # 이미지별로 어노테이션 그룹화
    annotations_by_image = defaultdict(list)
    for ann in coco_data['annotations']:
        annotations_by_image[ann['image_id']].append(ann)

    processed_data = []

    for image_id, image_info in images_by_id.items():
        filename = image_info['file_name']
        image_width = image_info['width']
        image_height = image_info['height']

        # 파일명에서 알약 코드 추출
        parsed_code = parse_pill_code(filename)

        # 해당 이미지의 모든 어노테이션 처리
        objects = []
        for ann in annotations_by_image[image_id]:
            # 원본 COCO bbox
            coco_bbox = ann['bbox']  # [x, y, width, height]

            # 다양한 포맷으로 변환
            bbox_formats = {
                'coco': coco_bbox,
                'pascal_voc': convert_bbox_format(coco_bbox, 'coco', 'pascal'),
                'yolo': convert_bbox_format(coco_bbox, 'coco', 'yolo', image_width, image_height),
                'relative': convert_bbox_format(coco_bbox, 'coco', 'relative', image_width, image_height)
            }

            # 클래스 정보
            original_category_id = ann['category_id']
            new_class_id = category_mapping['pill_code_mapping']['code_to_id'].get(
                parsed_code['full_code'], -1
            )

            class_info = {
                'original_category_id': original_category_id,
                'new_class_id': new_class_id,
                'pill_code': parsed_code['full_code'],
                'hierarchical': {
                    'prefix': parsed_code['prefix'],
                    'code1': parsed_code['code1'],
                    'code2': parsed_code['code2'],
                    'code3': parsed_code['code3']
                }
            }

            # 객체 정보
            obj_info = {
                'annotation_id': ann['id'],
                'bbox_formats': bbox_formats,
                'class_info': class_info,
                'area': ann.get('area', coco_bbox[2] * coco_bbox[3]),
                'area_relative': bbox_formats['relative'][2] * bbox_formats['relative'][3],
                'iscrowd': ann.get('iscrowd', 0)
            }

            objects.append(obj_info)

        # 이미지별 정보
        image_data = {
            'image_id': image_id,
            'filename': filename,
            'image_size': (image_width, image_height),
            'pill_code': parsed_code['full_code'],
            'parsed_code': parsed_code,
            'objects': objects,
            'num_objects': len(objects)
        }

        processed_data.append(image_data)

    print(f"✅ {len(processed_data)}개 이미지의 어노테이션 처리 완료")
    return processed_data

def export_to_yolo_format(processed_data, output_dir, category_mapping):
    """
    YOLO 포맷으로 어노테이션 내보내기

    Args:
        processed_data: 처리된 어노테이션 데이터
        output_dir: 출력 디렉토리
        category_mapping: 카테고리 매핑 정보
    """
    print("📝 YOLO 포맷으로 내보내기...")

    os.makedirs(output_dir, exist_ok=True)

    for image_data in processed_data:
        if image_data['objects']:
            # 파일명 생성 (.txt)
            base_name = image_data['filename'].replace('.png', '').replace('.jpg', '')
            output_file = os.path.join(output_dir, f"{base_name}.txt")

            with open(output_file, 'w') as f:
                for obj in image_data['objects']:
                    class_id = obj['class_info']['new_class_id']
                    if class_id >= 0:  # 유효한 클래스 ID만
                        yolo_bbox = obj['bbox_formats']['yolo']

                        # YOLO 포맷: class_id center_x center_y width height
                        line = f"{class_id} {yolo_bbox[0]:.6f} {yolo_bbox[1]:.6f} {yolo_bbox[2]:.6f} {yolo_bbox[3]:.6f}\n"
                        f.write(line)

    # 클래스 이름 파일 생성
    classes_file = os.path.join(output_dir, 'classes.txt')
    with open(classes_file, 'w', encoding='utf-8') as f:
        pill_mapping = category_mapping['pill_code_mapping']
        for class_id in range(pill_mapping['total_classes']):
            class_name = pill_mapping['id_to_code'][str(class_id)]
            f.write(f"{class_name}\n")

    print(f"✅ YOLO 포맷 파일들이 {output_dir}에 저장되었습니다")
    print(f"✅ 클래스 파일: {classes_file}")

def export_to_pascal_voc_format(processed_data, output_dir):
    """
    Pascal VOC XML 포맷으로 내보내기 (간단 버전)
    """
    print("📝 Pascal VOC 포맷으로 내보내기...")

    os.makedirs(output_dir, exist_ok=True)

    # CSV 형태로 저장 (XML 대신 간단한 형태)
    annotations_list = []

    for image_data in processed_data:
        for obj in image_data['objects']:
            if obj['class_info']['new_class_id'] >= 0:
                pascal_bbox = obj['bbox_formats']['pascal_voc']

                annotations_list.append({
                    'filename': image_data['filename'],
                    'width': image_data['image_size'][0],
                    'height': image_data['image_size'][1],
                    'class': obj['class_info']['pill_code'],
                    'xmin': int(pascal_bbox[0]),
                    'ymin': int(pascal_bbox[1]),
                    'xmax': int(pascal_bbox[2]),
                    'ymax': int(pascal_bbox[3])
                })

    # CSV로 저장
    df = pd.DataFrame(annotations_list)
    csv_file = os.path.join(output_dir, 'annotations_pascal_voc.csv')
    df.to_csv(csv_file, index=False)

    print(f"✅ Pascal VOC 포맷 (CSV)이 {csv_file}에 저장되었습니다")

def create_annotation_statistics(processed_data, category_mapping):
    """
    어노테이션 통계 생성

    Args:
        processed_data: 처리된 어노테이션 데이터
        category_mapping: 카테고리 매핑 정보

    Returns:
        dict: 통계 정보
    """
    print("📊 어노테이션 통계 생성 중...")

    class_counts = Counter()
    class_areas = defaultdict(list)
    class_bbox_sizes = defaultdict(list)
    total_objects = 0

    for image_data in processed_data:
        for obj in image_data['objects']:
            pill_code = obj['class_info']['pill_code']
            class_counts[pill_code] += 1
            total_objects += 1

            class_areas[pill_code].append(obj['area_relative'])

            # 바운딩 박스 크기 (상대 좌표)
            rel_bbox = obj['bbox_formats']['relative']
            class_bbox_sizes[pill_code].append((rel_bbox[2], rel_bbox[3]))

    # 이미지당 객체 수 통계
    objects_per_image = [len(img['objects']) for img in processed_data]

    # 통계 계산
    stats = {
        'summary': {
            'total_images': len(processed_data),
            'total_objects': total_objects,
            'avg_objects_per_image': total_objects / len(processed_data) if processed_data else 0,
            'min_objects_per_image': min(objects_per_image) if objects_per_image else 0,
            'max_objects_per_image': max(objects_per_image) if objects_per_image else 0
        },
        'class_distribution': dict(class_counts.most_common()),
        'num_classes': len(class_counts),
        'class_details': {}
    }

    for pill_code, count in class_counts.items():
        areas = class_areas[pill_code]
        sizes = class_bbox_sizes[pill_code]

        stats['class_details'][pill_code] = {
            'count': count,
            'percentage': (count / total_objects) * 100 if total_objects > 0 else 0,
            'avg_area': sum(areas) / len(areas) if areas else 0,
            'avg_width': sum(size[0] for size in sizes) / len(sizes) if sizes else 0,
            'avg_height': sum(size[1] for size in sizes) / len(sizes) if sizes else 0,
            'min_area': min(areas) if areas else 0,
            'max_area': max(areas) if areas else 0
        }

    return stats

def preprocess_coco_annotations(data_path, output_dir):
    """
    COCO 포맷 어노테이션 전처리 메인 함수

    Args:
        data_path: 입력 데이터셋 경로 (전처리된 이미지가 있는 곳)
        output_dir: 출력 디렉토리

    Returns:
        dict: 전처리 결과
    """
    print("🚀 COCO 어노테이션 전처리 시작!")
    print(f"📁 입력: {data_path}")
    print(f"📁 출력: {output_dir}")
    print("=" * 50)

    # 출력 디렉토리 생성
    os.makedirs(output_dir, exist_ok=True)
    yolo_dir = os.path.join(output_dir, "yolo_format")
    pascal_dir = os.path.join(output_dir, "pascal_voc_format")

    # 1. COCO 어노테이션 파일들 찾기
    annotation_files = glob.glob(os.path.join(data_path, "train_annotations", "*", "*", "*.json"))

    if not annotation_files:
        print("❌ COCO 어노테이션 파일을 찾을 수 없습니다!")
        return None

    print(f"📋 발견된 어노테이션 파일: {len(annotation_files)}개")

    # 2. 첫 번째 파일로 카테고리 매핑 생성 (모든 파일이 같은 구조라고 가정)
    first_ann_result = read_coco_annotation(annotation_files[0])
    if first_ann_result['status'] != 'success':
        print(f"❌ 첫 번째 어노테이션 파일 읽기 실패: {first_ann_result['error']}")
        return None

    category_mapping = create_category_mapping_from_coco(first_ann_result['data'])

    # 3. 모든 어노테이션 파일 처리
    all_processed_data = []
    failed_files = []

    print(f"\n🔄 어노테이션 파일 처리 중...")

    for i, ann_path in enumerate(annotation_files):
        ann_result = read_coco_annotation(ann_path)

        if ann_result['status'] == 'success':
            processed_data = process_coco_annotations(ann_result['data'], category_mapping)
            all_processed_data.extend(processed_data)
        else:
            failed_files.append({
                'path': ann_path,
                'error': ann_result['error']
            })

        # 진행률 출력
        if (i + 1) % 10 == 0:
            print(f"  진행률: {i + 1}/{len(annotation_files)}")

    # 4. 통계 생성
    stats = create_annotation_statistics(all_processed_data, category_mapping)

    # 5. 다양한 포맷으로 내보내기
    print(f"\n📤 다양한 포맷으로 내보내기...")

    # YOLO 포맷
    export_to_yolo_format(all_processed_data, yolo_dir, category_mapping)

    # Pascal VOC 포맷
    export_to_pascal_voc_format(all_processed_data, pascal_dir)

    # 결과 저장
    # 카테고리 매핑 저장
    mapping_file = os.path.join(output_dir, "category_mapping.json")
    with open(mapping_file, 'w', encoding='utf-8') as f:
        json.dump(category_mapping, f, indent=2, ensure_ascii=False)

    # 통계 저장
    stats_file = os.path.join(output_dir, "annotation_statistics.json")
    with open(stats_file, 'w', encoding='utf-8') as f:
        json.dump(stats, f, indent=2, ensure_ascii=False)

    # 6. 결과 요약
    print(f"\n📊 COCO 어노테이션 전처리 결과:")
    print(f"  ✅ 처리된 이미지: {len(all_processed_data)}개")
    print(f"  ✅ 총 객체 수: {stats['summary']['total_objects']}개")
    print(f"  ✅ 클래스 수: {stats['num_classes']}개")
    print(f"  📈 평균 객체/이미지: {stats['summary']['avg_objects_per_image']:.1f}개")
    print(f"  ❌ 실패한 파일: {len(failed_files)}개")

    # 상위 클래스 출력
    print(f"\n🔝 상위 5개 클래스:")
    for pill_code, count in list(stats['class_distribution'].items())[:5]:
        percentage = stats['class_details'][pill_code]['percentage']
        print(f"  {pill_code}: {count}개 ({percentage:.1f}%)")

    if failed_files:
        print(f"\n❌ 실패한 파일들:")
        for fail in failed_files[:3]:
            print(f"  - {fail['path']}: {fail['error']}")
        if len(failed_files) > 3:
            print(f"  ... 외 {len(failed_files) - 3}개")

    return {
        'processed_data': all_processed_data,
        'failed_files': failed_files,
        'category_mapping': category_mapping,
        'statistics': stats,
        'output_files': {
            'yolo_dir': yolo_dir,
            'pascal_dir': pascal_dir,
            'mapping_file': mapping_file,
            'stats_file': stats_file
        }
    }

def debug_coco_files(data_path):
    """COCO 파일 디버깅 함수"""
    print("🔍 COCO 파일 디버깅...")

    # 1. 파일 존재 확인
    annotation_files = glob.glob(os.path.join(data_path, "train_annotations", "*", "*", "*.json"))
    print(f"1️⃣ 발견된 JSON 파일: {len(annotation_files)}개")

    if not annotation_files:
        print("❌ JSON 파일을 찾을 수 없습니다!")
        return None

    # 2. 첫 번째 파일 읽기
    first_file = annotation_files[0]
    print(f"2️⃣ 첫 번째 파일: {first_file}")

    try:
        with open(first_file, 'r', encoding='utf-8') as f:
            coco_data = json.load(f)

        print(f"3️⃣ COCO 구조:")
        print(f"   - 키: {list(coco_data.keys())}")
        print(f"   - 이미지 수: {len(coco_data.get('images', []))}")
        print(f"   - 어노테이션 수: {len(coco_data.get('annotations', []))}")
        print(f"   - 카테고리 수: {len(coco_data.get('categories', []))}")

        # 3. 샘플 데이터 출력
        if coco_data.get('images'):
            print(f"4️⃣ 샘플 이미지 정보:")
            for i, img in enumerate(coco_data['images'][:3]):
                print(f"   {i+1}. {img.get('file_name', 'NO_FILENAME')}")
                parsed = parse_pill_code(img.get('file_name', ''))
                print(f"      → 파싱: {parsed['full_code']} (유효: {parsed['is_valid']})")

        return coco_data

    except Exception as e:
        print(f"❌ 파일 읽기 실패: {e}")
        return None

def quick_preprocess_prototype_annotations():
    """프로토타입 데이터 COCO 어노테이션 빠른 전처리 (디버깅 포함)"""

    # 1. 먼저 디버깅
    print("🔍 사전 디버깅...")
    debug_result = debug_coco_files("./data/preprocessed_prototype_data")

    if debug_result is None:
        print("❌ 디버깅 실패. 전처리를 중단합니다.")
        return None

    # 2. 실제 전처리 실행
    print("\n" + "="*50)
    print("🚀 실제 전처리 시작...")
    return preprocess_coco_annotations(
        "./data/prototype_data/preprocessed_prototype_data",
        "./data/annotations_prototype"
    )

def quick_preprocess_dev_annotations():
    """개발용 데이터 COCO 어노테이션 빠른 전처리"""
    return preprocess_coco_annotations(
        "./data/preprocessed_dev_data",
        "./data/annotations_dev"
    )

# 사용법 예시
if __name__ == "__main__":
    print("📖 사용법 (COCO 포맷 입력):")
    print("quick_preprocess_prototype_annotations()  # 프로토타입 COCO 어노테이션 전처리")
    print("quick_preprocess_dev_annotations()        # 개발용 COCO 어노테이션 전처리")
    print("preprocess_coco_annotations(input_path, output_path)  # 커스텀 전처리")

    prototype_coco_results = quick_preprocess_prototype_annotations()

📖 사용법 (COCO 포맷 입력):
quick_preprocess_prototype_annotations()  # 프로토타입 COCO 어노테이션 전처리
quick_preprocess_dev_annotations()        # 개발용 COCO 어노테이션 전처리
preprocess_coco_annotations(input_path, output_path)  # 커스텀 전처리
🔍 사전 디버깅...
🔍 COCO 파일 디버깅...
1️⃣ 발견된 JSON 파일: 0개
❌ JSON 파일을 찾을 수 없습니다!
❌ 디버깅 실패. 전처리를 중단합니다.
