In [3]:
import os
import pandas as pd
import datetime
import re


def leer_y_procesar_archivos(ruta_directorio, nombres_archivos):
    lista_dataframes = []
    for nombre_archivo in nombres_archivos:
        archivo_completo = os.path.join(ruta_directorio, nombre_archivo)
        print(f"Procesando {nombre_archivo}")
        df_principal = pd.read_excel(archivo_completo, skiprows=4, header=[0, 1])
        df_info_adicional = pd.read_excel(archivo_completo, nrows=3)
        numero_archivo = re.search(r'(\d+)', nombre_archivo).group(1)
        df_principal['Nombre del Proceso'] = f"proceso {numero_archivo}"
        for i in range(3):
            df_principal[df_info_adicional.iloc[i, 0]] = df_info_adicional.iloc[i, 1]
        df_principal['RECORD_SOURCE'] = nombre_archivo
        df_principal['LOAD_DATE'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
        lista_dataframes.append(df_principal)
    return pd.concat(lista_dataframes)

def limpiar_nombres_columnas(df):
    new_columns = []
    for col in df.columns:
        if isinstance(col, tuple):
            if "Unnamed:" in col[1] or col[1] == "" or col[1] is None:
                new_col = col[0]
            else:
                new_col = '_'.join(col)
        else:
            new_col = col
        new_columns.append(new_col)
    df.columns = new_columns
    df.columns = df.columns.str.replace('\n', ' ').str.strip()
    return df

def filtrar_competencias(df, competencias, categorias):
    # Comenzar con una máscara que selecciona ninguna fila
    mask = False
    for competencia in competencias:
        for categoria in categorias:
            columna = f"{competencia}{categoria}"
            # Actualizar la máscara para incluir filas que tienen datos en la columna actual
            mask |= df[columna].notna()
    # Retornar el DataFrame con las filas que cumplen con la máscara
    return df[mask]


def procesar_dataframe(df, fecha_columna, identificacion_columna):
    if fecha_columna in df.columns:
        df[fecha_columna] = df[fecha_columna].fillna(df["Fecha de Ingreso a Proceso (Zona horaria GMT 0)"])
    df = df.sort_values(by=fecha_columna, ascending=False)
    df = df.drop_duplicates(subset=identificacion_columna, keep='first')
    df['PROCESS_DATA'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    df['CREATION_USER'] = os.getlogin()
    return df


In [5]:

# Uso del código reestructurado
ruta = r'C:\Users\IanRJ\Documents\PEDE3_Airflow\dags\Prueba reto'
nombres_archivos = ['Base_1.xlsx', 'Base_2.xlsx']
Base_Total = leer_y_procesar_archivos(ruta, nombres_archivos)
Base_Total = limpiar_nombres_columnas(Base_Total)

competencias = ["Calidad del trabajo", "Desarrollo de relaciones", "Escrupulosidad/Minuciosidad",
                "Flexibilidad y Adaptabilidad", "Orden y la calidad", "Orientación al Logro",
                "Pensamiento Analítico", "Resolución de problemas", "Tesón y disciplina", "Trabajo en equipo"]
categorias = ["_Valor", "_Esperado", "_Brecha", "_Cumplimiento"]

Base_Total = filtrar_competencias(Base_Total, competencias, categorias)
Base_Total = procesar_dataframe(Base_Total, 'Fecha de Finalización de Proceso (Zona horaria GMT 0)', 'No. Identificación')

Base_permanencia = pd.read_excel(os.path.join(ruta, "Base_permanencia.xlsx"))
Base_Total["No. Identificación"] = Base_Total["No. Identificación"].str.replace(' ', '', regex=True)
Base_permanencia["No. Identificación"] = Base_permanencia["No. Identificación"].str.replace(' ', '', regex=True)
Base_Total = pd.merge(Base_Total, Base_permanencia, how='inner', on='No. Identificación')
Base_Total = Base_Total.sort_values(by=["Nombre del Proceso", 'Ranking'], ascending=True)

Base_Total.to_excel("Modelo_base_consolidado.xlsx", index=False)


Procesando Base_1.xlsx
Procesando Base_2.xlsx


  warn(msg)


In [1]:

import os
import pandas as pd
import datetime
import re 

def procesar_archivos_especificos(ruta_directorio):
    """
    Procesa los archivos Excel específicos dentro de una ruta dada.
    
    Args:
    - ruta_directorio (str): Ruta al directorio que contiene los archivos a procesar.
    
    Returns:
    - DataFrame: Un conjunto de datos concatenado con el contenido de los archivos procesados.
    """
    nombres_archivos = ['Base_1.xlsx', 'Base_2.xlsx']  # Nombres de los archivos a procesar
    lista_dataframes = []

    for nombre_archivo in nombres_archivos:
        archivo_completo = os.path.join(ruta_directorio, nombre_archivo)
        print(f"Procesando {nombre_archivo}")
        
        # Leer el archivo Excel principal, omitiendo las primeras 4 filas y usando 2 filas como encabezado
        df_principal = pd.read_excel(archivo_completo, skiprows=4, header=[0, 1])
        
        # Leer las primeras 3 filas del mismo archivo para capturar información adicional
        df_info_adicional = pd.read_excel(archivo_completo, nrows=3)

        # Extraer el número del nombre del archivo
        numero_archivo = re.search(r'(\d+)', nombre_archivo).group(1)
        df_principal['Nombre del Proceso'] = f"proceso {numero_archivo}"
        
        # Capturar información adicional y añadirla al DataFrame principal
        for i in range(3):
            df_principal[df_info_adicional.iloc[i, 0]] = df_info_adicional.iloc[i, 1]

        # Agregar información de metadatos
        df_principal['RECORD_SOURCE'] = nombre_archivo
        fecha_formateada = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
        df_principal['LOAD_DATE'] = fecha_formateada
        
        # Agregar DataFrame procesado a la lista
        lista_dataframes.append(df_principal)

    # Concatena todos los DataFrames en uno solo
    dataset_final = pd.concat(lista_dataframes)
    
    return dataset_final

# Uso del código:
ruta = r'C:\Users\IanRJ\Documents\PEDE3_Airflow\dags\Prueba reto'
Base_Total = procesar_archivos_especificos(ruta)
Base_permanencia = pd.read_excel(os.path.join(ruta, "Base_permanencia.xlsx"))



# Cleaning up the column names
cleaned_columns = []
for col in Base_Total.columns:
    # Check if the second level of the multi-index is a string and contains "Unnamed:"
    if isinstance(col[1], str) and ("Unnamed:" in col[1] or col[1] == "" or col[1] is None):
        cleaned_columns.append(col[0])
    else:
        cleaned_columns.append(col)

Base_Total.columns = cleaned_columns

Base_Total.head(5)


new_columns = []
for col in Base_Total.columns:
    if isinstance(col, tuple):
        new_col = '_'.join(col)
    else:
        new_col = col
    new_columns.append(new_col)

Base_Total.columns = new_columns


Base_Total.columns = Base_Total.columns.str.replace('\n', ' ')



# Lista de competencias que quieres revisar
competencias = ["Calidad del trabajo", 
                "Desarrollo de relaciones", 
                "Escrupulosidad/Minuciosidad", 
                "Flexibilidad y Adaptabilidad", 
                "Orden y la calidad", 
                "Orientación al Logro", 
                "Pensamiento Analítico", 
                "Resolución de problemas", 
                "Tesón y disciplina", 
                "Trabajo en equipo"]

# Categorías de las competencias a verificar
categorias = ["_Valor", "_Esperado", "_Brecha", "_Cumplimiento"]

# Construye una máscara para filtrar las filas
mask = True
for competencia in competencias:
    for categoria in categorias:
        columna = f"{competencia}{categoria}"
        mask &= Base_Total[columna].notna()

# Elimina las filas usando la máscara inversa (con ~)
Base_Total.drop(Base_Total[~mask].index, inplace=True)



Base_Total.shape


Base_Total["Fecha de Finalización de Proceso (Zona horaria GMT 0)"] = Base_Total["Fecha de Finalización de Proceso (Zona horaria GMT 0)"].fillna(Base_Total["Fecha de Ingreso a Proceso (Zona horaria GMT 0)"])


# Ordena el DataFrame por 'Fecha de Finalización de Proceso (Zona horaria GMT 0)' de manera descendente
Base_Total = Base_Total.sort_values(by='Fecha de Finalización de Proceso (Zona horaria GMT 0)', ascending=False)

# Elimina duplicados por 'No. Identificación' y se queda con el primer registro (el más reciente debido al ordenamiento previo)
Base_Total = Base_Total.drop_duplicates(subset='No. Identificación', keep='first')



Base_Total.shape


current_datetime = datetime.datetime.now()
formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M")  # Formatea la fecha y hora actual
Base_Total['PROCESS_DATA'] = formatted_datetime
# Obtener el nombre de usuario de Windows
windows_username = os.getlogin()

# Establecer el valor de la columna CREATION_USER
Base_Total['CREATION_USER'] = windows_username


Base_Total["No. Identificación"]=Base_Total["No. Identificación"].str.replace(' ', '', regex=True)
Base_permanencia["No. Identificación"]=Base_permanencia["No. Identificación"].str.replace(' ', '', regex=True)
Base_Total=pd.merge(Base_Total,Base_permanencia,how='inner',on='No. Identificación')


Base_Total = Base_Total.sort_values(by=["Nombre del Proceso",'Ranking'], ascending=True)



Base_Total.to_excel("Modelo_base_consolidado.xlsx",index=False)




Procesando Base_1.xlsx
Procesando Base_2.xlsx


  warn(msg)
