# Build a Retrieval Augmented Generation (RAG) based LLM assistant using Streamlit and Snowflake Cortex Search

# Step 1: Organize Documents and Create Pre-Processing Function

## Sample Document Download

### Overview
This step provides sample documents for testing the RAG system. The documents include:
- Bike product documentation
- Ski equipment manuals
- User guides and specifications
- Product information sheets

### Available Documents
**Bike Documentation**
- Mondracer Infant Bike
- Premium Bycycle User Guide
- The Ultimate Downhill Bike
- The Xtreme Road Bike 105 SL

**Ski Equipment**
- Ski Boots TDBootz Special
- Carver Skis Specification Guide
- OutPiste Skis Specification Guide
- RacingFast Skis Specification Guide

### Important Notes
- Documents contain specific product information
- Files are text-based PDFs
- Content is fictional for demonstration
- Additional documents can be added
- System will be tested with and without document context

## Database and Schema Setup

### Overview
This section establishes the foundational database structure for implementing a RAG (Retrieval Augmented Generation) system using Snowflake Cortex Search. The setup creates a dedicated database and schema for storing:
- Document chunks and embeddings
- Search functionality
- Processing functions
- Application components

### Architecture
The database structure follows a simple organization:
1. **Database Layer** - CC_QUICKSTART_CORTEX_SEARCH_DOCS for all RAG components
2. **Schema Layer** - DATA schema for organizing objects and tables
3. **Objects** - Will contain:
   - Document storage tables
   - Processing functions
   - Search services
   - Application components



In [None]:
use role DEMOADMIN;

CREATE DATABASE CC_QUICKSTART_CORTEX_SEARCH_DOCS;
CREATE SCHEMA DATA;

## Text Chunking Function

### Overview
This UDF (User-Defined Function) implements document text chunking using LangChain's RecursiveCharacterTextSplitter. The function:
- Takes a text string as input
- Splits it into overlapping chunks
- Returns chunks as a table of strings

### Technical Details
- **Runtime**: Python 3.9
- **Dependencies**: 
  - snowflake-snowpark-python
  - langchain
- **Input**: Single text string
- **Output**: Table of text chunks
- **Chunk Parameters**:
  - Size: 1512 characters
  - Overlap: 256 characters

In [None]:
create or replace function text_chunker(pdf_text string)
returns table (chunk varchar)
language python
runtime_version = '3.9'
handler = 'text_chunker'
packages = ('snowflake-snowpark-python', 'langchain')
as
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
import pandas as pd

class text_chunker:

    def process(self, pdf_text: str):
        
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size = 1512, #Adjust this as you see fit
            chunk_overlap  = 256, #This let's text have some form of overlap. Useful for keeping chunks contextual
            length_function = len
        )
    
        chunks = text_splitter.split_text(pdf_text)
        df = pd.DataFrame(chunks, columns=['chunks'])
        
        yield from df.itertuples(index=False, name=None)
$$;

## Internal Stage Creation

### Overview
This SQL creates a named internal stage for storing document files with server-side encryption and directory table functionality enabled. The stage will be used to:
- Store documents securely within Snowflake
- Track and manage staged files using directory tables
- Support the document processing pipeline

### Technical Specifications
- **Stage Type**: Internal (Snowflake-managed)
- **Encryption**: Server-side encryption (SNOWFLAKE_SSE)
- **Directory Table**: Enabled
- **Purpose**: Document storage for RAG implementation

In [None]:
create or replace stage docs ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE') DIRECTORY = ( ENABLE = true );


## Document Upload to Stage

### Overview
This manual step involves uploading PDF documents to the previously created internal stage using Snowflake's web interface (Snowsight). The process enables:
- Document storage in the secure stage
- Preparation for text extraction
- Integration with the RAG pipeline

### Navigation Steps
1. **Access Staging Area**
   - Open Snowsight interface
   - Navigate to Data browser
   - Select database: CC_QUICKSTART_CORTEX_SEARCH_DOCS
   - Select schema: DATA
   - Open Stages section
   - Select DOCS stage

2. **Upload Process**
   - Click '+Files' button (top right)
   - Drag and drop PDF files
   - Wait for upload completion

### Important Notes
- Ensure PDFs are properly formatted and readable
- Files will be encrypted using SNOWFLAKE_SSE
- Directory table will automatically track uploaded files
- Files become available for subsequent processing steps

## Stage Content Verification

### Overview
This command lists all files currently stored in the DOCS stage, allowing verification of successful document uploads. The command helps:
- Confirm file upload status
- View file metadata
- Check stage contents

### Technical Details
- **Command**: Native Snowflake LS command
- **Target**: DOCS internal stage
- **Output**: List of files with metadata including:
  - File names
  - Sizes
  - Last modified timestamps
  - Status information

In [None]:
ls @docs;


# Step 2: Pre-process and Label Documents

## Document Chunks Storage Table

### Overview
This table definition creates the primary storage structure for processed document chunks and their metadata. The table serves as:
- Central repository for document fragments
- Storage for document metadata
- Reference point for search operations

### Schema Details
The table includes fields for:
- Document location tracking
- File metadata storage
- Text chunk storage
- Document categorization

### Technical Specifications
All VARCHAR fields use maximum size (16777216) to accommodate varying content lengths.

In [None]:
create or replace TABLE DOCS_CHUNKS_TABLE ( 
    RELATIVE_PATH VARCHAR(16777216), -- Relative path to the PDF file
    SIZE NUMBER(38,0), -- Size of the PDF
    FILE_URL VARCHAR(16777216), -- URL for the PDF
    SCOPED_FILE_URL VARCHAR(16777216), -- Scoped url (you can choose which one to keep depending on your use case)
    CHUNK VARCHAR(16777216), -- Piece of text
    CATEGORY VARCHAR(16777216) -- Will hold the document category to enable filtering
);

## Document Processing Pipeline

### Overview
This SQL statement implements the core document processing pipeline, combining:
- Document metadata extraction
- PDF text parsing
- Text chunking
- Storage of processed chunks

### Technical Details
The pipeline performs these operations:
- Reads document metadata from stage directory
- Parses PDF content using Cortex
- Chunks text using custom function
- Generates scoped URLs for access
- Stores results in chunks table


In [None]:
insert into docs_chunks_table (relative_path, size, file_url,
                            scoped_file_url, chunk)

    select relative_path, 
            size,
            file_url, 
            build_scoped_file_url(@docs, relative_path) as scoped_file_url,
            func.chunk as chunk
    from 
        directory(@docs),
        TABLE(text_chunker (TO_VARCHAR(SNOWFLAKE.CORTEX.PARSE_DOCUMENT(@docs, 
                              relative_path, {'mode': 'LAYOUT'})))) as func;

## Document Category Classification

### Overview
This SQL creates a temporary table that categorizes documents using LLM-based classification. The process:
- Identifies unique documents
- Uses Llama 3 70B to classify content
- Stores classifications for later use

### Technical Details
The query structure includes:
- CTE for unique document identification
- LLM integration using Cortex COMPLETE
- Single-word category assignment
- Temporary table storage

In [None]:
CREATE
OR REPLACE TEMPORARY TABLE docs_categories AS WITH unique_documents AS (
  SELECT
    DISTINCT relative_path
  FROM
    docs_chunks_table
),
docs_category_cte AS (
  SELECT
    relative_path,
    TRIM(snowflake.cortex.COMPLETE (
      'llama3-70b',
      'Given the name of the file between <file> and </file> determine if it is related to bikes or snow. Use only one word <file> ' || relative_path || '</file>'
    ), '\n') AS category
  FROM
    unique_documents
)
SELECT
  *
FROM
  docs_category_cte;

## Category Distribution Analysis

### Overview
This query retrieves the unique document categories assigned by the LLM classification process. The query:
- Aggregates category assignments
- Eliminates duplicate categories




In [None]:
select category from docs_categories group by category;

## Category Assignment Review

### Overview
This query displays the complete category classification results, showing:
- Document paths
- Assigned categories

In [None]:
select * from docs_categories;

## Document Category Assignment

### Overview
This SQL statement updates the document chunks table with category classifications from the temporary categories table. The update:
- Assigns categories to all chunks
- Maintains document-level categorization
- Enables category-based filtering

### Technical Details
- **Target**: docs_chunks_table
- **Source**: docs_categories temporary table
- **Join Condition**: relative_path matching
- **Updated Field**: category column

In [None]:
update docs_chunks_table 
  SET category = docs_categories.category
  from docs_categories
  where  docs_chunks_table.relative_path = docs_categories.relative_path;

# Step 3: Cortex Search Service Creation

### Overview
This SQL creates a Cortex Search Service for semantic document search. The service:
- Enables natural language querying of document chunks
- Provides category-based filtering
- Maintains document metadata access
- Updates index frequently

### Technical Specifications
- **Search Column**: chunk (document text segments)
- **Filter Attribute**: category
- **Compute**: COMPUTE_WH warehouse
- **Update Frequency**: 1-minute target lag
- **Metadata**: Preserves file paths and URLs

In [None]:
create or replace CORTEX SEARCH SERVICE CC_SEARCH_SERVICE_CS
ON chunk
ATTRIBUTES category
warehouse = COMPUTE_WH
TARGET_LAG = '1 minute'
as (
    select chunk,
        relative_path,
        file_url,
        category
    from docs_chunks_table
);

# Step 4: Build Chat UI with Retrieval and Generation Logic
## Streamlit Chat Interface Setup

### Overview
This step creates a Streamlit application that provides a user interface for the RAG system. The application:
- Enables natural language queries against documents
- Demonstrates RAG vs. pure LLM responses
- Displays retrieved context chunks
- Maintains security through Snowflake's access controls

### Technical Requirements
- **Platform**: Streamlit in Snowflake
- **Database**: CC_QUICKSTART_CORTEX_SEARCH_DOCS
- **Schema**: DATA
- **Warehouse**: Small compute warehouse
- **App Name**: CC_CORTEX_SEARCH_APP

### Implementation Steps
1. **Access Streamlit**
   - Navigate to Streamlit tab in left panel
   - Click + Streamlit App button

2. **Configure Application**
   - Set application name
   - Select compute warehouse
   - Choose database and schema
   - Configure access settings

### Security Notes
- Application runs within Snowflake environment
- Access controlled by Snowflake RBAC
- Data remains within Snowflake security boundary
- Only authorized users can access documents

# Step 5: Build a ChatBot UI with Conversation History
## Conversational ChatBot Implementation

### Overview
This section extends the basic RAG interface into a full conversational chatbot that maintains context across interactions. The implementation:
- Creates a chat-style interface
- Maintains conversation history
- Uses sliding window for context management
- Implements conversation summarization
- Leverages Snowflake Cortex via Python API

### Technical Requirements
**Dependencies**
- snowflake-ml-python (1.6.2)
- snowflake.core (0.9.0)
- python (3.8)
- snowflake-snowpark-python (1.22.1)
- streamlit (1.26.0)

### Key Features
- **Stateful Conversations**: Maintains chat history despite LLM statelessness
- **Context Window Management**: Uses sliding window technique
- **Dynamic Context Retrieval**: Combines conversation summary with current query
- **Streamlit Chat Elements**: Enhanced UI components
- **Python API Integration**: Direct access to Snowflake Cortex

### Architecture Notes
- Implements conversation memory management
- Uses summarization for context preservation
- Combines historical context with new queries
- Manages context window limitations
- Provides seamless chat experience

In [None]:
SELECT 
    CURRENT_WAREHOUSE(),
    CURRENT_DATABASE(),
    CURRENT_SCHEMA(),
    CURRENT_ROLE();

# Step 6: Automatic Processing of New Documents

## Stream Setup for Document Changes

### Overview
This SQL sequence sets up a stream to track changes in the document stage. The setup:
- Establishes correct database context
- Creates change tracking for staged files
- Enables automated processing of new documents

### Technical Details
- **Database**: CC_QUICKSTART_CORTEX_SEARCH_DOCS
- **Schema**: DATA
- **Stream Type**: Stage-based
- **Monitored Object**: DOCS stage
- **Purpose**: Change data capture (CDC) for documents

In [None]:
USE DATABASE CC_QUICKSTART_CORTEX_SEARCH_DOCS;
USE SCHEMA DATA;
create or replace stream docs_stream on stage docs;



## Automated Document Processing Task

### Overview
This SQL creates and activates a scheduled task that automatically processes new documents from the staging area. The task:
- Monitors document stream for changes
- Processes new PDFs when detected
- Chunks and stores document content
- Runs on a frequent schedule

### Technical Specifications
- **Warehouse**: COMPUTE_WH
- **Schedule**: Every 1 minute
- **Trigger**: Stream data presence check
- **Operations**: 
  - PDF parsing
  - Text chunking
  - Metadata extraction
  - Data insertion

In [None]:
create or replace task parse_and_insert_pdf_task 
    warehouse = COMPUTE_WH
    schedule = '1 minute'
    when system$stream_has_data('docs_stream')
    as
  
    insert into docs_chunks_table (relative_path, size, file_url,
                            scoped_file_url, chunk)
    select relative_path, 
            size,
            file_url, 
            build_scoped_file_url(@docs, relative_path) as scoped_file_url,
            func.chunk as chunk
    from 
        docs_stream,
        TABLE(text_chunker (TO_VARCHAR(SNOWFLAKE.CORTEX.PARSE_DOCUMENT(@docs, relative_path, {'mode': 'LAYOUT'})))) as func;

alter task parse_and_insert_pdf_task resume;

## Stream Status Check

### Overview
This query monitors the document processing stream for new files awaiting processing. The query:
- Shows documents in transit
- Displays file metadata
- Helps verify processing status

### Technical Details
- **Source**: docs_stream
- **Content**: Only shows unprocessed files
- **Empty Results**: Indicate completed processing
- **Purpose**: Processing pipeline monitoring

In [None]:
select * from docs_stream;


## Task Suspension Command

### Overview
This SQL suspends the automated document processing task. The suspension:
- Stops scheduled execution
- Allows current executions to complete
- Prevents processing of new documents
- Conserves compute resources

### Technical Details
- **Target**: parse_and_insert_pdf_task
- **Effect**: Changes task state to 'Suspended'
- **Current Runs**: Will complete execution
- **Scheduled Runs**: Will be cancelled

In [None]:
alter task parse_and_insert_pdf_task suspend;


## Cleanup

In [None]:
SELECT 
    CURRENT_WAREHOUSE(),
    CURRENT_DATABASE(),
    CURRENT_SCHEMA(),
    CURRENT_ROLE();

In [None]:
USE DATABASE CC_QUICKSTART_CORTEX_SEARCH_DOCS;
USE SCHEMA DATA;

In [None]:
-- Suspend and drop the task
ALTER TASK parse_and_insert_pdf_task SUSPEND;
DROP TASK IF EXISTS parse_and_insert_pdf_task;

-- Drop the stream
DROP STREAM IF EXISTS docs_stream;

-- Drop the search service
DROP CORTEX SEARCH SERVICE IF EXISTS CC_SEARCH_SERVICE_CS;


-- Drop the tables
DROP TABLE IF EXISTS docs_chunks_table;
DROP TABLE IF EXISTS docs_categories;

-- Drop the stage
DROP STAGE IF EXISTS docs;

-- Drop the function
DROP FUNCTION IF EXISTS text_chunker(STRING);




-- Drop the schema
DROP SCHEMA IF EXISTS DATA;

-- Drop the database
DROP DATABASE IF EXISTS CC_QUICKSTART_CORTEX_SEARCH_DOCS;