# **Parseo de los txt**
---

In [29]:
import re
import json
import os
from typing import Union
import sys

In [30]:
# Definir las expresiones regulares para cada sección
regex_secciones = {
    "indicaciones": r'^4\.1\.?\s*Indicaciones\s+terapéuticas',
    "posologia": r'^4\.2\.?\s*Posolog[ií]a\s+y\s+forma\s+de\s+administraci[oó]n',
    "contraindicaciones": r'^4\.3\.?\s*Contraindicaciones',
    "advertencias": r'^4\.4\.?\s*Advertencias\s+y\s+precauciones\s+especiales\s+de\s+empleo',
    "interacciones": r'^4\.5\.?\s*Interacci[oó]n\s+con\s+otros\s+medicamentos',
    "fertilidad_embarazo": r'^4\.6\.?\s*Fertilidad,\s+embarazo\s+y\s+lactancia',
    "efectos_conducir": r'^4\.7\.?\s*Efectos\s+sobre\s+la\s+capacidad\s+para\s+conducir',
    "reacciones_adversas": r'^4\.8\.?\s*Reacciones\s+adversas',
    "sobredosis": r'^4\.9\.?\s*Sobredosis',
    "ATC": r'^5\.1\.?\s*Propiedades\s+farmacodin[aá]micas',
    "Propiedades_farmacocineticas": r'^5\.2\.?\s*Propiedades\s+farmacocin[eé]ticas',
    "excipientes": r'^6\.1\.?\s*Lista\s+de\s+excipientes',
    "incompatibilidades": r'^6\.2\.?\s*Incompatibilidades',
    "precauciones_conservacion": r'^6\.4\.?\s*Precauciones\s+especiales\s+de\s+conservaci[oó]n',
    "fecha_revision": r'^10\.\s*FECHA\s+DE\s+LA\s+REVISI[OÓ]N\s+DEL\s+TEXTO'
}

In [31]:
def limpiar_fecha_revision(data: dict) -> dict:
    """Elimina el texto específico y sustituye saltos de línea por espacios en la sección fecha_revision"""
    if 'fecha_revision' in data and data['fecha_revision']:
        # Texto a eliminar con posibles saltos de línea
        patron = re.compile(
            r'(\s*La información detallada y actualizada de este medicamento está disponible en la página Web de la\s*'
            r'Agencia Española de Medicamentos y Productos Sanitarios \(AEMPS\) http://www\.aemps\.gob\.es/\s*)',
            re.IGNORECASE | re.DOTALL
        )
        
        # Sustituir saltos de línea por espacios solo en el texto a eliminar
        def reemplazar_saltos(match):
            return ' '
        
        data['fecha_revision'] = patron.sub(reemplazar_saltos, data['fecha_revision']).strip()
        
        # Eliminar espacios múltiples resultantes
        data['fecha_revision'] = re.sub(r'\s+', ' ', data['fecha_revision'])
    
    return data

In [51]:
def limpiar_diccionario(data: dict) -> dict:
    """Aplica todas las técnicas de limpieza al diccionario de datos"""
    
    # 1. Limpieza específica de fecha_revision
    def limpiar_fecha_revision(data: dict) -> dict:
        if 'fecha_revision' in data and data['fecha_revision']:
            patron = re.compile(
                r'(\s*La información detallada y actualizada de este medicamento está disponible en la página Web de la\s*'
                r'Agencia Española de Medicamentos y Productos Sanitarios \(AEMPS\) http://www\.aemps\.gob\.es/\s*)',
                re.IGNORECASE | re.DOTALL
            )
            data['fecha_revision'] = patron.sub(' ', data['fecha_revision']).strip()
        return data
    
    # 2. Sustituir saltos de línea por espacios en todas las secciones
    def limpiar_saltos_linea(texto: str) -> str:
        if texto:
            # Reemplazar múltiples saltos de línea por un espacio único
            texto = re.sub(r'[\n\r]+', ' ', texto)
        return texto
    
    # 3. Eliminar caracteres especiales no estándar
    def limpiar_caracteres_extraños(texto: str) -> str:
        if texto:
            # Eliminar caracteres especiales específicos usando sus códigos Unicode
            texto = re.sub(r'[\uF0B7\uF06D]', ' ', texto)  #  (U+F0B7) y  (U+F06D)
            
            # También puedes incluir otros caracteres no deseados
            texto = re.sub(r'[•▪▫►◄●]', ' ', texto)  # Bullets comunes
            
            # Normalizar espacios
            texto = re.sub(r'\s+', ' ', texto).strip()
        return texto
    
    # 4. Limpiar numeraciones tipo "X de Y"
    def limpiar_numeraciones(texto: str) -> str:
        if texto:
            # Eliminar patrones de paginación
            texto = re.sub(r'\b\d+\s+de\s+\d+\b', '', texto)
            # Normalizar espacios múltiples
            texto = re.sub(r'\s+', ' ', texto).strip()
        return texto
    
    # Aplicar limpieza en cascada
    data = limpiar_fecha_revision(data)
    
    # Procesar todas las secciones
    for seccion, contenido in data.items():
        if isinstance(contenido, str):
            # Orden de transformaciones
            contenido = limpiar_saltos_linea(contenido)
            contenido = limpiar_caracteres_extraños(contenido)
            contenido = limpiar_numeraciones(contenido)
            data[seccion] = contenido
    
    return data

In [52]:
def extract_secciones(file_path: str) -> dict:
    """Extrae las secciones relevantes de un archivo TXT excluyendo los títulos"""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            lines = file.readlines()
    except Exception as e:
        raise RuntimeError(f"Error leyendo archivo: {str(e)}")

    current_section = None
    data = {key: [] for key in regex_secciones.keys()}
    control_flags = {
        'stop_until_5_1': False,
        'ignore_until_6_1': False,
        'ignore_until_6_4': False,
        'ignore_until_10': False  # Bandera para 6.5 -> 10
    }

    for line in lines:
        line = line.strip()
        
        # Manejo de flags de control
        if control_flags['stop_until_5_1']:
            if re.match(r'^5\.1\.?\s*Propiedades\s+farmacodin[aá]micas', line, re.IGNORECASE):
                control_flags['stop_until_5_1'] = False
            continue
        
        if control_flags['ignore_until_6_1']:
            if re.match(r'^6\.1\.?\s*Lista\s+de\s+excipientes', line, re.IGNORECASE):
                control_flags['ignore_until_6_1'] = False
            else:
                continue
                
        if control_flags['ignore_until_6_4']:
            if re.match(r'^6\.4\.?\s*Precauciones\s+especiales\s+de\s+conservaci[oó]n', line, re.IGNORECASE):
                control_flags['ignore_until_6_4'] = False
            else:
                continue
        
        if control_flags['ignore_until_10']:
            if re.match(r'^10\.\s*FECHA\s+DE\s+LA\s+REVISI[OÓ]N\s+DEL\s+TEXTO', line, re.IGNORECASE):
                control_flags['ignore_until_10'] = False
            else:
                continue

        # Detectar sección 6.5 y activar bandera
        if re.match(r'^6\.5\.?\s*NATURALEZA\s+Y\s+CONTENIDO\s+DEL\s+ENVASE', line, re.IGNORECASE):
            control_flags['ignore_until_10'] = True
            current_section = None
            continue

        # Detección de secciones principales
        section_found = False
        for section, pattern in regex_secciones.items():
            if re.match(pattern, line, re.IGNORECASE):
                current_section = section
                section_found = True
                
                # Manejo de flags especiales
                if section == "sobredosis":
                    control_flags['stop_until_5_1'] = True
                elif section == "excipientes":
                    control_flags['ignore_until_6_1'] = False
                elif section == "fecha_revision":
                    control_flags['ignore_until_10'] = False
                break
        
        if section_found:
            continue

        # Captura de contenido solo si no hay banderas activas
        if current_section and not any(control_flags.values()):
            if current_section == "ATC":
                if "ATC:" in line:
                    if match := re.search(r'ATC:\s*([A-Z0-9]+)', line):
                        data["ATC"] = [match.group(1)]
                        current_section = None
            else:
                data[current_section].append(line)

    # Limpiar y formatear datos
    for key in data:
        if isinstance(data[key], list):
            data[key] = '\n'.join(data[key]).strip()
    
    # Aplicar limpieza específica para la sección fecha_revision
    data = limpiar_diccionario(data)
    
    return data

In [53]:
# Función para guardar los datos en un archivo JSON
def guardar_json(data: dict, output_path: str) -> None:
    """Guarda los datos en un archivo JSON en la ruta especificada"""
    try:
        # Crear directorios si no existen
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
            
    except Exception as e:
        raise RuntimeError(f"Error guardando JSON: {str(e)}")

# Función principal para procesar archivos
def procesar_archivos(input_path: str, output_path: str) -> dict:
    """Procesa un archivo o carpeta y genera los JSON correspondientes"""
    resultados = {}
    total_procesados = 0
    errores = []

    try:
        if os.path.isfile(input_path):
            if input_path.lower().endswith('.txt'):
                data = extract_secciones(input_path)
                guardar_json(data, output_path)
                total_procesados += 1
                resultados[os.path.basename(input_path)] = "OK"
            else:
                raise ValueError("El archivo no es un TXT")
        
        elif os.path.isdir(input_path):
            json_final = {}
            for filename in os.listdir(input_path):
                if filename.lower().endswith('.txt'):
                    file_path = os.path.join(input_path, filename)
                    try:
                        data = extract_secciones(file_path)
                        json_final[filename] = data
                        total_procesados += 1
                    except Exception as e:
                        errores.append(filename)
                        resultados[filename] = f"Error: {str(e)}"
            
            guardar_json(json_final, output_path)
        
        else:
            raise ValueError("La ruta no es válida")
    
    except Exception as e:
        resultados["error"] = f"Error general: {str(e)}"
    
    # Mostrar resumen
    print(f"\nProceso completado. Archivos procesados: {total_procesados}")
    if errores:
        print(f"\nArchivos con errores ({len(errores)}):")
        for error in errores:
            print(f"- {error}")
    
    return resultados

In [54]:
# Ejecutar el script
if __name__ == "__main__":

    # Configuración manual
    input_path = os.path.join("..", "..", "data", "inputs", "1_data_acquisition", "wrangler", "A.A.S._100_mg_COMPRIMIDOS.txt")  
    nombre_archivo_txt = os.path.basename(input_path).replace(".txt", "")
    output_path = os.path.join("..", "..", "data", "outputs", "1_data_acquisition", "wrangler", f"{nombre_archivo_txt}.json")  
    
    # Ejecutar procesamiento
    try:
        print(f"\nIniciando procesamiento de: {input_path}")
        resultados = procesar_archivos(input_path, output_path)
        print("\nEjecución completada")
        
    except Exception as e:
        print(f"\nERROR: Fallo en el procesamiento - {str(e)}")
        raise


Iniciando procesamiento de: ../../data/inputs/1_data_acquisition/wrangler/A.A.S._100_mg_COMPRIMIDOS.txt

Proceso completado. Archivos procesados: 1

Ejecución completada
