# Composite Chain

This chain takes the output from the conversational retrieval chain and references the eligibility table to decide whether to return the original answer or a paraphrased response that guides the conversation along.

In [1]:
! /opt/conda/envs/python3-11-6/bin/python3.11 -m pip install --no-cache-dir -r requirements.txt

Collecting google-cloud-aiplatform==1.43.0 (from -r requirements.txt (line 18))
  Downloading google_cloud_aiplatform-1.43.0-py2.py3-none-any.whl.metadata (27 kB)
Collecting langchain==0.1.10 (from -r requirements.txt (line 35))
  Downloading langchain-0.1.10-py3-none-any.whl.metadata (13 kB)
Collecting langchain-community==0.0.25 (from -r requirements.txt (line 36))
  Downloading langchain_community-0.0.25-py3-none-any.whl.metadata (8.1 kB)
Collecting langchain-core==0.1.28 (from -r requirements.txt (line 37))
  Downloading langchain_core-0.1.28-py3-none-any.whl.metadata (6.0 kB)
Collecting langsmith==0.1.10 (from -r requirements.txt (line 40))
  Downloading langsmith-0.1.10-py3-none-any.whl.metadata (13 kB)
Downloading google_cloud_aiplatform-1.43.0-py2.py3-none-any.whl (4.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.1/4.1 MB[0m [31m127.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading langchain-0.1.10-py3-none-any.whl (806 kB)
[2K   [90m━━━━━━━━━━━━━━━━━

In [2]:
import sys

# Authenticate the user only when the notebook runs inside Google Colab; the
# check looks for the "google.colab" module among the already-loaded modules,
# so nothing happens in any other environment.
if "google.colab" in sys.modules:
    from google.colab import auth as google_auth

    google_auth.authenticate_user()

In [3]:
# Environment-specific settings passed to MatchingEngine.from_components in
# get_vector_search_retriever.
# NOTE(review): these identifiers are hard-coded for one deployment; confirm
# whether they should come from configuration instead.
PROJECT_ID="engaged-domain-403109"
# Also used as the `location` of the embeddings client in get_vector_search_retriever.
REGION="asia-southeast1"
GCS_BUCKET="engaged-domain-403109-me-bucket"
# Full resource names of the vector index and of the endpoint it is queried through.
ME_INDEX_ID="projects/510519063638/locations/asia-southeast1/indexes/4693366538231611392"
ME_ENDPOINT_ID="projects/510519063638/locations/asia-southeast1/indexEndpoints/3617586769429528576"

# Utility Function

In [4]:
from langchain_core.prompt_values import ChatPromptValue, StringPromptValue
import pprint

pp = pprint.PrettyPrinter(indent=2)

def debug_fn(x):
    """Pretty-print a value travelling through a chain, then hand it on unchanged.

    Insert it between two steps of a chain as a pass-through "tap", e.g.
    ``final_inputs | ANSWER_PROMPT | debug_fn | self.model`` or
    ``CONDENSE_QUESTION_PROMPT | debug_fn | self.model | StrOutputParser()``.

    Prompt values (``ChatPromptValue`` / ``StringPromptValue``) are rendered
    with ``to_string()`` and printed together with the rendered length; any
    other value is printed as it is.

    Returns:
        The very same object that was passed in.
    """
    is_prompt_value = isinstance(x, (ChatPromptValue, StringPromptValue))
    if not is_prompt_value:
        # Not a prompt: print the input as-is and pass it along.
        pp.pprint(x)
        return x

    rendered = x.to_string()
    pp.pprint({
        "len": len(rendered),
        "prompt_val": rendered,
    })
    return x


In [5]:
import os
import faiss

from langchain_google_vertexai import VertexAIEmbeddings
from langchain.vectorstores.matching_engine import MatchingEngine
from langchain_community.docstore import InMemoryDocstore
from langchain_community.vectorstores import FAISS
from langchain.memory import VectorStoreRetrieverMemory


def get_vector_search_retriever():
    """
    Build a similarity-search retriever on top of the Matching Engine index.

    The vector store is created from the module-level project, region, bucket,
    index and endpoint settings, using the "textembedding-gecko@001" embeddings.
    Each query returns the 4 closest results.
    """
    top_k = 4

    vector_store = MatchingEngine.from_components(
        project_id=PROJECT_ID,
        region=REGION,
        gcs_bucket_name=GCS_BUCKET,
        embedding=VertexAIEmbeddings(
            location=REGION, model_name="textembedding-gecko@001"
        ),
        index_id=ME_INDEX_ID,
        endpoint_id=ME_ENDPOINT_ID,
    )

    # Expose the index as a retriever
    # https://api.python.langchain.com/en/latest/vectorstores/langchain_community.vectorstores.matching_engine.MatchingEngine.html?highlight=matchingengine#langchain_community.vectorstores.matching_engine.MatchingEngine.as_retriever
    return vector_store.as_retriever(
        search_type="similarity",
        search_kwargs={"k": top_k},
    )


def get_memory_retriever():
    """
    Build the conversation memory used by the chain.

    Despite the name, the returned object is a ``VectorStoreRetrieverMemory``:
    a memory whose entries live in an in-process FAISS index (flat L2,
    768-dimensional vectors) and are looked up through a retriever that
    returns the 2 closest entries.
    """
    embedding_dim = 768
    flat_index = faiss.IndexFlatL2(embedding_dim)
    embedder = VertexAIEmbeddings(model_name="textembedding-gecko@001")

    # Start from an empty docstore and an empty index-to-docstore-id mapping.
    # pylint: disable-next=not-callable
    memory_store = FAISS(embedder, flat_index, InMemoryDocstore({}), {})

    return VectorStoreRetrieverMemory(
        retriever=memory_store.as_retriever(search_kwargs={"k": 2}),
        return_messages=True,
        input_key="human",
        output_key="ai"
    )


# Singleton

In [6]:
from threading import Lock, RLock

class SingletonMeta(type):
    """
    Metaclass that gives every class using it a single shared instance.

    The first call to a class creates the instance; every later call returns
    that same object. Creation is serialised with a lock so that concurrent
    first calls cannot create two instances.
    """

    # Maps each class created with this metaclass to its one instance.
    _instances = {}

    # One lock shared by every class that uses this metaclass. It is held while
    # the constructor runs, so it must be re-entrant: otherwise a singleton
    # whose __init__ builds another singleton would block forever on a lock the
    # same thread already holds.
    _lock = RLock()

    def __call__(cls, *args, **kwargs):
        """
        Return the single instance of `cls`, creating it on first use.

        The arguments are forwarded to the constructor on the first call only;
        later calls ignore them and return the existing instance.
        """
        # Every call takes the lock. The first caller creates the instance;
        # callers that were waiting find it already stored and skip creation.
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

# Templates

In [7]:
# SYSTEM_TEMPLATE = """You are a helpful assistant from the Singapore government to caregivers or persons with disabilities, including the elderly. You are able to suggest relevant grants and schemes based on their unique circumstances and type of support they are looking for. You are most familiar with financial grants and schemes, but should still be able to answer generic questions related to caregiving or other support for persons with disabilities. 
# You should first understand what type of support they are looking for. Only then, you should proceed to ask them for more information in order to provide more relevant recommendations. If possible, try to anticipate what they need and assess relevant information based on the context they provide. Determine the subject of their responses based on context of the most recent messages they have sent.
# As part of your assessment to provide recommendations, you should ideally consider the beneficiary's age, impairment, activities of daily living that they need assistance with, and the average income per capita in their household. With regard to the activites of daily living, there are six pre-defined categories you should look out for: eating, dressing, toileting, bathing, walking or moving around, transferring from bed to chair and vice versa. 

# If the user is unwilling to share any information, do not force them to disclose this information, but don't give up. Move on to ask for other details instead. 
# The more details you know, the more relevant suggestions you can give by narrowing them down based on the eligibility criteria. The less details you know, the more generic suggestions you can give. Even if the user provides no details, try to give them at least the most generic suggestions, even if it means giving them some examples of all available solutions.

# English might not be your user's first language. Always ensure that your responses are concise, easy to read and understand.Ask them questions one at a time as follow-up instead of overwhelming them with multiple questions at once. 
# Your responses should always be empathetic but not sympathetic and respectful to preserve the dignity of the caregiver or persons with disabilities. Always revise your response to replace or explain technical jargons, and match the complexity of language to the human's inputs, without being condescending or using derogatory terms."""
# System message of the answer prompt (see get_answer_chain): sets the
# assistant's persona and overall task. It has no template placeholders.
SYSTEM_TEMPLATE = """Imagine you are an assistant from the Singapore government specialising in suggesting relevant financial grants and schemes for caregivers or persons with disabilities including the elderly based on their unique circumstances. 
Your task is to guide them to understand and decide what they can do next by providing concise answers to their questions, followed by further details on complex topics and relevant suggestions on grants and schemes if necessary. 
You should focus on providing empathetic and informative responses that prioritise user understanding and actionable recommendations."""

# TODO: 
# - I noticed that the topic setting functionality in steps 7-8 is not good when the question is not explicitly about grants.
# - For eg, question is "What is SPED?", the topic set is "Caregiving" when it should be more verbose
# - It might help if we be more verbose in Step 8 in describing "If no...."
# STANDALONE_TEMPLATE = """Imagine you are assisting someone who specialises in helping caregivers or persons with disabilities. Your task is to refine a given user question by incorporating relevant context from a given conversation history. Respond with an enhanced standalone question that reflects a deeper understanding of the user's needs. Follow these steps in your response: 
# 1. Begin by analysing the given user question related to caregivers or persons with disabilities. Identify key themes and keywords without making any assumptions.
# 2. Consider the provided conversation history to understand the context of the ongoing discussion and any relevant topics or details. Think critically and identify explain your judgement on how relevant the given pieces of the conversation history is to the user's question.
# 3. Integrate only pieces from the conversation history that you have evaluated as relevant as context into the user question without making any assumptions or referring to your own knowledge. If there is no relevant context identified from the conversation history, do not alter the given user question at all, and you should return the same given user question as your refined question. 
# 5. Ensure that the refined question is clear, coherent and reflects a deeper understanding of the user's situation on its own, without a need refer to any of the given conversation history. 
# 6. Make an overall judgement on how well the refined standalone question incorporates pertinent details from the given conversation history. 
# 7. Evaluate if the refined standalone question requires a response with information about specific grants that are either explicitly mentioned in the question, or you identify as relevant examples. 
# 8. If no, identify the main subject of each topic and set it as a topic. If yes, identify the name of the grant and set it as the topic.
# The previous conversation is: 
# {chat_history}
# 
# 
# Follow Up Input: {question}
# {format_instructions}
# """
# Prompt used by get_standalone_question_chain to rewrite a follow-up question
# into a standalone one and to pick its main topic.
# Placeholders: {chat_history}, {question}, {format_instructions} (the last one
# is filled from the JSON output parser).
STANDALONE_TEMPLATE = """Your task is to refine a given user question by weaving in relevant details from a provided chat history. Craft a rephrased question that can be understood independently, capturing the essence of the original query while incorporating contextual nuances. Follow the steps below while ensuring that each transition in thought maintains coherence and clarity, resulting in a refined, standalone question reflective of the conversation's progression: 

1. Begin by analysing the given user question thoroughly. This entails identifying the main topic, intent, and any specific details or nuances in the query. 
2. Analyse the provided chat history and make a judgement if there is relevant information that can enhance the understanding of the given user question. Look for context, background details or previous discussions related to the main topic at hand. If there is no relevant judgement, skip the remaining steps and return the given user question as your response without making any changes. 
{chat_history}
3. Extract and integrate the relevant context into the rephrased question without making any assumptions outside of the given chat history. Transition smoothly from the user's original inquiry to incorporating details from the chat history, ensuring coherence and logical progression. 
4. Preserve the essence of the user's original question throughout the rephrasing process. Although incorporating additional context, ensure that the core topic and intent remain unchanged. 
5. Craft the rephrased question in a manner that is standalone, clear and comprehensible. Ensure that a third party can gain a deep understanding of the inquiry in the context of the user's situation without needed to refer back to previous interactions. 
6. Make a judgement if the rephrased question pertains to a specific grant or scheme, or if it covers a broader topic. 
7. If the rephrased question is about a specific grant or scheme, dynamically assign the name of the grant or scheme as the main topic. If the question is broader in scope, designate the topic you have identified as the main topic.  
8. Make an overall judgement on how well the rephrased question can help a third party understand the inquiry in the context of the user's situation without needing to refer back to previous interactions. 

Follow Up Input: {question}
{format_instructions}
"""



# Original 
# ANSWER_TEMPLATE = """Try to answer the question based on the following context:
# {context}

# The examples are:
# {examples}

# These examples are only teaching you how to navigate a conversation around a specific topic. You should not replace the current question topic with the example topic.

# Be precise and concise with your answer. Do not include half-finished sentences.

# Question: {question}
# Answer:
# """

# COT implementation
# ANSWER_TEMPLATE = """Try to answer the question based on the following context:
# {context}

# Question: {question}

# Topic: {topic}

# Examples: {examples}

# Follow these steps in your response:
# 1. Understanding the intent of the question.
# 2. Use the examples a reference in helping you understanding the nature of the input question.
# 3. These examples are only teaching you how to navigate a conversation around a specific topic. You should not replace the topic with the example topic.
# 4. In addition, reference the topic when crafting your answer.
# 3. Be precise and concise with your answer. Do not include half-finished sentences.

# {format_instructions}
# """

# Human message of the answer prompt (see get_answer_chain): step-by-step
# instructions for the short conversational answer.
# Placeholders: {context} (the retrieved documents) and {question}.
ANSWER_TEMPLATE = """Follow these steps in your response to the given user question:
1. Begin by analysing the given user message in the context of financial grants or schemes for caregivers, persons with disabilities and/or the elderly. Identify the main topic, intent and inquiry by the user. 
2. Generate a clear, comprehensible and concise response without making any assumptions. If the user is asking for examples, include a list of least 2 and up to 3 examples in your response without too much explanation. Do not explicitly reveal or make known your intention of giving them a summary. 
3. Ensure that your response is <800 characters. 
4. Ensure that language used should always be respectful, empathetic, not condescending and not derogatory. Always match the complexity of language used in the given user question. 
5. Ensure that throughout the answering process, your response directly addresses the core topic and intent of the user question based on the given context: 
{context}
6. Make an overall judgement how well your final response addresses the user's question. 

Question: {question}
Answer: 
"""


# 1. Evaluate if the question requires a response with information about specific grants that are either explicitly mentioned in the question, or you identify as relevant examples. 
# 2. If no, skip to point 3. If yes, craft a response for each relevant grant you have identified by following these steps: 

# TODO: 
# I noticed that the title for 2c. is replaced by the name of the grant instead of "About the grant".
# This might be caused by the introduction of {topic} in the prompt.
# I believe you can remove the topic if you need to. The chain should be able to handle that.
# INFO_TEMPLATE = """Try to provide a list of summarized points based on the following context:
# {context}

# Question: {question}

# Follow these steps in your response:
# 1. Evaluate if the topic is a grant and/or the question requires a response with information about specific grants that are either explicitly mentioned in the question, or you identify as relevant examples. 
# 2. If no, skip to point 3. If yes, craft a response for each relevant grant you have identified by following these steps: 
# 2a. [Grants] In this case, the topic should be the name of the grant.
# 2b. [Grants] For each of the following sub-topics, first set these sub-topics from 2c - 2f as the title. Then, consider the context and craft a description about it.
# 2c. [Grants] About the grant
# 2d. [Grants] Eligibility
# 2e. [Grants] Expected benefits
# 2f. [Grants] Application process

# 3. Evaluate if the question requires a response with information about a general topics. Craft a response for each relevant topic you have identified by following these steps: 
# 3a. Identify the main subject of each topic and set it as a topic. 
# 3b. Consider the context, and craft a description about it. Each description should be self-contained, and should not be mentioned in another description point. Where relevant, include examples in this description

# {format_instructions}
# """

# Prompt used by get_info_chain to produce the structured "information"
# sections (title + content) that accompany the short answer.
# Placeholders: {topic}, {question}, {context} (the retrieved documents) and
# {format_instructions} (filled from the JSON output parser).
INFO_TEMPLATE = """Follow these steps to generate a response to the given topic and user question:

Topic: {topic}
Question: {question} 

1. Begin by understanding if the given topic is about a specific grant or scheme, or if it covers a broader topic.  
2. Analyse the following context to extract relevant information to the given question: 
{context}
If the given topic is about a specific grant or scheme, this should include details about specific grants, eligibility criteria, expected benefits and application processes. 
If the given topic or question is broader in scope, skip to Step 5.
3. Ensure the inclusion of fixed sections with these specific titles in your response: About the grant, Eligibility, Expected benefits, Application process. These sections provide comprehensive information extracted from the context about the grant or scheme to help user's understanding. 
4. Assess if there are remaining relevant details extracted from the context in Step 2 other than those already covered in Step 3. If no, skip to Step 7.
5. Assess if this extracted information from the context contains complex topics that the user may need more details or explanation for better understanding. If yes, assess if it can be broken down into mutually-exclusive sub-topics for clarity and ease of understanding. Ensure that each sub-topic is of sufficient complexity to warrant its own section. If there is no extracted information, or if the information if simple enough not to require additional explanation, skip all remaining steps and return nothing. 
Choose a maximum of the top 5 most relevant sub-topics as a part of your response.
6. If such mutually-exclusive sub-topics are identified, partition the response accordingly, with each sub-topic serving as a distinct title. Ensure seamless integration of relevant information from the given context into each sub-topic. 
7. If relevant, include a section titled 'Examples'. Provide a minimum of two relevant examples to illustrate key points or concepts, enhancing user's comprehension. 
8. Throughout the response crafting process, prioritise clarity and coherence. Ensure that the response is structured logically, facilitating easy navigation and understanding for the user. Details in each sub-topic should be self-contained and not mentioned in another sub-topic within the same response. 
Ensure that language used should always be respectful, empathetic, not condescending and not derogatory. Always match the complexity of language used in the given user question. 

{format_instructions}
"""

In [8]:
# Should be the examples to guide the crafting of standalone questions
# Each entry pairs a user message ("human") with a model reply ("ai"); the keys
# match the "{human}" / "{ai}" placeholders of the few-shot example prompt built
# in get_answer_chain.
FEWSHOT_ANSWER_EXAMPLES = [
    {
        "human": "My grandma just had an accident and I don't know what to do.",
        "ai": "I'm really sorry to hear about your grandma's accident. I'm here to help. Could you please provide more details about the situation? What happened, and what kind of assistance or information are you looking for? I'll do my best to support you."
    },
    {
        "human": "What kind of help can I get when my parents are getting old?",
        "ai": "As your parents age, different types of help are available, like support with daily tasks, healthcare, or financial assistance. To give better suggestions, could you share what your parents need and the type of help you're looking for?"
    },
    {
        "human": "What can I do to help my grandfather?",
        "ai": "I'm glad to help you find support for your grandfather. To give you more relevant suggestions, could you please share what kind of support you're looking for? Are you looking for financial assistance, help with daily tasks, or something else?"
    },
    {
        "human": "What kind of support is available to caregivers?",
        "ai": "Caregivers can get different kinds of help. This includes support with daily tasks, finances, and resources for information. To offer more specific suggestions, could you tell me more about your caregiving situation and the type of support you need?"
    },
    {
        "human": "What are ADLs?",
        # The string opens with exactly three quotes: a fourth quote would
        # become a stray, unmatched '"' at the start of the example text.
        "ai": """ADLs, or Activities of Daily Living, are everyday tasks we usually do without help. There are 6 ADLs in Singapore:
1. Eating
2. Bathing
3. Dressing
4. Using the toilet
5. Moving from a bed to chair or chair to bed
6. Walking or moving around
It's good to know about ADLs because they can affect your eligibility for certain support grants and schemes. If you have more questions or need examples, feel free to ask!"""
    },
    {
        "human": "Maybe money?",
        "ai": "That's a good start. To provide more relevant recommendations, could you share more details about your situation? For example, what is the average income per capita in the household?"
    },
    {
        "human": "I don't want to share financial details",
        "ai": "That's okay. You don't have to share that information if you're not comfortable. There are still other ways I could help. Could you tell me more about the situation or challenges your grandparents are facing? "
    },
    {
        "human": "They are on a wheelchair",
        # Same as above: three opening quotes, no stray leading '"'.
        "ai": """I noticed that your grandma uses a wheelchair, which may indicate she might need help with daily activities like:
- Bathing
- Using the toilet
- Moving from bed to chair or chair to bed
- Walking or moving around
Is this information correct? If you can think of any other activities or have any questions, let me know!"""
    },
]

# No longer in use
# Examples of turning a follow-up message plus chat history into a standalone
# question.
FEWSHOT_STANDALONE_QUESTION_EXAMPLES = [
    # Empty history: the question comes back unchanged.
    dict(human="What is ADL?", chat_history="", ai="What is ADL?"),
    # Follow-up that only makes sense together with the previous exchange.
    dict(
        human="Name me a few",
        chat_history=(
            "human:  What is SPED?\n"
            "ai: ' SPED stands for Special Education. SPED schools cater to "
            "children and youths with special needs who require more "
            "intensive and specialised assistance.'"
        ),
        ai="Name me a few SPED schools in Singapore",
    ),
]


# CONVERSATIONAL RETRIEVAL CHAIN

In [9]:
from typing import List, Optional
from operator import itemgetter

from langchain_google_vertexai import ChatVertexAI
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.prompts import ChatPromptTemplate, PromptTemplate, FewShotChatMessagePromptTemplate, HumanMessagePromptTemplate, SystemMessagePromptTemplate
from langchain_core.output_parsers import StrOutputParser, CommaSeparatedListOutputParser, JsonOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableSequence

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.prompts import SemanticSimilarityExampleSelector

from langchain.prompts.prompt import PromptTemplate
from langchain.schema import format_document


# Default per-document prompt: renders a document as its page content only.
DEFAULT_DOCUMENT_PROMPT = PromptTemplate.from_template(template="{page_content}")

def combine_documents(
    docs, document_prompt=DEFAULT_DOCUMENT_PROMPT, document_separator="\n\n"
):
    """Render every document with `document_prompt` and join the results.

    Args:
        docs: the documents to render.
        document_prompt: prompt applied to each document.
        document_separator: string placed between two rendered documents.

    Returns:
        One string containing all rendered documents, in input order.
    """
    rendered_docs = (format_document(doc, document_prompt) for doc in docs)
    return document_separator.join(rendered_docs)


def get_fewshot_example_selector(examples, k=2):
    """
    Build a semantic-similarity example selector over `examples`.

    The examples are embedded with "textembedding-gecko@001" and stored in a
    FAISS vector store; the selector is configured with `k` as the number of
    examples to select, for dynamic insertion into a few-shot prompt.
    """
    example_embeddings = VertexAIEmbeddings(model_name="textembedding-gecko@001")
    selector = SemanticSimilarityExampleSelector.from_examples(
        examples, example_embeddings, FAISS, k=k
    )
    return selector

def handle_malformed_json(x):
    """
    Clean up an occasional formatting glitch in the model's JSON output.

    The model sometimes appends a code fence after the JSON, e.g.
    output='{...some json data}\\n```', which breaks JsonOutputParser.

    Every newline-plus-three-backticks sequence in `x.content` is replaced by
    a plain newline. The message is modified in place and returned.
    """
    stray_fence = "\n```"
    if stray_fence not in x.content:
        # Nothing to repair: leave the message untouched.
        return x

    x.content = x.content.replace(stray_fence, "\n")
    return x

# Schema handed to the JsonOutputParser in get_standalone_question_chain.
# NOTE(review): the class docstring and the field descriptions are presumably
# included in the parser's format instructions (i.e. sent to the model), so
# treat them as prompt text when editing — confirm against the pydantic schema.
class StandaloneQuestionOutput(BaseModel):
    """Typings for standalone question output item"""
    # Main topic of the rewritten question; read later in the chain as x["topic"].
    topic: str = Field(
        description="This the main topic of the refined standalone question.")
    # The rewritten, self-contained question; used as the retrieval query.
    standalone_question: str = Field(
        description="This is the refined standalone question.")

# One titled section of the information output; InfoOutput.details is a list
# of these.
# NOTE(review): the docstring and field descriptions presumably end up in the
# JSON format instructions sent to the model — treat them as prompt text.
class InfoItem(BaseModel):
    """Typings for description item"""
    # Body text of the section.
    content: str = Field(
        description="This is the content string for each InfoItem.")
    # Heading of the section.
    title: str = Field(
        description="This is the title associated with each InfoItem. It summarizes the corresponding content string.")


# Schema handed to the JsonOutputParser in get_info_chain.
# NOTE(review): the docstring and field descriptions presumably end up in the
# JSON format instructions sent to the model — treat them as prompt text.
class InfoOutput(BaseModel):
    """Typings for descriptions chain output"""
    # The titled sections making up the information output.
    details: List[InfoItem] = Field(
        description="This is the list of InfoItems.")
    # Disabled field that would ask the model to explain its reasoning:
    # explanation: str = Field(
    #     description="This is the explanation of your thought process in crafting the entire output. Be as thorough and detailed as you can be.")


class ConversationalRetrievalChain():
    """
    Retrieval-augmented conversation chain.

    For each user question the chain loads related chat history from memory,
    rewrites the question into a standalone question with a topic, retrieves
    documents for it, then produces a short answer and a structured
    "information" section in parallel, and finally saves the exchange to memory.

    final_chain = loaded_memory | standalone_question | retrieved_documents | answer / information | update_memory
    """

    def __init__(self) -> None:
        """Create the chat model, the conversation memory, the document retriever and the chain."""
        # https://cloud.google.com/vertex-ai/docs/generative-ai/learn/models
        # pylint: disable-next=not-callable
        self.model = ChatVertexAI(
            model_name="chat-bison-32k", temperature=0, max_output_tokens=8192)
        self.memory = get_memory_retriever()
        self.retriever = get_vector_search_retriever()
        # Built last: the sub-chains capture self.model, self.memory and
        # self.retriever.
        self.chain = self.get_chain()

    def get_chain(self) -> RunnableSequence:
        """
        Assemble the full chain.

        The chain expects a dict with a "question" key. Its output is a dict
        with "question" (the standalone question), "topic", "answer",
        "information", plus a "_" key added by the memory-update step.
        """
        # Pair the incoming question with the history entries related to it.
        loaded_memory = RunnableParallel({
            "question": lambda x: x["question"],
            "chat_history": lambda x: self.memory.load_memory_variables({"human": x["question"]})["history"]
        })

        # Retrieve documents for the standalone question and keep the other keys.
        retrieved_documents = RunnablePassthrough.assign(
            docs=itemgetter("standalone_question") | self.retriever
        )

        # get chains
        standalone_question_chain = self.get_standalone_question_chain()
        answer_chain = self.get_answer_chain()
        info_chain = self.get_info_chain()

        # Saving is done for its side effect only; save_to_memory returns None,
        # which is what ends up under the "_" key.
        update_memory = RunnablePassthrough.assign(
            _=lambda x: self.save_to_memory(x["question"], x["answer"]),
        )

        final_chain = (
            loaded_memory
            | standalone_question_chain
            | retrieved_documents
            | RunnableParallel({
                "question": lambda x: x["standalone_question"],
                "topic": lambda x: x["topic"],
                "answer": answer_chain,
                "information": info_chain
            })
            | update_memory
        )

        return final_chain

    def save_to_memory(self, question: str, answer: str) -> None:
        """Store one question/answer exchange in the conversation memory."""
        self.memory.save_context({"human": question}, {"ai": answer})

    def get_standalone_question_chain(self) -> RunnableSequence:
        """
        Return the chain that rewrites the question using the chat history.

        Input: a dict with "question" and "chat_history". Output: the parsed
        JSON described by StandaloneQuestionOutput.
        """
        standalone_question_parser = JsonOutputParser(
            pydantic_object=StandaloneQuestionOutput)
        CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(
            STANDALONE_TEMPLATE, partial_variables={"format_instructions": standalone_question_parser.get_format_instructions()})

        # NOTE(review): unlike the info chain, the model output is parsed here
        # without handle_malformed_json — confirm whether the trailing-backtick
        # problem can also occur for this prompt.
        standalone_question_chain = (RunnablePassthrough()
                                     | CONDENSE_QUESTION_PROMPT
                                     | self.model
                                     | standalone_question_parser
                                     )

        return standalone_question_chain

    def get_answer_chain(self) -> RunnableSequence:
        """
        Return the chain that writes the short answer as plain text.

        Input: a dict with "docs", "topic" and "standalone_question".
        """
        ANSWER_PROMPT = ChatPromptTemplate(messages=[
            SystemMessagePromptTemplate.from_template(SYSTEM_TEMPLATE),
            HumanMessagePromptTemplate.from_template(ANSWER_TEMPLATE)
        ])

        FEWSHOT_ANSWER_EXAMPLE_PROMPT = ChatPromptTemplate.from_messages([
            ("human", "{human}"), ("ai", "{ai}")
        ])

        FEWSHOT_ANSWER_PROMPT = FewShotChatMessagePromptTemplate(
            example_prompt=FEWSHOT_ANSWER_EXAMPLE_PROMPT,
            example_selector=get_fewshot_example_selector(
                FEWSHOT_ANSWER_EXAMPLES, k=2)
        )

        # NOTE(review): "topic" and "examples" are still computed here, but the
        # ANSWER_TEMPLATE currently defined in this file only has {context} and
        # {question} placeholders (earlier, commented-out templates used the
        # other two). Confirm whether they should be dropped or used again.
        final_inputs = {
            "context": lambda x: combine_documents(x["docs"]),
            "topic": itemgetter('topic'),
            "question": itemgetter("standalone_question"),
            "examples": lambda x: FEWSHOT_ANSWER_PROMPT.format(human=x["standalone_question"]),
        }

        answer_chain = final_inputs | ANSWER_PROMPT | self.model | StrOutputParser()

        return answer_chain

    def get_info_chain(self) -> RunnableSequence:
        """
        Return the chain that writes the structured information sections.

        Input: a dict with "docs", "topic" and "standalone_question". Output:
        the parsed JSON described by InfoOutput.
        """

        info_parser = JsonOutputParser(pydantic_object=InfoOutput)
        INFO_PROMPT = PromptTemplate.from_template(INFO_TEMPLATE, partial_variables={
            "format_instructions": info_parser.get_format_instructions()
        })

        info_chain = (
            {
                "context": lambda x: combine_documents(x["docs"]),
                "topic": itemgetter("topic"),
                "question": itemgetter("standalone_question")
            }
            | INFO_PROMPT
            # Reuse the model configured in __init__ instead of a second,
            # separately configured copy, so a settings change applies to
            # every branch of the chain.
            | self.model
            # Strip stray trailing code fences before the JSON is parsed.
            | handle_malformed_json
            | info_parser
        )

        return info_chain


# Evaluation Chain

In [10]:
# Prompt asking the model for a Y/N grade of a question against one rubric.
# Placeholders: {criteria}, {question}, {format_instructions}.
# NOTE(review): presumably consumed by EvaluationChain — confirm against its body.
CRITERIA_TEMPLATE = """
Respond Y or N based on how well the following response follows the specified rubric. Grade only based on the rubric and expected response:

Grading Rubric: {criteria}

DATA:
---------
Question: {question}
---------
{format_instructions}
"""

# Prompt asking the model to classify a question as one of the listed criteria,
# answering with a single word.
# Placeholders: {criteria_names}, {question}.
# NOTE(review): presumably consumed by EvaluationChain — confirm against its body.
CLASSIFIER_TEMPLATE = """
Given the user question below, classify it as either being about one of the criteria's in this list
{criteria_names}

Do not respond with more than one word. 

<question>
{question}
</question>

Classification:
"""


In [11]:
from operator import itemgetter
from typing import Dict

from langchain_google_vertexai import VertexAI
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableBranch, RunnableSequence, RunnableParallel
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.prompts import PromptTemplate


from enum import Enum


class CriteriaEnum(str, Enum):
    """Eligibility criteria, keyed by short name.

    Each member's value is the yes/no question that is used as the grading
    rubric for that criterion. Member order is the order in which the names
    are listed when the enum is iterated.
    """

    # Earlier criteria set, currently disabled:
    # LOW_INCOME = "Is the user or the person in question considered low income?"
    # LOW_FINANCIAL_AND_OR_LEGAL_LITERACY = "Does the person in question demonstrate a limited understanding of fundamental financial and/or legal concepts?"
    # FAMILY_OF_PWD = "Is the person in question a family of a person with disability?"
    # ACCIDENT = "Was the user or person in question recently in an accident?"

    CITIZENSHIP = (
        "Is the recipient Singaporean or a Singapore Permanent Resident?"
    )
    ADL = (
        "Does the care recipient need help with 3 or more of 6 Activities of "
        "Daily Living, which are bathing, feeding, dressing, moving from bed "
        "to chair or chair to bed, using the toilet, walking or moving around?"
    )
    RESIDENTIAL_STATUS = (
        "Is the recipient currently in a residential long-term institution "
        "like a nursing home?"
    )
    RESIDENTIAL_ADDRESS = (
        "Is the address listed on the care recipient's NRIC a private "
        "property?"
    )
    AVG_HOUSEHOLD_INCOME = (
        "Is the average income per person in the household $2,800 and less?"
    )
    PROPERTY_OWNERSHIP = (
        "Does the user or their household member own more than 1 property?"
    )



# Schema for one evaluator verdict. It is handed to JsonOutputParser, whose
# format instructions are injected into the evaluator prompt, so the field
# descriptions below (and presumably the class docstring -- TODO confirm) are
# effectively prompt text: editing them changes what the model is asked for.
class Answer(BaseModel):
    """
    Typings for Evaluator output
    """
    # Which criterion the verdict is about.
    criteria: str = Field(description="criteria")
    # The model's justification for the verdict.
    reasoning: str = Field(description="detailed explanation")
    # The verdict itself; the evaluation chain compares it against "Y".
    value: str = Field(description="Y/N")


class EvaluationChain():
    """Classify a user question against ``CriteriaEnum`` and grade it.

    The chain built by :meth:`get_chain` first asks an LLM which criterion the
    question is about, then runs the Y/N evaluator for that criterion. A "Y"
    verdict sets the matching entry of the shared eligibility dictionary to
    ``True``.

    NOTE(review): an "N" verdict is never written back, so inside this class
    entries only ever move from ``None``/``False`` to ``True``.
    """

    def __init__(self, eligibility_dict: Dict[str, bool]) -> None:
        """Store the shared eligibility table and build the chain.

        Args:
            eligibility_dict: Mapping of criterion name to its current status.
                The same object is kept (not copied) and is mutated in place
                when an evaluator answers "Y". Values may also be ``None``
                for criteria that have not been established yet.
        """
        # NOTE(review): ``self.model`` is not referenced by the chains built
        # below; they each create their own ``VertexAI(verbose=True)``.
        # pylint: disable-next=not-callable
        self.model = VertexAI(
            model="chat-bison-32k", temperature=0, verbose=True, max_tokens=32768)
        self.eligibility_dict = eligibility_dict
        self.chain = self.get_chain()

    def get_chain(self) -> RunnableSequence:
        """
        This method returns the evaluation chain.

        ..evaluation_chain
        classifier_chain | RunnableBranch

        The chain takes a mapping with a "question" key. The classifier_chain
        labels the question with (ideally) one of the names in CriteriaEnum and
        passes it to RunnableBranch. The first branch whose criterion name
        occurs in the classifier output, and whose eligibility entry is still
        False/None, is invoked.

        ..RunnableBranch consists of
            1. Default Runnable that returns None if no branches are matched
            2. Custom runnables loaded with each criteria in CriteriaEnum
        """
        evaluators = self.get_evaluators()
        classifier_chain = self.get_classifier_chain()

        def update_and_invoke(evaluator: RunnableSequence, question: str, criteria: str) -> Dict[str, str]:
            """
            This helper function does:
            1. Invokes evaluation chain
            2. Updates eligibility criteria dictionary based on the outcome in 1

            The evaluator's parsed output is returned unchanged.
            """
            res = evaluator.invoke(question)

            # The verdict is parsed from LLM-generated JSON, so it may not be
            # an object, may lack "value", or may carry stray whitespace/case.
            # Treat anything that is not clearly "Y" as "not established"
            # instead of raising and aborting the whole chain.
            verdict = res.get("value") if isinstance(res, dict) else None
            is_yes = isinstance(verdict, str) and verdict.strip().upper() == "Y"

            if is_yes and self.eligibility_dict[criteria] in [False, None]:
                self.eligibility_dict[criteria] = True
            return res

        branch = RunnableBranch(
            *[
                (
                    # k/v are bound as defaults so each branch keeps its own
                    # criterion instead of the loop's last value.
                    lambda x, k=k: k in x["criteria"].strip() and self.eligibility_dict[k] in [
                        False, None],
                    lambda x, k=k, v=v: update_and_invoke(v, x["question"], k)
                ) for k, v in evaluators.items()
            ],
            lambda x: None
        )
        eval_chain = (
            RunnableParallel({
                "criteria": classifier_chain,
                "question": lambda x: x["question"]
            })
            | branch
        )

        return eval_chain

    def get_classifier_chain(self) -> RunnableSequence:
        """
        This method returns a classifier chain that asks the model which of the
        criteria names defined in CriteriaEnum the question is about.

        The chain returns the model's raw text. Matching that text against the
        criteria names (and falling back to None when nothing matches) is done
        by the branch built in get_chain, not here.
        """
        criteria_names = [criteria.name for criteria in CriteriaEnum]
        CLASSIFIER_PROMPT = PromptTemplate.from_template(CLASSIFIER_TEMPLATE)
        classifier_chain = (
            {
                "question": RunnablePassthrough() | itemgetter("question"),
                "criteria_names": lambda x: criteria_names
            }
            | CLASSIFIER_PROMPT
            # pylint: disable-next=not-callable
            | VertexAI(verbose=True)
            | StrOutputParser()
        )
        return classifier_chain

    def get_evaluators(self) -> Dict[str, RunnableSequence]:
        """
        This method returns a dictionary of CRITERIA_NAME : CRITERIA_CHAIN

        Each chain takes the question text as its input, renders it into
        CRITERIA_TEMPLATE together with a one-entry mapping of the criterion's
        name to its rubric question, and parses the model's reply as JSON
        following the Answer schema.
        """
        parser = JsonOutputParser(pydantic_object=Answer)
        CRITERIA_PROMPT = PromptTemplate.from_template(
            CRITERIA_TEMPLATE, partial_variables={
                "format_instructions": parser.get_format_instructions()}
        )
        return {
            criteria.name: (
                {
                    "question": RunnablePassthrough(),
                    # c is bound as a default so every evaluator keeps its own
                    # criterion rather than the comprehension's last one.
                    "criteria": lambda x, c=criteria: {c.name: c.value}
                }
                | CRITERIA_PROMPT
                # pylint: disable-next=not-callable
                | VertexAI(verbose=True)
                | parser
            ) for criteria in CriteriaEnum
        }


# Composite Chain

In [12]:
# Prompt for the paraphrase-and-suggest step. Placeholders:
#   {question}            - the user's question
#   {original_answer}     - the answer to be paraphrased
#   {eligibility_table}   - criterion -> status table (None = not yet known)
#   {format_instructions} - output-format text supplied by the output parser
# "{{paraphrased_response}}" is escaped on purpose: it renders as the literal
# text "{paraphrased_response}" and is not a template variable.
PARAPHRASE_AND_SUGGEST_TEMPLATE = """Follow these steps to paraphrase a given original answer to a given user question: 

Question: {question}
Original answer: {original_answer}


1. Begin by analysing the given user question to understand its core topic and intent. Determine if there is an opportunity, explicit or based on your prediction, to assess eligibility for grants or schemes based on original answer addresses it to guide the subsequent steps based on the given eligibility table. 
Question: {question}
Eligibility table: {eligibility_table}
2. If there are no more ‘None’ values in the eligibility table, do not rephrase the question and skip to Step 6. 
3. If there's a relevant opportunity, proceed to Step 4 to paraphrase the given original answer by subtly incorporating consideration of criteria with 'None' values in the given eligibility table. If there is no such relevant opportunity, subtly introduce the topic of eligibility assessment where relevant while ensuring that the paraphrased response maintains the primary focus of addressing the user's question directly, then skip to Step 6. 
Original answer: {original_answer} 
4. Subtly integrate follow-up questions or statements into the paraphrased answer to guide the conversation towards providing information to fill in the 'None' values in the eligibility table. The paraphrased response should only refer to one such value at a time. Ensure that the paraphrased response addresses the given question as the primary focus, and guiding the conversation being secondary. Maintain the core topic and intent of the original answer throughout the paraphrasing process.
5. Ensure guidance provided aligns with the natural flow of the conversation, and does not explicitly reveal your intention to assess eligibility, or your decision-making process based on the eligibility table. 
6. Store the returned response in memory:
{{paraphrased_response}}
7. Analyse the final paraphrased response to identify potential areas where the user might want to find out more information, including key topics or examples mentioned. 
8. Generate a minimum of 2 and maximum of 4 possible replies or questions from the perspective of the human, that they might make to your paraphrased response. Ensure that each option is concise (<124 characters) and avoids derogatory language.
9. Where relevant, provide options that allow the user to: Clarify or understand more about each specific topic or example mentioned in the paraphrased response, skip answering the question where necessary, engage in further discussion or seek additional information. 
10. Maintain coherence between the paraphrased response and the generated reply options to facilitate a smooth transition in the conversation.

{format_instructions}
"""

In [13]:
from typing import List

from langchain.prompts import PromptTemplate
from langchain_google_vertexai import VertexAI
from langchain_core.runnables import RunnableParallel, RunnableSequence
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field


# Schema for the paraphrase-and-suggest step. It is handed to JsonOutputParser,
# whose format instructions are injected into the prompt, so the field
# descriptions below (and presumably the class docstring -- TODO confirm) are
# effectively prompt text: editing them changes what the model is asked for.
class ParaphraseAndSuggestOutput(BaseModel):
    """
    Typings for paraphrased chain output
    """
    # The (possibly paraphrased) answer shown to the user.
    answer: str = Field(description="answer")
    # Suggested replies the user could send next.
    reply_options: List[str] = Field(
        description="list of suggested user responses")


class CompositeChain(metaclass=SingletonMeta):
    """
    Ties the evaluation chain and the conversational retrieval chain together
    and rewrites the conversational answer with the eligibility table in view.
    """

    def __init__(self) -> None:
        """Create the eligibility table and wire up every sub-chain."""
        # Every criterion starts as None, i.e. "not yet known". The same dict
        # object is handed to EvaluationChain, which updates it in place.
        self.eligibility_dict = dict.fromkeys(
            criteria.name for criteria in CriteriaEnum
        )
        self.eval_chain = EvaluationChain(self.eligibility_dict).chain
        self.cr_chain = ConversationalRetrievalChain().chain
        self.paraphrase_and_suggest_chain = self.get_paraphrase_and_suggest_chain()
        self.chain = self.get_chain()

    def get_chain(self) -> RunnableSequence:
        """
        Build the composite chain.

        ..composite_chain
          (evaluation_chain, conversational_chain) | paraphrase_chain | flatten

        The evaluation chain and the conversational chain receive the same
        input and run as one parallel step. The paraphrase chain then rewrites
        the conversational chain's answer, consulting the eligibility table
        that the evaluation chain may have just updated. The evaluation
        chain's own return value is not read by the later steps.

        The final step flattens the result into a mapping with the keys
        "answer", "reply_options", "information" and "topic".

        This is to guide the conversation along to determine the eligibility
        of the user for the grants.
        """
        gather = RunnableParallel(
            {
                "eval_chain": self.eval_chain,
                "conversational_chain": self.cr_chain,
            })

        respond = {
            "response": self.paraphrase_and_suggest_chain,
            # "information" is optional in the conversational output.
            "information": lambda x: x["conversational_chain"].get("information", []),
            "topic": lambda x: x["conversational_chain"]["topic"],
        }

        flatten = {
            "answer": lambda x: x["response"]["answer"],
            "reply_options": lambda x: x["response"]["reply_options"],
            "information": lambda x: x["information"],
            "topic": lambda x: x["topic"],
        }

        return gather | respond | flatten

    def get_paraphrase_and_suggest_chain(self) -> RunnableSequence:
        """
        Build the chain that paraphrases the answer and proposes user replies.

        The chain reads the "answer" and "question" entries of the
        conversational chain's output, adds the current eligibility table, and
        parses the model's reply as JSON following ParaphraseAndSuggestOutput.
        """
        parser = JsonOutputParser(pydantic_object=ParaphraseAndSuggestOutput)
        prompt = PromptTemplate.from_template(
            PARAPHRASE_AND_SUGGEST_TEMPLATE,
            partial_variables={
                "format_instructions": parser.get_format_instructions(),
            },
        )
        llm = ChatVertexAI(
            model_name="chat-bison-32k",
            temperature=0,
            verbose=True,
            max_output_tokens=8192,
        )

        prompt_inputs = {
            "original_answer": lambda x: x["conversational_chain"]["answer"],
            # Looked up at invocation time, so updates made to the table
            # earlier in the same run are visible here.
            "eligibility_table": lambda x: self.eligibility_dict,
            "question": lambda x: x["conversational_chain"]["question"],
        }

        return prompt_inputs | prompt | llm | parser


In [None]:
# Smoke test: send one question through the conversational retrieval chain and
# pretty-print whatever it returns.
chain = ConversationalRetrievalChain().chain

question = "Tell me about Home Caregiving Grant."

result = chain.invoke({"question": question})

print("======= answer ========")
pp.pprint(result)

# Optional: dump the retrieved documents as well.
# print("======= docs ========", end="\n")
# for i,doc in enumerate(result["docs"]):
#     pp.pprint(f"{i}: {doc}")

In [None]:
# Follow-up turn sent through the `chain` object created in the previous cell.
# NOTE(review): that object is ConversationalRetrievalChain().chain, not
# CompositeChain().chain -- confirm this is the chain the cell is meant to
# exercise.
question = "Can you help me assess my eligibility?"
res = chain.invoke({"question": question})
pp.pprint(res)