In [None]:
#### Optimización de procesos de excel a python - Cuentas
### Ladino Álvarez Ricardo Arturo


### Librerias

import os
import unicodedata
import numpy as np
import pandas as pd
from datetime import datetime
from openpyxl import load_workbook

In [2]:
### Función : Identificacion automáticamente de fila y columna donde comienza la tabla en la hoja (con imagen, texto aleatorio, etc)
##
#
def Tabla_Excel(ruta_archivo, hoja=0):
    """
    Carga una tabla desde un archivo de Excel y devuelve un DataFrame de Pandas.

    La función identifica automáticamente la fila y columna donde comienza la tabla en la hoja
    de Excel especificada y ajusta la lectura del archivo para capturar únicamente los datos relevantes.

    Argumentos:
    ruta_archivo : str
        Ruta del archivo de Excel.
    hoja : int o str, opcional
        Número de la hoja (por defecto es 0, la primera hoja) o nombre de la hoja a leer.

    Retorna:
    pd.DataFrame
        Un DataFrame que contiene la tabla encontrada en el archivo de Excel.

    Excepciones:
    ValueError
        Si no se encuentra ninguna tabla con al menos dos valores en una fila.
    """
    # Carga el archivo de Excel usando openpyxl
    wb = load_workbook(ruta_archivo, data_only=True)

    # Selecciona la hoja a procesar
    ws = wb[wb.sheetnames[hoja]] if isinstance(hoja, int) else wb[hoja]

    # Inicializa las variables para la fila y columna donde comienza la tabla
    inicio_fila = None
    inicio_columna = None

    # Itera sobre las filas y columnas para detectar la primera fila con datos relevantes
    for i, row in enumerate(ws.iter_rows(min_row=1, max_row=ws.max_row, min_col=1, max_col=ws.max_column), start=1):
        # Obtiene los valores no nulos en la fila
        valores_fila = [cell.value for cell in row if cell.value is not None]
        
        # Si hay más de un valor en la fila, asume que es el inicio de la tabla
        if len(valores_fila) > 1: 
            inicio_fila = i  # Guarda el número de fila donde comienza la tabla
            # Detecta la primera columna con un valor no nulo en la fila
            inicio_columna = next((j for j, cell in enumerate(row, start=1) if cell.value is not None), 1)
            break
    
    # Si no se encuentra ninguna tabla, lanza una excepción
    if inicio_fila is None:
        raise ValueError(f"No se encontró ninguna tabla en el archivo {ruta_archivo}.")

    # Lee la tabla desde la hoja de Excel usando Pandas, saltando las filas iniciales vacías
    df = pd.read_excel(ruta_archivo,
                       sheet_name=hoja,
                       skiprows=inicio_fila - 1,  # Ajusta para iniciar desde la fila detectada
                       header=0)  # La primera fila detectada se toma como encabezado

    # Si la tabla inicia en una columna distinta a la primera, ajusta las columnas del DataFrame
    if inicio_columna > 1:
        df = df.iloc[:, inicio_columna - 1:]  # Elimina las columnas vacías anteriores al inicio
    
    # Retorna el DataFrame con los datos de la tabla
    return (df)


In [3]:
### Función : Lectura automática de todos los archivos xlsx en una carpeta
##
#
def Archivos_Carpeta(carpeta):
    """
    Lee todos los archivos .xlsx en una carpeta y organiza los datos en un diccionario de DataFrames.

    Argumentos:
    carpeta : str
        Ruta de la carpeta que contiene los archivos de Excel.

    Retorna:
    tuple :
        - dataframes : dict
            Diccionario donde las claves son variables generadas (X1, X2, ...) y los valores son DataFrames.
        - subcatalogo_df : pd.DataFrame
            DataFrame que contiene el nombre de la variable y el archivo correspondiente.

    Excepciones:
    Imprime un mensaje de error si un archivo no puede procesarse.
    """
    # Obtiene una lista de rutas completas de archivos .xlsx en la carpeta
    archivos_excel = [os.path.join(carpeta, archivo) for archivo in os.listdir(carpeta) if archivo.endswith('.xlsx')]
    
    # Diccionario para almacenar los DataFrames
    dataframes = {}
    
    # Lista para el subcatálogo con información de las variables y los archivos
    subcatalogo = []

    # Itera sobre los archivos encontrados en la carpeta
    for idx, archivo in enumerate(archivos_excel):
        try:
            # Genera un nombre de variable (X1, X2, ...)
            variable_name = f"X{idx+1}"
            
            # Carga la tabla de Excel usando la función Tabla_Excel
            df = Tabla_Excel(archivo)
            
            # Agrega una columna al DataFrame con el nombre de la variable
            df["Archivo"] = variable_name 
            
            # Almacena el DataFrame en el diccionario
            dataframes[variable_name] = df
            
            # Agrega información del archivo al subcatálogo
            subcatalogo.append({"Variable": variable_name, "Archivo": os.path.basename(archivo)})
        except Exception as e:
            # Imprime un mensaje si ocurre un error al procesar un archivo
            print(f"Error al procesar el archivo '{archivo}': {e}")
    
    # Informa que todos los archivos se han leído correctamente
    print("Todos los archivos se han leído correctamente.")
    
    # Convierte la lista del subcatálogo a un DataFrame
    subcatalogo_df = pd.DataFrame(subcatalogo)
    
    # Retorna el diccionario de DataFrames y el subcatálogo como un DataFrame
    return (dataframes, subcatalogo_df)


In [4]:
### Función : Renombrar  columnas del DataFrames y generación de catálogo de equivalencias
##
#
def Renombrar_Catalogo(dataframes):
    """
    Renombra las columnas de un conjunto de DataFrames de forma uniforme y genera un catálogo de equivalencias.

    La función toma las columnas del primer DataFrame como referencia y las renombra en todos los
    DataFrames del diccionario, generando nombres estandarizados como C1, C2, ..., Cn.

    Argumentos:
    dataframes : dict
        Diccionario de DataFrames donde las claves son nombres de variables (e.g., X1, X2) y los valores son DataFrames.

    Retorna:
    tuple :
        - dataframes : dict
            Diccionario de DataFrames con las columnas renombradas.
        - catalogo_df : pd.DataFrame
            DataFrame que muestra la equivalencia entre los nombres originales y los nombres uniformes.
    """
    # Obtiene el nombre de la primera variable y sus columnas originales
    nombre_variable_principal = next(iter(dataframes.keys()))  # Nombre de la primera variable (e.g., 'X1')
    columnas_originales = dataframes[nombre_variable_principal].columns.tolist()
    
    # Genera nombres uniformes (C1, C2, ..., Cn) para las columnas
    catalogo = {"Original": columnas_originales, "Uniforme": [f"C{i+1}" for i in range(len(columnas_originales))]}
    catalogo_df = pd.DataFrame(catalogo)  # Crea el catálogo como DataFrame
    
    # Renombra las columnas de todos los DataFrames usando los nombres uniformes
    for nombre_variable, df in dataframes.items():
        nuevo_nombre_columnas = catalogo["Uniforme"]
        df.columns = nuevo_nombre_columnas
    
    # Informa que todas las columnas se han renombrado correctamente
    print("Todos los archivos se han renombrado correctamente.")
    
    # Retorna los DataFrames actualizados y el catálogo de nombres
    return (dataframes, catalogo_df)

In [5]:
### Función : Validación y concatenación
##
#
def Valida_Concatena(dataframes):
    """
    Valida que todos los DataFrames tengan las mismas columnas y los concatena en un solo DataFrame.

    Argumentos:
    dataframes : dict
        Diccionario de DataFrames donde las claves son nombres de variables y los valores son DataFrames.

    Retorna:
    pd.DataFrame o None
        - Un DataFrame concatenado si todos los DataFrames tienen las mismas columnas.
        - None si hay inconsistencias en las columnas.
    """
    # Obtiene la lista de columnas de referencia del primer DataFrame
    columnas_referencia = list(next(iter(dataframes.values())).columns)
    
    # Valida que todos los DataFrames tengan las mismas columnas
    for nombre_variable, df in dataframes.items():
        if list(df.columns) != columnas_referencia:
            print(f"El DataFrame '{nombre_variable}' no tiene las mismas columnas que el resto.")
            print(f"Columnas esperadas: {columnas_referencia}")
            print(f"Columnas actuales: {list(df.columns)}")
            return None
    
    # Informa que la validación fue exitosa y procede a concatenar
    print("Todos los DataFrames tienen las mismas columnas. Procediendo a concatenar...")
    datos_concatenados = pd.concat(dataframes.values(), ignore_index=True)
    print("Concatenación exitosa.")
    
    # Retorna el DataFrame concatenado
    return (datos_concatenados)

In [8]:
def Lectura_Hojas(carpeta):
    """
    Lee todos los archivos .xlsx de una carpeta y todas sus hojas. 
    Renombra las columnas con un prefijo y crea catálogos para las hojas y las columnas.

    :param carpeta: Ruta de la carpeta que contiene los archivos .xlsx.
    :return: Diccionario con los DataFrames, catálogo de hojas y catálogo de columnas.
    """
    # Obtener la lista de archivos .xlsx en la carpeta
    archivos_excel = [os.path.join(carpeta, archivo) for archivo in os.listdir(carpeta) if archivo.endswith('.xlsx')]
    dataframes = {}
    catalogo_hojas = []
    catalogo_columnas = []
    
    for archivo_idx, archivo in enumerate(archivos_excel):
        try:
            wb = load_workbook(archivo, data_only=True)  # Cargar el archivo Excel
            for hoja_idx, hoja in enumerate(wb.sheetnames):
                ws = wb[hoja]
                inicio_fila = Tabla_Inicio(ws)  # Detectar inicio de la tabla
                
                # Leer la hoja específica con pandas desde la fila detectada
                df = pd.read_excel(
                    archivo,
                    sheet_name=hoja,
                    skiprows=inicio_fila,
                    header=0
                )
                
                # Renombrar columnas con el prefijo AGRS_
                df, mapeo_columnas = Renombrar_CPre(df, prefijo="A_")
                
                # Crear un nombre único para el DataFrame
                variable_name = f"Y{archivo_idx+1}_H{hoja_idx+1}"  # Ej: X1_H1 para archivo 1, hoja 1
                dataframes[variable_name] = df
                
                # Agregar al catálogo de hojas
                catalogo_hojas.append({
                    "Variable": variable_name,
                    "Archivo": os.path.basename(archivo),
                    "Hoja": hoja,
                    "Inicio_Fila": inicio_fila
                })
                
                # Agregar al catálogo de columnas
                catalogo_columnas.append({
                    "Variable": variable_name,
                    "Archivo": os.path.basename(archivo),
                    "Hoja": hoja,
                    "Mapeo_Columnas": mapeo_columnas
                })
                
                print(f"Archivo '{os.path.basename(archivo)}', Hoja '{hoja}' cargado en la variable '{variable_name}'.")
        
        except Exception as e:
            print(f"Error al leer el archivo '{os.path.basename(archivo)}': {e}")
    
    # Convertir los catálogos en DataFrames
    catalogo_hojas_df = pd.DataFrame(catalogo_hojas)
    catalogo_columnas_df = pd.DataFrame(catalogo_columnas)
    
    return (dataframes, catalogo_hojas_df, catalogo_columnas_df)

### Funciones alternas

In [9]:
def Objeto_Int(df, columnas):
    """
    Convierte varias columnas de un DataFrame de tipo object a int64.

    :param df: DataFrame que contiene las columnas a convertir.
    :param columnas: Lista de nombres de columnas a convertir.
    :return: DataFrame con las columnas convertidas a int64.
    """
    for columna in columnas:
        try:
            # Reemplazar valores no numéricos con NaN, luego con 0, y convertir a int64
            df[columna] = pd.to_numeric(df[columna], errors='coerce').fillna(0).astype('int64')
            print(f"Columna '{columna}' convertida exitosamente a int64.")
        except Exception as e:
            print(f"Error al convertir la columna '{columna}' a int64: {e}")
    return (df)

In [10]:
def Column_Upp(df, columnas):
    """
    Convierte el texto de varias columnas de un DataFrame a mayúsculas.

    :param df: DataFrame que contiene las columnas a convertir.
    :param columnas: Lista de nombres de columnas a convertir.
    :return: DataFrame con las columnas convertidas a mayúsculas.
    """
    for columna in columnas:
        if columna in df.columns:  # Validar si la columna existe en el DataFrame
            # Convertir los valores de la columna a mayúsculas
            df[columna] = df[columna].astype(str).str.upper()
            print(f"Columna '{columna}' convertida exitosamente a mayúsculas.")
        else:
            # Advertencia si la columna no existe en el DataFrame
            print(f"Advertencia: La columna '{columna}' no existe en el DataFrame.")
    return df

In [11]:
def Restru_columna(df):
    """
    Reestructura las columnas de un DataFrame tomando la primera fila como encabezado,
    eliminando la primera fila y reiniciando los índices.

    :param df: DataFrame a reestructurar.
    :return: DataFrame reestructurado.
    """
    # Asignar la primera fila como nombres de columnas
    df.columns = df.iloc[0]
    
    # Eliminar la primera fila que ahora es redundante
    df = df[1:]
    
    # Reiniciar los índices del DataFrame
    df = df.reset_index(drop=True)
    
    return (df)

In [12]:
def Columnas_Unicas(df):
    """
    Modifica las columnas duplicadas de un DataFrame para que sean únicas agregando un sufijo numérico.

    :param df: DataFrame con columnas posiblemente duplicadas.
    :return: DataFrame con nombres de columnas únicos.
    """
    cols = pd.Series(df.columns)
    for dup in cols[cols.duplicated()].unique():  # Detectar duplicados
        dup_indices = cols[cols == dup].index
        for i, idx in enumerate(dup_indices):
            cols[idx] = f"{dup}_{i+1}"  # Agregar sufijo a los duplicados
    df.columns = cols
    return (df)


In [13]:
def Reno_Columnas(df):
    """
    Renombra todas las columnas de un DataFrame con nombres secuenciales (T1, T2, ...).

    :param df: DataFrame cuyas columnas serán renombradas.
    :return: Tuple: DataFrame con columnas renombradas, diccionario con el mapeo original-nuevo.
    """
    nuevos_nombres = {col: f"T{i+1}" for i, col in enumerate(df.columns)}
    df = df.rename(columns=nuevos_nombres)
    return (df, nuevos_nombres)

In [14]:
def Repla_V(df, columna, valor_a_reemplazar, nuevo_valor):
    """
    Reemplaza valores específicos en una columna de un DataFrame.

    :param df: DataFrame que contiene la columna.
    :param columna: Nombre de la columna donde se reemplazarán los valores.
    :param valor_a_reemplazar: Valor que será reemplazado.
    :param nuevo_valor: Nuevo valor que se asignará en su lugar.
    :return: DataFrame con los valores reemplazados.
    """
    if columna in df.columns:  # Verificar que la columna existe
        df[columna] = df[columna].replace(valor_a_reemplazar, nuevo_valor)
        print(f"Valores '{valor_a_reemplazar}' reemplazados por '{nuevo_valor}' en la columna '{columna}'.")
    else:
        print(f"Error: La columna '{columna}' no existe en el DataFrame.")
    return df

### Funciones Reglas de Negocio

In [15]:
def Norma_Text(texto):
    """
    Normaliza un texto convirtiéndolo a mayúsculas, eliminando espacios adicionales.

    :param texto: Texto a normalizar.
    :return: Texto normalizado.
    """
    if isinstance(texto, str):
        # Convertir a mayúsculas y eliminar espacios extra
        return " ".join(texto.strip().upper().split())
    return (texto)

In [16]:
def Regla_Negocio(conca_dat, referencia, columna_conca, columna_referencia, columna_valor, nueva_columna):
    """
    Agrega una nueva columna a un DataFrame basada en una regla de mapeo con otro DataFrame.

    :param conca_dat: DataFrame principal donde se agregará la nueva columna.
    :param referencia: DataFrame de referencia para el mapeo.
    :param columna_conca: Columna en el DataFrame principal a comparar.
    :param columna_referencia: Columna en el DataFrame de referencia para la clave de mapeo.
    :param columna_valor: Columna en el DataFrame de referencia que contiene los valores a asignar.
    :param nueva_columna: Nombre de la nueva columna a agregar en el DataFrame principal.
    :return: DataFrame principal con la nueva columna agregada.
    """
    # Normalizar las columnas involucradas
    referencia[columna_referencia] = referencia[columna_referencia].apply(Norma_Text)
    referencia[columna_valor] = referencia[columna_valor].apply(Norma_Text)
    conca_dat[columna_conca] = conca_dat[columna_conca].apply(Norma_Text)
    
    # Crear un diccionario de mapeo
    mapa_referencia = referencia.set_index(columna_referencia)[columna_valor].to_dict()
    
    # Aplicar la regla para agregar la nueva columna
    conca_dat[nueva_columna] = conca_dat[columna_conca].apply(
        lambda x: mapa_referencia[x] if x in mapa_referencia else "In Activo"
    )
    return (conca_dat)

In [68]:
def Regla_Negocio2(conca_dat, referencia, columna_origen, columna_referencia, nueva_columna):
    """
    Agrega una nueva columna a un DataFrame basada en la existencia de valores en otro DataFrame.

    :param conca_dat: DataFrame principal donde se agregará la nueva columna.
    :param referencia: DataFrame de referencia para verificar los valores.
    :param columna_origen: Columna en el DataFrame principal cuyos valores serán verificados.
    :param columna_referencia: Columna en el DataFrame de referencia que contiene los valores a verificar.
    :param nueva_columna: Nombre de la nueva columna a agregar en el DataFrame principal.
    :return: DataFrame principal con la nueva columna agregada.
    """
    # Crear un conjunto de valores únicos de la columna de referencia
    valores_referencia = set(referencia[columna_referencia])

    # Usar np.where para asignar valores según la regla
    conca_dat[nueva_columna] = np.where(
        conca_dat[columna_origen].isin(valores_referencia),  # Si el valor está en el conjunto
        conca_dat[columna_origen],                          # Mantener el valor original
        "In Activo"                                         # De lo contrario, asignar "in activo"
    )
    return (conca_dat)

In [19]:
### > Lectura de multiples archivos xlsx de la carpeta
#>
carpeta_entrada = Entradas
Usuarios, Usuarios_Catalogo = Archivos_Carpeta(carpeta_entrada)
globals().update(Usuarios)

Todos los archivos se han leído correctamente.


In [20]:
### > Renombramiento de columnas y creación de catálogos
#>
Usuarios, Usuarios_columnas = Renombrar_Catalogo(Usuarios)

Todos los archivos se han renombrado correctamente.


In [21]:
### > Concatenación de los multiples archivos en uno
#>
Usuarios_Concatenados = Valida_Concatena(Usuarios)

Todos los DataFrames tienen las mismas columnas. Procediendo a concatenar...
Concatenación exitosa.


In [22]:
### > Transformar de obj a int
#>
Column_Int = ["C7"]
Usuarios_Concatenados = Objeto_Int(Usuarios_Concatenados, Column_Int)

Columna 'C7' convertida exitosamente a int64.


In [23]:
### > Transformar a mayusculas
#>
Columnas_My = ["C1", "C2", "C3", "C4"]
Usuarios_Concatenados = Column_Upp(Usuarios_Concatenados, Columnas_My)

Columna 'C1' convertida exitosamente a mayúsculas.
Columna 'C2' convertida exitosamente a mayúsculas.
Columna 'C3' convertida exitosamente a mayúsculas.
Columna 'C4' convertida exitosamente a mayúsculas.


## Procesado de información
### **Reglas de negocio**

In [29]:
Conca_Usuarios = Usuarios_Concatenados.copy()

In [30]:
Conca_Usuarios = Regla_Negocio(Conca_Usuarios, 
                               Y1_H1, 
                               columna_conca = "C5", 
                               columna_referencia = "A_3", 
                               columna_valor = "A_1", 
                               nueva_columna = "EURFC")

In [31]:
Conca_Usuarios = Regla_Negocio(Conca_Usuarios, 
                               Y1_H1, 
                               columna_conca = "C6", 
                               columna_referencia = "A_4", 
                               columna_valor = "A_5", 
                               nueva_columna = "VEUIA")

In [70]:
Conca_Usuarios = Regla_Negocio(Conca_Usuarios, 
                               Y2_H2_R, 
                               columna_conca = "PNUSR", 
                               columna_referencia = "T1", 
                               columna_valor = "T8", 
                               nueva_columna = "AAUN")
Conca_Usuarios = Repla_V(Conca_Usuarios, "AAUN", "In Activo", "Sin Cambio")

Valores 'In Activo' reemplazados por 'Sin Cambio' en la columna 'AAUN'.


In [55]:
def Regla_Negocio3(conca_dat, referencia, columna_origen, columna_referencia, columna_valor, nueva_columna):
    """
    Agrega una nueva columna a un DataFrame basada en la existencia de valores en otro DataFrame.

    :param conca_dat: DataFrame principal donde se agregará la nueva columna.
    :param referencia: DataFrame de referencia para verificar los valores.
    :param columna_origen: Columna en el DataFrame principal cuyos valores serán verificados.
    :param columna_referencia: Columna en el DataFrame de referencia que contiene los valores a verificar.
    :param columna_valor: Columna en el DataFrame de referencia cuyos valores serán asignados si hay coincidencia.
    :param nueva_columna: Nombre de la nueva columna a agregar en el DataFrame principal.
    :return: DataFrame principal con la nueva columna agregada.
    """
    # Crear un diccionario de referencia para buscar valores
    valores_referencia = referencia.set_index(columna_referencia)[columna_valor].to_dict()

    # Usar np.where para asignar valores según la regla
    conca_dat[nueva_columna] = conca_dat[columna_origen].apply(
        lambda x: valores_referencia.get(x, 'No se encontro')
    )
    
    return conca_dat

In [61]:
Conca_Usuarios = Regla_Negocio3(conca_dat = Conca_Usuarios,
                                referencia = AGS_R,
                                columna_origen = 'C5',
                                columna_referencia = 'RFC_CORTO',
                                columna_valor = 'T8',
                                nueva_columna = 'ADGA')

In [64]:
Conca_Usuarios = Regla_Negocio3(conca_dat = Conca_Usuarios,
                                referencia = AGS_R,
                                columna_origen = 'C5',
                                columna_referencia = 'RFC_CORTO',
                                columna_valor = 'T8',
                                nueva_columna = 'EPRFC')
Conca_Usuarios = Repla_V(Conca_Usuarios, "EPRFC", "No se encontro", "Sin Información")

Valores 'No se encontro' reemplazados por 'Sin Información' en la columna 'EPRFC'.


In [65]:
Conca_Usuarios = Regla_Negocio3(conca_dat = Conca_Usuarios,
                                referencia = AGS_R,
                                columna_origen = 'C4',
                                columna_referencia = 'NOMBRE',
                                columna_valor = 'T8',
                                nueva_columna = 'EPNMB')
Conca_Usuarios = Repla_V(Conca_Usuarios, "EPNMB", "No se encontro", "Sin Información")

Valores 'No se encontro' reemplazados por 'Sin Información' en la columna 'EPNMB'.
