# 📁 Continuous Pipeline for Document Processing

### This notebook is an end-to-end example of a continuous pipeline that prepares documents for natural-language analytics with Cortex Agents in interfaces such as Snowflake Intelligence.

## Step 1: Create a Stage with Directory Table Enabled 🏗️

Create a Snowflake stage with directory table functionality to automatically track file uploads and changes.

In [None]:
-- 🔧 Setup: Configure database context and role
-- Adjust these values to match your Snowflake environment
USE ROLE SYSADMIN;
USE SCHEMA ADVANCED_ANALYTICS.UNSTRUCTURED;

In [None]:
-- 📁 Create a stage with directory table and auto-refresh enabled
-- This allows automatic tracking of file uploads and changes
-- Note: CREATE OR REPLACE drops an existing stage together with its files.
-- If the stage already exists, use ALTER STAGE to enable these features instead.
CREATE OR REPLACE STAGE pdfs_stage
  ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = ( ENABLE = TRUE 
                AUTO_REFRESH = TRUE);
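
If the stage already exists, a sketch like the following enables the directory table without recreating the stage and then lists what the directory table currently tracks. It assumes the stage is named `pdfs_stage` as above. The manual `REFRESH` is only needed when the metadata has not been refreshed automatically.

```sql
-- Enable the directory table on an existing stage (existing files are kept)
ALTER STAGE pdfs_stage SET DIRECTORY = ( ENABLE = TRUE );

-- Manually synchronize the directory table with the files in the stage
ALTER STAGE pdfs_stage REFRESH;

-- List the files the directory table currently knows about
SELECT relative_path, size, last_modified
FROM DIRECTORY(@pdfs_stage)
ORDER BY last_modified DESC;
```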

## Step 2: Create a Stream on the Directory Table 🌊

Set up a stream to capture changes in the directory table, so that newly uploaded files can be processed incrementally shortly after they arrive.

In [None]:
CREATE OR REPLACE STREAM pdfs_stream ON STAGE pdfs_stage;
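
As a quick check, the stream can be inspected without consuming it. This is a minimal sketch that assumes the `pdfs_stream` name used above. A plain `SELECT` does not advance the stream offset; only a DML statement that reads from the stream does.

```sql
-- Returns TRUE when the stream holds unconsumed change records
SELECT SYSTEM$STREAM_HAS_DATA('pdfs_stream') AS has_new_files;

-- Peek at pending changes together with the stream metadata columns
SELECT relative_path, size, METADATA$ACTION, METADATA$ISUPDATE
FROM pdfs_stream;
```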

## Step 3: Create Tables for Document Processing 📊

Create the necessary tables to store parsed document content and text chunks for efficient retrieval and search.

In [None]:
-- 📊 Table for storing parsed document content from AI_PARSE_DOCUMENT
CREATE OR REPLACE TABLE parsed_documents (
  file_name VARCHAR,
  parsed_content VARIANT,
  processed_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

-- 🔍 Table for chunked text optimized for search and retrieval
CREATE OR REPLACE TABLE document_chunks (
  file_name VARCHAR,
  chunk_id INTEGER,
  chunk_text STRING,
  processed_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);


## Step 3.1: Create Stream on Parsed Documents Table 📈

Create a stream on the parsed documents table to monitor for new entries and trigger the chunking process automatically.

In [None]:
CREATE OR REPLACE STREAM parsed_documents_stream ON TABLE parsed_documents;

## Step 4: Create a Task for Document Parsing 🤖

Set up an automated task that monitors the stream and parses new PDF documents using Snowflake's AI_PARSE_DOCUMENT function.

In [None]:
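-- 🤖 Scheduled task that parses files newly recorded in the stage stream
-- No WAREHOUSE is specified, so this runs as a serverless task
CREATE OR REPLACE TASK parse_new_documents
  SCHEDULE = '1 minute'
  COMMENT = 'Parse new PDF files using AI_PARSE_DOCUMENT'
  WHEN
  SYSTEM$STREAM_HAS_DATA('pdfs_stream')
  AS
  INSERT INTO parsed_documents (file_name, parsed_content)
  SELECT
      relative_path AS file_name,
      AI_PARSE_DOCUMENT(TO_FILE('@pdfs_stage', relative_path)) AS parsed_content
  FROM pdfs_stream
  -- Only newly added files are parsed; deletions recorded by the stream are ignored
  WHERE METADATA$ACTION = 'INSERT';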
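
Before relying on the task, it can help to parse one staged file by hand and look at the returned structure. The sketch below assumes at least one PDF is already in `pdfs_stage`. The `{'mode': 'LAYOUT'}` option is not part of the task above; it is shown only as an assumption about what you might prefer when the output is later chunked as markdown.

```sql
-- Parse a single staged file ad hoc; LIMIT 1 keeps the test inexpensive
SELECT
    relative_path AS file_name,
    AI_PARSE_DOCUMENT(
        TO_FILE('@pdfs_stage', relative_path),
        {'mode': 'LAYOUT'}   -- optional argument; omit it to use the default mode
    ) AS parsed_content
FROM DIRECTORY(@pdfs_stage)
LIMIT 1;
```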

## Step 5: Create a Task for Text Chunking ✂️

Create a downstream task that processes parsed documents into smaller, searchable chunks using Snowflake's recursive text splitting function.

In [None]:
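-- ✂️ Child task that chunks newly parsed documents
-- Runs after parse_new_documents, and only when the table stream has new rows
CREATE OR REPLACE TASK chunk_parsed_documents
  COMMENT = 'Chunk parsed documents using SPLIT_TEXT_RECURSIVE_CHARACTER'
  AFTER parse_new_documents
  WHEN
  SYSTEM$STREAM_HAS_DATA('parsed_documents_stream')
  AS
  INSERT INTO document_chunks (file_name, chunk_id, chunk_text)
  SELECT
      file_name,
      c.index::INTEGER AS chunk_id,
      c.value::STRING AS chunk_text
  FROM
      parsed_documents_stream,
      LATERAL FLATTEN(input => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(
          parsed_content:content::STRING,  -- extracted text; the splitter expects a string, not the whole VARIANT
          'markdown',                      -- input format
          2000,                            -- chunk size in characters
          300,                             -- overlap between consecutive chunks
          ['\n\n', '\n', ' ', '']          -- separators, tried in order
      )) c
  WHERE METADATA$ACTION = 'INSERT';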
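
To see how the splitter behaves before running it on real documents, it can be called on a short literal string. This is an illustrative sketch: the sample text and the small chunk size and overlap are made-up values chosen so that a short input is split into several chunks.

```sql
-- Split a short markdown sample and list the resulting chunks in order
SELECT
    c.index::INTEGER AS chunk_id,
    c.value::STRING  AS chunk_text
FROM TABLE(FLATTEN(input => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(
    '# Title\n\nFirst paragraph of the sample text.\n\nSecond paragraph of the sample text.',
    'markdown',
    40,    -- small chunk size so the short sample is split
    5      -- overlap between consecutive chunks
))) c
ORDER BY chunk_id;
```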

## Step 6: Enable and Test the Pipeline 🚀

Activate the tasks and test the complete pipeline with sample PDF files to ensure everything works correctly.

In [None]:
-- 🚀 Enable all tasks in the pipeline using hierarchical activation
-- This will enable the main task and all dependent tasks automatically
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('ADVANCED_ANALYTICS.UNSTRUCTURED.PARSE_NEW_DOCUMENTS');
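
After enabling the tasks, their state and recent runs can be checked with something like the following. It is a sketch that assumes the schema and task names used in this notebook; the one-hour window and the result limit are arbitrary choices.

```sql
-- Confirm that both tasks show state = 'started'
SHOW TASKS IN SCHEMA ADVANCED_ANALYTICS.UNSTRUCTURED;

-- Review recent runs, including skipped and failed ones
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 100
))
WHERE name IN ('PARSE_NEW_DOCUMENTS', 'CHUNK_PARSED_DOCUMENTS')
ORDER BY scheduled_time DESC;
```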

## Step 6.1: Upload Files and Monitor Stream 📂

Upload some PDF files to your stage, then check the stream. It should list the new files until the parsing task consumes them, which happens on the task's next run (scheduled every minute).

In production, you would set up a process to ingest files into the stage. For example, you can use an Openflow connector to pull files from SharePoint, or the Snowflake CLI to upload files from a local file system.
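
For a quick manual test, files can also be uploaded with a `PUT` command. The sketch below assumes it is run from SnowSQL or another client that supports `PUT`, with the schema from Step 1 as the current context. The local path is a placeholder, not a path from this notebook.

```sql
-- The local path is hypothetical; replace it with your own folder
-- AUTO_COMPRESS = FALSE keeps the PDFs uncompressed so they can be parsed
PUT file:///tmp/sample_docs/*.pdf @pdfs_stage AUTO_COMPRESS = FALSE;

-- Refresh the directory metadata manually if the new files do not appear on their own
ALTER STAGE pdfs_stage REFRESH;
```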

## Step 6.2: Pipeline Verification and Monitoring 🔍

Use the following queries to monitor and verify that your document processing pipeline is working correctly.


In [None]:
-- 👀 Monitor the stream for newly uploaded files
-- Shows files uploaded to the stage that the parse task has not yet consumed
SELECT * FROM pdfs_stream;

In [None]:
-- 📄 Review the parsed document content
-- Check that AI_PARSE_DOCUMENT successfully extracted text from PDFs
SELECT file_name, parsed_content:content::STRING as extracted_text 
FROM parsed_documents;

In [None]:
-- 🔍 Examine the chunked text data
-- Verify that documents were properly split into searchable chunks
SELECT file_name, chunk_id, LEFT(chunk_text, 100) as chunk_preview
FROM document_chunks
ORDER BY file_name, chunk_id;
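
A per-file summary can make gaps easier to spot than scrolling through individual chunks. This sketch uses only the `document_chunks` table defined above; a file that is missing from the result has not been chunked yet.

```sql
-- Count chunks per document and check the longest chunk against the configured size
SELECT
    file_name,
    COUNT(*)                 AS chunk_count,
    MAX(LENGTH(chunk_text))  AS longest_chunk_chars
FROM document_chunks
GROUP BY file_name
ORDER BY file_name;
```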

In [None]:
-- 🛑 Suspend tasks when pipeline testing is complete
-- Uncomment the line below to stop the automated pipeline
-- (suspending the root task stops scheduled runs of the whole task graph)
-- ALTER TASK parse_new_documents SUSPEND;

## Step 7: Create Cortex Search Service 🔍

Enable semantic search capabilities by creating a Cortex Search Service that indexes your processed document chunks for intelligent querying and retrieval in **Snowflake Intelligence**.

In [None]:
-- 🔍 Create a Cortex Search Service for semantic document search
-- This enables AI-powered search across your processed document chunks
CREATE OR REPLACE CORTEX SEARCH SERVICE pdf_search_service
ON chunk_text                                    -- Column that is embedded and searched
  ATTRIBUTES file_name, processed_timestamp       -- Columns available as filters at query time
  WAREHOUSE = compute_wh                          -- Warehouse used to build and refresh the index
  TARGET_LAG = '30 days'                          -- Maximum staleness of the index; lower this for fresher results
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'  -- Embedding model
AS
(
    SELECT
        chunk_text,
        file_name,
        processed_timestamp
    FROM ADVANCED_ANALYTICS.UNSTRUCTURED.DOCUMENT_CHUNKS
);
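
Once the service has finished its initial build, it can be smoke-tested directly from SQL. The sketch below assumes the service name and schema used above. The query text is a made-up example, so substitute a phrase that actually appears in your documents.

```sql
-- Return the top 3 matching chunks for a sample question
SELECT PARSE_JSON(
    SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
        'ADVANCED_ANALYTICS.UNSTRUCTURED.PDF_SEARCH_SERVICE',
        '{
            "query": "termination clause",
            "columns": ["chunk_text", "file_name"],
            "limit": 3
        }'
    )
)['results'] AS results;
```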

### Next Step: 🤖 Agent Integration
Add this Cortex Search service to a **Cortex Agent** and start having intelligent conversations with your documents in **Snowflake Intelligence**!

## 🎉 Complete AI-Powered Document Pipeline!

Your end-to-end document processing and search pipeline is now fully operational and ready to automatically:

1. **Monitor** 👀 uploaded PDF files in the stage
2. **Parse** 📄 documents using Snowflake's AI_PARSE_DOCUMENT function  
3. **Chunk** ✂️ text into searchable segments using recursive character splitting
4. **Store** 💾 processed data in organized tables
5. **Index** 🔍 content with Cortex Search Service for semantic search

### Next Steps:
- Upload PDF files to your `pdfs_stage` and watch the automation work! 
- Monitor the pipeline using the verification queries above
- **Connect to Snowflake Intelligence** to chat with your documents using natural language
- Scale the pipeline by adjusting warehouse sizes and task schedules
- Explore advanced search queries using the Cortex Search Service
