My First Own AI assistant using OLlama

1. 주요 라이브러리 및 모듈

    - langchain 관련 모듈을 다수 임포트하여 사용 (프롬프트 템플릿, 검색기, 임베딩, LLM 등)
    - sys 및 os, pathlib을 사용하여 파일 경로 및 환경 설정을 처리
    - Config 클래스를 통해 다양한 설정을 관리, 보다 쉬운 프로젝트 관리를 도움



In [1]:
from typing import List, Tuple
import sys
from langchain.prompts import ChatPromptTemplate
from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever
from langchain_community.document_compressors.flashrank_rerank import FlashrankRerank
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings
from langchain_community.retrievers import BM25Retriever
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_ollama import ChatOllama
from langchain_text_splitters import RecursiveCharacterTextSplitter

#import PATH
import os
from pathlib import Path

class Config:
    
    SEED = 42
    ALLOWED_FILE_EXTENSIONS = set([".pdf",".md",".txt",".docx"]) # 지원하는 파일 확장자
    
    class Model:
        NAME = "deepseek-r1:14b" # 기본 LLM으로 deepseek-r1:14b 사용
        TEMPERATURE = 0.6 # 응답 창의성 및 다양성 조절 파라미터
        
    class Preprocessing:
        CHUNK_SIZE = 2048 # 문서를 2048 길이로 청킹 (CHUNK_SIZE = 2048)
        CHUNK_OVERLAP = 128 # Chunk들이 겹치도록 --> Overlap
        EMBEDDING_MODEL = "BAAI/bge-small-en-v1.5" # Embedding 모델은 BAAI/bge-small-en-v1.5 사용
        RERANKER = "ms-marco-MiniLM-L-12-v2" # Reranker 모델은 ms-marco-MiniLM-L-12-v2 사용
        LLM = "llama3.2" # LLM 모델은 llama3.2 사용. 여기서의 LLM은 전처리 단계에서의 LLM이고 사용할 Chatbot에서의 LLM은 다른 걸 사용할 수 있다.
        CONTEXTUALIZE_CHUNKS = True # 
        N_SEMANTIC_CHUNKS = 5 #  의미적 검색 시 반환할 최대 청크 수
        N_BM25_RESULTS = 5 # BM25 기반 검색 시 반환할 최대 결과 개수
        
    class Chatbot:
        N_CONTEXT_RESULTS = 3 # 챗봇이 제공하는 문맥 수 (N_CONTEXT_RESULTS = 3)
        
    class Path:
        try:
            APP_HOME = Path(os.getenv("APP_HOME", Path(__file__).parent.parent))
        except NameError:
            APP_HOME = Path(os.getenv("APP_HOME", Path.cwd().parent))  # 현재 디렉토리 기준으로 설정
        DATA_DIR = APP_HOME / "data/pdf" # 데이터 디렉토리를 APP_HOME/data/pdf로 설정 (DATA_DIR)

        


def config_logging():
    config = {
        
        "handlers": [
            {
            "sink": sys.stdout,
            "colorize": True,
            "format": "<green{time:YYYY-MM-DD HH:mm:ss}> <level>{level: <8}</level> <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
            }
        ]
        
        
    }
        

2. Utils 정의.

    - TEXT_FILE_EXTENSION, MD_FILE_EXTENSION, PDF_FILE_EXTENSION 변수를 선언하여 지원하는 파일 유형을 정의.
    - File 데이터 클래스를 정의하여 name과 content 필드를 저장.
    - extract_pdf_content() 함수를 사용하여 PDF 문서에서 텍스트를 추출.
    - load_uploaded_file() 함수는 Streamlit의 UploadedFile을 받아 파일 확장자에 따라 적절히 처리.



In [2]:
from dataclasses import dataclass
from pathlib import Path
from pypdfium2 import PdfDocument
from streamlit.runtime.uploaded_file_manager import UploadedFile


TEXT_FILE_EXTENSION = ".txt"
MD_FILE_EXTENSION = ".md"
PDF_FILE_EXTENSION = ".pdf"


@dataclass
class File:
    name: str
    extension: str
    content: str
    

def extract_pdf_content(data: bytes) -> str:
    """PDF 문서에서 텍스트를 추출하는 함수"""
    pdf = PdfDocument(data)
    text = ""
    for page in pdf.pages:
        text_page = page.get_textpage()
        text += f"{text_page.get_text()}\n"  # 페이지별 전체 텍스트 추출
    return text


def load_uploaded_file(uploaded_file: UploadedFile) -> File:
    """업로드된 파일을 처리하여 File 객체로 변환하는 함수"""
    file_extension = Path(uploaded_file.name).suffix.lower()  # 확장자 소문자로 변환
    if file_extension not in Config.ALLOWED_FILE_EXTENSIONS:
        raise ValueError(f"Invalid file extension. Allowed extensions are {Config.ALLOWED_FILE_EXTENSIONS}")
    
    content = ""
    if file_extension == PDF_FILE_EXTENSION:
        content = extract_pdf_content(uploaded_file.getvalue())
    else:
        try:
            content = uploaded_file.getvalue().decode("utf-8")
        except UnicodeDecodeError:
            content = uploaded_file.getvalue().decode("latin1")  # 예외 처리 추가
    
    return File(name=uploaded_file.name, extension=file_extension, content=content)




3. RAG 기반 문서 검색 및 컨텍스트 생성 시스템 구현
    - 주요 기능 요약
        - ✅ 문서 청킹 및 컨텍스트 생성
        - ✅ LLM 기반 요약 및 컨텍스트 추가
        - ✅ Semantic + BM25 검색 결합 (Ensemble Retriever)
        - ✅ 리랭킹을 통한 최종 검색 결과 압축



In [3]:
from langchain_core.prompts import ChatPromptTemplate

chat_template = ChatPromptTemplate.from_messages(
    [
        # role, message
        ("system", "당신은 친절한 AI 어시스턴트입니다. 당신의 이름은 {name} 입니다."),
        ("human", "반가워요!"),
        ("ai", "안녕하세요! 무엇을 도와드릴까요?"),
        ("human", "{user_input}"),
    ]
)



In [5]:
# 챗 message 를 생성합니다.
messages = chat_template.format_messages(
    name="테디", user_input="당신의 이름은 무엇입니까?"
)
messages


[SystemMessage(content='당신은 친절한 AI 어시스턴트입니다. 당신의 이름은 테디 입니다.', additional_kwargs={}, response_metadata={}),
 HumanMessage(content='반가워요!', additional_kwargs={}, response_metadata={}),
 AIMessage(content='안녕하세요! 무엇을 도와드릴까요?', additional_kwargs={}, response_metadata={}),
 HumanMessage(content='당신의 이름은 무엇입니까?', additional_kwargs={}, response_metadata={})]

In [6]:
messages

[SystemMessage(content='당신은 친절한 AI 어시스턴트입니다. 당신의 이름은 테디 입니다.', additional_kwargs={}, response_metadata={}),
 HumanMessage(content='반가워요!', additional_kwargs={}, response_metadata={}),
 AIMessage(content='안녕하세요! 무엇을 도와드릴까요?', additional_kwargs={}, response_metadata={}),
 HumanMessage(content='당신의 이름은 무엇입니까?', additional_kwargs={}, response_metadata={})]

In [None]:
# CONTEXT_PROMPT: 청크별 컨텍스트 생성 프롬프트

CONTEXT_PROMPT = ChatPromptTemplate.from_template(
    
    """
    You're an expert in document analysis. Your task is to provide brief, relevant context for a chunk of text.
    
    Here is the document:
    
    <document>
    {document}
    </document>
    
    Here is the chunk we want to situate within the whole document:
    
    <chunk>
    {chunk}
    </chunk>
    
    Provide a concise context (2-3 sentences) for this chunk, considering the following guidelines:
    1. Identify the main topic or concept discussed in the chunk.
    2. Mention any relevant information or comparisons from the broader document context.
    3. If applicable, note how this information relates to the overall theme or purpose of the document.
    4. Include any key figures, dates, or percentages that provide important context.
    5. Do not use phrases like "This chunk discusses..." or "The chunk is about...". Instead, directly state that information.
    
    Please give a short succint context to situate this chunk within the overall document for the purpose of summarization.
    
    Context:
    """.strip()
    
)

# RecursiveCharacterTextSplitter: 재귀적 문자 기반 텍스트 분할기. 문서를 일정한 크기(chunk_size)로 나누되, 청크 간 chunk_overlap 만큼 중첩을 유지.
# RecursiveCharacterTextSplitter는 문장 구조를 유지하면서 효율적으로 청킹.

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=Config.Preprocessing.CHUNK_SIZE,
    chunk_overlap=Config.Preprocessing.CHUNK_OVERLAP
)

# ChatOllama 객체를 생성하여 Open-Source LLM(llama3.2)을 활용. ChatOllama는 Chatbot을 위한 Ollama 모델을 사용하는 객체이며 temperature=0.6이 적절한지 테스트 필요 (정확성과 창의성 균형 조정).
def create_llm() -> ChatOllama:
    return ChatOllama(model = Config.Preprocessing.LLM, temperature=Config.Model.TEMPERATURE, seed=Config.SEED, keep_alive = -1)


# FastEmbedEmbeddings를 사용하여 문서의 임베딩 벡터 생성. Embedding Mo   
def create_embeddings() -> FastEmbedEmbeddings:
    return FastEmbedEmbeddings(model=Config.Preprocessing.EMBEDDING_MODEL, seed=Config.SEED)

# FlashrankRerank를 사용하여 검색된 문서를 랭킹 정렬.
def create_reranker() -> FlashrankRerank:
    return FlashrankRerank(model=Config.Preprocessing.RERANKER, seed=Config.SEED,
                           top_k=Config.Chatbot.N_CONTEXT_RESULTS)
    
# (4) 문서 청크 생성 및 컨텍스트 추가 - CONTEXT_PROMPT를 이용해 LLM이 청크의 문맥을 파악하여 컨텍스트를 생성.
def _generate_context(llm:ChatOllama, document:str, chunk:str)->str:
    messages = CONTEXT_PROMPT.format_messages(document=document, chunk=chunk)
    response = llm.invoke(messages)
    return response.content

# _generate_context()를 활용해 각 청크에 문맥 정보를 추가.
# Config.Preprocessing.CONTEXTUALIZE_CHUNKS 설정에 따라 컨텍스트 생략 가능.
def _create_chunks(document:Document)->List[Document]: # 여기서의 Document는 langchain_core.documents.Document 객체임
    chunks = text_splitter.split_documents([document])
    if not Config.Preprocessing.CONTEXTUALIZE_CHUNKS:
        return chunks
    llm = create_llm()
    contextual_chunks = []
    
    for chunk in chunks:
        context = _generate_context(llm, document.page_content, chunk.page_content)
        chunk_with_context = f"{context}\n\n{chunk.page_content}"
        contextual_chunks.append(Document(page_content=chunk_with_context,
                                          metadata=chunk.metadata))
    return contextual_chunks

# (5) 파일 인제스트 및 검색기 생성

def ingest_files(files:List[File]) -> BaseRetriever:
    documents = [Document(file.content, metadata={'source':file.name}) for file in files] # 파일을 받아 문서(Document) 리스트로 변환.
    chunks = []
    for document in documents:
        chunks.extend(_create_chunks(document))
    
    # BM25Retriever (키워드 기반)와 InMemoryVectorStore (임베딩 기반)로 검색기 생성.
    semantic_retriever = InMemoryVectorStore.from_documents(
        chunks, create_embeddings().as_retriever(search_kwargs={"k": Config.Preprocessing.N_SEMANTIC_RESULTS}))
    
    bm25_retriever = BM25Retriever.from_documents(chunks)
    bm25_retriever.k = Config.Preprocessing.N_BM25_RESULTS
    # 두 개의 검색기를 가중치를 조정해 EnsembleRetriever로 결합.

    ensemble_retriever = EnsembleRetriever([semantic_retriever, bm25_retriever],
                                           weights = [0.6, 0.4])
    
    # ContextualCompressionRetriever를 활용해 최종 결과 정제.
    return ContextualCompressionRetriever(
        base_compressor = create_reranker(), base_retriever = ensemble_retriever
    )
    

4. LangChain 기반 RAG 챗봇의 구조
    - 주요 기능
        - ✅ 문서 검색 + LLM 답변 생성 (RAG 구조)
        - ✅ LangGraph 기반 대화 상태 관리
        - ✅ 답변을 스트리밍하여 순차적으로 반환
        - ✅ BM25 + Semantic 검색 결합




In [None]:
from langgraph.graph import START, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage
from langchain_core.prompts import MessagesPlaceholder
from typing import Iterable,TypedDict
from enum import Enum
from typing import Union


# 시스템 프롬프트 - 사용자의 문서에서 정보를 검색하여 대화할 수 있도록 유도.

SYSTEM_PROMPT="""
    You're having a conversation with an user about excerpts of their files.
    Try to be helpful and answer their questions.
    
    
    If you do not know the answer, say that you do not know and try to clarify the question.
    """.strip()

# 메시지 템플릿 - context에 검색된 문서를 포함하고, question을 입력받아 답변 생성.
PROMPT = """
Here's the infornation you have about the excerpts of the files:

<context>
{context}
</context>

One file can have multiple excerpts.

Please, respond to the query below

<question>
{question}
</question>

Answer:
"""

FILE_TEMPLATE = """
<file>
    <name>{name}</name>
    <content>{content}</content>
</file>
""".strip()

# 전체 대화 흐름 템플릿 - 기존 대화 기록 (chat_history) 포함하여 일관된 대화를 유지.
PROMPT_TEMPLATE = ChatPromptTemplate.from_messages(
    [
(
    "system",
    SYSTEM_PROMPT,
),

MessagesPlaceholder(variable_name = "chat_history"),
("human", PROMPT),
    ]
)

# 역할 및 데이터 클래스 정의 
class Role(Enum):
    USER = "user"
    ASSISTANT = "assistant"
    
@dataclass
class Message:
    # Message 객체는 '대화의 한 단위'를 저장.
    role:Role
    context:str
    
@dataclass
class ChunkEvent:
    # 이벤트 데이터 클래스 (스트리밍 응답) - 스트리밍된 답변의 일부 청크(부분)를 저장.
    content: str
    
@dataclass
class SourcesEvent:
    # 검색된 문서 목록을 저장.
    sources:List[Document]
    
@dataclass
class FinalAnswerEvent:
    #최종적으로 생성된 LLM 답변을 저장.
    content:str

class State(TypedDict):
    # 현재 챗봇의 상태를 저장하는 TypedDict.
    # 대화 이력(chat_history), 검색된 문서(context)를 포함.
    question:str
    chat_history:List[BaseMessage]
    context:List[Document]
    answer:str
    

def _remove_thinking_from_message(message:str)->str:
    close_tag = "</think>"
    tag_length = len(close_tag)
    
    
    return message[message.find(close_tag) + tag_length :].strip()

def create_history(welcome_message:Message)->List[Message]:
    return [welcome_message]


# **문서 검색기(retriever)를 생성하여 파일 검색 가능하도록 설정.
# LLM(ChatOllama)을 초기화하여 답변 생성.
# LangGraph 기반 workflow를 설정하여 대화 흐름을 관리.

class Chatbot:
    def __init__(self, files:List[File]):
        self.files = files
        self.retriever = ingest_files(files)
        self.llm = ChatOllama(
            model = Config.Model.Name,
            temperataure = Config.Model.Temperature,
            seed = Config.SEED,
            keep_alive = -1,
            verbose = False
        )
        self.workflow = self._create_workflow()
        
        
    def _format_docs(self, docs:List[Document])->str:
        #  문서 포맷팅
        return "\n\n".join(
            FILE_TEMPLATE.format(name=doc.metadata["source"], content=doc.page_content) # format 메소드를 활용해서 변수에 값을 넣어줌
            for doc in docs
        )


    def _retrieve(self, state:State): # 문서 검색
        context = self.retriever.invoke(state["question"])
        return {"context":context}
    
    def _generate(self, state:State): # LLM을 사용해 답변 생성
        messages = PROMPT_TEMPLATE.invoke(
            {
                "question": state["question"],
                "context": self._format_docs(state["context"]),
                "chat_history": state["chat_history"],
            }
        )

        answer = self.llm.invoke(messages)
        return {"answer":answer}
    
    
    def _create_workflow(self) -> CompiledStateGraph: # 
        graph_builder = StateGraph(State).add_sequence([self._retrieve, self._generate])
        graph_builder.add_edge(START, "_retrieve")
        return graph_builder.compile()

    

    def _ask_model(self, prompt: str, chat_history: List[Message]) -> Iterable[Union[SourcesEvent, ChunkEvent, FinalAnswerEvent]]: # 스트리밍 방식 답변 생성
        history = [
            AIMessage(m.content) if m.role == Role.ASSISTANT else HumanMessage(m.content)
            for m in chat_history
        ]
        payload = {"question":prompt, "chat_history":history}
        
        config = {
            "configurable": {"thread_id": 42},
        }
        for event_type, event_data in self.workflow.stream(
            payload,
            config = config,
            stream_mode=  ["updates","messages"],
        ):
            
            if event_type == "messages":
                chunk, _ = event_data
                yield ChunkEvent(chunk.content)
                
            if event_type == "updates":
                if "_retrieve" in event_data:
                    documents = event_data["_retrieve"]["context"]
                    yield SourcesEvent(sources=documents)
                if "_generate" in event_data:
                    answer = event_data["_generate"]["answer"]
                    yield FinalAnswerEvent(content=answer.content)
                
    
    
    def ask(self, prompt:str, chat_history:List[Message])->Iterable[Union[SourcesEvent, ChunkEvent, FinalAnswerEvent]]:  
        for event in self._ask_model(prompt, chat_history):
            yield event
            if isinstance(event, FinalAnswerEvent):
                response = _remove_thinking_from_message("".join(event.content))
                chat_history.append(Message(role=Role.USER,content = prompt))
                chat_history.append(Message(role=Role.ASSISTANT, context=response))
    

