In [None]:
%pip install -q ragstack-ai trulens

In [None]:
from dotenv import load_dotenv

# Load environment variables (python-dotenv) before any cell reads them; later
# cells look up ASTRA_DB_TOKEN and ASTRA_DB_ENDPOINT through os.getenv.
load_dotenv()

In [None]:
import os
from langchain.chat_models import AzureChatOpenAI
from langchain.embeddings import AzureOpenAIEmbeddings

# Settings shared by every Azure OpenAI client in this notebook. Keeping them in
# one place means an API-version bump is a single edit.
AZURE_OPENAI_API_VERSION = "2023-05-15"
CHAT_TEMPERATURE = 0.1


def _azure_chat_model(deployment: str, model_version: str) -> AzureChatOpenAI:
    """Build a chat client for one Azure OpenAI deployment.

    Args:
        deployment: Name of the Azure deployment to call.
        model_version: Model version string forwarded to ``AzureChatOpenAI``.

    Returns:
        An ``AzureChatOpenAI`` configured with the notebook-wide API version
        and temperature.
    """
    return AzureChatOpenAI(
        azure_deployment=deployment,
        openai_api_version=AZURE_OPENAI_API_VERSION,
        model_version=model_version,
        temperature=CHAT_TEMPERATURE,
    )


gpt_35_turbo = _azure_chat_model("gpt-35-turbo", "0613")
gpt_35_turbo_16k = _azure_chat_model("gpt-35-turbo-16k", "0613")
gpt_4 = _azure_chat_model("gpt-4", "1106-preview")
gpt_4_32k = _azure_chat_model("gpt-4-32k", "0613")

# Embedding client; it uses the same API version as the chat models.
embeddings = AzureOpenAIEmbeddings(
    azure_deployment="text-embedding-ada-002",
    openai_api_version=AZURE_OPENAI_API_VERSION,
)

In [None]:
from langchain.vectorstores.astradb import AstraDB
import os
# Connection details are read from the environment instead of being written
# into the notebook.
astra_token = os.getenv("ASTRA_DB_TOKEN")
astra_endpoint = os.getenv("ASTRA_DB_ENDPOINT")

# Vector store over the "open_ai_512" collection, embedding with `embeddings`.
vstore = AstraDB(
    embedding=embeddings,
    collection_name="open_ai_512",
    api_endpoint=astra_endpoint,
    token=astra_token,
)

In [None]:
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough

# Prompt with two placeholders: {context} and {question}.
prompt_template = """
Answer the question based only on the supplied context. If you don't know the answer, say you don't know the answer.
Context: {context}
Question: {question}
Your answer:
"""
prompt = ChatPromptTemplate.from_template(prompt_template)

# Retriever with the vector store's default settings (no arguments passed).
base_retriever = vstore.as_retriever()

# The incoming question is used twice: once as the retriever query (filling
# "context") and once unchanged (filling "question").
rag_inputs = {"context": base_retriever, "question": RunnablePassthrough()}

# inputs -> prompt -> chat model -> plain string
base_chain = rag_inputs | prompt | gpt_35_turbo_16k | StrOutputParser()

In [None]:
from langchain.callbacks.base import BaseCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain_core.documents import Document
from langchain_core.tracers.context import register_configure_hook

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, TypeVar, Union, Generator
from uuid import UUID

from contextvars import ContextVar
from contextlib import contextmanager


class RetrievedContextCallbackHandler(BaseCallbackHandler):
    """Callback handler that remembers what the most recent retriever run returned.

    Each call to ``on_retriever_end`` overwrites the stored documents and run
    identifiers, so after a chain invocation the handler holds the values from
    the last retriever call that reported to it.
    """

    # Documents passed to the most recent ``on_retriever_end`` call (empty until then).
    documents: Sequence[Document]
    # Run id of that retriever call, or None if no call has been seen yet.
    run_id: Optional[UUID]
    # Parent run id of that retriever call, if one was supplied.
    parent_run_id: Optional[UUID]

    def __init__(self) -> None:
        """Start with empty state so the attributes are readable before any retrieval."""
        super().__init__()
        self.documents = []
        self.run_id = None
        self.parent_run_id = None

    def on_retriever_end(self, documents: Sequence[Document], run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs) -> None:
        """Store the documents and run identifiers reported for a finished retriever run.

        Args:
            documents: Documents returned by the retriever.
            run_id: Identifier of the retriever run.
            parent_run_id: Identifier of the enclosing run, if any.
            **kwargs: Accepted and ignored.
        """
        self.documents = documents
        self.run_id = run_id
        self.parent_run_id = parent_run_id

    def __repr__(self) -> str:
        """Return a short summary (document count and run id) rather than the documents themselves."""
        return f"{type(self).__name__}(documents={len(self.documents)}, run_id={self.run_id!r})"

    def __copy__(self) -> "RetrievedContextCallbackHandler":
        """Return this same instance, so a copy shares the captured state."""
        return self

    def __deepcopy__(self, memo: Any) -> "RetrievedContextCallbackHandler":
        """Return this same instance, so a deep copy shares the captured state."""
        return self

    @property
    def always_verbose(self) -> bool:
        """Whether to call verbose callbacks even if verbose is False."""
        return True

# Context variable holding the handler that is currently collecting retrieved
# documents. Its default is None, and get_retrieved_context_callback() sets it
# to a fresh handler on entry.
retrieved_context_callback_var: ContextVar[Optional[RetrievedContextCallbackHandler]] = ContextVar(
    "retrieved_context_callback", default=None
)

# Register the variable with LangChain's callback configuration hook.
# NOTE(review): the second positional argument is presumably `inheritable=True`
# (handler is passed on to child runs) -- confirm against langchain_core.
register_configure_hook(retrieved_context_callback_var, True)

@contextmanager
def get_retrieved_context_callback() -> Generator[RetrievedContextCallbackHandler, None, None]:
    """Install a fresh RetrievedContextCallbackHandler for the duration of a ``with`` block.

    Yields:
        The handler that was stored in ``retrieved_context_callback_var``; read
        its attributes after running a chain inside the block.

    The context variable is set back to None when the block exits, including
    when the body raises, so a failed cell does not leave a stale handler
    registered for later cells.
    """
    cb = RetrievedContextCallbackHandler()
    retrieved_context_callback_var.set(cb)
    try:
        yield cb
    finally:
        retrieved_context_callback_var.set(None)

In [None]:
# Run one question through the chain while the handler is installed, then show
# the documents the handler captured.
question_text = "What are the symptoms?"
with get_retrieved_context_callback() as cb:
    resp = base_chain.invoke(question_text)
    print(f"context documents: {cb.documents}")

In [None]:
# Show the chain's answer (base_chain ends in StrOutputParser).
resp

In [None]:
from trulens_eval import Tru

# TruLens entry point; used below for logging apps, records and feedback.
tru = Tru()
# NOTE(review): judging by its name this clears anything previously logged --
# confirm before running against a database you want to keep.
tru.reset_database()

In [None]:
import nest_asyncio

# Patch asyncio via nest_asyncio -- presumably so nested event loops work inside
# the notebook kernel.
nest_asyncio.apply()

from trulens_eval import OpenAI as fOpenAI

# NOTE(review): `openai_api_key` is not assigned in any cell shown above, so on
# a fresh kernel this line raises NameError. Define it (for example from the
# environment) before this cell.
os.environ["OPENAI_API_KEY"] = openai_api_key

# NOTE(review): `openai_endpoint` is likewise not defined above. Also, "gpt_4"
# matches the Python variable name used earlier, whereas the Azure deployment
# configured earlier is named "gpt-4" -- confirm which string is intended.
provider = fOpenAI(model_engine="gpt_4", endpoint=openai_endpoint)

In [None]:
from trulens_eval import Feedback

# "Answer Relevance" feedback: the provider's relevance_with_cot_reasons applied
# to the app's input and output.
f_qa_relevance = Feedback(provider.relevance_with_cot_reasons, name="Answer Relevance").on_input_output()

In [None]:
# NOTE(review): `TruLlama` is not imported in any cell shown above, so on a
# fresh kernel this line raises NameError. Judging by its name it is a
# LlamaIndex-oriented helper, while the chains in this notebook are LangChain
# objects -- confirm this is the selector that is wanted here.
context_selection = TruLlama.select_source_nodes().node.text

In [None]:
import numpy as np

# "Context Relevance": provider.qs_relevance_with_cot_reasons over the app input
# and the selected context, aggregated with np.mean.
context_relevance = Feedback(provider.qs_relevance_with_cot_reasons, name="Context Relevance")
f_qs_relevance = context_relevance.on_input().on(context_selection).aggregate(np.mean)

from trulens_eval.feedback import Groundedness

grounded = Groundedness(groundedness_provider=provider)

# "Groundedness": measured over the selected context and the app output, then
# combined with the Groundedness object's own aggregator.
groundedness = Feedback(grounded.groundedness_measure_with_cot_reasons, name="Groundedness")
f_groundedness = (
    groundedness.on(context_selection).on_output().aggregate(grounded.grounded_statements_aggregator)
)

In [None]:
# NOTE(review): `my_llm_app` and `query` are not defined in any cell shown
# above; this cell reads like a pasted template and raises NameError on a
# fresh kernel.
response = my_llm_app(query)

from trulens_eval import TruChain
# Wrap the app in a TruChain recorder under the given app id.
tru_recorder = TruChain(
    my_llm_app,
    app_id='Chain1_ChatApplication')

# The result of with_record is unpacked as (response, record).
response, tru_record = tru_recorder.with_record(my_llm_app, query)
json_like = tru_record.layout_calls_as_app()

# Overwrites the `context_selection` assigned in the earlier cell with this
# record layout.
context_selection = json_like

In [None]:
from langchain.chains import RetrievalQA
from trulens_eval import TruChain

# RetrievalQA chain built from the same LLM, prompt and vector store used above.
chain = RetrievalQA.from_llm(llm=gpt_35_turbo_16k, prompt=prompt, retriever=vstore.as_retriever())

# f_lang_match, f_qa_relevance, f_qs_relevance are feedback functions
# NOTE(review): `f_lang_match` is not defined in any cell shown above, so
# building this list raises NameError on a fresh kernel. Define it or drop it
# from `feedbacks`.
tru_recorder = TruChain(
    chain,
    app_id='Chain1_ChatApplication',
    feedbacks=[f_lang_match, f_qa_relevance, f_qs_relevance]
)
# Calls made inside the `with` block are collected on `recording`.
with tru_recorder as recording:
    chain("What are the symptoms?")

# Only one call was made above, so take the first (and only) record.
tru_record = recording.records[0]

### Manual Logging

Reference: [TruLens logging guide — wrap with TruChain to instrument your chain](https://www.trulens.org/trulens_eval/logging/#wrap-with-truchain-to-instrument-your-chain)

In [None]:
# Wrap with TruChain to instrument your chain
tc = TruChain(chain, app_id='Chain1_ChatApplication')

# Making the first call to your wrapped LLM Application will now also produce a log or "record" of the chain execution.
prompt_input = 'que hora es?'
response, record = tc.call_with_record(prompt_input)

# We can log the records but first we need to log the chain itself.
tru.add_app(app=tc)

# Then we can log the record
tru.add_record(record)

# Capturing app feedback such as user feedback of the responses can be added with one call.
thumb_result = True
tru.add_feedback(name="👍 (1) or 👎 (0)", record_id=record.record_id, result=thumb_result)

# To assess your LLM quality, pass the feedback functions to `tru.run_feedback_functions()` as the `feedback_functions` list.

from trulens_eval import Feedback

# Re-creates the same "Answer Relevance" feedback that an earlier cell already
# assigned to f_qa_relevance.
f_qa_relevance = Feedback(
    provider.relevance_with_cot_reasons,
    name="Answer Relevance"
).on_input_output()


import numpy as np
from trulens_eval.feedback import Groundedness

# Lay out the calls of the record produced above, then look up the retrieved
# context in it.
json_like = record.layout_calls_as_app()

print(json_like['app'])
# NOTE(review): whether a RetrievalQA record has a 'source_nodes' entry is not
# visible from this notebook -- confirm the key. This value is then passed to
# `.on(...)` below, where the earlier cell passed a selector expression.
context_selection = json_like['app']['source_nodes']

print(context_selection)

# Re-creates the groundedness and context-relevance feedbacks from the earlier
# cell, now using the `context_selection` assigned just above.
grounded = Groundedness(groundedness_provider=provider)

f_groundedness = (
    Feedback(grounded.groundedness_measure_with_cot_reasons,
             name="Groundedness"
            )
    .on(context_selection)
    .on_output()
    .aggregate(grounded.grounded_statements_aggregator)
)

f_qs_relevance = (
    Feedback(provider.qs_relevance_with_cot_reasons,
             name="Context Relevance")
    .on_input()
    .on(context_selection)
    .aggregate(np.mean)
)

# Evaluate the three feedback functions against the single record made above.
feedback_results = tru.run_feedback_functions(
    record=record,
    feedback_functions=[f_qa_relevance, f_groundedness, f_qs_relevance]
)
display(feedback_results)

# After capturing feedback, you can then log it to your local database.

tru.add_feedbacks(feedback_results)

In [None]:
# Inspect the 'combine_documents_chain' entry of the record layout from the
# previous cell.
json_like['app']['combine_documents_chain']

## Starting with RAGAS + LangSmith

In [None]:
from langchain.chains import RetrievalQA

# Same RetrievalQA setup as before, but with return_source_documents=True so
# the retrieved documents are part of the chain's output.
ragas_chain = RetrievalQA.from_llm(
    llm=gpt_35_turbo_16k,
    prompt=prompt,
    retriever=vstore.as_retriever(),
    return_source_documents=True,
)

In [None]:
# Try the chain on one question; `result` is reused by the ragas evaluator
# cells below.
question = "What are the symptoms?"
result = ragas_chain({"query": question})
result

In [None]:
# Attach a hand-written reference answer to the chain output under
# "ground_truths" (a one-element list).
result["ground_truths"] = ["Symptoms include fever, coughing, sore throat, fatigue, and shortness of breath.\nHowever, be aware that at this stage if you have a cough or a cold, it's likely that you just have a cough or a cold and not coronavirus.\nIf you have serious symptoms such as difficulty breathing, call 000 for urgent medical help.\nIf you get these symptoms above after being in contact with someone who has been diagnosed with COVID-19, seek medical attention.\nThe same goes if you develop symptoms within 14 days of returning home to Australia after being overseas."]

In [None]:
from ragas.llms import LangchainLLM
from ragas.metrics import context_precision, answer_relevancy, faithfulness, context_recall

# Wrap the Azure chat models in ragas' LangchainLLM adapter.
ragas_gpt4 = LangchainLLM(gpt_4)
ragas_gpt35 = LangchainLLM(gpt_35_turbo)

# Point every metric used below at the GPT-3.5 wrapper.
for ragas_metric in (answer_relevancy, context_precision, context_recall, faithfulness):
    ragas_metric.llm = ragas_gpt35

# answer_relevancy also gets an embedding model; the LangChain embeddings
# object is assigned as-is.
answer_relevancy.embeddings = embeddings

In [None]:
from ragas.langchain import RagasEvaluatorChain

# One evaluator chain per metric, keyed by the metric's name.
eval_chains = {}
for ragas_metric in (context_precision, answer_relevancy, faithfulness, context_recall):
    eval_chains[ragas_metric.name] = RagasEvaluatorChain(metric=ragas_metric)

In [None]:
# Score the single QA `result` with each evaluator chain and print one line per
# metric; each chain's output is read under the key "<metric name>_score".
for name, eval_chain in eval_chains.items():
    score_name = f"{name}_score"
    scores = eval_chain(result)
    print(f"{score_name}: {scores[score_name]}")

In [None]:
## Datasets

# Load "explodinggradients/fiqa" with the "ragas_eval" argument (presumably the
# dataset configuration name) and display the result.
from datasets import load_dataset

fiqa_eval = load_dataset("explodinggradients/fiqa", "ragas_eval")
fiqa_eval

In [None]:
# Look at the first row of the "baseline" split.
fiqa_eval["baseline"][0]

In [None]:
from ragas import evaluate

# Metrics to score the dataset with: the same four ragas metric objects that
# were pointed at the Azure models earlier in this notebook.
metrics = [context_precision, answer_relevancy, faithfulness, context_recall]

# Evaluate the "baseline" split. This rebinds `result`, which an earlier cell
# used for the single-question chain output.
result = evaluate(
    fiqa_eval["baseline"],
    metrics=metrics,
)

result

## LangSmith Dataset Creation

In [None]:
import json

# Read the question/answer examples from disk. The encoding is passed
# explicitly so the file is decoded as UTF-8 regardless of the platform's
# locale default.
with open("./data/covid_qa/rag_dataset.json", encoding="utf-8") as f:
    examples = json.load(f)['examples']

len(examples)

In [None]:
# dataset creation

from langsmith import Client
from langsmith.utils import LangSmithError

# LangSmith client; no arguments are passed here.
client = Client()
dataset_name = "covid_qa"

try:
    # check if dataset exists
    dataset = client.read_dataset(dataset_name=dataset_name)
    print("using existing dataset: ", dataset.name)
# NOTE(review): this branch runs for any LangSmithError raised by read_dataset;
# whether failures other than "dataset not found" raise that class is not
# visible here -- confirm, since they would also trigger a create attempt.
except LangSmithError:
    # if not create a new one with the generated query examples
    dataset = client.create_dataset(
        dataset_name=dataset_name, description="llamaindex covid_qa dataset"
    )
    # One LangSmith example per loaded item: the query as input, the reference
    # answer wrapped in a one-element "ground_truths" list as output.
    for e in examples:
        client.create_example(
            inputs={"query": e["query"]},
            outputs={"ground_truths": [e["reference_answer"]]},
            dataset_id=dataset.id,
        )

    print("Created a new dataset: ", dataset.name)

## Evaluation Setup

Before you call `run_on_dataset` you need a factory function which creates a new instance of the QA chain you want to test. This is so that the internal state is not reused when running against each example.

In [None]:
# factory function that returns a new qa chain
def create_qa_chain(return_context=True):
    """Build a fresh RetrievalQA chain for one evaluation run.

    Args:
        return_context: Forwarded as ``return_source_documents``; when True the
            chain output also carries the retrieved documents.

    Returns:
        A new ``RetrievalQA`` chain over ``vstore`` using ``gpt_35_turbo_16k``
        and ``prompt``.
    """
    qa_chain = RetrievalQA.from_llm(
        llm=gpt_35_turbo_16k,
        prompt=prompt,
        retriever=vstore.as_retriever(),
        return_source_documents=return_context,
    )
    return qa_chain

In [None]:
from ragas.langchain.evalchain import RagasEvaluatorChain

# One evaluator chain per ragas metric, built in the order: faithfulness,
# answer relevancy, context precision, context recall.
faithfulness_chain, answer_rel_chain, context_rel_chain, context_recall_chain = [
    RagasEvaluatorChain(metric=ragas_metric)
    for ragas_metric in (faithfulness, answer_relevancy, context_precision, context_recall)
]

## Run Evaluation

In [None]:
def run_evaluation():
    """Run the QA chain over the LangSmith dataset and score it with the ragas evaluators.

    Uses the objects built in the preceding cells: ``client`` and
    ``dataset_name`` identify the dataset, ``create_qa_chain`` is passed as the
    chain factory, and the four ``*_chain`` evaluators are passed as custom
    evaluators.

    Returns:
        Whatever ``run_on_dataset`` returns for this run.
    """
    # Imported here so the cell brings its own dependencies into scope.
    from langchain.smith import RunEvalConfig, run_on_dataset

    evaluation_config = RunEvalConfig(
        custom_evaluators=[
            faithfulness_chain,
            answer_rel_chain,
            context_rel_chain,
            context_recall_chain,
        ],
        # RetrievalQA puts its answer under "result".
        prediction_key="result",
    )
    return run_on_dataset(
        client,
        dataset_name,
        create_qa_chain,
        evaluation=evaluation_config,
    )


run_evaluation()