# Evaluate Retrieval-Augmented Generation (RAG) pipelines with Amazon OpenSearch, Amazon Sagemaker AI, AWS Bedrock, Ragas and Langfuse

In this notebook we'll explore ways to evaluate the quality of Retrieval-Augmented Generation (RAG) pipelines with the opensource tools like [RAGAS](https://docs.ragas.io/en/v0.1.21/index.html) and leverage the features in [Langfuse](https://langfuse.com/) to manage and trace the RAG pipelines with traces and spans. We will create a OpenSearch Vector Database and the RAG results generation to show offline evaluation and scoring.

## Pre-requisites

> If you haven't selected the kernel, please click on the "Select Kernel" button at the upper right corner, select Python Environments and choose ".venv (Python 3.9.20) .venv/bin/python Recommended".

> To execute each notebook cell, press Shift + Enter.

> ℹ️ You can **skip these prerequisite steps** if you're in an instructor-led workshop using temporary accounts provided by AWS

### Dependencies and Environment Variables

In [None]:
# Uncomment the following line to install dependencies if you are not using AWS workshop environment
%pip install langfuse datasets ragas python-dotenv sagemaker langchain-aws opensearch-py requests_aws4auth boto3 --upgrade

Please make sure you have completed the prerequisites to setup the Langfuse project and API keys in the .env file to connect to self-hosted or cloud Langfuse environment.


In [None]:
# if you already define the environment variables in the .env of the vscode server, please skip the following cell
# Define the environment variables for langfuse
# You can find those values when you create the API key in Langfuse
import os
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-e409877e-57f9-4c26-bc63-8bb5fb119b10" # Your Langfuse project secret key
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-0d278d2c-3be0-4932-b63e-0c2114064d82" # Your Langfuse project public key
os.environ["LANGFUSE_HOST"] = "https://us.cloud.langfuse.com" # Langfuse domain

# Required Langfuse environment variables
required_env_vars = [
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_HOST"
]

for var in required_env_vars:
    assert os.environ.get(var), f"❌ Environment variable '{var}' is not set!"

See [Langfuse documentation](https://langfuse.com/docs/get-started) for more details.

## Initialization
Run the following cells to initialize common libraries.

In [None]:
import json
import os
from typing import Any, Dict, List, Optional

# External Dependencies:
import pandas as pd  # For working with tabular data
import boto3, uuid
from botocore.response import StreamingBody
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
from datasets import Dataset
from random import sample
from asyncio import run



# Langchain
from langchain_aws.chat_models.sagemaker_endpoint import ChatSagemakerEndpoint, ChatModelContentHandler
from langchain_core.messages import HumanMessage, AIMessageChunk, SystemMessage
from langchain_aws.embeddings import BedrockEmbeddings


# Langfuse
import langfuse  # assuming you're using the SDK
from langfuse import Langfuse
from langfuse.api.resources.commons.types.trace_with_details import TraceWithDetails
from langfuse.decorators import observe, langfuse_context


# Sagemaker
import sagemaker
from sagemaker.huggingface import HuggingFaceModel
from sagemaker.huggingface import get_huggingface_llm_image_uri
from sagemaker.utils import name_from_base
from sagemaker import get_execution_role

# RAGAS
import ragas
from ragas.run_config import RunConfig
from ragas.metrics.base import MetricWithLLM, MetricWithEmbeddings
from ragas import evaluate
from ragas.metrics import Faithfulness, ResponseRelevancy
from ragas.metrics import answer_relevancy, faithfulness, context_precision
from ragas.llms import LangchainLLMWrapper
from ragas.embeddings import LangchainEmbeddingsWrapper
from ragas.dataset_schema import SingleTurnSample

Initialize AWS Bedrock clients and check models available in your account. 

In [None]:
import boto3  # General Python SDK for AWS (including Bedrock)

# used to access Bedrock configuration
bedrock = boto3.client(service_name="bedrock", region_name="us-east-1")

bedrock_agent_runtime = boto3.client(
    service_name="bedrock-agent-runtime", region_name="us-east-1"
)


## Deploy Qwen2.5-1.5B-Instruct Model to Sagemaker Endpoint

In [None]:
llm_image = get_huggingface_llm_image_uri(
  "huggingface",
  version="3.0.1"
)


role = get_execution_role()
print(role)

hub = {
    'HF_TASK': 'text-generation', 
    'HF_MODEL_ID': 'Qwen/Qwen2.5-1.5B-Instruct'
}

model_for_deployment = HuggingFaceModel(
    #model_data=s3_location,
    role=role,
    env=hub,
    image_uri=llm_image,
)

endpoint_name = name_from_base("qwen25")

instance_type = "ml.g5.2xlarge"
number_of_gpu = 1
health_check_timeout = 300

model_for_deployment.deploy(
    endpoint_name=endpoint_name,
    initial_instance_count=1,
    instance_type=instance_type,
    container_startup_health_check_timeout=health_check_timeout,
    routing_config = {
        "RoutingStrategy":  sagemaker.enums.RoutingStrategy.LEAST_OUTSTANDING_REQUESTS
    }
)

## Initialize the Langfuse client and check credentials are valid.

In [None]:
# langfuse client
langfuse = Langfuse()
if langfuse.auth_check():
    print("Langfuse has been set up correctly")
    print(f"You can access your Langfuse instance at: {os.environ['LANGFUSE_HOST']}")
else:
    print(
        "Credentials not found or invalid. Check your Langfuse API key and host in the .env file."
    )

## Set up Open Search Vector Database

### Step 1: Update your IAM role:

- Add the following policy to your IAM role:
```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "aoss:*",
                "es:*"
            ],
            "Resource": "*"
        }
    ]
}

```


### Step 2. Create the OpenSearch Domain (Once)

- From the AWS Console:
  - Go to OpenSearch Service
  - Click Create domain
  - Select Deployment type: "Development and testing"
  - **Choose:**
   - Domain name: ragas-langfuse
   - Engine version: latest stable (e.g., 2.11+)
   - In Data nodes, keep defaults (t3.small.search, 10 GB, 1 AZ)
  - **In Network** , choose:
    - Public access (if testing from your SageMaker notebook or local machine)
  - **In Access policy**, allow:
      - Your SageMaker IAM role, or

* if you're testing quickly (you can lock it down later)

⏳ It takes ~10 mins to spin up

### Step 3: Index your data into the Open Search Domain

In [None]:
region = ""
domain = ""

assert region != "", "Please include your region of choice"
assert domain != "", "Please copy the domain name from the Open Search Console (IPV4) here WITHOUT the https://"
assert domain.find("https://") < 0, "Please remove the https:// and make sure it is the IPV4 domain"


# Setup OpenSearch client
index_name = "documents"

credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    "es",
    session_token=credentials.token
)

opensearch = OpenSearch(
    hosts=[{'host': domain, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection  # ✅ this makes AWS4Auth compatible
)


In [None]:
# Ingest a doc
def index_document(content, file_name):
    doc_id = str(uuid.uuid4())
    doc = {
        "file_name": file_name,
        "content": content
    }
    response = opensearch.index(index=index_name, id=doc_id, body=doc)
    print("✅ Indexed:", response)

# Index all files

corpus_dir = "./datasets/corpus"

for fname in os.listdir(corpus_dir):
    if fname.endswith(".txt"):
        file_path = os.path.join(corpus_dir, fname)
        with open(file_path, "r") as f:
            content = f.read()
        print(f"📥 Indexing: {fname}")
        index_document(content, fname)


### Step 4: Test your Open Search Domain and retreive a sample result

In [None]:
def search_documents(query_text, index_name="documents", size=5):
    response = opensearch.search(
        index=index_name,
        body={
            "query": {
                "match": {
                    "content": {
                        "query": query_text
                    }
                }
            }
        },
        size=size
    )
    
    hits = response['hits']['hits']
    print(f"\n🔍 Found {len(hits)} result(s):\n")
    for hit in hits:
        print(f"📄 {hit['_source']['file_name']}")
        print(f"🧠 Score: {hit['_score']}")
        snippet = hit['_source']['content']
        print(f"📝 Content: {snippet}...\n---\n")


#### Testing Indexing

In [None]:
search_documents("Bretton Woods Accord and price of gold")

# 📂 Test Evaluation Pipeline


Let's start by loading the dataset.

In [None]:
from datasets import load_dataset

fiqa_eval = load_dataset("explodinggradients/fiqa", "ragas_eval")["baseline"]
fiqa_eval

### 📊 RAGAS Evaluation Metrics

We're going to measure the following aspects of a RAG system. These metrics are defined in **[RAGAS]**(https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/):

- 🔍 **[Faithfulness](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/faithfulness/)**  
  Measures how factually consistent the generated answer is with the retrieved context. It evaluates whether the answer could reasonably be derived from the context.

- 🎯 **[Response Relevancy](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/answer_relevance/)**  
  Assesses how relevant the generated answer is to the original user query. A high score indicates the answer is on-topic and useful.

- 🧠 **[Context Precision](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/context_precision/)**  
  Measures how many of the retrieved contexts are truly relevant to answering the question. Precision reflects the "purity" of the retrieved chunks.

- 📥 **[Context Recall](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/context_recall/)**  
  Evaluates how well the retrieved context covers the information needed to answer the question completely. High recall means fewer relevant facts are missed.

- 🧬 **[Answer Similarity](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/answer_similarity/)**  
  Compares the generated answer to a reference answer (if available), measuring how semantically close they are using embedding-based similarity.

- ✅ **[Answer Correctness](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/answer_correctness/)**  
  Evaluates whether the generated answer is factually correct and aligns with known ground-truth answers, if such references are available.

> 📚 Want to dive deeper into how each metric is computed?  
Check out the full [RAGAS metrics documentation](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/).


In [None]:
# import metrics
metrics=[
        ragas.metrics.answer_relevancy,
        ragas.metrics.faithfulness,
        ragas.metrics.context_precision,
        ragas.metrics.context_recall,
        ragas.metrics.answer_similarity,
        ragas.metrics.answer_correctness,
    ]

In [None]:
# util function to init Ragas Metrics
def init_ragas_metrics(metrics, llm, embedding):
    for metric in metrics:
        if isinstance(metric, MetricWithLLM):
            print(metric.name + " llm")
            metric.llm = llm
        if isinstance(metric, MetricWithEmbeddings):
            print(metric.name + " embedding")
            metric.embeddings = embedding
        run_config = RunConfig()
        metric.init(run_config)

Now we have to initialize the metrics with LLMs and embedding models of your choice. In this example we are going to use the Qwen2.5-1.5B-Instruct model and amazon.titan-embed-text-v1 embedding model, and use the convenience wrappers from the `langchain-aws` library.

### Creating the Sagemaker Chat Wrapper for the Qwen2.5 1.5B Instruct Model

In [None]:
sm = boto3.Session().client('sagemaker-runtime')
endpoint_name = "SET ENDPOINT NAME"
assert endpoint_name != "SET ENDPOINT NAME", f"❌ endpoint name is not set!"

class ContentHandler(ChatModelContentHandler):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, prompt, model_kwargs: Dict) -> bytes:
        body = {
            "messages": prompt,
            "stream": True,
            **model_kwargs  # Ensure all model parameters are passed
        }
        return json.dumps(body).encode("utf-8")

    def transform_output(self, output: StreamingBody) -> AIMessageChunk:
        stop_token = "[DONE]"
        try:
            all_content = []

            # Process streaming response line by line
            for line in output.iter_lines():
                if line:
                    line = line.decode("utf-8").strip()

                    # Skip empty lines or lines without "data:"
                    if not line.startswith("data:"):
                        continue

                    # Validate and parse JSON
                    try:
                        json_data = json.loads(line[6:])
                        
                    except json.JSONDecodeError as e:
                        #print(f"Skipping invalid JSON chunk: {line}")
                        continue
                    
                    # Check for stop token
                    if json_data.get("choices", [{}])[0].get("delta", {}).get("content") == stop_token:
                        break
                    
                    # Extract content and append to the list
                    content = json_data["choices"][0]["delta"]["content"]
                    all_content.append(content)

            # Join all chunks into a single string
            full_response = "".join(all_content)
            return AIMessageChunk(content=full_response)
        except Exception as e:
            return AIMessageChunk(content=f"Error processing response: {str(e)}")


chat_content_handler = ContentHandler()

chat_llm = ChatSagemakerEndpoint(
    endpoint_name=endpoint_name,
    client=sm,
    model_kwargs={
        "temperature": 0.7,  # Adjust temperature for balanced randomness
        "max_new_tokens": 1200,  # Ensure sufficient token generation
        "top_p": 0.95,  # Use nucleus sampling for diversity
        "do_sample": True  # Enable sampling for generative tasks
    },
    content_handler=chat_content_handler
)

### Score with RAGAS

In [None]:
llm = ChatSagemakerEndpoint(
    name="Testmodel",
    endpoint_name=endpoint_name,
    client=sm,
    model_kwargs={
        "temperature": 0.7,  # Adjust temperature for balanced randomness
        "max_new_tokens": 1200,  # Ensure sufficient token generation
        "top_p": 0.95,  # Use nucleus sampling for diversity
        "do_sample": True  # Enable sampling for generative tasks
    },
    content_handler=chat_content_handler
)

# Use correct region for Titan Embed (e.g., us-east-1)
client = boto3.client(
    "bedrock-runtime",
    region_name="us-east-1",  # Titan Embed supported here
)

emb = BedrockEmbeddings(
    client=client,
    model_id="amazon.titan-embed-text-v1",  # Case-sensitive!
)

init_ragas_metrics(
    metrics,
    llm=LangchainLLMWrapper(llm),
    embedding=LangchainEmbeddingsWrapper(emb),
)

## Trace eval results with Langfuse

You can use model-based evaluation with Ragas in 2 ways:
1. Score every trace: This means you will run the evaluations for each trace item. This gives you much better idea of how each call made to your RAG pipelines is performing, but please be mindful of the cost.

2. Score with sampling: In this method we will take random samples of traces on a periodic basis and score them. This brings down the cost and gives you a rough estimate the performance of your app but may miss out on important samples.

In this example, we will demonstrate both solutions using prebuilt dataset and a live RAG pipeline with AWS Open Search.

### Score every trace

Lets take a small example of a single trace and see how you can score that with Ragas. We first define a utility function to score your trace with the metrics you chose.

In [None]:
async def score_with_ragas(query, chunks, answer, metrics):
    scores = {}
    for metric in metrics:
        sample = SingleTurnSample(
            user_input=query,
            retrieved_contexts=chunks,
            response=answer,
            reference=chunks[0]
        )
        print(f"calculating {metric.name}")
        scores[metric.name] = await metric.single_turn_ascore(sample)
    return scores

#### Scoring sample dataset item

You compute the score with each request. Below we will go through a dummy application that does the following steps:

- Gets a question from the user
- Fetch context from the database or vector store that can be used to answer the question from the user
- Pass the question and the contexts to the LLM to generate the answer

In this case we are demonstrating the use of the Langfuse Python [low-level SDK](https://langfuse.com/docs/sdk/python/low-level-sdk) to log the traces with more granular controls. You can also see an example with the [decorator](https://langfuse.com/docs/sdk/python/decorators) in the later section or read more about them the [langfuse documentation](https://langfuse.com/docs/sdk/overview).

In [None]:
# start a new trace when you get a question
row = fiqa_eval[0]
question = row["question"]
trace = langfuse.trace(name="rag-fiqa")

# retrieve the relevant chunks
# chunks = get_similar_chunks(question)
contexts = row["contexts"]
# pass it as span
trace.span(
    name="retrieval", input={"question": question}, output={"contexts": contexts}
)

# use llm to generate a answer with the chunks
# answer = get_response_from_llm(question, chunks)
answer = row["answer"]
trace.generation(
    name="generation",
    input={"question": question, "contexts": contexts},
    output={"answer": answer},
)

# compute scores for the question, context, answer tuple
ragas_scores = await score_with_ragas(question, contexts, answer, metrics)
ragas_scores

In [None]:
print(
    f"Now you can see this is traced in langfuse but with no score attached, we can check it in the Langfuse UI at:\n{os.environ['LANGFUSE_HOST']}"
)

You can then attach the scores to the trace by running the following

In [None]:
# send the scores
for m in metrics:
    trace.score(name=m.name, value=ragas_scores[m.name])

Now the score is attached. It should look similar to this

![](images/sagemaker_langfuse_score_single.png)

#### Scoring RAG
We have already setup the Open Search Database in the first section, we can now **evaluate** the quality of its results against a test dataset - to help us **optimize** the configuration for high quality and low cost.

First, let's load the sample dataset of questions, reference answers, and their source documents (to find more of how to prepare this dataset, please see more details in [this github](https://github.com/aws-samples/llm-evaluation-methodology/blob/main/datasets/Prepare-SQuAD.ipynb)):


In [None]:
import pandas as pd
dataset_df = pd.read_json("datasets/qa.manifest.jsonl", lines=True)
dataset_df.head(10)

Records in this dataset include:

- (`doc`) The full text of the source document for this example
- (`doc_id`) A unique identifier for the source document
- (`question`) The user question to be asked
- (`question_id`) A unique identifier for the question
- (`answers`) A list of (possibly multiple) reference 'correct' answers, supported by the document

As shown in [Ragas' API Reference](https://docs.ragas.io/en/latest/references/evaluation.html), records in Ragas evaluation datasets typically include:

- The `question` that was asked
- The `answer` the system generated
- The actual text `contexts` the answer was based on (i.e. snippets of document text retrieved by the search engine)
- The `ground_truth` answer(s)

Here we will integrate [Langfuse Tracking](https://langfuse.com/docs/tracing) into the RAG pipeline with the Langfuse Python SDK using the `@observe()` decorator.

We can run an example question through the OpenSearch Vector database to retrieve and generate pipeline as shown below, and extract the references ready to calculate metrics.

In [None]:
# Bedrock Runtime
bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1")

@observe(name="OpenSearch RAG with Qwen")
def retrieve_and_generate(
    question: str,
    top_k: int = 3,
    system_prompt: str = "You are a helpful assistant. Use the context to answer concisely.",
    **kwargs,
):
    # Step 1: Retrieve relevant context from OpenSearch
    response = opensearch.search(
        index="documents",
        body={
            "query": {
                "match": {
                    "content": {
                        "query": question
                    }
                }
            }
        },
        size=top_k
    )
    
    hits = response["hits"]["hits"]
    contexts = [hit["_source"]["content"] for hit in hits]
    doc_ids = [hit["_id"] for hit in hits]

    # Step 2: Format prompt with retrieved context
    combined_context = "\n\n".join(contexts)
    full_prompt = f"""Context:
{combined_context}

Question: {question}
Answer:"""

    # Step 3: Call your SageMaker-hosted model using LangChain
    messages: List = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=full_prompt)
    ]
    
    response_chunk = chat_llm.invoke(messages)
    answer = response_chunk.content  # already joined by your handler

    # Step 4: Log trace to Langfuse
    langfuse_context.update_current_observation(
        input={"question": question, "contexts": contexts},
        output=answer,
        model=endpoint_name,
        session_id="opensearch-rag-session",
        tags=["dev", "qwen", "opensearch"],
        metadata=kwargs,
    )

    trace_id = langfuse_context.get_current_trace_id()

    return {
        "answer": answer,
        "retrieved_doc_ids": doc_ids,
        "retrieved_doc_texts": contexts[:300],
        "trace_id": trace_id,
    }


Run RAG as requests come in and score the results immediately.

In [None]:
from asyncio import run


langfuse_client = Langfuse()  # picks up env vars: LANGFUSE_PUBLIC_KEY, SECRET_KEY, HOST


@observe(name="OpenSearch, Qwen2.5, Langfuse Pipeline")
def rag_pipeline(
    question: str,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    metrics: Optional[Any] = None,
):
    generated_answer = retrieve_and_generate(
        question=question,
        top_k=3,  # or whatever makes sense for your context window
        system_prompt="You are a helpful assistant. Use the context below to answer the question."
    )

    answer = generated_answer["answer"]
    contexts = generated_answer["retrieved_doc_texts"]
    trace_id = generated_answer["trace_id"]

    
    metrics=[
            # A looot of metrics to give a general overview:
            ragas.metrics.answer_relevancy,
            ragas.metrics.faithfulness,
            ragas.metrics.context_precision,
            ragas.metrics.context_recall,
            ragas.metrics.answer_similarity,
            ragas.metrics.answer_correctness,
        ]


    score = run(score_with_ragas(question, contexts, answer=answer, metrics=metrics))

    langfuse_context.update_current_trace(
        user_id=user_id,
        session_id=session_id,
        tags=["dev", "opensearch", "qwen"]
    )

    for s in score:
        langfuse_client.score(name=s, value=score[s])


    print(f"🔗 Langfuse trace: https://cloud.langfuse.com/trace/{trace_id}")

    return generated_answer



In [None]:
response = rag_pipeline(
    question=dataset_df.iloc[0]["question"],
    user_id="AWSome",
    session_id="qwen-test-session"
)
response

### Scoring with sampling

Scoring every production trace can be time-consuming and costly depending on your application architecture and traffic. In that case, it's better to start off with a sampling method. Decide a timespan you want to run the batch process and the number of traces you want to sample from that time slice. Create a dataset and call ragas.evaluate to analyze the result.

You can run this periodically to keep track of how the scores are changing across timeslices and figure out if there are any discrepancies.

We will evaluate the existing results generated previously by the `retrieve_and_generate()` function.

Simulate 10 production traces by running RAG on the first 10 questions in the dataset.

In [None]:
rag_generated_outputs = [
    retrieve_and_generate(
        question=rec["question"],
        top_k=3,  # or whatever makes sense for your context window
        system_prompt="You are a helpful assistant. Use the context below to answer the question."
    )
    for _, rec in dataset_df.head(10).iterrows()
]
rag_generated_outputs[0] 

Now that the results are uploaded to langfuse you can retrieve it as needed with this handy function.

In [None]:
def get_traces(
    limit: int = 5,
    name: Optional[str] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    from_timestamp: Optional[str] = None,
    to_timestamp: Optional[str] = None,
) -> List[TraceWithDetails]:
    """Query Langfuse for traces matching the given filters.
    See https://langfuse.com/docs/query-traces for more details."""

    all_data = []
    page = 1

    while True:
        response = langfuse_client.fetch_traces(
            page=page,
            name=name,
            user_id=user_id,
            session_id=session_id,
            from_timestamp=from_timestamp,
            to_timestamp=to_timestamp,
        )
        if not response.data:
            break
        page += 1
        all_data.extend(response.data)
        if len(all_data) > limit:
            break

    return all_data[:limit]

In [None]:
NUM_TRACES_TO_SAMPLE = 3
traces = get_traces(name="OpenSearch RAG with Qwen", limit=10)
if len(traces) > NUM_TRACES_TO_SAMPLE:
    traces_sample = sample(traces, NUM_TRACES_TO_SAMPLE)
else:
    traces_sample = traces

print(f"Sampled {len(traces_sample)} traces from {len(traces)} filtered traces")
for trace in traces_sample:
    print(f"Trace ID: {trace.id}")

Now lets make a batch and score it. Ragas uses huggingface dataset object to build the dataset and run the evaluation. If you run this on your own production data, use the right keys to extract the question, contexts and answer from the trace

In [None]:
# score on a sample
evaluation_batch = {
    "question": [],
    "contexts": [],
    "answer": [],
    "trace_id": [],
}

for sample in traces_sample:
    evaluation_batch["question"].append(sample.input["question"])
    evaluation_batch["contexts"].append(sample.input["contexts"])
    evaluation_batch["answer"].append(sample.output)
    evaluation_batch["trace_id"].append(sample.id)

Using the ragas evaluate function to score an entire dataset instead of single turn. See [Ragas evaluate](https://docs.ragas.io/en/latest/references/evaluate/) for more information.

In [None]:
ds = Dataset.from_dict(evaluation_batch)
evals_results = evaluate(
    ds,
    llm=llm,
    embeddings=emb,
    metrics=[Faithfulness(), ResponseRelevancy()],
)
evals_results

And that is it! You can see the scores over a time period. Let's render the results in a dataframe to see the scores.

In [None]:
df = evals_results.to_pandas()

# add the langfuse trace_id to the result dataframe
df["trace_id"] = ds["trace_id"]

df.head()

You can also push the scores back into Langfuse and attach them to the traces.

In [None]:
for _, row in df.iterrows():
    for metric_name in ["faithfulness", "answer_relevancy"]:
        langfuse.score(
            name=metric_name, value=row[metric_name], trace_id=row["trace_id"]
        )

You can now go back to the Langfuse console and check the updated scores in the traces.

![](images/score-with-sampling.png)

### Congratuations!
You have now learnt how to use AWS OpenSearch, Amazon Sagemaker AI , AWS Bedrock and Langfuse to evaluate and score RAG workflows