# ❄️ Snowflake Chat with your Documents Notebook ❄️

In [None]:
# Import necessary functions
import streamlit as st
from snowflake.snowpark.context import get_active_session

# Attach to the session the notebook is already running in
session = get_active_session()

# Open the staged PNG as a byte stream (no decompression) and read it fully
image_stream = session.file.get_stream(
    "@TALK_TO_DOC.PUBLIC.PDFDOCS/pdf/RAG_flow.png",
    decompress=False,
)
image = image_stream.read()

# Render the image bytes in the notebook at a fixed width
st.image(image, width=800)

In [None]:
import snowflake.snowpark as snowpark
from snowflake.snowpark.context import get_active_session

# Database and schema that the rest of this notebook works in
TARGET_DATABASE, TARGET_SCHEMA = "TALK_TO_DOC", "PUBLIC"

# Grab the notebook's active session and point it at the working schema so
# that unqualified object names resolve there
session = get_active_session()
session.use_database(TARGET_DATABASE)
session.use_schema(TARGET_SCHEMA)

In [None]:
-- Enumerate every file in the document stage so the PDFs can be identified
LIST @TALK_TO_DOC.PUBLIC.PDFDOCS;

In [None]:
-- Landing table for parsed documents: one row per file, keyed by its path
-- within the stage, with the PARSE_DOCUMENT output kept as a VARIANT
CREATE OR REPLACE TABLE TALK_TO_DOC.PUBLIC.PARSED_TEXT (
    relative_path VARCHAR(500),
    raw_text      VARIANT
);

In [None]:
-- Parse every PDF found in the stage (layout mode) and load the result
-- into PARSED_TEXT, one row per distinct file name
INSERT INTO TALK_TO_DOC.PUBLIC.PARSED_TEXT (relative_path, raw_text)
WITH staged_pdfs (relative_path) AS (
    -- Distinct file names in the stage whose name ends in .pdf (case-insensitive)
    SELECT DISTINCT METADATA$FILENAME AS relative_path
    FROM @TALK_TO_DOC.PUBLIC.PDFDOCS
    WHERE METADATA$FILENAME ILIKE '%.pdf'
)
SELECT
    relative_path,
    SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
        '@TALK_TO_DOC.PUBLIC.PDFDOCS',
        relative_path,
        OBJECT_CONSTRUCT('mode', 'LAYOUT')
    ) AS raw_text
FROM staged_pdfs;

from snowflake.snowpark.functions import col, parse_json, to_variant

# Stage holding the source PDFs. This is the same stage the SQL cells of this
# notebook read from and build file URLs against, so paths stay consistent.
PDF_STAGE = "@TALK_TO_DOC.PUBLIC.PDFDOCS"

# Query to fetch distinct PDF files from the stage
files_df = session.sql(f"""
    SELECT DISTINCT METADATA$FILENAME AS file_name
    FROM {PDF_STAGE}
    WHERE METADATA$FILENAME ILIKE '%.pdf'
""").collect()

# Load the set of already-parsed paths once instead of issuing one lookup
# query per staged file.
already_parsed = {
    parsed_row[0]
    for parsed_row in session.table("PARSED_TEXT").select("relative_path").distinct().collect()
}

# Loop through the distinct filenames and parse if not already in the target table
for row in files_df:
    file_name = row['FILE_NAME']

    # Skip files that already have a row in PARSED_TEXT
    if file_name in already_parsed:
        continue

    # The file name is embedded in a SQL string literal, so double any single
    # quotes to keep the statement valid for names such as "o'brien.pdf".
    escaped_name = file_name.replace("'", "''")

    # Extract raw text using the PARSE_DOCUMENT function
    parse_result = session.sql(f"""
        SELECT SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
            '{PDF_STAGE}',
            '{escaped_name}',
            OBJECT_CONSTRUCT('mode', 'OCR')
        ) AS raw_text
    """).collect()

    # Ensure parse_result contains data before proceeding
    if not parse_result:
        continue

    # NOTE(review): the collected VARIANT value is expected to arrive as JSON
    # text. It is parsed back into a VARIANT object with parse_json so that
    # downstream `raw_text:content` lookups work; to_variant would store the
    # JSON as a plain string value instead.
    raw_text = parse_result[0]['RAW_TEXT']

    df_to_insert = session.create_dataframe(
        [(file_name, raw_text)],
        schema=["relative_path", "raw_text"]
    ).select(
        col("relative_path"),
        parse_json(col("raw_text")).alias("raw_text")
    )

    # Insert the DataFrame into the PARSED_TEXT table
    df_to_insert.write.mode("append").save_as_table("PARSED_TEXT")
    already_parsed.add(file_name)

print("PDF files parsed successfully.")

In [None]:
-- Spot-check a handful of parsed documents
SELECT
    RELATIVE_PATH,
    RAW_TEXT
FROM TALK_TO_DOC.PUBLIC.PARSED_TEXT
LIMIT 5;

In [None]:
from snowflake.snowpark.types import StructType, StructField, StringType
import pandas as pd
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Define the text chunker class
class text_chunker:
    """UDTF handler that splits one document's text into overlapping chunks."""

    def process(self, text):
        """Yield one ``(chunk, meta)`` tuple per chunk of ``text``.

        Args:
            text: Full text of a document. ``None`` or an empty string
                produces no output rows.

        Yields:
            Tuples ``(chunk, meta)`` where ``chunk`` is the chunk text and
            ``meta`` is the string form of the splitter's metadata dict for
            that chunk.
        """
        # A NULL or empty document has nothing to split; emit no rows rather
        # than failing inside the splitter.
        if not text:
            return

        text_splitter = RecursiveCharacterTextSplitter(
            separators=["\n"],  # Define an appropriate separator. New line is good typically!
            chunk_size=800,     # Adjust this as you see fit
            chunk_overlap=100,    # This lets text have some form of overlap. Useful for keeping chunks contextual
            length_function=len,
            add_start_index=True  # Optional but useful if you'd like to feed the chunk before/after
        )

        for chunk in text_splitter.create_documents([text]):
            # Round-trip through UTF-8 with errors ignored to drop any
            # characters that cannot be encoded (e.g. lone surrogates).
            clean_text = chunk.page_content.encode('utf-8', 'ignore').decode('utf-8')
            yield (clean_text, str(chunk.metadata))

# Output columns of the UDTF: both the chunk text and its metadata are strings
schema = StructType(
    [StructField(column_name, StringType()) for column_name in ("chunk", "meta")]
)

# Register text_chunker as the permanent UDTF CHUNK_TEXT, replacing any
# existing definition; its code is uploaded to the given stage
session.udtf.register(
    name='CHUNK_TEXT',
    handler=text_chunker,
    input_types=[StringType()],
    output_schema=schema,
    packages=['pandas', 'langchain'],
    is_permanent=True,
    replace=True,
    stage_location='@TALK_TO_DOC.PUBLIC.PDFDOCS',
)

-- Create the chunked version of your parsed text.
-- Each chunk is prefixed with the file's relative path so the source document
-- stays visible in the chunk. The prefix must be the path, not the full
-- document content, otherwise every chunk would carry the whole document.
CREATE OR REPLACE TABLE TALK_TO_DOC.PUBLIC.CHUNK_TEXT AS
    SELECT
        raw.relative_path,
        build_scoped_file_url('@TALK_TO_DOC.PUBLIC.PDFDOCS', raw.relative_path) AS file_url,
        CONCAT(raw.relative_path, ': ', func.chunk) AS chunk,
        'English' AS language,
        func.meta AS meta_info
    FROM
        TALK_TO_DOC.PUBLIC.PARSED_TEXT AS raw,
        -- Run the CHUNK_TEXT UDTF once per parsed document, on its text content
        TABLE(TALK_TO_DOC.PUBLIC.CHUNK_TEXT(raw.raw_text:content::string)) AS func;

In [None]:
-- Build the chunk table: one row per chunk produced by the CHUNK_TEXT UDTF,
-- with the source file path prefixed onto the chunk text
CREATE OR REPLACE TABLE TALK_TO_DOC.PUBLIC.CHUNK_TEXT AS
SELECT
    parsed.relative_path,
    build_scoped_file_url('@TALK_TO_DOC.PUBLIC.PDFDOCS', parsed.relative_path) AS file_url,
    CONCAT(parsed.relative_path, ': ', pieces.chunk) AS chunk,
    'English' AS language,
    pieces.meta AS meta_info
FROM TALK_TO_DOC.PUBLIC.PARSED_TEXT AS parsed,
     TABLE(
         TALK_TO_DOC.PUBLIC.CHUNK_TEXT(TO_VARCHAR(parsed.raw_text:content))
     ) AS pieces;

In [None]:
-- Inspect the first chunks and their metadata
SELECT
    RELATIVE_PATH,
    CHUNK,
    META_INFO
FROM TALK_TO_DOC.PUBLIC.CHUNK_TEXT
LIMIT 30;

In [None]:
-- Give the ACCOUNTADMIN role the SNOWFLAKE.CORTEX_USER database role
GRANT DATABASE ROLE SNOWFLAKE.CORTEX_USER
    TO ROLE ACCOUNTADMIN;

In [None]:
-- Build a Cortex Search service over the chunk table:
-- CHUNK is the searched column and LANGUAGE is exposed as an attribute
CREATE OR REPLACE CORTEX SEARCH SERVICE TALK_TO_DOC.PUBLIC.TEXT_SEARCH_SERVICE
    ON CHUNK
    ATTRIBUTES LANGUAGE
    WAREHOUSE = COMPUTE_WH
    TARGET_LAG = '7 days'
AS (
    SELECT CHUNK, RELATIVE_PATH, LANGUAGE
    FROM TALK_TO_DOC.PUBLIC.CHUNK_TEXT
);

In [None]:
from snowflake.snowpark import Session
from snowflake.core import Root

# Root of the Snowflake object hierarchy, bound to the notebook session
root = Root(session)

# Walk database -> schema -> Cortex Search service
talk_to_doc_db = root.databases['TALK_TO_DOC']
public_schema = talk_to_doc_db.schemas['PUBLIC']
transcript_search_service = public_schema.cortex_search_services['TEXT_SEARCH_SERVICE']

# Ask for the three best-matching chunks for a sample question
resp = transcript_search_service.search(
    query="""What is Cortex Search and how is it helpful for a RAG application?""",
    columns=['CHUNK'],
    limit=3,
)

# Show the raw response as JSON
st.json(resp.to_json())

In [None]:
results = resp.results

# Concatenate the retrieved chunks into one numbered context string
context_str = "".join(
    f"Context document {rank}: {hit['CHUNK']}\n****************\n"
    for rank, hit in enumerate(results, start=1)
)
print(context_str)

# Expose the search hits to SQL cells as the temp view "searchresults"
df = session.create_dataframe(results)
df.create_or_replace_temp_view("searchresults")

In [None]:
-- Answer the sample question with mistral-large, grounded in the retrieved chunks.
-- The prompt carries the question in <question> tags and the concatenated
-- search results in <context> tags, matching the tags named in the instruction.
-- Keep the question text in sync with the query sent to the search service.
SELECT SNOWFLAKE.CORTEX.COMPLETE(
    'mistral-large',
    CONCAT(
        'You are a helpful AI assistant specialized in retrieving information from documents. ',
        'The internal user has asked the following question: <question>',
        'What is Cortex Search and how is it helpful for a RAG application?',
        '</question> ',
        '<context>',
        (SELECT LISTAGG(CHUNK, ' ') FROM searchresults),
        '</context> ',
        'Based on the context provided between the <context> and </context> tags, generate a coherent, concise, and relevant answer to the question. Focus on the key points and avoid unnecessary details. '
    )
) AS CRITIQUE
FROM searchresults
LIMIT 1;

# Next Step:

Please create the associated "Chat with your Documents" SiS app. This will allow users to interactively leverage the Cortex Search RAG.

Once users have saved sufficient questions and customized/corrected answers via the SiS app, return to this Snowflake Notebook to create a customized fine-tuned Cortex model.


In [None]:
-- Tag each saved question/answer pair as training or validation data.
-- IF NOT EXISTS lets this cell be re-run without failing on the existing column.
ALTER TABLE QA_TABLE
ADD COLUMN IF NOT EXISTS source VARCHAR;
-- RANDOM() returns a 64-bit integer, so it cannot be compared to 0.7 directly;
-- UNIFORM maps it to a float in [0, 1] to get an approximately 70/30 split.
UPDATE QA_TABLE
SET
  source = CASE
    WHEN UNIFORM(0::FLOAT, 1::FLOAT, RANDOM()) <= 0.7 THEN 'train'
    ELSE 'validation'
  END;

In [None]:
-- Check a few rows and their train/validation assignment
SELECT
    RAGQUESTION,
    RAGANSWER,
    source
FROM QA_TABLE
LIMIT 5;

In [None]:
-- Start a fine-tuning job on mistral-7b using the saved question/answer pairs.
-- The model is named customized_QA_model because that is the name the
-- "Chat with your Documents" app instructions below pass to Complete.
SELECT SNOWFLAKE.CORTEX.FINETUNE (
    'CREATE',
    'customized_QA_model',
    'mistral-7b',
    -- Training rows
    'SELECT RAGQUESTION as prompt, RAGANSWER as completion FROM QA_TABLE WHERE source = \'train\'',
    -- Validation rows
    'SELECT RAGQUESTION as prompt, RAGANSWER as completion FROM QA_TABLE WHERE source = \'validation\''
  );

# Next Step:

Return to the "Chat with your Documents" SiS App. 

Update lines 12 to 17, with this code to include your customized_QA_model:

```
# Set to False to hide the fine-tuned model from the app
ENABLED_CUSTOM_QA_MODELS = True

# Base models that are always offered in the model selector
MODELS = [
    "mistral-large",
    "snowflake-arctic",
    "llama3-70b",
    "llama3-8b",
]

# The fine-tuned model is added only here, so it appears exactly once
if ENABLED_CUSTOM_QA_MODELS:
    MODELS.append("customized_QA_model")
```

Now update lines 80 to 110 with an updated `init_config_options` function. This will provide a checkbox that lets the user choose to use their custom model:

```
def init_config_options():
    """Render the sidebar controls and store the choices in session state.

    Widgets created with a ``key`` write their value to
    ``st.session_state`` under that key.
    """
    st.sidebar.selectbox(
        "Select Cortex Search Service:",
        [s["name"] for s in st.session_state.service_metadata],
        key="selected_cortex_search_service",
    )

    # Clearing the conversation sets a flag and re-runs session state setup
    clear_button_clicked = st.sidebar.button("Clear conversation")
    if clear_button_clicked:
        st.session_state.clear_conversation = True
        init_session_state()

    use_chat_history = st.sidebar.checkbox(
        "Use chat history", value=st.session_state.use_chat_history
    )
    st.session_state.use_chat_history = use_chat_history

    with st.sidebar.expander("Advanced options"):
        st.selectbox("Select model:", MODELS, key="model_name")
        st.number_input(
            "Select number of context chunks",
            key="num_retrieved_chunks",
            min_value=1,
            max_value=10,
        )
        st.number_input(
            "Select number of messages to use in chat history",
            key="num_chat_messages",
            min_value=1,
            max_value=10,
        )
        if ENABLED_CUSTOM_QA_MODELS:
            # Add a checkbox to use customized Q&A model; its state is read
            # back through st.session_state["use_customized_qa_model"]
            st.checkbox("Use customized Q&A model", key="use_customized_qa_model")
```

Now, at lines 144 to 145, update the `complete` function to use the customized_QA_model if the user selects the "Use customized Q&A model" checkbox.
```
def complete(model, prompt):
    """Return the model's completion for ``prompt`` with ``$`` escaped.

    Args:
        model: Name of the model to call; overridden by the fine-tuned model
            when the "use_customized_qa_model" session flag is set.
        prompt: Prompt text passed to ``Complete``.
    """
    # Use customized Q&A model if selected
    if st.session_state.get("use_customized_qa_model", False):
        model = "customized_QA_model"
    # Prefix each "$" with a backslash; written as "\\$" because "\$" is an
    # invalid escape sequence in a normal Python string literal.
    return Complete(model, prompt).replace("$", "\\$")
```

Finally, select the "Run" button so that your new code is used, and test your updated application. Remember to open "Advanced options" in the sidebar and check "Use customized Q&A model" when testing new questions. Uncheck the "Use chat history" checkbox.