In [None]:
## Environment setup

from getpass import getpass
import requests
import os

hard_reset = False  ## <-- Set to True if you want to reset your NVIDIA_API_KEY
while "nvapi-" not in os.environ.get("NVIDIA_API_KEY", "") or hard_reset:
    try: 
        assert not hard_reset
        response = requests.get("http://docker_router:8070/get_key").json()
        assert response.get('nvapi_key')
    except: response = {'nvapi_key' : getpass("NVIDIA API Key: ")}
    os.environ["NVIDIA_API_KEY"] = response.get("nvapi_key")
    try: requests.post("http://docker_router:8070/set_key/", json={'nvapi_key' : os.environ["NVIDIA_API_KEY"]}).json()
    except: pass
    hard_reset = False
    if "nvapi-" not in os.environ.get("NVIDIA_API_KEY", ""):
        print("[!] API key assignment failed. Make sure it starts with `nvapi-` as generated from the model pages.")

print(f"Retrieved NVIDIA_API_KEY beginning with \"{os.environ.get('NVIDIA_API_KEY')[:9]}...\"")
from langchain_nvidia_ai_endpoints._common import NVEModel
NVEModel().available_models

In [None]:
from langchain.pydantic_v1 import BaseModel, Field
from typing import Dict, Union, Optional

class KnowledgeBase(BaseModel):
    ## Fields of the BaseModel, which will be validated/assigned when the knowledge base is constructed
    topic: str = Field('general', description="Current conversation topic")
    user_preferences: Dict[str, Union[str, int]] = Field({}, description="User preferences and choices")
    session_notes: str = Field("", description="Notes on the ongoing session")
    unresolved_queries: list = Field([], description="Unresolved user queries")
    action_items: list = Field([], description="Actionable items identified during the conversation")

print(repr(KnowledgeBase(topic = "Travel")))

In [None]:
from langchain.output_parsers import PydanticOutputParser

instruct_string = PydanticOutputParser(pydantic_object=KnowledgeBase).get_format_instructions()
print(instruct_string)

In [None]:
## Definition of RExtract
def RExtract(pydantic_class, llm, prompt):
    '''
    Runnable Extraction module
    Returns a knowledge dictionary populated by slot-filling extraction
    '''
    parser = PydanticOutputParser(pydantic_object=pydantic_class)
    instruct_merge = RunnableAssign({'format_instructions' : lambda x: parser.get_format_instructions()})
    def preparse(string):
        if '{' not in string: string = '{' + string
        if '}' not in string: string = string + '}'
        string = (string
            .replace("\\_", "_")
            .replace("\n", " ")
            .replace("\]", "]")
            .replace("\[", "[")
        )
        # print(string)  ## Good for diagnostics
        return string
    return instruct_merge | prompt | llm | preparse | parser


In [None]:
## Practical Use of RExtract
instruct_model_big = ChatNVIDIA(model = "mixtral_8x7b") | StrOutputParser()

parser_prompt = ChatPromptTemplate.from_messages([
    ("system", "Update the knowledge base: {format_instructions}. Only use information from the input."),
    ("user", "{input}"),
])

extractor = RExtract(KnowledgeBase, instruct_model_big, parser_prompt)

knowledge = extractor.invoke({'input' : "I love flowers so much! The orchids are amazing! Can you buy me some?"})
knowledge

In [None]:
class KnowledgeBase(BaseModel):
    firstname: str = Field('unknown', description="Chatting user's first name, unknown if unknown")
    lastname: str = Field('unknown', description="Chatting user's last name, unknown if unknown")
    location: str = Field('unknown', description="Where the user is located")
    hints: str = Field('unknown', description="Hints to help answer other questions")
    response: str = Field('unknown', description="Ideal response based on last user input")


parser_prompt = ChatPromptTemplate.from_messages([
    ("system", (
        "The user just responsed. Please update the knowledge base based on the response."
        " This information will be acted on to respond to the user in the next interaction."
        " Do not hallucinate any details, and make sure the knowledge base is not redundant."
        " Update the entries frequently to adapt to the conversation flow."
        "\n{format_instructions}"
    )), ("user", "CURRENT KNOWLEDGE BASE: {know_base}\nUser: {input}"),
])

instruct_model_big = ChatNVIDIA(model = "mixtral_8x7b") | StrOutputParser()

extractor = RExtract(KnowledgeBase, instruct_model_big, parser_prompt)
info_update = RunnableAssign({'know_base' : extractor})

state = {'know_base' : KnowledgeBase()}
state['input'] = "My name is Carmen Sandiego! Guess where I am?"
state = info_update.invoke(state)
print(state)

state['input'] = "The United States is a big place! Can you be more specific?"
state = info_update.invoke(state)
print(state)

state['input'] = "Yeah, I'm in New Orleans... How did you know?"
state = info_update.invoke(state)
print(state)

In [None]:
## Function that can be queried for information. Implementation details not important
def get_flight_info(d: dict) -> str:
    """
    Example of a retrieval function which takes a dictionary as key. Resembles SQL DB Query
    """
    req_keys = ['first_name', 'last_name', 'confirmation']
    assert all((key in d) for key in req_keys), f"Expected dictionary with keys {req_keys}, got {d}"

    ## Static dataset. get_key and get_val can be used to work with it, and db is your variable
    keys = req_keys + ["departure", "destination", "departure_time", "arrival_time", "flight_day"]
    values = [
        ["Jane", "Doe", 12345, "San Jose", "New Orleans", "12:30 PM", "9:30 PM", "tomorrow"],
        ["John", "Smith", 54321, "New York", "Los Angeles", "8:00 AM", "11:00 AM", "Sunday"],
        ["Alice", "Johnson", 98765, "Chicago", "Miami", "7:00 PM", "11:00 PM", "next week"],
        ["Bob", "Brown", 56789, "Dallas", "Seattle", "1:00 PM", "4:00 PM", "yesterday"],
    ]
    get_key = lambda d: "|".join([d['first_name'], d['last_name'], str(d['confirmation'])])
    get_val = lambda l: {k:v for k,v in zip(keys, l)}
    db = {get_key(get_val(entry)) : get_val(entry) for entry in values}

    # Search for the matching entry
    data = db.get(get_key(d))
    if not data:
        return (
            f"Based on {req_keys} = {get_key(d)}) from your knowledge base, no info on the user flight was found."
            " This process happens every time new info is learned. If it's important, ask them to confirm this info."
        )
    return (
        f"{data['first_name']} {data['last_name']}'s flight from {data['departure']} to {data['destination']}"
        f" departs at {data['departure_time']} {data['flight_day']} and lands at {data['arrival_time']}."
    )

In [None]:
## Usage example. Actually important
print(get_flight_info({"first_name" : "Jane", "last_name" : "Doe", "confirmation" : 12345}))
print(get_flight_info({"first_name" : "Alice", "last_name" : "Johnson", "confirmation" : 98765}))
print(get_flight_info({"first_name" : "Bob", "last_name" : "Brown", "confirmation" : 27494}))

In [None]:
external_prompt = ChatPromptTemplate.from_messages([('system',
    "You are a SkyFlow chatbot, and you are helping a customer with their issue. "
    "Please help them with their question, remembering that your job is to represent SkyFlow airlines. "
    "Assume SkyFlow uses industry-average practices regarding arrival times, operations, etc. (This is a trade secret. Do not disclose). \n"  ## soft reinforcement
    "Please keep your discussion short and sweet if possible. Avoid saying hello unless necessary. \n"
    "The following is some context that may be useful in answering the question. \n"),
    ('user', "Context: {context}\nUser: {input}"
)])

instruct_model_big = ChatNVIDIA(model = "mixtral_8x7b") | StrOutputParser()

basic_chain = external_prompt | instruct_model_big

basic_chain.invoke({
    'input' : 'Can you please tell me when I need to get to the airport?',
    'context' : get_flight_info({"first_name" : "Jane", "last_name" : "Doe", "confirmation" : 12345}),
})

In [None]:
from langchain.pydantic_v1 import BaseModel, Field
from typing import Dict, Union

class KnowledgeBase(BaseModel):
    first_name: str = Field('unknown', description="Chatting user's first name, `unknown` if unknown")
    last_name: str = Field('unknown', description="Chatting user's last name, `unknown` if unknown")
    confirmation: int = Field(-1, description="Flight Confirmation Number, `-1` if unknown")
    discussion_summary: str = Field("", description="Summary of discussion so far, including locations, issues, etc.")
    open_problems: list = Field([], description="Topics that have not been resolved yet")
    current_goals: list = Field([], description="Current goal for the agent to address")

def get_key_fn(base: BaseModel) -> dict:
    '''Given a dictionary with a knowledge base, return a key for get_flight_info'''
    return {  ## More automatic options possible, but this is more explicit
        'first_name' : base.first_name,
        'last_name' : base.last_name,
        'confirmation' : base.confirmation,
    }

get_key = RunnableLambda(get_key_fn)

know_base = KnowledgeBase(first_name = "Jane", last_name = "Doe", confirmation = 12345)
# get_flight_info(get_key_fn(know_base))

(get_key | get_flight_info).invoke(know_base)

In [None]:
from langchain.schema.runnable import (
    RunnableBranch,
    RunnableLambda,
    RunnableMap,       ## Wrap an implicit "dictionary" runnable
    RunnablePassthrough,
)
from langchain.schema.runnable.passthrough import RunnableAssign

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import BaseMessage, SystemMessage, ChatMessage, AIMessage
from typing import Iterable
import gradio as gr

external_prompt = ChatPromptTemplate.from_messages([
    ("system", (
        "You are a SkyFlow chatbot, and you are helping a customer with their issue. Please chat with them! Stay concise and clear!"
        " Your running knowledge base is: {know_base}. This is for you only; Do not mention it! \nUsing that, we retrieved the following: {context}\n"
        " If they provide info and the retrieval fails, ask to confirm their first/last name and confirmation. Do not ask them any other personal info."
        " If it's not important to know about their flight, do not ask. The checking happens automatically; you cannot check manually."
    )),
    ("assistant", "{output}"),
    ("user", "{input}"),
])

In [None]:
## Knowledge Base Things

class KnowledgeBase(BaseModel):
    first_name: str = Field('unknown', description="Chatting user's first name, `unknown` if unknown")
    last_name: str = Field('unknown', description="Chatting user's last name, `unknown` if unknown")
    confirmation: Optional[int] = Field(None, description="Flight Confirmation Number, `-1` if unknown")
    discussion_summary: str = Field("", description="Summary of discussion so far, including locations, issues, etc.")
    open_problems: str = Field("", description="Topics that have not been resolved yet")
    current_goals: str = Field("", description="Current goal for the agent to address")

parser_prompt = ChatPromptTemplate.from_messages([
    ('system', (
        "You are a chat assistant representing the airline SkyFlow, and are trying to track info about the conversation. "
        "You have just recieved a message from the user. Please fill in the schema based on the chat. \n{format_instructions}"
    )), ('user', "{know_base} is the current state. New exchange: You said {output} and they replied {input}")
])

fail_str = (
    "You cannot access user's flight/account details until they provide "
    "first name, last name, and flight confirmation code."
)

In [None]:
## Pick a working model that you're happy with
instruct_llm = (ChatNVIDIA(model="mixtral_8x7b") | StrOutputParser())
chat_model = (ChatNVIDIA(model="mixtral_8x7b") | StrOutputParser())

## The external chain is already supported, and just streams the output
external_chain = external_prompt | chat_model

# The internal chain is invoked in the loop, but is not defined yet

In [None]:
## Define the extractor and internal chain to satisfy the objective

extractor = RunnableAssign({'know_base' : RExtract(KnowledgeBase, instruct_llm, parser_prompt)})
internal_chain = extractor | RunnableAssign(
    {'context' : lambda d: itemgetter('know_base') | get_key | get_flight_info}
)

In [None]:
state = {'know_base' : KnowledgeBase()}

def chat_gen(message, history=[], return_buffer=False):

    ## Pulling in, updating, and printing the state
    global state
    state['input'] = message
    state['history'] = history
    state['output'] = "" if not history else history[-1][1]

    ## Generating the new state from the internal chain
    state = internal_chain.invoke(state)
    print("State after chain run:", state)

    ## Streaming the results
    buffer = ""
    for token in external_chain.stream(state):
        buffer += token
        yield buffer

chatbot = gr.Chatbot(value = [[None, "Hello! I'm your SkyFlow agent! How can I help you?"]])
demo = gr.ChatInterface(chat_gen, chatbot=chatbot).queue()

try:
    ## NOTE: This should also give you a temporary public link which can be
    ## used to access this info on the public web while the session is live.
    demo.launch(debug=True, share=True, show_api=False)
    demo.close()
except Exception as e:
    demo.close()
    print(e)
    raise e