In [None]:
import os

# Pinecone connection settings.
PINECONE_INDEX_NAME = ""
PINECONE_ENVIRONMENT = ""
# Secrets are read from the environment so they never have to be saved inside
# the notebook; the empty-string fallback matches the previous default.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY", "")
PINECONE_NAMESPACE = ""
# Stored as the "manual" metadata field on every uploaded vector.
MANUAL_NAME = ""

# Maximum number of text chunks sent in a single embeddings request.
MAX_TEXT_EMBEDDING_ARRAY_SIZE = 100

# Name of the OpenAI embedding model (passed as `engine` when embedding queries).
EMBEDDING_MODEL = ""
OPEN_AI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_ORGANIZATION = os.environ.get("OPENAI_ORGANIZATION", "")

In [None]:
import logging
import time
import openai

# Configure the OpenAI client with the credentials from the configuration cell.
openai.api_key = OPEN_AI_API_KEY
openai.organization = OPENAI_ORGANIZATION

# Target chunk length in tokens; chunk_document looks for a sentence boundary
# between 0.5x and 1.5x this many tokens.
CHUNK_SIZE = 200

def chunk_document(text, tokenizer, chunk_size=None):
    """Yield successive token chunks of roughly ``chunk_size`` tokens from text.

    Each chunk is cut at the last position, between 0.5x and 1.5x
    ``chunk_size`` tokens, whose decoded text ends with a full stop or a
    newline. When no such position exists the chunk is exactly ``chunk_size``
    tokens long (or shorter at the end of the document).

    Args:
        text: The text to split.
        tokenizer: Object exposing ``encode(text)`` (returning a sliceable
            token sequence) and ``decode(tokens)`` (returning a string).
        chunk_size: Target chunk length in tokens. Defaults to the
            module-level ``CHUNK_SIZE``.

    Yields:
        Slices of the encoded token sequence, in document order.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_size < 1:
        # A non-positive size would never advance the cursor below.
        raise ValueError("chunk_size must be a positive integer")

    tokens = tokenizer.encode(text)
    min_size = int(0.5 * chunk_size)
    max_size = int(1.5 * chunk_size)

    i = 0
    while i < len(tokens):
        # Start from the largest allowed chunk and shrink until it ends a sentence.
        j = min(i + max_size, len(tokens))
        while j > i + min_size:
            chunk = tokenizer.decode(tokens[i:j])
            if chunk.endswith((".", "\n")):
                break
            j -= 1
        # No sentence end found in the window: fall back to a fixed-size chunk.
        if j == i + min_size:
            j = min(i + chunk_size, len(tokens))
        yield tokens[i:j]
        i = j

def get_embeddings(text_array, engine, max_retries=5, base_delay=1, factor=2):
    """Request embeddings for a batch of texts, retrying with exponential backoff.

    Uses ``openai.Embedding.create``, the same endpoint wrapper the query path
    of this notebook uses, instead of the deprecated ``Engine.embeddings``.

    Args:
        text_array: List of strings to embed in one request.
        engine: Name of the embedding model to use.
        max_retries: Number of retries after the first failed attempt.
        base_delay: Seconds to wait before the first retry.
        factor: Multiplier applied to the delay after every retry.

    Returns:
        The ``"data"`` entry of the API response (callers read the
        ``"embedding"`` key of each item).

    Raises:
        Exception: Re-raises the last error once all retries are used up.
    """
    delay = base_delay
    retries_left = max_retries
    while True:
        try:
            return openai.Embedding.create(input=text_array, engine=engine)["data"]
        except Exception:
            if retries_left <= 0:
                # Bare raise keeps the original traceback intact.
                raise
            logging.info(f"Request failed. Retrying in {delay} seconds.")
            time.sleep(delay)
            retries_left -= 1
            delay *= factor

In [None]:
def build_embeddings(text_chunks, engine=None):
    """Embed every text chunk and pair it with its embedding vector.

    Chunks are sent to ``get_embeddings`` in batches of at most
    ``MAX_TEXT_EMBEDDING_ARRAY_SIZE`` items.

    Args:
        text_chunks: List of strings to embed.
        engine: Embedding model name. Defaults to ``EMBEDDING_MODEL`` so that
            documents and queries are embedded with the same model, and falls
            back to "text-embedding-ada-002" when that setting is empty.

    Returns:
        A list of ``(text_chunk, embedding)`` tuples in input order.

    Raises:
        ValueError: If the number of embeddings returned does not match the
            number of chunks (zip would otherwise drop chunks silently).
    """
    model = engine or EMBEDDING_MODEL or "text-embedding-ada-002"

    embeddings = []
    for start in range(0, len(text_chunks), MAX_TEXT_EMBEDDING_ARRAY_SIZE):
        batch = text_chunks[start:start + MAX_TEXT_EMBEDDING_ARRAY_SIZE]
        response = get_embeddings(batch, model)
        embeddings.extend(item["embedding"] for item in response)

    if len(embeddings) != len(text_chunks):
        raise ValueError(
            "Expected {} embeddings but received {}".format(
                len(text_chunks), len(embeddings)))

    return list(zip(text_chunks, embeddings))


In [None]:
import docx2txt
import tiktoken
import logging
import pinecone

def create_embeddings_for_doc(filename, doc_x_file, batch_size=100):
    """Extract a DocX file's text, chunk it, embed it and build upsert batches.

    Args:
        filename: Name recorded in each vector id and in the metadata.
        doc_x_file: Path (or file object) handed to ``docx2txt.process``.
        batch_size: Maximum number of vectors per returned batch.

    Returns:
        A list of batches; each batch is a list of
        ``(id, embedding, metadata)`` tuples where ``id`` is
        ``"<filename>-!<chunk index>"`` and metadata holds the filename, the
        chunk index, the chunk text and ``MANUAL_NAME``.
    """
    # NOTE(review): the gpt2 encoding is only used to size chunks; confirm it
    # is close enough to the tokenizer of the embedding model in use.
    tokenizer = tiktoken.get_encoding("gpt2")
    extracted_text = docx2txt.process(doc_x_file)

    # Flatten line breaks so each chunk reads as one continuous passage.
    clean_file_string = extracted_text.replace("\n", "; ").replace("  ", " ")
    text_to_embed = "Filename is: {}; {}".format(filename, clean_file_string)

    text_chunks = [
        tokenizer.decode(token_chunk)
        for token_chunk in chunk_document(text_to_embed, tokenizer)
    ]
    text_embeddings = build_embeddings(text_chunks)

    vectors = [
        (
            f"{filename}-!{i}",
            embedding,
            {"filename": filename, "file_chunk_index": i, "text": text_chunk, "manual": MANUAL_NAME},
        )
        for i, (text_chunk, embedding) in enumerate(text_embeddings)
    ]

    return [vectors[i:i + batch_size] for i in range(0, len(vectors), batch_size)]

def load_pinecone_index() -> pinecone.Index:
    """Initialise the Pinecone client and return a handle to the configured index.

    Returns:
        A ``pinecone.Index`` for ``PINECONE_INDEX_NAME``.

    Raises:
        KeyError: If ``PINECONE_INDEX_NAME`` is not among the indexes reported
            by ``pinecone.list_indexes()``; the available indexes are printed
            first to help diagnose the configuration.
    """
    pinecone.init(
        api_key=PINECONE_API_KEY,
        environment=PINECONE_ENVIRONMENT,
    )

    # Fetch the index list once and reuse it for the check and the diagnostic.
    available_indexes = pinecone.list_indexes()
    if PINECONE_INDEX_NAME not in available_indexes:
        print(available_indexes)
        raise KeyError(f"Index '{PINECONE_INDEX_NAME}' does not exist.")

    return pinecone.Index(PINECONE_INDEX_NAME)

def upload_pinecone_index(filename, embeddings):
    """Upsert every batch of vectors into the configured Pinecone namespace.

    Args:
        filename: Name of the source file; only used in the log message.
        embeddings: Iterable of vector batches to upsert, one request each.

    Raises:
        Exception: Any error raised while upserting a batch is logged and
            re-raised, which stops the upload at that batch.
    """
    index = load_pinecone_index()
    success_message = "[handle_file_string] Upserted batch of embeddings for {}"
    failure_message = "[handle_file_string] Error upserting batch of embeddings to Pinecone: {}"

    for vector_batch in embeddings:
        try:
            index.upsert(vectors=vector_batch, namespace=PINECONE_NAMESPACE)
            logging.info(success_message.format(filename))
        except Exception as err:
            logging.error(failure_message.format(err))
            raise err

In [None]:
# Set the folder path in the process_folder call below to a folder containing DocX files

import os

def process_folder(folder):
    """Embed and upload every DocX file found directly inside ``folder``.

    Sub-directories, files without a ``.docx`` extension and Word lock files
    (names starting with ``~$``) are skipped, because the DocX extractor
    cannot read them. Files are handled in sorted name order and each
    processed path is printed.

    Args:
        folder: Path of the directory to scan (not recursive).
    """
    for entry in sorted(os.listdir(folder)):
        path = os.path.join(folder, entry)

        if not os.path.isfile(path):
            continue
        if entry.startswith("~$") or not entry.lower().endswith(".docx"):
            continue

        print(path)
        embeddings = create_embeddings_for_doc(entry, path)
        upload_pinecone_index(entry, embeddings)


# Runs the ingestion as soon as this cell executes; './700' is resolved
# relative to the notebook's working directory.
process_folder('./700')

In [None]:
# Retrieve the closest-matching embeddings from Pinecone and send them as context to the OpenAI completion endpoint to build an answer
import pinecone
import openai

def retrieve_context(query, top_k=5):
    """Return the stored text of the Pinecone matches closest to ``query``.

    The query is embedded with ``EMBEDDING_MODEL``; when that setting is empty
    the function falls back to "text-embedding-ada-002", the model name the
    ingestion code in this notebook passes when embedding documents.

    Args:
        query: The user's question.
        top_k: Number of nearest matches to request from Pinecone.

    Returns:
        A list with the ``"text"`` metadata value of each match, in the order
        Pinecone returned them.
    """
    pinecone.init(
        api_key=PINECONE_API_KEY,
        environment=PINECONE_ENVIRONMENT,
    )
    index = pinecone.Index(PINECONE_INDEX_NAME)

    model = EMBEDDING_MODEL or "text-embedding-ada-002"
    embedding_response = openai.Embedding.create(
        input=[query],
        engine=model
    )
    query_vector = embedding_response['data'][0]['embedding']

    query_response = index.query(
        query_vector, top_k=top_k, include_metadata=True, namespace=PINECONE_NAMESPACE)

    return [match['metadata']['text'] for match in query_response['matches']]

def build_prompt(query, context, context_limit=3750):
    """Assemble the completion prompt from the instructions, contexts and question.

    Contexts are added in order, joined by a "---" separator, for as long as
    the joined text stays below ``context_limit`` characters. Any number of
    contexts is supported, including none (the context section is then empty).

    Args:
        query: The user's question.
        context: List of context passages, best match first.
        context_limit: Upper bound (exclusive) on the character length of the
            joined context section.

    Returns:
        The full prompt string.
    """
    separator = "\n\n---\n\n"

    prompt_start = (
        "Answer the question using only the context provided.\n\n" +
        "Remember to:\n\n" +
        "1. Present step-by-step instructions as an ordered list.\n\n" +
        "2. Summarize the question before providing the answer.\n\n" +
        "3. Offer comprehensive and accurate information, covering all necessary details.\n\n" +
        "4. Supply any prerequisite context users need before starting the steps.\n\n" +
        "5. If you're unsure of the answer, respond with: 'I'm sorry, I don't have an answer for that.\n\n" +
        "6. Include any relevent safetyl precautions the user should take.\n\n" +
        "7. Provide specific information, terms, or examples to enhance clarity and understanding.\n\n" +
        "8. Include sources or reference materials (if applicable)\n\n" +
        "9. Include a reference to where the information was found in the manual. Use the format Section - heading. \n\n" +
        "Context:\n"
    )

    prompt_end = (
        f"\n\nQuestion: {query}\nAnswer:"
    )

    # Keep the longest prefix of contexts whose joined length is under the limit.
    selected = []
    joined_length = 0
    for passage in context:
        added_length = len(passage) + (len(separator) if selected else 0)
        if joined_length + added_length >= context_limit:
            break
        selected.append(passage)
        joined_length += added_length

    return prompt_start + separator.join(selected) + prompt_end

def complete(prompt):
    """Send ``prompt`` to the OpenAI completion endpoint and return the answer text.

    Args:
        prompt: The full prompt string.

    Returns:
        The text of the first returned choice, stripped of surrounding
        whitespace.
    """
    request_options = dict(
        engine='text-davinci-003',
        prompt=prompt,
        temperature=0.2,
        max_tokens=2000,
        frequency_penalty=0,
        presence_penalty=0,
        stop=None,
    )
    response = openai.Completion.create(**request_options)
    first_choice = response['choices'][0]
    return first_choice['text'].strip()

def answer(query):
    """Answer ``query`` from the indexed documents and print the result.

    Retrieves matching contexts, builds the prompt from them, requests a
    completion and prints it. Nothing is returned.

    Args:
        query: The user's question.
    """
    matched_contexts = retrieve_context(query)
    full_prompt = build_prompt(query, matched_contexts)
    print(complete(full_prompt))

# Example query; the generated answer is printed as this cell's output.
answer("How do I start the laser?")