## 멀티스레드 동시성 방어 테스트 개요

- OCR 경로: PaddleOCRService의 전역 세마포어와 인스턴스 락이 동시 predict 수를 제한하는지 검증합니다.
  - 무거운 실제 모델 로딩을 피하기 위해, 런타임에 `PaddleOCRService.__init__`을 패치하여 더미 엔진을 주입하고 테스트가 끝나면 원래 `__init__`으로 되돌립니다.
- LLM 경로: LabTableExtractor의 전역 세마포어와 인스턴스 락이 동시 LLM 호출 수를 제한하는지 검증합니다.
  - 실제 네트워크 호출은 피하고, Chat Completions 인터페이스를 흉내내는 더미 LLM을 주입합니다.

검증 기준
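- 호출당 작업 시간 S, 요청 수 N, 동시성 상한 M일 때, 전체 경과시간은 ceil(N / M) * S 이상이어야 합니다.
- 스케줄링/인터프리터 오버헤드에 따른 오차를 감안하여, 코드에서는 경과시간이 이 하한의 85% 이상(15% 여유)이면 PASS로 판정합니다.
- 이 검증은 하한만 확인하므로, 호출이 완전히 직렬화되어 경과시간이 N * S에 가깝게 나오는 경우에도 PASS로 판정됩니다.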

In [1]:
# OCR 동시성 테스트 (세마포어/락)
import time, math, threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, List

from app.services.ocr.paddle_ocr import PaddleOCRService

class _DummyEngine:
    """Stand-in OCR engine that sleeps for a fixed time instead of running a model."""

    def __init__(self, sleep_sec: float = 0.6):
        # Coerce once so predict() always hands time.sleep() a float.
        self.sleep_sec = float(sleep_sec)

    def predict(self, inp: Any):
        """Block for ``sleep_sec`` seconds, then return a one-element result list.

        The single dict in the list reports success and the class name of ``inp``.
        """
        time.sleep(self.sleep_sec)
        outcome = {"ok": True, "inp_type": type(inp).__name__}
        return [outcome]

def _patch_ocr_init_for_test(sleep_sec: float, max_conc: int, enable_lock: bool, use_pool: bool, pool_size: int|None):
    """Swap ``PaddleOCRService.__init__`` for a lightweight stub and return an undo callback.

    The stub never calls the real constructor, so no model is loaded. It fills
    in the concurrency-related attributes from the arguments given to this
    function (not from the constructor's ``kwargs``, which are only stored as
    ``init_kwargs``) and installs a ``_DummyEngine`` as ``_ocr_engine``.

    Args:
        sleep_sec: Seconds each dummy ``predict`` call sleeps.
        max_conc: Value stored as ``ocr_max_concurrency``; when positive, a
            class-level semaphore of that size is (re)created if missing or
            sized differently.
        enable_lock: Whether the instance gets an ``RLock`` in ``_ocr_lock``.
        use_pool: Whether the instance gets a ``ThreadPoolExecutor``.
        pool_size: Worker count for the pool; falls back to ``max(1, max_conc)``
            when ``None`` or not positive.

    Returns:
        A zero-argument callable that reinstalls the original ``__init__``.
        The class-level ``_OCR_SEMAPHORE`` / ``_OCR_SEMAPHORE_MAX`` values
        written by the stub are not reverted by it.
    """
    saved_init = PaddleOCRService.__init__

    def _stub_init(self, lang: str = "korean", **kwargs):
        self.lang = lang
        self.init_kwargs = kwargs.copy()

        # Concurrency knobs come from the enclosing call, not from kwargs.
        self.enable_ocr_lock = enable_lock
        self.ocr_max_concurrency = max_conc
        self.ocr_use_thread_pool = use_pool
        self.ocr_pool_size = pool_size

        # Per-instance lock (only when enabled).
        if self.enable_ocr_lock:
            self._ocr_lock = threading.RLock()
        else:
            self._ocr_lock = None

        # Class-level semaphore: create it, or replace it when the limit changed.
        limit = int(self.ocr_max_concurrency or 0)
        if limit > 0:
            owner = self.__class__
            if getattr(owner, "_OCR_SEMAPHORE", None) is None or getattr(owner, "_OCR_SEMAPHORE_MAX", None) != limit:
                owner._OCR_SEMAPHORE = threading.Semaphore(limit)
                owner._OCR_SEMAPHORE_MAX = limit

        # Optional worker pool.
        self._executor = None
        if self.ocr_use_thread_pool:
            if pool_size and pool_size > 0:
                workers = pool_size
            else:
                workers = max(1, limit)
            try:
                self._executor = ThreadPoolExecutor(max_workers=workers, thread_name_prefix="paddleocr-test")
            except Exception:
                # Best effort: carry on without a pool if it cannot be created.
                self._executor = None

        # Replace the real OCR engine with the sleeping dummy.
        self._ocr_engine = _DummyEngine(sleep_sec=sleep_sec)

    def _undo():
        PaddleOCRService.__init__ = saved_init

    PaddleOCRService.__init__ = _stub_init
    return _undo

def run_ocr_concurrency_test(N: int = 6, M: int = 2, S: float = 0.6, use_pool: bool = False):
    """Fire N concurrent OCR calls at one patched service and check the elapsed-time lower bound.

    ``PaddleOCRService.__init__`` is patched for the duration of the call so
    the service uses a dummy engine that sleeps ``S`` seconds per prediction;
    the original ``__init__`` is always restored afterwards.

    Args:
        N: Number of requests submitted at once (also the driver pool size).
        M: Concurrency limit passed to the patched service. A value <= 0
            makes the patch helper skip the semaphore; the expected minimum
            then falls back to a single batch (``S``).
        S: Seconds each dummy prediction sleeps.
        use_pool: Forwarded to the patch helper to enable the service's pool.

    Returns:
        ``(elapsed, expected_min, ok)`` where ``ok`` is True when ``elapsed``
        is at least 85% of ``expected_min``. Only the lower bound is checked.
    """
    restore = _patch_ocr_init_for_test(sleep_sec=S, max_conc=M, enable_lock=True, use_pool=use_pool, pool_size=None)
    try:
        svc = PaddleOCRService(lang="korean")
        arr = [[0]]  # dummy input
        # perf_counter() is monotonic; time.time() can jump when the wall
        # clock is adjusted and would corrupt the measured interval.
        t0 = time.perf_counter()
        with ThreadPoolExecutor(max_workers=N) as ex:
            futs = [ex.submit(svc.run_ocr_from_nparray, arr) for _ in range(N)]
            for f in as_completed(futs):
                f.result()  # re-raises any exception from the worker
        elapsed = time.perf_counter() - t0
        # Avoid ZeroDivisionError / a negative bound when no limit is configured.
        batches = math.ceil(N / M) if M > 0 else 1
        expected_min = batches * S
        ok = elapsed >= (expected_min * 0.85)  # allow 15% scheduling slack
        print(f"[OCR] N={N}, M={M}, S={S:.2f}s -> elapsed={elapsed:.2f}s, expected_min={expected_min:.2f}s => {'PASS' if ok else 'FAIL'}")
        return elapsed, expected_min, ok
    finally:
        restore()

# Run the OCR check: 6 requests, concurrency limit 2, 0.6 s per call, no service-side pool.
_ = run_ocr_concurrency_test(N=6, M=2, S=0.6, use_pool=False)

[OCR] N=6, M=2, S=0.60s -> elapsed=3.60s, expected_min=1.80s => PASS


In [2]:
# LLM 동시성 테스트 (세마포어/락)
import json, time, math
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List

from app.services.analysis.lab_table_extractor import LabTableExtractor, Settings as ExtSettings

class _DummyChatCompletions:
    """Fake ``chat.completions`` endpoint: sleeps, then returns a canned role mapping."""

    def __init__(self, sleep_sec: float = 0.5):
        self.sleep_sec = float(sleep_sec)

    def create(self, **kwargs):
        """Sleep ``sleep_sec`` seconds and return a response-like object.

        All keyword arguments are accepted and ignored. The returned object
        exposes ``choices[0].message.content``, a JSON string mapping the
        roles name/reference/result/unit to column indices 0..3.
        """
        # Stand-in for network latency.
        time.sleep(self.sleep_sec)

        # One entry per role; the column index follows the role's position.
        role_names = ("name", "reference", "result", "unit")
        roles = {
            role: {
                "label": "llm",
                "hits": ["llm"],
                "col_index": idx,
                "tokens": [],
                "confidence": 0.9,
                "meets_threshold": True,
            }
            for idx, role in enumerate(role_names)
        }
        content = json.dumps(roles)

        class _Msg:
            def __init__(self, c):
                self.content = c

        class _Choice:
            def __init__(self, m):
                self.message = m

        class _Resp:
            def __init__(self, ch):
                self.choices = [ch]

        message = _Msg(content)
        choice = _Choice(message)
        return _Resp(choice)

class _DummyLLM:
    """Minimal client object exposing ``chat.completions`` backed by the dummy endpoint."""

    def __init__(self, sleep_sec: float = 0.5):
        completions = _DummyChatCompletions(sleep_sec)
        # "completions" lives as a class attribute on a throwaway Chat type.
        chat_type = type("Chat", (), {"completions": completions})
        self.chat = chat_type()

def _make_body_lines() -> List[List[str]]:
    """Return three sample lab rows, each with four cells: name, reference, result, unit.

    Per the original author's note, the first cell of each row is meant to be
    a known test code (e.g. RBC/ALT) for ``resolve_code``, which is defined
    outside this file.
    """
    sample_rows = (
        ("RBC", "4.0-6.0", "5.1", "K/µL"),
        ("HGB", "10.0-18.0", "12.3", "g/dL"),
        ("HCT", "30-50", "41", "%"),
    )
    # Hand back fresh lists so callers may mutate the result freely.
    return [list(row) for row in sample_rows]

def run_llm_concurrency_test(N: int = 6, M: int = 2, S: float = 0.5):
    """Fire N concurrent header-inference calls at one extractor and check the elapsed-time lower bound.

    A ``LabTableExtractor`` is built with LLM use, the LLM lock, and a
    concurrency limit of ``M`` enabled; its ``llm`` attribute is then replaced
    with a dummy client that sleeps ``S`` seconds per call, so no network
    request is made.

    Args:
        N: Number of requests submitted at once (also the driver pool size).
        M: Value passed as ``llm_max_concurrency``. For M <= 0 the expected
            minimum falls back to a single batch (``S``) instead of dividing
            by zero or producing a negative bound.
        S: Seconds each dummy LLM call sleeps.

    Returns:
        ``(elapsed, expected_min, ok)`` where ``ok`` is True when ``elapsed``
        is at least 85% of ``expected_min``. Only the lower bound is checked.
    """
    settings = ExtSettings(use_llm=True, enable_llm_lock=True, llm_max_concurrency=M)
    extractor = LabTableExtractor(settings=settings)
    # Inject dummy LLM (bypasses real network)
    extractor.llm = _DummyLLM(sleep_sec=S)
    lines = _make_body_lines()
    body = lines[:]  # shallow copy; same rows passed as the body argument

    def _one():
        roles, _sample = extractor._infer_header_with_llm(lines, body)
        # basic sanity
        assert isinstance(roles, dict)
        return roles

    # perf_counter() is monotonic; time.time() can jump when the wall clock
    # is adjusted and would corrupt the measured interval.
    t0 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=N) as ex:
        futs = [ex.submit(_one) for _ in range(N)]
        for f in as_completed(futs):
            f.result()  # re-raises any exception from the worker
    elapsed = time.perf_counter() - t0
    # Avoid ZeroDivisionError / a negative bound for a non-positive limit.
    batches = math.ceil(N / M) if M > 0 else 1
    expected_min = batches * S
    ok = elapsed >= (expected_min * 0.85)  # allow 15% slack
    print(f"[LLM] N={N}, M={M}, S={S:.2f}s -> elapsed={elapsed:.2f}s, expected_min={expected_min:.2f}s => {'PASS' if ok else 'FAIL'}")
    return elapsed, expected_min, ok

# Run the LLM check: 6 requests, concurrency limit 2, 0.5 s per call.
_ = run_llm_concurrency_test(N=6, M=2, S=0.5)

[LLM] N=6, M=2, S=0.50s -> elapsed=3.01s, expected_min=1.50s => PASS
