In [1]:
import pdfplumber
import fitz
import warnings , math, collections , os, re
import pickle
import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=UserWarning) 

In [60]:
path = r"C:\Users\Kaustubh.keny\OneDrive - Cogencis Information Services Ltd\Documents\mywork-repo"
#path = r"C:\Users\rando\OneDrive\Documents\mywork-repo"

samco_path = path + r"\files\58_29-Feb-24_FS.pdf"
dry_run_path = path + r"\output\DryRun.pdf"
indice_path = path + r"\output\pkl\indices_var.pkl"

In [81]:
def get_financial_indices(path:str):
    final_indices = set()
    with open(path , 'rb') as file:
        indices = pickle.load(file)  
        for k,v in indices.items():
            temp = [k] + v
            for t in temp:
                final_indices.add(t)
    
    return list(final_indices)

""" Highlights important financial indices in the pdf, does other pre
analysis of data.
Args: list of indices, string of pdf path
Returns: dict of pages highlighted, string of output pdf, dict of pages contaiting FUND NAMES
"""
def check_indice_highlight(path:str, indices_variations:list, fund_pattern:str, fund_size:int):
    doc = fitz.open(path)
    page_count = doc.page_count #No of pages
    
    pages = [i for i in range(page_count)]
    important_pages = dict.fromkeys(pages, 0)
    fund_titles = dict.fromkeys(pages, "")


    for page_num, page in enumerate(doc):
        
        text_instances = page.get_text('dict')["blocks"]
        
        #sort for all data in pdf document 
        sorted_text_instances = sorted(text_instances, key=lambda x: (x['bbox'][1], x['bbox'][0]))
        
        # rect = fitz.Rect((35,120,250,765))
        # page.add_highlight_annot(rect)

        for pgn,block in enumerate(sorted_text_instances):     
            if "lines" not in block:
                continue
            
            for line in block["lines"]: 
                for span in line["spans"]:
                    if span['flags'] in [20,25]:  # learn flag logic , rn set for all flags value
                        span_text = span['text'].strip().lower()
                        
                        
                        #FUND PAGE CHECK
                        conditions = [
                            pgn in range(0,15),
                            re.match(fund_pattern, span_text, re.IGNORECASE),
                            span['size'] >fund_size
                        ]
                        if all(conditions):
        
                            fund_titles[page_num] = span_text  
                        
                        #CHECK IMP FINANCE INDICES  
                        for term in indices_variations:  
                            pattern = r'\b' + re.escape(term.lower()) + r'\b'
                            if re.search(pattern, span_text):

                                #count highlights
                                important_pages[page_num] +=1
                                #mark content
                                rect = fitz.Rect(span['bbox']) 
                                page.add_highlight_annot(rect)
                                break  #optional , one highlight

    
    output_path = None
    if any(important_pages.values()):
        output_path = path.replace('.pdf', '_highlighted.pdf')
        doc.save(output_path)

    doc.close()
    return important_pages, output_path, fund_titles


""" Get the clipped data in the bbox provided and store in nested dict
Args: input path, dryrun path, important pages, bbox coords
Returns: dict { 'page' : int 'block': dict}"""
def get_clipped_data(input:str, output:str, pageSelect:list, bbox:list[set], fund_names:dict):
    
    document = fitz.open(input)
    finalData = []
    
    for pgn in pageSelect:
        #get the page
        page = document[pgn]
        fundName = fund_names[pgn]

        #get all block
        final_blocks = []
        for box in bbox:
            blocks = page.get_text('dict', clip = box)['blocks'] #get all blocks
            filtered_blocks = [block for block in blocks if block['type']==0 and 'lines' in block] #only text blocks
            sorted_blocks = sorted(filtered_blocks, key= lambda x: (x['bbox'][1], x['bbox'][0]))
            final_blocks.extend(sorted_blocks)
        
        
        finalData.append({
            "page": pgn,
            "fundname": fundName,
            "block": final_blocks,
        })
            
    return finalData


def get_pdf_data(input:str, pageSelect:list, fund_names:dict):
    
    document = fitz.open(input)
    finalData = []
    
    for pgn in pageSelect:
        #get the page
        page = document[pgn]
        fundName = fund_names[pgn]
    
        blocks = page.get_text('dict')['blocks'] #get all blocks
        
        filtered_blocks = [block for block in blocks if block['type']==0 and 'lines' in block]
        sorted_blocks = sorted(filtered_blocks, key= lambda x: (x['bbox'][1], x['bbox'][0]))
        
        finalData.append({
            "page": pgn,
            "fundname": fundName,
            "block": sorted_blocks,
        })
            
    return finalData

def extract_span_data(data:list, name:list): #all
    final_data = dict()
    for pgn,page in enumerate(data):
        pgn_content = []
        for blocks in page['block']:
            for line in blocks['lines']:
                spans = line.get('spans',[])
                for span in spans:
                    
                    text = span['text'].strip()
                    size = span['size']
                    color = span['color']
                    origin = span['origin']
                    bbox = span['bbox']
                
                    pgn_content.append([size,text,color,origin,bbox])
                    
        final_data[page['fundname']] = pgn_content
    
    return final_data


from collections import defaultdict

def clean_block_data(blocks):
    
    remove_text = ['Purchase','Amount','thereafter','.','. ',',',':','st',";","-",'st ',' ','th', 'th ', 'rd', 'rd ', 'nd', 'nd ','','`','(Date of Allotment)']
    
    sorted_blocks = sorted(blocks, key=lambda x: (x[3][1],x[3][0]))
    
    cleaned_blocks = []
    for block in sorted_blocks:
        size, text, color, origin, bbox = block
        if text not in remove_text:
            cleaned_blocks.append(block)
 
    processed_blocks = []
    # adjust size based on color and size
    for block in cleaned_blocks:
        size, text, color, origin, bbox = block
        text = text.strip()
        if size in [9.0,8.0] and color == -1:
            size = 20.0  # Update size to 20.0
        processed_blocks.append([size, text, color, origin, bbox])
                

    # group blocks by rounded y-coordinate
    grouped_blocks = defaultdict(list)
    for block in processed_blocks:
        y_coord = math.ceil(block[3][1])# Extract and round the y-coordinate
        size = block[0]
        grouped_blocks[(y_coord,size)].append(block)

    # Combine blocks with the same y-coordinate
    combined_blocks = []
    for key, group in grouped_blocks.items():
        
        if key[1] == 20:
            combined_text = " ".join(item[1] for item in group).strip()
            if combined_text:  # Ignore whitespace-only text
                size, color, origin, bbox = group[0][0], group[0][2], group[0][3],group[0][4]
                combined_blocks.append([size, combined_text, color, origin,bbox])
        
        else:
            for item in group:
                combined_blocks.append(item)

    return combined_blocks

def process_text_data(text_data):
    
    updated_text_data = {}

    for fund, data in text_data.items():
        blocks = data
        cleaned_blocks = clean_block_data(blocks)
        updated_text_data[fund] = cleaned_blocks

    return updated_text_data



In [82]:

financial_indices = get_financial_indices(indice_path)
pdfInd = pd.DataFrame({'indexes': financial_indices})

excel_path  = path+ r'\files\financial_indices.xlsx'
pdfInd.indexes.to_excel(excel_path) #remove first col

In [83]:
file_path  = samco_path
financial_indices = get_financial_indices(indice_path)
fund_pattern = r"^(samco|tata|canara)"
fund_size = 16

highlight_pages, saved_path, fund_pages =  check_indice_highlight(file_path, financial_indices, fund_pattern, fund_size)

In [84]:
pagedf = pd.DataFrame({'title': fund_pages.values(),'highlight_count': highlight_pages.values()})

"""_summary_ fund is located only on certain pages, based on no. of 
highlights we know which pages are imp. automate this content later
"""
pagedf.to_excel(path + r'\output\example.xlsx')

In [86]:
pages = [ 3,5,7,9,11]
bbox = [(35,120,250,765)]
fund_pages = fund_pages
file_path = samco_path

data = get_clipped_data(file_path, dry_run_path, pages, bbox, fund_pages)
text_data = extract_span_data(data,[])
cleaned_data = process_text_data(text_data) #personalized

In [133]:
header_size = 20
content_size = 10

def create_nested_dict(cleaned_data:dict,header_size:float, content_size:float):
    final_text_data = dict()
    final_matrix = dict()

    for fund, items in cleaned_data.items(): #ech fund
        
        #step 1 extract size, coord
        coordinates = list()
        sizes = set()
        
        for item in items: #size,text,color,origin
            origin = tuple(item[3])
            coordinates.append(origin)
            sizes.add(item[0])
        
        coordinates = sorted(set(coordinates), key=lambda c: (c[1], c[0]))  # Sort by y, then x
        sizes = sorted(sizes, reverse=True)  
        
        #step 2 create matrix
        coord_to_index = {coord: idx for idx, coord in enumerate(coordinates)}  # (x,y) at pos 0 etc. ROWS
        size_to_index = {font: idx for idx, font in enumerate(sizes)}  # COLUMNS
        matrix = np.zeros((len(coordinates), len(sizes)), dtype=object)
        
        
        #step 3
        nested_dict = {}
        current_header = None
        for item in items:
            origin = tuple(item[3])
            size = item[0]
            text = item[1].strip()
            
            #populate the matrix
            if origin in coord_to_index and size in size_to_index:
                row = coord_to_index[origin]
                col = size_to_index[size]
                
                if matrix[row,col] == 0:
                    matrix[row,col] ==r"nil"
                matrix[row,col] == text
            
            #build nested dict
            if size == header_size:
                current_header = " ".join(txt for txt in text.split(" ") if txt !="").lower()
                nested_dict[current_header] = []
            elif size<= content_size and current_header:
                nested_dict[current_header].append(item)
                
        final_text_data[fund] = nested_dict        
        matrix_df = pd.DataFrame(matrix, index=coordinates, columns=sizes)
        final_matrix[fund] = matrix_df
    
    return final_text_data, final_matrix
def generate_pdf_from_data(data:list, output_path:str):
    
    pdf_doc = fitz.open()
    
    for header, items in data.items():
        
        page = pdf_doc.new_page()
        text_position = 72  # for title initalize something

        #section title
        title_font_size = 24
        try:
            page.insert_text(
                (72, text_position), #initalizor
                header,
                fontsize=title_font_size,
                fontname="helv",
                color=(0, 0, 1),
            )        
        except Exception as e:
            print(f"Error while parsing fund {e}")
            
        for item in items:
            
            bbox = item[3] #origin coords
            text = item[1]
            size = item[0]
            color = item[2]
   
            #Errror in fitz font 
            try:
                page.insert_text(
                    (bbox[0], bbox[1]),
                    text,
                    fontsize=size,
                    fontname="helv",
                    color=tuple(int(color & 0xFFFFFF) for _ in range(3)))#unsigned int value so (0,0,0)
                
            except Exception:
                page.insert_text(
                    (bbox[0], bbox[1]),
                    text,
                    fontsize=size,
                    fontname="helv",
                    color=(1, 0, 0),
                )

    # Save the created PDF
    pdf_doc.save(output_path)
    pdf_doc.close()
    print(f" PDF generated to: {output_path}")

def extract_data_from_pdf(path:str):
    
    def replace_main_key(string: str):
        replace_key = string
        if re.match(r'^nav.*', string, re.IGNORECASE):
            replace_key = "nav"
        elif re.match(r"^market", string, re.IGNORECASE):
            replace_key = "market_capital"  
        elif re.match(r"^assets", string, re.IGNORECASE):
            replace_key = "assets_under_management"
        elif re.match(r"^fund", string, re.IGNORECASE):
            replace_key = "fund_manager" 
        elif re.match(r"^scheme", string, re.IGNORECASE):
            replace_key = "scheme_details" 
        elif re.match(r"^investment", string, re.IGNORECASE):
            replace_key = "investment_objective"
        elif re.match(r"^quanti", string, re.IGNORECASE):
            replace_key = "quantitative_data"
        elif re.match(r"^portfolio", string, re.IGNORECASE):
            replace_key = "portfilio" 
        elif re.match(r"^industry", string, re.IGNORECASE):
            replace_key = "industry_allocation_of_equity"       
        return replace_key
    
    with pdfplumber.open(path) as pdf:
        final_data = []
        final_data_generated = {}
        
        for page in pdf.pages:
            # extract text from the page
            text = page.extract_text()
            final_data.append(text)
        #print(final_data)
        
        #store them in a dict for each page
        for data in final_data:
            content = data.split('\n')
            main_key = replace_main_key(content[0])
            print(main_key)
            values = content[1:]
        
            final_data_generated[main_key] = values

        #sort the headers in lex order
        sorted_final_generated = {key: final_data_generated[key] for key in sorted(final_data_generated)}

    return sorted_final_generated

In [134]:
final_text_data, final_matrix = create_nested_dict(cleaned_data, 20.0 , 10.0)

In [None]:
final_extracted_text = dict()
for fund, items in final_text_data.items():
    print(fund)
    generate_pdf_from_data(items, dry_run_path)
    extract_data = extract_data_from_pdf(dry_run_path)
    final_extracted_text[fund] = extract_data

In [136]:
import json
with open(path +r"\output\dump58_29-Feb-24_FS.json", "w", encoding="utf-8") as file:
    json.dump(final_extracted_text, file, ensure_ascii=False, indent=4)
    print("JSON CREATED\n")

JSON CREATED



In [160]:
imp_indices = sorted({indices for value in final_extracted_text.values() for indices in value.keys()})
imp_indices

['assets_under_management',
 'fund_manager',
 'investment_objective',
 'nav',
 'quantitative_data',
 'scheme_details']

In [140]:
#REGEX FUNCTIONS

def return_invest_data(key:str,data:list):
    investment_objective = data
    values = " ".join(txt for txt in investment_objective)

    data = {
        key:values
    }

    return data

def return_scheme_data(key:str,data:list):
    scheme_data = data
    main_key = key
    structured_data = {main_key: {}}

    # Patterns
    date_pattern = r"^(.*?date)\s(\d{2}-[A-Za-z]{3}-\d{4})$"
    benchmark_pattern = r"^(Benchmark)\s+(.*)$"
    application_pattern = r"(?:·)?\d+(?:,\d{3})*(?:\.\d+)?/-"

    for data in scheme_data:
        if re.search(date_pattern, data, re.IGNORECASE):
            match = re.match(date_pattern, data, re.IGNORECASE)
            if match:
                key = match.group(1)
                value = match.group(2)
                structured_data[main_key][key] = value
        elif re.search(benchmark_pattern, data, re.IGNORECASE):
            match = re.match(benchmark_pattern, data, re.IGNORECASE)
            if match:
                key = match.group(1)
                value = match.group(2)
                structured_data[main_key][key] = value
        elif re.search(r"\b(min|application)\b", data, re.IGNORECASE):
            matches = re.findall(application_pattern, data, re.IGNORECASE)
            if matches:
                cleaned_matches = [match.replace('·', '') for match in matches]
                structured_data[main_key]["min_appl_amt"] = cleaned_matches
        elif re.search(r"\b(additional.* and in multiples of)\b", data, re.IGNORECASE):
            matches = re.findall(application_pattern, data, re.IGNORECASE)
            if matches:
                cleaned_matches = [match.replace('·', '') for match in matches]
                structured_data[main_key]["additional_amt"] = cleaned_matches

    return structured_data

def return_fund_data(key:str,data:list):
    fund_manager = data
    main_key = key
    strucuted_data = {main_key:[]}
    current_entry = None
    name_pattern = r'^(Ms\.|Mr\.)'
    manage_pattern = r'^\(|\)$'
    date_pattern = r'\b\w+ \d{1,2}, \d{4}\b'
    experience_pattern = r'^Total Experience: (.+)$'

    for data in fund_manager:
        if re.match(name_pattern,data):
            if current_entry:
                strucuted_data[main_key].append(current_entry)
            current_entry = {
                'name': data.split(",")[0].strip().lower(),
                'designation': "".join(data.split(",")[1:]).strip().lower()
            }
            #print(data.split(",")[0],"".join(data.split(",")[1:]))
        elif re.match(manage_pattern,data):
            if "inception" in data.lower():
                current_entry['managing_since'] = 'inception'
            else:
                date = re.search(date_pattern, data)
                current_entry['managing_since'] = date.group() if date != None else None
        elif re.match(experience_pattern,data):
            current_entry['total_experience'] = data.split(":")[1].strip().lower()
            #print(data.split(":")[1])

        
    if current_entry:  # Append the last entry
        strucuted_data[main_key].append(current_entry)
            
    return strucuted_data

def return_nav_data(key:str,data:list):
    main_key = key
    structured_data = {main_key: {}}
    
    growth_pattern = r"([\w\s]+):\s*·([\d.]+)"
    
    for line in data:
        matches = re.findall(growth_pattern, line)
        for key, value in matches:
            structured_data[main_key][key.strip().lower()] = value
        
    return structured_data

def return_quant_data(key:str,data:list):
    qunatitative_data = data
    main_key = key

    strucuted_data = {main_key:{}}
    current_entry = None
    comment = ""

    ratio_pattern = r"\b(ratio|turnover)\b"
    annual_pattern = r'\b(annualised|YTM)\b'
    macaulay_pattern = r"\b(macaulay.*duration)\b"
    residual_pattern = r"\b(residual.*maturity)\b"
    modified_pattern = r"\b(modified.*duration)\b"

    for data in qunatitative_data:
        if re.search(ratio_pattern,data, re.IGNORECASE):
            key = data.split(":")[0].lower().strip()
            value = data.split(":")[1].lower().strip()
        elif re.search(annual_pattern,data, re.IGNORECASE):
            key = data.split(":")[0].lower().strip()
            value = data.split(":")[1].lower().strip()
        elif re.search(macaulay_pattern,data, re.IGNORECASE):
            key = data.split(":")[0].lower().strip()
            value = data.split(":")[1].lower().strip()
        elif re.search(residual_pattern,data, re.IGNORECASE):
            key = data.split(":")[0].lower().strip()
            value = data.split(":")[1].lower().strip()
        elif re.search(modified_pattern,data, re.IGNORECASE):
            key = data.split(":")[0].lower().strip()
            value = data.split(":")[1].lower().strip()
        else:
            comment+= data
        strucuted_data[main_key][key] = value
    
    strucuted_data[main_key]['comment'] = comment

    return strucuted_data

def return_aum_data(key:str,data:list):
    
    aum = data
    main_key = key
    strucuted_data = {main_key:{}}

    pattern = r"\b\d{1,3}(?:,\d{3})*(?:\.\d+)? Crs\b"

    for data in aum:
        if re.search(r'average', data, re.IGNORECASE):
            match = re.search(pattern, data)
            key = 'avg_aum'
        elif re.search(pattern, data):
            match = re.search(pattern, data)
            key = "aum"
        else:
            continue
        
        if match:
            strucuted_data[main_key][key] = match.group()

    return strucuted_data

def return_dummy_data(key:str,data:list):
    return {key:{}}


def return_required_regex(string: str):
        replace_key = string
        if re.match(r'^nav.*', string, re.IGNORECASE):
            replace_key = "nav"
        elif re.match(r"^market", string, re.IGNORECASE):
            replace_key = "market_capital"  
        elif re.match(r"^assets", string, re.IGNORECASE):
            replace_key = "assets_under_management"
        elif re.match(r"^fund", string, re.IGNORECASE):
            replace_key = "fund_manager" 
        elif re.match(r"^scheme", string, re.IGNORECASE):
            replace_key = "scheme_details" 
        elif re.match(r"^investment", string, re.IGNORECASE):
            replace_key = "investment_objective"
        elif re.match(r"^quanti", string, re.IGNORECASE):
            replace_key = "quantitative_data"
        elif re.match(r"^portfolio", string, re.IGNORECASE):
            replace_key = "portfilio" 
        elif re.match(r"^industry", string, re.IGNORECASE):
            replace_key = "industry_allocation_of_equity"       
        return replace_key

In [161]:
# Map indices to funct , it has to be sorted
function_indices = [
    return_aum_data,
    return_dummy_data,
    return_fund_data,
    return_invest_data,
    return_nav_data,
    return_quant_data,
    return_scheme_data,
]

function_indices = sorted(function_indices, key=lambda x: str(x))


for key, funct in zip(imp_indices,function_indices):
    print(key, funct)

assets_under_management <function return_aum_data at 0x000001EF93CC2340>
fund_manager <function return_dummy_data at 0x000001EF93CC1120>
investment_objective <function return_fund_data at 0x000001EF93CC3060>
nav <function return_invest_data at 0x000001EF93CC1BC0>
quantitative_data <function return_nav_data at 0x000001EF93CC16C0>
scheme_details <function return_quant_data at 0x000001EF93CC23E0>


In [None]:
# Perform operation based on the function map
def perform_operation(operation, data):
    func = function_map.get(operation)
    if func:
        try:
            return func(data)
        except Exception as e:
            return f"Error executing function for {operation}: {e}"
    return "Invalid operation"



grand_dictionary = {}
for master_k, page_data in final_text_data.items():
    print(master_k)
    page_content = []

    for main_k, main_content in page_data.items():
        func = function_map.get(main_k)
        if func:
            try:
                result = func(main_k, main_content)
                page_content.append(result)
            except Exception as e:
                print(f"Error processing -> {main_k}: {e}")
        else:
            print(f"Warning: No function mapped for {main_k}")

    grand_dictionary[master_k] = page_content
    print("Done for _________________")