In [1]:
from dotenv import load_dotenv
import numpy as np
import os, re, json

load_dotenv()
OC = os.getenv('OC')

import requests
law_dict = {
    '268547':'도로교통법',
    '272607':'도로교통법 시행령',
    '271547':'도로교통법 시행규칙',
    '268077':'교통사고처리 특례법'
 }

url = 'https://www.law.go.kr'
link = '/DRF/lawService.do'

params = {
    "OC": OC,
    "target":"law",
    "type": 'JSON',
    'MST':''
}

In [2]:
# 조문 내용에 해당 키워드 들어가면 제외
skip_patterns = ["제\d+항에 따른", "제\d+조에 따른","다음 각 목", "별표", "제\d+조제\d+항에서"]

# 조문 제목이 없거나 해당 키워드 들어가면 제외
def article_title_continue(article_title) -> bool:
    if not article_title:
        return True
    keywords = ['정의', '자동차운전 전문학원연합회', '학원', '기능검정원', '강사',
        '수강료', '기능검정', '과태료의 부과ㆍ징수 등', '유사명칭 등의 사용금지', 
        '휴원ㆍ폐원의 신고', '청문', '무등록 유상 운전교육의 금지', '통칙',
        '긴급자동차', '부담금', '실태조사', '목적']
    return any(k in article_title for k in keywords)

# 1. "삭제 <" 포함된 조나 항 제외
# 2. "<개정 ~~~", "<신설 ~~~" 등 제거
def clean_content(content):
    if "삭제 <" in content:
        return ""
    for marker in ["<개정", "<신설"]:
        if marker in content:
            content = content.split(marker)[0].strip()
    return content

# 도로교통법 시행규칙 호내용 중 리스트 형태가 존재해 리스트 -> 문자열 변경
def ho_content_to_str(item):
    if isinstance(item, list):
        return ''.join(ho_content_to_str(x) for x in item)
    return str(item)

  skip_patterns = ["제\d+항에 따른", "제\d+조에 따른","다음 각 목", "별표", "제\d+조제\d+항에서"]
  skip_patterns = ["제\d+항에 따른", "제\d+조에 따른","다음 각 목", "별표", "제\d+조제\d+항에서"]
  skip_patterns = ["제\d+항에 따른", "제\d+조에 따른","다음 각 목", "별표", "제\d+조제\d+항에서"]


In [None]:
vector_chunks = []

for mst, law_name in law_dict.items():
    print(mst)
    params['MST'] = mst

    response = requests.get(url+link, params)
    datas = response.json()

    for c in datas['법령']['조문']['조문단위']:
        # 조문 제목 없거나 사전 정의된 키워드 포함되면 제외
        article_title = c.get("조문제목", "")
        if article_title_continue(article_title):
            continue

        base_meta = {
            "source_type": "law",
            "law_name": law_name,
            "article_no": c.get("조문번호", ""),
            "article_title": article_title,
        }

        content = c.get("조문내용","").strip()
        
        # 사전 정의해둔 키워드 포함되면 제외
        if any(re.search(p, content) for p in skip_patterns):
            continue

        # 제n조의n <-- 이런거 가져오기 위한 용도
        text_part = content.split('(')[0].strip()
        
        # 항 존재하면 항 단위로 저장
        hang_list = c.get("항", [])

        if isinstance(hang_list, list) and hang_list:
            for hang in hang_list:
                meta = base_meta.copy()
                hang_no = hang.get("항번호", "")
                meta["항번호"] = hang_no

                # 항 번호 제거
                # ex) ①도로교통안전에 ~~ -> 도로교통안전에 ~~
                hang_content = re.sub(r'^' + re.escape(hang_no), '', hang.get("항내용","")).strip()
                
                hang_content = clean_content(hang_content)
                if any(re.search(p, hang_content) for p in skip_patterns):
                    continue
                if not hang_content:
                    continue
                
                ho_list = hang.get("호", [])
                
                # 항 > 호 존재시 호단위로 저장
                if ho_list:
                    hang_content = hang_content + ' ' if hang_content else ''
                    merged_contents = []
                    ho_numbers = []

                    for ho in ho_list:
                        ho_no = ho['호번호'].replace('.','').strip()
                        ho_numbers.append(ho_no)
                        meta['호번호'] = ho_no
                        ho_content = ho_content_to_str(ho['호내용'])
                        ho_content = clean_content(ho_content)
                        ho_content = ho_content.replace(ho['호번호'], '').strip()
                        
                        if any(re.search(p, ho_content) for p in skip_patterns):
                            continue
                        if not ho_content: 
                            continue
                        
                        merged_contents.append(ho_content)
                    
                    if merged_contents:
                        if len(ho_numbers) > 1:
                            ho_range = f"{ho_numbers[0]}-{ho_numbers[-1]}"
                        else:
                            ho_range = ho_numbers[0]

                        vector_chunks.append({"id": f"{base_meta['law_name']}_{text_part}_{hang_no}_{ho_range}",
                        "content": hang_content + ", ".join(merged_contents),
                        "metadata": meta
                    })
                else:
                    vector_chunks.append({
                        "id": f"{meta['law_name']}_{text_part}_{hang_no}",
                        "content": hang_content,
                        "metadata": meta
                    })
        
        # 항에 호만 존재시 호단위로 저장
        elif isinstance(hang_list, dict):
            # 호내용에 조문내용 붙여서 저장
            content = clean_content(content) + '_'
            if '이 법에서 사용하는 용어의 뜻은' in content:
                content = ""
            
            ho_list = hang_list.get("호", [])
            merged_contents = []
            ho_numbers = []

            for ho in ho_list:
                ho_no = ho['호번호'].replace('.','').strip()
                ho_numbers.append(ho_no)
                ho_content = ho['호내용']
                ho_content = ho_content.replace(ho['호번호'], '').strip()
                ho_content = clean_content(ho_content)

                if any(re.search(p, ho_content) for p in skip_patterns):
                    continue
                if not ho_content: 
                    continue

                merged_contents.append(ho_content)
            
            if merged_contents:
                if len(ho_numbers) > 1:
                    ho_range = f"{ho_numbers[0]}-{ho_numbers[-1]}"
                else:
                    ho_range = ho_numbers[0]

                vector_chunks.append({"id": f"{base_meta['law_name']}_{text_part}_{ho_no}",
                "content": content + " " + ", ".join(merged_contents),
                "metadata": base_meta
            })

        # 항 없으면 조 단위로 저장
        else:
            content = clean_content(content)
            if not content:
                continue

            vector_chunks.append({
                "id": f"{base_meta['law_name']}_{text_part}",
                "content": content,
                "metadata": base_meta
            })

268547
272607
271547
268077


In [6]:
len(vector_chunks)

1434

In [5]:
# with open('vector_chunks.json', 'w', encoding='utf-8') as f:
#     json.dump(vector_chunks, f, ensure_ascii=False, indent=2)

In [6]:
# ['도로교통법', '도로교통법 시행령', '도로교통법 시행규칙', '교통사고처리 특례법']

In [7]:
for v in vector_chunks:
    # if '인명보호장구' in v['metadata']['article_title']:
    if '인명보호장구' in v['content']:
    # if v['metadata']['law_name'] == '도로교통법 시행령':
        print(f"id: {v['id']}")
        print(f"    content: {v['content']}")
        print(f'    metadata: {v['metadata']}')

In [8]:
# file_path = "vector_chunks.json"
# with open(file_path, "r", encoding="utf-8") as f:
#     vector_chunks = json.load(f)

# print(len(vector_chunks))

In [9]:
from transformers import AutoTokenizer, AutoModel
import torch

# KURE-v1 로드
tokenizer = AutoTokenizer.from_pretrained("nlpai-lab/KURE-v1")
model = AutoModel.from_pretrained("nlpai-lab/KURE-v1")

def get_embedding(text: str):
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**inputs)
    
    embeddings = outputs.last_hidden_state.mean(dim=1)

    return embeddings[0].tolist()


In [10]:
import chromadb 

chroma_client = chromadb.PersistentClient(path="./law_db")

collection = chroma_client.get_or_create_collection(name="laws")

In [11]:
for chunk in vector_chunks:
    # 1) content 벡터
    collection.add(
        embeddings=[get_embedding(chunk["content"])],
        ids=[f"content_{chunk['id']}"],
        documents=[chunk["content"]],
        metadatas=[chunk["metadata"]]
    )

    # 2) title 벡터
    collection.add(
        embeddings=[get_embedding(chunk["metadata"]["article_title"])],
        ids=[f"title_{chunk['id']}"],
        documents=[chunk['id']],
        metadatas=[{"source_type": "title", "link_to": f"content_{chunk['id']}"}]
    )

In [16]:
def hybrid_search(query_text, top_k=5, title_weight=0.7, content_weight=0.3):
    query_embedding = get_embedding(query_text)
    
    # title 검색
    title_results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
        include=["metadatas", "documents", "distances"],
        where={"source_type":"title"}
    )

    # content 검색
    content_results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
        include=["metadatas", "documents", "distances"],
        where={"source_type":"law"}
    )

    combined_scores = {}
    combined_docs = {}

    # score 계산 및 title/content 합산
    for results, weight in [(title_results, title_weight), (content_results, content_weight)]:
        docs = results["documents"][0]
        dists = results["distances"][0]
        metas = results["metadatas"][0]
        ids = results["ids"][0]

        for doc, dist, meta, vid in zip(docs, dists, metas, ids):
            similarity = 1 - dist / 2  # cosine distance → similarity
            score = similarity * weight

            if meta.get("source_type") == "title":
                vid = vid.replace('title_', '')
                content_id = meta.get("link_to")
                if content_id:
                    doc_data = collection.get(ids=[content_id])
                    if doc_data["documents"]:
                        doc = doc_data["documents"][0]
            else:
                vid = vid.replace('content_', '')

            combined_scores[vid] = combined_scores.get(vid, 0) + score
            combined_docs[vid] = doc

    # min–max 정규화 (0~1)
    scores = list(combined_scores.values())
    min_s, max_s = min(scores), max(scores)
    for vid in combined_scores:
        combined_scores[vid] = (combined_scores[vid] - min_s) / (max_s - min_s + 1e-8)

    # 정렬
    sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
    results = [{"Article_no": vid, "Score": combined_scores[vid], "Content": combined_docs[vid]} 
               for vid, _ in sorted_results[:top_k]]
    return results

query_text = "야간운전"
results = hybrid_search(query_text)

for r in results:
    print(f"Article_no: {r['Article_no']}, Score: {r['Score']:.4f}")
    print(f"Content: {r['Content']}\n")

Article_no: 도로교통법_제50조_⑨, Score: 1.0000
Content: 자전거등의 운전자는 밤에 도로를 통행하는 때에는 전조등과 미등을 켜거나 야광띠 등 발광장치를 착용하여야 한다.

Article_no: 도로교통법_제37조_②, Score: 0.9303
Content: 모든 차 또는 노면전차의 운전자는 밤에 차 또는 노면전차가 서로 마주보고 진행하거나 앞차의 바로 뒤를 따라가는 경우에는 대통령령으로 정하는 바에 따라 등화의 밝기를 줄이거나 잠시 등화를 끄는 등의 필요한 조작을 하여야 한다.

Article_no: 도로교통법 시행령_제20조_①_1-2, Score: 0.9210
Content: 법 제37조제2항에 따라 모든 차 또는 노면전차의 운전자는 밤에 운행할 때에는 다음 각 호의 방법으로 등화를 조작하여야 한다. 서로 마주보고 진행할 때에는 전조등의 밝기를 줄이거나 불빛의 방향을 아래로 향하게 하거나 잠시 전조등을 끌 것. 다만, 도로의 상황으로 보아 마주보고 진행하는 차 또는 노면전차의 교통을 방해할 우려가 없는 경우에는 그러하지 아니하다., 앞의 차 또는 노면전차의 바로 뒤를 따라갈 때에는 전조등 불빛의 방향을 아래로 향하게 하고, 전조등 불빛의 밝기를 함부로 조작하여 앞의 차 또는 노면전차의 운전을 방해하지 아니할 것

Article_no: 도로교통법_제37조_①_1-3, Score: 0.9143
Content: 모든 차 또는 노면전차의 운전자는 다음 각 호의 어느 하나에 해당하는 경우에는 대통령령으로 정하는 바에 따라 전조등(前照燈), 차폭등(車幅燈), 미등(尾燈)과 그 밖의 등화를 켜야 한다. 밤(해가 진 후부터 해가 뜨기 전까지를 말한다. 이하 같다)에 도로에서 차 또는 노면전차를 운행하거나 고장이나 그 밖의 부득이한 사유로 도로에서 차 또는 노면전차를 정차 또는 주차하는 경우, 안개가 끼거나 비 또는 눈이 올 때에 도로에서 차 또는 노면전차를 운행하거나 고장이나 그 밖의 부득이한 사유로 도로에서 차 또는 노면전차를 정차 또는 