# **Libraries**

In [None]:
# EasyOCR library
!pip install easyocr -q

# Install the transformers and sentence-transformers libraries
# (NOTE(review): the original note also mentioned a datasets library, but no such package is installed here)
!pip install "transformers>=4.42.0" -q
!pip install "sentence-transformers>=2.7.0" -q

# Install FAISS (CPU build)
!pip install faiss-cpu -q

# Pillow (PIL) ships with Colab by default; listed explicitly with the intent of keeping it current
# NOTE(review): without --upgrade pip presumably leaves an already-installed Pillow untouched — confirm
!pip install Pillow -q

In [None]:
import os
import re
import torch
import faiss
import easyocr
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
from typing import List, Tuple, Optional
from sentence_transformers import SentenceTransformer

## **Google Drive Mount**

In [None]:
# Colab-only step: mount Google Drive at /content/drive/ so that the dataset and
# DB paths used by later cells (all under /content/drive/MyDrive/...) resolve.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


## **OCR & Embedding Vector DB**

In [None]:
#====================================================
#  Util: chunk_list
#  - Split a list into consecutive batches of at most chunk_size items
#====================================================
def chunk_list(lst: List, chunk_size: int) -> List[List]:
    """Split ``lst`` into consecutive slices of length ``chunk_size``.

    The last slice may be shorter when ``len(lst)`` is not a multiple of
    ``chunk_size``. An empty input yields an empty list.

    Args:
        lst: Sequence to split (anything supporting ``len`` and slicing).
        chunk_size: Maximum number of items per batch; must be positive.

    Returns:
        A list of slices that together cover ``lst`` in order.

    Raises:
        ValueError: If ``chunk_size`` is zero or negative.
    """
    # A negative step would make range() empty and silently drop every item,
    # and a zero step fails with an unrelated-looking range() error, so reject
    # both up front with a clear message.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]

#==========================================================
#  OCR Optional Tools: light_clean
#  - Drop zero-width spaces / collapse every kind of whitespace into plain spaces
#==========================================================
def light_clean(t: str) -> str:
    """Return a lightly normalized copy of an OCR string.

    Zero-width spaces (U+200B) are removed first, then every run of
    whitespace is collapsed to a single space and the ends are trimmed.
    Any non-string input produces an empty string.
    """
    if isinstance(t, str):
        # Remove U+200B before splitting: str.split() does not treat it as
        # whitespace, so it would otherwise survive inside a token.
        without_zero_width = t.replace("\u200b", "")
        tokens = without_zero_width.split()
        return " ".join(tokens)
    return ""

In [None]:
#==============================================================
#  EasyOCR function
#  - Takes a batch (list) of image paths to process
#  - Returns a list with one extracted OCR text per input path
#==============================================================
def ocr_with_easyocr_batch(
    image_paths: List[str],
    reader: easyocr.Reader,
    min_confidence: float = 0.3,
    join_with: str = " "
) -> List[str]:
    """Run EasyOCR over each path and return one joined text string per image.

    Args:
        image_paths: Paths handed to ``reader.readtext`` one at a time.
        reader: EasyOCR reader used for recognition.
        min_confidence: Detections whose confidence compares below this value
            are dropped; detections without a confidence are kept.
        join_with: Separator placed between the kept text fragments.

    Returns:
        A list aligned with ``image_paths``. An image whose processing raised
        any exception contributes an empty string (a warning is printed).
    """
    texts_per_image = []
    for path in image_paths:
        try:
            # Usually a list of [bbox, text, conf] entries.
            detections = reader.readtext(path, detail=1, paragraph=False)
            kept = []
            for det in detections:
                # Only list/tuple entries carrying at least (bbox, text) are usable.
                if isinstance(det, (list, tuple)) and len(det) >= 2:
                    fragment = det[1]
                    # Two-element entries have no confidence value to filter on.
                    score = det[2] if len(det) >= 3 else None
                    if fragment and (score is None or not (score < min_confidence)):
                        kept.append(fragment)
            page_text = join_with.join(kept).strip()
        except Exception as exc:
            # Best effort: one unreadable image must not abort the whole batch.
            print(f"[경고] EasyOCR 실패: {os.path.basename(path)} (오류: {exc})")
            page_text = ""
        texts_per_image.append(page_text)
    return texts_per_image

In [None]:
# File extensions (compared lower-cased, with the leading dot) treated as problem images.
SUPPORTED_EXTENSIONS = {'.png', '.jpg', '.jpeg'}

def build_math_problem_db_easyocr(
    image_dir: str,
    db_output_dir: str,
    emb_model: SentenceTransformer,
    langs: Tuple[str, ...] = ("ko","en"),
    use_gpu_for_easyocr: Optional[bool] = None,
    ocr_batch: int = 4,
    emb_batch: int = 32,
    add_instruction_to_docs: bool = True,
    min_confidence: float = 0.3,
    instruction: str = "Represent this math problem for retrieving problems with similar solution concepts: "
) -> Tuple[Optional[faiss.Index], Optional[pd.DataFrame], dict]:
    """OCR every image in a folder, embed the text and persist a FAISS index.

    Pipeline: list images -> EasyOCR -> ``light_clean`` -> sentence embedding
    -> ``faiss.IndexFlatIP`` -> write the index and a metadata pickle into
    ``db_output_dir``.

    Args:
        image_dir: Folder scanned (non-recursively) for files whose extension
            is in ``SUPPORTED_EXTENSIONS``.
        db_output_dir: Folder that receives ``math_problems.index`` and
            ``math_problems_metadata.pkl``; created if missing.
        emb_model: Model whose ``encode`` produces the document embeddings.
        langs: Language codes passed to ``easyocr.Reader``.
        use_gpu_for_easyocr: GPU flag for EasyOCR; ``None`` means
            ``torch.cuda.is_available()``.
        ocr_batch: Number of image paths per OCR batch (one progress-bar tick).
        emb_batch: ``batch_size`` passed to ``emb_model.encode``.
        add_instruction_to_docs: When true, ``instruction`` is prepended to
            every text before embedding (the stored ``ocr_text`` is unprefixed).
        min_confidence: Confidence threshold forwarded to the OCR helper.
        instruction: Prefix used when ``add_instruction_to_docs`` is true.

    Returns:
        ``(index, metadata_df, stats)`` where ``stats`` has the keys
        ``embedded_ok`` and ``embedded_failed``.
        * No image found: ``(None, None, stats)``.
        * No usable OCR text: ``(None, <empty DataFrame>, stats)``.
        ``metadata_df`` has the columns ``vector_id``, ``original_filename``
        and ``ocr_text``; row order matches the order vectors were added.
    """
    # Make sure the DB directory exists.
    os.makedirs(db_output_dir, exist_ok=True)

    # Collect the image paths (sorted for a stable order) and split them into OCR batches.
    image_files = [
        f for f in sorted(os.listdir(image_dir))
        if os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
    ]
    total_images = len(image_files)
    if total_images == 0:
        print("[중단] 처리할 이미지가 없습니다.")
        return None, None, {"embedded_ok": 0, "embedded_failed": 0}
    img_paths = [os.path.join(image_dir, fn) for fn in image_files]
    batches = chunk_list(img_paths, ocr_batch)

    # EasyOCR reader: fall back to CUDA auto-detection when the caller did not decide.
    if use_gpu_for_easyocr is None:
        use_gpu_for_easyocr = torch.cuda.is_available()
    reader = easyocr.Reader(list(langs), gpu=use_gpu_for_easyocr)

    # OCR processing
    ocr_texts, ocr_files = [], []
    for batch in tqdm(batches, desc="OCR 진행 중..."):
        batch_texts = ocr_with_easyocr_batch(batch, reader, min_confidence=min_confidence, join_with=" ")
        for p, t in zip(batch, batch_texts):
            # Clean BEFORE testing for emptiness: text made only of zero-width
            # spaces survives str.strip() but becomes "" after light_clean, and
            # an empty document must not be embedded.
            cleaned = light_clean(t)
            if cleaned:
                ocr_texts.append(cleaned)
                ocr_files.append(os.path.basename(p))
            else:
                print(f"[실패] OCR 결과 없음 → {os.path.basename(p)}")

    n_ok = len(ocr_texts)
    n_fail = total_images - n_ok

    if n_ok == 0:
        print("[중단] 유효한 OCR 텍스트가 없어 임베딩을 수행하지 않습니다.")
        return None, pd.DataFrame(columns=["original_filename","ocr_text"]), {"embedded_ok": 0, "embedded_failed": total_images}

    # Vector embedding, optionally with the instruction prefix.
    texts_for_embed = [(instruction + t) if add_instruction_to_docs else t for t in ocr_texts]
    embeddings = emb_model.encode(
        texts_for_embed,
        batch_size=emb_batch,
        normalize_embeddings=True,
        convert_to_numpy=True,
        show_progress_bar=True
    ).astype("float32")

    # Metadata: the positional row index becomes vector_id, matching the order
    # in which the vectors are added to the index below.
    metadata_df = pd.DataFrame({
        "original_filename": ocr_files,
        "ocr_text": ocr_texts
    }).reset_index().rename(columns={"index": "vector_id"})

    # FAISS IndexFlatIP: inner product over the normalized embeddings, i.e. cosine similarity.
    dim = embeddings.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(embeddings)

    # Persist the index and the metadata.
    faiss_path = os.path.join(db_output_dir, "math_problems.index")
    meta_path  = os.path.join(db_output_dir, "math_problems_metadata.pkl")
    faiss.write_index(index, faiss_path)
    metadata_df.to_pickle(meta_path)

    # Print a compact summary only.
    print("\nDB 구축 완료")
    print(f"- 총 이미지: {total_images}")
    print(f"- 임베딩 성공: {n_ok}")
    print(f"- 임베딩 실패: {n_fail}")
    print(f"- FAISS 인덱스: {faiss_path}")
    print(f"- 메타데이터 : {meta_path}")

    return index, metadata_df, {"embedded_ok": n_ok, "embedded_failed": n_fail}

In [None]:
# Sentence-embedding model used to vectorize the OCR text.
emb_model = SentenceTransformer("BAAI/bge-m3")
# Input folder with the problem images and output folder for the index/metadata files
# (both live under the Drive mount point /content/drive/).
IMAGE_DIRECTORY = '/content/drive/MyDrive/ICT_project/datasets/math_problem'
DB_OUTPUT_DIRECTORY = '/content/drive/MyDrive/ICT_project'

# Run the full OCR -> embedding -> FAISS build. The call returns the in-memory
# index, the metadata DataFrame and a success/failure counter dict, and also
# writes the index and metadata files into DB_OUTPUT_DIRECTORY.
index, metadata, stats = \
build_math_problem_db_easyocr(
    image_dir=IMAGE_DIRECTORY,
    db_output_dir=DB_OUTPUT_DIRECTORY,
    emb_model=emb_model,
    langs=("ko","en"),
    use_gpu_for_easyocr=None,  # None -> decided by torch.cuda.is_available() inside the function
    ocr_batch=4,
    emb_batch=32,
    add_instruction_to_docs=True,  # prepend the function's default instruction to each document
    min_confidence=0.3,
)
print("\n최종결과:", stats)  # {'embedded_ok': N, 'embedded_failed': M}

OCR 진행 중...: 100%|██████████| 20/20 [00:31<00:00,  1.57s/it]


Batches:   0%|          | 0/3 [00:00<?, ?it/s]


DB 구축 완료
- 총 이미지: 80
- 임베딩 성공: 80
- 임베딩 실패: 0
- FAISS 인덱스: /content/drive/MyDrive/ICT_project/math_problems.index
- 메타데이터 : /content/drive/MyDrive/ICT_project/math_problems_metadata.pkl

최종결과: {'embedded_ok': 80, 'embedded_failed': 0}


In [None]:
# Reload the persisted vector DB so this cell does not depend on the in-memory
# objects produced by the build cell.
index = faiss.read_index("/content/drive/MyDrive/ICT_project/math_problems.index")
# NOTE: unpickling can execute arbitrary code — only load a metadata file that
# this notebook itself produced.
metadata = pd.read_pickle("/content/drive/MyDrive/ICT_project/math_problems_metadata.pkl")

from sentence_transformers import SentenceTransformer

# Same model as the one used to build the index; query and document vectors
# must come from the same embedding space.
emb_model = SentenceTransformer("BAAI/bge-m3")

query = "표본공간에서 서로 배반사건인 경우의 수 구하기"
# Normalized float32 query vector, so the inner-product score is a cosine similarity.
q_emb = emb_model.encode([query], normalize_embeddings=True).astype("float32")

D, I = index.search(q_emb, k=3)  # top-3
print("쿼리:", query)
print("Top-3 결과:")
for rank, (i, d) in enumerate(zip(I[0], D[0]), 1):
    # FAISS pads the result with id -1 when the index holds fewer than k
    # vectors; such an id has no metadata row, so skip it instead of failing.
    if i < 0:
        continue
    # FAISS ids are insertion positions, so look the row up by position.
    print(f"[{rank}] {metadata['original_filename'].iloc[i]} (유사도={d:.4f})")
    print(" →", metadata["ocr_text"].iloc[i][:80], "...")

쿼리: 표본공간에서 서로 배반사건인 경우의 수 구하기
Top-3 결과:
[1] 2025 수능특강 03.확률의뜻과활용 유제1.PNG (유사도=0.7780)
 → 표본공간 S = {1, 2, 3. 4, 5 . 6, 7} 의 두 사건 A, B에l 대하여 A= {1, 3, 5, 7}이다 두 사건 A와 [240 ...
[2] 2024 수능특강 03.확률의뜻과활용 level1_1.PNG (유사도=0.7489)
 → [23010-0059] 7 표본공간 S = {1, 2, 3 4, 5} 의 두 사건 A, B가 다음 조건올 만족시길 때 , 모든 순서쌍 ( A,  ...
[3] 2025 수능특강 03.확률의뜻과활용 level1_3.PNG (유사도=0.6691)
 → [24010-0059] 3 두 사건 A와 B는 서로 배반사건이고 P(AUB)=3 6 4 일 때, P(B)의 값은? 1 5 2 3 12 2 12  ...
