In [None]:
# Load environment variables
from dotenv import load_dotenv
import os
load_dotenv()

# Never print any part of a secret: cell outputs are saved inside the notebook
# and are easily shared or committed. Report only whether the key is present,
# which also avoids a TypeError from slicing None when the variable is unset.
openai_key = os.getenv("OPENAI_API_KEY")
print(f"OPENAI_API_KEY set: {bool(openai_key)}")
print(os.getenv("OPIK_WORKSPACE"))
print(os.getenv("OPIK_PROJECT_NAME"))

## Monitoring and Tracing with Comet Opik

Comet Opik is a great tool for monitoring and tracing LangGraph workflows. It is also available as open source, so you can run it on your own infrastructure.

Refer to https://www.comet.com/docs/opik/cookbook/langgraph for more details on how to use this with LangGraph.

As the system becomes more and more complex, it's necessary to be able to monitor and trace the workflow. Otherwise it becomes a black box, an expensive black box!!

## Setup Instructions
Refer to the Maven page for instructions on how to set up Comet Opik.

In [4]:
from opik import Opik, track

## Opik Features Demo:

Configure projects and workspaces in Comet Opik using the list here - https://www.comet.com/docs/opik/tracing/sdk_configuration#configuration-values.

Opik also provides an SDK which lets you see the traces programmatically. Reference: https://www.comet.com/docs/opik/tracing/export_data.


In [5]:
# Build a simple LangGraph workflow which takes in a question and returns the category
# of the question.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate

# Define the state
class State(TypedDict):
    question: str
    category: str | None

# Create the LLM
llm = init_chat_model(model="gpt-4o-mini", model_provider="openai")

# Create the prompt template.
# Adjacent string literals are joined with no separator, so each line of the
# system message ends with an explicit newline. Without it the model receives
# one run-on string ("...categories: 1. Math2. Science3. History...").
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant that categorizes questions amongst the following categories:\n"
               "1. Math\n"
               "2. Science\n"
               "3. History\n"
               "4. Geography\n"
               "5. Other\n"
               "Respond with just one word for the category."),
    ("human", "{question}")
])

# Create the chain: the formatted prompt is piped into the chat model
chain = prompt | llm

# Graph node: classify the incoming question with the LLM chain
def process(state: State) -> State:
    """Categorize ``state["question"]`` and return the state with ``category`` set.

    The question is sent through ``chain``; the ``content`` of the reply is
    stored as the category and the question is carried over unchanged.
    """
    question = state["question"]
    reply = chain.invoke({"question": question})
    return {"question": question, "category": reply.content}

# Assemble the graph: a single "process" node wired as START -> process -> END
workflow = StateGraph(State)
workflow.add_node("process", process)
for source, target in ((START, "process"), ("process", END)):
    workflow.add_edge(source, target)

# Compile the graph into the object the later cells invoke
graph = workflow.compile()


In [None]:
# Smoke-test the compiled graph with a single question
sample_state = {"question": "What is the capital of France?"}
result = graph.invoke(sample_state)
print(f"Question: {result['question']}", f"Category: {result['category']}", sep="\n")

In [None]:
from opik.integrations.langchain import OpikTracer

# Give the tracer the graph structure so it is tracked alongside the trace:
tracer = OpikTracer(graph=graph.get_graph(xray=True))

# Trace one invocation by passing the tracer as a callback.
# This will generate a trace in the Comet Opik project.
# The tracer is optional and is supplied per invocation.
traced_input = {"question": "Where is the coldest place on earth?"}
traced_config = {"callbacks": [tracer]}
result = graph.invoke(traced_input, config=traced_config)
print(result)

### Searching through historical traces

In [None]:
# Opik provides an API to search all the recorded traces within a project.
# You can then use this information to generate your own metrics and monitor them if needed.
# This will reduce the dependency on Comet Opik to implement everything.
import opik
client = opik.Opik()
traces = client.search_traces(project_name="Cohort 2 Demo Project", filter_string='')
print(len(traces))

# Guard the sample print: if the search returned nothing, indexing [0] would
# raise IndexError and break a Restart & Run All on a fresh project.
if traces:
    print(traces[0])
else:
    print("No traces found for this project yet.")

## Metric types and single data-point evaluation

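Comet Opik supports many evaluation metrics. The cells below score individual data points with two of them: `Hallucination` and `GEval`.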

In [None]:
from opik.evaluation.metrics import Hallucination

metric = Hallucination()

# Score a single input/output pair; the score is the cell's displayed result.
correct_example = {
    "input": "What is the capital of France?",
    "output": "The capital of France is Paris. It is famous for its iconic Eiffel Tower and rich cultural heritage.",
}
metric.score(**correct_example)

In [None]:
# Incorrect output should be marked with hallucination score close to 1.0
incorrect_example = {
    "input": "What is the capital of France?",
    "output": "The capital of France is Italy, it has amazing food and wine!",
}
metric.score(**incorrect_example)

In [None]:
# https://www.comet.com/docs/opik/evaluation/metrics/g_eval
# Uses Chain of Thought to evaluate the output
from opik.evaluation.metrics import GEval

# Judge configuration: what the judge is asked to do and the criteria it applies.
judge_introduction = "You are a helpful judge tasked with evaluating output and compare it with EXPECTED_OUTPUT"
judge_criteria = "The OUTPUT must be very similar in semantic meaning and factual correctness to EXPECTED_OUTPUT"
metric = GEval(task_introduction=judge_introduction, evaluation_criteria=judge_criteria)

# The actual and expected answers are both packed into the single `output` string.
judged_text = """
    OUTPUT: 16 eggs are left/remaining
    EXPECTED_OUTPUT: 34 eggs are left/remaining
    """
metric.score(output=judged_text)

## Datasets and Experiments

Opik has a concept called Datasets. Datasets are a collection of inputs and expected outputs. These datasets can be combined with evaluation metrics to perform experiments on the entire dataset or a subset of it.

Refer to https://www.comet.com/docs/opik/evaluation/overview for more details on how to create and use datasets.


In [None]:
from opik import Opik
from opik.evaluation import evaluate

# Get or create a dataset
client = Opik()
dataset = client.get_or_create_dataset(name="Cohort 2 - Assignment 1")

# Add dataset items to it from assignment 1, built from (question, label) pairs:
assignment_items = [
    {"input": question, "expected_label": label}
    for question, label in (
        ("When did World War II end?", "history"),
        ("What is photosynthesis?", "science"),
        ("What is the capital of France?", "geography"),
        ("What is the value of 2 + 2?", "math"),
    )
]
dataset.insert(assignment_items)

#### Manual Evaluation by iterating over the dataset

In [None]:
# Simple string matching metric
from opik.evaluation.metrics import Equals

metric = Equals()

# Evaluate the langgraph agent using the dataset:
def run_task(input_data):
    """Invoke the graph with ``input_data`` as the question and return its result."""
    return graph.invoke({"question": input_data})

# Manually evaluate the dataset: score each item's predicted category
# against its expected label and print the score.
for item in dataset.get_items():
    final_state = run_task(item['input'])
    score = metric.score(output=final_state['category'], reference=item['expected_label'])
    print(score)


#### Using `evaluate` function:

In [None]:
from opik.evaluation.metrics import Equals

# Run the evaluation using `evaluate` function:
def run_task(input_data):
    """Evaluation task: categorize one dataset item with the graph.

    Args:
        input_data: A dataset item dict; its "input" key holds the question
            and its "expected_label" key holds the expected category.

    Returns:
        A dict with "output" (the predicted category) and "reference"
        (the expected label) for the scoring metric to compare.
    """
    # Pass only the question text. Passing the whole item would put a dict in
    # the `question: str` state field and expose `expected_label` to the model.
    result = graph.invoke({"question": input_data['input']})
    return {"output": result['category'], "reference": input_data['expected_label']}

result = evaluate(
    experiment_name="Cohort 2 - Assignment 1 Evaluation",
    dataset=dataset,
    task=run_task,
    scoring_metrics=[Equals()]
)