In [70]:
# importing the necessary packages
# this notebook will only use azure ai search, but feel free to extend
# with other vector dbs

from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient, SearchIndexerClient
from azure.search.documents.models import RawVectorQuery
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import ResourceNotFoundError
from azure.search.documents.indexes.models import (
    SearchIndex,  
    SearchField,  
    SearchFieldDataType,  
    SimpleField,  
    SearchableField,  
    SearchIndex,  
    SemanticConfiguration,  
    PrioritizedFields,  
    SemanticField,  
    SearchField,  
    SemanticSettings,  
    VectorSearch,  
    HnswVectorSearchAlgorithmConfiguration,
    HnswParameters,  
    VectorSearch,
    VectorSearchAlgorithmKind,
    VectorSearchProfile,
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    VectorSearch,
    HnswVectorSearchAlgorithmConfiguration,
    SearchIndex,  
    SearchField,  
    SearchFieldDataType,  
    SimpleField,  
    SearchableField,  
    SearchIndex,  
    SemanticConfiguration,  
    PrioritizedFields,  
    SemanticField,  
    SearchField,  
    SemanticSettings, 
    VectorSearch,  
    HnswVectorSearchAlgorithmConfiguration,
    HnswParameters,  
    VectorSearch,
    VectorSearchAlgorithmKind,
    VectorSearchProfile
)

import ast
import dataclasses
from dataclasses import dataclass
from dotenv import dotenv_values
import evaluate
import json
from openai import AzureOpenAI
from openai.resources import Embeddings
import pandas as pd
import time

# Load the ROUGE metric through the `evaluate` package.
# NOTE(review): `rouge` is not referenced again in the visible cells; the
# ROUGE-L scoring step in ExperimentOrchestrator.start is still a TODO.
rouge = evaluate.load('rouge')

In [46]:
# Settings returned by dotenv_values(); the two config classes below read
# their endpoint / version / key entries from this mapping.
env_values = dotenv_values()

# Default index name used by the search client wrapper and the orchestrator.
INDEX_NAME = "evaluation-index"

class AzureAiSearchConfig(object):
    """Azure AI Search connection settings taken from an environment-style mapping.

    The endpoint, API version and API key entries must all be present and
    non-empty, otherwise construction fails with a ValueError.
    """

    _API_ENDPOINT_KEY = "AIS_ENDPOINT"
    _API_VERSION_KEY = "AIS_API_VERSION"
    _API_KEY_KEY = "AIS_KEY"

    # Names reported in the error message when validation fails.
    _REQUIRED_KEYS = [_API_ENDPOINT_KEY, _API_VERSION_KEY, _API_KEY_KEY]

    api_endpoint: str
    api_version: str
    api_key: str

    def __init__(self, env_values: dict[str, any]):
        """Read and validate the three settings from ``env_values``.

        Raises:
            ValueError: if any of the three values is missing or empty.
        """
        endpoint = env_values.get(AzureAiSearchConfig._API_ENDPOINT_KEY)
        version = env_values.get(AzureAiSearchConfig._API_VERSION_KEY)
        key = env_values.get(AzureAiSearchConfig._API_KEY_KEY)

        if not all((endpoint, version, key)):
            required = ', '.join(AzureAiSearchConfig._REQUIRED_KEYS)
            raise ValueError(f"The following environment variables are required: {required}")

        self.api_endpoint, self.api_version, self.api_key = endpoint, version, key

class AzureOpenAiConfig(object):
    """Azure OpenAI connection settings taken from an environment-style mapping.

    Four entries are required: endpoint, API version, API key and the name of
    the deployed embedding model. Construction fails with a ValueError when
    any of them is missing or empty.
    """

    _API_ENDPOINT_KEY = "AOAI_ENDPOINT"
    _API_VERSION_KEY = "AOAI_API_VERSION"
    _API_KEY_KEY = "AZURE_OPENAI_KEY"
    _DEPLOYMENT_MODEL_KEY = "AOAI_EMBEDDING_DEPLOYED_MODEL"

    # Names reported in the error message when validation fails.
    _REQUIRED_KEYS = [
        _API_ENDPOINT_KEY,
        _API_VERSION_KEY,
        _API_KEY_KEY,
        _DEPLOYMENT_MODEL_KEY
    ]

    api_endpoint: str
    api_version: str
    api_key: str
    deployment_model: str

    def __init__(self, env_values: dict[str, any]):
        """Read and validate the four settings from ``env_values``.

        Raises:
            ValueError: if any required value is missing or empty.
        """
        _api_endpoint = env_values.get(AzureOpenAiConfig._API_ENDPOINT_KEY)
        _api_version = env_values.get(AzureOpenAiConfig._API_VERSION_KEY)
        _api_key = env_values.get(AzureOpenAiConfig._API_KEY_KEY)
        _deployment_model = env_values.get(AzureOpenAiConfig._DEPLOYMENT_MODEL_KEY)

        # The deployment model is listed in _REQUIRED_KEYS and is passed as the
        # model of every embeddings request, so it is validated with the rest
        # instead of being allowed to end up as None.
        if (not _api_endpoint or not _api_version or not _api_key or not _deployment_model):
            raise ValueError(f"The following environment variables are required: {', '.join(AzureOpenAiConfig._REQUIRED_KEYS)}")

        self.api_endpoint = _api_endpoint
        self.api_version = _api_version
        self.api_key = _api_key
        self.deployment_model = _deployment_model


# Build both configuration objects from the loaded settings; either constructor
# raises ValueError when its required entries are missing.
azure_openai_config = AzureOpenAiConfig(env_values)
azure_ai_search_config = AzureAiSearchConfig(env_values)

In [88]:
# azure search fields: names of the fields declared on the index
_ID_FIELD = "id"
_CONTENT_FIELD = "chunk_content"
_VECTOR_FIELD = "chunk_content_vector"
_METADATA_FIELD = "metadata"  # stored as a JSON string; the orchestrator parses it with json.loads
_SOURCE_METADATA_FIELD = "source"  # key read from the parsed metadata JSON

# default list of fields requested from a search
_SELECT_FIELDS = [
    _ID_FIELD,
    _CONTENT_FIELD,
    _VECTOR_FIELD,
    _METADATA_FIELD,
]

# index config fields: names given to the semantic configuration, the HNSW
# algorithm configuration and the vector search profile when the index is created
_DEFAULT_SEMANTIC_CONFIG_NAME = "default"
_HNSW_ALGORITHM_CONFIG_NAME = "hnsw_config"
_VECTOR_SEARCH_PROFILE_NAME = "hnsw_profile"

# hnsw configs: passed as m / ef_construction / ef_search / metric to
# HnswParameters in _SearchIndexClient._create_index_client
_M = 4
_EF_CONSTRUCTION = 400
_EF_SEARCH = 500
_METRIC = "cosine"

@dataclass
class SearchResult:
    """One search hit as recorded by the experiment.

    Instances are converted with dataclasses.asdict and stored as JSON by
    GroundTruthDatasetLoader.
    """
    # Value of the index `id` field for the hit.
    id: str
    # Value of the index `chunk_content` field for the hit.
    content: str
    # The hit's "@search.score" value.
    score: float
    # "source" entry read from the hit's metadata JSON.
    source: str


class _Embedding(object):
    """Produces embedding vectors for text through an OpenAI embeddings resource."""

    _embeddings: Embeddings
    _embedding_model: str

    def __init__(self, embeddings: Embeddings, embedding_model: str):
        """Keep the embeddings resource and the model name used for every request."""
        self._embedding_model = embedding_model
        self._embeddings = embeddings

    def embed(self, text: str):
        """Return the embedding of ``text`` (the first item of the response data)."""
        response = self._embeddings.create(
            model=self._embedding_model,
            input=[text],
        )
        first_item = response.data[0]
        return first_item.embedding

    @classmethod
    def from_config(cls, config: AzureOpenAiConfig):
        """Create an instance backed by an AzureOpenAI client built from ``config``."""
        aoai_client = AzureOpenAI(
            azure_endpoint=config.api_endpoint,
            api_version=config.api_version,
            api_key=config.api_key,
        )
        return cls(aoai_client.embeddings, config.deployment_model)


class _SearchClient(object):
    """Wrapper around an Azure ``SearchClient`` for hybrid queries and batched uploads."""

    # Number of documents sent per upload_documents call.
    _BATCH_SIZE = 1000

    # Annotation matches the attribute actually assigned in __init__.
    _search_client: SearchClient

    def __init__(self, search_client: SearchClient):
        """Store the underlying Azure search client."""
        self._search_client = search_client

    def search(
        self,
        query: str,
        embedding_vector: list[float],
        k: int = 3,
        vector_field: str = _VECTOR_FIELD,
        select: list[str] = _SELECT_FIELDS,
    ):
        """Run a text + vector query against the index.

        Args:
            query: text sent as ``search_text``.
            embedding_vector: query embedding used for the vector part.
            k: number of neighbours requested from the vector query.
            vector_field: index field the vector query targets.
            select: fields to return for each hit.

        Returns:
            Whatever the underlying ``SearchClient.search`` call returns.
        """
        # NOTE(review): `k` only bounds the vector query; no `top` is passed, so
        # the size of the merged result set is left to the service default -
        # confirm this is intended for the "in top" evaluation.
        vector_query = RawVectorQuery(vector=embedding_vector, k=k, fields=vector_field)
        return self._search_client.search(
            search_text=query,
            vector_queries=[vector_query],
            # Honour the caller's field selection (previously always _SELECT_FIELDS).
            select=select,
        )

    def upload(
        self,
        documents: list[dict]
    ):
        """Upload ``documents`` in consecutive batches of at most ``_BATCH_SIZE``."""
        batch_size = _SearchClient._BATCH_SIZE
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i+batch_size]
            self._search_client.upload_documents(documents=batch)

    @classmethod
    def from_config(cls, config: AzureAiSearchConfig, index_name: str = INDEX_NAME):
        """Create a wrapper for ``index_name`` using the endpoint and key in ``config``."""
        search_client = SearchClient(config.api_endpoint, index_name, AzureKeyCredential(config.api_key))
        return cls(search_client)

class _SearchIndexClient(object):
    """Wrapper around an Azure ``SearchIndexClient`` that fetches, creates and deletes the index."""

    # Dimension declared for the vector field.
    # NOTE(review): must match the output size of the deployed embedding model - confirm.
    _VECTOR_DIMENSIONS = 1536

    _search_index_client: SearchIndexClient

    def __init__(
        self,
        search_index_client: SearchIndexClient
    ):
        """Store the underlying Azure index client."""
        self._search_index_client = search_index_client

    def _get_index_client(self, index_name: str):
        """Return the existing index called ``index_name`` (result of ``get_index``)."""
        return self._search_index_client.get_index(index_name)

    def _create_index_client(
        self,
        index_name: str,
        with_semantic_search: bool = False
    ):
        """Create the index with id, content, vector and metadata fields.

        Args:
            index_name: name of the index to create.
            with_semantic_search: when True, also attach a semantic configuration
                that prioritises the content and metadata fields.

        Returns:
            The result of ``SearchIndexClient.create_index``.
        """
        fields = [
            SimpleField(name=_ID_FIELD, type=SearchFieldDataType.String, key=True),
            SearchableField(name=_CONTENT_FIELD, type=SearchFieldDataType.String),
            SearchField(
                name=_VECTOR_FIELD,
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True,
                vector_search_dimensions=_SearchIndexClient._VECTOR_DIMENSIONS,
                vector_search_profile=_VECTOR_SEARCH_PROFILE_NAME,
            ),
            SearchableField(name=_METADATA_FIELD, type=SearchFieldDataType.String)
        ]

        vector_search = VectorSearch(
            algorithms=[
                HnswVectorSearchAlgorithmConfiguration(
                    name=_HNSW_ALGORITHM_CONFIG_NAME,
                    kind=VectorSearchAlgorithmKind.HNSW,
                    parameters=HnswParameters(
                        m=_M,
                        ef_construction=_EF_CONSTRUCTION,
                        ef_search=_EF_SEARCH,
                        metric=_METRIC
                    )
                )
            ],
            profiles=[
                VectorSearchProfile(
                    name=_VECTOR_SEARCH_PROFILE_NAME,
                    algorithm=_HNSW_ALGORITHM_CONFIG_NAME
                )
            ]
        )

        semantic_settings: SemanticSettings | None = None
        if with_semantic_search:
            # The SDK models take plural keyword names (`configurations`,
            # `prioritized_content_fields`); the singular spellings are not
            # model attributes, so the semantic configuration never reached
            # the index definition.
            semantic_settings = SemanticSettings(
                configurations=[
                    SemanticConfiguration(
                        name=_DEFAULT_SEMANTIC_CONFIG_NAME,
                        prioritized_fields=PrioritizedFields(
                            prioritized_content_fields=[
                                SemanticField(field_name=_CONTENT_FIELD),
                                SemanticField(field_name=_METADATA_FIELD)
                            ]
                        )
                    )
                ]
            )

        index = SearchIndex(
            name=index_name,
            fields=fields,
            vector_search=vector_search,
            semantic_settings=semantic_settings)
        return self._search_index_client.create_index(index)

    def get_or_create_index_client(
        self,
        index_name: str,
        with_semantic_search: bool = False
    ):
        """Return the index ``index_name``, creating it when the lookup raises ResourceNotFoundError."""
        try:
            return self._get_index_client(index_name)
        except ResourceNotFoundError:
            print(f"Index {index_name} does not exist... Creating a new index.")
            return self._create_index_client(index_name, with_semantic_search)

    def delete_index(self, index_name: str):
        """Delete the index called ``index_name``."""
        self._search_index_client.delete_index(index_name)

    @classmethod
    def from_config(cls, config: AzureAiSearchConfig):
        """Create a wrapper using the endpoint and key in ``config``."""
        search_index_client = SearchIndexClient(config.api_endpoint, AzureKeyCredential(config.api_key))
        return cls(search_index_client)


class GroundTruthDatasetLoader(object):
    _DATASET_PATH = "../code_samples/data/thunderbolt/ground_truth/qa_dataset.csv"

    # df keys
    _QUESTION_KEY = "question"
    _ANSWER_KEY = "answer"
    _SOURCE_KEY = "source"
    _SEARCH_RESULT_KEY = "search_result"

    _df: pd.DataFrame

    def __init__(self):
        self._df = pd.read_csv(GroundTruthDatasetLoader._DATASET_PATH)

    def load_rows(self):
        for i, row in self._df.iterrows():
            yield i, row

    def get_question_from_row(self, row: pd.Series):
        return row[GroundTruthDatasetLoader._QUESTION_KEY]
    
    def get_answer_from_row(self, row: pd.Series):
        return row[GroundTruthDatasetLoader._ANSWER_KEY]
    
    def get_source_from_row(self, row: pd.Series):
        return row[GroundTruthDatasetLoader._SOURCE_KEY]
    
    def get_seach_answers_from_row(self, row: pd.Series) -> list[SearchResult]:
        search_results = json.loads(row[GroundTruthDatasetLoader._SEARCH_RESULT_KEY])
        return [SearchResult(**search_result) for search_result in search_results]

    def set_search_answers_to_row(self, index: int, search_response: list[SearchResult]):
        value = json.dumps([dataclasses.asdict(elem) for elem in search_response])
        self.set_key(index, GroundTruthDatasetLoader._SEARCH_RESULT_KEY, value)

    def set_key(self, index: int, key: str, value: any):
        self._df.at[index, key] = value

    def save(self, path: str):
        self._df.to_json(path, orient="records")


In [89]:
class ExperimentOrchestrator(object):
    """Runs the retrieval experiment end to end.

    Construction deletes the target index and loads the embeddings dataset.
    ``start`` then recreates the index, uploads the documents, searches for
    every ground-truth question, records the hits, evaluates them and saves
    the results.
    """

    # Column that receives 1 when the expected source is among the hits, else 0.
    _IN_TOP_KEY = "in_top"

    _DATASET_PATH = "../code_samples/data/thunderbolt/embeddings.json"
    # File the evaluated ground-truth dataset is written to.
    _RESULTS_PATH = "thunderbolt.json"
    # Pause (seconds) after deleting the index and after uploading documents.
    _SETTLE_SECONDS = 2

    _dataset: list[dict]
    _index_name: str

    def __init__(
        self,
        search_index_client: _SearchIndexClient,
        search_client: _SearchClient,
        embedding: _Embedding,
        ground_truth_dataset_loader: GroundTruthDatasetLoader,
        index_name: str = INDEX_NAME
    ):
        """Delete ``index_name`` and load the documents to upload.

        Note that construction is destructive: the index is deleted before any
        other state is set up.
        """
        # delete the index
        print(f"Deleting index {index_name}...")
        search_index_client.delete_index(index_name)
        time.sleep(ExperimentOrchestrator._SETTLE_SECONDS)

        self._search_index_client = search_index_client
        self._search_client = search_client
        self._embedding = embedding
        self._ground_truth_dataset_loader = ground_truth_dataset_loader
        self._index_name = index_name
        self._dataset = self._read_json_dataset(ExperimentOrchestrator._DATASET_PATH)

    def _read_json_dataset(self, dataset_path: str):
        """Return the parsed content of the JSON file at ``dataset_path``."""
        # Explicit encoding so the result does not depend on the platform default.
        with open(dataset_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _prepare_index(self):
        """Create the index if it does not exist and upload the dataset."""
        print("Creating index and uploading data...")
        self._search_index_client.get_or_create_index_client(self._index_name)
        self._search_client.upload(self._dataset)
        time.sleep(ExperimentOrchestrator._SETTLE_SECONDS)  # adding wait as documents may need to settle

    def _collect_search_results(self):
        """Search for every ground-truth question and store the hits on its row."""
        print("Performing search...")
        loader = self._ground_truth_dataset_loader
        for index, row in loader.load_rows():
            question = loader.get_question_from_row(row)
            embedding = self._embedding.embed(question)
            search_result_paged = self._search_client.search(question, embedding)

            # Only the fields needed for evaluation are copied; the returned
            # vector is simply ignored rather than deleted from the hit.
            search_results_final: list[SearchResult] = [
                SearchResult(
                    id=item[_ID_FIELD],
                    content=item[_CONTENT_FIELD],
                    score=item["@search.score"],
                    source=json.loads(item[_METADATA_FIELD])[_SOURCE_METADATA_FIELD],
                )
                for item in search_result_paged
            ]
            loader.set_search_answers_to_row(index, search_results_final)

    def _evaluate_search_results(self):
        """Flag, per row, whether the expected source is among the recorded hits."""
        print("Evaluting chunks...")
        loader = self._ground_truth_dataset_loader
        for index, row in loader.load_rows():
            # calculate in top init
            expected_source = loader.get_source_from_row(row)
            search_results = loader.get_seach_answers_from_row(row)
            search_sources = [search_result.source for search_result in search_results]

            contains_source = 1 if expected_source in search_sources else 0
            loader.set_key(index, ExperimentOrchestrator._IN_TOP_KEY, contains_source)

            # rouge L metric
            # create code for rouge L score

    def start(self):
        """Run the experiment and write the evaluated dataset to ``_RESULTS_PATH``."""
        self._prepare_index()
        self._collect_search_results()
        self._evaluate_search_results()
        self._ground_truth_dataset_loader.save(ExperimentOrchestrator._RESULTS_PATH)

    @classmethod
    def from_config(
        cls,
        azure_ai_search_config: AzureAiSearchConfig,
        azure_openai_config: AzureOpenAiConfig,
        index_name: str = INDEX_NAME
    ):
        """Build the orchestrator and all its collaborators from the two configs.

        Args:
            azure_ai_search_config: settings for the index and search clients.
            azure_openai_config: settings for the embedding client.
            index_name: index to operate on; defaults to ``INDEX_NAME`` like the
                other ``from_config`` helpers.
        """
        search_client_index = _SearchIndexClient.from_config(azure_ai_search_config)
        search_client = _SearchClient.from_config(azure_ai_search_config, index_name)
        embedding = _Embedding.from_config(azure_openai_config)

        return cls(search_client_index, search_client, embedding, GroundTruthDatasetLoader(), index_name)

# Build the orchestrator (this deletes the INDEX_NAME index as part of
# construction) and run the full experiment; results are saved by start().
orchestrator = ExperimentOrchestrator.from_config(azure_ai_search_config, azure_openai_config, INDEX_NAME)
orchestrator.start()

Deleting index evaluation-index...
Creating index and uploading data...
Index evaluation-index does not exist... Creating a new index.
Performing search...
Evaluting chunks...
