### Dependencias

In [1]:
#Librerias
import pandas as pd
pd.set_option('display.float_format', '{:,.2f}'.format)
import numpy as np
import snowflake.connector
import os
from datetime import datetime
import re
from sqlalchemy import create_engine
from thefuzz import process
from dotenv import load_dotenv
# Cargar variables desde .env
load_dotenv()

True

In [2]:
paises_snowflake = {
    'brasil': ("Brasil", (22,)),
    'caricam': ("CARICAM", (14, 15, 16, 17, 19, 20)),
    'eu': ("EU", (8,)),
    'paraguay': ("Paraguay", (24,)),
    'colombia': ("Colombia", (6,)),
    'ecuador': ("Ecuador", (12,)),
    'uruguay': ("Uruguay", (23,)),
    'chile': ("Chile", (13,)),
    'dominicana': ("Rep. Dominicana", (21,)),
    'peru': ("Perú", (10,))
}
columnas_sumar = [
    'Precio Unitario VTA GLI', 'Unidades Desplazadas',
    'Unidades Inv. CEDIS', 'Unidades Inv. Transito', 'Unidades Inv. Tienda',
    'Monto Desplazado Bruto ML'
]

In [3]:
# Modificar
ambiente = "DEV_"
PaisCarga ="Colombia"  # Cambiar por el país deseado

# No es necesario modificar
PaisCarpeta = paises_snowflake[PaisCarga.lower()][0]
PaisSF = paises_snowflake[PaisCarga.lower()][1]

### Funciones

In [4]:
def extraer_rango_semanas(nombre_archivo):
    """
    Extrae rango de semanas del patrón SO_yyyyww_yyyyww.
    Devuelve (inicio, fin) como enteros YYYYWW.
    Considera años bisiestos para validación precisa de semana 53.
    """
    # Patrón básico para capturar años y semanas
    patron = r'SI_(20\d{2})(0[1-9]|[1-4]\d|5[0-3])_(20\d{2})(0[1-9]|[1-4]\d|5[0-3])'
    match = re.search(patron, nombre_archivo, re.IGNORECASE)
    
    if not match:
        raise ValueError("Patrón SI_yyyyww_yyyyww no encontrado o formato inválido.")

    año1, sem1, año2, sem2 = map(int, match.groups())
    
    # Validar semana 53 solo para años que la tienen
    def tiene_semana_53(año):
        # Años que tienen 53 semanas: cuando 1 de enero cae en jueves, 
        # o cuando es año bisiesto y 1 de enero cae en miércoles
        from datetime import date
        primer_dia = date(año, 1, 1).weekday()  # 0=lunes, 6=domingo
        es_bisiesto = año % 4 == 0 and (año % 100 != 0 or año % 400 == 0)
        return primer_dia == 3 or (es_bisiesto and primer_dia == 2)
    
    if sem1 == 53 and not tiene_semana_53(año1):
        raise ValueError(f"El año {año1} no tiene semana 53.")
    if sem2 == 53 and not tiene_semana_53(año2):
        raise ValueError(f"El año {año2} no tiene semana 53.")
    
    return año1 * 100 + sem1, año2 * 100 + sem2


In [5]:
def resumir_dataframe(df, columnas_sumar,desc,montos_faltan=None):
 
    # Convertir columnas de suma a float si existen y no lo son
    columnas_sumar_existentes = [col for col in columnas_sumar if col in df.columns]
    for col in columnas_sumar_existentes:
        if col in df.columns and not pd.api.types.is_float_dtype(df[col]):
            df[col] = pd.to_numeric(df[col], errors='coerce')
    # Calcular resumen)
    suma_totales = df[columnas_sumar_existentes].sum()
    if montos_faltan is not None and not montos_faltan.empty:
        suma_totales = suma_totales.add(montos_faltan, fill_value=0)

    # Unir ambos resultados
    resumen = suma_totales.reset_index()
    resumen.columns = ['Variable', f'Resumen_{desc}']
    # Agregar columna con longitud del DataFrame original
    resumen.loc[len(resumen)] = ['Total Filas en df', len(df)]
    return resumen

In [6]:
def exportar_rechazos_por_campo(
    df: pd.DataFrame,
    campo_faltante: str,
    columnas_agrupacion: list,
    columnas_suma: list,
    carpeta_pais_destino: str,
    pais: str
):
    try:
        # Filtrar rechazos
        df_rechazos = df[df[campo_faltante].isna()]

        df_rechazos = df_rechazos.dropna(axis=1, how='all')
        if df_rechazos.empty:
            print(f"Sin rechazos de {campo_faltante}")
            return None
        # Agrupar datos con las que columnas que sí existan en el DataFrame
        columnas_agrupacion = [col for col in columnas_agrupacion if col in df_rechazos.columns]
        columnas_suma = [col for col in columnas_suma if col in df_rechazos.columns]
        df_rechazos_agrupado = df_rechazos.groupby(columnas_agrupacion, as_index=False)[columnas_suma].sum()
        # Eliminar registros donde todas las columnas de suma sean cero
        #df_rechazos_agrupado = df_rechazos_agrupado.loc[df_rechazos_agrupado[columnas_suma].sum(axis=1) != 0]
        # Agregar columnas adicionales
        df_rechazos_agrupado['Fecha Rechazo'] = datetime.now().strftime("%Y-%m-%d")
        #df_rechazos_agrupado['Sem Métrica'] = semana
        df_rechazos_agrupado['Métrica'] = 'SO'
        df_rechazos_agrupado.rename(columns={'PAISID': 'País', 'Nombre Grupo':'GrpNombre', 'Nombre Cadena/Formato':'Cadena', 
                                            'GRPID':'GrpID', 'Descripción presentación de producto':'Descripción','Material SAP':'Código (SAP/ERP)', 
                                            'Cód. Interno ERP':'Código (SAP/ERP)'}, inplace=True)
        df_rechazos_agrupado.drop(columns=['SEMID'], inplace=True, errors='ignore')
        # Exportar archivo
        hoy = datetime.now().strftime("%Y-%m-%d")
        salida = os.path.join(carpeta_pais_destino, f"Rechazos_Sellin_{campo_faltante}_{pais}_{hoy}.xlsx")
        df_rechazos_agrupado.to_excel(salida, index=False)
        print(f"Se generaron rechazos de {campo_faltante}: {salida}")
        return df_rechazos_agrupado
    except Exception as e:
        print(f"Error al exportar rechazos de {campo_faltante}: {e}")
        return None

In [7]:
def read_sql_upper(query: str, engine, params=None):
    try:
        df = pd.read_sql(query, engine, params=params)
        df.columns = [col.upper() for col in df.columns]
        return df
    except Exception as e:
        raise RuntimeError(f"Error al ejecutar la consulta SQL o procesar los resultados: {e}")

### Cargar archivo

In [10]:
ruta_catalogo_grp = os.getenv("RUTA_CATGRUPOS")
df_cat_grupos = pd.read_excel(ruta_catalogo_grp)
df_cat_grupos['GrpNombre'] = df_cat_grupos['GrpNombre'].str.strip().str.upper()
df_cat_grupos.rename(columns={'GrpID': 'GRPID', 'GrpNombre': 'GRPNOMBRE', 'PaisID':'PAISID'}, inplace=True)
df_cat_grupos = df_cat_grupos.replace('-', '', regex=True)

In [None]:
ruta_base = os.getenv("RUTA_BASE")
ruta_destino = os.getenv("RUTA_DESTINO")

carpeta_pais = os.path.join(ruta_base, PaisCarga)
carpeta_pais_destino = os.path.join(ruta_destino, PaisCarga)

if os.path.exists(carpeta_pais) and os.path.isdir(carpeta_pais):
    archivos_xlsx = [
        os.path.join(carpeta_pais, f)
        for f in os.listdir(carpeta_pais)
        if f.endswith(".xlsx") and os.path.isfile(os.path.join(carpeta_pais, f))
    ]

    if not archivos_xlsx:
        raise RuntimeError("No se encontró ningún archivo .xlsx en la carpeta.")
    # Ordenar por fecha de modificación descendente (más reciente primero)
    archivos_xlsx.sort(key=os.path.getmtime, reverse=True)
    ruta_archivo = archivos_xlsx[0]
    print(f"Archivo más reciente: {ruta_archivo}")
    try:
        df = pd.read_excel(
            ruta_archivo,sheet_name="LayOut_SellIn", skiprows=6,engine='openpyxl',
            dtype={ #"Monto Facturación Bruta ML": float,"Monto Facturación Neta ML": float,"Factor Bruto Neto SellIn": float, 
                        'Nombre Cadena/Formato': str,'Nombre Grupo': str,}
        )
    except Exception as e:
        raise RuntimeError(f"Error al leer el archivo: {e}")
    print(df['Semana Genomma'].unique())
    # Obtener solo las columnas necesarias
    df = df.iloc[:, 7:]
    df = df.replace('-', '', regex=True)
    df = df[
        df['EAN'].notna() &
        df['Unidades Facturadas'].notna() &
        df['Fecha Ult. Día de la semana'].notna()
    ]
    # Extraer el rango de semanas del nombre del archivo
    rango_semanas = extraer_rango_semanas(os.path.basename(ruta_archivo))
    semana_inicio, semana_fin = sorted(rango_semanas)
    print(F'Semanas a procesar : {semana_inicio}, {semana_fin}')
else:
    raise RuntimeError(f"La carpeta del país '{carpeta_pais}' no existe.")

Archivo más reciente: C:\\Users\\elsilva\\OneDrive - genommalabinternacional\\Layout Internacional\\data\Colombia\Colombia_SO_202527_202534_SI_202522_202534.xlsx
[27. 28. 29. 30. 26. 22. 23. 24. 25. nan]
Semanas a procesar : 202522, 202534


### Consultas

In [12]:
# Acceder a las variables
user = os.getenv("USER")
password = os.getenv("PASSWORD")
account = os.getenv("ACCOUNT")
role = os.getenv("ROLE")
# Configura el motor de conexión
conn_sf = create_engine(
    f'snowflake://{user}:{password}@{account}/?role={role}'
)

In [13]:
# Generar placeholders para cada país
placeholders_pais = ", ".join(["%s"] * len(PaisSF))
#Consulta Snowflake Tiempo
Query_catsemanas = f"""
  SELECT *
  FROM PRD_STG.GNM_CT.CATSEMANAS
  WHERE (SEMANIO * 100 + SEMNUMERO) BETWEEN {semana_inicio} AND {semana_fin};
  """
df_CatSem = read_sql_upper(Query_catsemanas, conn_sf)
#Consulta Snowflake TipoComProd
Query_Pais =  """
  SELECT PAIS, PAISID FROM
  PRD_CNS_MX.CATALOGOS.VW_DIM_PAIS
  """
df_Pais = read_sql_upper(Query_Pais, conn_sf)
df_Pais['PAIS'] = df_Pais['PAIS'].str.strip().str.upper()
Query_Productos = f"""
  SELECT PAISID, PROID, PROPSTID, PROPSTCODBARRAS
  FROM PRD_CNS_MX.CATALOGOS.VW_ESTRUCTURAPRODUCTOSTOTALPAISES
  WHERE PAISID IN ({placeholders_pais})
  ORDER BY PROPSTID ASC
  """
df_Prod = read_sql_upper(Query_Productos, conn_sf, PaisSF)
df_Prod = df_Prod.drop_duplicates(subset=['PROPSTCODBARRAS','PAISID'], keep='first')

### Filtrar archivo

In [14]:
columnas_contar = [
    'Nombre Grupo', 'Nombre Cadena/Formato', 'EAN',
    'Material SAP', 'Descripción presentación de producto'
]

columnas_sumar = [
 "Unidades Facturadas", "Monto Facturación Bruta ML", "Monto Facturación Neta ML", "Factor Bruto Neto SellIn"
]

In [15]:
df_CatSem['SEMFIN'] = pd.to_datetime(df_CatSem['SEMFIN'], errors='coerce')
# Ejemplo de DataFrame con intervalos
df_CatSem = df_CatSem.sort_values('SEMINICIO')
if df.empty:
    raise RuntimeError("El DataFrame está vacío después de la carga.")
# DataFrame con fechas a unir
df['Fecha Ult. Día de la semana'] = pd.to_datetime(df['Fecha Ult. Día de la semana'], errors='coerce')
df = df.sort_values('Fecha Ult. Día de la semana')
# merge_asof une hacia atrás: buscamos el último SEMINICIO <= fecha
df_merge = pd.merge_asof(
    df,
    df_CatSem,
    left_on='Fecha Ult. Día de la semana',
    right_on='SEMINICIO',
    direction='backward'
)
# Luego validamos que fecha <= SEMFIN
df_filtrado = df_merge[df_merge['Fecha Ult. Día de la semana'] <= df_merge['SEMFIN']]
if df_filtrado.empty:
    raise RuntimeError("El DataFrame está vacío después de la carga.")
df_filtrado = df_filtrado.copy()
df_filtrado['Pais Nombre']= df_filtrado['Pais Nombre'].str.upper().str.strip()
df_filtrado['Nombre Grupo'] = df_filtrado['Nombre Grupo'].str.upper().str.strip()
# df_filtrado['Nombre Cadena/Formato'] = df_filtrado['Nombre Cadena/Formato'].str.upper().str.strip()
# Alternativa usando expresiones regulares para eliminar decimales .0 si existen y quitar espacios
df_filtrado['EAN'] = df_filtrado['EAN'].astype(str).str.replace(r'\.0$', '', regex=True).str.strip()
# Ver un resumen iinicial
resumen = resumir_dataframe(df_filtrado, columnas_sumar, 'df_inicial')

### Rechazos

In [16]:
# Unir los dataframes por las claves correspondientes
df_merged = df_filtrado.merge(df_Pais[['PAIS', 'PAISID']],
                left_on='Pais Nombre',
                right_on='PAIS',
                how='left')
# Hacer el merge para obtener grpid
df_merged2 = df_merged.merge(df_cat_grupos[['PAISID','GRPNOMBRE', 'GRPID']].drop_duplicates(),
                            left_on =['PAISID','Nombre Grupo'],
                            right_on = ['PAISID','GRPNOMBRE'],
                            how='left')

rechazo1 = exportar_rechazos_por_campo(df_merged2,'GRPID', ['PAISID',"Nombre Grupo","Nombre Cadena/Formato","SEMID", 'SEMNUMERO'], ["Unidades Desplazadas","Monto Desplazado Bruto ML"], 
                            carpeta_pais_destino, PaisCarga)

# Convetir al mismo tipo de dato
df_Prod['PROPSTCODBARRAS'] = df_Prod['PROPSTCODBARRAS'].astype(str).str.replace(r'\.0$', '', regex=True).str.strip()
# Unir los dataframes por Productos
df_merged3 = df_merged2.merge(df_Prod,
                left_on=['PAISID','EAN'],
                right_on=['PAISID','PROPSTCODBARRAS'],
                how='left')
rechazo2 = exportar_rechazos_por_campo(df_merged3,'PROPSTID', ['PAISID',"EAN","Descripción presentación de producto","SEMID","Cód. Interno ERP","Material SAP", 'SEMNUMERO'], 
                                        ["Unidades Desplazadas","Monto Desplazado Bruto ML"], 
                                        carpeta_pais_destino, PaisCarga)
# Resumen del último merge
resumen2 = resumir_dataframe(df_merged3, columnas_sumar,'df_merged')
# Obtener registros que faltan
registros_faltan = df_merged3[df_merged3[["GRPID", "PROPSTID"]].isna().any(axis=1)]

Se generaron rechazos de GRPID: C:\\Users\\elsilva\\OneDrive - genommalabinternacional\\Layout Internacional\\data\\Rechazos\Colombia\Rechazos_Sellin_GRPID_Colombia_2025-09-01.xlsx
Se generaron rechazos de PROPSTID: C:\\Users\\elsilva\\OneDrive - genommalabinternacional\\Layout Internacional\\data\\Rechazos\Colombia\Rechazos_Sellin_PROPSTID_Colombia_2025-09-01.xlsx


### Agrupación

In [17]:
# Montos faltantes
if not registros_faltan.empty:
    # Resumen del último merge
    resumen_faltan = resumir_dataframe(registros_faltan, columnas_sumar,'df_faltan')
    montos_faltan = registros_faltan[columnas_sumar].sum()
else:
    montos_faltan = pd.Series()

In [None]:
if df_merged3.empty:
    raise Exception("El DataFrame está vacío. Revisa los cruces anteriores.")
# Filtrar filas válidas (sin valores nulos en campos clave)
df_validado = df_merged3.dropna(subset=['GRPID', 'PROPSTID'])
resumen3 = resumir_dataframe(df_validado, columnas_sumar,'df_sin_nulos', montos_faltan)
# Renombrar solo las columnas que realmente cambian
df_validado = df_validado.copy()
df_validado.rename(columns={
    "PAIS": "PAISNOM",
    "SEMANIO": "ANOGLI",
    "SEMNUMERO": "SEMGLI",
    "SEMMES": "MESGLI",
    "Fecha Ult. Día de la semana": "ULTIMODIASEM",
    "Nombre Grupo": "NOMGRUPO",
    "Material SAP": "CODERP",
    #"Cód. Interno ERP": "CODERP",  # Si quieres conservar ambos, usa otro nombre para uno
    "Cód. Cliente SellOut": "CODCTESELLOUT",
    "Descripción presentación de producto": "PROPSTDESCERP",
    "Precio Factura ML": "PRECIOFACTML",
    "Unidades Facturadas": "UNFACT",
    "Monto Facturación Bruta ML": "MONTOFACTBRTML",
    "Monto Facturación Neta ML": "MONTOFACTNETAML",
    "Factor Bruto Neto SellIn": "FACTBRUTONETOSELLIN"
}, inplace=True)

# Identificar tipos de columnas automáticamente
numeric_cols = df_validado.select_dtypes(include=['number']).columns
text_cols = df_validado.select_dtypes(include=['object', 'string']).columns

# Rellenar
df_validado[numeric_cols] = df_validado[numeric_cols].fillna(0)
df_validado[text_cols] = df_validado[text_cols].fillna('')
# Asegurarse de que las columnas clave estén en el DataFrame
columnas_agrupan = [
    'SEMID', 'ANOGLI', 'MESGLI', 'SEMGLI', 'PAISID', 'GRPID', 'PROPSTID',
    'PAISNOM', 'ULTIMODIASEM', 'NOMGRUPO', 'EAN', 'CODERP', 'CODCTESELLOUT',
    'PROPSTDESCERP'
    # ← NO incluir las columnas numéricas que vas a agregar
]
# Agrupar por claves y sumar los valores numéricos
for col in ["PRECIOFACTML", "UNFACT", "MONTOFACTBRTML", "MONTOFACTNETAML", "FACTBRUTONETOSELLIN"]:
    if col in df_validado.columns:
        df_validado[col] = df_validado[col].fillna(0)
df_validado = df_validado.dropna(axis=1, how='all')
# Filtrar columnas que sí existen en el DataFrame
columnas_agrupan = [col for col in columnas_agrupan if col in df_validado.columns]
df_grouped = df_validado.groupby(
    columnas_agrupan,
    as_index=False
).agg({
    "PRECIOFACTML": 'mean',
    "UNFACT": "sum",
    "MONTOFACTBRTML": "sum",
    "MONTOFACTNETAML": "sum",
    "FACTBRUTONETOSELLIN": "sum"
})
if df_grouped.empty:
    raise Exception("El DataFrame está vacío.")
# Añadir fechas
df_grouped[['CREATED_AT', 'UPDATED_AT']] = datetime.now().strftime("%Y-%m-%d")

columnas_objetivo = [
'SEMID', 'ANOGLI', 'MESGLI', 'SEMGLI', 'PAISID', 'GRPID', 'PROPSTID',
'PAISNOM', 'ULTIMODIASEM', 'NOMGRUPO', 'EAN', 'CODERP', 'CODCTESELLOUT',
'PROPSTDESCERP', 'PRECIOFACTML', 'UNFACT', 'MONTOFACTBRTML',
'MONTOFACTNETAML', 'FACTBRUTONETOSELLIN', 'CREATED_AT', 'UPDATED_AT'
]
# Reindexa columnas: agrega las que falten con NaN, y ordena según columnas_objetivo
df_final = df_grouped.reindex(columns=columnas_objetivo)

if not montos_faltan.empty:
    montos_faltan = montos_faltan.rename({
        "Unidades Facturadas": "UNFACT",
        "Monto Facturación Bruta ML": "MONTOFACTBRTML",
        "Monto Facturación Neta ML": "MONTOFACTNETAML",
        "Factor Bruto Neto SellIn": "FACTBRUTONETOSELLIN"
    })
# Resúmenes finales
resumen4 = resumir_dataframe(df_grouped, [ 'UNFACT', 'MONTOFACTBRTML',
       'MONTOFACTNETAML', 'FACTBRUTONETOSELLIN'],'df_calculado',montos_faltan)
resumen5 = resumir_dataframe(df_grouped, [ 'UNFACT', 'MONTOFACTBRTML',
       'MONTOFACTNETAML', 'FACTBRUTONETOSELLIN'], 'df_final')

In [None]:
# Vista de todos los resúmenes
resumen_final = resumen.merge(resumen2, on="Variable", how="outer")
resumen_final = resumen_final.merge(resumen3, on="Variable", how="outer")
resumen_final.replace({ "Unidades Facturadas": "UNFACT",
                  "Factor Bruto Neto SellIn": "FACTBRUTONETOSELLIN",
                  "Monto Facturación Bruta ML": "MONTOFACTBRTML",
                   "Monto Facturación Neta ML": "MONTOFACTNETAML"}, inplace=True)
resumen_final = resumen_final.merge(resumen4, on="Variable", how="outer")
resumen_final = resumen_final.merge(resumen5, on="Variable", how="outer")
resumen_final.replace(np.nan, '', inplace=True)
resumen_final

Unnamed: 0,Variable,Resumen_df_inicial,Resumen_df_merged,Resumen_df_sin_nulos,Resumen_df_calculado,Resumen_df_final
0,FACTBRUTONETOSELLIN,0.0,0.0,0.0,0.0,0.0
1,MONTOFACTBRTML,63014771777.37,63014771777.37,63014771777.37,63014771777.37,59008072660.37
2,MONTOFACTNETAML,48937105022.93,48937105022.93,48937105022.93,48937105022.93,45221351933.69
3,Total Filas en df,43891.0,43891.0,40488.0,20274.0,20274.0
4,UNFACT,2446702.0,2446702.0,2446702.0,2446702.0,2287511.0


In [20]:
ruta_destino = os.path.join(r'\\NASPRO.infovisiontv.com\DGI\DGIBancoCentral\27SIInternacional', f"CARGA_{PaisCarga}_SEM_{semana_inicio}_{semana_fin }.xlsx")
try:
    df_final.to_excel(ruta_destino, index=False)
    print(f"Se generó el archivo final: {ruta_destino}")
except Exception as e:
    raise RuntimeError(f"Error al guardar el archivo final: {e}")

Se generó el archivo final: \\NASPRO.infovisiontv.com\DGI\DGIBancoCentral\27SIInternacional\CARGA_Colombia_SEM_202522_202534.xlsx
