# Build a Retrieval-Augmented Generation (RAG)-based LLM assistant using Streamlit and Snowflake Cortex Search

*NOTE: For prerequisites and other instructions, please refer to the [QuickStart Guide](https://quickstarts.snowflake.com/guide/ask_questions_to_your_own_documents_with_snowflake_cortex_search/index.html#0).*

## Setup

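Create a database and a schema. The statements below are commented out; uncomment them if you have not created the database and schema yet.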

```sql
--CREATE DATABASE IF NOT EXISTS CC_QUICKSTART_CORTEX_SEARCH_DOCS;
--CREATE SCHEMA IF NOT EXISTS DATA;
```

## Organize Documents and Create Pre-Processing Function

Step 1. Download sample [PDF documents](https://github.com/Snowflake-Labs/sfguide-ask-questions-to-your-documents-using-rag-with-snowflake-cortex-search/tree/main).

Step 2. Create a stage with a directory table where you will upload your documents.

```sql
create or replace stage docs ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE') DIRECTORY = ( ENABLE = true );
```

Step 3. Upload the documents to your staging area:

- Select Data on the left
- Click on your database CC_QUICKSTART_CORTEX_SEARCH_DOCS
- Click on your schema DATA
- Click on Stages and select DOCS
- In the top right corner, click the **+Files** button
- Drag and drop the PDF documents you downloaded

Step 4. Check that the files have been uploaded successfully:

```sql
ls @docs;
```

## Pre-process and Label Documents

Step 1. Extract the text of each PDF and create the table where we are going to store the chunks for each PDF.

We are going to leverage Snowflake's native document processing functions to prepare the documents before enabling Cortex Search. We are also going to use the Cortex CLASSIFY_TEXT function to label the type of each document, so we can use that metadata to filter searches.

```sql
CREATE or replace TEMPORARY table RAW_TEXT AS
SELECT 
    RELATIVE_PATH,
    SIZE,
    FILE_URL,
    build_scoped_file_url(@docs, relative_path) as scoped_file_url,
    TO_VARCHAR (
        SNOWFLAKE.CORTEX.PARSE_DOCUMENT (
            '@docs',
            RELATIVE_PATH,
            {'mode': 'LAYOUT'} ):content
        ) AS EXTRACTED_LAYOUT 
FROM 
    DIRECTORY('@docs');
```

```sql
create or replace TABLE DOCS_CHUNKS_TABLE ( 
    RELATIVE_PATH VARCHAR(16777216), -- Relative path to the PDF file
    SIZE NUMBER(38,0), -- Size of the PDF
    FILE_URL VARCHAR(16777216), -- URL for the PDF
    SCOPED_FILE_URL VARCHAR(16777216), -- Scoped url (you can choose which one to keep depending on your use case)
    CHUNK VARCHAR(16777216), -- Piece of text
    CHUNK_INDEX INTEGER, -- Index for the text
    CATEGORY VARCHAR(16777216) -- Will hold the document category to enable filtering
);
```

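Step 2. Split the text extracted by PARSE_DOCUMENT into chunks with the Cortex SPLIT_TEXT_RECURSIVE_CHARACTER function and insert them into DOCS_CHUNKS_TABLE. The arguments are the text, its format ('markdown'), the chunk size (1512 characters), the overlap between consecutive chunks (256 characters), and the list of separators to try. There is no need to create embeddings, as the Cortex Search service will manage them automatically later.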

```sql
insert into docs_chunks_table (relative_path, size, file_url,
                               scoped_file_url, chunk, chunk_index)

    select relative_path, 
           size,
           file_url, 
           scoped_file_url,
           c.value::TEXT as chunk,
           c.INDEX::INTEGER as chunk_index
            
    from 
        raw_text,
        LATERAL FLATTEN( input => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER (
              EXTRACTED_LAYOUT,
              'markdown',
              1512,
              256,
              ['\n\n', '\n', ' ', '']
           )) c;
```
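To build intuition for what the chunk size, overlap, and separator list do, here is a minimal pure-Python approximation of recursive character splitting. It is a sketch under stated assumptions, not Snowflake's implementation: the function names are hypothetical, sizes are counted in characters, it ignores any markdown-specific handling the 'markdown' format may apply, and its exact chunk boundaries may differ from SPLIT_TEXT_RECURSIVE_CHARACTER.

```python
def _merge(pieces, sep, chunk_size, overlap):
    """Greedily pack pieces into chunks of at most chunk_size characters,
    carrying up to `overlap` trailing characters into the next chunk."""
    chunks, current, length = [], [], 0  # invariant: length == len(sep.join(current))
    for piece in pieces:
        added = len(piece) + (len(sep) if current else 0)
        if current and length + added > chunk_size:
            chunks.append(sep.join(current))
            # Drop leading pieces until the carried-over tail fits the overlap
            # budget and leaves room for the new piece.
            while current and (length > overlap or length + len(sep) + len(piece) > chunk_size):
                removed = current.pop(0)
                length -= len(removed) + (len(sep) if current else 0)
            added = len(piece) + (len(sep) if current else 0)
        current.append(piece)
        length += added
    if current:
        chunks.append(sep.join(current))
    return chunks


def split_text(text, chunk_size, overlap, separators):
    """Recursively split text, trying coarse separators before fine ones."""
    # Use the first separator that occurs in the text; "" means "split into characters".
    for i, sep in enumerate(separators):
        if sep == "" or sep in text:
            break
    else:
        return [text]
    finer = separators[i + 1:]
    pieces = list(text) if sep == "" else text.split(sep)
    chunks, small = [], []
    for piece in pieces:
        if len(piece) <= chunk_size:
            small.append(piece)
            continue
        # Flush the small pieces collected so far, then split the oversized piece further.
        if small:
            chunks.extend(_merge(small, sep, chunk_size, overlap))
            small = []
        chunks.extend(split_text(piece, chunk_size, overlap, finer) if finer else [piece])
    if small:
        chunks.extend(_merge(small, sep, chunk_size, overlap))
    return [c for c in chunks if c]


print(split_text("aaaa bbbb cccc dddd", 10, 5, ["\n\n", "\n", " ", ""]))
# -> ['aaaa bbbb', 'bbbb cccc', 'cccc dddd']
```

The overlap repeats the tail of one chunk at the start of the next, so a sentence that straddles a boundary still appears whole in at least one chunk; with 1512/256, roughly one sixth of each chunk is shared with its neighbor.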


### Label the product category

We are going to use the power of Large Language Models and the [CLASSIFY_TEXT](https://docs.snowflake.com/en/sql-reference/functions/classify_text-snowflake-cortex) function to classify the documents we are ingesting into our RAG application. We will pass the document name and the first chunk of text to CLASSIFY_TEXT.

First, we create a temporary table that holds the category assigned to each unique file name. Classification is not mandatory for Cortex Search, but we use it here to demonstrate filtering search results by document category.

Run this SQL to create that table:

```sql
CREATE OR REPLACE TEMPORARY TABLE docs_categories AS WITH unique_documents AS (
  SELECT
    DISTINCT relative_path, chunk
  FROM
    docs_chunks_table
  WHERE 
    chunk_index = 0
  ),
 docs_category_cte AS (
  SELECT
    relative_path,
    TRIM(snowflake.cortex.CLASSIFY_TEXT (
      'Title:' || relative_path || 'Content:' || chunk, ['Bike', 'Snow']
     )['label'], '"') AS category
  FROM
    unique_documents
)
SELECT
  *
FROM
  docs_category_cte;
```


You can check that table to identify how many categories have been created and if they are correct:

```sql
select category from docs_categories group by category;
```

We can also check that each document category is correct:

```sql
select * from docs_categories;
```

Now we can update the chunks table, which the Cortex Search service will use, with the category of each document:

```sql
update docs_chunks_table 
  SET category = docs_categories.category
  from docs_categories
  where  docs_chunks_table.relative_path = docs_categories.relative_path;
```
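Conceptually, the classification query and the UPDATE label each document once, from its title and first chunk, and then copy that label onto every chunk of the same document. The minimal pure-Python sketch below shows that data flow; `classify` is a hypothetical stand-in for CLASSIFY_TEXT (a keyword rule, not an LLM), and the file names and rows are made-up examples.

```python
def classify(text):
    """Hypothetical stand-in for SNOWFLAKE.CORTEX.CLASSIFY_TEXT(text, ['Bike', 'Snow'])."""
    lowered = text.lower()
    return "Snow" if "ski" in lowered or "snow" in lowered else "Bike"


def label_chunks(rows):
    """rows: dicts with relative_path, chunk_index, chunk, and category."""
    # unique_documents: the first chunk (chunk_index = 0) of each file
    first = {r["relative_path"]: r["chunk"] for r in rows if r["chunk_index"] == 0}
    # docs_categories: one label per document, using the same prompt shape as the SQL
    categories = {
        path: classify("Title:" + path + "Content:" + chunk) for path, chunk in first.items()
    }
    # UPDATE ... FROM docs_categories: every chunk inherits its document's label
    for r in rows:
        if r["relative_path"] in categories:
            r["category"] = categories[r["relative_path"]]
    return rows


rows = [
    {"relative_path": "city_bike_manual.pdf", "chunk_index": 0, "chunk": "Frame size guide", "category": None},
    {"relative_path": "city_bike_manual.pdf", "chunk_index": 1, "chunk": "Warranty terms", "category": None},
    {"relative_path": "ski_boots_guide.pdf", "chunk_index": 0, "chunk": "Fitting your boots", "category": None},
]
print([r["category"] for r in label_chunks(rows)])  # -> ['Bike', 'Bike', 'Snow']
```

Only the first chunk is sent to the classifier, so the label costs one call per document rather than one per chunk; the trade-off is that a misleading first page labels the whole file.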

## Create Cortex Search Service

The next step is to create the Cortex Search service on the table we created before.

- The name of the service is CC_SEARCH_SERVICE_CS.
- The service will use the column chunk to create embeddings and perform retrieval based on similarity search.
- The column category can be used as a filter.
- To keep this service updated, the warehouse COMPUTE_WH will be used. NOTE: You may replace the warehouse name with another one that you have access to.
- The service targets a maximum lag of one minute behind the source table (TARGET_LAG = '1 minute').
- The data retrieved will contain the chunk, chunk_index, relative_path, file_url, and category.

```sql
create or replace CORTEX SEARCH SERVICE CC_SEARCH_SERVICE_CS
ON chunk
ATTRIBUTES category
warehouse = COMPUTE_WH
TARGET_LAG = '1 minute'
as (
    select chunk,
        chunk_index,
        relative_path,
        file_url,
        category
    from docs_chunks_table
);
```
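Before building the Streamlit app, you can query the service directly to check that retrieval and the category filter behave as expected. Snowflake provides the SNOWFLAKE.CORTEX.SEARCH_PREVIEW function for this: it takes the fully qualified service name and a JSON request with `query`, `columns`, an optional `filter` (for example `{"@eq": {"category": "Bike"}}` on an ATTRIBUTES column), and `limit`. The helper below is a hypothetical sketch (`build_search_preview_sql` and the sample question are illustrative) that only assembles the SQL text and does not connect to Snowflake; please check the request format against the Cortex Search documentation for your account.

```python
import json


def build_search_preview_sql(service, query, columns, category=None, limit=5):
    """Return a SQL statement calling SNOWFLAKE.CORTEX.SEARCH_PREVIEW (hypothetical helper)."""
    request = {"query": query, "columns": columns, "limit": limit}
    if category is not None:
        # Equality filter on the `category` attribute declared in ATTRIBUTES
        request["filter"] = {"@eq": {"category": category}}
    payload = json.dumps(request)
    # Snowflake single-quoted literals process backslash escapes and use '' for a quote,
    # so escape both before embedding the JSON text.
    literal = payload.replace("\\", "\\\\").replace("'", "''")
    # `service` is interpolated as-is: only pass trusted, fully qualified names.
    return (
        "SELECT PARSE_JSON(SNOWFLAKE.CORTEX.SEARCH_PREVIEW("
        f"'{service}', '{literal}'))['results'] AS results"
    )


stmt = build_search_preview_sql(
    "CC_QUICKSTART_CORTEX_SEARCH_DOCS.DATA.CC_SEARCH_SERVICE_CS",
    "How should I care for my skis?",
    ["chunk", "relative_path", "category"],
    category="Snow",
)
print(stmt)
```

You could paste the printed statement into a SQL cell, or, in a Snowflake Notebook Python cell, run it with the active Snowpark session (`get_active_session().sql(stmt).collect()` from `snowflake.snowpark.context`).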

## Build Chat Interface

To build and run chat interface in Streamlit, continue and complete the steps outlined in the [QuickStart Guide](https://quickstarts.snowflake.com/guide/ask_questions_to_your_own_documents_with_snowflake_cortex_search/index.html#4).



## Automatic Processing of New Documents
Keeping your RAG system up to date as documents are added, deleted, or updated can be tedious, but Snowflake makes it easy. On the one hand, Cortex Search is a self-managed service: we only need to add, delete, or update rows in the table the Cortex Search service is built on, and the service automatically updates its index and creates new embeddings according to the TARGET_LAG defined when the service was created.

In addition, we can use Snowflake features like Streams, Tasks, and Stored Procedures to automatically process new PDF files as they are added to the stage.

We start by creating two streams on the DOCS stage: one to process deletes and the other to process inserts. The streams capture the changes to the directory table of the DOCS stage, so we can track new and deleted files:

```sql
create or replace stream insert_docs_stream on stage docs;
create or replace stream delete_docs_stream on stage docs;
```
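Each stream keeps its own offset into the stage's directory-table change history, and only a DML statement that reads a stream (in a committed transaction) advances that stream's offset. That is why the procedure below can consume `delete_docs_stream` in its DELETE without hiding the changes from `insert_docs_stream`. The toy model below illustrates the independent offsets; the class and function names are hypothetical, and it ignores Snowflake's net-change semantics, metadata columns, and transactions.

```python
from dataclasses import dataclass, field


@dataclass
class ChangeLog:
    """Append-only history of directory-table changes as (action, relative_path)."""
    entries: list = field(default_factory=list)


@dataclass
class Stream:
    """Toy stream: an offset into a shared ChangeLog."""
    log: ChangeLog
    offset: int = 0

    def has_data(self):
        # Rough analogue of SYSTEM$STREAM_HAS_DATA
        return self.offset < len(self.log.entries)

    def consume(self):
        # Rough analogue of reading the stream in a committed DML statement
        changes = self.log.entries[self.offset:]
        self.offset = len(self.log.entries)
        return changes


def create_stream(log):
    # A new stream starts at the current end of the history
    return Stream(log, offset=len(log.entries))


log = ChangeLog()
insert_stream = create_stream(log)
delete_stream = create_stream(log)
log.entries.append(("INSERT", "new_manual.pdf"))
delete_stream.consume()          # e.g. the DELETE ... USING delete_docs_stream
print(insert_stream.has_data())  # -> True: its offset did not move
```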

Second, we define a stored procedure that processes those streams to:

- Delete from docs_chunks_table the content of files that have been deleted from the staging area, so they are no longer relevant
- Parse new PDF documents that have been added to the staging area using PARSE_DOCUMENT
- Chunk the new documents into pieces using SPLIT_TEXT_RECURSIVE_CHARACTER
- Classify the new documents and update their labels (this step is optional; it is included to show what is possible)

Create the Stored Procedure:

```sql
create or replace procedure insert_delete_docs_sp()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN

    -- Remove chunks of files deleted from the stage
    DELETE FROM docs_chunks_table
        USING delete_docs_stream
        WHERE docs_chunks_table.RELATIVE_PATH = delete_docs_stream.RELATIVE_PATH
        and delete_docs_stream.METADATA$ACTION = 'DELETE';

    -- Parse newly added files
    CREATE or replace TEMPORARY table RAW_TEXT AS
        SELECT 
            RELATIVE_PATH,
            SIZE,
            FILE_URL,
            build_scoped_file_url(@docs, relative_path) as scoped_file_url,
            TO_VARCHAR (
                SNOWFLAKE.CORTEX.PARSE_DOCUMENT (
                    '@docs',
                    RELATIVE_PATH,
                    {'mode': 'LAYOUT'} ):content
                ) AS EXTRACTED_LAYOUT 
        FROM 
            insert_docs_stream
        WHERE 
            METADATA$ACTION = 'INSERT';

    -- Insert new docs chunks
    insert into docs_chunks_table (relative_path, size, file_url,
                                   scoped_file_url, chunk, chunk_index)

    select relative_path, 
           size,
           file_url, 
           scoped_file_url,
           c.value::TEXT as chunk,
           c.INDEX::INTEGER as chunk_index
            
    from 
        RAW_TEXT,
        LATERAL FLATTEN( input => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER (
              EXTRACTED_LAYOUT,
              'markdown',
              1512,
              256,
              ['\n\n', '\n', ' ', '']
           )) c;

    -- Classify the new documents

    CREATE OR REPLACE TEMPORARY TABLE docs_categories AS 
    WITH unique_documents AS (
      SELECT DISTINCT
        d.relative_path, d.chunk
      FROM
        docs_chunks_table d
      INNER JOIN
        RAW_TEXT r
        ON d.relative_path = r.relative_path
      WHERE 
        d.chunk_index = 0
    ),
    docs_category_cte AS (
      SELECT
        relative_path,
        TRIM(snowflake.cortex.CLASSIFY_TEXT (
          'Title:' || relative_path || 'Content:' || chunk, ['Bike', 'Snow']
        )['label'], '"') AS category
      FROM
        unique_documents
    )
    SELECT
      *
    FROM
      docs_category_cte;

    -- Update categories

    update docs_chunks_table 
        SET category = docs_categories.category
        from docs_categories
        where  docs_chunks_table.relative_path = docs_categories.relative_path;

    RETURN 'Documents processed';

END;
$$;
```
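Stripped of SQL details, one run of the procedure first removes the chunks of deleted files and then appends chunks for newly inserted files. The sketch below models that order of operations on an in-memory table; `process_changes`, the fixed-width `chunk_fixed` helper (a stand-in for SPLIT_TEXT_RECURSIVE_CHARACTER, without overlap), and the sample data are all hypothetical, and classification is omitted.

```python
def chunk_fixed(text, size):
    """Hypothetical stand-in chunker: fixed-width pieces, no overlap."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def process_changes(chunks_table, delete_changes, insert_changes, stage_files, size=1512):
    """chunks_table maps relative_path -> list of chunk strings (list index = chunk_index)."""
    # 1. DELETE ... USING delete_docs_stream WHERE METADATA$ACTION = 'DELETE'
    for action, path in delete_changes:
        if action == "DELETE":
            chunks_table.pop(path, None)
    # 2. Parse, chunk, and INSERT (append) files reported as INSERT by insert_docs_stream
    for action, path in insert_changes:
        if action == "INSERT":
            chunks_table.setdefault(path, []).extend(chunk_fixed(stage_files[path], size))
    return chunks_table


files = {"bike.pdf": "Bike frame and wheels"}
table = {"old.pdf": ["stale"], "bike.pdf": ["outdated"]}
process_changes(table, [("DELETE", "old.pdf"), ("DELETE", "bike.pdf")],
                [("INSERT", "bike.pdf")], files, size=10)
print(table)  # -> {'bike.pdf': ['Bike frame', ' and wheel', 's']}
```

Because an INSERT appends rows, running the delete step first is what keeps a path that appears as both a DELETE and an INSERT from ending up with stale chunks next to the new ones, at least in this model.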

Now we can create a task that checks every X minutes whether there is new data in the stream and takes action. We set the schedule to 5 minutes so you can follow the execution, but feel free to reduce it to 1 minute if needed. Consider what would be best for your app and how often new documents are added.

We define:
  - Where: the task is executed using the warehouse **COMPUTE_WH**. Replace it with your own warehouse.
  - When: check every 5 minutes, and execute only if there are new records in the delete_docs_stream stream (we could also use the other stream)
  - What to do: call the stored procedure insert_delete_docs_sp()

Execute this code in your Snowflake worksheet to create the task:

```sql
create or replace task insert_delete_docs_task
    warehouse = COMPUTE_WH
    schedule = '5 minute'
    when system$stream_has_data('delete_docs_stream')
as
    call insert_delete_docs_sp();

-- Tasks are created suspended; resume the task to start the schedule
alter task insert_delete_docs_task resume;
```
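The schedule and the WHEN clause combine as follows: at each scheduled run, the condition SYSTEM$STREAM_HAS_DATA is evaluated and the procedure is called only if the stream holds changes. The toy simulation below, using the hypothetical helper `procedure_runs`, estimates when the procedure would run for given upload times. It assumes runs fall on exact multiples of the schedule and ignores execution time; after a run, the new chunks still have to be picked up by the Cortex Search service within its TARGET_LAG.

```python
def procedure_runs(upload_minutes, schedule=5, horizon=30):
    """Minutes at which insert_delete_docs_sp() would be called (toy model)."""
    pending = sorted(upload_minutes)
    runs = []
    for tick in range(schedule, horizon + 1, schedule):
        # WHEN system$stream_has_data(...): any change recorded before this tick?
        if pending and pending[0] < tick:
            runs.append(tick)
            # The procedure consumes the stream, so those changes are no longer pending
            pending = [m for m in pending if m >= tick]
    return runs


print(procedure_runs([2, 3, 12]))  # -> [5, 15]
```

Uploads that land within the same interval are handled by a single run, so a shorter schedule lowers latency but may start the procedure more often.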