In [22]:
import tiktoken
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from typing import List,Tuple
import openai
import os
import numpy as np
import dotenv
# Run dotenv's loader first (presumably to pick up a local .env file - confirm), then
# hand the OPENAI_API_KEY environment value to the openai client.
# os.getenv returns None when the variable is unset, so a missing key is not detected here.
dotenv.load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

In [2]:
# Path of the plain-text document that this notebook reads and splits into chunks.
TEXT_FILE = "data/input/MSA_Juniper_IBM.txt"

## Read text and chunk it up

In [11]:
# Shared tokenizer used by token_count(); "cl100k_base" is the tiktoken encoding requested.
encoding = tiktoken.get_encoding("cl100k_base")
def token_count(text):
    """Return the number of tokens the module-level ``encoding`` yields for ``text``."""
    return len(encoding.encode(text))

# Read the document keeping only ASCII characters.
# Decoding as ASCII with errors="ignore" drops every non-ASCII byte while reading,
# so the result no longer depends on the platform's default locale encoding
# (a bare open() would decode with whatever the OS is configured to use).
with open(TEXT_FILE, encoding="ascii", errors="ignore") as f:
    all_text = f.read()

print(f"Total tokens: {token_count(all_text)}")




def split_text(text, chunk_size=400):
    """Split ``text`` into overlapping chunks with ``RecursiveCharacterTextSplitter``.

    ``chunk_size`` is passed straight to the splitter; the overlap between
    neighbouring chunks is a quarter of it (integer division).

    Returns whatever the splitter's ``split_text`` returns for ``text``.
    """
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_size // 4,
    ).split_text(text)

# Chunk the same document at three granularities.
chunks_400, chunks_1000, chunks_4000 = (
    split_text(all_text, chunk_size=size) for size in (400, 1000, 4000)
)

# One flat list (400s first, then 1000s, then 4000s). Retrieval indexes into this
# list by position, so the order must match the rows of the embedding matrix.
chunks = [*chunks_400, *chunks_1000, *chunks_4000]

print(f"Number of chunks: 400: {len(chunks_400)}, 1000: {len(chunks_1000)}, 4000: {len(chunks_4000)}")


Total tokens: 45290
Number of chunks: 400: 865, 1000: 393, 4000: 93


## Embeddings

In [40]:
def get_embedding_ada(texts:List[str], batch_size:int=2048):
    """Embed ``texts`` with the ``text-embedding-ada-002`` model.

    Args:
        texts: Strings to embed. A bare string is treated as a one-element list.
        batch_size: Maximum number of strings sent in a single API request.
            The default matches the documented per-request limit on array
            inputs; anything longer is split across several requests.

    Returns:
        ``np.ndarray`` with one row per input string, in input order.

    Raises:
        ValueError: If ``texts`` is empty or ``batch_size`` is not positive.
    """
    if isinstance(texts, str):
        texts = [texts]
    texts = list(texts)
    if not texts:
        raise ValueError("texts must contain at least one string")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    model = "text-embedding-ada-002"
    vectors = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        data = openai.Embedding.create(input = batch, model=model)['data']
        # Every returned item carries the position of its input within the request;
        # ordering by it keeps rows aligned with `texts` without relying on response order.
        for item in sorted(data, key=lambda d: d['index']):
            vectors.append(item['embedding'])
    return np.array(vectors)

def load_embeddings_ada(texts:List[str], emb_file:str="data/embeddings/ada.npy"):
    """Return ada embeddings for ``texts``, using an on-disk ``.npy`` cache.

    The cache is reused only when its row count equals ``len(texts)``. The file
    stores no record of which texts it was built from, so the row count is the
    only check made; a cache with a different row count (for example after
    re-chunking) is reported, recomputed and overwritten.

    Args:
        texts: Strings to embed.
        emb_file: Cache location; should end in ``.npy`` because ``np.save``
            appends that suffix when it is missing.

    Returns:
        ``np.ndarray`` with one row per entry of ``texts``.
    """
    if os.path.exists(emb_file):
        cached = np.load(emb_file)
        if cached.ndim >= 1 and cached.shape[0] == len(texts):
            return cached
        print(f"Ignoring stale embedding cache {emb_file}: shape {cached.shape} does not match {len(texts)} texts")
    embs = get_embedding_ada(texts)
    emb_dir = os.path.dirname(emb_file)
    # dirname is "" for a bare filename, and os.makedirs("") would raise.
    if emb_dir:
        os.makedirs(emb_dir, exist_ok=True)
    np.save(emb_file, embs)
    return embs

# Embeddings for every chunk (all three chunk sizes), taken from the on-disk cache when one exists.
embs_ada = load_embeddings_ada(chunks)
    

## Retrieval

In [41]:
# Registries keyed by the ``emb_type`` name that the retrieval helpers look up.
# name -> callable that embeds a list of texts
emb_functions = dict(ada=get_embedding_ada)
# name -> precomputed embedding matrix for the chunks
embeddings = dict(ada=embs_ada)

In [42]:
def get_context_cos(text, emb_type="ada", top_k=1000, text_embedding=None) -> List[Tuple[str, float]]:
    """Rank the module-level ``chunks`` by similarity to ``text``.

    Args:
        text: Query string; embedded with ``emb_functions[emb_type]`` unless
            ``text_embedding`` is supplied.
        emb_type: Key into ``emb_functions`` and ``embeddings``.
        top_k: Maximum number of results. Zero or a negative value yields an
            empty list.
        text_embedding: Optional precomputed query vector; skips the embedding call.

    Returns:
        ``(chunk, score)`` pairs sorted by descending score, at most ``top_k`` long.

    Raises:
        ValueError: If the embedding matrix and ``chunks`` differ in length,
            which would pair scores with the wrong chunks.
    """
    # Guard first: with top_k == 0 the slice [-0:] would select every row.
    if top_k <= 0:
        return []

    embs = embeddings[emb_type]
    if len(embs) != len(chunks):
        raise ValueError(
            f"{len(embs)} embeddings for {len(chunks)} chunks: embeddings are out of sync with chunks"
        )

    if text_embedding is None:
        text_embedding = emb_functions[emb_type]([text])[0]

    # Dot product equals cosine similarity only for unit-length vectors;
    # that is assumed here, not checked.
    cos_sim = np.dot(embs, text_embedding)
    ranked = np.argsort(cos_sim)[::-1][:top_k]
    return [(chunks[i], cos_sim[i]) for i in ranked]

def print_context(context: List[Tuple[str, float]]):
    """Print each ``(chunk, score)`` pair: a ruled header with its rank and score, then the chunk."""
    rule = "-"*30
    for rank, entry in enumerate(context):
        passage, score = entry
        header = rule + f" Result {rank}: {score:.3f} " + rule
        print(header)
        print(passage)

def answer_pos_in_context(context: List[Tuple[str, float]], answer)-> Tuple[int, float]:
    """Locate the first chunk in ``context`` that contains ``answer`` as a substring.

    Returns ``(position, score)`` of that chunk, or ``(None, None)`` when no
    chunk contains ``answer``.
    """
    hits = (
        (rank, score)
        for rank, (passage, score) in enumerate(context)
        if answer in passage
    )
    return next(hits, (None, None))

def run_test(question, answer, emb_type="ada", do_print_context=False, top_k=1000):
    """Retrieve context for ``question`` and report where ``answer`` shows up in it.

    Args:
        question: Query passed to ``get_context_cos``.
        answer: Text expected to occur verbatim inside one of the retrieved chunks.
        emb_type: Embedding type forwarded to ``get_context_cos``.
        do_print_context: When true, also print every retrieved chunk.
        top_k: Number of results requested from ``get_context_cos``.

    Returns:
        Position of the first retrieved chunk containing ``answer``, or ``None``
        when no retrieved chunk contains it.
    """
    context = get_context_cos(question, emb_type=emb_type, top_k=top_k)
    answer_pos, answer_score = answer_pos_in_context(context, answer)
    # An empty result list has no first score to report.
    top_score = context[0][1] if context else None

    def fmt(score):
        # A missing score is None, which the ".3f" format spec would reject.
        return "n/a" if score is None else f"{score:.3f}"

    print(f"Answer pos: {answer_pos}. Answer score: {fmt(answer_score)}. First result score: {fmt(top_score)}")
    if do_print_context:
        print_context(context)
    return answer_pos

In [43]:
# Retrieval check: the second argument is text expected to appear verbatim in a retrieved chunk;
# the call returns the rank at which that chunk was found.
run_test("How often do we have access to training by IBM?", "once every Contract Year or upon request after at least thirty(30) days")

Answer pos: 1. Answer score: 0.802. First result score: 0.807
------------------------------ Result 0: 0.807 ------------------------------
audits under this Agreement will be Customer Agents which perform or render auditing services to or for Customer and are not an IBM Competitor and will execute an appropriate confidentiality agreement provided in advance in writing to Customer. Audits will be conducted during reasonable business hours and be conducted no more than twice annually and apply only to the previous twelve months activities (or, if longer, the time since the last previous audit) except in the case of audits required or performed by a Governmental Authority or security or emergency response, disaster recovery and business continuity audits, which may be conducted at any time on an ad-hoc basis.
------------------------------ Result 1: 0.802 ------------------------------
(A)At least once every Contract Year or upon request after at least thirty(30) days notice from Custome

1