# Ragas と Langfuse を使用した Retrieval-Augmented Generation (RAG) パイプラインの評価

このノートブックでは、[RAGAS](https://docs.ragas.io/en/v0.1.21/index.html) などのオープンソースツールを使用して Retrieval-Augmented Generation (RAG) パイプラインの品質を評価する方法を探求し、[Langfuse](https://langfuse.com/) の機能を活用して、トレースとスパンで RAG パイプラインを管理およびトレースします。Amazon Bedrock ナレッジベースと RAG バッチ生成結果を作成して、オフライン評価とスコアリングを示します。

> ℹ️ 注意：このノートブックでは、一部のステップでユーザー設定が必要です。
>
> セルでユーザー設定が必要な場合、👉 絵文字付きのこのコールアウトのようなメッセージが表示されます。
>
> 👉 絵文字付きの指示に注意を払い、コードセルを実行する前に AWS コンソールまたは対応するセルで設定を実行してください。

## 前提条件

> カーネルを選択していない場合は、右上隅にある「Select Kernel」ボタンをクリックし、Python Environmentsを選択して「.venv (Python 3.9.20) .venv/bin/python Recommended」を選択してください。

> 各ノートブックセルを実行するには、Shift + Enterを押してください。

> ℹ️ AWS が提供する一時アカウントを使用してインストラクター主導のワークショップに参加している場合は、これらの前提条件ステップを**スキップ**できます

### Amazon OpenSearch の追加権限

このノートブックで手動の Amazon Bedrock Knowledge Bases セットアップ手順を完了するには、**AWS Console ユーザー/ロール**に以下が必要です：

- [Amazon OpenSearch ベクターコレクションを操作する権限](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-vector-search.html)
- **IAM ロールの作成**とポリシーの添付を含む権限：`iam:AttachRolePolicy`、`iam:CreateRole`、`iam:DetachRolePolicy`、`iam:GetRole`、`iam:PassRole`、`iam:CreatePolicy`、`iam:CreatePolicyVersion`、および `iam:DeletePolicyVersion`。

> ℹ️ **注意：** テストでは、上記のリンクされた `aoss` ポリシーステートメントのみを使用して Amazon Bedrock KB を作成しようとしたときに `NetworkError` の問題が発生しました。これは代わりに `*` に対して `aoss:*` を付与することで解決されましたが、本番環境で使用する前にこれらの権限を減らすことを検討する必要があります。

[AWS Console for Identity and Access Management (IAM)](https://console.aws.amazon.com/iam/home?#/home) を参照して、ユーザーまたはロールに権限を付与してください。

### 依存関係と環境変数

In [None]:
# AWS ワークショップ環境を使用していない場合は、以下の行のコメントを外して依存関係をインストールしてください
# %pip install langfuse datasets ragas python-dotenv langchain-aws boto3 --upgrade

.env ファイルで Langfuse プロジェクトと API キーをセットアップし、セルフホストまたはクラウドの Langfuse 環境に接続するための前提条件が完了していることを確認してください。

In [None]:
# すでに VS Code サーバーの .env で環境変数を定義している場合は、以下のセルはスキップしてください。
# langfuse 用の環境変数を定義してください。
# これらの値は Langfuse で API キーを作成する際に確認することができます。
# import os
# os.environ["LANGFUSE_SECRET_KEY"] = "xxxx" # Langfuse プロジェクトのシークレットキー
# os.environ["LANGFUSE_PUBLIC_KEY"] = "xxxx" # Langfuse プロジェクトのパブリックキー
# os.environ["LANGFUSE_HOST"] = "xxx" # Langfuse ドメイン

詳細については [Langfuse ドキュメント](https://langfuse.com/docs/get-started) を確認してください。

## 初期化と認証チェック
以下のセルを実行して、共通ライブラリとクライアントを初期化してください。

In [None]:
import json
import os
from typing import Any

# 外部の依存関係:
import pandas as pd  # テーブルデータの操作用
from dotenv import load_dotenv


load_dotenv("../.env")

Amazon Bedrock クライアントを初期化し、アカウントで利用可能なモデルを確認します。

In [None]:
import boto3  # AWS Python SDK 全般 (Amazon Bedrock を含む)


# Amazon Bedrock の設定へのアクセスに使用
bedrock = boto3.client(service_name="bedrock", region_name="us-west-2")

bedrock_agent_runtime = boto3.client(service_name="bedrock-agent-runtime", region_name="us-west-2")

# アカウントで利用可能なモデルを確認
models = bedrock.list_inference_profiles()
for model in models["inferenceProfileSummaries"]:
    print(model["inferenceProfileName"] + " - " + model["inferenceProfileId"])

Langfuse クライアントを初期化し、認証情報が有効であることを確認します。

In [None]:
from langfuse import Langfuse


# langfuse クライアント
langfuse = Langfuse()
if langfuse.auth_check():
    print("Langfuse は正しく設定されています")
    print(f"Langfuse インスタンスへはこちらからアクセスできます: {os.environ['LANGFUSE_HOST']}")
else:
    print("認証情報が見つからないか問題があります。.env ファイル内の Langfuse API キーとホストを確認してください。")

## ナレッジベースのセットアップ
次に、ユーザークエリに対して retrieval-augmented generation (RAG) を実行できるように、ドキュメントを Amazon S3 にアップロードし、ベクターストア（ナレッジベース）を作成します。以下のステップでは、以下を設定します：

- ドキュメントコーパスを保存するための Amazon S3 `bucket_name`
- アーティファクトが保存されるバケット内のフォルダプレフィッス。

In [None]:
from botocore.exceptions import ClientError


botosess = boto3.Session(region_name="us-west-2")
region = botosess.region_name
account_id = boto3.client("sts").get_caller_identity()["Account"]
bucket_name = f"eval-{account_id}-{region}"
s3_prefix = "bedrock-rag-eval"

# S3 バケットが存在するかどうかを確認し、存在しない場合はバケットを作成
s3 = boto3.client("s3")
try:
    s3.head_bucket(Bucket=bucket_name)
    print(f"Bucket {bucket_name} exists")
except ClientError:
    print(f"Creating bucket {bucket_name}")
    s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={"LocationConstraint": region})

### Amazon S3 にドキュメントをアップロードする

まず、サンプルドキュメントを Amazon S3 にアップロードする必要があります。以下のコードセルを実行するだけで完了します。

In [None]:
corpus_s3uri = f"s3://{bucket_name}/{s3_prefix}/corpus"
print(f"Syncing corpus to:\n{corpus_s3uri}/")

# フォルダを S3 バケットに再帰的に同期するために AWS CLI を使用
!aws s3 sync --quiet ./datasets/corpus {corpus_s3uri}/

### AWS コンソールでナレッジベースを作成する
> 👉 このセクションには、コードセルを実行するだけでなく、手動で実行する必要があるステップが含まれています！

テスト用の実際の Bedrock ナレッジベースを設定する最も簡単な方法は、**AWS コンソールを通じて手動で行う**ことです：

1. まず、[Amazon Bedrock の AWS コンソール](https://console.aws.amazon.com/bedrock/home?#/knowledge-bases)を**開き**、左側のサイドバーメニューから *Orchestration > Knowledge bases* を選択します。以下に示すスクリーンショットを参照してください：

    > ℹ️ UI の右上隅にある *AWS Region* が正しい (us-west-2 である) ことを**確認**してください

![KB Console](images/bedrock-kbs/01-bedrock-kb-console.png "Amazon Bedrock ナレッジベースの AWS コンソールのスクリーンショット、「Create knowledge base」アクションボタンを表示")

2. **Create knowledge base** ボタンをクリックし、**Knowledge Base with vector store** を選択します。開く画面で：

- **knowledge base name** には `example-squad-kb` と入力します
- **knowledge base description** には、`Demo knowledge base for question answering evaluation` のようなものを入力できます
- その他の設定はデフォルトのままにします（新しい実行ロールの作成を許可し、タグなし）
- データソースとして Amazon S3 を選択してください（デフォルト）

設定は以下のスクリーンショットのようになるはずです：

![KB Basics](images/bedrock-kbs/02a-create-kb-basics.png "Bedrock ナレッジベース作成ワークフローのステップ 1 のスクリーンショット：KB 名、説明、（新規作成）実行ロール、（空の）タグが設定されています。フォームの最後に「Next」ボタンが表示されています。")

3. **Next** 画面で、S3 データソースを設定します。

    データソースを S3 のままにして、前のステップで作成したバケットとプレフィックスを選択し、Amazon Bedrock のデフォルトのパーサーを使用します。

![](images/bedrock-kbs/02b-create-kb-data-source.png "Cohere Embed Multilingual 埋め込みモデルと quick-create vector store を含むナレッジベースのベクターインデックス設定のスクリーンショット。「Next」ボタンが表示されています。")

4. **Next** 画面で、ベクターインデックスを設定します：

    *embeddings model* には `Cohere Embed Multilingual` を選択します

    > ℹ️ [Amazon Bedrock モデルアクセスコンソール](https://console.aws.amazon.com/bedrock/home?#/modelaccess) で、現在のリージョンでこのモデルへのアクセスを有効にしていることを**確認**してください。
    >
    > 必要に応じて、別の埋め込みモデルを選択することができます。

    *Vector database* には `Quick create a new vector store` を選択します

    この画面または [Amazon Bedrock 開発者ガイド](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-setup.html) で、Amazon Bedrock ナレッジベースがサポートするさまざまなベクターストアに関する詳細情報を見つけることができます。このデフォルトオプションでは、新しい [Amazon OpenSearch Serverless](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-overview.html) クラスターが作成されます

    以下のように他の設定はデフォルトのままにし、次に進んでください：

![](images/bedrock-kbs/02c-create-kb-index.png "Cohere Embed Multilingual 埋め込みモデルと quick-create vector store を含むナレッジベースのベクターインデックス設定のスクリーンショット。「Next」ボタンが表示されています。")

5. **Next** をクリックして設定を確認し、次に **Create knowledge base** をクリックしてプロセスを完了します。

    > ⏰ 作成が完了するまでに **数分**かかる場合があります。上にスクロールすると進行状況インジケーターバナーが表示されるはずです。または別のタブで、[Amazon OpenSearch Serverless Collections コンソール](https://console.aws.amazon.com/aos/home?#opensearch/collections) を確認することもできます。ここでは、基盤となるベクターコレクションが作成されています。

    ナレッジベースが正常に作成されると、以下に示すように詳細画面に移動します：

![](images/bedrock-kbs/03-kb-detail-page.png "作成された Amazon Bedrock ナレッジベースの詳細画面、作成成功バナーを表示。'Knowledge base overview'（KB ID、名前、その他の詳細を含む）、「Tags」（空）、「Data source」（1 つの Amazon S3 データソースがリストされている）、「Embeddings model」（Cohere Embed）のセクションが含まれ、右側には対話型の「Test knowledge base」チャットサイドバーがあり、一部のデータソースが同期されていないという警告が表示されています。")

6. 先ほどの警告ボックスで述べたように、新しいナレッジベースには、データソースを**同期**するまでドキュメントは含まれません：

    データソースリストで名前の左側にあるチェックボックスを選択して S3 データソースを**選択 (Select)** し、上にある**同期 (Sync)** ボタンをクリックして同期を開始します。

    *Status* が数秒間 `Syncing` に変わった後、`Available` に戻ります

![](images/bedrock-kbs/04a-kb-data-source-after-sync.png "同期を実行した後の KB 'data source' セクションのスクリーンショット、データソースが選択され、ステータスが 'available' と表示されている")
</md>

同期が完了すると、ナレッジベースは使用できるようになります。

オプションとして、データソースをクリックして、期待通りに 20 個のファイルが `Added` されたかどうかを確認できます：

<img src="images/bedrock-kbs/04b-kb-data-sync-details.png" width="600" alt="同期が正常に完了し、20 個のファイルが検出されてインデックスに追加され、0 個のファイルが失敗したことを示すデータソース詳細画面"/>
</md>

### ナレッジベースのテスト

大規模な評価について議論する前に、ナレッジベースが正しく機能しているか確認するためにテストクエリを実行しましょう。ナレッジベースの詳細ページに戻りましょう。

例えば、以下のスクリーンショットでは、ページ上部の *Knowledge Base overview* パネルにナレッジベースIDが `Z746ERZP5X` であることがわかります（ご自身の *Knowledge Base ID* を確認してください）。

![](images/bedrock-kbs/04c-kb-main-page.png "ナレッジベースのメインページのスクリーンショット")

👉 以下のプレースホルダーをナレッジベースの固有IDに**置き換え**、以下のセルを実行して続行してください：

In [None]:
knowledge_base_id = "<置き換える>"  # Something like "Z746ERZP5X"

With the ID identified, you can use the Bedrock runtime [RetrieveAndGenerate API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_agent-runtime_RetrieveAndGenerate.html) to query your knowledge base.

In [None]:
query = "Victoria 州の経済状況はどうですか？"

In [None]:
# RetrieveAndGenerate API と Nova Pro モデルを使用してナレッジベースをクエリする
rag_resp = bedrock_agent_runtime.retrieve_and_generate(
    input={"text": query},
    retrieveAndGenerateConfiguration={
        "knowledgeBaseConfiguration": {
            "knowledgeBaseId": knowledge_base_id,
            "modelArn": f"arn:aws:bedrock:us-west-2:{account_id}:inference-profile/us.amazon.nova-pro-v1:0",
        },
        "type": "KNOWLEDGE_BASE",
    },
    # オプションのセッション ID は、フォローアップ質問の結果を改善するのに役立ちます：
    # sessionId='string'
)

print("Plain text response:")
print("--------------------")
print(rag_resp["output"]["text"], end="\n\n\n")

print("Full API output:")
print("----------------")
rag_resp

上記のセルで出力したフルの API 応答に示されているように、`RetrieveAndGenerate` アクションは以下を提供します：

- 最終的なテキスト回答
- 検索エンジンからの `retrievedReferences`
- テキスト回答の異なる部分で引用されるべき参照を特定する `citations`

また、以下に示すように、API を通じて**検索のみ**を実行し、生成回答合成ステップをスキップすることも可能です。

In [None]:
retrieve_resp = bedrock_agent_runtime.retrieve(
    knowledgeBaseId=knowledge_base_id,
    retrievalQuery={"text": query},
)
print(json.dumps(retrieve_resp["retrievalResults"], indent=2))

## 評価のためのデータセットと指標の設定

### データセットの読み込み

この例では、RAG システムにクエリを送信し、結果をキュレーションして、参照入出力ペアを持つデータセットを使用します。Langfuse から本番データを取得する方法については、以降のセクションを参照してください。

このデータセットには次の列が含まれています：

- `question`: list[str] - これらは、RAG パイプラインで評価される質問です。

- `contexts`: list[list[str]] - 質問に答えるために LLM に渡されたコンテキスト。

- `answer`: list[str] - RAG パイプラインから生成され、ユーザーに提供される回答。

- `ground_truths`: list[list[str]] - 質問に対する真実の回答。ただし、オンライン評価では、このケースでは Ground Truth データにアクセスできないため、これを無視できます。

このデータセットの詳細については、[Exploding Gradients Dataset](https://huggingface.co/datasets/explodinggradients/fiqa/viewer/ragas_eval) を参照してください。

データセットをロードすることから始めましょう。

In [None]:
from datasets import load_dataset


fiqa_eval = load_dataset("explodinggradients/fiqa", "ragas_eval")["baseline"]
fiqa_eval

### RAGAS 指標
RAG システムの以下の側面を測定します。これらの指標は [RAGAS](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/) で定義されています：

- [Faithfulness (忠実度)](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/faithfulness/)：これは、生成された回答の事実の一貫性を、与えられたコンテキストに対して測定します。
- [Response relevancy (応答の関連性)](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/answer_relevance/)：Response Relevancy 指標は、応答がユーザー入力に対してどれだけ関連性があるかを測定します。
- [Context precision (コンテキストの適合率)](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/context_precision/)：コンテキスト適合率は、コンテキストに存在するすべての Ground Truth の関連アイテムが高いランクにランク付けされているかどうかを評価する指標です。理想的には、すべての関連チャンクがトップランクに表示される必要があります。

これらの指標とその仕組みについて詳しくは、[RAGAS ドキュメント](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/)をご覧ください。

In [None]:
# メトリクスのインポート
from ragas.metrics import (
    Faithfulness,
    LLMContextPrecisionWithoutReference,
    ResponseRelevancy,
)


# 選択したメトリクス
metrics = [
    Faithfulness(),
    ResponseRelevancy(),
    LLMContextPrecisionWithoutReference(),
]

In [None]:
from ragas.metrics.base import MetricWithEmbeddings, MetricWithLLM
from ragas.run_config import RunConfig


# RAGAS メトリクスを初期化するユーティリティ関数
def init_ragas_metrics(metrics, llm, embedding):
    for metric in metrics:
        if isinstance(metric, MetricWithLLM):
            print(metric.name + " llm")
            metric.llm = llm
        if isinstance(metric, MetricWithEmbeddings):
            print(metric.name + " embedding")
            metric.embeddings = embedding
        run_config = RunConfig()
        metric.init(run_config)

次に、選択した LLM と埋め込みモデルを使用して指標を初期化する必要があります。この例では、Amazon Bedrock Nova Pro モデルと Cohere 埋め込み英語モデルを使用し、`langchain-aws` ライブラリの便利なラッパーを使用します。

In [None]:
from langchain_aws import BedrockEmbeddings, ChatBedrockConverse
from ragas.embeddings import LangchainEmbeddingsWrapper
from ragas.llms import LangchainLLMWrapper


config = {
    "region_name": "us-west-2",  # E.g. "us-east-1"
    "llm": "us.amazon.nova-pro-v1:0",  # E.g Claude モデルなども利用可能 "anthropic.claude-3-5-sonnet-20241022-v2:0"
    "embeddings": "cohere.embed-english-v3",  # E.g or "amazon.titan-embed-text-v2:0"
    "temperature": 0.4,
}

evaluator_llm = LangchainLLMWrapper(
    ChatBedrockConverse(
        region_name=config["region_name"],
        base_url=f"https://bedrock-runtime.{config['region_name']}.amazonaws.com",
        model=config["llm"],
        temperature=config["temperature"],
    )
)

evaluator_embeddings = LangchainEmbeddingsWrapper(
    BedrockEmbeddings(
        region_name=config["region_name"],
        model_id=config["embeddings"],
    )
)

init_ragas_metrics(
    metrics,
    llm=evaluator_llm,
    embedding=evaluator_embeddings,
)

## Langfuse で評価結果をトレースする

RAGAS を使用してモデルベースの評価を行う方法は 2 つあります：
1. すべてのトレースにスコアを付ける：これは、各トレース項目に対して評価を実行することを意味します。これにより、RAG パイプラインへの各呼び出しのパフォーマンスについてより良いアイデアが得られますが、コストに注意してください。

2. サンプリングによるスコア付け：この方法では、定期的にトレースのランダムサンプルを取得し、それらにスコアを付けます。これによりコストが削減され、アプリのパフォーマンスの概算が得られますが、重要なサンプルを見逃す可能性があります。

この例では、事前構築されたデータセットと Amazon Bedrock Knowlegebase を使用した RAG パイプラインを使用して、両方のソリューションを試します。

### すべてのトレースにスコアを付ける

単一のトレースの小さな例を取り上げ、RAGAS でどのようにスコア付けできるかを見てみましょう。まず、選択した指標でトレースにスコアを付けるためのユーティリティ関数を定義します。

In [None]:
from ragas.dataset_schema import SingleTurnSample


async def score_with_ragas(query, chunks, answer, metrics):
    scores = {}
    for metric in metrics:
        sample = SingleTurnSample(
            user_input=query,
            retrieved_contexts=chunks,
            response=answer,
        )
        print(f"{metric.name} 計算中...")
        scores[metric.name] = await metric.single_turn_ascore(sample)
    return scores

#### サンプルデータセット項目のスコアリング

各リクエストでスコアを計算します。以下では、以下のステップを実行するダミーアプリケーションを説明します：

- ユーザーから質問を取得する
- ユーザーの質問に答えるために使用できるデータベースまたはベクターストアからコンテキストを取得する
- 質問とコンテキストを LLM に渡して回答を生成する

この場合、Langfuse Python [低レベル SDK](https://langfuse.com/docs/sdk/python/low-level-sdk) を使用して、より詳細な制御でトレースにログを記録する使用方法を示しています。また、後続のセクションで [デコレータ](https://langfuse.com/docs/sdk/python/decorators) を使用した例を見たり、[langfuse ドキュメント](https://langfuse.com/docs/sdk/overview)で詳細を読むこともできます。

In [None]:
# 質問を受けたら新しいトレースを開始
row = fiqa_eval[0]
question = row["question"]
trace = langfuse.trace(name="rag-fiqa")

# 関連するチャンクを取得
# chunks = get_similar_chunks(question)
contexts = row["contexts"]
# スパンに渡す
trace.span(name="retrieval", input={"question": question}, output={"contexts": contexts})

# LLM を使ってチャンクに基づいた回答を生成
# answer = get_response_from_llm(question, chunks)
answer = row["answer"]
trace.generation(
    name="generation",
    input={"question": question, "contexts": contexts},
    output={"answer": answer},
)

# 質問、コンテキスト、回答のタプルのスコアを計算
ragas_scores = await score_with_ragas(question, contexts, answer, metrics)
ragas_scores

In [None]:
print(
    f"Langfuse でトレースされていますが、スコアはまだついていません。Langfuse UI で確認できます:\n{os.environ['LANGFUSE_HOST']}"
)

以下のように実行することで、トレースにスコアを添付することができます。

In [None]:
# スコアを送信
for m in metrics:
    trace.score(name=m.name, value=ragas_scores[m.name])

Now the score is attached

![](images/bedrock-kbs/04e-langfuse-single-eval-trace-score.png)

#### RAG のスコアリング
最初のセクションで Amazon Bedrock Knowledge Bases を設定済みなので、今度はテストデータセットに対してその結果の品質を**評価**し、高品質かつ低コストの構成に**最適化**するための手助けをします。

まず、質問、参照回答、およびそのソースドキュメントのサンプルデータセットを読み込みます（このデータセットの準備方法の詳細については、[この GitHub](https://github.com/aws-samples/llm-evaluation-methodology/blob/main/datasets/Prepare-SQuAD.ipynb) を参照してください）：

In [None]:
dataset_df = pd.read_json("datasets/qa.manifest.jsonl", lines=True)
dataset_df.head(10)

このデータセットのレコードには以下が含まれます：

- (`doc`) このサンプルに対するソースドキュメントの全文
- (`doc_id`) ソースドキュメントの一意の識別子
- (`question`) ユーザーが尋ねる質問
- (`question_id`) 質問の一意の識別子
- (`answers`) ドキュメントによってサポートされる（複数の可能性がある）参照「正解」のリスト

[RAGAS の API リファレンス](https://docs.ragas.io/en/latest/references/evaluation.html)に示されているように、RAGAS 評価データセットのレコードには通常以下が含まれます：

- 尋ねられた `question`
- システムが生成した `answer`
- 答えの根拠となった実際のテキスト `contexts`（つまり、検索エンジンによって取得されたドキュメントテキストのスニペット）
- `ground_truth` の答え

ここでは、`@observe()` デコレータを使用して、Langfuse Python SDK で [Langfuse トラッキング](https://langfuse.com/docs/tracing) を RAG パイプラインに統合します。

以下に示すように、Amazon Bedrock KB の取得および生成パイプラインで例の質問を実行し、メトリクスを計算する準備ができた参照を抽出できます。

In [None]:
from langfuse.decorators import langfuse_context, observe


@observe(name="Knowledge Base Retrieve and Generate")
def retrieve_and_generate(
    question: str,
    kb_id: str,
    generate_model_arn: str = f"arn:aws:bedrock:us-west-2:{account_id}:inference-profile/us.amazon.nova-pro-v1:0",
    **kwargs,
):
    rag_resp = bedrock_agent_runtime.retrieve_and_generate(
        input={"text": question},
        retrieveAndGenerateConfiguration={
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": generate_model_arn,
            },
            "type": "KNOWLEDGE_BASE",
        },
    )
    answer = rag_resp["output"]["text"]

    # ネストされた引用文献からフラットな引用文献リストを取得 -> retrievedReferences:
    all_refs = [r for cite in rag_resp["citations"] for r in cite["retrievedReferences"]]
    contexts = [r["content"]["text"] for r in all_refs]
    ref_s3uris = [r["location"]["s3Location"]["uri"] for r in all_refs]
    # マッピング e.g. 's3://.../doc_id.txt' -> 'doc_id':
    ref_ids = [uri.rpartition("/")[2].rpartition(".")[0] for uri in ref_s3uris]

    # トレースする追加のデータを記録
    langfuse_context.update_current_observation(
        input={"question": question, "contexts": contexts},
        output=answer,
        model="us.amazon.nova-pro-v1:0",
        session_id="kb-rag-session",
        tags=["dev"],
        metadata=kwargs,
    )

    # 独立したスコアリングのためにトレース ID を取得
    trace_id = langfuse_context.get_current_trace_id()
    return {
        "answer": answer,
        "retrieved_doc_ids": ref_ids,
        "retrieved_doc_texts": contexts,
        "trace_id": trace_id,
    }

リクエストが来たら RAG を実行し、結果をすぐにスコアリングします。

In [None]:
from asyncio import run

from langfuse.decorators import observe


@observe(name="Knowledge Base Pipeline")
def rag_pipeline(
    question,
    user_id: str | None = None,
    session_id: str | None = None,
    kb_id: str | None = None,
    metrics: Any | None = None,
):
    generated_answer = retrieve_and_generate(
        question=question,
        kb_id=kb_id,
        kwargs={"database": "Bedrock Knowledge Base", "kb_id": kb_id},
    )
    contexts = generated_answer["retrieved_doc_texts"]
    answer = generated_answer["answer"]
    trace_id = generated_answer["trace_id"]

    score = run(score_with_ragas(question, contexts, answer=answer, metrics=metrics))
    langfuse_context.update_current_trace(
        user_id=user_id,
        session_id=session_id,
        tags=["dev"],
    )
    for s in score:
        langfuse.score(name=s, value=score[s], trace_id=trace_id)
    return generated_answer

In [None]:
response = rag_pipeline(dataset_df.iloc[0]["question"], kb_id=knowledge_base_id, metrics=metrics)
response

### サンプリングしてスコアを付ける

すべてのプロダクショントレースにスコアを付けることは、アプリケーションのアーキテクチャーやトラフィックによっては時間がかかり、コストがかかる場合があります。その場合は、サンプリング手法を採用すると良いでしょう。バッチ処理を実行するタイムスライスと、そのタイムスライスからサンプリングするトレースの数を決定します。データセットを作成し、`ragas.evaluate` を呼び出して結果を分析します。

これを定期的に実行して、タイムスライス間でスコアがどのように変化しているかを追跡し、不一致がないかを確認できます。

先ほど `retrieve_and_generate()` 関数によって生成された既存の結果を評価します。

まず、データセットの最初の 10 問に対して RAG を実行し、10 個のプロダクショントレースをシミュレートします。

In [None]:
rag_generated_outputs = [
    retrieve_and_generate(
        question=rec.question,
        kb_id=knowledge_base_id,
        kwargs={"database": "Bedrock Knowledge Base", "kb_id": knowledge_base_id},
    )
    for _, rec in dataset_df.head(10).iterrows()
]
rag_generated_outputs[0]

Langfuse にアップロードされた結果は、以下の便利な関数を使って必要に応じて取り出すことができます。

In [None]:
from langfuse.api.resources.commons.types.trace_with_details import TraceWithDetails


def get_traces(
    limit: int = 5,
    name: str | None = None,
    user_id: str | None = None,
    session_id: str | None = None,
    from_timestamp: str | None = None,
    to_timestamp: str | None = None,
) -> list[TraceWithDetails]:
    """与えられたフィルターにマッチするトレースをLangfuseにクエリする。
    詳細は https://langfuse.com/docs/query-traces を確認。"""

    all_data = []
    page = 1

    while True:
        response = langfuse.fetch_traces(
            page=page,
            name=name,
            user_id=user_id,
            session_id=session_id,
            from_timestamp=from_timestamp,
            to_timestamp=to_timestamp,
        )
        if not response.data:
            break
        page += 1
        all_data.extend(response.data)
        if len(all_data) > limit:
            break

    return all_data[:limit]

In [None]:
from random import sample


NUM_TRACES_TO_SAMPLE = 3
traces = get_traces(name="Knowledge Base Retrieve and Generate", limit=10)
if len(traces) > NUM_TRACES_TO_SAMPLE: # noqa: SIM108
    traces_sample = sample(traces, NUM_TRACES_TO_SAMPLE)
else:
    traces_sample = traces

print(f"{len(traces)} 件のフィルターされたトレースから {len(traces_sample)} 件のトレースをサンプリングしました。")
for trace in traces_sample:
    print(f"Trace ID: {trace.id}")

次に、バッチを作成してスコアを付けましょう。RAGAS は、huggingface のデータセットオブジェクトを使用してデータセットを構築し、評価を実行します。これを独自のプロダクションデータで実行する場合は、適切なキーを使用してトレースから質問、コンテキスト、および回答を抽出してください。

In [None]:
# サンプルをスコア付け
evaluation_batch = {
    "question": [],
    "contexts": [],
    "answer": [],
    "trace_id": [],
}

for sample in traces_sample:
    evaluation_batch["question"].append(sample.input["question"])
    evaluation_batch["contexts"].append(sample.input["contexts"])
    evaluation_batch["answer"].append(sample.output)
    evaluation_batch["trace_id"].append(sample.id)

RAGAS evaluate 関数を使用して（単一ターンのやり取りではなく）データセット全体にスコアを付けます。詳細については、[RAGAS evaluate](https://docs.ragas.io/en/latest/references/evaluate/) を参照してください。

In [None]:
# RAGAS evaluate を実行
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import Faithfulness, ResponseRelevancy


ds = Dataset.from_dict(evaluation_batch)
evals_results = evaluate(
    ds,
    llm=evaluator_llm,
    embeddings=evaluator_embeddings,
    metrics=[Faithfulness(), ResponseRelevancy()],
)
evals_results

これで完了です！一定期間にわたるスコアを確認できます。データフレームで結果をレンダリングして、スコアを確認しましょう。

In [None]:
df = evals_results.to_pandas()

# 結果のデータフレームに Langfuse trace_id を追加
df["trace_id"] = ds["trace_id"]

df.head()

スコアを Langfuse にプッシュバックし、トレースに添付することもできます。

In [None]:
for _, row in df.iterrows():
    for metric_name in ["faithfulness", "answer_relevancy"]:
        langfuse.score(name=metric_name, value=row[metric_name], trace_id=row["trace_id"])

Langfuse コンソールに戻って、トレースで更新されたスコアを確認できます。

![](images/bedrock-kbs/score-with-sampling.png)

### おめでとうございます！
ラボ 2 を無事終了しました。

AWS イベントに参加している場合は、次のラボに進む前に、ワークショップスタジオに戻って追加の指示を確認ください。次のラボでは、モデルベースの評価とガードレールについて学習します。