In [1]:
# Intalar pymupdf
# %pip install pymupdf

#importar fitz para usar pymupdf (fitz es pymupdf)
import fitz  

In [2]:
# Instalar e importar la biblioteca pendulum y datetime para trabajar con fechas
# %pip install pendulum
from datetime import datetime
import pendulum

# Definir timezone e idioma en la biblioteca pendulum

tz = pendulum.timezone('Europe/Paris')
pendulum.set_locale('es')

In [3]:
# Importar regex
import re

In [4]:
# Intalar e importar la biblioteca para trabajar con codigos postales
# %pip install pgeocode

import pgeocode

In [5]:
def formato_fecha(fecha):
    """ Convierte fechas en formato DD/MM/YYYY o "DD de MMMM YYYY al formato DD.MM.YYYY.
    Args:
        fecha (Date or String): fecha a cambiar de formato.
    Returns:
        dt_final (Date): devuelve la fecha con el formato cambiado.
    """  
    dt = ''
    try:
        datetime.strptime(fecha, "%d.%m.%Y")
        return fecha
    except ValueError:
       pass
        
    if '/' in fecha:
        dt = pendulum.from_format(fecha, 'DD/MM/YYYY')
    elif 'de' in fecha:
        dt = pendulum.from_format(fecha.replace(' de', ''), 'DD MMMM YYYY')
    else:
        dt_final = fecha
    try:
        dt_final = dt.format('DD.MM.YYYY')
    except Exception:
        dt_final = fecha

    return dt_final    


In [6]:
def separa_CP(direccion):
    """ Divide una string en una lista por un código de 5 dígitos (incluido en la lista devuelta).
    Args:
        direccion (String): línea de texto que incluya un código postal.
    Returns:
        lista (list): devuelve una lista separando por CP. 
    """
    return re.split(r"(\d{5})", direccion)

In [7]:
def extaer_pag_pdf(ruta, num):
    """ Extrae el texto de un pdf convirtiéndolo en una lista dónde cada línea del texto es un elenmento de la lista.
    Args:
        ruta (String): ruta al archivo pdf que se va a leer.
        num (int): número de la página del pdf que se quiere leer (empezando en 0).
    Returns:
        lista (list): devuelve una lista separando cada línea del texto como un elemento de la lista.
    """
    file = fitz.open(ruta)
    pymupdf_text = []

    for page in file:
        pymupdf_text.append(page.get_text())

    return pymupdf_text[num].split('\n')  



In [8]:
# Crear el diccionario que se convertirá en el archivo JSON final con los datos a conseguir
diccionario_final = {
    "nombre_cliente": "",
    "dni_cliente": "",
    "calle_cliente": "",
    "cp_cliente": "",
    "población_cliente": "",
    "provincia_cliente": "",
    "nombre_comercializadora": "",
    "cif_comercializadora": "",
    "dirección_comercializadora": "",
    "cp_comercializadora": "",
    "población_comercializadora": "",
    "provincia_comercializadora": "",
    "número_factura": "",
    "inicio_periodo": "",
    "fin_periodo": "",
    "importe_factura": "",
    "fecha_cargo": "",
    "consumo_periodo": "",
    "potencia_contratada": ""
}

In [9]:
def pdf_a_dicc_y_lista(lineas_pdf):
    """ De una lista de strings, se eliminan las líneas vacias y se crea un diccionario con los elementos de los que se pueden extraer pares key/valor mediante la separación por ':' y, por otro lado, se crea una lista con todos los datos para tenerlos ordenados.
    Args:
        lineas_pdf (list): lista de strigns.
    Returns:
        dict_lineas(dict): devuelve un diccionario con los pares key/valor de las líneas que incuian ':' y el resto de líneas con una key numerada.
        lista_lineas (list): devuelve una lista donde cada elemnto es una string (misma lista de args sin elementos vacíos).
    """
    dict_lineas = {}
    lista_lineas = []
    contador = 0
    for linea in lineas_pdf:
        linea = linea.strip()
        #Se eliminan lineas vacias (o de solo un espacio)
        if linea != '':
            lista_lineas.append(linea)
            #Se dividen las líneas por el primer ':' para separarlas en Key/valor en el diccionario
            if ':' in linea:
                lista_linea = linea.split(':', 1)
                dict_lineas[lista_linea[0]] = lista_linea[1].strip() 
            else:
                # En caso de no contar con ':' en la línea, se añade al diccionario con la key:'otro' más un número incremental
                contador += 1
                dict_lineas[f'otro{contador}'] = linea
    return dict_lineas, lista_lineas

In [10]:
def extraer_info(dict_lineas, lista_lineas):
    # Se obtienen todos los datos que se pueden extraer mediante las llaves del diccionario 
    for key in dict_lineas.keys():
        try:
            if 'nº' in key.lower() and 'factura' in key.lower():
                diccionario_final["número_factura"] = dict_lineas[key]
            if 'periodo' in key.lower() or 'período' in key.lower():
                lista_fechas = dict_lineas[key].replace('del', '').replace('de', '').split('(')[0]
                if 'al' in dict_lineas[key].lower():
                    fechas_consumo = lista_fechas.split(' al ')
                elif 'a' in dict_lineas[key].lower():    
                    fechas_consumo = lista_fechas.split(' a ')
                diccionario_final["inicio_periodo"] = formato_fecha(fechas_consumo[0].strip())
                diccionario_final["fin_periodo"] = formato_fecha(fechas_consumo[-1].strip())
            if 'fecha' in key.lower() and ('cargo' in key.lower() or 'emisión' in key.lower()):
                diccionario_final["fecha_cargo"] = formato_fecha(dict_lineas[key])
            if 'potencia' in key.lower() and 'importe' not in key.lower():
                diccionario_final["potencia_contratada"] = dict_lineas[key].split()[0]      
            if 'titular' in key.lower() or 'nombre' in key.lower() :
                diccionario_final["nombre_cliente"] = dict_lineas[key]     
            if 'dirección' in key.lower():
                print(f"direccion: {key}")
                try:
                    lista_direccion = separa_CP(dict_lineas[key])
                    diccionario_final["calle_cliente"] = lista_direccion[0]
                    diccionario_final["cp_cliente"] = lista_direccion[1]
                    diccionario_final["población_cliente"] = lista_direccion[2]
                    if diccionario_final["calle_cliente"] != '' and diccionario_final["cp_cliente"] != '' and diccionario_final["población_cliente"] != '':
                        break
                    else:
                        continue 
                except Exception:
                    continue
        except Exception:
            continue
    # Se duplica el for en el caso del cif y el Nif para poder romper el loop después de la primera localización y asegurarnos que es el de la comercializadora y el cliente respectivamente
    for key in dict_lineas.keys():
        try:
            if 'importe' in key.lower() and 'factura' in key.lower():
                print(dict_lineas[key])
                diccionario_final["importe_factura"] = dict_lineas[key].split()[0]
                break
        except Exception:
            continue    

    for key in dict_lineas.keys():
        if 'nif' in key.lower():
            diccionario_final["dni_cliente"] = dict_lineas[key]  
            break

    # Mediante un enumerate extraemos todos los datos que sabemos que estan cerca unos de otros en la lista.
    lista_consumo = []
    lista_direccion = []
    for indice, linea in enumerate(lista_lineas):

        if 'potencia' in linea.lower() and diccionario_final["potencia_contratada"] == '':
            if 'kw' in lista_lineas[indice+1].lower():
                diccionario_final["potencia_contratada"] = lista_lineas[indice+1].split()[0]  

        if diccionario_final["dni_cliente"] == '' or diccionario_final["dni_cliente"] == None or len(diccionario_final["dni_cliente"]) != 9:
            try:
                dni = re.search(r"[0-9]{8}[A-Za-z]", linea)
                diccionario_final["dni_cliente"] = dni.group()
            except Exception:
                continue
        if (diccionario_final["inicio_periodo"] == '' or diccionario_final["fin_periodo"] == '') and ('periodo' in linea.lower() or 'período' in linea.lower()):
            fechas = linea.replace('periodo', '').replace('período', '').replace('Periodo', '').replace('Período', '').replace('del', '').replace('de', '').strip()
            
            if 'a' in linea.lower():    
                fechas_consumo = fechas.split(' a ')
            elif 'al' in linea.lower():
                fechas_consumo = fechas.split(' al ')
            print(f"fechas consumo2: {fechas_consumo}")
    
            diccionario_final["inicio_periodo"] = formato_fecha(fechas_consumo[0].strip())
            diccionario_final["fin_periodo"] = formato_fecha(fechas_consumo[-1].strip())
            break
        
        if 'dirección' in linea.lower() and 'suministro' in linea.lower() and diccionario_final["calle_cliente"] == '':
            try:
                lista_direccion.append(linea) 
                lista_direccion.append(lista_lineas[indice+1]) 
                direccion_suministro = ' '.join(lista_direccion)
                direccion_dividir = direccion_suministro.split(':', 1)
                direccion = direccion_dividir[1].strip().split(',')
                diccionario_final["calle_cliente"] = direccion[0].strip() 
                diccionario_final["población_cliente"] = direccion[1].strip()  
            except Exception:
                direccion_entera = linea.split(':', 1)[1]
                direccion = separa_CP(direccion_entera)
                diccionario_final["calle_cliente"] = direccion[0].strip() 
                diccionario_final["población_cliente"] = direccion[2].strip()  
        
        
    # Se duplican los bucles for para poder pararlos después de la primera aparición en aquella información repetida a lo largo del pdf y que puede dar problemas.
    for indice, linea in enumerate(lista_lineas):
        if 'importe' in linea.lower() and 'factura' in linea.lower() and diccionario_final["importe_factura"] == '':
            try:
                print('try')
                print(linea)
                importe = float(lista_lineas[indice-1].split()[0].replace(',','.'))
                diccionario_final["importe_factura"] = importe
                
            except Exception:
                lista_importe = lista_lineas[indice].replace(',','.').split()
                print(f"lista_importe: {lista_importe}")
                for elemento in lista_importe:
                    try:
                        importe = float(elemento)
                        diccionario_final["importe_factura"] = importe
                        break
                    except Exception:
                        continue
                
    for indice, linea in enumerate(lista_lineas):
        if 'cif' in linea.lower():
            diccionario_final["cif_comercializadora"] = linea.split()[-1].replace('.', '').strip()
            try:
                if len(diccionario_final["cif_comercializadora"]) != 9:
                    diccionario_final["cif_comercializadora"] = ''
                    diccionario_final["cif_comercializadora"] = lista_lineas[indice+1].replace('.', '').strip()
                break
            except Exception:
                continue
    for indice, linea in enumerate(lista_lineas):
        if 'cif' in linea.lower():
            try:                
                diccionario_final["nombre_comercializadora"] = lista_lineas[indice-1]
                nombre = re.search(r"^[\sáéíóúÁÉÍÓÚàÀÈÈÒòa-z,A-Z]+S\.[AL]\.", diccionario_final["nombre_comercializadora"])
                diccionario_final["nombre_comercializadora"] = nombre.group()

                direccion_comercializadora = separa_CP(lista_lineas[indice+1])
                diccionario_final["dirección_comercializadora"] = direccion_comercializadora[0].strip()
                diccionario_final["cp_comercializadora"] = direccion_comercializadora[1].strip()
                break
            except Exception:
                continue
    for indice, linea in enumerate(lista_lineas):
        if 'cif' in linea.lower():        
            try:
                if len(diccionario_final["dirección_comercializadora"]) < 5 and 'social' in linea.lower():
                    lista_comercializadora = linea.split(':')
                    nombre = re.search(r"^[\sa-z,A-Z]+S\.[AL]\.", lista_comercializadora[0])
                    diccionario_final["nombre_comercializadora"] = nombre.group()

                    lista_direccion_comercial = separa_CP(lista_comercializadora[-1])
                    diccionario_final["dirección_comercializadora"] = lista_direccion_comercial[0].strip()
                    diccionario_final["cp_comercializadora"] = lista_direccion_comercial[1]
                    break
            except:
                continue

    for indice, linea in enumerate(lista_lineas):
        if 'cif' in linea.lower():        
            try:
                if diccionario_final["nombre_cliente"] == '':
                    diccionario_final["nombre_cliente"] = lista_lineas[indice+2]
                    if diccionario_final["calle_cliente"] == '':
                        diccionario_final["calle_cliente"] = lista_lineas[indice+3]
                    direccion_cliente = separa_CP(lista_lineas[indice+4])
                    break
            except Exception:
                continue        
                     
    for indice, linea in enumerate(lista_lineas):
        if 'consumo' in linea.lower() and 'p1' in linea.lower() and diccionario_final["consumo_periodo"] == '':
            try: 
                consumo = float(lista_lineas[indice+1].split('kWh')[0].strip())
                diccionario_final["consumo_periodo"] = consumo
                break 
            except Exception:
                continue

    for indice, linea in enumerate(lista_lineas):
        if 'consumo' in linea and (diccionario_final['consumo_periodo'] == '' or 'xx' in str(diccionario_final['consumo_periodo']).lower()):
            for x in range(1,5):
                if 'kwh' in lista_lineas[indice+x].lower():
                    try:
                        diccionario_final['consumo_periodo'] = int(linea.split()[0])
                        break
                    except Exception:
                        continue

    for indice, linea in enumerate(lista_lineas):
        if diccionario_final['cp_cliente'] == '':
            if diccionario_final['nombre_cliente'].lower() in linea.lower():
                cp = re.search(r"[\d]{5}", lista_lineas[indice+2])
                diccionario_final["cp_cliente"] = cp.group()
                diccionario_final["calle_cliente"] = lista_lineas[indice+1]
                break

                           
    nomi = pgeocode.Nominatim('es')
    diccionario_final["provincia_cliente"] = nomi.query_postal_code(diccionario_final["cp_cliente"])['county_name']              
    diccionario_final["población_cliente"] = nomi.query_postal_code(diccionario_final["cp_cliente"])['place_name']              
    diccionario_final["provincia_comercializadora"] = nomi.query_postal_code(diccionario_final["cp_comercializadora"])['county_name']       
    diccionario_final["población_comercializadora"] = nomi.query_postal_code(diccionario_final["cp_comercializadora"])['place_name']       

    for linea in lista_lineas:
        lista_pueblos = diccionario_final["población_cliente"].split(',')
        for pueblo in lista_pueblos:
            pueblo = pueblo.strip()
            if pueblo.lower() in linea.lower():
                diccionario_final["población_cliente"] = pueblo
                break
    for linea in lista_lineas:
        lista_pueblos = diccionario_final["población_comercializadora"].split(',')
        for pueblo in lista_pueblos:
            pueblo = pueblo.strip()
            if pueblo.lower() in linea.lower():
                diccionario_final["población_comercializadora"] = pueblo
                break

    return diccionario_final

In [11]:
lineas_pdf = extaer_pag_pdf("pdfs/factura_3.pdf", 0)
dict_lineas, lista_lineas = pdf_a_dicc_y_lista(lineas_pdf)
diccionario_final = extraer_info(dict_lineas, lista_lineas)

try
TOTAL IMPORTE FACTURA


In [12]:
diccionario_final 

{'nombre_cliente': 'Belinda Zetina Mijares',
 'dni_cliente': '',
 'calle_cliente': 'Calle del Zarzalejo',
 'cp_cliente': '25737',
 'población_cliente': "Cubells, Vernet, Marcovau, Foradada, Baldomar, Alos De Balaguer, Rubio De Baix, Rubio Del Mig, La Vall D'Ariet, La Torre De Fluvia, Clua, La (Artesa De Segre), Rubio De Dalt, Montsonis",
 'provincia_cliente': 'Lleida',
 'nombre_comercializadora': 'TRADE UNIVERSAL ENERGY, S.A.',
 'cif_comercializadora': 'A66531013',
 'dirección_comercializadora': 'RAMBLA CATALUNYA, 91-93',
 'cp_comercializadora': '08008',
 'población_comercializadora': 'Barcelona',
 'provincia_comercializadora': 'Barcelona',
 'número_factura': 'SF3956122542',
 'inicio_periodo': '28.10.2021',
 'fin_periodo': '27.12.2021',
 'importe_factura': 219.71,
 'fecha_cargo': '01.01.2022',
 'consumo_periodo': '',
 'potencia_contratada': ''}

In [13]:
lista_lineas

['DATOS DE LA FACTURA',
 'Nº factura: SF3956122542',
 'Referencia: 210471327474/5121',
 'Fecha emisión factura: 29/12/2021',
 'Periodo de Facturación: del 28/10/2021 a 27/12/2021 (60 días)',
 'Fecha de cargo: 01 de enero de 2022',
 '....................................................................................................................................................................',
 'TRADE UNIVERSAL ENERGY, S.A..',
 'CIF A66531013.',
 'RAMBLA CATALUNYA, 91-93 08008 - BARCELONA',
 'Belinda Zetina Mijares',
 'Calle del Zarzalejo',
 '25737 Bassella',
 'Lleida',
 'Forma de pago: Domiciliada',
 'Potencia',
 'Energía',
 'Descuentos',
 'Otros',
 'Impuestos',
 'IGIC reducido',
 '23,11 €',
 '180,61 €',
 '-X,XX €',
 '1,07 €',
 '10,42 €',
 '4,28 €',
 'Fecha de cargo: 01 de enero de 2022',
 'IBAN: ES02182401783718573*****',
 'Cod.Mandato: E43931580436446446679978445',
 '( 2%)',
 'Versión: 4496',
 'IGIC normal ( 21%)',
 '0,22 €',
 '....................................................

In [14]:
for key in diccionario_final.keys():
    if diccionario_final[key] == '':
        lineas_pdf2 = extaer_pag_pdf("pdfs/factura_3.pdf", 1)
        dict_lineas2, lista_lineas2 = pdf_a_dicc_y_lista(lineas_pdf2)
        diccionario_final2 = extraer_info(dict_lineas2, lista_lineas2)
        print(lista_lineas2)
        diccionario_final[key] = diccionario_final2[key]
    else:
        continue    


direccion: Dirección de suministro


['180,61 €', 'Importe por energía consumida:', '740 kWh x 0,244067 Eur/kWh', 'DATOS DEL CONTRATO', 'Titular del contrato: Belinda Zetina Mijares', 'NIF: 89901698B', 'Dirección de suministro: Calle del Zarzalejo, Bassella, Lleida', 'Número de contador: 245151267', 'Su comercializadora: TRADE UNIVERSAL ENERGY, S.A.', 'Su distribuidora: HIDROELECTRICA SANTA TERESA SL', 'Referencia del contrato de acceso: 781087855549', 'Peaje de acceso: 2.0A', 'Fin de contrato de suministro: 27/12/2021', '(renovación anual', 'automática)', 'CUPS: ES8251682085417910DJIY', 'El destino del importe de su', 'factura, 219,71 euros, es el', 'siguiente:', 'LUZ', 'Importe por potencia contratada:', '3,32 kW x 0,116028 Eur/kW x 60 días', '23,11 €', 'X,XX €', 'En dicho importe, facturación por peaje de acceso:', '3,32 kW x 38,524425 Eur/kW y año x (60/365) días', '21,02 €', 'XX,XX €', '23,11 €', 'XX,XX €', 'En dicho importe, su facturación por peaje de acceso ha sido:', 'Consumo 

In [15]:
diccionario_final2

{'nombre_cliente': 'Belinda Zetina Mijares',
 'dni_cliente': '89901698B',
 'calle_cliente': 'Calle del Zarzalejo, Bassella, Lleida',
 'cp_cliente': '25737',
 'población_cliente': "Cubells, Vernet, Marcovau, Foradada, Baldomar, Alos De Balaguer, Rubio De Baix, Rubio Del Mig, La Vall D'Ariet, La Torre De Fluvia, Clua, La (Artesa De Segre), Rubio De Dalt, Montsonis",
 'provincia_cliente': 'Lleida',
 'nombre_comercializadora': 'TRADE UNIVERSAL ENERGY, S.A.',
 'cif_comercializadora': 'A66531013',
 'dirección_comercializadora': 'RAMBLA CATALUNYA, 91-93',
 'cp_comercializadora': '08008',
 'población_comercializadora': 'Barcelona',
 'provincia_comercializadora': 'Barcelona',
 'número_factura': 'SF3956122542',
 'inicio_periodo': '28.10.2021',
 'fin_periodo': '27.12.2021',
 'importe_factura': 219.71,
 'fecha_cargo': '01.01.2022',
 'consumo_periodo': 740.0,
 'potencia_contratada': '3,317'}

In [16]:
diccionario_final

{'nombre_cliente': 'Belinda Zetina Mijares',
 'dni_cliente': '89901698B',
 'calle_cliente': 'Calle del Zarzalejo, Bassella, Lleida',
 'cp_cliente': '25737',
 'población_cliente': "Cubells, Vernet, Marcovau, Foradada, Baldomar, Alos De Balaguer, Rubio De Baix, Rubio Del Mig, La Vall D'Ariet, La Torre De Fluvia, Clua, La (Artesa De Segre), Rubio De Dalt, Montsonis",
 'provincia_cliente': 'Lleida',
 'nombre_comercializadora': 'TRADE UNIVERSAL ENERGY, S.A.',
 'cif_comercializadora': 'A66531013',
 'dirección_comercializadora': 'RAMBLA CATALUNYA, 91-93',
 'cp_comercializadora': '08008',
 'población_comercializadora': 'Barcelona',
 'provincia_comercializadora': 'Barcelona',
 'número_factura': 'SF3956122542',
 'inicio_periodo': '28.10.2021',
 'fin_periodo': '27.12.2021',
 'importe_factura': 219.71,
 'fecha_cargo': '01.01.2022',
 'consumo_periodo': 740.0,
 'potencia_contratada': '3,317'}

In [17]:
""" import json
with open("json_meu/factura_3_meu.json", "w") as fp:
    json.dump(diccionario_final, fp, ensure_ascii=False) """

' import json\nwith open("json_meu/factura_3_meu.json", "w") as fp:\n    json.dump(diccionario_final, fp, ensure_ascii=False) '