In [None]:
import pandas as pd
from datetime import date, timedelta
import pyodbc
from hdbcli import dbapi
import os
import smtplib
from email.message import EmailMessage
import csv

# --------------------------------------------
# PARÁMETROS DE CORREO (SENDGRID)
# --------------------------------------------
SERVER_HOST_MAIL = "smtp.sendgrid.net"
SERVER_PORT_MAIL = 587
SERVER_USER_MAIL = "apikey"
SERVER_FROM_MAIL = "no-reply@mobonet.mx"
DESTINATARIOS = ["jmarquez@mobo.com.mx"]

def enviar_correo(asunto, cuerpo):
    msg = EmailMessage()
    msg["Subject"] = asunto
    msg["From"] = SERVER_FROM_MAIL
    msg["To"] = ", ".join(DESTINATARIOS)
    msg.set_content(cuerpo)

    try:
        with smtplib.SMTP(SERVER_HOST_MAIL, SERVER_PORT_MAIL) as smtp:
            smtp.starttls()
            smtp.login(SERVER_USER_MAIL, SERVER_PASS_MAIL)
            smtp.send_message(msg)
        print("📧 Correo enviado.")
    except Exception as e:
        print("❌ Error al enviar el correo:", e)

# --------------------------------------------
# CONEXIONES
# --------------------------------------------
conn_hana = dbapi.connect(
    address="192.168.10.13",
    port=30015,
    user="bimobo",
    password=""
)

conn_sql = pyodbc.connect(
    'DRIVER={SQL Server};SERVER=SRVSQLBIDEV\\SQLSERVERDEV;DATABASE=MOBODW_STG;UID=SA;PWD='
)
cursor_sql = conn_sql.cursor()

# --------------------------------------------
# FECHAS Y RUTA ARCHIVO
# --------------------------------------------
ayer = date.today() - timedelta(days=1)
fecha_ayer = int(ayer.strftime('%Y%m%d'))
fecha_10 = ayer - timedelta(days=10)
fecha_inicio = int(fecha_10.strftime('%Y%m%d'))

fecha_ayer_sql = ayer.strftime('%Y-%m-%d')
fecha_10_sql = (ayer - timedelta(days=10)).strftime('%Y-%m-%d')

ruta_destino = r"C:\Users\jmarquez\Documents"
nombre_archivo = f"ventas_desde_{fecha_inicio}_hasta_{fecha_ayer}.csv"
ruta_completa = os.path.join(ruta_destino, nombre_archivo)

# --------------------------------------------
# EXTRACCIÓN DESDE HANA
# --------------------------------------------
query_hana = f"""
CALL "MOBO_PRODUCTIVO"."SYS_RP_FCT_VENTAS_COPY2" ('{fecha_inicio}', '{fecha_ayer}')
"""
print("📡 Ejecutando consulta en HANA...")

try:
    df_transacciones = pd.read_sql(query_hana, conn_hana)
    print(f"📥 Registros obtenidos: {len(df_transacciones)}")

    if not df_transacciones.empty:
        df_transacciones.to_csv(ruta_completa, index=False)
        print(f"📁 Archivo exportado a CSV: {ruta_completa}")

        delete_query = f"""
        DELETE FROM stage_ventas_devo
        WHERE cast(fecha_venta as date) BETWEEN '{fecha_10_sql}' AND '{fecha_ayer_sql}'
        """
        cursor_sql.execute(delete_query)
        conn_sql.commit()
        print("🧹 Registros eliminados de stage_ventas_devo.")
    else:
        raise ValueError("La consulta no devolvió ningún dato. No se exporta CSV ni se elimina tabla.")

except Exception as e:
    print("❌ Error durante el proceso:", e)
    enviar_correo(
        asunto="❌ Error en exportación o eliminación de registros",
        cuerpo=f"Ocurrió un error durante la ejecución del script:\n\n{e}"
    )

# --------------------------------------------
# CARGA A SQL SERVER DESDE CSV
# --------------------------------------------
print("📄 Leyendo archivo CSV...")
df = pd.read_csv(ruta_completa, encoding='utf-8-sig')

columnas_sql = [
    'venta_id', 'fecha_venta', 'fecha_proc', 'sucursal', 'hora_venta',
    'folio_venta', 'referencia', 'cliente_id', 'caja', 'agente_ventas',
    'subtotal', 'total_neto', 'tipo_transaccion', 'almacen', 'num_linea',
    'sku', 'cantidad', 'precio_bruto_c_dcto', 'impuesto', 'descuento',
    'precio_neto_c_dcto', 'importe_c_dcto', 'dcto_por_cupon', 'codigo_promo',
    'porc_descuento_nm', 'cant_gratis_nm', 'origen', 'tipo_documento', 'tipo',
    'hora_round', 'cupon_gen', 'cupon_red', 'estatus_sku'
]

columnas_numericas = [
    'subtotal', 'total_neto', 'cantidad', 'precio_bruto_c_dcto',
    'impuesto', 'descuento', 'precio_neto_c_dcto', 'importe_c_dcto',
    'dcto_por_cupon', 'porc_descuento_nm', 'cant_gratis_nm'
]

columnas_fecha = ['fecha_venta', 'fecha_proc']

# Agregar columnas faltantes
for col in ['venta_id', 'hora_round', 'estatus_sku']:
    if col not in df.columns:
        df[col] = None

df.rename(columns={
    'conf_cupon_gen': 'cupon_gen',
    'conf_cupon_red': 'cupon_red'
}, inplace=True)

# Relleno y limpieza
df['cupon_gen'] = df['cupon_gen'].fillna('')
df['cupon_red'] = df['cupon_red'].fillna('')

for col in columnas_numericas:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0.0)

columnas_texto = [
    col for col in columnas_sql
    if col not in columnas_numericas + columnas_fecha + ['venta_id', 'hora_round', 'estatus_sku']
]
for col in columnas_texto:
    if col in df.columns:
        df[col] = df[col].fillna('')

for col in columnas_fecha:
    df[col] = pd.to_datetime(df[col], errors='coerce')
    df[col] = df[col].where(pd.notnull(df[col]), None)

for col in ['venta_id', 'hora_round', 'estatus_sku']:
    df[col] = df[col].where(pd.notnull(df[col]), None)

df.replace('', None, inplace=True)
df = df.where(pd.notnull(df), None)
df = df[columnas_sql]



# --------------------------------------------
# LIMPIEZA Y TRANSFORMACIÓN DE FECHAS
# --------------------------------------------
for col in columnas_fecha:
    df[col] = pd.to_datetime(df[col], errors='coerce')  # convierte strings vacíos en NaT
    df[col] = df[col].astype(object)  # convertir a object para poder tener None
    df[col] = df[col].where(df[col].notnull(), None)  # reemplaza NaT con None

# Validación adicional: reportar si existen fechas nulas antes de insertar
if df['fecha_venta'].isnull().any() or df['fecha_proc'].isnull().any():
    errores_fecha = df[df['fecha_venta'].isnull() | df['fecha_proc'].isnull()]
    archivo_fechas_nulas = os.path.join(ruta_destino, f"errores_fechas_nulas_{fecha_ayer}.csv")
    errores_fecha.to_csv(archivo_fechas_nulas, index=False, encoding='utf-8-sig')
    
    mensaje_error = (
        f"❌ Se encontraron registros con fechas nulas en 'fecha_venta' o 'fecha_proc'.\n"
        f"Archivo con registros inválidos: {archivo_fechas_nulas}"
    )
    print(mensaje_error)
    
    enviar_correo(
        asunto="❌ Error: Fechas nulas detectadas en archivo de ventas",
        cuerpo=mensaje_error
    )


# --------------------------------------------
# INSERCIÓN POR BLOQUES CON LOG DE ERRORES
# --------------------------------------------
print("🖥️ Conectando a SQL Server...")
conn = pyodbc.connect(
    'DRIVER={SQL Server};SERVER=SRVSQLBIDEV\\SQLSERVERDEV;DATABASE=MOBODW_STG;UID=SA;PWD=BISAP_Mobo@DA2020'
)
cursor = conn.cursor()

insert_query = f"""
INSERT INTO stage_ventas_devo ({', '.join(['[' + col + ']' for col in columnas_sql])})
VALUES ({', '.join(['?'] * len(columnas_sql))})
"""

insertados = 0
errores = 0
chunk_size = 100000
total = len(df)

archivo_errores = os.path.join(ruta_destino, f"errores_insercion_{fecha_ayer}.csv")
errores_list = []


📡 Ejecutando consulta en HANA...


  df_transacciones = pd.read_sql(query_hana, conn_hana)


📥 Registros obtenidos: 124163
📁 Archivo exportado a CSV: C:\Users\jmarquez\Documents\ventas_desde_20250614_hasta_20250624.csv
🧹 Registros eliminados de stage_ventas_devo.
📄 Leyendo archivo CSV...


  df = pd.read_csv(ruta_completa, encoding='utf-8-sig')


❌ Se encontraron registros con fechas nulas en 'fecha_venta' o 'fecha_proc'.
Archivo con registros inválidos: C:\Users\jmarquez\Documents\errores_fechas_nulas_20250624.csv
📧 Correo enviado.
🖥️ Conectando a SQL Server...


In [None]:
cursor.fast_executemany = True
for start in range(0, total, chunk_size):
    end = min(start + chunk_size, total)
    chunk = df.iloc[start:end]
    data_chunk = [tuple(row) for _, row in chunk.iterrows()]     
    try:
        cursor.executemany(insert_query, data_chunk)
        conn.commit()
        insertados += len(data_chunk)
        print(f"✅ Registros insertados de la fila {start} a {end}: {len(data_chunk)}")
    except Exception as e:
        print(f"❌ Error al insertar registros de la fila {start} a {end}: {e}")
        errores += len(data_chunk)
        for index, row in chunk.iterrows():
            try:
                cursor.execute(insert_query, tuple(row))
            except Exception as e:
                errores_list.append({
                    'index': index,
                    'error': str(e),
                    **row.to_dict()
                })

❌ Error al insertar registros de la fila 0 a 100000: 


In [None]:

print(f"🚀 Iniciando inserción de {total} registros...")
for start in range(0, total, chunk_size):
    end = min(start + chunk_size, total)
    chunk = df.iloc[start:end]
    for idx, row in chunk.iterrows():
        try:
            cursor.execute(insert_query, tuple(row[col] for col in columnas_sql))
            insertados += 1
        except Exception as e:
            errores += 1
            errores_list.append({**row.to_dict(), 'error': str(e)})
    conn.commit()
    print(f"✅ {insertados} registros insertados (hasta la fila {end})")

In [None]:

print(f"🚀 Iniciando inserción de {total} registros...")
for start in range(0, total, chunk_size):
    end = min(start + chunk_size, total)
    chunk = df.iloc[start:end]
    for idx, row in chunk.iterrows():
        try:
            cursor.execute(insert_query, tuple(row[col] for col in columnas_sql))
            insertados += 1
        except Exception as e:
            errores += 1
            errores_list.append({**row.to_dict(), 'error': str(e)})
    conn.commit()
    print(f"✅ {insertados} registros insertados (hasta la fila {end})")

if errores_list:
    keys = errores_list[0].keys()
    with open(archivo_errores, 'w', newline='', encoding='utf-8-sig') as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames=keys)
        dict_writer.writeheader()
        dict_writer.writerows(errores_list)
    print(f"⚠️ {errores} registros fallaron. Guardados en: {archivo_errores}")

print(f"🎯 Inserción completada. Total: {insertados} registros. Errores: {errores}")
cursor.close()
conn.close()


In [None]:
# --------------------------------------------
# FUNCIÓN PARA REGISTRAR VALIDACIONES
# --------------------------------------------
def registrar_validacion(nombre_etl, tipo_validacion, resultado, valor_origen, valor_destino, diferencia, mensaje):
    try:
        insert_val = """
        INSERT INTO dbo.stage_quality_log (
            fecha_ejecucion, nombre_etl, tipo_validacion, resultado,
            valor_origen, valor_destino, diferencia, mensaje
        )
        VALUES (GETDATE(), ?, ?, ?, ?, ?, ?, ?)
        """
        cursor_sql.execute(insert_val, (
            nombre_etl, tipo_validacion, resultado,
            valor_origen, valor_destino, diferencia, mensaje
        ))
        conn_sql.commit()
        print(f"📝 Validación registrada: {tipo_validacion}")
    except Exception as e:
        print(f"❌ Error registrando validación: {tipo_validacion} -> {e}")

# --------------------------------------------
# VALIDACIÓN 1: TOTAL DE REGISTROS
# --------------------------------------------
try:
    cursor_sql.execute("""
        SELECT COUNT(*) FROM stage_ventas_devo
        WHERE CAST(fecha_venta AS DATE) BETWEEN ? AND ?
    """, fecha_10_sql, fecha_ayer_sql)
    registros_sql = cursor_sql.fetchone()[0]
    registros_csv = len(df)

    resultado = "OK" if registros_sql == registros_csv else "ERROR"
    diferencia = registros_sql - registros_csv
    mensaje = "Cantidad de registros coincide" if resultado == "OK" else "Cantidad de registros no coincide"

    registrar_validacion(
        nombre_etl="ventas_devo",
        tipo_validacion="total_registros",
        resultado=resultado,
        valor_origen=registros_csv,
        valor_destino=registros_sql,
        diferencia=diferencia,
        mensaje=mensaje
    )
except Exception as e:
    print(f"❌ Error validando total registros: {e}")

# --------------------------------------------
# VALIDACIÓN 2: SUMA DE COLUMNAS NUMÉRICAS
# --------------------------------------------
def validar_suma_columna(columna):
    try:
        suma_csv = df[columna].sum()
        cursor_sql.execute(f"""
            SELECT SUM([{columna}]) FROM stage_ventas_devo
            WHERE CAST(fecha_venta AS DATE) BETWEEN ? AND ?
        """, fecha_10_sql, fecha_ayer_sql)
        suma_sql = cursor_sql.fetchone()[0] or 0

        suma_csv = float(suma_csv)
        suma_sql = float(suma_sql)
        diferencia = round(suma_sql - suma_csv, 2)
        resultado = "OK" if abs(diferencia) < 0.01 else "ERROR"
        mensaje = f"Suma de {columna} {'coincide' if resultado == 'OK' else 'no coincide'}"

        registrar_validacion(
            nombre_etl="ventas_devo",
            tipo_validacion=f"suma_{columna}",
            resultado=resultado,
            valor_origen=suma_csv,
            valor_destino=suma_sql,
            diferencia=diferencia,
            mensaje=mensaje
        )
    except Exception as e:
        print(f"❌ Error validando suma de {columna}: {e}")

# Aplicar validación a columnas clave
validar_suma_columna("subtotal")
validar_suma_columna("total_neto")

# --------------------------------------------
# VALIDACIÓN 3: DUPLICADOS
# --------------------------------------------
try:
    query_duplicados = """
    SELECT folio_venta, sku, num_linea,tipo_transaccion, COUNT(*) as veces
    FROM stage_ventas_devo
    WHERE CAST(fecha_venta AS DATE) BETWEEN ? AND ?   and  sku <> 'PPPPAGOMF'
    GROUP BY folio_venta, sku, num_linea,tipo_transaccion
    HAVING COUNT(*) > 1
    """
    cursor_sql.execute(query_duplicados, fecha_10_sql, fecha_ayer_sql)
    duplicados = cursor_sql.fetchall()
    num_duplicados = len(duplicados)

    resultado = "OK" if num_duplicados == 0 else "ERROR"
    mensaje = "Duplicados por clave" if resultado == "ERROR" else "Sin duplicados"

    registrar_validacion(
        nombre_etl="ventas_devo",
        tipo_validacion="duplicados",
        resultado=resultado,
        valor_origen=num_duplicados,
        valor_destino=0,
        diferencia=num_duplicados,
        mensaje=mensaje
    )
except Exception as e:
    print(f"❌ Error validando duplicados: {e}")


📝 Validación registrada: total_registros
📝 Validación registrada: suma_subtotal
📝 Validación registrada: suma_total_neto
📝 Validación registrada: duplicados
