In [2]:
import re


### Util Function
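`clean_string` takes a string and normalises it: newlines and runs of whitespace collapse to single spaces, backslashes are removed, `#` characters become spaces, and runs of a repeated punctuation character are reduced to a single character.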

In [3]:
def clean_string(text):
    """Normalise raw text extracted from a document.

    The steps run in this order:

    1. newlines become spaces and the ends are trimmed,
    2. every run of whitespace collapses to a single space,
    3. backslashes are dropped and ``#`` characters turn into spaces,
    4. a run of one repeated non-word, non-space character (``!!!``)
       shrinks to a single occurrence (``!``).

    :param text: The string to clean.
    :return: The cleaned string.
    """
    # Steps 1 and 2: flatten line breaks, trim, squeeze whitespace.
    flattened = text.replace("\n", " ").strip()
    squeezed = re.sub(r"\s+", " ", flattened)

    # Step 3: this happens after the whitespace squeeze, so spaces that
    # replace "#" are not collapsed again.
    without_marks = squeezed.replace("\\", "").replace("#", " ")

    # Step 4: collapse repeated punctuation.
    return re.sub(r"([^\w\s])\1*", r"\1", without_marks)



## Load Document:
<p>
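The PDF document is loaded from its file path with LangChain's `PyPDFLoader`; the text of each loaded document is then cleaned with `clean_string`, and the source path is recorded in its metadata.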
</p>

In [4]:
# Load data from PDF using PyPDFLoader from LangChain
from langchain.document_loaders import PyPDFLoader

In [5]:
class PdfFileLoader:
    """Loads a PDF through ``PyPDFLoader`` and cleans the extracted text."""

    def load_data(self, url):
        """Load data from a PDF file.

        :param url: Location of the PDF, handed straight to ``PyPDFLoader``.
        :return: A list with one dict per loaded document. Each dict holds the
        cleaned text under ``"content"`` and the document's metadata (with the
        source added under ``"url"``) under ``"meta_data"``.
        :raises ValueError: If the loader yields no documents.
        """
        loader = PyPDFLoader(url)
        records = []
        pages = loader.load_and_split()
        if len(pages) == 0:
            raise ValueError("No data found")

        for page in pages:
            text = clean_string(page.page_content)
            # The document's own metadata dict is tagged in place with the source.
            page_meta = page.metadata
            page_meta["url"] = url
            records.append({"content": text, "meta_data": page_meta})
        return records

## CHUNKING


In [20]:
import hashlib
from typing import Optional,Callable
from langchain.text_splitter import RecursiveCharacterTextSplitter
from vectordb import ChromaDB

In [7]:
# Default keyword arguments for the text splitter: chunk length and overlap
# are measured with `len`, i.e. in characters.
TEXT_SPLITTER_CHUNK_PARAMS = dict(
    chunk_size=200,
    chunk_overlap=56,
    length_function=len,
)

In [46]:
class ChunkerConfig:
    """
    Settings for splitting text into chunks.
    """

    def __init__(
        self,
        chunk_size: Optional[int] = 200,
        chunk_overlap: Optional[int] = 56,
        length_function: Optional[Callable[[str], int]] = len,
    ):
        """
        :param chunk_size: Optional. Size of each chunk, as measured by
        `length_function`.
        :param chunk_overlap: Optional. Overlap between consecutive chunks.
        :param length_function: Optional. Callable that measures a string.
        """
        self.chunk_size, self.chunk_overlap, self.length_function = (
            chunk_size,
            chunk_overlap,
            length_function,
        )

    def as_dict(self):
        """Return the instance's attribute dictionary (the live one, not a copy)."""
        return self.__dict__

In [53]:
class PdfFileChunker:
    """
    Splits loaded document text into chunks and assigns each chunk a stable id.
    """

    def __init__(self, config: "Optional[ChunkerConfig]" = None):
        """
        Initialize the chunker.

        :param config: Optional. A `ChunkerConfig` (anything exposing
        `as_dict()`) or a plain mapping of keyword arguments for the text
        splitter. Defaults to `TEXT_SPLITTER_CHUNK_PARAMS`.
        """
        self.text_splitter = RecursiveCharacterTextSplitter(**self._splitter_kwargs(config))

    @staticmethod
    def _splitter_kwargs(config):
        """Turn `config` into a fresh dict of text-splitter keyword arguments."""
        if config is None:
            return dict(TEXT_SPLITTER_CHUNK_PARAMS)
        as_dict = getattr(config, "as_dict", None)
        if callable(as_dict):
            # `**` unpacking requires a mapping; a config object is not one,
            # so go through its `as_dict()` view.
            return dict(as_dict())
        return dict(config)

    def create_chunks(self, loader, src):
        """
        Loads data and chunks it.

        :param loader: The loader whose `load_data` method is used to create
        the raw data.
        :param src: The data to be handled by the loader. Can be a URL for
        remote sources or local content for local loaders.
        :return: A dict with parallel lists under `documents`, `ids` and
        `metadatas`. A chunk whose id was already produced in this call is
        skipped.
        """
        documents = []
        ids = []
        metadatas = []
        seen_ids = set()
        for data in loader.load_data(src):
            content = data["content"]
            meta_data = data["meta_data"]
            url = meta_data["url"]

            for chunk in self.get_chunks(content):
                # The id is derived from the chunk text and its source, so the
                # same text from the same source always maps to the same id.
                chunk_id = hashlib.sha256((chunk + url).encode()).hexdigest()
                if chunk_id in seen_ids:
                    continue
                seen_ids.add(chunk_id)
                ids.append(chunk_id)
                documents.append(chunk)
                metadatas.append(meta_data)
        return {
            "documents": documents,
            "ids": ids,
            "metadatas": metadatas,
        }

    def get_chunks(self, content):
        """
        Returns chunks using text splitter instance.
        """
        return self.text_splitter.split_text(content)

In [10]:
import os
from chromadb.utils import embedding_functions
from dotenv import load_dotenv
from langchain.docstore.document import Document
from langchain.memory import ConversationBufferMemory
from langchain.tools import DuckDuckGoSearchRun

from gpt4all import GPT4All

import re
from string import Template

In [99]:
class InitConfig:
    """
    Configuration for the app: embedding function, vector database and app id.
    """

    def __init__(self, log_level=None, ef=None, db=None, host=None, port=None, id=None):
        """
        :param log_level: Optional. (String) Debug level
        ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'].
        :param ef: Optional. Embedding function to use. Defaults to the
        `all-MiniLM-L6-v2` sentence-transformer embedding function.
        :param db: Optional. (Vector) database to use for embeddings. Defaults
        to a `ChromaDB` built from `ef`, `host` and `port`.
        :param id: Optional. ID of the app. Document metadata will have this id.
        :param host: Optional. Hostname for the database server.
        :param port: Optional. Port for the database server.
        """
        self._setup_logging(log_level)
        self.ef = ef
        self.host = host
        self.port = port
        self.id = id
        # Forward the caller's choices; the defaults are only built when
        # nothing was supplied.
        self._set_embedding_function(ef)
        if db is None:
            self._set_db_to_default()
        else:
            self.db = db

    def _set_embedding_function(self, ef=None):
        """
        Sets the embedding function, falling back to the `all-MiniLM-L6-v2`
        sentence-transformer embedding function when `ef` is not given.
        """
        if ef:
            self.ef = ef
        else:
            self.ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")

    def _set_embedding_function_to_default(self):
        """
        Sets embedding function to OpenAI's `text-embedding-ada-002`.

        :raises ValueError: If neither the `OPENAI_API_KEY` nor the
        `OPENAI_ORGANIZATION` environment variable is set.
        """
        if os.getenv("OPENAI_API_KEY") is None and os.getenv("OPENAI_ORGANIZATION") is None:
            raise ValueError("OPENAI_API_KEY or OPENAI_ORGANIZATION environment variables not provided")  # noqa:E501
        self.ef = embedding_functions.OpenAIEmbeddingFunction(
            api_key=os.getenv("OPENAI_API_KEY"),
            organization_id=os.getenv("OPENAI_ORGANIZATION"),
            model_name="text-embedding-ada-002",
        )

    def as_dict(self):
        """Return the instance's attribute dictionary."""
        return vars(self)

    def _set_db(self, db):
        """Sets the database; a falsy `db` leaves the current one untouched."""
        if db:
            self.db = db

    def _set_db_to_default(self):
        """
        Sets database to default (`ChromaDb`).
        """
        self.db = ChromaDB(ef=self.ef, host=self.host, port=self.port)

    def _setup_logging(self, debug_level):
        """Placeholder: logging is not configured yet, `debug_level` is ignored."""
        pass


In [12]:

# Prompt used when there is no conversation history.
DEFAULT_PROMPT = """
  Use the following pieces of context to answer the query at the end.
  If you don't know the answer, just say that you don't know, don't try to make up an answer.

  $context

  Query: $query

  Helpful Answer:
"""  # noqa:E501

# Prompt used when conversation history is supplied.
DEFAULT_PROMPT_WITH_HISTORY = """
  Use the following pieces of context to answer the query at the end.
  If you don't know the answer, just say that you don't know, don't try to make up an answer.
  I will provide you with our conversation history.

  $context

  History: $history

  Query: $query

  Helpful Answer:
"""  # noqa:E501

# Prompt that additionally asks for complete code snippets.
DOCS_SITE_DEFAULT_PROMPT = """
  Use the following pieces of context to answer the query at the end.
  If you don't know the answer, just say that you don't know, don't try to make up an answer. Wherever possible, give complete code snippet. Dont make up any code snippet on your own.

  $context

  Query: $query

  Helpful Answer:
"""  # noqa:E501

DEFAULT_PROMPT_TEMPLATE = Template(DEFAULT_PROMPT)
DEFAULT_PROMPT_WITH_HISTORY_TEMPLATE = Template(DEFAULT_PROMPT_WITH_HISTORY)
DOCS_SITE_PROMPT_TEMPLATE = Template(DOCS_SITE_DEFAULT_PROMPT)
# Placeholder detectors: match both the `$name` and `${name}` spellings.
query_re = re.compile(r"\$\{*query\}*")
context_re = re.compile(r"\$\{*context\}*")
history_re = re.compile(r"\$\{*history\}*")


class QueryConfig:
    """
    Config for the `query` method.
    """

    def __init__(
        self,
        number_documents=2,
        template: Template = None,
        model=None,
        temperature=None,
        max_tokens=None,
        top_p=None,
        history=None,
        stream: bool = False,
    ):
        """
        :param number_documents: Optional. Number of documents to retrieve as
        context. `None` means 1.
        :param template: Optional. `Template` for the prompt. When omitted,
        the plain default prompt is used without history and the with-history
        default prompt is used with history.
        :param model: Optional. Model name. Defaults to `gpt-3.5-turbo-0613`.
        :param temperature: Optional. Defaults to 0.
        :param max_tokens: Optional. Defaults to 1000.
        :param top_p: Optional. Defaults to 1.
        :param history: Optional. Conversation history; an empty value is
        treated as no history.
        :param stream: Optional. Whether the answer should be streamed.
        :raises ValueError: If `template` lacks a required placeholder or
        `stream` is not a bool.
        """
        self.number_documents = 1 if number_documents is None else number_documents

        # Empty history is normalised to None so later checks are uniform.
        self.history = history if history else None

        if template is None:
            # Pick the default that matches the history, so that substituting
            # only `context` and `query` never hits an unfilled `$history`.
            if self.history is None:
                template = DEFAULT_PROMPT_TEMPLATE
            else:
                template = DEFAULT_PROMPT_WITH_HISTORY_TEMPLATE

        self.temperature = temperature if temperature else 0
        self.max_tokens = max_tokens if max_tokens else 1000
        self.model = model if model else "gpt-3.5-turbo-0613"
        self.top_p = top_p if top_p else 1

        if not self.validate_template(template):
            if self.history is None:
                raise ValueError("`template` should have `query` and `context` keys")
            raise ValueError("`template` should have `query`, `context` and `history` keys")
        self.template = template

        if not isinstance(stream, bool):
            raise ValueError("`stream` should be bool")
        self.stream = stream

    def validate_template(self, template: Template):
        """
        Check that `template` contains the placeholders this config needs.

        :param template: The `Template` to inspect.
        :return: True if `query` and `context` (and, when history is set,
        `history`) placeholders are present; False otherwise.
        """
        required = [query_re, context_re]
        if self.history is not None:
            required.append(history_re)
        return all(pattern.search(template.template) for pattern in required)


In [13]:
class AddConfig:
    """
    Config for the `add` method.
    """

    def __init__(
        self,
        chunker: Optional[ChunkerConfig] = None,
        loader = None,
    ):
        """
        :param chunker: Optional. Chunking configuration; stored as given.
        :param loader: Optional. Loader configuration; stored as given.
        """
        self.chunker, self.loader = chunker, loader

In [14]:
# Prompt text for chat-style answers. It carries three `string.Template`
# placeholders: $context, $history and $query.
DEFAULT_CHAT_PROMPT = """
  You are a chatbot having a conversation with a human. You are given chat
  history and context.
  You need to answer the query considering context, chat history and your knowledge base. If you don't know the answer or the answer is neither contained in the context nor in history, then simply say "I don't know".

  $context

  History: $history

  Query: $query

  Helpful Answer:
"""  # noqa:E501

# Template object built once from the chat prompt text above.
DEFAULT_PROMPT_CHAT_TEMPLATE = Template(DEFAULT_CHAT_PROMPT)


class ChatConfig(QueryConfig):
    """
    Config for the `chat` method.
    """

    def __init__(
        self,
        number_documents=None,
        template: Template = None,
        model=None,
        temperature=None,
        max_tokens=None,
        top_p=None,
        stream: bool = False,
    ):
        """
        :param number_documents: Optional. Number of documents to retrieve.
        :param template: Optional. Prompt `Template`; defaults to the chat
        prompt template.
        :param model: Optional. Model name.
        :param temperature: Optional. Sampling temperature.
        :param max_tokens: Optional. Maximum number of tokens.
        :param top_p: Optional. Nucleus sampling parameter.
        :param stream: Optional. Whether the answer should be streamed.
        """
        chosen_template = DEFAULT_PROMPT_CHAT_TEMPLATE if template is None else template
        parent_kwargs = dict(
            number_documents=number_documents,
            template=chosen_template,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            # Non-empty placeholder history handed to the parent constructor.
            history=[0],
            stream=stream,
        )
        super().__init__(**parent_kwargs)

    def set_history(self, history):
        """Replace the stored conversation history with `history`."""
        self.history = history


In [15]:
# Module-level slot for the GPT4All model; starts empty.
gpt4all_model = None

# Load environment variables via python-dotenv.
load_dotenv()

# Directory the notebook runs in, and a "db" directory beneath it.
ABS_PATH = os.getcwd()
DB_DIR = os.path.join(ABS_PATH, "db")

# Module-level conversation memory (a single shared instance).
memory = ConversationBufferMemory()

In [87]:
class ChatBot:
    """
    Question answering over a vector database.

    Documents are chunked and stored in the configured collection; queries
    retrieve the closest chunks and pass them, as context, to an LLM.
    Subclasses provide the LLM by overriding `get_llm_model_answer`.
    """

    def __init__(self, config: "InitConfig"):
        """
        Stores the configuration and takes the vector DB client and
        collection from `config.db`.

        :param config: InitConfig instance to load as configuration.
        """
        self.config = config
        self.db_client = self.config.db.client
        self.collection = self.config.db.collection
        self.user_asks = []
        self.is_docs_site_instance = False
        self.online = False

    def add_local(self, data_type, content, metadata=None, config: "AddConfig" = None):
        """
        Adds the data you supply to the vector db.
        Loads the data, chunks it, create embedding for each chunk
        and then stores the embedding to vector database.

        :param data_type: The type of the data to add. Only recorded in
        `user_asks`; the PDF loader and chunker are always used.
        :param content: The local data (passed to the PDF loader as source).
        :param metadata: Optional. Metadata associated with the data source.
        :param config: Optional. Accepted for interface compatibility; it is
        not consulted at the moment.
        """
        self.user_asks.append([data_type, content])
        self.load_and_embed(
            PdfFileLoader(),
            PdfFileChunker(),
            content,
            metadata,
        )

    def load_and_embed(self, loader, chunker, src, metadata=None):
        """
        Loads the data from the given URL, chunks it, and adds it to database.

        :param loader: The loader to use to load the data.
        :param chunker: The chunker to use to chunk the data.
        :param src: The data to be handled by the loader. Can be a URL for
        remote sources or local content for local loaders.
        :param metadata: Optional. Fallback metadata, used for any chunk whose
        own metadata is empty.
        """
        embeddings_data = chunker.create_chunks(loader, src)
        documents = embeddings_data["documents"]
        metadatas = embeddings_data["metadatas"]
        ids = embeddings_data["ids"]

        # Look up which of these ids are already stored, and skip those.
        where = {"app_id": self.config.id} if self.config.id is not None else {}
        existing_docs = self.collection.get(
            ids=ids,
            where=where,  # optional filter
        )
        existing_ids = set(existing_docs["ids"])

        if existing_ids:
            new_items = {
                chunk_id: (doc, meta)
                for chunk_id, doc, meta in zip(ids, documents, metadatas)
                if chunk_id not in existing_ids
            }

            if not new_items:
                print(f"All data from {src} already exists in the database.")
                return

            # Keep plain lists (not the tuples `zip(*...)` would give) so the
            # collection receives the same types on every path.
            ids = list(new_items)
            documents = [doc for doc, _ in new_items.values()]
            metadatas = [meta for _, meta in new_items.values()]

        # Add app id in metadatas so that they can be queried on later
        if self.config.id is not None:
            metadatas = [{**m, "app_id": self.config.id} for m in metadatas]

        chunks_before_addition = self.count()

        # Fall back to the caller's metadata where a chunk has none of its own.
        metadatas_with_metadata = [meta or metadata for meta in metadatas]

        self.collection.add(documents=documents, metadatas=metadatas_with_metadata, ids=ids)
        print(f"Successfully saved {src}. New chunks count: {self.count() - chunks_before_addition}")

    def _format_result(self, results):
        """
        Pairs each returned document (wrapped in `Document`) with its distance,
        reading the first entry of `documents`, `metadatas` and `distances`.
        """
        return [
            (Document(page_content=text, metadata=meta or {}), distance)
            for text, meta, distance in zip(
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
            )
        ]

    def get_llm_model_answer(self, prompt, config=None):
        """
        Produce the LLM answer for `prompt`. Must be overridden by subclasses.

        :param prompt: The full prompt to send to the model.
        :param config: Optional. The query/chat configuration.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def retrieve_from_database(self, input_query, config: "QueryConfig"):
        """
        Queries the vector database based on the given input query.
        Gets relevant doc based on the query

        :param input_query: The query to use.
        :param config: The query configuration.
        :return: The contents of the documents that matched your query.
        """
        where = {"app_id": self.config.id} if self.config.id is not None else {}  # optional filter
        result = self.collection.query(
            query_texts=[
                input_query,
            ],
            n_results=config.number_documents,
            where=where,
        )
        return [document.page_content for document, _ in self._format_result(result)]

    def _append_search_and_context(self, context, web_search_result):
        """Append the web search result to the context string."""
        return f"{context}\nWeb Search Result: {web_search_result}"

    def generate_prompt(self, input_query, contexts, config: "QueryConfig", **kwargs):
        """
        Generates a prompt based on the given query and context, ready to be
        passed to an LLM

        :param input_query: The query to use.
        :param contexts: List of similar documents to the query used as context.
        :param config: The `QueryConfig` instance to use as configuration
        options.
        :param kwargs: Optional `web_search_result` text to append to the
        context.
        :return: The prompt
        """
        context_string = " | ".join(contexts)
        web_search_result = kwargs.get("web_search_result", "")
        if web_search_result:
            context_string = self._append_search_and_context(context_string, web_search_result)

        substitutions = {"context": context_string, "query": input_query}
        if config.history:
            substitutions["history"] = config.history
        return config.template.substitute(**substitutions)

    def get_answer_from_llm(self, prompt, config: "ChatConfig"):
        """
        Gets an answer for the given prompt by passing it to an LLM.

        :param prompt: The prompt to send.
        :param config: The configuration forwarded to the model call.
        :return: The answer.
        """
        return self.get_llm_model_answer(prompt, config)

    def access_search_and_get_results(self, input_query):
        """Run a DuckDuckGo search for `input_query` and return its result."""
        from langchain.tools import DuckDuckGoSearchRun

        search = DuckDuckGoSearchRun()
        return search.run(input_query)

    def query(self, input_query, config: "QueryConfig" = None):
        """
        Queries the vector database based on the given input query.
        Gets relevant doc based on the query and then passes it to an
        LLM as context to get the answer.

        :param input_query: The query to use.
        :param config: Optional. The `QueryConfig` instance to use as
        configuration options.
        :return: The answer to the query (a string, or a generator of chunks
        when the model streams).
        """
        if config is None:
            config = QueryConfig()
        if self.is_docs_site_instance:
            config.template = DOCS_SITE_PROMPT_TEMPLATE
            config.number_documents = 5
        k = {}
        if self.online:
            k["web_search_result"] = self.access_search_and_get_results(input_query)
        contexts = self.retrieve_from_database(input_query, config)
        prompt = self.generate_prompt(input_query, contexts, config, **k)

        answer = self.get_answer_from_llm(prompt, config)

        if isinstance(answer, str):
            return answer
        return self._stream_query_response(answer)

    def _stream_query_response(self, answer):
        """Yield the chunks of a streamed answer one by one."""
        yield from answer

    def chat(self, input_query, config: "ChatConfig" = None):
        """
        Queries the vector database on the given input query.
        Gets relevant doc based on the query and then passes it to an
        LLM as context to get the answer.

        Maintains the whole conversation in memory.
        :param input_query: The query to use.
        :param config: Optional. The `ChatConfig` instance to use as
        configuration options.
        :return: The answer to the query (a string, or a generator of chunks
        when the model streams).
        """
        if config is None:
            config = ChatConfig()
        if self.is_docs_site_instance:
            config.template = DOCS_SITE_PROMPT_TEMPLATE
            config.number_documents = 5
        k = {}
        if self.online:
            k["web_search_result"] = self.access_search_and_get_results(input_query)
        # The web search result belongs to the prompt only; retrieval takes
        # just the query and the config.
        contexts = self.retrieve_from_database(input_query, config)

        chat_history = memory.load_memory_variables({})["history"]
        if chat_history:
            config.set_history(chat_history)

        prompt = self.generate_prompt(input_query, contexts, config, **k)
        answer = self.get_answer_from_llm(prompt, config)

        memory.chat_memory.add_user_message(input_query)

        if isinstance(answer, str):
            memory.chat_memory.add_ai_message(answer)
            return answer
        # this is a streamed response and needs to be handled differently.
        return self._stream_chat_response(answer)

    def _stream_chat_response(self, answer):
        """
        Yield the chunks of a streamed answer, then store the full answer in
        the conversation memory once the stream is exhausted.
        """
        streamed_answer = ""
        for chunk in answer:
            streamed_answer = streamed_answer + chunk
            yield chunk
        memory.chat_memory.add_ai_message(streamed_answer)

    def count(self):
        """
        Count the number of embeddings.

        :return: The number of embeddings.
        """
        return self.collection.count()

    def reset(self):
        """
        Resets the database. Deletes all embeddings irreversibly.
        `App` has to be reinitialized after using this method.
        """
        self.db_client.reset()


class OpenSourceApp(ChatBot):
    """
    The OpenSource app: a `ChatBot` that answers with a local GPT4All model
    and falls back to a sentence-transformer embedding function.
    """

    def __init__(self, config: InitConfig = None):
        """
        :param config: InitConfig instance to load as configuration. Optional.
        `ef` defaults to open source.
        """
        print("Loading open source embedding model. This may take some time...")  # noqa:E501
        config = config or InitConfig()

        # Fill in whatever the configuration is still missing.
        if not config.ef:
            default_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")
            config._set_embedding_function(default_ef)
        if not config.db:
            config._set_db_to_default()

        print("Successfully loaded open source embedding model.")
        super().__init__(config)

    def get_llm_model_answer(self, prompt, config: ChatConfig):
        """
        Generate an answer with the GPT4All model, creating the shared model
        instance on first use.

        :param prompt: The prompt to send to the model.
        :param config: Configuration; its `stream` flag is passed as the
        `streaming` argument.
        :return: Whatever the model's `generate` call returns.
        """
        global gpt4all_model
        if gpt4all_model is None:
            gpt4all_model = GPT4All("orca-mini-3b.ggmlv3.q4_0.bin")
        return gpt4all_model.generate(prompt=prompt, streaming=config.stream)


In [100]:
# Address of the database server handed to InitConfig.
chromadb_host = "localhost"
chromadb_port = 8000

# Build the app configuration with that host and port.
config = InitConfig(host=chromadb_host, port=chromadb_port)

In [101]:
# Create the question-answering app from the configuration.
qa_bot = OpenSourceApp(config)

Loading open source embedding model. This may take some time...
Successfully loaded open source embedding model.


In [102]:
# Index the PDF manual.
# NOTE(review): absolute, machine-specific path — consider a configurable location.
qa_bot.add_local("pdf", "/Users/muhammedashique/Downloads/manual.pdf")

All data from /Users/muhammedashique/Downloads/manual.pdf already exists in the database.


In [103]:
# Ask a first question through the chat interface.
qa_bot.chat("How to protect the alloy wheel ?")

Found model file at  /Users/muhammedashique/.cache/gpt4all/orca-mini-3b.ggmlv3.q4_0.bin


objc[849]: Class GGMLMetalClass is implemented in both /Users/muhammedashique/QABot/.venv/lib/python3.8/site-packages/gpt4all/llmodel_DO_NOT_MODIFY/build/libreplit-mainline-metal.dylib (0x283e70208) and /Users/muhammedashique/QABot/.venv/lib/python3.8/site-packages/gpt4all/llmodel_DO_NOT_MODIFY/build/libllamamodel-mainline-metal.dylib (0x284374208). One of the two will be used. Which one is undefined.
llama.cpp: using Metal
llama.cpp: loading model from /Users/muhammedashique/.cache/gpt4all/orca-mini-3b.ggmlv3.q4_0.bin
llama_model_load_internal: format     = ggjt v3 (latest)
llama_model_load_internal: n_vocab    = 32000
llama_model_load_internal: n_ctx      = 2048
llama_model_load_internal: n_embd     = 3200
llama_model_load_internal: n_mult     = 240
llama_model_load_internal: n_head     = 32
llama_model_load_internal: n_layer    = 26
llama_model_load_internal: n_rot      = 100
llama_model_load_internal: ftype      = 2 (mostly Q4_0)
llama_model_load_internal: n_ff       = 8640
llama_m

llama_new_context_with_model: max tensor size =    54.93 MB


' To protect your alloy wheels, you can use wheel guards or wheel covers. Wheel guards are usually made of a flexible material that fits over the wheel and provides protection from rocks, debris, and other hazards on the road. Wheel covers are also available in various sizes and shapes to fit different types of wheels. It is important to note that these protective measures should be taken regularly to ensure your alloy wheels remain safe and durable.'

In [104]:
# Ask a follow-up question in the same chat session.
qa_bot.chat("Seat belt usage is really necessary ?")

' AI: Yes, seat belts are essential for protecting passengers in case of a collision or other emergency situation. They provide a safety harness that holds the occupant in place, reducing the risk of injury. Additionally, wearing a seat belt can help prevent injuries by keeping the body properly positioned and aligned during a crash. It is important to always wear a seat belt when traveling in a vehicle.'