# 03_extract_refs (LLM only)

이 노트북은 chunk 본문을 LLM으로 직접 읽어 `internal_refs / external_refs`를 추출합니다.
- 입력: `data/processed/chunks/*.json`, `data/processed/chunks_ordin/*.json`
- 출력:
  - `data/processed/ref_extract/llm_ref_map.json`
  - `data/processed/ref_extract/llm_ref_failed.json`
  - `data/processed/chunks_with_refs/*.json`


In [None]:
import json
import time
from pathlib import Path
from typing import Any

from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_naver import ChatClovaX

# Load variables from a local .env file into the environment
# (presumably the credentials the LLM client needs - confirm against deployment).
load_dotenv()

# Directory layout; every path is relative to the current working directory.
DATA_ROOT = Path('data/processed')
CHUNKS_DIR = DATA_ROOT / 'chunks'
CHUNKS_ORDIN_DIR = DATA_ROOT / 'chunks_ordin'
REF_OUT_DIR = DATA_ROOT / 'ref_extract'
WITH_REFS_DIR = DATA_ROOT / 'chunks_with_refs'

# Only the output directories are created here; the input directories are
# merely checked for existence by the prints below.
REF_OUT_DIR.mkdir(parents=True, exist_ok=True)
WITH_REFS_DIR.mkdir(parents=True, exist_ok=True)

print('DATA_ROOT:', DATA_ROOT)
print('CHUNKS_DIR exists:', CHUNKS_DIR.exists())
print('CHUNKS_ORDIN_DIR exists:', CHUNKS_ORDIN_DIR.exists())


In [None]:
def make_chunk_key(law_id: str, article_num: str, article_sub: str | int | None) -> str:
    sub = '' if article_sub is None else str(article_sub).strip()
    if not sub:
        sub = '0'
    return f"{str(law_id).strip()}:{str(article_num).strip()}:{sub}"


# Leave this list empty to load every file matching *_chunks.json.
# Each entry may be a file name/path or a glob pattern; entries are resolved
# against each chunk directory by resolve_chunk_files().
TARGET_CHUNK_FILES = [
    # '001823_건축법_chunks.json',
    # '2205103_서울특별시_건축물관리_조례_chunks.json',
    '22*_서울특별시_*_조례_chunks.json',
]

# Set to False to skip loading from the chunks_ordin directory.
INCLUDE_ORDIN = True


def resolve_chunk_files(base_dir: Path, selected: list[str] | None) -> list[Path]:
    if not base_dir.exists():
        return []

    if not selected:
        return sorted(base_dir.glob('*_chunks.json'))

    out: list[Path] = []
    seen: set[str] = set()

    for token in selected:
        token = str(token).strip()
        if not token:
            continue

        # 1) 절대경로/상대경로 직접 지정
        p = Path(token)
        candidates = []
        if p.is_absolute() and p.exists():
            candidates = [p]
        else:
            q = base_dir / token
            if q.exists():
                candidates = [q]
            else:
                # 2) 글롭 패턴
                candidates = sorted(base_dir.glob(token))

        for cp in candidates:
            key = str(cp.resolve())
            if key in seen:
                continue
            seen.add(key)
            out.append(cp)

    return out


def load_all_chunks(selected_files: list[str] | None = None, include_ordin: bool = True):
    """Load chunk rows from the law chunk directory and, optionally, the ordinance one.

    Args:
        selected_files: File tokens passed to ``resolve_chunk_files`` for each
            directory; ``None``/empty loads every ``*_chunks.json`` file.
        include_ordin: When true, ``CHUNKS_ORDIN_DIR`` is searched in addition
            to ``CHUNKS_DIR``.

    Returns:
        ``(rows, loaded_files)`` where ``rows`` is the concatenation of all
        loaded chunk dicts and ``loaded_files`` lists the loaded paths as
        strings, in load order.

    Raises:
        ValueError: If a chunk file does not contain a JSON array.
    """
    source_dirs = [CHUNKS_DIR]
    if include_ordin:
        source_dirs.append(CHUNKS_ORDIN_DIR)

    rows: list[dict[str, Any]] = []
    loaded_files: list[str] = []
    seen_paths: set[str] = set()

    for base_dir in source_dirs:
        for f in resolve_chunk_files(base_dir, selected_files):
            # The same tokens are resolved against both directories, so an
            # absolute path would otherwise be loaded (and its rows
            # duplicated) once per directory.
            resolved = str(f.resolve())
            if resolved in seen_paths:
                continue
            seen_paths.add(resolved)

            payload = json.loads(f.read_text(encoding='utf-8'))
            if not isinstance(payload, list):
                raise ValueError(
                    f'{f}: expected a JSON array of chunk objects, got {type(payload).__name__}'
                )
            rows.extend(payload)
            loaded_files.append(str(f))

    # Ensure the keys that later steps read are always present.
    for r in rows:
        r.setdefault('article_sub', '0')
        r.setdefault('internal_refs', [])
        r.setdefault('external_refs', [])
        r.setdefault('parent_law_refs', [])

    return rows, loaded_files


# Load the chunk rows selected by TARGET_CHUNK_FILES / INCLUDE_ORDIN.
all_chunks, loaded_chunk_files = load_all_chunks(
    selected_files=TARGET_CHUNK_FILES,
    include_ordin=INCLUDE_ORDIN,
)

print('loaded files:', len(loaded_chunk_files))
# Preview at most the first 10 loaded paths.
for x in loaded_chunk_files[:10]:
    print('-', x)
print('all_chunks:', len(all_chunks))
# The `if all_chunks` guard prints 'N/A' instead of indexing into an empty list.
print('sample key:', make_chunk_key(all_chunks[0].get('law_id',''), all_chunks[0].get('article_num',''), all_chunks[0].get('article_sub','')) if all_chunks else 'N/A')


In [None]:
# System prompt (in Korean) instructing the model to extract only explicitly
# stated references from a chunk and to answer with a single JSON object that
# has exactly the keys `internal_refs` and `external_refs`.
# NOTE: the text contains literal JSON braces; f-string style prompt templates
# treat `{...}` as placeholders unless the braces are escaped.
REF_SYSTEM_PROMPT = """
당신은 한국 법령/조례 문서의 참조 조항 추출기입니다.
아래 chunk 본문에서 명시된 참조만 추출하세요.

규칙:
1) 추측 금지. 본문에 없으면 넣지 마세요.
2) 출력은 JSON 객체 하나만.
3) 반드시 다음 키만 출력:
   - internal_refs: list
   - external_refs: list
4) 각 ref 원소는 아래 키만 사용:
   - law_name: string  (내부참조면 현재 법령명)
   - article: string   (조 번호 숫자/의숫자 형태, 예: "46", "1의2")
   - paragraph: string (항 번호만, 없으면 "")
   - item: string      (호 번호만, 없으면 "")
   - raw: string       (원문 참조 문자열)
5) 형식 예시:
{
  "internal_refs": [{"law_name":"건축법","article":"46","paragraph":"1","item":"","raw":"제46조제1항"}],
  "external_refs": [{"law_name":"건축법 시행령","article":"31","paragraph":"","item":"","raw":"「건축법 시행령」 제31조"}]
}
6) 참조가 전혀 없으면:
{"internal_refs": [], "external_refs": []}
""".strip()

# ChatPromptTemplate parses message strings as f-string templates, so the
# literal JSON braces in REF_SYSTEM_PROMPT would be read as input variables
# and every invoke() would fail on "missing variables". Doubling the braces
# makes them render as plain `{` / `}` in the final system message.
_ESCAPED_REF_SYSTEM_PROMPT = REF_SYSTEM_PROMPT.replace('{', '{{').replace('}', '}}')

# Two-message prompt: the fixed system instructions plus a human message
# carrying the current law name and the chunk body.
prompt = ChatPromptTemplate.from_messages(
    [
        ('system', _ESCAPED_REF_SYSTEM_PROMPT),
        ('human', '현재 법령명: {law_name}\n\nchunk 본문:\n{chunk_text}'),
    ]
)


def build_ref_llm(model: str = 'HCX-005', temperature: float = 0.0, max_tokens: int = 1200):
    """Create the ChatClovaX client used for reference extraction.

    The three arguments are forwarded unchanged as keyword arguments to
    ``ChatClovaX``.
    """
    llm_options = {
        'model': model,
        'temperature': temperature,
        'max_tokens': max_tokens,
    }
    return ChatClovaX(**llm_options)


# Module-level LLM client and prompt->LLM pipeline used by the extraction helpers below.
llm = build_ref_llm(model='HCX-005', temperature=0.0, max_tokens=1200)
chain = prompt | llm


In [None]:
def _extract_json_text(raw_text: str) -> str:
    text = (raw_text or '').strip()
    if '```json' in text:
        text = text.split('```json', 1)[1]
        text = text.split('```', 1)[0]
    elif '```' in text:
        text = text.split('```', 1)[1]
        text = text.split('```', 1)[0]

    start = text.find('{')
    end = text.rfind('}')
    if start >= 0 and end > start:
        return text[start:end + 1]
    return text


def _norm_ref_list(v: Any) -> list[dict[str, str]]:
    if not isinstance(v, list):
        return []
    out: list[dict[str, str]] = []
    for x in v:
        if not isinstance(x, dict):
            continue
        out.append(
            {
                'law_name': str(x.get('law_name', '') or '').strip(),
                'article': str(x.get('article', '') or '').strip(),
                'paragraph': str(x.get('paragraph', '') or '').strip(),
                'item': str(x.get('item', '') or '').strip(),
                'raw': str(x.get('raw', '') or '').strip(),
            }
        )
    return out


def extract_refs_from_chunk_llm(law_name: str, chunk_text: str, max_retries: int = 4, base_sleep: float = 1.0) -> dict[str, Any]:
    """Ask the LLM chain for the references in one chunk, with retries.

    Args:
        law_name: Name of the law the chunk belongs to (sent to the prompt).
        chunk_text: Chunk body to analyse.
        max_retries: Number of retries after the first attempt, i.e. up to
            ``max_retries + 1`` calls are made.
        base_sleep: Base delay in seconds for the exponential back-off used
            when the error looks like a rate limit; capped at 30 seconds.

    Returns:
        A dict with ``internal_refs`` and ``external_refs`` (normalised
        lists), ``llm_raw_text`` and ``error``. On success ``error`` is ``''``.
        On failure both lists are empty, ``error`` is a non-empty description
        of the last error and ``llm_raw_text`` holds the reply of the last
        attempt if one was received (``''`` otherwise).
    """
    last_err = ''
    last_raw_text = ''

    for attempt in range(max_retries + 1):
        last_raw_text = ''
        try:
            resp = chain.invoke({'law_name': law_name, 'chunk_text': chunk_text})
            raw_text = getattr(resp, 'content', str(resp))
            last_raw_text = raw_text
            obj = json.loads(_extract_json_text(raw_text))
            if not isinstance(obj, dict):
                raise ValueError(f'expected a JSON object, got {type(obj).__name__}')
            return {
                'internal_refs': _norm_ref_list(obj.get('internal_refs', [])),
                'external_refs': _norm_ref_list(obj.get('external_refs', [])),
                'llm_raw_text': raw_text,
                'error': '',
            }
        except Exception as e:  # best-effort boundary: any failure is retried, then reported
            # str(e) can be empty (e.g. a bare timeout error); an empty error
            # string would make callers treat this failure as a success.
            last_err = str(e) or repr(e)
            if attempt >= max_retries:
                break
            rate_limited = '429' in last_err or 'rate exceeded' in last_err.lower()
            if rate_limited:
                # Exponential back-off for rate limits.
                time.sleep(min(base_sleep * (2 ** attempt), 30.0))
            else:
                # Short fixed pause for every other kind of error.
                time.sleep(0.7)

    return {
        'internal_refs': [],
        'external_refs': [],
        'llm_raw_text': last_raw_text,
        'error': last_err or 'no attempt was made',
    }


In [None]:
# Output files of the extraction run: the per-chunk reference map and the
# record of chunks whose extraction ended with an error.
REF_MAP_PATH = REF_OUT_DIR / 'llm_ref_map.json'
FAILED_PATH = REF_OUT_DIR / 'llm_ref_failed.json'


def load_json_or_default(path: Path, default):
    """Return the parsed JSON content of ``path``, or ``default`` if the file is absent.

    The file is read as UTF-8. Invalid JSON propagates the decoder's error.
    """
    if not path.exists():
        return default
    with path.open(encoding='utf-8') as fh:
        return json.load(fh)


def save_json(path: Path, obj: Any):
    """Serialise ``obj`` as pretty-printed UTF-8 JSON at ``path``.

    Parent directories are created as needed. The data is first written to a
    sibling ``<name>.tmp`` file and then renamed over ``path``, so an
    interruption during the write cannot leave a truncated file at ``path``
    (this function is used for resume checkpoints).
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(obj, ensure_ascii=False, indent=2)

    tmp_path = path.with_name(path.name + '.tmp')
    tmp_path.write_text(payload, encoding='utf-8')
    # Path.replace overwrites an existing destination.
    tmp_path.replace(path)


def run_ref_extraction(chunks: list[dict[str, Any]], limit: int | None = None, save_every: int = 10):
    """Extract references for ``chunks`` with the LLM, resuming from saved results.

    Previously saved results are loaded from ``REF_MAP_PATH`` / ``FAILED_PATH``.
    A chunk is skipped when its key already has a successful result; keys
    recorded as failed are attempted again (once per run), so a resume can
    repair earlier failures.

    Args:
        chunks: Chunk dicts; ``law_id``, ``article_num``, ``article_sub``,
            ``law_name`` and ``content`` are read from each.
        limit: Process only the first ``limit`` chunks; ``None`` processes all.
        save_every: Write a checkpoint after this many newly processed chunks.
            Values ``<= 0`` disable intermediate checkpoints.

    Returns:
        ``(ref_map, failed)`` - the reference map keyed by chunk key, and the
        failure records keyed the same way. Both are also written to disk,
        even when the loop is interrupted by an exception.
    """
    ref_map = load_json_or_default(REF_MAP_PATH, {})
    failed = load_json_or_default(FAILED_PATH, {})

    targets = chunks if limit is None else chunks[:limit]
    done = 0
    new = 0
    attempted: set[str] = set()  # keys processed during this run

    print('resume_mode:', REF_MAP_PATH.exists())
    print('already_done:', len(ref_map))
    print('previously_failed:', len(failed))
    print('targets:', len(targets))

    try:
        for i, c in enumerate(targets, 1):
            key = make_chunk_key(c.get('law_id', ''), c.get('article_num', ''), c.get('article_sub', '0'))

            # Failed keys also live in ref_map (with empty refs), so the skip
            # test must exclude them or they would never be retried. A key
            # already attempted in this run is not retried again.
            retry_failed = key in failed and key not in attempted
            if key in ref_map and not retry_failed:
                done += 1
                continue
            attempted.add(key)

            out = extract_refs_from_chunk_llm(c.get('law_name', ''), c.get('content', ''))
            ref_map[key] = {
                'internal_refs': out['internal_refs'],
                'external_refs': out['external_refs'],
            }

            if out.get('error'):
                failed[key] = {
                    'error': out['error'][:500],
                    'law_name': c.get('law_name', ''),
                    'article_num': c.get('article_num', ''),
                }
            else:
                failed.pop(key, None)

            new += 1

            # Guard against save_every <= 0 (would otherwise divide by zero).
            if save_every > 0 and new % save_every == 0:
                save_json(REF_MAP_PATH, ref_map)
                save_json(FAILED_PATH, failed)
                print(f'checkpoint: +{new} new, processed={i}/{len(targets)}')
    finally:
        # Always persist progress, including on KeyboardInterrupt or errors.
        save_json(REF_MAP_PATH, ref_map)
        save_json(FAILED_PATH, failed)

    print('done_skip:', done)
    print('new_saved:', new)
    print('total_ref_map:', len(ref_map))
    print('failed:', len(failed))

    return ref_map, failed


In [None]:
# Run the extraction.
# Test run: limit=20
# Full run: limit=None
ref_map, failed_map = run_ref_extraction(all_chunks, limit=20, save_every=5)

print('REF_MAP_PATH:', REF_MAP_PATH)
print('FAILED_PATH:', FAILED_PATH)


In [None]:
def apply_refs_to_chunks(chunks: list[dict[str, Any]], ref_map: dict[str, Any]) -> list[dict[str, Any]]:
    """Return shallow copies of ``chunks`` with their references filled in from ``ref_map``.

    Each copy gets ``internal_refs`` and ``external_refs`` from the ``ref_map``
    entry for its chunk key (an empty list when the entry or field is missing
    or is not a list). ``parent_law_refs`` is always reset to ``[]``. The
    input dicts are not modified.
    """
    def _list_or_empty(value: Any) -> list:
        return value if isinstance(value, list) else []

    enriched = []
    for chunk in chunks:
        merged = dict(chunk)
        lookup_key = make_chunk_key(
            merged.get('law_id', ''),
            merged.get('article_num', ''),
            merged.get('article_sub', '0'),
        )
        entry = ref_map.get(lookup_key, {}) or {}
        merged['internal_refs'] = _list_or_empty(entry.get('internal_refs', []))
        merged['external_refs'] = _list_or_empty(entry.get('external_refs', []))
        merged['parent_law_refs'] = []
        enriched.append(merged)
    return enriched


def safe_name(name: str) -> str:
    """Turn ``name`` into a file-name-safe string.

    Alphanumeric characters (including non-ASCII letters), ``_`` and ``-`` are
    kept; every other character, spaces included, becomes ``_``.
    """
    kept_symbols = {'_', '-'}
    safe_chars = []
    for ch in str(name):
        if ch.isalnum() or ch in kept_symbols:
            safe_chars.append(ch)
        else:
            safe_chars.append('_')
    return ''.join(safe_chars)


def save_chunks_with_refs(rows: list[dict[str, Any]], out_dir: Path):
    """Write the enriched chunks to ``out_dir``, one file per law plus a combined file.

    Rows are grouped by their stripped ``law_id``. Each group is written to
    ``<law_id>_<safe law name>_chunks_with_refs.json`` (the law name is taken
    from the group's first row), and all rows together are written to
    ``all_chunks_with_refs.json``. ``out_dir`` must already exist.

    Returns:
        A dict with the number of distinct laws (``laws``) and the path of the
        combined file as a string (``all_path``).
    """
    def _dump(target: Path, payload: Any) -> None:
        target.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8')

    grouped: dict[str, list[dict[str, Any]]] = {}
    for row in rows:
        law_id = str(row.get('law_id', '')).strip()
        if law_id not in grouped:
            grouped[law_id] = []
        grouped[law_id].append(row)

    for law_id, law_rows in grouped.items():
        law_name = str(law_rows[0].get('law_name', '')).strip()
        _dump(out_dir / f'{law_id}_{safe_name(law_name)}_chunks_with_refs.json', law_rows)

    combined_path = out_dir / 'all_chunks_with_refs.json'
    _dump(combined_path, rows)

    return {'laws': len(grouped), 'all_path': str(combined_path)}


# Merge the extracted references into the loaded chunks and write the
# per-law and combined output files under WITH_REFS_DIR.
chunks_with_refs = apply_refs_to_chunks(all_chunks, ref_map)
save_info = save_chunks_with_refs(chunks_with_refs, WITH_REFS_DIR)
print('saved:', save_info)
