In [1]:
import logging
import os
import re
from typing import Any, Container, Dict, Iterable, List, Optional, TextIO, Union, cast
import unicodedata

import pdfminer
from pdfminer.pdfdocument import PDFDocument, PDFNoOutlines, PDFXRefFallback
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfparser import PDFParser
from pdfminer.pdftypes import PDFObjectNotFound, PDFValueError
from pdfminer.pdftypes import PDFStream, PDFObjRef, resolve1, stream_value
from pdfminer.psparser import PSKeyword, PSLiteral, LIT
from pdfminer.utils import isnumber

In [2]:
# Give the root logger a default handler so that emitted log records are displayed.
logging.basicConfig()
# Module-level logger. The functions shown in this notebook report through print()
# and do not call it.
logger = logging.getLogger(__name__)

# Characters rewritten by escape(): control codes 0-31, the markup characters
# & < > ( ) " ' and backslash, and every code point from 127 to 255.
ESC_PAT = re.compile(r'[\000-\037&<>()"\042\047\134\177-\377]')


def escape(s: Union[str, bytes]) -> str:
    """Return ``s`` with every character matched by ESC_PAT written as ``&#N;``.

    Args:
        s: Text to escape. ``bytes`` input is decoded as Latin-1 first, so each
            byte becomes the code point with the same number.

    Returns:
        The text with each matched character replaced by its decimal character
        reference; characters not matched by ESC_PAT are left untouched.
    """
    text = s.decode("latin-1") if isinstance(s, bytes) else s

    def _as_reference(match):
        # One matched character in, one "&#<code point>;" out.
        return "&#%d;" % ord(match.group(0))

    return ESC_PAT.sub(_as_reference, text)

def bintostr(b: Optional[Union[bytes, str]]) -> str:
    """Turn a raw string value into plain ASCII text.

    Bytes are decoded as Latin-1, the text is decomposed with NFKD so accented
    letters split into a base letter plus combining marks, and everything that
    is not ASCII is then dropped (so ``b"caf\\xe9"`` becomes ``"cafe"``).

    Args:
        b: The value to convert. ``None`` is treated as an empty value. ``str``
            input is accepted too and skips the decoding step.

    Returns:
        The ASCII-only text, or ``""`` when ``b`` is ``None``.
    """
    if b is None:
        return ""
    # Already-decoded text must not be decoded again (str has no .decode()).
    text = b if isinstance(b, str) else b.decode("latin-1")
    return unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode("ASCII")

In [3]:
def getColumnMappingDict() -> dict:
    """Build the table that renames PDF field names to export column names.

    Keys are the field names as looked up by the save functions; values are the
    column names written to the exports. Each value starts with a page marker
    (``P1``/``P2``/``P3``) and a numeric code, so sorting the values gives the
    column order used in the exports.

    Returns:
        A new dict on every call, in the insertion order listed below.
    """
    # NOTE(review): the two TUTOR entries reuse the 04.01.x numbering of the
    # FORMADOR entries -- confirm this is intended.
    return {
        # PAGE 1
        "D_NUM_EXPEDIENTE": "P1_01.01_D_NUM_EXPEDIENTE",
        "D_EXPEDIENTE_DM": "P1_01.02_D_EXPEDIENTE_DM",
        "D_CIF": "P1_01.03_D_CIF",
        "D_COD_AAFF": "P1_01.04_D_COD_AAFF",
        "D_COD_GRUPO": "P1_01.05_D_COD_GRUPO",
        "D_ACCION_FORMATIVA": "P1_01.06_D_ACCION_FORMATIVA",
        "ID_MODALIDAD": "P1_01.07_ID_MODALIDAD",
        "N_EDAD": "P1_02.01_N_EDAD",
        "ID_SEXO": "P1_02.02_ID_SEXO",
        "ID_TITULACION": "P1_02.03_ID_TITULACION",
        "D_OTRA_TITULACION": "P1_03.10.01_D_OTRA_TITULACION",
        "Nivel de idiomas": "P1_03.10.02_Nivel",
        "EspecificaotraTitulacion": "P1_03.10.03_EspecificaotraTitulacion",
        "D_LUGAR_DE_TRABAJO": "P1_03.04_D_LUGAR_DE_TRABAJO",
        "ID_CATEGORIA_PROFESIONAL": "P1_03.05_ID_CATEGORIA_PROFESIONAL",
        "D_OTRA_CAT_PROF": "P1_05.06_D_OTRA_CAT_PROF",
        "ID_HORARIO_CURSO": "P1_06.00_ID_HORARIO_CURSO",
        "ID_JORNADA": "P1_06.01_ID_JORNADA",
        "ID_TAMANIO_PYME": "P1_07_ID_TAMANIO_PYME",
        # PAGE 2
        "ID_BUENA_ORGANIZACION": "P2_01.01_ID_BUENA_ORGANIZACION",
        "ID_NUM_ALUMNOS_ADECUADO": "P2_01.02_ID_NUM_ALUMNOS_ADECUADO",
        "ID_CONTENIDOS_SEGUN_NECESIDADES": "P2_02.01_ID_CONTENIDOS_SEGUN_NECESIDADES",
        "ID_CONTENIDOS_SEGUN_TEORIA_PRACTICA": "P2_02.02_ID_CONTENIDOS_SEGUN_TEORIA_PRACTICA",
        "ID_DURACION_SUFICIENTE": "P2_03.01_ID_DURACION_SUFICIENTE",
        "ID_HORARIO_FAVORABLE": "P2_03.02_ID_HORARIO_FAVORABLE",
        "ID_FORMADOR_FACILITADOR": "P2_04.01.01_ID_FORMADOR_FACILITADOR",
        "ID_FORMADOR_CONOCE_TEMA": "P2_04.01.02_ID_FORMADOR_CONOCE_TEMA",
        "ID_TUTOR_FACILITADOR": "P2_04.01.01_ID_TUTOR_FACILITADOR",
        "ID_TUTOR_CONOCE_TEMA": "P2_04.01.02_ID_TUTOR_CONOCE_TEMA",
        "ID_MEDIOS_COMPRENSIBLES": "P2_05.01_ID_MEDIOS_COMPRENSIBLES",
        "ID_MEDIOS_ACTUALIZADOS": "P2_05.02_ID_MEDIOS_ACTUALIZADOS",
        "ID_INSTALACIONES_ADECUADAS": "P2_06.01_ID_INSTALACIONES_ADECUADAS",
        "ID_MEDIOS_TECNICOS_ADECUADOS": "P2_06.02_ID_MEDIOS_TECNICOS_ADECUADOS",
        # PAGE 3
        "ID_TELEFORMACION_GUIA": "P3_07.01_ID_TELEFORMACION_GUIA",
        "ID_TELEFORMACION_MEDIOS": "P3_07.02_ID_TELEFORMACION_MEDIOS",
        "ID_PRUEBAS_EVALUACION": "P3_08.01_ID_PRUEBAS_EVALUACION",
        "ID_ACREDITACION": "P3_08.02_ID_ACREDITACION",
        "ID_INCORPORACION_MERCADO": "P3_09.01_ID_INCORPORACION_MERCADO",
        "ID_CONOCIMIENTOS_PUESTO_TRABAJO": "P3_09.02_ID_CONOCIMIENTOS_PUESTO_TRABAJO",
        "Habilidades_trabajar_en_formado": "P3_09.03_Habilidades_trabajar_en_formado",
        "ID_PROGRESAR_CARRERA": "P3_09.04_ID_PROGRESAR_CARRERA",
        "ID_DESARROLLO_PERSONAL": "P3_09.05_ID_DESARROLLO_PERSONAL",
        "ID_GRADO_SATISFACCION": "P3_10_ID_GRADO_SATISFACCION",
        "D_OBSERVACION": "P3_11_D_OBSERVACION",
        "F_CUMPLIMENTACION": "P3_12_F_CUMPLIMENTACION",
    }

In [4]:
def saveTxt(
    outfilename: str,
    retrievedData: dict
) -> None:
    """Write the values retrieved from one PDF as ``COLUMN=value`` lines.

    Field names are renamed through getColumnMappingDict() and the lines are
    written sorted by the renamed column. Each value is stripped and flattened
    onto a single line: line breaks become "." and tabs become a space.

    Args:
        outfilename: Path of the text file to create (overwritten if present).
        retrievedData: Field name -> string value, as retrieved from the PDF.
            Fields that have no entry in the column mapping are reported and
            left out instead of aborting the export.
    """
    columnMapping = getColumnMappingDict()
    #
    print("Saving to %s" % outfilename)
    #
    # Rename before opening the file so a failure here cannot leave an empty file behind.
    mapped = {}
    for fieldName, fieldValue in retrievedData.items():
        if fieldName not in columnMapping:
            print("Skipping unmapped field %s" % fieldName)
            continue
        mapped[columnMapping[fieldName]] = fieldValue
    # The context manager closes the file even if a write fails.
    with open(outfilename, "w") as outfp:
        for column, value in sorted(mapped.items()):
            # "\r\n" must be replaced first so it yields one "." rather than two;
            # a lone "\r" is flattened as well so the value stays on one line.
            flattened = (
                value.strip()
                .replace("\r\n", ".")
                .replace("\n", ".")
                .replace("\r", ".")
                .replace("\t", " ")
            )
            outfp.write("%s=%s\n" % (column, flattened))

def saveTxts(pdfnames: list, retrievedDataList: list) -> None:
    """Write one text export per PDF via saveTxt().

    Args:
        pdfnames: Paths of the processed PDFs.
        retrievedDataList: Retrieved data, one dict per PDF, at the same index
            as the corresponding entry of ``pdfnames``.
    """
    for index, source in enumerate(pdfnames):
        # The export is named after the PDF with ".txt" appended.
        saveTxt(source + ".txt", retrievedDataList[index])

In [5]:
def processPdf(
    pdfname: str
) -> dict:
    """Open one PDF and collect its form-field values.

    Args:
        pdfname: Path of the PDF file to read.

    Returns:
        The dict filled in by searchInAllObjects() for this document.
    """
    print("Processing %s" % pdfname)
    #
    retrievedData = {}
    # The context manager closes the file even when parsing or scanning raises;
    # the document is only used while the file is still open.
    with open(pdfname, "rb") as fp:
        parser = PDFParser(fp)
        doc = PDFDocument(parser)
        searchInAllObjects(retrievedData, doc)
    return retrievedData

def processPdfs(
    pdfnames: list
) -> list:
    """Run processPdf() on every path and collect the results.

    Args:
        pdfnames: Paths of the PDF files to read.

    Returns:
        One retrieved-data dict per path, in the same order as ``pdfnames``.
    """
    return [processPdf(path) for path in pdfnames]

In [6]:
def searchInAllObjects(
    retrievedData: dict,
    doc: PDFDocument
) -> None:
    """Pass every object listed in the document's xrefs to searchInObject().

    Each object id is handled once, even if more than one xref lists it.
    Objects that resolve to ``None`` are skipped, and an id that raises
    PDFObjectNotFound is reported with print() and skipped.

    Args:
        retrievedData: Dict that searchInObject() fills in; mutated in place.
        doc: The parsed document whose objects are scanned.
    """
    seenIds = set()
    for table in doc.xrefs:
        for objid in table.get_objids():
            if objid not in seenIds:
                seenIds.add(objid)
                try:
                    candidate = doc.getobj(objid)
                    if candidate is not None:
                        searchInObject(retrievedData, candidate, objid=objid)
                except PDFObjectNotFound as err:
                    print("not found: %r" % err)

In [7]:
def searchInObject(
    retrievedData: dict,
    obj: object,
    objid: Optional[str] = None
) -> None:
    """Store the value of a form field if ``obj`` is an annotation that has one.

    Only dicts with a ``Type`` entry are looked at. For type ``Annot`` with
    both a ``T`` and a ``V`` entry, ``retrievedData[T]`` is set to:

    * for a field with an ``Opt`` list (combo): the second element of the
      two-element option whose first element equals ``V`` (the last match wins),
      or ``""`` when no option matches;
    * otherwise (text field): ``V`` itself.

    A handful of known structural types are ignored silently; any other type
    is reported with print().

    Args:
        retrievedData: Dict receiving field name -> value; mutated in place.
        obj: The resolved PDF object to inspect.
        objid: Id of the object; accepted for the caller's convenience, unused.
    """
    if not isinstance(obj, dict) or 'Type' not in obj:
        return
    objType = obj["Type"]
    if isinstance(objType, PSLiteral):
        objType = objType.name
    #
    if objType == "Annot":
        if 'T' in obj and 'V' in obj:
            valueT = bintostr(obj["T"])
            valueV = bintostr(obj["V"])
            #
            if 'Opt' in obj:  ## Combo
                value = ""
                options = obj["Opt"]
                if isinstance(options, list):
                    for option in options:
                        # NOTE(review): options that are not two-element lists are
                        # ignored here -- confirm the forms never use plain-string options.
                        if not (isinstance(option, list) and len(option) == 2):
                            continue
                        # An option that cannot be decoded is reported and skipped, so a
                        # key or value left over from a previous option is never compared.
                        try:
                            optionKey = bintostr(option[0])
                        except Exception:
                            print('!! Error decoding nestedlist key %s \n' % (option[0],), end="")
                            continue
                        try:
                            optionValue = bintostr(option[1])
                        except Exception:
                            print('!! Error decoding nestedlist value %s \n' % (option[1],), end="")
                            continue
                        if optionKey == valueV:
                            value = optionValue
                retrievedData[valueT] = value
            else:  ## Text field
                retrievedData[valueT] = valueV
    elif objType in ["Pages", "Page", "Font", "FontDescriptor", "Encoding", "Outlines", "Catalog"]:
        # Known structural objects: nothing to retrieve.
        pass
    else:  ## Unknown type
        print("<unknown type=\"%s\"/>\n" % objType)

In [8]:
def saveCsv(
    pdfnames: list,
    retrievedDataList: list
) -> None:
    """Write all retrieved data to ``export.csv`` as one tab-separated table.

    The first row is a header: ``FILE`` followed by every mapped column name
    from getColumnMappingDict(), sorted. Each following row holds the PDF path
    and that PDF's values in the same column order. Every cell, including the
    last one of a row, is followed by a tab.

    Values are stripped and flattened onto one line: line breaks become "."
    and tabs become a space. A column for which a PDF supplied no value is left
    empty, and fields without an entry in the column mapping are reported and
    left out, so one incomplete PDF does not abort the whole export.

    Args:
        pdfnames: Paths of the processed PDFs.
        retrievedDataList: Retrieved data, one dict per PDF, at the same index
            as the corresponding entry of ``pdfnames``.
    """
    columnMapping = getColumnMappingDict()
    listOfKeys = sorted(columnMapping.values())
    #
    outfilename = "export.csv"
    print("Saving to %s" % outfilename)
    # The context manager closes the file even if a row fails half-way.
    with open(outfilename, "w") as outfp:
        # Header
        outfp.write("FILE\t")
        for column in listOfKeys:
            outfp.write("%s\t" % column)
        outfp.write("\n")
        # Rows
        for i, pdfname in enumerate(pdfnames):
            retrievedData = retrievedDataList[i]
            #
            mapped = {}
            for fieldName, fieldValue in retrievedData.items():
                if fieldName not in columnMapping:
                    print("Skipping unmapped field %s in %s" % (fieldName, pdfname))
                    continue
                mapped[columnMapping[fieldName]] = fieldValue
            #
            outfp.write("%s\t" % pdfname)
            for column in listOfKeys:
                # A field the PDF did not provide yields an empty cell.
                rawValue = mapped.get(column, "")
                # "\r\n" must be replaced first so it yields one "." rather than two;
                # a lone "\r" is flattened as well so it cannot split the row.
                sanitizedValue = (
                    rawValue.strip()
                    .replace("\r\n", ".")
                    .replace("\n", ".")
                    .replace("\r", ".")
                    .replace("\t", " ")
                )
                outfp.write("%s\t" % sanitizedValue)
            outfp.write("\n")
    
if __name__ == "__main__":
    # Folder scanned for input PDFs (not recursive).
    folderPath="."
    # Every directory entry whose name ends in ".pdf", joined onto the folder path.
    pdfnames=[
        os.path.join(folderPath, entry)
        for entry in os.listdir(folderPath)
        if entry.endswith(".pdf")
    ]
    # Extract the form data, then write the per-PDF text files and the combined table.
    retrievedDataList=processPdfs(pdfnames)
    saveTxts(pdfnames, retrievedDataList)
    saveCsv(pdfnames, retrievedDataList)
    print("Done")

Processing .\Test.pdf
Saving to .\Test.pdf.txt
Saving to export.csv
Done
