In [1]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface.embeddings import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.prompts import ChatPromptTemplate
from langchain_community.document_loaders import TextLoader
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain_community.document_transformers import LongContextReorder
from langchain_core.output_parsers import StrOutputParser

from dotenv import load_dotenv
import os
import shutil

from langchain_qdrant import QdrantVectorStore
from langchain_qdrant import FastEmbedSparse, RetrievalMode

In [2]:
def load_env():
    load_dotenv('.env')

    os.getenv("LANGCHAIN_TRACING_V2")
    os.getenv("LANGCHAIN_ENDPOINT")
    os.getenv("LANGCHAIN_API_KEY")

    os.getenv("OPENAI_API_KEY")
    os.getenv("ANTHROPIC_API_KEY")
    os.getenv("GOOGLE_API_KEY")
    os.getenv("UPSTAGE_API_KEY")

    os.getenv("LM_URL")
    os.getenv("LM_LOCAL_URL")

In [3]:
def docs_load():
    try:
        loader = TextLoader("corpus/정시 모집요강(동의대) 전처리 결과.txt", encoding="utf-8").load()
        return loader
    except FileNotFoundError:
        print("파일을 찾을 수 없습니다. 경로를 확인하세요.")
        return []
    except Exception as e:
        print(f"오류가 발생했습니다: {e}")
        return []

In [4]:
def rc_text_split(corpus):
    """
    RecursiveCharacterTextSplitter를 사용하여 문서를 분할하도록 하는 함수
    :param corpus: 전처리 완료된 말뭉치
    :return: 분리된 청크
    """

    # 청크 사이즈 선택
    chunk_size_number = input("chunk_size를 선택해주세요. 기본값은 1500입니다.\n"
                              "1: 1500\n"
                              "2: 2000\n"
                              "3: 2500\n"
                              "4: 3000\n"
                              "5: 3500\n"
                              "6: 4000\n\n"
                              "선택 번호: ")

    chunk_size_checker = {
        '1': 1500,
        '2': 2000,
        '3': 2500,
        '4': 3000,
        '5': 3500,
        '6': 4000
    }

    chunk_size = chunk_size_checker.get(chunk_size_number, 1500)

    # 오버랩 사이즈 선택
    overlap_size_number = input("chunk_overlap를 선택해주세요. 기본값은 0입니다.\n"
                                "1: 0\n"
                                "2: 100\n"
                                "3: 200\n"
                                "4: 300\n"
                                "5: 400\n"
                                "6: 500\n\n"
                                "선택 번호: ")

    overlap_size_checker = {
        '1': 0,
        '2': 100,
        '3': 200,
        '4': 300,
        '5': 400,
        '6': 500
    }

    overlap_size = overlap_size_checker.get(overlap_size_number, 0)

    rc_text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        separators=["---", "\n\n", "\n"],
        chunk_size=chunk_size,
        chunk_overlap=overlap_size,
        model_name="gpt-4o"  # o200k_base
        # model_name="gpt-4"  # cl100k_base
    )

    text_documents = rc_text_splitter.split_documents(corpus)

    return text_documents

In [5]:
def embed_text():
    model_name = "BAAI/bge-m3"
    model_kwargs = {'device': 'cuda'}
    encode_kwargs = {'normalize_embeddings': True}
    model = HuggingFaceEmbeddings(
        model_name=model_name,
        model_kwargs=model_kwargs,
        encode_kwargs=encode_kwargs
    )

    return model

In [18]:
def document_embedding_basic(docs, model, save_directory: str):
    """
    Chroma 벡터저장소를 사용하여 문서를 임베딩하고, BM25Retriever의 기본적인 구조를 통해 문서를 키워드 위주의 임베딩을 진행하여 저장하는 함수
    :param model: 임베딩 모델 종류
    :param save_directory: 벡터저장소 저장 경로
    :param docs: 분할된 문서
    :return: 벡터저장소, BM25(기본)저장소
    """

    print("\n잠시만 기다려주세요.\n\n")

    # 벡터저장소가 이미 존재하는지 확인
    if os.path.exists(save_directory):
        shutil.rmtree(save_directory)
        print(f"디렉토리 {save_directory}가 삭제되었습니다.\n")

    print("문서 벡터화를 시작합니다. ")
    db = Chroma.from_documents(docs, model, persist_directory=save_directory)

    bm25_sparse_embeddings = FastEmbedSparse(model_name="Qdrant/retrievers_bm25_faiss")
    bm42_sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm42-all-minilm-l6-v2-attentions")

    # 여기에 BM25, BM42 추가하기
    bm25_db = QdrantVectorStore.from_documents(
        docs,
        embedding=db,
        sparse_embedding=bm25_sparse_embeddings,
        location=":memory:",
        collection_name="db_bm25",
        retrieval_mode=RetrievalMode.SPARSE,
        # retrieval_mode=RetrievalMode.DENSE,
    )

    bm42_db = QdrantVectorStore.from_documents(
        docs,
        embedding=db,
        sparse_embedding=bm42_sparse_embeddings,
        location=":memory:",
        collection_name="db_bm42",
        retrieval_mode=RetrievalMode.SPARSE,
    )

    print("새로운 Chroma 데이터베이스가 생성되었습니다.\n")

    return db, bm25_db, bm42_db

In [19]:
def chat_llm():
    """
    채팅에 사용되는 거대언어모델 생성 함수
    :return: 답변해주는 거대언어모델
    """
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        temperature=0,
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
    )

    return llm

In [20]:
def format_docs(docs):
    return "\n\n".join(document.page_content for document in docs)

In [21]:
def reorder_documents(docs):
    # 재정렬
    reordering = LongContextReorder()
    reordered_docs = reordering.transform_documents(docs)
    combined = format_docs(reordered_docs)

    return combined

In [22]:
def db_qna_ensemble_2(llm, db, bm25_db, bm42_db, query):
    """
    llm, bm_db, db, query
    BM25Retriever와 Chroma 벡터스토어를 앙상블하여 문서 검색 후 적절한 답변을 찾아서 답하도록 하는 함수
    :param llm: 거대 언어 모델
    :param bm_db: BM25Retriever
    :param db: 벡터스토어
    :param query: 사용자 질문
    """
    db = db.as_retriever(
        search_kwargs={'k': 2},
    )

    bm25 = bm25_db.as_retriever(
        search_kwargs={'k': 2},
    )

    bm42 = bm42_db.as_retriever(
        search_kwargs={'k': 2},
    )

    # bm_db.k = 1  # BM25Retriever의 검색 결과 개수를 3로 설정
    #
    # # 앙상블 retriever를 초기화합니다.
    # ensemble_retriever = EnsembleRetriever(
    #     retrievers=[retrievers_bm25_faiss, db],
    #     weights=[0.3, 0.7],
    #     search_type="mmr",
    # )

    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm42, db],
        weights=[0.5, 0.5],
        search_type="mmr",
    )

    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """
                You are a specialized AI for question-and-answer tasks.
                You must answer questions based solely on the Context provided.
                For questions about predicting successful applicants, base your answers on data from either the initial successful applicants or the final enrolled students.
                If no Context is provided, you must instruct to inquire at "https://ipsi.deu.ac.kr/main.do".

                Context: {context}
                """,
            ),
            ("human", "Question: {question}"),
        ]
    )

    chain = {
                "context": ensemble_retriever | RunnableLambda(reorder_documents),
                # "context": bm25 | RunnableLambda(reorder_documents),
                # "context": bm42 | RunnableLambda(reorder_documents),
                "question": RunnablePassthrough()
            } | prompt | llm | StrOutputParser()

    response = chain.invoke(query)

    if not isinstance(llm, ChatOpenAI):
        print("\n\n{}".format(response))

    return response

In [23]:
load_env()
chunk = docs_load()
text_documents = rc_text_split(chunk)
print(text_documents)

[Document(metadata={'source': 'corpus/정시 모집요강(동의대) 전처리 결과.txt'}, page_content='표의 시작은 <data> 태그를 통해 표현하고 표의 끝은 </data> 태그를 통해 끝을 표현한다.\n[표 속성]과 [표 내용]의 시작과 끝은 ```으로 시작해서 ```으로 끝난다.\n\n<data>\n표 제목 : 전형요소 반영비율\n아래부터는 전형요소 반영비율 표 내용이다.\n\n[표 속성]\n``` 전형 유형 / 전형명 / 모집 인원 / 사정 단계 / 전형요소 반영비율(%){학생부교과 / 출결 / 서류 / 면접 / 실기 / 수능} / 수능최저기준 ```\n\n[표 내용]\n```\n수능 위주 / 가군 수능(일반학생전형) 전 모집단위(레저스포츠학과, 태권도학과 제외) / 273 / 일괄 합산 / { 0 /  0 /  0 /  0 /  0 /  100 / } / 수능최저기준 없음 /\n수능 위주 / 나군 수능(일반학생전형) 전 모집단위(한의예과, 디자인조형학과, 체육학과, 경기지도학과 제외) 한의예과(수학(미적분/기하),수학(확률과 통계)) / 401 / 일괄 합산 / { 0 /  0 /  0 /  0 /  0 /  100 / } / 수능최저기준 없음 /\n수능 위주 / 다군 수능(일반학생전형) 전 모집단위 / 180 / 일괄 합산 / { 0 /  0 /  0 /  0 /  0 /  100 / } / 수능최저기준 없음 /\n실기/실적 위주 / 가군 실기/실적(일반학생전형) 레저스포츠학과, 태권도학과 / 11 / 일괄 합산 / { 0 /  0 /  0 /  0 /  70 /  30 / } / 수능최저기준 없음 /\n실기/실적 위주 / 나군 실기/실적(일반학생전형) 디자인조형학과, 체육학과 / 24 / 일괄 합산 / { 0 /  0 /  0 /  0 /  70 /  30 / } / 수능최저기준 없음 /\n실기/실적 위주 / 다군 실기/실적(일반학생전형) 경기지도학과 / 7 / 일괄 합산 / { 18 /  12 /  70 / 

In [25]:
embedding_model = embed_text()
db, bm25_db, bm42_db = document_embedding_basic(text_documents, embedding_model, "bm_test_1")
llm = chat_llm()


잠시만 기다려주세요.


문서 벡터화를 시작합니다. 


ValueError: Model Qdrant/retrievers_bm25_faiss is not supported in SparseTextEmbedding.Please check the supported models using `SparseTextEmbedding.list_supported_models()`

In [None]:
import openpyxl
from openpyxl import load_workbook
import pandas as pd
import numpy as np

In [None]:
def cosine_similarity(a, b):
    """
    코사인 유사도를 확인하기 위한 함수
    :param a: 벡터 a
    :param b: 벡터 b
    """
    dot_product = np.dot(a, b)
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    return dot_product / (norm_a * norm_b)

In [14]:
def save_qna_list_v2(q, a, model_answer, similarity):
        """
        질의 응답을 엑셀 파일에 추가하는 함수 (중복 질문 제거)
        @param q: 질의
        @param a: 모범 응답
        @param model_answer: 질의에 대한 거대언어모델 응답
        @param model_checker: 선택한 거대언어모델 이름을 알기 위한 번호
        @param similarity: 질의에 대한 모범 응답과 거대언어모델 응답의 유사도
        @param embedding_model_name: 사용한 임베딩 모델 이름
        @param chunk_size: 문서 분할기의 청크 사이즈
        @param overlap_size: 문서 분할기의 오버랩 사이즈
        """

        filename = f'research_result/ensemble_bm25.xlsx'

        # model_checker 값을 모델 이름으로 변환
        model_name = 'GPT-4o-mini'

        try:
            # 기존 엑셀 파일 열기
            workbook = load_workbook(filename)
            sheet = workbook.active
        except FileNotFoundError:
            # 파일이 없는 경우 새로운 엑셀 파일 생성
            workbook = openpyxl.Workbook()
            sheet = workbook.active
            sheet['A1'] = '거대언어모델'
            sheet['B1'] = '유사도'
            sheet['C1'] = '질의'
            sheet['D1'] = '응답'
            sheet['E1'] = '모범 응답'

        # '모범 응답' 헤더 추가 (파일이 이미 존재하는 경우에도)
        if not isinstance(sheet['E1'].value, str) or sheet['E1'].value != '모범 응답':
            sheet['E1'] = '모범 응답'

        # 기존 질문 목록 가져오기
        existing_questions = set((cell.value, sheet.cell(row=cell.row, column=1).value) for cell in sheet['C'][1:])

        # 중복 질문 확인
        if (q, model_name) not in existing_questions:
            # 새로운 행에 데이터 추가
            row = sheet.max_row + 1
            sheet.cell(row=row, column=1, value=model_name)
            sheet.cell(row=row, column=2, value=similarity)
            sheet.cell(row=row, column=3, value=q)
            sheet.cell(row=row, column=4, value=a)
            sheet.cell(row=row, column=5, value=model_answer)

        # 거대언어모델별로 정렬
        data = list(sheet.values)[1:]
        data.sort(key=lambda x: (x[0], x[1]))

        # 정렬된 데이터로 시트 업데이트
        sheet.delete_rows(2, sheet.max_row)
        for row, row_data in enumerate(data, start=2):
            for col, value in enumerate(row_data, start=1):
                sheet.cell(row=row, column=col, value=value)

        # 엑셀 파일 저장
        workbook.save(filename)

In [15]:
def auto_question(llm, db, bm25_db, bm42_db, embedding_model):  #  db_qna_ensemble_2(llm, db, bm25_db, bm42_db, query)
    """
    질문 리스트를 기반으로 자동으로 질문하고 답변을 받아 엑셀 파일에 저장하는 함수
    :param llm: 거대언어모델 종류
    :param db: 기본 벡터저장소
    :param bm_db: bm 벡터저장소
    :param embedding_model: 임베딩 모델
    """

    df = pd.read_excel("test_automation/qna.xlsx")

    questions_list = df['질의'].tolist()
    model_answers_list = df['모범 응답'].tolist()

    for question, model_answer in zip(questions_list, model_answers_list):
        response = db_qna_ensemble_2(llm, db, bm25_db, bm42_db, question)

        # 코사인 유사도 확인
        temp_model_answer = embedding_model.embed_query(model_answer)
        temp_response = embedding_model.embed_query(response)
        similarity = cosine_similarity(temp_model_answer, temp_response)
        print(f"similarity: {similarity}")

        # 파일 저장
        save_qna_list_v2(question, response, model_answer, similarity)

In [16]:
auto_question(llm, db, bm25_db, bm42_db, embedding_model)  

NameError: name 'llm' is not defined

In [None]:
def save_qna_list(name, model_answer, embedding_model):  # q, a, model_answer, similarity, embedding_model
        """
        질의 응답을 엑셀 파일에 추가하는 함수 (중복 질문 제거)
        @param name: 검색기 이름
        @param model_answer: 질의에 대한 거대언어모델 응답 리스트
        @param embedding_model_name: 사용한 임베딩 모델 이름
        """
        
        df = pd.read_excel("test_automation/qna.xlsx")

        questions_list = df['질의'].tolist()
        response_list = df['모범 응답'].tolist()
    
        for question, answer in zip(questions_list, response_list):
            
            response = print_ensemble_results(retrievers_bm25_faiss, question)
            # response = print_ensemble_results(retrievers_kiwi_bm25_faiss, question)
            # response = print_ensemble_results(retrievers_kkma_bm25_faiss, question)
            # response = print_ensemble_results(retrievers_okt_bm25_faiss, question)
            # response = print_ensemble_results(retrievers_komoran_bm25_faiss, question)
            # response = print_ensemble_results(retrievers_hannanum_bm25_faiss, question)
            # response = print_ensemble_results(retrievers_mecab_bm25_faiss, question)
    
            # 코사인 유사도 확인
            temp_answer = embedding_model.embed_query(answer)
            temp_response = embedding_model.embed_query(response)
            similarity = cosine_similarity(temp_answer, temp_response)
            print(f"similarity: {similarity}")
    
            # 파일 저장

            filename = f'research_result/{name}.xlsx'  # 파일명 지정
    
            # model_checker 값을 모델 이름으로 변환
            model_name = 'GPT-4o-mini'  # 거대언어모델 이름 지정
    
            try:
                # 기존 엑셀 파일 열기
                workbook = load_workbook(filename)
                sheet = workbook.active
            except FileNotFoundError:
                # 파일이 없는 경우 새로운 엑셀 파일 생성
                workbook = openpyxl.Workbook()
                sheet = workbook.active
                sheet['A1'] = '거대언어모델'
                sheet['B1'] = '유사도'
                sheet['C1'] = '질의'
                sheet['D1'] = '응답'
                sheet['E1'] = '모범 응답'
    
            # '모범 응답' 헤더 추가 (파일이 이미 존재하는 경우에도)
            if not isinstance(sheet['E1'].value, str) or sheet['E1'].value != '모범 응답':
                sheet['E1'] = '모범 응답'
    
            # 기존 질문 목록 가져오기
            existing_questions = set((cell.value, sheet.cell(row=cell.row, column=1).value) for cell in sheet['C'][1:])
    
            # 중복 질문 확인
            if (question, model_name) not in existing_questions:
                # 새로운 행에 데이터 추가
                row = sheet.max_row + 1
                sheet.cell(row=row, column=1, value=model_name)
                sheet.cell(row=row, column=2, value=similarity)
                sheet.cell(row=row, column=3, value=question)
                sheet.cell(row=row, column=4, value=answer)
                sheet.cell(row=row, column=5, value=model_answer)
    
            # 거대언어모델별로 정렬
            data = list(sheet.values)[1:]
            data.sort(key=lambda x: (x[0], x[1]))
    
            # 정렬된 데이터로 시트 업데이트
            sheet.delete_rows(2, sheet.max_row)
            for row, row_data in enumerate(data, start=2):
                for col, value in enumerate(row_data, start=1):
                    sheet.cell(row=row, column=col, value=value)
    
            # 엑셀 파일 저장
            workbook.save(filename)