# Multimodal RAG : Coupon Image Retrieval(Colab)
## 설치방법
- github에서 코드 다운 받기 : https://github.com/cserock/arpo/archive/refs/heads/main.zip
- 압축을 풀고 arpo-main 폴더를 구글 드라이브의 /content/gdrive/MyDrive 경로에 업로드 합니다.
- /content/gdrive/MyDrive/arpo-main/.env_sample 파일을 copy하여 .env 파일을 생성하고 LANGCHAIN_API_KEY, OPENAI_API_KEY를 입력합니다.
- 아래 명령을 실행합니다.

### Colab 환경 설정

In [None]:
# 구글드라이브 연동
from google.colab import drive
drive.mount('/content/gdrive/')

# 경로 설정
COLAB_PROJECT_NAME = "arpo-main"
COLAB_PROJECT_PATH = f'/content/gdrive/MyDrive/{COLAB_PROJECT_NAME}'

### 라이브러리 설치

In [None]:
!pip install -r '{COLAB_PROJECT_PATH}/requirements_colab.txt'

### 0. 환경 설정

In [None]:
import os
from dotenv import load_dotenv
from langchain_community.cache import SQLiteCache
from langchain_core.globals import set_llm_cache
from langchain_teddynote import logging

# project 명
PROJECT = COLAB_PROJECT_NAME

# project root 경로
PROJECT_ROOT_PATH = COLAB_PROJECT_PATH

# default LLM 설정
DEFAULT_LLM = 'OPENAI' # 'UPSTAGE', 'OLLAMA'

# default embedding model 설정
DEFAULT_EMBEDDING_MODEL = 'OPENAI' # 'UPSTAGE', 'OLLAMA'

# coupon image 파일 경로
IMAGE_PATH = f'{PROJECT_ROOT_PATH}/data/coupon_image_files'

# OCR 모델에서 추출한 coupon 정보가 저장된 coupon_infos.json 파일 경로
COUPON_INFO_JSON_PATH = f'{PROJECT_ROOT_PATH}/data/coupon_info.json'

# VectorDB 저장 경로
VECTOR_DB_ROOT_PATH = f'{PROJECT_ROOT_PATH}/.vector_db'

# FAISS 저장 경로
FAISS_DB_PATH = f'{VECTOR_DB_ROOT_PATH}/faiss'

# FAISS index 이름
FAISS_INDEX_NAME = 'coupon'

# Chroma 저장 경로
CHROMA_DB_PATH = f'{VECTOR_DB_ROOT_PATH}/chroma'

# Chroma collection 이름
CHROMA_COLLECTION_NAME = "coupon"

# PINECONE 저장 경로
PINECONE_DB_PATH = f'{VECTOR_DB_ROOT_PATH}/pinecone'

# PINECONE index 이름
PINECONE_INDEX_NAME = 'coupon'

# cache 경로 설정
CACHE_PATH = f'{PROJECT_ROOT_PATH}/.cache'

# LLM CACHE 경로 설정
LLM_CACHE_PATH = f'{CACHE_PATH}/multimodal-rag_cache.db'

# env 설정 로딩
load_dotenv(f'{COLAB_PROJECT_PATH}/.env')

# 캐시 디렉토리를 생성합니다.
if not os.path.exists(CACHE_PATH):
    os.makedirs(CACHE_PATH)

# SQLiteCache를 사용합니다.
set_llm_cache(SQLiteCache(database_path=LLM_CACHE_PATH))

# langsmith 추적 시작
logging.langsmith(PROJECT)

#### get_llm() : default LLM을 초기화하여 리턴합니다.

In [None]:
from langchain_openai import ChatOpenAI
from langchain_upstage import ChatUpstage
from langchain_community.chat_models import ChatOllama

def get_llm(model=DEFAULT_LLM):
    if model == 'OPENAI':
        # model : gpt-4o-mini
        llm = ChatOpenAI(
            model_name="gpt-4o-mini",
            temperature=0.0,
            max_tokens=2048
        )
    elif model == 'UPSTAGE':
        # model : Upstage solar-pro
        llm = ChatUpstage(
            model="solar-pro",
            temperature=0.0,
            max_tokens=2048
        )
    else:
        # model : EEVE-Korean-Instruct-10.8B:latest
        llm = ChatOllama(
            model="EEVE-Korean-Instruct-10.8B:latest",
            temperature=0.0,
            max_tokens=2048
        )
    return llm

In [None]:
llm = get_llm()
print(llm)

#### get_embedding() : 설정한 default Embedding model을 초기화하여 리턴합니다.

- OpenAI
  - text-embedding-3-small : $0.02 / 1M tokens
  - text-embedding-3-large : $0.13 / 1M tokens
  - text-embedding-ada-002 : $0.1 / 1M tokens
  - dimension : 1536
- Upstage
  - Embeddings : $0.1 / 1M tokens
  - dimension : 4096
- Ollama
  - nomic-embed-text : open source

In [None]:
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_upstage import UpstageEmbeddings
from langchain_community.embeddings import OllamaEmbeddings

def get_embedding(embedding_model=DEFAULT_EMBEDDING_MODEL):
    if embedding_model == 'OPENAI':
        # OpenAIEmbeddings
        embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    elif embedding_model == 'UPSTAGE':
        # UpstageEmbeddings
        embeddings = UpstageEmbeddings(model="solar-embedding-1-large")
    else:
        # ollama pull nomic-embed-text 실행 필요
        embeddings = OllamaEmbeddings(model="nomic-embed-text")
    return embeddings

In [None]:
embedding = get_embedding()
print(embedding)

### 1. OCR 모델을 통한 이미지 정보 분석
- Pororo : https://kakaobrain.github.io/pororo/miscs/ocr.html
- Upstage OCR : https://developers.upstage.ai/docs/apis/document-ocr

### 2. Mulltimodal Model을 사용하여 coupon 이미지 분류
- 로컬에 설치된 llava:13b를 사용 : https://ollama.com/library/llava:13b
- prompt를 통해 해당 이미지가 coupon 이미지인지를 검사하고 결과를 TRUE, FALSE로 답변함
- OCR 모델을 통해 쿠폰이미지에서 추출한 text와 이미지 경로입니다. 이를 통해 LLM을 통해 쿠폰 정보를 생성합니다.


### [참고] colab에서는 로컬 모델의 설치가 어렵기 때문에 1, 2단계는 실행하지 않습니다.
- 1,2단계에서 만들어진 데이터는 /data/coupon_info.json 파일에 저장되어 있으며 이후 단계에서 사용합니다.

### 3. 데이터 전처리 & 임베딩

쿠폰 정보를 임베딩하기 위해 OCR을 통해 얻은 쿠폰 이미지 데이터를 LLM을 통해 전처리합니다.

In [None]:
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import Optional

#
preprocess_prompt = PromptTemplate.from_template(
    """
        당신은 쿠폰 이미지를 ocr 모델을 통해 추출한 키워드로 쿠폰 정보를 추론하는 agent입니다. 추론시 다음의 규칙을 지켜주세요.
        - 정보는 주어진 키워드에서만 사용하세요.
        - 쿠폰코드는 공백(space) 또는 대시(-)를 포함할 수 있으며 공백(space) 또는 대시(-)를 제외하고 16자의 숫자로 구성되어 있습니다.
        - 오타가 있을 수 있으니 맞춤법에 맞게 수정해 주세요.

        다음은 ocr 모델을 통해 추출한 쿠폰 키워드입니다. 이 정보를 바탕으로 쿠폰정보를 추론해 주세요.
        답변 출력시 추론에 대한 설명은 제외하고 아래 FORMAT을 참고해서 json형태로만 출력해 주세요.

        KEYWORD:
        {coupon_info}

        FORMAT:
        {format}
    """
)

class CouponSummary(BaseModel):
    title: str = Field(description="쿠폰이름")
    coupon_code: str = Field(description="쿠폰코드")
    publisher: str = Field(description="교환처")
    valid_date: str = Field(description="유효기간")
    order_number: str = Field(description="주문번호")
    summary: str = Field(description="쿠폰 정보 요약")
    image_path: Optional[str] = None

# PydanticOutputParser 생성
preprocess_parser = PydanticOutputParser(pydantic_object=CouponSummary)

# instruction 을 출력합니다.
print(preprocess_parser.get_format_instructions())
preprocess_prompt = preprocess_prompt.partial(format=preprocess_parser.get_format_instructions())

generate_coupon_summary()는 쿠폰 정보를 바탕으로 쿠폰 요약(summary)을 생성하기 위한 함수입니다.
쿠폰 요약(summary)은 embedding 대상이 되며 RAG에서 retrieval시에 추출되는 context 입니다.

In [None]:
from langchain_core.prompts import PromptTemplate

def generate_coupon_summary(coupon_summary: CouponSummary):
    prompt = PromptTemplate.from_template(
        """
            당신은 주어진 쿠폰 정보를 자연스러운 한국어 문장으로 설명하는 agent입니다.
            아래의 주어진 정보 이외에는 절대 사용하면 안됩니다. 쿠폰코드는 필수로 포함되어야 하며 쿠폰에 대해 자연스러운 한국어 문장으로 설명해 주세요.
            - 쿠폰이름 : {title}
            - 쿠폰코드 : {coupon_code}
            - 발행처 : {publisher}
            - 유효기간 : {valid_date}
            - 주문번호 : {order_number}
            - 쿠폰요약 : {summary}
        """
    )

    input = {
        "title": coupon_summary.title,
        "coupon_code": coupon_summary.coupon_code,
        "publisher": coupon_summary.publisher,
        "valid_date": coupon_summary.valid_date,
        "order_number": coupon_summary.order_number,
        "summary": coupon_summary.summary
    }

    # llm 로딩
    llm = get_llm()

    # 체인 생성
    chain = prompt | llm

    output = chain.invoke(input)
    # print(output.content)
    return output.content

coupon_info.json 파일로부터 sources 변수를 초기화 합니다.


In [None]:
import json

# coupon_info.json 로딩
with open(COUPON_INFO_JSON_PATH, "r") as f:
	sources = json.load(f)
print(sources)

LLM을 사용하여 OCR에서 추출한 coupon_info로부터 CouponSummary 객체 생성

In [None]:
# llm 로딩
preprocess_llm = get_llm()

# 체인 생성
preprocess_chain = preprocess_prompt | preprocess_llm

result_summary_text = []
result_metadata = []
result_ids = []

# sources에는 coupon_info.json 로부터 로딩한 coupon 정보가 들어있음
for source in sources:
    input = {
        "coupon_info": source['coupon_info']
    }

    output = preprocess_chain.invoke(input)
    structured_output = preprocess_parser.parse(output.content)

    # 생성된 쿠폰 요약 추가
    structured_output.summary = generate_coupon_summary(structured_output)

    # json 파일의 image_path를 colab 환경 IMAGE_PATH 로 변경
    image_path = source['image_path'].replace("./data/coupon_image_files", IMAGE_PATH)

    # 이미지 path 추가
    structured_output.image_path = image_path

    result_ids.append(image_path)
    result_summary_text.append(structured_output.summary)
    result_metadata.append(structured_output.__dict__)

# print(result_ids)
print(result_summary_text)
# print(result_metadata)

#### VectorDB 설정 : Chroma
Vector DB로 Chroma를 사용하는 경우 초기화합니다.

In [None]:
from langchain_chroma import Chroma

# Chroma 생성
coupon_db = Chroma.from_texts(
    ids=result_ids, texts=result_summary_text, metadatas=result_metadata, embedding=get_embedding(), persist_directory=CHROMA_DB_PATH, collection_name=CHROMA_COLLECTION_NAME
)

# Chroma 로딩
# coupon_db = Chroma(persist_directory=CHROMA_DB_PATH, embedding_function=get_embedding(), collection_name=CHROMA_COLLECTION_NAME)
coupon_db.get()

# Chroma 삭제
# coupon_db.reset_collection()

similarity_search_with_relevance_scores()를 사용하여 쿼리를 실행합니다.

In [None]:
coupon_db.similarity_search_with_relevance_scores("GS에서 사용할 수 있는 쿠폰을 알려주세요.", k=4)

retriever를 설정하여 쿼리를 실행합니다.

In [None]:
retriever = coupon_db.as_retriever(search_type="mmr", search_kwargs={"k": 4, "fetch_k": 10})
# retriever = coupon_db.as_retriever(search_type="similarity", search_kwargs={"k": 3})
# retriever = coupon_db.as_retriever(search_kwargs={"k": 5})
retriever.invoke("베이글을 주문할 수 있는 쿠폰")

#### VectorDB 설정 : FAISS
Vector DB로 FAISS를 사용하는 경우 초기화 합니다.

In [None]:
from langchain_community.vectorstores import FAISS

coupon_db = FAISS.from_texts(
    result_summary_text,
    embedding=get_embedding(),
    metadatas=result_metadata,
    ids=result_ids,
)

# 로컬 Disk 에 저장
coupon_db.save_local(folder_path=FAISS_DB_PATH, index_name=FAISS_INDEX_NAME)

# 저장된 내용 확인
coupon_db.docstore._dict

In [None]:
from langchain_community.vectorstores import FAISS

# 저장된 데이터를 로드
coupon_db = FAISS.load_local(
    folder_path=FAISS_DB_PATH,
    index_name=FAISS_INDEX_NAME,
    embeddings=get_embedding(),
    allow_dangerous_deserialization=True,
)

# 로드된 데이터를 확인
coupon_db.index_to_docstore_id

FAISS로 similarity_search_with_relevance_scores()를 사용하여 쿼리를 실행합니다.

In [None]:
coupon_db.similarity_search_with_relevance_scores("GS에서 사용할 수 있는 쿠폰을 알려주세요.", k=4)

FAISS에 retriever를 설정하여 쿼리를 실행합니다.

In [None]:
retriever = coupon_db.as_retriever(search_type="mmr", search_kwargs={"k": 4, "fetch_k": 10})
# retriever = coupon_db.as_retriever(search_type="similarity", search_kwargs={"k": 3})
# retriever = coupon_db.as_retriever(search_kwargs={"k": 5})
retriever.invoke("베이글을 주문할 수 있는 쿠폰")

한글 단어 retrieval 성능 개선을 위해 한글 형태소 분석기 라이브러리인 kiwipiepy를 설치하고 kiwi_tokenize로 BM25Retriever를 설정하여 FAISS와 앙상블로 사용

In [None]:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_core.documents import Document
from kiwipiepy import Kiwi

kiwi = Kiwi()

def kiwi_tokenize(text):
    return [token.form for token in kiwi.tokenize(text)]

# FAISS로부터 documents를 추출합니다.
docs_dict = coupon_db.docstore.__dict__['_dict']

# list로 저장
docs = list(docs_dict.values())

result_summary_text = []
result_metadata = []

for doc in docs:
    result_summary_text.append(doc.page_content)
    result_metadata.append(doc.metadata)

kiwi_bm25 = BM25Retriever.from_texts(result_summary_text, metadatas=result_metadata, preprocess_func=kiwi_tokenize)

retriever = EnsembleRetriever(
    retrievers=[kiwi_bm25, retriever],  # 사용할 검색 모델의 리스트
    weights=[0.5, 0.5],  # 각 검색 모델의 결과에 적용할 가중치
    search_type="mmr",  # 검색 결과의 다양성을 증진시키는 MMR 방식을 사용
)

In [None]:
retriever.invoke("베이글을 주문할 수 있는 쿠폰")

#### VectorDB 설정 : Pinecone
Vector DB로 Pinecone을 사용하는 경우 초기화 합니다.

In [None]:
from pinecone import Pinecone
from dotenv import load_dotenv

# env 설정 로딩
load_dotenv()

import time
import os
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore

pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))

existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]

if PINECONE_INDEX_NAME not in existing_indexes:
    pc.create_index(
        name=PINECONE_INDEX_NAME,
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
    while not pc.describe_index(PINECONE_INDEX_NAME).status["ready"]:
        time.sleep(1)

index = pc.Index(PINECONE_INDEX_NAME)
coupon_db = PineconeVectorStore(index=index, embedding=get_embedding())

Add documents

In [None]:
coupon_db.add_texts(texts=result_summary_text, metadatas=result_metadata, ids=result_ids)

PINECONE으로 similarity_search_with_relevance_scores()를 사용하여 쿼리를 실행합니다.

In [None]:
coupon_db.similarity_search_with_relevance_scores("GS에서 사용할 수 있는 쿠폰을 알려주세요.", k=4)

PINECONE에 retriever를 설정하여 쿼리를 실행합니다.

In [None]:
retriever = coupon_db.as_retriever(search_type="mmr", search_kwargs={"k": 4, "fetch_k": 10})
# retriever = coupon_db.as_retriever(search_type="similarity", search_kwargs={"k": 3})
# retriever = coupon_db.as_retriever(search_kwargs={"k": 5})
retriever.invoke("베이글을 주문할 수 있는 쿠폰")

#### Reranker

1. Cross Encoder Reranker

In [None]:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

def pretty_print_docs(docs):
    print(
        f"\n{'-' * 100}\n".join(
            [f"Document {i+1}:\n\n" + d.page_content for i, d in enumerate(docs)]
        )
    )

# 모델 초기화
model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")

# 상위 3개의 문서 선택
compressor = CrossEncoderReranker(model=model, top_n=3)

# 문서 압축 검색기 초기화
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
)

# 압축된 문서 검색
compressed_docs = compression_retriever.invoke("베이글을 주문할 수 있는 쿠폰")

# 문서 출력
pretty_print_docs(compressed_docs)

# retriever 설정
retriever = compression_retriever

2. Cohere reranker

In [None]:
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain_cohere import CohereRerank

def pretty_print_docs(docs):
    print(
        f"\n{'-' * 100}\n".join(
            [f"Document {i+1}:\n\n" + d.page_content for i, d in enumerate(docs)]
        )
    )

# 문서 재정렬 모델 설정
compressor = CohereRerank(model="rerank-multilingual-v3.0")

# 문맥 압축 검색기 설정
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
)

# 압축된 문서 검색
compressed_docs = compression_retriever.invoke("베이글을 주문할 수 있는 쿠폰")

# 압축된 문서 출력
pretty_print_docs(compressed_docs)

# retriever 설정
retriever = compression_retriever

3. FlashRank reranker

In [None]:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank

def pretty_print_docs(docs):
    print(
        f"\n{'-' * 100}\n".join(
            [
                f"Document {i+1}:\n\n{d.page_content}\nMetadata: {d.metadata}"
                for i, d in enumerate(docs)
            ]
        )
    )

# LLM 초기화
llm = get_llm()

# 문서 압축기 초기화
compressor = FlashrankRerank(model="ms-marco-MultiBERT-L-12")

# 문맥 압축 검색기 초기화
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
)

# 압축된 문서 검색
compressed_docs = compression_retriever.invoke(
    "치즈 베이글을 주문할 수 있는 쿠폰"
)

pretty_print_docs(compressed_docs)

# 문서 ID 출력
# print([doc.metadata["id"] for doc in compressed_docs])

# retriever 설정
retriever = compression_retriever

### 4. 검색 및 생성

RagResult 객체는 RAG의 답변을 저장하고 결과를 출력함

In [None]:
import base64
import io
from io import BytesIO
from PIL import Image
from IPython.display import HTML, display

class RagResult:
    def __init__(self, output):
        """
        객체를 초기화합니다.

        인자:
        output: LangChain의 output 객체
        """
        self.output = output

    def display_description(self, description):
        """
        쿠폰 설명을 표시합니다.

        인자:
        img_path (str): 이미지 경로
        """
        print(f'{description}')

    def display_image(self, img_path):
        """
        쿠폰 이미지를 표시합니다.

        인자:
        img_path (str): 쿠폰 이미지 경로
        """
        img_str = self.convert_to_base64(Image.open(img_path))
        resize_img_str = self.resize_base64_image(img_str)
        self.plt_img_base64(resize_img_str)

    def display(self):
        result = self.output
        self.display_description(result["answer"])

        if isinstance(result["image_path"], str) and os.path.isfile(result["image_path"]):
            self.display_image(result["image_path"])

        return self.output

    def answer(self):
        return self.output["answer"]

    def convert_to_base64(self, pil_image):
        """
        PIL 이미지를 Base64로 인코딩된 문자열로 변환합니다.

        :param pil_image: PIL 이미지
        :return: 크기 조정된 Base64 문자열
        """

        buffered = BytesIO()
        pil_image.save(buffered, format="JPEG")  # 필요한 경우 형식을 변경할 수 있습니다.
        img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
        return img_str

    def plt_img_base64(self, img_base64):
        """
        Base64로 인코딩된 문자열을 이미지로 표시합니다.

        :param img_base64:  Base64 문자열
        """
        # Base64 문자열을 소스로 사용하여 HTML img 태그 생성
        image_html = f'<img src="data:image/jpeg;base64,{img_base64}" />'
        # HTML을 렌더링하여 이미지 표시
        display(HTML(image_html))

    def encode_image(img_path):
        with open(img_path, "rb") as image_file:
            image_content = image_file.read()
            file_ext = os.path.splitext(img_path)[1].lower()
            if file_ext in [".jpg", ".jpeg"]:
                mime_type = "image/jpeg"
            elif file_ext == ".png":
                mime_type = "image/png"
            else:
                mime_type = "image/unknown"
            return f"data:{mime_type};base64,{base64.b64encode(image_content).decode('utf-8')}"

    def resize_base64_image(self, base64_string, ratio=(0.5)):
        """
        Base64 문자열로 인코딩된 이미지의 크기를 조정합니다.

        인자:
        base64_string (str): 원본 이미지의 Base64 문자열.
        ratio (float): 이미지 비율

        반환:
        str: 크기가 조정된 이미지의 Base64 문자열.
        """
        img_data = base64.b64decode(base64_string)
        img = Image.open(io.BytesIO(img_data))
        width, height = img.size
        newsize = (int(width * ratio), int(height * ratio))
        resized_img = img.resize(newsize, Image.LANCZOS)
        buffered = io.BytesIO()
        resized_img.save(buffered, format=img.format)
        return base64.b64encode(buffered.getvalue()).decode("utf-8")

    def plt_img_base64(self, img_base64):
        """
        Base64로 인코딩된 이미지를 표시합니다.

        인자:
        img_base64 (str): Base64로 인코딩된 이미지 문자열
        """
        image_html = f'<img src="data:image/jpeg;base64,{img_base64}" />'
        display(HTML(image_html))


LLM을 사용하여 Retrieval된 Context를 기반으로 질문에 대한 답변 생성

In [None]:
from langchain import hub
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import PromptTemplate
import json

class GenerationResult(BaseModel):
    answer: str = Field(description="Answer")
    image_path: str = Field(description="Only image path assigned to [image_path] within the selected paragraph")

generation_result_parser = JsonOutputParser(pydantic_object=GenerationResult)

# 프롬프트 생성
generation_prompt = PromptTemplate.from_template(
    """
        You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that 'NO'. Use three sentences maximum and keep the answer concise.
        답변은 한국어로 아래 FORMAT을 참고해서 json형태로만 출력해 주세요. 답변 json의 image_path 값은 선택된 문단에서 [image_path]에 할당된 값입니다. 정답을 모르면 'NO'라고 답변해 주세요.
        Format: {format_instructions}
        Question: {question}
        Context: {context}
        Answer:
    """
)

generation_prompt = generation_prompt.partial(format_instructions=generation_result_parser.get_format_instructions())

# llm 로딩
generation_llm = get_llm()

def format_docs(docs):
    # 검색한 문서 결과를 하나의 문단으로 합쳐줍니다.
    result = "\n\n"
    for doc in docs:
        result += f'[summary]:{doc.page_content}\n[image_path]:{doc.metadata["image_path"]}\n\n'
    # print('======== format_docs ========')
    # print(result)
    return result

def display_result(output):
    # print('======== display_result ========')
    # print(output)
    coupon_result = RagResult(output)
    result = coupon_result.display()
    return output

def answer(output):
    # print('======== answer ========')
    # print(output)
    coupon_result = RagResult(output)
    return coupon_result.answer()

# 체인 생성(Create Chain)
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | generation_prompt
    | generation_llm
    | JsonOutputParser()
    | display_result
)

rag_chain_with_llama = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | generation_prompt
    | get_llm('OLLAMA')
    | JsonOutputParser()
    | display_result
)

# 평가용 체인 생성 - llm : OpenAI
rag_evaluation_chain_with_openai = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | generation_prompt
    | generation_llm
    | JsonOutputParser()
    | answer
)

# 평가용 체인 생성 - llm : OLLAMA
rag_evaluation_chain_with_llama = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | generation_prompt
    | get_llm('OLLAMA')
    | JsonOutputParser()
    | answer
)

### 5. 쿼리로 이미지 검색하기

In [None]:
# query = "치킨 쿠폰을 알려줘"
query = "푸라닭 치킨 쿠폰을 알려줘"
# query = "GS에서 사용 가능 한 쿠폰을 알려줘"
# query = "아메리카노를 주문할 수 있는 쿠폰을 알려줘"
# query = "베이글 관련 쿠폰을 알려줘"
# query = "치킨을 주문할 수 있는 쿠폰 정보를 알려줘!"
# query = "아이폰 주문 관련 쿠폰을 알려줘"
# query = "유효기간이 2025년 03월 22일인 쿠폰을 찾아줘"
# query = "유효기간이 2025년 08월 06일인 쿠폰을 찾아줘"
# query = "2025년 08월 06일"

# 다음 단계 GroundChecker로 relevance 체크를 위해 output 설정
output = rag_chain.invoke(query)
# output = rag_chain_with_llama.invoke(query)

print(output)

### 6. GroundChecker로 답변-문서 간 관련성 여부 확인
- Upstage 의 UpstageGroundednessCheck()를 사용하여 검증
- .env 파일에 UPSTAGE_API_KEY가 등록되어 있어야함
- 비용 : $1 / 1M tokens
  - https://www.upstage.ai/pricing
- context, answer를 전달하면 결과로 grounded 또는 notGrounded 를 리턴함

In [None]:
from langchain_upstage import UpstageGroundednessCheck

# Upstage 문서 관련성 체크 기능을 설정합니다. https://upstage.ai
# context, answer를 전달하면 결과로 grounded 또는 notGrounded 를 리턴함
upstage_ground_checker = UpstageGroundednessCheck()

def groundness_format_docs(docs):
    return "\n".join(
        [
            f"<document><content>{doc.page_content}</content><image_path>{doc.metadata['image_path']}</image_path></document>"
            for doc in docs
        ]
    )

# notGrounded 예시 : 앞서 실행과 관련 없는 context를 추출함
# query = "치킨 쿠폰을 알려줘"
# query = "스타벅스 쿠폰을 알려줘"

# RAG 결과에 사용한 context
context = groundness_format_docs(retriever.invoke(query))
# print(context)

# RAG 결과인 answer
answer = output['answer']

# 업스테이지 문서 관련성 체크를 실행합니다.
ground_check_result = upstage_ground_checker.run(
    {
        "context": context,
        "answer": answer,
    }
)
print(ground_check_result)

### 7. 데이터셋 생성
#### AutoRAG

In [None]:
%%shell
apt-get remove python3-blinker
pip install blinker==1.8.2

In [None]:
%pip install -Uq ipykernel==5.5.6 ipywidgets-bokeh==1.0.2

In [None]:
%pip install AutoRAG==0.3.7

In [None]:
import nest_asyncio
nest_asyncio.apply()

document를 parquet형태로 변환

In [None]:
from autorag.data.legacy.corpus import langchain_documents_to_parquet

corpus_path = f'{COLAB_PROJECT_PATH}/corpus_data/corpus.parquet'

# FAISS로부터 documents를 추출합니다.
docs_dict = coupon_db.docstore.__dict__['_dict']

# list로 저장
docs = list(docs_dict.values())

# document를 parquet형태로 변환
corpus_df = langchain_documents_to_parquet(docs, corpus_path, upsert=True)
corpus_df.head(10)

contents로부터 QA를 생성하여 qa.parquet 형태로 저장합니다.

In [None]:
import pandas as pd
from autorag.data.legacy.qacreation import generate_qa_llama_index, make_single_content_qa
from llama_index.llms.openai import OpenAI

prompt = """다음은 쿠폰 정보입니다.
쿠폰 정보를 보고 할 만한 질문을 만드세요.
반드시 쿠폰정보와 관련한 질문이어야 합니다.
쿠폰을 사용할 수 있는 상품명과 사용처가 질문에 포함되어야 합니다. 쿠폰 정보, 쿠폰 코드, 유효기간을 물어보는 질문이어야 합니다.
만약 주어진 쿠폰정보 내용이 쿠폰과 관련되지 않았다면,
'쿠폰과 관련 없습니다.'라고 질문을 만드세요.
쿠폰정보:
{{text}}
생성할 질문 개수: {{num_questions}}
예시:
[Q]: 파리바게뜨에서 사용가능한 "파리바게뜨 B 초코반 딸기반 케이크" 쿠폰정보는 무엇인가요?
[A]: 파리바게뜨에서 사용할 수 있는 쿠폰은 "파리바게뜨 B 초코반 딸기반 케이크" 쿠폰입니다. 쿠폰코드는 92001857690229이며, 유효기간은 2025년 10월 08일까지입니다.
[Q]: 'BBQ 황을반~BBC양념반+콜라1.25 L' 쿠폰 번호는 무엇인가요?
[A]: BBQ에서 사용가능한 쿠폰은 'BBQ 황을반~BBC양념반+콜라1.25 L'이며 쿠폰의 코드 번호는 725044727165 입니다.
[Q]: '시그니처 생딸기 우유생크림케이크' 쿠폰의 유효기간 언제인가요?
[A]: 유효기간은 2025년 10월 08일입니다.
[Q]: 굽네치킨을 주문할 수 있는 쿠폰번호는 무엇인가요?
[A]: 쿠폰의 코드 번호는 725044727165 입니다.
[Q]: '스타벅스 카페 아메리카노 T 2잔 + 클라우드 치즈 케이크'를 주문할 수 있는 쿠폰번호는 무엇인가요?
[A]: 쿠폰의 코드 번호는 761603284683 입니다.
[Q]: GS 편의점에서 사용 가능한 쿠폰의 유효기간은 언제인가요?
[A]: 유효기간은 2025년 3월 25일입니다.
쿠폰 관련이 없는 내용일 경우 예시:
[Q]: 쿠폰과 관련 없습니다.
[A]: 쿠폰과 관련 없습니다.
결과:
"""
save_path=f'{PROJECT_ROOT_PATH}/corpus_data/qa.parquet'
qa_size = 20
corpus_df = pd.read_parquet(f'{PROJECT_ROOT_PATH}/corpus_data/corpus.parquet', engine='pyarrow')

llm = OpenAI(model='gpt-4o-mini', temperature=0.0)

qa_df = make_single_content_qa(
    corpus_df,
    content_size=qa_size,
    qa_creation_func=generate_qa_llama_index,
    llm=llm,
    prompt=prompt,
    question_num_per_content=1
    )
# delete if the output question is '쿠폰과 관련 없습니다'
qa_df = qa_df.loc[~qa_df['query'].str.contains('쿠폰과 관련 없습니다')]
qa_df.reset_index(drop=True, inplace=True)
qa_df.to_parquet(save_path)

저장된 qa.parquet를 qa_data로 읽어들입니다.

In [None]:
import pandas as pd

qa_parquet_path=f'{PROJECT_ROOT_PATH}/corpus_data/qa.parquet'
qa_data = pd.read_parquet(qa_parquet_path, engine='pyarrow')

Human-in-the-loop시 편의성을 위해 parquet형태를 excel 형태로 저장합니다.

In [None]:
save_excel_path=f'{PROJECT_ROOT_PATH}/corpus_data/qa.xlsx'
qa_data.to_excel(save_excel_path, index=False)

평가를 위해 qa데이터를 변환합니다.
- query => question, generation_gt => ground_truth
- ground_truth : ndarray를 str으로 변환

In [None]:
qa_data = qa_data[['query', 'generation_gt']]
qa_data.columns = ['question', 'ground_truth']
qa_data['ground_truth'] = qa_data['ground_truth'].apply(lambda x: x[0] )
qa_data.head()

RAG 실행 결과로 answer, contexts를 설정합니다.

In [None]:
qa_data['answer'] = qa_data['question'].apply(lambda x: rag_evaluation_chain_with_openai.invoke(x) )
qa_data['contexts'] = qa_data['question'].apply(lambda x: [d.page_content for d in retriever.get_relevant_documents(x)] )
qa_data.head()

qa_result를 parquet형태로 저장합니다.

In [None]:
save_qa_result_path=f'{PROJECT_ROOT_PATH}/corpus_data/qa_result.parquet'
qa_data.to_parquet(save_qa_result_path)

Human-in-the-loop시 편의성을 위해 parquet형태를 excel 형태로 저장합니다.

In [None]:
save_qa_result_excel_path=f'{PROJECT_ROOT_PATH}/corpus_data/qa_result.xlsx'
qa_data.to_excel(save_qa_result_excel_path, index=False)

저장된 qa_result.parquet를 qa_data로 읽어들입니다.

In [None]:
import pandas as pd

qa_result_parquet_path=f'{PROJECT_ROOT_PATH}/corpus_data/qa_result.parquet'
qa_data = pd.read_parquet(qa_result_parquet_path, engine='pyarrow')

question, ground_truth, answer, contexts를 확인합니다.

In [None]:
from datasets import Dataset
dataset = Dataset.from_pandas(qa_data)
dataset[0]

### 8. 평가
#### 1) RAGAS 활용

In [None]:
from ragas import evaluate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_recall,
    context_precision,
)

# 데이터셋 생성기
langchain_llm = get_llm()

# 문서 임베딩
langchain_embeddings = get_embedding()

result = evaluate(
    dataset,
    metrics = [
        faithfulness,
        answer_relevancy,
        context_recall,
        context_precision,
    ],
    llm=langchain_llm,
    embeddings=langchain_embeddings,
    raise_exceptions=False,
)

result

#### 2) LangSmith 활용

LangSmith에서 사용하는 DATASET_NAME 설정

In [None]:
DATASET_NAME = "RAG_EVAL_DATASET_1118_OPENAI_PINECONE"

QA 결과 로딩

In [None]:
import pandas as pd

qa_parquet_path=f'{PROJECT_ROOT_PATH}/corpus_data/qa.parquet'
qa_data = pd.read_parquet(qa_parquet_path, engine='pyarrow')
qa_data = qa_data[['query', 'generation_gt']]
qa_data.columns = ['question', 'answer']
qa_data['answer'] = qa_data['answer'].apply(lambda x: x[0] )

평가를 위한 함수 생성

In [None]:
# 질문에 대한 답변하는 함수를 생성
def ask_question(inputs: dict):
    return {"answer": rag_evaluation_chain_with_openai.invoke(inputs["question"])}

# # 사용자 질문 예시
# llm_answer = ask_question(
#     {"question": "푸라닭 치킨 쿠폰을 알려줘"}
# )
# llm_answer

# evaluator prompt 출력을 위한 함수
def print_evaluator_prompt(evaluator):
    return evaluator.evaluator.prompt.pretty_print()

# Context 를 반환하는 RAG 결과 반환 함수
def context_answer_rag_answer(inputs: dict):
    context = retriever.invoke(inputs["question"])
    return {
        "contexts": "\n".join([doc.page_content for doc in context]),
        "answer": rag_evaluation_chain_with_openai.invoke(inputs["question"]),
        "query": inputs["question"],
    }

# 함수 실행
context_answer_rag_answer(
    {"question": "푸라닭 치킨 쿠폰을 알려줘"}
)

LangSmith dataset에 qa data 추가(20개).  
실험 초기에 1회만 실행합니다.(반복 실행하면 데이터가 추가되어서 실험 데이터가 늘어납니다.)

In [None]:
from langsmith import Client

client = Client()

# 데이터셋 생성 함수
def create_dataset(client, dataset_name, description=None):
    for dataset in client.list_datasets():
        if dataset.name == dataset_name:
            return dataset

    dataset = client.create_dataset(
        dataset_name=dataset_name,
        description=description,
    )
    return dataset


# 데이터셋 생성
dataset = create_dataset(client, DATASET_NAME)

# 생성된 데이터셋에 예제 추가
client.create_examples(
    inputs=[{"question": q} for q in qa_data["question"].tolist()],
    outputs=[{"ground_truth": a} for a in qa_data["answer"].tolist()],
    dataset_id=dataset.id,
)

##### 1) QA 평가

In [None]:
from langsmith.evaluation import evaluate, LangChainStringEvaluator

# qa 평가자 생성
qa_evalulator = LangChainStringEvaluator("qa")

# 프롬프트 출력
# print_evaluator_prompt(qa_evalulator)

# 평가 실행
experiment_results = evaluate(
    ask_question,
    data=DATASET_NAME,
    evaluators=[qa_evalulator],
    experiment_prefix="RAG_EVAL",
    # 실험 메타데이터 지정
    metadata={
        "variant": "QA Evaluator 를 활용한 평가",
    },
)

##### 2) COT_QA / Context_QA 평가

In [None]:
from langsmith.evaluation import evaluate, LangChainStringEvaluator

# cot_qa 평가자 생성
cot_qa_evaluator = LangChainStringEvaluator(
    "cot_qa",
    prepare_data=lambda run, example: {
        "prediction": run.outputs["answer"],  # LLM 이 생성한 답변
        "reference": run.outputs["contexts"],  # Context
        "input": example.inputs["question"],  # 데이터셋의 질문
    },
)

# context_qa 평가자 생성
context_qa_evaluator = LangChainStringEvaluator(
    "context_qa",
    prepare_data=lambda run, example: {
        "prediction": run.outputs["answer"],  # LLM 이 생성한 답변
        "reference": run.outputs["contexts"],  # Context
        "input": example.inputs["question"],  # 데이터셋의 질문
    },
)

# 프롬프트 출력
# print_evaluator_prompt(cot_qa_evaluator)

# 평가 실행
evaluate(
    context_answer_rag_answer,
    data=DATASET_NAME,
    evaluators=[cot_qa_evaluator, context_qa_evaluator],
    experiment_prefix="RAG_EVAL",
    metadata={
        "variant": "QA & COT_QA & Context_QA Evaluator 를 활용한 평가",
    },
)

##### 3) labeled_criteria / relevance 평가


In [None]:
from langsmith.evaluation import LangChainStringEvaluator
from langsmith.evaluation import evaluate

# labeled_criteria 평가자 생성
labeled_criteria_evaluator = LangChainStringEvaluator(
    "labeled_criteria",
    config={
        "criteria": {
            "helpfulness": (
                "Is this submission helpful to the user,"
                " taking into account the correct reference answer?"
            )
        },
        "llm": get_llm(),
    },
    prepare_data=lambda run, example: {
        "prediction": run.outputs["answer"],
        "reference": example.outputs["ground_truth"],  # 정답 답변
        "input": example.inputs["question"],
    },
)

# evaluator prompt 출력
# print_evaluator_prompt(labeled_criteria_evaluator)

# relevance 평가자 생성
relevance_evaluator = LangChainStringEvaluator(
    "labeled_criteria",
    config={
        "criteria": "relevance",
        "llm": get_llm(),
    },
    prepare_data=lambda run, example: {
        "prediction": run.outputs["answer"],
        "reference": run.outputs["contexts"],  # Context 를 전달
        "input": example.inputs["question"],
    },
)

# evaluator prompt 출력
# print_evaluator_prompt(relevance_evaluator)

# 평가 실행
experiment_results = evaluate(
    context_answer_rag_answer,
    data=DATASET_NAME,
    evaluators=[labeled_criteria_evaluator, relevance_evaluator],
    experiment_prefix="LABELED-EVAL",
    # 실험 메타데이터 지정
    metadata={
        "variant": "labeled_criteria evaluator 활용한 평가",
    },
)

 ##### 4) Embedding 거리 기반 평가


In [None]:
from langsmith.evaluation import LangChainStringEvaluator
from langsmith.evaluation import evaluate
# from langchain_huggingface.embeddings import HuggingFaceEmbeddings
from langchain_upstage import UpstageEmbeddings
from langchain_openai import OpenAIEmbeddings
import os

# 토크나이저 병렬화 설정(HuggingFace 모델 사용)
os.environ["TOKENIZERS_PARALLELISM"] = "true"

# upstage_embedding_evaluator = LangChainStringEvaluator(
#     "embedding_distance",
#     config={
#         # OpenAIEmbeddings 가 기본 값으로 설정되어 있으나, 변경이 가능
#         "embeddings": UpstageEmbeddings(model="solar-embedding-1-large-query"),
#         "distance_metric": "euclidean",  # "cosine", "euclidean", "chebyshev", "hamming", and "manhattan"
#     },
# )

openai_embedding_evaluator = LangChainStringEvaluator(
    "embedding_distance",
    config={
        # OpenAIEmbeddings 가 기본 값으로 설정되어 있으나, 변경이 가능
        "embeddings": OpenAIEmbeddings(model="text-embedding-3-small"),
        "distance_metric": "euclidean",  # "cosine", "euclidean", "chebyshev", "hamming", and "manhattan"
    },
)

# 평가 실행
experiment_results = evaluate(
    ask_question,
    data=DATASET_NAME,
    evaluators=[openai_embedding_evaluator],
    experiment_prefix="EMBEDDING-EVAL",
    # 실험 메타데이터 지정
    metadata={
        "variant": "embedding_distance 활용한 평가",
    },
)

##### 5) Heuristic 평가
평가자 함수 생성 / 실행


In [None]:
from langsmith.schemas import Run, Example
from langsmith.evaluation import evaluate
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate import meteor_score
from sentence_transformers import SentenceTransformer, util
from langchain_teddynote.community.kiwi_tokenizer import KiwiTokenizer
import os
import warnings
from nltk.corpus import wordnet as wn
import nltk

nltk.download('wordnet')
wn.ensure_loaded()

wn.ensure_loaded()

# 토크나이저 병렬화 설정(HuggingFace 모델 사용)
os.environ["TOKENIZERS_PARALLELISM"] = "true"
# 토크나이저 선언
kiwi_tokenizer = KiwiTokenizer()

def rouge_evaluator(metric: str = "rouge1") -> dict:
    # wrapper function 정의
    def _rouge_evaluator(run: Run, example: Example) -> dict:
        # 출력값과 정답 가져오기
        student_answer = run.outputs.get("answer", "")
        reference_answer = example.outputs.get("ground_truth", "")

        # ROUGE 점수 계산
        scorer = rouge_scorer.RougeScorer(
            ["rouge1", "rouge2", "rougeL"], use_stemmer=True, tokenizer=KiwiTokenizer()
        )
        scores = scorer.score(reference_answer, student_answer)

        # ROUGE 점수 반환
        rouge = scores[metric].fmeasure

        return {"key": "ROUGE", "score": rouge}

    return _rouge_evaluator


def bleu_evaluator(run: Run, example: Example) -> dict:
    # 출력값과 정답 가져오기
    student_answer = run.outputs.get("answer", "")
    reference_answer = example.outputs.get("ground_truth", "")

    # 토큰화
    reference_tokens = kiwi_tokenizer.tokenize(reference_answer, type="sentence")
    student_tokens = kiwi_tokenizer.tokenize(student_answer, type="sentence")

    # BLEU 점수 계산
    bleu_score = sentence_bleu([reference_tokens], student_tokens)

    return {"key": "BLEU", "score": bleu_score}


def meteor_evaluator(run: Run, example: Example) -> dict:
    # 출력값과 정답 가져오기
    student_answer = run.outputs.get("answer", "")
    reference_answer = example.outputs.get("ground_truth", "")

    # 토큰화
    reference_tokens = kiwi_tokenizer.tokenize(reference_answer, type="list")
    student_tokens = kiwi_tokenizer.tokenize(student_answer, type="list")

    # METEOR 점수 계산
    meteor = meteor_score.meteor_score([reference_tokens], student_tokens)

    return {"key": "METEOR", "score": meteor}


def semscore_evaluator(run: Run, example: Example) -> dict:
    # 출력값과 정답 가져오기
    student_answer = run.outputs.get("answer", "")
    reference_answer = example.outputs.get("ground_truth", "")

    # SentenceTransformer 모델 로드
    model = SentenceTransformer("all-mpnet-base-v2")

    # 문장 임베딩 생성
    student_embedding = model.encode(student_answer, convert_to_tensor=True)
    reference_embedding = model.encode(reference_answer, convert_to_tensor=True)

    # 코사인 유사도 계산
    cosine_similarity = util.pytorch_cos_sim(
        student_embedding, reference_embedding
    ).item()

    return {"key": "sem_score", "score": cosine_similarity}

# 평가자 정의
heuristic_evalulators = [
    rouge_evaluator(metric="rougeL"),
    bleu_evaluator,
    meteor_evaluator,
    semscore_evaluator,
]

# 실험 실행
experiment_results = evaluate(
    ask_question,
    data=DATASET_NAME,
    evaluators=heuristic_evalulators,
    experiment_prefix="Heuristic-EVAL",
    # 실험 메타데이터 지정
    metadata={
        "variant": "Heuristic-EVAL (Rouge, BLEU, METEOR, SemScore) 을 사용하여 평가",
    },
)


##### 6) Groundedness 평가(Upstage Upstage Groundness Checker API 사용)

In [None]:
from langsmith.evaluation import evaluate
from langsmith.schemas import Run, Example
from langsmith.evaluation import evaluate
from langchain_upstage import UpstageGroundednessCheck

# Upstage 문서 관련성 체크 기능을 설정합니다. https://upstage.ai
# context, answer를 전달하면 결과로 grounded 또는 notGrounded 를 리턴함
upstage_groundedness_check = UpstageGroundednessCheck()

def upstage_groundness_check_evaluator(run: Run, example: Example) -> dict:
    # LLM 생성 답변, 정답 답변 가져오기
    answer = run.outputs.get("answer", "")
    context = run.outputs.get("context", "")

    # Groundness 체크
    groundedness_score = upstage_groundedness_check.invoke(
        {"answer": answer, "context": context}
    )
    groundedness_score = groundedness_score == "grounded"

    return {"key": "groundness_score", "score": int(groundedness_score)}

# 실행
experiment_results = evaluate(
    context_answer_rag_answer,
    data=DATASET_NAME,
    evaluators=[upstage_groundness_check_evaluator],
    experiment_prefix="GROUNDEDNESS-EVAL",
    # 실험 메타데이터 지정
    metadata={
        "variant": "Upstage Groundness Checker 를 활용한 Hallucination 평가",
    },
)