In [1]:
from IPython.display import display, HTML
display(HTML("""
<style>
div.container{width:90% !important;}
div.cell.code_cell.rendered{width:100%;}
div.input_prompt{padding:0px;}
div.CodeMirror {font-family:Consolas; font-size:12pt;}
div.text_cell_render.rendered_html{font-size:12pt;}
div.output {font-size:12pt; font-weight:bold;}
div.input {font-family:Consolas; font-size:12pt;}
div.prompt {min-width:70px;}}
div#toc-wrapper{padding-top:120px;}
div.text_cell_render ul li{font-size:12pt;padding:5px;}
table.dataframe{font-size:12px;}
</style>
"""))

<b><font size="6" color="#009e84"> ch9. 12장 Retrieval의 성능개선을 위한 metadata 활용 </font></b>

# 1. 패키지

In [None]:
# -python-dotenv, langchain(1.2.0), langchain-openai, langchain-pinecone
# -pandas, langchain-community, docxtxt, langchain_text_splitters, langchain_ollama

# 2. 환경설정(환경변수, 시스템 파라미터 변수)

In [4]:
import os
from dotenv import load_dotenv
load_dotenv()

# 환경변수는 주로 대문자로씀
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
OPENAI_LLM_MODEL = 'gpt-4o-mini'
OPENAI_EMBEDDIGN_MODEL = 'text-embedding-3-large' # 차원수 3072

PINECONE_INDEX_NAME = 'better-rag-index'
PINECONE_INDEX_DEMENSION = 3072
PINECONE_INDEX_METRIC = 'cosine'
PINECONE_INDEX_REGION = 'us-east-1'
PINECONE_INDEX_CLOUD = 'aws'

# 3. 문서를 chunk로 분할하기

In [5]:
from langchain_community.document_loaders import Docx2txtLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

loader = Docx2txtLoader('data/소득세법_with markdown.docx')
document = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500,
                                               chunk_overlap=200,
                                               # separators=['\n\n', '제', '\n'], 제 1조 이렇게 잘리면 좋겠지만 통제같은 경우 통에서 잘릴까봐 뺌
                                               )

# documents = loader.load_and_split(text_splitter) 지금까지는 이렇게했고 다른 방법은 아래. 아래 방법처럼 할때는 위에 document 생성
documents = text_splitter.split_documents(document)
print(f'총 {len(documents)}개 청크 생성')

총 194개 청크 생성


# 4. metadata 추가하기

- 청크 내용의 카테고리, 청크 내용의 title, 조항

In [11]:
import re

def remove_special_chars(text:str) -> str:
    '특수문자 및 \n제거(:는 그대로)'
    # \n 제거
    text = text.replace('\n',' ')
    
    # 한글, 영문, 숫자, 공백, 마침표, 콤마, :만 남기고 전부 제거
    cleaned = re.sub(r'[^가-힣a-zA-Z0-9\s,.:]', '', text) # []안에 쓴것이 아니면 없애라
    
    # 불필요한 중복 공백을 제거
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    return cleaned

# 사용예시
content = '**한다.<개정 2025. 10. 1.>\n[본조신설 2014. 12. 23.]\n제57조(외국납부세액공제)     ① 거주자의 종합소득금액 '
print(remove_special_chars(content))

한다.개정 2025. 10. 1. 본조신설 2014. 12. 23. 제57조외국납부세액공제 거주자의 종합소득금액


In [13]:
# 제목을 추출하는 함수(exaone3.5) : cmd창에서 ollama pull exaone3.5:2.4b

from langchain_ollama import ChatOllama

def extract_title_with_llm(content):
    'exaone3.5를 사용하여 content의 제목을 추출하는 함수'
    title_extractor_llm = ChatOllama(model='exaone3.5:2.4b',
                                     temperature=0.1,
                                     num_predict=30, # 최대 30토큰까지만 출력
                                     )
    # content = remove_special_chars(content)
    prompt = f'''다음 소득세법 조문의 핵심 제목을 30토큰 이내로 간단히 완벽하게 말이 되도록 추출하세요. 중간에 말이 끊기면 안됩니다
    예 : "소득세 납세의미 범위 : 공동사업자별 소득과세, 상속과세, 증여자"
    조문 : {content}'''
    ai_message = title_extractor_llm.invoke(prompt)
    title = ai_message.content.strip()
    return remove_special_chars(title)

content = '제57조(외국납부세액공제) ① 거주자의 종합소득금액 또는 퇴직소득금액에 국외원천소득이 합산되어 있는 경우로서 그 국외원천소득에 대하여 외국에서 대통령령으로 정하는 외국소득세액(이하 이 조에서 “외국소득세액”이라 한다)을 납부하였거나 납부할 것이 있을 때에는 다음 계산식에 따라 계산한 금액(이하 이 조에서 “공제한도금액”이라 한다) 내에서 외국소득세액을 해당 과세기간의 종합소득산출세액 또는 퇴직소득 산출세액에서 공제할 수 있다. <개정 2020. 12. 29.>'
extract_title_with_llm(content)

'외국납부세액공제 : 국외세액 공제 한도 내에서 종합소득세 또는 퇴직소득세에서 차감 가능'

In [19]:
def categorize_content(content):
    '내용 카테고리 분류하기'
    categories = {
    '비과세': ['비과세', '면제', '복무급여', '실업급여', '출산휴가급여', '장학금'],
    '납세의무': ['납세의무', '거주자', '비거주자', '원천징수', '공동사업자', '상속인', '증여자', '신탁재산'],
    '세율계산': ['세율', '과세표준', '산출세액', '결정세액', '종합소득', '퇴직소득'],
    '근로소득': ['근로소득', '총급여', '급여', '연봉', '임금', '퇴직소득'],
    '사업소득': ['사업소득', '공동사업', '주택임대소득', '부업소득'],
    '이자배당': ['이자소득', '배당소득', '예금이자', '채권', '의제배당'],
    '양도소득': ['양도소득', '자산양도', '부동산양도', '주식양도'],
    '연금소득': ['연금소득', '공적연금', '사적연금', '연금보험'],
    '기타소득': ['기타소득', '상금', '보상금', '발명보상금', '종교인소득'],
    '공제감면': ['공제', '소득공제', '세액공제', '기본공제', '감면'],
    '신고납부': ['신고', '납부', '납세지', '신고기한', '원천징수'],
    '과세기간': ['과세기간', '과세연도', '사업연도'],
    '과세소득구분': ['종합소득', '퇴직소득', '양도소득', '금융투자소득'],
    }
    result = [] # 각 카테고리 추가
    for category, keywords in categories.items():
        # print(category, keywords)
        # print(any([keyword in content for keyword in keywords]))
        if any([keyword in content for keyword in keywords]):
            result.append(category)
    if not result: # 빈 리스트일때
        result.append('기타')
    return result
        
categorize_content(content)

['납세의무', '세율계산', '근로소득', '공제감면', '신고납부', '과세기간', '과세소득구분']

- 개선된 카테고리 분류방법 : 키워드의 중요도에 따라 가중치를 부여하고, 가중치가 높은 순으로 카테고리를 반환

In [20]:
def get_category():
    return {
        '납세의무': {
            '납세의무': 3, '거주자': 3, '비거주자': 3, '납세의무자': 3, '원천징수': 2,
            '원천징수의무자': 2, '공동사업자': 2, '상속인': 2, '증여자': 2, '신탁재산': 2
        },

        '세율계산': {
            '세율': 3, '소득세': 3, '과세표준': 3, '산출세액': 3, '세액': 3,'결정세액': 2, 
            '세액계산': 2, '기본세율': 2, '세율적용': 2, '누진세율': 2, '종합소득세': 2
        },

        '근로소득': {
            '근로소득': 3, '총급여': 3, '급여': 3, '연봉': 3, '임금': 3, '근로소득금액': 2, 
            '총급여액': 2, '상여': 2, '수당': 2, '봉급': 2, '직장인': 2
        },

        '사업소득': {
            '사업소득': 3, '총수입금액': 3, '필요경비': 3, '사업자': 2, 
            '사업소득금액': 2, '결손금': 2, '이월결손금': 2, '주택임대소득': 2, '공동사업': 2
        },

        '이자배당': {
            '이자소득': 3, '배당소득': 3, '예금이자': 2, '채권': 2,
            '의제배당': 2, '배당세액공제': 2, '분리과세이자소득': 2, '분리과세배당소득': 2
        },

        '양도소득': {
            '양도소득': 3, '자산양도': 2, '부동산양도': 2, '주식양도': 2,
            '양도차익': 2, '취득가액': 2, '양도가액': 2, '양도소득금액': 2
        },

        '연금소득': {
            '연금소득': 3, '연금계좌': 3, '연금저축': 3, '연금수령': 2,
            '퇴직연금': 2, '공적연금': 2, '사적연금': 2, '연금보험': 2
        },

        '기타소득': {
            '기타소득': 3, '가상자산': 3, '가상자산소득': 2, '상금': 2, '보상금': 2, 
            '종교인소득': 2, '원고료': 2, '복권': 1, '당첨금': 1, '발명보상금': 1
        },

        '공제감면': {
            '공제': 3, '소득공제': 3, '세액공제': 3, '기본공제': 2, '인적공제': 2, '특별공제': 2, '추가공제': 2,
            '근로소득공제': 2, '연금소득공제': 2, '퇴직소득공제': 2, '연금계좌세액공제': 2, '감면': 2
        },

        '비과세': {
            '비과세': 3, '비과세소득': 3, '면제': 2, '세액감면': 2,
            '소득세면제': 2, '복무급여': 1, '실업급여': 1, '출산휴가급여': 1, '장학금': 1
        },

        '신고납부': {
            '신고': 3, '확정신고': 3, '과세표준확정신고': 3, 
            '납부': 3, '중간예납': 2, '가산세': 2,
            '신고기한': 2, '납부기한': 2, '납세지': 2
        },

        '과세기간': {
            '과세기간': 3, '과세연도': 3, '사업연도': 2, '과세기간종료일': 2
        },

        '과세방식': {
            '종합과세': 3, '분리과세': 3, '합산과세': 2, 
            '종합소득과세표준': 2, '분리과세소득': 2, '금융투자소득': 2
        },

        '장부기장': {
            '장부': 3, '복식부기': 3, '간편장부': 2, '기장': 2,
            '장부기록': 2, '증명서류': 2, '기장세액공제': 2
        }
    }


In [26]:
def categorize_content(content, top_k=None):
    '''
    내용 카테고리 분류 - 점수 기반으로 모든 카테고리를 점수 순으로 반환
    Parameters : 
        - content : 분류할 텍스트 내용
        - top_k : 상위 몇개까지 카테고리를 반환할지(None이면 모든 카테고리 반환)
        
    Returns :
        - 카테고리 리스트(점수 높은 순)
    '''
    category_keywords = get_category()
    category_scores = {}
    
    # 각 카테고리별 점수 계산
    for category, weighted_keywords in category_keywords.items():
        # print(category, weighted_keywords)
        score = 0
        for keyword, weight in weighted_keywords.items():
            if keyword in content:
                score += weight
        if score > 0 :
            category_scores[category] = score
    # print(category_scores)
    
    # 내림차순 정렬한 카테고리 이름만 추출
    sorted_categories = sorted(category_scores.items(), key=lambda x : x[1], reverse=True)
    all_categories = [category[0] for category in sorted_categories]
    # print(all_categories)
    
    # 매칭되는 카테고리가 없으면 '기타' 반환
    if not all_categories:
        all_categories = ['기타']
        
    # top_k가 지정되면 상위 top_k개만, 아니면 전체 반환
    if top_k is not None:
        return all_categories[:top_k]
    return all_categories
        
categorize_content(content, 3)

['세율계산', '공제감면', '납세의무']

In [27]:
categorize_content('연봉 5천만원인 회사원의 소득세는 얼마에요?')

['세율계산', '근로소득']

In [30]:
categorize_content(documents[46].page_content)

['세율계산', '이자배당', '공제감면', '과세방식', '납세의무', '사업소득', '과세기간']

In [38]:
# 해당 조항 추출하기

def get_article(content):
    '조항들 추출'
    article_match = re.findall(r'제(\d+)조', content)
    # article_match.sort() 그냥 이렇게만 하면 '156', '60', '7' 로 오는데 이게 문자라서 1이 가장 작다보니 이렇게 소트하면 안됨
    article_match =list(set(article_match)) # 중복 제거
    article_match.sort(key=lambda x : int(x))
    if article_match:
        return '「'+ ', '.join([f'{a}조' for a in article_match]) +'」'

get_article(' 어쩌고 저쩌고 ') # 조항이 없으면 None

In [47]:
%%time

# 메타데이터(source, title, category, article(조항))를 포함한 새로운 chunk
enhanced_chunks = []
for i, chunk in enumerate(documents):
    if i%20==0:
        print(f'진행중 : {i/len(documents)*100:.1f}%')
    content = chunk.page_content
    metadata = chunk.metadata.copy()
    
    # metadata['title'] = extract_title_with_llm(content) # exaone으로 제목 추출
    metadata['category'] = categorize_content(content) # categroy 추출
    metadata['chunk_id'] = f'chunk_{i:03d}'
    article_match = get_article(content)
    if article_match:
        metadata['article'] = article_match
    enhanced_chunks.append(type(chunk)(page_content=content, metadata=metadata)) # type(chunk)는 Document타입
print('확장된 chunk 처리 완료')

진행중 : 0.0%
진행중 : 10.3%
진행중 : 20.6%
진행중 : 30.9%
진행중 : 41.2%
진행중 : 51.5%
진행중 : 61.9%
진행중 : 72.2%
진행중 : 82.5%
진행중 : 92.8%
확장된 chunk 처리 완료
CPU times: total: 31.2 ms
Wall time: 30.4 ms


# 5. 임베딩 모델 설정

In [48]:
from langchain_openai import OpenAIEmbeddings

embedding = OpenAIEmbeddings(model=OPENAI_EMBEDDIGN_MODEL,
                             openai_api_key = OPENAI_API_KEY)

# 6. Pinecone 인덱스 생성 및 vector store(DB) 저장

In [54]:
from pinecone import Pinecone, ServerlessSpec # 사이트 가서 생성하는 것을 파이썬에서 설정하는 방법
from langchain_pinecone import PineconeVectorStore

# pinecone 클라이언트 객체
pc = Pinecone(api_key=PINECONE_API_KEY)
# print('pinecone index들 :', pc.list_indexes().names())

# 인덱스 생성 여부 확인 및 생성
# if PINECONE_INDEX_NAME not in pc.list_indexes().names():
if not pc.has_index(PINECONE_INDEX_NAME):
    pc.create_index(name=PINECONE_INDEX_NAME,
                    dimension=PINECONE_INDEX_DEMENSION,
                    metric=PINECONE_INDEX_METRIC,
                    spec=ServerlessSpec(region=PINECONE_INDEX_REGION,
                                        cloud=PINECONE_INDEX_CLOUD))
    print('인덱스 생성 완료')
else:
    print(f'인덱스 {PINECONE_INDEX_NAME}이 이미 존재합니다')

인덱스 생성 완료


In [56]:
%%time
# Pinecone 벡터 스토어 업로드

vector_database = PineconeVectorStore.from_documents(documents=enhanced_chunks,
                                                     embedding=embedding,
                                                     index_name=PINECONE_INDEX_NAME)
print('벡터 DB 저장 완료')

벡터 DB 저장 완료
CPU times: total: 2.83 s
Wall time: 15.1 s


# 7. 유사도 검색(meta 데이터 활용)

In [58]:
query = '연봉 5천만원인 회사원의 소득세는 얼마에요?'
categorize_content(query)

['세율계산', '근로소득']

In [62]:
# Retriever 생성

retriever = vector_database.as_retriever(search_kwargs = {'k':3, 'filter':{'category':{'$in':categorize_content(query)}}})
docs = retriever.invoke(query)
print('관련 문서')
for i, doc in enumerate(docs):
    category = doc.metadata['category']
    article = doc.metadata.get('article', '조항없음')
    content = doc.page_content[:20]
    print(f'{i+1}.{article} {category} - {content}')

관련 문서
1.「14조, 17조, 55조, 56조」 ['세율계산', '이자배당', '공제감면', '과세방식', '납세의무', '사업소득', '과세기간'] - 제55조(세율) ①거주자의 종합소득에
2.「6조, 9조, 10조, 11조, 12조, 99조」 ['납세의무', '사업소득', '세율계산', '근로소득', '비과세', '신고납부', '과세기간'] - [전문개정 2009. 12. 31.]
3.「5조, 57조, 113조, 127조, 142조」 ['세율계산', '이자배당', '공제감면', '연금소득', '근로소득', '신고납부', '납세의무'] - 1. 「민사집행법」 제113조 및 같


In [None]:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_upstage import ChatUpstage
from dotenv import load_dotenv
load_dotenv()

template = f'당신은 최고의 한국 소득세 전문가입니다. 다음 문맥을 참고하여 질문에 답하세요. 답을 모르면 모른다고 답하세요. 최대 3문장으로 간결하게 답변하세요. 질문 : {{query}} / 문맥 : {{context}} / 답변 :'
prompt = ChatPromptTemplate.from_template(template)

# 5. 검색된 document를 텍스트로 변환하는 함수
def format_documents(documents):
    return '\n\n---\n\n'.join([doc.page_content for doc in documents])