# Serverside Evaluation and Batch Trace Ingestion with Snowflake

This notebook walks through the complete TruLens + Snowflake experience.

This setup offers two advantages compared to other ways of use:
- Batch ingestion of records (traces) to Snowflake offers a faster ingestion experience
- Compuation of Evaluations on the Snowflake warehouse (serverside) removes the computation from the client

## Step 1: Connect to Snowflake for Logging Traces and Evaluations

Notice we're setting the `init_server_side` parameter to `True`. This will trigger uploading the tasks, streams and stored procedures to your Snowflake account needed to compute evaluations in the warehouse.

In [9]:
from snowflake.snowpark import Session
from trulens_eval import Tru
import os
from dotenv import load_dotenv


load_dotenv('envs/dev.env')



connection_params = {
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "role": os.environ.get("SNOWFLAKE_ROLE", "ENGINEER"),
    "database": os.environ.get("SNOWFLAKE_DATABASE"),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA"),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE"),
}


# Create a Snowflake session
snowpark_session = Session.builder.configs(connection_params).create()

tru = Tru()
tru.reset_database()

Updating app_name and app_version in apps table: 0it [00:00, ?it/s]
Updating app_id in records table: 0it [00:00, ?it/s]
Updating app_json in apps table: 0it [00:00, ?it/s]


## Connect to Cortex Search

In [10]:
from typing import List
from snowflake.core import Root
from snowflake.snowpark import Session


class CortexSearchRetriever:
    def __init__(self, session: Session, limit_to_retrieve: int = 4):
        self._session = session
        self._limit_to_retrieve = limit_to_retrieve
        self.COLUMNS = ["chunk","relative_path","category"]

    def retrieve(self, query: str) -> List[str]:
        cortex_search_service = (
            Root(self._session).databases["cortex_search_db"]
            .schemas["DATA"]
            .cortex_search_services["CC_SEARCH_SERVICE_CS"]
        )
        resp = cortex_search_service.search(
            query=query,
            columns=self.COLUMNS,
            limit=self._limit_to_retrieve,
        )

        if resp.results:
            return [curr["chunk"] for curr in resp.results]
        else:
            return []

## Step 2: Instrument an existing app

In [11]:
from snowflake.cortex import Complete
from trulens.apps.custom import instrument


class RAG_from_scratch:
    def __init__(self):
        self.retriever = CortexSearchRetriever(
            session=snowpark_session,
            limit_to_retrieve=4
        )

    @instrument
    def retrieve_context(self, query: str) -> list:
        """
        Retrieve relevant text from vector store.
        """
        return self.retriever.retrieve(query)

    @instrument
    def generate_completion(self, query: str, context_str: list) -> str:
        """
        Generate answer from context.
        """
        prompt = f"""
          You are an expert assistant extracting information from context provided.
          Answer the question based on the context. Be concise and do not hallucinate.
          If you don´t have the information just say so.
          Context: {context_str}
          Question:
          {query}
          Answer:
        """
        return Complete("mistral-large", prompt)

    @instrument
    def query(self, query: str) -> str:
        context_str = self.retrieve_context(query)
        return self.generate_completion(query, context_str)


rag = RAG_from_scratch()

## Step 3: Define evaluations to run on Snowflake

By simply using the `SnowflakeFeedback` class isntead of `Feedback`, we specify that these feedback functions will run in Snowflake.

In [12]:
import numpy as np
from trulens.core import Select
from trulens.core.feedback.feedback import SnowflakeFeedback

from trulens_eval import (
    Feedback,
    TruLlama,
    Cortex
)
from trulens_eval.feedback import Groundedness

provider = Cortex(
    snowpark_session,
    model_engine="mistral-large2",
)

qa_relevance = (
    Feedback(provider.relevance_with_cot_reasons, name="Answer Relevance")
    .on_input_output()
)

qs_relevance = (
    Feedback(provider.relevance_with_cot_reasons, name = "Context Relevance")
    .on_input()
    .on(TruLlama.select_source_nodes().node.text)
    .aggregate(np.mean)
)

#grounded = Groundedness(groundedness_provider=openai, summarize_provider=openai)
grounded = Groundedness(groundedness_provider=provider)

groundedness = (
    Feedback(grounded.groundedness_measure_with_cot_reasons, name="Groundedness")
        .on(TruLlama.select_source_nodes().node.text)
        .on_output()
        .aggregate(grounded.grounded_statements_aggregator)
)

feedbacks = [qa_relevance, qs_relevance, groundedness]

  from trulens_eval.feedback import Groundedness


ImportError: cannot import name 'Groundedness' from 'trulens_eval.feedback' (/home/danieldu/anaconda3/envs/snowFlake/lib/python3.10/site-packages/trulens_eval/feedback/__init__.py)

## Step 4: Register the app with TruLens

Here we add the new record ingest mode parameter set to buffered. This means that the records (traces) will be sent to Snowflake in batches.

In [None]:
from trulens.apps.custom import TruCustomApp
from trulens.core.schema.app import RecordIngestMode


def get_trulens_recorder(query_engine, feedbacks, app_id):
    tru_recorder = TruLlama(
        query_engine,
        app_id=app_id,
        feedbacks=feedbacks
    )
    return tru_recorder

def get_prebuilt_trulens_recorder(query_engine, app_id):
    tru_recorder = TruLlama(
        query_engine,
        app_id=app_id,
        feedbacks=feedbacks
        )
    return tru_recorder

tru_recorder = get_prebuilt_trulens_recorder(query_engine,
                                             app_id="Mixtral Direct Query Engine")

tru_rag = TruCustomApp(
    rag,
    app_name="RAG",
    app_version="base",
    feedbacks=[
        f_answer_relevance,
        f_context_relevance,
        f_groundedness,
    ],
    record_ingest_mode=RecordIngestMode.BUFFERED,
)

## Set test set

In [None]:
queries = [
    "How do I deploy streamlit in the cloud?",
    "What is the best way to deploy a streamlit app?",
    "How do I use streamlit buttons?",
    "How do I change the color of the background of a streamlit app?",
    "How do I add a logo to a streamlit app?",
    "How do I deploy streamlit in the cloud?",
    "What is the best way to deploy a streamlit app?",
    "How do I use streamlit buttons?",
    "How do I change the color of the background of a streamlit app?",
    "How do I add a logo to a streamlit app?",
    "How do I deploy streamlit in the cloud?",
    "What is the best way to deploy a streamlit app?",
    "How do I use streamlit buttons?",
    "How do I change the color of the background of a streamlit app?",
    "How do I add a logo to a streamlit app?",
    "How do I deploy streamlit in the cloud?",
    "What is the best way to deploy a streamlit app?",
    "How do I use streamlit buttons?",
    "How do I change the color of the background of a streamlit app?",
    "How do I add a logo to a streamlit app?",
]

## Step 5: Record application traces

In [None]:
for query in queries:
    with tru_rag as recording:
        resp = rag.query(query)

In [None]:

session.get_leaderboard()


NameError: name 'session' is not defined

## Optional: Improve the app

In [None]:
from trulens.core.feedback.feedback import Feedback
from trulens.core.guardrails.base import context_filter

# note: feedback function used for guardrail must only return a score, not also reasons
f_context_relevance_score = Feedback(
    provider.context_relevance,
    name="Context Relevance"
)


class filtered_RAG_from_scratch(RAG_from_scratch):
    @instrument
    @context_filter(f_context_relevance_score, 0.75, keyword_for_prompt="query")
    def retrieve_context(self, query: str) -> list:
        """
        Retrieve relevant text from vector store.
        """
        return self.retriever.retrieve(query)


filtered_rag = filtered_RAG_from_scratch()

In [None]:
from trulens.apps.custom import TruCustomApp

tru_filtered_rag = TruCustomApp(
    filtered_rag,
    app_name="RAG",
    app_version="filtered context",
    feedbacks=[
        f_answer_relevance,
        f_context_relevance,
    ],
    record_ingest_mode=RecordIngestMode.BUFFERED,
)

In [None]:
for query in queries:
    with tru_filtered_rag as recording:
        resp = filtered_rag.query(query)

In [None]:
trus

NameError: name 'trus' is not defined


Caching the list of root modules, please wait!
(This will only be done once - type '%rehashx' to reset cache!)


Caching the list of root modules, please wait!
(This will only be done once - type '%rehashx' to reset cache!)


Caching the list of root modules, please wait!
(This will only be done once - type '%rehashx' to reset cache!)


Caching the list of root modules, please wait!
(This will only be done once - type '%rehashx' to reset cache!)


Caching the list of root modules, please wait!
(This will only be done once - type '%rehashx' to reset cache!)


Caching the list of root modules, please wait!
(This will only be done once - type '%rehashx' to reset cache!)



In [None]:
last_record = recording.record[-1]
from trulens_eval.utils.display import get_feedback_result
get_feedback_result(last_record,"Context Relevance")


AttributeError: '_RecordingContext' object has no attribute 'record'