# 6. Advanced RAG


In [4]:
!pip install numpy==1.26.4



In [5]:
# 【注意】
# 上記の `!pip install numpy==1.26.4` を実行したあと、
# Google Colab 上部のメニューから「ランタイム」の「セッションを再起動する」を実行してください。
# その後このセルを実行して `1.26.4` と表示されることを確認してください。

import numpy as np

print(np.__version__)
assert np.__version__ == "1.26.4"

1.26.4


In [4]:
import os
from google.colab import userdata

os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = userdata.get("LANGCHAIN_API_KEY")
os.environ["LANGCHAIN_PROJECT"] = "agent-book"
os.environ["TAVILY_API_KEY"] = userdata.get("TAVILY_API_KEY")

## 6.2. ハンズオンの準備


In [1]:
!pip install langchain-core==0.3.0 langchain-openai==0.2.0 \
    langchain-community==0.3.0 GitPython==3.1.43 \
    langchain-chroma==0.1.4 tavily-python==0.5.0 pydantic==2.10.6



In [2]:
from langchain_community.document_loaders import GitLoader


def file_filter(file_path: str) -> bool:
    return file_path.endswith(".mdx")


loader = GitLoader(
    clone_url="https://github.com/langchain-ai/langchain",
    repo_path="./langchain",
    branch="master",
    file_filter=file_filter,
)

documents = loader.load()
print(len(documents))

442


In [12]:
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
import os

embeddings = OpenAIEmbeddings(model="text-embedding-3-small", openai_api_key=os.environ["OPENAI_API_KEY"])
BATCH_SIZE = 100
db = None # データベースオブジェクトを初期化

print(f"Total documents: {len(documents)}")

# 動作しなかったので修正
# ドキュメントをバッチに分割して処理
for i in range(0, len(documents), BATCH_SIZE):
    batch = documents[i:i + BATCH_SIZE]
    print(f"Processing batch {i//BATCH_SIZE + 1}/{len(documents)//BATCH_SIZE + 1} (Docs: {i} to {i + len(batch) - 1})")

    if db is None:
        # 最初のバッチでChroma DBを初期化
        # persist_directory を指定すると永続化されます。
        db = Chroma.from_documents(batch, embeddings)
    else:
        # 2番目以降のバッチは既存のDBに追加
        db.add_documents(batch)

ERROR:chromadb.telemetry.product.posthog:Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given
ERROR:chromadb.telemetry.product.posthog:Failed to send telemetry event ClientCreateCollectionEvent: capture() takes 1 positional argument but 3 were given


Total documents: 442
Processing batch 1/5 (Docs: 0 to 99)
Processing batch 2/5 (Docs: 100 to 199)
Processing batch 3/5 (Docs: 200 to 299)
Processing batch 4/5 (Docs: 300 to 399)
Processing batch 5/5 (Docs: 400 to 441)


In [14]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(documents)
print(len(splits))

1914


In [15]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template('''\
以下の文脈だけを踏まえて質問に回答してください。

文脈: """
{context}
"""

質問: {question}
''')

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

retriever = db.as_retriever()

chain = {
    "question": RunnablePassthrough(),
    "context": retriever,
} | prompt | model | StrOutputParser()

chain.invoke("LangChainの概要を教えて")

'LangChainは、統合パッケージの実装を支援するためのフレームワークであり、特にストリーミング機能に優れています。LCELチェーンを使用することで、実行中にインクリメンタルな出力を得ることができ、最初のトークンが出力されるまでの時間を最小限に抑える最適化が行われます。これにより、チャットモデルやテキストLLMからの出力が迅速に得られるようになります。'

## 6.3. 検索クエリの工夫


### HyDE（Hypothetical Document Embeddings）


In [16]:
hypothetical_prompt = ChatPromptTemplate.from_template("""\
次の質問に回答する一文を書いてください。

質問: {question}
""")

hypothetical_chain = hypothetical_prompt | model | StrOutputParser()

In [17]:
hyde_rag_chain = {
    "question": RunnablePassthrough(),
    "context": hypothetical_chain | retriever,
} | prompt | model | StrOutputParser()

hyde_rag_chain.invoke("LangChainの概要を教えて")

'LangChainは、LLM（大規模言語モデル）を使用して、堅牢で状態を持つマルチアクターアプリケーションを構築するためのフレームワークです。主な機能には以下が含まれます：\n\n- **langgraph**: ステップをグラフのエッジとノードとしてモデル化することで、マルチアクターアプリケーションを構築します。\n- **langserve**: LangChainのチェーンをREST APIとしてデプロイします。\n\nまた、LangChainは異なるプロバイダーのチャットモデルと一貫したインターフェースを提供し、アプリケーションの監視、デバッグ、パフォーマンス最適化のための追加機能も提供します。最新のチャットモデルは、ツール呼び出し、構造化出力、マルチモーダリティといった追加機能を備えています。これにより、LLMが外部サービスやAPI、データベースとインタラクションするリッチなアプリケーションを構築することが可能です。'

### 複数の検索クエリの生成


In [18]:
from pydantic import BaseModel, Field


class QueryGenerationOutput(BaseModel):
    queries: list[str] = Field(..., description="検索クエリのリスト")


query_generation_prompt = ChatPromptTemplate.from_template("""\
質問に対してベクターデータベースから関連文書を検索するために、
3つの異なる検索クエリを生成してください。
距離ベースの類似性検索の限界を克服するために、
ユーザーの質問に対して複数の視点を提供することが目標です。

質問: {question}
""")

query_generation_chain = (
    query_generation_prompt
    | model.with_structured_output(QueryGenerationOutput)
    | (lambda x: x.queries)
)

In [19]:
multi_query_rag_chain = {
    "question": RunnablePassthrough(),
    "context": query_generation_chain | retriever.map(),
} | prompt | model | StrOutputParser()

multi_query_rag_chain.invoke("LangChainの概要を教えて")

'LangChainは、開発者が推論を行うアプリケーションを簡単に構築できるようにすることを目的としたPythonパッケージおよび企業です。元々は単一のオープンソースパッケージとして始まりましたが、現在は企業とエコシステム全体に進化しています。LangChainのエコシステム内の多くのコンポーネントは独立して使用できるため、特定のコンポーネントに特に魅力を感じる場合は、それを選んで使用することができます。\n\nLangChainの主な特徴には、以下のようなものがあります：\n\n- **`langchain-core`**: チャットモデルやその他のコンポーネントのための基本的な抽象化。\n- **統合パッケージ**: 重要な統合が軽量パッケージに分割され、LangChainチームと統合開発者によって共同管理されています。\n- **`langchain`**: アプリケーションの認知アーキテクチャを構成するチェーン、エージェント、検索戦略。\n- **`langchain-community`**: コミュニティによって維持されるサードパーティの統合。\n- **`langgraph`**: LangChainコンポーネントを組み合わせて、持続性やストリーミングなどの重要な機能を持つ本番用アプリケーションにするためのオーケストレーションフレームワーク。\n\nLangChainは、開発者が自分のユースケースに最適なコンポーネントを選択して使用できる柔軟性を提供しています。'

## 6.4. 検索後の工夫


### RAG Fusion


In [20]:
from langchain_core.documents import Document


def reciprocal_rank_fusion(
    retriever_outputs: list[list[Document]],
    k: int = 60,
) -> list[str]:
    # 各ドキュメントのコンテンツ (文字列) とそのスコアの対応を保持する辞書を準備
    content_score_mapping = {}

    # 検索クエリごとにループ
    for docs in retriever_outputs:
        # 検索結果のドキュメントごとにループ
        for rank, doc in enumerate(docs):
            content = doc.page_content

            # 初めて登場したコンテンツの場合はスコアを0で初期化
            if content not in content_score_mapping:
                content_score_mapping[content] = 0

            # (1 / (順位 + k)) のスコアを加算
            content_score_mapping[content] += 1 / (rank + k)

    # スコアの大きい順にソート
    ranked = sorted(content_score_mapping.items(), key=lambda x: x[1], reverse=True)  # noqa
    return [content for content, _ in ranked]

In [21]:
rag_fusion_chain = {
    "question": RunnablePassthrough(),
    "context": query_generation_chain | retriever.map() | reciprocal_rank_fusion,
} | prompt | model | StrOutputParser()

rag_fusion_chain.invoke("LangChainの概要を教えて")

'LangChainは、開発者が推論を行うアプリケーションを簡単に構築できるようにすることを目的としたPythonパッケージおよび企業です。元々は単一のオープンソースパッケージとして始まりましたが、現在は企業とエコシステム全体に進化しています。LangChainのエコシステム内の多くのコンポーネントは独立して使用できるため、特定のコンポーネントに特に魅力を感じる場合は、それを選んで使用することができます。\n\n主な特徴としては、複雑なチェーンの各ステップを自動的にログに記録する「Seamless LangSmith tracing」、すべてのチェーンが同じ方法で使用できる「Standard API」、および生産環境でのデプロイが可能な「LangServe」があります。また、LangChainは複数のオープンソースライブラリで構成されており、基盤となる抽象化や重要な統合が軽量パッケージとして分割されています。さらに、LangGraphを使用してLangChainコンポーネントを組み合わせ、フル機能のアプリケーションを構築することができます。'

### Cohere のリランクモデルを使用する準備


In [22]:
os.environ["COHERE_API_KEY"] = userdata.get("COHERE_API_KEY")

SecretNotFoundError: Secret COHERE_API_KEY does not exist.

In [None]:
!pip install langchain-cohere==0.3.0

### Cohere のリランクモデルの導入


In [23]:
from typing import Any

from langchain_cohere import CohereRerank
from langchain_core.documents import Document


def rerank(inp: dict[str, Any], top_n: int = 3) -> list[Document]:
    question = inp["question"]
    documents = inp["documents"]

    cohere_reranker = CohereRerank(model="rerank-multilingual-v3.0", top_n=top_n)
    return cohere_reranker.compress_documents(documents=documents, query=question)


rerank_rag_chain = (
    {
        "question": RunnablePassthrough(),
        "documents": retriever,
    }
    | RunnablePassthrough.assign(context=rerank)
    | prompt | model | StrOutputParser()
)

rerank_rag_chain.invoke("LangChainの概要を教えて")

ModuleNotFoundError: No module named 'langchain_cohere'

## 6.5. 複数の Retriever を使う工夫


### LLM によるルーティング


In [None]:
from langchain_community.retrievers import TavilySearchAPIRetriever

langchain_document_retriever = retriever.with_config(
    {"run_name": "langchain_document_retriever"}
)

web_retriever = TavilySearchAPIRetriever(k=3).with_config(
    {"run_name": "web_retriever"}
)

In [None]:
from enum import Enum


class Route(str, Enum):
    langchain_document = "langchain_document"
    web = "web"


class RouteOutput(BaseModel):
    route: Route


route_prompt = ChatPromptTemplate.from_template("""\
質問に回答するために適切なRetrieverを選択してください。

質問: {question}
""")

route_chain = (
    route_prompt
    | model.with_structured_output(RouteOutput)
    | (lambda x: x.route)
)

In [None]:
def routed_retriever(inp: dict[str, Any]) -> list[Document]:
    question = inp["question"]
    route = inp["route"]

    if route == Route.langchain_document:
        return langchain_document_retriever.invoke(question)
    elif route == Route.web:
        return web_retriever.invoke(question)

    raise ValueError(f"Unknown route: {route}")


route_rag_chain = (
    {
        "question": RunnablePassthrough(),
        "route": route_chain,
    }
    | RunnablePassthrough.assign(context=routed_retriever)
    | prompt | model | StrOutputParser()
)

In [None]:
route_rag_chain.invoke("LangChainの概要を教えて")

In [None]:
route_rag_chain.invoke("東京の今日の天気は？")

### ハイブリッド検索の実装


In [None]:
!pip install rank-bm25==0.2.2

In [None]:
from langchain_community.retrievers import BM25Retriever

chroma_retriever = retriever.with_config(
    {"run_name": "chroma_retriever"}
)

bm25_retriever = BM25Retriever.from_documents(documents).with_config(
    {"run_name": "bm25_retriever"}
)

In [None]:
from langchain_core.runnables import RunnableParallel

hybrid_retriever = (
    RunnableParallel({
        "chroma_documents": chroma_retriever,
        "bm25_documents": bm25_retriever,
    })
    | (lambda x: [x["chroma_documents"], x["bm25_documents"]])
    | reciprocal_rank_fusion
)

In [None]:
hybrid_rag_chain = (
    {
        "question": RunnablePassthrough(),
        "context": hybrid_retriever,
    }
    | prompt | model | StrOutputParser()
)

hybrid_rag_chain.invoke("LangChainの概要を教えて")