In [22]:
####################################################################
#################### CODIGO CORREGIDO ##############################
####################################################################

import os
import urllib.parse

import pandas as pd
from sqlalchemy import create_engine, text

# ==========================
# 1. CONFIGURACIÓN BÁSICA
# ==========================

# Ruta del archivo de entrada (tu Excel)
ruta_entrada = os.environ.get("INPUT_EXCEL", r"data/subida.xlsx")

# Parámetros de conexión a SQL Server
server = os.environ.get("SQL_SERVER", "localhost")                # ej: "10.0.0.1" o "MI_SERVER\SQL2019"
database = os.environ.get("SQL_DB", "")           # ej: "CUN_REPOSITORIO"
driver = os.environ.get("SQL_DRIVER", "ODBC Driver 17 for SQL Server")  # Ajusta si usas otro

# Tabla destino (donde deben quedar los datos al final)
schema_destino = "COE"
tabla_destino = "CLTIENE_LLAMADAS"

# Nombre de la tabla temporal (física)
tabla_temporal = f"{tabla_destino}_TEMP_CARGA"



In [23]:

# =======================================================
# 2. CARGAR EXCEL EN PANDAS (ORIGEN DE LOS REGISTROS)
# =======================================================

if not os.path.exists(ruta_entrada):
    raise FileNotFoundError(f"La ruta no existe: {ruta_entrada}")

# Aquí cargas la hoja correcta
df = pd.read_excel(ruta_entrada, sheet_name="Sheet1")

print("Registros cargados desde Excel:", df.shape)



Registros cargados desde Excel: (4859, 37)


In [24]:

# =======================================================
# 3. CREAR ENGINE DE SQLALCHEMY PARA SQL SERVER
# =======================================================

params = urllib.parse.quote_plus(
    f"DRIVER={{{driver}}};"
    f"SERVER={server};"
    f"DATABASE={database};"
    f"Trusted_Connection=yes;"  # O reemplaza por UID=...;PWD=...; si usas usuario/clave
)

engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")


In [25]:
# =======================================================
# 4. LEER EL ESQUEMA DE LA TABLA DESTINO DESDE SQL
# =======================================================

query_schema = text("""
SELECT 
    COLUMN_NAME,
    DATA_TYPE,
    CHARACTER_MAXIMUM_LENGTH,
    NUMERIC_PRECISION,
    NUMERIC_SCALE,
    IS_NULLABLE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = :schema
  AND TABLE_NAME = :tabla
ORDER BY ORDINAL_POSITION;
""")

with engine.connect() as conn:
    schema_df = pd.read_sql(query_schema, conn, params={"schema": schema_destino, "tabla": tabla_destino})

print("Columnas encontradas en tabla destino:")
print(schema_df)


Columnas encontradas en tabla destino:
                          COLUMN_NAME DATA_TYPE  CHARACTER_MAXIMUM_LENGTH  \
0                               Fecha  nvarchar                     255.0   
1   Contacto (Identificacion - Nombre  nvarchar                     255.0   
2                            Telefono     float                       NaN   
3                              Agente  nvarchar                     255.0   
4                              Cuenta  nvarchar                     255.0   
5                              Modulo  nvarchar                     255.0   
6                   Nombre del Modulo  nvarchar                     255.0   
7                              Motivo  nvarchar                     255.0   
8                Estado de la LLamada  nvarchar                     255.0   
9                  Tiempo  de Llamada  nvarchar                     255.0   
10            Tiempo  de Conversacion  nvarchar                     255.0   
11                 Estado de Registro

In [26]:


# =======================================================
# 5. FUNCIÓN PARA CONVERTIR TIPOS SEGÚN ESQUEMA SQL
# =======================================================

def convertir_dataframe_a_esquema_sql(df_origen: pd.DataFrame, schema_sql: pd.DataFrame) -> pd.DataFrame:
    """
    Convierte el DataFrame de entrada a los tipos que espera SQL Server
    según INFORMATION_SCHEMA.COLUMNS.
    Aquí asumimos que el Excel TIENE todas las columnas de la tabla en SQL.
    """

    # ---- 5.1 Normalizar nombres de columnas (case-insensitive) ----
    # Mapeo: nombre_en_minuscula -> nombre_exactamente_como_en_SQL
    mapeo_nombres = {col.lower(): col for col in schema_sql["COLUMN_NAME"]}

    df_renombrado = df_origen.copy()
    df_renombrado.columns = [
        mapeo_nombres.get(str(c).lower(), str(c))  # si coincide por lower, usamos el de SQL
        for c in df_renombrado.columns
    ]

    cols_sql = set(schema_sql["COLUMN_NAME"])
    cols_excel = set(df_renombrado.columns)

    # Columnas que SQL espera y NO están en el Excel (esto sí es error duro)
    faltantes = cols_sql - cols_excel

    # Columnas que vienen en Excel pero NO existen en la tabla destino (se pueden ignorar)
    sobrantes = cols_excel - cols_sql

    if faltantes:
        raise ValueError(
            f"El Excel NO tiene estas columnas que sí existen en {schema_destino}.{tabla_destino}: "
            f"{sorted(faltantes)}"
        )

    if sobrantes:
        print("⚠️ Aviso: Estas columnas vienen en el Excel pero NO existen en la tabla destino "
              f"{schema_destino}.{tabla_destino} y serán ignoradas:")
        print(sorted(sobrantes))
        df_renombrado = df_renombrado.drop(columns=list(sobrantes))

    # A partir de aquí, df_renombrado solo tiene columnas que están en la tabla SQL
    df_convertido = df_renombrado.copy()
    errores = []

    for _, row in schema_sql.iterrows():
        col = row["COLUMN_NAME"]
        tipo = row["DATA_TYPE"]
        max_len = row["CHARACTER_MAXIMUM_LENGTH"]
        is_nullable = row["IS_NULLABLE"]

        # Ya garantizamos que la columna existe
        serie = df_convertido[col]

        try:
            if tipo in ("int", "bigint", "smallint", "tinyint"):
                serie_nueva = pd.to_numeric(serie, errors="coerce").astype("Int64")

            elif tipo in ("decimal", "numeric", "float", "real", "money", "smallmoney"):
                serie_nueva = pd.to_numeric(serie, errors="coerce")

            elif tipo in ("date", "datetime", "datetime2", "smalldatetime", "datetimeoffset", "time"):
                serie_nueva = pd.to_datetime(serie, errors="coerce")

            elif tipo == "bit":
                def _map_bit(x):
                    if pd.isna(x):
                        return None
                    s = str(x).strip().lower()
                    if s in ("1", "true", "si", "sí", "y", "yes"):
                        return 1
                    if s in ("0", "false", "no", "n"):
                        return 0
                    return None

                serie_nueva = serie.map(_map_bit).astype("Int64")

            else:
                # Asumimos tipos de texto (varchar, nvarchar, char, text, etc.)
                # Si viene NaN, lo dejamos como NaN para que SQL lo reciba como NULL
                serie_nueva = serie.astype("string")

                if pd.notna(max_len) and max_len is not None and max_len > 0:
                    serie_nueva = serie_nueva.str.slice(0, int(max_len))

            # -------- Validación de conversión en campos NO NULL --------
            if tipo in (
                "int", "bigint", "smallint", "tinyint",
                "decimal", "numeric", "float", "real", "money", "smallmoney",
                "date", "datetime", "datetime2", "smalldatetime", "datetimeoffset", "time",
                "bit"
            ):
                mask_original_no_nulo = serie.notna() & (serie.astype(str).str.strip() != "")
                mask_convertido_nulo = pd.isna(serie_nueva)

                problematicos = df_convertido[mask_original_no_nulo & mask_convertido_nulo]

                if not problematicos.empty and is_nullable == "NO":
                    errores.append(
                        f"Columna '{col}' ({tipo}) tiene {len(problematicos)} "
                        f"valores no convertibles y la columna NO admite NULL."
                    )

            df_convertido[col] = serie_nueva

        except Exception as e:
            errores.append(f"Error convirtiendo columna '{col}' a tipo {tipo}: {e}")

    if errores:
        print("⚠️ Se encontraron problemas de conversión de tipos:")
        for e in errores:
            print(" -", e)
        # Si quieres que aún así inserte NULL en lo que no se pudo convertir,
        # deja esto como solo print. Si quieres que reviente, deja el raise:
        raise ValueError("Errores de conversión de tipos: revisa los mensajes anteriores.")

    # Ordenar columnas en el mismo orden de la tabla en SQL
    columnas_finales = schema_sql["COLUMN_NAME"].tolist()
    df_convertido = df_convertido[columnas_finales]

    return df_convertido


# Convertimos df al esquema esperado en SQL
df_convertido = convertir_dataframe_a_esquema_sql(df, schema_df)

print("✅ DataFrame convertido a tipos compatibles con SQL.")



✅ DataFrame convertido a tipos compatibles con SQL.


In [27]:
from sqlalchemy import text, event

# ==========================
# Acelerar inserts en SQL Server (pyodbc)
# ==========================
@event.listens_for(engine, "before_cursor_execute")
def _set_fast_executemany(conn, cursor, statement, parameters, context, executemany):
    if executemany:
        cursor.fast_executemany = True


# =======================================================
# 6. CREAR TABLA TEMPORAL Y CARGAR DATOS (CORREGIDO)
# =======================================================

# Usa nombres con corchetes para evitar líos
tabla_destino_full  = f"[{schema_destino}].[{tabla_destino}]"
tabla_temporal_full = f"[{schema_destino}].[{tabla_temporal}]"

sql_crear_temporal = f"""
IF OBJECT_ID(N'{schema_destino}.{tabla_temporal}', 'U') IS NOT NULL
    DROP TABLE {tabla_temporal_full};

SELECT TOP (0) *
INTO {tabla_temporal_full}
FROM {tabla_destino_full};
"""

with engine.begin() as conn:
    print("Creando tabla temporal...")
    conn.execute(text(sql_crear_temporal))

    print("Insertando datos en tabla temporal...")

    # ✅ QUITAMOS method="multi" (evita el límite 2100 parámetros)
    df_convertido.to_sql(
        name=tabla_temporal,
        con=conn,
        schema=schema_destino,
        if_exists="append",
        index=False,
        chunksize=1000
    )

    print("Insertando datos desde temporal hacia tabla destino...")

    # Columnas desde schema_df (en el mismo orden)
    columnas = schema_df["COLUMN_NAME"].tolist()
    columnas_sql = ", ".join(f"[{c}]" for c in columnas)

    sql_insert_final = f"""
    INSERT INTO {tabla_destino_full} ({columnas_sql})
    SELECT {columnas_sql}
    FROM {tabla_temporal_full};
    """
    conn.execute(text(sql_insert_final))

    print("Eliminando tabla temporal...")
    conn.execute(text(f"DROP TABLE {tabla_temporal_full};"))

print("✅ Proceso completado: tipos validados y datos cargados en la tabla destino.")


Creando tabla temporal...
Insertando datos en tabla temporal...
Insertando datos desde temporal hacia tabla destino...
Eliminando tabla temporal...
✅ Proceso completado: tipos validados y datos cargados en la tabla destino.
