Preparing the PDF file

In [1]:
from pypdf import PdfReader, PdfWriter

In [None]:
# Path of the PDF to index. Change the placeholder to the name of your file
# (make sure the file name does not include any spaces).
filename = "./data/" + "<Your file name>"

page_map = []  # List of tuples: (page_num, offset, page_text)
offset = 0     # Character count from the start of the document to the current page

print(f"Extracting text from '{filename}' using PdfReader")

reader = PdfReader(filename)
pages = reader.pages
for page_num, p in enumerate(pages):
    page_text = p.extract_text()
    # Record where this page's text begins inside the concatenated document text.
    page_map.append((page_num, offset, page_text))
    offset = offset + len(page_text)

page_map

In [None]:
import re
import os
import base64
from transformers import AutoTokenizer, AutoModel, AutoModelForCausalLM, BitsAndBytesConfig
import torch
import json
import numpy as np
import sys

Indexing the texts.

In [None]:
# Target size, in characters, of a section before split_text starts looking for a sentence ending.
MAX_SECTION_LENGTH = 500
# Maximum number of extra characters split_text scans past the target size for a sentence ending.
SENTENCE_SEARCH_LIMIT = 100
# Number of characters before the end of a section at which the next section initially starts.
SECTION_OVERLAP = 100


def filename_to_id(filename):
    """Build an identifier string for a file name.

    The result has the form ``file-<sanitised name>-<hex>``: the sanitised
    name replaces every character other than ASCII letters, digits, ``_`` and
    ``-`` with ``_``; the hex part is the upper-case hexadecimal encoding of
    the UTF-8 bytes of the original name.
    """
    sanitised = re.sub("[^0-9a-zA-Z_-]", "_", filename)
    hex_suffix = filename.encode('utf-8').hex().upper()
    return "-".join(("file", sanitised, hex_suffix))

def split_text(page_map, max_section_length=None, sentence_search_limit=None, section_overlap=None):
    """Split the concatenated page texts into overlapping sections.

    Args:
        page_map: sequence of ``(page_num, offset, page_text)`` tuples, where
            ``offset`` is the position of the page's text inside the
            concatenation of all page texts.
        max_section_length: target section size in characters; defaults to the
            module-level ``MAX_SECTION_LENGTH``.
        sentence_search_limit: how far past the target size to look for a
            sentence ending; defaults to ``SENTENCE_SEARCH_LIMIT``.
        section_overlap: how many characters before the end of a section the
            next one initially starts; defaults to ``SECTION_OVERLAP``. It
            should be smaller than ``max_section_length`` so that every
            iteration makes progress.

    Yields:
        ``(section_text, page_index)`` tuples, where ``page_index`` is the
        index in ``page_map`` of the page containing the section's first
        character. A non-empty document always yields at least one section,
        even when it is shorter than ``section_overlap``.
    """
    if max_section_length is None:
        max_section_length = MAX_SECTION_LENGTH
    if sentence_search_limit is None:
        sentence_search_limit = SENTENCE_SEARCH_LIMIT
    if section_overlap is None:
        section_overlap = SECTION_OVERLAP

    SENTENCE_ENDINGS = [".", "!", "?"]
    WORDS_BREAKS = [",", ";", ":", " ", "(", ")", "[", "]", "{", "}", "\t"]

    def find_page(offset):
        # Index of the page whose [offset, next page offset) range contains `offset`;
        # anything at or beyond the last page's offset maps to the last page.
        l = len(page_map)
        for i in range(l - 1):
            if offset >= page_map[i][1] and offset < page_map[i + 1][1]:
                return i
        return l - 1

    all_text = "".join(p[2] for p in page_map)
    length = len(all_text)
    start = 0
    end = length
    yielded_any = False
    while start + section_overlap < length:
        last_word = -1
        end = start + max_section_length

        if end > length:
            end = length
        else:
            # Try to find the end of the sentence
            while end < length and (end - start - max_section_length) < sentence_search_limit and all_text[end] not in SENTENCE_ENDINGS:
                if all_text[end] in WORDS_BREAKS:
                    last_word = end
                end += 1
            if end < length and all_text[end] not in SENTENCE_ENDINGS and last_word > 0:
                end = last_word  # Fall back to at least keeping a whole word
        if end < length:
            end += 1  # Include the sentence ending / word break character itself

        # Try to find the start of the sentence or at least a whole word boundary
        last_word = -1
        while start > 0 and start > end - max_section_length - 2 * sentence_search_limit and all_text[start] not in SENTENCE_ENDINGS:
            if all_text[start] in WORDS_BREAKS:
                last_word = start
            start -= 1
        if all_text[start] not in SENTENCE_ENDINGS and last_word > 0:
            start = last_word
        if start > 0:
            start += 1  # Begin just after the sentence ending / word break

        section_text = all_text[start:end]
        yield (section_text, find_page(start))
        yielded_any = True

        last_table_start = section_text.rfind("<table")
        if (last_table_start > 2 * sentence_search_limit and last_table_start > section_text.rfind("</table")):
            # If the section ends with an unclosed table, we need to start the next section with the table.
            # If table starts inside the sentence search limit, we ignore it, as that will cause an infinite loop for tables longer than the maximum section length
            # If last table starts inside the section overlap, keep overlapping
            start = min(end - section_overlap, start + last_table_start)
        else:
            start = end - section_overlap

    # The loop above never runs when the whole document is no longer than the
    # overlap; emit such a short document as a single section instead of
    # silently dropping it.
    if not yielded_any and length > 0:
        yield (all_text, find_page(0))

In [26]:
# Load the embedding model and its tokenizer by model id, then save copies under
# model/ so that compute_embedding can load them from those local directories.
embedding_model = 'BAAI/bge-small-en-v1.5'
tokenizer = AutoTokenizer.from_pretrained(embedding_model)
model = AutoModel.from_pretrained(embedding_model)
tokenizer.save_pretrained("model/tokenizer")
model.save_pretrained("model/embedding")

Embedding the texts.

In [None]:
# Cache for the locally saved tokenizer/model so they are read from disk only once
# instead of on every compute_embedding call.
_EMBEDDING_RESOURCES = {}


def _load_embedding_resources():
    """Return the (tokenizer, model) pair saved under ./model, loading it on first use."""
    if not _EMBEDDING_RESOURCES:
        _EMBEDDING_RESOURCES["tokenizer"] = AutoTokenizer.from_pretrained("./model/tokenizer")
        _EMBEDDING_RESOURCES["model"] = AutoModel.from_pretrained("./model/embedding")
    return _EMBEDDING_RESOURCES["tokenizer"], _EMBEDDING_RESOURCES["model"]


def compute_embedding(text):
    """Embed ``text`` with the locally saved embedding model.

    The text is tokenized (with padding and truncation enabled), run through
    the model without gradient tracking, and the last hidden state is averaged
    over dimension 1 and squeezed.

    Args:
        text: the text to embed (callers in this notebook pass a single string).

    Returns:
        The pooled embedding converted with ``.tolist()``; for a single input
        string this is expected to be a flat list of floats.
    """
    tokenizer, model = _load_embedding_resources()

    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)

    # Generate the embeddings
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state.mean(dim=1).squeeze()

    return embeddings.tolist()

In [None]:
# Build one record per text section: id, raw text, embedding vector and source information.
file_id = filename_to_id(filename)
source_stem = os.path.splitext(os.path.basename(filename))[0]

sections = []
for i, (content, pagenum) in enumerate(split_text(page_map)):
    section = dict([
        ("id", f"{file_id}-page-{i}"),
        ("content", content),
        ("embedding", compute_embedding(content)),
        ("sourcepage", source_stem + f"-{pagenum}" + ".pdf"),
        ("sourcefile", filename),
    ])
    sections.append(section)
# The sections can be saved to a JSON file or a database and loaded back for
# retrieval, so the embeddings do not need to be recomputed.

Find the sections that best match the query.

In [None]:
def compute_matches(sections, query_str, top_k):
    """Rank sections by cosine similarity between their embedding and the query's.

    Args:
        sections: iterable of dicts with at least the keys ``id``, ``content``,
            ``sourcepage`` and ``embedding``.
        query_str: the query text; it is embedded with ``compute_embedding``.
        top_k: maximum number of results to return.

    Returns:
        A list of at most ``top_k`` dicts with the keys ``id``, ``content``,
        ``sourcepage`` and ``score``, sorted by descending score. A section
        gets a score of 0 when its embedding or the query embedding has zero
        norm.
    """
    # Get the embedding for the query string; its norm is the same for every section.
    query_str_embedding = np.asarray(compute_embedding(query_str), dtype=float)
    norm_query = np.linalg.norm(query_str_embedding)

    scores = []
    for section in sections:
        embedding_array = np.asarray(section['embedding'], dtype=float)
        norm_chunk = np.linalg.norm(embedding_array)
        if norm_query == 0 or norm_chunk == 0:
            # Cosine similarity is undefined for a zero vector; treat it as no match.
            score = 0.0
        else:
            score = np.dot(query_str_embedding, embedding_array) / (norm_query * norm_chunk)
        # Build the result entry for every section, including zero-norm ones.
        scores.append({"id": section['id'],
                       "content": section["content"],
                       "sourcepage": section['sourcepage'],
                       "score": score})

    # Highest score first, truncated to the requested number of results.
    sorted_scores = sorted(scores, key=lambda x: x['score'], reverse=True)[:top_k]
    return sorted_scores

In [None]:
# Question to answer from the indexed document.
user_query = "Who can apply for URFP?"

In [None]:
# Upper bound (exclusive) on the number of tokens of retrieved text passed on as context.
MAX_CONTEXT_TOKENS = 500

# Measure the budget with the embedding tokenizer saved under ./model/tokenizer rather
# than the global `tokenizer`, which the LLM cell rebinds to a different tokenizer;
# this keeps the result the same no matter which cells have already been run.
context_tokenizer = AutoTokenizer.from_pretrained("./model/tokenizer")

results = compute_matches(sections, user_query, 5)
context, temp_context = "", ""
for result in results:
    # Append the next-best section and keep it only while the total stays under budget.
    temp_context += result['content']
    tokenized = context_tokenizer(temp_context, return_tensors="pt")
    if len(tokenized["input_ids"][0]) >= MAX_CONTEXT_TOKENS:
        break
    context = temp_context
print(context)

Use an LLM to shape the answer from the retrieved context.

In [None]:
# Chat model used to phrase the final answer from the retrieved context.
model_id = "meta-llama/Meta-Llama-3-8B-Instruct"

tokenizer = AutoTokenizer.from_pretrained(model_id)

# Request 4-bit loading through a BitsAndBytesConfig passed as `quantization_config`;
# passing `load_in_4bit=True` directly to from_pretrained is deprecated.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    quantization_config=BitsAndBytesConfig(load_in_4bit=True),
)


def construct_prompt(context, user_query):
    """Build the chat messages for the LLM from the retrieved context and the question.

    Args:
        context: retrieved text that is inserted into the system prompt.
        user_query: the user's question; it becomes the user message unchanged.

    Returns:
        A two-element list of ``{"role": ..., "content": ...}`` dicts: the
        system message (instructions plus context) followed by the user message.
    """
    # The backslash is doubled below so the prompt shows the two characters
    # backslash + n literally instead of containing a real line break there.
    system_prompt = f"""
                    You are an intelligent search engine. You will be provided with some retrieved context, as well as the users query.
                    Your job is to understand the request, and answer based on the retrieved context. According to the retrieved context, if you cannot find the answer to the users'query, say you do not know.
                    Do not include any special symbols related to programming language such as \\n in your answer.
                    
                    Here is the retrieved context
                    {context}
                    """
    
    prompt = [{"role": "system", "content": system_prompt},
              {"role": "user", "content": user_query}]
    return prompt

def create_response(messages, model):
    """Generate a reply to ``messages`` with ``model`` and return it as text.

    The messages are rendered with the global ``tokenizer``'s chat template
    (with a generation prompt appended), moved to the model's device, and fed
    to ``model.generate`` with sampling enabled. Only the tokens produced
    after the prompt are decoded, with special tokens skipped.
    """
    prompt_ids = tokenizer.apply_chat_template(
        messages, add_generation_prompt=True, return_tensors="pt"
    )
    prompt_ids = prompt_ids.to(model.device)

    generation_options = dict(
        max_new_tokens=256,
        do_sample=True,
        temperature=0.75,
        top_p=0.9,
        pad_token_id=tokenizer.eos_token_id,
    )
    generated = model.generate(prompt_ids, **generation_options)

    # Drop the prompt tokens so that only the newly generated part is decoded.
    prompt_length = prompt_ids.shape[-1]
    completion_ids = generated[0][prompt_length:]
    return tokenizer.decode(completion_ids, skip_special_tokens=True)

# Usage: build the chat messages from the retrieved context and the question,
# then generate the answer with the LLM and print it.

prompt = construct_prompt(context, user_query)

response = create_response(prompt, model)
print(response)