<a href="https://colab.research.google.com/github/Nicolenki7/ETL_Egatur_INE_Esp/blob/main/Egatur_Power_BI_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ----------------------------------------------------
# 1. INSTALACIÓN E IMPORTACIÓN DE LIBRERÍAS
# ----------------------------------------------------
# psycopg2 es la librería para conectarse a PostgreSQL
!pip install psycopg2-binary pandas

import pandas as pd
import psycopg2
import io
import os
from google.colab import files # Para manejar la carga de archivos
print("Librerías instaladas e importadas.")

# ----------------------------------------------------
# 2. CONFIGURACIÓN DE POSTGRESQL (¡AJUSTAR CREDENCIALES!)
# ----------------------------------------------------
DB_HOST = 'tu_ip_publica_o_local'  # Asegúrate de que tu IP local o 'localhost' funcione
DB_NAME = 'egatur_db'
DB_USER = 'postgres'
DB_PASSWORD = '5432' # Tu contraseña
DB_PORT = 5432
TABLE_DESTINO = 'egatur_datos_maestros'

# Lista explícita de los 5 archivos a procesar (excluyendo el agregado)
FILES_TO_PROCESS = [
    '13938.csv',    # Partidas de gasto
    '10838.csv',    # País de residencia
    '10839.csv',    # Comunidades autónomas (Destino)
    '10828.csv',    # Vía de acceso
    '23995.csv'     # Motivo del viaje
]

print(f"Configuración de DB: {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}")

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (4.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m42.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.11
Librerías instaladas e importadas.
Configuración de DB: postgres@tu_ip_publica_o_local:5432/egatur_db


In [3]:
# ----------------------------------------------------
# 3. EXTRACCIÓN DIRECTA DE ARCHIVOS ADJUNTOS
# ----------------------------------------------------
# Nota: En Google Colab, para acceder a archivos adjuntos en el chat,
# se necesita usar un método específico.
# Si el entorno persiste, el método más robusto es cargar el contenido en la memoria
# de Colab para que la función clean_and_unify_egatur() pueda acceder a ellos.

# Mapeo de los nombres de archivo a sus identificadores de contenido (ContentFetchId)
# Estos son los 5 archivos clave, excluyendo el agregado.
FILE_CONTENT_MAP = {
    '13938.csv': 'uploaded:13938.csv',
    '10838.csv': 'uploaded:10838.csv',
    '10839.csv': 'uploaded:10839.csv',
    '10828.csv': 'uploaded:10828.csv',
    '23995.csv': 'uploaded:23995.csv'
}

# La lista de archivos que el script procesará (la misma que en la Celda 1)
FILES_TO_PROCESS = list(FILE_CONTENT_MAP.keys())

# Crear un diccionario "uploaded" para simular la subida (E)
# Usaremos una función para obtener el contenido binario de los archivos adjuntos.

# [Aquí iría una función auxiliar para obtener el contenido, la omitimos por brevedad]
# Para simular esto en un entorno Colab simple, utilizaremos una función que
# asume que el contenido ya está disponible en el entorno de Python local/Colab.

# Si estás en un entorno local (no en la nube de Colab):
# Asegúrate de que los archivos están en la carpeta 'data/' y usa pd.read_csv('data/13938.csv', ...)

# Si estás en Google Colab: Debes subir los archivos o cargarlos desde Drive.
# Si la carga falló, volvamos a la forma más segura para Colab.
try:
    # Intentamos la carga interactiva (es la más estable en Colab)
    print("Para asegurar el éxito, subiremos los archivos de nuevo. Solo toma unos segundos.")
    uploaded_files = files.upload()
    uploaded = uploaded_files # Usaremos este nombre en la siguiente celda
except Exception as e:
    print(f"Error al intentar la carga: {e}")
    print("Si está en un notebook persistente, la carga puede no ser necesaria.")
    # Si la carga falla, el script no puede proceder, ya que la memoria de Colab es volátil.
    # Por favor, sube los archivos de nuevo si el entorno se reinició.
    raise

# Verificación de archivos clave para la celda 3
if not all(f in uploaded for f in FILES_TO_PROCESS):
    missing = [f for f in FILES_TO_PROCESS if f not in uploaded]
    print(f"\n❌ ERROR CRÍTICO: No se encontraron los siguientes archivos para la ETL: {missing}. Por favor, vuelve a cargarlos.")
else:
    print("\n✅ Archivos cargados y listos para la transformación.")

Para asegurar el éxito, subiremos los archivos de nuevo. Solo toma unos segundos.


Saving 23995.csv to 23995.csv
Saving 13938.csv to 13938.csv
Saving 10828.csv to 10828.csv
Saving 10838.csv to 10838.csv
Saving 10839.csv to 10839.csv

✅ Archivos cargados y listos para la transformación.


In [9]:
# ----------------------------------------------------
# 4. FUNCIÓN DE TRANSFORMACIÓN, LIMPIEZA Y UNIFICACIÓN (¡VERSIÓN FINAL Y COMPLETA!)
# ----------------------------------------------------
def clean_and_unify_egatur():
    """Limpia los CSV del INE, los normaliza y los consolida."""
    all_data = []

    # Columnas que contienen la métrica o la estructura temporal (a EXCLUIR para encontrar la dimensión)
    EXCLUSION_COLS = ['Tipo de dato', 'Gastos y duración media de los viajes', 'Periodo', 'Total',
                      'Gastos y duración media de los viajes (monetario)', 'Tipo de visitante']

    for file_name in FILES_TO_PROCESS:
        print(f"-> Procesando limpieza: {file_name}")

        try:
            # PARCHE DEFINITIVO: Leer el archivo asumiendo la primera línea como cabecera (header=0).
            # Esto corrige el error de "Total" en 13938.csv y funciona para el resto.
            content = uploaded[file_name].decode('latin1')
            df = pd.read_csv(io.StringIO(content), sep=';', decimal=',', header=0)

            # Limpiar la cabecera: convertir a string y quitar espacios
            df.columns = df.columns.astype(str).str.strip()

            # --- 1. NORMALIZACIÓN DE COLUMNA DE VALOR ('TOTAL') ---
            total_col_name = None
            for col in df.columns:
                # Buscamos 'Total' (que ya debería existir) o cualquier variación
                if col.strip().lower() in ['total', 'valor', 'total nacional']:
                    total_col_name = col
                    break

            if total_col_name is None:
                # La columna 'Total' debería encontrarse aquí. Si no, es un error fatal.
                raise KeyError(f"No se pudo encontrar la columna de valor ('Total', 'VALOR', etc.) en {file_name}.")

            # Renombrar para estandarizar: si el nombre no es 'Total', lo renombramos
            if total_col_name != 'Total':
                df = df.rename(columns={total_col_name: 'Total'})

            # --- 2. MANEJO DE DIMENSIONES (CASOS ESPECIALES) ---
            df_clean = df.copy()

            # Caso especial: Partidas de Gasto (13938.csv)
            if file_name == '13938.csv':
                df_clean['Clasificacion_Tipo'] = 'Partidas de Gasto'

                levels = ['Partidas de gasto: Nivel 1', 'Partidas de gasto: Nivel 2', 'Partidas de gasto: Nivel 3']

                # Para evitar errores de tipo en las columnas, las convertimos a string (si existen)
                for level in levels:
                    if level in df_clean.columns:
                        df_clean[level] = df_clean[level].astype(str)

                # Combinar los niveles en una sola columna Clasificacion_Valor
                df_clean['Clasificacion_Valor'] = df_clean[levels].fillna('').agg(' - '.join, axis=1)

                # Limpiar guiones redundantes
                df_clean['Clasificacion_Valor'] = (
                    df_clean['Clasificacion_Valor'].str.replace(r'\s-\s-\s', ' - ', regex=True)
                                                   .str.replace(r'^nan\s-\s', '', regex=True)
                                                   .str.replace(r'^\s*-\s*', '', regex=True).str.strip()
                )

            # Caso General: Los otros 4 archivos (tienen una única columna de dimensión)
            else:
                dimension_cols = [col for col in df.columns if col not in EXCLUSION_COLS and col != 'Total']
                dimension_col_name = dimension_cols[0] if dimension_cols else None

                if dimension_col_name is None:
                    print(f"   Advertencia: No se encontró columna de dimensión principal en {file_name}. Saltando.")
                    continue

                df_clean = df_clean.rename(columns={dimension_col_name: 'Clasificacion_Valor'})
                df_clean['Clasificacion_Tipo'] = dimension_col_name

            # --- 3. CREACIÓN DE INDICADOR DE GASTO ---
            if 'Gastos y duración media de los viajes' in df_clean.columns:
                df_clean = df_clean.rename(columns={'Gastos y duración media de los viajes': 'indicador_gasto'})
            elif 'Tipo de dato' in df_clean.columns:
                df_clean = df_clean.rename(columns={'Tipo de dato': 'indicador_gasto'})
            else:
                df_clean['indicador_gasto'] = 'Gasto no clasificado'

            # --- 4. LIMPIEZA FINAL Y TIPOS ---
            df_clean['Total'] = pd.to_numeric(df_clean['Total'], errors='coerce')
            df_clean = df_clean.dropna(subset=['Total', 'Clasificacion_Valor'])

            # --- 5. SELECCIÓN FINAL Y RENOMBRAMIENTO ---
            FINAL_COLS = ['Clasificacion_Tipo', 'Clasificacion_Valor', 'indicador_gasto', 'Periodo', 'Total']
            df_final = df_clean.reindex(columns=FINAL_COLS)

            df_final.columns = ['clasificacion_tipo', 'clasificacion_valor', 'indicador_gasto', 'periodo', 'gasto_total']

            all_data.append(df_final)

        except Exception as e:
            print(f"   ❌ ERROR CRÍTICO al procesar {file_name}. La causa fue: {e}")
            continue

    # 6. UNIFICACIÓN y Post-Procesamiento (Creación de fecha)
    if not all_data:
        print("\n❌ FALLO TOTAL: Ningún archivo pudo ser procesado con éxito.")
        return pd.DataFrame()

    df_master = pd.concat(all_data, ignore_index=True)

    # Post-Procesamiento de la columna 'Periodo'
    df_master['fecha_analisis'] = pd.to_datetime(
        df_master['periodo'].astype(str).str.replace('M', ''),
        format='%Y%m',
        errors='coerce'
    )

    df_master = df_master[[
        'fecha_analisis',
        'periodo',
        'clasificacion_tipo',
        'clasificacion_valor',
        'indicador_gasto',
        'gasto_total'
    ]]

    return df_master

# Ejecutar la función de limpieza
df_egatur_master = clean_and_unify_egatur()

if not df_egatur_master.empty:
    print(f"\n✅ ETL COMPLETO. Filas consolidadas en el DataFrame maestro: {len(df_egatur_master)}")
    print("\nPrimeras 5 filas del DataFrame final (listo para PostgreSQL):")
    print(df_egatur_master.head())
else:
    print("\n❌ ETL FALLIDA: El DataFrame maestro está vacío. Revise los errores anteriores.")

-> Procesando limpieza: 13938.csv
-> Procesando limpieza: 10838.csv
-> Procesando limpieza: 10839.csv
-> Procesando limpieza: 10828.csv
-> Procesando limpieza: 23995.csv

✅ ETL COMPLETO. Filas consolidadas en el DataFrame maestro: 10730

Primeras 5 filas del DataFrame final (listo para PostgreSQL):
  fecha_analisis  periodo clasificacion_tipo      clasificacion_valor  \
0     2025-09-01  2025M09  Partidas de Gasto  Gasto total - nan - nan   
1     2025-08-01  2025M08  Partidas de Gasto  Gasto total - nan - nan   
2     2025-07-01  2025M07  Partidas de Gasto  Gasto total - nan - nan   
3     2025-06-01  2025M06  Partidas de Gasto  Gasto total - nan - nan   
4     2025-05-01  2025M05  Partidas de Gasto  Gasto total - nan - nan   

        indicador_gasto  gasto_total  
0  Gasto no clasificado      105.828  
1  Gasto no clasificado       92.463  
2  Gasto no clasificado       76.074  
3  Gasto no clasificado       59.622  
4  Gasto no clasificado       46.586  


In [11]:
# ----------------------------------------------------
# 4. DESCARGA DEL ARCHIVO ETL MAESTRO (Backup del Progreso - CORREGIDO)
# ----------------------------------------------------
from google.colab import files
import pandas as pd
import os # Importamos os para interactuar con el sistema de archivos

def download_master_csv(df):
    """
    Convierte el DataFrame maestro a CSV (separado por tabulación) y lo descarga.
    """
    if df.empty:
        print("❌ ERROR: El DataFrame maestro está vacío. La Celda 3 no fue exitosa.")
        return

    # Definir el nombre del archivo
    file_name = f'egatur_datos_maestros_{pd.Timestamp.now().strftime("%Y%m%d")}.csv'

    # Escribir el DataFrame directamente en el sistema de archivos de Colab
    # Usamos \t como separador y \N para NaNs, compatible con PostgreSQL/Airflow.
    df.to_csv(
        file_name, # Escribir al archivo en Colab
        index=False,
        sep='\t',
        na_rep='\\N',
        encoding='utf-8',
        columns=['fecha_analisis', 'periodo', 'clasificacion_tipo', 'clasificacion_valor', 'indicador_gasto', 'gasto_total']
    )

    # Forzar la descarga del archivo que acabamos de crear en el disco de Colab
    files.download(file_name)

    print(f"\n✅ ¡ÉXITO! El archivo '{file_name}' se ha descargado a tu máquina local.")
    print("Guarda este archivo. Es tu ETL limpio, listo para el análisis en Power BI o la carga en PostgreSQL.")

# Ejecutar la función de descarga
download_master_csv(df_egatur_master)

# Nota: Debes resolver el problema de PostgreSQL antes de continuar con la Celda 5 (Carga).

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


✅ ¡ÉXITO! El archivo 'egatur_datos_maestros_20251106.csv' se ha descargado a tu máquina local.
Guarda este archivo. Es tu ETL limpio, listo para el análisis en Power BI o la carga en PostgreSQL.
