# RAG Chatbot with Multi-Query

chatbot_pipeline_clean.ipynb + Multi-Query 기능 통합

In [19]:
import os
import asyncpg
import asyncio
import numpy as np
from typing import List, Dict, Any, Tuple, Optional
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer, CrossEncoder
from openai import AsyncOpenAI
import json
import re

load_dotenv('.env', override=True)

DB_CONFIG = {
    'host': os.getenv('HOST'),
    'port': int(os.getenv('PORT')),
    'database': os.getenv('DATABASE'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('PASSWORD')
}

embedding_model = SentenceTransformer('BAAI/bge-m3')
reranker = CrossEncoder('Dongjin-kr/ko-reranker', device='cpu')
client = AsyncOpenAI(api_key=os.getenv('OPENAI_API_KEY'))

print(f"API Key loaded: {os.getenv('OPENAI_API_KEY')[:10] if os.getenv('OPENAI_API_KEY') else 'None'}...")
print(f"DB User: {os.getenv('DB_USER')}")

API Key loaded: sk-proj-Lf...
DB User: rag_user


## 1단계: Query Rewriting + Multi-Query Generation

In [20]:
async def rewrite_query(query: str, conversation_history: List[Dict] = None) -> Dict:
    context_info = ""
    context_analysis = None
    
    if conversation_history:
        context_prompt = f"""대화 기록:
{conversation_history}

현재 질문: "{query}"

이 질문이 이전 대화를 참조하는지 판단하고, 참조한다면 어떤 공고를 언급하는지 분석해주세요.
JSON 형식으로만 답변:
{{
  "is_followup": true/false,
  "referenced_announcement_ids": ["공고ID1", "공고ID2"],
  "context_type": "specific_announcement/general_topic/new_question"
}}"""
        
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": context_prompt}],
            temperature=0
        )
        
        try:
            context_analysis = json.loads(response.choices[0].message.content)
            if context_analysis.get('is_followup') and context_analysis.get('referenced_announcement_ids'):
                context_info = f"이전 대화에서 언급된 공고: {', '.join(context_analysis['referenced_announcement_ids'])}"
        except json.JSONDecodeError:
            context_analysis = None
    
    rewrite_prompt = f"""질문을 분석하여 다음 정보를 추출:

질문: "{query}"
{context_info}

추출 규칙:
1. region: "경기도", "서울특별시", "서울특별시 외" 중 하나만 (수원시→검색키워드로)
2. notice_type: 국민임대/행복주택/영구임대 등 (명시되지 않으면 빈 문자열)
3. category: "lease"(임대) 또는 "sale"(분양) 또는 빈 문자열 (명시되지 않으면 빈 문자열)
4. rewritten_question: 검색용 자연어 질문
5. search_keywords: 핵심 검색어 (도시명, 지역명 포함)

중요: category는 반드시 "lease", "sale", "" 중 하나만 사용. "주택유형" 같은 값 금지

JSON 형식으로만 답변:
{{
  "region": "",
  "notice_type": "",
  "category": "",
  "rewritten_question": "",
  "search_keywords": []
}}"""
    
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": rewrite_prompt}],
        temperature=0
    )
    
    try:
        result = json.loads(response.choices[0].message.content)
        
        # category 값 검증 및 정규화
        # category에 lease, sale, 빈문자열 아닌 경우 where절 결과로 아무것도 안나옴
        if result.get('category') and result['category'] not in ['lease', 'sale']:
            result['category'] = ''
            
    except json.JSONDecodeError:
        result = {
            "region": "",
            "notice_type": "",
            "category": "",
            "rewritten_question": query,
            "search_keywords": query.split()
        }
    
    if context_analysis:
        result['context_analysis'] = context_analysis
    return result

In [21]:
async def generate_multi_queries(query: str, base_query_analysis: Dict, num_queries: int = 2) -> List[str]:
    base_question = base_query_analysis.get('rewritten_question', query)

    multi_query_prompt = f"""LH 주택 공고 검색을 위해 질문을 다양한 표현으로 변환하세요.

원본 질문: {base_question}

다음 {num_queries}개의 다른 버전을 생성하세요:
1. 동의어나 유사 표현을 사용한 버전
2. 더 구체적이거나 상세한 버전

규칙:
- 각 질문은 한 줄에 하나씩
- 번호나 기호 없이 질문만
- 원본 질문의 의도 유지
- LH, 임대주택, 분양주택 관련 용어 활용

변환된 질문들:"""

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": multi_query_prompt}],
        temperature=0.7
    )

    generated = [q.strip() for q in response.choices[0].message.content.strip().split('\n') if q.strip()]
    all_queries = [base_question] + generated[:num_queries]

    return all_queries

## 2단계: Multi-Query Hybrid Search

In [22]:
async def vector_search(query: str, top_k: int = 15, filters: dict = None, filter_ids: List[str] = None) -> List[Dict]:
    conn = await asyncpg.connect(**DB_CONFIG)
    try:
        query_embedding = embedding_model.encode(query, normalize_embeddings=True)
        
        where_clauses = []
        params = [str(query_embedding.tolist())]
        
        if filters:
            if filters.get('region') and filters['region'].strip():
                where_clauses.append(f"a.region LIKE ${len(params)+1}")
                params.append(f"%{filters['region']}%")
            if filters.get('category') and filters['category'].strip():
                where_clauses.append(f"a.category = ${len(params)+1}")
                params.append(filters['category'])
            if filters.get('notice_type') and filters['notice_type'].strip():
                where_clauses.append(f"a.notice_type LIKE ${len(params)+1}")
                params.append(f"%{filters['notice_type']}%")
        
        if filter_ids:
            where_clauses.append(f"a.id = ANY(${len(params)+1}::text[])")
            params.append(filter_ids)
        
        where_sql = " AND " + " AND ".join(where_clauses) if where_clauses else ""
        params.append(top_k)
        
        sql = f"""
            SELECT dc.id as chunk_id, dc.announcement_id, a.title, a.category, a.region, a.notice_type,
                   a.posted_date, dc.chunk_text, dc.chunk_index, dc.metadata, 
                   (1 - (dc.embedding <=> $1::vector)) as similarity
            FROM document_chunks dc
            JOIN announcements a ON dc.announcement_id = a.id
            WHERE 1=1 {where_sql}
            ORDER BY dc.embedding <=> $1::vector
            LIMIT ${len(params)}
        """
        
        rows = await conn.fetch(sql, *params)
        return [dict(row) for row in rows]
    finally:
        await conn.close()

async def keyword_search(keywords: List[str], top_k: int = 10, filters: dict = None, filter_ids: List[str] = None) -> List[Dict]:
    if not keywords:
        return []
    
    conn = await asyncpg.connect(**DB_CONFIG)
    try:
        params, keyword_conditions = [], []
        for kw in keywords:
            keyword_conditions.append(f"dc.chunk_text LIKE ${len(params)+1}")
            params.append(f"%{kw}%")
        
        keyword_sql = " OR ".join(keyword_conditions) if keyword_conditions else "1=1"
        
        where_clauses = []
        if filters:
            if filters.get('region') and filters['region'].strip():
                where_clauses.append(f"a.region LIKE ${len(params)+1}")
                params.append(f"%{filters['region']}%")
            if filters.get('category') and filters['category'].strip():
                where_clauses.append(f"a.category = ${len(params)+1}")
                params.append(filters['category'])
            if filters.get('notice_type') and filters['notice_type'].strip():
                where_clauses.append(f"a.notice_type LIKE ${len(params)+1}")
                params.append(f"%{filters['notice_type']}%")
        
        if filter_ids:
            where_clauses.append(f"a.id = ANY(${len(params)+1}::text[])")
            params.append(filter_ids)
        
        where_sql = " AND " + " AND ".join(where_clauses) if where_clauses else ""
        params.append(top_k)
        
        sql = f"""
            SELECT DISTINCT ON (dc.id) dc.id as chunk_id, dc.announcement_id, a.title, a.category, a.region,
                   a.notice_type, a.posted_date, dc.chunk_text, dc.chunk_index, dc.metadata
            FROM document_chunks dc
            JOIN announcements a ON dc.announcement_id = a.id
            WHERE ({keyword_sql}) {where_sql}
            LIMIT ${len(params)}
        """
        
        rows = await conn.fetch(sql, *params)
        return [dict(row) for row in rows]
    finally:
        await conn.close()



async def multi_query_hybrid_search(
    query_analysis: Dict,
    multi_queries: List[str],
    vector_top_k: int = 10,
    keyword_top_k: int = 5
) -> List[Dict]:
    filters = {
        'region': query_analysis.get('region', ''),
        'notice_type': query_analysis.get('notice_type', ''),
        'category': query_analysis.get('category', '')
    }
    
    filter_ids = None
    if 'context_analysis' in query_analysis:
        context = query_analysis['context_analysis']
        if context.get('is_followup') and context.get('referenced_announcement_ids'):
            filter_ids = context['referenced_announcement_ids']
    
    
    tasks = []
    for q in multi_queries:
        tasks.append(vector_search(q, vector_top_k, filters, filter_ids))
        tasks.append(keyword_search(query_analysis.get('search_keywords', []), keyword_top_k, filters, filter_ids))
        
    results_list = await asyncio.gather(*tasks)
    
    all_results = {}
    for results in results_list:
        for r in results:
            chunk_id = r['chunk_id']
            if chunk_id not in all_results:
                all_results[chunk_id] = r
                
    return list(all_results.values())

## 3단계: Reranking

In [23]:
def rerank_results(query: str, results: List[Dict], top_k: int = 10) -> List[Dict]:
    if not results:
        return []
    
    pairs = [[query, result['chunk_text']] for result in results]
    scores = reranker.predict(pairs)
    
    for result, score in zip(results, scores):
        result['rerank_score'] = float(score)
    
    reranked = sorted(results, key=lambda x: x['rerank_score'], reverse=True)
    return reranked[:top_k]

## 4단계: Context Building with Chunk Merging

In [24]:
async def merge_chunks(chunks: List[Dict]) -> List[Dict]:
    if not chunks:
        return []
    
    announcement_chunks = {}
    for chunk in chunks:
        ann_id = chunk['announcement_id']
        if ann_id not in announcement_chunks:
            announcement_chunks[ann_id] = []
        announcement_chunks[ann_id].append(chunk)
    
    merged_results = []
    for ann_id, ann_chunks in announcement_chunks.items():
        ann_chunks.sort(key=lambda x: x.get('chunk_index', 999))
        
        merged_text = '\n\n'.join([c['chunk_text'] for c in ann_chunks])
        max_score_chunk = max(ann_chunks, key=lambda x: x.get('rerank_score', 0))
        
        merged_results.append({
            'announcement_id': ann_id,
            'announcement_title': ann_chunks[0]['title'],
            'announcement_date': ann_chunks[0].get('posted_date'),
            'region': ann_chunks[0]['region'],
            'notice_type': ann_chunks[0]['notice_type'],
            'category': ann_chunks[0]['category'],
            'merged_content': merged_text,
            'rerank_score': max_score_chunk.get('rerank_score', 0),
            'num_chunks': len(ann_chunks)
        })
    
    merged_results.sort(key=lambda x: x['rerank_score'], reverse=True)
    return merged_results

def build_context(merged_results: List[Dict]) -> str:
    if not merged_results:
        return "검색된 관련 정보가 없습니다."
    
    context_parts = []
    for i, result in enumerate(merged_results, 1):
        category_name = "임대" if result['category'] == 'lease' else "분양"
        
        context_parts.append(f"""━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
문서 {i}: {result['announcement_title']}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

[기본 정보]
- 분류: {category_name}
- 지역: {result['region']}
- 유형: {result['notice_type'] or 'N/A'}
- 관련도: {result['rerank_score']:.3f}

[문서 내용]
{result['merged_content']}""")
    
    return '\n\n'.join(context_parts)

## 5단계: Answer Generation

In [25]:
async def generate_answer(query: str, context: str, conversation_history: List[Dict] = None) -> str:
    history_text = ""
    if conversation_history:
        history_items = [f"Q: {h['question']}\nA: {h['answer'][:150]}..." for h in conversation_history[-3:]]
        history_text = "\n\n".join(history_items)
    
    system_prompt = """LH 공사의 임대/분양 공고 전문 상담사입니다.

# 답변 원칙
1. **정확성**: 제공된 문서만을 근거로 답변. 문서에 없는 내용은 "제공된 공고에서 확인할 수 없습니다" 명시
2. **구체성**: 숫자, 날짜, 조건을 정확히 인용. 표가 있으면 마크다운 표로 정리
3. **완전성**: 질문과 관련된 모든 중요 정보(자격, 일정, 서류, 주의사항) 포함
4. **명확성**: 복잡한 조건은 단계별로 나누어 설명
5. **친절함**: 전문 용어는 쉽게 풀어 설명

# 답변 형식
- 공고 제목, 지역, 유형 명시
- 중요한 날짜/금액/조건은 **강조**
- 여러 항목은 번호 목록이나 표로 정리
- 답변 끝에 [공고 1, 2 참조] 형태로 출처 표시

# 질문 유형별 대응
- 자격/조건 질문: 구체적 기준(소득, 나이, 거주지 등)을 상세히 나열
- 일정 질문: 모든 날짜(공고일, 접수기간, 발표일, 계약일)를 시간순 정리
- 신청 방법: 단계별 절차와 필요 서류를 목록으로 정리
- 비용 질문: 정확한 금액을 표로 정리
- 비교 질문: 차이점을 표로 정리"""
    
    user_prompt = f"""# 제공된 문서

{context}

# 이전 대화
{history_text if history_text else '없음'}

# 사용자 질문
{query}

위 문서를 바탕으로 정확하게 답변해주세요. 문서에 없는 내용은 추측하지 마세요."""
    
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0.1,
        max_tokens=2000
    )
    
    return response.choices[0].message.content

## RAG Pipeline Integration

In [26]:
async def rag_chatbot(
    query: str,
    conversation_history: List[Dict] = None,
    use_multi_query: bool = True,
    vector_top_k: int = 10,
    keyword_top_k: int = 5,
    rerank_top_k: int = 10,
    verbose: bool = False
) -> Dict[str, Any]:
    query_analysis = await rewrite_query(query, conversation_history)
    
    if use_multi_query:
        multi_queries = await generate_multi_queries(query, query_analysis, num_queries=2)
        if verbose:
            print(f"Generated queries: {multi_queries}")
    else:
        multi_queries = [query_analysis.get('rewritten_question', query)]
    
    search_results = await multi_query_hybrid_search(
        query_analysis,
        multi_queries,
        vector_top_k,
        keyword_top_k
    )
    
    if verbose:
        print(f"Total search results: {len(search_results)}")
    
    reranked_results = rerank_results(
        query_analysis.get('rewritten_question', query),
        search_results[:15],
        rerank_top_k
    )
    
    merged_results = await merge_chunks(reranked_results)
    context = build_context(merged_results)
    answer = await generate_answer(query, context, conversation_history)
    
    return {
        'query': query,
        'query_analysis': query_analysis,
        'multi_queries': multi_queries if use_multi_query else None,
        'search_results_count': len(search_results),
        'reranked_count': len(reranked_results),
        'merged_announcements': merged_results,
        'answer': answer
    }

## Test: Single Query

In [27]:
result = await rag_chatbot(
    "수원시 공고 알려줘",
    use_multi_query=True,
    verbose=True
)

print(f"\nQuery Analysis: {result['query_analysis']}")
print(f"\nMulti Queries: {result['multi_queries']}")
print(f"\nSearch Results: {result['search_results_count']}")
print(f"\nMerged Announcements: {len(result['merged_announcements'])}")
print(f"\nAnswer:\n{result['answer']}")

Generated queries: ['수원시에 대한 공고를 알려주세요.', '수원시에 위치한 LH 임대주택이나 분양주택에 대한 공고를 알려주세요.', '수원시에서 진행 중인 LH 주택 관련 공고를 확인할 수 있을까요?']
Total search results: 24

Query Analysis: {'region': '경기도', 'notice_type': '', 'category': '', 'rewritten_question': '수원시에 대한 공고를 알려주세요.', 'search_keywords': ['수원시']}

Multi Queries: ['수원시에 대한 공고를 알려주세요.', '수원시에 위치한 LH 임대주택이나 분양주택에 대한 공고를 알려주세요.', '수원시에서 진행 중인 LH 주택 관련 공고를 확인할 수 있을까요?']

Search Results: 24

Merged Announcements: 7

Answer:
수원시에서 진행 중인 공고는 다음과 같습니다.

### 1. 2025년 기존주택 등 매입임대주택 예비입주자 모집 공고
- **지역**: 수원시, 경기도
- **유형**: 매입임대
- **모집세대수**: **400세대**
- **모집공고일**: **2025.03.27(목)** (입주자격 판단 기준일)

#### 주요 조건
1. **중복신청불가**: 1세대(세대구성원 전원) 1주택 신청
2. **신청유의**: 신청 내용이 사실과 다를 경우 부적격 처리 및 당첨 취소 가능

---

### 2. 수원시 지역 행복주택 예비입주자 모집 공고
- **지역**: 수원시, 경기도
- **유형**: 행복주택
- **서류 제출 기한**: **2025.07.24.~2025.07.30.**

#### 순위별 자격
| 순위 | 자격 조건 |
|---|---|
| 1순위 | 해당 주택건설지역(수원시) 또는 연접지역(용인시·화성시·의왕시·안산시) 거주자 |
| 2순위 | 경기도 지역 또는 서울특별시·인천광역시 거주자 |
| 3순위 | 제1,2순위에 해당되지 않는 자 |

In [28]:
result = await rag_chatbot(
    "가장 최근에 올라온 수원 지역 공고의 신청 기간은 언제까지야?",
    use_multi_query=True,
    verbose=True
)

print(f"\nAnswer:\n{result['answer']}")

Generated queries: ['가장 최근에 올라온 수원 지역 공고의 신청 기간은 언제까지인가요?', '수원 지역에서 가장 최근에 게시된 LH 주택 공고의 신청 마감일은 언제인가요?', '수원에서 발표된 최신 LH 임대주택 공고의 신청 기간이 종료되는 날짜는 어떻게 되나요?']
Total search results: 25

Answer:
가장 최근에 올라온 수원 지역 공고는 **수원역푸르지오자이 5년 공공임대주택 예비입주자 모집공고**입니다. 이 공고의 신청 기간은 다음과 같습니다:

| 신청접수(현장접수 불가) | 당첨자 발표 | 서류제출 | 계약체결 |
|---|---|---|---|
| **2024.11.18(월)**<br>(09:00~16:00) | **2024.11.21(목)**<br>**17:00 이후** | **2024.11.22(금) ~ 2024.11.26(화)**<br>**방문 제출** | **자격검증 및 소명완료 후 공가 발생 시 순차적으로 개별통보** |

따라서 신청 기간은 **2024년 11월 18일**입니다. [문서 3 참조]


## Test: Conversational Chatbot

In [29]:
async def chat(questions: List[str], use_multi_query: bool = True):
    history = []
    
    for q in questions:
        print(f"\n{'='*60}")
        print(f"Q: {q}")
        print(f"{'='*60}")
        
        result = await rag_chatbot(q, history, use_multi_query=use_multi_query, verbose=True)
        
        print(f"\nA: {result['answer']}")
        
        announcement_ids = [ann['announcement_id'] for ann in result['merged_announcements']]
        history.append({
            'question': q,
            'answer': result['answer'],
            'announcement_ids': announcement_ids
        })

test_questions = [
    "수원시의 공고 알려줘",
    "나는 결혼안한 28세 청년이야. 그 공고에 적절해?",
    "내년에 결혼할 예정인데, 신혼부부가 더 유리한가?"
]

await chat(test_questions, use_multi_query=True)


Q: 수원시의 공고 알려줘
Generated queries: ['수원시의 공고를 알려주세요.', '수원시에서 발표된 LH 임대주택 공고를 확인하고 싶습니다.', '수원시에 있는 분양주택 관련 공고를 자세히 알려주세요.']
Total search results: 24

A: 수원시의 임대 공고는 다음과 같습니다.

### 1. 2025년 기존주택 등 매입임대주택 예비입주자 모집 공고
- **지역**: 수원시, 경기도
- **유형**: 매입임대
- **모집세대수**: **400세대**
- **모집공고일**: **2025.03.27(목)** (입주자격 판단 기준일)

#### 주요 조건
1. **중복신청불가**: 1세대(세대구성원 전원) 1주택 신청
2. **신청유의**: 신청 내용이 사실과 다를 경우 부적격 처리 및 당첨 취소 가능
3. **계약안내**: 예비입주자로 선정되더라도 입주까지 상당 기간 소요될 수 있음

---

### 2. 수원역푸르지오자이 5년 공공임대주택 예비입주자 모집공고
- **지역**: 수원시, 경기도
- **유형**: 공공임대

#### 일정
| 항목 | 날짜 |
|---|---|
| 신청접수 | **2024.11.18(월)** (09:00~16:00) |
| 당첨자 발표 | **2024.11.21(목)** (17:00 이후) |
| 서류제출 | **2024.11.22(금) ~ 2024.11.26(화)** |
| 계약체결 | 자격검증 및 소명 완료 후 공가 발생 시 순차적으로 개별통보 |

#### 신청 방법
- **신청**: LH 청약플러스 또는 모바일 앱을 통해 신청
- **서류 제출**: 등기우편으로만 가능, 제출 주소는 경기도 수원시 영통구 광교로 146 안효빌딩 11층 LH수원권주거복지지사

---

위의 두 공고는 수원시에서 진행되는 임대주택 모집에 대한 정보입니다. 추가적인 세부사항이나 조건은 각 공고문을 참고하시기 바랍니다. [공고 1, 6 참조]

Q: 나는 결혼안한 28세 청년이야. 그 공고에 적절해?
Generated