### Index 생성

In [10]:
# import library
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
from langchain_upstage import ChatUpstage, UpstageEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_core.documents import Document
from dotenv import load_dotenv
import glob
import os

In [11]:
load_dotenv()

index_name = "dev-02"
pc = Pinecone()
llm_upstage = ChatUpstage(api_key=os.environ.get("UPSTAGE_API_KEY"), temperature=0, model="solar-pro")
embeddings_query = UpstageEmbeddings(model="embedding-query") #4096

In [12]:
if index_name not in [index_info["name"] for index_info in pc.list_indexes()]:
    pc.create_index(
        name=index_name,
        dimension=4096, 
        metric="dotproduct",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        ) 
    )
    print(f"{index_name} has been successfully created")
else:
    print(f"{index_name} is already exists.")

dev-02 is already exists.


In [13]:
index = pc.Index(index_name)

In [14]:
llm = ChatUpstage(api_key=os.environ.get("UPSTAGE_API_KEY"), temperature=0, model="solar-pro")
embeddings_passage = UpstageEmbeddings(model="embedding-passage") #4096
embeddings_query = UpstageEmbeddings(model="embedding-query") #4096
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=150)

split_docs = []
files = sorted(glob.glob("data/*.pdf"))

for file in files:
    loader = PyMuPDFLoader(file)
    split_docs.extend(loader.load_and_split(text_splitter))

# 문서 개수 확인
len(split_docs)



1579

In [15]:
import re

def remove_question_and_specific_sentences(paragraph):
    # 정규표현식으로 "?" 또는 "보자"로 끝나는 문장 제거
    result = re.sub(r'[^.?!]*[\?]|[^.?!]*보자\.', '', paragraph)
    # 여러 개의 공백을 하나로 정리
    result = re.sub(r'\s+', ' ', result).strip()
    return result

for doc in split_docs:
    doc.page_content = remove_question_and_specific_sentences(doc.page_content)

In [None]:
docsearch = PineconeVectorStore.from_existing_index(index_name=index_name, embedding=embeddings_passage)
# docsearch.add_documents(split_docs)

### 테스트 데이터 만들기

In [21]:
split_docs = []
files = sorted(glob.glob("test_data/*.pdf"))

for file in files:
    loader = PyMuPDFLoader(file)
    split_docs.extend(loader.load_and_split(text_splitter))

# 문서 개수 확인
len(split_docs)

832

In [72]:
### 지구과학 2
from langchain_community.document_loaders import PyMuPDFLoader
import re

def extract_explanations(pdf_path, output_path):
    """
    PDF에서 "해설 | "로 시작하고 마침표로 끝나는 문장을 추출하여 파일로 저장하는 함수
    
    Args:
        pdf_path (str): 입력 PDF 파일 경로
        output_path (str): 출력 텍스트 파일 경로
    """
    # PDF 로드
    loader = PyMuPDFLoader(pdf_path)
    pages = loader.load()
    start_word = "해설"
    end_word = "ㄱ"
    pattern = rf"{start_word}.*?{end_word}\."
    matches = []
    for page in pages:
        matches.extend(re.findall(pattern, page.page_content, re.DOTALL))
        
    matches = [re.sub(rf"\s*{end_word}\.$", "", match) for match in matches]
    matches = [s[s.index(start_word):] for s in matches]
    
    hint_patterns = [r"\d", r"㉠", r"[a-zA-Z]", r"\(가\)", r"\(나\)"]
    explanations = []
    for match in matches:
        if not any(re.search(pattern, match) for pattern in hint_patterns):
            explanations.append(match)

    # 결과 저장
    with open(output_path, 'w', encoding='utf-8') as f:
        for explanation in explanations:
            f.write(explanation + '\n\n')
    
    return len(explanations)

pdf_path = glob.glob("test_data/*.pdf")
output_path = "지구과학.txt"  # 저장할 파일 경로

count = extract_explanations(pdf_path[0], output_path)
print(f"총 {count}개의 해설을 추출하여 {output_path}에 저장했습니다.")

총 171개의 해설을 추출하여 output.txt에 저장했습니다.


In [112]:
### 정치와 법
from langchain_community.document_loaders import PyMuPDFLoader
import re

def extract_explanations(pdf_path, output_path):
    """
    PDF에서 "해설 | "로 시작하고 마침표로 끝나는 문장을 추출하여 파일로 저장하는 함수
    
    Args:
        pdf_path (str): 입력 PDF 파일 경로
        output_path (str): 출력 텍스트 파일 경로
    """
    # PDF 로드
    loader = PyMuPDFLoader(pdf_path)
    pages = loader.load()
    start_word = "정답 찾기"
    end_word = "오답 피하기"
    pattern = rf"{start_word}.*?{end_word}"
    matches = []
    for page in pages:
        matches.extend(re.findall(pattern, page.page_content, re.DOTALL))

    matches = [s[len(start_word):-len(end_word)] for s in matches]
    
    hint_patterns = [r"\d", r"㉠",  r"[a-zA-Z]", r"\(가\)", r"\(나\)",r"ㄱ.",r"ㄴ.",r"ㄷ.",r"ㄹ.",]
    explanations = []
    for match in matches:
        # hint_patterns에 해당하는 패턴이 없는 경우에만 추가
        if not any(re.search(pattern, match) for pattern in hint_patterns):
            explanations.append(match)
    
    # r"①", r"②", r"③", r"④", r"⑤"는 제거
    explanations = [re.sub(r"①|②|③|④|⑤", "", explanation) for explanation in explanations]
    
    # 결과 저장
    with open(output_path, 'w', encoding='utf-8') as f:
        for explanation in explanations:
            f.write(explanation)
    
    return len(explanations)

pdf_path = glob.glob("test_data/*.pdf")
output_path = "정치와법.txt"  # 저장할 파일 경로

count = extract_explanations(pdf_path[0], output_path)
print(f"총 {count}개의 해설을 추출하여 {output_path}에 저장했습니다.")

총 114개의 해설을 추출하여 정치와법.txt에 저장했습니다.


In [13]:
import os
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_experimental.text_splitter import SemanticChunker
from langchain_upstage import UpstageEmbeddings
def split_pdf_to_sentences(pdf_path, output_file):
    # PDF 파일 로드
    loader = PyMuPDFLoader(pdf_path)
    documents = loader.load()
    
    # OpenAI 임베딩 모델 초기화 (OpenAI API 키 필요)
    embeddings = UpstageEmbeddings(model="embedding-passage")
    
    # Semantic Chunker 초기화 (문장 단위 분할)
    text_splitter = SemanticChunker(
        embeddings, 
        breakpoint_threshold_type="percentile",  # 다양한 옵션 선택 가능
        breakpoint_threshold_amount=95  # 분할 임계값 조정 가능
    )
    
    # 문서 분할
    splits = text_splitter.split_documents(documents)
    
    
    # 분할된 문장을 텍스트 파일로 저장
    with open(output_file, 'w', encoding='utf-8') as f:
        for i, split in enumerate(splits, 1):
            f.write(split.page_content + "\n\n")
# 사용 예시
if __name__ == "__main__":
    pdf_path = "./test_data/2025_기출문제집_정보처리기사_필기_핵심요약.pdf"  # PDF 파일 경로
    output_directory = "output_정처기.txt"  # 출력 디렉토리
    
    split_pdf_to_sentences(pdf_path, output_directory)

### Ground Truth 생성

In [18]:
data = []
with open(output_directory, 'r', encoding='utf-8') as f:
    data = f.read().split("\n\n")
data = [d for d in data if len(d) > 300]
len(data)

196

In [None]:
from typing import List
from pydantic import BaseModel, Field
import pandas as pd
from tqdm import tqdm
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate

class RAGEvalOutput(BaseModel):
    """틀린 문장과 피드백을 포함하는 출력 형식"""
    modified_statement: str = Field(description="원본 텍스트의 핵심 개념을 포함한 문장 또는 틀린 문장")
    feedback: str = Field(description="문장의 오류가 있는지 여부 (O, X)")

def create_rag_evaluation_dataset(docsearch, num_samples=None):
    """
    Pinecone 인덱스에서 문서를 가져와 RAG 평가용 데이터셋을 생성합니다.
    LangChain을 사용하여 틀린 문장과 피드백을 한 번의 API 호출로 생성합니다.
    
    Args:
        docsearch: Pinecone 벡터 스토어 객체
        num_samples: 생성할 샘플 수 (None일 경우 전체 문서 사용)
    
    Returns:
        생성된 데이터프레임
    """
    # LangChain 컴포넌트 설정
    parser = JsonOutputParser(pydantic_object=RAGEvalOutput)
    
    prompt = ChatPromptTemplate.from_template("""다음 텍스트의 핵심 개념을 파악하고, 
    1) 개념을 활용한 문장을 2개 적어주세요. 이 때, 하나는 맞는 문장, 하나는 틀린 문장으로 작성해주세요.
    2) 개념을 활용한 문장 2개에 대한 정답을 O, X로 표시해주세요
    
    모든 문장은 다음 텍스트의 핵심 개념을 포함해야 합니다.
    모든 문장은 원본 텍스트와 다른 형태여야 합니다.
    
    원본 텍스트:
    {text}
    
    아래 형식으로 응답해주세요:
    {format_instructions}
    """)
    
    # ChatOpenAI 모델 초기화
    model = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)
    
    # 체인 구성
    chain = prompt | model | parser
    
    evaluation_data = []
    docsearch = docsearch[:num_samples] if num_samples else docsearch
    for doc in tqdm(docsearch, desc="데이터셋 생성 중"):
        chunk_text = doc
            
        # LangChain을 사용하여 틀린 문장과 피드백 한 번에 생성
        result = chain.invoke({
            "text": chunk_text,
            "format_instructions": parser.get_format_instructions()
        })
        try:
            for res in result:
                evaluation_data.append({
                    'context': chunk_text,
                    'question': res["modified_statement"],
                    'answer': res["feedback"],
                })
        except:
            pass
    # DataFrame 생성
    df = pd.DataFrame(evaluation_data)
    
    # CSV 파일로 저장
    df.to_csv('정처기OX.csv', index=False)
    
    return df


evaluation_df = create_rag_evaluation_dataset(data)
print(f"생성된 데이터셋 크기: {len(evaluation_df)}")
print("\n데이터셋 샘플:")
print(evaluation_df.head())

### Solar LLM 테스트 (without RAG)

In [3]:
from langchain_upstage import ChatUpstage
import pandas as pd
import os
from typing import List
import time
import re

def create_batch_prompt(questions: List[str], batch_size: int = 5) -> str:
    """여러 질문을 하나의 프롬프트로 만듭니다."""
    numbered_questions = [f"Q{i+1}. {q}" for i, q in enumerate(questions)]
    questions_text = "\n".join(numbered_questions)
    
    return f"""
    다음 문장들의 사실 여부를 판단해주세요.
    응답 형식: Q1: O 또는 X, Q2: O 또는 X, ..., Qn: O 또는 X
    
    {questions_text}
    """

def parse_batch_response(response: str, batch_size: int) -> List[str]:
    """LLM의 응답을 개별 답변으로 파싱합니다."""
    try:
        pattern = r"Q(\d+):\s*(O|X)"
        matches = re.findall(pattern, response)
        answer = [ans for _, ans in matches if ans in ['O', 'X']]
        if len(answer) < batch_size:
            print(f"응답 개수({len(answer)})가 배치 크기({batch_size})보다 작습니다.")
            raise ValueError("응답 개수 부족")
        return answer[:batch_size]
    except Exception as e:
        print(f"응답 파싱 중 오류 발생: {e}")
        return ['-'] * batch_size  # 오류 시 기본값 반환

def process_questions_in_batches(df: pd.DataFrame, batch_size: int = 5, retry_delay: int = 1) -> pd.DataFrame:
    """데이터프레임의 질문들을 배치로 처리합니다."""
    llm_upstage = ChatUpstage(
        api_key=os.environ.get("UPSTAGE_API_KEY"),
        temperature=0,
        model="solar-pro"
    )
    
    all_answers = []
    questions = df['question'].tolist()
    
    for i in range(0, len(questions), batch_size):
        batch_questions = questions[i:i + batch_size]
        prompt = create_batch_prompt(batch_questions, batch_size)
        
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = llm_upstage(prompt).content
                batch_answers = parse_batch_response(response, len(batch_questions))
                all_answers.extend(batch_answers)
                print(f"배치 {i//batch_size + 1} 처리 완료: {len(batch_questions)}개 질문")
                break
            except Exception as e:
                print(f"배치 처리 중 오류 발생 (시도 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                else:
                    all_answers.extend(['-'] * len(batch_questions))  # 오류 시 기본값
        
        time.sleep(retry_delay)  # API 요청 간 딜레이
    
    df_result = df.copy()
    df_result['generated_answer'] = all_answers
    return df_result

In [None]:
# CSV 파일 읽기
science = pd.read_csv("지구과학OX.csv")
# 순서 섞기
science = science.sample(frac=1, random_state=42)
science = science.head(200)
# 배치 처리 실행
batch_size = 10  # 한 번에 처리할 질문 수
processed_df = process_questions_in_batches(science, batch_size)

# 결과 저장
processed_df.to_csv("정치와법OX_upstage_batch.csv", index=False)

# Error 비율
error_rate = (processed_df['generated_answer'] == '-').mean() * 100
print(f"Error 비율: {error_rate:.5f}%")
# Error row 제거
processed_df = processed_df[processed_df['generated_answer'] != '-']

# 정확도 계산
accuracy = (processed_df['answer'] == processed_df['generated_answer']).mean() * 100
print(f"정답 비율: {accuracy:.5f}%")

### Solar LLM with RAG

In [8]:
from langchain_upstage import ChatUpstage, UpstageEmbeddings
from langchain_pinecone import PineconeVectorStore
import pandas as pd
import os
from typing import List
import time
import re

class RAGScienceQA:
    def __init__(
        self,
        index_name: str,
        embedding_model,
        batch_size: int = 5,
        k_similar: int = 3
    ):
        self.batch_size = batch_size
        self.k_similar = k_similar
        
        # Initialize LLM
        self.llm = ChatUpstage(
            api_key=os.environ.get("UPSTAGE_API_KEY"),
            temperature=0,
            model="solar-pro"
        )
        
        # Initialize vector store
        self.vector_store = PineconeVectorStore.from_existing_index(
            index_name=index_name,
            embedding=embedding_model
        )

    def get_relevant_context(self, question: str) -> str:
        """주어진 질문과 관련된 컨텍스트를 검색합니다."""
        similar_docs = self.vector_store.similarity_search(
            question,
            k=self.k_similar
        )
        
        # 검색된 문서들을 하나의 컨텍스트로 결합
        context = "\n".join([doc.page_content for doc in similar_docs])
        return context

    def create_rag_prompt(self, questions: List[str]) -> str:
        """RAG 기반 프롬프트를 생성합니다."""
        # 각 질문에 대한 컨텍스트 검색
        question_contexts = []
        for i, q in enumerate(questions, start=1):
            context = self.get_relevant_context(q)
            question_contexts.append(f"Q{i}: {q}\nC{i}: {context}")
            
        # 번호가 매겨진 질문과 컨텍스트 생성
        qa_text = "\n\n".join(question_contexts)
        
        return f"""
        주어진 각 질문(Q)에 대해 관련 정보(C)를 참고하여 사실 여부를 판단해주세요.
        응답 형식: Q1: O 또는 X, Q2: O 또는 X, ..., Qn: O 또는 X
        
        {qa_text}
        """

    def parse_batch_response(self, response: str, batch_size: int) -> List[str]:
        """LLM의 응답을 개별 답변으로 파싱합니다."""
        try:
            pattern = r"Q(\d+):\s*(O|X)"
            matches = re.findall(pattern, response)
            answer = [ans for _, ans in matches if ans in ['O', 'X']]
            if len(answer) < batch_size:
                print(f"응답 개수({len(answer)})가 배치 크기({batch_size})보다 작습니다.")
                raise ValueError("응답 개수 부족")
            return answer[:batch_size]
        except Exception as e:
            print(f"응답 파싱 중 오류 발생: {e}")
            return ['-'] * batch_size

    def process_questions(self, df: pd.DataFrame) -> pd.DataFrame:
        """데이터프레임의 질문들을 RAG를 사용하여 처리합니다."""
        all_answers = []
        questions = df['question'].tolist()
        
        for i in range(0, len(questions), self.batch_size):
            batch_questions = questions[i:i + self.batch_size]
            prompt = self.create_rag_prompt(batch_questions)
            
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    response = self.llm(prompt).content
                    batch_answers = self.parse_batch_response(
                        response,
                        len(batch_questions)
                    )
                    all_answers.extend(batch_answers)
                    print(f"배치 {i//self.batch_size + 1} 처리 완료: {len(batch_questions)}개 질문")
                    break
                except Exception as e:
                    print(f"배치 처리 중 오류 발생 (시도 {attempt + 1}/{max_retries}): {e}")
                    if attempt < max_retries - 1:
                        time.sleep(1)
                    else:
                        all_answers.extend(['-'] * len(batch_questions)) # Error 시 기본값
            
            time.sleep(1)  # API 요청 간 딜레이
        
        df_result = df.copy()
        df_result['generated_answer'] = all_answers
        return df_result

In [None]:
 # 환경 설정
index_name = "dev-01"

# OpenAI 임베딩 모델 초기화 (또는 다른 임베딩 모델 사용)
embeddings_query = UpstageEmbeddings(model="embedding-query") #4096


# RAG 시스템 초기화
rag_qa = RAGScienceQA(
    index_name=index_name,
    embedding_model=embeddings_query,
    batch_size=10,
    k_similar=2
)

# 데이터 로드
science = pd.read_csv("지구과학OX.csv")
# 순서 섞기
science = science.sample(frac=1, random_state=42)
science = science.head(200)

# RAG 기반 처리 실행
processed_df = rag_qa.process_questions(science)

# 결과 저장
processed_df.to_csv("정치와법OX_rag_upstage_dev1.csv", index=False)

# Error 비율
error_rate = (processed_df['generated_answer'] == '-').mean() * 100
print(f"Error 비율: {error_rate:.5f}%")
# Error row 제거
processed_df = processed_df[processed_df['generated_answer'] != '-']

# 정확도 계산
accuracy = (processed_df['answer'] == processed_df['generated_answer']).mean() * 100
print(f"정답 비율: {accuracy:.5f}%")