In [13]:
import os
import fitz  # PyMuPDF
from datetime import datetime
from io import BytesIO
import pandas as pd
import re
import psycopg2
import dateparser

In [14]:
LOCAL_FOLDER = "D:/Carlos/Udenar-Investigación/Tesis/Extraccion Egresos CANREC/Extaer lab/LABORATORIOS/LABORATORIOS/LAB FERNANDO SANZON"  # Carpeta donde están los PDF
OUTPUT_FOLDER = "D:/Carlos/Udenar-Investigación/Tesis/Extraccion Egresos CANREC/Extraccion_Laboratorios/lab_Fernando_Sanzon"  # Carpeta para guardar resultados

In [15]:
def extract_word_content(word_document):
    """
    Extrae contenido específico de un archivo PDF ya abierto.

    Args:
        word_document (fitz.Document): Documento PDF abierto con PyMuPDF.

    Returns:
        dict: Diccionario con la información extraída.
    """
    extracted_data = {
        "Numero de Informe": None,
        "Nombre del Paciente": None,
        "Identificacion": None,
        "Edad": None,
        "Telefono": None,
        "Fecha de Toma de Muestra": None,
        "Fecha de Ingreso": None,
        "Fecha de Informe": None,
        "Entidad": None,
        "EPS": None,
        "Servicio": None,
        "Muestra Remitida": None,
        "Descripcion Macroscopica": None,
        "Descripcion Microscopica": None,
        "Diagnostico": [],
        "Comentario": None
    }

    try:
        # Leer cada página del PDF
        for page_number in range(word_document.page_count):
            page = word_document[page_number]
            text = page.get_text("text")
            cleaned_text = " ".join(text.split())
            
            # Extraer información utilizando búsquedas
            if "INFORME PATOLOGIA No." in cleaned_text:
                extracted_data["Numero de Informe"] = cleaned_text.split("INFORME PATOLOGIA No.")[1].split()[0].strip()
            if "Nombre:" in cleaned_text:
                extracted_data["Nombre del Paciente"] = cleaned_text.split("Nombre:")[1].split("Historia:")[0].strip()
            if "Historia:" in cleaned_text:
                extracted_data["Identificacion"] = cleaned_text.split("Historia:")[1].split("Edad:")[0].strip()
            if "Edad:" in cleaned_text:
                extracted_data["Edad"] = cleaned_text.split("Edad:")[1].split()[0].strip()
            if "Teléfono:" in cleaned_text:
                extracted_data["Telefono"] = cleaned_text.split("Teléfono:")[1].split()[0].strip()
            if "Fecha recibo:" in cleaned_text:
                extracted_data["Fecha de Toma de Muestra"] = cleaned_text.split("Fecha recibo:")[1].split("DESCRIPCION MACROSCOPICA")[0].strip()
            if "FECHA DE INGRESO:" in cleaned_text:
                extracted_data["Fecha de Ingreso"] = cleaned_text.split("FECHA DE INGRESO:")[1].split()[0].strip()
            if "FECHA:" in cleaned_text:
                extracted_data["Fecha de Informe"] = cleaned_text.split("FECHA:")[1].split("Nombre")[0].strip()
            if "Entidad:" in cleaned_text:
                extracted_data["Entidad"] = cleaned_text.split("Entidad:")[1].split("Medico Sol:")[0].strip()
            if "EPS:" in cleaned_text:
                extracted_data["EPS"] = cleaned_text.split("EPS:")[1].split("SERVICIO")[0].strip()
            if "SERVICIO:" in cleaned_text:
                extracted_data["Servicio"] = cleaned_text.split("SERVICIO:")[1].split()[0].strip()
            if "MUESTRA REMITIDA:" in cleaned_text:
                extracted_data["Muestra Remitida"] = cleaned_text.split("MUESTRA REMITIDA:")[1].split()[0].strip()
            if "DESCRIPCION MACROSCOPICA" in cleaned_text:
                extracted_data["Descripcion Macroscopica"] = cleaned_text.split("DESCRIPCION MACROSCOPICA")[1].split("DESCRIPCION MICROSCOPICA")[0].strip()
            if "DESCRIPCION MICROSCOPICA" in cleaned_text:
                extracted_data["Descripcion Microscopica"] = cleaned_text.split("DESCRIPCION MICROSCOPICA")[1].split("DIAGNÓSTICO:")[0].strip()
            if "DIAGNÓSTICO:" in cleaned_text:
                diagnostic_section = cleaned_text.split("DIAGNÓSTICO:")[1].strip().split("\n")
                extracted_data["Diagnostico"] = [line.strip() for line in diagnostic_section if line.strip()]
            if "COMENTARIO:" in cleaned_text:
                extracted_data["Comentario"] = cleaned_text.split("COMENTARIO:")[1].strip()

    except Exception as e:
        print(f"Error al extraer datos del PDF: {e}")

    return extracted_data

In [16]:
def process_all_files_in_folder(local_folder):
    """Procesa todos los archivos en una carpeta local específica."""
    results = []

    try:
        for root, _, files in os.walk(local_folder):
            for file_name in files:
                if file_name.lower().endswith(".docx"):
                    file_path = os.path.join(root, file_name)
                    print(f"Procesando archivo: {file_path}")
                    word_document = None

                    try:
                        # Abrir el PDF
                        word_document = fitz.open(file_path)

                        # Extraer contenido
                        content = extract_word_content(word_document)

                        # Agregar los datos al resultado
                        results.append({
                            "Archivo": file_name,
                            "Numero de Informe": content.get("Numero de Informe"),
                            "Nombre del Paciente": content.get("Nombre del Paciente"),
                            "Identificacion": content.get("Identificacion"),
                            "Edad": content.get("Edad"),
                            "Telefono": content.get("Telefono"),
                            "Fecha de Toma de Muestra": content.get("Fecha de Toma de Muestra"),
                            "Fecha de Ingreso": content.get("Fecha de Ingreso"),
                            "Fecha de Informe": content.get("Fecha de Informe"),
                            "Entidad": content.get("Entidad"),
                            "EPS": content.get("EPS"),
                            "Servicio": content.get("Servicio"),
                            "Muestra Remitida": content.get("Muestra Remitida"),
                            "Descripcion Macroscopica": content.get("Descripcion Macroscopica"),
                            "Descripcion Microscopica": content.get("Descripcion Microscopica"),
                            "Diagnostico": content.get("Diagnostico"),
                            "Comentario": content.get("Comentario"),
                        })
                    except Exception as e:
                        print(f"Error procesando {file_name}: {e}")
                    finally:
                        if word_document is not None:
                            word_document.close()

    except Exception as e:
        print(f"Error al listar los archivos en {local_folder}: {e}")

    # Convertir resultados en un DataFrame de pandas
    df = pd.DataFrame(results)
    return df

In [17]:
def clean_special_characters(text):
    """
    Limpia caracteres especiales y no deseados en un texto o lista de textos.

    Args:
        text (str | list): Texto o lista de textos a limpiar.

    Returns:
        str: Texto limpio.
    """
    if isinstance(text, list):
        # Si es una lista, convertirla en una sola cadena separada por espacios
        text = ' '.join(text)
    
    if isinstance(text, str):
        # Reemplazar \uf0d8 con viñeta estándar
        text = text.replace('\uf0d8', '•')
        # Eliminar corchetes y otros caracteres especiales
        text = re.sub(r'[\[\]\"]', '', text)
        # Eliminar caracteres no ASCII imprimibles
        text = re.sub(r'[^\x20-\x7E]', '', text)
        return text.strip()  # Eliminar espacios en blanco adicionales

    return text  # Si no es cadena ni lista, devolver sin cambios

In [18]:
def remove_points(text):
    """
    Remueve los puntos de un texto.

    Parámetros:
    - text (str o list): Texto a limpiar.

    Retorno:
    - str: Texto limpio.
    """
    if not isinstance(text, (str, list)):
        raise ValueError("El texto debe ser una cadena o una lista")

    if isinstance(text, list):
        text = ' '.join(text)

    return text.replace('.', '')

In [19]:
def transformate_date(date):
    """
    Transforma una fecha en el formato 'Enero 10 de 2024' a '30/12/2022'.

    Parámetros:
    - fecha (str): Fecha a transformar.

    Retorno:
    - str: Fecha transformada.
    """
    fecha_parseada = dateparser.parse(date)
    if fecha_parseada:
        return fecha_parseada.strftime('%d/%m/%Y')
    else:
        return None

In [20]:
def transformar_tipo_identificacion(df):
    """
    Transforma la columna 'TipoIdentificacion' según la edad y crea una nueva columna 'id_paciente'.

    Parámetros:
    - df (DataFrame): DataFrame que contiene las columnas 'edad', 'Identificacion' y 'TipoIdentificacion'.

    Retorno:
    - df (DataFrame): DataFrame modificado con la nueva columna 'id_paciente'.
    """
    df['Edad'] = df['Edad'].astype(int)
    # Asignar 'CC' o 'TI' según la edad
    df.loc[df['Edad'] >= 18, 'TipoIdentificacion'] = 'CC'
    df.loc[df['Edad'] < 18, 'TipoIdentificacion'] = 'TI'
    # Renombrar la columna 'Identificacion'
    df.rename(columns={'Identificacion': 'NumeroIdentificacion'}, inplace=True)

    # Crear la nueva columna 'id_paciente' concatenando 'TipoIdentificacion' y 'NumeroIdentificacion'
    df['id_paciente'] = df['TipoIdentificacion'] + df['NumeroIdentificacion']

    return df

In [21]:
def save_dataframe_to_local(output_folder, file_name, df):
    """Guarda un DataFrame como archivo CSV en una carpeta local."""
    try:
        # Asegurarse de que la carpeta exista
        os.makedirs(output_folder, exist_ok=True)

        # Ruta completa del archivo
        file_path = os.path.join(output_folder, file_name)
        # Definir las funciones y columnas a aplicar
        funciones = {
            'clean_special_characters': ['Diagnostico'],
            'remove_points': ['Identificacion', 'Numero de Informe', 'Fecha de Informe', 'Fecha de Toma de Muestra', 'Nombre del Paciente'],
            'transformate_date': ['Fecha de Informe', 'Fecha de Toma de Muestra']
        }
        
        # Aplicar las funciones a las columnas
        for funcion, columnas in funciones.items():
            for columna in columnas:
                df[columna] = df[columna].apply(eval(funcion))
        
        # Transformar el tipo de identificación
        df = transformar_tipo_identificacion(df)
        # Renombrar columnas del DataFrame para que coincidan con la tabla en la base de datos
        df.rename(columns={
            'Fecha de Toma de Muestra': 'fecha_toma_muestra',
            'Fecha de Ingreso': 'fecha_ingreso',
            'Fecha de Informe': 'fecha_informe',
            'Entidad': 'entidad',
            'EPS': 'eps',
            'Servicio': 'servicio',
            'Muestra Remitida': 'muestra_remitida',
            'Descripcion Macroscopica': 'descripcion_macroscopica',
            'Descripcion Microscopica': 'descripcion_microscopica',
            'Diagnostico': 'diagnostico',
            'Comentario': 'comentario',
            'llavePaciente': 'id_paciente',
            'Archivo': 'archivo',
            'Historia':'Identificacion'
        }, inplace=True)
        
        # Agregar la columna 'fuente' con un valor por defecto (si es necesario)
        df['fuente'] = 'nombre_fuente'  # Cambia 'nombre_fuente' por el valor que corresponda

        # Guardar como CSV
        df.to_csv(file_path, index=False)
        print(f"Archivo {file_name} guardado exitosamente en {output_folder}.")
    except Exception as e:
        print(f"Error al guardar el archivo {file_name}: {e}")

In [22]:
def get_table_columns(table_name, connection_params):
    """
    Obtiene las columnas de una tabla en PostgreSQL.

    Args:
        table_name (str): Nombre de la tabla en PostgreSQL.
        connection_params (dict): Parámetros de conexión a PostgreSQL.

    Returns:
        list: Lista con los nombres de las columnas de la tabla.
    """
    try:
        conn = psycopg2.connect(**connection_params)
        cursor = conn.cursor()

        # Consultar los nombres de las columnas de la tabla
        query = f"""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = '{table_name}';
        """
        cursor.execute(query)
        columns = [row[0] for row in cursor.fetchall()]

        return columns

    except Exception as e:
        print(f"Error al obtener las columnas de la tabla {table_name}: {e}")
        return []
    finally:
        if conn:
            cursor.close()
            conn.close()

In [23]:
def insert_into_info_lab(df, connection_params):
    """
    Inserta los datos de un DataFrame en la tabla 'infolab' de PostgreSQL,
    considerando solo las columnas que coincidan entre los nombres del DataFrame y los de la tabla.

    Args:
        df (pd.DataFrame): DataFrame con los datos a insertar.
        connection_params (dict): Diccionario con los parámetros de conexión a PostgreSQL.
    """
    # Obtener las columnas de la tabla 'infolab'
    table_columns = get_table_columns('infolab', connection_params)
    
    if not table_columns:
        raise ValueError("No se pudieron obtener las columnas de la tabla 'infolab'.")

    # Seleccionar las columnas que coincidan entre el DataFrame y la tabla
    matching_columns = [col for col in df.columns if col in table_columns]
    
    if not matching_columns:
        raise ValueError("No hay columnas coincidentes entre el DataFrame y la tabla 'infolab'.")

    # Conexión a PostgreSQL
    try:
        conn = psycopg2.connect(**connection_params)
        cursor = conn.cursor()
        
        # Crear la consulta INSERT dinámica
        columns_str = ", ".join(matching_columns)
        placeholders_str = ", ".join(["%s"] * len(matching_columns))
        insert_query = f"INSERT INTO infolab ({columns_str}) VALUES ({placeholders_str})"

        # Preparar los datos para la inserción
        values = df[matching_columns].values.tolist()

        # Ejecutar la consulta
        cursor.executemany(insert_query, values)
        conn.commit()

        print(f"Se insertaron {cursor.rowcount} filas en la tabla 'infolab'.")
    
    except Exception as e:
        print(f"Error al insertar los datos: {e}")
        conn.rollback()
    finally:
        if conn:
            cursor.close()
            conn.close()

In [24]:
connection_params = {
    'dbname': 'udenar',
    'user': 'postgres',
    'password': '12345678',
    'host': 'localhost',
    'port': 5432
}

df = process_all_files_in_folder(LOCAL_FOLDER)
csv_file_name = "lab_Fernando_Sanzon.csv"
save_dataframe_to_local(OUTPUT_FOLDER, csv_file_name, df)
insert_into_info_lab(df, connection_params)

Procesando archivo: D:/Carlos/Udenar-Investigación/Tesis/Extraccion Egresos CANREC/Extaer lab/LABORATORIOS/LABORATORIOS/LAB FERNANDO SANZON\R0036-1089242582 CMVA.docx
Procesando archivo: D:/Carlos/Udenar-Investigación/Tesis/Extraccion Egresos CANREC/Extaer lab/LABORATORIOS/LABORATORIOS/LAB FERNANDO SANZON\R032-59805655 CMVA.docx
Procesando archivo: D:/Carlos/Udenar-Investigación/Tesis/Extraccion Egresos CANREC/Extaer lab/LABORATORIOS/LABORATORIOS/LAB FERNANDO SANZON\R033-5285368SC.docx
Procesando archivo: D:/Carlos/Udenar-Investigación/Tesis/Extraccion Egresos CANREC/Extaer lab/LABORATORIOS/LABORATORIOS/LAB FERNANDO SANZON\R034-30711337SC.docx
Procesando archivo: D:/Carlos/Udenar-Investigación/Tesis/Extraccion Egresos CANREC/Extaer lab/LABORATORIOS/LABORATORIOS/LAB FERNANDO SANZON\R35 -59834885 SC.docx
Archivo lab_Fernando_Sanzon.csv guardado exitosamente en D:/Carlos/Udenar-Investigación/Tesis/Extraccion Egresos CANREC/Extraccion_Laboratorios/lab_Fernando_Sanzon.
Se insertaron 5 filas