In [None]:
# extraer_vehiculos_corregido.py

import pandas as pd
import os
import re
import unicodedata
from rapidfuzz import fuzz
import numpy as np
import datetime

# --- 1. Funciones de Normalizaci√≥n ---

_re_non_alnum = re.compile(r"[^0-9a-z]+")
_re_multi_unders = re.compile(r"_+")

def normalizar_texto(texto: str) -> str:
    """Normaliza texto: sin acentos, min√∫sculas, underscores limpios."""
    if not isinstance(texto, str):
        # Convierte valores no string a string (√∫til si hay n√∫meros o NaN)
        # Esto es vital para que las columnas de n√∫meros/fechas no causen error
        return str(texto)

    texto = unicodedata.normalize("NFKD", texto)
    texto = "".join(c for c in texto if not unicodedata.combining(c))
    texto = texto.lower()

    texto = _re_non_alnum.sub("_", texto)
    texto = _re_multi_unders.sub("_", texto)
    return texto.strip("_")

def normalize_sheet_name(name: str) -> str:
    """Normaliza y singulariza para evitar 'vehiculos' / 'vehiculo'."""
    norm = normalizar_texto(name)
    # singularizaci√≥n simple
    if norm.endswith("es") and len(norm) > 3:
        norm = norm[:-2]
    elif norm.endswith("s") and len(norm) > 2:
        norm = norm[:-1]
    return norm

# --- 2. Funciones de Extracci√≥n de Metadatos ---

MESES = {
    "enero": 1, "febrero": 2, "marzo": 3, "abril": 4,
    "mayo": 5, "junio": 6, "julio": 7, "agosto": 8,
    "septiembre": 9, "octubre": 10, "noviembre": 11, "diciembre": 12
}
a√±os = ['2025','2024','2023','2022','2021','2020','2019','2018','2017','2016','2015','2014','2013','2012','2011','2010']

def extraer_mes_nomb_excel(nombre_archivo: str) -> str | None:
    # quitar extensi√≥n
    nombre = os.path.splitext(nombre_archivo)[0].lower()

    # buscar coincidencia
    for mes in MESES:
        if mes in nombre:
            return mes
    return None

def extraer_a√±o(nombre_archivo: str) -> str | None:
    # quitar extensi√≥n
    nombre = os.path.splitext(nombre_archivo)[0].lower()

    # buscar coincidencia
    for a√±o in a√±os:
        if a√±o in nombre:
            return a√±o
    return None

# --- 3. Detecci√≥n de Hojas de Veh√≠culos ---

def es_hoja_vehiculo(sheet_name: str) -> bool:
    norm = normalize_sheet_name(sheet_name)
    tokens = norm.split("_")

    # 1) Token empieza con "veh" ‚Üí MUY seguro
    for t in tokens:
        if t.startswith("veh"):
            return True

    # 2) Excluir expl√≠citamente cosas tipo "servicio" / "servicios"
    if any(t.startswith(("serv", "servi", "servic")) for t in tokens):
        return False

    # 3) Fuzzy matching seguro contra "vehiculo"
    score = fuzz.partial_ratio(norm, "vehiculo")
    if score >= 75:
        return True

    # 4) Fuzzy por token con longitud m√≠nima
    for t in tokens:
        if len(t) >= 3:
            if fuzz.partial_ratio(t, "veh") >= 90:
                return True

    return False

# --- 4. Extracci√≥n Principal ---

def leer_excels_vehiculos(root_path: str) -> pd.DataFrame:
    todos = []

    for year_folder in os.listdir(root_path):
        ruta_a√±o = os.path.join(root_path, year_folder)
        if not os.path.isdir(ruta_a√±o):
            continue

        for archivo in os.listdir(ruta_a√±o):
            if not archivo.lower().endswith((".xlsx", ".xls", ".xlsm")):
                continue

            ruta_file = os.path.join(ruta_a√±o, archivo)

            a√±o_det = extraer_a√±o(archivo)
            mes_det = extraer_mes_nomb_excel(archivo)

            try:
                excel = pd.ExcelFile(ruta_file)
            except Exception as e:
                print(f"‚ùå Error abriendo {archivo}: {e}")
                continue

            encontro_hoja = False

            for hoja in excel.sheet_names:
                if es_hoja_vehiculo(hoja):
                    print(f"‚úî Leyendo {archivo} | hoja autom√°tica: {hoja}")
                    try:
                        df = pd.read_excel(ruta_file, sheet_name=hoja)
                        
                        # ! CORRECCI√ìN CLAVE 1: Normalizar columnas inmediatamente
                        df.columns = [normalizar_texto(c) for c in df.columns]
                        
                        df["a√±o_archivo"] = a√±o_det
                        df["mes_archivo"] = mes_det
                        df["archivo"] = archivo
                        df["hoja"] = hoja
                        
                        todos.append(df)
                        encontro_hoja = True
                    except Exception as e:
                        print(f"‚ùå Error leyendo hoja {hoja} en {archivo}: {e}")
                    
                    break # Asume que solo quieres una hoja por archivo
            
            # --- Fallback interactivo si NO encontr√≥ nada ---
            if not encontro_hoja:
                print(f"\n‚ö† No encontr√© ninguna hoja tipo 'vehiculo' en el archivo:")
                print(f"   ‚Üí {archivo}")
                print("   Hojas disponibles:")

                for i, h in enumerate(excel.sheet_names):
                    print(f"   [{i}] {h}")

                try:
                    idx = int(input(f"üëâ Ingresa el n√∫mero de la hoja que quieres usar (o -1 para saltar) opciones{list(range(len(excel.sheet_names)))}: "))
                except:
                    print("Entrada inv√°lida. Saltando archivo.")
                    continue

                if idx == -1:
                    print("‚è≠ Saltando archivo.")
                    continue
                if idx < 0 or idx >= len(excel.sheet_names):
                    print("‚ùå √çndice fuera de rango. Saltando archivo.")
                    continue

                hoja_manual = excel.sheet_names[idx]
                print(f"‚úî Leyendo manualmente: {archivo} | hoja: {hoja_manual}")

                try:
                    df = pd.read_excel(ruta_file, sheet_name=hoja_manual)
                    
                    # ! CORRECCI√ìN CLAVE 1: Normalizar columnas inmediatamente
                    df.columns = [normalizar_texto(c) for c in df.columns]
                    
                    df["a√±o_archivo"] = a√±o_det
                    df["mes_archivo"] = mes_det
                    df["archivo"] = archivo
                    df["hoja"] = hoja_manual
                    print(df.isnull().sum())
                    todos.append(df)
                except Exception as e:
                    print(f"‚ùå Error leyendo hoja manual {hoja_manual} en {archivo}: {e}")
                    continue

    if todos:
        return pd.concat(todos, ignore_index=True)
    return pd.DataFrame()

# --- 5. Unificaci√≥n de Columnas y Log de Diagn√≥stico ---

# Definici√≥n de grupos de columnas para unificaci√≥n
# ! REVISA Y COMPLETA ESTA LISTA si encuentras otros nombres de combustible normalizados.
column_groups = {
    "tipo_servicio": [
        "tipo_servicio", "tiposervicio", "tipo_serviccio",
        "tipo_servcio", "tipo_servicio_", "tipo_servicio_1",
        "tipo_servicio1"
    ],
    "fecha_ingreso_rnt": [
        "fecha_ingreso_rnt", "fecha_ingreso_rntt", "fecha_ingreso_rnttt",
        "fecha_ingreso_rnt_1", "fecha_ingreso_rnt1",
        "fecha_ingreso_rnt_", "fecha_ingreso_rnt__"
    ],
    "combustible": [
        "combustible", "tipo_combustible", "tipo_combustible_",
        "tipo_combustible1", "combust", "combustible_del_vehiculo" 
        # A√±ade aqu√≠ otros nombres normalizados que encuentres, ej: 'tipo_fuel'
    ],
    "fecha_ingreso": [
        "fecha_ingreso", "fecha_ingreso_servicio", "fecha_ingreso1"
    ],
    "ano_fabricacion": [
        "ano_fabricacion", "anofabricacion", "a√±o_fabricacion",
        "ano_fabricacion_", "ano_fabricacion1"
    ]
}

def unificar_columnas(df: pd.DataFrame, groups: dict) -> pd.DataFrame:
    df = df.copy()
    
    # Lista para almacenar las columnas de metadatos (evita dropear metadatos)
    meta_cols = ["a√±o_archivo", "mes_archivo", "archivo", "hoja"]

    for final_col, variantes in groups.items():
        # Aseguramos el orden de las columnas existentes (ya normalizadas)
        existentes = [c for c in variantes if c in df.columns]
        
        if not existentes:
            # Si no existe ninguna variante, creamos la columna final con nulos (tipo String)
            df[final_col] = pd.NA
            continue
        
        # L√≥gica de consolidaci√≥n: Empezamos con la primera variante, y llenamos sus nulos con la siguiente.
        df[final_col] = df[existentes[0]]
        for col in existentes[1:]:
            df[final_col] = df[final_col].fillna(df[col])

        # eliminar columnas usadas excepto la final y las de metadatos
        to_drop = [c for c in existentes if c != final_col and c not in meta_cols]
        df = df.drop(columns=to_drop, errors='ignore')

        # ! LOG DE DIAGN√ìSTICO: Muestra el estado de la columna 'combustible'
        if final_col == 'combustible':
            nulos = df[final_col].isnull().sum()
            total = len(df)
            porcentaje_nulos = (nulos / total) * 100 if total > 0 else 0
            
            print("="*60)
            print(f"‚úÖ CONSOLIDACI√ìN DE COLUMNA: '{final_col}'")
            print("="*60)
            print(f"   ‚Üí Columnas fuente usadas: {existentes}")
            print(f"   ‚Üí Total filas: {total}")
            print(f"   ‚Üí Filas con Nulos: {nulos}")
            print(f"   ‚Üí Porcentaje de Nulos: {porcentaje_nulos:.2f}%")
            print("-" * 60)
            print(f"   ‚Üí Top 5 Valores despu√©s de la unificaci√≥n (incluyendo nulos):\n{df[final_col].value_counts(dropna=False).head(5)}")
            print("="*60 + "\n")

    return df

# --- 6. Conversi√≥n de Tipos y Guardado ---

def detect_and_convert_datetime(df: pd.DataFrame, sample_size=100) -> tuple[pd.DataFrame, list]:
    """Detecta y convierte columnas de objetos que parecen fechas a datetime."""
    df = df.copy()
    obj_cols = df.select_dtypes(include=["object"]).columns
    date_cols = []
    
    for col in obj_cols:
        nonnull = df[col].dropna()
        if nonnull.empty:
            continue
        sample = nonnull.head(sample_size)
        
        # Si contiene objetos datetime/Timestamp
        if sample.map(lambda x: isinstance(x, (pd.Timestamp, datetime.datetime))).any():
            date_cols.append(col)
            continue
            
        # Intentar parsear si parecen cadenas de fecha
        parsed = pd.to_datetime(sample, errors="coerce", infer_datetime_format=True)
        if parsed.notna().any():
            date_cols.append(col)

    # Convertir columnas detectadas
    for col in date_cols:
        df[col] = pd.to_datetime(df[col], errors="coerce", infer_datetime_format=True)

    return df, date_cols

# --- 7. Ejecuci√≥n Principal ---

if __name__ == '__main__':
    # ! IMPORTANTE: Define la ruta ra√≠z de tus archivos Excel
    root_excels = r"C:\\Users\\hiros\\\\Desktop\\\\CMS-PRUEBA-TECNICA\\\\descargas"
    
    # 1. Extracci√≥n y normalizaci√≥n
    print("INICIANDO EXTRACCI√ìN Y NORMALIZACI√ìN DE ARCHIVOS EXCEL...")
    df_vehiculos = leer_excels_vehiculos(root_excels)

    if df_vehiculos.empty:
        print("üõë No se extrajeron datos. Terminando ejecuci√≥n.")
    else:
        # 2. Unificaci√≥n de columnas y diagn√≥stico de nulos
        print("\nINICIANDO UNIFICACI√ìN DE COLUMNAS (Aqu√≠ ver√°s el LOG de 'combustible')...")
        vehiculos_df = unificar_columnas(df_vehiculos.copy(), column_groups)

        # 3. Conversi√≥n de tipos
        vehiculos_df, converted_cols = detect_and_convert_datetime(vehiculos_df)
        print("Columnas convertidas a datetime:", converted_cols)

        # 4. Asegurar tipo string para el resto de objetos
        obj_cols = vehiculos_df.select_dtypes(include=["object"]).columns
        if len(obj_cols) > 0:
            vehiculos_df[obj_cols] = vehiculos_df[obj_cols].astype("string")

        # 5. Ordenar columnas (adaptado a los nuevos nombres)
        column_order = [
            "folio", "region", "ppu", "linea", "marca", "modelo",
            "ano_fabricacion", "capacidad", "tipo_servicio", "combustible",
            "fecha_ingreso", "fecha_ingreso_rnt", "mes_archivo", "a√±o_archivo"
        ]
        
        final_cols = [c for c in column_order if c in vehiculos_df.columns]
        final_cols += [c for c in vehiculos_df.columns if c not in final_cols]
        vehiculos_df = vehiculos_df[final_cols]

        # 6. Guardar resultados
        print("\nGUARDANDO RESULTADOS...")
        vehiculos_df.to_parquet("veh_raw.parquet", index=False)
        vehiculos_df.to_csv("vehiculos_extraidos.csv", index=False)
        
        print(f"‚úÖ Proceso terminado. Se extrajeron {len(vehiculos_df)} filas.")
        print("Primeras 5 filas del resultado final:")
        print(vehiculos_df.head())

INICIANDO EXTRACCI√ìN Y NORMALIZACI√ìN DE ARCHIVOS EXCEL...
‚úî Leyendo Base de Datos de Transporte P√∫blico - Abril 2021.xlsx | hoja autom√°tica: Veh√≠culos
‚úî Leyendo Base de Datos de Transporte P√∫blico - Agosto 2021.xlsx | hoja autom√°tica: Veh√≠culo
