In [None]:
# Google Colab에서 실행하는 PDF 기반 RAG 질의응답 시스템

# ========================================
# 1. 필요한 라이브러리 설치
# ========================================
!pip install -q langchain langchain-community langchain-openai
!pip install -q chromadb sentence-transformers
!pip install -q PyPDF2 pymupdf
!pip install -q gradio
!pip install -q tiktoken
!pip install -q openai


In [None]:

import os
import re
from typing import List, Dict, Tuple
import json
import gradio as gr
import numpy as np
from datetime import datetime

# PDF 처리
import PyPDF2
import fitz  # pymupdf

# LangChain 관련
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain_openai import ChatOpenAI
from langchain.schema import Document
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.evaluation import load_evaluator

# ========================================
# 2. 환경 설정 및 API 키 설정
# ========================================

# OpenAI API 키 설정 (Colab 시크릿 또는 직접 입력)
# 옵션 1: Colab 시크릿 사용
# from google.colab import userdata
# os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')

# 옵션 2: 직접 입력 (보안 주의)
import getpass
# OpenAI API 키 설정 (사용자가 입력해야 함)
# from google.colab import userdata
# api_key=userdata.get('api_key')
# os.environ["OPENAI_API_KEY"] = api_key
# api_key2=userdata.get('api_key2')
# os.environ["LANGCHAIN_API_KEY"] = api_key2

from dotenv import load_dotenv

load_dotenv()
# OpenAI API 클라이언트 생성
OPENAPI_KEY = os.getenv("OPENAI_API_KEY")
LangSmith_KEY = os.getenv("LANGCHAIN_API_KEY")

# 2) LangSmith 연동 필수 환경변수
os.environ["LANGCHAIN_TRACING_V2"] = "true"      # 트레이싱 활성화
os.environ["LANGSMITH_ENDPOINT"]   = "https://api.smith.langchain.com"  # 기본값
os.environ["LANGSMITH_PROJECT"]    = "RAG_EV_ex3"                 # 수업용 프로젝트명

# ========================================
# 3. PDF 처리 클래스
# ========================================

class PDFProcessor:
    """PDF 문서를 읽고 텍스트를 추출하는 클래스"""

    def __init__(self):
        self.documents = []

    def extract_text_pypdf2(self, pdf_path: str) -> str:
        """PyPDF2를 사용한 텍스트 추출"""
        text = ""
        try:
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page_num in range(len(pdf_reader.pages)):
                    page = pdf_reader.pages[page_num]
                    text += page.extract_text()
        except Exception as e:
            print(f"PyPDF2 오류: {e}")
        return text

    def extract_text_pymupdf(self, pdf_path: str) -> str:
        """PyMuPDF를 사용한 텍스트 추출 (더 정확함)"""
        text = ""
        try:
            pdf_document = fitz.open(pdf_path)
            for page_num in range(pdf_document.page_count):
                page = pdf_document[page_num]
                text += page.get_text()
            pdf_document.close()
        except Exception as e:
            print(f"PyMuPDF 오류: {e}")
        return text

    def process_pdf(self, pdf_path: str, method='pymupdf') -> List[Document]:
        """PDF를 처리하여 Document 객체 리스트로 변환"""
        if method == 'pymupdf':
            text = self.extract_text_pymupdf(pdf_path)
        else:
            text = self.extract_text_pypdf2(pdf_path)

        # 텍스트를 청크로 분할
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", ".", " ", ""],
            length_function=len
        )

        chunks = text_splitter.split_text(text)

        # Document 객체로 변환
        documents = []
        for i, chunk in enumerate(chunks):
            doc = Document(
                page_content=chunk,
                metadata={
                    "source": pdf_path,
                    "chunk_id": i,
                    "total_chunks": len(chunks)
                }
            )
            documents.append(doc)

        self.documents = documents
        return documents

# ========================================
# 4. RAG 시스템 클래스
# ========================================

class RAGSystem:
    """RAG 기반 질의응답 시스템"""

    def __init__(self, model_name="gpt-3.5-turbo"):
        self.embeddings = HuggingFaceEmbeddings(
            model_name="jhgan/ko-sroberta-multitask",
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )

        self.llm = ChatOpenAI(
            model_name=model_name,
            temperature=0.3,
            max_tokens=1000
        )

        self.vectorstore = None
        self.retriever = None
        self.qa_chain = None

    def create_vectorstore(self, documents: List[Document]):
        """벡터 스토어 생성"""
        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            persist_directory="./chroma_db"
        )
        self.retriever = self.vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={"k": 5}
        )

    def setup_qa_chain(self):
        """QA 체인 설정"""
        prompt_template = """당신은 AI 산업 동향 전문가입니다.
        주어진 컨텍스트를 바탕으로 질문에 정확하고 상세하게 답변해주세요.

        컨텍스트: {context}

        질문: {question}

        답변:"""

        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )

        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.retriever,
            return_source_documents=True,
            chain_type_kwargs={"prompt": PROMPT}
        )

    def query(self, question: str) -> Dict:
        """질문에 대한 답변 생성"""
        if not self.qa_chain:
            return {"error": "QA 체인이 초기화되지 않았습니다."}

        result = self.qa_chain({"query": question})
        return {
            "question": question,
            "answer": result['result'],
            "source_documents": result.get('source_documents', [])
        }

    def keyword_search(self, keyword: str, top_k: int = 5) -> List[Document]:
        """키워드 검색"""
        if not self.vectorstore:
            return []

        results = self.vectorstore.similarity_search(keyword, k=top_k)
        return results

# ========================================
# 5. 평가 시스템 클래스
# ========================================

class EvaluationSystem:
    """질의응답 평가 시스템"""

    def __init__(self, llm):
        self.llm = llm
        self.test_cases = []
        self.evaluation_results = []

    def add_test_case(self, question: str, expected_answer: str):
        """테스트 케이스 추가"""
        self.test_cases.append({
            "question": question,
            "expected_answer": expected_answer
        })

    def evaluate_answer(self, question: str, generated_answer: str, expected_answer: str) -> Dict:
        """답변 평가"""
        # 간단한 평가 프롬프트
        eval_prompt = f"""
        질문: {question}

        생성된 답변: {generated_answer}

        예상 답변: {expected_answer}

        위의 생성된 답변을 다음 기준으로 평가해주세요:
        1. 정확성 (0-10점): 정보의 정확도
        2. 완전성 (0-10점): 답변의 완전성
        3. 관련성 (0-10점): 질문과의 관련성

        JSON 형식으로 점수와 간단한 평가를 제공해주세요:
        {{"accuracy": 점수, "completeness": 점수, "relevance": 점수, "comment": "평가 내용"}}
        """

        response = self.llm.invoke(eval_prompt)

        try:
            # JSON 파싱 시도
            eval_result = json.loads(response)
        except:
            # 파싱 실패 시 기본값
            eval_result = {
                "accuracy": 5,
                "completeness": 5,
                "relevance": 5,
                "comment": "평가 파싱 실패"
            }

        return eval_result

    def run_evaluation(self, rag_system: RAGSystem) -> List[Dict]:
        """전체 테스트 케이스 평가 실행"""
        results = []

        for test_case in self.test_cases:
            # RAG 시스템으로 답변 생성
            response = rag_system.query(test_case["question"])
            generated_answer = response["answer"]

            # 답변 평가
            eval_result = self.evaluate_answer(
                test_case["question"],
                generated_answer,
                test_case["expected_answer"]
            )

            # 결과 저장
            result = {
                "question": test_case["question"],
                "expected": test_case["expected_answer"],
                "generated": generated_answer,
                "evaluation": eval_result
            }
            results.append(result)

        self.evaluation_results = results
        return results

    def get_summary_statistics(self) -> Dict:
        """평가 통계 요약"""
        if not self.evaluation_results:
            return {}

        accuracies = [r["evaluation"]["accuracy"] for r in self.evaluation_results]
        completeness = [r["evaluation"]["completeness"] for r in self.evaluation_results]
        relevance = [r["evaluation"]["relevance"] for r in self.evaluation_results]

        return {
            "avg_accuracy": np.mean(accuracies),
            "avg_completeness": np.mean(completeness),
            "avg_relevance": np.mean(relevance),
            "total_tests": len(self.evaluation_results)
        }

# ========================================
# 6. Gradio 인터페이스 생성
# ========================================

def create_gradio_interface(rag_system: RAGSystem, eval_system: EvaluationSystem):
    """Gradio 웹 인터페이스 생성"""

    def answer_question(question):
        """질문 답변 함수"""
        if not question:
            return "질문을 입력해주세요."

        result = rag_system.query(question)

        # 소스 문서 정보 추가
        sources = "\n\n**참고 문서:**\n"
        for i, doc in enumerate(result.get("source_documents", [])[:3]):
            sources += f"\n{i+1}. {doc.page_content[:200]}...\n"

        return result["answer"] + sources

    def search_keyword(keyword, top_k):
        """키워드 검색 함수"""
        if not keyword:
            return "검색어를 입력해주세요."

        results = rag_system.keyword_search(keyword, int(top_k))

        output = f"**'{keyword}' 검색 결과 (상위 {top_k}개):**\n\n"
        for i, doc in enumerate(results):
            output += f"{i+1}. {doc.page_content[:300]}...\n\n"

        return output

    def run_evaluation():
        """평가 실행 함수"""
        results = eval_system.run_evaluation(rag_system)
        stats = eval_system.get_summary_statistics()

        output = "**평가 결과:**\n\n"
        output += f"평균 정확성: {stats['avg_accuracy']:.2f}/10\n"
        output += f"평균 완전성: {stats['avg_completeness']:.2f}/10\n"
        output += f"평균 관련성: {stats['avg_relevance']:.2f}/10\n"
        output += f"총 테스트 수: {stats['total_tests']}\n\n"

        output += "**상세 결과:**\n"
        for i, result in enumerate(results[:5]):  # 처음 5개만 표시
            output += f"\n{i+1}. 질문: {result['question']}\n"
            output += f"   평가: 정확성 {result['evaluation']['accuracy']}, "
            output += f"완전성 {result['evaluation']['completeness']}, "
            output += f"관련성 {result['evaluation']['relevance']}\n"

        return output

    # Gradio 인터페이스 구성
    with gr.Blocks(title="AI Brief RAG 시스템") as interface:
        gr.Markdown("# 📚 SPRi AI Brief RAG 질의응답 시스템")
        gr.Markdown("PDF 문서 기반 질의응답 및 키워드 검색 시스템입니다.")

        with gr.Tab("질의응답"):
            question_input = gr.Textbox(
                label="질문을 입력하세요",
                placeholder="예: 미드저니의 최신 비디오 생성 모델은 무엇인가요?",
                lines=2
            )
            answer_output = gr.Textbox(
                label="답변",
                lines=10
            )
            answer_btn = gr.Button("답변 생성", variant="primary")
            answer_btn.click(
                fn=answer_question,
                inputs=question_input,
                outputs=answer_output
            )

        with gr.Tab("키워드 검색"):
            keyword_input = gr.Textbox(
                label="검색 키워드",
                placeholder="예: 오픈AI, 구글, AI 정책",
                lines=1
            )
            top_k_slider = gr.Slider(
                minimum=1,
                maximum=10,
                value=5,
                step=1,
                label="검색 결과 수"
            )
            search_output = gr.Textbox(
                label="검색 결과",
                lines=15
            )
            search_btn = gr.Button("검색", variant="primary")
            search_btn.click(
                fn=search_keyword,
                inputs=[keyword_input, top_k_slider],
                outputs=search_output
            )

        with gr.Tab("평가"):
            gr.Markdown("### 시스템 평가")
            gr.Markdown("미리 정의된 테스트 케이스로 시스템을 평가합니다.")
            eval_output = gr.Textbox(
                label="평가 결과",
                lines=15
            )
            eval_btn = gr.Button("평가 실행", variant="primary")
            eval_btn.click(
                fn=run_evaluation,
                inputs=[],
                outputs=eval_output
            )

        with gr.Tab("사용 가이드"):
            gr.Markdown("""
            ### 사용 방법

            1. **질의응답**: AI Brief 문서 내용에 대해 질문하면 관련 정보를 찾아 답변합니다.
            2. **키워드 검색**: 특정 키워드로 문서 내용을 검색할 수 있습니다.
            3. **평가**: 시스템의 성능을 평가하는 테스트를 실행합니다.

            ### 예시 질문
            - 미드저니의 최신 동향은?
            - 오픈AI의 챗GPT 에이전트에 대해 설명해주세요
            - EU의 AI 법 관련 내용은?
            - 구글 딥마인드의 최신 연구는?
            """)

    return interface

# ========================================
# 7. 메인 실행 코드
# ========================================

def main():
    """메인 실행 함수"""

    print("1. PDF 파일 업로드...")
    # Colab에서 파일 업로드
    from google.colab import files
    uploaded = files.upload()

    # 업로드된 파일 중 첫 번째 PDF 선택
    pdf_file = list(uploaded.keys())[0]
    print(f"업로드된 파일: {pdf_file}")

    print("\n2. PDF 처리 중...")
    pdf_processor = PDFProcessor()
    documents = pdf_processor.process_pdf(pdf_file)
    print(f"생성된 문서 청크 수: {len(documents)}")

    print("\n3. RAG 시스템 초기화...")
    rag_system = RAGSystem()
    rag_system.create_vectorstore(documents)
    rag_system.setup_qa_chain()
    print("RAG 시스템 준비 완료!")

    print("\n4. 평가 시스템 설정...")
    eval_system = EvaluationSystem(rag_system.llm)

    # 테스트 케이스 추가 (SPRi AI Brief 기반)
    eval_system.add_test_case(
        "미드저니의 첫 번째 비디오 생성 AI 모델 이름은?",
        "V1"
    )
    eval_system.add_test_case(
        "오픈AI의 챗GPT 에이전트는 어떤 기능을 수행하나요?",
        "웹 브라우징과 심층 추론 기능을 결합해 사용자 대신 복잡한 작업을 처리"
    )
    eval_system.add_test_case(
        "EU 집행위원회가 발표한 AI 법 관련 내용은?",
        "범용 AI 관련 실천 강령과 지침 발표"
    )
    eval_system.add_test_case(
        "구글 딥마인드가 공개한 DNA 분석 AI 모델은?",
        "알파게놈"
    )
    eval_system.add_test_case(
        "xAI의 차세대 AI 모델 이름은?",
        "그록 4"
    )

    print("평가 시스템 준비 완료!")

    print("\n5. Gradio 인터페이스 시작...")
    interface = create_gradio_interface(rag_system, eval_system)

    # Gradio 실행
    interface.launch(share=True)

# ========================================
# 8. 빠른 테스트용 함수
# ========================================

def quick_test():
    """빠른 테스트를 위한 함수 (PDF 업로드 없이)"""

    # 샘플 문서 생성
    sample_docs = [
        Document(
            page_content="미드저니가 첫 번째 비디오 생성 AI 모델 'V1'을 출시했습니다. V1은 이미지를 동영상으로 변환하는 모델로, 5초 길이의 동영상 4개를 제작할 수 있습니다.",
            metadata={"source": "sample", "chunk_id": 0}
        ),
        Document(
            page_content="오픈AI가 사용자 대신 복잡한 작업을 처리하는 '챗GPT 에이전트'를 공개했습니다. 웹 브라우징과 심층 추론 기능을 결합한 시스템입니다.",
            metadata={"source": "sample", "chunk_id": 1}
        ),
        Document(
            page_content="구글 딥마인드가 인간 DNA 염기서열 분석 AI 모델 '알파게놈'을 공개했습니다. 최대 100만 개의 DNA 염기서열을 바탕으로 분자적 특성을 예측합니다.",
            metadata={"source": "sample", "chunk_id": 2}
        ),
        Document(
            page_content="EU 집행위원회가 AI 법의 범용 AI 관련 실천 강령과 지침을 발표했습니다. 투명성, 저작권, 안전과 보안의 3개 측면에서 준수 사항을 제시했습니다.",
            metadata={"source": "sample", "chunk_id": 3}
        ),
        Document(
            page_content="xAI가 차세대 AI 모델 '그록 4'를 공개하고 정부 AI 시장에 진출했습니다. 도구 사용과 실시간 검색 통합 기능을 갖추었습니다.",
            metadata={"source": "sample", "chunk_id": 4}
        )
    ]

    print("RAG 시스템 초기화...")
    rag_system = RAGSystem()
    rag_system.create_vectorstore(sample_docs)
    rag_system.setup_qa_chain()

    print("평가 시스템 설정...")
    eval_system = EvaluationSystem(rag_system.llm)

    # 간단한 테스트 케이스
    eval_system.add_test_case(
        "미드저니의 비디오 생성 모델은?",
        "V1"
    )

    print("Gradio 인터페이스 시작...")
    interface = create_gradio_interface(rag_system, eval_system)
    interface.launch(share=True)

# ========================================
# 실행
# ========================================

# 전체 시스템 실행 (PDF 업로드 포함)
main()

# 빠른 테스트 (샘플 데이터 사용)
#quick_test()

RAG 시스템 초기화...


  self.embeddings = HuggingFaceEmbeddings(
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/123 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/744 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/443M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/442M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/585 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/156 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

평가 시스템 설정...
Gradio 인터페이스 시작...
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://7319f895565b9f3e89.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
