# Add AI Search to your RAG solution
This notebook has two parts:
1. How to build the AI Search components using the REST API
2. How to test the quality and latency of RAG output using an Azure OpenAI GPT-4 model

-----------

### 1. How to build AI Search components

In [None]:
import requests
import uuid
import logging


class AISearchIndexer:
    """Provision Azure AI Search resources through the service's REST API.

    The class wraps the calls needed to stand up a blob-backed pipeline: a data
    source, a keyword index, a vector index, an embedding skillset and the
    indexer that ties them together.

    Every ``create_*`` method returns ``True`` when the service answers with
    HTTP 201 and ``False`` otherwise (the failure is logged). On success the
    response body is stored on the instance as ``data_source``, ``index``,
    ``skillset`` or ``indexer``. ``index`` holds the body of the most recent
    successful ``create_index`` call, whichever index type that was.
    """

    def __init__(
        self,
        search_service,
        data_source_name,
        search_index_name,
        vector_index_name,
        indexer_name,
        vector_skillset_name,
        api_key,
        api_version="2023-10-01-Preview",
    ):
        """Store the resource names and credentials used by every request.

        Args:
            search_service: Service name; used as the host prefix of
                ``<search_service>.search.windows.net``.
            data_source_name: Name of the blob data source to create/check.
            search_index_name: Name of the keyword index.
            vector_index_name: Name of the vector (chunk + embedding) index.
            indexer_name: Name of the indexer.
            vector_skillset_name: Name of the embedding skillset. When falsy,
                ``create_skillset`` does nothing and returns ``False``.
            api_key: Key sent in the ``api-key`` header.
            api_version: Value of the ``api-version`` query parameter.
        """
        self.search_service = search_service
        self.data_source_name = data_source_name
        self.search_index_name = search_index_name
        self.vector_index_name = vector_index_name
        self.indexer_name = indexer_name
        self.vector_skillset_name = vector_skillset_name
        self.api_key = api_key
        self.api_version = api_version
        self.headers = {"Content-Type": "application/json", "api-key": self.api_key}

        # Response bodies of successful create calls. Initialised here so that
        # reading them before (or after a failed) creation yields None instead
        # of raising AttributeError.
        self.data_source = None
        self.index = None
        self.skillset = None
        self.indexer = None

        # Names of the sub-resources embedded in the vector index definition.
        # Each gets a random suffix and is capped at max_service_name_size.
        self.max_service_name_size = 28
        self.vector_search_profile = self.generate_service_name("vector-profile")
        self.vector_search_config = self.generate_service_name("vector-search-config")
        self.vector_search_vectorizer = self.generate_service_name("vectorizer")
        self.semantic_config = self.generate_service_name("semantic-config")

    def generate_service_name(self, service_name_prefix):
        """Return ``<prefix>-<uuid4>`` cut to ``max_service_name_size`` characters.

        Trailing dashes left behind by the cut are removed so the name never
        ends on a separator.
        NOTE(review): Azure resource naming rules are believed to reject a
        trailing dash - confirm against the service's naming rules.
        """
        service_name = f"{service_name_prefix}-{uuid.uuid4()}"
        return service_name[: self.max_service_name_size].rstrip("-")

    def _resource_url(self, resource_path):
        """Build the full URL for ``resource_path`` with the api-version query."""
        return (
            f"https://{self.search_service}.search.windows.net/"
            f"{resource_path}?api-version={self.api_version}"
        )

    def _resource_exists(self, collection, name):
        """Return True when a GET on ``collection('name')`` answers HTTP 200."""
        response = requests.get(
            self._resource_url(f"{collection}('{name}')"),
            headers=self.headers,
        )
        return response.status_code == 200

    def _create_resource(self, collection, payload, store_as):
        """POST ``payload`` to ``collection`` and record the outcome.

        On HTTP 201 the parsed response body is stored on ``self`` under the
        attribute name ``store_as`` and True is returned. Any other status is
        logged as ``"<status> || <body>"`` and False is returned.
        """
        response = requests.post(
            self._resource_url(collection),
            headers=self.headers,
            json=payload,
        )
        if response.status_code == 201:
            setattr(self, store_as, response.json())
            return True

        # Error bodies are not guaranteed to be JSON (e.g. a gateway error
        # page); fall back to the raw text so logging itself cannot raise.
        try:
            body = response.json()
        except ValueError:
            body = response.text
        logging.error("%s || %s", response.status_code, body)
        return False

    def create_data_source_blob_storage(
        self, blob_connection, blob_container_name, query
    ):
        """Create the Azure Blob storage data source.

        Args:
            blob_connection: Storage connection string sent as the credentials.
            blob_container_name: Name of the blob container to read from.
            query: Value sent as the container's ``query`` field.
                NOTE(review): presumably a folder/prefix filter inside the
                container - confirm against the REST reference.

        Returns:
            True when the data source was created (HTTP 201), else False.
        """
        data_source_payload = {
            "name": self.data_source_name,
            "description": "Data source for Azure Blob storage container",
            "type": "azureblob",
            "credentials": {"connectionString": blob_connection},
            "container": {"name": blob_container_name, "query": query},
            "dataChangeDetectionPolicy": None,
            "dataDeletionDetectionPolicy": {
                "@odata.type": "#Microsoft.Azure.Search.NativeBlobSoftDeleteDeletionDetectionPolicy"
            },
        }
        return self._create_resource("datasources", data_source_payload, "data_source")

    def check_data_source_exists(self):
        """Return True when the configured data source exists on the service."""
        return self._resource_exists("datasources", self.data_source_name)

    def check_index_exists(self, index_name):
        """Return True when an index called ``index_name`` exists on the service."""
        return self._resource_exists("indexes", index_name)

    def create_search_index_payload(self):
        """Build the definition of the keyword index (content plus blob metadata)."""
        index_payload = {
            "name": self.search_index_name,
            "fields": [
                {
                    "name": "id",
                    "type": "Edm.String",
                    "key": True,
                    "searchable": True,
                },
                {
                    "name": "content",
                    "type": "Edm.String",
                    "retrievable": True,
                    "searchable": True,
                    "filterable": False,
                    "sortable": False,
                    "facetable": False,
                },
                {
                    "name": "metadata_storage_path",
                    "type": "Edm.String",
                    "retrievable": True,
                    "searchable": False,
                    "filterable": True,
                    "sortable": False,
                    "facetable": False,
                },
                {
                    "name": "metadata_storage_name",
                    "type": "Edm.String",
                    "searchable": False,
                    "filterable": True,
                    "sortable": True,
                },
                {
                    "name": "metadata_storage_size",
                    "type": "Edm.Int64",
                    "searchable": False,
                    "filterable": True,
                    "sortable": True,
                },
                {
                    "name": "metadata_storage_content_type",
                    "type": "Edm.String",
                    "searchable": False,
                    "filterable": True,
                    "sortable": True,
                },
            ],
        }
        return index_payload

    def create_vector_index_payload(
        self, model_uri, model_name, model_api_key, embedding_dims
    ):
        """Build the definition of the vector index.

        The index stores one document per text chunk (``chunk``), its
        ``embedding`` and the ``parent_key`` of the source document, and wires
        up an HNSW algorithm, a search profile, an Azure OpenAI vectorizer and
        a semantic configuration using the names generated in ``__init__``.

        Args:
            model_uri: Azure OpenAI resource URI used by the vectorizer.
            model_name: Deployment id of the embedding model.
            model_api_key: API key for the embedding model.
            embedding_dims: Number of dimensions of the ``embedding`` field.
        """
        index_payload = {
            "name": self.vector_index_name,
            "defaultScoringProfile": "",
            "fields": [
                {
                    "name": "id",
                    "type": "Edm.String",
                    "searchable": True,
                    "filterable": True,
                    "retrievable": True,
                    "sortable": False,
                    "facetable": False,
                    "key": True,
                    "indexAnalyzer": None,
                    "searchAnalyzer": None,
                    "analyzer": "keyword",
                },
                {
                    "name": "chunk",
                    "type": "Edm.String",
                    "searchable": True,
                    "filterable": False,
                    "retrievable": True,
                    "sortable": False,
                    "facetable": False,
                    "key": False,
                    "analyzer": "standard.lucene",
                },
                {
                    "name": "parent_key",
                    "type": "Edm.String",
                    "searchable": False,
                    "filterable": True,
                    "retrievable": True,
                    "sortable": False,
                    "facetable": False,
                    "key": False,
                },
                {
                    "name": "embedding",
                    "type": "Collection(Edm.Single)",
                    "searchable": True,
                    "filterable": False,
                    "retrievable": True,
                    "sortable": False,
                    "facetable": False,
                    "key": False,
                    "dimensions": embedding_dims,
                    "vectorSearchProfile": self.vector_search_profile,
                    "synonymMaps": [],
                },
            ],
            "scoringProfiles": [],
            "corsOptions": None,
            "suggesters": [],
            "analyzers": [],
            "normalizers": [],
            "tokenizers": [],
            "tokenFilters": [],
            "charFilters": [],
            "encryptionKey": None,
            "similarity": {
                "@odata.type": "#Microsoft.Azure.Search.BM25Similarity",
                "k1": None,
                "b": None,
            },
            "semantic": {
                "defaultConfiguration": None,
                "configurations": [
                    {
                        "name": self.semantic_config,
                        "prioritizedFields": {
                            "titleField": None,
                            "prioritizedContentFields": [{"fieldName": "chunk"}],
                            "prioritizedKeywordsFields": [
                                {"fieldName": "id"},
                                {"fieldName": "parent_key"},
                            ],
                        },
                    }
                ],
            },
            "vectorSearch": {
                "algorithms": [
                    {
                        "name": self.vector_search_config,
                        "kind": "hnsw",
                        "hnswParameters": {
                            # use cosine similarity when using OpenAI models,
                            # else use the distance metric of the embedding model
                            "metric": "cosine",
                            "m": 4,  # bi-directional link count
                            "efConstruction": 400,  # number of nearest neighbors to consider during indexing
                            "efSearch": 500,  # number of nearest neighbors to consider during search
                        },
                        "exhaustiveKnnParameters": None,
                    }
                ],
                "profiles": [
                    {
                        "name": self.vector_search_profile,
                        "algorithm": self.vector_search_config,
                        "vectorizer": self.vector_search_vectorizer,
                    }
                ],
                "vectorizers": [
                    {
                        "name": self.vector_search_vectorizer,
                        "kind": "azureOpenAI",
                        "azureOpenAIParameters": {
                            "resourceUri": model_uri,
                            "deploymentId": model_name,
                            "apiKey": model_api_key,
                            "authIdentity": None,
                        },
                    }
                ],
            },
        }
        return index_payload

    def create_index(self, index_type="search", **kwargs):
        """Create the keyword or the vector index.

        Args:
            index_type: ``"search"`` for the keyword index or ``"vector"`` for
                the vector index.
            **kwargs: Forwarded to ``create_vector_index_payload`` when
                ``index_type`` is ``"vector"``; ignored for ``"search"``.

        Returns:
            True when the index was created (HTTP 201). False when the data
            source does not exist yet or the service rejected the request.

        Raises:
            ValueError: If ``index_type`` is neither ``"search"`` nor
                ``"vector"``.
        """
        # Validate up front: an unknown type used to surface as an
        # UnboundLocalError only after a network round trip.
        if index_type not in ("search", "vector"):
            raise ValueError(
                f"index_type must be 'search' or 'vector', got {index_type!r}"
            )

        if not self.check_data_source_exists():
            logging.error(
                "Data source '%s' not found; index was not created",
                self.data_source_name,
            )
            return False

        if index_type == "search":
            index_payload = self.create_search_index_payload()
        else:
            index_payload = self.create_vector_index_payload(**kwargs)
        return self._create_resource("indexes", index_payload, "index")

    def create_skillset(self, model_uri, model_name, model_api_key):
        """Create the skillset that chunks and embeds content for the indexer.

        The skillset splits ``/document/content`` into overlapping pages,
        generates an embedding per chunk with the given Azure OpenAI
        deployment, and projects each chunk and its embedding into the vector
        index.

        Args:
            model_uri: Azure OpenAI resource URI of the embedding model.
            model_name: Deployment id of the embedding model.
            model_api_key: API key for the embedding model.

        Returns:
            True when the skillset was created (HTTP 201). False when no
            skillset name was configured or the service rejected the request.
        """
        if not self.vector_skillset_name:
            logging.error("No skillset name configured; skillset was not created")
            return False

        skillset_payload = {
            "name": self.vector_skillset_name,
            "description": "skills required for vector embedding creation processing",
            "skills": [
                {
                    "@odata.type": "#Microsoft.Skills.Text.SplitSkill",
                    "name": "text-chunking-skill",
                    "description": "Skillset to describe the Text chunking required for vectorization",
                    "context": "/document",
                    "defaultLanguageCode": "en",
                    "textSplitMode": "pages",
                    "maximumPageLength": 2000,
                    "pageOverlapLength": 500,
                    "maximumPagesToTake": 0,
                    "inputs": [{"name": "text", "source": "/document/content"}],
                    "outputs": [{"name": "textItems", "targetName": "chunks"}],
                },
                {
                    "@odata.type": "#Microsoft.Skills.Text.AzureOpenAIEmbeddingSkill",
                    "name": "embedding-generation-skill",
                    "description": "",
                    "context": "/document/chunks/*",
                    "resourceUri": model_uri,
                    "apiKey": model_api_key,
                    "deploymentId": model_name,
                    "inputs": [{"name": "text", "source": "/document/chunks/*"}],
                    "outputs": [{"name": "embedding", "targetName": "embedding"}],
                },
            ],
            "indexProjections": {
                "selectors": [
                    {
                        "targetIndexName": self.vector_index_name,
                        "parentKeyFieldName": "parent_key",
                        "sourceContext": "/document/chunks/*",
                        "mappings": [
                            {
                                "name": "chunk",
                                "source": "/document/chunks/*",
                                "sourceContext": None,
                                "inputs": [],
                            },
                            {
                                "name": "embedding",
                                "source": "/document/chunks/*/embedding",
                                "sourceContext": None,
                                "inputs": [],
                            },
                        ],
                    }
                ],
                "parameters": {},
            },
        }
        return self._create_resource("skillsets", skillset_payload, "skillset")

    def create_indexer(self, cache_storage_connection, batch_size=100):
        """Create the indexer that feeds the keyword index and runs the skillset.

        The indexer reads ``.txt`` blobs in text parsing mode, targets the
        keyword index, attaches the embedding skillset, runs on a 24-hour
        schedule and enables the enrichment cache.

        Args:
            cache_storage_connection: Storage connection string for the cache.
            batch_size: Value sent as the indexer's ``batchSize`` parameter.

        Returns:
            True when the indexer was created (HTTP 201). False when either
            index is missing or the service rejected the request.
        """
        # Both indexes must already exist: the indexer targets the keyword
        # index and the skillset projects chunks into the vector index.
        indexes_ready = self.check_index_exists(
            self.search_index_name
        ) and self.check_index_exists(self.vector_index_name)
        if not indexes_ready:
            logging.error(
                "Index '%s' and/or '%s' not found; indexer was not created",
                self.search_index_name,
                self.vector_index_name,
            )
            return False

        indexer_payload = {
            "name": self.indexer_name,
            "description": "Indexer for Azure Blob storage container",
            "dataSourceName": self.data_source_name,
            "targetIndexName": self.search_index_name,
            "skillsetName": self.vector_skillset_name,
            "schedule": {"interval": "PT24H", "startTime": "2024-01-01T00:00:00Z"},
            "parameters": {
                "configuration": {
                    "indexedFileNameExtensions": ".txt",
                    "parsingMode": "text",
                    "dataToExtract": "contentAndMetadata",
                },
                "batchSize": batch_size,
            },
            "cache": {
                "enableReprocessing": True,
                "storageConnectionString": cache_storage_connection,
            },
        }
        return self._create_resource("indexers", indexer_payload, "indexer")


In [None]:
# The status lines below are logged at INFO level, which the root logger drops
# by default (its level is WARNING). Lower the threshold so they are shown.
logging.getLogger().setLevel(logging.INFO)

# Client holding the names of every resource provisioned in the steps below.
search_indexer = AISearchIndexer(
    search_service=SEARCH_SERVICE,
    data_source_name=SEARCH_DATASOURCE_NAME,
    search_index_name=SEARCH_INDEX_NAME,
    vector_index_name=VECTOR_INDEX_NAME,
    indexer_name=SEARCH_INDEXER_NAME,
    vector_skillset_name=VECTOR_SKILLSET_NAME,
    api_key=SEARCH_API_KEY,
)

# Each step returns True on success and False otherwise; the class logs the
# service's error response on failure.

# Step 1 - Create the Data Source
response = search_indexer.create_data_source_blob_storage(
    blob_connection=STORAGE_CONNECTION,
    blob_container_name=CONTAINER_NAME,
    query=PROJECT_NAME,
)
logging.info(f"Search Data Source status = {response}.")

# Step 2 - Create the Keyword Index
response = search_indexer.create_index(index_type="search")
logging.info(f"Keyword Search Index status = {response}.")

# Step 3 - Create the Vector Index (with embedding model)
response = search_indexer.create_index(
    index_type="vector",
    model_uri=VECTOR_EMBEDDING_URI,
    model_name=VECTOR_EMBEDDING_ID,
    model_api_key=VECTOR_EMBEDDING_API_KEY,
    embedding_dims=VECTOR_EMBEDDING_DIMENSION,
)
logging.info(f"Vector Search Index status = {response}.")

# Step 4 - Create the Vector embedding skillset to enhance the indexer
response = search_indexer.create_skillset(
    model_uri=VECTOR_EMBEDDING_URI,
    model_name=VECTOR_EMBEDDING_ID,
    model_api_key=VECTOR_EMBEDDING_API_KEY,
)
logging.info(f"Vector Skillset status = {response}.")

# Step 5 - Create the indexer which will ultimately call the vector embedding skillset
response = search_indexer.create_indexer(
    cache_storage_connection=STORAGE_CONNECTION,
    batch_size=SEARCH_INDEXER_BATCH_SIZE,
)
logging.info(f"Search Indexer status = {response}")

--------------

### 2. Test quality of AI Search Context for RAG

In [18]:
import pandas as pd
import numpy as numpy
import matplotlib.pyplot as plt
import seaborn as sns
import os
from datetime import datetime
import openai, os, requests
import time
import pprint

# Shared pretty-printer; the chat helpers in later cells print through it.
pp=pprint.PrettyPrinter()

# IPython magics: load the dotenv extension, then run it.
# NOTE(review): presumably this reads a local .env file so the os.getenv
# lookups in later cells resolve - confirm the file's location.
%load_ext dotenv
%dotenv

The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [19]:
# Show which openai SDK version is installed in this kernel.
print(f"OpenAI version =  {openai.__version__}")

OpenAI version =  0.28.0


In [24]:


# --- Azure OpenAI client settings (values come from environment variables) ---
openai.api_type = "azure"
# Azure OpenAI on your own data is only supported by the 2023-08-01-preview API version
openai.api_version = "2023-08-01-preview"
openai.api_base = os.getenv("OPENAI_ENDPOINT_URI")  # Azure OpenAI endpoint
openai.api_key = os.getenv("OPENAI_API_KEY")  # Azure OpenAI API key
deployment_id = os.getenv("OPENAI_DEPLOYMENT")  # chat model deployment ID

# --- Azure AI Search settings ---
search_endpoint = os.getenv("AI_SEARCH_ENDPOINT")  # Azure AI Search endpoint
search_key = os.getenv("AI_SEARCH_KEY")  # Azure AI Search admin key
search_index_name = os.getenv("AI_SEARCH_INDEX")  # Azure AI Search index name

# --- Embedding model settings ---
embedding_endpoint = os.getenv("VECTOR_EMBEDDING_URI")
embedding_key = os.getenv("VECTOR_EMBEDDING_API_KEY")

# System message for the bot. The first two sentences end with a space before
# the newline; written as explicit pieces so that whitespace stays visible.
SYSTEM_PROMPT = (
    "\n"
    "You are a customer service bot that is designed to answer questions on Telstra's services. \n"
    "You must respond only from the data source provided. \n"
    "If you do not know the answer, you can say 'I don't know'.\n"
)

In [22]:

def setup_byod(deployment_id: str) -> None:
    """Route the OpenAI SDK's chat calls for a deployment to the "own data" endpoint.

    A requests session is installed as ``openai.requestssession``. Any request
    whose URL starts with the deployment's base URL goes through an adapter
    that rewrites the URL to the deployment's ``extensions/chat/completions``
    endpoint before sending.

    :param deployment_id: The deployment ID for the model to use with your own data.

    To remove this configuration, simply set openai.requestssession to None.
    """

    class BringYourOwnDataAdapter(requests.adapters.HTTPAdapter):
        """HTTP adapter that redirects every request to the extensions endpoint."""

        def send(self, request, **kwargs):
            # Built at send time, so the api_base/api_version in effect when
            # the request is made are the ones used.
            extensions_url = f"{openai.api_base}/openai/deployments/{deployment_id}/extensions/chat/completions?api-version={openai.api_version}"
            request.url = extensions_url
            return super().send(request, **kwargs)

    # Only URLs under this deployment are handled by the custom adapter.
    deployment_prefix = f"{openai.api_base}/openai/deployments/{deployment_id}"
    byod_session = requests.Session()
    byod_session.mount(deployment_prefix, BringYourOwnDataAdapter())

    openai.requestssession = byod_session


# Install the adapter for the configured chat deployment.
setup_byod(deployment_id)

In [25]:
# Conversation seed: every chat history starts as a copy of this list.
BASE_PROMPT = [{"role": "system", "content": SYSTEM_PROMPT}]

In [26]:
def print_chat_history(chat_history):
    """Pretty-print each message of ``chat_history`` as ``"<role>: <content>"``."""
    for turn in chat_history:
        line = f"{turn['role']}: {turn['content']}"
        pp.pprint(line)

def get_chat_response(
    chat_history,
    temperature=0.7,
    max_tokens=150,
    top_p=1,
    frequency_penalty=0,
    presence_penalty=0,
    seed=12345,
    topN=5,
    strictness=3,
    enforce_inscope=True,
    queryType="vector",  # simple, semantic, vectorSimpleHybrid, vectorSemanticHybrid
    roleInformation=SYSTEM_PROMPT,
    semantic_configuration="semantic-config-a0859498-994",
):
    """Request a chat completion grounded on the Azure AI Search index.

    Uses the module-level ``deployment_id``, search settings and embedding
    settings configured earlier in the notebook.

    Args:
        chat_history: List of ``{"role": ..., "content": ...}`` messages.
        temperature, max_tokens, top_p, frequency_penalty, presence_penalty:
            Sampling settings forwarded to the completion call. These used to
            be ignored (temperature was hard-coded to 0.7); note that
            ``max_tokens`` now really caps the response length.
        seed: Accepted for interface compatibility but not forwarded.
            NOTE(review): confirm the configured api-version accepts ``seed``
            before wiring it into the request.
        topN: Sent as ``topNDocuments``.
        strictness: Sent as ``strictness``.
        enforce_inscope: Sent as ``inScope``.
        queryType: Search mode sent as ``queryType``.
        roleInformation: Sent as ``roleInformation``.
        semantic_configuration: Name of the index's semantic configuration.
            The default is the name previously hard-coded here; pass the
            value generated for your own index.

    Returns:
        The completion object returned by ``openai.ChatCompletion.create``.
    """
    search_data_source = {
        "type": "AzureCognitiveSearch",
        "parameters": {
            "endpoint": search_endpoint,
            "key": search_key,
            "indexName": search_index_name,
            "topNDocuments": topN,
            "inScope": enforce_inscope,
            "semanticConfiguration": semantic_configuration,
            "roleInformation": roleInformation,
            "strictness": strictness,
            "embeddingEndpoint": embedding_endpoint,
            "embeddingKey": embedding_key,
            "queryType": queryType,
            "fieldsMapping": {
                "contentFields": [
                    "chunk",
                ],
                # "titleField" : "ADD TITLE TO INDEX",
                # "urlField" : "ADD URL TO INDEX",
                # "filepathField" : "ADD FILEPATH TO INDEX",
            },
        },
    }
    completion = openai.ChatCompletion.create(
        messages=chat_history,
        deployment_id=deployment_id,
        temperature=temperature,
        max_tokens=max_tokens,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        # camelCase is intentional, as this is the format the API expects
        dataSources=[search_data_source],
    )
    return completion

In [27]:
import ipywidgets as widgets
from IPython.display import display, clear_output
from copy import deepcopy
def get_user_input():
    """Display a small chat UI (text area, Submit and Clear buttons).

    Submit appends the typed text to the conversation, calls
    ``get_chat_response``, then prints the whole chat history, the retrieved
    context collected so far and the time the call took. Clear only wipes the
    cell output and re-displays the widgets; the conversation kept in memory
    is not reset.
    """
    # Conversation state shared by the two click handlers below.
    chat_history = deepcopy(BASE_PROMPT)
    context_history = []

    # Create text input widget
    text_input = widgets.Textarea(
        value='',
        placeholder='Type something',
        description='User Input:',
        disabled=False,  # the widget trait is `disabled`, not `disable`
    )

    # Create a button widget for submitting
    submit_button = widgets.Button(
        description='Submit',
        disabled=False,
        button_style='',  # 'success', 'info', 'warning', 'danger' or ''
        tooltip='Submit',
        icon='check',  # (FontAwesome names without the `fa-` prefix)
    )

    # Create a button widget for clearing
    clear_button = widgets.Button(
        description='Clear',
        disabled=False,
        button_style='',  # 'success', 'info', 'warning', 'danger' or ''
        tooltip='Clear',
        icon='remove',  # (FontAwesome names without the `fa-` prefix)
    )

    # Display the widgets
    display(text_input, submit_button, clear_button)

    def on_submit_button_clicked(b):
        """Send the current text to the model and print the updated chat."""
        user_input = text_input.value
        chat_history.append({"role": "user", "content": user_input})

        # Time only the model call; perf_counter is monotonic, so the
        # measurement is unaffected by system clock adjustments.
        start = time.perf_counter()
        response = get_chat_response(chat_history)
        elapsed = time.perf_counter() - start

        message = response.choices[0]['message']
        bot_response = message['content']
        # First context message of the response (holds the citations).
        context_history.append(message['context']['messages'][0]['content'])

        # append the bot response to the chat history
        chat_history.append({"role": "assistant", "content": bot_response})

        print_chat_history(chat_history)
        pp.pprint(context_history)
        pp.pprint(f"Time taken: {elapsed:.3} seconds")

    def on_clear_button_clicked(b):
        """Clear the cell output and show the widgets again."""
        clear_output(wait=False)
        display(text_input, submit_button, clear_button)

    # Attach the event handlers to the button widgets
    submit_button.on_click(on_submit_button_clicked)
    clear_button.on_click(on_clear_button_clicked)

In [28]:
# Render the chat widgets; each Submit click sends the typed text through get_chat_response.
get_user_input()

Textarea(value='', description='User Input:', placeholder='Type something')

Button(description='Submit', icon='check', style=ButtonStyle(), tooltip='Submit')

Button(description='Clear', icon='remove', style=ButtonStyle(), tooltip='Clear')

('system: \n'
 'You are a customer service bot that is designed to answer questions on '
 "Telstra's services. \n"
 'You must respond only from the data source provided. \n'
 "If you do not know the answer, you can say 'I don't know'.\n")
'user: what is telstra'
('assistant: Telstra is a telecommunications company in Australia that '
 'provides a range of services including mobile and fixed-line voice and data '
 'services, internet access, network services, and digital media services. '
 'They offer products and solutions for both consumers and businesses[doc4]. '
 'Telstra is committed to creating a connected future and acknowledges the '
 'connection of Aboriginal and Torres Strait Islander peoples to the lands and '
 'waterways across Australia[doc1][doc2][doc4][doc5].')
['{"citations": [{"content": "and much more. Download My Telstra\\nContact us '
 "We're here to answer your questions. Contact us Send us a message\\nYou can "
 "message us about a range of topics and we'll get bac

## Batch testing the endpoint

In [9]:
# Sample questions for a CapitaLand (CLI) policy corpus. Not referenced by the
# batch run in this notebook, which iterates TELSTRA_QUESTION_BANK.
CAPITALAND_QUESTION_BANK = [
    "Describe the step by step Clearance process for CLI's news release. Give your answer in bullet points.",
    "What is the paternity leave policy for employees on fixed term contracts.",
    # Spelling fixed ("busienss"): a misspelt query measures typo tolerance
    # rather than retrieval quality.
    "Which employee category is eligible for business class travel",
    "If an employee is going on a study trip, what air travel class is he eligible for?",
    "What are the general guidelines for CLI spokesperson when it comes to corporate level media interviews?",
]

In [29]:
# Questions used by the batch run to compare query types on the Telstra index.
TELSTRA_QUESTION_BANK = [
    "How do I check the expiry of my SIM card",
    "From which year are the Telstra financial reports available?",
    "How do I buy or sell Telstra shares?",
    # Typos fixed ("sharehholders", "o Telstra"): misspelt terms penalise
    # keyword-based query types and skew the comparison between them.
    "Can shareholders receive a discount on Telstra products. Answer in yes or no",
    "What is Telstra's sustainability strategy?",
    "How does Telstra work on Digital Literacy?",
    "What does LIMAC stand for and how is it related to access for everyone?",
    "Which edition of the ASX Corporate Governance Principles and Recommendations does Telstra follow?",
    "When did Telstra Group become the new listed entity of the Telstra Corporation Limited?",
    "How do I pay my bills on the My Telstra app? Answer in bullet point format in 1-2 sentences",
]

In [34]:
# One row per (query type, question): [query_type, question, response, context, seconds]
results = []

# Query types to compare. Trim this list (e.g. to just "vectorSemanticHybrid")
# for a quicker run.
QUERY_TYPES = ["vector", "simple", "semantic", "vectorSimpleHybrid", "vectorSemanticHybrid"]

for qType in QUERY_TYPES:
    for q in TELSTRA_QUESTION_BANK:
        # Every question starts a fresh conversation from the system prompt.
        chat_history = deepcopy(BASE_PROMPT)
        chat_history.append({"role": "user", "content": q})

        # Time only the API call, with a monotonic clock, so the latency
        # figures exclude prompt construction and system clock adjustments.
        start = time.perf_counter()
        response = get_chat_response(chat_history, queryType=qType)
        elapsed = time.perf_counter() - start

        bot_response = response.choices[0]['message']['content']
        # First context message of the response (holds the citations).
        context_history = response.choices[0]['message']['context']['messages'][0]['content']
        chat_history.append({"role": "assistant", "content": bot_response})

        print_chat_history(chat_history)
        pp.pprint(f"Time taken: {elapsed:.3} seconds")
        results.append([qType, q, bot_response, context_history, elapsed])

('system: \n'
 'You are a customer service bot that is designed to answer questions on '
 "Telstra's services. \n"
 'You must respond only from the data source provided. \n'
 "If you do not know the answer, you can say 'I don't know'.\n")
'user: How do I check the expiry of my SIM card'
('assistant: To check the expiry of your SIM card, you can follow these '
 'steps:\n'
 '\n'
 '1. On your smartphone or tablet:\n'
 '   - Open the My Telstra app.\n'
 '   - Go to Services.\n'
 '   - Select your Pre-Paid service.\n'
 '   - View your balance on the summary screen.\n'
 '\n'
 '2. On your desktop:\n'
 '   - Sign into My Telstra using your Telstra ID.\n'
 '   - Go to Services.\n'
 '   - Select your Pre-Paid service.\n'
 '   - View your balance on the summary screen.\n'
 '\n'
 'Please note that these steps are specifically for Telstra Pre-Paid SIM '
 'cards. If you have a different type of SIM card or need further assistance, '
 'please contact Telstra customer support for more information.\n'


In [35]:
# Tabulate the batch run; column order matches the rows appended to `results`.
RESULT_COLUMNS = ["query_type", "question", "response", "context", "time_taken"]
df = pd.DataFrame(results, columns=RESULT_COLUMNS)

In [36]:
# Mean and standard deviation of latency per query type, fastest first.
(
    df.groupby("query_type")
    .agg({"time_taken": ["mean", "std"]})
    .sort_values(("time_taken", "mean"))
)

Unnamed: 0_level_0,time_taken,time_taken
Unnamed: 0_level_1,mean,std
query_type,Unnamed: 1_level_2,Unnamed: 2_level_2
semantic,4.382886,2.178536
vector,4.543798,2.157552
simple,4.63625,2.430625
vectorSimpleHybrid,5.495108,2.605981
vectorSemanticHybrid,5.831898,2.628901


In [37]:
# Save the raw results next to the notebook, named after the chat deployment.
results_path = f"results-{deployment_id}.csv"
df.to_csv(results_path, index=False)

---------