In [103]:
import os
import re
import pickle
from typing import List, Dict, Any, Tuple, Optional
from pathlib import Path
from dataclasses import dataclass

from langchain.docstore.document import Document
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_text_splitters import MarkdownHeaderTextSplitter
from langchain_ollama import OllamaEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever

In [104]:
import re
from typing import List, Dict, Set
from dataclasses import dataclass
import json
from collections import Counter

@dataclass
class DocumentChunk:
    """문서 청크를 나타내는 클래스"""
    content: str
    level: int  # 헤더 레벨 (1=H1, 2=H2, 3=H3, 4=H4)
    title: str
    full_path: List[str]  # 계층적 경로 (예: ["07. PNS", "PNS 상세", "Payment Notification"])
    metadata: Dict
    start_line: int
    end_line: int
    tags: List[str]  # 문서에서 추출된 태그들 (특수용어, 코드값, API 파라미터 등)

class TechnicalTermExtractor:
    """기술문서에서 특수 용어, 코드값, API 파라미터 등을 추출하는 클래스"""
    
    def __init__(self):
        # 기술 용어 패턴들
        self.patterns = {
            # camelCase API 파라미터 (더 정확한 패턴)
            'camel_case': re.compile(r'\b[a-z]+[A-Z][a-zA-Z0-9]*\b'),
            
            # snake_case 파라미터  
            'snake_case': re.compile(r'\b[a-z]+_[a-z0-9_]+\b'),
            
            # 모든 대문자 코드/상수 (2글자 이상)
            'all_caps_codes': re.compile(r'\b[A-Z][A-Z0-9_]{1,}\b'),
            
            # 복합 코드값 (SINGLE_PAYMENT_TRANSACTION, ONESTORECASH 등)
            'compound_codes': re.compile(r'\b[A-Z]{3,}(?:[A-Z0-9_]*[A-Z0-9]+)+\b'),
            
            # 긴 대문자 단어 (CREDITCARD, TELCOMEMBERSHIP 등)
            'long_caps_words': re.compile(r'\b[A-Z]{6,}\b'),
            
            # 약어 패턴 (2-5글자 대문자)
            'acronyms': re.compile(r'\b[A-Z]{2,5}\b'),
            
            # 코드값 패턴 확장 (MKT_ONE, 3.1.0D, OS_CODE 등)
            'code_values': re.compile(r'\b[A-Z]+_[A-Z0-9]+\b|\b\d+\.\d+\.\d+[A-Z]?\b|\b[A-Z]{2,}_[0-9]{6,}\b'),
            
            # HTTP 메서드와 상태코드
            'http_terms': re.compile(r'\b(?:GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS)\b|\b\d{3}\s+[A-Z]+\b'),
            
            # JSON 필드명 (따옴표 안의 것들)
            'json_fields': re.compile(r'"([a-zA-Z][a-zA-Z0-9_]*)"'),
            
            # 테이블 헤더의 파라미터명 (| 로 구분된 테이블에서)
            'table_params': re.compile(r'\|\s*([a-zA-Z][a-zA-Z0-9_]*)\s*\|'),
            
            # 테이블 셀의 코드값 (| 로 구분된 테이블에서)
            'table_codes': re.compile(r'\|\s*([A-Z][A-Z0-9_]{2,})\s*\|'),
            
            # 숫자가 포함된 코드 (11PAY, V21 등)
            'numeric_codes': re.compile(r'\b[A-Z]*[0-9]+[A-Z]*\b|\b[0-9]+[A-Z]+\b'),
            
            # 기술 키워드들
            'tech_keywords': re.compile(r'\b(?:API|SDK|JSON|XML|HTTP|HTTPS|URL|URI|JWT|OAuth|REST|SOAP|SSL|TLS|UTF|ASCII|Base64)\b', re.IGNORECASE),
            
            # 단어 경계에서 camelCase/PascalCase 추가 패턴
            'mixed_case': re.compile(r'\b[a-zA-Z]*[a-z][A-Z][a-zA-Z0-9]*\b'),
        }
        
        # 제외할 일반적인 단어들
        self.exclude_words = {
            'THE', 'AND', 'OR', 'BUT', 'FOR', 'WITH', 'FROM', 'TO', 'IN', 'ON', 'AT', 'BY',
            'AS', 'IS', 'ARE', 'WAS', 'WERE', 'BE', 'BEEN', 'HAVE', 'HAS', 'HAD', 'DO', 'DOES', 'DID',
            'WILL', 'WOULD', 'COULD', 'SHOULD', 'MAY', 'MIGHT', 'CAN', 'MUST', 'SHALL',
            'THIS', 'THAT', 'THESE', 'THOSE', 'A', 'AN', 'IF', 'WHEN', 'WHERE', 'HOW', 'WHY', 'WHAT',
            'WHO', 'WHICH', 'WHOSE', 'WHOM', 'ALL', 'ANY', 'SOME', 'MANY', 'MUCH', 'MORE', 'MOST',
            'OTHER', 'ANOTHER', 'EACH', 'EVERY', 'BOTH', 'EITHER', 'NEITHER', 'NOT', 'NO', 'YES', 
            'String', 'ID', 'URL', 'URI', 'JSON', 'XML', 'HTTP', 'HTTPS', 'SSL', 'TLS', 'UTF', 'ASCII', 'Base64',
        }
        
        # 한국어 제외 패턴 (한글이 포함된 것들은 제외)
        self.korean_pattern = re.compile(r'[가-힣]')
        
    def extract_tags(self, text: str, title: str = "") -> List[str]:
        """
        텍스트에서 기술 태그들을 추출합니다.
        
        Args:
            text: 분석할 텍스트
            title: 섹션 제목 (추가 태그 추출용)
            
        Returns:
            List[str]: 추출된 태그들
        """
        all_tags = set()
        
        # 제목에서도 태그 추출
        if title:
            title_tags = self._extract_from_text(title)
            all_tags.update(title_tags)
        
        # 본문에서 태그 추출
        content_tags = self._extract_from_text(text)
        all_tags.update(content_tags)
        
        # 빈도수 기반 필터링 및 정렬
        return self._filter_and_rank_tags(list(all_tags), text)
    
    def _extract_from_text(self, text: str) -> Set[str]:
        """텍스트에서 모든 패턴을 이용해 태그를 추출합니다."""
        tags = set()
        
        for pattern_name, pattern in self.patterns.items():
            matches = pattern.findall(text)
            
            for match in matches:
                # 그룹 매치 결과를 사용하는 패턴들
                if pattern_name in ['json_fields', 'table_codes', 'table_params']:
                    tag = match if isinstance(match, str) else match[0]
                else:
                    tag = match
                
                # 필터링 조건들
                if self._is_valid_tag(tag):
                    tags.add(tag)
        
        return tags
    
    def _is_valid_tag(self, tag: str) -> bool:
        """태그가 유효한지 검증합니다."""
        # 너무 짧거나 긴 것 제외 (긴 코드명도 허용)
        if len(tag) < 2 or len(tag) > 30:
            return False
            
        # 한글 포함된 것 제외
        if self.korean_pattern.search(tag):
            return False
            
        # 제외 단어 목록에 있는 것 제외 (단, 긴 코드는 예외)
        if tag in self.exclude_words:
            return False
            
        # 숫자로만 이루어진 것 제외 (단, 의미있는 숫자 코드는 허용)
        if tag.isdigit() and len(tag) < 3:
            return False
            
        # 특수문자로만 이루어진 것 제외
        if re.match(r'^[^a-zA-Z0-9]+$', tag):
            return False
            
        # 점으로만 이루어진 것 제외
        if re.match(r'^\.+$', tag):
            return False
            
        # 대문자 코드는 더 관대하게 허용
        if re.match(r'^[A-Z][A-Z0-9_]*$', tag) and len(tag) >= 3:
            return True
            
        # camelCase 파라미터는 허용
        if re.match(r'^[a-z][a-zA-Z0-9]*[A-Z]', tag):
            return True
            
        return True
    
    def _filter_and_rank_tags(self, tags: List[str], text: str) -> List[str]:
        """태그들을 필터링하고 중요도에 따라 정렬합니다."""
        # 빈도수 계산
        tag_freq = Counter()
        for tag in tags:
            # 대소문자 구분 없이 빈도 계산
            occurrences = len(re.findall(re.escape(tag), text, re.IGNORECASE))
            tag_freq[tag] = occurrences
        
        # 중요도 계산 (길이, 빈도수, 패턴 유형 고려)
        scored_tags = []
        for tag, freq in tag_freq.items():
            score = self._calculate_tag_score(tag, freq, text)
            scored_tags.append((tag, score))
        
        # 점수 순으로 정렬하고 상위 태그들만 반환
        scored_tags.sort(key=lambda x: x[1], reverse=True)
        
        # 최대 30개까지만 반환, 최소 점수 0.5 이상인 것들만
        return [tag for tag, score in scored_tags[:50] if score >= 0.5]
    
    def _calculate_tag_score(self, tag: str, frequency: int, text: str) -> float:
        """태그의 중요도 점수를 계산합니다."""
        score = 0
        
        # 기본 빈도수 점수
        score += frequency * 0.5
        
        # 길이 점수 (적당한 길이가 좋음)
        if 3 <= len(tag) <= 20:  # 긴 코드명도 허용
            score += 2
        elif len(tag) >= 2:
            score += 1
            
        # 패턴별 가중치 (더 구체적으로)
        if re.match(r'^[A-Z]{6,}$', tag):  # 긴 대문자 코드 (CREDITCARD, TELCOMEMBERSHIP)
            score += 3.5
        elif re.match(r'^[A-Z]{3,}(?:[A-Z0-9_]*[A-Z0-9]+)+$', tag):  # 복합 코드 (SINGLE_PAYMENT_TRANSACTION)
            score += 4  # 가장 높은 점수
        elif re.match(r'^[a-z]+[A-Z][a-zA-Z0-9]*$', tag):  # camelCase (purchaseState, clientId)
            score += 3.5  # 높은 점수
        elif re.match(r'^[A-Z]{2,5}$', tag):  # 약어
            score += 3
        elif re.match(r'^[A-Z][A-Z0-9_]+$', tag):  # 일반 상수
            score += 2.5
        elif '_' in tag and tag.upper() == tag:  # CONSTANT_CASE
            score += 3
        elif re.match(r'^\d+\.\d+', tag):  # 버전 번호
            score += 2.5
        elif re.match(r'^[A-Z]*[0-9]+[A-Z]*$', tag):  # 숫자 포함 코드 (11PAY, V21)
            score += 2.5
        elif re.match(r'^[a-z]+_[a-z0-9_]+$', tag):  # snake_case
            score += 2
            
        # 기술 키워드 보너스
        tech_keywords = ['API', 'SDK', 'JSON', 'HTTP', 'URL', 'TOKEN', 'ID', 'KEY', 'PAY', 'CARD']
        if any(keyword in tag.upper() for keyword in tech_keywords):
            score += 1.5
            
        # 결제 관련 코드 보너스
        payment_keywords = ['PAY', 'CARD', 'CASH', 'POINT', 'COUPON', 'WALLET', 'BILL', 'PNS']
        if any(keyword in tag.upper() for keyword in payment_keywords):
            score += 1
            
        # 상태/환경 관련 코드 보너스
        status_keywords = ['COMPLETED', 'CANCELED', 'SANDBOX', 'COMMERCIAL', 'TEST']
        if any(keyword in tag.upper() for keyword in status_keywords):
            score += 1
            
        return score

class HierarchicalDocumentSplitter:
    """계층별 헤더 기반 문서 분할기"""
    
    def __init__(self, include_parent_context: bool = True, max_chunk_size: int = 2000, extract_tags: bool = True):
        """
        Args:
            include_parent_context: 상위 계층의 컨텍스트를 포함할지 여부
            max_chunk_size: 최대 청크 크기 (문자 수)
            extract_tags: 태그 추출 기능 사용 여부
        """
        self.include_parent_context = include_parent_context
        self.max_chunk_size = max_chunk_size
        self.extract_tags = extract_tags
        self.header_pattern = re.compile(r'^(#{1,4})\s+(.+?)(?:\s+<.*)?$', re.MULTILINE)
        
        # 태그 추출기 초기화
        if self.extract_tags:
            self.tag_extractor = TechnicalTermExtractor()
        
    def split_document(self, text: str) -> List[DocumentChunk]:
        """
        문서를 계층별로 분할합니다.
        
        Args:
            text: 분할할 마크다운 텍스트
            
        Returns:
            List[DocumentChunk]: 분할된 문서 청크들
        """
        lines = text.split('\n')
        chunks = []
        
        # 헤더 정보 추출
        headers = self._extract_headers(text)
        
        # 각 헤더별로 문서 청크 생성
        for i, header in enumerate(headers):
            start_line = header['line_num']
            end_line = headers[i + 1]['line_num'] - 1 if i + 1 < len(headers) else len(lines)
            
            # 해당 섹션의 내용 추출
            section_content = '\n'.join(lines[start_line:end_line])
            
            # 상위 컨텍스트 추가 (옵션)
            if self.include_parent_context:
                parent_context = self._get_parent_context(header, headers)
                if parent_context:
                    section_content = parent_context + '\n\n' + section_content
            
            # 태그 추출
            tags = []
            if self.extract_tags:
                tags = self.tag_extractor.extract_tags(section_content, header['title'])
            
            # 청크 생성
            chunk = DocumentChunk(
                content=section_content,
                level=header['level'],
                title=header['title'],
                full_path=header['path'],
                metadata={
                    'level': header['level'],
                    'section_id': f"section_{i}",
                    'parent_titles': header['path'][:-1],
                    'char_count': len(section_content),
                    'line_range': f"{start_line}-{end_line}",
                    'tags': tags
                },
                start_line=start_line,
                end_line=end_line,
                tags=tags
            )
            
            chunks.append(chunk)
            
            # 하위 레벨별로도 청크 생성 (계층적 접근)
            sub_chunks = self._create_hierarchical_chunks(header, section_content, start_line)
            chunks.extend(sub_chunks)
        
        return chunks
    
    def _extract_headers(self, text: str) -> List[Dict]:
        """텍스트에서 헤더 정보를 추출합니다."""
        lines = text.split('\n')
        headers = []
        path_stack: List[str] = []
        
        for line_num, line in enumerate(lines):
            match = self.header_pattern.match(line)
            if match:
                level = len(match.group(1))  # # 개수
                title = match.group(2).strip()
                
                # 경로 스택 관리
                while len(path_stack) >= level:
                    path_stack.pop()
                
                path_stack.append(title)
                
                headers.append({
                    'level': level,
                    'title': title,
                    'path': path_stack.copy(),
                    'line_num': line_num
                })
        
        return headers
    
    def _get_parent_context(self, current_header: Dict, all_headers: List[Dict]) -> str:
        """현재 헤더의 상위 컨텍스트를 가져옵니다."""
        parent_context = []
        
        # 상위 레벨 헤더들의 제목을 컨텍스트로 추가
        for parent_title in current_header['path'][:-1]:
            parent_context.append(f"상위 섹션: {parent_title}")
        
        return '\n'.join(parent_context) if parent_context else ""
    
    def _create_hierarchical_chunks(self, header: Dict, content: str, start_line: int) -> List[DocumentChunk]:
        """계층적으로 청크를 생성합니다."""
        chunks = []
        
        # 긴 내용을 더 작은 청크로 분할
        if len(content) > self.max_chunk_size:
            sub_chunks = self._split_long_content(content, header, start_line)
            chunks.extend(sub_chunks)
        
        return chunks
    
    def _split_long_content(self, content: str, header: Dict, start_line: int) -> List[DocumentChunk]:
        """긴 내용을 작은 청크로 분할합니다."""
        chunks = []
        paragraphs = content.split('\n\n')
        current_chunk = ""
        chunk_count = 0
        
        for para in paragraphs:
            if len(current_chunk + para) > self.max_chunk_size and current_chunk:
                # 태그 추출
                tags = []
                if hasattr(self, 'tag_extractor') and self.extract_tags:
                    tags = self.tag_extractor.extract_tags(current_chunk.strip(), f"{header['title']} (Part {chunk_count + 1})")
                
                # 현재 청크를 저장
                chunk = DocumentChunk(
                    content=current_chunk.strip(),
                    level=header['level'] + 1,  # 하위 레벨로 설정
                    title=f"{header['title']} (Part {chunk_count + 1})",
                    full_path=header['path'] + [f"Part {chunk_count + 1}"],
                    metadata={
                        'level': header['level'] + 1,
                        'section_id': f"section_{header['title']}_{chunk_count}",
                        'parent_titles': header['path'],
                        'char_count': len(current_chunk),
                        'is_sub_chunk': True,
                        'tags': tags
                    },
                    start_line=start_line,
                    end_line=start_line,
                    tags=tags
                )
                chunks.append(chunk)
                current_chunk = para
                chunk_count += 1
            else:
                current_chunk += "\n\n" + para if current_chunk else para
        
        # 마지막 청크 추가
        if current_chunk.strip():
            # 태그 추출
            tags = []
            if hasattr(self, 'tag_extractor') and self.extract_tags:
                tags = self.tag_extractor.extract_tags(current_chunk.strip(), f"{header['title']} (Part {chunk_count + 1})")
            
            chunk = DocumentChunk(
                content=current_chunk.strip(),
                level=header['level'] + 1,
                title=f"{header['title']} (Part {chunk_count + 1})",
                full_path=header['path'] + [f"Part {chunk_count + 1}"],
                metadata={
                    'level': header['level'] + 1,
                    'section_id': f"section_{header['title']}_{chunk_count}",
                    'parent_titles': header['path'],
                    'char_count': len(current_chunk),
                    'is_sub_chunk': True,
                    'tags': tags
                },
                start_line=start_line,
                end_line=start_line,
                tags=tags
            )
            chunks.append(chunk)
        
        return chunks
    
    def get_chunks_by_level(self, chunks: List[DocumentChunk], level: int) -> List[DocumentChunk]:
        """특정 레벨의 청크들만 반환합니다."""
        return [chunk for chunk in chunks if chunk.level == level]
    
    def find_relevant_chunks(self, chunks: List[DocumentChunk], query: str) -> List[DocumentChunk]:
        """쿼리와 관련된 청크들을 찾습니다."""
        relevant_chunks = []
        query_lower = query.lower()
        
        for chunk in chunks:
            # 제목이나 내용에 쿼리 키워드가 포함된 청크 찾기
            if (query_lower in chunk.title.lower() or 
                query_lower in chunk.content.lower()):
                relevant_chunks.append(chunk)
        
        return relevant_chunks
    
    def find_chunks_by_tags(self, chunks: List[DocumentChunk], target_tags: List[str]) -> List[DocumentChunk]:
        """특정 태그들을 포함하는 청크들을 찾습니다."""
        relevant_chunks = []
        target_tags_lower = [tag.lower() for tag in target_tags]
        
        for chunk in chunks:
            chunk_tags_lower = [tag.lower() for tag in chunk.tags]
            # 타겟 태그 중 하나라도 포함하면 관련 청크로 판정
            if any(target_tag in chunk_tags_lower for target_tag in target_tags_lower):
                relevant_chunks.append(chunk)
        
        return relevant_chunks
    
    def get_all_tags(self, chunks: List[DocumentChunk]) -> Dict[str, int]:
        """모든 청크에서 태그들과 빈도수를 수집합니다."""
        tag_counter = Counter()
        
        for chunk in chunks:
            for tag in chunk.tags:
                tag_counter[tag] += 1
        
        return dict(tag_counter)
    
def save_documents(docs: List[Document], output_path: str):
    """
    List[Document]를 pickle 파일로 저장합니다.
    
    Args:
        docs (List[Document]): 저장할 문서 리스트
        output_path (str): 저장할 디렉토리 경로
    """
    os.makedirs(output_path, exist_ok=True)
    docs_file_path = os.path.join(output_path, "documents.pkl")
    
    with open(docs_file_path, "wb") as f:
        pickle.dump(docs, f)
    
    print(f"✅ 문서 저장 완료: {docs_file_path}")
    print(f"📄 저장된 문서 수: {len(docs)}")
    
def embed_and_save_with_docs(docs: List[Document], output_path: str, model_name: str = "bge-m3:latest"):
    """
    문서를 임베딩하고 FAISS 데이터베이스로 저장하며, 동시에 원본 문서도 저장합니다.
    
    Args:
        docs (List[Document]): 처리할 문서 리스트
        output_path (str): 저장할 디렉토리 경로
        model_name (str): 임베딩 모델명
    """
    # 임베딩 모델 초기화
    embedding_model = OllamaEmbeddings(model=model_name)
    
    # FAISS 데이터베이스 생성 및 저장
    db = FAISS.from_documents(docs, embedding_model)
    db.save_local(output_path)
    print(f"✅ 임베딩 저장 완료: {output_path}")
    
    # 원본 문서도 함께 저장
    save_documents(docs, output_path)

def create_document(doc_file_path: str = "data/dev_center_guide_allmd_touched.md") -> List[Document]:
    with open(doc_file_path, 'r', encoding='utf-8') as f:
        document_text = f.read()
        
    splitter = HierarchicalDocumentSplitter(
      include_parent_context=True,  # 상위 컨텍스트 포함
      max_chunk_size=2000  # 최대 청크 크기
    )
    chunks = splitter.split_document(document_text)
    
    docs = list(map(lambda chunk: Document(
      page_content=f"[title]: {chunk.title}\n[section_path]: {chunk.full_path}\n[tags]: {chunk.tags}\n\n[contents]:{chunk.content}".strip(), 
      metadata=chunk.metadata
    ), chunks))
    return docs

In [105]:
docs = create_document()

cnt = 0  
for doc in docs:
  if 'PNS' in doc.page_content:
    print(doc.metadata)
    print("++++++")
    print(doc.page_content)
    # print(doc.metadata)
    print("-" * 100)
    cnt += 1

print(f"cnt: {cnt}")  

model_path = "models/faiss_rag_base_bge-m3"
# embed_and_save_with_docs(docs, model_path)

{'level': 1, 'section_id': 'section_0', 'parent_titles': [], 'char_count': 1136, 'line_range': '1-38', 'tags': ['V21', 'SDK', 'API', 'sdk', 'PNS', 'V16', 'IAP', 'V7', 'V4', 'info']}
++++++
[title]: 원스토어 인앱결제 API V7(SDK V21) 연동 안내 및 다운로드
[section_path]: ['원스토어 인앱결제 API V7(SDK V21) 연동 안내 및 다운로드']
[tags]: ['V21', 'SDK', 'API', 'sdk', 'PNS', 'V16', 'IAP', 'V7', 'V4', 'info']

[contents]:# 원스토어 인앱결제 API V7(SDK V21) 연동 안내 및 다운로드

원스토어의 최신 인앱결제 API V7(SDK V21)이 출시되었습니다.

보다 강력하고 다양한 기능을 지원하는 최신 버전을 적용해보세요.

{% hint style="info" %}
API V4(SDK V16) 이하 버전과는 호환되지 않습니다. 인앱결제 API V4(SDK V16)에 대한 안내 및 다운로드는 [여기](old-version/v16)를 클릭해주세요.
{% endhint %}

{% hint style="info" %}
현재 판매중인 앱을 대한민국 외 국가/지역으로 배포하기 위해서는 아래 가이드를 참고해주세요

* [대한민국 외 국가 및 지역 배포를 위한 가이드](../glb)
{% endhint %}

If you are comfortable with English, please change the language to English from the upper left side in this page.

* [01. 원스토어 인앱결제 개요](v21/ov)
* [02. 원스토어 인앱결제 적용을 위한 사전준비](v21/pre)
* [03. 결제 테스트 및 보안](v21/test)
* [04. 원스토어

In [106]:
qry = "PNS 의 purchaseState 값은 무엇인가요?"

tte = TechnicalTermExtractor()

tags = tte.extract_tags(qry)
for tag in tags:
    print(tag)

PNS
purchaseState


In [80]:
###### Retriever Check from FAISS Vector DB ######

# from langchain.embeddings import OllamaEmbeddings
# from langchain.vectorstores import FAISS
# from langchain_core.documents import Document
# from langchain.prompts import PromptTemplate
# from langchain_community.chat_models import ChatOllama
# from langchain_text_splitters import RecursiveCharacterTextSplitter
# from langchain_community.document_loaders import PyMuPDFLoader
# from langchain_community.vectorstores import FAISS
# from langchain_core.output_parsers import StrOutputParser
# from langchain_core.runnables import RunnablePassthrough
# from langchain_core.prompts import PromptTemplate
# from langchain.retrievers import EnsembleRetriever
# from langchain.retrievers import BM25Retriever

from langchain_ollama import OllamaEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever

fixed_model_name = "bge-m3:latest"
model_file_name = "models/faiss_rag_base_bge-m3_2"

# ✅ 3. 임베딩 모델 초기화 (Ollama)
embedding_model = OllamaEmbeddings(model=fixed_model_name)

# 저장된 데이터를 로드
loaded_db = FAISS.load_local(
    folder_path=model_file_name,
    # index_name="index",
    embeddings=embedding_model,
    allow_dangerous_deserialization=True,
)

retriever = loaded_db.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 20, "fetch_k": 50, "lambda_mult": 0.7}
    # search_type="similarity",
    # search_kwargs={"k": 50}
)

bm25 = BM25Retriever.from_documents(
    docs,
    bm25_params={"k1": 1.5, "b": 0.75}
)

bm25.k = 20

ensembled_retriever = EnsembleRetriever(
    retrievers=[bm25, retriever],
    weights=[0.3, 0.7]
)


res = ensembled_retriever.invoke(
    "PN S의 purchaseState 값은 무엇인가요?",
    # "PNS의 개념을 설명해주세요"
)

# res = bm25.invoke(
#     "PNS의 개념을 설명해주세요"
# )

# res = retriever.invoke(
#     "PNS 의 purchaseState의 값은 무엇이 있나요?"
# )

print(f"검색된 문서 수: {len(res)}")

idx = 0
for doc in res: 
    print(f"--- doc_index: {idx} ---")
    print(doc.page_content)  # Print first 100 characters of each document
    # print(doc.metadata)
    # print('-' * 40)
    idx += 1

검색된 문서 수: 40
--- doc_index: 0 ---
[title]: getPurchaseState
[section_path]: ['PurchaseData', 'Public methods', 'getPurchaseState']
[tags]: ['getPurchaseState', 'PurchaseState', 'PurchaseData', 'int']

[contents]:상위 섹션: PurchaseData
상위 섹션: Public methods

### getPurchaseState <a href="#id-c-purchasedata-getpurchasestate" id="id-c-purchasedata-getpurchasestate"></a>

```
int getPurchaseState()
```

구매 상태를 나타내는 값으로 \[A]PurchaseData.PurchaseState 중 하나를 반환합니다.

| **Returns:** |             |
| ------------ | ----------- |
| int          | <p><br></p> |
--- doc_index: 1 ---
[title]: PNS 메시지 규격 변경
[section_path]: ['09. 원스토어 인앱결제 릴리즈 노트', '원스토어 인앱결제 라이브러리 21.02.00 업데이트', '**원스토어 인앱결제 라이브러리 API V6(SDK V19) 출시**', 'PNS 메시지 규격 변경']
[tags]: ['PNS', 'purchaseToken', 'SDK', 'API', 'marketCode', 'priceCurrencyCode', 'V19', '21.02.00', 'V6']

[contents]:상위 섹션: 09. 원스토어 인앱결제 릴리즈 노트
상위 섹션: 원스토어 인앱결제 라이브러리 21.02.00 업데이트
상위 섹션: **원스토어 인앱결제 라이브러리 API V6(SDK V19) 출시**

#### PNS 메시지 규격 변경  <a href="#id-09.-p