In [2]:
!pip install python-docx
!pip install sentence-transformers
!pip install faiss-cpu
!pip install langchain_community




In [3]:
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')
import os

# Specify the path to your folder in Google Drive containing the PDF files
folder_path = "/content/drive/MyDrive/Data"

# Change the current working directory to the specified folder
os.chdir(folder_path)

# List all files in the current directory (optional)
files = os.listdir()
print("Files in the folder:", files)




Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Files in the folder: ['paper.pdf', 'bert_embeddings.npy', 'resume.pdf', 'inputFile.txt', 'indices.faiss', 'food', 'Book.txt', 'merged.txt', 'business', 'data.docx', 'mergedbusiness.txt', 'faiss_index.index', 'Netflix_TC.txt', 'data_base.index', 'Samsung_TC.txt']


In [None]:
from docx import Document

def read_docx(file_path):
    # Open the document
    doc = Document(file_path)

    full_text = []
    # Extract text from paragraphs
    for para in doc.paragraphs:
        full_text.append(para.text)

    # Extract text from tables
    for table in doc.tables:
        for row in table.rows:
            row_data = []
            for cell in row.cells:
                row_data.append(cell.text.strip())
            full_text.append('\t'.join(row_data))

    return '\n'.join(full_text)

# Example usage
file_path = 'data.docx'  # Replace with your file path
text = read_docx(file_path)


In [None]:
import os

def read_txt(file_path):
    if not os.path.exists(file_path):
        print(f"Error: The file '{file_path}' does not exist.")
        return None

    with open(file_path, 'r', encoding='utf-8') as file:
        text1 = file.read()
        if text1:
            print("File content:")
            print(text1)
        else:
            print(f"The file '{file_path}' is empty.")
    return text1

# Example usage
file_path = 'Netflix_TC.txt'  # Replace with your file path
text1 = read_txt(file_path)
if text1 is not None:
    print("Read text successfully.")


File content:
Netflix Terms of Use
Netflix provides a personalized subscription service that allows our members to access entertainment content (“Netflix content”) over the Internet on certain Internet-connected TVs, computers and other devices ("Netflix ready devices").

These Terms of Use govern your use of our service. As used in these Terms of Use, "Netflix service", "our service" or "the service" means the personalized service provided by Netflix for discovering and accessing Netflix content, including all features and functionalities, recommendations and reviews, our websites, and user interfaces, as well as all content and software associated with our service. References to ‘you’ in these Terms of Use indicate the member who created the Netflix account and whose payment method is charged.

Membership

1.1. Your Netflix membership will continue until terminated. To use the Netflix service you must have Internet access and a Netflix ready device, and provide us with one or more Pa

# MISTRAL

In [53]:
import re
import time
import numpy as np
import faiss
from sklearn.metrics.pairwise import cosine_similarity
from langchain_community.embeddings import HuggingFaceEmbeddings
from docx import Document

class ChunkerBase:
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200):
        self.embeddings_model = HuggingFaceEmbeddings(model_name=model_name)
        self.chunks = []
        self.chunk_embeddings = []
        self.max_chunk_size = max_chunk_size
        self.doc_id = None

    def embed_chunks(self):
        self.chunk_embeddings = self.embeddings_model.embed_documents(self.chunks)

    def store_embeddings_faiss(self, index_path="faiss_index.index"):
        d = len(self.chunk_embeddings[0])  # dimension of embeddings
        index = faiss.IndexFlatL2(d)  # build the index
        embeddings = np.array(self.chunk_embeddings).astype(np.float32)
        doc_ids = np.array([self.doc_id] * len(self.chunk_embeddings))
        combined = np.hstack((doc_ids.reshape(-1, 1), embeddings))
        index.add(combined)  # add embeddings to the index
        faiss.write_index(index, index_path)  # save index to a file
        return index

class SemanticChunker(ChunkerBase):
    def __init__(self, similarity_threshold=0.6, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200):
        super().__init__(model_name, max_chunk_size)
        self.similarity_threshold = similarity_threshold

    def chunk_text(self, text):
        sentences = re.split(r'(?<=[.?!])\s+', text)
        self.chunks = []
        current_chunk = sentences[0]

        for i in range(1, len(sentences)):
            next_sentence = sentences[i]
            similarity_score = self._calculate_similarity(current_chunk, next_sentence)
            if similarity_score >= self.similarity_threshold:
                current_chunk += ' ' + next_sentence
            else:
                self.chunks.append(current_chunk)
                current_chunk = next_sentence

        self.chunks.append(current_chunk)

    def _calculate_similarity(self, sentence1, sentence2):
        embeddings = self.embeddings_model.embed_documents([sentence1, sentence2])
        similarity_score = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
        return similarity_score

class RecursiveChunker(ChunkerBase):
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200, max_paragraph_words=100):
        super().__init__(model_name, max_chunk_size)
        self.max_paragraph_words = max_paragraph_words

    def _split_sentences(self, text):
        return re.split(r'(?<=[.?!])\s+', text)

    def recursive_chunk_text(self, text):
        words = text.split()

        if len(words) <= self.max_chunk_size:
            return [text]

        # Find the best splitting point
        split_point = self.max_chunk_size
        while split_point > 0 and not words[split_point - 1].endswith(('.', '!', '?')):
            split_point -= 1

        if split_point == 0:  # No good splitting point found, force split at max_chunk_size
            split_point = self.max_chunk_size

        # Split the text
        chunk = ' '.join(words[:split_point])
        remaining_text = ' '.join(words[split_point:])

        # Recursive call
        chunks = [chunk] + self.recursive_chunk_text(remaining_text)



        return chunks

    def chunk_text(self, text):
        paragraphs = text.split('\n\n')
        self.chunks = []

        for paragraph in paragraphs:
            if len(paragraph.split()) > self.max_paragraph_words:
                indented_chunks = self.recursive_chunk_text(paragraph)
                self.chunks.extend(indented_chunks)
            else:
                self.chunks.append(paragraph)

def read_text_from_file(file_path):
    with open(file_path, 'r') as file:
        text = file.read()
    return text

def read_docx(file_path):
    # Open the document
    doc = Document(file_path)

    full_text = []
    # Extract text from paragraphs
    for para in doc.paragraphs:
        full_text.append(para.text)

    # Extract text from tables
    for table in doc.tables:
        for row in table.rows:
            row_data = []
            for cell in row.cells:
                row_data.append(cell.text.strip())
            full_text.append('\t'.join(row_data))

    return '\n'.join(full_text)



def search_similar_chunks(query, index, chunkers):
    query_embedding = chunkers[0].embeddings_model.embed_documents([query])[0]
    distances, indices = index.search(np.array([[0, query_embedding]]), 5)  # search for 5 most similar chunks
    similar_chunks = []
    similarity_scores = []
    for i, (doc_id, distance) in enumerate(zip(indices[0, :, 0], distances[0])):
        chunker = chunkers[doc_id]
        chunk_index = indices[0, i, 1]
        similar_chunks.append(chunker.chunks[chunk_index])
        similarity_scores.append(1 - distance)
    print("\nTop 5 most similar chunks:")
    for i in range(len(similar_chunks)):
        print(f"{i+1}. Chunk: {similar_chunks[i]}")
        print(f"   Similarity Score: {similarity_scores[i]:.4f}")
        print("-" * 40)


def main():
    file_paths = ['data.docx', 'inputFile.txt', 'Netflix_TC.txt', 'Samsung_TC.txt']
    chunkers = []
    chunking_method = input("Select chunking method (semantic 1 / recursive 2): ").strip().lower()
    if chunking_method == "1":
        chunker = SemanticChunker()
    elif chunking_method == "2":
        chunker = RecursiveChunker()
    else:
        print("Invalid method selected. Defaulting to recursive chunking.")
        chunker = RecursiveChunker()

    for file_path in file_paths:
        if file_path.endswith('.docx'):
            text = read_docx(file_path)
        else:
            text = read_text_from_file(file_path)

        # Chunk the text using the selected method
        chunker = SemanticChunker() if chunking_method == "1" else RecursiveChunker()
        chunker.doc_id = len(chunkers)
        chunker.chunk_text(text)
        chunkers.append(chunker)

    # Store the embeddings in FAISS
    index_path = "faiss_index.index"
    index = None
    for chunker in chunkers:
        if index is None:
            index = chunker.store_embeddings_faiss(index_path)
        else:
            chunker.store_embeddings_faiss(index_path)
            faiss.read_index(index_path)
            index.merge_with(faiss.read_index(index_path), inplace=True)
            faiss.write_index(index, index_path)
    print(f"Number of embeddings stored in FAISS: {index.ntotal}")
    print(f"FAISS index saved at: {index_path}")

    # Query for similar chunks
    query = input("\nEnter a query to search for similar chunks: ")
    search_similar_chunks(query, index, chunkers)

if __name__ == "__main__":
    main()


Select chunking method (semantic 1 / recursive 2): 2


IndexError: list index out of range

# GPT

In [56]:
import re
import time
import numpy as np
import faiss
from sklearn.metrics.pairwise import cosine_similarity
from langchain_community.embeddings import HuggingFaceEmbeddings

class ChunkerBase:
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200, index_path="faiss_index.index"):
        self.embeddings_model = HuggingFaceEmbeddings(model_name=model_name)
        self.chunks = []
        self.chunk_embeddings = []
        self.max_chunk_size = max_chunk_size
        self.index_path = index_path
        self.index = self._load_or_create_index()

    def _load_or_create_index(self):
        try:
            index = faiss.read_index(self.index_path)
            print(f"Loaded existing FAISS index from {self.index_path}")
        except:
            d = len(self.embeddings_model.embed_documents(["dummy"]))
            index = faiss.IndexFlatL2(d)
            print(f"Created new FAISS index at {self.index_path}")
        return index

    def embed_chunks(self):
        self.chunk_embeddings = self.embeddings_model.embed_documents(self.chunks)

    def add_embeddings_to_index(self):
        embeddings = np.array(self.chunk_embeddings).astype(np.float32)
        self.index.add(embeddings)

    def store_embeddings_faiss(self):
        faiss.write_index(self.index, self.index_path)
        print(f"Embeddings stored in FAISS index at {self.index_path}")

class SemanticChunker(ChunkerBase):
    def __init__(self, similarity_threshold=0.6, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200, index_path="faiss_index.index"):
        super().__init__(model_name, max_chunk_size, index_path)
        self.similarity_threshold = similarity_threshold

    def chunk_text(self, text):
        sentences = re.split(r'(?<=[.?!])\s+', text)
        self.chunks = []
        current_chunk = sentences[0]

        for i in range(1, len(sentences)):
            next_sentence = sentences[i]
            similarity_score = self._calculate_similarity(current_chunk, next_sentence)
            if similarity_score >= self.similarity_threshold:
                current_chunk += ' ' + next_sentence
            else:
                self.chunks.append(current_chunk)
                current_chunk = next_sentence

        self.chunks.append(current_chunk)

    def _calculate_similarity(self, sentence1, sentence2):
        embeddings = self.embeddings_model.embed_documents([sentence1, sentence2])
        similarity_score = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
        return similarity_score

class RecursiveChunker(ChunkerBase):
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200, max_paragraph_words=100, index_path="faiss_index.index"):
        super().__init__(model_name, max_chunk_size, index_path)
        self.max_paragraph_words = max_paragraph_words

    def _split_sentences(self, text):
        return re.split(r'(?<=[.?!])\s+', text)

    def recursive_chunk_text(self, text):
        words = text.split()

        if len(words) <= self.max_chunk_size:
            return [text]

        split_point = self.max_chunk_size
        while split_point > 0 and not words[split_point - 1].endswith(('.', '!', '?')):
            split_point -= 1

        if split_point == 0:
            split_point = self.max_chunk_size

        chunk = ' '.join(words[:split_point])
        remaining_text = ' '.join(words[split_point:])

        chunks = [chunk] + self.recursive_chunk_text(remaining_text)

        return chunks

    def chunk_text(self, text):
        paragraphs = text.split('\n\n')
        self.chunks = []

        for paragraph in paragraphs:
            if len(paragraph.split()) > self.max_paragraph_words:
                indented_chunks = self.recursive_chunk_text(paragraph)
                self.chunks.extend(indented_chunks)
            else:
                self.chunks.append(paragraph)

def read_text_file(file_path):
    with open(file_path, 'r') as file:
        text = file.read()
    return text

def search_similar_chunks(query, index, chunker):
    query_embedding = chunker.embeddings_model.embed_documents([query])[0]
    distances, indices = index.search(np.array([query_embedding]), 5)
    print(distances)
    print(indices)
    print(query_embedding)
    # similar_chunks = [chunker.chunks[i] for i in indices[0]]
    # similarity_scores = [1 - distance for distance in distances[0]]
    # return similar_chunks, similarity_scores
    return 1,1
def main():
    file_paths = ['inputFile.txt', 'Netflix_TC.txt', 'Samsung_TC.txt']
    texts = [read_text_file(file_path) for file_path in file_paths]

    chunking_method = input("Select chunking method (semantic 1 / recursive 2): ").strip().lower()

    if chunking_method == "1":
        chunker = SemanticChunker()
    elif chunking_method == "2":
        chunker = RecursiveChunker()
    else:
        print("Invalid method selected. Defaulting to recursive chunking.")
        chunker = RecursiveChunker()

    start_time = time.time()
    for text in texts:
        chunker.chunk_text(text)
        chunker.embed_chunks()
        chunker.add_embeddings_to_index()

    chunking_time = time.time() - start_time
    print(f"Total Chunking Time: {chunking_time:.4f} seconds")

    query = input("\nEnter a query to search for similar chunks: ")
    similar_chunks, similarity_scores = search_similar_chunks(query, chunker.index, chunker)
    print("\nTop 5 most similar chunks:")
    for i in range(len(similar_chunks)):
        print(f"{i+1}. Chunk: {similar_chunks[i]}")
        print(f"   Similarity Score: {similarity_scores[i]:.4f}")
        print("-" * 40)

    chunker.store_embeddings_faiss()

if __name__ == "__main__":
    main()


Select chunking method (semantic 1 / recursive 2): 2




Created new FAISS index at faiss_index.index


AssertionError: 

In [None]:
import re
import time
import numpy as np
import faiss
from sklearn.metrics.pairwise import cosine_similarity
from langchain_community.embeddings import HuggingFaceEmbeddings
from docx import Document

class ChunkerBase:
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200):
        self.embeddings_model = HuggingFaceEmbeddings(model_name=model_name)
        self.chunks = []
        self.chunk_embeddings = []
        self.max_chunk_size = max_chunk_size

    def embed_chunks(self):
        self.chunk_embeddings = self.embeddings_model.embed_documents(self.chunks)

    def store_embeddings_faiss(self, index_path="faiss_index.index"):
        d = len(self.chunk_embeddings[0])  # dimension of embeddings
        index = faiss.IndexFlatL2(d)  # build the index
        embeddings = np.array(self.chunk_embeddings).astype(np.float32)
        index.add(embeddings)  # add embeddings to the index
        faiss.write_index(index, index_path)  # save index to a file
        return index

class SemanticChunker(ChunkerBase):
    def __init__(self, similarity_threshold=0.6, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200):
        super().__init__(model_name, max_chunk_size)
        self.similarity_threshold = similarity_threshold

    def chunk_text(self, text):
        sentences = re.split(r'(?<=[.?!])\s+', text)
        self.chunks = []
        current_chunk = sentences[0]

        for i in range(1, len(sentences)):
            next_sentence = sentences[i]
            similarity_score = self._calculate_similarity(current_chunk, next_sentence)
            if similarity_score >= self.similarity_threshold:
                current_chunk += ' ' + next_sentence
            else:
                self.chunks.append(current_chunk)
                current_chunk = next_sentence

        self.chunks.append(current_chunk)

    def _calculate_similarity(self, sentence1, sentence2):
        embeddings = self.embeddings_model.embed_documents([sentence1, sentence2])
        similarity_score = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
        return similarity_score

class RecursiveChunker(ChunkerBase):
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200, max_paragraph_words=100):
        super().__init__(model_name, max_chunk_size)
        self.max_paragraph_words = max_paragraph_words

    def _split_sentences(self, text):
        return re.split(r'(?<=[.?!])\s+', text)

    def recursive_chunk_text(self, text):
        words = text.split()

        if len(words) <= self.max_chunk_size:
            return [text]

        # Find the best splitting point
        split_point = self.max_chunk_size
        while split_point > 0 and not words[split_point - 1].endswith(('.', '!', '?')):
            split_point -= 1

        if split_point == 0:  # No good splitting point found, force split at max_chunk_size
            split_point = self.max_chunk_size

        # Split the text
        chunk = ' '.join(words[:split_point])
        remaining_text = ' '.join(words[split_point:])

        # Recursive call
        chunks = [chunk] + self.recursive_chunk_text(remaining_text)



        return chunks

    def chunk_text(self, text):
        paragraphs = text.split('\n\n')
        self.chunks = []

        for paragraph in paragraphs:
            if len(paragraph.split()) > self.max_paragraph_words:
                indented_chunks = self.recursive_chunk_text(paragraph)
                self.chunks.extend(indented_chunks)
            else:
                self.chunks.append(paragraph)

def read_text_from_file(file_path):
    with open(file_path, 'r') as file:
        text = file.read()
    return text

def read_docx(file_path):
    # Open the document
    doc = Document(file_path)

    full_text = []
    # Extract text from paragraphs
    for para in doc.paragraphs:
        full_text.append(para.text)

    # Extract text from tables
    for table in doc.tables:
        for row in table.rows:
            row_data = []
            for cell in row.cells:
                row_data.append(cell.text.strip())
            full_text.append('\t'.join(row_data))

    return '\n'.join(full_text)

def main():
    file_path = 'data.docx'
    text = read_docx(file_path)

    chunking_method = input("Select chunking method (semantic 1 / recursive 2): ").strip().lower()

    if chunking_method == "1":
        chunker = SemanticChunker()
    elif chunking_method == "2":
        chunker = RecursiveChunker()
    else:
        print("Invalid method selected. Defaulting to recursive chunking.")
        chunker = RecursiveChunker()

    # Chunk the text using the selected method
    start_time = time.time()
    chunker.chunk_text(text)
    chunking_time = time.time() - start_time
    print(f"Chunking Time: {chunking_time:.4f} seconds")

    # Print chunks on different lines
    print("Chunks:")
    for chunk in chunker.chunks:
        print(chunk)
        print("=" * 40)

    # Embed the chunks
    chunker.embed_chunks()
    print("\nEmbeddings:")
    for i, embedding in enumerate(chunker.chunk_embeddings):
        print(f"Chunk {i+1} Embedding:")
        print(embedding)
        print("-" * 40)
    # Store the embeddings in FAISS
    index_path = "faiss_index.index"
    index = chunker.store_embeddings_faiss(index_path)
    print(f"Number of embeddings stored in FAISS: {index.ntotal}")
    print(f"FAISS index saved at: {index_path}")

if __name__ == "__main__":
    main()


Select chunking method (semantic 1 / recursive 2): 2
Chunking Time: 0.0152 seconds
Chunks:
Spotify Terms of Use 1. Introduction 2. The Spotify Service Provided by Us 3. Your Use of the Spotify Service 4. Content and Intellectual Property Rights 5. Customer Support, Information, Questions and Complaints 6. Problems and Disputes 7. About these Terms 1. Introduction Please read these Terms of Use (these "Terms") carefully as they govern your use of (which includes access to) Spotify's personalized services for streaming music and other content, including all of our websites and software applications that incorporate or link to these Terms (collectively, the "Spotify Service") and any music, videos, podcasts, or other material that is made available through the Spotify Service (the "Content"). Use of the Spotify Service may be subject to additional terms and conditions presented by Spotify, which are hereby incorporated by this reference into these Terms. By signing up for, or otherwise us

# basic chunker class

In [None]:
class YourChunkerClass:
    def __init__(self, max_chunk_size=200):
        self.max_chunk_size = max_chunk_size
        self.chunks = []

    def recursive_chunk_text(self, text, max_chunk_size):
        words = text.split()

        if len(words) <= max_chunk_size:
            return [text]

        # Find the best splitting point
        split_point = max_chunk_size
        while split_point > 0 and not words[split_point - 1].endswith(('.', '!', '?')):
            split_point -= 1

        if split_point == 0:  # No good splitting point found, force split at max_chunk_size
            split_point = max_chunk_size

        # Split the text
        chunk = ' '.join(words[:split_point])
        remaining_text = ' '.join(words[split_point:])

        # Recursive call
        return [chunk] + self.recursive_chunk_text(remaining_text, max_chunk_size)

    def chunk_text(self, text):
        paragraphs = text.split('\n\n')
        self.chunks = []

        for paragraph in paragraphs:
            indented_chunks = self.recursive_chunk_text(paragraph, self.max_chunk_size)
            self.chunks.extend(indented_chunks)

# Example usage:
def main():
    # text = "Your long text to be chunked recursively. It should be long enough to demonstrate recursive chunking effectively."

    chunker = YourChunkerClass(max_chunk_size=100)
    chunker.chunk_text(text)

    print(f"Number of chunks formed: {len(chunker.chunks)}")
    for i, chunk in enumerate(chunker.chunks):
        print(f"Chunk {i + 1}: {chunk}")

if __name__ == "__main__":
    main()


NameError: name 'text' is not defined

In [61]:
import re
import time
import os
import numpy as np
import faiss
from sklearn.metrics.pairwise import cosine_similarity
from langchain_community.embeddings import HuggingFaceEmbeddings
from docx import Document

class ChunkerBase:
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200):
        self.embeddings_model = HuggingFaceEmbeddings(model_name=model_name)
        self.chunks = []
        self.chunk_embeddings = []
        self.max_chunk_size = max_chunk_size

    def embed_chunks(self):
        self.chunk_embeddings = self.embeddings_model.embed_documents(self.chunks)

    def store_embeddings_faiss(self, index_path="data_base.index"):
        d = len(self.chunk_embeddings[0])  # dimension of embeddings
        embeddings = np.array(self.chunk_embeddings).astype(np.float32)

        if os.path.exists(index_path):
            print(f"Loading existing FAISS index from {index_path}")
            index = faiss.read_index(index_path)
        else:
            print(f"Creating new FAISS index of dimension {d}")
            index = faiss.IndexFlatL2(d)

        index.add(embeddings)  # add new embeddings to the index
        faiss.write_index(index, index_path)  # save index to a file
        return index

    def embed_query(self, query):
        return self.embeddings_model.embed_documents([query])[0]

    def search_similar_chunks(self, query_embedding, top_k=5, index_path="data_base1.index"):
        if not os.path.exists(index_path):
            print(f"No FAISS index found at {index_path}")
            return []

        index = faiss.read_index(index_path)
        query_embedding = np.array([query_embedding]).astype(np.float32)
        distances, indices = index.search(query_embedding, top_k)

        if len(indices) == 0 or len(indices[0]) == 0:
            print("No similar chunks found.")
            return []
        print("Indices:")
        print(indices)

        print("distances")
        print(distances)
        results = [(self.chunks[i], distances[0][j]) for j, i in enumerate(indices[0]) if i < len(self.chunks)]

        return results

class SemanticChunker(ChunkerBase):
    def __init__(self, similarity_threshold=0.6, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200):
        super().__init__(model_name, max_chunk_size)
        self.similarity_threshold = similarity_threshold

    def chunk_text(self, text):
        sentences = re.split(r'(?<=[.?!])\s+', text)
        self.chunks = []
        current_chunk = sentences[0]

        for i in range(1, len(sentences)):
            next_sentence = sentences[i]
            similarity_score = self._calculate_similarity(current_chunk, next_sentence)
            if similarity_score >= self.similarity_threshold:
                current_chunk += ' ' + next_sentence
            else:
                self.chunks.append(current_chunk)
                current_chunk = next_sentence

        self.chunks.append(current_chunk)

    def _calculate_similarity(self, sentence1, sentence2):
        embeddings = self.embeddings_model.embed_documents([sentence1, sentence2])
        similarity_score = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
        return similarity_score

class RecursiveChunker(ChunkerBase):
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200, max_paragraph_words=100):
        super().__init__(model_name, max_chunk_size)
        self.max_paragraph_words = max_paragraph_words

    def _split_sentences(self, text):
        return re.split(r'(?<=[.?!])\s+', text)

    def recursive_chunk_text(self, text):
        words = text.split()

        if len(words) <= self.max_chunk_size:
            return [text]

        # Find the best splitting point
        split_point = self.max_chunk_size
        while split_point > 0 and not words[split_point - 1].endswith(('.', '!', '?')):
            split_point -= 1

        if split_point == 0:  # No good splitting point found, force split at max_chunk_size
            split_point = self.max_chunk_size

        # Split the text
        chunk = ' '.join(words[:split_point])
        remaining_text = ' '.join(words[split_point:])

        # Recursive call
        chunks = [chunk] + self.recursive_chunk_text(remaining_text)

        return chunks

    def chunk_text(self, text):
        paragraphs = text.split('\n\n')
        self.chunks = []

        for paragraph in paragraphs:
            if len(paragraph.split()) > self.max_paragraph_words:
                indented_chunks = self.recursive_chunk_text(paragraph)
                self.chunks.extend(indented_chunks)
            else:
                self.chunks.append(paragraph)

def read_text_from_file(file_path):
    with open(file_path, 'r') as file:
        text = file.read()
    return text

def read_docx(file_path):
    # Open the document
    doc = Document(file_path)

    full_text = []
    # Extract text from paragraphs
    for para in doc.paragraphs:
        full_text.append(para.text)

    # Extract text from tables
    for table in doc.tables:
        for row in table.rows:
            row_data = []
            for cell in row.cells:
                row_data.append(cell.text.strip())
            full_text.append('\t'.join(row_data))

    return '\n'.join(full_text)

def query_similar_chunks(query, chunker):
    query_embedding = chunker.embed_query(query)
    top_k = 5
    results = chunker.search_similar_chunks(query_embedding, top_k)
    print(results)
    if results:
        # Sort results by distance (ascending order)
        results.sort(key=lambda x: x[1])  # x[1] corresponds to the distance/similarity score

        print(f"\nTop {top_k} most similar chunks:")
        for i, (result, distance) in enumerate(results, start=1):
            print(f"{i}. Chunk: {result}")
            print(f"   Similarity Score: {1 - distance:.4f}")  # Print similarity score instead of distance
            print("-" * 40)
    else:
        print("No similar chunks found.")

# Prompt user for file path and chunking method
file_path = input("Enter the file path: ").strip()
if file_path.endswith('.docx'):
    text = read_docx(file_path)
elif file_path.endswith('.txt'):
    text = read_text_from_file(file_path)
else:
    print("Unsupported file type.")
    exit()

chunking_method = input("Select chunking method (semantic 1 / recursive 2): ").strip().lower()

# Select the appropriate chunker based on user input
if chunking_method == "1":
    chunker = SemanticChunker()
elif chunking_method == "2":
    chunker = RecursiveChunker()
else:
    print("Invalid method selected. Defaulting to recursive chunking.")
    chunker = RecursiveChunker()

# Perform chunking, embedding, and FAISS indexing
start_time = time.time()
chunker.chunk_text(text)
chunking_time = time.time() - start_time
print(f"Chunking Time: {chunking_time:.4f} seconds")

# Print chunks
print("Chunks:")
for chunk in chunker.chunks:
    print(chunk)
    print("=" * 40)

# Embed chunks
chunker.embed_chunks()
print("\nEmbeddings:")
for i, embedding in enumerate(chunker.chunk_embeddings):
    print(f"Chunk {i+1} Embedding:")
    print(embedding)
    print("-" * 40)

# Store embeddings in FAISS
index_path = "data_base1.index"
index = chunker.store_embeddings_faiss(index_path)
print(f"Number of embeddings stored in FAISS: {index.ntotal}")
print(f"FAISS index saved at: {index_path}")






Enter the file path: inputFile.txt
Select chunking method (semantic 1 / recursive 2): 2




Chunking Time: 0.0002 seconds
Chunks:
Introduction to Quantum Physics
Quantum physics, also known as quantum mechanics or quantum theory, is a fundamental branch of physics that deals with the behavior of matter and energy at the smallest scales, typically at the level of atoms and subatomic particles. Developed in the early 20th century, quantum physics has revolutionized our understanding of the universe, providing insights into the nature of reality that are often counterintuitive and challenging to grasp.
Historical Background
The roots of quantum physics can be traced back to the late 19th and early 20th centuries when classical physics could not explain certain experimental observations. Key milestones in the development of quantum theory include:
 Blackbody Radiation: Max Planck's solution to the ultraviolet catastrophe in 1900 introduced the concept of quantized energy levels, laying the groundwork for quantum theory.
Photoelectric Effect: In 1905, Albert Einstein explained the

In [62]:

# Querying similar chunks based on user input
query = input("Enter your query: ").strip()
query_similar_chunks(query, chunker)

Enter your query: What is quantum physics?
Indices:
[[  1  38 111 148 185]]
distances
[[0.23520154 0.23520154 0.23520154 0.23520154 0.23520154]]
[('Quantum physics, also known as quantum mechanics or quantum theory, is a fundamental branch of physics that deals with the behavior of matter and energy at the smallest scales, typically at the level of atoms and subatomic particles. Developed in the early 20th century, quantum physics has revolutionized our understanding of the universe, providing insights into the nature of reality that are often counterintuitive and challenging to grasp.', 0.23520154)]

Top 5 most similar chunks:
1. Chunk: Quantum physics, also known as quantum mechanics or quantum theory, is a fundamental branch of physics that deals with the behavior of matter and energy at the smallest scales, typically at the level of atoms and subatomic particles. Developed in the early 20th century, quantum physics has revolutionized our understanding of the universe, providing ins

In [20]:
import re
import time
import os
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from langchain_community.embeddings import HuggingFaceEmbeddings
from docx import Document

# Global variable to store embeddings
embeddings_list = []

class ChunkerBase:
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200):
        self.embeddings_model = HuggingFaceEmbeddings(model_name=model_name)
        self.chunks = []
        self.chunk_embeddings = []
        self.max_chunk_size = max_chunk_size

    def embed_chunks(self):
        self.chunk_embeddings = self.embeddings_model.embed_documents(self.chunks)
        # Append embeddings to global list
        embeddings_list.extend(self.chunk_embeddings)

    def store_embeddings_in_memory(self):
        return embeddings_list

class SemanticChunker(ChunkerBase):
    def __init__(self, similarity_threshold=0.6, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200):
        super().__init__(model_name, max_chunk_size)
        self.similarity_threshold = similarity_threshold

    def chunk_text(self, text):
        sentences = re.split(r'(?<=[.?!])\s+', text)
        self.chunks = []
        current_chunk = sentences[0]

        for i in range(1, len(sentences)):
            next_sentence = sentences[i]
            similarity_score = self._calculate_similarity(current_chunk, next_sentence)
            if similarity_score >= self.similarity_threshold:
                current_chunk += ' ' + next_sentence
            else:
                self.chunks.append(current_chunk)
                current_chunk = next_sentence

        self.chunks.append(current_chunk)

    def _calculate_similarity(self, sentence1, sentence2):
        embeddings = self.embeddings_model.embed_documents([sentence1, sentence2])
        similarity_score = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
        return similarity_score

class RecursiveChunker(ChunkerBase):
    def __init__(self, model_name="BAAI/bge-small-en-v1.5", max_chunk_size=200, max_paragraph_words=100):
        super().__init__(model_name, max_chunk_size)
        self.max_paragraph_words = max_paragraph_words

    def _split_sentences(self, text):
        return re.split(r'(?<=[.?!])\s+', text)

    def recursive_chunk_text(self, text):
        words = text.split()

        if len(words) <= self.max_chunk_size:
            return [text]

        # Find the best splitting point
        split_point = self.max_chunk_size
        while split_point > 0 and not words[split_point - 1].endswith(('.', '!', '?')):
            split_point -= 1

        if split_point == 0:  # No good splitting point found, force split at max_chunk_size
            split_point = self.max_chunk_size

        # Split the text
        chunk = ' '.join(words[:split_point])
        remaining_text = ' '.join(words[split_point:])

        # Recursive call
        chunks = [chunk] + self.recursive_chunk_text(remaining_text)

        return chunks

    def chunk_text(self, text):
        paragraphs = text.split('\n\n')
        self.chunks = []

        for paragraph in paragraphs:
            if len(paragraph.split()) > self.max_paragraph_words:
                indented_chunks = self.recursive_chunk_text(paragraph)
                self.chunks.extend(indented_chunks)
            else:
                self.chunks.append(paragraph)

def read_text_from_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    return text

def read_docx(file_path):
    # Open the document
    doc = Document(file_path)

    full_text = []
    # Extract text from paragraphs
    for para in doc.paragraphs:
        full_text.append(para.text)

    # Extract text from tables
    for table in doc.tables:
        for row in table.rows:
            row_data = []
            for cell in row.cells:
                row_data.append(cell.text.strip())
            full_text.append('\t'.join(row_data))

    return '\n'.join(full_text)

# Function to store embeddings in memory (list)
def store_embeddings_in_memory(embeddings):
    global embeddings_list
    embeddings_list.extend(embeddings)

# Function to query similar chunks
def query_similar_chunks(query_text, model, embeddings_list, paragraph_list, top_k=5):
    """
    Queries the stored embeddings list with the query text and returns the top K similar paragraphs.
    """
    # Generate embedding for the query text
    query_embedding = model.embed_documents([query_text])[0]

    # Calculate cosine similarity between query embedding and stored embeddings
    similarities = cosine_similarity([query_embedding], embeddings_list)[0]

    # Sort indices based on similarity (descending) and get top K
    top_k_indices = np.argsort(similarities)[::-1][:top_k]

    # Retrieve and print the text of the top K similar paragraphs
    similar_paragraphs = [paragraph_list[i] for i in top_k_indices]
    return similar_paragraphs

# Main code
file_path = input("Enter the file path: ").strip()
if file_path.endswith('.docx'):
    text = read_docx(file_path)
elif file_path.endswith('.txt'):
    text = read_text_from_file(file_path)
else:
    print("Unsupported file type.")
    exit()

chunking_method = input("Select chunking method (semantic 1 / recursive 2): ").strip().lower()

if chunking_method == "1":
    chunker = SemanticChunker()
elif chunking_method == "2":
    chunker = RecursiveChunker()
else:
    print("Invalid method selected. Defaulting to recursive chunking.")
    chunker = RecursiveChunker()

# Perform chunking and embedding
chunker.chunk_text(text)
chunker.embed_chunks()

# Store embeddings in memory (list)
store_embeddings_in_memory(chunker.chunk_embeddings)

# Querying similar chunks based on user input
query = input("Enter your query: ").strip()
similar_paragraphs = query_similar_chunks(query, chunker.embeddings_model, embeddings_list, chunker.chunks)

print("\nTop 5 most similar chunks:")
for i, paragraph in enumerate(similar_paragraphs, start=1):
    print(f"{i}. {paragraph}")
    print("-" * 40)


Enter the file path: inputFile.txt
Select chunking method (semantic 1 / recursive 2): 2
Enter your query: what is quantum physics?


IndexError: list index out of range

# Without FAISS

In [37]:
import re
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from langchain_community.embeddings import HuggingFaceEmbeddings

class SentenceChunker:
    def __init__(self, similarity_threshold=0.6, model_name="BAAI/bge-small-en-v1.5"):
        self.similarity_threshold = similarity_threshold
        self.embeddings_model = HuggingFaceEmbeddings(model_name=model_name)
        self.chunks = []
        self.chunk_embeddings = []

    def _split_sentences(self, text):
        return re.split(r'(?<=[.?!])\s+', text)

    def chunk_text(self, text):
        # Split text into individual sentences
        sentences = self._split_sentences(text)

        # Initialize variables
        self.chunks = []
        current_chunk = sentences[0]

        # Iterate over sentences to form chunks
        for i in range(1, len(sentences)):
            next_sentence = sentences[i]

            # Calculate cosine similarity between current and next sentence embeddings
            similarity_score = self._calculate_similarity(current_chunk, next_sentence)

            # Check if similarity score is above threshold
            if similarity_score >= self.similarity_threshold:
                current_chunk += ' ' + next_sentence
            else:
                self.chunks.append(current_chunk)
                current_chunk = next_sentence

        # Append the last chunk
        self.chunks.append(current_chunk)

        # Print the number of chunks formed
        print(f'Number of chunks formed: {len(self.chunks)}')

    def _calculate_similarity(self, sentence1, sentence2):
        # Embed the sentences
        embeddings = self.embeddings_model.embed_documents([sentence1, sentence2])
        embedding1 = embeddings[0]
        embedding2 = embeddings[1]

        # Calculate cosine similarity between embeddings
        similarity_score = cosine_similarity([embedding1], [embedding2])[0][0]
        return similarity_score

    def embed_chunks(self):
        # Embed each chunk using embeddings model
        self.chunk_embeddings = self.embeddings_model.embed_documents(self.chunks)

    def similarity_search(self, query, top_k=5):
        # Check if chunks are already formed and embedded
        if not self.chunks or not self.chunk_embeddings:
            raise ValueError("Text must be chunked and embedded before performing a similarity search.")

        # Embed the query
        query_embedding = self.embeddings_model.embed_documents([query])[0]

        # Calculate similarity between query embedding and each chunk embedding
        similarities = []
        for chunk_embedding in self.chunk_embeddings:
            similarity_score = cosine_similarity([query_embedding], [chunk_embedding])[0][0]
            similarities.append(similarity_score)

        # Sort chunks based on similarity scores (descending order)
        sorted_indices = np.argsort(similarities)[::-1]
        top_k_chunks = [self.chunks[idx] for idx in sorted_indices[:top_k]]
        top_k_similarities = [similarities[idx] for idx in sorted_indices[:top_k]]

        return top_k_chunks, top_k_similarities

    def generate_prompt(self, query, similar_text):
        return f"Query: {query}\nSimilar Text: {similar_text}"

# Example usage
def read_text_from_file(file_path):
    with open(file_path, 'r') as file:
        text = file.read()
    return text

file_path = "Netflix_TC.txt"
text = read_text_from_file(file_path)

sentence_chunker = SentenceChunker()

# Chunk the text once
sentence_chunker.chunk_text(text)

# Embed the chunks
sentence_chunker.embed_chunks()




Number of chunks formed: 62


In [38]:
for i, chunk in enumerate(sentence_chunker.chunks):
    print(f'Chunk {i+1}: {chunk}')
# Perform similarity search
query ="What is the legal age of use ?"
top_k_chunks, top_k_similarities = sentence_chunker.similarity_search(query)

# Print top-k similar chunks
for i, chunk in enumerate(top_k_chunks):
    print(f"Similar Chunk {i+1}: {chunk}")
    print(f"Similarity Score: {top_k_similarities[i]}\n")

Chunk 1: Netflix Terms of Use
Netflix provides a personalized subscription service that allows our members to access entertainment content (“Netflix content”) over the Internet on certain Internet-connected TVs, computers and other devices ("Netflix ready devices"). These Terms of Use govern your use of our service. As used in these Terms of Use, "Netflix service", "our service" or "the service" means the personalized service provided by Netflix for discovering and accessing Netflix content, including all features and functionalities, recommendations and reviews, our websites, and user interfaces, as well as all content and software associated with our service. References to ‘you’ in these Terms of Use indicate the member who created the Netflix account and whose payment method is charged.
Chunk 2: Membership

1.1.
Chunk 3: Your Netflix membership will continue until terminated. To use the Netflix service you must have Internet access and a Netflix ready device, and provide us with one