# **SNOWFLAKE CORTEX COMPLETE FINANCIAL SERVICES DEMO**

## Authors: John Heisler, Garrett Frere

In this demo, using Snowflake Cortex (https://www.snowflake.com/en/data-cloud/cortex/), we will build an AI-infused Data Pipeline with Cortex Complete.

### AI Pipeline Overview

We'll learn how to extract raw text from a PDF, perform prompt engineering, and pass custom prompts and data to a large language model of our choosing, all without leaving Snowflake.

Specifically, we will be taking on the role of an AI Engineer who is working closely with a portfolio team at an asset manager. The portfolio team would like to speed up their ingestion and comprehension of statements by the Federal Open Market Committee (FOMC), which determines the direction of monetary policy by directing open market operations. Ultimately, they would like to get a signal as to whether interest rates will increase, remain the same, or decrease (hawkish, neutral, or dovish, respectively).

I refer to this as an AI pipeline because we can imbue this type of signal generation with AI much further up the data delivery value chain. In this way, we maximize the value of our work by building it into a common dataset. End users will not need to invoke any additional logic; good design is invisible!

### Next Steps

 * To industrialize this demo with continuous ingestion and scoring, please check out the `FSI_Cortex_AI_Pipeline_Industrialization.ipynb` notebook in this repository
 * Check out the companion demo in this repository: `FSI_Cortex_Search.ipynb`

# 🛑 BEFORE YOU START 🛑

**Be sure to do the following FIRST to create dependent database objects for the following steps**:
1. Run the `1_SQL_SETUP_FOMC.sql` script
2. Load the PDF docs in the `FOMC_DOCS` directory

------

### AI Pipeline: Step 1 - Create File Extraction Function

We need to extract text from the PDFs. We will do that with a new Python function.

> Note that we're building this function directly in SQL.

The steps below require the `langchain`, `pypdf2` and `pandas` packages. To import packages from Anaconda, install them first using the package selector at the top of the page.

In [None]:

USE DATABASE GEN_AI_FSI;
USE SCHEMA FOMC;
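
Step 3 calls `gen_ai_fsi.fomc.pdf_text_extractor` on each staged file. As a rough sketch of the Python handler logic such a function could wrap, assuming it mirrors the `read_pdf` method of the chunking function defined later in this notebook: read the PDF bytes, pull the text from each page, and flatten newlines and NUL characters to spaces. The names `clean_page_text` and `extract_pdf_text` are hypothetical, and `PyPDF2` is imported lazily so the cleaning helper can be tried without it installed.

```python
import io


def clean_page_text(raw: str) -> str:
    """Flatten one page of extracted text: newlines and NUL characters become spaces."""
    return raw.replace("\n", " ").replace("\0", " ")


def extract_pdf_text(pdf_bytes: bytes) -> str:
    """Concatenate the cleaned text of every page in a PDF (hypothetical helper)."""
    import PyPDF2  # lazy import: only needed when a real PDF is parsed

    reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
    # Treat a page with no extractable text as an empty string
    return "".join(clean_page_text(page.extract_text() or "") for page in reader.pages)


print(clean_page_text("Federal Open\nMarket Committee"))
```

Inside a Snowflake UDF, the bytes would come from `SnowflakeFile.open(file_url, 'rb')`, as in the chunking function.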

### AI Pipeline: Step 2 - Create and Register `generate_prompt` Function

As we load data into our system, we want to automatically generate a signal. To do so, we need to call an LLM and pass it our prompt. 

Below, we define our specialized prompt engineering as a Python function and then register the function for later reuse when loading data.

In [None]:
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.types import *

session = get_active_session() 

def generate_prompt(document_text):
    prompt = f"""
        <Role> You are an experienced Senior Economist deeply knowledgeable on Federal Reserve guidance including FOMC or Federal Open Market Committee meeting minutes and communications.
        You are an expert in interpreting Hawkish and Dovish signals from the Fed or Federal Reserve. Such signals are derived from guidance conveyed in FOMC meeting notes and communications.
        
        As an analyst, you excel at discerning macroeconomic trends for each FOMC meeting notes and communications published by the Federal Reserve.
        The  signal or trends are either Hawkish or Dovish based on the growth outlook and inflation outlook of the Fed. The Federal Reserve has a long 
        term objective of keeping inflation around 2%, and low unemployment. Hawkish sentiment could imply 
        the Federal Reserve intends to raise interest rates to increase the cost of borrowing and slow economic activity. 
        The Fed typically increases interest rates when inflation is high or rising, or when the unemployment 
        rate is low or falling. Conversely, dovish sentiment could imply the Federal Reserve intends to lower interest 
        rates to allow easier access borrowing and lowering the cost of money to stimulate economic activity.  The Fed 
        typically decreases interest rates when inflation is low or falling, or when the unemployment rate is high or rising.
        
        Signal categories known as Economic Policy Stances:
        Hawkish stance or attitude for economic policy
        -characterized by a focus on combating inflation and often involves advocating for higher interest rates and tolerant to higher levels of unemployment.
        -concerned about rising inflation. Hawkish stance believes higher interest rates can help keep inflation in check, even if it slows down economic growth or increases unemployment.
        
        Dovish stance or attitude for economic policy
        -characterized by a focus on prioritizing stimulating economic growth, reducing unemployment, and tolerant to higher levels of inflation.
        -concerned with boosting economic activity, reducing unemployment and, for this reason, lower interest rates are preferred to create economic growth and employment.
        
        Neutral stance or attitude for economic policy
        -characterized by a focus on balance between combating inflation and supporting economic growth, with no strong inclination toward either side.
        -concerned with maintaining a steady economic environment without significant deviations. They seek to neither overly stimulate the economy nor excessively tighten it.
        </Role>
        
        <Data> 
        You are provided the text of a Federal Reserve Guidance or FOMC meeting notes as context. These generally are released before the Federal Reserve takes action on economic policy. 
        </Data>

        <FOMC_meeting_notes>
        {document_text}
        </FOMC_meeting_notes>
        
        <Task>: Follow these instructions,
        1) Review the provided FOMC communication or meeting notes text. Then,
        2) Consider the FOMC members or Committee Members tone and sentiment around economic conditions. Then,
        3) Consider specific guidance and stated conditions that validate the tone and signal FOMC members make concerning current macro economic conditions. Then,
        4) Based on this sentiment classify if the FOMC communication text indicates Hawkish, Dovish, or Neutral outlook for the economy. Be critical and do not categorize sentiment as "Neutral" unless necessary. This will be output as [Signal].
        5) Summarize a concise and accurate rationale for classifying the sentiment Hawkish, Neutral, or Dovish sentiment. This will be output as [Signal_Summary].
        </Task>
        
        <Output> 
        produce valid JSON. Absolutely do not include any additional text before or following the JSON. Output should use following JSON_format
        </Output>
        
        <JSON_format>
        {{
            "Signal": (A trend sentiment classification of Hawkish, Neutral or Dovish),
            "Signal_Summary": (A concise summary of sentiment trend)
        }}
        </JSON_format>"""
    return prompt

session.add_packages("snowflake-snowpark-python", "snowflake-ml-python", "snowflake")

session.udf.register(
  func = generate_prompt,
  return_type = StringType(),
  input_types = [ StringType()],
  is_permanent = True,
  name = 'generate_prompt',
  replace = True,
  stage_location = '@gen_ai_fsi.fomc.fed_logic')
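
One detail in `generate_prompt` is worth calling out: because the prompt is an f-string, the literal braces of the JSON template are written as `{{` and `}}`, while `{document_text}` is substituted with the document. A trimmed-down sketch of that behavior follows; `mini_prompt` is a hypothetical stand-in, not the registered function.

```python
def mini_prompt(document_text: str) -> str:
    """Tiny f-string template showing substitution versus escaped braces."""
    return f"""
    <FOMC_meeting_notes>
    {document_text}
    </FOMC_meeting_notes>

    <JSON_format>
    {{
        "Signal": (Hawkish, Neutral or Dovish)
    }}
    </JSON_format>"""


rendered = mini_prompt("The Committee decided to maintain the target range.")
# The rendered text contains single braces and the substituted document text
print(rendered)
```

Rendering the prompt locally like this is a cheap way to check the template before registering the function.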

### AI Pipeline: Step 3 - Ingest Text and Determine Signal

Now we're using the functions that we've just created in a simple insert statement. This approach of encapsulating complexity for later reuse in SQL pipelines greatly increases the value of our work in a one-to-many relationship.

### 🤯 🧠 CHECK IT OUT! 🧠 🤯 
* We're calling our PDF text extractor function! (the `file_text` column in the CTE)
* We're calling our prompt function! (the last column of the final `SELECT`)

In [None]:
INSERT INTO gen_ai_fsi.fomc.pdf_full_text (id, relative_path, size, last_modified, md5, etag, file_url, file_text, file_date, sentiment)
WITH cte AS (SELECT gen_ai_fsi.fomc.fed_pdf_full_text_sequence.nextval AS id,
                    relative_path                                      AS relative_path,
                    size                                               AS size,
                    last_modified                                      AS last_modified,
                    md5                                                AS md5,
                    etag                                               AS etag,
                    file_url                                           AS file_url,
                    REPLACE(gen_ai_fsi.fomc.pdf_text_extractor(build_scoped_file_url('@gen_ai_fsi.fomc.fed_pdf', relative_path)), '''', '')  AS file_text,
                    TRY_TO_DATE(REGEXP_SUBSTR(relative_path, '\\d{8}'), 'YYYYMMDD') AS file_date
             FROM gen_ai_fsi.fomc.fomc_stream
             WHERE metadata$action = 'INSERT')

SELECT id,
       relative_path,
       size,
       last_modified,
       md5,
       etag,
       file_url,
       file_text,
       file_date,
       snowflake.cortex.try_complete('mistral-large2', gen_ai_fsi.fomc.generate_prompt(file_text)) AS signal_mis
FROM cte;
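
The `file_date` column above is derived from the file name: `REGEXP_SUBSTR` takes the first run of eight digits and `TRY_TO_DATE` parses it as `YYYYMMDD`, returning `NULL` if it is not a valid date. A rough Python equivalent, handy for checking file names before uploading them; `file_date_from_path` and the sample file names are hypothetical.

```python
import re
from datetime import date, datetime
from typing import Optional


def file_date_from_path(relative_path: str) -> Optional[date]:
    """Return the first 8-digit run in the path parsed as YYYYMMDD, or None."""
    match = re.search(r"\d{8}", relative_path)
    if match is None:
        return None
    try:
        return datetime.strptime(match.group(0), "%Y%m%d").date()
    except ValueError:
        # Like TRY_TO_DATE, an invalid date yields None instead of an error
        return None


print(file_date_from_path("monetary20240131a1.pdf"))
print(file_date_from_path("notes.pdf"))
```

Files whose names carry no eight-digit date end up with an empty `file_date`, which matters later because the chat interface filters on that column.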

In [None]:
select * from gen_ai_fsi.fomc.fomc_stream;

In [None]:
CREATE OR REPLACE TABLE gen_ai_fsi.fomc.pdf_full_text (
    id            NUMBER(19, 0),
    relative_path VARCHAR(16777216),
    size          NUMBER(38, 0),
    last_modified TIMESTAMP_TZ(3),
    md5           VARCHAR(16777216),
    etag          VARCHAR(16777216),
    file_url      VARCHAR(16777216),
    file_text     VARCHAR(16777216),
    file_date     DATE,
    sentiment     VARCHAR(16777216)
);

In [None]:
INSERT INTO gen_ai_fsi.fomc.pdf_full_text (id, relative_path, size, last_modified, md5, etag, file_url, file_text, file_date, sentiment)
WITH cte AS (SELECT gen_ai_fsi.fomc.fed_pdf_full_text_sequence.nextval AS id,
                    relative_path                                      AS relative_path,
                    size                                               AS size,
                    last_modified                                      AS last_modified,
                    md5                                                AS md5,
                    etag                                               AS etag,
                    file_url                                           AS file_url,
                    REPLACE(TO_VARCHAR (
                        SNOWFLAKE.CORTEX.PARSE_DOCUMENT ('@gen_ai_fsi.fomc.fed_pdf', relative_path)), '''', '')  AS file_text,
                    TRY_TO_DATE(REGEXP_SUBSTR(relative_path, '\\d{8}'), 'YYYYMMDD') AS file_date
             FROM directory(@gen_ai_fsi.fomc.fed_pdf)
)
SELECT id,
       relative_path,
       size,
       last_modified,
       md5,
       etag,
       file_url,
       file_text,
       file_date,
       snowflake.cortex.try_complete('mistral-large2', gen_ai_fsi.fomc.generate_prompt(file_text)) AS signal_mis
FROM cte;

In [None]:
select * from gen_ai_fsi.fomc.pdf_full_text;

### AI Pipeline: Step 3.1 - Check out the result

Select from our PDF table to view our signal and a summary of the reasoning behind it.

In [None]:
select * from GEN_AI_FSI.FOMC.PDF_FULL_TEXT;
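
Because the prompt asks the model for JSON with `Signal` and `Signal_Summary` keys, the `sentiment` column can be split into its two parts downstream. A minimal sketch, assuming the model returned bare JSON as instructed; `parse_signal` and the sample response are hypothetical, and anything unparseable is mapped to `(None, None)` rather than raising.

```python
import json
from typing import Optional, Tuple

VALID_SIGNALS = {"Hawkish", "Neutral", "Dovish"}


def parse_signal(raw: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """Return (signal, summary), or (None, None) when the response is unusable."""
    if raw is None:  # TRY_COMPLETE yields NULL when the call cannot be performed
        return None, None
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None, None
    if not isinstance(payload, dict):
        return None, None
    signal = payload.get("Signal")
    if not isinstance(signal, str) or signal not in VALID_SIGNALS:
        return None, None
    return signal, payload.get("Signal_Summary")


# Hypothetical model response, for illustration only
sample = '{"Signal": "Hawkish", "Signal_Summary": "Inflation remains elevated."}'
print(parse_signal(sample))
```

Rejecting labels outside the three expected values keeps a malformed response from silently flowing into downstream reports.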

-------

## Build a RAG Interface on FOMC Documents

Awesome, we have created the pipeline to ingest and generate a signal when new data is available -- this is our **AI pipeline**. 

Next, let's also give our users a means to ask more detailed questions about the content in the documents with a RAG interface. We'll use Cortex Search and the data we already have in the Stage as a foundation. 

In this section we'll: 
1. Create a new table function to chunk the pdfs.
2. Use the chunking function to break the text into chunks and load it into our table.
3. Create a Cortex Search Service to handle the vectorization and search functionality.
4. Create a Chat interface with Streamlit.

## Chunking Function
Earlier, we created a function that pulled all of the text out of a PDF. Now, we'll do something very similar, but we're going to break the text up into chunks to fuel our RAG interface. 

In [None]:
--create a function to chunk the pdfs
CREATE OR REPLACE FUNCTION gen_ai_fsi.fomc.pdf_text_chunker(file_url string)
RETURNS TABLE (chunk varchar)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'pdf_text_chunker'
PACKAGES = ('snowflake-snowpark-python','PyPDF2', 'langchain')
AS
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd

class pdf_text_chunker:

    def read_pdf(self, file_url: str) -> str:
    
        logger = logging.getLogger("udf_logger")
        logger.info(f"Opening file {file_url}")
    
        with SnowflakeFile.open(file_url, 'rb') as f:
            buffer = io.BytesIO(f.readall())
            
        reader = PyPDF2.PdfReader(buffer)   
        text = ""
        for page in reader.pages:
            try:
                text += page.extract_text().replace('\n', ' ').replace('\0', ' ')
            except Exception:
                text = "Unable to Extract"
                logger.warning(f"Unable to extract from file {file_url}, page {page}")
        
        return text

    def process(self,file_url: str):

        text = self.read_pdf(file_url)
        
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size = 500, #Adjust this as you see fit
            chunk_overlap  = 50, #This lets consecutive chunks overlap, which helps keep each chunk contextual
            length_function = len
        )
    
        chunks = text_splitter.split_text(text)
        df = pd.DataFrame(chunks, columns=['chunks'])
        
        yield from df.itertuples(index=False, name=None)
$$;
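
To see what `chunk_size` and `chunk_overlap` mean in practice, here is a simplified stand-in that slices on fixed character offsets. It is not what `RecursiveCharacterTextSplitter` does internally (that splitter prefers to break on separators such as paragraphs and spaces), and `chunk_with_overlap` is a hypothetical name, but the size and overlap arithmetic follows the same idea.

```python
from typing import List


def chunk_with_overlap(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[str]:
    """Slice text into windows of chunk_size that share chunk_overlap characters."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks: List[str] = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
        start += step
    return chunks


sample_text = "".join(str(i % 10) for i in range(1000))
pieces = chunk_with_overlap(sample_text)
print([len(p) for p in pieces])
```

The last 50 characters of each chunk reappear at the start of the next one, so a sentence cut at a boundary is still fully present in at least one chunk.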

## Build the Chunk Table

Using our newly created chunking table function, bring in and chunk all of the documents.

In [None]:
TRUNCATE TABLE gen_ai_fsi.fomc.pdf_chunks;

INSERT INTO gen_ai_fsi.fomc.pdf_chunks (id, full_text_fk, relative_path, file_date, chunk)
WITH chunk_cte AS (SELECT gen_ai_fsi.fomc.fed_pdf_chunk_sequence.nextval AS id,
                          relative_path,
                          REPLACE(func.chunk, '''', '')  AS chunk
                   FROM directory(@gen_ai_fsi.fomc.fed_pdf),
                        TABLE(gen_ai_fsi.fomc.pdf_text_chunker(build_scoped_file_url(@gen_ai_fsi.fomc.fed_pdf, relative_path))) AS func)

SELECT cte.id,
       pft.id,
       cte.relative_path,
       pft.file_date,
       cte.chunk
FROM chunk_cte cte
         LEFT JOIN gen_ai_fsi.fomc.pdf_full_text pft
            ON cte.relative_path = pft.relative_path;

## Create a Cortex Search Service

Cortex Search enables low-latency, high-quality “fuzzy” search over your Snowflake data. Cortex Search powers a broad array of search experiences for Snowflake users including Retrieval Augmented Generation (RAG) applications leveraging Large Language Models (LLMs).

We'll use this service later to power our RAG application. 

In [None]:
--create a cortex Search Service 
CREATE OR REPLACE CORTEX SEARCH SERVICE SRCH_FED
ON CHUNK
ATTRIBUTES ID, FILE_DATE
WAREHOUSE = GEN_AI_FSI_WH
TARGET_LAG = '1 day'
AS (
    SELECT 
        ID,
        FILE_DATE::string as FILE_DATE,
        CHUNK AS CHUNK  
FROM PDF_CHUNKS);

# FOMC Chat Interface
* We're leveraging our Cortex Search service to let users ask targeted questions of the documents in their stage.
* A more robust chat interface could be built on top of this; for the demo, we keep the interaction bare-bones.

In [None]:
from snowflake.snowpark.context import get_active_session
from snowflake.core import Root
import streamlit as st
import json5 as json
import pandas as pd
#import snowflake.snowpark.modin.plugin

#get our session
session = get_active_session()

# access search service through Python API
root = Root(session)
search_service = (root
                  .databases["GEN_AI_FSI"]
                  .schemas["FOMC"]
                  .cortex_search_services["SRCH_FED"]    
)

#create a function to generate response
def complete_cs(model_name, prompt):
    # Bind the model name and prompt as parameters so quotes in the prompt cannot break the SQL
    cmd = "SELECT SNOWFLAKE.CORTEX.TRY_COMPLETE(?, ?) AS response"
    df_response = session.sql(cmd, params=[model_name, prompt]).collect()
    response = df_response[0].RESPONSE
    return response


#get FOMC files
database = 'GEN_AI_FSI'
schema = 'FOMC'

#USER INPUT: select time frame
query_document_dates = f"""SELECT DISTINCT FILE_DATE FROM {database}.{schema}.PDF_CHUNKS order by file_date desc;"""
df_document_dates = session.sql(query_document_dates).to_pandas()

#USER INPUT: select model
query_models = f"""SELECT MODEL FROM {database}.{schema}.MODELS"""
df_models = session.sql(query_models).to_pandas()

#USER INPUT: display
col1, col2 = st.columns(2)
with col1:
    user_input_date = st.selectbox("Select Document Date", df_document_dates, key="CS_date_select_box")
with col2: 
    user_input_model = st.selectbox("Select Model", df_models, key="CS_model_select_box")

#Generate a response
user_input_question = st.text_input("Ask me a question")

ask= st.button("Ask", key = "button_ask")
if ask: 
    #get the chunks that are relevant to the question
    response = search_service.search(
        user_input_question,
        columns=["ID", "FILE_DATE", "CHUNK"],
        filter={"@eq": {"FILE_DATE": f"""{user_input_date}"""} },
        limit=5,
    ).to_json()

    #st.json(response)
    # Parse the JSON5 string
    context_chunks = json.loads(response)
    
    #transform the data into a single string
    context_full = ""
    for chunk in context_chunks['results']:
        context_full += chunk['CHUNK'] + " "

    #build our prompt
    cs_prompt = f'''
            Role: You are an expert Senior Economist deeply knowledgeable on Federal Reserve documents and guidance including FOMC or Federal Open Market Committee 
            meeting minutes and communications. You are an expert in interpreting and answering investment-related questions based on meeting minutes and communications 
            which you are provided as context with each question.
            
            Data: You are provided with text of a Federal Reserve Guidance or FOMC meeting notes relevant to the question asked. 
            These meeting notes are generally released before the Federal Reserve takes action on economic policy.
            
            Task: Follow these instructions,
            1) Answer the question based on the context. 
            2) Be terse and do not consider information outside what you have been provided in the question and context.
            
            Output: produce a thorough, valid, grammatically correct, and concise response in a professional tone. Please do not preface your response. Also provide the document and location you used to answer the question.
            Context: {context_full}
            Question: Based on documents released on this date {user_input_date}, {user_input_question} 
            '''

    data = complete_cs(user_input_model, cs_prompt)
    
    with st.chat_message("model", avatar ="assistant"):
        st.write(data)
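
The context assembly step above concatenates every returned chunk. As an optional variation, the sketch below caps the context at a character budget so the prompt stays within a model's limits. `build_context` and the `max_chars` default are assumptions, the input shape follows the `results` and `CHUNK` keys used in the cell above, and the truncation assumes results arrive most relevant first.

```python
import json
from typing import List


def build_context(search_response_json: str, max_chars: int = 4000) -> str:
    """Join CHUNK values from a search response, stopping at a character budget."""
    results = json.loads(search_response_json).get("results", [])
    parts: List[str] = []
    used = 0
    for result in results:
        chunk = result.get("CHUNK", "")
        # +1 reserves room for the joining space
        if used + len(chunk) + 1 > max_chars:
            break
        parts.append(chunk)
        used += len(chunk) + 1
    return " ".join(parts)


# Hypothetical search response, for illustration only
sample_response = json.dumps(
    {"results": [{"CHUNK": "Inflation eased."}, {"CHUNK": "Labor market remains tight."}]}
)
print(build_context(sample_response))
```

In the chat cell, `context_full = build_context(response)` could stand in for the parsing and concatenation loop.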