# Lab 3.1: Evaluate Langfuse Traces using an external evaluation pipeline

#### An external evaluation pipeline is useful when you need:
- More control over when traces get evaluated. You could schedule the pipeline to run at specific times or in response to event-based triggers such as webhooks.
- Greater flexibility with your custom evaluations, when your needs go beyond what’s possible with the Langfuse UI
- Version control for your custom evaluations
- The ability to evaluate data using existing evaluation frameworks and pre-defined metrics

In this notebook, we will learn to implement an external evaluation pipeline by doing the following:
1. Create a synthetic dataset to test your models.
2. Use the Langfuse client to gather and filter traces of previous model runs
3. Evaluate these traces offline and incrementally
4. Add scores to existing Langfuse traces

## Prerequisites

> ℹ️ You can **skip these prerequisite steps** if you're in an instructor-led workshop using temporary accounts provided by AWS

In [None]:
# Uncomment the following line to install dependencies if you are not using the AWS workshop environment
# %pip install langfuse datasets ragas python-dotenv langchain-aws boto3 --upgrade

Connect to self-hosted or cloud Langfuse environment.

In [None]:
# If you have already defined the environment variables in the .env file of the VS Code server, skip this cell
# Define the environment variables for langfuse
# You can find those values when you create the API key in Langfuse
# import os
# os.environ["LANGFUSE_SECRET_KEY"] = "xxxx" # Your Langfuse project secret key
# os.environ["LANGFUSE_PUBLIC_KEY"] = "xxxx" # Your Langfuse project public key
# os.environ["LANGFUSE_HOST"] = "xxx" # Langfuse domain
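
Before initializing any clients, it can help to confirm that the three variables above are actually set. The following is a minimal sketch; `missing_langfuse_vars` is a hypothetical helper written for this notebook, not part of the Langfuse SDK.

```python
import os

# Variable names taken from the commented-out cell above.
REQUIRED_LANGFUSE_VARS = (
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_HOST",
)


def missing_langfuse_vars(env=None):
    """Return the required Langfuse variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_LANGFUSE_VARS if not env.get(name)]


missing = missing_langfuse_vars()
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
else:
    print("All Langfuse environment variables are set")
```

Passing a plain dictionary instead of `os.environ` makes the check easy to try without touching your real environment.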

## Initialization and Authentication Check
Run the following cells to initialize common libraries and clients.

In [None]:
import os
from typing import Any, Dict, List, Optional

import boto3
from botocore.exceptions import ClientError
from langfuse import Langfuse
from langfuse.client import PromptClient
from langfuse.decorators import langfuse_context, observe

Initialize AWS Bedrock clients and check models available in your account.

In [None]:
import boto3  # General Python SDK for AWS (including Bedrock)

# used to access Bedrock configuration
bedrock = boto3.client(service_name="bedrock", region_name="us-west-2")

# used to invoke the Bedrock Converse API
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name="us-west-2")

bedrock_agent_runtime = boto3.client(
    service_name="bedrock-agent-runtime", region_name="us-west-2"
)

# Check which models are available in your account
models = bedrock.list_inference_profiles()
for model in models["inferenceProfileSummaries"]:
    print(model["inferenceProfileName"] + " - " + model["inferenceProfileId"])

Initialize the Langfuse client and check credentials are valid.

In [None]:
from langfuse import Langfuse

# langfuse client
langfuse = Langfuse()
if langfuse.auth_check():
    print("Langfuse has been set up correctly")
    print(f"You can access your Langfuse instance at: {os.environ['LANGFUSE_HOST']}")
else:
    print(
        "Credentials not found or invalid. Check your Langfuse API key and host in the .env file."
    )

# Generate synthetic data

In this notebook, we consider a use case in which an LLM generates product descriptions for advertising products on an e-commerce page. The first step is to generate a list of products and then, for each one, instruct Amazon Nova Lite to generate a brief product description.

In [None]:
# Let's prompt the model to generate 50 products
messages = [
    {
        "role": "user",
        "content": [
            {
                "text": "For a variety of 50 different product categories sold on a e-commerce website, \
    generate one product that is interesting to a consumer. The product names should be reflective of the actual product being sold. \
    Generate the 50 product items as comma separated values. Do not generate any additional words apart from the product names"
            }
        ],
    },
]

# Make the API call to the Nova Lite model
model_response = bedrock_runtime.converse(
    modelId="us.amazon.nova-lite-v1:0",  # you can update the model id to other foundation models in bedrock
    messages=messages,
)

# Print the generated text
print("\n[Response Content Text]")
print(model_response["output"]["message"]["content"][0]["text"])

In [None]:
# check the model generation outputs
products_text = model_response["output"]["message"]["content"][0]["text"]
products = [item.strip() for item in products_text.split(",")]

for prd in products:
    print(prd)
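
The model does not always return a clean comma-separated list: it may add line breaks, a trailing period, or repeat an item. The sketch below shows one way to tidy the output before using it. `parse_product_list` is a hypothetical helper, and the cleanup rules (treating newlines as separators, case-insensitive de-duplication) are assumptions rather than requirements of the lab.

```python
def parse_product_list(raw_text, expected=50):
    """Split comma-separated model output into a clean, de-duplicated list."""
    seen = set()
    cleaned_products = []
    # Treat line breaks as separators too, in case the model wraps its output.
    for item in raw_text.replace("\n", ",").split(","):
        name = item.strip().strip(".").strip()
        key = name.lower()
        if name and key not in seen:
            seen.add(key)
            cleaned_products.append(name)
    if len(cleaned_products) != expected:
        print(f"Note: expected {expected} products, parsed {len(cleaned_products)}")
    return cleaned_products


print(parse_product_list("Laptop, Phone,\nlaptop, ,Desk Lamp.", expected=3))
```

In the notebook, you could call `parse_product_list(products_text)` in place of the list comprehension above.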

#### For each of the products, we will now generate product descriptions using Amazon Nova Lite, and capture the traces to Langfuse using the ```@observe()``` decorator

In [None]:
# Generate product descriptions for each product
from typing import Optional


@observe(as_type="generation")
def general_chat(
    product,
    messages: List[Dict[str, Any]],
    prompt: Optional[PromptClient] = None,
    modelId: str = "us.amazon.nova-lite-v1:0",
    user_id: str = "lab3-model-eval-user",
    **kwargs,
) -> Optional[str]:
    # 1. extract model metadata
    inferenceConfig = {"maxTokens": 500, "temperature": 0.1}
    additionalModelRequestFields = {}

    model_parameters = {**inferenceConfig, **additionalModelRequestFields}

    langfuse_context.update_current_observation(
        input=messages,
        model=modelId,
        model_parameters=model_parameters,
        prompt=prompt,
        metadata=kwargs,
    )

    langfuse_context.update_current_trace(
        name=f"Description of '{product}'",
        user_id=user_id,
        tags=["bedrock_eval_pipelines"],
    )

    # Extract the system prompts from the messages and convert them to the format expected by the Bedrock Converse API
    system_prompts = [
        {"text": message["content"]}
        for message in messages
        if message["role"] == "system"
    ]

    # Convert the rest of messages to the format expected by the Bedrock Converse API
    messages = [
        {
            "role": message["role"],
            "content": (
                message["content"]
                if isinstance(message["content"], list)
                else [{"text": message["content"]}]
            ),
        }
        for message in messages
        if message["role"] != "system"  # system prompts are passed separately
    ]

    # 2. model call with error handling
    try:
        response = bedrock_runtime.converse(
            modelId=modelId,
            messages=messages,
            system=system_prompts,
            inferenceConfig=inferenceConfig,
            additionalModelRequestFields=additionalModelRequestFields,
            **kwargs,
        )
    except (ClientError, Exception) as e:
        error_message = f"ERROR: Can't invoke '{modelId}'. Reason: {e}"
        langfuse_context.update_current_observation(
            level="ERROR", status_message=error_message
        )
        print(error_message)
        return

    # 3. extract response metadata
    response_text = response["output"]["message"]["content"][0]["text"]
    langfuse_context.update_current_observation(
        output=response_text,
        usage={
            "input": response["usage"]["inputTokens"],
            "output": response["usage"]["outputTokens"],
            "total": response["usage"]["totalTokens"],
        },
        metadata={
            "ResponseMetadata": response["ResponseMetadata"],
        },
    )

    return response_text


prompt_template = "You are a product marketer and you need to generate detailed \
product descriptions for products which will be used for selling \
the product on an e-commerce website. Any catchy phrases from the \
descriptions will also be used for social media campaigns. \
From the product descriptions, customers should be able to understand \
how the product can help them in their lives but also be able to trust \
this company. Your descriptions are fun and engaging. \
Your answer should be 4 sentences at max."

for product in products:
    print(f"Input: Generate a description for {product}")
    messages = [
        {"role": "system", "content": prompt_template},
        {"role": "user", "content": f"Generate a description for {product}"},
    ]
    print(f"Answer: {general_chat(product, messages)} \n")

### Now you should see these product descriptions in the Traces section of the Langfuse UI.
![Traces collected from the LLM generations](./images/product_description_traces.png "Langfuse Traces")


The goal of this tutorial is to show you how to build a model-based evaluation pipeline. Such a pipeline can run in your CI/CD environment or in a separate orchestrated container service. Whichever environment you choose, three key steps always apply:

1. Fetch Your Traces: Get your application traces to your evaluation environment
2. Run Your Evaluations: Apply any evaluation logic you prefer
3. Save Your Results: Attach your evaluations back to the Langfuse trace used for calculating them.
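
The three steps above can be sketched as a small, environment-agnostic loop. This is an illustration only: `run_evaluation_pipeline` and its three callables are hypothetical names, and the in-memory stand-ins below take the place of the Langfuse and Bedrock calls used later in this notebook.

```python
from typing import Callable, Iterable, Tuple


def run_evaluation_pipeline(
    fetch: Callable[[], Iterable[dict]],
    evaluate: Callable[[dict], Tuple[str, str]],
    save: Callable[[dict, str, str], None],
) -> int:
    """Fetch items, evaluate each one, save the result; return the number saved."""
    saved = 0
    for item in fetch():
        if item.get("output") is None:
            continue  # nothing to evaluate for this item
        score, comment = evaluate(item)
        save(item, score, comment)
        saved += 1
    return saved


# In-memory stand-ins so the sketch runs without any external service.
results = []
count = run_evaluation_pipeline(
    fetch=lambda: [{"id": "t1", "output": "A short text"}, {"id": "t2", "output": None}],
    evaluate=lambda item: ("good readability", "stub explanation"),
    save=lambda item, score, comment: results.append((item["id"], score)),
)
print(count, results)
```

Keeping the three steps behind separate callables means the same loop can be reused whether it runs in a notebook, a CI job, or a scheduled container.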

***
Goal: Run this evaluation pipeline on all traces from the past 24 hours
***

## 1. Fetch the traces

The ```fetch_traces()``` function accepts arguments that filter traces by tags, user, timestamps, and more. The ```page``` and ```limit``` arguments control pagination.

In [None]:
from datetime import datetime, timedelta

BATCH_SIZE = 10
TOTAL_TRACES = 50

now = datetime.now()
last_24_hours = now - timedelta(days=1)


traces_batch = langfuse.fetch_traces(
    page=1,
    limit=BATCH_SIZE,
    tags="bedrock_eval_pipelines",
    user_id="lab3-model-eval-user",
    from_timestamp=last_24_hours,
    to_timestamp=datetime.now(),
).data

print(f"Traces in first batch: {len(traces_batch)}")
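
A naive `datetime.now()` carries no time zone, so how the window is interpreted can depend on the machine that runs the pipeline. The sketch below builds the same 24-hour window with timezone-aware UTC values. `lookback_window` is a hypothetical helper; whether you need it depends on where your pipeline runs.

```python
from datetime import datetime, timedelta, timezone


def lookback_window(hours=24, now=None):
    """Return (start, end) as timezone-aware UTC datetimes."""
    end = now if now is not None else datetime.now(timezone.utc)
    start = end - timedelta(hours=hours)
    return start, end


window_start, window_end = lookback_window(hours=24)
print(f"Evaluating traces from {window_start.isoformat()} to {window_end.isoformat()}")
```

The two values could then be passed as `from_timestamp` and `to_timestamp` in the call above.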

In [None]:
traces_batch[1].id

In [None]:
response = langfuse.get_generations(trace_id=traces_batch[1].id).data[0]
response.output

## 2. Categorical Evaluation using LLM-as-a-judge

Evaluation functions should take a trace as input and yield a valid score.
When analyzing the outputs of your LLM applications, you may want to evaluate traits that are defined qualitatively, such as readability or helpfulness, or measures aimed at reducing hallucinations, such as completeness.

We're generating product descriptions, and to ensure they resonate with customers, we want to measure readability. For more LLM-as-a-judge definitions, see the judge-based evaluator prompts in the [Amazon Bedrock Evaluator Prompts](https://docs.aws.amazon.com/bedrock/latest/userguide/model-evaluation-type-judge-prompt.html) documentation.

In [None]:
template_readability = """
You are a helpful agent that can assess an LLM response according to the given rubrics.

You are given a product description generated by an LLM. Your task is to assess the readability of the product description, in other words, how easy it is for a typical reading audience to comprehend the response at a normal reading rate.

Please rate the readability of the response based on the following scale:
- unreadable: The response contains gibberish or could not be comprehended by any normal audience.
- poor readability: The response is comprehensible, but it is full of poor readability factors that make comprehension very challenging.
- fair readability: The response is comprehensible, but there is a mix of poor readability and good readability factors, so the average reader would need to spend some time processing the text in order to understand it.
- good readability: Very few poor readability factors. Mostly clear, well-structured sentences. Standard vocabulary with clear context for any challenging words. Clear organization with topic sentences and supporting details. The average reader could comprehend by reading through quickly one time.
- excellent readability: No poor readability factors. Consistently clear, concise, and varied sentence structures. Simple, widely understood vocabulary. Logical organization with smooth transitions between ideas. The average reader may be able to skim the text and understand all necessary points.

Here is the product description that needs to be evaluated: {prd_desc}

Firstly explain your response, followed by your final answer. You should follow the format
Explanation: [Explanation], Answer: [Answer],
where '[Answer]' can be one of the following:
```
unreadable
poor readability
fair readability
good readability
excellent readability
```
"""


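def generate_readability_score(trace_output):
    """Ask the judge model to rate readability; return (explanation, score)."""
    prd_desc_readability = template_readability.format(prd_desc=trace_output)
    message_1 = {"role": "user", "content": [{"text": prd_desc_readability}]}

    readability_score = bedrock_runtime.converse(
        modelId="us.amazon.nova-pro-v1:0", messages=[message_1]
    )
    judge_text = readability_score["output"]["message"]["content"][0]["text"]

    # Split on the last "Answer:" marker instead of on blank lines, so extra
    # paragraphs in the explanation do not break the unpacking.
    explanation, separator, score = judge_text.rpartition("Answer:")
    if not separator:
        # The judge ignored the requested format; keep the raw text for review.
        return judge_text.strip(), "unparsed"
    explanation = explanation.replace("Explanation:", "", 1).strip().rstrip(",")
    score = score.strip().strip(",.`").strip()
    return explanation, score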


print(f"User query: {response.input[1]['content']}")
print(f"Model answer: {response.output}")
explanation, score = generate_readability_score(response.output)
print(f"Readability: {score}, Explanation: {explanation}")

## 3. Add the evaluation to the trace

Now that we have generated a readability score as well as an explanation, we can use the Langfuse client to add scores to existing traces.

In [None]:
langfuse.score(
    trace_id=traces_batch[1].id,
    observation_id=traces_batch[1].observations[0],
    name="readability",
    value=score,
    comment=explanation,
)
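
Categorical labels are easy to read but hard to average across many traces. If you also want a number, one option is to map each label from the rubric to a value. The sketch below does this; the evenly spaced values between 0 and 1 are an assumption made for illustration, not something defined by Langfuse or by the rubric, and `readability_to_number` is a hypothetical helper.

```python
# Assumed mapping: evenly spaced values for the five rubric labels.
READABILITY_SCALE = {
    "unreadable": 0.0,
    "poor readability": 0.25,
    "fair readability": 0.5,
    "good readability": 0.75,
    "excellent readability": 1.0,
}


def readability_to_number(label):
    """Map a rubric label to a number, or return None if the label is unknown."""
    cleaned = label.lower().replace("answer:", "").strip(" \n\t.,`")
    return READABILITY_SCALE.get(cleaned)


print(readability_to_number("Answer: Good readability."))
print(readability_to_number("something else"))
```

Returning `None` for unknown labels lets the caller decide whether to skip the trace or flag it for manual review.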

# Putting everything together

We just saw how to do this for one trace. Now let's put it all together in a loop that runs on all the traces collected in the last 24 hours.

In [None]:
import math

# range() excludes its end value, so add 1 to include the last page
for page_number in range(1, math.ceil(TOTAL_TRACES / BATCH_SIZE) + 1):

    traces_batch = langfuse.fetch_traces(
        page=page_number,
        limit=BATCH_SIZE,
        tags="bedrock_eval_pipelines",
        user_id="lab3-model-eval-user",
        from_timestamp=last_24_hours,
        to_timestamp=datetime.now(),
    ).data

    for trace in traces_batch:
        print(f"Processing {trace.name}")
        response = langfuse.get_generations(trace_id=trace.id).data[0]
        if response.output is None:
            print(
                f"Warning: trace {trace.name} had no generated output; it was skipped"
            )
            continue
        explanation, score = generate_readability_score(response.output)
        langfuse.score(
            trace_id=trace.id,
            observation_id=trace.observations[0],
            name="readability",
            value=score,
            comment=explanation,
        )

    print(f"Batch {page_number} processed 🚀 \n")
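
The loop above relies on knowing `TOTAL_TRACES` in advance. When the number of traces is unknown, an alternative is to keep requesting pages until one comes back empty. The following sketch shows that pattern with a fake in-memory data source; `iter_pages` is a hypothetical helper, and `max_pages` is an assumed safety limit.

```python
def iter_pages(fetch_page, start_page=1, max_pages=1000):
    """Yield (page_number, batch) until fetch_page returns an empty batch."""
    for page_number in range(start_page, start_page + max_pages):
        batch = fetch_page(page_number)
        if not batch:
            return
        yield page_number, batch


# Fake data source: 25 items served in pages of 10.
fake_items = list(range(25))
fake_fetch = lambda page: fake_items[(page - 1) * 10 : page * 10]

for page_number, batch in iter_pages(fake_fetch):
    print(f"Page {page_number}: {len(batch)} items")
```

With Langfuse, `fetch_page` could be a small function that wraps the `langfuse.fetch_traces(page=..., limit=BATCH_SIZE, ...).data` call shown earlier.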

#### If your pipeline ran successfully, you should now see scores added to your traces

![Langfuse Trace with score added for readability](scored_trace.png "Scored trace on langfuse")

### Congratulations
You have successfully finished Lab 3.1.

If you are at an AWS event, you can return to the workshop studio for additional instructions before moving into the next lab, where we will explore GenAI guardrails.