To run this demo, you need the following:
* OpenAI API key
* Pinecone API key

0. Read the document with the software of your choice. At this point, we need a contiguous string to represent the content of the document.

In [None]:
# Put the full document text here as one contiguous string; the chunking step reads this variable.
doc_text = ''

## Algorithm:
*   Chunk text
*   Embed each text chunk
*   Embed question/query
*   Perform a similarity search to find the text chunk embeddings that are the most similar to the question/query (i.e., those with the highest cosine similarity to the question embedding).
*   API call to the completions endpoint, with the query and the most relevant text chunks included in the prompt. 
*   The GPT model then gives the answer to the question found in the file chunks, if the answer can be found in the extracts.

## Limitations:
*   The app may sometimes generate answers that are not in the files, or hallucinate about the existence of files that are not uploaded.



1. Text Chunking

In [None]:
%%capture 
!pip install tiktoken

In [None]:
from typing import Dict, List, Optional, Tuple
import uuid
import tiktoken

# Module-level tiktoken encoder, used by get_text_chunks to encode text into tokens and decode them back.
tokenizer = tiktoken.get_encoding(
    "cl100k_base"
)  # The encoding scheme to use for tokenization


In [None]:
# Constants
CHUNK_SIZE = 200  # The target size of each text chunk in tokens
MIN_CHUNK_SIZE_CHARS = 350  # A chunk is only cut back to its last sentence/line break when that break lies past this character index
MIN_CHUNK_LENGTH_TO_EMBED = 5  # Discard chunks of this many characters or fewer
EMBEDDINGS_BATCH_SIZE = 128  # The number of chunks sent per embedding request
MAX_NUM_CHUNKS = 10000  # Cap on chunking-loop iterations; any leftover text is appended as one extra chunk

def get_text_chunks(text: str, chunk_token_size: Optional[int] = None) -> List[str]:
    """
    Split a text into chunks of roughly ``chunk_token_size`` tokens, preferring to end
    each chunk on a sentence or line boundary.

    Args:
        text: The text to split into chunks.
        chunk_token_size: The target size of each chunk in tokens. ``None`` (the default)
            or ``0`` falls back to CHUNK_SIZE.
    Returns:
        A list of text chunks. Newlines inside a chunk are replaced by spaces and the
        chunk is stripped; chunks of MIN_CHUNK_LENGTH_TO_EMBED characters or fewer are
        dropped. An empty or whitespace-only text yields an empty list.
    Raises:
        ValueError: If chunk_token_size is negative.
    """
    # A negative size would make tokens[:chunk_size] drop tokens from the end and can
    # yield an empty slice that never consumes anything, so the loop would not terminate.
    if chunk_token_size is not None and chunk_token_size < 0:
        raise ValueError("chunk_token_size must not be negative")

    # Return an empty list if the text is empty or whitespace
    if not text or text.isspace():
        return []

    # Tokenize the text
    tokens = tokenizer.encode(text, disallowed_special=())

    # Initialize an empty list of chunks
    chunks = []

    # Use the provided chunk token size or the default one
    chunk_size = chunk_token_size or CHUNK_SIZE

    # Initialize a counter for the number of chunks
    num_chunks = 0

    # Loop until all tokens are consumed
    while tokens and num_chunks < MAX_NUM_CHUNKS:
        # Take the first chunk_size tokens as a chunk
        chunk = tokens[:chunk_size]

        # Decode the chunk into text
        chunk_text = tokenizer.decode(chunk)

        # Skip the chunk if it is empty or whitespace
        if not chunk_text or chunk_text.isspace():
            # Drop the tokens of this chunk; it is not counted towards num_chunks
            tokens = tokens[len(chunk) :]
            continue

        # Find the last sentence-ending mark or newline in the chunk (-1 if there is none)
        last_punctuation = max(
            chunk_text.rfind("."),
            chunk_text.rfind("?"),
            chunk_text.rfind("!"),
            chunk_text.rfind("\n"),
        )

        # Only cut at the boundary when it lies past MIN_CHUNK_SIZE_CHARS, so that
        # truncation never produces a very short chunk
        if last_punctuation != -1 and last_punctuation > MIN_CHUNK_SIZE_CHARS:
            # Truncate the chunk text just after the boundary character
            chunk_text = chunk_text[: last_punctuation + 1]

        # Remove any newline characters and strip any leading or trailing whitespace
        chunk_text_to_append = chunk_text.replace("\n", " ").strip()

        if len(chunk_text_to_append) > MIN_CHUNK_LENGTH_TO_EMBED:
            # Append the chunk text to the list of chunks
            chunks.append(chunk_text_to_append)

        # Advance by the token length of the (possibly truncated) chunk text, so text
        # cut off after the boundary is carried over into the next chunk
        tokens = tokens[len(tokenizer.encode(chunk_text, disallowed_special=())) :]

        # Increment the number of chunks
        num_chunks += 1

    # Tokens are only left here when the MAX_NUM_CHUNKS cap stopped the loop;
    # keep them as one final chunk
    if tokens:
        remaining_text = tokenizer.decode(tokens).replace("\n", " ").strip()
        if len(remaining_text) > MIN_CHUNK_LENGTH_TO_EMBED:
            chunks.append(remaining_text)

    return chunks

In [None]:
chunks = get_text_chunks(doc_text, 200)
# Fail with a clear message instead of an IndexError when doc_text is still empty
# (or too short to yield a chunk); the embedding and indexing steps need at least one chunk.
if not chunks:
    raise ValueError(
        "No text chunks were produced; assign the document's content to doc_text first."
    )
assert isinstance(chunks[0], str)

2. Embed each text chunk

In [None]:
%%capture
!pip install -qU openai
!pip install tenacity

In [None]:
import openai
from tenacity import retry, wait_random_exponential, stop_after_attempt

In [None]:
import os  # needed for os.environ below; not imported anywhere earlier in the notebook

OPENAI_API_KEY = None
# Note that for most operations, you need a paid account
if not OPENAI_API_KEY:
    OPENAI_API_KEY = input('Your OpenAI API key:').strip()
# Export the key for libraries that look it up from the environment
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
# openai was imported before the environment variable was set, so also hand the key
# to the module directly for the openai.Embedding calls
openai.api_key = OPENAI_API_KEY

In [None]:
embed_model = "text-embedding-ada-002"


@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3))
def get_embeddings(texts: List[str]) -> List[List[float]]:
    """
    Request one embedding per input text from the OpenAI embeddings endpoint.

    The call is wrapped in a tenacity retry: at most three attempts, with a
    randomized exponential wait (configured with min=1, max=20) between them.

    Args:
        texts: The list of texts to embed.
    Returns:
        A list of embeddings, one per entry of the response's "data" list; each
        embedding is a list of floats.
    Raises:
        Exception: If the OpenAI API call still fails after the last attempt.
    """
    api_response = openai.Embedding.create(input=texts, model=embed_model)

    # Keep only the vectors; the rest of each response record is not needed
    return [item["embedding"] for item in api_response["data"]]  # type: ignore

In [None]:
# Embeddings for the whole document: one vector per chunk, in chunk order.
# Chunks are sent to the API in batches of EMBEDDINGS_BATCH_SIZE.

embeddings: List[List[float]] = []
for batch_start in range(0, len(chunks), EMBEDDINGS_BATCH_SIZE):
    # Texts of the chunks that belong to the current batch
    batch_texts = chunks[batch_start : batch_start + EMBEDDINGS_BATCH_SIZE]

    # Embed the batch and collect the vectors
    batch_embeddings = get_embeddings(batch_texts)
    embeddings.extend(batch_embeddings)

# The vector length depends on the embedding model; for text-embedding-ada-002 it is 1536.
assert len(embeddings[0]) == 1536

3. Index the obtained embeddings


In [None]:
!pip install -qU pinecone-client

In [None]:
import pinecone

index_name = 'openai-honda-manuals'
PINECONE_API_KEY = None
# Prompt for the key unless one was filled in on the line above
PINECONE_API_KEY = PINECONE_API_KEY or input("Your pinecone API key:").strip()

# Open the connection to Pinecone (the API key is available at app.pinecone.io)
pinecone.init(
    environment="us-west4-gcp",  # may be different, check at app.pinecone.io under API Keys
    api_key=PINECONE_API_KEY,
)

# Create the index only when it is missing (expected on the first run);
# its dimension is taken from the length of the first chunk embedding
if index_name not in pinecone.list_indexes():
    pinecone.create_index(
        index_name,
        metric='cosine',
        dimension=len(embeddings[0]),
        metadata_config={'indexed': ['channel_id', 'published']},
    )

# Handle to the index, followed by its current statistics as the cell output
index = pinecone.Index(index_name)
index.describe_index_stats()

In [None]:
from tqdm.auto import tqdm
import datetime
from time import sleep

# Embed the chunks batch by batch and upsert each batch into the Pinecone index.
for i in tqdm(range(0, len(chunks), EMBEDDINGS_BATCH_SIZE)):
    # find end of batch
    i_end = min(len(chunks), i + EMBEDDINGS_BATCH_SIZE)
    # texts to encode
    texts = chunks[i:i_end]
    # Build ids from each chunk's position in the whole document. Numbering from the
    # start of every batch would repeat the same ids, and an upsert with an existing
    # id replaces the earlier vector.
    ids_batch = [f'id{position + 100}' for position in range(i, i_end)]
    # Create embeddings; on a rate-limit error wait 5 seconds and try again.
    # Any other error (bad key, network failure, Ctrl-C) is allowed to propagate
    # instead of being retried forever.
    while True:
        try:
            res = openai.Embedding.create(input=texts, engine=embed_model)
            break
        except openai.error.RateLimitError:
            sleep(5)
    embeds = [record['embedding'] for record in res['data']]
    # store the chunk text as metadata so it can be returned with query matches
    meta_batch = [{'text': text} for text in texts]
    to_upsert = list(zip(ids_batch, embeds, meta_batch))
    # upsert to Pinecone
    index.upsert(vectors=to_upsert)

4. Embedding Question/Query

In [None]:
# Change only this line to search for something else
query = 'how to change oil'

# Embed the query with the same model that was used for the chunks
res = openai.Embedding.create(engine=embed_model, input=[query])
first_record = res['data'][0]
xq = first_record['embedding']

# Fetch the 10 closest matches together with their stored metadata
res = index.query(xq, include_metadata=True, top_k=10)

5. Use ChatGPT to pose a question to the document and find an answer for it.

In [None]:
!pip install -qU langchain
# !pip install -qU transformers
# !pip install -qU sentence_transformers
# !pip install -qU chromadb


In [None]:
from langchain.chat_models import ChatOpenAI
from langchain import PromptTemplate, LLMChain
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    AIMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from langchain.schema import (
    AIMessage,
    HumanMessage,
    SystemMessage
)

In [None]:
# Chat model client used for the final question-answering call; temperature is set to 0.
chat = ChatOpenAI(temperature=0)

I asked the following question because the document I read was a Honda engine instruction manual. You can replace the prompt according to the content of your document. For more prompt engineering hints, see LangChain documentation.

In [None]:
# Prompt template; {document_text} and {query} are filled in below
QA_prompt = """Given the following text, search for the best answer for the following question.

Text: {document_text}.
Question: {query}.
Answer:"""

# This is the only line that you should change based on what question you want to ask
myquestion = "How do I change engine oil?"

# Concatenate the text stored with every match into a single context string
my_context = ' '.join(match['metadata']['text'] for match in res['matches'])

# System instruction first, then the filled-in prompt as the user message
messages = [
    SystemMessage(
        content="You are a helpful assistant that can search a text and find the answer to a question."
    ),
    HumanMessage(
        content=QA_prompt.format(query=myquestion, document_text=my_context)
    ),
]

In [None]:
# Call the chat model with the prepared messages and print the content of its reply.
print(chat(messages).content)