In [1]:
import json
import boto3
import pickle
import gradio as gr
from haystack.nodes import EmbeddingRetriever, PromptNode, PromptModel, PromptTemplate, AnswerParser
from haystack.pipelines import Pipeline
from typing import Tuple, Dict

# Shared boto3 S3 resource; load_json() reads objects through this handle.
s3 = boto3.resource('s3')

# Bucket and object key of the JSON file that holds the OpenAI API key
# (read further down via load_json(bucket=BUCKET, key=OPEN_AI_API_S3_KEY)).
BUCKET = 'knowledge-base-assist-demo'
OPEN_AI_API_S3_KEY = 'openai_api_key.json'

In [2]:
def load_json(bucket: str, key: str) -> dict:
    """Fetch an object from S3 and parse its contents as JSON.

    The object is read through the module-level ``s3`` resource, decoded as
    UTF-8 and passed to ``json.loads``.

    Args:
        bucket (str): Name of the S3 bucket that holds the file.
        key (str): Object key (path inside the bucket) of the JSON file.

    Returns:
        dict: The parsed JSON content.
    """
    raw_bytes = s3.Object(bucket, key).get()["Body"].read()
    return json.loads(raw_bytes.decode("utf-8"))


def create_initial_pipe(prompt_model: PromptModel, initial_template: str) -> Pipeline:
    """Build the single-node pipeline used to classify an incoming query.

    Args:
        prompt_model (PromptModel): Model backing the PromptNode.
        initial_template (str): Prompt template handed to the PromptNode as its
            ``default_prompt_template``.

    Returns:
        Pipeline: A pipeline whose only node, ``initial_prompt_node``, is fed
            directly from ``Query``.
    """
    pipe = Pipeline()
    pipe.add_node(
        component=PromptNode(prompt_model, default_prompt_template=initial_template),
        name="initial_prompt_node",
        inputs=["Query"],
    )
    return pipe


def create_lfqa_section_doc_pipe(prompt_model: PromptModel, split_doc_retriever: EmbeddingRetriever) -> Pipeline:
    """Build the retrieve-then-answer pipeline that works on document sections.

    Args:
        prompt_model (PromptModel): Model backing the answering PromptNode.
        split_doc_retriever (EmbeddingRetriever): Retriever that supplies the
            document sections.

    Returns:
        Pipeline: ``Query`` -> ``split_doc_retriever`` -> ``lfqa_prompt_node``,
            where the prompt node uses the ``"question-answering"`` template.
    """
    answer_node = PromptNode(prompt_model, default_prompt_template="question-answering")

    pipe = Pipeline()
    # Retrieval runs first; its output is the prompt node's only input.
    pipe.add_node(component=split_doc_retriever, name="split_doc_retriever", inputs=["Query"])
    pipe.add_node(component=answer_node, name="lfqa_prompt_node", inputs=["split_doc_retriever"])
    return pipe


def create_lfqa_whole_doc_pipe(prompt_model: PromptModel, whole_doc_retriever: EmbeddingRetriever) -> Pipeline:
    """Build the retrieve-then-answer pipeline that works on whole documents.

    Args:
        prompt_model (PromptModel): Model backing the answering PromptNode.
        whole_doc_retriever (EmbeddingRetriever): Retriever that supplies the
            whole documents.

    Returns:
        Pipeline: ``Query`` -> ``whole_doc_retriever`` -> ``summariser_prompt_node``,
            where the prompt node uses the ``"question-answering"`` template.
    """
    answer_node = PromptNode(prompt_model, default_prompt_template="question-answering")

    pipe = Pipeline()
    # Retrieval runs first; its output is the prompt node's only input.
    pipe.add_node(component=whole_doc_retriever, name="whole_doc_retriever", inputs=["Query"])
    pipe.add_node(component=answer_node, name="summariser_prompt_node", inputs=["whole_doc_retriever"])
    return pipe


def extract_outputs(output: Dict) -> Tuple[str, Dict[str, Dict[str, str]]]:
    """Extract the first answer and the retrieved documents from a pipeline result.

    Args:
        output (dict): Result dictionary of a pipeline run. Must contain a
            non-empty ``'answers'`` list whose items expose ``.answer``; may
            contain a ``'documents'`` list whose items expose ``.content`` and
            ``.meta``.

    Returns:
        tuple: ``(answer, docs)`` where ``answer`` is the text of the first
            answer and ``docs`` maps ``"Document 1"``, ``"Document 2"``, ... to
            ``{'content': ..., 'name': ...}``. ``docs`` is empty when the
            result carries no documents.

    Raises:
        KeyError: If ``output`` has no ``'answers'`` entry.
        IndexError: If the ``'answers'`` list is empty.
    """
    answer = output['answers'][0].answer

    docs = {}
    # A missing/None 'documents' entry is treated as "no documents" so a valid
    # answer is still returned.
    for idx, doc in enumerate(output.get('documents') or [], start=1):
        meta = doc.meta or {}
        docs[f"Document {idx}"] = {
            'content': doc.content,
            # A document without a 'name' in its metadata must not discard the
            # whole answer; fall back to a placeholder label instead.
            'name': meta.get('name', 'Unknown'),
        }
    return answer, docs


def process_query(query: str, pipeline: Pipeline) -> Tuple[str, Dict[str, str]]:
    """Run ``query`` through ``pipeline`` and unpack the result.

    Args:
        query (str): The user's query.
        pipeline (Pipeline): Haystack pipeline to execute with ``query``.

    Returns:
        tuple: Whatever ``extract_outputs`` produces for the pipeline result,
            i.e. the answer text and a dictionary of related documents.
    """
    return extract_outputs(pipeline.run(query=query))


def run_lfqa(query: str, initial_pipe: Pipeline, lfqa_section_pipe: Pipeline, lfqa_whole_pipe: Pipeline) -> Tuple[str, Dict[str, str]]:
    """Classify the query, then answer it with the matching LFQA pipeline.

    The first answer of ``initial_pipe`` is inspected case-insensitively:
    if it contains ``"section"`` the section pipeline is used, otherwise if it
    contains ``"whole"`` the whole-document pipeline is used. Any other
    classification yields a fixed fallback message and no documents.

    Args:
        query (str): The user's query.
        initial_pipe (Pipeline): Pipeline that classifies the query.
        lfqa_section_pipe (Pipeline): Pipeline answering from document sections.
        lfqa_whole_pipe (Pipeline): Pipeline answering from whole documents.

    Returns:
        tuple: The answer (str) and a dictionary of documents.
    """
    classification = initial_pipe.run(query=query)['answers'][0].answer.lower()

    # "section" is checked first, so it wins if both keywords appear.
    if 'section' in classification:
        return process_query(query, lfqa_section_pipe)
    if 'whole' in classification:
        return process_query(query, lfqa_whole_pipe)
    return "Answer cannot be provided with internal knowledge base.", {}

In [3]:
# Load the two document stores from local pickle files; they are later passed
# as `document_store` to the whole-document and split-document retrievers.
# NOTE(security): pickle.load can execute arbitrary code while deserialising,
# so only load .pkl files that come from a trusted source.
with open('whole_doc_store.pkl', 'rb') as f:
    whole_doc_store = pickle.load(f)

with open('split_doc_store.pkl', 'rb') as f:
    split_doc_store = pickle.load(f)

# Read the OpenAI API key from the JSON secrets file in S3 (stored under 'Key').
openai_secrets = load_json(bucket=BUCKET, key=OPEN_AI_API_S3_KEY)
OPENAI_API_KEY = openai_secrets['Key']

In [4]:
# "<policy title>: <short summary>" strings, one per policy. They are joined
# into the text of the query-classification prompt (initial_template).
policy_names = [
        "Attendance and remote work policy: Guidelines for employee attendance, remote work eligibility, home office allowances, and communication expectations at Dunder Mifflin.",
        "Benefits and compensation policy: Outlines payroll procedures, medical benefits, wellness programs, bonuses, allowances, retirement plans, and other benefits to support employee well-being and work-life balance.",
        "Code of conduct policy: Outlines standards of behavior, emphasizing core values of the company. Employees must adhere to ethical conduct, professionalism, diversity, conflict of interest, confidentiality, health and safety, and anti-harassment policies.",
        "Expense policy: Covers reimbursement for authorized business expenses, such as travel, accommodation, meals, and conferences. Employees must obtain prior approval, submit expense reports with documentation, and adhere to guidelines for corporate credit card usage.",
        "Health and safety policy: Outlines shared responsibilities among employees to maintain a safe workplace. Includes hazard identification, risk assessment, accident reporting, emergency procedures, and training.",
        "Employee leave policy: Covers vacation, sick leave, public holidays, parental, bereavement, jury duty, unpaid leave, work from home, leave donation, professional development, and military leave.",
        "Performance evaluation and promotion policy: outlines annual evaluations, promotion processes, and criteria. The policy emphasizes fairness, transparency, employee growth.",
        "Recruitment policy: outlines principles and processes for attracting, selecting, and onboarding employees. The policy covers candidate selection criteria, recruitment process, employee referral program, and comprehensive onboarding.",
        "Employee termination policy: Outlines guidelines for voluntary and involuntary terminations, ensuring fairness and legal compliance. It covers resignation notice, performance-based termination, layoffs, termination for cause, notice periods, exit interviews, final pay, benefits continuation, and return of company property.",
        "Employee working hours policy: Outlines guidelines for standard working hours, flexible schedules, breaks, and overtime, promoting a healthy work-life balance and legal compliance. Covers timekeeping, overtime compensation, approval, recordkeeping.",
        "IT acceptable use policy: Outlines employee responsibilities for using company technology resources securely and efficiently, including authorization, personal use, security measures, password protection, email etiquette, data communication, software installation, remote access, and file storage."
    ]

# Prompt for the query-classification step. The model is asked to reply with
# exactly 'Whole', 'Section' or 'N/A'; run_lfqa() routes on that reply.
# The pieces below are adjacent string literals, so each one must end with its
# own trailing space, otherwise consecutive sentences run together.
# Only the second literal is an f-string: `{query}` in the last literal is left
# verbatim as the template's placeholder.
initial_template = PromptTemplate(
    name="initial-prompt",
    prompt_text=(
        "You are a knowledge base assistant for Dunder Mifflin Paper Company, Inc. "
        f"and you have access to the following HR and IT policies: {', '.join(policy_names)} "
        "Your task is to determine if user query needs an entire policy (answer 'Whole'), "
        "a section (answer 'Section'), or cannot be answered using policies (answer 'N/A'). "
        "Only use 'Whole', 'Section', or 'N/A' as response with no additional text. "
        "User query: {query}; Answer:"
    ),
    output_parser=AnswerParser(),
)

# Chat model shared by every PromptNode built from it (classification and both
# answering pipelines).
prompt_model = PromptModel(model_name_or_path="gpt-3.5-turbo", max_length=750, api_key=OPENAI_API_KEY)

# OpenAI EmbeddingRetriever over the split (section-level) document store.
# Configured identically to the whole-document retriever except for the store
# and top_k (3 here vs. 1 there).
split_doc_retriever = EmbeddingRetriever(
    document_store=split_doc_store,
    batch_size=8,
    use_gpu=False,
    embedding_model="text-embedding-ada-002",
    api_key=OPENAI_API_KEY,
    max_seq_len=8192,
    top_k=3
)

# OpenAI EmbeddingRetriever over the whole-document store (top_k=1).
whole_doc_retriever = EmbeddingRetriever(
    document_store=whole_doc_store,
    batch_size=8,
    use_gpu=False,
    embedding_model="text-embedding-ada-002",
    api_key=OPENAI_API_KEY,
    max_seq_len=8192,
    top_k=1
)

In [5]:
# Build the three pipelines used by run_lfqa(): the query classifier plus the
# section-level and whole-document answering pipelines.
initial_pipe = create_initial_pipe(prompt_model, initial_template)
lfqa_section_pipe = create_lfqa_section_doc_pipe(prompt_model, split_doc_retriever)
lfqa_whole_pipe = create_lfqa_whole_doc_pipe(prompt_model, whole_doc_retriever)

In [7]:
def gradio_interface(query: str) -> Tuple[str, str]:
    """Gradio callback: answer ``query`` and format the supporting documents.

    Runs ``run_lfqa`` with the module-level pipelines. A failure whose message
    contains ``'timed out'`` is retried, up to three attempts in total; any
    other failure aborts immediately.

    Args:
        query (str): The user's query from the UI textbox.

    Returns:
        tuple: ``(answer, formatted_docs)`` where ``formatted_docs`` is one
            text block per document (``"<key> (<name>):\\n<content>"``),
            separated by blank lines. Empty string when there are no documents.

    Raises:
        gr.Error: When every attempt timed out, or on any other failure.
            ``gr.Error`` is an ``Exception`` subclass whose message Gradio
            shows to the user; the original exception is chained as its cause.
    """
    max_retries = 3

    for attempt in range(1, max_retries + 1):
        try:
            answer, docs = run_lfqa(query, initial_pipe, lfqa_section_pipe, lfqa_whole_pipe)
            formatted_docs = "\n\n".join(
                f"{key} ({value['name']}):\n{value['content']}" for key, value in docs.items()
            )
            return answer, formatted_docs

        except Exception as e:
            # Only timeouts are considered transient and worth retrying.
            if 'timed out' not in str(e):
                raise gr.Error("Unexpected error occured.") from e
            if attempt == max_retries:
                raise gr.Error(
                    "Error: The server took too long to respond. Please try again later.",
                ) from e

# Example queries offered in the UI (passed to gr.Interface as `examples`).
EXAMPLES = [
    "What is the remote work policy?",
    "How are performance evaluations conducted?",
    "What is the process for employee recruitment?",
    "What are the guidelines for employee working hours?",
    "How do employees report business expenses for reimbursement?",
    "What is the code of conduct policy?",
    "What types of employee leaves are available?",
    "How does the company handle employee termination?",
]

# Description text shown in the UI (passed to gr.Interface as `description`).
# The numbered list repeats the policies summarised in `policy_names`.
APP_DESCRIPTION = """
Welcome to the Dunder Mifflin Paper Company, Inc. Knowledge Base Assistant. 
This tool is powered by advanced AI models and is designed to answer questions related to the company's Human Resource and IT policies.

Simply enter your question in the textbox and the Assistant will provide a synthesized answer along with any relevant documents from the knowledge base. 

The Assistant's responses are based on the following documents in our database:
1. Attendance and remote work policy - Guidelines for employee attendance, remote work eligibility, home office allowances, and communication expectations.
2. Benefits and compensation policy - Outlines payroll procedures, medical benefits, wellness programs, bonuses, allowances, retirement plans, and other benefits to support employee well-being and work-life balance.
3. Code of conduct policy - Standards of behavior emphasizing the company's core values. Details ethical conduct, professionalism, diversity, conflict of interest, confidentiality, health and safety, and anti-harassment policies.
4. Expense policy - Covers authorized business expense reimbursements such as travel, accommodation, meals, and conferences. Outlines the process for obtaining prior approval, submitting expense reports with documentation, and corporate credit card usage guidelines.
5. Health and safety policy - Shared responsibilities among employees to maintain a safe workplace. Includes hazard identification, risk assessment, accident reporting, emergency procedures, and training.
6. Employee leave policy - Covers various types of leave including vacation, sick leave, public holidays, parental, bereavement, jury duty, unpaid leave, work from home, leave donation, professional development, and military leave.
7. Performance evaluation and promotion policy - Annual evaluations, promotion processes, and criteria. Emphasizes fairness, transparency, and employee growth.
8. Recruitment policy - Principles and processes for attracting, selecting, and onboarding employees. Covers candidate selection criteria, recruitment process, employee referral program, and comprehensive onboarding.
9. Employee termination policy - Guidelines for voluntary and involuntary terminations. Covers resignation notice, performance-based termination, layoffs, termination for cause, notice periods, exit interviews, final pay, benefits continuation, and return of company property.
10. Employee working hours policy - Standard working hours, flexible schedules, breaks, and overtime. Promotes a healthy work-life balance and legal compliance. Covers timekeeping, overtime compensation, approval, recordkeeping.
11. IT acceptable use policy - Employee responsibilities for using company technology resources securely and efficiently. Includes authorization, personal use, security measures, password protection, email etiquette, data communication, software installation, remote access, and file storage.

Please note that the Assistant will classify your question and determine whether the answer lies in a whole document, a document's section, or if it cannot be answered using the available policies. If you have any questions not related to these documents, the Assistant may not be able to provide a precise answer.
"""

# Build the Gradio UI: one query textbox in, two textboxes out (the answer and
# the formatted supporting documents returned by gradio_interface).
# gr.Textbox is used for both inputs and outputs; the gr.inputs / gr.outputs
# namespaces are deprecated in Gradio 3.x and removed in later releases.
iface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.Textbox(lines=2, label="Enter your query"),
    outputs=[
        gr.Textbox(label="Answer"),
        gr.Textbox(label="Relevant Documents")
    ],
    examples=EXAMPLES,
    # Examples are not pre-computed, so building the UI triggers no model calls.
    cache_examples=False,
    allow_flagging='never',
    title="Dunder Mifflin Knowledge Base Assistant using ChatGPT",
    description=APP_DESCRIPTION,
)

# Start the local Gradio server for the interface.
iface.launch()

Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.




Calculating embeddings:   0%|          | 0/1 [00:00<?, ?it/s]

Calculating embeddings:   0%|          | 0/1 [00:00<?, ?it/s]