In [2]:
import numpy as np
import pandas as pd
import PyPDF2
import re
import json
import os
from datetime import datetime
from pdfminer.high_level import extract_text
from collections import Counter

In [4]:
files_path = 'test/'

In [6]:
json_paths = [files_path + file for file in os.listdir(files_path) if file.endswith('.json')]
pdf_paths = [files_path + file for file in os.listdir(files_path) if file.endswith('.pdf')]

In [8]:
spain_pc = {
    '01': 'Álava',
    '02': 'Albacete',
    '03': 'Alicante',
    '04': 'Almería',
    '05': 'Ávila',
    '06': 'Badajoz',
    '07': 'Islas Baleares',
    '08': 'Barcelona',
    '09': 'Burgos',
    '10': 'Cáceres',
    '11': 'Cádiz',
    '12': 'Castellón',
    '13': 'Ciudad Real',
    '14': 'Córdoba',
    '15': 'La Coruña',
    '16': 'Cuenca',
    '17': 'Gerona',
    '18': 'Granada',
    '19': 'Guadalajara',
    '20': 'Guipúzcoa',
    '21': 'Huelva',
    '22': 'Huesca',
    '23': 'Jaén',
    '24': 'León',
    '25': 'Lérida',
    '26': 'La Rioja',
    '27': 'Lugo',
    '28': 'Madrid',
    '29': 'Málaga',
    '30': 'Murcia',
    '31': 'Navarra',
    '32': 'Orense',
    '33': 'Asturias',
    '34': 'Palencia',
    '35': 'Las Palmas',
    '36': 'Pontevedra',
    '37': 'Salamanca',
    '38': 'Santa Cruz de Tenerife',
    '39': 'Cantabria',
    '40': 'Segovia',
    '41': 'Sevilla',
    '42': 'Soria',
    '43': 'Tarragona',
    '44': 'Teruel',
    '45': 'Toledo',
    '46': 'Valencia',
    '47': 'Valladolid',
    '48': 'Vizcaya',
    '49': 'Zamora',
    '50': 'Zaragoza',
    '51': 'Ceuta',
    '52': 'Melilla'
}

In [10]:
months = {
    "enero": "01",
    "febrero": "02",
    "marzo": "03",
    "abril": "04",
    "mayo": "05",
    "junio": "06",
    "julio": "07",
    "agosto": "08",
    "septiembre": "09",
    "octubre": "10",
    "noviembre": "11",
    "diciembre": "12"
}

In [12]:
def extract_result(re, text):
    matches = re.findall(text)
    matches_set = set(matches)
    if len(matches_set) == 1:
        return matches[0]
    return matches

In [14]:
def extract_text_pypdf2(pdf_path):
    
    reader = PyPDF2.PdfReader(pdf_path)

    # Como el cif tiene que estar en la primera página solo extraeremos esa
    page = reader.pages[0]
    text = page.extract_text()
    return text

In [16]:
#importe total

def return_total_amount(text):
    re_total_amount = re.compile(r"(\d+,\d+)[^\n]*€")

    matches = re_total_amount.findall(text)
    total_amount = max([float(amount.replace(',','.')) for amount in matches])
    total_amount = "{:.2f}".format(total_amount)
    total_amount = str(total_amount).replace(".",",")

    return total_amount

In [18]:
#Numero de factura

def return_bill_num(text):
    re_bill_num = re.compile(r"(?:nº|num\.|número|num)\s*factura(?::)?\s*(\w+)", re.IGNORECASE)
    
    bill_num = extract_result(re_bill_num, text)
    return bill_num

In [20]:
#cif

def return_cif(text, pdf_path):
    re_cif = re.compile(r"\bcif\b(?::)?(?:\s*)?(\w+)", re.IGNORECASE)

    cif = extract_result(re_cif, text)

    #Si no encontramos ningún match probamos con otro extractor de texto 
    if not cif:
        text = extract_text_pypdf2(pdf_path)
        cif = extract_result(re_cif, text)
    return cif

In [22]:
#dni

def return_dni(text):
    re_dni = re.compile(r"\b\d{8}[A-Z]\b")

    dni = extract_result(re_dni, text)
    
    return dni

In [24]:
#cp

def return_pc(text):
    re_pc = re.compile(r".*\b\d{5}\b(?!\s*kW|\s*kWh).*")

    pc = extract_result(re_pc, text)

    return pc

In [26]:
#potencia contratada
def return_contracted_power(text):
    re_contracted_power = re.compile(r"contratada:?[^\n]*?(\d+,\d+)\s*(?<!€/)kW")

    contracted_power = extract_result(re_contracted_power, text)
    
    return contracted_power

In [28]:
def return_consumption(text):
    #Buscamos todas las cadenas de numeros que van sucedidas por kWh ya que esa es la unidad para el consumo
    re_consumo = re.compile(r"(\d+)\s*kWh")

    matches = re_consumo.findall(text)
    matches = [int(amount.replace(',','.')) for amount in matches]
    matches = sorted(matches)

    #Si solo hay un match ese tiene que ser el consumo
    if len(matches) == 1:
        consumo = matches[0]

    #Si hay más de 3 hay varias posibilidades
    elif len(matches) >= 3:
        #Si hay un número que se repite más que el resto ese debe ser el consumo
        counts = Counter(matches)
        if len(counts.values()) != sum(counts.values()):
            consumo = counts.most_common()[0][0]

        #En caso contrario tomaremos como el consumo el valor más grande (última lectura) menos el valor más pequeño(lectura anterior)
        else:
            consumo = matches[-1] - matches [-2]
            if consumo not in matches:
                consumo = matches
    #En caso de que haya dos elementos no podemos saber cuál es el consumo
    else:
        consumo = matches

    return consumo

In [47]:
# Fechas periodo

def return_initial_and_final_date(text):
    #Hacemos match con todos los elementos que van precedidos por periodo
    matches = re.findall(r"(?<=periodo|período)(.*?)(?=\n|$)", text, re.IGNORECASE)

    #filtramos los elementos que pueden tener formato de fecha
    re_numeric_date = re.compile(r"\b(\d{2})[-.\s\/]\s?(\d{2})[-.\s\/]\s?(\d{4})\b")
    re_string_date = re.compile(r"\b(\d{1,2})\s*de?\s*?(enero|febrero|marzo|abril|mayo|junio|julio|agosto|septiembre|octubre|noviembre|diciembre)\s*de?\s*?(\d{4})\b")
    period_dates = [re_numeric_date.findall(match) for match in matches if len(re_numeric_date.findall(match)) == 2]
    if not period_dates:
        period_dates = [re_string_date.findall(match) for match in matches if len(re_string_date.findall(match)) == 2]
        period_dates = [[f"{date[0]}.{months[date[1]]}.{date[2]}" for dates in period_dates for date in dates]]
    else:
        period_dates = [[f"{date[0]}.{date[1]}.{date[2]}" for dates in period_dates for date in dates]]

    #Compruebo que los pares de fechas son distintos entre si para poder obtener periodos
    
    #################################### Se añade el try except por si no encuentra ningún resultado ####################################
    try:
        period_dates = [dates for dates in period_dates if dates[0] != dates[1]]
    except:
        return None, None
    
    #Me quedo con los valores unicos de fechas para comprobar que solo tenga un periodo valido
    period_dates = [date for dates in period_dates for date in dates]
    period_dates = list(set(period_dates))

    #Ahora le doy formato a las fechas para comparar y parsear la mayor a fecha final y la menor a fecha inicial
    date_1 = datetime.strptime(period_dates[0], "%d.%m.%Y")
    date_2 = datetime.strptime(period_dates[1], "%d.%m.%Y")
    if date_1 == date_2:
        return None, None
    else:
        sorted_dates = sorted([date_1, date_2]) 
        initial_date, final_date = [date.strftime("%d.%m.%Y") for date in sorted_dates]
    
    return initial_date, final_date

In [57]:
#Fecha de cargo

def return_charge_date(text):
    re_numeric_date = re.compile(r"\b(\d{2})[-.\s\/]\s?(\d{2})[-.\s\/]\s?(\d{4})\b")
    re_string_date = re.compile(r"\b(\d{1,2})\s*de?\s*?(enero|febrero|marzo|abril|mayo|junio|julio|agosto|septiembre|octubre|noviembre|diciembre)\s*de?\s*?(\d{4})\b")
    
    matches = re.findall(r"(?<=cargo)(.*?)(?=\n|$)", text, re.IGNORECASE)
    charge_date = [re_numeric_date.findall(match) for match in matches if re_numeric_date.search(match)]
    if not charge_date:
        charge_date = [re_string_date.findall(match) for match in matches if re_string_date.search(match)]

        # Si no hay fecha devolver none
        if not charge_date:
            return None

        charge_date = [f"{date[0]}.{months[date[1]]}.{date[2]}" for dates in charge_date for date in dates]

    else:
        charge_date = [f"{date[0]}.{date[1]}.{date[2]}" for dates in charge_date for date in dates]

    # Si se encuentra un número distinto a 1 de fechas no podemos saber cuál es la de cargo
    if len(set(charge_date)) != 1:
        charge_date = None

    #Si se han cumplido todas las condiciones solo tendremos una fecha repetida en la lista 
    charge_date = charge_date[0]
    
    return charge_date

In [51]:
#Nombre cliente

def return_client_name(text):

    re_client_name = re.compile(r"Nombre.*|titular.*", re.IGNORECASE)

    matches = re_client_name.findall(text)

    if matches:
        client_name = [match.split(":")[1].strip() for match in matches if ":" in match]
        if len(set(client_name)) != 1:
            client_name = None
        else:
            client_name = client_name[0]
        
        #################################### Se añade este condicional para evitar errores ####################################
        if client_name:
            return client_name
        else:
            return None

In [36]:
def return_client_addres_pc_town_province(text, client_name, spain_pc):
    if not client_name:
        return None, None, None, None
    #Intentamos extraer los datos de la dirección del cliente del recuadro que se ubica siempre en la primera página de las facturas
    #Para ello nos quedamos con los párrafos que contengan el nombre del cliente pero que no tengan :
    client_parrs = [parr.split("\n") for parr in text.split("\n\n") if re.search(rf"^(?!.*:).*{client_name}*", parr, re.IGNORECASE) and len(parr.split("\n")) > 1]

    if client_parrs:
        client_parrs = client_parrs[0]
        client_addres, client_cp_town = client_parrs[1:3]
        client_pc = re.findall(r"\b\d{5}+\b", client_cp_town)[0]
        client_town = re.findall(r"[a-zA-ZáéíóúÁÉÍÓÚüÜñÑ]+", client_cp_town)[0]

        client_addres = client_addres.strip()
        client_pc = client_pc.strip()
        client_town = client_town.strip()
        client_province = spain_pc[client_pc[:2]]

    else:
        client_addres = None
        client_pc = None
        client_town = None
        client_province = None
    return client_addres, client_pc, client_town, client_province

In [100]:
#Datos empresa

def return_retailer_name_addres_pc_town_province(text, spain_pc, pdf_path):
    # Como los datos de las distribuidoras están agrupados, dividimos por párrafos e intentamos encontrarlas con el cif
    retailer_parrs = [parr.split("\n") for parr in text.split("\n\n") if re.search(r"\bcif\b(?::)?(?:\s*)?", parr, re.IGNORECASE)]

    #Nos quedamos con las líneas que no tienen el cif
    retailer_parrs = [line for parr in retailer_parrs for line in parr if not re.search(r"\bcif\b(?::)?(?:\s*)?", line, re.IGNORECASE)]

    if len(retailer_parrs) == 2:
        retailer_name, retailer_addres = retailer_parrs
        retailer_name = retailer_name.strip()
        retailer_addres = retailer_addres.strip()

        retailer_addres = re.findall(r"(?:social:)?(?:.*?:)?(.*)", retailer_addres, re.IGNORECASE)
        retailer_addres = [match.strip() for match in retailer_addres if match != '']

    else:
        text = extract_text_pypdf2(pdf_path)
        retailer_name = None
        retailer_addres = re.findall(r"(?:social)(?::)?(.[^.;]*)", text, re.IGNORECASE)
        retailer_addres = [match.strip() for match in retailer_addres if match != '']

    #cp comercializadora
    retailer_pc = [re.findall(r"\d{5}+", retailer_addres_element)[0] for retailer_addres_element in retailer_addres if re.search(r"\d{5}+", retailer_addres_element)]
    if retailer_pc:
        retailer_pc = retailer_pc[0]

        #Comprobamos si es una dirección marcada como extranjera
        foreign_retailer = [re.findall(r"extranjero", element, re.IGNORECASE) for element in retailer_addres if re.search(r"extranjero", element, re.IGNORECASE)]

        #En caso de que el cp sea único podremos parsear la dirección, si no la dejaremos [etiqueta](etiqueta)
        retailer_addres, retailer_town = retailer_addres[0].split(retailer_pc)[0:2] #################################### Se añade el slicing [0:2] para evitar errores en listas con más de 2 elementos ####################################
        retailer_town = re.findall(r"[a-zA-ZáéíóúÁÉÍÓÚüÜñÑ]+", retailer_town)[0]
        retailer_addres, retailer_town = retailer_addres.strip(), retailer_town.strip()

        #Si el cp es español encontramos la provincia con el diccionario de cp y si es extranjero le ponemos la etiqueta "Extranjero"
        if not foreign_retailer:
            retailer_province = spain_pc[retailer_pc[:2]]
        else:
            retailer_province = "Extranjero"

    else:
        retailer_pc = None
        retailer_addres = None
        retailer_town = None
        retailer_province = None

    return retailer_name, retailer_addres, retailer_pc, retailer_town, retailer_province

## Extracción de datos

In [104]:
for pdf_path in pdf_paths:

    text = extract_text(pdf_path)
    
    #Extraemos todos los datos
    client_name = return_client_name(text)
    dni = return_dni(text)
    client_addres, client_pc, client_town, client_province = return_client_addres_pc_town_province(text, client_name, spain_pc)
    cif = return_cif(text, pdf_path)
    retailer_name, retailer_addres, retailer_pc, retailer_town, retailer_province = return_retailer_name_addres_pc_town_province(text, spain_pc, pdf_path)
    bill_num = return_bill_num(text)
    initial_date, final_date = return_initial_and_final_date(text)
    total_amount = return_total_amount(text)
    charge_date = return_charge_date(text)
    consumption = return_consumption(text)
    contracted_power = return_contracted_power(text)
    
    #Guardamos todo en el diccionario
    json_data = {}
        
    json_data["nombre_cliente"] = client_name
    json_data["dni_cliente"] = dni
    json_data["calle_cliente"] = client_addres
    json_data["cp_cliente"] = client_pc
    json_data["población_cliente"] = client_town
    json_data["provincia_cliente"] = client_province
    json_data["nombre_comercializadora"] = retailer_name
    json_data["cif_comercializadora"] = cif
    json_data["dirección_comercializadora"] = retailer_addres
    json_data["cp_comercializadora"] = retailer_pc
    json_data["población_comercializadora"] = retailer_town
    json_data["provincia_comercializadora"] = retailer_province
    json_data["número_factura"] = bill_num
    json_data["inicio_periodo"] = initial_date
    json_data["fin_periodo"] = final_date
    json_data["importe_factura"] = total_amount
    json_data["fecha_cargo"] = charge_date
    json_data["consumo_periodo"] = consumption
    json_data["potencia_contratada"] = contracted_power
    
    #Exportamos el json
    json_path = "json_resultados/" + pdf_path.split("/")[-1].replace("pdf", "json")
    
    with open(json_path, 'w') as file:
        json.dump(json_data, file, ensure_ascii=False)

#### Funciones retocadas después de la entrega

- return_initial_and_final_date
- return_client_name
- return_retailer_name_addres_pc_town_province