In [None]:
#!pip install langchain unstructured openai chromadb Cython tiktoken pypdf lark langchain_openai

# Retrieval Stage:
![Retrieval Stage](./images/RS.png)

In [2]:
import os
import openai
import pandas as pd
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationSummaryBufferMemory
from langchain_openai import OpenAI
from langchain.chains import LLMChain
from langchain_openai  import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
from langchain_core.output_parsers import JsonOutputParser
from langchain.prompts import PromptTemplate
# Keep a key that is already exported in the environment; only fall back to the
# placeholder when none is set, so running this cell never clobbers a real key.
# Replace the placeholder locally and never commit a real key to the notebook.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

# Chatbot Memory Class:
 - ConversationSummaryBufferMemory: keeps a buffer of recent interactions in memory; when the buffered interactions exceed "max_token_limit" tokens, it summarizes the chat history using the LLM.
 - A store function, to save the user input and the final output
 - A is_empty function, to check if the memory is empty
 - A load function, to get the chat history

In [1]:
class chatbot_memo:
  """Chat-history holder built on ConversationSummaryBufferMemory.

  Wraps the LangChain memory object and exposes three small helpers:
  `store` (record one exchange), `is_empty` (has anything been recorded?)
  and `load` (fetch the current history).
  """

  def __init__(self, max_token_limit=100):
    """Create the summarizing memory.

    Args:
      max_token_limit: forwarded to ConversationSummaryBufferMemory as its
        `max_token_limit`.
    """
    self.llm_memo = OpenAI()
    memory_options = {
        "llm": self.llm_memo,
        "memory_key": "chat_history",
        "input_key": "question",
        "max_token_limit": max_token_limit,
    }
    self.memory = ConversationSummaryBufferMemory(**memory_options)

  def store(self, question, answer):
    """Record one user question together with the bot's final answer."""
    user_turn = {"question": question}
    bot_turn = {"output": answer}
    self.memory.save_context(user_turn, bot_turn)

  def is_empty(self):
    """Return True when the stored chat history has zero length."""
    history = self.memory.load_memory_variables({})['chat_history']
    return len(history) == 0

  def load(self):
    """Return the value stored under the 'chat_history' memory key."""
    return self.memory.load_memory_variables({})['chat_history']

# Question or not Chain Class:
 - It returns a dictionary (JSON format) indicating whether the user input is a question about space, the sun, or sunspots.
 - We prepare the format for the output using a class inherited from the "langchain_core.pydantic_v1.BaseModel" class.
 - If the user input is not a question about space, sun, or sunspots, it will generate a reply for it.
 - A run chain function, to get the output of the chain.

In [6]:
class question_or_not_json_form(BaseModel):
    # Output schema handed to JsonOutputParser by question_or_not_chain; the
    # Field descriptions below are turned into the format instructions that are
    # embedded in the prompt, so they are part of the program's behavior.
    # NOTE(review): documented with `#` comments instead of a class docstring on
    # purpose — pydantic may surface a docstring as the schema description,
    # which would presumably change the prompt; confirm before adding one.

    # Whether the incoming message is a question about space, sun or sunspots.
    is_question: bool = Field(description="the message is question about space, sun, sunspots or not")
    # The input message itself.
    message: str = Field(description="the input message")
    # Reply to use when the message is not an on-topic question; the model is
    # told to type None otherwise.
    response: str = Field(description="the response for the message if it's not a question about space, sun, sunspots, type None if it's a question about space, sun, sunspots")

In [7]:
class question_or_not_chain:
  """Gate chain: decides whether a message is an on-topic question.

  The chain is prompt -> chat model -> JSON parser, and its output follows
  the `question_or_not_json_form` schema.
  """

  def __init__(self, llm_name = "gpt-3.5-turbo"):
    """Build the prompt, the chat model and the JSON parser.

    Args:
      llm_name: model name passed to ChatOpenAI.
    """
    self.llm_name = llm_name
    self.llm = ChatOpenAI(model_name=self.llm_name, temperature=0)
    self.parser = JsonOutputParser(pydantic_object=question_or_not_json_form)

    # The parser's format instructions are baked into the prompt as a partial
    # variable, so callers only have to supply `message`.
    format_instructions = self.parser.get_format_instructions()
    self.prompt = PromptTemplate(
        template="you are a chat bot and your name is Morfy, you have to check if the message is a question about space, sun, sunspots or not, if not write a response for it.\n{format_instructions}\n{message}\n",
        input_variables=["message"],
        partial_variables={"format_instructions": format_instructions},
    )
    self.chain = self.prompt | self.llm | self.parser

  def run_chain(self, message):
    """Invoke the chain on `message` and return the chain's output as-is."""
    return self.chain.invoke({"message": message})




# Paraphrasing Chain Class:
 - The chain rewrites the user's new question as a stand-alone question when it is a follow-up question.
 - If the memory is empty, this chain will not run.
 - If the new question is not a follow-up question, it should return it as it is.
 - A run chain function, to get the output of the chain.

In [8]:
class paraphrasing_chain:
  """Chain that rewrites a follow-up question using the chat history.

  The chain is prompt -> chat model; `run_chain` returns the `content` of the
  model's reply.
  """

  def __init__(self, llm_name = "gpt-3.5-turbo"):
    """Build the chat model and the paraphrasing prompt.

    Args:
      llm_name: model name passed to ChatOpenAI.
    """
    self.llm_name = llm_name
    self.llm = ChatOpenAI(model_name=self.llm_name, temperature=0)

    # The literal below (including its embedded indentation) is sent to the
    # model verbatim, so it must not be reformatted.
    self.template = """system:Use the chat history to paraphrase the human new question as stand alone question, if it's not a follow up question just pass it as it is to the output.\n

    \n
    the chat history:\n
    {chat_history}

    \n
    human new question: {question}"""

    self.paraphrasing_prompt = PromptTemplate(
        template=self.template,
        input_variables=["chat_history", "question"],
    )
    self.chain = self.paraphrasing_prompt | self.llm

  def run_chain(self, message, memory):
    """Paraphrase `message` given the chat history in `memory`.

    Returns:
      The `content` attribute of the chain's reply.
    """
    payload = {"question": message, "chat_history": memory}
    reply = self.chain.invoke(payload)
    return reply.content



# Documents Retriever Class:
 - It uses the OpenAIEmbeddings, to get the most relevant documents from the preprocessed documents in the database in the "persist_directory".
 - A retrieve function, to get the "K" most relevant documents using the "search_type" and the "score_threshold"; it then formats them into a single context string to be passed to the answer chain.
 - Please check the preprocessing notebook to learn more about this.

In [9]:
class docs_retriever:
  """Fetches relevant documents from a persisted Chroma store.

  The store is opened from `persist_directory` with OpenAIEmbeddings as the
  embedding function, and wrapped in a retriever configured with the given
  search type, score threshold and `k`.
  """

  def __init__(self, persist_directory="./chroma", search_type="similarity_score_threshold", score_threshold = 0.5, k = 4):
    """Open the vector store and build the retriever.

    Args:
      persist_directory: directory of the persisted Chroma database.
      search_type: forwarded to `as_retriever` as `search_type`.
      score_threshold: forwarded in `search_kwargs` as "score_threshold".
      k: forwarded in `search_kwargs` as "k".
    """
    self.embedding = OpenAIEmbeddings()
    self.vectordb = Chroma(persist_directory=persist_directory, embedding_function=self.embedding)
    self.retriever = self.vectordb.as_retriever(search_type = search_type, search_kwargs={"score_threshold": score_threshold , "k": k})

  def retrieve(self, question):
    """Return the retrieved documents formatted as one context string.

    Each document becomes a "document N:" header line followed by its content
    with every "__________" run removed, and a trailing newline.

    Args:
      question: query passed to the retriever.

    Returns:
      The concatenated sections, or "" when nothing was retrieved.
    """
    docs = self.retriever.get_relevant_documents(question)
    sections = []
    for position, doc in enumerate(docs, start=1):
      raw = doc.page_content
      # Normalise up front instead of hiding failures behind a bare `except`:
      # a missing body becomes an empty section, and any other non-string
      # content is rendered through str() so the concatenation cannot fail.
      text = "" if raw is None else str(raw)
      sections.append("document " + str(position) + ":\n" + text.replace("__________", "") + "\n")

    return "".join(sections)


# Answer Chain Class:
 - It will use the LLM to generate an answer using the context from "docs_retriever".
 - It will return the media ID if it exists.
 - The output is a dictionary containing the answer together with the image and table IDs.
 - The media retrieve feature is based on dictionary-like strings in the most relevant documents (to learn more please check the readme file).

In [10]:
class image_atrris(BaseModel):
    # Schema for one image entry inside `get_answer.images`. The description
    # strings are runtime text consumed by the output parser, so they are left
    # exactly as written.
    # NOTE(review): `#` comments are used instead of a docstring because
    # pydantic may copy a docstring into the schema description — confirm
    # before adding one.

    # Identifier of the image; QA_BO_chatbot.run_chain passes it to
    # media_retriever.get_media to look up the file path.
    image_id: str = Field(description="the id for image varibale from the most relevent document")
    # Free-text description returned alongside the image path.
    image_description: str = Field(description="the description for image varibale from the most relevent document")

class table_atrris(BaseModel):
    # Schema for one table entry inside `get_answer.tables`. The description
    # strings are runtime text consumed by the output parser, so they are left
    # exactly as written.
    # NOTE(review): `#` comments are used instead of a docstring because
    # pydantic may copy a docstring into the schema description — confirm
    # before adding one.

    # Identifier of the table; QA_BO_chatbot.run_chain passes it to
    # media_retriever.get_media to look up the file path.
    table_id: str = Field(description="the id for table varibale from the most relevent document")
    # Free-text description returned alongside the table path.
    table_description: str = Field(description="the description for table varibale from the most relevent document")

In [11]:
class get_answer(BaseModel):
    # Output schema handed to JsonOutputParser by answer_chain.
    # NOTE(review): `#` comments are used instead of a docstring because
    # pydantic may copy a docstring into the schema description, which would
    # presumably alter the format instructions — confirm before adding one.

    # Flag the chatbot checks before attempting any media lookup.
    has_media: bool = Field(description="does the most relevent document have images or tables variables")
    # The generated answer text; becomes the chatbot's "response".
    answer: str = Field(description="the answer for the question")
    # Image entries (see image_atrris) reported for the most relevant document.
    images: List[image_atrris] = Field(description="a list of images variables attributes if existed in the most relevent document")
    # Table entries (see table_atrris) reported for the most relevant document.
    tables: List[table_atrris] = Field(description="a list of tables variables attributes if existed in the most relevent document")

In [12]:
class answer_chain:
  """Chain that answers a question from the retrieved context.

  The chain is prompt -> OpenAI LLM -> JSON parser, and its output follows
  the `get_answer` schema.
  """

  def __init__(self):
    """Build the LLM, the JSON parser and the answering prompt."""
    self.llm = OpenAI()
    self.parser = JsonOutputParser(pydantic_object=get_answer)

    # Format instructions are fixed at construction time; callers only supply
    # `message` and `context`.
    format_instructions = self.parser.get_format_instructions()
    self.prompt = PromptTemplate(
        template="Given the following extracted documents and a question, create a final answer.\nthe documents:\n{context}.\n{format_instructions}\n human question:{message}\n",
        input_variables=["message", "context"],
        partial_variables={"format_instructions": format_instructions},
    )

    self.chain = self.prompt | self.llm | self.parser

  def run_chain(self, message, context):
    """Invoke the chain with the question and its context; return the output as-is."""
    payload = {"context": context, "message": message}
    return self.chain.invoke(payload)



# Media Retriever Class:
 - Takes the ID and the media type and uses a CSV file to get its path.

In [82]:
class media_retriever:
  """Maps a media ID and type to a file path using a CSV lookup table.

  The CSV is read once at construction; `get_media` uses its "ID", "type"
  and "name" columns.
  """

  def __init__(self, media_path="./images_tables", IDs_table_path = "./images_tables/final-tokens.csv"):
    """Load the lookup table.

    Args:
      media_path: root folder that the returned paths start with.
      IDs_table_path: CSV file read into the lookup table.
    """
    self.ID_table = pd.read_csv(IDs_table_path)
    self.media_path = media_path

  def get_media(self, ID, m_type):
    """Return the path of the media file identified by `ID` and `m_type`.

    Args:
      ID: the numeric ID, either alone ("29" or 29) or after a single-word
        prefix and a space ("image 29"). Everything after the first space is
        parsed as the integer ID.
      m_type: media type matched against the "type" column and used (with an
        "s" appended) as the sub-folder name.

    Returns:
      `<media_path>/<m_type>s/<name>` for the first matching row.

    Raises:
      ValueError: if the ID part is not an integer.
      IndexError: if no row matches the ID and type.
    """
    # The ID originates from model-generated JSON, so it may arrive as a bare
    # number rather than a string; normalise before parsing.
    id_text = str(ID)
    _, separator, tail = id_text.partition(" ")
    int_ID = int(tail if separator else id_text)

    matches = self.ID_table.loc[(self.ID_table['ID'] == int_ID) & (self.ID_table['type'] == m_type)]
    if matches.empty:
      # Same exception type `.iloc[0]` would raise, but with a usable message.
      raise IndexError("no " + str(m_type) + " with ID " + str(int_ID) + " in the media table")

    name = matches.iloc[0]["name"]
    return self.media_path + "/" + m_type + "s/" + name

# Putting It All Together:

In [83]:
class QA_BO_chatbot:
  """End-to-end chatbot wiring all stages together.

  Pipeline per message: on-topic gate -> (paraphrase follow-ups using the chat
  history) -> document retrieval -> answer generation -> media path lookup ->
  store the exchange in memory.
  """

  def __init__(self,max_token_limit=100, llm_name = "gpt-3.5-turbo", persist_directory="./chroma", search_type="similarity_score_threshold", score_threshold = 0.5, k = 4, media_path="./images_tables", IDs_table_path = "./images_tables/final-tokens.csv"):
    """Instantiate every stage.

    Args:
      max_token_limit: passed to chatbot_memo.
      llm_name: passed to the gate and paraphrasing chains.
      persist_directory, search_type, score_threshold, k: passed to
        docs_retriever.
      media_path, IDs_table_path: passed to media_retriever.
    """
    self.bot_memo = chatbot_memo(max_token_limit)
    self.question_or_not = question_or_not_chain(llm_name)
    self.paraphrasing = paraphrasing_chain(llm_name)
    self.retriever = docs_retriever(persist_directory, search_type, score_threshold, k)
    self.answerer = answer_chain()
    self.MediaRetriever = media_retriever(media_path, IDs_table_path)

  def _collect_media(self, entries, m_type):
    """Turn the answer chain's media entries of one type into path/description dicts."""
    resolved = []
    # `entries` may be missing or None when the model omits the list.
    for entry in entries or []:
      path = self.MediaRetriever.get_media(entry[m_type + "_id"], m_type)
      desc = entry[m_type + "_description"]
      resolved.append({"path": path, "description": desc})
    return resolved

  def run_chain(self, message):
    """Process one user message and return the final response dictionary.

    Args:
      message: the raw user input.

    Returns:
      A dict with keys "input", "response", "has_media", "images" and
      "tables"; the two lists hold {"path", "description"} dicts.
    """
    imgs = []
    tables = []
    S1 = self.question_or_not.run_chain(message)

    if (S1["is_question"]):
      # Paraphrase only when there IS history to resolve a follow-up against;
      # with an empty memory the message is already stand-alone.
      if self.bot_memo.is_empty():
        question = message
      else:
        memo = self.bot_memo.load()
        question = self.paraphrasing.run_chain(message, memo)

      context = self.retriever.retrieve(question)
      answerer_output = self.answerer.run_chain(question, context)

      if (answerer_output["has_media"]):
        imgs = self._collect_media(answerer_output.get("images"), "image")
        tables = self._collect_media(answerer_output.get("tables"), "table")

      final_output = {"input": message, "response": answerer_output["answer"], "has_media": answerer_output["has_media"], "images": imgs, "tables": tables}
    else:
      final_output = {"input": message, "response": S1["response"], "has_media": False, "images": [], "tables": []}

    # Memory always records the original user message, not the paraphrase.
    self.bot_memo.store(message, final_output["response"])

    return final_output




# Testing The Chatbot:

In [84]:
# Build the chatbot with its default settings (Chroma store in "./chroma",
# media table in "./images_tables/final-tokens.csv").
final_bot = QA_BO_chatbot()

In [107]:
# Run one message through the full pipeline; the exchange is also stored in
# the bot's memory.
ansr = final_bot.run_chain("what are sunspots")

In [108]:
# Display the response dictionary returned by the previous cell.
ansr

{'input': 'what are sunspots',
 'response': 'Sunspots form on the surface of the Sun due to strong magnetic field lines coming up from within the Sun trough the solar surface and appear visibly as dark spots compared to their surroundings.',
 'has_media': True,
 'images': [{'path': './images_tables/images/image_29.jpg', 'description': ''},
  {'path': './images_tables/images/image_30.jpg', 'description': ''}],
 'tables': []}