# Proceso ETL: Extracción desde Azure y Carga en SQL Server Local

Este Notebook realiza un proceso ETL automático que extrae datos desde Azure SQL Database, los transforma con Pandas y los carga en SQL Server Local. Se trabajan las siguientes tablas: **Hechos**, **clientes**, **zona**, **tiempo** y **productos**.

Las etapas del proceso son:
1. **Configuración de Conexiones:** Se definen las conexiones para Azure y SQL Server Local.
2. **Definición de Consultas y Metadatos:** Se establecen las rutas a los archivos SQL de extracción y se definen las claves primarias y foráneas para cada tabla.
3. **Funciones Auxiliares:**
    - Función para crear tablas dinámicamente en SQL Server Local.
    - Función para eliminar tablas existentes en un orden que respete las dependencias.
4. **Ejecución del Proceso ETL:** Se extraen, transforman y cargan los datos en el Data Warehouse local.


In [12]:
import pyodbc         # Conexión con bases de datos SQL Server
import pandas as pd   # Manipulación y transformación de datos
import numpy as np    # Gestión de datos numéricos
import os             # Operaciones con archivos y rutas
import warnings       # Control de advertencias

# Suprimir avisos innecesarios
warnings.filterwarnings("ignore", category=UserWarning)

## CONFIGURACIÓN DE CONEXIONES

In [13]:
# -------------------------
# CONFIGURACIÓN DE CONEXIONES
# -------------------------

# Conexión a Azure SQL Database
AZURE_SERVER = 'uaxmathfis.database.windows.net'
AZURE_DATABASE = 'usecases'
AZURE_DRIVER = '{ODBC Driver 17 for SQL Server}'
azure_conn_str = f"DRIVER={AZURE_DRIVER};SERVER={AZURE_SERVER};DATABASE={AZURE_DATABASE};Authentication=ActiveDirectoryInteractive"

# Conexión a SQL Server Local
LOCAL_SERVER = 'localhost'
LOCAL_DATABASE = 'dwh_case1'
LOCAL_DRIVER = '{ODBC Driver 17 for SQL Server}'
local_conn_str = f"DRIVER={LOCAL_DRIVER};SERVER={LOCAL_SERVER};DATABASE={LOCAL_DATABASE};Trusted_Connection=yes;TrustServerCertificate=yes"

## DEFINICIÓN DE CONSULTAS Y METADATOS

In [14]:
# -------------------------
# DEFINICIÓN DE TABLAS Y ARCHIVOS SQL
# -------------------------

# Ubicación de los archivos SQL con las consultas de extracción
query_folder = "../Modelo_dimensional"  # Actualiza esta ruta según tu estructura

queries = {
    "dim_geo": "Zona.sql",
    "dim_product": "Productos.sql",
    "dim_time": "Tiempo.sql",
    "dim_client": "Clientes.sql",
    "fact_sales": "Hechos.sql"
}

# Definición de claves primarias (actualiza los nombres según corresponda)
primary_keys = {
    "fact_sales": ["CODE"],   # Se usa la columna "CODE" como identificador único
    "dim_client": ["Customer_ID"],
    "dim_geo": ["TIENDA_ID"],
    "dim_product": ["Id_Producto"],
    "dim_time": ["Date"]
}

# Definición de claves foráneas para fact_sales (ajusta los nombres si es necesario)
foreign_keys = {
    "fact_sales": {
        "Customer_ID": "dim_client(Customer_ID)",
        "TIENDA_ID": "dim_geo(TIENDA_ID)",  # Se asume que en zona se identifica mediante TIENDA_ID
        "Id_Producto": "dim_product(Id_Producto)",
        "Sales_Date": "dim_time(Date)"
    }
}

# Diccionario global para columnas de fecha (ajusta los nombres según tus datos)
date_columns_global = {
    "dim_client": ["Fecha_nacimiento"],
    "dim_time": ["inicio_de_mes", "fin_de_mes", "fecha"],
    "fact_sales": ["DATE_ULTIMA_REVISION", "Logistic_date", "Prod_date", "fecha_venta"]
}

## FUNCIONES AUXILIARES

### FUNCIÓN PARA CREAR TABLAS EN SQL SERVER LOCAL

In [15]:
def create_table_sql(table_name, df):
    date_columns = date_columns_global.get(table_name, [])
    col_defs = []
    for col in df.columns:
        if col in date_columns:
            col_defs.append(f'[{col}] DATE')
        elif np.issubdtype(df[col].dtype, np.datetime64):
            col_defs.append(f'[{col}] DATE')
        elif df[col].dtype == np.float32:
            col_defs.append(f'[{col}] FLOAT')
        elif df[col].dtype == np.int32:
            col_defs.append(f'[{col}] INT')
        else:
            max_len = df[col].astype(str).str.len().max()
            varchar_size = min(2000, max(1, int(max_len * 1.3)))
            col_defs.append(f'[{col}] NVARCHAR({varchar_size})')
    pk = ", PRIMARY KEY (" + ", ".join(primary_keys[table_name]) + ")" if table_name in primary_keys else ""
    fk = ""
    if table_name in foreign_keys:
        for col, ref in foreign_keys[table_name].items():
            fk += f", FOREIGN KEY ({col}) REFERENCES {ref}"
    return f"CREATE TABLE {table_name} ({', '.join(col_defs)}{pk}{fk});"

### Función para Eliminar Tablas en Orden


In [16]:
def drop_tables_in_order(cursor, conn):
    # Se elimina primero la tabla de hechos y luego las dimensiones, para respetar dependencias.
    drop_order = ["fact_sales", "dim_time", "dim_product", "dim_geo", "dim_client"]
    for table in drop_order:
        check_exists_query = f"IF OBJECT_ID('{table}', 'U') IS NOT NULL DROP TABLE {table};"
        try:
            cursor.execute(check_exists_query)
            conn.commit()
        except Exception as e:
            print(f"Error al eliminar la tabla {table}: {e}")

## EJECUCIÓN DEL PROCESO ETL

In [17]:
try:
    # Establecer conexiones con Azure SQL y SQL Server Local
    conn_azure = pyodbc.connect(azure_conn_str)
    conn_local = pyodbc.connect(local_conn_str)
    print("Conexiones establecidas correctamente.")
    
    # Eliminar tablas existentes en el orden correcto
    with conn_local.cursor() as cursor:
        drop_tables_in_order(cursor, conn_local)
        print("Tablas existentes eliminadas correctamente.")
    
    # Procesar cada tabla definida en el diccionario 'queries'
    for table_name, file in queries.items():
        print(f"\nProcesando: {table_name}")
        query_path = os.path.join(query_folder, file)
        
        with open(query_path, "r", encoding="utf-8") as f:
            sql_query = f.read()
        
        # Extraer datos desde Azure SQL
        df = pd.read_sql(sql_query, conn_azure)
        print(f"Columnas en {table_name}: {df.columns.tolist()}")  # Para depuración
        
        # Convertir columnas de fecha a datetime según corresponda
        for col in date_columns_global.get(table_name, []):
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')
        
        # Validar y eliminar duplicados según la clave primaria
        if table_name in primary_keys:
            pk_cols = primary_keys[table_name]
            if all(col in df.columns for col in pk_cols):
                df = df.drop_duplicates(subset=pk_cols)
            else:
                print(f"Advertencia: La clave primaria {pk_cols} no se encuentra en {table_name}. Columnas disponibles: {df.columns.tolist()}")
        
        if df.empty:
            print(f"La consulta para {table_name} no devolvió datos.")
            continue
        
        # Rellenar valores nulos en columnas (excepto fechas)
        for col in df.columns:
            if col not in date_columns_global.get(table_name, []):
                if df[col].dtype in [np.float64, np.int64]:
                    df[col] = df[col].fillna(0)
                else:
                    df[col] = df[col].fillna("")
        
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = df[col].astype(np.float32)
        for col in df.select_dtypes(include=['int64']).columns:
            df[col] = df[col].astype(np.int32)
        
        # Crear la tabla en SQL Server Local
        with conn_local.cursor() as cursor:
            create_sql = create_table_sql(table_name, df)
            cursor.execute(create_sql)
            conn_local.commit()
            print(f"Tabla '{table_name}' creada en SQL Server Local.")
            
            # Insertar los datos
            columns = ", ".join([f"[{col}]" for col in df.columns])
            placeholders = ", ".join(['?' for _ in df.columns])
            insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
            cursor.fast_executemany = True
            cursor.executemany(insert_sql, df.values.tolist())
            conn_local.commit()
            print(f"{df.shape[0]} registros insertados en '{table_name}'.")
    
except Exception as e:
    print(f"Error en el proceso ETL: {e}")

finally:
    conn_azure.close()
    conn_local.close()

print("ETL COMPLETADO.")

Conexiones establecidas correctamente.
Tablas existentes eliminadas correctamente.

Procesando: dim_geo
Columnas en dim_geo: ['TIENDA_ID', 'ZONA', 'TIENDA_DESC', 'PROVINCIA_ID', 'PROV_DESC']
Tabla 'dim_geo' creada en SQL Server Local.
12 registros insertados en 'dim_geo'.

Procesando: dim_product
Columnas en dim_product: ['Id_Producto', 'Modelo', 'CATEGORIA_ID', 'Equipamiento', 'Fuel_ID', 'FUEL', 'id_transmision', 'TIPO_CARROCERIA', 'Kw', 'cantidad_vendida', 'pvp_total']
Tabla 'dim_product' creada en SQL Server Local.
404 registros insertados en 'dim_product'.

Procesando: dim_time
Columnas en dim_time: ['Date', 'Anno', 'Mes', 'Mes_desc', 'Week', 'Dia', 'Diadelasemana', 'Diadelesemana_desc', 'Festivo', 'Findesemana', 'Laboral', 'InicioMes', 'FinMes']
Tabla 'dim_time' creada en SQL Server Local.
3652 registros insertados en 'dim_time'.

Procesando: dim_client
Columnas en dim_client: ['Customer_ID', 'CODIGO_POSTAL', 'Edad', 'GENERO', 'RENTA_MEDIA_ESTIMADA', 'STATUS_SOCIAL', 'Fecha_nacimi

### Resumen del Proceso ETL

- **Extracción:** Se obtienen datos de Azure SQL Database mediante consultas definidas en archivos SQL (para las tablas: hechos, clientes, zona, tiempo y productos).
- **Transformación:** Los datos se manipulan en Pandas, se convierten tipos de datos, se gestionan valores nulos y se prepara la estructura de las tablas.
- **Carga:** Se crean las tablas en SQL Server Local respetando claves primarias y foráneas y se insertan los registros.

Este Notebook automatiza el proceso ETL y permite monitorizar cada paso a través de los mensajes en consola.