# Module 2: Build a RAG with Cortex Search

## Create the database, tables and warehouse

In [None]:
from snowflake.snowpark.context import get_active_session
# Grab the notebook's active Snowpark session. Later Python cells pass this
# module-level `session` to the retriever, the TruLens connector and the
# Cortex feedback provider.
session = get_active_session()

In [None]:
-- Database that holds every object created in this tutorial.
CREATE DATABASE IF NOT EXISTS cortex_search_tutorial_db;

-- Dedicated X-Small warehouse: created suspended, resumes automatically when
-- a query needs it, and auto-suspends again after AUTO_SUSPEND = 120.
CREATE OR REPLACE WAREHOUSE cortex_search_tutorial_wh
WITH
    WAREHOUSE_SIZE = 'X-SMALL'
    AUTO_SUSPEND = 120
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE;

-- Make the new warehouse the active one for the remaining cells.
USE WAREHOUSE cortex_search_tutorial_wh;

Note:

The CREATE DATABASE statement creates a database. The database automatically includes a schema named PUBLIC.

The CREATE WAREHOUSE statement creates an initially suspended warehouse.

## Get PDF data

You will use a sample dataset of the Federal Open Market Committee (FOMC) meeting minutes for this example. This is a sample of twelve 10-page documents with meeting notes from FOMC meetings from 2023 and 2024. Download the files directly from your browser by following this link:

[FOMC minutes sample](https://drive.google.com/file/d/1C6TdVjy6d-GnasGO6ZrIEVJQRcedDQxG/view)

The complete set of FOMC minutes can be found at the [US Federal Reserve’s website](https://www.federalreserve.gov/monetarypolicy/fomccalendars.htm).

Note: In a non-classroom setting, you would bring your own data, possibly already in a Snowflake stage.

## Load data into Snowflake stage

In [None]:
-- Internal stage that receives the FOMC PDFs. The directory table is enabled
-- because later cells query directory(@...fomc) and create a stream on this stage.
-- NOTE(review): server-side encryption is presumably required so that
-- PARSE_DOCUMENT can read the staged files — confirm against the Cortex docs.
CREATE OR REPLACE STAGE cortex_search_tutorial_db.public.fomc
    DIRECTORY = (ENABLE = TRUE)
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

Now upload the dataset. You can upload the dataset in Snowsight or using SQL. To upload in Snowsight:

1. Sign in to Snowsight.

2. Select Data in the left-side navigation menu.

3. Select your database cortex_search_tutorial_db.

4. Select your schema public.

5. Select Stages and select fomc.

6. On the top right, select the + Files button.

7. Drag and drop files into the UI or select Browse to choose a file from the dialog window.

8. Select Upload to upload your file.

## Verify the PDF Files are uploaded to stage

In [None]:
-- List the files currently in the stage to confirm the upload.
ls @cortex_search_tutorial_db.public.fomc

## Parse PDF Files

In [None]:
-- Parse every staged PDF in LAYOUT mode and keep the extracted `content`
-- field as plain text, one row per file.
CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.parsed_fomc_content AS
SELECT
    relative_path,
    TO_VARCHAR(
        SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
            @cortex_search_tutorial_db.public.fomc,
            relative_path,
            {'mode': 'LAYOUT'}
        ):content
    ) AS parsed_text
FROM DIRECTORY(@cortex_search_tutorial_db.public.fomc)
WHERE relative_path LIKE '%.pdf'

In [None]:
-- Spot-check two parsed documents.
SELECT * FROM CORTEX_SEARCH_TUTORIAL_DB.PUBLIC.PARSED_FOMC_CONTENT LIMIT 2

## Chunk text

In [None]:
-- Destination table: one row per chunk, tagged with the file it came from.
CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.chunked_fomc_content (
    file_name VARCHAR,
    chunk     VARCHAR
);

-- Split each parsed document as markdown, passing 1800 and 250 as the
-- chunk-size and overlap arguments, and flatten the resulting array so
-- every chunk becomes its own row.
INSERT INTO cortex_search_tutorial_db.public.chunked_fomc_content (file_name, chunk)
SELECT
    relative_path,
    piece.value AS chunk
FROM
    cortex_search_tutorial_db.public.parsed_fomc_content,
    LATERAL FLATTEN(
        INPUT => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(
            parsed_text,
            'markdown',
            1800,
            250
        )
    ) piece;

In [None]:
-- Spot-check the first ten chunks.
SELECT * FROM CORTEX_SEARCH_TUTORIAL_DB.PUBLIC.CHUNKED_FOMC_CONTENT LIMIT 10

## Create Search Service

In [None]:
-- Cortex Search service over the chunk table. The `chunk` column is the
-- searchable text; file_name is selected alongside it by the source query.
-- TARGET_LAG = '1 minute' asks the service to stay within about a minute of
-- the source table, so rows added to CHUNKED_FOMC_CONTENT later get indexed.
CREATE OR REPLACE CORTEX SEARCH SERVICE CORTEX_SEARCH_TUTORIAL_DB.PUBLIC.FOMC_SEARCH_SERVICE
    ON chunk
    WAREHOUSE = cortex_search_tutorial_wh
    TARGET_LAG = '1 minute'
    EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
    AS (
    SELECT
        file_name,
        chunk
    FROM CORTEX_SEARCH_TUTORIAL_DB.PUBLIC.CHUNKED_FOMC_CONTENT
    );

## Use the Search Service

In [None]:
import os
from snowflake.core import Root
from typing import List
from snowflake.snowpark.session import Session

class CortexSearchRetriever:
    """Retrieve text chunks from the FOMC Cortex Search service.

    Args:
        snowpark_session: Snowpark session used to reach the search service.
        limit_to_retrieve: Maximum number of chunks returned per query.
    """

    def __init__(self, snowpark_session: Session, limit_to_retrieve: int = 4):
        self._snowpark_session = snowpark_session
        self._limit_to_retrieve = limit_to_retrieve

    def retrieve(self, query: str) -> List[str]:
        """Return the ``chunk`` text of the top matches for ``query``.

        Args:
            query: Natural-language search string.

        Returns:
            Up to ``limit_to_retrieve`` chunk strings, or an empty list when
            the service returns no results.
        """
        # Use the session handed to the constructor. The previous version read
        # the notebook-global `session`, silently ignoring `snowpark_session`.
        root = Root(self._snowpark_session)

        search_service = (
            root
            .databases["CORTEX_SEARCH_TUTORIAL_DB"]
            .schemas["PUBLIC"]
            .cortex_search_services["FOMC_SEARCH_SERVICE"]
        )
        resp = search_service.search(
            query=query,
            columns=["chunk"],
            limit=self._limit_to_retrieve,
        )

        if not resp.results:
            return []
        return [hit["chunk"] for hit in resp.results]

In [None]:
# Smoke-test the retriever on its own: ask for only the single best chunk.
retriever = CortexSearchRetriever(snowpark_session=session, limit_to_retrieve=1)

retrieved_context = retriever.retrieve(query="how was inflation expected to evolve in 2024?")

# Bare expression so the notebook displays the returned list.
retrieved_context

## Create a RAG

Now that we have set up Cortex Search to be our retriever, we can add Cortex Complete for generation to build our RAG.

We'll also add TruLens instrumentation with the @instrument decorator to our app.

The first thing we need to do, however, is to set the database connection to Snowflake where we'll log the traces and evaluation results from our application. This way we have a stored record that we can use to understand the app's performance.

In [None]:
from trulens.core import TruSession
from trulens.connectors.snowflake import SnowflakeConnector

# Point TruLens at Snowflake through the notebook's Snowpark session.
# NOTE(review): init_server_side=True presumably sets up the server-side
# objects TruLens needs for evaluation inside Snowflake — confirm in TruLens docs.
tru_snowflake_connector = SnowflakeConnector(snowpark_session=session, init_server_side=True)

# Session object bound to the Snowflake connector created above.
tru_session = TruSession(connector=tru_snowflake_connector)

In [None]:
from snowflake.cortex import Complete
from trulens.apps.custom import instrument
class RAG:
    """Minimal retrieval-augmented generation app over the FOMC search service.

    Retrieval is delegated to ``CortexSearchRetriever`` and generation to
    Cortex ``Complete``. ``retrieve_context``, ``generate_completion`` and
    ``query`` are all instrumented so that one call to ``query`` is traced as
    a single record containing both the retrieval and the generation step.
    """

    def __init__(self):
        # Relies on the notebook-level `session`; four chunks per question.
        self.retriever = CortexSearchRetriever(snowpark_session=session, limit_to_retrieve=4)

    @instrument
    def retrieve_context(self, query: str) -> list:
        """
        Retrieve relevant text from vector store.
        """
        return self.retriever.retrieve(query)

    @instrument
    def generate_completion(self, query: str, context_str: list) -> str:
        """
        Generate answer from context.

        The completion is streamed: each piece is printed as it arrives and
        the concatenated text is returned.
        """
        prompt = f"""
          You are an expert assistant extracting information from context provided.
          Answer the question in long-form, fully and completely, based on the context. Do not hallucinate.
          If you don´t have the information just say so.
          Context: {context_str}
          Question:
          {query}
          Answer:
        """
        pieces = []
        for update in Complete("mistral-large2", prompt, stream=True):
            pieces.append(update)
            print(update, end="")
        return "".join(pieces)

    # Instrumented so the end-to-end call is the root of the trace. Without
    # it, the retrieval and generation calls are not tied together under one
    # app-level input and output for the feedback functions to select.
    @instrument
    def query(self, query: str) -> str:
        """Answer ``query`` by retrieving context and generating a completion."""
        context_str = self.retrieve_context(query)
        return self.generate_completion(query, context_str)


rag = RAG()

## Query the RAG

In [None]:
# Ask the RAG a question directly; the answer is printed as it streams
# and the full text is kept in `response`.
response = rag.query("What were the strongest components to gdp growth in q4?")

## Create Evaluations

In [None]:
from trulens.providers.cortex.provider import Cortex
from trulens.core.feedback.feedback import SnowflakeFeedback
from trulens.core import Select
import numpy as np

# Feedback provider: the feedback functions below are scored through Cortex
# using the "mistral-large2" model.
provider = Cortex(session, "mistral-large2")

# Groundedness: evaluated on the collected return values of retrieve_context
# together with the app's output.
f_groundedness = (
    SnowflakeFeedback(provider.groundedness_measure_with_cot_reasons, name="Groundedness")
    .on(Select.RecordCalls.retrieve_context.rets[:].collect())
    .on_output()
)

# Context relevance: evaluated on the app's input against the items returned
# by retrieve_context; the resulting scores are averaged with np.mean.
f_context_relevance = (
    SnowflakeFeedback(provider.context_relevance, name="Context Relevance")
    .on_input()
    .on(Select.RecordCalls.retrieve_context.rets[:])
    .aggregate(np.mean)
)

# Answer relevance: evaluated on the app's input and output.
# NOTE(review): only one input/output pair is selected here, so the np.mean
# aggregate looks redundant — confirm before removing.
f_answer_relevance = (
    SnowflakeFeedback(provider.relevance, name="Answer Relevance")
    .on_input()
    .on_output()
    .aggregate(np.mean)
)

## Register the App

In [None]:
from trulens.apps.custom import TruCustomApp
from trulens.core.schema.app import RecordIngestMode

# Wrap the RAG instance so calls made inside `with tru_rag as recording:`
# are recorded under app name "FOMC RAG", version "simple", and scored
# with the three feedback functions.
# NOTE(review): RecordIngestMode.BUFFERED presumably batches record writes
# instead of writing each one immediately — confirm in TruLens docs.
tru_rag = TruCustomApp(
    rag,
    app_name="FOMC RAG",
    app_version="simple",
    feedbacks=[f_groundedness, f_answer_relevance, f_context_relevance],
    record_ingest_mode=RecordIngestMode.BUFFERED,
)

## Run the App with AI Observability

In [None]:
# Run a question inside the recording context so the call is traced.
# generate_completion already prints the answer while streaming, so the
# print below shows the full text a second time.
with tru_rag as recording:
    response = rag.query("how was inflation expected to evolve in 2024?")
    print(response)

In [None]:
# Second recorded run: a question about January 2025, which is later than the
# 2023-2024 sample minutes loaded so far.
with tru_rag as recording:
    response = rag.query("What is the target range for the federal funds rate as of jan 2025?")
    print(response)

## Navigate to AI > Applications to view AI Observability

You may notice low context relevance and groundedness scores for the second query regarding the federal funds rate.

This is a common symptom of an out-of-date knowledge base: the user has asked about information that our search service doesn't yet contain.

## Automatic Processing of New Documents

We can use Snowflake Streams and Tasks to automatically process new PDF files as they are added to Snowflake.

The tasks and streams will utilize the same parsing and chunking queries we built earlier in the notebook.

In [None]:
-- Stream on the stage: tracks files added to @fomc after this statement runs.
-- It is the trigger for the parse task defined in the next cell.
CREATE OR REPLACE STREAM cortex_search_tutorial_db.public.fomc_docs_stream
ON STAGE cortex_search_tutorial_db.public.fomc;

In [None]:
-- Stream on the stage: tracks files added to @fomc after this statement runs.
-- (Same definition as the previous cell; re-running it only resets the stream.)
CREATE OR REPLACE STREAM cortex_search_tutorial_db.public.fomc_docs_stream
ON STAGE cortex_search_tutorial_db.public.fomc;

-- Every minute, if the stream has pending changes, parse ONLY the newly added
-- PDFs. Reading from the stream rather than directory(@stage) matters because:
--   1. only new files are parsed, so documents already in
--      PARSED_FOMC_CONTENT are not inserted again on every run;
--   2. the INSERT consumes the stream, so SYSTEM$STREAM_HAS_DATA becomes
--      false and the task stops firing until another file arrives.
-- METADATA$ACTION = 'INSERT' skips the records emitted for removed files.
CREATE OR REPLACE TASK cortex_search_tutorial_db.public.fomc_parse_task
    WAREHOUSE = cortex_search_tutorial_wh
    SCHEDULE = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('cortex_search_tutorial_db.public.fomc_docs_stream')
AS
INSERT INTO CORTEX_SEARCH_TUTORIAL_DB.PUBLIC.PARSED_FOMC_CONTENT(RELATIVE_PATH, PARSED_TEXT)
SELECT
      relative_path,
      TO_VARCHAR(
        SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
          @cortex_search_tutorial_db.public.fomc,
          relative_path,
          {'mode': 'LAYOUT'}
        ) :content
      ) AS parsed_text
    FROM cortex_search_tutorial_db.public.fomc_docs_stream
    WHERE relative_path LIKE '%.pdf'
      AND METADATA$ACTION = 'INSERT';

In [None]:
-- Stream on the parsed-content table: tracks rows added by the parse task
-- so the chunk task can process just those documents.
CREATE OR REPLACE STREAM cortex_search_tutorial_db.public.parsed_fomc_stream
ON TABLE cortex_search_tutorial_db.public.parsed_fomc_content;

In [None]:
-- Every minute, if newly parsed documents are waiting in the stream, split
-- them with the same chunking arguments as the initial load ('markdown',
-- 1800, 250) and append the chunks to the table the search service indexes.
-- Only INSERT records are chunked: a standard table stream also emits DELETE
-- records, and those rows must not be turned into new chunks.
CREATE OR REPLACE TASK cortex_search_tutorial_db.public.fomc_chunk_task
    WAREHOUSE = cortex_search_tutorial_wh
    SCHEDULE = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('cortex_search_tutorial_db.public.parsed_fomc_stream')
AS
INSERT INTO CORTEX_SEARCH_TUTORIAL_DB.PUBLIC.CHUNKED_FOMC_CONTENT(FILE_NAME, CHUNK)
SELECT
    s.RELATIVE_PATH AS FILE_NAME,
    chunk.VALUE AS CHUNK
FROM cortex_search_tutorial_db.public.parsed_fomc_stream AS s,
     LATERAL FLATTEN(
       INPUT => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER (
                  s.PARSED_TEXT,
                  'markdown',
                  1800,
                  250
                )
     ) chunk
WHERE s.METADATA$ACTION = 'INSERT';

In [None]:
-- Newly created tasks do not run until they are resumed; start both.
alter task cortex_search_tutorial_db.public.fomc_parse_task resume;
alter task cortex_search_tutorial_db.public.fomc_chunk_task resume;

Now upload a new set of minutes from the FOMC website to the stage. If we run the query below before the task has processed the new file, we can see the pending change in the stream.

In [None]:
-- Peek at the file changes the parse task has not processed yet.
-- Fully qualified so the query does not depend on the notebook's current
-- database/schema, matching every other statement in this notebook.
select * from cortex_search_tutorial_db.public.fomc_docs_stream;

It will return no rows once the document has been processed. Once the document is available in the CHUNKED_FOMC_CONTENT table, the Snowflake Cortex Search service will automatically index it according to the TARGET_LAG that was specified when the service was created.

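Once the new document has been indexed, re-run the earlier federal funds rate question (next cell) to compare its scores with the previous run. When you have finished testing uploading new documents and asking questions, you may want to suspend the tasks (final cell):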

In [None]:
# Re-run the same federal funds rate question asked earlier in the notebook,
# again inside the recording context so this run is traced as well.
with tru_rag as recording:
    response = rag.query("What is the target range for the federal funds rate as of jan 2025?")
    print(response)

In [None]:
-- Stop both scheduled tasks so they no longer wake the warehouse every minute.
alter task cortex_search_tutorial_db.public.fomc_parse_task suspend;
alter task cortex_search_tutorial_db.public.fomc_chunk_task suspend;