# Envía anomalías a Snowflake


In [1]:
# Optional one-time setup: uncomment to install the dependencies.
# NOTE(review): prefer `%pip install pkg==X.Y.Z` over `!pip` so the packages go
# into this kernel's environment, and pin versions for reproducibility.
# !pip install snowflake-snowpark-python
# !pip install python-dotenv

In [2]:
import time, pyperclip
start_i = time.time()

import pandas as pd
from glob import glob

# from tqdm import tqdm
# from datatools.helpers.Utils import run_time

from datatools.snowflake.Querys import run_sql_query
from datatools.snowflake.Conn import make_session
from datatools.helpers.Utils import run_time
from snowflake.snowpark import functions as F

## Funciones

In [3]:
def make_cols_anom(DF, models):
    """Build the ``{column_name: type}`` map of anomaly columns found in ``DF``.

    Columns are selected when their lowercased name starts with one of the
    ``models`` prefixes. They are grouped by model in the order given and sorted
    alphabetically within each model. Their types come from
    ``DF.select(...).dtypes``, with ``"bigint"`` renamed to ``"number"``. The
    three derived columns ``ENSAMBLE_MAX__PROB``, ``ENSAMBLE_MAX__POS`` and
    ``PRESTADOR_PROCESADO`` are appended at the end.

    Args:
        DF: Frame exposing ``.columns`` and ``.select(cols).dtypes`` (a list of
            ``(name, type)`` pairs). Presumably a Snowpark DataFrame; confirm
            at the call site.
        models: Column-name prefixes. They are compared against the lowercased
            column name, so they should be lowercase.

    Returns:
        dict mapping column name to its type string.
    """
    selected = []
    for prefix in models:
        selected.extend(sorted(name for name in DF.columns if name.lower().startswith(prefix)))

    types_by_col = dict(DF.select(selected).dtypes)
    result = {}
    for name, dtype in types_by_col.items():
        # Snowflake reports integer columns as "bigint"; the DDL here uses "number".
        result[name] = "number" if dtype == "bigint" else dtype

    result.update({"ENSAMBLE_MAX__PROB": "double", "ENSAMBLE_MAX__POS": "integer", "PRESTADOR_PROCESADO": "boolean"})
    return result

In [4]:
def make_cols_for_dataset(snow_database, snow_schema, table, cols_anom, mode:str="make", debug:bool=False):
    """Add (default) or drop every column of ``cols_anom`` on a Snowflake table.

    Issues one ``ALTER TABLE`` statement per column through ``run_sql_query``.
    A failing statement does not stop the loop:

    * an error whose text contains ``"already exists"`` is ignored silently;
    * any other error is printed together with the column name.

    Args:
        snow_database, snow_schema, table: Parts of the fully-qualified table name.
        cols_anom: Mapping ``{column_name: sql_type}``. The type is only used when adding.
        mode: ``"drop"`` (case-insensitive) drops the columns; any other value adds them.
        debug: When equal to ``True``, print each statement that ran successfully.

    Returns:
        None.
    """
    target = f"{snow_database}.{snow_schema}.{table}"
    for col_name, col_type in cols_anom.items():
        if mode.lower() == "drop":
            statement = f"alter table {target} drop column {col_name};"
        else:
            statement = f"alter table {target} add column {col_name} {col_type};"

        try:
            run_sql_query(statement)
        except Exception as err:
            # Re-adding an existing column is expected on re-runs; only report other failures.
            if "already exists" not in str(err):
                print(f"k={col_name!r}\nERROR={err}")
        else:
            if debug == True:
                print(statement)

In [5]:
def load_anom_data(prestador, snow_database, snow_schema, debug:bool=False, output_dir:str="./output"):
    """Upload one prestador's local anomaly results to its own Snowflake table.

    Reads ``{output_dir}/{prestador}/df.parquet``. Any previous
    ``{snow_database}.{snow_schema}.t__{prestador}`` table is dropped, and the
    table is recreated from the frame. The index is written as a column and
    identifiers are not quoted.

    Args:
        prestador: Provider identifier. It is used both in the parquet path and
            in the table name ``t__{prestador}``.
        snow_database, snow_schema: Target location in Snowflake.
        debug: When true, print the table name and the frame shape.
        output_dir: Root folder containing ``<prestador>/df.parquet``.

    Returns:
        The DataFrame read from parquet, without the ``reset_index`` applied for the upload.

    Raises:
        Any error from reading the parquet file or from Snowflake. This includes
        a failing ``DROP TABLE``, which must not be ignored (see comment below).
    """
    prestador_table = "t__" + prestador
    if debug:
        print(f"{prestador_table=}\n")
    fName = f"{output_dir}/{prestador}/df.parquet"
    df = pd.read_parquet(fName)
    if debug:
        print(f"{df.shape=}")

    with make_session() as session:
        # `if exists` covers the first-run case (no table yet) that a bare
        # `except: pass` used to hide. Any other failure (permissions,
        # connection, ...) must stop here. Otherwise write_pandas would write
        # into the old table that could not be dropped: with the default
        # overwrite=False it appends, producing duplicated rows.
        print("Tabla borrada: ", session.sql(f"drop table if exists {snow_database}.{snow_schema}.{prestador_table}").collect())

        session.write_pandas(
            df=df.reset_index(),
            database=snow_database,
            schema=snow_schema,
            table_name=prestador_table,
            quote_identifiers=False,
            auto_create_table=True,
        )
        print(f"Tabla de prestador dispuesta en = [{snow_database}.{snow_schema}.{prestador_table}]")

    return df

In [6]:
def update_dataset_with_results(prestador, snow_database, snow_schema, cols_anom, target_table:str="DATASET"):
    """Copy one prestador's anomaly results into the shared ``DATASET`` table.

    Builds and runs a single ``UPDATE ... FROM`` statement. It joins
    ``{snow_database}.{snow_schema}.{target_table}`` (alias ``a``) with
    ``t__{prestador}`` (alias ``b``) on ``id``, and sets ``a.<col> = b.<col>``
    for every column in ``cols_anom`` except ``PRESTADOR_PROCESADO``. Two
    columns are derived on the fly from the prestador table:

    * ``ENSAMBLE_MAX__PROB``: ``ARRAY_MAX`` of ``LOF__PROB``,
      ``IFOREST__PROB`` and ``AUTOENCODER__PROB``, cast to double.
    * ``ENSAMBLE_MAX__POS``: 1-based ``ROW_NUMBER`` by ``ENSAMBLE_MAX__PROB``
      descending, with NULLs explicitly last and ties broken by ``ENSAMBLE__POS``.

    The SQL is also copied to the clipboard (best effort) for manual inspection.

    Args:
        prestador: Provider identifier. Its results live in table ``t__{prestador}``.
        snow_database, snow_schema: Location of both tables.
        cols_anom: Iterable of column names to copy (for a dict, its keys).
        target_table: Table to update. Defaults to ``"DATASET"``.

    Raises:
        Whatever ``run_sql_query`` raises. Clipboard errors are ignored.
    """
    prestador_table = "t__" + prestador
    cols_update_str = "\n\t,".join(f"a.{c}=b.{c}" for c in cols_anom if c != "PRESTADOR_PROCESADO")
    # NULLS LAST is explicit because Snowflake's default ordering puts NULLs
    # first on DESC, which would give rows without a probability the top ranks.
    sql_update = f"""update {snow_database}.{snow_schema}.{target_table} a\nset\n\t{cols_update_str}\nfrom (
        SELECT *, ROW_NUMBER() OVER(ORDER BY ENSAMBLE_MAX__PROB DESC NULLS LAST, ENSAMBLE__POS) AS ENSAMBLE_MAX__POS
        FROM (
            SELECT *, CAST(ARRAY_MAX([LOF__PROB, IFOREST__PROB, AUTOENCODER__PROB]) AS double) AS ENSAMBLE_MAX__PROB
            from {snow_database}.{snow_schema}.{prestador_table}
        )
    ) b\nwhere a.id=b.id"""
    sql_update = f"""-- Actualiza '{target_table}' con la info de {prestador_table}\n\n{sql_update}"""
    try:
        # Convenience only: lets the user paste the statement into a worksheet.
        pyperclip.copy(sql_update)
    except Exception:
        # No clipboard (e.g. a headless host) must not block the update.
        # Catching Exception rather than a bare `except` keeps Ctrl-C working.
        pass
    run_sql_query(sql_update)
    print(f"{target_table} actualizado con info de {prestador_table}")

## Params

In [7]:
snow_database = "vp_informacion"
snow_schema = "outliers"

# Lowercase prefixes of each model's output columns.
models = ["lof", "iforest", "autoencoder", "ensamble"]

# Type of every anomaly column created on DATASET.
# LOF, IFOREST and AUTOENCODER share the same four outputs. The ensemble has
# its own set, plus the derived ENSAMBLE_MAX__* columns and the
# PRESTADOR_PROCESADO flag.
cols_anom = {
    **{
        f"{model}{suffix}": sql_type
        for model in ("LOF", "IFOREST", "AUTOENCODER")
        for suffix, sql_type in (("", "number"), ("__POS", "number"), ("__PROB", "double"), ("__SCORE", "double"))
    },
    "ENSAMBLE": "number",
    "ENSAMBLE_P_0_3": "number",
    "ENSAMBLE__POS": "number",
    "ENSAMBLE__PROB": "double",
    "ENSAMBLE__PROB_STD": "double",
    "ENSAMBLE_MAX__PROB": "double",
    "ENSAMBLE_MAX__POS": "integer",
    "PRESTADOR_PROCESADO": "boolean",
}

# NOTE(review): the trailing [1:] excludes the first prestador
# (0DD90F985DE9C9863A8073BB6BEDBED0). The reason is not recorded here;
# confirm before re-running.
prestadores = [
    '0DD90F985DE9C9863A8073BB6BEDBED0',
    '3F89EB097CB8FB95E71536FCE8F04D40',
    '43246AB2ECE09E419D3C7D7C1D33C33B',
    '4F09BB7381F2DD5BE74AEB854852AEFD',
    '5833AB66972CC785431A16BE6CCD2C47',
    'B725F4EAFE01B71BF7125AB91BC8356B',
    'BDC9655DB70D0AD154FA9811931FC6C8',
    'E0B158A4D09C4884A37BAA919DC7EB47',
][1:]
prestadores

['3F89EB097CB8FB95E71536FCE8F04D40',
 '43246AB2ECE09E419D3C7D7C1D33C33B',
 '4F09BB7381F2DD5BE74AEB854852AEFD',
 '5833AB66972CC785431A16BE6CCD2C47',
 'B725F4EAFE01B71BF7125AB91BC8356B',
 'BDC9655DB70D0AD154FA9811931FC6C8',
 'E0B158A4D09C4884A37BAA919DC7EB47']

# Procesamiento

Creación de las columnas de anomalías en `DATASET`

In [8]:
## Columns currently present in DATASET. The next cell uses them to decide
## which anomaly columns must be dropped before being re-created.
with make_session() as session:
    cols_in_dataset = pd.DataFrame(
        session.sql(f"show columns in table {snow_database}.{snow_schema}.DATASET").collect()
    )
if set(cols_anom).issubset(cols_in_dataset["column_name"]):
    print("Los campos de cols_anom ya existen.")

Los campos de cols_anom ya existen.


In [9]:
# Re-create the anomaly columns of DATASET so they start out empty: first drop
# the ones that already exist, then add all of them.
# Previously the drop ran only when *all* of them existed. With a partial set,
# the surviving columns kept their old values, including PRESTADOR_PROCESADO
# flags that the final flag-update cell never overwrites.
cols_en_dataset = set(cols_in_dataset["column_name"])
cols_anom_existentes = {k: v for k, v in cols_anom.items() if k in cols_en_dataset}
if cols_anom_existentes:
    make_cols_for_dataset(snow_database=snow_database, snow_schema=snow_schema, table="DATASET", cols_anom=cols_anom_existentes, mode="drop")
    print("--Campos anom eliminados--")
make_cols_for_dataset(snow_database=snow_database, snow_schema=snow_schema, table="DATASET", cols_anom=cols_anom, debug=True)

--Campos anom eliminados--
alter table vp_informacion.outliers.DATASET add column LOF number;
alter table vp_informacion.outliers.DATASET add column LOF__POS number;
alter table vp_informacion.outliers.DATASET add column LOF__PROB double;
alter table vp_informacion.outliers.DATASET add column LOF__SCORE double;
alter table vp_informacion.outliers.DATASET add column IFOREST number;
alter table vp_informacion.outliers.DATASET add column IFOREST__POS number;
alter table vp_informacion.outliers.DATASET add column IFOREST__PROB double;
alter table vp_informacion.outliers.DATASET add column IFOREST__SCORE double;
alter table vp_informacion.outliers.DATASET add column AUTOENCODER number;
alter table vp_informacion.outliers.DATASET add column AUTOENCODER__POS number;
alter table vp_informacion.outliers.DATASET add column AUTOENCODER__PROB double;
alter table vp_informacion.outliers.DATASET add column AUTOENCODER__SCORE double;
alter table vp_informacion.outliers.DATASET add column ENSAMBLE num

### Procesamiento del prestador

In [10]:
# Push each prestador's results into DATASET. A failure is reported and the
# loop continues with the next prestador.
for p in prestadores:
    try:
        update_dataset_with_results(
            prestador=p,
            snow_database=snow_database,
            snow_schema=snow_schema,
            cols_anom=cols_anom,
        )
    except Exception as e:
        print(f"ERROR con {p}::: {e}")

DATASET actualizado con info de t__3F89EB097CB8FB95E71536FCE8F04D40
DATASET actualizado con info de t__43246AB2ECE09E419D3C7D7C1D33C33B
DATASET actualizado con info de t__4F09BB7381F2DD5BE74AEB854852AEFD
DATASET actualizado con info de t__5833AB66972CC785431A16BE6CCD2C47
DATASET actualizado con info de t__B725F4EAFE01B71BF7125AB91BC8356B
DATASET actualizado con info de t__BDC9655DB70D0AD154FA9811931FC6C8
DATASET actualizado con info de t__E0B158A4D09C4884A37BAA919DC7EB47


In [11]:
# Flag every row of DATASET according to whether its prestador has anomaly results:
#  1) PRESTADOR_PROCESADO=TRUE for all rows of any prestador that has at least
#     one row with a non-NULL LOF, IFOREST or AUTOENCODER value;
#  2) PRESTADOR_PROCESADO=FALSE for every row still NULL after step 1.
# Order matters: step 2 only fills what step 1 left NULL. Values that were
# already non-NULL before this cell (e.g. left over from an earlier run) are
# not reset here.
sql_update_procesado =[f"""UPDATE {snow_database}.{snow_schema}.DATASET a
		SET a.PRESTADOR_PROCESADO=TRUE
		FROM (
				SELECT DISTINCT PRESTADOR
				FROM {snow_database}.{snow_schema}.DATASET
				WHERE coalesce(LOF, IFOREST, AUTOENCODER) IS NOT NULL
			) b
		WHERE a.PRESTADOR=b.PRESTADOR"""
		, f"""UPDATE {snow_database}.{snow_schema}.DATASET a
SET a.PRESTADOR_PROCESADO=FALSE
WHERE a.PRESTADOR_PROCESADO IS NULL"""]
for sql in sql_update_procesado:
    # Print whatever run_sql_query returns (a rows-updated summary in the saved output).
    print(run_sql_query(sql))

   number of rows updated  number of multi-joined rows updated
0                  847545                                    0
   number of rows updated  number of multi-joined rows updated
0                 9694147                                    0


In [12]:
# Report the total notebook runtime relative to `start_i`, which is taken in the imports cell.
run_time(start_i)

Tiempo de ejecucion total: 3.99min.
Fecha de ejecucion: 2025-06-07 12:06:28.526406
