ETL son las siglas de **Extract, Transform, Load**, es decir, **Extraer**, **Transformar** y **Cargar**. Es un proceso esencial para preparar datos desde diversas fuentes para su análisis posterior. El objetivo principal es:

- **Extraer** los datos desde una o más fuentes (por ejemplo, bases de datos locales, en la nube, APIs).
- **Transformar** los datos aplicando reglas de negocio, limpieza, agregación o restructuración.
- **Cargar** los datos limpios y estructurados en un sistema de destino, como una base de datos analítica o un modelo de datos para reporting o machine learning.

Importación de las librerías necesarias para la ejecución del proceso ETL. `pyodbc` se utiliza para la conexión con bases de datos SQL, `pandas` y `numpy` para la manipulación de datos, `os` para operaciones del sistema de archivos, y `warnings` para suprimir mensajes de advertencia innecesarios.


In [10]:
import pyodbc
import pandas as pd
import numpy as np
import os
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

Definición de los parámetros de conexión para establecer comunicación con una base de datos en la nube de Azure SQL Database. Se especifica el servidor, la base de datos y el driver necesario para autenticar con Azure Active Directory.


In [11]:
#  Conexión a **Azure SQL**
AZURE_SERVER = 'uaxmathfis.database.windows.net'
AZURE_DATABASE = 'usecases'
AZURE_DRIVER = '{ODBC Driver 17 for SQL Server}'

azure_conn_str = f"DRIVER={AZURE_DRIVER};SERVER={AZURE_SERVER};DATABASE={AZURE_DATABASE};Authentication=ActiveDirectoryInteractive"

Definición de los parámetros de conexión para una base de datos local utilizando SQL Server. Esta conexión permitirá ejecutar consultas y cargar tablas desde una base de datos on-premise.


In [12]:
# Parámetros de conexión
LOCAL_SERVER = 'localhost'
LOCAL_DATABASE = 'dwh_case1'
LOCAL_DRIVER = '{ODBC Driver 17 for SQL Server}'

# Conexión a SQL Server local
local_conn_local = f"DRIVER={LOCAL_DRIVER};SERVER={LOCAL_SERVER};DATABASE={LOCAL_DATABASE};Trusted_Connection=yes"

Se establece la ruta donde se encuentran los scripts SQL para crear o cargar las distintas tablas del modelo dimensional. Además, se crea un diccionario que asocia cada tabla con su archivo SQL correspondiente.

In [13]:
query_folder = "../database/dimensional"
# Lista de archivos SQL y nombre de tabla destino
tablas = {
    "dim_clientes": "dim_clientes.sql",
    "dim_tiempo": "dim_tiempo.sql",
    "dim_producto": "dim_producto.sql",
    "dim_zona": "dim_zona.sql",
    "tabla_hechos": "tabla_hechos.sql"
}

Se define un diccionario que mapea cada tabla con su clave primaria. Esto será útil más adelante para establecer restricciones de integridad y relaciones entre tablas, especialmente si se cargan en un modelo relacional o se realiza deduplicación.


In [14]:
# Primary keys for each table.
primary_keys = {
    "tabla_hechos": ["CODE"],
    "dim_clientes": ["Customer_ID"],
    "dim_zona": ["TIENDA_ID"],
    "dim_producto": ["Id_Producto"],
    "dim_tiempo": ["Fecha"]
}

En esta celda se define un diccionario llamado `foreign_keys` que especifica las relaciones entre la tabla principal de hechos (`tabla_hechos`) y las tablas dimensionales del modelo estrella (`dim_clientes`, `dim_zona`, `dim_producto`, `dim_tiempo`). 

Cada entrada del diccionario asocia una columna de la tabla de hechos con su tabla de referencia, indicando así cómo se deben establecer las claves foráneas en la base de datos. Esto permite mantener la integridad referencial durante la creación de las tablas y garantiza que las relaciones entre entidades estén correctamente representadas dentro del modelo de datos.

In [15]:
# Foreign keys for each table.
foreign_keys = {
    "tabla_hechos": {
        "Customer_ID": "dim_clientes(Customer_ID)",
        "TIENDA_ID": "dim_zona(TIENDA_ID)",
        "Id_Producto": "dim_producto(Id_Producto)",
        "Sales_Date": "dim_tiempo(Fecha)"
    }
}

Se define una función llamada `create_table_sql` que genera automáticamente el script SQL para crear una tabla en la base de datos a partir de un DataFrame de pandas. Esta función detecta el tipo de datos de cada columna (fecha, número entero, flotante o texto) y construye el `CREATE TABLE` con los tipos correspondientes. Además, añade claves primarias y foráneas si se encuentran definidas en los diccionarios `primary_keys` y `foreign_keys`. Esto automatiza la creación de tablas durante el proceso ETL, garantizando que la estructura sea coherente con los datos de origen.

In [16]:
def create_table_sql(table_name, df):
    # Definición de los tipos de datos SQL para cada columna del DataFrame: Por defecto tipo TEXTO.
    col_defs = []
    for col in df.columns:
        if np.issubdtype(df[col].dtype, np.datetime64):
            col_defs.append(f'[{col}] DATE')
        elif df[col].dtype == np.float32:
            col_defs.append(f'[{col}] FLOAT')
        elif df[col].dtype == np.int32:
            col_defs.append(f'[{col}] INT')
        else:
            col_defs.append(f'[{col}] NVARCHAR(255)')

    # Agregación clave primaria si corresponde.
    pk = ", PRIMARY KEY (" + ", ".join(primary_keys[table_name]) + ")" if table_name in primary_keys else ""
    # Agregación claves foráneas si corresponde.
    fk = ""
    if table_name in foreign_keys:
        for col, ref in foreign_keys[table_name].items():
            fk += f", FOREIGN KEY ({col}) REFERENCES {ref}"

    return f"CREATE TABLE {table_name} ({', '.join(col_defs)}{pk}{fk});"

Aquí se define una función `drop_tables_in_order` que se encarga de eliminar tablas de la base de datos local en un orden específico. Esto es importante cuando existen relaciones entre tablas (por ejemplo, claves foráneas) que impiden borrarlas en cualquier orden. La función comprueba si la tabla existe antes de intentar eliminarla para evitar errores. Se ejecuta dentro de un bloque `try-except` para manejar posibles errores durante la eliminación.


In [17]:
def drop_tables_in_order(cursor, conn):
    drop_order = ["tabla_hechos", "dim_tiempo", "dim_producto", "dim_zona", "dim_clientes"]
    for table in drop_order:
        # Verifica si la tabla existe en el esquema actual.
        check_exists_query = f"""
        IF OBJECT_ID('{table}', 'U') IS NOT NULL
            DROP TABLE {table};
        """
        try:
            cursor.execute(check_exists_query)
            conn.commit()
        except Exception as e:
            print(f"Error al intentar eliminar la tabla {table}: {e}")

Esta celda representa el núcleo de la ETL (Extracción, Transformación y Carga):

- Se establecen conexiones a la base de datos de Azure y a la base de datos local.
- Se eliminan las tablas locales existentes usando la función definida anteriormente.
- Para cada tabla definida en el diccionario `tablas`, se:
  - Ejecuta una consulta SQL contra la base de datos de Azure.
  - Carga el resultado en un DataFrame.
  - Elimina columnas duplicadas, detecta columnas de fecha y convierte tipos de datos.
  - Limpia valores nulos, usando lógicas distintas según el tipo de dato.
  - Convierte `float64` y `int64` a `float32` e `int32` para optimización.
  - Crea dinámicamente la tabla SQL usando `create_table_sql`.
  - Inserta los datos en la base de datos local utilizando `executemany` para eficiencia.

Al final, se cierran las conexiones y se indica que el proceso ETL ha finalizado correctamente.

In [18]:
try:
    # Conexión a las bases de datos.
    conn_azure = pyodbc.connect(azure_conn_str)
    conn_local = pyodbc.connect(local_conn_local)
    print("Conexiones correctamente establecidas.\n")

    with conn_local.cursor() as cursor:
        drop_tables_in_order(cursor, conn_local)
    # Procesamiento de cada tabla definida en el diccionario de Queries.
    for table_name, file in tablas.items():
        print(f"Procesando: {table_name}")
        query_path = os.path.join(query_folder, file)
        with open(query_path, "r", encoding="utf-8") as f:
            sql_query = f.read()

        # Ejecución de la consulta sobre la base de datos de Azure.
        df = pd.read_sql(sql_query, conn_azure)

        # Eliminación de las columnas duplicadas.
        if df.columns.duplicated().any():
            print(f"Columnas duplicadas en {table_name}: {df.columns[df.columns.duplicated()].tolist()}")
            df = df.loc[:, ~df.columns.duplicated()]

        # Detección de las columnas tipo DATE para convertirlas adecuadamente.
        for col in df.columns:
            if df[col].dtype == object or df[col].dtype == "string":
                sample_values = df[col].astype(str).sample(min(len(df), 30), random_state=42)
                # Saltar si parece una columna numérica (para no confundir INT con DATE).
                if sample_values.str.isdigit().mean() > 0.8:
                    continue
                try:
                    parsed = pd.to_datetime(sample_values, errors='coerce')
                    if parsed.notna().sum() > 0.9 * len(sample_values):
                        df[col] = pd.to_datetime(df[col], errors='coerce')
                except:
                    pass
        # Si el DataFrame está vacío, se salta.
        if df.empty:
            print(f"La tabla {table_name} no devolvió resultados.\n")
            continue
        print(f"   - Filas obtenidas: {df.shape[0]}")
        print(f"   - Columnas: {df.columns.tolist()}")

        # Limpieza de valores nulos y tipos de datos.
        for col in df.columns:
            df[col] = df[col].replace(r'^\s*$', np.nan, regex=True) # Reemplazar espacios en blanco por NaN.
            if pd.api.types.is_numeric_dtype(df[col]):
                # Valor sentinel (ej: -1 o 999999).
                sentinel = -1
                df[col] = df[col].fillna(sentinel)
            elif pd.api.types.is_datetime64_any_dtype(df[col]):
                df[col] = df[col].fillna(df[col].mode(dropna=True)[0])
            else:
                df[col] = df[col].fillna("N/A")
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = df[col].astype(np.float32)
        for col in df.select_dtypes(include=['int64']).columns:
            df[col] = df[col].astype(np.int32)

        # Creación de la tabla en la base de datos local.
        with conn_local.cursor() as cursor:
            create_sql = create_table_sql(table_name, df)
            cursor.execute(create_sql)
            conn_local.commit()
            print(f"   - Tabla {table_name} creada correctamente.")

            placeholders = ', '.join(['?' for _ in df.columns])
            insert_sql = f"INSERT INTO {table_name} VALUES ({placeholders})"
            cursor.fast_executemany = True
            cursor.executemany(insert_sql, df.values.tolist())
            conn_local.commit()
            print(f"   - {df.shape[0]} filas insertadas.\n")

except Exception as e:
    print(f"Error: {e}")

finally:
    if 'conn_azure' in locals():
        conn_azure.close()
    if 'conn_local' in locals():
        conn_local.close()

print("ETL completado.")

Conexiones correctamente establecidas.

Procesando: dim_clientes
   - Filas obtenidas: 44053
   - Columnas: ['Customer_ID', 'CODIGO_POSTAL', 'Edad', 'GENERO', 'RENTA_MEDIA_ESTIMADA', 'STATUS_SOCIAL', 'Fecha_nacimiento', 'ENCUESTA_CLIENTE_ZONA_TALLER', 'ENCUESTA_ZONA_CLIENTE_VENTA', 'poblacion', 'provincia', 'lat', 'lon', 'CP', 'Max_Mosaic', 'Max_Mosaic_G', 'Renta_Media', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K']
   - Tabla dim_clientes creada correctamente.
   - 44053 filas insertadas.

Procesando: dim_tiempo
   - Filas obtenidas: 3652
   - Columnas: ['Fecha', 'Año', 'Añomes', 'Mes', 'Dia', 'Diadelasemana', 'Diadelesemana_desc', 'Festivo', 'Findesemana', 'FinMes', 'InicioMes', 'Laboral', 'Mes_desc', 'Semana']
   - Tabla dim_tiempo creada correctamente.
   - 3652 filas insertadas.

Procesando: dim_producto
   - Filas obtenidas: 404
   - Columnas: ['Id_Producto', 'Code_', 'Modelo', 'Kw', 'TIPO_CARROCERIA', 'TRANSMISION_ID', 'FUEL', 'CATEGORIA_ID', 'Equipamiento', 'Margen', '