In [1]:
# !pip install -qqq langchain langchain-community

# Data Ingestion (PDF)

[langchain_concept_document_loaders](https://python.langchain.com/docs/integrations/document_loaders/)

In [None]:
# !pip install -qqq pymupdf

In [None]:
# Data Loading & Extract
from langchain_community.document_loaders import PyMuPDFLoader

# Point the loader at the same sample file that the PyMuPDF cells in this
# file open ('example.pdf'); the previous path had no file extension.
loader = PyMuPDFLoader('example.pdf')

# load() returns a list of Document objects.
docs = loader.load()

## without Langchain

In [None]:
import fitz

# Open the PDF with fitz; this handle is closed again at the end of this cell.
pdf = fitz.open('example.pdf')

# single page extract
def extract_single_page(pdf, pdf_num=0):
    """Return the plain text of one page of an opened PDF.

    Args:
        pdf: Opened document object exposing ``load_page``.
        pdf_num: Index of the page to read; defaults to 0.

    Returns:
        Whatever ``get_text("text")`` yields for that page.
    """
    return pdf.load_page(pdf_num).get_text("text")

def extract_all_page(pdf):
    """Concatenate the text of every page of an opened PDF, in page order.

    Args:
        pdf: Opened document object exposing ``page_count`` and ``load_page``.

    Returns:
        A single string with the text of pages ``0 .. page_count - 1`` joined
        without any extra separator; an empty string when there are no pages.
    """
    # join() builds the result in one pass instead of re-copying the growing
    # string on every page.
    return "".join(
        extract_single_page(pdf, page_num) for page_num in range(pdf.page_count)
    )

# Release the handle opened at the top of this cell; the helper functions
# defined above stay available for later cells.
pdf.close()

In [None]:
# Re-open the PDF as a context manager so the handle is closed when the
# block exits, and collect the text of all pages into one string.
with fitz.open('example.pdf') as pdf:
    text = extract_all_page(pdf)

검색된 결과는 최종적으로 LLM의 프롬프트에 query와 함께 입력되기 때문에 **text**로 추출하여 저장하게 됩니다.

그러나 langchain에서는 검색된 문서의 정보(metadata)를 효과적으로 관리하고 활용하기 위해서 **Document** 객체로 wrapping하여 사용하고 있습니다.

# Data Indexing

## Splitting & Chunking

[langchain_concept_text_splitter](https://python.langchain.com/docs/concepts/text_splitters/)

In [10]:
#!pip install -qU langchain-text-splitters

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Recursive character splitter configured with chunk_size=100 and no overlap.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=0)

# NOTE(review): `example_documents` is not defined in this view; split_text
# takes a single str and returns a list of str — confirm the input type.
texts = text_splitter.split_text(example_documents)

In [None]:
'''
langchain의 TextSplitter의 methods
1. split_text :
    split_text(text: str) -> List[str]

    텍스트를 input으로 받아, 조건에 맞춰 text로 split해주는 함수

2. create_documents :
    create_documents(texts: list[str], metadatas: Optional[list[dict[Any, Any]]]) -> List[Document]

    str List와 metadata로 Document객체를 만드는 함수

3. split_documents :
    split_documents(documents: Iterable[Document]) -> List[Document]

    Document객체를 input으로 받아, 조건에 맞춰 Document객체로 split해주는 함수
'''

## without Langchain

### plain text splitter

In [None]:
def plain_text_split(all_text, chunk_size, overlap):
    """Cut text into fixed-width character windows that may overlap.

    Args:
        all_text: The string to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters each chunk shares with the previous
            one; must satisfy ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in order. Empty input yields an empty list, and the
        trailing chunks may be shorter than ``chunk_size``.

    Raises:
        ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
            negative or not smaller than ``chunk_size``.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # A step of zero makes range() fail and a negative step silently yields
    # no chunks at all, so reject those combinations up front.
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap
    return [
        all_text[start:start + chunk_size]
        for start in range(0, len(all_text), step)
    ]

### Simple Recursive Splitter

In [None]:
def recursive_split(text, chunk_size, separators=("\n\n", "\n", " ", "")):
    """Split text on progressively finer separators until pieces fit.

    The text is split on the first separator and neighbouring parts are
    packed together while they fit in ``chunk_size``. A part that is still
    too long is split again with the remaining separators.

    Args:
        text: The string to split.
        chunk_size: Maximum chunk length in characters; must be positive.
        separators: Separators to try, coarsest first. An empty string means
            "cut into fixed-width slices" and is normally the last entry.

    Returns:
        A list of stripped, non-empty chunks in document order. A part is
        only left longer than ``chunk_size`` when no finer separator remains.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    # An exhausted separator list is treated like the final "" level.
    sep = separators[0] if separators else ""
    finer_separators = separators[1:]

    if sep == "":
        # str.split("") raises ValueError, so the character level is
        # implemented as fixed-width slices instead.
        slices = (
            text[start:start + chunk_size].strip()
            for start in range(0, len(text), chunk_size)
        )
        return [piece for piece in slices if piece]

    chunks = []
    current_chunk = ""

    for part in text.split(sep):
        # Put back the separator that split() removed so that neighbouring
        # words/lines are not glued together.
        candidate = (current_chunk + sep + part) if current_chunk else part
        if len(candidate) <= chunk_size:
            current_chunk = candidate
            continue

        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        current_chunk = ""

        if len(part) > chunk_size and finer_separators:
            # chunk_size must be forwarded; only the separator list shrinks.
            chunks.extend(recursive_split(part, chunk_size, finer_separators))
        elif part.strip():
            # Either the part fits on its own, or no finer separator is left
            # and it is kept whole.
            chunks.append(part.strip())

    if current_chunk.strip():
        chunks.append(current_chunk.strip())
    return chunks

In [None]:
def merge_with_overlap(chunks, chunk_size, overlap):
    """Pack small chunks together, then prefix each with the previous tail.

    Args:
        chunks: Ordered list of text pieces.
        chunk_size: Maximum length of a packed chunk, counting the single
            space inserted between joined pieces.
        overlap: Number of trailing characters of the previous packed chunk
            to prepend to each following chunk; 0 disables overlap.

    Returns:
        The packed chunks in order. Every chunk after the first starts with
        the overlap text and a space, so it may exceed ``chunk_size`` by up
        to ``overlap + 1`` characters.

    Raises:
        ValueError: If ``overlap`` is negative.
    """
    if overlap < 0:
        raise ValueError("overlap must be non-negative")

    # Pass 1: greedily join neighbouring pieces while the result still fits.
    merged = []
    current = None
    for piece in chunks:
        if current is None:
            current = piece
        elif len(current) + 1 + len(piece) <= chunk_size:  # +1 for the space
            current = current + " " + piece
        else:
            merged.append(current.strip())
            current = piece
    if current is not None:
        merged.append(current.strip())

    # Pass 2: add the overlap. The tail is taken from the previous *packed*
    # chunk (not the already-prefixed output), and the integer parameter is
    # never rebound, so every iteration slices with the same width.
    final_chunks = []
    for index, chunk in enumerate(merged):
        # Guard overlap == 0 explicitly: s[-0:] would be the whole string.
        tail = merged[index - 1][-overlap:] if index and overlap else ""
        final_chunks.append((tail + " " + chunk).strip() if tail else chunk)
    return final_chunks

In [None]:
def simple_recursive_splitter(text, chunk_size, overlap):
    """Split text recursively, then merge the pieces and add overlap.

    Args:
        text: The string to split.
        chunk_size: Size limit forwarded to both ``recursive_split`` and
            ``merge_with_overlap``.
        overlap: Overlap width forwarded to ``merge_with_overlap``.

    Returns:
        The list of chunks produced by ``merge_with_overlap``.
    """
    return merge_with_overlap(
        recursive_split(text, chunk_size),
        chunk_size,
        overlap,
    )

## Embedding Creation

In [None]:
# !pip install -qqq langchain-openai

In [None]:
from langchain_openai import OpenAIEmbeddings
import os
# Embedding model name passed to the OpenAIEmbeddings instances in this file.
EMB_MODEL_NAME="text-embedding-3-small"
# Read the key from the environment; os.getenv returns None when it is unset.
api_key=os.getenv("OPENAI_API_KEY")
embeddings = OpenAIEmbeddings(model=EMB_MODEL_NAME, api_key=api_key)

In [None]:
# Embed a single query string.
text = "TEST embedding text"
query_result = embeddings.embed_query(text)

# document embedding
# embed_documents takes a list of strings, so pass each Document's
# page_content rather than the Document objects themselves.
doc_result = embeddings.embed_documents([doc.page_content for doc in docs])

In [None]:
# embedding dimension config: same model name and key as `embeddings`, with
# the `dimensions` argument set to 1024.
embeddings_1024 = OpenAIEmbeddings(model=EMB_MODEL_NAME, api_key=api_key, dimensions=1024)

# VectorStore

## FAISS

In [None]:
import faiss
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore

# Probe the embedding model once to learn the vector width the index must hold.
dimension_size = len(embeddings.embed_query("hello world"))

# langchain vector db init method (1): build an empty store by hand.
# The store uses the same `embeddings` object the index was sized from, and
# starts with an empty index-position -> docstore-id mapping (a required
# constructor argument).
faiss_vector_db = FAISS(
    embedding_function=embeddings,
    index=faiss.IndexFlatL2(dimension_size),
    docstore=InMemoryDocstore(),
    index_to_docstore_id={},
)

In [None]:
# langchain vector db init method (2): build the store from documents.
# NOTE(review): `example_docs` is not defined in this view — presumably a
# list of Document objects; confirm before running.
faiss_vector_db = FAISS.from_documents(
    documents=example_docs, embedding=OpenAIEmbeddings()
    )

In [None]:
# langchain vector db init method (3): build the store from raw strings,
# with explicit metadata and ids.
# NOTE(review): `example_list_text` is not defined in this view; metadatas
# and ids each have two entries, so it presumably holds two strings — confirm.
faiss_vector_db = FAISS.from_texts(
    example_list_text,
    embedding=OpenAIEmbeddings(),
    metadatas=[{"source": "page1"}, {"source": "page2"}],
    ids=["doc_id1", "doc_id2"],
)

## Retriever

langchain Vectorstore기반 retriever 생성

In [None]:
# Wrap the vector store as a retriever; no arguments are passed, so
# as_retriever() uses its defaults.
retriever = faiss_vector_db.as_retriever()

In [None]:
'''
as_retriever()

## parameters
- search_type : (similarity, mmr, similarity_score_threshold)
    * search_kwargs : (k, score_threshold, fetch_k, lambda_mult, filter...)


'''

In [None]:
from langchain_core.runnables import ConfigurableField

# A "configurable" entry in the config is only applied to fields that were
# declared configurable, so expose search_kwargs before overriding it per
# call. The original `retriever` is left untouched for other cells.
configurable_retriever = retriever.configurable_fields(
    search_kwargs=ConfigurableField(id="search_kwargs")
)

config = {"configurable": {"search_kwargs": {"k": 3}}}
# Placeholder question; replace with a real query before running.
query = ''
docs = configurable_retriever.invoke(query, config=config)

# Response Generation

In [None]:
# Open AI LLM
# ChatOpenAI was used without being imported anywhere in this file.
from langchain_openai import ChatOpenAI

api_key = os.getenv("OPENAI_API_KEY")

# temperature=0 is passed together with the model name and API key.
llm = ChatOpenAI(
    temperature=0,
    model_name="gpt-4o",
    openai_api_key=api_key
)

In [None]:
# Prompt text with two placeholders, {context} and {question}.
template = """Answer the question based on the following context:
# Context
{context}

# Question
{question}
"""

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

# Turn the raw template string into a prompt runnable; the chain below
# referenced `prompt` without it ever being defined.
prompt = ChatPromptTemplate.from_template(template)

# The input question goes to the retriever to fill {context} and is passed
# through unchanged to fill {question}; the model output is parsed to a str.
retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

In [None]:
# Placeholder question; replace with a real query before running.
query = ''
# The chain is named `retrieval_chain`; the misspelt name raised NameError.
retrieval_chain.invoke(query)