In [1]:
# Model
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, BitsAndBytesConfig
from langchain_huggingface import HuggingFacePipeline
import torch
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.prompts import PromptTemplate
# Vector stores
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.retrievers import BM25Retriever, KNNRetriever
from langchain.retrievers import EnsembleRetriever
# etc
import os
import pandas as pd
from tqdm import tqdm
import unicodedata

In [9]:
train_df = pd.read_csv('train.csv')
test_df = pd.read_csv('test.csv')

def get_embedding():
    embeddings = HuggingFaceEmbeddings(
        model_name='intfloat/multilingual-e5-small',
        model_kwargs={'device': 'mps'},
        encode_kwargs={'normalize_embeddings': True})
    return embeddings

def normalize_string(s):
    return unicodedata.normalize('NFC', s)

def format_docs(docs):
    """검색된 문서들을 하나의 문자열로 포맷팅"""
    context = ""
    for i, doc in enumerate(docs):
        #context += f"Document {i+1}\n"
        context += doc.page_content
        context += '\n\n'
    return context

In [5]:

def fewshot_db(df):
    df = df.drop('SAMPLE_ID', axis=1)
    df = df.drop('Source_path', axis=1)
    df = df.to_dict(orient='records')
    print("Loaded Fewshot Set:", df[:1])
    to_vectorize = ["\n\n".join(normalize_string(value) for value in example.values()) for example in df]
    
    faiss = FAISS.from_texts(to_vectorize, embedding=get_embedding())
    bm = BM25Retriever.from_texts(to_vectorize)
    knn = KNNRetriever.from_texts(to_vectorize, embeddings=get_embedding())
    return faiss, bm, knn
    
def make_db(df):
    # Create a new FAISS database
    # pdf reader
    documents = []
    pdf_files = df['Source_path'].unique()
    for pdf_file in pdf_files:
        pdf_loader = PyPDFLoader(pdf_file)
        pdf_documents = pdf_loader.load()
        for pdf_document in pdf_documents:
            pdf_document.page_content = pdf_document.page_content.replace("\x07","")
        documents.extend(pdf_documents)
    # 정규화
    # for doc in documents:
    #     doc.page_content = normalize_string(doc.page_content)
    chunk_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
    chunks = chunk_splitter.split_documents(documents)
    print(f"Total number of chunks: {len(chunks)}")
    # FAISS DB 만들기
    faiss = FAISS.from_documents(chunks, embedding=get_embedding())
    bm =  BM25Retriever.from_documents(chunks)
    knn = KNNRetriever.from_documents(chunks, embeddings=get_embedding())
    return faiss, bm, knn


In [None]:

train_faiss_db, train_bm_retrievier, knn_retriever = make_db(train_df) 
test_faiss_db, test_bm_retrievier, test_knn_retriever = make_db(test_df)


In [6]:

fewshot_faiss_db, fewshot_bm_retrievier, fewshot_knn_retriever = fewshot_db(test_df)


Loaded Fewshot Set: [{'Source': '중소벤처기업부_혁신창업사업화자금(융자)', 'Question': '2022년 혁신창업사업화자금(융자)의 예산은 얼마인가요?'}]


In [7]:
train_k = 1
train_bm_retrievier.k = train_k
knn_retriever.k = train_k
faiss_retriever = train_faiss_db.as_retriever(search_kwargs={'k':train_k} )
train_ensemble_retriever = EnsembleRetriever(
    retrievers=[train_bm_retrievier,knn_retriever, faiss_retriever], weights=[0.25,0.25, 0.5]
)

test_k = 3
test_bm_retrievier.k = test_k
test_knn_retriever.k = test_k
test_faiss_retriever = test_faiss_db.as_retriever(search_kwargs={'k':test_k} )
test_ensemble_retriever = EnsembleRetriever(
    retrievers=[test_bm_retrievier,test_knn_retriever, test_faiss_retriever], weights=[0.25,0.25, 0.5]
)

fewshot_k = 3
fewshot_bm_retrievier.k = fewshot_k
fewshot_knn_retriever.k = fewshot_k
fewshot_faiss_retriever = fewshot_faiss_db.as_retriever(search_kwargs={'k':fewshot_k} )
fewshot_ensemble_retriever = EnsembleRetriever(
    retrievers=[fewshot_bm_retrievier,fewshot_knn_retriever, fewshot_faiss_retriever], weights=[0.25,0.25, 0.5]
)


In [None]:
def setup_llm_pipeline(model_id = "meta-llama/Meta-Llama-3.1-8B-Instruct"):
    # 토크나이저 로드 및 설정
        # 양자화 설정 적용
    bnb_config = BitsAndBytesConfig(
    load_in_4bit=True, 
    bnb_4bit_use_double_quant=True, 
    bnb_4bit_quant_type="nf4", 
    bnb_4bit_compute_dtype=torch.bfloat16
    )
    model = AutoModelForCausalLM.from_pretrained(model_id, quantization_config=bnb_config,low_cpu_mem_usage=True)
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    terminators = [
    tokenizer.eos_token_id,
    tokenizer.convert_tokens_to_ids("<|eot_id|>")
    ]

    text_generation_pipeline = pipeline(
        model=model,
        tokenizer=tokenizer,
        task="text-generation",
        temperature=0.4,
        do_sample=True,
        top_p = 0.6,
        repetition_penalty=1.1,
        return_full_text=False,
        max_new_tokens=512,
        eos_token_id = terminators,
        pad_token_id = tokenizer.eos_token_id
    )

    llm = HuggingFacePipeline(pipeline=text_generation_pipeline)

    return llm
llm = setup_llm_pipeline()

In [8]:
fewshot_results = fewshot_ensemble_retriever.invoke(test_df.iloc[1]['Question'])

In [None]:
def extract_answer(response):
    # AI: 로 시작하는 줄을 찾아 그 이후의 텍스트만 추출
    lines = response.split('\n')
    for line in lines:
        line = line.replace('**', '')
        if line.startswith('Answer:'):
            return line.replace('Answer:', '').strip()
        if line.startswith('assistant:'):
            return line.replace('assistant:', '').strip()
    return response.strip()  # AI: 를 찾지 못한 경우 전체 응답을 정리해서 반환

def fewshot_ex(fewshot_retriever, train_retriever, query):
    fewshot_results = fewshot_retriever.invoke(query)
    fewshot_str = ""
    for result in fewshot_results:
        buff_str = "<|start_header_id|>user<|end_header_id|>\n"
        question = result['Question']
        buff_str += f"Question\n{question}\n\n"
        if train_retriever is not None:
            buff_str += f"Context\n"
            docs = train_retriever.invoke(question)
            buff_str += format_docs(docs)
            buff_str += "<eot_id>"
        buff_str += f"<|start_header_id|>assistant<|end_header_id>\n{result['Answer']}<|eot_id|>"
        fewshot_str += buff_str
    return fewshot_str

def run (train,test,fewshot,dataset,llm,verbose=False):
    results = []
    for i, row in tqdm(dataset.iterrows()):
        full_template = "<|begin_of_text|>"
        full_template += """<|start_header_id|>system<|end_header_id|>
You are the financial expert who helps me with my financial information Q&As.
You earn 10 points when you answer me and follow the rules and lose 7 points when you don't.
Here are some rules you should follow.
- Please use contexts to answer the question.
- Please your answers should be concise.
- Please answers must be written in Korean.
- Please answer the question in 1-3 sentences.

- Use the three examples below to learn how to follow the rules and reference information in context.<|eot_id|>
"""
        question = row['Question']
        if verbose:
            print(f"Question\n{question}")
        fewshot_str = fewshot_ex(fewshot, train, question)
        full_template += fewshot_str
        full_template += "\n\n"
        full_template += "<"
        contexts = test.invoke(question)
        contexts = format_docs(contexts)
        full_template += """<|start_header_id|>user<|end_header_id|>\nQuestion\n{input}\n\n"""
        full_template += f"""Context\n{contexts}<|eot_id|>"""
        full_template += """<|start_header_id|>assistant<|end_header_id>\n"""
        
        prompt = PromptTemplate.from_template(full_template)
        qa_chain = (
        {
            "input": RunnablePassthrough(),
        }
        | prompt
        | llm
        | StrOutputParser()
        )
        if verbose:
            print("\nQuestion: ", question)
        answer = qa_chain.invoke(input=question)
        answer = extract_answer(answer)
        results.append({
            "Question": question,
            "Answer": answer,
            "Source": row['Source']
        })
        if verbose:
            print("Answer: ", results[-1]['Answer'])
    return results