Using an LLM for chunking (via the OpenAI API, if an API key is available)

In [None]:
from dotenv import load_dotenv
from langchain.text_splitter import TextSplitter
from typing import List, Any
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
import re
import tiktoken
load_dotenv()

# Read the whole of textbook.txt (decoded as UTF-8) into `data`.
with open("textbook.txt", encoding="utf-8") as f:
    data = f.read()

class Splitter(TextSplitter):
    """Text splitter that asks an OpenAI chat model to mark topic boundaries.

    The model is prompted to wrap every chunk in ``>>>`` / ``<<<`` markers;
    the marked spans are then extracted from its reply with a regex.

    Args:
        model_name: Name passed to ``ChatOpenAI(model=...)``.
        prompt_type: ``"wide"`` for the broad-topic prompt or ``"gran"`` for
            the detailed-topic prompt.
        count_tokens: When true, ``split_text`` prints the token count of its
            input before calling the model.
        encoding_name: tiktoken encoding used for that token count.
        **kwargs: Forwarded unchanged to ``TextSplitter.__init__``.

    Raises:
        ValueError: If ``prompt_type`` is not ``"wide"`` or ``"gran"``.
    """

    def __init__(
        self,
        model_name: str = "gpt-4o",
        prompt_type: str = "wide",
        count_tokens: bool = False,
        encoding_name: str = "cl100k_base",
        **kwargs: Any
    ) -> None:
        super().__init__(**kwargs)
        self.model_name = model_name
        self.count_tokens = count_tokens
        self.encoding_name = encoding_name

        prompts = {
            "wide": "Split the text according to broad topics and add >>> <<< around each chunk: '{text}'",
            "gran": "Split the text into details topics and add >>> <<< around each chunk: '{text}'",
        }
        # Fail early with a clear message; previously an unknown prompt_type
        # left `prompt_template` unset and surfaced as an AttributeError.
        if prompt_type not in prompts:
            raise ValueError(
                f"Unknown prompt_type {prompt_type!r}; expected one of {sorted(prompts)}"
            )

        self.model = ChatOpenAI(model=self.model_name)
        # The chain needs a parser *instance*, not the parser class itself.
        self.output_parser = StrOutputParser()
        self.prompt_template = ChatPromptTemplate.from_template(prompts[prompt_type])
        self.chain = self.prompt_template | self.model | self.output_parser

    def num_tokens_from_string(self, string: str) -> int:
        """Return the number of tokens in ``string`` under ``self.encoding_name``."""
        encoding = tiktoken.get_encoding(self.encoding_name)
        return len(encoding.encode(string))

    def split_text(self, text: str) -> List[str]:
        """Send ``text`` through the chain and return the marked chunks.

        Prints the input's token count first when ``count_tokens`` is set.
        """
        if self.count_tokens:
            print(self.num_tokens_from_string(text))
        response = self.chain.invoke({"text": text})
        return self._format_chunks(response)

    def _format_chunks(self, text: str) -> List[str]:
        """Extract every ``>>> ... <<<`` span from ``text``, whitespace-stripped.

        Returns an empty list when ``text`` contains no complete marker pair.
        """
        # DOTALL lets a chunk span several lines; the lazy quantifier stops
        # each match at the nearest closing marker.
        return [chunk.strip() for chunk in re.findall(r">>>(.*?)<<<", text, re.DOTALL)]

with gpt4free (if no api key)

In [109]:
from g4f.client import Client
import re
import tiktoken

class Splitter:
    """LLM-driven text splitter that talks to a model through the g4f client.

    Splitting is a two-step conversation: first ask the model for the section
    headers of a text, then ask it to return each section wrapped in
    ``>>>`` / ``<<<`` markers. Chunks can afterwards be cut down to a token
    limit with ``split_text_by_token_limit``.

    Args:
        model_name: Model name passed to ``chat.completions.create``.
        count_tokens: Stored on the instance; no method of this class reads it.
        encoding_name: tiktoken encoding used for token counting.
    """

    def __init__(self, model_name: str = "gpt-4o", count_tokens: bool = False, encoding_name: str = "cl100k_base"):
        self.model_name = model_name
        self.count_tokens = count_tokens
        self.encoding_name = encoding_name
        self.client = Client(provider="Blackbox")

        # Prompt templates; filled in with str.format before each request.
        self.identify_topics_prompt = "'{text}'\nIdentify section headers, present them and separate the headers by commas, no additional formatting. Section headers are basically topic headers that form the starting point of different sections."
        self.split_text_prompt = "'{topics}' Utilize the given section headers, and present me the sections from the text. Each section (the entire section, not just the header) should be encased within >>> and <<<. Here is the text: '{text}'"

    def num_tokens_from_string(self, string):
        """Return the number of tokens in ``string`` under ``self.encoding_name``."""
        encoding = tiktoken.get_encoding(self.encoding_name)
        return len(encoding.encode(string))

    def identify_main_topics(self, text):
        """Ask the model for the section headers of ``text``.

        Returns:
            The headers as a list of stripped, non-empty strings.
        """
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=[{"role": "user", "content": self.identify_topics_prompt.format(text=text)}]
        )
        topics_response = response.choices[0].message.content.strip()
        # The prompt asks for commas, but also split on newlines in case the
        # model lists one header per line; drop empty entries (e.g. from a
        # trailing comma).
        candidates = (topic.strip() for topic in re.split(r"[,\n]", topics_response))
        return [topic for topic in candidates if topic]

    def split_text_by_topics(self, text, topics):
        """Ask the model to return ``text`` split into the given ``topics``.

        Returns:
            The ``>>> ... <<<`` sections found in the model's reply.
        """
        topics_str = ', '.join(topics)
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=[{"role": "user", "content": self.split_text_prompt.format(topics=topics_str, text=text)}]
        )
        chunks_response = response.choices[0].message.content.strip()
        return self._format_chunks(chunks_response)

    def _format_chunks(self, text):
        """Extract every ``>>> ... <<<`` span from ``text``, whitespace-stripped."""
        # DOTALL lets a chunk span several lines; the lazy quantifier stops
        # each match at the nearest closing marker.
        return [chunk.strip() for chunk in re.findall(r">>>(.*?)<<<", text, re.DOTALL)]

    def split_text_by_token_limit(self, chunks, token_limit=512):
        """Return ``chunks`` with every over-limit chunk split into smaller ones."""
        smaller_chunks = []
        for chunk in chunks:
            if self.num_tokens_from_string(chunk) > token_limit:
                smaller_chunks.extend(self.split_large_chunk(chunk, token_limit))
            else:
                smaller_chunks.append(chunk)
        return smaller_chunks

    def split_large_chunk(self, chunk, token_limit):
        """Split ``chunk`` on line boundaries into pieces within ``token_limit``.

        Lines are packed greedily: a piece is closed *before* adding the line
        that would push it over the limit. Lines are never broken up, so a
        single line that is itself longer than the limit is returned as its
        own (over-limit) piece. Empty pieces are dropped.
        """
        pieces = []
        current_lines = []

        for line in chunk.split("\n"):
            candidate = "\n".join(current_lines + [line])
            if current_lines and self.num_tokens_from_string(candidate) > token_limit:
                # Adding this line would overflow: flush what we have so far.
                pieces.append("\n".join(current_lines).strip())
                current_lines = [line]
            else:
                current_lines.append(line)

        if current_lines:
            pieces.append("\n".join(current_lines).strip())

        return [piece for piece in pieces if piece]

# Read the book chapter (textbook.txt, UTF-8) into memory.
with open("textbook.txt", encoding="utf-8") as f:
    data = f.read()

# Instantiate the Splitter class defined above.
# NOTE: count_tokens is only stored on the instance; none of the Splitter
# methods called below read it.
sp = Splitter(count_tokens=True)

# Step 1: Ask the LLM for the section headers of the chapter.
topics = sp.identify_main_topics(data)
print(f"Identified Topics: {topics}\n")

# Step 2: Ask the LLM to return the chapter split into those sections.
chunks = sp.split_text_by_topics(data, topics)

# Display the final chunks, numbered from 1.
for idx, chunk in enumerate(chunks):
    print(f"Chunk {idx + 1}: {chunk}\n---")


Identified Topics: ['ARISTOTLE (384–322 B.C.)', 'LOGIC', 'VIRTUE', 'THOMAS HOBBES’S ADDITION']

Chunk 1: ARISTOTLE (384–322 B.C.)  
Wisdom starts with understanding yourself  
Aristotle was born around 384 b.c. Though little is known about his mother, Aristotle’s father was court physician to the Macedonian king Amyntas II (the connection and affiliation with the Macedonian court would continue to play an important role throughout Aristotle’s life). Both of Aristotle’s parents died when he was young, and at the age of seventeen, Aristotle’s guardian sent him to Athens to pursue a higher education. It was in Athens that Aristotle would enroll in Plato’s Academy and study under Plato. He would remain there for the next twenty years, studying with Plato as both a student and colleague.  
When Plato died in 347 b.c., many believed Aristotle would take his place as director of the Academy. However, by that time, Aristotle had differing views on several of Plato’s works (for example, he disa

**with context aware chunking**

Initial method that chunks based on headers (doesn't consider token limits; still needs to be refactored into a single class with these functions as its methods)

In [151]:
from pdfminer.layout import LAParams, LTTextBox, LTTextLine, LTChar
from pdfminer.high_level import extract_pages
from collections import defaultdict

# Define a class to hold each line's metadata
class DocumentLineItem:
    """Plain record describing one extracted text line.

    Stores the line's x/y position, its text, its font height and its
    character count exactly as passed to the constructor.
    """

    def __init__(self, position_x, position_y, content, font_height, total_characters):
        self.position_x, self.position_y = position_x, position_y
        self.content = content
        self.font_height = font_height
        self.total_characters = total_characters

    def __repr__(self):
        # Only the font height and the text are shown.
        return "Font Height: {}, Text: {}".format(self.font_height, self.content)

# Function to parse PDF and extract font metadata
def parse_pdf(file_path):
    """Extract every text line of the PDF at ``file_path`` with its font size.

    Walks the pdfminer layout (pages -> text boxes -> text lines) and builds
    one ``DocumentLineItem`` per line that contains at least one ``LTChar``.
    The font height recorded for a line is the rounded mean of its character
    sizes; the text is whitespace-stripped.

    Returns:
        A list of ``DocumentLineItem`` objects in layout iteration order.
    """
    collected = []
    for page in extract_pages(file_path, laparams=LAParams()):
        for box in page:
            if not isinstance(box, LTTextBox):
                continue
            for text_line in box:
                if not isinstance(text_line, LTTextLine):
                    continue
                text = text_line.get_text().strip()
                sizes = [glyph.size for glyph in text_line if isinstance(glyph, LTChar)]
                if not sizes:
                    # No character objects in this line: nothing to measure.
                    continue
                mean_size = round(sum(sizes) / len(sizes))
                collected.append(
                    DocumentLineItem(text_line.x0, text_line.y0, text, mean_size, len(text))
                )
    return collected

# Function to get unique font heights
def get_list_of_fonts(line_items):
    """Return the distinct ``font_height`` values of ``line_items``, largest first."""
    distinct_heights = {entry.font_height for entry in line_items}
    ordered = list(distinct_heights)
    ordered.sort(reverse=True)
    return ordered

# Function to classify headers based on font size
def classify_headers_by_font(line_items, unique_fonts, header_levels=2):
    """Pick the font sizes that should be treated as headers.

    Heuristic: the largest font sizes in a document are its headers.

    Args:
        line_items: Unused; kept so existing callers keep working.
        unique_fonts: Font sizes sorted from largest to smallest.
        header_levels: How many of the largest sizes count as headers
            (defaults to 2, the previously hard-coded value).

    Returns:
        The first ``header_levels`` entries of ``unique_fonts`` (fewer if the
        document has fewer distinct sizes).

    Raises:
        ValueError: If ``header_levels`` is negative.
    """
    if header_levels < 0:
        # A negative slice bound would silently return "all but the last n".
        raise ValueError("header_levels must be non-negative")
    return unique_fonts[:header_levels]

# Chunk the document based on identified headers
def chunk_document(line_items, header_fonts):
    """Group body lines under the most recent header line.

    A line whose ``font_height`` is in ``header_fonts`` starts a section keyed
    by its text; every following non-header line is appended to that section.
    Lines that appear before the first header are discarded.

    If the same header text occurs more than once, the later lines are added
    to the existing section instead of replacing it, so no text is lost.

    Args:
        line_items: Objects exposing ``font_height`` and ``content``.
        header_fonts: Collection of font sizes that mark a header line.

    Returns:
        A ``defaultdict(list)`` mapping header text to its list of body lines,
        in first-seen header order.
    """
    chunks = defaultdict(list)
    current_header = None

    for item in line_items:
        if item.font_height in header_fonts:
            current_header = item.content
            # setdefault keeps lines already collected under a repeated header.
            chunks.setdefault(current_header, [])
        elif current_header:
            chunks[current_header].append(item.content)

    return chunks

# Main function to process the PDF and chunk based on headers
def process_pdf(file_path):
    """Parse the PDF at ``file_path`` and chunk it by header lines.

    Pipeline: extract lines, list the distinct font sizes, choose the header
    sizes, then group the lines under their headers. The font sizes and the
    chosen header sizes are printed along the way.

    Returns:
        Whatever ``chunk_document`` returns for the parsed lines.
    """
    parsed_lines = parse_pdf(file_path)

    font_sizes = get_list_of_fonts(parsed_lines)
    print("\nUnique Font Heights (sorted):", font_sizes)

    heading_sizes = classify_headers_by_font(parsed_lines, font_sizes)
    print("\nClassified Header Font Sizes:", heading_sizes)

    # The chunks are returned without being printed.
    return chunk_document(parsed_lines, heading_sizes)

# Example usage: chunk phil_book.pdf by its header lines and keep the result
# in `chunks` (header text -> list of body lines).
chunks = process_pdf('phil_book.pdf')



Unique Font Heights (sorted): [27, 21, 15, 11]

Classified Header Font Sizes: [27, 21]


Attempts at further splitting chunks that exceed a size limit, based on subheadings (work in progress; does not work properly yet)

this one doesn't split the sentences properly

In [None]:
from pdfminer.layout import LAParams, LTTextBox, LTTextLine, LTChar
from pdfminer.high_level import extract_pages
from collections import defaultdict
import re

# Define a class to hold each line's metadata
class DocumentLineItem:
    """Plain record describing one extracted text line.

    Stores the line's x/y position, its text, its font height, its character
    count and a bold flag exactly as passed to the constructor.
    """

    def __init__(self, position_x, position_y, content, font_height, total_characters, is_bold):
        self.position_x, self.position_y = position_x, position_y
        self.content = content
        self.font_height = font_height
        self.total_characters = total_characters
        self.is_bold = is_bold  # bold flag supplied by the caller

    def __repr__(self):
        return "Font Height: {}, Text: {}, Bold: {}".format(
            self.font_height, self.content, self.is_bold
        )

# Function to parse PDF and extract font metadata
def parse_pdf(file_path):
    """Extract every text line of the PDF at ``file_path`` with font metadata.

    Walks the pdfminer layout (pages -> text boxes -> text lines) and builds
    one ``DocumentLineItem`` per line that contains at least one ``LTChar``.
    The font height is the rounded mean of the line's character sizes; a line
    is flagged bold when any character's font name contains "bold"
    (case-insensitive).

    Returns:
        A list of ``DocumentLineItem`` objects in layout iteration order.
    """
    collected = []
    for page in extract_pages(file_path, laparams=LAParams()):
        for box in page:
            if not isinstance(box, LTTextBox):
                continue
            for text_line in box:
                if not isinstance(text_line, LTTextLine):
                    continue
                text = text_line.get_text().strip()
                glyphs = [glyph for glyph in text_line if isinstance(glyph, LTChar)]
                if not glyphs:
                    # No character objects in this line: nothing to measure.
                    continue
                bold = any('bold' in glyph.fontname.lower() for glyph in glyphs)
                mean_size = round(sum(glyph.size for glyph in glyphs) / len(glyphs))
                collected.append(
                    DocumentLineItem(text_line.x0, text_line.y0, text, mean_size, len(text), bold)
                )
    return collected

# Function to get unique font heights
def get_list_of_fonts(line_items):
    """Return the distinct ``font_height`` values of ``line_items``, largest first."""
    distinct_heights = {entry.font_height for entry in line_items}
    ordered = list(distinct_heights)
    ordered.sort(reverse=True)
    return ordered

# Function to classify headers based on font size
def classify_headers_by_font(line_items, unique_fonts, header_levels=2):
    """Pick the font sizes that should be treated as headers.

    Heuristic: the largest font sizes in a document are its headers.

    Args:
        line_items: Unused; kept so existing callers keep working.
        unique_fonts: Font sizes sorted from largest to smallest.
        header_levels: How many of the largest sizes count as headers
            (defaults to 2, the previously hard-coded value).

    Returns:
        The first ``header_levels`` entries of ``unique_fonts`` (fewer if the
        document has fewer distinct sizes).

    Raises:
        ValueError: If ``header_levels`` is negative.
    """
    if header_levels < 0:
        # A negative slice bound would silently return "all but the last n".
        raise ValueError("header_levels must be non-negative")
    return unique_fonts[:header_levels]

# Split a section's text into sentence-aligned parts of bounded length
def split_chunk(content, limit, header):
    """Split a section into numbered parts of at most ``limit`` characters.

    The lines in ``content`` are joined with spaces and cut at sentence
    boundaries (whitespace following ``.``, ``!`` or ``?``, optionally with a
    closing quote or bracket in between). Sentences are packed greedily into
    parts whose length, measured with ``len()``, stays within ``limit``. A
    single sentence longer than ``limit`` is kept whole as its own part.

    Args:
        content: List of text lines belonging to one section.
        limit: Maximum part length in characters.
        header: Section header used to label the parts.

    Returns:
        A list of ``("<header> Part <n>", text)`` tuples, numbered from 1;
        empty when ``content`` holds no text.
    """
    content_str = ' '.join(content).strip()
    if not content_str:
        return []

    # Two alternatives because Python lookbehinds must be fixed-width: a bare
    # sentence ender, or a sentence ender followed by one closing quote/bracket.
    sentences = re.split(r'(?<=[.!?])\s+|(?<=[.!?]["\u201d\u2019)\]])\s+', content_str)

    parts = []
    current = []
    current_length = 0

    for sentence in sentences:
        # +1 accounts for the joining space when the part is not empty.
        added = len(sentence) + (1 if current else 0)
        if current and current_length + added > limit:
            parts.append(' '.join(current))
            current = [sentence]
            current_length = len(sentence)
        else:
            current.append(sentence)
            current_length += added

    if current:
        parts.append(' '.join(current))

    return [(f"{header} Part {number}", text) for number, text in enumerate(parts, start=1)]

# Chunk the document based on identified headers
def chunk_document(line_items, header_fonts, limit):
    """Group lines under their headers and split sections that are too long.

    A line whose ``font_height`` is in ``header_fonts`` starts a section;
    following lines are collected under it (lines before the first header are
    discarded, and a repeated header text extends the existing section).
    Once all lines are collected, each section whose space-joined text is
    longer than ``limit`` characters is split as a whole with ``split_chunk``
    and stored under ``"<header> Part <n>"`` keys; shorter sections are stored
    under the plain header as their list of lines.

    Args:
        line_items: Objects exposing ``font_height`` and ``content``.
        header_fonts: Collection of font sizes that mark a header line.
        limit: Maximum section length in characters.

    Returns:
        A ``defaultdict(list)`` mapping each (possibly part-numbered) header
        to a list of strings.
    """
    # Pass 1: collect the complete text of every section. Splitting must see
    # the whole section, otherwise parts cannot be numbered consistently.
    sections = {}
    current_header = None
    for item in line_items:
        if item.font_height in header_fonts:
            current_header = item.content
            sections.setdefault(current_header, [])
        elif current_header:
            sections[current_header].append(item.content)

    # Pass 2: emit each section, split into parts where it exceeds the limit.
    chunks = defaultdict(list)
    for header, lines in sections.items():
        if len(' '.join(lines)) > limit:
            for part_header, text in split_chunk(lines, limit, header):
                chunks[part_header].append(text)
        else:
            chunks[header] = lines

    return chunks

# Main function to process the PDF and chunk based on headers
def process_pdf(file_path, limit):
    """Parse the PDF at ``file_path``, chunk it by headers and print a preview.

    Pipeline: extract lines, list the distinct font sizes, choose the header
    sizes, then chunk the lines with the given ``limit``. The font sizes, the
    chosen header sizes and the first three entries of every chunk are
    printed.

    Returns:
        Whatever ``chunk_document`` returns for the parsed lines.
    """
    parsed_lines = parse_pdf(file_path)

    font_sizes = get_list_of_fonts(parsed_lines)
    print("\nUnique Font Heights (sorted):", font_sizes)

    heading_sizes = classify_headers_by_font(parsed_lines, font_sizes)
    print("\nClassified Header Font Sizes:", heading_sizes)

    chunks = chunk_document(parsed_lines, heading_sizes, limit)

    # Preview: only the first three entries of each chunk are shown.
    for header in chunks:
        preview = chunks[header][:3]
        print(f"\n\nHeader: {header}\nContent: {preview}...")
    return chunks

# Example usage
# NOTE: despite the name used elsewhere in this cell, the limit is measured in
# characters: the chunking code compares len() of joined strings against it.
limit = 100  # Set your size limit (in characters) here
chunks = process_pdf('phil_book.pdf', limit)


this one doesn't split them at all

In [193]:
import tiktoken
import re
from pdfminer.layout import LAParams, LTTextBox, LTTextLine, LTChar
from pdfminer.high_level import extract_pages
from collections import defaultdict

class Tokenizer:
    """Counts tokens with a named tiktoken encoding."""

    def __init__(self, encoding_name='cl100k_base'):
        self.encoding_name = encoding_name

    def num_tokens_from_string(self, string):
        """Return how many tokens ``string`` encodes to under ``self.encoding_name``."""
        token_ids = tiktoken.get_encoding(self.encoding_name).encode(string)
        return len(token_ids)
        
class PDFProcessor:
    """Chunk a PDF by header lines, splitting sections over a token limit.

    Lines are extracted with pdfminer, the two largest font sizes are treated
    as headers, and the body lines under each header form one section. A
    section whose text exceeds ``token_limit`` tokens is split into
    ``"<header> (Part <n>)"`` entries.

    Relies on ``Tokenizer`` and ``DocumentLineItem`` (the six-argument variant
    with ``is_bold``) being defined elsewhere in the notebook.

    Args:
        file_path: Path of the PDF to process.
        token_limit: Maximum number of tokens per stored chunk.
    """

    def __init__(self, file_path, token_limit=512):
        self.file_path = file_path
        self.token_limit = token_limit
        self.tokenizer = Tokenizer()
        self.chunks = {}  # header -> list of chunk strings

    def parse_pdf(self):
        """Extract every text line of the PDF with its font metadata.

        Builds one ``DocumentLineItem`` per text line that contains at least
        one ``LTChar``. The font height is the rounded mean of the line's
        character sizes; a line is flagged bold when any character's font
        name contains "bold" (case-insensitive).

        Returns:
            A list of ``DocumentLineItem`` objects in layout iteration order.
        """
        line_items = []
        for page_layout in extract_pages(self.file_path, laparams=LAParams()):
            for element in page_layout:
                if not isinstance(element, LTTextBox):
                    continue
                for line in element:
                    if not isinstance(line, LTTextLine):
                        continue
                    chars = [char for char in line if isinstance(char, LTChar)]
                    if not chars:
                        # No character objects in this line: nothing to measure.
                        continue
                    content = line.get_text().strip()
                    avg_font_size = round(sum(char.size for char in chars) / len(chars))
                    is_bold = any("bold" in char.fontname.lower() for char in chars)
                    line_items.append(
                        DocumentLineItem(line.x0, line.y0, content, avg_font_size, len(content), is_bold)
                    )
        return line_items

    def get_list_of_fonts(self, line_items):
        """Return the distinct ``font_height`` values of ``line_items``, largest first."""
        return sorted({item.font_height for item in line_items}, reverse=True)

    def classify_headers_by_font(self, line_items, unique_fonts):
        """Return the two largest font sizes, which are treated as headers.

        ``line_items`` is accepted but not used.
        """
        return unique_fonts[:2]

    def split_paragraph_by_token_limit(self, paragraph):
        """Split ``paragraph`` into pieces of at most ``self.token_limit`` tokens.

        The text is split on newlines; a line that is itself over the limit
        is further split at sentence boundaries. The resulting units are
        packed greedily, and each candidate piece is measured as a whole so
        that the joining characters are counted too. Lines are re-joined with
        a newline, sentence fragments of one line with a single space. A
        single sentence longer than the limit is kept whole as its own piece.

        Returns:
            The pieces as a list of strings (``[""]`` for an empty paragraph).
        """
        count = self.tokenizer.num_tokens_from_string

        # Build (separator, text) units; the separator is what joins the unit
        # to the text before it when both end up in the same piece.
        units = []
        for line in paragraph.split('\n'):
            if count(line) <= self.token_limit:
                units.append(('\n', line))
                continue
            sentences = re.split(r'(?<=[.!?])\s+|(?<=[.!?]["\u201d\u2019)\]])\s+', line)
            for position, sentence in enumerate(sentences):
                units.append(('\n' if position == 0 else ' ', sentence))

        pieces = []
        current = None  # None means "no unit collected yet"
        for separator, unit in units:
            if current is None:
                current = unit
                continue
            candidate = current + separator + unit
            if count(candidate) > self.token_limit:
                # Close the piece before the unit that would overflow it.
                pieces.append(current)
                current = unit
            else:
                current = candidate

        if current is not None:
            pieces.append(current)

        return pieces

    def _store_section(self, header, lines):
        """Store one section in ``self.chunks``, split into parts if too long.

        Text for a header that was already stored is appended to that
        header's list rather than replacing it.
        """
        combined_content = " ".join(lines)
        if self.tokenizer.num_tokens_from_string(combined_content) <= self.token_limit:
            self.chunks.setdefault(header, []).append(combined_content)
            return

        # Split on the original line boundaries (newline-joined), then put the
        # spaces back so split and unsplit chunks use the same separator.
        pieces = self.split_paragraph_by_token_limit("\n".join(lines))
        for idx, piece in enumerate(pieces, start=1):
            new_header = f"{header} (Part {idx})"
            self.chunks.setdefault(new_header, []).append(piece.replace("\n", " "))

    def chunk_document(self, line_items, header_fonts):
        """Group lines under their headers and store them in ``self.chunks``.

        A line whose ``font_height`` is in ``header_fonts`` closes the
        current section and starts a new one. Lines before the first header
        are discarded. A header followed directly by another header is stored
        with an empty string; a trailing header with no lines is not stored.
        """
        current_header = None
        current_chunk_content = []

        for item in line_items:
            if item.font_height in header_fonts:
                if current_header:
                    self._store_section(current_header, current_chunk_content)
                current_header = item.content
                current_chunk_content = []
            elif current_header:
                current_chunk_content.append(item.content)

        # Flush the section that was still open when the lines ran out.
        if current_header and current_chunk_content:
            self._store_section(current_header, current_chunk_content)

    def display_font_metadata(self, line_items):
        """Print every item of ``line_items``, one per line."""
        for item in line_items:
            print(item)

    def process_pdf(self):
        """Parse and chunk the PDF, print a preview, and return the chunks.

        ``self.chunks`` is rebuilt from scratch on every call.

        Returns:
            ``self.chunks``: a dict mapping each (possibly part-numbered)
            header to a list of chunk strings.
        """
        line_items = self.parse_pdf()
        unique_fonts = self.get_list_of_fonts(line_items)
        header_fonts = self.classify_headers_by_font(line_items, unique_fonts)
        # Start clean so a second call does not append to the first result.
        self.chunks = {}
        self.chunk_document(line_items, header_fonts)
        for header, content in self.chunks.items():
            print(f"\n\nHeader: {header}\nContent: {content[:3]}...")  # Show first few entries of each chunk
        return self.chunks

# Example usage: chunk phil_book.pdf with a 512-token limit per chunk; the
# returned dict (header -> list of chunk strings) is kept in `chunks`.
pdf_processor = PDFProcessor('phil_book.pdf', token_limit=512)
chunks = pdf_processor.process_pdf()




Header: INTRODUCTION
Content: ['']...


Header: What Is Philosophy?
Content: ['The very question sounds philosophical, doesn’t it? But what exactly does that mean? What is philosophy? The  word  philosophy  means  “love  of  wisdom.”  Indeed,  it  is  a  love  of wisdom  that  guides  philosophers  to  explore  the  fundamental  questions about  who  we  are  and  why  we’re  here.  On  the  surface,  philosophy  is  a social science. But as you read this book, you’ll discover that it is so much more  than  that.  Philosophy  touches  on  every  subject  you  could  possibly think of. It’s not just a bunch of old Greek guys asking each other questions over and over again (though it has its fair share of that as well). Philosophy has very real applications; from the ethical questions raised in government policy to the logic forms required in computer programming, everything has its roots in philosophy. Through philosophy, we are able to explore concepts like the meaning of life,  know

In [197]:
# Debugging aid: parse the PDF and print every extracted line item (via its
# repr) to inspect the font metadata the header detection works from.
p = PDFProcessor('phil_book.pdf')
l = p.parse_pdf()
p.display_font_metadata(l)

Font Height: 27, Text: INTRODUCTION, Bold: True
Font Height: 21, Text: What Is Philosophy?, Bold: True
Font Height: 15, Text: The very question sounds philosophical, doesn’t it? But what exactly does, Bold: False
Font Height: 15, Text: that mean? What is philosophy?, Bold: False
Font Height: 15, Text: The  word  philosophy  means  “love  of  wisdom.”  Indeed,  it  is  a  love  of, Bold: False
Font Height: 15, Text: wisdom  that  guides  philosophers  to  explore  the  fundamental  questions, Bold: False
Font Height: 15, Text: about  who  we  are  and  why  we’re  here.  On  the  surface,  philosophy  is  a, Bold: False
Font Height: 15, Text: social science. But as you read this book, you’ll discover that it is so much, Bold: False
Font Height: 15, Text: more  than  that.  Philosophy  touches  on  every  subject  you  could  possibly, Bold: False
Font Height: 15, Text: think of. It’s not just a bunch of old Greek guys asking each other questions, Bold: False
Font Height: 15, Text: over 