## ETL: Extracción desde Azure y Carga en SQL Server Local

Este script realiza un proceso ETL automático extrayendo datos desde Azure SQL Database, transformándolos en Pandas y cargándolos en SQL Server Local.

### 1. Importación de Librerías

In [43]:
# Importación de librerías necesarias
import pyodbc  # Para conectar con bases de datos SQL Server
import pandas as pd  # Para manipulación de datos
import numpy as np  # Para manejo de valores numéricos
import os  # Para trabajar con archivos y directorios
import warnings  # Para ocultar warnings innecesarios

warnings.filterwarnings("ignore", category=UserWarning)  # Ocultar warnings


**Explicación:**

pyodbc → Conexión con bases de datos SQL Server.

pandas → Manipulación y limpieza de datos.

numpy → Optimización de datos numéricos.

os → Gestión de archivos.

warnings → Para suprimir advertencias innecesarias.

###  2. Configuración de Conexiones a Bases de Datos

In [44]:
# 🔹 Conexión a Azure SQL
AZURE_SERVER = 'uaxmathfis.database.windows.net'
AZURE_DATABASE = 'usecases'
AZURE_DRIVER = '{ODBC Driver 17 for SQL Server}'
azure_conn_str = f"DRIVER={AZURE_DRIVER};SERVER={AZURE_SERVER};DATABASE={AZURE_DATABASE};Authentication=ActiveDirectoryInteractive"

# 🔹 Conexión a SQL Server Local
LOCAL_SERVER = 'localhost'
LOCAL_DATABASE = 'dwh_case1'
LOCAL_DRIVER = '{ODBC Driver 17 for SQL Server}'
local_conn_str = f"DRIVER={LOCAL_DRIVER};SERVER={LOCAL_SERVER};DATABASE={LOCAL_DATABASE};Trusted_Connection=yes;TrustServerCertificate=yes"





**Explicación:**

Se definen las credenciales de conexión tanto para Azure SQL como para SQL Server Local.

ActiveDirectoryInteractive se usa para autenticación en Azure.

### 3. Definición de Tablas y Archivos SQL

In [45]:
# 📌 Ubicación de los archivos SQL con las consultas de extracción
query_folder = "../data/dwh"
queries = {
    "dim_geo": "dim_geo.sql",
    "dim_product": "dim_product.sql",
    "dim_time": "dim_date.sql",   # Corregido: en la imagen es dim_date.sql, no dim_time.sql
    "dim_client": "dim_client.sql",
    "fact_sales": "dim_fact.sql"  # Corregido: en la imagen es dim_fact.sql, no load_fact.sql
}

# 📌 Definir claves primarias para cada tabla
primary_keys = {
    "fact_sales": ["CODE"],
    "dim_client": ["Customer_ID"],
    "dim_geo": ["TIENDA_ID"],
    "dim_product": ["Id_Producto"],
    "dim_time": ["Fecha"]
}

# 📌 Definir claves foráneas
foreign_keys = {
    "fact_sales": {
        "Customer_ID": "dim_client(Customer_ID)",
        "TIENDA_ID": "dim_geo(TIENDA_ID)",
        "Id_Producto": "dim_product(Id_Producto)",
        "Sales_Date": "dim_time(Fecha)"
    }
}


**Explicación:**

queries → Diccionario con las rutas de los archivos SQL que contienen las consultas de extracción.

primary_keys → Diccionario con las claves primarias de cada tabla.

foreign_keys → Diccionario con las relaciones entre tablas.

### 4. Función para Crear Tablas en SQL Server

In [46]:
def create_table_sql(table_name, df):
    # Mapeo específico de columnas de fecha para cada tabla
    date_columns = {
        "dim_client": ["Fecha_nacimiento"],
        "dim_time": ["InicioMes", "FinMes", "Fecha"],
        "fact_sales": ["DATE_ULTIMA_REVISION", "Logistic_date", "Prod_date", "Sales_Date"]
    }
    
    col_defs = []
    for col in df.columns:
        # Verificar si la columna está en el mapeo manual de fechas
        if table_name in date_columns and col in date_columns[table_name]:
            col_defs.append(f'[{col}] DATE')
        # Detección automática de tipos para otras columnas
        elif np.issubdtype(df[col].dtype, np.datetime64):
            col_defs.append(f'[{col}] DATE')
        elif df[col].dtype == np.float32:
            col_defs.append(f'[{col}] FLOAT')
        elif df[col].dtype == np.int32:
            col_defs.append(f'[{col}] INT')
        else:
            # Para columnas de texto, ajustamos el tamaño según los datos
            max_len = df[col].astype(str).str.len().max()
            varchar_size = min(2000, max(1, int(max_len * 1.3)))  # Buffer del 30% con máximo 2000
            col_defs.append(f'[{col}] NVARCHAR({varchar_size})')

    # Agregar clave primaria si existe
    pk = ", PRIMARY KEY (" + ", ".join(primary_keys[table_name]) + ")" if table_name in primary_keys else ""
    
    # Agregar claves foráneas si existen
    fk = ""
    if table_name in foreign_keys:
        for col, ref in foreign_keys[table_name].items():
            fk += f", FOREIGN KEY ({col}) REFERENCES {ref}"

    return f"CREATE TABLE {table_name} ({', '.join(col_defs)}{pk}{fk});"



**Explicación:**

Genera una consulta CREATE TABLE dinámicamente.

Detecta tipos de datos (DATE, FLOAT, INT, NVARCHAR).

Incluye claves primarias y foráneas automáticamente.

### 5. Función para Eliminar Tablas en Orden

In [47]:
def drop_tables_in_order(cursor, conn):
    drop_order = ["fact_sales", "dim_time", "dim_product", "dim_geo", "dim_client"]
    for table in drop_order:
        check_exists_query = f"IF OBJECT_ID('{table}', 'U') IS NOT NULL DROP TABLE {table};"
        try:
            cursor.execute(check_exists_query)
            conn.commit()
        except Exception as e:
            print(f"Error al eliminar la tabla {table}: {e}")


**Explicación:**

Elimina las tablas en el orden correcto para evitar problemas con claves foráneas.

Verifica si la tabla existe antes de eliminarla.

### 6. Ejecución del Proceso ETL

In [48]:
try:
    # Conexión a las bases de datos.
    conn_azure = pyodbc.connect(azure_conn_str)
    conn_local = pyodbc.connect(local_conn_str)
    print("Conexiones correctamente establecidas.\n")

    with conn_local.cursor() as cursor:
        drop_tables_in_order(cursor, conn_local)
    # Procesamiento de cada tabla definida en el diccionario de Queries.
    for table_name, file in queries.items():
        print(f"Procesando: {table_name}")
        query_path = os.path.join(query_folder, file)
        with open(query_path, "r", encoding="utf-8") as f:
            sql_query = f.read()

        # Ejecución de la consulta sobre la base de datos de Azure.
        df = pd.read_sql(sql_query, conn_azure)

        # Eliminación de las columnas duplicadas.
        if df.columns.duplicated().any():
            print(f"Columnas duplicadas en {table_name}: {df.columns[df.columns.duplicated()].tolist()}")
            df = df.loc[:, ~df.columns.duplicated()]

        # Detección de las columnas tipo DATE para convertirlas adecuadamente.
        for col in df.columns:
            if df[col].dtype == object or df[col].dtype == "string":
                sample_values = df[col].astype(str).sample(min(len(df), 30), random_state=42)
                # Saltar si parece una columna numérica (para no confundir INT con DATE).
                if sample_values.str.isdigit().mean() > 0.8:
                    continue
                try:
                    parsed = pd.to_datetime(sample_values, errors='coerce')
                    if parsed.notna().sum() > 0.9 * len(sample_values):
                        df[col] = pd.to_datetime(df[col], errors='coerce')
                except:
                    pass
        # Si el DataFrame está vacío, se salta.
        if df.empty:
            print(f"La tabla {table_name} no devolvió resultados.\n")
            continue
        print(f"   - Filas obtenidas: {df.shape[0]}")
        print(f"   - Columnas: {df.columns.tolist()}")

        # Limpieza de valores nulos y tipos de datos.
        for col in df.columns:
            df[col] = df[col].replace(r'^\s*$', np.nan, regex=True) # Reemplazar espacios en blanco por NaN.
            if pd.api.types.is_numeric_dtype(df[col]):
                # Valor sentinel (ej: -1 o 999999).
                sentinel = -1
                df[col] = df[col].fillna(sentinel)
            elif pd.api.types.is_datetime64_any_dtype(df[col]):
                df[col] = df[col].fillna(df[col].mode(dropna=True)[0])
            else:
                df[col] = df[col].fillna("N/A")
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = df[col].astype(np.float32)
        for col in df.select_dtypes(include=['int64']).columns:
            df[col] = df[col].astype(np.int32)

        # Creación de la tabla en la base de datos local.
        with conn_local.cursor() as cursor:
            create_sql = create_table_sql(table_name, df)
            cursor.execute(create_sql)
            conn_local.commit()
            print(f"   - Tabla {table_name} creada correctamente.")

            placeholders = ', '.join(['?' for _ in df.columns])
            insert_sql = f"INSERT INTO {table_name} VALUES ({placeholders})"
            cursor.fast_executemany = True
            cursor.executemany(insert_sql, df.values.tolist())
            conn_local.commit()
            print(f"   - {df.shape[0]} filas insertadas.\n")

except Exception as e:
    print(f"Error: {e}")

finally:
    if 'conn_azure' in locals():
        conn_azure.close()
    if 'conn_local' in locals():
        conn_local.close()

print("ETL completado.")

Conexiones correctamente establecidas.

Procesando: dim_geo
   - Filas obtenidas: 12
   - Columnas: ['TIENDA_ID', 'TIENDA_DESC', 'PROV_DESC', 'ZONA']
   - Tabla dim_geo creada correctamente.
   - 12 filas insertadas.

Procesando: dim_product
   - Filas obtenidas: 404
   - Columnas: ['Id_Producto', 'Code_', 'Kw', 'TIPO_CARROCERIA', 'TRANSMISION_ID', 'Equipamiento', 'FUEL', 'Margen', 'Costetransporte', 'Margendistribuidor', 'GastosMarketing', 'Mantenimiento_medio', 'Comisión_Marca']
   - Tabla dim_product creada correctamente.
   - 404 filas insertadas.

Procesando: dim_time
   - Filas obtenidas: 3652
   - Columnas: ['Fecha', 'InicioMes', 'FinMes', 'Dia', 'Diadelasemana', 'Diadelesemana_desc', 'Mes', 'Mes_desc', 'Año', 'Añomes', 'Week', 'Trimestre', 'SemanaDelMes', 'DiaDelAño', 'DiaDelTrimestre', 'Findesemana', 'Festivo', 'Laboral']
   - Tabla dim_time creada correctamente.
   - 3652 filas insertadas.

Procesando: dim_client
   - Filas obtenidas: 44053
   - Columnas: ['Customer_ID', 'Eda

**Explicación:**

Extrae datos de Azure SQL.

Convierte tipos de datos antes de insertarlos.

Carga los datos en SQL Server Local.

Cierra conexiones después de finalizar el proceso.