In [6]:
import pandas as pd
import numpy as np
import re
import os

# ------------------------------------------------------------------------------
# 1) Funciones para homogeneizar y limpiar los datos
# ------------------------------------------------------------------------------

def unify_period_string(period_str: str) -> str:
    """
    Unifica variaciones del texto del período a un formato estándar.
    Ejemplos:
      - "1er Trim 1996" -> "1º Trim 1996"
      - "2do Trim 2020" -> "2º Trim 2020"
    """
    if pd.isna(period_str):
        return ""
    
    p = str(period_str).strip().lower()
    # Reemplazos comunes para números ordinales
    p = p.replace("1er", "1º")
    p = p.replace("2do", "2º")
    p = p.replace("3er", "3º")
    p = p.replace("4to", "4º")
    # Uniformar "trim"
    p = p.replace("trim", "Trim")
    # Eliminar espacios dobles
    p = re.sub(r"\s+", " ", p)
    # Convertir a "title" para tener "Trim" con mayúscula y dejar el formato uniforme
    p = p.title()
    return p

def clean_footnotes(df: pd.DataFrame, periodo_col="Período") -> pd.DataFrame:
    """
    Limpia el DataFrame:
      - Unifica el formato del Período.
      - Descarta filas cuyo valor en Período no sea exactamente un trimestre válido.
      - Si existe la columna "Valor", se descartan filas en las que ese valor no sea numérico.
      - Filtra filas que contengan palabras clave típicas de notas.
      
    Se espera que un período válido tenga el formato:
       "1º Trim YYYY" (con 1-4 para el trimestre, y un año de 4 dígitos).
    """
    if periodo_col not in df.columns:
        return df

    # 1) Unificar el texto en la columna de período
    df[periodo_col] = df[periodo_col].astype(str).apply(unify_period_string)
    
    # 2) Expresión regular para detectar períodos válidos, por ejemplo "1º Trim 1996"
    pattern_periodo = re.compile(r"^[1-4]º Trim \d{4}$", re.IGNORECASE)
    mask_periodo_valido = df[periodo_col].str.match(pattern_periodo)
    
    # 3) Descartar filas que tengan palabras clave de notas
    keywords = ["nota", "fuente", "variación", "observación", 
                "comentario", "disclaimer", "empresas", "desocup"]
    pattern_keywords = re.compile("|".join(keywords), re.IGNORECASE)
    mask_no_keywords = ~df[periodo_col].str.contains(pattern_keywords, na=False)
    
    # 4) Si existe la columna "Valor", verificar que sea numérica
    if "Valor" in df.columns:
        mask_valor_numeric = pd.to_numeric(df["Valor"], errors="coerce").notnull()
    else:
        mask_valor_numeric = True
    
    # 5) Combinar todas las máscaras
    mask_total = mask_periodo_valido & mask_no_keywords & mask_valor_numeric
    
    df_limpio = df[mask_total].copy()
    return df_limpio

# ------------------------------------------------------------------------------
# 2) Funciones para extraer metadatos y procesar hojas
# ------------------------------------------------------------------------------

def extract_metadata(file_path: str, sheet_name: str) -> dict:
    """
    Lee las dos primeras filas de la hoja (row 0 y row 1) sin encabezado (header=None)
    y devuelve un diccionario con el título y subtítulo.
    """
    try:
        df_header = pd.read_excel(file_path, sheet_name=sheet_name, header=None, nrows=2)
    except Exception as e:
        print(f"Error al leer metadatos de {sheet_name}: {e}")
        return {"Sheet": sheet_name, "Titulo": "", "Subtitulo": ""}
    
    titulo = ""
    subtitulo = ""
    if len(df_header) > 0 and len(df_header.columns) > 0:
        val_titulo = df_header.iloc[0, 0]
        titulo = str(val_titulo) if pd.notna(val_titulo) else ""
    if len(df_header) > 1 and len(df_header.columns) > 0:
        val_subtitulo = df_header.iloc[1, 0]
        subtitulo = str(val_subtitulo) if pd.notna(val_subtitulo) else ""
    
    return {"Sheet": sheet_name, "Titulo": titulo.strip(), "Subtitulo": subtitulo.strip()}

def process_vertical_sheet(file_path: str, sheet_name: str) -> pd.DataFrame:
    """
    Procesa hojas verticales (donde el período está en la primera columna).
    Se asume que la fila de encabezados es la 3ra (índice 2).
    Optimizado para reducir uso de memoria.
    """
    try:
        # Primero leemos una muestra para determinar columnas útiles
        df_sample = pd.read_excel(file_path, sheet_name=sheet_name, header=2, nrows=5)
        valid_cols = [col for col in df_sample.columns 
                     if not (isinstance(col, str) and "Volver al índice" in col)]
        
        # Leemos solo las columnas válidas
        df = pd.read_excel(file_path, sheet_name=sheet_name, header=2, usecols=valid_cols)
        
        # Convertimos tipos de datos para optimizar memoria
        if "Período" in df.columns:
            df["Período"] = df["Período"].astype(str).str.strip()
        
        df["Fuente"] = sheet_name
        return df
    except Exception as e:
        print(f"Error procesando hoja vertical {sheet_name}: {e}")
        # Devolver DataFrame vacío en caso de error
        return pd.DataFrame(columns=["Período", "Valor", "Fuente"])

def process_horizontal_sheet(file_path: str, sheet_name: str) -> pd.DataFrame:
    """
    Procesa hojas horizontales (donde el período está en los encabezados de columnas).
    Se asume que la fila de encabezados es la 3ra (índice 2) y se transforma a formato long.
    Implementa procesamiento por lotes para reducir uso de memoria.
    """
    try:
        # Leer una muestra para determinar columnas útiles
        df_sample = pd.read_excel(file_path, sheet_name=sheet_name, header=2, nrows=5)
        valid_cols = [col for col in df_sample.columns 
                     if not (isinstance(col, str) and "Volver al índice" in col)]
        
        # Leer solo las columnas válidas
        df = pd.read_excel(file_path, sheet_name=sheet_name, header=2, usecols=valid_cols)
        
        first_col = df.columns[0]
        if not isinstance(first_col, str) or first_col.strip() == "":
            df = df.rename(columns={first_col: "Categoría"})
        if "Categoría" not in df.columns:
            df = df.reset_index().rename(columns={"index": "Categoría"})
        
        # Procesamiento por lotes para evitar MemoryError
        result_chunks = []
        chunk_size = 100  # Ajustar según memoria disponible
        
        for i in range(0, len(df), chunk_size):
            try:
                chunk = df.iloc[i:i+chunk_size].copy()
                chunk_long = chunk.melt(id_vars=["Categoría"], var_name="Período", value_name="Valor")
                chunk_long["Fuente"] = sheet_name
                result_chunks.append(chunk_long)
            except Exception as e:
                print(f"Error procesando lote {i} de {sheet_name}: {e}")
                continue
        
        if result_chunks:
            return pd.concat(result_chunks, ignore_index=True)
        else:
            return pd.DataFrame(columns=["Categoría", "Período", "Valor", "Fuente"])
    except Exception as e:
        print(f"Error procesando hoja horizontal {sheet_name}: {e}")
        return pd.DataFrame(columns=["Categoría", "Período", "Valor", "Fuente"])

# ------------------------------------------------------------------------------
# 2.1) Función para procesar 'descriptores_actividad' a partir del Excel
# y guardarlo como archivo de texto, para luego procesarlo línea por línea.
# ------------------------------------------------------------------------------

def generate_descriptores_txt_from_excel(file_path: str, sheet_name: str, output_txt: str):
    """
    Lee la hoja 'Descriptores de actividad' del Excel y guarda su contenido
    en un archivo de texto (una línea por registro), usando la primera columna.
    """
    try:
        df_desc = pd.read_excel(file_path, sheet_name=sheet_name, header=None, usecols=[0])
        
        # Asegurarse de que el directorio exista
        os.makedirs(os.path.dirname(output_txt), exist_ok=True)
        
        # Escribir cada línea en el archivo
        with open(output_txt, 'w', encoding='utf-8') as f:
            for index, row in df_desc.iterrows():
                if pd.notna(row[0]):
                    f.write(str(row[0]) + "\n")
        
        print(f"Archivo de descriptores generado: {output_txt}")
    except Exception as e:
        print(f"Error generando archivo de descriptores: {e}")

def process_descriptores_txt(file_path: str) -> pd.DataFrame:
    """
    Lee el archivo de texto generado con descriptores línea por línea,
    descarta encabezados y extrae el código y la descripción.
    Retorna un DataFrame con columnas ['Codigo', 'DescripcionCompleta'].
    """
    try:
        if not os.path.exists(file_path):
            print(f"Archivo no encontrado: {file_path}")
            return pd.DataFrame(columns=["Codigo", "DescripcionCompleta"])
        
        lines_limpias = []
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                if "Descriptor completo de actividad" in line:
                    continue
                
                # Intentar extraer código y descripción separando por la primera coma
                match = re.match(r"^([^,]+),\s*(.*)$", line)
                if match:
                    codigo = match.group(1).strip()
                    descripcion = match.group(2).strip()
                    lines_limpias.append((codigo, descripcion))
        
        if lines_limpias:
            df = pd.DataFrame(lines_limpias, columns=["Codigo", "DescripcionCompleta"])
            return df
        else:
            return pd.DataFrame(columns=["Codigo", "DescripcionCompleta"])
    except Exception as e:
        print(f"Error procesando archivo de descriptores: {e}")
        return pd.DataFrame(columns=["Codigo", "DescripcionCompleta"])

# ------------------------------------------------------------------------------
# 3) Script principal de preprocesamiento
# ------------------------------------------------------------------------------

def main_preprocessing_with_metadata_and_cleanfootnotes():
    # Configuración para reducir uso de memoria
    pd.options.mode.chained_assignment = None  # Evitar warnings de copia
    
    # Ruta al archivo Excel
    file_path = r"Data\nacional_serie_empleo_trimestral_actualizado241312.xlsx"
    
    # Asegurarse de que el directorio Data exista
    os.makedirs("Data", exist_ok=True)
    
    # Definir hojas verticales y horizontales
    vertical_sheets = ["C1.1", "C1.2", "C2.1", "C2.2"]
    horizontal_sheets = ["C3", "C4", "C5", "C6", "C7"]
    
    dfs = {}
    metadata_list = []
    
    # Procesar hojas verticales
    for sheet in vertical_sheets:
        try:
            print(f"Procesando hoja vertical: {sheet}...")
            meta = extract_metadata(file_path, sheet)
            metadata_list.append(meta)
            df_v = process_vertical_sheet(file_path, sheet)
            dfs[sheet] = df_v
            print(f"  ✓ Hoja {sheet} procesada: {len(df_v)} filas")
        except Exception as e:
            print(f"  ✗ Error procesando hoja {sheet}: {e}")
            dfs[sheet] = pd.DataFrame()  # DataFrame vacío en caso de error
    
    # Procesar hojas horizontales
    for sheet in horizontal_sheets:
        try:
            print(f"Procesando hoja horizontal: {sheet}...")
            meta = extract_metadata(file_path, sheet)
            metadata_list.append(meta)
            df_h = process_horizontal_sheet(file_path, sheet)
            dfs[sheet] = df_h
            print(f"  ✓ Hoja {sheet} procesada: {len(df_h)} filas")
        except Exception as e:
            print(f"  ✗ Error procesando hoja {sheet}: {e}")
            dfs[sheet] = pd.DataFrame()  # DataFrame vacío en caso de error
    
    # --------------------------------------------------------------------------
    # 3.1) Procesar hoja de Descriptores de actividad
    # --------------------------------------------------------------------------
    descriptores_sheet = "Descriptores de actividad"
    descriptores_txt = "Data/descriptores_actividad_raw.txt"
    try:
        print("Procesando descriptores de actividad...")
        # Generar el archivo de texto desde el Excel
        generate_descriptores_txt_from_excel(file_path, descriptores_sheet, descriptores_txt)
        # Procesar el archivo de texto para obtener el DataFrame
        df_descriptores = process_descriptores_txt(descriptores_txt)
        print(f"  ✓ Descriptores procesados: {len(df_descriptores)} registros")
        dfs["Descriptores de actividad"] = df_descriptores
    except Exception as e:
        print(f"  ✗ Error procesando descriptores: {e}")
        df_descriptores = pd.DataFrame(columns=["Codigo", "DescripcionCompleta"])
        dfs["Descriptores de actividad"] = df_descriptores
    
    # --------------------------------------------------------------------------
    # 3.2) Concatenar DataFrames verticales y horizontales
    # --------------------------------------------------------------------------
    print("Concatenando y limpiando DataFrames...")
    
    # Filtrar DataFrames vacíos antes de concatenar
    vertical_dfs = [dfs[sheet] for sheet in vertical_sheets if sheet in dfs and not dfs[sheet].empty]
    horizontal_dfs = [dfs[sheet] for sheet in horizontal_sheets if sheet in dfs and not dfs[sheet].empty]
    
    if vertical_dfs:
        df_vertical = pd.concat(vertical_dfs, ignore_index=True)
        print(f"  ✓ DataFrame vertical concatenado: {len(df_vertical)} filas")
    else:
        df_vertical = pd.DataFrame(columns=["Período", "Valor", "Fuente"])
        print("  ! No hay datos verticales para concatenar")
    
    if horizontal_dfs:
        df_horizontal = pd.concat(horizontal_dfs, ignore_index=True)
        print(f"  ✓ DataFrame horizontal concatenado: {len(df_horizontal)} filas")
    else:
        df_horizontal = pd.DataFrame(columns=["Categoría", "Período", "Valor", "Fuente"])
        print("  ! No hay datos horizontales para concatenar")
    
    # --------------------------------------------------------------------------
    # 4) Limpiar y homogeneizar la columna de Período en ambos dataframes
    # --------------------------------------------------------------------------
    try:
        if not df_vertical.empty:
            df_vertical_clean = clean_footnotes(df_vertical, "Período")
            print(f"  ✓ DataFrame vertical limpiado: {len(df_vertical_clean)} filas")
        else:
            df_vertical_clean = df_vertical
        
        if not df_horizontal.empty:
            df_horizontal_clean = clean_footnotes(df_horizontal, "Período")
            print(f"  ✓ DataFrame horizontal limpiado: {len(df_horizontal_clean)} filas")
        else:
            df_horizontal_clean = df_horizontal
    except Exception as e:
        print(f"  ✗ Error en limpieza de datos: {e}")
        df_vertical_clean = df_vertical
        df_horizontal_clean = df_horizontal
    
    # --------------------------------------------------------------------------
    # 5) Merge df_horizontal_clean con df_descriptores para incluir la descripción
    # --------------------------------------------------------------------------
    try:
        if not df_horizontal_clean.empty and not df_descriptores.empty:
            df_horizontal_clean = pd.merge(
                df_horizontal_clean,
                df_descriptores,
                left_on="Categoría",
                right_on="Codigo",
                how="left"
            )
            print(f"  ✓ Merge con descriptores completado")
    except Exception as e:
        print(f"  ✗ Error en merge con descriptores: {e}")
    
    # --------------------------------------------------------------------------
    # 6) Guardar los resultados en archivos CSV dentro de Data
    # --------------------------------------------------------------------------
    try:
        output_vertical = "Data/preprocessed_vertical_clean.csv"
        output_horizontal = "Data/preprocessed_horizontal_clean.csv"
        output_descriptores = "Data/descriptores_actividad.csv"
        output_metadata = "Data/metadata_hojas.csv"
        
        # Guardar con manejo de errores
        print("Guardando archivos CSV...")
        df_vertical_clean.to_csv(output_vertical, index=False)
        print(f"  ✓ Guardado: {output_vertical}")
        
        df_horizontal_clean.to_csv(output_horizontal, index=False)
        print(f"  ✓ Guardado: {output_horizontal}")
        
        if not df_descriptores.empty:
            df_descriptores.to_csv(output_descriptores, index=False)
            print(f"  ✓ Guardado: {output_descriptores}")
        else:
            print(f"  ! No se guardó {output_descriptores} (sin datos)")
        
        df_metadata = pd.DataFrame(metadata_list)
        df_metadata.to_csv(output_metadata, index=False)
        print(f"  ✓ Guardado: {output_metadata}")
        
        print("\n¡Preprocesamiento completado con éxito!")
    except Exception as e:
        print(f"Error guardando archivos: {e}")

if __name__ == "__main__":
    try:
        main_preprocessing_with_metadata_and_cleanfootnotes()
    except Exception as e:
        print(f"Error en el script principal: {e}")

Procesando hoja vertical: C1.1...
Error al leer metadatos de C1.1: [Errno 2] No such file or directory: 'Data\\nacional_serie_empleo_trimestral_actualizado241312.xlsx'
Error procesando hoja vertical C1.1: [Errno 2] No such file or directory: 'Data\\nacional_serie_empleo_trimestral_actualizado241312.xlsx'
  ✓ Hoja C1.1 procesada: 0 filas
Procesando hoja vertical: C1.2...
Error al leer metadatos de C1.2: [Errno 2] No such file or directory: 'Data\\nacional_serie_empleo_trimestral_actualizado241312.xlsx'
Error procesando hoja vertical C1.2: [Errno 2] No such file or directory: 'Data\\nacional_serie_empleo_trimestral_actualizado241312.xlsx'
  ✓ Hoja C1.2 procesada: 0 filas
Procesando hoja vertical: C2.1...
Error al leer metadatos de C2.1: [Errno 2] No such file or directory: 'Data\\nacional_serie_empleo_trimestral_actualizado241312.xlsx'
Error procesando hoja vertical C2.1: [Errno 2] No such file or directory: 'Data\\nacional_serie_empleo_trimestral_actualizado241312.xlsx'
  ✓ Hoja C2.1 pr

Prueba 2