# ❄️ SKO RAG HOP - Snowflake Cortex with Anthropic and LLM Observability ❄️

This notebook demonstrates how to build a Retrieval-Augmented Generation (RAG) workflow in Snowflake using Cortex Search Services, integrate Anthropic LLMs such as Claude 3.5 Sonnet, and evaluate responses with the new LLM Observability features. Below is an overview of the flow and its key components.

### Step 1: Parse and Chunk Text from PDFs (BUILD)
### Step 2: Create Cortex Search Service (RETRIEVE)
### Step 3: Test Search Results with Experimental Configurations (AUGMENT)
### Step 4: Pass Retrieved Content to LLMs (GENERATE)
### Step 5: Create RAG Application Classes (SERVE)
### Step 6: Observe and Evaluate LLM Performance with AI Observability (EVALUATE)

In [None]:
# Import necessary functions
import streamlit as st
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Define image in a stage and read the file
image=session.file.get_stream("@SKO.HOP.RAG/Flow.jpg", decompress=False).read() 

# Display the image
st.image(image, width=800)

In [None]:
import snowflake.snowpark as snowpark

from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
-- List files in the stage to identify PDFs
LS @SKO.HOP.RAG;

## Step 1: Parse and Chunk Text from PDFs
We begin by parsing the uploaded PDFs and chunking the text with Snowflake's [PARSE_DOCUMENT](https://docs.snowflake.com/sql-reference/functions/parse_document-snowflake-cortex) and [SPLIT_TEXT_RECURSIVE_CHARACTER](https://docs.snowflake.com/sql-reference/functions/split_text_recursive_character-snowflake-cortex) functions. These steps split the text into manageable segments sized for retrieval. To verify that parsing and chunking worked correctly, we then query the parsed and chunked tables.

Objective: **Transform unstructured content into indexed chunks for efficient search and retrieval.**

Key Outputs:
- SKO.HOP.PARSED_TEXT: Table containing the raw text.
- SKO.HOP.CORTEX_CHUNK: Chunked, searchable content.
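
To make the chunk size and overlap settings concrete, here is a minimal Python sketch of fixed-window chunking with overlap. It is a simplified illustration only, not Snowflake's `SPLIT_TEXT_RECURSIVE_CHARACTER` algorithm (which also splits on separators); the function name `chunk_text` and the sample text are assumptions made for this example.

```python
from typing import List


def chunk_text(text: str, chunk_size: int, overlap: int) -> List[str]:
    """Split text into windows of at most chunk_size characters.

    Each window shares `overlap` characters with the previous one.
    Hypothetical helper for illustration; not a Snowflake API.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")

    step = chunk_size - overlap  # how far each new window advances
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


# Stand-in text of 2,500 characters (assumption: any string works here)
sample = "abcdefghij" * 250
chunks = chunk_text(sample, chunk_size=800, overlap=100)
print(len(chunks), [len(c) for c in chunks])
```

With an 800-character window and a 100-character overlap, each new chunk starts 700 characters after the previous one, so the end of one chunk repeats at the start of the next. That repetition helps keep sentences that straddle a boundary retrievable from at least one chunk.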

In [None]:
-- Create a table to hold the extracted text from the PDF files loaded in the SKO.HOP.RAG stage

-- Complete the missing code (???) to create a table called PARSED_TEXT

CREATE OR REPLACE TABLE ???.???.??? (relative_path VARCHAR(500), raw_text VARIANT);

In [None]:
-- Use Snowflake's new PARSE_DOCUMENT feature to extract the text from the PDFs loaded in the @SKO.HOP.RAG stage
-- Cortex PARSE_DOCUMENT documentation link is https://docs.snowflake.com/sql-reference/functions/parse_document-snowflake-cortex

-- Complete the missing code (???) to:
---- Insert into your newly created PARSED_TEXT table
---- Use Cortex PARSE_DOCUMENT feature and ocr mode

INSERT INTO ???.???.??? (relative_path, raw_text)
WITH pdf_files AS (
    SELECT DISTINCT
        METADATA$FILENAME AS relative_path
    FROM @SKO.HOP.RAG
    WHERE METADATA$FILENAME ILIKE '%.pdf'
      -- Exclude files that have already been parsed
      AND METADATA$FILENAME NOT IN (SELECT relative_path FROM PARSED_TEXT)
)
SELECT 
    relative_path,
    SNOWFLAKE.???.???(   -- Snowflake feature
        '@SKO.HOP.RAG',  -- Your stage name
        relative_path,   -- File path
        {'mode': '???'}  -- Adjust mode as needed ('layout', 'ocr')
    ) AS raw_text
FROM pdf_files;

In [None]:
-- Use Snowflake's new SPLIT_TEXT_RECURSIVE_CHARACTER feature to chunk parsed text from the PDFs loaded in @SKO.HOP.RAG stage
-- Cortex SPLIT_TEXT_RECURSIVE_CHARACTER documentation link is https://docs.snowflake.com/sql-reference/functions/split_text_recursive_character-snowflake-cortex

-- Complete the missing code (???) to:
---- Create a new table called CORTEX_CHUNK to hold the chunked text from your PDF documents
---- Use Cortex SPLIT_TEXT_RECURSIVE_CHARACTER feature with an 800 chunk size and 100 overlap size

CREATE TABLE ???.???.??? AS
WITH text_chunks AS (
    SELECT
        relative_path,
        SNOWFLAKE.???.???( -- Snowflake feature
            raw_text:content::STRING,  -- Extract the 'content' field from the JSON
            'none',    -- Adjust to 'markdown' if needed
            ???,       -- Adjust chunk size
            ???,       -- Adjust overlap size
            ['\n']     -- Adjust separators
        ) AS chunks
    FROM ???.???.??? -- Your recently created PARSED_TEXT table
)
SELECT
    relative_path,
    c.value AS chunk  -- Extract each chunk of the parsed text
FROM text_chunks,
LATERAL FLATTEN(INPUT => chunks) c;

In [None]:
# Select a PDF file to view
PDF_name = 'Arctic Embed Multilingual.pdf'

In [None]:
-- Complete the missing code (???) to check RAW_TEXT and confirm the PDF was parsed as expected

SELECT RELATIVE_PATH, RAW_TEXT 
FROM ???.???.???  -- Your recently created PARSED_TEXT table
WHERE RELATIVE_PATH ILIKE '%' || '{{PDF_name}}' || '%';

In [None]:
-- Complete the missing code (???) to check CORTEX_CHUNK and confirm the PDF was chunked as expected

SELECT * 
FROM ???.???.??? -- Your recently created CORTEX_CHUNK table 
WHERE RELATIVE_PATH ILIKE '%' || '{{PDF_name}}' || '%';

## Step 2: Create Cortex Search Service
Next, we create a [Cortex Search Service](https://docs.snowflake.com/LIMITEDACCESS/cortex-search/cortex-search-overview#overview) that enables retrieval of relevant text chunks for any query. This service uses the CHUNK column from the chunked table as the indexed content.

Purpose: **Index and search chunked content to support the RAG pipeline.**

Command:
```sql
CREATE OR REPLACE CORTEX SEARCH SERVICE SKO.HOP.RAG_SEARCH_SERVICE ON SEARCH_COL WAREHOUSE = COMPUTE_WH TARGET_LAG = '1 day' AS SELECT  ...;
```

In [None]:
# Define image in a stage and read the file
image=session.file.get_stream("@SKO.HOP.RAG/CortexSearch.jpg", decompress=False).read() 
st.image(image, width=800)

In [None]:
-- Create a search service over your new chunked PDF table that has one searchable text column
-- Cortex Search Service documentation link is https://docs.snowflake.com/LIMITEDACCESS/cortex-search/cortex-search-overview#overview

-- Complete the missing code (???) to:
---- Create a search service called SKO.HOP.RAG_SEARCH_SERVICE to run over your new chunked pdf table
---- Queries to the service will search on a new column called SEARCH_COL 
---- Use an x-small warehouse
---- Use a target_lag of 365 days
---- SEARCH_COL is the name of the concatenation of RELATIVE_PATH and CHUNK from the CORTEX_CHUNK table

CREATE CORTEX SEARCH SERVICE ???.???.???
    ON ???
    WAREHOUSE = COMPUTE_WH
    TARGET_LAG = '365 days'
    AS SELECT 
        RELATIVE_PATH,
        CHUNK,
    (RELATIVE_PATH || ' ' || CHUNK) AS ???
FROM ???.???.???; -- Your chunked table

## Step 3: Test Search Results with Experimental Configurations
We will now evaluate [Snowflake Cortex Experimental Knobs](https://docs.google.com/document/d/1HkHtDiY3CmzpSewCe_s9fpMNE5spOUvNSwr6CxFerqE/edit?usp=sharing) to fine-tune the retrieval service and analyze confidence scores and result rankings across configurations. These tests focus on boosting, recency, headers, and reranking to optimize search relevance.

**Configurations Tested:**
- **Boosted vs. Unboosted:** Compare the impact of keyword emphasis on rankings and scores.
- **Time-Based Decays:** Test how prioritizing recent documents affects relevance.
- **Header Boosts:** Evaluate the influence of structured headers (e.g., Markdown) on ranking.
- **Reranked vs. Non-Reranked:** Analyze trade-offs between query latency and search quality.

**Key Metrics:**
- **Confidence Scores:** Global relevance scores (0–3) for each result.
- **Result Rankings:** Position changes reveal the effectiveness of configurations.

By testing these configurations, we aim to enhance Cortex Search Service performance for specific use cases.
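
The search queries in this step pass their options as a hand-written JSON string. As a hedged alternative, the sketch below builds the same kind of payload in Python with `json.dumps`, which avoids quoting mistakes. The key names (`query`, `experimental`, `softBoosts`, `returnConfidenceScores`) are taken from the queries in this notebook; the helper name `build_search_payload` is hypothetical, and because these options are experimental they may change.

```python
import json
from typing import List, Optional


def build_search_payload(
    query: str,
    boost_phrases: Optional[List[str]] = None,
    return_confidence_scores: bool = True,
) -> str:
    """Return a JSON string shaped like the payloads used in this step.

    Hypothetical helper for illustration; it only assembles JSON and
    does not call Snowflake.
    """
    experimental = {"returnConfidenceScores": return_confidence_scores}
    if boost_phrases:
        # One softBoosts entry per phrase, as in the boosted query
        experimental["softBoosts"] = [{"phrase": p} for p in boost_phrases]
    return json.dumps({"query": query, "experimental": experimental})


question = "How does Cortex support multilingual queries?"
unboosted = build_search_payload(question)
boosted = build_search_payload(
    question, boost_phrases=["Arctic Embed Multilingual.pdf"]
)
print(boosted)
```

Building both payloads from one function keeps the query text identical across configurations, so any difference in scores or rankings comes from the experimental setting rather than from a typo.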

In [None]:
# Define image in a stage and read the file
image=session.file.get_stream("@SKO.HOP.RAG/CortexSearchEnhancements.jpg", decompress=False).read() 
st.image(image, width=800)

In [None]:
-- This query compares Cortex Search Service results across multiple experimental settings: boosted (using softBoosts), unboosted (default settings), and decayed (time-based decay).
-- The results are presented side-by-side to analyze the impact of each configuration on confidence scores and document ranking for matching search columns.
-- This analysis helps evaluate the effectiveness of boosting and decay strategies in improving search relevance and recency-based ranking.

-- Complete the missing code (???) to:
---- Call your SKO.HOP.RAG_SEARCH_SERVICE to test experimental configurations
---- Use the query "How does Cortex support multilingual queries?" 
---- For the boosted_results section of the query, use the phrase "Arctic Embed Multilingual.pdf"
---- Set returnConfidenceScores to true for all

WITH boosted_results AS (
    SELECT DISTINCT
        VALUE:"SEARCH_COL"::STRING AS SearchColumn,
        VALUE:"@CONFIDENCE_SCORE"::STRING AS ConfidenceScore
    FROM (
        SELECT PARSE_JSON(
            SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
                '???.???.???',
                '{
                    "query": "???",
                    "experimental": {
                        "softBoosts": [
                            { "phrase": "???" }
                        ],
                        "returnConfidenceScores": ???
                    }
                }'
            )
        ) AS boosted_json
    ),
    LATERAL FLATTEN(input => boosted_json:"results")
),
unboosted_results AS (
    SELECT DISTINCT
        VALUE:"SEARCH_COL"::STRING AS SearchColumn,
        VALUE:"@CONFIDENCE_SCORE"::STRING AS ConfidenceScore
    FROM (
        SELECT PARSE_JSON(
            SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
                'SKO.HOP.RAG_SEARCH_SERVICE',
                '{
                    "query": "???",
                    "experimental": {
                        "returnConfidenceScores": ???
                    }
                }'
            )
        ) AS unboosted_json
    ),
    LATERAL FLATTEN(input => unboosted_json:"results")
),
decayed_results AS (
    SELECT DISTINCT
        VALUE:"SEARCH_COL"::STRING AS SearchColumn,
        VALUE:"@CONFIDENCE_SCORE"::STRING AS ConfidenceScore
    FROM (
        SELECT PARSE_JSON(
            SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
                'SKO.HOP.RAG_SEARCH_SERVICE',
                '{
                    "query": "???",
                    "experimental": {
                        "decays": {
                            "last_modified": {
                                "weight": 1.0,
                                "limitHours": 240.0
                            }
                        },
                        "returnConfidenceScores": ???
                    }
                }'
            )
        ) AS decayed_json
    ),
    LATERAL FLATTEN(input => decayed_json:"results")
)
SELECT 
    COALESCE(b.SearchColumn, u.SearchColumn, d.SearchColumn) AS SearchColumn,
    b.ConfidenceScore AS BoostedConfidenceScore,
    u.ConfidenceScore AS UnboostedConfidenceScore,
    d.ConfidenceScore AS DecayedConfidenceScore
FROM
    boosted_results b
FULL OUTER JOIN unboosted_results u
    ON b.SearchColumn = u.SearchColumn
FULL OUTER JOIN decayed_results d
    ON COALESCE(b.SearchColumn, u.SearchColumn) = d.SearchColumn;

In [None]:
-- Query compares Cortex Search Service results using header boosts
-- Analyzes the impact of prioritizing header matches on document ranking and confidence scores

-- Complete the missing code (???) to:
---- Call your SKO.HOP.RAG_SEARCH_SERVICE to test experimental configurations
---- Use the query "How does Cortex support multilingual queries?" 
---- Use header boost multiplier of 2.0 to double the importance of header matches in comparison to body text matches
---- Set returnConfidenceScores to true

WITH header_boost_results AS (
    SELECT DISTINCT
        VALUE:"SEARCH_COL"::STRING AS SearchColumn,
        VALUE:"@CONFIDENCE_SCORE"::STRING AS ConfidenceScore
    FROM (
        SELECT PARSE_JSON(
            SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
                '???.???.???',
                '{
                    "query": "???",
                    "experimental": {
                        "retrievalWeights": {
                            "headerBoost": {
                                "multiplier": ???,
                                "skipStopWords": true
                            }
                        },
                        "returnConfidenceScores": ???
                    }
                }'
            )
        ) AS header_boost_json
    ),
    LATERAL FLATTEN(input => header_boost_json:"results")
)
SELECT 
    SearchColumn,
    ConfidenceScore AS HeaderBoostConfidenceScore
FROM
    header_boost_results;

## Step 4: Pass Retrieved Content to LLMs
This step demonstrates how to pass retrieved contextual content to various LLMs using the Snowflake Cortex [`COMPLETE`](https://docs.snowflake.com/en/sql-reference/functions/complete-snowflake-cortex) function. The process includes:

- **Retrieving Contextual Information**: Context is fetched from the search service.
- **Generating Structured Prompts**: The retrieved context is injected into prompts for LLMs.
- **LLM Interaction**: Prompts are passed to models like `mistral-7b`, `mistral-large2`, and `Anthropic Claude 3.5` for response generation.
- **Comparative Analysis**: Model outputs are compared for quality, relevance, and coherence.

Example Query:
```sql
SELECT SNOWFLAKE.CORTEX.COMPLETE(
    'claude-3-5-sonnet',
    CONCAT('Your context: ', (SELECT LISTAGG(SEARCH_COL, ' ') FROM searchresults))
) AS RESPONSE;
```
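
The same prompt assembly can be sketched in Python. This is a minimal illustration of injecting retrieved chunks into a prompt; the function name `build_prompt`, the prompt wording, and the example chunks are assumptions for this sketch, and no LLM is called.

```python
from typing import List


def build_prompt(chunks: List[str], question: str) -> str:
    """Combine retrieved chunks and a question into one prompt string.

    Hypothetical helper for illustration only.
    """
    # Label each chunk so the model can tell the documents apart
    context = "\n".join(
        f"Context document {i}: {chunk}"
        for i, chunk in enumerate(chunks, start=1)
    )
    return (
        "Answer the question using only the context below.\n"
        f"{context}\n"
        f"Question: {question}\n"
        "Answer:"
    )


# Made-up chunks standing in for search results
example_chunks = [
    "Cortex Search indexes text chunks.",
    "COMPLETE generates text from a prompt.",
]
prompt = build_prompt(example_chunks, "What does COMPLETE do?")
print(prompt)
```

The resulting string is what would be passed as the prompt argument to `COMPLETE`, whether from SQL or from Python.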

In [None]:
# Define image in a stage and read the file
image=session.file.get_stream("@SKO.HOP.RAG/CortexSearch_Complete.jpg", decompress=False).read() 
st.image(image, width=800)

**Queries to test the capabilities of the LLMs based on the PDF content:**
- How does Snowflake simplify the deployment of retrieval-augmented generation (RAG) workflows?
- How does Snowflake's Arctic Embed 2.0 enhance multilingual search capabilities while maintaining efficiency and quality?
- How does Snowflake's FeatEng benchmark enhance the evaluation of large language models (LLMs) in data science tasks?
- How does Snowflake's model hotswapping enhance the efficiency and scalability of LLM inference?

In [None]:
# Query your Snowflake Cortex Search Service using the Snowpark Python API to retrieve and process search results.

# Complete the missing code (???) to:
## Specify your database 'SKO', your schema 'HOP', and your Cortex Search Service named 'RAG_SEARCH_SERVICE'
## Specify your SEARCH_COL as the column of interest
                           
from snowflake.snowpark import Session
from snowflake.core import Root
root = Root(session)

transcript_search_service = (root
  .databases['???']
  .schemas['???']
  .cortex_search_services['???']
)

resp = transcript_search_service.search(
  query="""How does Snowflake simplify the deployment of retrieval-augmented generation (RAG) workflows?""",
  columns=['???'],
  limit=3
)
results = resp.results

context_str = ""
for i, r in enumerate(results):
    context_str += f"Context document {i+1}: {r['???']}\n****************\n"

print(context_str)
df = session.create_dataframe(resp.results)
df.create_or_replace_temp_view("searchresults")

In [None]:
# Define image in a stage and read the file
image=session.file.get_stream("@SKO.HOP.RAG/Claude35Sonnet.jpg", decompress=False).read() 

# Display the image
st.image(image, width=800)

In [None]:
-- Create a temporary table with the LLM responses

-- Complete the missing code (???) to:
---- Create a TEMPORARY table called LLMResults
---- Use mistral-7b for the MISTRAL_7B (first model)
---- Use mistral-large2 for the MISTRAL_LARGE2 (second model)
---- Use the Anthropic model (claude-3-5-sonnet) for the CLAUDE_35 (third model)

CREATE OR REPLACE ??? TABLE ??? AS
WITH PROMPT_TEXT AS (
  SELECT CONCAT(
    'You are a helpful AI assistant specialized in assisting Sales Engineers...',
    (SELECT LISTAGG(SEARCH_COL, ' ') FROM searchresults),
    ' Focus on key points and avoid unnecessary details.'
  ) AS P
)
SELECT 
   SNOWFLAKE.CORTEX.COMPLETE('???', (SELECT P FROM PROMPT_TEXT)) AS MISTRAL_7B,
   SNOWFLAKE.CORTEX.COMPLETE('???', (SELECT P FROM PROMPT_TEXT)) AS MISTRAL_LARGE2,
   SNOWFLAKE.CORTEX.COMPLETE('???', (SELECT P FROM PROMPT_TEXT)) AS CLAUDE_35;

In [None]:
df = session.sql("SELECT * FROM LLMResults").to_pandas()
st.subheader("Output for Mistral-7b LLM")
mistral_7b_value = df.iloc[0]["MISTRAL_7B"]
st.code(mistral_7b_value, language="text")

In [None]:
st.subheader("Output for Mistral-Large2 LLM")
mistral_large2_value = df.iloc[0]["MISTRAL_LARGE2"]
st.code(mistral_large2_value, language="text")

In [None]:
st.subheader("Output for Anthropic Claude 3.5 Sonnet LLM")
claude_35_value = df.iloc[0]["CLAUDE_35"]
st.code(claude_35_value, language="text")

## Step 5: Create RAG Application Classes

In this step, we will create two Python classes to build a Retrieval-Augmented Generation (RAG) pipeline:

1. **`CortexSearchRetriever`**:
   - This class interacts with the Cortex Search Service to retrieve relevant contextual information based on a user query.
   - It connects to the Cortex Search Service using Snowflake's `Root` object and performs a search with the specified query and result limit.
   - The retrieved context (a list of relevant chunks) will be used to generate prompts for LLMs.

2. **`RAGWithObservability`**:
   - This class integrates the retrieval functionality with a specified Large Language Model (LLM) to complete the RAG pipeline.
   - It uses the retriever to fetch context, creates a structured prompt by combining the context with the user query, and generates a response using the Snowflake Cortex `COMPLETE` function.
   - The class allows testing of different LLMs (e.g., `llama3.1-8b`, `mistral-7b`, `claude-3-5-sonnet`) by specifying the desired model.

### Workflow Summary:
1. The `CortexSearchRetriever` retrieves relevant context from the Cortex Search Service.
2. The `RAGWithObservability` uses this context to create prompts and generate responses with the specified LLM.

These two classes work together to streamline the RAG pipeline, enabling efficient retrieval and response generation for various use cases.
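
The division of labor between the two classes can be sketched without a Snowflake connection. In the example below, `KeywordRetriever`, `SimpleRAG`, and `fake_llm` are hypothetical stand-ins (a word-overlap retriever and a stub model), not the notebook's classes or any Snowflake or TruLens API. The point is only the shape of the pattern: the pipeline receives its retriever and model as arguments, so either can be swapped.

```python
from typing import Callable, List


class KeywordRetriever:
    """Hypothetical stand-in retriever: ranks documents by word overlap."""

    def __init__(self, documents: List[str], limit_to_retrieve: int = 2):
        self._documents = documents
        self._limit = limit_to_retrieve

    def retrieve(self, query: str) -> List[str]:
        query_words = set(query.lower().split())
        scored = [
            (len(query_words & set(doc.lower().split())), doc)
            for doc in self._documents
        ]
        # Highest overlap first; sorted() is stable, so ties keep input order
        ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)
        return [doc for score, doc in ranked[: self._limit] if score > 0]


class SimpleRAG:
    """Retrieve context, build a prompt, and hand it to a model callable."""

    def __init__(self, complete_fn: Callable[[str], str], retriever: KeywordRetriever):
        self._complete = complete_fn
        self._retriever = retriever

    def query(self, question: str) -> str:
        context = self._retriever.retrieve(question)
        prompt = f"Context: {' '.join(context)}\nQuestion: {question}\nAnswer:"
        return self._complete(prompt)


def fake_llm(prompt: str) -> str:
    """Stub model: reports the prompt length instead of calling an LLM."""
    return f"[stub response to a {len(prompt)}-character prompt]"


docs = [
    "Cortex Search retrieves text chunks",
    "Streamlit renders images",
    "Cortex COMPLETE calls an LLM",
]
retriever_demo = KeywordRetriever(docs)
rag_demo = SimpleRAG(fake_llm, retriever_demo)
retrieved = retriever_demo.retrieve("What does Cortex Search do?")
answer = rag_demo.query("What does Cortex Search do?")
print(retrieved)
print(answer)
```

Because the model is passed in as a plain callable, testing another model only means constructing another pipeline object with the same retriever.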

In [None]:
# Define the retriever class for interacting with the Cortex Search Service

# Complete the missing code (???) to:
## Specify your database 'SKO', your schema 'HOP', and your Cortex Search Service named 'RAG_SEARCH_SERVICE'
## Specify your SEARCH_COL as the column of interest
## Initialize retriever with your CortexSearchRetriever class
## Use "What are some components of the Snowflake Cortex offering?" for the test_query

from typing import List
from snowflake.snowpark import Session
from snowflake.core import Root

# CortexSearchRetriever
class CortexSearchRetriever:
    def __init__(self, session: Session, limit_to_retrieve: int = 4):
        self._session = session
        self._limit_to_retrieve = limit_to_retrieve
        

    def retrieve(self, query: str) -> List[str]:
        root = Root(self._session)
        cortex_search_service = (
            root
            .databases["???"]
            .schemas["???"]
            .cortex_search_services["???"]
        )
        resp = cortex_search_service.search(
            query=query,
            columns=["???"],
            limit=self._limit_to_retrieve,
        )
        return [row["???"] for row in resp.results] if resp.results else []

# Initialize the retriever
retriever = ???(session=session, limit_to_retrieve=5)
test_query = "???"
retrieved_context = retriever.retrieve(query=test_query)
print(retrieved_context)

In [None]:
# Create the RAGWithObservability class to structure the RAG pipeline
from snowflake.cortex import Complete
from trulens.apps.custom import instrument


class RAGWithObservability():
    def __init__(self, llm_model, retriever):
        self.llm_model = llm_model
        self.retriever = retriever
        # self.retriever = CortexSearchRetriever(session=session, limit_to_retrieve=4)
    # The @instrument decorator tells TruLens to trace this step of the application
    @instrument
    def retrieve_context(self, query: str) -> List[str]:
        return self.retriever.retrieve(query)

    @instrument
    def create_prompt(self, query: str) -> tuple:  # returns (prompt, retrieved_context)
        retrieved_context = self.retrieve_context(query)
        prompt = f"""
        You are an expert assistant extracting information from context provided.
        Answer the question based on the context. Be concise and do not hallucinate.
        If you don't have the information, just say so.
        Context: {' '.join(retrieved_context)}
        Question: {query}
        Answer:
        """
        return prompt, retrieved_context

    @instrument
    def query(self, query: str):
    
        prompt, retrieved_context = self.create_prompt(query)

        # Complete returns the generated text for the given model and prompt
        response = Complete(self.llm_model, prompt)
        return response


# Create one RAG instance per LLM
llama_rag = RAGWithObservability('llama3.1-8b', retriever)
mistral7b_rag = RAGWithObservability('mistral-7b', retriever)
claude_rag = RAGWithObservability('claude-3-5-sonnet', retriever)

# Get responses
llama_response = llama_rag.query(test_query)
mistral_response = mistral7b_rag.query(test_query)
claude_response = claude_rag.query(test_query)

# Print responses
print(f"Query: {test_query}")
print(f"Llama response -  {llama_response}")
print(f"Mistral-7b response - {mistral_response}")
print(f"Claude response -  {claude_response}")

## Step 6: Observe and Evaluate LLM Performance with AI Observability (TruLens)

**Defining observability parameters for LLM performance using TruLens**
In this step, we add observability to the Retrieval-Augmented Generation (RAG) process. Observability lets us measure and evaluate LLM responses against a set of feedback metrics, which shows how each model performs and where it can improve.

**How TruSession Connects to Snowflake**
The `TruSession` object establishes a connection between TruLens and Snowflake using the `SnowflakeConnector`. This connection enables TruLens to access Snowflake Cortex data and evaluate LLM performance using feedback metrics such as:
- **Answer Relevance** - how relevant is the response from the model to the user's prompt?
- **Context Relevance** - how relevant is the retrieved context to the user's prompt?
- **Groundedness**  - how well grounded in the retrieved context is the LLM's response? 
- **Conciseness** - how concise is the LLM's response?
- **Coherence** - how coherent is the LLM's response?

Note that these are just a few of the possible feedback functions we could use for evaluation here; see the [docs](https://www.trulens.org/getting_started/core_concepts/rag_triad/#putting-it-together) for more detail on these feedback functions.
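
To show how per-response feedback scores turn into a model comparison, here is a minimal Python sketch that averages scores per model and metric. It is not the TruLens API: the record layout, the model names `model-a` and `model-b`, the score values, and the helper `summarize` are all made up for illustration, and scores are assumed to lie between 0 and 1.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple

# Hypothetical per-record scores: (app_version, metric, score between 0 and 1)
records: List[Tuple[str, str, float]] = [
    ("model-a", "Answer Relevance", 1.0),
    ("model-a", "Answer Relevance", 0.5),
    ("model-a", "Groundedness", 0.5),
    ("model-b", "Answer Relevance", 0.5),
    ("model-b", "Groundedness", 1.0),
]


def summarize(rows: List[Tuple[str, str, float]]) -> Dict[str, Dict[str, float]]:
    """Average each metric per app version (illustrative helper only)."""
    grouped = defaultdict(lambda: defaultdict(list))
    for version, metric, score in rows:
        grouped[version][metric].append(score)
    return {
        version: {metric: mean(scores) for metric, scores in metrics.items()}
        for version, metrics in grouped.items()
    }


summary = summarize(records)
for version, metrics in summary.items():
    print(version, metrics)
```

Averaging per metric rather than into one overall number keeps trade-offs visible, for example a model that answers relevantly but is poorly grounded in the retrieved context.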

In [None]:
# Define image in a stage and read the file
image=session.file.get_stream("@SKO.HOP.RAG/AIObservability.jpg", decompress=False).read() 

# Display the image
st.image(image, width=800)

In [None]:
from trulens.core import TruSession
from trulens.connectors.snowflake import SnowflakeConnector
tru_snowflake_connector = SnowflakeConnector(snowpark_session=session)

# Initialize TruLens session
tru_session = TruSession(connector=tru_snowflake_connector)

# Confirmation message
print("TruLens session successfully initialized.")

In [None]:
# Set up feedback functions to evaluate the model's performance.
from trulens.providers.cortex.provider import Cortex
from trulens.core import Feedback, SnowflakeFeedback, Select
import numpy as np
from functools import partial 

# Use llama3.1-8b as the LLM that executes evaluations
provider = Cortex(session, "llama3.1-8b")

# Answer relevance - how relevant is the response to the prompt?
f_answer_relevance = (
    Feedback(provider.relevance_with_cot_reasons, name="Answer Relevance")
    .on(Select.RecordInput.collect())  # Use `collect()` for Trulens
    .on(Select.RecordOutput.collect())  # Use `collect()` for Trulens
    .aggregate(np.mean)
)

# Context relevance - how relevant is the retrieved context to the prompt?
f_context_relevance = (
    Feedback(provider.context_relevance, name="Context Relevance")
    .on(Select.RecordInput.collect())  # Use `collect()` for Trulens
    .on(Select.RecordCalls.retrieve_context.rets.collect())  # Use `collect()` for Trulens
    .aggregate(np.mean)
)

# Groundedness - how well-grounded in the retrieved context is the response?
f_groundedness = (
    Feedback(
        partial(provider.groundedness_measure_with_cot_reasons, use_sent_tokenize=False), 
        name="Groundedness", 
        use_sent_tokenize=False
    )
    .on(Select.RecordCalls.retrieve_context.rets.collect())  # Use `collect()`
    .on_output()
)

# Conciseness - how concise is the LLM's response?
f_conciseness = Feedback(provider.conciseness, name="Conciseness").on_output()

# Coherence - how coherent is the LLM's response?
f_coherence = Feedback(provider.coherence, name="Coherence").on_output()

# Combine all feedback functions into a list
feedbacks = [f_context_relevance, f_groundedness, f_answer_relevance, f_conciseness, f_coherence]

In [None]:
# Test prompts
prompts = [
    "What are some pitfalls of gen AI workloads?",
    "What languages can I use to embed text into vectors in snowflake?",
    "How does Snowflake simplify the deployment of retrieval-augmented generation (RAG) workflows?",
    "How does a multi-turn conversation with cortex analyst work?",
    "How does neuroscience influence AI architectures?",
    "Who is an expert on Doc AI at Snowflake?",
    "Can I use cortex analyst on multiple tables in a star schema?",
    "What are some common LLM benchmark tests?"
]

In [None]:
from trulens.apps.custom import TruCustomApp

# Test prompts
prompts = [
    "What are some components of the Snowflake Cortex offering?",
    "How does Snowflake simplify the deployment of retrieval-augmented generation workflows?",
    "How does a multi-turn conversation with cortex analyst work?",
    "How does neuroscience influence AI architectures?",
]

# Define LLM configurations
llms = [llama_rag,mistral7b_rag,claude_rag]

# Iterate through LLMs and test with observability
for llm in llms:
    print(f"Testing LLM: {llm.llm_model}")

    # Wrap RAG class with Trulens for observability
    tru_rag = TruCustomApp(
        llm,
        app_id="SKO_OBSERVABILITY_1",
        app_version=llm.llm_model,
        feedbacks=feedbacks,
        metadata={"model_name": llm.llm_model}
    )

    # Test the pipeline
    with tru_rag as recording:
        for prompt in prompts:
            try:
                response = llm.query(prompt)
                print(f"Prompt: {prompt}")
                print(f"Response: {response}\n")
            except Exception as e:
                print(f"Error processing prompt: {prompt}\nError: {str(e)}\n")
                print(f"Failed prompt:\n{prompt}\n{'-' * 50}")
            print("    \n")

In [None]:
leaderboard = tru_session.get_leaderboard()
leaderboard

In [None]:
# Define image in a stage and read the file
image=session.file.get_stream("@SKO.HOP.RAG/AIObsApp.jpg", decompress=False).read() 

# Display the image
st.image(image, width=800)

In [None]:
image1=session.file.get_stream("@SKO.HOP.RAG/Anthropic.jpg", decompress=False).read() 
st.image(image1, width=800)
image2=session.file.get_stream("@SKO.HOP.RAG/Summary2.jpg", decompress=False).read() 
st.image(image2, width=800)