In [132]:
import os
from dotenv import load_dotenv
from langchain_community.vectorstores.azuresearch import AzureSearch
from langchain_openai import AzureOpenAIEmbeddings
from dotenv import load_dotenv
import os
from langchain_openai import AzureChatOpenAI
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
import streamlit as st
from typing import Any, Dict, List
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.vectorstores.azuresearch import AzureSearch
from langchain_openai import AzureOpenAIEmbeddings
from langchain_core.output_parsers.string import StrOutputParser
from langchain_core.runnables.passthrough import RunnablePassthrough
from langchain_core.prompts import PromptTemplate

In [133]:
load_dotenv()

AZURE_SEARCH_KEY = str(os.getenv("AZURE_SEARCH_KEY"))
embeddings = AzureOpenAIEmbeddings(
    azure_deployment="orisai-text-embedding-3-large-development",
)

index_name: str = "demo-index-coman"
vector_store: AzureSearch = AzureSearch(
    azure_search_endpoint="https://orisai-search-development.search.windows.net",
    azure_search_key=AZURE_SEARCH_KEY,
    index_name=index_name,
    embedding_function=embeddings.embed_query,
)

llm = AzureChatOpenAI(
    openai_api_version=str(os.getenv("AZURE_OPENAI_API_VERSION")),
    azure_deployment=str(os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")),
)

In [134]:
from __future__ import annotations
from typing import (
    Any,
    ClassVar,
    Collection,
    Dict,
    List,
)
from langchain_core.callbacks import (
    AsyncCallbackManagerForRetrieverRun,
    CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.retrievers import BaseRetriever
from langchain_community.vectorstores.azuresearch import AzureSearch

class CustomAzureSearchVectorStoreRetriever(BaseRetriever):
    """Retriever that uses `Azure Cognitive Search`."""

    vectorstore: AzureSearch
    """Azure Search instance used to find similar documents."""
    search_type: str = "hybrid"
    """Type of search to perform. Options are "similarity", "hybrid",
    "semantic_hybrid", "similarity_score_threshold", "hybrid_score_threshold", 
    or "semantic_hybrid_score_threshold"."""
    k: int = 4
    """Number of documents to return."""
    allowed_search_types: ClassVar[Collection[str]] = (
        "similarity",
        "similarity_score_threshold",
        "hybrid",
        "hybrid_score_threshold",
        "semantic_hybrid",
        "semantic_hybrid_score_threshold",
    )

    filters: str | None = None

    score_threshold: float | None = None

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    @root_validator()
    def validate_search_type(cls, values: Dict) -> Dict:
        """Validate search type."""
        if "search_type" in values:
            search_type = values["search_type"]
            if search_type not in cls.allowed_search_types:
                raise ValueError(
                    f"search_type of {search_type} not allowed. Valid values are: "
                    f"{cls.allowed_search_types}"
                )
        return values

    def _get_relevant_documents(
        self,
        query: str,
        run_manager: CallbackManagerForRetrieverRun,
        **kwargs: Any,
    ) -> List[Document]:
        kwargs['filters'] = self.filters
        kwargs['score_threshold'] = self.score_threshold
        print(self.search_type, kwargs)
        if self.search_type == "similarity":
            docs = self.vectorstore.vector_search(query, k=self.k,**kwargs)
        elif self.search_type == "similarity_score_threshold":
            docs = [
                doc
                for doc, _ in self.vectorstore.similarity_search_with_relevance_scores(
                    query, k=self.k, **kwargs
                )
            ]
        elif self.search_type == "hybrid":
            docs = self.vectorstore.hybrid_search(query, k=self.k, **kwargs)
        elif self.search_type == "hybrid_score_threshold":
            docs = [
                doc
                for doc, _ in self.vectorstore.hybrid_search_with_relevance_scores(
                    query, k=self.k, **kwargs
                )
            ]
        elif self.search_type == "semantic_hybrid":
            docs = self.vectorstore.semantic_hybrid_search(query, k=self.k, **kwargs)
        elif self.search_type == "semantic_hybrid_score_threshold":
            docs = [
                doc
                for doc, _ in self.vectorstore.semantic_hybrid_search_with_score(
                    query, k=self.k, **kwargs
                )
            ]
        else:
            raise ValueError(f"search_type of {self.search_type} not allowed.")
        return docs

    async def _aget_relevant_documents(
        self,
        query: str,
        *,
        run_manager: AsyncCallbackManagerForRetrieverRun,
    ) -> List[Document]:
        raise NotImplementedError(
            "AzureSearchVectorStoreRetriever does not support async"
        )

In [138]:
retriever = CustomAzureSearchVectorStoreRetriever(
    vectorstore=vector_store, 
    k=1, 
    tags=vector_store._get_retriever_tags(),
    search_type="similarity_score_threshold",
    score_threshold=0
)

prompt  = """HUMAN

You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.

Question: {input} 

Context: {context} 

Answer:
"""

custom_rag_prompt = PromptTemplate.from_template(prompt)

In [139]:
question_answer_chain = create_stuff_documents_chain(llm, custom_rag_prompt)
chain = create_retrieval_chain(retriever, question_answer_chain)

In [140]:
chain.invoke({"input": "Wat is de temperatuur van de zon?"})

similarity_score_threshold {'filters': None, 'score_threshold': 0.0}


{'input': 'Wat is de temperatuur van de zon?',
 'context': [Document(page_content='<p><strong>Tijdelijke aanvraagstop renovatiepremie en meeste energiepremies&nbsp;</strong></p><p>De Vlaamse renovatiepremie en de meeste energiepremies van de netwerkbeheerder Fluvius zijn op 1 juli 2022 omgevormd tot een eengemaakte premie: Mijn VerbouwPremie. Door de integratie kan je <strong>van 1 juli tot en met 30 september 2022</strong> geen premies meer aanvragen voor de categorieën van werken die onderdeel zijn van Mijn VerbouwPremie.&nbsp;</p><p>Het gaat om de isolatie en renovatie van daken, buitenmuren en vloeren, ramen en deuren, binnenrenovatie, elektriciteit en sanitair, zonneboilers en warmtepompen of warmtepompboilers.</p><p>Aan die lijst worden twee premies toegevoegd. De eerste is voor de hybride warmtepomp. Die combineert een nieuwe elektrische lucht-waterwarmtepomp die minstens gebruikt wordt voor ruimteverwarming met een nieuwe of bestaande gascondensatieketel. De tweede premie is vo