In [1]:
import os
import re
from transformers import AutoTokenizer, AutoModel, AutoModelForSeq2SeqLM
import uuid
import torch
import numpy as np
import json
import os
import torch.nn.functional as F

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def chunking(directory_path, tokenizer, chunk_size, para_seperator=" /n /n", separator=" "):

    #tokenizer = AutoTokenizer.from_pretrained(model_name)
    documents = {}
    all_chunks = {}
    for filename in os.listdir(directory_path):
        file_path = os.path.join(directory_path, filename)
        print(filename)
        base = os.path.basename(file_path)
        sku = os.path.splitext(base)[0]
        if os.path.isfile(file_path):
            with open(file_path, 'r', encoding='utf-8') as file:
                text = file.read()

            doc_id = str(uuid.uuid4())    

            paragraphs = re.split(para_seperator, text)

            for paragraph in paragraphs:
                words = paragraph.split(separator)
                current_chunk_str = ""
                chunk = []
                for word in words:
                    if current_chunk_str:
                        new_chunk = current_chunk_str + separator + word
                    else:
                        new_chunk = current_chunk_str + word    
                    if len(tokenizer.tokenize(new_chunk)) <= chunk_size:
                        current_chunk_str = new_chunk
                    else:
                        if current_chunk_str:
                            chunk.append(current_chunk_str)
                        current_chunk_str = word
                

                if current_chunk_str:   
                    chunk.append(current_chunk_str)

                for chunk in chunk:
                    chunk_id = str(uuid.uuid4())
                    all_chunks[chunk_id] = {"text": chunk, "metadata": {"file_name":sku}}
        documents[doc_id] = all_chunks
    return documents 

In [3]:
def map_document_embeddings(documents, tokenizer, model):
    mapped_document_db = {}
    for id, dict_content in documents.items():
        mapped_embeddings = {}
        for content_id, text_content in dict_content.items():
            text = text_content.get("text")
            inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
            with torch.no_grad():
                embeddings = model(**inputs).last_hidden_state.mean(dim=1).squeeze().tolist()
            mapped_embeddings[content_id] = embeddings
        mapped_document_db[id] = mapped_embeddings
    return mapped_document_db

In [4]:
def retrieve_information(query, tokenizer, model, top_k, mapped_document_db):
    query_inputs = tokenizer(query, return_tensors="pt", padding=True, truncation=True)
    query_embeddings = model(**query_inputs).last_hidden_state.mean(dim=1).squeeze()
    query_embeddings=query_embeddings.tolist()
    #converting query embeddings to numpy array
    query_embeddings=np.array(query_embeddings)

    scores = {}
    #Now calculating cosine similarity
    for doc_id, chunk_dict in mapped_document_db.items():
        for chunk_id, chunk_embeddings in chunk_dict.items():
            #converting chunk embedding to numpy array for efficent mathmetical operations
            chunk_embeddings = np.array(chunk_embeddings) 

            #Normalizing chunk embeddings and query embeddings  to get cosine similarity score
            normalized_query = np.linalg.norm(query_embeddings)
            normalized_chunk = np.linalg.norm(chunk_embeddings)

            if normalized_chunk == 0 or normalized_query == 0:
            # this is being done to avoid division with zero which will give wrong results i.e infinity. Hence to avoid this we set score to 0
                score == 0
            else:
            # Now calculationg cosine similarity score
                score = np.dot(chunk_embeddings, query_embeddings)/ (normalized_chunk * normalized_query)  

             #STORING SCORES WITH THE REFERENCE
            scores[(doc_id, chunk_id )] = score   

    sorted_scores = sorted(scores.items(), key=lambda item: item[1], reverse=True)[:top_k]

    top_results=[]
    for ((doc_id, chunk_id), score) in sorted_scores:
        results = (doc_id, chunk_id, score)
        top_results.append(results)
    return top_results  

In [5]:
def save_json(path, data):
    with open(path, 'w') as f:
        json.dump(data, f, indent=4)

def read_json(path):
    with open(path, 'r') as f:
        data = json.load(f)
    return data

def retrieve_text(top_results, document_data):
    first_match = top_results[1]
    doc_id = first_match[0]
    chunk_id = first_match[1]
    related_text = document_data[doc_id][chunk_id]
    return related_text


In [6]:
def generate_llm_response(model, tokenizer, query, relavent_text):
    input_text = f"""
    You are an intelligent search engine. You will be provided with some retrieved context, as well as the users query.

    Your job is to understand the request, and answer based on the retrieved context.
    Here is context:
    {relavent_text} 
    
    Question: {query}
    """
    print(input_text)
    inputs = tokenizer(input_text, return_tensors="pt", max_length=512, truncation=True)
    print(len(inputs['input_ids'][0]))
    outputs = model.generate(**inputs, max_length=100)
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)

    return response

In [13]:
directory_path = "documents"
model_name = 'sentence-transformers/all-MiniLM-L6-v2'
embeddings_tokenizer = AutoTokenizer.from_pretrained(model_name)
embeddings_model = AutoModel.from_pretrained(model_name)
chunk_size = 200
para_seperator=" /n /n"
separator=" "
top_k = 2

In [None]:
#creating document store with chunk id, doc_id, text
documents = chunking(directory_path, tokenizer, chunk_size, para_seperator, separator)
documents

In [None]:
#now embedding generation and mapping in database
mapped_document_db = map_document_embeddings(documents, tokenizer, model)
mapped_document_db

In [10]:
#saving json
save_json('database/doc_store_2.json', documents) 
save_json('database/vector_store_2.json', mapped_document_db)    

In [11]:
llm_model_name = "google/flan-t5-base"  # Replace with your chosen model
llm_tokenizer = AutoTokenizer.from_pretrained(llm_model_name)
llm_model = AutoModelForSeq2SeqLM.from_pretrained(llm_model_name)

In [14]:
#Retrieving most relavent data chunks
query = "How to make tantrums less likely to happen?"
top_results = retrieve_information(query, embeddings_tokenizer, embeddings_model, top_k, mapped_document_db)

#reading json
document_data = read_json("database/doc_store_2.json") #read document store

#Retrieving text of relavent chunk embeddings
relavent_text = retrieve_text(top_results, document_data)

#print(relavent_text)
print(relavent_text["text"])

in whatever situation you’re in. Concentrate on putting your plan into action when the tantrum happens.
    • Accept that you can’t control your child’s emotions or behaviour directly. You can only keep your child safe and guide their behaviour so tantrums are less likely to happen in the future.
    • Accept that it takes time for change to happen. Your child has a lot of growing up to do before tantrums are gone forever. Developing and practising self-regulation skills is a life-long task.
    • Beware of thinking that your child is doing it on purpose or trying to upset you. Children don’t have tantrums deliberately. They’re stuck in a bad habit or don’t have the skills right now to cope with the situation.
    • Keep your sense of humour. But don’t laugh at the tantrum – if you do, it might reward your child with attention. It might also upset your child even more if they think

    You are an intelligent search engine. You will be provided with some retrieved context, and users qu

In [16]:
response = generate_llm_response(llm_model, llm_tokenizer, query, relavent_text["text"])

271


In [17]:
response

'Accept that you can’t control your child’s emotions or behaviour directly. You can only keep your child safe and guide their behaviour so tantrums are less likely to happen in the future'