In [1]:
from dotenv import load_dotenv
import os
# Load variables from the .env file one directory above the working directory
# into the process environment.
load_dotenv("../.env")
# Prefer the conventional OPENAI_API_KEY name; fall back to the hyphenated name
# this notebook originally read so existing .env files keep working.
api_key = os.environ.get('OPENAI_API_KEY') or os.environ.get('OPENAI-API-KEY')
# The OpenAI clients in this notebook are constructed without an explicit key,
# so make the key available under OPENAI_API_KEY (the variable LangChain's
# OpenAI wrappers look up) unless that variable is already set.
if api_key:
    os.environ.setdefault('OPENAI_API_KEY', api_key)

In [3]:
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.docstore.document import Document
from langchain.prompts import PromptTemplate
from langchain.document_loaders import PyMuPDFLoader
from langchain.llms import OpenAI
from operator import itemgetter
import numpy as np
import tiktoken
from langchain.prompts import BasePromptTemplate
from pydantic import BaseModel, validator

from typing import List, Dict, Tuple

# Functions

In [4]:
# None metadata values cause errors with the vectorstore, so blank them out first
def sanitize_metadata(data):
    """Replace every ``None`` metadata value with an empty string, in place.

    Args:
        data: iterable of objects exposing a mutable ``metadata`` mapping.

    Returns:
        The same ``data`` object that was passed in, after its items'
        metadata mappings have been updated.
    """
    for document in data:
        fields = document.metadata
        # Collect the affected keys first, then overwrite them.
        empty_keys = [name for name, current in fields.items() if current is None]
        for name in empty_keys:
            fields[name] = ""
    return data

def unpack(data):
    """Turn loaded page objects into plain dict records.

    Args:
        data: iterable of objects with ``page_content`` and ``metadata``
            attributes.

    Returns:
        A list with one dict per input item holding ``'page'`` (1-based
        position in ``data``), ``'content'`` and ``'metadata'``.
    """
    records = []
    for page_number, loaded_page in enumerate(data, start=1):
        records.append({
            'page': page_number,
            'content': loaded_page.page_content,
            'metadata': loaded_page.metadata,
        })
    return records

def dot_product_similarity(doc_data: List[Dict], query_data: Dict, top_k: int = 5) -> List[Tuple[int, float]]:
    """Rank pages by the dot product of their embedding with the query embedding.

    Args:
        doc_data: page records, each with a ``'page'`` identifier and an
            ``'embedding'`` vector.
        query_data: dict holding the query vector under ``'embedding'``.
        top_k: maximum number of results to return. Defaults to 5, the
            previously hard-coded cutoff.

    Returns:
        Up to ``top_k`` ``(page, score)`` tuples, highest score first. Pages
        with equal scores keep their order from ``doc_data``.

    Raises:
        ValueError: if ``top_k`` is negative.
    """
    if top_k < 0:
        # A negative slice bound would silently drop results from the tail.
        raise ValueError(f"top_k must be non-negative, got {top_k}")
    query_embedding = query_data['embedding']
    scored_pages = [
        (page['page'], np.dot(query_embedding, page['embedding']))
        for page in doc_data
    ]
    # list.sort is stable, so ties preserve the original page order.
    scored_pages.sort(key=itemgetter(1), reverse=True)
    return scored_pages[:top_k]

def embed_query(query: str):
    """Embed a query string with ``OpenAIEmbeddings``.

    Args:
        query: the text to embed.

    Returns:
        A dict with the original text under ``"query"`` and the value
        returned by ``OpenAIEmbeddings.embed_query`` under ``"embedding"``.
    """
    embedder = OpenAIEmbeddings()
    vector = embedder.embed_query(query)
    result = {"query": query}
    result["embedding"] = vector
    return result

def embed_doc(text_list : List[str]):
    """Embed a batch of texts with ``OpenAIEmbeddings``.

    Args:
        text_list: the texts to embed.

    Returns:
        Whatever ``OpenAIEmbeddings.embed_documents`` returns for
        ``text_list``.
    """
    return OpenAIEmbeddings().embed_documents(text_list)

def get_context(query_data, doc_data: List[Dict]) -> List[Dict]:
    """Collect the text and metadata of the pages most similar to the query.

    Args:
        query_data: dict holding the query vector under ``'embedding'``.
        doc_data: page records with ``'page'``, ``'content'``, ``'metadata'``
            and ``'embedding'`` keys.

    Returns:
        One dict per page returned by ``dot_product_similarity``, in the same
        order, with keys ``'page'``, ``'similarity'``, ``'text'`` and
        ``'metadata'``.
    """
    # Resolve records by their 'page' value rather than by list position, so
    # the result stays correct when doc_data is filtered or reordered.
    pages_by_number = {record['page']: record for record in doc_data}
    context = []
    for page_number, similarity in dot_product_similarity(doc_data, query_data):
        record = pages_by_number[page_number]
        context.append({
            'page': page_number,
            'similarity': similarity,
            'text': record['content'],
            'metadata': record['metadata'],
        })
    return context

def get_tokens(string: str, model: str) -> int:
    """Count the tokens ``string`` encodes to under ``model``'s tiktoken encoding."""
    return len(tiktoken.encoding_for_model(model).encode(string))

def format_context(context: List[Dict], model: str, token_limit : int) -> Tuple[str, List[Dict]]:
    """Join retrieved pages into one context string capped at ``token_limit`` tokens.

    Pages are appended in the order given, each as a "Page: <number>" line
    followed by a "Text: <text>" line, with newlines inside the page text
    replaced by spaces. As soon as the accumulated string exceeds
    ``token_limit`` tokens it is cut back to its first ``token_limit`` tokens
    and no further pages are added.

    Args:
        context: page dicts with ``'page'``, ``'text'`` and ``'metadata'`` keys.
        model: model name used to select the tiktoken encoding.
        token_limit: maximum number of tokens in the returned string.

    Returns:
        A ``(context_string, meta_list)`` tuple. ``meta_list`` holds the
        metadata of every page that was appended, including a page whose text
        was partly cut off by the limit.

    Raises:
        AssertionError: if the truncated string still encodes to more than
            ``token_limit`` tokens.
    """
    context_string = ""
    meta_list = []
    # Resolve the tokenizer once and reuse it for every size check below.
    encoding = tiktoken.encoding_for_model(model)
    for item in context:
        sanitized_text = item['text'].replace("\n", " ")
        context_string += f"Page: {item['page']}\n\nText: {sanitized_text}\n\n"
        meta_list.append(item['metadata'])
        encoded_text = encoding.encode(context_string)
        if len(encoded_text) > token_limit:
            # Over budget: keep only the first token_limit tokens and stop.
            context_string = encoding.decode(encoded_text[:token_limit])
            # Re-encode to confirm the decoded text really fits the budget.
            tokens = len(encoding.encode(context_string))
            assert tokens <= token_limit, f"format context function failed to cut context down far enough. tokens: {tokens}"
            break
    return context_string, meta_list

class DocQAPromptTemplate(BasePromptTemplate, BaseModel):
    """Prompt template that combines a fixed instruction, formatted document context and a query.

    The template must be declared with exactly two input variables; the
    notebook constructs it with ``["query", "formatted_context"]``.
    """

    @validator("input_variables")
    def validate_input_variables(cls, v):
        """Validate that exactly two input variables are declared.

        Raises:
            ValueError: if ``v`` does not contain exactly two entries.
        """
        if len(v) != 2:
            raise ValueError("DocQAPromptTemplate must have two input variables: query and formatted_context.")
        return v

    def format(self, query, formatted_context) -> str:
        """Build the full prompt text sent to the language model.

        Args:
            query: the user's question.
            formatted_context: the context string to answer from.

        Returns:
            The prompt with INSTRUCTION, CONTEXT, QUERY and OUTPUT sections,
            in that order.
        """
        # Fixed instruction that asks for a detailed, context-grounded answer.
        instruction = "Answer the query with a lengthy, detailed response, to the best of your ability based on the provided context. If the question isn't relevant to the context, tell me that and briefly describe the context."
        # Generate the prompt to be sent to the language model
        prompt = f"INSTRUCTION:\n{instruction}\n\nCONTEXT:\n{formatted_context}\n\nQUERY:\n{query}\n\nOUTPUT:\n"
        return prompt

    def _prompt_type(self):
        """Return the identifier string for this prompt type."""
        return "doc context + query"

# Main

In [7]:
# PDF to index, relative to the working directory
fpath = "../data/powers2017.pdf"

# read the PDF into page documents
loader = PyMuPDFLoader(fpath)
unsanitized = loader.load()

# swap None metadata values for empty strings
data = sanitize_metadata(unsanitized)

# embed every page's text in one batch
doc_embeddings = embed_doc([doc.page_content for doc in data])

# build one plain dict per page and attach that page's embedding
mongodoc = [
    {**record, "embedding": vector}
    for record, vector in zip(unpack(data), doc_embeddings)
]

In [None]:
def upload_doc(fpath):
    """Load a PDF and return its pages as dict records with embeddings attached.

    Args:
        fpath: path of the PDF handed to ``PyMuPDFLoader``.

    Returns:
        A list with one dict per loaded page: the record produced by
        ``unpack`` plus an ``"embedding"`` entry from ``embed_doc``.
    """
    # Load the pages and replace None metadata values with empty strings.
    pages = sanitize_metadata(PyMuPDFLoader(fpath).load())

    # Embed the text of every page in one batch.
    vectors = embed_doc([doc.page_content for doc in pages])

    # Pair each page record with its embedding.
    return [
        {**record, "embedding": vector}
        for record, vector in zip(unpack(pages), vectors)
    ]

In [11]:
def query_doc(query: str, mongodoc: List[Dict], model: str, token_limit: int, temperature: float = 0.7):
    """Answer a query from the most similar pages of an embedded document.

    Args:
        query: the user's question.
        mongodoc: page records with ``'page'``, ``'content'``, ``'metadata'``
            and ``'embedding'`` keys.
        model: model name, used both to count context tokens and to run the
            completion.
        token_limit: maximum number of tokens of context placed in the prompt.
        temperature: sampling temperature passed to the LLM. Defaults to 0.7,
            the previously hard-coded value.

    Returns:
        A ``(response, meta_list, context)`` tuple: the LLM's output, the
        metadata of the pages placed in the prompt, and the retrieved page
        dicts.
    """
    # embed the query
    query_data = embed_query(query)
    # retrieve the most similar pages
    context = get_context(query_data, mongodoc)
    # render the pages as a token-capped context string
    formatted_context, meta_list = format_context(context, model, token_limit)
    # build the prompt
    prompt_template = DocQAPromptTemplate(input_variables=["query", "formatted_context"])
    prompt = prompt_template.format(query, formatted_context)
    # Run the completion on the same model the context was tokenized for, so
    # token_limit is measured with the tokenizer the request actually uses.
    llm = OpenAI(model_name=model, temperature=temperature)
    response = llm(prompt)
    # return the response, the metadata, and the context
    return response, meta_list, context

In [12]:
# ask a sample question against the embedded document
response, meta_list, context = query_doc(
    query="What is the purpose of the study?",
    mongodoc=mongodoc,
    model="text-davinci-003",
    token_limit=2800,
)