# AI Observability for Airline Search Service

 
Before beginning, install the following packages using the packages menu at the top right of this notebook: `snowflake-ml-python`, `snowflake.core`, `trulens-core`, `trulens-providers-cortex`, `trulens-connectors-snowflake`

## Run "Create Objects and Load Data.sql" before running the steps below

## Initialize the session

In [None]:
from snowflake.snowpark.context import get_active_session
session = get_active_session()

## Verify that the PDF files were uploaded successfully
### The AIRLINE_STG stage should already exist and contain the two airline PDFs.

In [None]:
ls @cortex_ai_db.public.airline_stg

## Parse the PDF files with PARSE_DOCUMENT (LAYOUT mode) and save the parsed text to a table

In [None]:
CREATE OR REPLACE TABLE CORTEX_AI_DB.PUBLIC.PARSED_AIRLINE_CONTENT AS
SELECT
    relative_path,
    TO_VARCHAR(
        SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
            @cortex_ai_db.public.airline_stg,
            relative_path,
            {'mode': 'LAYOUT'}
        ):content
    ) AS parsed_text
FROM DIRECTORY(@cortex_ai_db.public.airline_stg)
WHERE relative_path LIKE '%.pdf';

## View the table containing the parsed texts

In [None]:
SELECT * FROM CORTEX_AI_DB.PUBLIC.PARSED_AIRLINE_CONTENT;

# Storing Chunks
### 1. Create a new table (CHUNKED_AIRLINE_CONTENT) to store chunks of the parsed text.
### 2. Split the parsed text into chunks with the SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER function and insert them into CHUNKED_AIRLINE_CONTENT.

The SPLIT_TEXT_RECURSIVE_CHARACTER function recursively divides text into smaller segments, which prepares the content for text embedding and search indexing. It returns an array of text chunks derived from the original content. Here the text is split using the 'markdown' format, a chunk size of 1000 characters, and an overlap of 120 characters between consecutive chunks. Both the chunk size and the overlap can be tuned.
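For intuition, here is a minimal pure-Python sketch of recursive character splitting with overlap. It is not Snowflake's implementation: it assumes a separator priority of paragraph breaks, line breaks, then spaces, with hard character cuts as a last resort, whereas the separators Snowflake uses for the `'markdown'` format and its exact overlap rules may differ. The helper names are hypothetical.

```python
from typing import List, Sequence

# Assumed separator priority: paragraphs, lines, then words.
# Snowflake's 'markdown' format may use a different, markdown-aware list.
DEFAULT_SEPARATORS = ("\n\n", "\n", " ")


def _split_keep(text: str, sep: str) -> List[str]:
    """Split on sep but keep sep attached, so the pieces concatenate back to text."""
    parts = text.split(sep)
    return [p + sep for p in parts[:-1]] + [parts[-1]]


def recursive_pieces(
    text: str, chunk_size: int, separators: Sequence[str] = DEFAULT_SEPARATORS
) -> List[str]:
    """Break text into pieces of at most chunk_size, preferring coarse boundaries."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators:
        # No separator left: fall back to hard character cuts.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, finer = separators[0], separators[1:]
    pieces: List[str] = []
    for part in _split_keep(text, sep):
        # Parts that are still too long are split again with the next, finer separator.
        pieces.extend(recursive_pieces(part, chunk_size, finer))
    return pieces


def split_text_recursive(text: str, chunk_size: int, overlap: int) -> List[str]:
    """Greedily merge pieces into chunks of at most chunk_size characters."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    chunks: List[str] = []
    window: List[str] = []  # pieces in the chunk currently being built
    length = 0
    for piece in recursive_pieces(text, chunk_size):
        if window and length + len(piece) > chunk_size:
            chunks.append("".join(window))
            # Keep a tail of the finished chunk (at most `overlap` chars)
            # as the start of the next chunk.
            while window and (length > overlap or length + len(piece) > chunk_size):
                length -= len(window.pop(0))
        window.append(piece)
        length += len(piece)
    if window:
        chunks.append("".join(window))
    return chunks
```

For example, `split_text_recursive(parsed_text, 1000, 120)` mirrors the parameters in the SQL cell that follows. Because the overlap is built from whole pieces, the text repeated between consecutive chunks is at most `overlap` characters rather than exactly that amount.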

In [None]:
CREATE OR REPLACE TABLE CORTEX_AI_DB.PUBLIC.CHUNKED_AIRLINE_CONTENT (
    file_name VARCHAR,
    CHUNK VARCHAR
);

INSERT INTO CORTEX_AI_DB.PUBLIC.CHUNKED_AIRLINE_CONTENT (file_name, CHUNK)
SELECT
    relative_path,
    c.value AS CHUNK
FROM
    CORTEX_AI_DB.PUBLIC.PARSED_AIRLINE_CONTENT,
    LATERAL FLATTEN( input => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER (
        parsed_text,
        'markdown',
        1000,
        120
    )) c;
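`LATERAL FLATTEN` turns each element of the array returned by SPLIT_TEXT_RECURSIVE_CHARACTER into its own row, paired with the `relative_path` of the source file. The hypothetical helper below shows the same explode step in plain Python. As with FLATTEN's default `OUTER => FALSE`, a file whose chunk array is empty produces no rows.

```python
from typing import Iterable, List, Tuple


def flatten_chunks(parsed: Iterable[Tuple[str, List[str]]]) -> List[Tuple[str, str]]:
    """Mimic LATERAL FLATTEN: emit one (file_name, chunk) row per array element."""
    return [(file_name, chunk) for file_name, chunks in parsed for chunk in chunks]
```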

## View the inserted chunked text outputs

In [None]:
SELECT * FROM CORTEX_AI_DB.PUBLIC.CHUNKED_AIRLINE_CONTENT;

## Create a Search Service

In [None]:
CREATE OR REPLACE CORTEX SEARCH SERVICE CORTEX_AI_DB.PUBLIC.AIRLINE_SEARCH_SERVICE
    ON chunk
    WAREHOUSE = compute_wh
    TARGET_LAG = '1 minute'
    EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
    AS (
    SELECT
        file_name,
        chunk
    FROM CORTEX_AI_DB.PUBLIC.CHUNKED_AIRLINE_CONTENT
    );

## Create a Python class that calls the Cortex Search Service created above
### The CortexSearchRetriever class connects to the AIRLINE_SEARCH_SERVICE Cortex Search Service and returns the top results for a query as a list of text chunks.

In [None]:
from typing import List

from snowflake.core import Root
from snowflake.snowpark.session import Session


class CortexSearchRetriever:
    """Retrieve text chunks from the AIRLINE_SEARCH_SERVICE Cortex Search Service."""

    def __init__(self, snowpark_session: Session, limit_to_retrieve: int = 4):
        self._snowpark_session = snowpark_session
        self._limit_to_retrieve = limit_to_retrieve

    def retrieve(self, query: str) -> List[str]:
        # Use the session passed to the constructor rather than the global `session`.
        root = Root(self._snowpark_session)

        search_service = (
            root
            .databases["CORTEX_AI_DB"]
            .schemas["PUBLIC"]
            .cortex_search_services["AIRLINE_SEARCH_SERVICE"]
        )
        resp = search_service.search(
            query=query,
            columns=["chunk"],
            limit=self._limit_to_retrieve,
        )

        # Return only the chunk text of each result, or an empty list if nothing matched.
        if resp.results:
            return [curr["chunk"] for curr in resp.results]
        return []
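Code that only calls `retrieve(query)` can be exercised without a live Snowflake connection by substituting a stub with the same interface. The sketch below is a hypothetical keyword-overlap stub, not a replacement for Cortex Search: it ranks chunks by the number of shared lowercase word tokens, breaking ties by original order. The sample chunks in the test are made-up text, not content from the airline PDFs.

```python
import re
from typing import List, Set


class KeywordStubRetriever:
    """Hypothetical offline stand-in with the same retrieve(query) -> List[str] interface."""

    def __init__(self, chunks: List[str], limit_to_retrieve: int = 4):
        self._chunks = chunks
        self._limit_to_retrieve = limit_to_retrieve

    @staticmethod
    def _tokens(text: str) -> Set[str]:
        # Lowercase alphanumeric word tokens.
        return set(re.findall(r"[a-z0-9]+", text.lower()))

    def retrieve(self, query: str) -> List[str]:
        query_tokens = self._tokens(query)
        scored = [
            (len(query_tokens & self._tokens(chunk)), index, chunk)
            for index, chunk in enumerate(self._chunks)
        ]
        # Drop chunks with no shared tokens; sort by score (desc), then original order.
        scored = sorted((s for s in scored if s[0] > 0), key=lambda s: (-s[0], s[1]))
        return [chunk for _, _, chunk in scored[: self._limit_to_retrieve]]
```

Because the RAG class stores its retriever in `self.retriever`, a stub like this could be assigned there to test retrieval-dependent logic offline.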

## Turn on OpenTelemetry Tracing

Before building the Retrieval-Augmented Generation (RAG) system, enable TruLens OpenTelemetry tracing, which captures each step of the application for observability and evaluation.

In [None]:
import os
os.environ["TRULENS_OTEL_TRACING"] = "1"

### Use cortex_ai_db.observability_schema to store our traces and evaluations 

In [None]:
use schema cortex_ai_db.observability_schema;

## Verify the evaluation dataset: the test queries and ground-truth answers used to compute the LLM's performance metrics should already be loaded into the AIRLINE_QUERIES_ANSWERS table

In [None]:
select * from airline_queries_answers limit 5;

## Create the RAG with instrumentation

We will implement the RAG system with TruLens instrumentation. Each method decorated with `@instrument` emits a span with a span type (RETRIEVAL, GENERATION, or RECORD_ROOT) and, where specified, attributes such as the query text and the retrieved contexts, so that the evaluation metrics computed later can use the captured data.

In [None]:
from snowflake.cortex import complete
from trulens.core.otel.instrument import instrument
from trulens.otel.semconv.trace import SpanAttributes


class RAG:

    def __init__(self):
        self.retriever = CortexSearchRetriever(snowpark_session=session, limit_to_retrieve=4)

    @instrument(
        span_type=SpanAttributes.SpanType.RETRIEVAL,
        attributes={
            SpanAttributes.RETRIEVAL.QUERY_TEXT: "query",
            SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: "return",
        },
    )
    def retrieve_context(self, query: str) -> list:
        """
        Retrieve relevant text chunks from the Cortex Search Service.
        """
        return self.retriever.retrieve(query)

    @instrument(span_type=SpanAttributes.SpanType.GENERATION)
    def generate_completion(self, query: str, context_str: list) -> str:
        """
        Generate an answer from the retrieved context.
        """
        # Join the chunks so the prompt contains plain text rather than a Python list repr.
        context_text = "\n\n".join(context_str)
        prompt = f"""
          You are an expert airline support assistant who answers questions using the context provided.
          Use only the provided context to answer questions accurately and concisely. Do not fabricate or infer
          information beyond what is given.
          If the answer is not in the context, clearly state that the information is unavailable.
          Context: {context_text}
          Question:
          {query}
          Answer:
        """
        response = ""
        # Stream the completion, printing each token as it arrives.
        stream = complete("mistral-7b", prompt, stream=True)
        for update in stream:
            response += update
            print(update, end="")
        return response

    @instrument(
        span_type=SpanAttributes.SpanType.RECORD_ROOT,
        attributes={
            SpanAttributes.RECORD_ROOT.INPUT: "query",
            SpanAttributes.RECORD_ROOT.OUTPUT: "return",
        },
    )
    def query(self, query: str) -> str:
        context_str = self.retrieve_context(query)
        return self.generate_completion(query, context_str)


rag = RAG()
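The `attributes` dictionaries in the RAG class map span attributes to the strings "query" (the method argument of that name) and "return" (the method's return value). The toy decorator below sketches that mapping idea in plain Python so the resulting trace shape is easy to inspect. It is not TruLens's implementation: `toy_instrument`, `SPANS`, `ToyRAG`, and the attribute keys are hypothetical, and real spans are exported through OpenTelemetry rather than appended to a list.

```python
import functools
import inspect
from typing import Any, Callable, Dict, List, Optional

# In-memory stand-in for a trace exporter; each entry is one finished "span".
SPANS: List[Dict[str, Any]] = []
_ACTIVE: List[str] = []  # names of currently open spans, innermost last


def toy_instrument(span_type: str, attributes: Optional[Dict[str, str]] = None) -> Callable:
    """Record a span per successful call; each attribute source is an argument name or "return"."""
    attributes = attributes or {}

    def decorator(fn: Callable) -> Callable:
        sig = inspect.signature(fn)

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            parent = _ACTIVE[-1] if _ACTIVE else None
            _ACTIVE.append(fn.__qualname__)
            try:
                result = fn(*args, **kwargs)
            finally:
                _ACTIVE.pop()
            SPANS.append({
                "name": fn.__qualname__,
                "span_type": span_type,
                "parent": parent,
                "attributes": {
                    key: result if source == "return" else bound.arguments[source]
                    for key, source in attributes.items()
                },
            })
            return result

        return wrapper

    return decorator


class ToyRAG:
    @toy_instrument("retrieval", {"query_text": "query", "retrieved_contexts": "return"})
    def retrieve_context(self, query: str) -> List[str]:
        return ["example chunk 1", "example chunk 2"]  # placeholder contexts

    @toy_instrument("record_root", {"input": "query", "output": "return"})
    def query(self, query: str) -> str:
        contexts = self.retrieve_context(query)
        return f"answer built from {len(contexts)} chunks"
```

Because a span is recorded only after its method returns, the inner retrieval span appears before the outer root span, and its `parent` field names the root method.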

## Register the App using the TruLens Snowflake Connector

Set the application name and version, and pass the Snowpark session to the connector so that traces and experiment results are stored in Snowflake.

In [None]:
from trulens.apps.app import TruApp
from trulens.connectors.snowflake import SnowflakeConnector

tru_snowflake_connector = SnowflakeConnector(snowpark_session=session)

app_name = "AIRLINE_RAG"
app_version = "V0"

tru_rag_air = TruApp(
        rag,
        app_name=app_name,
        app_version=app_version,
        connector=tru_snowflake_connector
    )

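## Set up the configuration for running experiments and add the run to TruLens.

The `dataset_spec` maps columns of the AIRLINE_QUERIES_ANSWERS table to the run: QUERY supplies the input to the app, and GROUND_TRUTH_RESPONSE supplies the expected answer.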

In [None]:
from trulens.core.run import Run
from trulens.core.run import RunConfig

run_name = "airline_experiment_run_1"

run_config = RunConfig(
    run_name=run_name,
    dataset_name="AIRLINE_QUERIES_ANSWERS",
    description="Questions about the Airline Passenger experience, ticketing and refund policies.",
    label="airline_rag_eval",
    llm_judge_name="mistral-large2",
    source_type="TABLE",
    dataset_spec={
        "input": "QUERY",
        "ground_truth_output": "GROUND_TRUTH_RESPONSE",
    },
)

run: Run = tru_rag_air.add_run(run_config=run_config)

## Start the run

Start the experiment run on the prepared test set. This invokes the application in batch, once for each input in the dataset specified in the run configuration.

In [None]:
run.start()
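Conceptually, `run.start()` calls the app once per row of the dataset named in the run configuration, using `dataset_spec` to locate the input and ground-truth columns. The sketch below approximates that loop in memory under that assumption; `run_batch` and the sample rows are hypothetical, and TruLens additionally records each invocation as a trace in Snowflake.

```python
from typing import Any, Callable, Dict, List


def run_batch(
    app_query: Callable[[str], str],
    rows: List[Dict[str, Any]],
    dataset_spec: Dict[str, str],
) -> List[Dict[str, Any]]:
    """Call the app once per dataset row and pair each output with its ground truth."""
    input_col = dataset_spec["input"]
    truth_col = dataset_spec.get("ground_truth_output")
    records = []
    for row in rows:
        question = row[input_col]
        records.append({
            "input": question,
            "output": app_query(question),
            "ground_truth": row.get(truth_col) if truth_col else None,
        })
    return records
```

In the notebook, `app_query` would correspond to `rag.query`, and the rows to the contents of AIRLINE_QUERIES_ANSWERS.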

### BEFORE RUNNING THE NEXT CELL: In Snowsight, open the Evaluations page in the AI & ML section and select the AIRLINE_RAG application.
### Refresh the page and confirm that the run has finished (wait until the progress spinner in the Status column stops) before computing metrics in the next step.

## Compute metrics

In [None]:
run.compute_metrics(metrics=[
    "coherence",
    "answer_relevance",
    "groundedness",
    "context_relevance",
    "correctness",
])

## Open the Evaluations page in the AI & ML section of Snowsight to examine the performance metrics for this experiment.
### Computing the metrics takes a few minutes.
### Click the experiment run (airline_experiment_run_1) to view its metrics.
### Check the Status column and wait until the progress spinner stops.
### If the metrics are not yet available, check again later.

### After the evaluation, you can delete the run or iterate on the app, for example by updating the prompt, trying other models, or changing the chunk size and overlap.

# Delete current run

In [None]:
# run.delete()

# List all runs in the app

In [None]:
tru_rag_air.list_runs()

# Delete all Instances in the app

In [None]:
#tru_rag_air.delete()