# Extract the data from of the balances finacieros pdfs in Mongodb
### Connect to mongodb and make sure we have access to the pdf files

In [None]:
packages = =['pandas', 'pymongo', 'gridfs', 'concurrent.futures']
for package in packages:
    try:
        __import__(package)
    except ImportError:
        !pip install {package}

import os
from pymongo import MongoClient
from gridfs import GridFS
from concurrent.futures import ThreadPoolExecutor


#endpoint = '10.0.10.5:27017'
endpoint = '192.168.1.10:27017'
database = 'supercias'
collection = 'companies'

# Connect to MongoDB
db = MongoClient('mongodb://'+endpoint)[database]
collection = db[collection]

#find one
comp = collection.find_one()

docs = comp['Documentos online']['downloaded']

pdfs = []
#if company has documents
for value in docs.values():
    if isinstance(value, list) and len(value) > 0:
        [ pdfs.append(v) for v in value ]

print(pdfs)

['0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_1994-09-13.pdf', '0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_1996-09-06.pdf', '0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_2008-05-08.pdf', '0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_2009-01-14.pdf', '0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_2013-05-13.pdf', '0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_2013-07-17.pdf', '0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_2013-11-12.pdf', '0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_2020-10-23.pdf', '0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_2022-03-22.pdf', '0991295437001_DocumentosGenerales_Oficio Transferencia Acciones_2023-11-21.pdf', '0991295437001_DocumentosGenerales_Oficio Nombramiento Administradores_1994-09-09_CORDOVEZ ORTEGA JUAN XAVIER_PRESIDENTE.pdf', '0991295437001_DocumentosGenerales_Oficio Nombramien

### Make a query to get the pdfs we are interested in

In [2]:
# Access the GridFS collection
fs = GridFS(db, collection='companies')

def query_files(ruc, type, title, year):
    # Retrieve all files from GridFS collection that have in the filen ethe substring 'Balance  Estado de Situación'
    pdf_queried = fs.find({'filename': {'$regex': '.*'+ruc+'.*'+type+'.*'+title+'.*'+year+'.*'}})
    return pdf_queried

ruc = ''
type = 'DocumentosEconomicos'
# 'Estado de Flujos de Efectivo'
# 'Estado de Resultado Integral'
# 'Balance  Estado de Situación Financiera'
title = 'Balance  Estado de Situación'
year = '2023'

pdf_queried = query_files(ruc, type, title, year)

Print the number of queries files(this will exaust the cursor so we must do another query)

In [3]:
# print how many files were found
# this will exahust the cursor
print('found:', len(list(pdf_queried)))

# reset the cursor
pdf_queried = query_files(ruc, type, title, year)


found: 18204


Let track the time taken to read a file from mongodb and read it write tot eh filesystem

In [4]:
import time
# Write the contents of the file to a new file
# track the time taken to read a file from GridFS and write it to the disk
path = '../storage/pdfs/'

# make dir path if it does not exist
if not os.path.exists(path):
    os.makedirs(path)

def write_pdf(pdf_query):
    print('reading:', pdf_query.filename)
    buffer = pdf_query.read()
    print('buffer length:', len(buffer))
    if buffer.startswith(b'%PDF'):
        # write the file to the disk
        print('writing:', pdf_query.filename)
        with open( path + pdf_query.filename, 'wb') as f:
            f.write(buffer)
        print('done writing:', pdf_query.filename)
    else:
        print('not a pdf:', pdf_query.filename)


# print timer
start = time.time()

# get the first file
pdf_query = next(pdf_queried)
write_pdf(pdf_query)
# remove the file from os
os.remove(path + pdf_query.filename)

end = time.time()
print('Time taken:', end - start)

# restart the cursor
pdf_queried = query_files(ruc, type, title, year)

reading: 0190000222001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
buffer length: 75260
writing: 0190000222001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
done writing: 0190000222001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
Time taken: 1.2712726593017578


### Define extract function which will parse the data from the Balance Estado de Situacion  

In [5]:
%pip install pdfminer.six
import os
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer 

def extract_text_from_pdf(pdf_path):
    data = {}
    try:
        lines = []
        for page_layout in extract_pages(pdf_path):
            same_y_axis = {}
            # get all the elements that are on the same y0 position
            for element in page_layout:
                if isinstance(element, LTTextContainer):
                    if element.y0 not in same_y_axis:
                        same_y_axis[element.y0] = []
                    same_y_axis[element.y0].append(element)
            # concat the lines
            for y0, elements in same_y_axis.items():
                line = []
                for element in elements:
                    text = element.get_text()
                    # sanitize the text
                    text = text.replace('\n', ' ')
                    text = text.replace('\t', ' ')
                    text = text.strip()
                    line.append({ 'y': y0, 'text': text })
                lines.append(line)
        # parse the lines
        # filter only the ones that have 2 elements
        reachedEndOfHeader = False
        reachedStartOfFooter = False
        for line in lines:
            if not reachedEndOfHeader:
                if(len(line) == 1):
                    if('ESTADO DE SITUACIÓN FINANCIERA' in line[0]['text']):
                        reachedEndOfHeader = True
                else:
                    data[line[0]['text']] = line[1]['text']
            elif reachedEndOfHeader and not reachedStartOfFooter:
                if('ESTADO DE SITUACIÓN FINANCIERA' in line[0]['text']):
                    continue
                elif('CUENTA'in line[0]['text'] and 'CÓDIGO' in line[1]['text']):
                    continue
                else:
                    data[line[0]['text']] = {}
                    if(len(line) >= 2):
                        data[line[0]['text']]['codigo'] = line[1]['text']
                    if(len(line) == 3):
                        data[line[0]['text']]['descripcion'] = line[2]['text']
                if('REPRESENTANTE(S) LEGAL(ES)' in line[0]['text']):
                    reachedStartOfFooter = True
            elif reachedStartOfFooter:
                # need to implement later
                continue
    except Exception as e:
        print('Error:', e)
    return data


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


* for each pdf, download from mongo
* for each pdf file we run the extract parse function
* we upload the data back to mongodb

In [6]:
# Write the contents of the file to a new file
path = '../storage/pdfs/'
from concurrent.futures import ThreadPoolExecutor

def process_pdf(pdf_query, path, db):
    buffer = pdf_query.read()
    if buffer.startswith(b'%PDF'):
        # write the file to the disk
        with open( path + pdf_query.filename, 'wb') as f:
            f.write(buffer)
        # extract the text from the pdf
        data = extract_text_from_pdf(path + pdf_query.filename)
        # add file name to the data
        data['filename'] = pdf_query.filename
        # push to mongodb
        db['balances_economico_2'].insert_one(data)
        # remove the file
        os.remove(path + pdf_query.filename)
        print('pdf processed: ', pdf_query.filename)
    else:
        # throw an error if the file is not a pdf
        # raise Exception('Not a PDF file:', pdf_query.filename)
        # write to log file
        with open('error.log', 'a') as f:
            f.write(pdf_query.filename + '\n')
    # catch any errors and write to log file
    

# Process PDFs concurrently
with ThreadPoolExecutor(max_workers=20) as executor:  # Adjust max_workers as needed
   futures = [executor.submit(process_pdf, pdf_query, path, db) for pdf_query in pdf_queried]
   for future in futures:
        future.result()  

# single threaded processing
#for pdf_query in pdf_queried:
#    process_pdf(pdf_query)

pdf processed:  0190005151001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
pdf processed:  0190001628001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
pdf processed:  0190002152001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
pdf processed:  0190004937001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
pdf processed:  0190001490001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
pdf processed:  0190003701001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
pdf processed:  0190000702001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
pdf processed:  0190001237001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
pdf processed:  0190004384001_DocumentosEconomicos_Balance  Estado de Situación Financiera_2023-12-31.pdf
pdf processed:  0190003809001_DocumentosEconom