# =====================================
# BLOQUE 1: Imports y Config
# =====================================

In [25]:
import re
import pandas as pd
import numpy as np
import unicodedata
from datetime import datetime
import os
import warnings

warnings.filterwarnings("ignore")

# Configuración de Directorio, Patrón y Listado de Archivos

# Directorio donde se encuentran los archivos
DIRECTORIO_SAFETRACK = "../Safetrack"

# Patrón de regex para coincidir con los nombres de archivos esperados
PATRON_ARCHIVO = r"Reporte de viaje\((\d{8})-(\d{8})\)\.xlsx$"

# Mapeo de números de mes a nombres en español
MESES_ES = [
    'Enero', 'Febrero', 'Marzo', 'Abril', 'Mayo', 'Junio',
    'Julio', 'Agosto', 'Septiembre', 'Octubre', 'Noviembre', 'Diciembre'
]

# =====================================
# BLOQUE 2: Función de Logging
# =====================================

In [26]:
def log_proceso(df_antes, df_despues, etapa):
  eliminados = len(df_antes) - len(df_despues)
  porcentaje = (eliminados / len(df_antes)) * 100 if len(df_antes) > 0 else 0
  print(f"\n=== Etapa: {etapa} ===")
  print(f"Registros antes: {len(df_antes)}")
  print(f"Registros después: {len(df_despues)}")
  print(f"Registros eliminados: {eliminados} ({porcentaje:.2f}%)")

  if eliminados > 0:
      registros_eliminados = df_antes[~df_antes.index.isin(df_despues.index)]
      # Guardar registros eliminados en un archivo
      archivo_eliminados = f"../Limpia/eliminados_{etapa.lower().replace(' ', '_')}.xlsx"
      registros_eliminados.to_excel(archivo_eliminados, index=False)
      print(f"Registros eliminados guardados en: {archivo_eliminados}")

# =====================================
# BLOQUE 3: Generar listas de archivos
# =====================================

In [27]:
def listar_archivos(directorio, patron):
    archivos_coincidentes = []
    for archivo in os.listdir(directorio):
        if re.match(patron, archivo):
            ruta_completa = os.path.join(directorio, archivo)
            archivos_coincidentes.append(ruta_completa)
    return archivos_coincidentes

def generar_listas(directorio):
    archivos = listar_archivos(directorio, PATRON_ARCHIVO)
    archivos.sort(key=lambda x: datetime.strptime(re.match(PATRON_ARCHIVO, os.path.basename(x)).group(1), '%Y%m%d'))
    meses = []
    for arch in archivos:
        match = re.match(PATRON_ARCHIVO, os.path.basename(arch))
        fecha_inicio_str = match.group(1)
        fecha_inicio = datetime.strptime(fecha_inicio_str, '%Y%m%d')
        meses.append(MESES_ES[fecha_inicio.month - 1])
    return archivos, meses

archivos, meses = generar_listas(DIRECTORIO_SAFETRACK)
print("Archivos a procesar:")
for arch in archivos:
    print(f" - {arch}")
print("\nMeses extraídos:")
for mes in meses:
    print(f" - {mes}")

Archivos a procesar:
 - ../Safetrack\Reporte de viaje(20250501-20250531).xlsx
 - ../Safetrack\Reporte de viaje(20250601-20250630).xlsx
 - ../Safetrack\Reporte de viaje(20250701-20250725).xlsx

Meses extraídos:
 - Mayo
 - Junio
 - Julio


# =====================================
# BLOQUE 4: Funciones Auxiliares
# =====================================

**Normalizacion**
- normalize_str: Elimina tildes, pasa a minúsculas y quita espacios extremos
- normalize_column: Normaliza nombres de columnas según un diccionario especial


In [28]:
# --- Normalización ---
def normalize_str(s):
    s2 = unicodedata.normalize('NFKD', s).encode('ascii','ignore').decode('ascii')
    return s2.lower().strip()

def normalize_column(col):
    special_columns = {
        'Indice': 'Indice',
        'Numero_de_placa': 'Numero_de_placa',
        'Estado_de_viaje': 'Estado_de_viaje',
        'Tiempo_de_Inicio': 'Tiempo_de_Inicio',
        'Tiempo_Final': 'Tiempo_Final',
        'Duracion': 'Duracion',
        'Lugar_de_inicio': 'Lugar_de_inicio',
        'LATITUD': 'LATITUD',
        'LONGITUD': 'LONGITUD',
        'camion_y': 'camion_y',
        'camion_x': 'camion_x'
    }
    col_normalized = unicodedata.normalize('NFKD', col).encode('ASCII','ignore').decode('ASCII').strip()
    if col in special_columns:
        return special_columns[col]
    for key in special_columns:
        if col_normalized.lower() == key.lower():
            return special_columns[key]
    return None


**Conversión de duración** 
- convertir_a_decimal: Convierte una duración en formato 'XdYhZMinutosWs' a minutos decimales.
- Ejemplo: '1d2h30Minutos45s' => (1*24*60)+(2*60)+30+45/60.

In [29]:
# --- Conversión de Duración ---
def convertir_a_decimal(duracion):
    try:
        if not isinstance(duracion, str):
            duracion = str(duracion)
        dias = 0
        total_min = 0
        if 'd' in duracion:
            partes = duracion.split('d')
            dias = int(partes[0])
            resto = partes[1]
        else:
            resto = duracion
        total_min = dias * 24 * 60
        pattern = r'(?:(\d+)h)?(?:(\d+)Minutos)?(?:(\d+)s)?'
        match = re.match(pattern, resto)
        if match:
            h = int(match.group(1)) if match.group(1) else 0
            m = int(match.group(2)) if match.group(2) else 0
            s = int(match.group(3)) if match.group(3) else 0
            total_min += h * 60 + m + s / 60
        else:
            return None
        return round(total_min, 2)
    except Exception as e:
        print(f"Error en convertir_a_decimal({duracion}): {e}")
        return None

**Procesamiento de Coordenadas**:
- convertir_coordenada: Convierte una cadena de coordenada, ej. '34.766437S' o '58.1234N',
    a float. 
- split_coords: Separa latitud y longitud de un string 'LAT, LON'.
    Retorna un pd.Series con [lat, lon] convertidos a float.

In [30]:
def convertir_coordenada(coord_str):
    if not isinstance(coord_str, str):
        return None
    c = coord_str.strip().upper()
    if not c:
        return None
    sign = 1
    if c[-1] in ['S','W']:
        sign = -1
        c = c[:-1]
    elif c[-1] in ['N','E']:
        c = c[:-1]
    try:
        return sign * float(c)
    except:
        return None

def split_coords(coord_str):
    if not isinstance(coord_str, str):
        return pd.Series([None, None])
    parts = coord_str.split(',')
    if len(parts) != 2:
        return pd.Series([None, None])
    lat_str, lon_str = parts[0].strip(), parts[1].strip()
    lat = convertir_coordenada(lat_str)
    lon = convertir_coordenada(lon_str)
    return pd.Series([lat, lon])

**Mapeo de Placas**:
- Aplica un diccionario de mapeo a la columna 'Numero_de_placa'.
- Normaliza quitando espacios y pasando a mayúsculas antes de mapear.

In [31]:
# --- Mapeo de Placas ---
def mapear_placas(df, mapeo):
    if 'Numero_de_placa' not in df.columns:
        print("No existe 'Numero_de_placa' en el DataFrame; se omite mapeo.")
        return df
    df['Numero_de_placa_normalizada'] = df['Numero_de_placa'].str.replace(' ', '').str.upper()
    print("Placas únicas antes del mapeo:", sorted(df['Numero_de_placa_normalizada'].unique()))
    print("Mapeo disponible:", mapeo)
    mapeo_normalizado = {k.upper(): v for k, v in mapeo.items()}
    df['Numero_de_placa'] = df['Numero_de_placa_normalizada'].map(mapeo_normalizado).fillna(df['Numero_de_placa'])
    df.drop(columns=['Numero_de_placa_normalizada'], inplace=True)
    print("Placas únicas después del mapeo:", sorted(df['Numero_de_placa'].unique()))
    return df

**Parseo de Archivos Excel**:
-  Lee un archivo Excel sin encabezado fijo y detecta bloques de datos.
-  Retorna un DataFrame unificado con columnas:
   -  ['Indice', 'Numero_de_placa', 'Estado_de_viaje', 'Tiempo_de_Inicio',
     'Tiempo_Final', 'Kilometraje_km', 'Duracion', 'Lugar_de_inicio']

In [32]:
def parse_report_file(path):
    import os
    print(f"Parseando archivo: {os.path.basename(path)}")
    try:
        df_raw = pd.read_excel(path, header=None, engine='openpyxl')
    except Exception as e:
        print(f"❌ Error al leer el archivo {path}: {e}")
        return pd.DataFrame()
    df_norm = df_raw.copy()
    for col_idx in df_norm.columns:
        df_norm[col_idx] = df_norm[col_idx].astype(str).apply(normalize_str)
    start_rows = []
    for i in range(len(df_norm)):
        row_text = " ".join(df_norm.loc[i].tolist()).strip()
        if '#' in row_text and 'numero' in row_text and 'placa' in row_text:
            start_rows.append(i)
    print(f"Filas de encabezado detectadas: {start_rows}")
    if not start_rows:
        print("No se detectaron filas de encabezado. Retornando DataFrame vacío.")
        return pd.DataFrame()
    fin_regex_list = [
        r'dispositivo\s*:',
        r'fecha\s*:',
        r'odometro\s*:',
        r'recuento\s*de',
        r'duraci.n\s+del',
        r'sin\s+datos'
    ]
    fin_regex = "|".join(fin_regex_list)
    all_blocks = []
    for idx in range(len(start_rows)):
        header_row = start_rows[idx]
        data_start = header_row + 1
        next_header = start_rows[idx+1] if idx < len(start_rows)-1 else len(df_norm)
        data_end = None
        for row_i in range(data_start, next_header):
            row_text = " ".join(df_norm.loc[row_i].tolist()).strip()
            if re.search(fin_regex, row_text, flags=re.IGNORECASE):
                data_end = row_i
                break
        if not data_end:
            data_end = next_header
        if data_end > data_start:
            df_block = df_raw.iloc[data_start:data_end].copy().reset_index(drop=True)
            if df_block.shape[1] < 9:
                print(f"Bloque ignorado (filas {data_start}:{data_end}) por tener menos de 9 columnas.")
                continue
            df_block = df_block.iloc[:, :9]
            df_block.columns = [
                "Indice", "Numero_de_placa", "Estado_de_viaje",
                "Tiempo_de_Inicio", "Tiempo_Final", "Kilometraje_km",
                "Duracion", "Lugar_de_inicio", "Fin_Localizacion"
            ]
            df_block['Tiempo_de_Inicio'] = pd.to_datetime(df_block['Tiempo_de_Inicio'], errors='coerce')
            df_block['Tiempo_Final'] = pd.to_datetime(df_block['Tiempo_Final'], errors='coerce')
            df_block[['camion_x', 'camion_y']] = df_block['Lugar_de_inicio'].apply(lambda x: split_coords(x))
            df_block.drop(columns=['Fin_Localizacion', 'Kilometraje_km'], inplace=True, errors='ignore')
            all_blocks.append(df_block)
            print(f"Bloque {idx+1}: filas {data_start} a {data_end}, shape={df_block.shape}")
    if not all_blocks:
        print("No se obtuvo ningún bloque válido. Retornando DataFrame vacío.")
        return pd.DataFrame()
    df_final = pd.concat(all_blocks, ignore_index=True)
    print(f"Parseo final: {len(df_final)} filas obtenidas de {len(all_blocks)} bloques.")
    return df_final

**Limpieza global** aplica una serie de pasos de limpieza al DataFrame:
* Elimina filas con 'Dispositivo:' o 'Fecha:'.
*  Normaliza nombres de columnas.
*  Renombra 'numero_de_placa_del_vehiculo' a 'Numero_de_placa' si es necesario.
*  Aplica mapeo de placas.
*  Convierte 'Duracion' a minutos decimales.
*  Filtra registros con 'Tiempo_de_Inicio' entre 07:00 y 18:00.

In [33]:
# --- Limpieza Global ---
def limpieza_global(df, map_placas_dict):
    total_inicial = len(df)
    print("=== LIMPIEZA GLOBAL INICIADA ===")
    print(f"Registros iniciales: {total_inicial}")
    if df.empty:
        print("DataFrame vacío, no se realiza limpieza.")
        return df
 
    cols_originales = df.columns.tolist()
    df.columns = [normalize_column(c) for c in df.columns]
    df = df[[col for col in df.columns if col is not None]]
    print("Columnas antes:", cols_originales)
    print("Columnas después:", df.columns.tolist())
    if 'numero_de_placa_del_vehiculo' in df.columns and 'Numero_de_placa' not in df.columns:
        df.rename(columns={'numero_de_placa_del_vehiculo': 'Numero_de_placa'}, inplace=True)
        print("Renombrada 'numero_de_placa_del_vehiculo' a 'Numero_de_placa'")
    df = mapear_placas(df, map_placas_dict)
    if 'Duracion' in df.columns:
        df['Duracion'] = df['Duracion'].apply(convertir_a_decimal)
        nulos = df['Duracion'].isnull().sum()
        df.dropna(subset=['Duracion'], inplace=True)
        print(f"Convertir duración: eliminados {nulos} registros por valor nulo")
    else:
        print("No existe columna 'Duracion'; se omite conversión")
    if 'Tiempo_de_Inicio' in df.columns:
        df['Tiempo_de_Inicio'] = pd.to_datetime(df['Tiempo_de_Inicio'], errors='coerce')
        nulos = df['Tiempo_de_Inicio'].isnull().sum()
        if nulos:
            df.dropna(subset=['Tiempo_de_Inicio'], inplace=True)
            print(f"Eliminados {nulos} registros con 'Tiempo_de_Inicio' nulo")
        mask_horario = (df['Tiempo_de_Inicio'].dt.hour >= 7) & (df['Tiempo_de_Inicio'].dt.hour < 18)
        df = df[mask_horario]
        print(f"Filtrado horario: registros que cumplen la condición: {len(df)}")
    else:
        print("No existe 'Tiempo_de_Inicio'; omitiendo filtro horario")
    print("=== LIMPIEZA GLOBAL COMPLETADA ===")
    print(f"Registros finales: {len(df)}")
    return df


# =====================================
# BLOQUE 5: Procesamiento y Creación df_unificado
# =====================================

In [34]:
lista_df = []
for archivo in archivos:
    print(f"Procesando archivo: {archivo}")
    df_temp = parse_report_file(archivo)
    if not df_temp.empty:
        lista_df.append(df_temp)
    else:
        print(f"Archivo {archivo} no produjo datos válidos.")

if lista_df:
    df_unificado = pd.concat(lista_df, ignore_index=True)
    print(f"Total registros unificados: {len(df_unificado)}")
else:
    print("No se obtuvieron registros de ningún archivo.")
    df_unificado = pd.DataFrame()  # df_unificado vacío si no hay nada

Procesando archivo: ../Safetrack\Reporte de viaje(20250501-20250531).xlsx
Parseando archivo: Reporte de viaje(20250501-20250531).xlsx
Filas de encabezado detectadas: [3, 7, 11, 15, 19]
Bloque 1: filas 4 a 5, shape=(1, 9)
Bloque 2: filas 8 a 9, shape=(1, 9)
Bloque 3: filas 12 a 13, shape=(1, 9)
Bloque 4: filas 16 a 17, shape=(1, 9)
Bloque 5: filas 20 a 560, shape=(540, 9)
Parseo final: 544 filas obtenidas de 5 bloques.
Procesando archivo: ../Safetrack\Reporte de viaje(20250601-20250630).xlsx
Parseando archivo: Reporte de viaje(20250601-20250630).xlsx
Filas de encabezado detectadas: [3, 23, 173, 307, 689]
Bloque 1: filas 4 a 16, shape=(12, 9)
Bloque 2: filas 24 a 166, shape=(142, 9)
Bloque 3: filas 174 a 300, shape=(126, 9)
Bloque 4: filas 308 a 682, shape=(374, 9)
Bloque 5: filas 690 a 691, shape=(1, 9)
Parseo final: 655 filas obtenidas de 5 bloques.
Procesando archivo: ../Safetrack\Reporte de viaje(20250701-20250725).xlsx
Parseando archivo: Reporte de viaje(20250701-20250725).xlsx
Fila

# =====================================
# BLOQUE 6: Filtrar por Estacionamiento (Opcional) y Limpieza
# =====================================

In [35]:

# --- Filtrado por 'Estacionamiento' y Limpieza Global ---
df_estacionamiento = df_unificado[df_unificado["Estado_de_viaje"].isin(["Estacionamiento"])].copy()
print(f"Registros 'Estacionamiento': {len(df_estacionamiento)}")

# PLACAS A REEMPLAZAR
MAP_PLACAS = {
    'CAA 1076': 'BYD1004',
    'Wireless-16956': 'PartnerABG9758',
    'AAW 4251': 'PARTNER 4251'
}


Registros 'Estacionamiento': 2125


## Aplicamos la limpieza

In [36]:
df_limpio = limpieza_global(df_estacionamiento, MAP_PLACAS)

=== LIMPIEZA GLOBAL INICIADA ===
Registros iniciales: 2125
Columnas antes: ['Indice', 'Numero_de_placa', 'Estado_de_viaje', 'Tiempo_de_Inicio', 'Tiempo_Final', 'Duracion', 'Lugar_de_inicio', 'camion_x', 'camion_y']
Columnas después: ['Indice', 'Numero_de_placa', 'Estado_de_viaje', 'Tiempo_de_Inicio', 'Tiempo_Final', 'Duracion', 'Lugar_de_inicio', 'camion_x', 'camion_y']
Placas únicas antes del mapeo: ['CAT1004-4G', 'CAT1006-4G', 'PARTNER4251-4G', 'PEUGEOTAVG9758-4G', 'WIRELESS-16956']
Mapeo disponible: {'CAA 1076': 'BYD1004', 'Wireless-16956': 'PartnerABG9758', 'AAW 4251': 'PARTNER 4251'}
Placas únicas después del mapeo: ['CAT 1004 - 4G', 'CAT 1006 - 4G', 'Partner 4251 - 4G', 'PartnerABG9758', 'Peugeot AVG9758 - 4G']
Convertir duración: eliminados 0 registros por valor nulo
Filtrado horario: registros que cumplen la condición: 1961
=== LIMPIEZA GLOBAL COMPLETADA ===
Registros finales: 1961


# =====================================
# BLOQUE 7: Guardado de Resultados
# =====================================

In [37]:
if not df_limpio.empty:
    estacionados_camion = df_limpio.copy()
    estacionados_camion.to_excel("../Limpia/estacionados_camion.xlsx", index=False)
    print("Archivo 'estacionados_camion.xlsx' guardado en ../Limpia")
else:
    print("El DataFrame final está vacío, no se genera archivo.")

Archivo 'estacionados_camion.xlsx' guardado en ../Limpia


# =====================================

In [38]:
df_limpio.info()
df_limpio.head(2)

<class 'pandas.core.frame.DataFrame'>
Index: 1961 entries, 5 to 4254
Data columns (total 9 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   Indice            1961 non-null   object        
 1   Numero_de_placa   1961 non-null   object        
 2   Estado_de_viaje   1961 non-null   object        
 3   Tiempo_de_Inicio  1961 non-null   datetime64[ns]
 4   Tiempo_Final      1961 non-null   datetime64[ns]
 5   Duracion          1961 non-null   float64       
 6   Lugar_de_inicio   1961 non-null   object        
 7   camion_x          1961 non-null   float64       
 8   camion_y          1961 non-null   float64       
dtypes: datetime64[ns](2), float64(3), object(4)
memory usage: 153.2+ KB


Unnamed: 0,Indice,Numero_de_placa,Estado_de_viaje,Tiempo_de_Inicio,Tiempo_Final,Duracion,Lugar_de_inicio,camion_x,camion_y
5,2,PartnerABG9758,Estacionamiento,2025-05-02 07:44:55,2025-05-02 08:02:54,0.0,"34.761447S,55.747421W",-34.761447,-55.747421
7,4,PartnerABG9758,Estacionamiento,2025-05-02 08:25:34,2025-05-02 09:04:10,0.0,"34.778828S,55.859923W",-34.778828,-55.859923
