In [1]:
from dotenv import load_dotenv
from snowflake.snowpark.session import Session
from snowflake.snowpark.context import get_active_session


import os

load_dotenv()

connection_params = {
  "account":  "KMKFZTE-FWB56344",
  "user": "Xinwei Song",
  "password": "Sxw@565321",
  "role": "ACCOUNTADMIN",
  "database": "ANIMAL_DATA",
  "schema": "PUBLIC",
  "warehouse": "COMPUTE_WH",
  "cortex_search_service":"condition_match_search"
}

snowpark_session = Session.builder.configs(connection_params).create()

In [2]:
import os
from snowflake.core import Root
from typing import List

COLUMNS = [
    "chunk",
    "relative_path",
    "pet_type"
]


class CortexSearchRetriever:

    def __init__(self, snowpark_session: Session, limit_to_retrieve: int = 4):
        self._snowpark_session = snowpark_session
        self._limit_to_retrieve = limit_to_retrieve

    def retrieve(self, query: str) -> List[str]:
        root = Root(self._snowpark_session)
        cortex_search_service = (
            root.databases[connection_params['database']]
            .schemas[connection_params['schema']]
            .cortex_search_services[connection_params['cortex_search_service']]
        )
        resp = cortex_search_service.search(
            query=query,
            columns=COLUMNS,
            limit=self._limit_to_retrieve,
        )

        if resp.results:
            return [curr["chunk"] for curr in resp.results]
        else:
            return []


In [3]:
retriever = CortexSearchRetriever(snowpark_session=snowpark_session, limit_to_retrieve=4)

retrieved_context = retriever.retrieve(query="my cat is vomiting.")

In [4]:
retrieved_context

['As the need to vomit is perceived, the cat appears anxious and may seek attention and reassurance. You will also see the cat begin to salivate and make repeated efforts to swallow.As vomiting starts, a simultaneous contraction of the muscles of the stom- ach and abdominal wall occurs. This leads to an abrupt buildup in intra- abdominal pressure. At the same time, the lower esophageal sphincter relaxes. The stomach contents travel up the esophagus and out the mouth. As the cat vomits, it extends its neck and makes a harsh gagging sound. This sequence should be distinguished from the passive act of regurgitation described earlier.\\n## CAUSES OF VOMITING\\nThe most common cause of vomiting is swallowing hair or some other indi gestible foreign material, such as grass, that is irritating to the stomach. Most cats experience this at one time or another. Intestinal parasites may also cause stomach irritation. Other common causes are overeating or eating too fast. When kittens gobble their

In [5]:
# from trulens.core import TruSession
# from trulens.connectors.snowflake import SnowflakeConnector

# tru_snowflake_connector = SnowflakeConnector(snowpark_session=snowpark_session)

# tru_session = TruSession(connector=tru_snowflake_connector)

from trulens.core import TruSession
from trulens.connectors.snowflake import SnowflakeConnector
from trulens.apps.custom import TruCustomApp

tru_snowflake_connector = SnowflakeConnector(snowpark_session=snowpark_session)
tru_session = TruSession(connector=tru_snowflake_connector)
tru_session.migrate_database()

print("Tru instance initialized successfully!")


Running the TruLens dashboard requires providing a `password` to the `SnowflakeConnector`.


🦑 Initialized with db url snowflake://Xinwei Song:***@KMKFZTE-FWB56344/ANIMAL_DATA/PUBLIC?role=ACCOUNTADMIN&warehouse=COMPUTE_WH .
🛑 Secret keys may be written to the database. See the `database_redact_keys` option of `TruSession` to prevent this.
Error setting TruLens workspace version tag: 000002 (0A000): Unsupported feature 'TAG'., check if you have enterprise version of Snowflake.
🦑 Migrating DB ...


Database does not need migration.


Tru instance initialized successfully!


In [6]:
from trulens.apps.custom import instrument
import json

class RAG_from_scratch:
    def __init__(self, cortex_service, model_name="mistral-large2"):
        """
        Initialize the RAG module to integrate Snowflake Cortex Search and model inference.
        """
        self.cortex_service = cortex_service  # Your Cortex Search Service instance
        self.model_name = model_name  # Model name, e.g., mistral-large2

    @instrument
    def retrieve_context(self, query: str, pet_type: str = None) -> list:
        """
        Retrieve relevant context using Snowflake Cortex Search.
        """
        # Step 1: Call Cortex Search Service
        # You can choose between your exact_type_search or condition_match_search
        filter_condition = {
            "@or": [
                {"@eq": {"pet_type": pet_type or "Undefined"}}, {"@eq": {"pet_type": "Undefined"}}
            ]
        }

        try:
            # Call Cortex Search Service
            response = self.cortex_service.search(
                query=query,
                columns=["chunk", "relative_path", "pet_type"],
                limit=5,
                filter=filter_condition
            )
            json_response = json.loads(response.to_json())

            # Extract context chunks
            return [item["chunk"] for item in json_response.get("results", [])]
        except Exception as e:
            print(f"Error in retrieving context: {str(e)}")
            return []

    @instrument
    def rewrite_query(self, query: str, chat_history: list = None) -> str:
        """
        Rewrite the query to improve relevance for database retrieval.
        """
        # Step 1: If chat history exists, use it to summarize the query
        if chat_history:
            prompt = f"""
            Based on the following chat history and user question, rewrite the query to make it more specific and aligned with the user's needs:
            Chat History: {chat_history}
            User Question: {query}
            Rewritten Query:
            """
        else:
            prompt = f"""
            Rewrite the following query to make it more formal, specific, and relevant for retrieving pet-related medical information from the database:
            Query: {query}
            Rewritten Query:
            """

        rewritten_query = snowpark_session.sql(
            "SELECT SNOWFLAKE.CORTEX.COMPLETE(?, ?)",
            params=[self.model_name, prompt]
        ).collect()[0]['SNOWFLAKE.CORTEX.COMPLETE(?, ?)']
        return rewritten_query.strip()

    @instrument
    def generate_completion(self, query: str, context: list) -> str:
        """
        Generate a response based on the retrieved context and user query.
        """
        # Combine the context into the prompt
        context_str = "\n".join(context)
        prompt = f"""
        You are a professional assistant for pet health issues.
        Use the context provided below to answer the question. Do not fabricate information, and only base your response on the provided context.
        If the context is insufficient, explicitly state that you do not have enough information.
        Context: {context_str}
        Question: {query}
        Answer:
        """
        response = snowpark_session.sql(
            "SELECT SNOWFLAKE.CORTEX.COMPLETE(?, ?)",
            params=[self.model_name, prompt]
        ).collect()[0]['SNOWFLAKE.CORTEX.COMPLETE(?, ?)']

        return response.strip()

    @instrument
    def query(self, question: str, pet_type: str = None, chat_history: list = None) -> str:
        """
        Process the user question: integrate query rewriting, context retrieval, and response generation.
        """
        # Step 1: Rewrite the user question
        rewritten_query = self.rewrite_query(question, chat_history)

        # Step 2: Retrieve context using Cortex Search Service
        context = self.retrieve_context(rewritten_query, pet_type)
        print("Retrieved Context:", context)


        # Step 3: Generate a response
        answer = self.generate_completion(rewritten_query, context)

        return answer


decorating <function RAG_from_scratch.retrieve_context at 0x10983f100>
decorating <function RAG_from_scratch.rewrite_query at 0x12fac84a0>
decorating <function RAG_from_scratch.generate_completion at 0x10983f380>
decorating <function RAG_from_scratch.query at 0x10984d260>
adding method <class '__main__.RAG_from_scratch'> retrieve_context __main__
adding method <class '__main__.RAG_from_scratch'> rewrite_query __main__
adding method <class '__main__.RAG_from_scratch'> generate_completion __main__
adding method <class '__main__.RAG_from_scratch'> query __main__


In [7]:
from trulens.providers.cortex.provider import Cortex
from trulens.core import Feedback
from trulens.core import Select
import numpy as np
import snowflake.connector


provider = Cortex(snowpark_session, "llama3.1-8b")

f_groundedness = (
    Feedback(provider.groundedness_measure_with_cot_reasons, name="Groundedness")
    .on(Select.RecordCalls.retrieve_context.rets[:].collect())
    .on_output()
)

f_context_relevance = (
    Feedback(provider.context_relevance, name="Context Relevance")
    .on_input()
    .on(Select.RecordCalls.retrieve_context.rets[:])
    .aggregate(np.mean)
)

f_answer_relevance = (
    Feedback(provider.relevance, name="Answer Relevance")
    .on_input()
    .on_output()
    .aggregate(np.mean)
)

print(type(f_groundedness), f_groundedness)
print(type(f_context_relevance), f_context_relevance)
print(type(f_answer_relevance), f_answer_relevance)

✅ In Groundedness, input source will be set to __record__.app.retrieve_context.rets[:].collect() .
✅ In Groundedness, input statement will be set to __record__.main_output or `Select.RecordOutput` .
✅ In Context Relevance, input question will be set to __record__.main_input or `Select.RecordInput` .
✅ In Context Relevance, input context will be set to __record__.app.retrieve_context.rets[:] .
✅ In Answer Relevance, input prompt will be set to __record__.main_input or `Select.RecordInput` .
✅ In Answer Relevance, input response will be set to __record__.main_output or `Select.RecordOutput` .
<class 'trulens.core.feedback.feedback.Feedback'> FeedbackDefinition(Groundedness,
	selectors={'source': Lens().__record__.app.retrieve_context.rets[:].collect(), 'statement': Lens().__record__.main_output},
	if_exists=None
)
<class 'trulens.core.feedback.feedback.Feedback'> FeedbackDefinition(Context Relevance,
	selectors={'question': Lens().__record__.main_input, 'context': Lens().__record__.app.r

In [9]:
from trulens.apps.custom import TruCustomApp
from snowflake.core import Root
from snowflake.snowpark import Session
from trulens.apps.custom import instrument


# Initialize Snowflake Cortex Search Service
root = Root(snowpark_session)
cortex_service = root.databases["ANIMAL_DATA"].schemas["PUBLIC"].cortex_search_services["condition_match_search"]

# Initialize your RAG class
rag = RAG_from_scratch(cortex_service=cortex_service)

# Wrap RAG_from_scratch with TruLens for monitoring
tru_rag = TruCustomApp(
    app=rag,
    app_version="1.0.0",
    monitor_costs=False
)

print("App instance:", tru_rag.app)

# Test query
question = "My cat is vomiting. What should I do?"
pet_type = "Large Cat"
chat_history = ["My cat has been lethargic for a few days."]

# Instrumented call using TruLens
answer = tru_rag.app.query(
    question,
    pet_type=pet_type or "Large Cat",
    chat_history=chat_history
)


# Print the result
print(answer)


instrumenting <class '__main__.RAG_from_scratch'> for base <class '__main__.RAG_from_scratch'>
	instrumenting retrieve_context
	instrumenting rewrite_query
	instrumenting generate_completion
	instrumenting query
skipping base <class 'object'> because of class
App instance: <__main__.RAG_from_scratch object at 0x314185d10>
Retrieved Context: ["stupor, and blindness. These are also signs of encephalitis (see page 334).Treatment: Immediately after ingestion, induce vomiting. Seek immediate medical attention. Specific antidotes are available from your veterinarian, who can also do blood tests to determine the lead levels\\n# CORROSIVE HOUSEHOLD PRODUCTS\\nCorrosive and caustic chemicals (acids and alkalis) are found in household cleaners, dishwasher detergents, toilet bowl cleaners, drain decloggers, and commercial solvents. When ingested, they cause burns of the mouth, esopha gus, and stomach. Severe cases are associated with acute perforation of the esophagus and stomach. Later, strictur

In [16]:
# Define your test prompt
prompt = "My cat is vomiting. What should I do?"

# Use mistral-large2 to answer the question
with tru_rag as recording:  # Wrap in TruLens recording
    answer = rag.query(prompt)  # RAG pipeline 使用 mistral-large2 回答问题

# Add feedback to the session
print("Adding Feedbacks to the session...")
print("Groundedness Feedback:", f_groundedness)
print("Context Relevance Feedback:", f_context_relevance)
print("Answer Relevance Feedback:", f_answer_relevance)

# Apply feedback definitions to generate feedback results
groundedness_result = f_groundedness.run()
context_relevance_result = f_context_relevance.run()
answer_relevance_result = f_answer_relevance.run()

print(tru_session.get_records())


# Add feedback results to the session
tru_session.add_feedback(groundedness_result)
tru_session.add_feedback(context_relevance_result)
tru_session.add_feedback(answer_relevance_result)

leaderboard = tru_session.get_leaderboard()
print(leaderboard)


Could not find an instance of OpenAIEndpoint. trulens will create an endpoint for cost tracking.
Could not find an instance of OpenAIEndpoint. trulens will create an endpoint for cost tracking.


calling <function RAG_from_scratch.query at 0x10984d260> with (<__main__.RAG_from_scratch object at 0x314185d10>, 'My cat is vomiting. What should I do?')
calling <function RAG_from_scratch.rewrite_query at 0x12fac84a0> with (<__main__.RAG_from_scratch object at 0x314185d10>, 'My cat is vomiting. What should I do?', None)


Could not find an instance of OpenAIEndpoint. trulens will create an endpoint for cost tracking.


calling <function RAG_from_scratch.retrieve_context at 0x10983f100> with (<__main__.RAG_from_scratch object at 0x314185d10>, 'Rewritten Query:\n              "My cat is experiencing episodes of vomiting. What are the recommended steps I should take to address this medical issue?"', None)


Could not find an instance of OpenAIEndpoint. trulens will create an endpoint for cost tracking.


Retrieved Context: ['sp.|\\n|intolerance/ Enterotoxins||Strongyloides sp.|\\n|Viral Gastroenteritis||Isospora sp.|\\n|Parvovirus||(Spirocerca sp. [Dog])|\\n|Coronavirus||Toxic Gastroenteritis|\\n|Rotavirus||Beta, Agonists|\\n|FeLV/FIV [Cat]||Apomorphine [Dog]|\\n|Canine Distemper Virus||Xylazine|\\n|[Dog]||Syrup of ipecac [Cat]|\\n|Parasitic gastroenteritis||Ephedrine|\\n|Olulanus sp. [Cat] Physaloptera sp.||Local anaesthetics Tricyclic antidepressants|\\n||||\\n||||\\n\\n|Dog||Cat||\\n| :---: | :---: | :---: | :---: |\\n|+++|Food intolerance|+++|Food intolerance|\\n|++|Ingestion of grass (cause or effect?)|++|Ingestion of grass (cause or effect|\\n|++|Gastritis|++|Gastritis|\\n|++|Gastric/Intestinal ulcer|++|Gastric/Intestinal ulcer|\\n|++|Gastrointestinal obstruction|++|Gastrointestinal obstruction|\\n|++|Motility disturbances|++|Motility disturbances|\\n||Organ failure|X|Organ failure|\\nAcute Vomitus\\n|Foreign body|Stenocephala sp.|\\n| :---: | :---: |\\n|Food intolerance/ Enterot

InvalidSelector: Selector __record__.app.retrieve_context.rets[:].collect() does not exist in source data.

In [15]:
from trulens.core.guardrails.base import context_filter

# note: feedback function used for guardrail must only return a score, not also reasons
f_context_relevance_score = Feedback(
    provider.context_relevance, name="Context Relevance"
)


class filtered_RAG_from_scratch(RAG_from_scratch):

    @instrument
    @context_filter(f_context_relevance_score, 0.75, keyword_for_prompt="query")
    def retrieve_context(self, query: str) -> list:
        """
        Retrieve relevant text from vector store.
        """
        return self.retriever.retrieve(query)


filtered_rag = filtered_RAG_from_scratch()

decorating <function context_filter.__call__.<locals>.wrapper at 0x319fcdbc0>
adding method <class '__main__.filtered_RAG_from_scratch'> retrieve_context __main__


TypeError: RAG_from_scratch.__init__() missing 1 required positional argument: 'cortex_service'

In [12]:
from trulens.apps.custom import TruCustomApp

tru_filtered_rag = TruCustomApp(
    filtered_rag,
    app_name="RAG",
    app_version="filtered",
    feedbacks=[f_groundedness, f_answer_relevance, f_context_relevance],
)

NameError: name 'filtered_rag' is not defined

In [11]:
# prompts=[
# "I brought <pet_name> home yesterday from the pet store and she started to have diarreah started this morning, is there anything I can do at the immediate moment to help her? ",
# "I just brought <pet_name> home yesterday, and she has started having diarrhea this morning. Is there anything I can do right now to help her?",
# "<pet_name> has been scratching his ears a lot lately. Could it be an ear infection, and how can I provide relief at home?",
# "My puppy <pet_name> hasn't been eating much for the past couple of days. Is it something to worry about or could it be normal?",
# "<pet_name> has been limping since our walk yesterday. What steps should I take before going to the vet?",
# "I'm noticing that <pet_name> is drinking a lot more water than usual. Is this something that needs immediate attention?",
# "<pet_name> seems to be panting heavily even when it's not hot. Could this be a sign of stress or illness?",
# "I've noticed some redness and swelling around <pet_name>'s eyes. What can I do at home to ease the discomfort?",
# "<pet_name> has been coughing for a couple of days. How do I know if this is something serious or just a minor irritation?",
# "My older dog <pet_name> seems to be disoriented and pacing at night. Could this be related to dementia, and how can I help?",
# ]

prompts = [
    {"question": "My cat is vomiting. What should I do?", "pet_type": "Large Cat", "chat_history": ["My cat has been lethargic for a few days."]},
    {"question": "My dog is not eating. What can I do?", "pet_type": "Large Dog", "chat_history": ["My dog has been refusing food for a day."]}
]


In [None]:
with tru_rag as recording:
    for prompt in prompts:
        answer = rag.query(prompt["question"], pet_type=prompt["pet_type"], chat_history=prompt["chat_history"])
        print(answer)

tru_session.get_leaderboard()

In [10]:
snowpark_session.close()