# Demo: Asking questions to your own Documents Using Snowflake Cortex
This notebook demonstrates the new Snowflake Cortex LLM functions. PDF documents are read and split into chunks by a UDTF (User Defined Table Function). One of the Snowflake Cortex functions then creates an embedding vector for each chunk. Those vectors are used later to find the chunks most similar to a question. The user can choose to use RAG (retrieval-augmented generation) to answer questions, in which case those chunks are provided as context in the prompt.

Streams and Tasks are also used. Each time a new PDF is uploaded into the stage area, it is automatically processed: embeddings are created and appended to a Snowflake table that holds the text chunks and their vectors for later use.

A Streamlit in Snowflake app is also used to provide a nifty interface to ask your questions, monitor the new stream of documents as you upload them to the stage behind the scenes, and ask a given Llama 2 model to use those docs as context.

In [None]:
#  Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.

# Import python packages
# import streamlit as st
import pandas as pd
import json

## Creating Python functions for local use as well as for creating UDTF

In [None]:
def get_new_session(creds_file):
    """Create a new Snowpark session from a JSON credentials file.

    Args:
        creds_file: Path to a JSON file holding the connection parameters
            that are handed to ``Session.builder.configs``.

    Returns:
        The session returned by ``Session.builder.configs(...).create()``.

    Raises:
        OSError: If the credentials file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # JSON is UTF-8 by specification, so do not depend on the platform default.
    with open(creds_file, 'r', encoding='utf-8') as creds:
        conn_param = json.load(creds)

    return Session.builder.configs(conn_param).create()

Note! The cell below requires the _langchain_ and _PyPDF2_ packages to be installed first. See the README.md for this demo to learn how.

In [None]:
#A class for chunking text and returning a table via UDTF
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging

class pdf_text_chunker:
    """UDTF handler that turns a staged PDF into rows of text chunks.

    ``process`` is the entry point: it receives a file URL, extracts the
    document text with ``read_pdf`` and yields one single-column tuple per
    chunk.
    """

    def read_pdf(self, file_url: str) -> str:
        """Return the text of the PDF at ``file_url`` as a single string.

        Newlines inside each page are replaced by spaces and the pages are
        concatenated in order. A page whose text cannot be extracted is
        logged and skipped, so one bad page does not discard the text of the
        other pages.

        Args:
            file_url: URL of the file, opened through ``SnowflakeFile.open``.

        Returns:
            The concatenated page text; empty if no page yielded any text.
        """
        logger = logging.getLogger("udf_logger")
        logger.info("Opening file %s", file_url)

        # Read the whole file into memory so PyPDF2 gets a seekable buffer.
        with SnowflakeFile.open(file_url, 'rb') as f:
            buffer = io.BytesIO(f.readall())

        reader = PyPDF2.PdfReader(buffer)
        page_texts = []
        for page_number, page in enumerate(reader.pages, start=1):
            try:
                page_text = page.extract_text()
            except Exception:
                # Best effort: keep what was extracted from the other pages.
                logger.warning(
                    "Unable to extract from file %s, page %d",
                    file_url,
                    page_number,
                    exc_info=True,
                )
                continue
            # Guard against a page that reports no text at all.
            page_texts.append((page_text or "").replace('\n', ' '))

        return "".join(page_texts)

    def process(self, file_url: str):
        """Yield the text chunks of the PDF at ``file_url``.

        Args:
            file_url: URL of the PDF to read and split.

        Yields:
            One ``(chunk,)`` tuple per text chunk, in document order.
        """
        text = self.read_pdf(file_url)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=3500,  # Adjust this as you see fit
            chunk_overlap=200,  # Overlap between chunks helps keep them contextual
            length_function=len,
        )

        # Each output row has a single column, hence the one-element tuples.
        for chunk in text_splitter.split_text(text):
            yield (chunk,)

Let's start by grabbing an active Snowpark session, or creating a new one if none is active.

In [None]:
# We can also use Snowpark for our analyses!
from snowflake.snowpark.session import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.exceptions import SnowparkSessionException

# Assume we are outside Snowflake Notebooks until an active session is found.
from_snowflake_notebooks = False
try:
    session = get_active_session()
except SnowparkSessionException:
    # No active session available: build one from the local credentials file.
    session = get_new_session('./creds_clakkad_aws_uswest2.json')
else:
    from_snowflake_notebooks = True

Confirm that the account picked by the session is correct

In [None]:
# Display the account the session is connected to so it can be checked by eye.
session.get_current_account()

We'll now create all the Database & Schema level objects 

In [None]:
# Names of the Snowflake objects used throughout this notebook.
database = "DEMO_DB"        # database that holds everything below
schema = "CORTEX"           # schema inside that database
docs_stage = "DOCS"         # stage the PDF documents are uploaded to
udf_stage = "UDF"           # stage passed as the UDTF's stage_location
app_stage = "SIS_APPS"      # stage for the Streamlit app files
docs_stream = "DOCS_STREAM"  # stream on the documents stage
chunk_table = "UNSTRUCTURED_DOCS_WITH_CHUNKS"  # table of chunks and vectors
task_name = "TASK_EXTRACT_CHUNK_VEC_FROM_PDF"  # task that fills the table

In [None]:
# Create the database and schema if they do not exist yet.
bootstrap_statements = (
    f"CREATE DATABASE IF NOT EXISTS {database}",
    f"CREATE SCHEMA IF NOT EXISTS {database}.{schema}",
)
for statement in bootstrap_statements:
    session.sql(statement).collect()


# Only a session created by this notebook is pointed at the new database
# and schema; an active Snowflake Notebooks session is left as it is.
if not from_snowflake_notebooks:
    session.use_database(database)
    session.use_schema(schema)

In [None]:
# (Re)create the documents stage with a directory table enabled.
create_docs_stage_sql = f"""
            CREATE OR REPLACE STAGE {docs_stage} 
            ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
            DIRECTORY = (ENABLE = TRUE)"""
session.sql(create_docs_stage_sql).collect()

# List the files in the stage together with a scoped URL for each.
list_docs_sql = f"""
                            SELECT 
                            RELATIVE_PATH
                            ,BUILD_SCOPED_FILE_URL(@{docs_stage}, RELATIVE_PATH) 
                            FROM DIRECTORY(@{docs_stage})"""
docs_available = session.sql(list_docs_sql).collect()

docs_available


Now we create a stream on the DOCS staging area. This stream will track all new documents uploaded into that staging area.

In [None]:
# (Re)create the stream on the documents stage.
create_stream_sql = f""" CREATE OR REPLACE STREAM {docs_stream} ON STAGE {docs_stage} """
session.sql(create_stream_sql).collect()

# Peek at one inserted file recorded by the stream, with its scoped URL.
peek_stream_sql = f""" 
            SELECT *,
            BUILD_SCOPED_FILE_URL(@{docs_stage}, RELATIVE_PATH) AS SCOPED_FILE_URL 
            FROM {docs_stream} WHERE METADATA$ACTION = 'INSERT'
            LIMIT 1
        """
docs_available_in_stream = session.sql(peek_stream_sql).collect()

docs_available_in_stream

Creating a table to store the links to PDF docs we'll process along with their text splits & vectors

In [None]:
# One row per text chunk: where the file lives, the chunk text and its
# 768-dimensional float vector.
create_chunk_table_sql = f""" 
            CREATE OR REPLACE TABLE {chunk_table} (
            RELATIVE_PATH VARCHAR,
            FILE_URL VARCHAR,
            SCOPED_FILE_URL VARCHAR,
            CHUNK VARCHAR,
            TEXT_CHUNK_VECTOR VECTOR(FLOAT, 768)
            )
        """
session.sql(create_chunk_table_sql).collect()

Register the Python class 'pdf_text_chunker' as a UDTF. We also declare the packages used by its Python code, which are available from the Snowflake Anaconda channel.

In [None]:
from snowflake.snowpark.types import StringType, StructField, StructType

# (Re)create the stage that stores the permanent UDTF's code.
session.sql(f"""
            CREATE OR REPLACE STAGE {udf_stage} 
            ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
            DIRECTORY = (ENABLE = TRUE)"""
        ).collect()


# Output of the UDTF: a single string column holding one chunk per row.
# Named distinctly so it does not overwrite the `schema` string that holds
# the Snowflake schema name, which the CREATE SCHEMA cell interpolates.
chunker_output_schema = StructType([
    StructField("chunk", StringType())
])

# Register the handler class as a permanent UDTF that takes one string
# argument (the file URL).
session.udtf.register(
    pdf_text_chunker,
    output_schema=chunker_output_schema,
    input_types=[StringType()],
    is_permanent=True,
    name='pdf_text_chunker',
    replace=True,
    packages=['snowflake-snowpark-python', 'pypdf2', 'pandas', 'langchain'],
    stage_location=udf_stage,
)

Process the current PDFs in the DOCS staging area. We are using an internal staging area for simplicity, but in practice this could be external storage such as an S3 bucket at your cloud service provider (CSP).

Here we call the UDTF defined earlier on the documents in the stream and preview one resulting chunk:

In [None]:
# Run the UDTF over the files recorded in the stream and fetch a single
# resulting row as a preview.
preview_chunks_sql = f"""
            SELECT 
                RELATIVE_PATH
                ,FILE_URL
                ,BUILD_SCOPED_FILE_URL(@{docs_stage}, RELATIVE_PATH) AS SCOPED_FILE_URL
                ,FUNC.CHUNK AS CHUNK
            FROM 
                {docs_stream}, 
                TABLE(PDF_TEXT_CHUNKER(BUILD_SCOPED_FILE_URL(@{docs_stage}, RELATIVE_PATH))) AS FUNC
            WHERE METADATA$ACTION = 'INSERT'
            LIMIT 1
        """
text_chunks_new_doc = session.sql(preview_chunks_sql).collect()

text_chunks_new_doc

Let's do some testing to check that it finds documents related to the question we are asking:

In [None]:
myquestion = "<insert your question here>"

# The question is passed as a bind variable (the `?` placeholder) rather than
# pasted into the SQL text, so quotes in the question cannot break the query.
# The model name matches the one the task uses to embed the chunks, so the
# question and chunk vectors come from the same model.
# NOTE(review): ORDER BY ... DESC keeps the row with the largest value; that
# is only the best match if VECTOR_COSINE_DISTANCE returns a similarity
# score - confirm against the function's documentation.
cmd = f"""
    WITH RESULTS 
    AS (
        SELECT 
            RELATIVE_PATH,
            VECTOR_COSINE_DISTANCE(TEXT_CHUNK_VECTOR, 
                    SNOWFLAKE.ML.EMBED_TEXT('e5-base-v2', ?)) AS DISTANCE,
            CHUNK
        FROM {chunk_table}
        ORDER BY DISTANCE DESC
        LIMIT 1
    )
    SELECT CHUNK, RELATIVE_PATH FROM RESULTS
    """

df_context = session.sql(cmd, params=[myquestion]).to_pandas()

if df_context.empty:
    # Nothing has been chunked and embedded yet, so there is nothing to match.
    print(f"No chunks found in {chunk_table}")
else:
    prompt_context = df_context.at[0, 'CHUNK']
    relative_path = df_context.at[0, 'RELATIVE_PATH']
    print(relative_path)

    # Double any single quote so the path stays a valid SQL string literal.
    escaped_path = relative_path.replace("'", "''")
    cmd2 = f"select GET_PRESIGNED_URL(@{docs_stage}, '{escaped_path}', 360) as URL_LINK from directory(@{docs_stage})"
    df_url_link = session.sql(cmd2).to_pandas()
    url_link = df_url_link.at[0, 'URL_LINK']

    print(url_link)

Finally, a task is created so that, whenever the stream has data, the new documents are read, split into chunks and embedded.

In [None]:
# Task that runs every minute, but only when the stream has data: it chunks
# each newly inserted file with the UDTF, embeds every chunk and appends the
# rows to the chunk table.
create_task_sql = f""" 
    CREATE OR REPLACE TASK {task_name} 
    WAREHOUSE = DEMO_WH
    SCHEDULE = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('{docs_stream}')
    AS
    INSERT INTO {chunk_table} (RELATIVE_PATH, FILE_URL, SCOPED_FILE_URL, CHUNK, TEXT_CHUNK_VECTOR)
    SELECT RELATIVE_PATH, 
            FILE_URL, 
            BUILD_SCOPED_FILE_URL(@{docs_stage}, RELATIVE_PATH) AS SCOPED_FILE_URL,
            FUNC.CHUNK AS CHUNK,
            SNOWFLAKE.ML.EMBED_TEXT('e5-base-v2',CHUNK) AS CHUNK_VEC
    FROM 
        {docs_stream},
        TABLE(PDF_TEXT_CHUNKER(BUILD_SCOPED_FILE_URL(@{docs_stage}, RELATIVE_PATH))) AS FUNC
        WHERE METADATA$ACTION = 'INSERT'
"""
session.sql(create_task_sql).collect()

In [None]:
# Resume the task named by `task_name`.
session.sql(f"ALTER TASK {task_name} RESUME").collect()

Note! If you're running this from inside Snowflake Notebooks, please make sure to upload the Streamlit Python script for this app, 'sis_app.py', and its environment file, 'sis_app_environment.yml', to the designated stage before executing the cell after this!

In [None]:
# (Re)create the stage that holds the Streamlit app files.
create_app_stage_sql = f"""
            CREATE OR REPLACE STAGE {app_stage} 
            ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
            DIRECTORY = (ENABLE = TRUE)"""
session.sql(create_app_stage_sql).collect()

# Outside Snowflake Notebooks the app files sit next to this notebook, so
# upload them uncompressed to the stage.
if not from_snowflake_notebooks:
    for app_file in ("sis_app.py", "sis_app_environment.yml"):
        session.sql(f"PUT file://./{app_file} @{app_stage} overwrite=True auto_compress=False").collect()

In [None]:
# The app runs its queries on the session's current warehouse.
default_wh = session.get_current_warehouse()

# (Re)create the Streamlit app from the files in the app stage.
create_streamlit_sql = f""" 
                CREATE OR REPLACE STREAMLIT pdf_insight
                ROOT_LOCATION = '@{app_stage}'
                MAIN_FILE = '/sis_app.py'
                QUERY_WAREHOUSE = {default_wh}
        """
session.sql(create_streamlit_sql).collect()