# Chunking for Better DocQnA

## Queries

|Query|Expectation|Implementation Detail|
|---|---|---|
| Which information in a Manufacturer's Manual is irrelevant for writing an SOP? | <ul><li>Irrelevant sections can be dropped.</li><li>If the SOP contained ALL of the information in the Manufacturer's Manual, there would be no point in rewriting it.</li></ul> | Filter the Table of Contents and drop irrelevant pages |
| How does the procedure in the Manufacturer's Manual differ from the one in an SOP? | <ul><li>SOPs have pre-defined tasks that need to be carried out: user-specific experiments.</li><li>These tasks require combining the various sub-procedures specified by the manufacturer.</li></ul> | <ol><li>Identify sub-processes</li><li>Store sub-process summaries in metadata</li><li>Prompt the LLM with the experiment and the sub-process options</li></ol> |

In [318]:
import fitz
import re
import pandas as pd
import json
import os
import shutil
from IPython.display import display, HTML
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.utilities import PythonREPL

from dotenv import load_dotenv

In [2]:
if load_dotenv('openai.env'):
    bg, tx, msg = "lightgreen", "darkgreen", "OpenAI environment loaded successfully"
else:
    bg, tx, msg = "#FFC0CB", "#8B0000", "Could not find <code>openai.env</code> file."
display(HTML(f"""<div style="background-color: {bg}; color: {tx}; border-radius: 10px; padding: 10px;">
{msg}
</div>
"""))

In [3]:
model = ChatOpenAI()

In [4]:
doc = fitz.open("data/Akta_Pure_User_Manual.PDF")

In [5]:
pages = []
for page in doc.pages(start = 1, stop = 10):
    pages.append(page)

## Extract Table of Contents

In [6]:
contents_text = pages[0].get_text()
pattern = r'(\d+(\.\d+)*)\s+(.*?)\s+(\d+)'
matches = re.findall(pattern, contents_text)
table_of_contents = []
for match in matches:
    number = match[0]
    heading = re.sub(r'\.{2,}', '', match[2])  # raw string avoids an invalid-escape warning
    page_number = match[3]
    table_of_contents.append({"Index": number, "Heading": heading})

table_of_contents = pd.DataFrame(table_of_contents)
table_of_contents.head()

Unnamed: 0,Index,Heading
0,1.0,Introduction
1,1.1,Important user information
2,1.2,ÄKTA pure overview
3,1.3,ÄKTA pure user documentation
4,2.0,The ÄKTA pure instrument
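
The pattern above stops the heading at the first whitespace followed by digits, so a heading that itself contains a number would be cut short. A minimal sketch of a stricter, line-anchored variant follows. It assumes each contents entry sits on one line and uses a dot leader before the page number; the `sample` text and the `TOC_LINE` name are hypothetical, and the real manual's layout may differ.

```python
import re

# Hypothetical sample of contents-page text (assumed layout, not taken from the PDF).
sample = (
    "1 Introduction ........ 6\n"
    "1.1 Important user information ........ 7\n"
    "1.2 ÄKTA pure overview ........ 9\n"
)

# One entry per line: section number, heading, dot leader, page number.
TOC_LINE = re.compile(
    r"^(\d+(?:\.\d+)*)[ \t]+(.+?)[ \t]*\.{2,}[ \t]*(\d+)[ \t]*$",
    re.MULTILINE,
)

entries = [
    {"Index": number, "Heading": heading, "Page": int(page)}
    for number, heading, page in TOC_LINE.findall(sample)
]
for entry in entries:
    print(entry)
```

Anchoring on the dot leader keeps digits inside a heading from being mistaken for the page number, at the cost of skipping entries that have no leader.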


Easy alternative: PyMuPDF can read the PDF's embedded outline (bookmarks) directly with `doc.get_toc()`.

In [7]:
table_of_contents = doc.get_toc()
table_of_contents = pd.DataFrame(table_of_contents, columns = ['Level', 'Title', 'Page Number'])
table_of_contents.head()

Unnamed: 0,Level,Title,Page Number
0,1,Coverpage,1
1,1,Table of Contents,2
2,1,1 Introduction,6
3,2,1.1 Important user information,7
4,2,1.2 ÄKTA pure overview,9


In [8]:
table_of_contents['End Page'] = table_of_contents['Page Number'].shift(-1) - 1

In [9]:
level_wise_ends = []
max_level = table_of_contents.Level.max()
for level in range(2, max_level+2):
    level_wise_ends.append(table_of_contents[table_of_contents.Level.apply(lambda l: l < level)]['Page Number'].shift(-1) - 1)

In [10]:
section_end_pages = pd.Series(dtype='float')
for series in level_wise_ends:
    section_end_pages = section_end_pages.combine_first(series)

In [11]:
table_of_contents['End Page'] = section_end_pages

In [12]:
table_of_contents.head(30)

Unnamed: 0,Level,Title,Page Number,End Page
0,1,Coverpage,1,1.0
1,1,Table of Contents,2,5.0
2,1,1 Introduction,6,12.0
3,2,1.1 Important user information,7,8.0
4,2,1.2 ÄKTA pure overview,9,10.0
5,2,1.3 ÄKTA pure user documentation,11,12.0
6,1,2 The ÄKTA pure instrument,13,85.0
7,2,2.1 Overview illustrations,14,24.0
8,2,2.2 Liquid flow path,25,26.0
9,2,2.3 Instrument control panel,27,31.0
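
The rule behind the `End Page` column can be stated without pandas: a section ends one page before the next entry at the same or a shallower level. The sketch below is an illustrative plain-Python restatement of that rule, not the notebook's implementation; `section_end_pages` and `last_page` are hypothetical names, and the toy ToC reuses the first rows shown above.

```python
from typing import List, Tuple


def section_end_pages(toc: List[Tuple[int, str, int]], last_page: int) -> List[int]:
    """Return the end page of every (level, title, start_page) entry.

    A section ends one page before the next entry at the same or a
    shallower level; entries with no such successor run to `last_page`.
    """
    ends = []
    for i, (level, _title, _start) in enumerate(toc):
        end = last_page
        for next_level, _next_title, next_start in toc[i + 1:]:
            if next_level <= level:
                end = next_start - 1
                break
        ends.append(end)
    return ends


toc = [
    (1, "Coverpage", 1),
    (1, "Table of Contents", 2),
    (1, "1 Introduction", 6),
    (2, "1.1 Important user information", 7),
    (2, "1.2 ÄKTA pure overview", 9),
    (2, "1.3 ÄKTA pure user documentation", 11),
    (1, "2 The ÄKTA pure instrument", 13),
]
# last_page=85 is the end of chapter 2 in the table above; the toy ToC stops there.
ends = section_end_pages(toc, last_page=85)
for (_level, title, start), end in zip(toc, ends):
    print(f"{title}: {start}-{end}")
```

When two entries start on the same page, this rule gives an end page one less than the start page; clamping with `max(start, end)` is one way to handle that case.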


## Extract Images

In [60]:
IMG_DIR = 'images'
os.makedirs(IMG_DIR, exist_ok=True)

In [61]:
for page_index in range(len(doc)):
    page = doc[page_index]
    image_list = page.get_images()

    for image_index, img in enumerate(image_list, start=1):
        xref = img[0]  # cross-reference number of the image object
        pix = fitz.Pixmap(doc, xref)

        # Convert CMYK (and other non-RGB) pixmaps to RGB before saving as PNG
        if pix.n - pix.alpha > 3:
            pix = fitz.Pixmap(fitz.csRGB, pix)

        pix.save(f"{IMG_DIR}/page_{page_index}-image_{image_index}.png")
        pix = None
print(f"Found {len(os.listdir(IMG_DIR))} images. Saved all under {IMG_DIR}/")

Found 732 images. Saved all under images/


## Extract Tables

In [90]:
from pprint import pprint

page = doc[9]
print(page.get_text()[0:200])
print('\n','-'*50)

tabs = page.find_tables()
print(f"{len(tabs.tables)} tables found on {page}\n", '-'*50)
pd.DataFrame(tabs[0].extract()[1:], columns = tabs[0].extract()[0])

Main functions
Module
Create and edit methods using one or a combination of:
Method Editor
•
Predefined methods with built-in application support
•
Drag-and-drop function to build methods with relevan

 --------------------------------------------------
1 tables found on page 9 of data/Akta_Pure_User_Manual.PDF
 --------------------------------------------------


Unnamed: 0,Module,Main functions
0,Method Editor,Create and edit methods using one or a combina...
1,System Control,"Start, monitor and control runs. The current f..."
2,Evaluation,"Open results, evaluate runs and create reports..."
3,,


In [153]:
TABLE_DIR = 'tables'
os.makedirs(TABLE_DIR, exist_ok=True)
os.makedirs(os.path.join(TABLE_DIR, '.temp'), exist_ok=True)

In [154]:
def extract_and_save_tables(doc, output_directory = os.path.join(TABLE_DIR, '.temp')): 
    count = 0
    for page_num in range(doc.page_count):
        page = doc[page_num]
        tabs = page.find_tables()
        if len(tabs.tables) > 0:
            for idx, tab in enumerate(tabs):
                table_data = tab.extract()
                header, *rows = table_data
    
                # Convert table data to DataFrame
                df = pd.DataFrame(rows, columns=header)
    
                # Save DataFrame as CSV
                table_filename = f"table_page_{page_num + 1}_idx_{idx + 1}.csv"
                table_path = os.path.join(output_directory, table_filename)
                if df.shape[0]>0:
                    df.to_csv(table_path, index=False)
                    count+=1
    print(f"Saved {count} tables")

In [158]:
def merge_and_save_tables(output_directory):
    input_directory = os.path.join(output_directory, '.temp')
    table_files = sorted(os.listdir(input_directory))
    queue = []

    for table_file in table_files:
        table_path = os.path.join(input_directory, table_file)
        df = pd.read_csv(table_path)

        # Keep queueing while the header matches the first table in the queue;
        # a different header means the queued run is complete, so flush it.
        if not queue or list(df.columns) == list(queue[0].columns):
            queue.append(df)
        else:
            merged_table = pd.concat(queue, ignore_index=True)
            merged_filename = f"merged_{table_file}"
            merged_table_path = os.path.join(output_directory, merged_filename)
            merged_table.to_csv(merged_table_path, index=False)

            queue = [df]

    if queue:
        merged_table = pd.concat(queue, ignore_index=True)
        merged_filename = f"merged_last_{table_file}"
        merged_table_path = os.path.join(output_directory, merged_filename)
        merged_table.to_csv(merged_table_path, index=False)

    shutil.rmtree(input_directory)
    print(f"{len(os.listdir(output_directory))} tables in repository after merging")

In [156]:
extract_and_save_tables(doc)

Saved 402 tables


In [159]:
merge_and_save_tables(TABLE_DIR)

180 tables in repository after merging
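
The merge step treats consecutive tables with an identical header as fragments of one table that was split across pages. Below is a minimal in-memory sketch of that idea, assuming the tables arrive in page order. Note that `sorted(os.listdir(...))` sorts file names lexicographically, so `table_page_10_...` comes before `table_page_2_...`. The `merge_consecutive_tables` helper and the sample rows are hypothetical.

```python
from itertools import groupby
from typing import List, Tuple

Table = Tuple[List[str], List[List[str]]]  # (header, rows)


def merge_consecutive_tables(tables: List[Table]) -> List[Table]:
    """Merge runs of adjacent tables that share an identical header."""
    merged = []
    # groupby only groups neighbours, so non-adjacent tables stay separate.
    for header, run in groupby(tables, key=lambda table: tuple(table[0])):
        rows = [row for _header, table_rows in run for row in table_rows]
        merged.append((list(header), rows))
    return merged


# Hypothetical fragments: one table split over two pages, then a different table.
tables = [
    (["Module", "Main functions"], [["Method Editor", "Create and edit methods"]]),
    (["Module", "Main functions"], [["System Control", "Start, monitor and control runs"]]),
    (["Part", "Function"], [["Pump", "Delivers buffer"]]),
]
merged = merge_consecutive_tables(tables)
for header, rows in merged:
    print(header, len(rows))
```

Matching on the header alone can also join two unrelated tables that happen to use the same column names, so a page-adjacency check may be worth adding.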


## Extract Text

### Identify relevant pages

In [292]:
prompt = f'''
Below is the table of contents for my document:
```
{table_of_contents[table_of_contents.Level.apply(lambda l: l<3)]['Title'].to_markdown(index = False)}
```
I want to only keep the actual useful content for further processing.
Most documents have useless sections at the beginning and at the end like covers, index, contents, preface, acknowledgements, promotions, catalogs, appendix etc.
The useless contents are usually at the beginning and at the end of the document.
Can you return the list of such useless titles in the contents I provided that I can drop off? 
Only respond with a comma separated list of Titles. Don't write anything else.
'''

In [303]:
# response = model.predict(prompt)  # commented out to avoid unnecessary API calls
response = 'Coverpage, Table of Contents, Index'  # response recorded from an earlier run

In [297]:
titles_to_drop = list(map(lambda s: s.strip(), response.split(',')))

In [304]:
filtered_table_of_contents = table_of_contents[~table_of_contents['Title'].isin(titles_to_drop)]

start_page = filtered_table_of_contents['Page Number'].iloc[0]
end_page = filtered_table_of_contents['End Page'].iloc[-1]

print(f"Range of useful pages: {start_page} - {end_page}")

Range of useful pages: 6 - 512.0
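
The end of the range prints as `512.0` because the `End Page` column is a float column. The sketch below shows the same filter-then-range step on plain tuples, with integer pages and a slightly more defensive parse of the model's reply. `parse_titles`, `useful_page_range` and the toy page numbers for `Index` are hypothetical.

```python
from typing import List, Tuple


def parse_titles(response: str) -> List[str]:
    """Split a comma-separated reply into titles, dropping blanks and stray quotes."""
    return [part.strip().strip("'\"") for part in response.split(",") if part.strip()]


def useful_page_range(
    toc: List[Tuple[str, int, int]], titles_to_drop: List[str]
) -> Tuple[int, int]:
    """Return (first start page, last end page) of the entries that are kept."""
    kept = [(start, end) for title, start, end in toc if title not in titles_to_drop]
    if not kept:
        raise ValueError("every ToC entry was dropped")
    return kept[0][0], kept[-1][1]


# Toy ToC of (title, start page, end page); the Index pages are made up.
toc = [
    ("Coverpage", 1, 1),
    ("Table of Contents", 2, 5),
    ("1 Introduction", 6, 12),
    ("2 The ÄKTA pure instrument", 13, 85),
    ("Index", 86, 90),
]
titles = parse_titles("Coverpage, Table of Contents, Index")
start_page, end_page = useful_page_range(toc, titles)
print(f"Range of useful pages: {start_page} - {end_page}")
```

Splitting on commas breaks for titles that contain a comma; asking the model for one title per line or for a JSON list would avoid that.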


### Extract text bodies

Since all pages have headers and footers, the front end needs an additional step in which the user selects the page area that contains the relevant text, so that extraction is clean.<br>
For development purposes, the bounding box is hard-coded: it skips 10% of the page height at the top and 5% of the page height at the bottom.
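
As a rough illustration of the arithmetic, the clip box can be computed from any page rectangle as below. This sketch uses plain tuples instead of `fitz.Rect` and assumes the origin is at the top-left with `y` growing downward; `body_box` and the A4 dimensions are hypothetical.

```python
from typing import Tuple

Box = Tuple[float, float, float, float]  # (x0, y0, x1, y1), origin at the top-left


def body_box(page_box: Box, top_fraction: float = 0.10, bottom_fraction: float = 0.05) -> Box:
    """Shrink a page box vertically to exclude the header and footer bands."""
    if top_fraction < 0 or bottom_fraction < 0 or top_fraction + bottom_fraction >= 1:
        raise ValueError("header and footer fractions must leave some page body")
    x0, y0, x1, y1 = page_box
    height = y1 - y0  # measured once, before either edge moves
    return (x0, y0 + top_fraction * height, x1, y1 - bottom_fraction * height)


# Hypothetical A4 page measured in points.
clip = body_box((0.0, 0.0, 595.0, 842.0))
print(clip)
```

Both offsets are taken from the original page height, so the two fractions stay independent of each other.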

In [81]:
page_bbox = doc[0].bound()

In [82]:
page_height = page_bbox.height
page_width = page_bbox.width

In [375]:
x0, y0, x1, y1 = page_bbox

In [376]:
page_without_header_footer = fitz.Rect(x0, y0+0.1*page_height, x1, y1-0.05*page_height)

In [377]:
page = doc[236]

In [378]:
fulltext = page.get_text()
filteredtext = page.get_text(clip = page_without_header_footer)
print(fulltext.replace(filteredtext, ''))

ÄKTA pure User Manual 29119969 AC
237
6 Performance tests
6.6 Fraction collector F9-C test



The check above confirms that the bounding box filters out the headers and footers: removing the clipped text from the full page text leaves only the header and footer lines.

In [379]:
def extract_text_bodies(start_page: int, end_page: int = None, page_bbox: fitz.Rect = page_without_header_footer, doc: fitz.Document = doc):
    all_text_outside_tables = []

    for page_num in range(start_page, (start_page if end_page is None else end_page)+1):
        page = doc[page_num]
        all_text = page.get_text("text", clip = page_bbox)
        table_texts = []
    
        for table in page.find_tables():
            table_bbox = table.bbox
            table_text = page.get_textbox(table_bbox)
            table_texts.append(table_text)
    
        for table_text in table_texts:
            all_text = all_text.replace(table_text, '')
    
        all_text_outside_tables.append(all_text)
    
    return '\n'.join(all_text_outside_tables)

In [422]:
table_of_contents.head(10)

Unnamed: 0,Level,Title,Page Number,End Page
0,1,Coverpage,1,1.0
1,1,Table of Contents,2,5.0
2,1,1 Introduction,6,12.0
3,2,1.1 Important user information,7,8.0
4,2,1.2 ÄKTA pure overview,9,10.0
5,2,1.3 ÄKTA pure user documentation,11,12.0
6,1,2 The ÄKTA pure instrument,13,85.0
7,2,2.1 Overview illustrations,14,24.0
8,2,2.2 Liquid flow path,25,26.0
9,2,2.3 Instrument control panel,27,31.0


In [None]:
print(extract_text_bodies(6,6))
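
One thing to watch when passing ToC values to `extract_text_bodies`: `doc.get_toc()` reports 1-based page numbers, while `doc[n]` is 0-based (the earlier check on `doc[236]` printed the footer `237`). A call such as `extract_text_bodies(6, 6)` therefore reads the seventh page. A small conversion helper, sketched here under the hypothetical name `toc_pages_to_indices`, keeps the two conventions apart.

```python
def toc_pages_to_indices(start_page: int, end_page: int) -> range:
    """Convert an inclusive 1-based ToC page range to 0-based page indices."""
    if start_page < 1 or end_page < start_page:
        raise ValueError("expected 1 <= start_page <= end_page")
    return range(start_page - 1, end_page)


# "1 Introduction" spans ToC pages 6-12 in the table above.
indices = toc_pages_to_indices(6, 12)
print(list(indices))
```

Iterating over the returned range inside the extraction loop would replace the `range(start_page, end_page + 1)` expression.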

## What Next?

In [153]:
from typing import Dict, List, Optional
import atexit

In [239]:
class _RichDocument:
    '''A Class to load a rich PDF document having metadata'''
    def __init__(self, path: str, header_footer_area: tuple = (0.1, 0.1)):
        '''
        path = <str> -> document path
        header_footer_area = (0.1, 0.1) -> fractions of the page height ignored at the top and bottom margins (10% each by default)
        '''
        self.doc = fitz.open(path)
        atexit.register(self.__del__)

        self.filename = os.path.basename(path)
        self.toc = self._get_table_of_contents()
        _bbox = self.doc[0].bound()
        _height = _bbox.height  # capture first: height is derived from y0 and y1
        _bbox.y0 += _height * header_footer_area[0]
        _bbox.y1 -= _height * header_footer_area[1]
        self.head_footer_clipping = fitz.Rect(_bbox)

    def __len__(self):
        return len(self.doc)
    
    def __del__(self):
        self.doc.close()
        return "file closed"
        
    def _get_table_of_contents(self):
        toc = pd.DataFrame(self.doc.get_toc(), columns = ['Level', 'Title', 'Start Page'])
        toc['End Page'] = toc['Start Page'].shift(-1) - 1

        level_wise_ends = []
        max_level = toc.Level.max()
        for level in range(2, max_level+2):
            level_wise_ends.append(toc[toc.Level.apply(lambda l: l < level)]['Start Page'].shift(-1) - 1)
        
        section_end_pages = pd.Series(dtype='float')
        for series in level_wise_ends:
            section_end_pages = section_end_pages.combine_first(series)

        section_end_pages.at[section_end_pages.index[-1]] = len(self.doc)
        
        toc['End Page'] = section_end_pages.astype('int32')
        toc['Level'] = toc['Level'].astype('int32')
        return toc

    def fetch_content(self,
                      start_page: int, 
                      end_page: int = None,
                      table_format: Optional[str] = 'csv',
                     ):
        all_content = []
    
        for page_num in range(start_page, (start_page if end_page is None else end_page)+1):
            page = self.doc[page_num]
            all_text = page.get_text("text", clip = self.head_footer_clipping)
            table_texts = []
            table_contents = []
        
            for table in page.find_tables():
                table_bbox = table.bbox
                table_text = page.get_textbox(table_bbox)
                table_texts.append(table_text)
                
                if table_format.lower() == 'csv':
                    table_contents.append(table.to_pandas().to_csv(index=False))
                if table_format.lower() == 'markdown':
                    table_contents.append(table.to_pandas().to_markdown(index=False))    
        
            for table_text, table_content in zip(table_texts,table_contents):
                all_text = all_text.replace(table_text, f"\n\n<<{table_format.upper()}_TABLE>>\n{table_content}\n<<{table_format.upper()}_TABLE>>\n\n")
        
            all_content.append(all_text)
        
        return '\n'.join(all_content)

In [293]:
class ReferenceDocs:
    def __init__(self, 
                 pathlist: List[str],
                 boundingBoxes: Optional[List[tuple]] = None,
                ):
        self.docs = [_RichDocument(path) for path in pathlist]
        self.filenames = [doc.filename for doc in self.docs]
        toc_list = []
        for i, doc in enumerate(self.docs):
            toc_list.append(pd.DataFrame(data=[[0, self.filenames[i], 1, len(doc)]], columns=doc.toc.columns))
            toc_list.append(doc.toc)
        self.__toc = pd.concat(toc_list, ignore_index=True)

    def get_toc(self, filters: Optional[Dict] = None):
        '''
        filters = {"sections": List[<filenames>],
                   "level
        '''
        if filters is None:
            return self.__toc
        
            
    def __str__(self):
        return 'Documents in instance:\n--> '+'\n--> '.join(self.filenames)

In [277]:
mans = ReferenceDocs(['./data/Akta_Pure_User_Manual.PDF', './data/UNICORN7_System_Control_Manual.pdf'])

In [278]:
print(mans)

Documents in instance:
--> Akta_Pure_User_Manual.PDF
--> UNICORN7_System_Control_Manual.pdf


## Add a ToC Scoping Function

### How would the GPT system flow?
1. Here is the job, here are the docs. First, let's get some context: which page ranges would you like to look into? <Pass up to level 1>
2. 

`Find Title` -> `Save Starting Index` -> `iterrows until the next instance of the same level` -> `Save Ending Index` -> `Filter out index range` -> `Filter out `
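
The flow above can be sketched on a plain list of rows as follows. This is one possible reading, not the final design: it stops at the next row of the same *or a shallower* level, so a section that is the last child of its parent does not swallow the following chapter. `scope_toc` is a hypothetical helper; the rows are taken from the UNICORN manual's ToC.

```python
from typing import Dict, List


def scope_toc(rows: List[Dict], title: str) -> List[Dict]:
    """Return the row titled `title` plus every row nested beneath it."""
    # Find the title and save the starting index.
    start = next((i for i, row in enumerate(rows) if row["Title"] == title), None)
    if start is None:
        return []
    level = rows[start]["Level"]
    # Walk forward until the next sibling or shallower heading; that is the end index.
    end = len(rows)
    for i in range(start + 1, len(rows)):
        if rows[i]["Level"] <= level:
            end = i
            break
    return rows[start:end]


rows = [
    {"Level": level, "Title": title}
    for level, title in [
        (0, "UNICORN7_System_Control_Manual.pdf"),
        (1, "Coverpage"),
        (1, "1 Introducing UNICORN System Control"),
        (2, "1.1 About this manual"),
        (2, "1.2 About System Control of UNICORN"),
        (2, "1.3 Important user information"),
        (2, "1.4 Associated documentation"),
        (1, "2 General UNICORN operation"),
        (2, "2.1 Log on and log off routines"),
        (2, "2.2 Help functions"),
    ]
]
scoped = scope_toc(rows, "1 Introducing UNICORN System Control")
for row in scoped:
    print(row["Level"], row["Title"])
```

Because titles such as `Coverpage` repeat across documents, a real implementation would probably need the file name as well as the title to pick the starting row.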

In [309]:
df = mans.get_toc()

In [319]:
df[df.Level <= 1]

Unnamed: 0,Level,Title,Start Page,End Page
0,0,Akta_Pure_User_Manual.PDF,1,519
1,1,Coverpage,1,1
2,1,Table of Contents,2,5
3,1,1 Introduction,6,12
7,1,2 The ÄKTA pure instrument,13,85
29,1,3 ÄKTA pure external modules,86,122
44,1,4 System configuration,123,147
60,1,5 Operation,148,223
82,1,6 Performance tests,224,250
93,1,7 Maintenance,251,354


In [311]:
df[df.Level == 0]

Unnamed: 0,Level,Title,Start Page,End Page
0,0,Akta_Pure_User_Manual.PDF,1,519
191,0,UNICORN7_System_Control_Manual.pdf,1,52


In [312]:
def split_dataframe_at_indices(dataframe, indices):
    """
    Split a DataFrame at specified indices.
    
    Parameters:
        dataframe (pd.DataFrame): Input DataFrame.
        indices (list): List of indices where the DataFrame should be split.
        
    Returns:
        list: List of DataFrames split at specified indices.
    """
    sorted_indices = sorted(indices)
    if not sorted_indices:
        return [dataframe]
    split_dataframes = []
    if sorted_indices[0] != 0:
        split_dataframes.append(dataframe.iloc[:sorted_indices[0]])
    for i in range(len(sorted_indices) - 1):
        split_dataframes.append(dataframe.iloc[sorted_indices[i]:sorted_indices[i + 1]])
    # Always keep the tail, even when the last split index is the final row.
    split_dataframes.append(dataframe.iloc[sorted_indices[-1]:])
    return split_dataframes

In [315]:
splits = split_dataframe_at_indices(df, [0,191])

In [317]:
splits[1]

Unnamed: 0,Level,Title,Start Page,End Page
191,0,UNICORN7_System_Control_Manual.pdf,1,52
192,1,Coverpage,1,4
193,1,1 Introducing UNICORN System Control,5,9
194,2,1.1 About this manual,6,6
195,2,1.2 About System Control of UNICORN,7,7
196,2,1.3 Important user information,8,8
197,2,1.4 Associated documentation,9,9
198,1,2 General UNICORN operation,10,14
199,2,2.1 Log on and log off routines,11,12
200,2,2.2 Help functions,13,14


In [294]:
def create_nested_structure(dataframe, level=0, parent=None):
    nested_dict = {}
    subset = dataframe[dataframe['Level'] == level]
    for index, row in subset.iterrows():
        section_info = {
            'Start Page': row['Start Page'],
            'End Page': row['End Page']
        }
        if level < max(dataframe['Level']):
            subsections = create_nested_structure(dataframe, level + 1, parent=row['Title'])
            section_info.update(subsections)
        nested_dict[row['Title']] = section_info
    return nested_dict

nested_structure = create_nested_structure(mans.get_toc())

In [263]:
def filter_subsections(nested_structure, titles):
    """
    Filter subsections based on provided titles.
    
    Args:
    nested_structure (dict): Nested dictionary representing the hierarchical structure.
    titles (str or list): Title or list of titles to filter the subsections.
    
    Returns:
    pd.DataFrame: DataFrame containing the filtered subsections.
    """
    if isinstance(titles, str):
        # Convert single title to list for uniform processing
        titles = [titles]
    
    filtered_subsections = nested_structure
    for title in titles:
        if title in filtered_subsections:
            filtered_subsections = filtered_subsections[title]
        else:
            # Title not found, return an empty DataFrame
            return pd.DataFrame()
    
    # Handle leaf nodes (page numbers)
    if not isinstance(filtered_subsections, dict):
        return pd.DataFrame([{'Title': titles[-1], 'Start Page': filtered_subsections, 'End Page': filtered_subsections}])
    
    # Convert the filtered nested dictionary to DataFrame
    data = []
    for section, subsections in filtered_subsections.items():
        if isinstance(subsections, dict):
            row = {'Title': section, 'Start Page': subsections.get('Start Page', None), 'End Page': subsections.get('End Page', None)}
            data.append(row)
    
    filtered_df = pd.DataFrame(data)
    return filtered_df

In [267]:
# Example usage
filtered_df = filter_subsections(nested_structure, ['Akta_Pure_User_Manual.PDF',])
filtered_df

Unnamed: 0,Title,Start Page,End Page
0,Coverpage,1,4
1,Table of Contents,2,5
2,1 Introduction,6,12
3,2 The ÄKTA pure instrument,13,85
4,3 ÄKTA pure external modules,86,122
5,4 System configuration,123,147
6,5 Operation,148,223
7,6 Performance tests,224,250
8,7 Maintenance,251,354
9,8 Troubleshooting,355,409
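
In the output above, `Coverpage` is reported as pages 1-4, which is the UNICORN manual's cover range; the combined ToC lists 1-1 for the ÄKTA manual. The likely cause is that `create_nested_structure` never uses its `parent` argument: each level selects rows from every document, so sections with the same title overwrite each other. Below is a minimal sketch of a parent-aware alternative that walks the rows in order and keeps a stack of open sections. `nest_toc` is a hypothetical name, and the rows are a shortened copy of the combined ToC.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str, int, int]  # (level, title, start page, end page)


def nest_toc(rows: List[Row]) -> Dict:
    """Build a nested dict in which each section holds only its own children."""
    root: Dict = {}
    stack = [(-1, root)]  # (level, dict that deeper rows attach to)
    for level, title, start, end in rows:
        # Close sections at the same or a deeper level to find this row's parent.
        while stack[-1][0] >= level:
            stack.pop()
        node = {"Start Page": start, "End Page": end}
        stack[-1][1][title] = node
        stack.append((level, node))
    return root


rows = [
    (0, "Akta_Pure_User_Manual.PDF", 1, 519),
    (1, "Coverpage", 1, 1),
    (1, "Table of Contents", 2, 5),
    (0, "UNICORN7_System_Control_Manual.pdf", 1, 52),
    (1, "Coverpage", 1, 4),
]
nested = nest_toc(rows)
print(nested["Akta_Pure_User_Manual.PDF"]["Coverpage"])
print(nested["UNICORN7_System_Control_Manual.pdf"]["Coverpage"])
```

The result has the same shape that `filter_subsections` expects (`Start Page`, `End Page`, then child titles as keys), so it could be swapped in without changing the lookup code.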
