In [1]:
# this package changes part of pydantic and need to be installed first
%pip install llama-index-llms-anthropic==0.4.0
%pip install llama-index-extractors-entity==0.3.1
!pip install llama-index==0.12.5
!pip install python-docx==1.1.2
!pip install python-pptx==0.6.23
!pip install openpyxl==3.1.4
!pip install PyPDF2==3.0.1
!pip install docx2txt
!sudo apt-get update
!sudo apt-get install libgl1-mesa-glx -y
!sudo apt-get update
!sudo apt install ghostscript python3-tk -y
!pip install ghostscript
!pip install llama-index-readers-pdf-table
!pip install pymupdf

Collecting llama-index-llms-anthropic==0.4.0
  Downloading llama_index_llms_anthropic-0.4.0-py3-none-any.whl.metadata (5.8 kB)
Collecting anthropic>=0.39.0 (from anthropic[bedrock,vertex]>=0.39.0->llama-index-llms-anthropic==0.4.0)
  Downloading anthropic-0.46.0-py3-none-any.whl.metadata (23 kB)
Collecting llama-index-core<0.12.0,>=0.11.0 (from llama-index-llms-anthropic==0.4.0)
  Downloading llama_index_core-0.11.23-py3-none-any.whl.metadata (2.5 kB)
Collecting boto3>=1.28.57 (from anthropic[bedrock,vertex]>=0.39.0->llama-index-llms-anthropic==0.4.0)
  Downloading boto3-1.36.24-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore>=1.31.57 (from anthropic[bedrock,vertex]>=0.39.0->llama-index-llms-anthropic==0.4.0)
  Downloading botocore-1.36.24-py3-none-any.whl.metadata (5.7 kB)
Collecting dataclasses-json (from llama-index-core<0.12.0,>=0.11.0->llama-index-llms-anthropic==0.4.0)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting dirtyjson<2.0.0,>=1.0.8

In [3]:
file_folder = './example'
image_folder = './images'

## Ingestion of text and table, extraction of images

In [4]:
from llama_index.core import SimpleDirectoryReader
from llama_index.readers.pdf_table import PDFTableReader
import docx2txt
from docx import Document as DocxDocument
import openpyxl
from pptx import Presentation
import PyPDF2
import textwrap
from PIL import Image as PILImage
from io import BytesIO
from pathlib import Path

from llama_index.core.schema import Document

import os
import fitz
import io

class ChunkedDocument(Document):
    @classmethod
    def from_document(cls, doc: Document, chunk_size: int = 1000, overlap: int = 100):
        chunks = textwrap.wrap(doc.text, chunk_size, break_long_words=False, replace_whitespace=False)
        chunked_docs = []

        for i, chunk in enumerate(chunks):
            if i > 0:
                # Add overlap from previous chunk
                chunk = chunks[i-1][-overlap:] + chunk
            if i < len(chunks) - 1:
                # Add overlap to next chunk
                chunk = chunk + chunks[i+1][:overlap]

            chunked_doc = cls(text=chunk, extra_info=doc.extra_info.copy())
            chunked_doc.extra_info['chunk_id'] = i
            chunked_docs.append(chunked_doc)

        return chunked_docs

class DocxReader:
    def load_data(self, file, extra_info=None):
        try:
            with open(file, 'rb') as f:
                doc = DocxDocument(f)
                full_text = '\n'.join(paragraph.text for paragraph in doc.paragraphs if paragraph.text)
                metadata = {
                    'title': doc.core_properties.title,
                    'filename': os.path.basename(file),
                    'author': doc.core_properties.author,
                    'keywords': doc.core_properties.keywords,
                    'created': doc.core_properties.created.isoformat() if doc.core_properties.created else None,
                    'modified': doc.core_properties.modified.isoformat() if doc.core_properties.modified else None,
                    'type': 'text'
                }
                all_documents = [Document(text=full_text, extra_info=metadata)]

                # extract tables from docs
                metadata['type'] = 'table'
                for table in doc.tables:
                    table_content = []
                    for row in table.rows:
                        row_data = [cell.text for cell in row.cells]
                        table_content.append(','.join(row_data))
                    all_documents.append(Document(text='\n'.join(table_content), extra_info=metadata))

                # extract images from docx
                rels = doc.part.rels
                for rel in rels:
                    if "image" in rels[rel].target_ref:
                        img = rels[rel].target_part
                        img_data = img.blob
                        file_name = os.path.basename(file)
                        img_filename = file_name + '_' + os.path.basename(img.partname)
                        image = Image.open(BytesIO(img_data))
                        if image.width >= 100 and image.height >= 100:
                            with open(os.path.join(image_folder, img_filename), "wb") as f:
                                f.write(img_data)

                return all_documents
        except Exception as e:
            print(f"Erreur lors de la lecture du fichier DOCX {file}: {e}")
            pass

class XlsxReader:
    def load_data(self, file, extra_info=None):
        try:
            with open(file, 'rb') as f:
                wb = openpyxl.load_workbook(f)
                text = []
                for sheet in wb:
                    for row in sheet.iter_rows(values_only=True):
                        text.append(' '.join(str(cell) for cell in row if cell is not None))
                metadata = {
                    'title': wb.properties.title,
                    'filename': os.path.basename(file),
                    'creator': wb.properties.creator,
                    'created': wb.properties.created.isoformat() if wb.properties.created else None,
                    'modified': wb.properties.modified.isoformat() if wb.properties.modified else None,
                    'sheet_names': wb.sheetnames,
                    'type': 'text'
                }
                # extract images
                # TODO: wmf image can not be extracted
                image_count = 0
                for sheet_name in wb.sheetnames:
                    sheet = wb[sheet_name]
                    for image in sheet._images:
                        img_bytes = image._data()
                        img_pil = PILImage.open(io.BytesIO(img_bytes))
                        image_path = os.path.join(image_folder, f'{os.path.basename(file)}_image{image_count}.png')
                        img_pil.save(image_path)
                        image_count += 1

                return [Document(text='\n'.join(text), extra_info=metadata)]
        except Exception as e:
            print(f"Erreur lors de la lecture du fichier XLSX {file}: {e}")
            pass

class PptxReader:
    def load_data(self, file, extra_info=None):
        try:
            with open(file, 'rb') as f:
                prs = Presentation(f)
                full_text = []
                for slide in prs.slides:
                    for shape in slide.shapes:
                        if hasattr(shape, 'text') and shape.text:
                            full_text.append(shape.text)
                metadata = {
                    'title': prs.core_properties.title,
                    'filename': os.path.basename(file),
                    'author': prs.core_properties.author,
                    'created': prs.core_properties.created.isoformat() if prs.core_properties.created else None,
                    'modified': prs.core_properties.modified.isoformat() if prs.core_properties.modified else None,
                    'slides': len(prs.slides),
                    'type': 'text'
                }
                all_documents = [Document(text='\n'.join(full_text), extra_info=metadata)]

                image_count = 0
                for slide in prs.slides:
                    text_forms = ''
                    for shape in slide.shapes:
                        # extract smartArt
                        if shape.shape_type == 1:
                            text = shape.text_frame.text
                            metadata['type'] = 'smartart'
                            all_documents.append(Document(text=text, extra_info=metadata))
                        # extract tables
                        elif shape.has_table:
                            table = shape.table
                            table_data = []
                            for row in table.rows:
                                row_data = [cell.text for cell in row.cells]
                                table_data.append(','.join(row_data))
                            metadata['type'] = 'table'
                            all_documents.append(Document(text='\n'.join(table_data), extra_info=metadata))
                        # extract text from forms
                        elif shape.has_text_frame:
                            text_forms += shape.text_frame.text + '\n'
                        # extract images
                        elif shape.shape_type == 13:
                            image = shape.image
                            image_bytes = image.blob
                            image_filename = f'{os.path.basename(file)}_image{image_count}.{image.ext}'
                            image_path = os.path.join(image_folder, image_filename)
                            with open(image_path, 'wb') as f:
                                f.write(image_bytes)
                            image_count += 1
                    if len(text_forms) > 30:
                        metadata['type'] = 'forms'
                        all_documents.append(Document(text=text_forms, extra_info=metadata))

                return all_documents
        except Exception as e:
            print(f"Erreur lors de la lecture du fichier PPTX {file}: {e}")
            pass


class PdfReader:
    def load_data(self, file, extra_info=None):
        try:
            with open(file, 'rb') as f:
                pdf_reader = PyPDF2.PdfReader(f)
                text = ""
                for page in pdf_reader.pages:
                    text += page.extract_text()
                metadata = {
                    'filename': os.path.basename(file),
                    'num_pages': len(pdf_reader.pages),
                    'type': 'text'
                }

                # camelot to extract tables
                metadata['type'] = 'table'
                reader = PDFTableReader()
                pdf_path = Path(file)
                documents = reader.load_data(file=pdf_path, extra_info=metadata)
                documents.append(Document(text=text, extra_info=metadata))

                # extract images
                self._extract_images(file)

                return documents
        except Exception as e:
            print(f"Erreur lors de la lecture du fichier PDF {file}: {e}")
            pass

    def _extract_images(self, file):
        pdf = fitz.open(file)
        file_name = os.path.basename(file)
        page_number = 0
        for page in pdf:
            page_number += 1
            image_number = 0
            for block in page.get_text("dict")["blocks"]:
                # Skip if not an image block
                if block["type"] != 1:
                    continue
                # skip if image mostly black
                if self.is_mostly_black(block["image"]) == False:
                    continue
                # skip if a span window or a small icon
                if block['width'] >= 100 and block['height'] >= 100:
                    image_number += 1
                    pix = page.get_pixmap(dpi=300, clip=block['bbox'])
                    output_file_name = f"{image_folder}/{file_name}_image{image_number}.png"
                    pix.pil_save(output_file_name)

    def is_mostly_black(self, image_data, threshold=0.8):
        # Load the image
        img = Image.open(io.BytesIO(image_data))

        # Convert to grayscale
        img_gray = img.convert("L")

        # Calculate the number of non-black pixels
        num_non_black = sum(1 for pixel in img_gray.getdata() if pixel > 0)

        # Calculate the total number of pixels
        total_pixels = img_gray.width * img_gray.height

        # Calculate the percentage of non-black pixels
        percentage_non_black = num_non_black / total_pixels

        # Check if the percentage exceeds the threshold
        return percentage_non_black >= threshold

file_extractor = {
    ".docx": DocxReader(),
    ".xlsx": XlsxReader(),
    ".pptx": PptxReader(),
    ".pdf": PdfReader()
}
try:
    documents = SimpleDirectoryReader(file_folder, file_extractor=file_extractor).load_data()
    print("Documents chargés avec succès.")
except Exception as e:
    print(f"Erreur lors du chargement des documents: {e}")
    pass



Documents chargés avec succès.


## results from rule-based method

In [19]:
import re

def extract_entities_rule_based(text):
    entities = {
        "Company": re.findall(r"Company:\s*([\w\s]+)", text),
        "Sponsor": re.findall(r"Sponsor:\s*([\w\s]+)", text),
        "Investor": re.findall(r"Investor:\s*([\w\s]+)", text),
        "Pre-money Valuation": re.findall(r"Pre-money Valuation:\s*([\w\s\d]+)", text),
        "Amount of Financing": re.findall(r"Amount of Financing:\s*([\w\s\d,]+)", text),
        "Security Type": re.findall(r"Type of Security:\s*([\w\s]+)", text),
        "Dividends": re.findall(r"Dividends:\s*([\w\s\d%,]+)", text),
        "Liquidation Preference": re.findall(r"Liquidation Preference:\s*([\w\s\d%,]+)", text),
        "Exit Period": re.findall(r"Exit Period:\s*([\d]+ months)", text),
        "Voting Rights": re.findall(r"Voting Rights:\s*([\w\s\d]+)", text),
    }

    return {key: value[0] if value else None for key, value in entities.items()}

for document in documents:
    print(extract_entities_rule_based(document.to_embedchain_format()['data']['content']))

{'Company': 'XX Technologies Private Limited  Sponsor', 'Sponsor': 'XX ', 'Investor': 'Infuse Capital ', 'Pre-money Valuation': 'XX ', 'Amount of Financing': 'XX, to be invested by Investor in 1 tranche', 'Security Type': 'Convertible Preference Shares ', 'Dividends': 'The holders of the Preference shares, shall be entitled to receive non', 'Liquidation Preference': 'In the event of any liquidation or winding up of the Company, the Company will first pay the Investor, the higher of the fair market value as determined by an independent banker appointed by the Board or 2X of the original purchase price or 25% IRR on the investment amount, including any paid dividends, before making any distribution to other existing shareholders', 'Exit Period': None, 'Voting Rights': 'Subject to applicable law'}
{'Company': None, 'Sponsor': None, 'Investor': None, 'Pre-money Valuation': None, 'Amount of Financing': None, 'Security Type': None, 'Dividends': None, 'Liquidation Preference': None, 'Exit Per

## results from NER model

In [20]:
import spacy

nlp = spacy.load("en_core_web_sm")

def extract_entities_spacy(text):
    doc = nlp(text)
    extracted_entities = {ent.label_: ent.text for ent in doc.ents}
    return extracted_entities

for document in documents:
    print(extract_entities_spacy(document.to_embedchain_format()['data']['content']))

{'CARDINAL': '3', 'GPE': 'XX', 'DATE': '3) months', 'ORG': 'TIMES', 'PERSON': 'this Term Sheet', 'WORK_OF_ART': 'Expiration: No-Shop', 'PRODUCT': 'Rs 10', 'PERCENT': '10%', 'ORDINAL': 'third', 'FAC': 'Annex A', 'NORP': 'Company', 'LOC': 'the Closing Date', 'TIME': '5:00 p.m.'}
{'ORG': 'EVG', 'CARDINAL': '2Y', 'DATE': 'Quarterly', 'PERSON': 'Estr'}
{'PRODUCT': 'ZF4894', 'ORG': 'ALV', 'PERCENT': '75%', 'DATE': '07 August 2026'}
{'ORG': 'XETRA\nCoupon (C),0%\nBarrier', 'DATE': 'August 2026', 'WORK_OF_ART': 'Initial Valuation', 'CARDINAL': '1 million', 'LOC': 'the Effective Date              \nValuation', 'GPE': 'Shareini'}
{}
{'ORG': 'Party B and'}


## results from entity extractor and llm query

In [24]:
import nltk

nltk.download('punkt_tab')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [None]:
from llama_index.llms.openai import OpenAI
from llama_index.core import Settings
from llama_index.core.ingestion import IngestionPipeline
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.extractors.entity import EntityExtractor

os.environ["OPENAI_API_KEY"] = ""

Settings.embed_model = OpenAIEmbedding(embed_batch_size=10)

def build_pipeline():

    openai_llm = OpenAI(model="gpt-3.5-turbo-1106", temperature=0.1)
    transformations = [
        EntityExtractor(prediction_threshold=0.5),
        Settings.embed_model,
    ]

    return IngestionPipeline(transformations=transformations)

In [25]:
pipline = build_pipeline()
nodes = await pipline.arun(documents=documents,show_progress=True)

Extracting entities:   0%|          | 0/6 [00:00<?, ?it/s]

Generating embeddings: 100%|██████████| 1/1 [00:00<00:00,  1.22it/s]


In [26]:
for node in nodes:
    print(node.metadata)

{'filename': 'BankABC_TermSheet_Template.pdf', 'num_pages': 9, 'type': 'table', 'entities': ['Infuse Capital', 'Surya Power Magic Private Limited', 'CIIE Initiatives']}
{'file_path': '/content/example/FR001400QV82_AVMAFC_30Jun2028.txt', 'file_name': 'FR001400QV82_AVMAFC_30Jun2028.txt', 'file_type': 'text/plain', 'file_size': 195, 'creation_date': '2025-02-19', 'last_modified_date': '2025-02-19', 'entities': ['BANK ABC']}
{'title': 'Hedge zu DZ6DUH', 'filename': 'ZF4894_ALV_07Aug2026_physical.docx', 'author': 'DZ-Bank', 'keywords': '', 'created': '2025-01-13T13:48:00+00:00', 'modified': '2025-02-19T06:34:00+00:00', 'type': 'text'}
{'title': 'Hedge zu DZ6DUH', 'filename': 'ZF4894_ALV_07Aug2026_physical.docx', 'author': 'DZ-Bank', 'keywords': '', 'created': '2025-01-13T13:48:00+00:00', 'modified': '2025-02-19T06:34:00+00:00', 'type': 'table', 'entities': ['Reuters', 'BANK ABC', 'Allianz SE']}
{'title': 'Hedge zu DZ6DUH', 'filename': 'ZF4894_ALV_07Aug2026_physical.docx', 'author': 'DZ-Bank

In [29]:
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex(nodes=nodes)
query_engine = index.as_query_engine()
response = query_engine.query("What is Infuse Capital?")
print(response)

Infuse Capital is a party involved in a proposed financing of Surya Power Magic Private Limited as outlined in the Term Sheet.


## chunk document

In [5]:
# save document in the json file
import json

json_str = []
for document in documents:
    json_str.append(document.to_embedchain_format())
with open('document_data.json', 'w') as json_file:
    json.dump(json_str, json_file)

In [None]:
# restore document from the json file
with open('document_data.json', 'r') as json_file:
    json_str = json.load(json_file)
documents = []
for doc in json_str:
    documents.append(Document().from_embedchain_format(doc))

In [None]:
try:
    chunked_documents = []
    for doc in documents:
        chunked_documents.extend(ChunkedDocument.from_document(doc))
    print("Documents découpés avec succès.")
except Exception as e:
    print(f"Erreur lors du chargement des documents: {e}")
    pass