PREPROCESADO

En este fichero realizo las operaciones iniciales para 
- Conciliar los archivos de 'fichas' (xml) y 'escrituras' (pdf) que tienen la misma raíz en su nombre de archivo y copiarlos juntos en un directorio común, cambiándoles el nombre a un contador numérico de cinco caracteres.
- Para cada par de ficheros: Buscar dentro en el texto extraído de la escritura los datos de la ficha. Si se encuentran, crear un archivo de texto (con el mismo contador por nombre) con el texto de la página que contiene estos datos. En este caso, la página será la portada de la escritura, pues es donde vienen los datos de registro.
- Se genera también un archivo de texto con sufijo _norm.txt con el texto nomralizado. Sin espacios ni guiones repetidos, sin saltos de página y algunas correcciones detectadas en los textos extraídos.
- A continuación se buscan en el texto las ocurrencias de los datos que vamos a extraer de las escrituras y que para el entrenamiento de este modelo podemos extraer de las fichas, se genera una estructura (dataset) con esta información.

ANONIMIZACIÓN
- Se extraen de las fichas los nombres, apellidos, razón social de empresas y nombres de vías para anonimizarlos en los textos de las escrituras, se generan con ellos una serie de diccionarios.
- Se definen las funciones de anonimización, para los datos contenidos en las fichas y para el resto del texto (contexto). De manera que los datos de las fichas pueden ser referenciados en el texto aunque haya varias ocurrencias. En los datos de contexto se anonimizan todos los números de forma aleatoria y las entidades de diccionario se sustituyen por otras del mismo tipo.

GENERACIÓN DEL DATASET
- Se genera el dataset definitivo con los datos anonimizados.
- Se prueba y se generan tres ficheros, que serán los utilizados durante el entrenemiento, la validación y el testeo de los modelos de QA

In [None]:
import os
import PyPDF2
import xml.etree.ElementTree as ET
import json
import random
import string
import re
import shutil
from num2words import num2words
import unicodedata
from globals import RANDOM_APELLIDOS, RANDOM_NOMBRES, RANDOM_EMPRESAS, RANDOM_VIAS, DATA_DIR,PDF_FICHAS_DIR, ESC_DIR, FICH_DIR
import pandas as pd
import concurrent.futures
from enum import Enum, StrEnum, auto
import datetime, time
import ahocorasick # Librería para búsqueda de diccionarios
import traceback
from sklearn.model_selection import train_test_split

In [None]:
MAX_WORKERS = 24 # Número de procesos utilizados en paralelización. Un número excesivo provoca saturación del sistema y ralentización

## Definición de funciones

### Conciliar archivos de escrituras y fichas

In [None]:
# Dejamos en una misma carpeta pdfs y fichas que empiecen por el mismo prefijo
def copiar_archivos_coincidentes(ruta_pdf, ruta_fichas, ruta_destino):
    """Copia los archivos pdf y xml que tengan el mismo prefijo en su nombre de archivo a una sola carpeta.\n
    Los archivos de escritura acaban en '-copia.pdf' y los archivos de ficha acaban en '-ficha.xml'.
    """
    # Asegurarse de que el directorio de destino existe
    if not os.path.exists(ruta_destino):
        os.makedirs(ruta_destino)

    # Obtener la lista de archivos PDF y fichas XML
    archivos_pdf = [f for f in os.listdir(ruta_pdf) if f.endswith('-copia.pdf')]
    archivos_fichas = [f for f in os.listdir(ruta_fichas) if f.endswith('-ficha.xml')]

    # Inicializar el contador para los nombres secuenciales
    contador = 0

    for pdf in archivos_pdf:
        # Obtener el nombre base del archivo PDF (sin '-copia.pdf')
        nombre_base = pdf[:-10]

        # Buscar la ficha correspondiente
        ficha_correspondiente = nombre_base + '-ficha.xml'
        if ficha_correspondiente in archivos_fichas:
            # Construir las rutas completas de los archivos originales
            ruta_origen_pdf = os.path.join(ruta_pdf, pdf)
            ruta_origen_ficha = os.path.join(ruta_fichas, ficha_correspondiente)

            # Construir las rutas de destino con nombres secuenciales
            contador += 1
            nombre_secuencial = f"{contador:05d}"
            destino_pdf = os.path.join(ruta_destino, nombre_secuencial + '.pdf')
            destino_ficha = os.path.join(ruta_destino, nombre_secuencial + '.xml')

            # Copiar los archivos
            shutil.copy2(ruta_origen_pdf, destino_pdf)
            shutil.copy2(ruta_origen_ficha, destino_ficha)
            
    print(f"{2*contador} Archivos copiados.({contador} escrituras)")

### Funciones auxiliares de manejo de textos

In [None]:
def quita_las_tildes(texto):
    """Normaliza el texto a la forma 'NFD' y elimina los caracteres de acento (dejando las eñes)"""
    texto_sin_tildes = ''.join(
            c if c in ['ñ', 'Ñ'] else ''.join(char for char in unicodedata.normalize('NFD', c) if unicodedata.category(char) != 'Mn')
            for c in texto
    )        
    return texto_sin_tildes

#se ha detectado que puede haber un espacio en el mes "j ulio", "mar zo" ... o en el año "veinti dos"
def construir_regex_con_espacios(palabras, ignora_tildes=True):
    # Construir una expresión regular que permite espacios opcionales entre cada letra
    # Si ignora_tildes, además se incluye para cada vocal la posibilidad de tener o no tener tildes
    if ignora_tildes:
        vocales_tildes = {'a':'[aá]', 'e':'[eé]', 'i': '[ií]', 'o': '[oó]', 'u': '[uú]', 
                        'A':'[AÁ]', 'E':'[EÉ]', 'I':'[IÍ]', 'O':'[OÓ]', 'U':'[UÚ]'} 
        #buscamos cualquier vocal en la regex_con_espacios y la cambiamos por su representación con o sin tilde
        patron_vocales = "[aeiouAEIOU]"
        palabras = quita_las_tildes(palabras)
        
    palabras_separadas = palabras.split(' ')
    regex_con_espacios = r'[\W\n]*'.join([r'\b'+ r'[\W\n]*'.join(palabra) +r'\b' for palabra in palabras_separadas])
    if ignora_tildes:
        regex_con_espacios=  re.sub(patron_vocales,lambda x: vocales_tildes.get(x.group(0),x.group(0)), regex_con_espacios)
        
    return regex_con_espacios

# En las fichas, los datos de fecha están en formato YYYY-MM-DD, mientras que en las escrituras se utiliza un formato textualizado como 'siete de marzo de dos mil veinte'. Separaremos el mes y el año para hacer la búsqueda en el documento
def separar_datos_fecha_a_texto(fecha):
    """Recibe una fecha en formato 'YYYY-MM-DD' y devuelve en formato texto (dia, mes,año)"""
    partes = fecha.split('-')
    meses = ['enero', 'febrero', 'marzo', 'abril', 'mayo', 'junio',
             'julio', 'agosto', 'septiembre', 'octubre', 'noviembre', 'diciembre']
    dia = num2words(partes[2])
    mes = meses[int(partes[1]) - 1]
    año = num2words(int(partes[0]), lang='es', to='year')
    return dia, mes, año

def convertir_fecha_a_texto_completo(fecha):
    """Textualiza una fecha en formato YYYY-MM-DD"""
    partes = fecha.split('-')
    meses = ['enero', 'febrero', 'marzo', 'abril', 'mayo', 'junio',
             'julio', 'agosto', 'septiembre', 'octubre', 'noviembre', 'diciembre']
    dia = num2words(int(partes[2]), lang='es')
    mes = meses[int(partes[1]) - 1]
    año = num2words(int(partes[0]), lang='es', to='year')
    return f"{dia} de {mes} de {año}"

def construir_regex_fecha(fecha):
    """A partir de una fecha en formato YYYY-MM-DD, devuelve una expresión regular para encontrarla en formato textual"""
    partes = fecha.split('-')
    meses = ['enero', 'febrero', 'marzo', 'abril', 'mayo', 'junio',
             'julio', 'agosto', 'septiembre', 'octubre', 'noviembre', 'diciembre']
    dia = num2words(int(partes[2]), lang='es')
    mes = meses[int(partes[1]) - 1]
    año = num2words(int(partes[0]), lang='es', to='year')
    return construir_regex_con_espacios(dia) + r'[\n\s]+de[\n\s]+' + construir_regex_con_espacios(mes) + r'[\n\s]+de((l|\s+el)\s+a(n|ñ)o)?' + construir_regex_con_espacios(año)

def construir_regex_apellidos_notario(apellidos):
    return r'\s*(de\s+|y\s+)?'.join(list(map(construir_regex_con_espacios,apellidos.split(' '))))

def construir_regex_notario(nombre, apellidos):
    """Dado que se ha encontrado que a veces los notarios en la escritura no indican su nombre completo (nombres compuestos) en la expresión regular se tratará de encontrar éste nombre"""
    return construir_regex_con_espacios(nombre) + r'(\s*[\w\-]*\s*){0,3}' + construir_regex_apellidos_notario(apellidos)

def reemplazo_segun_caso(match, txt_alternativo):
    """Función de reemplazo de cadenas, tratando de respetar el formato de mayúsculas y minúsculas.\n
    Si match y txt_alternativo no tienen el mismo número de palabras se tendrá en cuenta el caso en el número de palabras de la oración más corta."""
    txt_original = match.group()
    if txt_original=='':
        return ''
    palabras_original = txt_original.split(' ')
    palabras_alternativo = txt_alternativo.split(' ')
    palabras_final = []
    for original, alternativo in zip(palabras_original,palabras_alternativo):
        if original.islower():
            palabras_final.append(alternativo.lower())
        elif original.isupper():
            palabras_final.append(alternativo.upper())
        elif original.istitle():
           palabras_final.append(alternativo.title())
        else:
            palabras_final.append(alternativo)
    # Unimos el resto de palabras del texto alternativo
    palabras_final = palabras_final + palabras_alternativo[len(palabras_original):]
    return ' '.join(palabras_final)

def re_sub_case(patron_a_buscar, txt_alternativo, txt_original, flags = re.IGNORECASE):
    """Función de reemplazo de cadenas, tratando de respetar el formato de mayúsculas y minúsculas"""
    patron_re = re.compile(patron_a_buscar, flags=flags)
    return patron_re.sub(lambda x: reemplazo_segun_caso(x,txt_alternativo), txt_original)

def normalizar_texto(texto,conservar_tildes=False):
    """Procesa el texto para devolver una versión\nSin tildes (opcional)\nSin saltos de página\nSin espacios en blanco repetidos\n
        Sin guiones partiendo ni separando palabras
    """    
    texto_inicial = texto if conservar_tildes else quita_las_tildes(texto)
    # elimina saltos de página y mayúsculas
    texto_sin_saltos = re.sub(r'\n+', ' ', texto_inicial)

    # Buscamos signos de igual repetidos (los usan para separar partes del texto, pero no dan información)
    texto_sin_iguales = re.sub(r'\s*=+\s*', ' ', texto_sin_saltos)

    # Esto evita que se quite la separación entre palabras completas con un guión en medio
    texto_sin_guiones_sueltos = re.sub(r'\s*-+\s*', ' ', texto_sin_iguales)

    # Elimina espacios repetidos
    texto_sin_espacios_repetidos = re.sub(r'\s+', ' ', texto_sin_guiones_sueltos)

    # Elimina espacios antes y después de guiones
    texto_sin_espacios_delante_guion = re.sub(r'(?<=\w) -', '-', texto_sin_espacios_repetidos)
    texto_sin_espacios_detras_guion = re.sub(r'- (?=\w)', '-', texto_sin_espacios_delante_guion)

    # Elimina guiones entre palabras
    texto_sin_guiones = re.sub(r'(?<=\w)-(?=\w)', '', texto_sin_espacios_detras_guion)

    return texto_sin_guiones

def subsanar_errores_de_lectura(texto)->str:
    """Se han detectado errores comunes al leer el texto desde un fichero pdf. Esta función trata de arreglar esos errores"""
    def corregir_meses_años(texto):
        # Corregir meses
        meses = ["enero", "febrero", "marzo", "abril", "mayo", "junio", "julio", "agosto", "septiembre", "octubre", "noviembre", "diciembre"]
        for mes in meses:
            regex = construir_regex_con_espacios(mes)
            texto = re_sub_case(regex, mes, texto)

        # Corregir años entre veintiuno y veintinueve
        year = ["uno", "dos", "trés", "cuatro", "cinco", "séis", "siete", "ocho", "nueve"]
        for num in year:
            regex = construir_regex_con_espacios(f"veinti{num}")
            texto = re_sub_case(regex, f"veinti{num}", texto)

        return texto

    #en este caso se ha detectado que a veces, en la fecha se escribe "mi l" en lugar de "mil"
    texto_corregido = re_sub_case(construir_regex_con_espacios('mil'), 'mil', texto)
    texto_corregido = corregir_meses_años(texto_corregido)    

    return texto_corregido

def extraer_textos_del_pdf(ruta_pdf) -> [str]:
    """Extrae el texto de todas las páginas.\nDevuelve una lista con el texto de cada página."""
    textos = []
    with open(ruta_pdf, 'rb') as archivo:
        lector = PyPDF2.PdfReader(archivo)
        textos = [pagina.extract_text() for pagina in lector.pages]
        
    return textos            

def extraer_de_pdf_pagina_con_regex(ruta_pdf, lista_regex):
    """Devuelve el texto de aquella página del documento pdf donde se encuentren todas las coincidencias con la lista de expresiones regulares 'lista_regex'"""
    with open(ruta_pdf, 'rb') as archivo:
        lector = PyPDF2.PdfReader(archivo)
        for i in range(len(lector.pages)):
            pagina = lector.pages[i]
            texto_pagina = pagina.extract_text()
            texto_pagina_normalizado_y_corregido = subsanar_errores_de_lectura(normalizar_texto(texto_pagina))
            if all(re.search(regex, texto_pagina_normalizado_y_corregido,re.IGNORECASE) is not None 
                   for regex in lista_regex):
                return texto_pagina
    
    return None

def extraer_de_pdf_pagina_con_textos(ruta_pdf, lista_textos):
    """Abre el fichero pdf encontrado en ruta_pdf y busca los textos de la lista_textos.\n
    Devuelve el texto original de la página que contiene todos los textos buscados.\n
    Si no encuentra todos los textos de lista_textos entonces devuelve None
    """
    texto_buscado = [normalizar_texto(texto) for texto in lista_textos]

    with open(ruta_pdf, 'rb') as archivo:
        lector = PyPDF2.PdfReader(archivo)
        for i in range(len(lector.pages)):
            pagina = lector.pages[i]
            texto_pagina = pagina.extract_text()
            texto_pagina_normalizado_y_corregido = subsanar_errores_de_lectura(normalizar_texto(texto_pagina))
            if all(re.search(construir_regex_con_espacios(texto), texto_pagina_normalizado_y_corregido, re.IGNORECASE) is not None for texto in texto_buscado):
                return texto_pagina
    
    return None

### Extraer el texto de la portada que contiene los datos de registro

In [None]:
def get_xml_namespace(root):
    return {'ns': re.match(r'\{(.*)\}',root.tag).group(1)}
    
def extraer_txt_portada_escritura(ruta_documento_xml):
    """Lee las páginas del PDF asociado hasta que aparezca una página donde esté el nombre del notario y la fecha (esta es la carátula).\n
        Crea un .txt con el texto de sólo esa página, con el mismo nombre base del documento xml."""
    if os.path.splitext(ruta_documento_xml)[1]=='.xml':
        ruta_carpeta = os.path.dirname(ruta_documento_xml)
        archivo = os.path.basename(ruta_documento_xml)

        nombre_txt = archivo.replace('.xml', '.txt')
        ruta_txt = os.path.join(ruta_carpeta, nombre_txt)
        nombre_txt_normalizado = archivo.replace('.xml', '_norm.txt')
        ruta_txt_normalizado = os.path.join(ruta_carpeta, nombre_txt_normalizado)
        # Comprobar si el archivo .txt ya existe
        if os.path.exists(ruta_txt):
            return
        
        tree = ET.parse(ruta_documento_xml)
        root = tree.getroot()
        namespace = get_xml_namespace(root)

        apellidos_notario = root.find('.//ns:CABECERA/ns:NOT/ns:NOTARIO/ns:APELLIDOS', namespace).text
        fecha_autorizacion = root.find('.//ns:DATOS/ns:DOCUMENTO/ns:IDE_DOC/ns:FECHA_AUTORIZACION', namespace).text

        dia_texto, mes_texto, año_texto = separar_datos_fecha_a_texto(fecha_autorizacion)

        # Nombre del archivo PDF asociado
        nombre_pdf = archivo.replace('.xml', '.pdf')                    
        ruta_pdf = os.path.join(ruta_carpeta, nombre_pdf)

        # Buscar texto en PDF
        #texto_buscado = [nombre_notario, apellidos_notario, mes_texto, año_texto]        
        #Voy a quitar de la búsqueda el nombre del notario, porque a veces los notarios no ponen su nombre completo
        #Tampoco uso el formato de fecha completa, porque a veces ponen 5 de enero de 2020 y otras veces 5 de enero del año 2020, con mes y año tenemos bastante
        texto_buscado = [apellidos_notario, mes_texto, año_texto]
        # regex_buscados = [regex_apellidos_notario, regex_fecha]
        texto_encontrado = extraer_de_pdf_pagina_con_textos(ruta_pdf, texto_buscado)
        # texto_encontrado = extraer_de_pdf_pagina_con_regex(ruta_pdf, regex_buscados)

        if texto_encontrado:
            with open(ruta_txt, 'w', encoding='utf-8') as archivo_txt:
                archivo_txt.write(texto_encontrado)
            #Guardo también una copia "normalizada y subsanada" para ver si es más efectiva la extracción con los datos depurados o aprende más con los datos crudos
            with open(ruta_txt_normalizado, 'w', encoding='utf-8') as archivo_norm_txt:
                archivo_norm_txt.write(normalizar_texto(subsanar_errores_de_lectura(texto_encontrado),conservar_tildes=True))
        else:
            # La copia _no_encontrado está pensada para revisar el texto y ver por qué no se han encontrado los datos
            ruta_no_encontrado = os.path.join(ruta_carpeta, "_no_encontrado_"+nombre_txt)
            with open(ruta_no_encontrado,'w', encoding='utf-8') as archivo_no_encontrado:
                archivo_no_encontrado.write('\n'.join(extraer_textos_del_pdf(ruta_pdf)))

# Automatización para hacerlo a toda la carpeta
def extraer_txt_portada_escrituras(ruta_carpeta):
    """Busca los datos del xml en el pdf y si los encuentra crea un txt con el texto de esa página"""    
    # Ejecución en paralelo para ahorrar tiempo
    archivos_xml = [os.path.join(ruta_carpeta,archivo) for archivo in os.listdir(ruta_carpeta) if archivo.endswith('.xml')]
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ejecutor:
        ejecutor.map(extraer_txt_portada_escritura,archivos_xml)


In [None]:
#Función auxliar para buscar los .txt que no se han procesado
def ficheros_faltantes(directorio):
    # Obtener la lista de archivos en el directorio
    archivos = os.listdir(directorio)

    # Filtrar archivos .txt que siguen el patrón de nombres
    # archivos_txt = [archivo for archivo in archivos if archivo.endswith('.txt') and archivo[:-4].isdigit()]
    archivos_txt = [archivo for archivo in os.listdir(directorio) 
                    for nombre, extension in (os.path.splitext(archivo),) if nombre.isdigit() and extension=='.txt']
    # Extraer los números de los archivos y convertirlos a enteros
    numeros_archivos = [int(archivo.split('.')[0]) for archivo in archivos_txt]

    # Encontrar los números faltantes
    if numeros_archivos:
        max_numero = max(numeros_archivos)
        numeros_faltantes = set(range(1, max_numero + 1)) - set(numeros_archivos)
        return sorted(list(numeros_faltantes))
    else:
        return "No hay archivos .txt que sigan el patrón en el directorio"

 ## PREPROCESADO DE LOS FICHEROS

In [None]:
# Conciliar los archivos de 'fichas' (xml) y 'escrituras' (pdf) que tienen la misma raíz en su nombre de archivo y copiarlos juntos a un directorio común.
copiar_archivos_coincidentes(ESC_DIR,FICH_DIR,PDF_FICHAS_DIR)

In [None]:
# Buscar dentro en el texto extraído de la escritura los datos de la ficha, si se encuentran, crear un archivo de texto (autonumerado) con el texto de la página que contiene estos datos. En este caso, la página será la portada de la escritura, pues es donde vienen los datos de registro. Se crea un segundo archivo normalizado.
extraer_txt_portada_escrituras(PDF_FICHAS_DIR)

In [None]:
#PRUEBA
# # Compruebo cuántas escrituras se han quedado sin archivo de texto, para comprobar cuál puede ser el motivo y mejorar la función de extracción.
# faltantes = ficheros_faltantes(os.path.join(PDF_FICHAS_DIR))
# print(f"{len(faltantes)} faltantes\n",faltantes)

## Preparación del dataset

En primer lugar, busco el tipo de documento de cada escritura. En las fichas no aparece especificado, así que hay que buscarlo de forma textual

In [None]:
# Defino estos tipos fuera por ser constantes y no tener que volver a definirlos en cada llamada
TIPOS_ESCRITURA = ['compraventa', 'herencia', 'herencias', 'dación', 'daciones', 'donación', 'donaciones', 'cesión',
                       'disolución', 'extinción', 'legado', 'legados', 'liquidación', 'cesión', 'permuta']
RE_TIPOS_ESCRITURA = '('+ '|'.join(list(map(construir_regex_con_espacios,TIPOS_ESCRITURA))) + ')'
# Como algunos tipos de documento aparecen en singular y plurar, los juntaré en uno solo
TIPOS_BASE = {
    'HERENCIAS': 'HERENCIA', 
    'DONACIONES': 'DONACION', 
    'LEGADOS': 'LEGADO'
}

#Función para buscar el tipo de documento de un archivo con el texto de la escritura
def extraer_datos_tipo_documento(ruta_archivo_txt=None, contenido_archivo=None, incluye_texto=False):
    """Lee los datos de ruta_archivo_txt y busca el tipo de documento. Si el archivo ya ha sido abierto y se dispone del texto, puede pasarse en contenido_archivo para mejorar la eficiencia y evitar lecturas de disco.
    """
    # Procesar archivo TXT para descripción
    if contenido_archivo:
        contenido = contenido_archivo
        numero_archivo = -1
    elif ruta_archivo_txt:
        numero_archivo, extension = os.path.splitext(os.path.basename(ruta_archivo_txt))     
        assert extension =='.txt', "La extracción de los datos de tipo de documento ha de hacerse en archivos de texto"
        with open(ruta_archivo_txt, 'r', encoding='utf-8') as file:
            contenido = file.read()        
    else:
        raise ValueError("Debe aportarse una ruta de archivo o el contenido del mismo.")
    
    # Buscar con expresión regular los tipos de escritura
    descripcion = re.search(RE_TIPOS_ESCRITURA + r'.*?(?=[\-.,»>_])', contenido, re.IGNORECASE | re.DOTALL)
    if descripcion:
        texto_encontrado = descripcion.group(1)
        posicion_texto_encontrado = descripcion.start(1)
        #Si hemos conseguido encontrar el tipo de escritura, generamos el tipo de documento normalizado
        match = re.search(RE_TIPOS_ESCRITURA , texto_encontrado, re.IGNORECASE)        
        tipo_documento = normalizar_texto(match.group().strip().upper()).replace(" ", "")        
    else:
        texto_encontrado = ""
        tipo_documento = ""                
        posicion_texto_encontrado = -1

    tipo_documento = TIPOS_BASE.get(tipo_documento,tipo_documento)
    
    resultado = {
        'numero': numero_archivo,
        'tipo_documento': tipo_documento,
        'texto_encontrado' : texto_encontrado,
        'posicion_texto_encontrado' : posicion_texto_encontrado    
    }           
    if incluye_texto:
        resultado['texto'] = contenido

    return resultado

#Necesito comprobar todos los tipos de documento que hay, con su número asociado en la ficha xml
def extraer_datos_tipo_documentos(directorio):
    """Genera una lista con información sobre los tipos de documento encontrados en el directorio a partir de los ficheros .xml y sus correspondientes .txt.\n
    'numero' es el número del archivo (en cadena de cinco caracteres)\n
    'tipo_documento' es el tipo de documento normalizado\n
    'texto_encontrado' es cómo se lee en el archivo, puede tener defectos de formato\n
    'posicion_texto_encontrado' es la posición dentro del texto donde se ha encontrado el tipo de documento.    
    'texto' se incluye el texto completo para funciones de depuración
    """
    datos = []
    archivos_xml = [archivo for archivo in os.listdir(directorio) if archivo.endswith('.xml')]    
    
    for archivo in sorted(archivos_xml):        
        ruta_archivo_txt = os.path.join(directorio, archivo.replace('.xml','.txt'))
        # #Si no tenemos txt es que no hemos podido encontrar todos los campos en este documento       
        if not os.path.exists(ruta_archivo_txt):
            continue            
        datos.append(extraer_datos_tipo_documento(ruta_archivo_txt,incluye_texto=True))
    return datos

In [None]:
#PRUEBA
# # Dejamos los datos del tipo de documento guardados en esta variable
# datos_tipo_documento = extraer_datos_tipo_documentos(PDF_FICHAS_DIR)

# # Muestro una pequeña estadística para ver la distribución de los tipos de documento.
# tipos = {}
# for d in datos_tipo_documento:
#     tipo = d['tipo_documento']
#     tipos[tipo] = tipos.get(tipo,0) + 1 #guardamos el número de referencias encontradas    

# for clave,valor in sorted(tipos.items(), key=lambda item: item[1], reverse=True):
#     print(valor, clave)

In [None]:
#PRUEBA
# # Dejamos los datos del tipo de documento guardados en esta variable
# datos_tipo_documento = extraer_datos_tipo_documentos(PDF_FICHAS_DIR)

# # Muestro una pequeña estadística para ver la distribución de los tipos de documento.
# tipos = {}
# for d in datos_tipo_documento:
#     # tipo = d['tipo_documento']
#     tipo = d['texto_encontrado']
#     tipos[tipo] = tipos.get(tipo,0) + 1 #guardamos el número de referencias encontradas    

# for clave,valor in sorted(tipos.items(), key=lambda item: item[1], reverse=True):
#     print(valor, clave)

In [None]:
#PRUEBA
#Comprobación para ver los tipos no identificados
# with open(os.path.join(PDF_FICHAS_DIR,"_sin_tipo_identificado.txt"),'w',encoding='utf-8') as salida:    
#     for d in [d for d in datos_tipo_documento if d['tipo_documento']==""]:
#         # print(d['texto'])
#         salida.write(d['texto'])        

#### Funciones auxiliares para la preparación del dataset

In [None]:
def buscar_en_texto(re_busqueda, texto, grupo = 0):
    """Función que busca la espresión regular 're_busqueda' en 'texto' y devuelve la posición de inicio y el texto encontrado en una lista de diccionarios [{'answer_start':int, 'text':str}] para el grupo indicado y cada una de las ocurrencias encontradas. De este modo, puede elegirse si de toda la expresión buscada sólo quiere localizarse una parte del texto como respuesta.\n
    Si no encuentra el texto devuelve [{'answer_start':-1, 'text':""}]"""    
    coincidencias = []    
    for match in re.finditer(re_busqueda, texto,flags=re.IGNORECASE):
        coincidencias.append({"answer_start": match.start(grupo), "text": match.group(grupo)})
    
    if len(coincidencias)==0:
        coincidencias.append({"answer_start": -1, "text": ""})

    return coincidencias

class Tipo_Contenido(StrEnum):
    """Enumerado para trabajar con los tipos texto para diferenciar las preguntas del contexto"""
    # Tipos de preguntas #
    PROTOCOLO = auto()
    FECHA = auto()
    NOTARIO = auto()
    TIPO_DOCUMENTO = 'tipo'
    # Tipos que no están codificados como preguntas #
    CONTEXT = auto()    
    
    @staticmethod
    def tipo(question:str)->StrEnum|None:
        """Indica de qué tipo es esta pregunta"""
        for tipo_c in Tipo_Contenido:
            if tipo_c in [Tipo_Contenido.PROTOCOLO, Tipo_Contenido.FECHA, Tipo_Contenido.NOTARIO, Tipo_Contenido.TIPO_DOCUMENTO] \
                and tipo_c in question.lower():
                return tipo_c            
        # Si no es un tipo dentro de los esperados para preguntas, es None
        return None
    @staticmethod
    def pregunta(tipo:StrEnum):
        """Devuelve la pregunta que debe hacerse para ser de este tipo, si el tipo no está pensado para ser preguntado, devuelve None"""
        if tipo == Tipo_Contenido.PROTOCOLO:
            return "¿cuál es el número de protocolo?"
        elif tipo == Tipo_Contenido.NOTARIO:
            return "¿qué notario ha firmado el documento?"
        elif tipo == Tipo_Contenido.FECHA:
            return "¿en qué fecha se ha firmado el documento?"
        elif tipo == Tipo_Contenido.TIPO_DOCUMENTO:
            return "¿cuál es el tipo de documento?"
        else:
            return None
        
def generar_datos_QA(ruta_archivo_xml, usar_txt_normalizado=False,xml_precargado=None, texto_precargado=None):    
    """Dado un archivo con los datos de la ficha (en xml) busca las coincidencias en el archivo de texto extraido de la escritura, y devuelve un objeto diccionario con los datos para el entrenamiento de QA\n
    El objeto devuelto tiene la siguiente estructura {'id_documento':str, 'context':str, 'qas':list({'question':str, 'answers':list({'answer_start':int, 'text':str})})}\n
    Si no existe fichero txt asociado al xml con el texto de la escritura, se devuelve None"""
    ruta, nombre_archivo  = os.path.split(os.path.splitext(ruta_archivo_xml)[0]) # omito la extensión
    try:
        if texto_precargado is None:
            extension = '_norm.txt' if usar_txt_normalizado else '.txt'
            with open(os.path.join(ruta, nombre_archivo+extension), 'r', encoding='utf-8') as txt_file:
                texto_pagina = txt_file.read()
        else:    
            texto_pagina = texto_precargado
    except:
        #Si no hay archivo de texto no podemos generar datos
        return None

    try:
        if xml_precargado is None:
            arbol = ET.parse(ruta_archivo_xml)
        else:
            arbol = xml_precargado

        raiz = arbol.getroot()        
        namespace = get_xml_namespace(raiz)

        numero_protocolo = num2words(int(raiz.find(".//ns:DATOS/ns:DOCUMENTO/ns:IDE_DOC/ns:NUMERO_PROTOCOLO", namespace).text),lang='es')
        re_protocolo = construir_regex_con_espacios(numero_protocolo)
        # Tomo sólo el primer nombre del notario, para tratar de pillar los que tienen nombre compuesto y sólo usan el primero
        # en la expresión regular se permite la localización de más nombres entre el primer nombre y los apellidos
        nombre_notario = raiz.find(".//ns:CABECERA/ns:NOT/ns:NOTARIO/ns:NOMBRE", namespace).text.split(' ')[0]
        apellidos_notario =  raiz.find(".//ns:CABECERA/ns:NOT/ns:NOTARIO/ns:APELLIDOS", namespace).text
        re_notario = construir_regex_notario(nombre_notario, apellidos_notario)
        re_fecha_documento = construir_regex_fecha(raiz.find(".//ns:DATOS/ns:DOCUMENTO/ns:IDE_DOC/ns:FECHA_AUTORIZACION", namespace).text)          

        preguntas = [
                (Tipo_Contenido.pregunta(Tipo_Contenido.PROTOCOLO), re_protocolo),
                (Tipo_Contenido.pregunta(Tipo_Contenido.NOTARIO), re_notario),
                (Tipo_Contenido.pregunta(Tipo_Contenido.FECHA), re_fecha_documento)
            ]
        
        escritura = {
            'id_documento': nombre_archivo,
            'context': texto_pagina,
            'qas':[]
        }
        for pregunta, re_busqueda in preguntas:
            escritura['qas'].append(
                {
                    'question': pregunta,
                    'answers': buscar_en_texto(re_busqueda, texto_pagina)
                }
            )
        #La pregunta del tipo de documento es algo más complicada
        tipo_documento = extraer_datos_tipo_documento(contenido_archivo=texto_pagina)    
        escritura['qas'].append(
            {
                'question': Tipo_Contenido.pregunta(Tipo_Contenido.TIPO_DOCUMENTO),
                'answers': [{'answer_start': tipo_documento['posicion_texto_encontrado'], 
                            'text': tipo_documento['texto_encontrado']}]            
            }
        )
    except Exception as e:
        print(f"EXCEPCIÓN en generar_datos_QA({ruta_archivo_xml})\n",e)
        return None
    
    return escritura

In [None]:
# #PRUEBA
# # Prueba de extracción de datos
# datos = generar_datos_QA(os.path.join(PDF_FICHAS_DIR, "03601.xml"),True)
# print(datos['context'],'\n')
# for qa in datos['qas']:
#     print(qa['question'], qa['answers'])

In [None]:
def generar_dataset_QA(directorio_origen, ruta_destino):
    """Genera un archivo .json con el dataset de los archivos de escritura.\n
    El fichero contendrá un objeto data con una lista de escrituras, cada escritura tiene los datos que produce la función 'gererar_datos_QA'"""
    resultado = {'data':[]}
    # Para iterar utilizamos las fichas (archivos xml)
    lista_archivos = [os.path.join(directorio_origen, a) for a in os.listdir(directorio_origen) if a.endswith('.xml')]
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ejecutor:    
        # Ejecuciones guarda un objeto Future por cada archivo xml, con los resultados de su ejecución
        ejecuciones = {ejecutor.submit(generar_datos_QA, archivo_xml):archivo_xml for archivo_xml in lista_archivos}
        for ejecucion in concurrent.futures.as_completed(ejecuciones): # conforme se van completando van apareciendo            
            escritura = ejecucion.result()
            if escritura is not None:
                resultado['data'].append({'escritura':escritura})
    
    with open(ruta_destino, 'w', encoding='utf-8') as outfile:    
        json.dump(resultado, outfile, ensure_ascii=False)

In [None]:
# #PRUEBA
# generar_dataset_QA(PDF_FICHAS_DIR,os.path.join(DATA_DIR,'Inicial','dataset_QA.json'))

## ANONIMIZACIÓN

Genero una serie de archivos, con los datos de nombres, apellidos, nombres de empresa y vías a partir de los datos de las fichas. Éstos deben ser los datos sensibles que contienen todas las escrituras utlizadas

In [None]:
def extraer_datos_ficha(ruta_ficha_xml):
    """Función para sacar todos los datos críticos de una ficha. Devuelve un diccionario con {'tipo','entrada'}"""    
    try:
        arbol = ET.parse(ruta_ficha_xml)
        raiz = arbol.getroot()
        namespaces = {'ns': re.match(r'\{(.*)\}',raiz.tag).group(1)}
        
        datos = []
        # Comprobamos primero si es una persona física (tiene campo NOM)
        for per in  raiz.findall('.//ns:PER', namespaces):
            nom = per.find('.//ns:NOM', namespaces)
            # Buscar y guardar los apellidos
            ape1 = per.find('.//ns:APE1_RAZ_SOC', namespaces)
            ape2 = per.find('.//ns:APE2', namespaces)
            # Si tiene nombre es persona, guardamos sus apellidos
            if nom is not None and nom.text:
                datos.append({
                    'tipo': 'nombre',
                    'entrada': nom.text.strip()
                })             
                if ape1 is not None and ape1.text :
                    datos.append({
                        'tipo': 'apellido',
                        'entrada':ape1.text.strip()
                    })
                if ape2 is not None and ape2.text:
                    datos.append({
                        'tipo': 'apellido',
                        'entrada':ape2.text.strip()
                    })
            # Si no, será empresa, sólo guardo su razón social
            elif ape1 is not None and ape1.text: 
                datos.append({
                    'tipo': 'empresa',
                    'entrada':ape1.text.strip()
                })
        #Lo mismo, pero ahora para las direcciones
        for via in raiz.findall('.//ns:VIA', namespaces):
            datos.append({
                'tipo': 'via',
                'entrada': via.text.strip()
            })  
    except ET.ParseError:
        print(f"Error al parsear el archivo: {ruta_ficha_xml}")
    
    return datos

# Voy a probar a paralelizar la lectura de los ficheros para luego guardar todos los datos sin repetición en los ficheros de datos.
def generar_ficheros_de_datos(ruta_ficheros_xml, fichero_nombres, fichero_apellidos, fichero_empresas, fichero_vias):
    lista_archivos_xml = [os.path.join(ruta_ficheros_xml,archivo) for archivo in os.listdir(ruta_ficheros_xml) if archivo.endswith('.xml')]
    
    set_nombres = set()
    set_apellidos = set()
    set_empresas = set()
    set_vias = set()

    with concurrent.futures.ThreadPoolExecutor() as ejecutor:
        # Ejecuciones guarda un objeto Future por cada archivo xml, con los resultados de su ejecución
        ejecuciones = {ejecutor.submit(extraer_datos_ficha, archivo_xml):archivo_xml for archivo_xml in lista_archivos_xml}
        for ejecucion in concurrent.futures.as_completed(ejecuciones): # conforme se van completando van apareciendo
            datos = ejecucion.result()
            for d in datos:
                # Si es una empresa o una calle guardamos la cadena entera, si no, por partes
                if d['tipo']== 'nombre':                    
                    set_nombres.add(d['entrada'])
                elif d['tipo']== 'apellido':
                    set_apellidos.add(d['entrada'])
                elif d['tipo']== 'empresa':
                    set_empresas.add(d['entrada'])                
                elif d['tipo']== 'via':
                    set_vias.add(d['entrada'])
                else:
                    print(f"DEBUG: Se ha encontrado un tipo no manejado {d['tipo']}")

    with open(fichero_nombres,'w',encoding='utf-8') as f_nombres, \
        open(fichero_apellidos,'w',encoding='utf-8') as f_apellidos, \
        open(fichero_vias,'w',encoding='utf-8') as f_vias, \
        open(fichero_empresas,'w',encoding='utf-8') as f_empresas:        
        def add_new_line(x): 
            return x+'\n' 
        def guarda_fichero(conjunto_datos,fichero):            
            lista_con_saltos_de_linea = list(map(add_new_line,list(conjunto_datos)))
            lista_con_saltos_de_linea[-1] = lista_con_saltos_de_linea[-1].strip()
            fichero.writelines(lista_con_saltos_de_linea)        

        guarda_fichero(set_nombres,f_nombres)
        guarda_fichero(set_apellidos,f_apellidos)
        guarda_fichero(set_empresas,f_empresas)
        guarda_fichero(set_vias,f_vias)        

### Generación de los ficheros de datos (Diccionarios)

In [None]:
generar_ficheros_de_datos(PDF_FICHAS_DIR,RANDOM_NOMBRES, RANDOM_APELLIDOS,RANDOM_EMPRESAS,RANDOM_VIAS)

In [None]:
# Defino funciones para anonimizar los nombres y apellidos
DATA_NOMBRES = []
DATA_APELLIDOS = []
DATA_EMPRESAS = []
DATA_VIAS = []

def cargar_aleatorios():
    global DATA_NOMBRES
    global DATA_APELLIDOS
    global DATA_EMPRESAS
    global DATA_VIAS

    with open(RANDOM_NOMBRES, 'r', encoding='utf-8') as archivo:
        DATA_NOMBRES = archivo.readlines()
    with open(RANDOM_APELLIDOS, 'r', encoding='utf-8') as archivo:
        DATA_APELLIDOS = archivo.readlines()
    with open(RANDOM_EMPRESAS, 'r', encoding='utf-8') as archivo:
        DATA_EMPRESAS = archivo.readlines()
    with open(RANDOM_VIAS, 'r', encoding='utf-8') as archivo:
        DATA_VIAS = archivo.readlines()

cargar_aleatorios()

def obtener_nombre_aleatorio():
    return random.choice(DATA_NOMBRES).strip()

def obtener_apellido_aleatorio():
    return random.choice(DATA_APELLIDOS).strip()

def obtener_empresa_aleatoria(empresa):    
    aleatoria = empresa.split(' ')
    for orden in range(len(aleatoria)):
        busca = orden
        while busca>=0:            
            try:
                aleatoria[busca] = random.choice(DATA_EMPRESAS).strip().split(' ')[busca]
                break
            except:
                busca-=1
    return ' '.join(aleatoria)

def obtener_via_aleatoria():
    return random.choice(DATA_VIAS).strip()

def obtener_nif_aleatorio(texto = "00000000A"):
    resultado = ""
    for char in texto:
        if char.isalpha():
            resultado += random.choice(string.ascii_letters.upper())
        elif char.isdigit():
            resultado += random.choice(string.digits)
        else:
            resultado += char
    return resultado

In [None]:
def obtener_persona_aleatoria(nombre_completo):
    palabras = nombre_completo.split()
    num_palabras = len(palabras)
    
    persona = nombre_completo 
    if num_palabras == 1: #Raro, pero bueno        
        persona = obtener_nombre_aleatorio()
    elif num_palabras == 2:
        # Un nombre y un apellido
        persona =  obtener_nombre_aleatorio() + ' ' + obtener_apellido_aleatorio()
    elif num_palabras>2:
        # Más de dos palabras: nombres y dos apellidos combinados
        nombres_a_usar = num_palabras - 2
        persona = ' '.join([obtener_nombre_aleatorio() for _ in range(nombres_a_usar)])
        persona += ' ' + ' '.join([obtener_apellido_aleatorio() for _ in range(2)])            
    return re_sub_case('.*',persona,nombre_completo,re.IGNORECASE+re.DOTALL)

def obtener_fecha_aleatoria():
    start_date = datetime.date(2020, 1, 1)
    end_date = datetime.date(2023, 12, 31)
    # Generar una fecha aleatoria entre start_date y end_date
    tiempo_entre_fechas = end_date - start_date
    dias_aleatorios = random.randrange(tiempo_entre_fechas.days)
    fecha_aleatoria = start_date + datetime.timedelta(days=dias_aleatorios)
    return convertir_fecha_a_texto_completo(fecha_aleatoria.strftime('%Y-%m-%d'))

def obtener_protocolo_aleatorio():
    return num2words(random.randrange(1,10000),lang='es')

def obtener_texto_aleatorio(texto):   
    """Esta función cambia cada letra o número del texto por una letra o número aleatorios.\n
    No conserva ningún sentido, pero mantiene los saltos de página, los guiones y signos de puntuación.""" 
    resultado = ""
    for char in texto:
        if char.isalpha():
            resultado += random.choice(string.ascii_letters.upper())
        elif char.isdigit():
            resultado += random.choice(string.digits)
        else:
            resultado += char
    return re_sub_case('.*',resultado,texto,re.IGNORECASE+re.DOTALL)

In [None]:
class Tipo_Diccionario(Enum):
    NOMBRE  = auto()
    APELLIDO = auto()
    VIA = auto()
    EMPRESA = auto()

"""Definición para poder tener a mano los datos de cada fichero, buscando por tipo"""
DICCIONARIO = {
        Tipo_Diccionario.NOMBRE: DATA_NOMBRES, 
        Tipo_Diccionario.APELLIDO: DATA_APELLIDOS,
        Tipo_Diccionario.EMPRESA: DATA_EMPRESAS,
        Tipo_Diccionario.VIA: DATA_VIAS
    }

def buscar_en_diccionario(diccionario, texto):
    """Busca TODAS las ocurrencias de las palabras del diccionario en el texto"""
    automaton = ahocorasick.Automaton()
    for idx, key in enumerate([d.strip() for d in diccionario if len(d.strip())>3]):
        # automaton.add_word(key.lower(), (idx, key))
        automaton.add_word(key.lower(), key)

    automaton.make_automaton()
    resultado = []
    for end_index, key in automaton.iter(texto.lower()):
        # El algoritmo busca subcadenas, así que voy a usar regex sobre este subconjunto de coincidencias para buscar palabras enteras
        matches = re.finditer(r'\b'+key+r'\b',texto,re.IGNORECASE)
        for match in matches:
            ocurrencia = {
                'start_index': match.start(),
                'text': match.group()
            }            
            resultado.append(ocurrencia)
    resultado.sort(key=lambda x: (x['start_index'], -len(x['text'])))
    
    # Quito los resultados que están contenidos en otros más grades
    def esta_contenido(resultado, otros):
        inicio = resultado['start_index']
        fin = resultado['start_index'] + len(resultado['text'])                
        for otro in otros:
            otro_inicio = otro['start_index']
            otro_fin = otro_inicio + len(otro['text'])
            if inicio >= otro_inicio and fin <= otro_fin and resultado['text'] != otro['text']:            
                return True                
        return False
    
    resultados_filtrados = [r for r in resultado if not esta_contenido(r,resultado)]

    return resultados_filtrados

In [None]:
def obtener_aleatorio(tipo_diccionario, texto=None):
    if tipo_diccionario == Tipo_Diccionario.NOMBRE:
        return obtener_nombre_aleatorio()
    elif tipo_diccionario == Tipo_Diccionario.APELLIDO:
        return obtener_apellido_aleatorio()
    elif tipo_diccionario == Tipo_Diccionario.EMPRESA:
        return obtener_empresa_aleatoria(texto)
    elif tipo_diccionario == Tipo_Diccionario.VIA:
        return obtener_via_aleatoria()

def anonimiza_con_diccionario(tipo_diccionario, texto):
    partes_nuevo_texto = []
    
    coincidencias = buscar_en_diccionario(DICCIONARIO.get(tipo_diccionario),texto)
    if len(coincidencias)==0:
        return texto
    # Si no hay coincidencias se devuelve el mismo texto, en otro caso, vamos sustituyendo
    ultimo_index = 0
    for coincidencia in coincidencias:
        partes_nuevo_texto.append(texto[ultimo_index:coincidencia['start_index']])
        partes_nuevo_texto.append(obtener_aleatorio(tipo_diccionario, coincidencia['text']))
        ultimo_index = coincidencia['start_index'] + len(coincidencia['text']) 
    # Guardar el último
    partes_nuevo_texto.append(texto[ultimo_index:])

    return ''.join(partes_nuevo_texto)
        

def anonimiza(tipo_contenido:Tipo_Contenido, texto:str):
    if texto == "":
        return ""
    
    if tipo_contenido == Tipo_Contenido.PROTOCOLO:
        return obtener_protocolo_aleatorio()
    elif tipo_contenido == Tipo_Contenido.FECHA:
        return obtener_fecha_aleatoria()
    elif tipo_contenido == Tipo_Contenido.NOTARIO:
        return obtener_persona_aleatoria(texto)
    elif tipo_contenido == Tipo_Contenido.TIPO_DOCUMENTO:
        return texto #No anonimizo el tipo
    else:
        # Anonimizar un contexto
        #Buscar números
        texto = re.sub(r'\d+',lambda match:obtener_texto_aleatorio(match.group()),texto)
        #Buscar correos electrónicos
        texto = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}', lambda match:obtener_texto_aleatorio(match.group()), texto)
        #Buscar web
        texto = re.sub(r'[A-Za-z0-9._%+-:\/\/]+\.(com|es|eu|org|net|info|biz|cat|edu|gov)\b', lambda match:obtener_texto_aleatorio(match.group()), texto)        
        #Buscar Empresas
        texto = anonimiza_con_diccionario(Tipo_Diccionario.EMPRESA, texto)
        #Buscar Personas (nombres / apellidos)
        texto = anonimiza_con_diccionario(Tipo_Diccionario.APELLIDO, texto)
        texto = anonimiza_con_diccionario(Tipo_Diccionario.NOMBRE, texto)    
        #Buscar calles
        texto = anonimiza_con_diccionario(Tipo_Diccionario.VIA, texto)
        return texto
    

In [None]:
#Código para generar el dataset anonimizando el contexto y respetando los espacios donde haya datos, que también serán anonimizados    
def generar_datos_QA_anonimizado(archivo_xml, xml_precargado=None, texto_precargado=None):
    escritura = generar_datos_QA(archivo_xml, usar_txt_normalizado=True,xml_precargado=xml_precargado,texto_precargado=texto_precargado)
    # Localizar: crear una lista con datos=[{'answer_start', 'text', 'type'}] y ordenarla por 'answer_start'
    datos = []
    for qa in escritura['qas']:
        for answer in qa['answers']:
            tipo = Tipo_Contenido.tipo(qa['question'])
            datos.append({
                'answer_start': answer['answer_start'],
                'text': answer['text'],
                'type': tipo
            })
    # Ordenar 'datos' por 'answer_start'
    datos.sort(key=lambda x: x['answer_start'])

    # Separar: crear una lista partes=[{'type','text'}] que guarde de forma ordenada el 'context' fracturado según los datos y el espacio entre datos
    partes = []
    ultimo_fin = 0

    for dato in datos:
        # Añadir el texto del contexto anterior si es necesario
        if dato['answer_start'] > ultimo_fin:
            partes.append({
                'type': Tipo_Contenido.CONTEXT,
                'text': escritura['context'][ultimo_fin:dato['answer_start']]

            })
        # Añadir la parte de la respuesta
        inicio_respuesta = dato['answer_start']
        fin_respuesta = inicio_respuesta + len(dato['text'])
        partes.append({
            'type': dato['type'],
            'text': escritura['context'][inicio_respuesta:fin_respuesta]
        })
        # Actualizar el último índice del final de la respuesta
        ultimo_fin = fin_respuesta

    # Añadir cualquier texto restante del contexto después de la última respuesta
    if ultimo_fin < len(escritura['context']):
        partes.append({
            'type': Tipo_Contenido.CONTEXT,
            'text': escritura['context'][ultimo_fin:]
        })

    # Anonimizar: para cada parte, según su 'tipo' realizar una función de anonimización
    protocolo_anonimo = ""
    fecha_anonimo = ""
    notario_anonimo = ""
    tipo_anonimo = ""
    partes_anonimizadas = []
    for p in partes:
        tipo_p = p['type']
        texto_p = p['text']
        parte_anonimizada = {'start': sum(len(parte['text']) for parte in partes_anonimizadas),
                             'type': tipo_p}
        if tipo_p == Tipo_Contenido.PROTOCOLO:
            if protocolo_anonimo == "":
                protocolo_anonimo = anonimiza(tipo_p,texto_p)         
            parte_anonimizada['text']=protocolo_anonimo            
        elif tipo_p == Tipo_Contenido.FECHA:
            if fecha_anonimo == "":
                fecha_anonimo = anonimiza(tipo_p,texto_p)            
            parte_anonimizada['text']=fecha_anonimo
        elif tipo_p == Tipo_Contenido.NOTARIO:
            if notario_anonimo == "":
                notario_anonimo = anonimiza(tipo_p,texto_p)                        
            parte_anonimizada['text']=notario_anonimo
        elif tipo_p == Tipo_Contenido.TIPO_DOCUMENTO:
            if tipo_anonimo == "":
                tipo_anonimo = anonimiza(tipo_p,texto_p)                        
            parte_anonimizada['text']=tipo_anonimo
        else:
            parte_anonimizada['text']=anonimiza(tipo_p,texto_p)
        partes_anonimizadas.append(parte_anonimizada)
        

    # Juntar, de nuevo las partes, teniendo en cuenta que ahora los 'answer_start' y los 'text' han cambiado para los datos
    texto_final = ''.join(p['text'] for p in partes_anonimizadas)

    # Generamos un nuevo objeto escritura_anonimizada y componemos los datos.
    escritura_anonimizada = {
        'id_documento': escritura['id_documento'],
        'context': texto_final,
        'qas': []
    }
    # Hay que tener cuidado con los nuevos 'answer_start'
    respuestas_por_pregunta = {
        Tipo_Contenido.pregunta(Tipo_Contenido.PROTOCOLO): [],
        Tipo_Contenido.pregunta(Tipo_Contenido.FECHA): [],
        Tipo_Contenido.pregunta(Tipo_Contenido.NOTARIO): [],
        Tipo_Contenido.pregunta(Tipo_Contenido.TIPO_DOCUMENTO): []
    }
    for p in partes_anonimizadas:
        # Compruebo si esta parte corresponde con una pregunta
        question = Tipo_Contenido.pregunta(p['type'])
        if question is not None:
            respuesta = {
                'answer_start':p['start'],
                'text': p['text']
            }
            respuestas_por_pregunta[question].append(respuesta)
    
    for pregunta, respuestas in respuestas_por_pregunta.items():
        # Verificar si la lista de respuestas para esta pregunta está vacía
        if not respuestas:
            # Añadir la respuesta predeterminada
            respuestas.append({'answer_start': -1, 'text': ''})
        
        # Añadir la pregunta y sus respuestas 
        escritura_anonimizada['qas'].append({
            'question': pregunta,
            'answers': respuestas
        })

    return escritura_anonimizada

def generar_dataset_QA_anonimizado(directorio_origen, ruta_archivo_destino):
    """Genera un archivo .json con el dataset de los archivos de escritura ANONIMIZADOS.\n
    El fichero contendrá un objeto data con una lista de escrituras, cada escritura tiene los datos que produce la función 'gererar_datos_QA'.\n
    Después de pasar por todos los procesos de anonimización todos los nombres estarán cambiados, los números estarán cambiados y las direcciones y nombres de empresa estarán cambiados."""
    contador_ejecuciones = 0
    try:
        resultado = {'data':[]}
        # Para iterar utilizamos las fichas (archivos xml), comprobando que exista su fichero normalizado.
        lista_archivos = [os.path.join(directorio_origen, a) for a in os.listdir(directorio_origen) if a.endswith('.xml') and os.path.exists(os.path.join(directorio_origen, a.replace('.xml','_norm.txt')))]
        xml_precargados = [ET.parse(ruta_archivo_xml) for ruta_archivo_xml in lista_archivos]        
        textos_precargados = []
        for ruta_archivo_xml in lista_archivos:
            ruta_archivo_norm = ruta_archivo_xml.replace('.xml','_norm.txt')
            with open(ruta_archivo_norm, 'r', encoding='utf-8') as txt_file:
                textos_precargados.append(txt_file.read())
            
        print("DEBUG", "Iniciamos generación de datos anonimizados", len(lista_archivos), "Archivos por procesar.")
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ejecutor:    
            t_inicio = time.time()
            # Ejecuciones guarda un objeto Future por cada archivo xml, con los resultados de su ejecución
            ejecuciones = {ejecutor.submit(generar_datos_QA_anonimizado, archivo_xml,pre_xml,pre_txt):archivo_xml 
                           for archivo_xml,pre_xml,pre_txt in zip(lista_archivos,xml_precargados,textos_precargados)}
            for ejecucion in concurrent.futures.as_completed(ejecuciones): # conforme se van completando van apareciendo                          
                escritura = ejecucion.result()
                if escritura is not None:
                    resultado['data'].append({'escritura':escritura})
                contador_ejecuciones += 1
                if contador_ejecuciones % 500 == 0:
                    print("DEBUG", contador_ejecuciones, f"Ejecuciones en {time.time()-t_inicio} segundos")
                    t_inicio = time.time()
    except Exception as ex:
        print("ERROR generando dataset QA anonimizado\n",ex)
        traceback.print_exc()
    finally:    
        with open(ruta_archivo_destino, 'w', encoding='utf-8') as outfile:    
            json.dump(resultado, outfile, ensure_ascii=False)

In [None]:
generar_dataset_QA_anonimizado(PDF_FICHAS_DIR,os.path.join(DATA_DIR,'dataset_QA_ANONIMIZADO.json'))

## Inspeccionar el dataset

Cargo el dataset para trabajar en pandas. Lo inspeccionaré para ver que está todo correcto y generaré tres ficheros. Para entrenamiento, validación y test.

In [None]:
# Carga del archivo
archivo_dataset_QA_anonimizado = os.path.join(DATA_DIR,'dataset_QA_ANONIMIZADO.json')
with open(archivo_dataset_QA_anonimizado, 'r',encoding='utf-8') as archivo:
    datos_json_QA_anonimizado = pd.read_json(archivo)

# Hay que aplanar la estructura, para que cada fila sea una pregunta, con una respuesta, y mantenga el contexto y el id_documento del nivel superior.
qa_dataset = pd.json_normalize(
    data=datos_json_QA_anonimizado['data'],  
    record_path=['escritura', 'qas'],  
    meta=[['escritura', 'id_documento'], ['escritura', 'context']]  # Meta datos para preservar    
)

In [None]:
# #PRUEBAS
# print(qa_dataset.head(3))

In [None]:
# Vamos a ver cuántos tienen las tres respuestas encontradas
def tiene_respuesta(respuestas):
    # al menos, una respuesta válida
    return any(respuesta['text'] != "" for respuesta in respuestas)

def todo_el_grupo_tiene_respuestas(grupo):
    # para cada elemento del grupo compruebo que tiene al menos una respuesta válida
    # si todos son válidos, el grupo es válido
    return all(tiene_respuesta(fila['answers']) for index, fila in grupo.iterrows())
    

qa_completo = qa_dataset.groupby('escritura.id_documento').filter(todo_el_grupo_tiene_respuestas)
print("Número de escrituras con todos sus campos localizados:", qa_completo['escritura.id_documento'].nunique())
print("Número de documentos total:", len(qa_dataset.groupby('escritura.id_documento')))

Voy a dejar guardados tres archivos, uno para entrenamiento, otro para validación y otro para test. 

En cada uno, tenemos en una única línea un objeto 'data' coneniendo una lista de 'escritura'. Cada escritura con sus campos 'id_documento', 'context' (el texto de la escritura) y una lista de preguntas-respuestas 'qas', que contiene objetos con 'question' (la pregunta para el modelo) y 'answers' que son listas de 'answer_start' y 'text' (las respuestas y su posición en el contexto).

In [None]:
train_data, test_data = train_test_split(datos_json_QA_anonimizado, test_size=0.2, random_state=42)
train_data, vali_data = train_test_split(train_data, test_size=0.2, random_state=42)

elementos = [('train', train_data), ('validation', vali_data), ('test', test_data)]
for archivo, datos in elementos:
    ruta_archivo = os.path.join(DATA_DIR, 'ESCRITURAS', f'QA_{archivo}.json')
    with open(ruta_archivo,'w', encoding='utf-8') as f:
        json.dump({'data':list(datos['data'])}, f, ensure_ascii=False)

Para trabajar con estos ficheros de datos, en el directorio 'Dataset/ESCRITURAS' se creará una clase ESCRITURAS, que permitirá cargar los ficheros con las librerías dataset de Hugging Face.