# RAG 챗봇 파이프라인

LH 공고 검색을 위한 5단계 RAG 파이프라인:
1. 질문 재구성 (Query Rewriting)
2. 하이브리드 검색 (Vector + Keyword)
3. 재순위화 (Reranking)
4. 컨텍스트 구성 (청크 병합 + 구조화)
5. 답변 생성 (LLM)

## 환경 설정

In [None]:
import os
import asyncio
import asyncpg
import json
from typing import List, Dict, Tuple
from sentence_transformers import SentenceTransformer, CrossEncoder
from openai import OpenAI

# API 키 로드
env_file = '.env'
with open(env_file, 'r') as f:
    for line in f:
        if line.startswith('OPENAI_API_KEY='):
            os.environ['OPENAI_API_KEY'] = line.strip().split('=', 1)[1]
            break

# DB 설정
DB_CONFIG = {
    'host': 'localhost',
    'port': 5432,
    'database': 'skn19_3rd_proj',
    'user': 'rag_user',
    'password': 'skn19'
}

# 클라이언트 및 모델 로드
openai_client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
embedding_model = SentenceTransformer('BAAI/bge-m3')
reranker = CrossEncoder('Dongjin-kr/ko-reranker', device='cpu')

print("환경 설정 완료")

## 1단계: 질문 재구성

In [None]:
async def rewrite_query(query: str, conversation_history: List[Dict] = None) -> Dict:
    """LLM을 활용하여 질문을 재구성하고 확장"""
    
    context_str = ""
    if conversation_history:
        recent = conversation_history[-3:]
        context_str = "\n이전 대화:\n" + "\n".join([
            f"Q: {h['query']}\nA: {h['answer'][:100]}..."
            for h in recent
        ])
    
    system_prompt = """당신은 LH 공고 검색 시스템의 질문 분석 전문가입니다.
사용자의 질문을 분석하여 다음 정보를 JSON 형식으로 추출하세요:

1. rewritten: 완전한 문장으로 재구성된 질문 (대화 맥락 반영)
2. expanded: 검색 최적화를 위한 확장 쿼리 (유사어, 관련어 포함)
3. keywords: 핵심 키워드 리스트 (세부 지역명 포함)
4. filters: 메타데이터 필터
   - region: "경기도", "서울특별시", "서울특별시 외" 중 하나
   - notice_type: "국민임대", "행복주택", "영구임대" 등
   - category: "lease" 또는 "sale"

중요: 세부 지역명(남양주, 수원)은 keywords에만, filters.region은 광역시/도만 사용"""

    user_prompt = f"""{context_str}\n\n현재 질문: {query}\n\n위 질문을 분석하여 검색에 최적화된 형태로 재구성해주세요."""

    try:
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            temperature=0.1,
            response_format={"type": "json_object"}
        )
        result = json.loads(response.choices[0].message.content)
        result['original'] = query
        return result
    except Exception as e:
        return {'original': query, 'rewritten': query, 'expanded': query, 'keywords': query.split(), 'filters': {}}

## 2단계: 하이브리드 검색

In [None]:
async def vector_search(query: str, top_k: int = 15, filters: dict = None, filter_ids: List[str] = None) -> List[Dict]:
    """벡터 유사도 검색 (의미 기반)"""
    query_embedding = embedding_model.encode(query, normalize_embeddings=True)
    conn = await asyncpg.connect(**DB_CONFIG)
    
    try:
        where_clauses, params = [], [str(query_embedding.tolist())]
        
        if filters:
            if 'region' in filters:
                where_clauses.append(f"a.region LIKE ${len(params)+1}")
                params.append(f"%{filters['region']}%")
            if 'category' in filters:
                where_clauses.append(f"a.category = ${len(params)+1}")
                params.append(filters['category'])
            if 'notice_type' in filters:
                where_clauses.append(f"a.notice_type LIKE ${len(params)+1}")
                params.append(f"%{filters['notice_type']}%")
        
        if filter_ids:
            where_clauses.append(f"a.id = ANY(${len(params)+1}::text[])")
            params.append(filter_ids)
        
        where_sql = " AND " + " AND ".join(where_clauses) if where_clauses else ""
        params.append(top_k)
        
        sql = f"""
            SELECT dc.id as chunk_id, dc.announcement_id, a.title, a.category, a.region, a.notice_type,
                   dc.chunk_text, dc.metadata, (1 - (dc.embedding <=> $1::vector)) as similarity, 'vector' as search_type
            FROM document_chunks dc
            JOIN announcements a ON dc.announcement_id = a.id
            WHERE 1=1 {where_sql}
            ORDER BY dc.embedding <=> $1::vector
            LIMIT ${len(params)}
        """
        return [dict(r) for r in await conn.fetch(sql, *params)]
    finally:
        await conn.close()


async def keyword_search(keywords: List[str], top_k: int = 10, filters: dict = None) -> List[Dict]:
    """키워드 기반 검색 (LIKE 검색)"""
    conn = await asyncpg.connect(**DB_CONFIG)
    
    try:
        params, keyword_conditions = [], []
        for kw in keywords:
            keyword_conditions.append(f"dc.chunk_text LIKE ${len(params)+1}")
            params.append(f"%{kw}%")
        
        keyword_sql = " OR ".join(keyword_conditions) if keyword_conditions else "1=1"
        
        where_clauses = []
        if filters:
            if 'region' in filters:
                where_clauses.append(f"a.region LIKE ${len(params)+1}")
                params.append(f"%{filters['region']}%")
            if 'category' in filters:
                where_clauses.append(f"a.category = ${len(params)+1}")
                params.append(filters['category'])
            if 'notice_type' in filters:
                where_clauses.append(f"a.notice_type LIKE ${len(params)+1}")
                params.append(f"%{filters['notice_type']}%")
        
        where_sql = " AND " + " AND ".join(where_clauses) if where_clauses else ""
        params.append(top_k)
        
        sql = f"""
            SELECT DISTINCT ON (dc.id) dc.id as chunk_id, dc.announcement_id, a.title, a.category, a.region,
                   a.notice_type, dc.chunk_text, dc.metadata, 0.5 as similarity, 'keyword' as search_type
            FROM document_chunks dc
            JOIN announcements a ON dc.announcement_id = a.id
            WHERE ({keyword_sql}) {where_sql}
            LIMIT ${len(params)}
        """
        return [dict(r) for r in await conn.fetch(sql, *params)]
    finally:
        await conn.close()


async def hybrid_search(query_analysis: Dict, vector_top_k: int = 15, keyword_top_k: int = 10) -> List[Dict]:
    """하이브리드 검색: Vector + Keyword 결과를 병합"""
    vector_results = await vector_search(query_analysis['expanded'], top_k=vector_top_k, filters=query_analysis.get('filters', {}))
    keyword_results = await keyword_search(query_analysis.get('keywords', []), top_k=keyword_top_k, filters=query_analysis.get('filters', {}))
    
    seen_chunks, combined = set(), []
    for r in vector_results + keyword_results:
        if r['chunk_id'] not in seen_chunks:
            seen_chunks.add(r['chunk_id'])
            combined.append(r)
    return combined

## 3단계: 재순위화

In [None]:
def rerank_results(query: str, search_results: List[Dict], top_k: int = 8) -> List[Dict]:
    """Cross-Encoder를 사용한 정밀 재순위화"""
    if not search_results:
        return []
    
    pairs = [(query, r['chunk_text']) for r in search_results]
    scores = reranker.predict(pairs)
    
    for i, result in enumerate(search_results):
        result['rerank_score'] = float(scores[i])
    
    reranked = sorted(search_results, key=lambda x: x['rerank_score'], reverse=True)
    return reranked[:top_k]

## 4단계: 컨텍스트 구성

In [None]:
def build_context(reranked_results: List[Dict]) -> Tuple[str, List[Dict]]:
    """청크 병합 및 구조화된 컨텍스트 구성"""
    
    # 같은 공고의 청크들을 병합
    announcement_chunks = {}
    for r in reranked_results:
        ann_id = r['announcement_id']
        if ann_id not in announcement_chunks:
            announcement_chunks[ann_id] = {
                'announcement_id': ann_id, 'title': r['title'], 'category': r['category'],
                'region': r['region'], 'notice_type': r['notice_type'], 'metadata': r['metadata'],
                'chunk_texts': [r['chunk_text']], 'rerank_score': r['rerank_score'], 'chunk_count': 1
            }
        else:
            announcement_chunks[ann_id]['chunk_texts'].append(r['chunk_text'])
            announcement_chunks[ann_id]['chunk_count'] += 1
            announcement_chunks[ann_id]['rerank_score'] = max(announcement_chunks[ann_id]['rerank_score'], r['rerank_score'])
    
    merged = sorted(announcement_chunks.values(), key=lambda x: x['rerank_score'], reverse=True)
    
    # 구조화된 컨텍스트 생성
    context_parts, sources = [], []
    for idx, m in enumerate(merged, 1):
        metadata = json.loads(m['metadata']) if isinstance(m['metadata'], str) else m['metadata']
        category_name = "임대" if m['category'] == 'lease' else "분양"
        merged_text = '\n\n'.join(m['chunk_texts'])
        
        context_parts.append(f"""
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
문서 {idx}: {m['title']}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

[기본 정보]
- 분류: {category_name}
- 지역: {m['region']}
- 유형: {m['notice_type'] or 'N/A'}
- 관련도: {m['rerank_score']:.3f}

[문서 내용]
{merged_text}
        """.strip())
        
        sources.append({
            'announcement_id': m['announcement_id'],
            'title': m['title'],
            'region': m['region'],
            'score': m['rerank_score'],
            'chunk_count': m['chunk_count']
        })
    
    return "\n\n".join(context_parts), sources

## 5단계: 답변 생성

In [None]:
def generate_answer(query: str, context: str, sources: List[Dict]) -> Dict:
    """LLM으로 답변 생성"""
    
    system_prompt = """당신은 LH 공사의 임대/분양 공고 전문 상담사입니다.

# 답변 원칙
1. 제공된 문서만을 근거로 답변
2. 문서에 없는 내용은 "제공된 공고에서 확인할 수 없습니다" 명시
3. 표가 있으면 마크다운 표로 정리
4. 숫자, 날짜, 조건은 정확히 인용
5. 답변 끝에 [문서 1, 2 참조] 형태로 출처 표시"""

    user_prompt = f"""# 제공된 문서\n\n{context}\n\n# 사용자 질문\n{query}\n\n위 문서를 바탕으로 정확하게 답변해주세요."""

    try:
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            temperature=0.1,
            max_tokens=2000
        )
        return {
            'answer': response.choices[0].message.content,
            'sources': sources,
            'metadata': {'model': 'gpt-4o-mini', 'tokens': response.usage.total_tokens}
        }
    except Exception as e:
        return {'answer': f"답변 생성 오류: {str(e)}", 'sources': sources, 'metadata': {'error': str(e)}}

## 통합 파이프라인

In [None]:
async def rag_chatbot(query: str, conversation_history: List[Dict] = None, verbose: bool = True) -> Dict:
    """5단계 RAG 파이프라인 통합 함수"""
    
    if verbose:
        print(f"\n{'='*80}\n질문: {query}\n{'='*80}")
    
    # 1. 질문 재구성
    query_analysis = await rewrite_query(query, conversation_history)
    if verbose:
        print(f"\n[1/5] 질문 재구성: {query_analysis.get('rewritten', 'N/A')}")
    
    # 2. 하이브리드 검색
    search_results = await hybrid_search(query_analysis, vector_top_k=15, keyword_top_k=10)
    if verbose:
        print(f"[2/5] 하이브리드 검색: {len(search_results)}개 결과")
    
    if not search_results:
        return {'query': query, 'answer': "관련 정보를 찾을 수 없습니다.", 'sources': []}
    
    # 3. 재순위화
    reranked = rerank_results(query_analysis.get('rewritten', query), search_results, top_k=8)
    if verbose:
        print(f"[3/5] 재순위화: 상위 {len(reranked)}개 선정 (최고 점수: {reranked[0]['rerank_score']:.4f})")
    
    # 4. 컨텍스트 구성
    context, sources = build_context(reranked)
    if verbose:
        print(f"[4/5] 컨텍스트 구성: {len(context)} 문자")
    
    # 5. 답변 생성
    result = generate_answer(query_analysis.get('rewritten', query), context, sources)
    if verbose:
        print(f"[5/5] 답변 생성 완료\n\n{'='*80}\n{result['answer']}\n{'='*80}")
    
    return {'query': query, 'query_analysis': query_analysis, **result}

## 대화형 챗봇 (맥락 유지)

In [None]:
conversation_history = []

async def analyze_context(query: str, history: List[Dict]) -> Dict:
    """LLM으로 맥락 참조 분석"""
    if not history:
        return {'is_context_question': False}
    
    history_str = "\n".join([f"Q: {h['query']}\nA: {h['answer'][:200]}..." for h in history[-2:]])
    
    system_prompt = """대화 맥락 분석 전문가입니다. 현재 질문이 이전 대화를 참조하는지 판단하세요.
JSON 응답: {"is_context_question": true/false, "reason": "판단 근거", "referenced_announcement_indices": [0, 1]}"""
    
    try:
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"이전 대화:\n{history_str}\n\n현재 질문: {query}"}
            ],
            temperature=0.1,
            response_format={"type": "json_object"}
        )
        return json.loads(response.choices[0].message.content)
    except:
        return {'is_context_question': False}


async def chat(query: str, verbose: bool = True):
    """대화 맥락을 유지하는 챗봇"""
    
    # 맥락 분석
    context_analysis = await analyze_context(query, conversation_history)
    is_context = context_analysis.get('is_context_question', False)
    
    if is_context and conversation_history:
        if verbose:
            print(f"[맥락 인식] {context_analysis.get('reason', '')}")
        
        # 이전 공고 ID 추출
        prev_ids = []
        for idx in context_analysis.get('referenced_announcement_indices', [0]):
            if idx < len(conversation_history):
                for src in conversation_history[-(idx+1)].get('sources', [])[:3]:
                    if src.get('announcement_id') and src['announcement_id'] not in prev_ids:
                        prev_ids.append(src['announcement_id'])
        
        if prev_ids:
            # 질문 재구성
            query_analysis = await rewrite_query(query, conversation_history)
            
            # 이전 공고에서 우선 검색
            context_results = await vector_search(query_analysis.get('rewritten', query), top_k=5, filter_ids=prev_ids)
            general_results = await hybrid_search(query_analysis, vector_top_k=15, keyword_top_k=10)
            
            # 결과 병합
            seen = {r['chunk_id'] for r in context_results}
            combined = context_results + [r for r in general_results if r['chunk_id'] not in seen]
            
            # 재순위화 및 답변 생성
            reranked = rerank_results(query_analysis.get('rewritten', query), combined, top_k=8)
            context, sources = build_context(reranked)
            result = generate_answer(query_analysis.get('rewritten', query), context, sources)
            result = {'query': query, 'query_analysis': query_analysis, **result}
            
            if verbose:
                print(f"\n{'='*80}\n{result['answer']}\n{'='*80}")
        else:
            result = await rag_chatbot(query, conversation_history, verbose)
    else:
        result = await rag_chatbot(query, conversation_history, verbose)
    
    # 대화 이력 추가
    conversation_history.append({
        'query': query,
        'answer': result['answer'],
        'sources': result.get('sources', [])
    })
    
    if len(conversation_history) > 10:
        conversation_history.pop(0)
    
    return result

## 테스트

In [None]:
# 단일 질문 테스트
result = await rag_chatbot("남양주시 국민임대주택 청약저축 납입 횟수 배점은?")

In [None]:
# 대화형 테스트
await chat("수원시 행복주택 알려줘")
await chat("거기 청년 계층 출생자녀에 따른 소득 기준은?")